"""Strömbom collect/drive heuristic for the shepherd dog. Adapted from the original ``controllers/shepherd_dog/strombom.py`` and updated for the external pen layout. Used as a baseline controller and as the fallback when the RL policy isn't available. Reference: Strömbom et al. 2014, "Solving the shepherding problem". """ import math from herding.geometry import PEN_ENTRY, GATE_Y, in_pen # Algorithm parameters. DELTA_DRIVE / DELTA_COLLECT were tightened from # the original (4.0 / 2.5) because the new external pen sits ~26 m from # typical sheep spawn locations — at the old 4 m standoff, the flee force # (quadratic ramp, 3.7 at 4 m vs ~10 at 2 m) couldn't move sheep through # the path inside the 3000-step episode budget. # # F_FACTOR was 2.0 in the original Strömbom paper; raised to 4.0 here so # the dog stays in *drive* mode much longer. With our tighter cohesion # (flocking_sim.py), partially-collected flocks consolidate naturally # during a drive, and we don't waste 80% of the time budget on a slow # "collect" pre-phase. F_FACTOR = 4.0 DELTA_COLLECT = 1.5 DELTA_DRIVE = 2.0 def _unit(x, y): d = math.hypot(x, y) if d < 1e-6: return 0.0, 0.0 return x / d, y / d def _is_active(x, y) -> bool: """A sheep is "active" if it's still in the field — not in or below the gate plane (we treat anything south of the gate as committed to the pen and stop trying to herd it).""" return (not in_pen(x, y)) and y > GATE_Y def compute_action(dog_xy, sheep_positions, pen_target=PEN_ENTRY): """Return ``(vx, vy, mode)`` — mode in {idle, collect, drive}. ``sheep_positions`` is a ``{name: (x, y)}`` mapping (matches the Webots controller's representation). """ active = [(x, y) for (x, y) in sheep_positions.values() if _is_active(x, y)] if not active: return 0.0, 0.0, "idle" n = len(active) com_x = sum(p[0] for p in active) / n com_y = sum(p[1] for p in active) / n dists = [math.hypot(p[0] - com_x, p[1] - com_y) for p in active] radius = max(dists) if radius > F_FACTOR * math.sqrt(n): # Collect: aim at a point behind the furthest sheep, opposite the CoM. idx = max(range(n), key=lambda i: dists[i]) sx, sy = active[idx] ux, uy = _unit(sx - com_x, sy - com_y) tx, ty = sx + DELTA_COLLECT * ux, sy + DELTA_COLLECT * uy mode = "collect" else: # Drive: aim at a point behind the flock CoM relative to the goal. ux, uy = _unit(com_x - pen_target[0], com_y - pen_target[1]) tx, ty = com_x + DELTA_DRIVE * ux, com_y + DELTA_DRIVE * uy mode = "drive" ax, ay = _unit(tx - dog_xy[0], ty - dog_xy[1]) return ax, ay, mode def compute_action_debug(dog_xy, sheep_positions, pen_target=PEN_ENTRY): """Variant of compute_action that also returns a small debug dict. Kept for parity with the legacy controller's CSV logger. """ active = [(x, y) for (x, y) in sheep_positions.values() if _is_active(x, y)] if not active: return 0.0, 0.0, "idle", { "n_active": 0, "radius": 0.0, "threshold": 0.0, "com_x": 0.0, "com_y": 0.0, "target_x": dog_xy[0], "target_y": dog_xy[1], } n = len(active) com_x = sum(p[0] for p in active) / n com_y = sum(p[1] for p in active) / n dists = [math.hypot(p[0] - com_x, p[1] - com_y) for p in active] radius = max(dists) threshold = F_FACTOR * math.sqrt(n) if radius > threshold: idx = max(range(n), key=lambda i: dists[i]) sx, sy = active[idx] ux, uy = _unit(sx - com_x, sy - com_y) tx, ty = sx + DELTA_COLLECT * ux, sy + DELTA_COLLECT * uy mode = "collect" else: ux, uy = _unit(com_x - pen_target[0], com_y - pen_target[1]) tx, ty = com_x + DELTA_DRIVE * ux, com_y + DELTA_DRIVE * uy mode = "drive" ax, ay = _unit(tx - dog_xy[0], ty - dog_xy[1]) dbg = { "n_active": n, "radius": radius, "threshold": threshold, "com_x": com_x, "com_y": com_y, "target_x": tx, "target_y": ty, } return ax, ay, mode, dbg