7ab69ab0f3
Naming pass: rename functions whose third+ segment is redundant or implementation-detail, sticking to the codebase's preferred ``noun_verb`` / ``verb_noun`` two-concept idiom. Renames are atomic across definitions, callers, and tests. is_penned_position → is_penned modulate_speed_near_sheep → modulate_speed mecanum_kinematics_step → mecanum_step policy_forward_mean → forward_mean Two-concept patterns like ``velocity_to_wheels`` / ``detections_from_scan`` / ``make_strombom_predictor`` are left alone — they're idiomatic converters / factories that read as a single concept, and the longer form aids grep-ability. Docstring polish: * ``herding/config.py`` header drops the "previously lived as a module-level literal" historical framing — we ship as a single thing, so the refactor anecdote no longer earns its keep. The usage examples now mention both ``HERDING_WEBOTS`` and ``HERDING_MEC_WEBOTS`` presets. 126 pytest cases still pass. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
123 lines
4.4 KiB
Python
123 lines
4.4 KiB
Python
"""Active-perception wrapper for the analytic shepherd teachers.
|
|
|
|
Under partial-observability LiDAR perception the tracker starts empty
— a naive analytic teacher returns ``(0, 0, "idle")`` and the dog
stops.  This wrapper interleaves the underlying teacher with two
exploration behaviours:

* an opening in-place rotation for the first ``INITIAL_SCAN_STEPS``
  steps, guaranteeing the LiDAR sweeps a full circle before driving;
* walk-to-centre when the tracker has been empty for at least
  ``EMPTY_DEBOUNCE_STEPS`` consecutive frames (corners can sit
  beyond the 12 m LiDAR range).

When the tracker has detections the base teacher's action is used,
post-processed by ``modulate_speed`` so the dog doesn't
charge the flock.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import math
|
|
|
|
from herding.control.modulation import modulate_speed
|
|
|
|
|
|
# --- Step-count thresholds -------------------------------------------------
# Length of the opening rotation, in steps (≈1.3 s — covers one full
# rotation).
INITIAL_SCAN_STEPS: int = 80
# Consecutive empty frames required before exploring.
EMPTY_DEBOUNCE_STEPS: int = 8

# --- Motion ----------------------------------------------------------------
# Action norm while walking blind.
EXPLORE_SPEED: float = 0.7
|
|
|
|
|
|
class ActiveScanTeacher:
    """Stateful active-perception wrapper around an analytic teacher.

    Construct one per episode (or call ``reset``).

    Call signature::

        vx, vy, omega, mode = teacher(dog_xy, dog_heading, sheep_positions,
                                      pen_target, drive_mode="differential")

    ``omega`` is the yaw-rate intent (mecanum only); 0.0 for differential
    drive and during blind exploration phases.

    ``mode`` is ``"scan_initial"``, ``"explore"``, ``"scan_at_centre"`` or
    ``"hold"`` while the wrapper itself is steering; otherwise it is the
    mode string returned by the wrapped teacher.
    """

    # ``(teacher, layout index or None)`` for the teacher most recently
    # inspected by ``_call_base``.  Kept as a class-level default so that
    # instances whose ``__init__`` was bypassed or overridden still work.
    _base_call_cache = None

    def __init__(self, base_action_fn, initial_scan_steps: int = INITIAL_SCAN_STEPS):
        """Wrap ``base_action_fn`` and start a fresh episode.

        Args:
            base_action_fn: the underlying teacher.  Accepted positional
                layouts are ``(dog_xy, heading, sheep, pen, drive_mode)``,
                ``(dog_xy, heading, sheep, pen)`` and the old-style
                ``(dog_xy, sheep, pen)``; it returns either
                ``(vx, vy, omega, mode)`` or ``(vx, vy, mode)``.
            initial_scan_steps: number of opening calls spent rotating in
                place before any other behaviour is considered.
        """
        self.base = base_action_fn
        self.initial_scan = int(initial_scan_steps)
        self.reset()

    def reset(self) -> None:
        """Rewind per-episode state: step counter, empty streak, last action."""
        self.step = 0
        self.empty_streak = 0
        self.last_action: tuple[float, float] = (0.0, 0.0)

    @staticmethod
    def _scan_action(dog_heading: float) -> tuple[float, float]:
        """Return a unit vector pointing directly behind the dog."""
        # Target opposite to current heading; velocity_to_wheels'
        # cos(err) clamp drives forward speed to ~0 → in-place rotation.
        target = dog_heading + math.pi
        return math.cos(target), math.sin(target)

    @staticmethod
    def _explore_action(dog_xy) -> tuple[float, float]:
        """Walk toward (0, 0) while the LiDAR keeps sweeping.

        Returns ``(0.0, 0.0)`` once the dog is within 0.5 of the origin
        (presumably metres — confirm against the world units); otherwise a
        vector of norm ``EXPLORE_SPEED`` aimed at the origin.
        """
        dx, dy = -dog_xy[0], -dog_xy[1]
        d = math.hypot(dx, dy)
        if d < 0.5:
            return 0.0, 0.0
        return EXPLORE_SPEED * dx / d, EXPLORE_SPEED * dy / d

    @staticmethod
    def _probe_base_layout(base, candidates):
        """Return the index of the first layout ``base`` can bind, else None.

        ``None`` means "unknown": the signature could not be introspected,
        it is variadic (``*args`` hides the real arity of whatever it
        forwards to), or none of the candidate layouts binds.
        """
        import inspect  # local: only needed when a new teacher is probed

        try:
            signature = inspect.signature(base)
        except (TypeError, ValueError):
            return None
        if any(param.kind is inspect.Parameter.VAR_POSITIONAL
               for param in signature.parameters.values()):
            return None
        for index, args in enumerate(candidates):
            try:
                signature.bind(*args)
            except TypeError:
                continue
            return index
        return None

    def _call_base(self, dog_xy, dog_heading, sheep_positions, pen_target,
                   drive_mode):
        """Invoke the wrapped teacher using the layout its signature accepts.

        The layout is resolved from the teacher's signature and cached, so
        a ``TypeError`` raised *inside* the teacher propagates unchanged
        instead of being mistaken for a signature mismatch and retried
        with fewer (and then mis-aligned) arguments.
        """
        # Newest layout first; the last entry is the old-style teacher.
        candidates = (
            (dog_xy, dog_heading, sheep_positions, pen_target, drive_mode),
            (dog_xy, dog_heading, sheep_positions, pen_target),
            (dog_xy, sheep_positions, pen_target),
        )

        cache = self._base_call_cache
        # Re-probe if ``self.base`` was swapped after construction.
        if cache is None or cache[0] is not self.base:
            cache = (self.base, self._probe_base_layout(self.base, candidates))
            self._base_call_cache = cache

        layout = cache[1]
        if layout is not None:
            return self.base(*candidates[layout])

        # Signature unknown: fall back to call-and-retry probing.
        try:
            return self.base(*candidates[0])
        except TypeError:
            try:
                return self.base(*candidates[1])
            except TypeError:
                return self.base(*candidates[2])

    def __call__(self, dog_xy, dog_heading, sheep_positions, pen_target,
                 drive_mode="differential"):
        """Advance one step and return ``(vx, vy, omega, mode)``.

        Args:
            dog_xy: dog position; indexed as ``dog_xy[0]``, ``dog_xy[1]``.
            dog_heading: dog heading in radians.
            sheep_positions: currently tracked sheep; only its ``len`` is
                read here, the contents are forwarded untouched.
            pen_target: forwarded to the wrapped teacher.
            drive_mode: forwarded to teachers that accept it.

        Returns:
            ``(vx, vy, omega, mode)``; ``omega`` is 0.0 unless the wrapped
            teacher supplied one.
        """
        self.step += 1
        # ``len`` rather than truthiness: presumably sheep_positions may be
        # an array-like whose truth value is ambiguous — confirm with callers.
        n_visible = len(sheep_positions)

        if n_visible == 0:
            self.empty_streak += 1
        else:
            self.empty_streak = 0

        # Phase 1: opening rotation.
        if self.step <= self.initial_scan:
            vx, vy = self._scan_action(dog_heading)
            self.last_action = (vx, vy)
            return vx, vy, 0.0, "scan_initial"

        # Phase 2: walk-to-centre after a sustained empty tracker.
        if self.empty_streak >= EMPTY_DEBOUNCE_STEPS:
            ex, ey = self._explore_action(dog_xy)
            if ex == 0.0 and ey == 0.0:
                # Already at the centre: keep rotating instead of stalling.
                vx, vy = self._scan_action(dog_heading)
                mode = "scan_at_centre"
            else:
                vx, vy = ex, ey
                mode = "explore"
            self.last_action = (vx, vy)
            return vx, vy, 0.0, mode

        # Phase 2b: brief tracker blink — hold the previous action.
        if n_visible == 0:
            vx, vy = self.last_action
            return vx, vy, 0.0, "hold"

        # Phase 3: hand off to the underlying analytic teacher, then
        # apply the shared near-sheep speed modulation.
        result = self._call_base(dog_xy, dog_heading, sheep_positions,
                                 pen_target, drive_mode)
        if len(result) == 4:
            vx, vy, omega, mode = result
        else:
            # Three-element result: the teacher gave no yaw-rate intent.
            vx, vy, mode = result
            omega = 0.0
        vx, vy = modulate_speed(vx, vy, dog_xy, sheep_positions)
        self.last_action = (vx, vy)
        return vx, vy, omega, mode
|