Files
TIR_PROJ/herding/perception/sheep_tracker.py
T
Johnny Fernandes dd5ac669e5 Webots sim-to-real fixes, DAgger pipeline, 360° proto variant
Today's session worked across the full Webots delivery stack — found and
fixed a cluster of bugs blocking the BC/RL transfer, then explored
training-side mitigations for the residual perception gap.

Bug fixes:
- Makefile FP_RATE default 2.0 → 0.0: BC demos used fp_rate=0 but RL
  fine-tune defaulted to fp_rate=2, poisoning the BC obs distribution
  and stalling PPO at 0% success across 1.46M+ steps.
- controllers/{shepherd_dog,sheep}/runtime.ini: Webots was launching
  controllers under system python3 (no numpy) and they were crashing
  silently. Pinned to the conda tir env.
- herding/config.py HERDING_WEBOTS preset: pen_latch_depth 0.5 → 2.0,
  max_new_tracks_per_step 3 → 1, static_reject 0.8 → 1.2. Stops phantom
  FPs near the gate from latching as permanently-penned tracks.
- herding/perception/sheep_tracker.py: penned tracks now decay at
  forget_steps × 8 instead of living forever. Adds get_positions
  min_freshness filter for deploy-time use.

Training/eval matches deployment:
- training/bc/collect.py: --dagger-policy flag for DAgger rollouts
  (policy drives, teacher labels) + --use-webots-preset for matched
  140° tracker + DR config.
- controllers/shepherd_dog/shepherd_dog.py: scan-fallback (0, 0.6) when
  BC/RL sees empty sheep_positions — recovers from FOV gaps.

Tooling:
- tools/dagger_round.sh: one-shot DAgger round (collect + concat + bc).
- tools/webots_sweep_gt.sh: full sweep with HERDING_USE_GT=1 for the
  perception-gap diagnosis matrix.
- protos/ShepherdDog360.proto: 360° FOV variant for the FOV-ablation
  comparison. Canonical proto stays at 140° per project spec.

Artifacts: v1 BC/RL policies for all 4 (drive × world) combos trained
in clean gym (success: diff/field 90-100%, diff/round 58%, mec/field
60-100%, mec/round 50-100%). DAgger r1/r2 BCs for diff/field show
12%→38% progression on gym HERDING_WEBOTS proxy but did not close
to actual Webots LiDAR (0/5 throughout). Next: LSTM policy or
learned tracker per the project-state memory.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-16 17:21:02 +00:00

308 lines
12 KiB
Python

"""Multi-target tracker for LiDAR-detected sheep.
Greedy nearest-neighbour data association across frames, with a wider
re-acquisition gate for stale tracks (sheep flee during occlusion and
reappear off-position), plus memory of last-seen positions for sheep
out of FOV. Output is ``{name: (x, y)}`` — Strömbom / Sequential
consume it directly.
When **predictive mode** is enabled (the default), tracks carry a
constant-velocity state ``(vx, vy)`` estimated from the last two
observations. While a track is occluded its position is extrapolated
using this velocity for up to ``PREDICT_STEPS`` frames, keeping the
teacher's CoM estimate stable during brief losses. After prediction
expires, the track falls back to its last-seen position (static memory)
until ``FORGET_STEPS`` deletes it entirely.
A track is marked penned once its estimated position crosses the gate
plane south (``is_penned_position``). Penned tracks are excluded from
``get_positions`` and kept indefinitely.
"""
from __future__ import annotations
import math
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from herding.config import TrackerConfig
from herding.world.geometry import MAX_SHEEP, in_pen, is_penned_position
GATE_M = 2.5 # m — primary NN gate (recently observed tracks)
REACQUIRE_GATE_M = 4.5 # m — wider gate for re-binding stale tracks
REACQUIRE_MIN_AGE = 20 # steps — track must be this stale to use the wider gate
PENNED_GATE_M = 4.0 # m — gate for matching detections to existing penned tracks
FORGET_STEPS = 200 # ~3.2 s — delete stale active tracks (penned ones kept forever)
MAX_ACTIVE_TRACKS = MAX_SHEEP
# Predictive tracking constants.
PREDICT_STEPS = 120 # ~1.9 s — extrapolate velocity this many frames
VELOCITY_CLAMP = 1.0 # m/s — max predicted speed (sheep max is ~0.78 m/s)
class Track:
"""Single track with position, velocity, and age."""
__slots__ = ("x", "y", "vx", "vy", "last_seen", "penned")
def __init__(self, x: float, y: float, step: int, penned: bool = False):
self.x = x
self.y = y
self.vx = 0.0
self.vy = 0.0
self.last_seen = step
self.penned = penned
@property
def age(self) -> int:
"""Not-a-property in the hot loop — callers pass current step."""
raise NotImplementedError
def predicted_position(
self,
current_step: int,
predict_steps: int = PREDICT_STEPS,
velocity_clamp: float = VELOCITY_CLAMP,
) -> tuple[float, float]:
"""Extrapolated position using constant velocity, clamped."""
dt = current_step - self.last_seen
if dt <= 0 or dt > predict_steps:
return self.x, self.y
speed = math.hypot(self.vx, self.vy)
if speed < 1e-4:
return self.x, self.y
# Clamp extrapolation distance.
max_d = velocity_clamp * dt * 0.016 # steps → seconds
d = min(speed * dt * 0.016, max_d)
return (
self.x + d * (self.vx / speed),
self.y + d * (self.vy / speed),
)
def update(self, x: float, y: float, step: int) -> None:
"""Absorb a new detection and re-estimate velocity."""
dt = step - self.last_seen
if dt > 0:
dt_s = dt * 0.016 # steps → seconds
new_vx = (x - self.x) / dt_s
new_vy = (y - self.y) / dt_s
# Exponential smoothing on velocity.
alpha = 0.6
self.vx = alpha * new_vx + (1.0 - alpha) * self.vx
self.vy = alpha * new_vy + (1.0 - alpha) * self.vy
self.x = x
self.y = y
self.last_seen = step
class SheepTracker:
"""Online tracker with NN association, prediction, and forgetful memory.
Each track is a :class:`Track` with position, velocity estimate,
last-seen step, and penned flag.
Pass a :class:`~herding.config.TrackerConfig` to override any
module-level defaults without changing this file.
"""
def __init__(
self,
gate: float = GATE_M,
tracker_cfg: "TrackerConfig | None" = None,
):
if tracker_cfg is not None:
self.gate = tracker_cfg.gate_m
self._reacquire_gate = tracker_cfg.reacquire_gate_m
self._reacquire_min_age = tracker_cfg.reacquire_min_age
self._penned_gate = tracker_cfg.penned_gate_m
self._forget_steps = tracker_cfg.forget_steps
self._predict_steps = tracker_cfg.predict_steps
self._velocity_clamp = tracker_cfg.velocity_clamp
self._max_new_per_step = tracker_cfg.max_new_tracks_per_step
self._pen_latch_depth = tracker_cfg.pen_latch_depth
else:
self.gate = gate
self._reacquire_gate = REACQUIRE_GATE_M
self._reacquire_min_age = REACQUIRE_MIN_AGE
self._penned_gate = PENNED_GATE_M
self._forget_steps = FORGET_STEPS
self._predict_steps = PREDICT_STEPS
self._velocity_clamp = VELOCITY_CLAMP
self._max_new_per_step = MAX_ACTIVE_TRACKS
self._pen_latch_depth = 0.0
self._tracks: dict[int, Track] = {}
self._next_id = 0
self.step = 0
def reset(self) -> None:
self._tracks.clear()
self._next_id = 0
self.step = 0
def update(self, detections: list[tuple[float, float]]) -> dict[str, tuple[float, float]]:
"""Fold a new set of detections in and return active positions."""
self.step += 1
det_used: set[int] = set()
updated_tids: set[int] = set()
# Pass 1 — match active tracks within the primary gate.
# Use predicted positions for matching, oldest-first.
active_tids = [tid for tid, t in self._tracks.items() if not t.penned]
active_tids.sort(key=lambda tid: self._tracks[tid].last_seen)
for tid in active_tids:
track = self._tracks[tid]
tx, ty = track.predicted_position(
self.step, self._predict_steps, self._velocity_clamp)
best_j, best_d = -1, self.gate
for j, (dx, dy) in enumerate(detections):
if j in det_used:
continue
d = math.hypot(dx - tx, dy - ty)
if d < best_d:
best_d = d
best_j = j
if best_j >= 0:
dx, dy = detections[best_j]
track.update(dx, dy, self.step)
det_used.add(best_j)
updated_tids.add(tid)
# Pass 1b — re-acquisition with wider gate for stale tracks.
for tid in active_tids:
if tid in updated_tids:
continue
track = self._tracks[tid]
if (self.step - track.last_seen) < self._reacquire_min_age:
continue
tx, ty = track.predicted_position(
self.step, self._predict_steps, self._velocity_clamp)
best_j, best_d = -1, self._reacquire_gate
for j, (dx, dy) in enumerate(detections):
if j in det_used:
continue
d = math.hypot(dx - tx, dy - ty)
if d < best_d:
best_d = d
best_j = j
if best_j >= 0:
dx, dy = detections[best_j]
track.update(dx, dy, self.step)
det_used.add(best_j)
updated_tids.add(tid)
# Pass 2 — match remaining detections to penned tracks.
penned_tids = [tid for tid, t in self._tracks.items() if t.penned]
for tid in penned_tids:
track = self._tracks[tid]
best_j, best_d = -1, self._penned_gate
for j, (dx, dy) in enumerate(detections):
if j in det_used:
continue
d = math.hypot(dx - track.x, dy - track.y)
if d < best_d:
best_d = d
best_j = j
if best_j >= 0:
dx, dy = detections[best_j]
track.update(dx, dy, self.step)
det_used.add(best_j)
# Spawn new tracks for unmatched detections — rate-capped.
spawned = 0
for j, (dx, dy) in enumerate(detections):
if j in det_used:
continue
if spawned >= self._max_new_per_step:
break
penned = self._is_penned(dx, dy)
self._tracks[self._next_id] = Track(dx, dy, self.step, penned)
self._next_id += 1
spawned += 1
# Promote active tracks whose current estimate crosses the gate.
for track in self._tracks.values():
if track.penned:
continue
px, py = track.predicted_position(
self.step, self._predict_steps, self._velocity_clamp)
if self._is_penned(px, py):
track.penned = True
# Forget stale active tracks; penned tracks decay too but at a
# longer horizon (real penned sheep are still observed occasionally
# when the dog faces south; pure FPs at gate posts stop being
# detected once the dog drives away).
penned_forget = self._forget_steps * 8
stale = [tid for tid, t in self._tracks.items()
if (not t.penned and (self.step - t.last_seen) > self._forget_steps)
or (t.penned and (self.step - t.last_seen) > penned_forget)]
for tid in stale:
del self._tracks[tid]
# Hard cap on the active set — drop the oldest-seen overflow.
active = [(tid, t.last_seen) for tid, t in self._tracks.items()
if not t.penned]
if len(active) > MAX_ACTIVE_TRACKS:
active.sort(key=lambda kv: kv[1])
for tid, _ in active[: len(active) - MAX_ACTIVE_TRACKS]:
del self._tracks[tid]
return self.get_positions()
def _is_penned(self, x: float, y: float) -> bool:
"""Check whether a position should be considered penned.
Uses ``pen_latch_depth`` to require the position to be that many
metres past the gate line before latching. Increasing the depth
prevents gate-area LiDAR false positives (gate hardware reflections
at y ≈ -15) from being permanently latched as penned tracks.
"""
from herding.world.geometry import GATE_Y
# Apply depth threshold to both in_pen and is_penned_position so
# that any position in the gate column must clear GATE_Y - depth.
threshold = GATE_Y - self._pen_latch_depth
return (in_pen(x, y) or is_penned_position(x, y)) and y <= threshold
def get_positions(self, min_freshness: int | None = None) -> dict[str, tuple[float, float]]:
"""Active (not-penned) tracks as a ``{name: (x, y)}`` dict.
For tracks currently being predicted (occluded but within
predict_steps), returns the extrapolated position so the teacher
sees a smooth estimate.
``min_freshness`` (optional, deploy-only): drop tracks whose
last_seen is older than ``step - min_freshness``. Real sheep in
FOV are detected nearly every step; phantom tracks from sporadic
Webots FPs stop being re-observed and decay. Default ``None``
preserves training behaviour (extrapolated tracks visible).
"""
result = {}
for tid, track in self._tracks.items():
if track.penned:
continue
if (min_freshness is not None
and self.step - track.last_seen > min_freshness):
continue
px, py = track.predicted_position(
self.step, self._predict_steps, self._velocity_clamp)
result[f"t{tid}"] = (px, py)
return result
def get_penned_set(self) -> set[str]:
return {f"t{tid}" for tid, t in self._tracks.items() if t.penned}
def n_active(self) -> int:
return sum(1 for t in self._tracks.values() if not t.penned)
def n_penned(self) -> int:
return sum(1 for t in self._tracks.values() if t.penned)
def n_predicted(self) -> int:
"""Number of active tracks currently being extrapolated (not directly observed)."""
return sum(1 for t in self._tracks.values()
if not t.penned and (self.step - t.last_seen) > 0
and (self.step - t.last_seen) <= self._predict_steps)