1c197e0ff7
Two changes that together raise diff/round gym success ~52%→88% (BC)
and ~68%→88% (RL) without retraining; diff/field stays at 100%.
* TrackerConfig.consensus_k default 1 → 3 (radius 0.5 m, max_age 15
frames). The same candidate-promotion mechanism that closed the
Webots LiDAR gap also filters gym tracker phantoms — they show up
on the round field where sheep run further between detection
cycles than GATE_M, so each new position spawns a fresh track
while the stale one persists in memory. SheepTracker() called with
no tracker_cfg keeps the legacy pass-through behaviour for
backwards compatibility.
* Strömbom + universal teachers now detect when the natural
"behind the flock" drive target leaves the curved boundary and
fall back to pushing the flock radially inward toward the centre.
Breaks the wall-circling pattern that previously trapped both the
analytical baselines and the trained policies.
A/B numbers (n_sheep ∈ {1,2,3,5,10}, 5 seeds each, max_steps=15000):
diff/field bc: baseline 100% consensus 100%
diff/field rl: baseline 100% consensus 100%
diff/round bc: baseline 52% consensus 88%
diff/round rl: baseline 68% consensus 88%
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
224 lines
8.4 KiB
Python
224 lines
8.4 KiB
Python
"""Universal shepherd teacher — Strömbom core + mecanum omega + straggler recovery.
|
|
|
|
The core collect/drive logic is **identical** to :mod:`strombom` (same
|
|
``F_FACTOR``, ``DELTA_COLLECT``, ``DELTA_DRIVE`` thresholds and target
|
|
computation) so it inherits the proven ~100 % success rate at n ≤ 8.
|
|
Two additions make it useful as a universal teacher:
|
|
|
|
1. **Omega for mecanum.** When ``drive_mode="mecanum"``, the teacher
|
|
outputs a non-zero ``omega`` channel so the dog **faces the
|
|
direction of travel**. During collect the dog faces the target
|
|
sheep; during drive it faces the pen. This gives the BC student a
|
|
real rotation signal to learn from.
|
|
|
|
2. **Small-flock recovery.** When at most ``RECOVERY_MAX_N`` sheep remain
   active and the one nearest the gate is within ``RECOVERY_GATE_DIST``
   of it, the dog positions itself behind that sheep (opposite the gate)
   and pushes it straight through; once it is penned the same rule picks
   the next sheep. This handles the edge case where the last sheep
   circle the gate posts.
|
|
|
|
Call signature::
|
|
|
|
vx, vy, omega, mode = compute_action(
|
|
dog_xy, dog_heading, sheep_positions, pen_target,
|
|
drive_mode="differential",
|
|
)
|
|
|
|
For differential drive ``omega`` is always 0.0 and can be ignored.
|
|
"""
|
|
|
|
import math
|
|
|
|
from herding.world.geometry import (
|
|
FIELD_ROUND_R, FIELD_SHAPE,
|
|
PEN_ENTRY, GATE_X, GATE_Y, in_pen,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tuning constants — match Strömbom exactly for proven success rates.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
F_FACTOR = 4.0          # collect when flock radius > F_FACTOR * √n, else drive
DELTA_COLLECT = 1.5     # standoff behind the furthest sheep (away from the CoM)
DELTA_DRIVE = 2.0       # standoff behind flock CoM (away from the pen target)

# Omega gain for mecanum (how strongly the dog turns to face target).
# The yaw command is OMEGA_GAIN * heading_error / π, clamped to [-1, 1].
OMEGA_GAIN = 0.6

# Recovery: push small flocks (≤ RECOVERY_MAX_N) through the gate one
# sheep at a time.  n=1 alone is not enough — at n=2..3 on the round
# field the flock is too small to self-cohere through the 3 m gate but
# the standard collect/drive standoff just orbits them.  Push the sheep
# nearest the gate first; once it pens, the rule re-applies to the next.
RECOVERY_MAX_N = 3
RECOVERY_GATE_DIST = 8.0    # only when target sheep is this close to gate centre
RECOVERY_PUSH_DIST = 1.2    # stand-off behind sheep, away from gate centre
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _unit(x, y):
    """Scale ``(x, y)`` to unit length.

    Vectors shorter than 1e-6 have no usable direction and are returned
    as ``(0.0, 0.0)`` instead of being divided by a near-zero norm.
    """
    norm = math.hypot(x, y)
    return (0.0, 0.0) if norm < 1e-6 else (x / norm, y / norm)
|
|
|
|
|
|
def _is_active(x, y) -> bool:
    """Tell whether a sheep at ``(x, y)`` still needs herding.

    A sheep counts as active when it is not inside the pen and its
    ``y`` coordinate is strictly above ``GATE_Y``.
    """
    if in_pen(x, y):
        return False
    return y > GATE_Y
|
|
|
|
|
|
def _angle_diff(a, b):
    """Return the wrapped difference ``a - b`` in radians, within [-π, π].

    Going through sin/cos and ``atan2`` folds any multiple of 2π away,
    so the result is the signed shortest rotation from ``b`` to ``a``.
    """
    delta = a - b
    return math.atan2(math.sin(delta), math.cos(delta))
|
|
|
|
|
|
def _gate_center():
    """Return ``(x, y)`` of the gate opening's midpoint.

    ``x`` is the mean of the two ``GATE_X`` entries; ``y`` is ``GATE_Y``.
    """
    x_a, x_b = GATE_X[0], GATE_X[1]
    return (0.5 * (x_a + x_b), GATE_Y)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Core teacher
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _yaw_intent(dog_xy, dog_heading, face_target, drive_mode):
    """Return the yaw command that turns the dog towards ``face_target``.

    Only ``drive_mode == "mecanum"`` produces a non-zero value; every other
    mode gets 0.0.  The heading error is scaled by ``OMEGA_GAIN / π`` and
    clamped to [-1, 1].
    """
    if drive_mode != "mecanum":
        return 0.0
    desired = math.atan2(
        face_target[1] - dog_xy[1],
        face_target[0] - dog_xy[0],
    )
    err = _angle_diff(desired, dog_heading)
    return max(-1.0, min(1.0, OMEGA_GAIN * err / math.pi))


def compute_action(dog_xy, dog_heading, sheep_positions,
                   pen_target=PEN_ENTRY, drive_mode="differential"):
    """Return ``(vx, vy, omega, mode)``.

    Parameters
    ----------
    dog_xy : (float, float)
        Dog position in world frame.
    dog_heading : float
        Dog heading in world frame (rad), 0 = +x axis.
    sheep_positions : dict[str, (float, float)]
        Visible sheep positions.  Only sheep for which ``_is_active`` is
        true are considered.
    pen_target : (float, float)
        Centre of the pen gate (defaults to geometry.PEN_ENTRY).  Used as
        the drive-phase goal and as the mecanum facing target while
        driving.  The recovery branch aims at ``_gate_center()`` instead
        and does not read this argument.
    drive_mode : str
        ``"differential"`` or ``"mecanum"``.

    Returns
    -------
    vx, vy : float
        Velocity intent: a unit vector from the dog towards its standoff
        target, or ``(0.0, 0.0)`` when idle or already on the target.
    omega : float
        Yaw intent in [-1, 1] (0 for differential).
    mode : str
        Phase label: ``"idle"``, ``"collect"``, ``"drive"``, ``"recovery"``.
    """
    active = [(x, y) for (x, y) in sheep_positions.values()
              if _is_active(x, y)]
    if not active:
        return 0.0, 0.0, 0.0, "idle"

    n = len(active)

    # ---- Small-flock recovery (push sheep through the gate one by one) ----
    # Triggers when the active flock is small (≤ RECOVERY_MAX_N) and the
    # sheep nearest the gate is close enough that direct pushing works.
    # For larger flocks the standard collect/drive logic handles them.
    if n <= RECOVERY_MAX_N:
        gc = _gate_center()
        # Pick the sheep closest to the gate as the recovery target —
        # finishing that one first reduces the active count and lets the
        # remaining sheep get their own recovery turn.
        gate_dists = [math.hypot(p[0] - gc[0], p[1] - gc[1]) for p in active]
        target_idx = min(range(n), key=lambda i: gate_dists[i])
        sx, sy = active[target_idx]
        d_to_gate = gate_dists[target_idx]
        if d_to_gate < RECOVERY_GATE_DIST:
            if d_to_gate > 0.3:
                # Unit vector from the gate centre out through the sheep.
                ux = (sx - gc[0]) / d_to_gate
                uy = (sy - gc[1]) / d_to_gate
            else:
                # Sheep is practically on the gate centre, so that
                # direction is ill-conditioned; fall back to +y.
                ux, uy = 0.0, 1.0
            # Stand behind the sheep on the far side from the gate.
            tx = sx + RECOVERY_PUSH_DIST * ux
            ty = sy + RECOVERY_PUSH_DIST * uy
            ax, ay = _unit(tx - dog_xy[0], ty - dog_xy[1])
            omega = _yaw_intent(dog_xy, dog_heading, (sx, sy), drive_mode)
            return ax, ay, omega, "recovery"

    # ---- Standard Strömbom collect/drive (proven core) ----
    com_x = sum(p[0] for p in active) / n
    com_y = sum(p[1] for p in active) / n
    dists = [math.hypot(p[0] - com_x, p[1] - com_y) for p in active]
    radius = max(dists)

    if radius > F_FACTOR * math.sqrt(n):
        # Collect: aim behind the furthest sheep, opposite the CoM.
        idx = max(range(n), key=lambda i: dists[i])
        sx, sy = active[idx]
        ux, uy = _unit(sx - com_x, sy - com_y)
        tx, ty = sx + DELTA_COLLECT * ux, sy + DELTA_COLLECT * uy
        mode = "collect"
        face_target = (sx, sy)
    else:
        # Drive: aim behind the CoM, opposite the pen.
        ux, uy = _unit(com_x - pen_target[0], com_y - pen_target[1])
        tx, ty = com_x + DELTA_DRIVE * ux, com_y + DELTA_DRIVE * uy
        mode = "drive"
        face_target = pen_target

    # On the round field the natural "behind the flock" point can fall
    # outside the curved wall when the flock CoM is itself close to the
    # wall.  The dog tries to reach an unreachable target, ends up
    # tangent to the wall, and the flock circles indefinitely.
    # Fix: when the natural target leaves the field, fall back to
    # pushing the flock radially inward toward the centre — break the
    # wall-circle pattern, then resume normal pen-direction drive once
    # the flock is back in the interior.
    if FIELD_SHAPE == "field_round" and mode == "drive":
        if math.hypot(tx, ty) > FIELD_ROUND_R - 1.0:
            r_com = math.hypot(com_x, com_y)
            if r_com > 1e-3:
                # Stand radially outside the CoM so the push points inward.
                ux2, uy2 = com_x / r_com, com_y / r_com
                tx = com_x + DELTA_DRIVE * ux2
                ty = com_y + DELTA_DRIVE * uy2
                # Clamp to inside-field radius so the dog target is reachable.
                r_t = math.hypot(tx, ty)
                if r_t > FIELD_ROUND_R - 1.0:
                    scale = (FIELD_ROUND_R - 1.0) / r_t
                    tx *= scale
                    ty *= scale

    ax, ay = _unit(tx - dog_xy[0], ty - dog_xy[1])

    # ---- Omega (mecanum only) ----
    # ``mode`` is always "collect" or "drive" here; idle returned earlier.
    omega = _yaw_intent(dog_xy, dog_heading, face_target, drive_mode)

    return ax, ay, omega, mode
|
|
|
|
|
|
def compute_action_diff(dog_xy, dog_heading, sheep_positions,
                        pen_target=PEN_ENTRY):
    """Differential-drive shim around :func:`compute_action`.

    Calls ``compute_action`` with ``drive_mode="differential"`` and drops
    the omega channel, returning ``(vx, vy, mode)`` — same as Strömbom —
    for callers that do not expect a yaw term.
    """
    action = compute_action(
        dog_xy,
        dog_heading,
        sheep_positions,
        pen_target,
        drive_mode="differential",
    )
    vel_x, vel_y, _unused_omega, phase = action
    return vel_x, vel_y, phase
|