Files
TIR_PROJ/herding/control/sequential.py
T
Johnny Fernandes dd5ac669e5 Webots sim-to-real fixes, DAgger pipeline, 360° proto variant
Today's session worked across the full Webots delivery stack — found and
fixed a cluster of bugs blocking the BC/RL transfer, then explored
training-side mitigations for the residual perception gap.

Bug fixes:
- Makefile FP_RATE default 2.0 → 0.0: BC demos used fp_rate=0 but RL
  fine-tune defaulted to fp_rate=2, poisoning the BC obs distribution
  and stalling PPO at 0% success across 1.46M+ steps.
- controllers/{shepherd_dog,sheep}/runtime.ini: Webots was launching
  controllers under system python3 (no numpy) and they were crashing
  silently. Pinned to the conda tir env.
- herding/config.py HERDING_WEBOTS preset: pen_latch_depth 0.5 → 2.0,
  max_new_tracks_per_step 3 → 1, static_reject 0.8 → 1.2. Stops phantom
  FPs near the gate from latching as permanently-penned tracks.
- herding/perception/sheep_tracker.py: penned tracks now decay at
  forget_steps × 8 instead of living forever. Adds get_positions
  min_freshness filter for deploy-time use.

Training/eval matches deployment:
- training/bc/collect.py: --dagger-policy flag for DAgger rollouts
  (policy drives, teacher labels) + --use-webots-preset for matched
  140° tracker + DR config.
- controllers/shepherd_dog/shepherd_dog.py: scan-fallback (0, 0.6) when
  BC/RL sees empty sheep_positions — recovers from FOV gaps.

Tooling:
- tools/dagger_round.sh: one-shot DAgger round (collect + concat + bc).
- tools/webots_sweep_gt.sh: full sweep with HERDING_USE_GT=1 for the
  perception-gap diagnosis matrix.
- protos/ShepherdDog360.proto: 360° FOV variant for the FOV-ablation
  comparison. Canonical proto stays at 140° per project spec.

Artifacts: v1 BC/RL policies for all 4 (drive × world) combos trained
in clean gym (success: diff/field 90-100%, diff/round 58%, mec/field
60-100%, mec/round 50-100%). DAgger r1/r2 BCs for diff/field show
12%→38% progression on gym HERDING_WEBOTS proxy but did not close
to actual Webots LiDAR (0/5 throughout). Next: LSTM policy or
learned tracker per the project-state memory.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-16 17:21:02 +00:00

128 lines
4.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Adaptive sequential shepherd-dog controller.
Three-phase strategy:
1. **Collect** (flock scattered): Strömbom collect — park behind the
furthest sheep and push it toward the CoM. Identical to the
Strömbom heuristic; keeps the flock together.
2. **Drive** (flock compact, >STRAGGLER_THRESHOLD active): Strömbom
drive — park behind the CoM relative to the pen and push the whole
group through the gate.
3. **Targeted** (≤STRAGGLER_THRESHOLD sheep remain active): single-
target push on the sheep closest to the pen entry. Safe to isolate
individual sheep once the flock is nearly exhausted.
The original pure pin-and-push (Phase 3 only) caused flock scatter in
Webots physics whenever the dog tried to isolate a sheep while others
were still spread across the field. Phases 12 handle the bulk of
herding with flock-aware Strömbom logic; Phase 3 cleans up stragglers.
"""
import math
from herding.world.geometry import GATE_Y, PEN_ENTRY, in_pen
F_FACTOR = 4.0 # collect/drive threshold: radius > F_FACTOR·√n
DELTA_COLLECT = 1.5 # standoff behind the furthest sheep (collect)
DELTA_DRIVE = 2.0 # standoff behind CoM (drive)
DELTA_TARGET = 1.5 # standoff behind single target sheep (targeted)
STRAGGLER_THRESHOLD = 2 # switch to targeted push when ≤ this many active
def _unit(x: float, y: float):
d = math.hypot(x, y)
if d < 1e-6:
return 0.0, 0.0
return x / d, y / d
def _is_active(x: float, y: float) -> bool:
return (not in_pen(x, y)) and y > GATE_Y
def compute_action(dog_xy, sheep_positions, pen_target=PEN_ENTRY):
"""Return ``(vx, vy, mode)`` — same signature as Strömbom."""
active = [(x, y) for (x, y) in sheep_positions.values() if _is_active(x, y)]
if not active:
return 0.0, 0.0, "idle"
n = len(active)
com_x = sum(p[0] for p in active) / n
com_y = sum(p[1] for p in active) / n
dists = [math.hypot(p[0] - com_x, p[1] - com_y) for p in active]
radius = max(dists)
if n <= STRAGGLER_THRESHOLD:
# Targeted: push the sheep closest to the pen entry individually.
sx, sy = min(active,
key=lambda p: math.hypot(p[0] - pen_target[0],
p[1] - pen_target[1]))
ux, uy = _unit(sx - pen_target[0], sy - pen_target[1])
tx, ty = sx + DELTA_TARGET * ux, sy + DELTA_TARGET * uy
mode = "targeted"
elif radius > F_FACTOR * math.sqrt(n):
# Collect: aim behind the furthest sheep from the CoM.
idx = max(range(n), key=lambda i: dists[i])
sx, sy = active[idx]
ux, uy = _unit(sx - com_x, sy - com_y)
tx, ty = sx + DELTA_COLLECT * ux, sy + DELTA_COLLECT * uy
mode = "collect"
else:
# Drive: push the whole compact flock toward the gate.
ux, uy = _unit(com_x - pen_target[0], com_y - pen_target[1])
tx, ty = com_x + DELTA_DRIVE * ux, com_y + DELTA_DRIVE * uy
mode = "drive"
ax, ay = _unit(tx - dog_xy[0], ty - dog_xy[1])
return ax, ay, mode
def compute_action_debug(dog_xy, sheep_positions, pen_target=PEN_ENTRY):
"""``compute_action`` plus a debug dict."""
active = [(x, y) for (x, y) in sheep_positions.values() if _is_active(x, y)]
if not active:
return 0.0, 0.0, "idle", {
"n_active": 0, "phase": "idle", "radius": 0.0, "threshold": 0.0,
"com_x": 0.0, "com_y": 0.0,
"target_x": dog_xy[0], "target_y": dog_xy[1],
}
n = len(active)
com_x = sum(p[0] for p in active) / n
com_y = sum(p[1] for p in active) / n
dists = [math.hypot(p[0] - com_x, p[1] - com_y) for p in active]
radius = max(dists)
threshold = F_FACTOR * math.sqrt(n)
if n <= STRAGGLER_THRESHOLD:
sx, sy = min(active,
key=lambda p: math.hypot(p[0] - pen_target[0],
p[1] - pen_target[1]))
ux, uy = _unit(sx - pen_target[0], sy - pen_target[1])
tx, ty = sx + DELTA_TARGET * ux, sy + DELTA_TARGET * uy
mode = "targeted"
elif radius > threshold:
idx = max(range(n), key=lambda i: dists[i])
sx, sy = active[idx]
ux, uy = _unit(sx - com_x, sy - com_y)
tx, ty = sx + DELTA_COLLECT * ux, sy + DELTA_COLLECT * uy
mode = "collect"
else:
ux, uy = _unit(com_x - pen_target[0], com_y - pen_target[1])
tx, ty = com_x + DELTA_DRIVE * ux, com_y + DELTA_DRIVE * uy
mode = "drive"
ax, ay = _unit(tx - dog_xy[0], ty - dog_xy[1])
return ax, ay, mode, {
"n_active": n, "phase": mode, "radius": radius, "threshold": threshold,
"com_x": com_x, "com_y": com_y,
"target_x": tx, "target_y": ty,
}