7ab69ab0f3
Naming pass: rename functions whose third+ segment is redundant or implementation-detail, sticking to the codebase's preferred ``noun_verb`` / ``verb_noun`` two-concept idiom. Renames are atomic across definitions, callers, and tests. is_penned_position → is_penned modulate_speed_near_sheep → modulate_speed mecanum_kinematics_step → mecanum_step policy_forward_mean → forward_mean Two-concept patterns like ``velocity_to_wheels`` / ``detections_from_scan`` / ``make_strombom_predictor`` are left alone — they're idiomatic converters / factories that read as a single concept, and the longer form aids grep-ability. Docstring polish: * ``herding/config.py`` header drops the "previously lived as a module-level literal" historical framing — we ship as a single thing, so the refactor anecdote no longer earns its keep. The usage examples now mention both ``HERDING_WEBOTS`` and ``HERDING_MEC_WEBOTS`` presets. 126 pytest cases still pass. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
439 lines
16 KiB
Python
439 lines
16 KiB
Python
"""Central configuration dataclasses for the herding simulation.
|
||
|
||
Every tunable parameter lives here as a frozen dataclass field — LiDAR
|
||
spec, cluster detection thresholds, tracker gates, robot kinematics,
|
||
and domain-randomisation knobs — composed into :class:`HerdingConfig`.
|
||
|
||
Usage — accept the defaults::
|
||
|
||
env = HerdingEnv()
|
||
|
||
Override a subset::
|
||
|
||
cfg = HerdingConfig(tracker=TrackerConfig(forget_steps=60))
|
||
env = HerdingEnv(herding_cfg=cfg)
|
||
|
||
Use a named preset::
|
||
|
||
env = HerdingEnv(herding_cfg=HERDING_WEBOTS) # 140° FOV
|
||
env = HerdingEnv(herding_cfg=HERDING_MEC_WEBOTS) # + mecanum slip
|
||
|
||
Design notes
|
||
------------
|
||
* All dataclasses are frozen so instances are immutable after construction.
|
||
* This module must not import from other ``herding.*`` packages —
|
||
field-geometry constants live in ``herding.world.geometry`` because
|
||
they depend on the world variant selected at runtime via
|
||
``HERDING_WORLD``, which would create an import cycle here.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import math
|
||
from dataclasses import dataclass, field, replace
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# LiDAR hardware spec
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dataclass(frozen=True)
|
||
class LidarConfig:
|
||
"""Parameters of the simulated / physical LiDAR sensor.
|
||
|
||
The two canonical presets are :data:`LIDAR_FULL` (360°, oracle mode)
|
||
and :data:`LIDAR_WEBOTS` (140°/180-ray, matches the ShepherdDog proto).
|
||
"""
|
||
|
||
n_rays: int = 360
|
||
"""Number of rays in the scan."""
|
||
|
||
fov_rad: float = 2.0 * math.pi
|
||
"""Full field-of-view in radians, centred on the robot's forward axis."""
|
||
|
||
max_range: float = 12.0
|
||
"""Maximum detectable range in metres."""
|
||
|
||
noise_std: float = 0.005
|
||
"""Gaussian standard deviation (metres) applied to each hit reading."""
|
||
|
||
sheep_radius: float = 0.30
|
||
"""Effective disc radius of a sheep in the 2-D LiDAR plane (metres)."""
|
||
|
||
post_radius: float = 0.25
|
||
"""Effective disc radius of gate / corner posts (metres)."""
|
||
|
||
def __post_init__(self) -> None:
|
||
if self.n_rays < 1:
|
||
raise ValueError(f"n_rays must be ≥ 1, got {self.n_rays}")
|
||
if not (0.0 < self.fov_rad <= 2.0 * math.pi):
|
||
raise ValueError(f"fov_rad must be in (0, 2π], got {self.fov_rad:.4f}")
|
||
if self.max_range <= 0.0:
|
||
raise ValueError(f"max_range must be > 0, got {self.max_range}")
|
||
|
||
|
||
# Named presets -----------------------------------------------------------
|
||
|
||
LIDAR_FULL = LidarConfig(
|
||
n_rays=360,
|
||
fov_rad=2.0 * math.pi,
|
||
)
|
||
"""360° full-circle scan — oracle / ablation mode."""
|
||
|
||
LIDAR_WEBOTS = LidarConfig(
|
||
n_rays=180,
|
||
fov_rad=math.radians(140.0),
|
||
)
|
||
"""Matches the ShepherdDog.proto Lidar device (180 rays, 140° FOV).
|
||
|
||
Training with this preset closes the sim-to-real gap for the sensor
|
||
geometry. Because the observation is built from tracker output (not raw
|
||
rays), a policy trained here can be deployed on a wider-FOV LiDAR (e.g.
|
||
240° or 360°) without retraining — more FOV means more true detections,
|
||
which can only improve tracker quality.
|
||
"""
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Cluster-detection pipeline
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dataclass(frozen=True)
|
||
class DetectionConfig:
|
||
"""Parameters for the LiDAR-scan → detection clustering pipeline."""
|
||
|
||
gap_threshold: float = 0.6
|
||
"""Adjacent hit-points farther apart than this (metres) start a new cluster."""
|
||
|
||
max_cluster_span: float = 1.5
|
||
"""Clusters wider than this (metres) are rejected as walls / structures."""
|
||
|
||
range_hit_eps: float = 0.05
|
||
"""A ray is considered a hit if ``range < max_range - range_hit_eps``."""
|
||
|
||
split_range_gap: float = 0.20
|
||
"""Range increase within a cluster that triggers a multi-peak split."""
|
||
|
||
wall_reject: float = 0.5
|
||
"""Drop detections within this distance (metres) of any field wall."""
|
||
|
||
static_reject: float = 0.8
|
||
"""Drop detections within this distance (metres) of known static features
|
||
(gate posts, field corners)."""
|
||
|
||
def __post_init__(self) -> None:
|
||
if self.wall_reject < 0.0:
|
||
raise ValueError(f"wall_reject must be ≥ 0, got {self.wall_reject}")
|
||
if self.static_reject < 0.0:
|
||
raise ValueError(f"static_reject must be ≥ 0, got {self.static_reject}")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Multi-target tracker
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dataclass(frozen=True)
|
||
class TrackerConfig:
|
||
"""Parameters for the nearest-neighbour sheep tracker."""
|
||
|
||
gate_m: float = 2.5
|
||
"""Primary NN association gate in metres (recently observed tracks)."""
|
||
|
||
reacquire_gate_m: float = 4.5
|
||
"""Wider gate used when re-acquiring tracks stale for ≥ ``reacquire_min_age`` steps."""
|
||
|
||
reacquire_min_age: int = 20
|
||
"""Minimum staleness (steps) before the wider re-acquisition gate activates."""
|
||
|
||
penned_gate_m: float = 4.0
|
||
"""Gate for matching new detections to already-penned tracks."""
|
||
|
||
forget_steps: int = 200
|
||
"""Delete an active track that has not been observed for this many steps (~3.2 s)."""
|
||
|
||
predict_steps: int = 120
|
||
"""Extrapolate a track's position using constant velocity for this many steps (~1.9 s)."""
|
||
|
||
velocity_clamp: float = 1.0
|
||
"""Maximum predicted speed (m/s) used during extrapolation."""
|
||
|
||
max_new_tracks_per_step: int = 10
|
||
"""Maximum number of new tracks that may be spawned in a single step.
|
||
|
||
Capping this limits the damage from LiDAR false-positive bursts (e.g.
|
||
wall reflections in Webots) that would otherwise flood the track set.
|
||
The default (10 = MAX_SHEEP) preserves the original behaviour; reduce
|
||
to 2–3 for Webots deployment robustness.
|
||
"""
|
||
|
||
pen_latch_depth: float = 0.0
|
||
"""Minimum depth past the gate line (metres) before a track is latched
|
||
as penned. 0.0 = original behaviour (latch at y ≤ GATE_Y). Increase
|
||
to 0.5 for Webots to prevent gate-hardware LiDAR reflections near y=-15
|
||
from permanently consuming tracker slots as false "penned" sheep.
|
||
"""
|
||
|
||
consensus_k: int = 3
|
||
"""New tracks must accumulate this many matches before they appear in
|
||
``get_positions``. ``1`` disables the candidate stage entirely;
|
||
``3`` (default) requires three nearby confirmations within
|
||
``consensus_max_age`` and reliably filters single-shot detection
|
||
splits / out-of-range stragglers that confuse the policy on the
|
||
round field while real sheep promote in ~50 ms (3 frames).
|
||
"""
|
||
|
||
consensus_radius_m: float = 0.5
|
||
"""Maximum distance (metres) between successive matches for a candidate
|
||
to age toward promotion. Tighter than ``gate_m`` so wall-cluster
|
||
centroid jitter cannot keep a phantom alive. Real sheep move
|
||
≪ 0.05 m / step at max speed so this gate is very loose for them.
|
||
"""
|
||
|
||
consensus_max_age: int = 15
|
||
"""A candidate that has not been matched for this many steps is dropped.
|
||
Short enough that a one-shot phantom can't keep itself alive, long
|
||
enough that a real sheep glimpsed twice in a short interval
|
||
confirms.
|
||
"""
|
||
|
||
def __post_init__(self) -> None:
|
||
if self.forget_steps < 1:
|
||
raise ValueError(f"forget_steps must be ≥ 1, got {self.forget_steps}")
|
||
if self.max_new_tracks_per_step < 1:
|
||
raise ValueError(
|
||
f"max_new_tracks_per_step must be ≥ 1, got {self.max_new_tracks_per_step}"
|
||
)
|
||
if self.consensus_k < 1:
|
||
raise ValueError(f"consensus_k must be ≥ 1, got {self.consensus_k}")
|
||
if self.consensus_radius_m <= 0.0:
|
||
raise ValueError(
|
||
f"consensus_radius_m must be > 0, got {self.consensus_radius_m}"
|
||
)
|
||
if self.consensus_max_age < 1:
|
||
raise ValueError(
|
||
f"consensus_max_age must be ≥ 1, got {self.consensus_max_age}"
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Robot physical specification
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dataclass(frozen=True)
|
||
class RobotConfig:
|
||
"""Physical parameters of the shepherd-dog robot.
|
||
|
||
Values mirror ``protos/ShepherdDog.proto`` and ``protos/ShepherdDogMecanum.proto``.
|
||
"""
|
||
|
||
wheel_radius: float = 0.038
|
||
"""Wheel radius in metres."""
|
||
|
||
wheel_base: float = 0.28
|
||
"""Axle-to-axle distance for differential drive (metres)."""
|
||
|
||
wheel_base_x: float = 0.28
|
||
"""Front-to-back axle distance for mecanum drive (metres)."""
|
||
|
||
wheel_base_y: float = 0.28
|
||
"""Left-to-right axle distance for mecanum drive (metres)."""
|
||
|
||
max_wheel_omega: float = 70.0
|
||
"""Maximum wheel angular velocity (rad/s)."""
|
||
|
||
action_smooth: float = 0.0
|
||
"""Exponential moving-average coefficient applied to actions inside the env.
|
||
|
||
``0.0`` means no smoothing (gym default).
|
||
``0.55`` matches the hard-coded EMA in ``shepherd_dog.py`` — use this
|
||
when training so the policy learns to act through the same filter it
|
||
sees at deployment.
|
||
"""
|
||
|
||
strafe_efficiency: float = 1.0
|
||
"""Mecanum strafe magnitude as a fraction of textbook X-pattern.
|
||
|
||
``1.0`` (default) = perfect mecanum kinematics. ``0.4`` matches the
|
||
Webots roller-hinge mecanum proto calibration (62% slip on strafe,
|
||
11% on forward). Used by ``mecanum_step`` only — has no
|
||
effect on differential drive.
|
||
"""
|
||
|
||
strafe_to_forward_bleed: float = 0.0
|
||
"""Fraction of ideal strafe magnitude that bleeds into body-frame x.
|
||
|
||
``0.0`` (default) = no bleed. ``-0.28`` matches the Webots proto's
|
||
consistent backward push under strafe commands. Used by
|
||
``mecanum_step`` only.
|
||
"""
|
||
|
||
def __post_init__(self) -> None:
|
||
if not (0.0 <= self.action_smooth < 1.0):
|
||
raise ValueError(
|
||
f"action_smooth must be in [0, 1), got {self.action_smooth}"
|
||
)
|
||
if not (0.0 < self.strafe_efficiency <= 1.0):
|
||
raise ValueError(
|
||
f"strafe_efficiency must be in (0, 1], got {self.strafe_efficiency}"
|
||
)
|
||
|
||
@property
|
||
def max_linear(self) -> float:
|
||
"""Maximum achievable linear speed (m/s)."""
|
||
return self.wheel_radius * self.max_wheel_omega
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Domain randomisation
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dataclass(frozen=True)
|
||
class DomainRandomConfig:
|
||
"""Parameters that inject physics / sensor noise for domain randomisation.
|
||
|
||
All values default to 0 (disabled) so the base env is deterministic and
|
||
backwards-compatible. Enable them gradually to close the sim-to-real gap.
|
||
"""
|
||
|
||
fp_rate: float = 0.0
|
||
"""Mean number of false-positive detections injected per step (Poisson λ).
|
||
|
||
FPs are placed near static features (walls, posts) with positional
|
||
noise ``fp_std_pos``, mimicking the spurious clusters Webots' physical
|
||
LiDAR returns from 3D geometry.
|
||
"""
|
||
|
||
fp_std_pos: float = 0.3
|
||
"""Positional standard deviation (metres) of injected false-positive clusters."""
|
||
|
||
wheel_slip_std: float = 0.0
|
||
"""Gaussian noise standard deviation (rad/s) added to each wheel speed
|
||
before kinematic integration. Models real-world wheel slip and motor
|
||
variation. Suggested starting value: 0.05.
|
||
"""
|
||
|
||
compass_noise_std: float = 0.0
|
||
"""Gaussian noise standard deviation (radians) added to the heading
|
||
reading each step. Models magnetometer drift in Webots.
|
||
Suggested starting value: 0.02.
|
||
"""
|
||
|
||
def __post_init__(self) -> None:
|
||
if self.fp_rate < 0.0:
|
||
raise ValueError(f"fp_rate must be ≥ 0, got {self.fp_rate}")
|
||
if self.wheel_slip_std < 0.0:
|
||
raise ValueError(f"wheel_slip_std must be ≥ 0, got {self.wheel_slip_std}")
|
||
if self.compass_noise_std < 0.0:
|
||
raise ValueError(f"compass_noise_std must be ≥ 0, got {self.compass_noise_std}")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Aggregate config
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dataclass(frozen=True)
|
||
class HerdingConfig:
|
||
"""Root configuration object passed to :class:`~training.herding_env.HerdingEnv`.
|
||
|
||
Sub-configs default to the original simulation parameters so that
|
||
``HerdingEnv()`` and ``HerdingEnv(herding_cfg=HerdingConfig())`` produce
|
||
identical behaviour.
|
||
"""
|
||
|
||
lidar: LidarConfig = field(default_factory=LidarConfig)
|
||
detection: DetectionConfig = field(default_factory=DetectionConfig)
|
||
tracker: TrackerConfig = field(default_factory=TrackerConfig)
|
||
robot: RobotConfig = field(default_factory=RobotConfig)
|
||
domain_random: DomainRandomConfig = field(default_factory=DomainRandomConfig)
|
||
|
||
def replace(self, **kwargs) -> "HerdingConfig":
|
||
"""Return a new config with selected top-level sub-configs replaced.
|
||
|
||
Example::
|
||
|
||
cfg = HERDING_WEBOTS.replace(
|
||
domain_random=DomainRandomConfig(fp_rate=2.0, wheel_slip_std=0.05)
|
||
)
|
||
"""
|
||
return replace(self, **kwargs)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Named full-pipeline presets
|
||
# ---------------------------------------------------------------------------
|
||
|
||
HERDING_DEFAULT = HerdingConfig()
|
||
"""Original simulation defaults — zero behaviour change."""
|
||
|
||
HERDING_WEBOTS = HerdingConfig(
|
||
lidar=LIDAR_WEBOTS,
|
||
detection=DetectionConfig(wall_reject=0.5, static_reject=1.2),
|
||
tracker=TrackerConfig(
|
||
forget_steps=300,
|
||
max_new_tracks_per_step=1,
|
||
pen_latch_depth=2.0,
|
||
predict_steps=180,
|
||
consensus_k=3,
|
||
consensus_radius_m=0.3,
|
||
consensus_max_age=20,
|
||
),
|
||
robot=RobotConfig(action_smooth=0.55),
|
||
)
|
||
|
||
HERDING_MEC_WEBOTS = HerdingConfig(
|
||
lidar=LIDAR_WEBOTS,
|
||
detection=DetectionConfig(wall_reject=0.5, static_reject=1.2),
|
||
tracker=TrackerConfig(
|
||
forget_steps=300,
|
||
max_new_tracks_per_step=1,
|
||
pen_latch_depth=2.0,
|
||
predict_steps=180,
|
||
consensus_k=3,
|
||
consensus_radius_m=0.3,
|
||
consensus_max_age=20,
|
||
),
|
||
robot=RobotConfig(
|
||
action_smooth=0.55,
|
||
strafe_efficiency=0.4,
|
||
strafe_to_forward_bleed=-0.28,
|
||
),
|
||
)
|
||
"""Webots-mecanum-matched training preset.
|
||
|
||
Same as HERDING_WEBOTS but with the gym mecanum kinematics scaled to
|
||
match the Webots roller-hinge mecanum proto:
|
||
* ``strafe_efficiency=0.4`` — strafing produces ~40% of textbook
|
||
X-pattern lateral velocity in Webots; this matches the bias.
|
||
* ``strafe_to_forward_bleed=-0.28`` — strafe commands bleed ~28% of
|
||
their magnitude into backward body motion in Webots.
|
||
|
||
Use this preset when training BC/RL for the mecanum drive so the
|
||
policy learns to compensate for the imperfect physical mecanum.
|
||
Differential drive ignores both parameters and behaves identically
|
||
to HERDING_WEBOTS.
|
||
"""
|
||
"""Webots-matched training preset.
|
||
|
||
Changes vs HERDING_DEFAULT:
|
||
* LiDAR: 180 rays / 140° FOV matching ShepherdDog.proto hardware
|
||
* Detection: wall_reject kept at 0.5 m (original default; static_reject
|
||
handles post FPs; 1.0 m was too aggressive near the south gate)
|
||
* Tracker:
|
||
- consensus_k=3, radius=0.3 m, max_age=20 (~320 ms window): a new
|
||
detection must be confirmed by two more nearby detections within
|
||
a tight 0.3 m radius to promote. Real sheep barely move
|
||
frame-to-frame (≪0.05 m/step) so they easily self-confirm while
|
||
the dog is rotating across them; wall-return phantoms whose
|
||
cluster centroid jitters by more than 0.3 m as the dog moves
|
||
can't accumulate three nearby hits and decay as separate
|
||
candidates.
|
||
- forget_steps=300 (~4.8 s) + predict_steps=180 (~2.9 s): once a
|
||
real sheep is confirmed, it lives in tracker memory long enough
|
||
for the policy — trained on 360° full-visibility obs — to plan
|
||
while the dog sweeps a sparse cone across the field. Set short
|
||
enough that any phantom that does leak through promotion dies
|
||
after the dog walks away from the wall that created it.
|
||
- max_new_tracks_per_step=1 still rate-caps spawn bursts.
|
||
* Robot: action_smooth 0.0 → 0.55 (matches Webots controller EMA)
|
||
"""
|