"""Central configuration dataclasses for the herding simulation. Every tunable parameter lives here as a frozen dataclass field — LiDAR spec, cluster detection thresholds, tracker gates, robot kinematics, and domain-randomisation knobs — composed into :class:`HerdingConfig`. Usage — accept the defaults:: env = HerdingEnv() Override a subset:: cfg = HerdingConfig(tracker=TrackerConfig(forget_steps=60)) env = HerdingEnv(herding_cfg=cfg) Use a named preset:: env = HerdingEnv(herding_cfg=HERDING_WEBOTS) # 140° FOV env = HerdingEnv(herding_cfg=HERDING_MEC_WEBOTS) # + mecanum slip Design notes ------------ * All dataclasses are frozen so instances are immutable after construction. * This module must not import from other ``herding.*`` packages — field-geometry constants live in ``herding.world.geometry`` because they depend on the world variant selected at runtime via ``HERDING_WORLD``, which would create an import cycle here. """ from __future__ import annotations import math from dataclasses import dataclass, field, replace # --------------------------------------------------------------------------- # LiDAR hardware spec # --------------------------------------------------------------------------- @dataclass(frozen=True) class LidarConfig: """Parameters of the simulated / physical LiDAR sensor. The two canonical presets are :data:`LIDAR_FULL` (360°, oracle mode) and :data:`LIDAR_WEBOTS` (140°/180-ray, matches the ShepherdDog proto). 
""" n_rays: int = 360 """Number of rays in the scan.""" fov_rad: float = 2.0 * math.pi """Full field-of-view in radians, centred on the robot's forward axis.""" max_range: float = 12.0 """Maximum detectable range in metres.""" noise_std: float = 0.005 """Gaussian standard deviation (metres) applied to each hit reading.""" sheep_radius: float = 0.30 """Effective disc radius of a sheep in the 2-D LiDAR plane (metres).""" post_radius: float = 0.25 """Effective disc radius of gate / corner posts (metres).""" def __post_init__(self) -> None: if self.n_rays < 1: raise ValueError(f"n_rays must be ≥ 1, got {self.n_rays}") if not (0.0 < self.fov_rad <= 2.0 * math.pi): raise ValueError(f"fov_rad must be in (0, 2π], got {self.fov_rad:.4f}") if self.max_range <= 0.0: raise ValueError(f"max_range must be > 0, got {self.max_range}") # Named presets ----------------------------------------------------------- LIDAR_FULL = LidarConfig( n_rays=360, fov_rad=2.0 * math.pi, ) """360° full-circle scan — oracle / ablation mode.""" LIDAR_WEBOTS = LidarConfig( n_rays=180, fov_rad=math.radians(140.0), ) """Matches the ShepherdDog.proto Lidar device (180 rays, 140° FOV). Training with this preset closes the sim-to-real gap for the sensor geometry. Because the observation is built from tracker output (not raw rays), a policy trained here can be deployed on a wider-FOV LiDAR (e.g. 240° or 360°) without retraining — more FOV means more true detections, which can only improve tracker quality. """ LIDAR_WEBOTS_360 = LidarConfig( n_rays=360, fov_rad=2.0 * math.pi, max_range=15.0, ) """Matches ShepherdDog360.proto (360 rays, 360° FOV, 15 m range). Used by the FOV-ablation Webots launch (HERDING_LIDAR=360). The wider range and full surround visibility hand the tracker more detections per step, so the trained policy — already trained on 360° gym perception — sees an observation distribution closer to training. 
""" # --------------------------------------------------------------------------- # Cluster-detection pipeline # --------------------------------------------------------------------------- @dataclass(frozen=True) class DetectionConfig: """Parameters for the LiDAR-scan → detection clustering pipeline.""" gap_threshold: float = 0.6 """Adjacent hit-points farther apart than this (metres) start a new cluster.""" max_cluster_span: float = 1.5 """Clusters wider than this (metres) are rejected as walls / structures.""" range_hit_eps: float = 0.05 """A ray is considered a hit if ``range < max_range - range_hit_eps``.""" split_range_gap: float = 0.20 """Range increase within a cluster that triggers a multi-peak split.""" wall_reject: float = 0.5 """Drop detections within this distance (metres) of any field wall.""" static_reject: float = 0.8 """Drop detections within this distance (metres) of known static features (gate posts, field corners).""" def __post_init__(self) -> None: if self.wall_reject < 0.0: raise ValueError(f"wall_reject must be ≥ 0, got {self.wall_reject}") if self.static_reject < 0.0: raise ValueError(f"static_reject must be ≥ 0, got {self.static_reject}") # --------------------------------------------------------------------------- # Multi-target tracker # --------------------------------------------------------------------------- @dataclass(frozen=True) class TrackerConfig: """Parameters for the nearest-neighbour sheep tracker.""" gate_m: float = 2.5 """Primary NN association gate in metres (recently observed tracks).""" reacquire_gate_m: float = 4.5 """Wider gate used when re-acquiring tracks stale for ≥ ``reacquire_min_age`` steps.""" reacquire_min_age: int = 20 """Minimum staleness (steps) before the wider re-acquisition gate activates.""" penned_gate_m: float = 4.0 """Gate for matching new detections to already-penned tracks.""" forget_steps: int = 200 """Delete an active track that has not been observed for this many steps (~3.2 s).""" predict_steps: 
int = 120 """Extrapolate a track's position using constant velocity for this many steps (~1.9 s).""" velocity_clamp: float = 1.0 """Maximum predicted speed (m/s) used during extrapolation.""" max_new_tracks_per_step: int = 10 """Maximum number of new tracks that may be spawned in a single step. Capping this limits the damage from LiDAR false-positive bursts (e.g. wall reflections in Webots) that would otherwise flood the track set. The default (10 = MAX_SHEEP) preserves the original behaviour; reduce to 2–3 for Webots deployment robustness. """ pen_latch_depth: float = 0.0 """Minimum depth past the gate line (metres) before a track is latched as penned. 0.0 = original behaviour (latch at y ≤ GATE_Y). Increase to 0.5 for Webots to prevent gate-hardware LiDAR reflections near y=-15 from permanently consuming tracker slots as false "penned" sheep. """ consensus_k: int = 3 """New tracks must accumulate this many matches before they appear in ``get_positions``. ``1`` disables the candidate stage entirely; ``3`` (default) requires three nearby confirmations within ``consensus_max_age`` and reliably filters single-shot detection splits / out-of-range stragglers that confuse the policy on the round field while real sheep promote in ~50 ms (3 frames). """ consensus_radius_m: float = 0.5 """Maximum distance (metres) between successive matches for a candidate to age toward promotion. Tighter than ``gate_m`` so wall-cluster centroid jitter cannot keep a phantom alive. Real sheep move ≪ 0.05 m / step at max speed so this gate is very loose for them. """ consensus_max_age: int = 15 """A candidate that has not been matched for this many steps is dropped. Short enough that a one-shot phantom can't keep itself alive, long enough that a real sheep glimpsed twice in a short interval confirms. 
""" def __post_init__(self) -> None: if self.forget_steps < 1: raise ValueError(f"forget_steps must be ≥ 1, got {self.forget_steps}") if self.max_new_tracks_per_step < 1: raise ValueError( f"max_new_tracks_per_step must be ≥ 1, got {self.max_new_tracks_per_step}" ) if self.consensus_k < 1: raise ValueError(f"consensus_k must be ≥ 1, got {self.consensus_k}") if self.consensus_radius_m <= 0.0: raise ValueError( f"consensus_radius_m must be > 0, got {self.consensus_radius_m}" ) if self.consensus_max_age < 1: raise ValueError( f"consensus_max_age must be ≥ 1, got {self.consensus_max_age}" ) # --------------------------------------------------------------------------- # Robot physical specification # --------------------------------------------------------------------------- @dataclass(frozen=True) class RobotConfig: """Physical parameters of the shepherd-dog robot. Values mirror ``protos/ShepherdDog.proto`` and ``protos/ShepherdDogMecanum.proto``. """ wheel_radius: float = 0.038 """Wheel radius in metres.""" wheel_base: float = 0.28 """Axle-to-axle distance for differential drive (metres).""" wheel_base_x: float = 0.28 """Front-to-back axle distance for mecanum drive (metres).""" wheel_base_y: float = 0.28 """Left-to-right axle distance for mecanum drive (metres).""" max_wheel_omega: float = 70.0 """Maximum wheel angular velocity (rad/s).""" action_smooth: float = 0.0 """Exponential moving-average coefficient applied to actions inside the env. ``0.0`` means no smoothing (gym default). ``0.55`` matches the hard-coded EMA in ``shepherd_dog.py`` — use this when training so the policy learns to act through the same filter it sees at deployment. """ strafe_efficiency: float = 1.0 """Mecanum strafe magnitude as a fraction of textbook X-pattern. ``1.0`` (default) is the ideal kinematic mecanum. 
    Values below 1 model strafe slip; the Webots controller reads the same
    value and applies it in the Supervisor velocity injection, so gym training
    and Webots deployment see identical body motion.  No effect on
    differential drive.
    """

    strafe_to_forward_bleed: float = 0.0
    """Fraction of ideal strafe magnitude that bleeds into body-frame x.

    ``0.0`` (default) = no bleed.  Non-zero values add
    ``strafe_to_forward_bleed * |vy_body_ideal|`` to ``vx_body`` to model the
    consistent forward (or backward) drift that some mecanum chassis exhibit
    during pure-strafe commands.  No effect on differential drive.
    """

    def __post_init__(self) -> None:
        if not (0.0 <= self.action_smooth < 1.0):
            raise ValueError(
                f"action_smooth must be in [0, 1), got {self.action_smooth}"
            )
        if not (0.0 < self.strafe_efficiency <= 1.0):
            raise ValueError(
                f"strafe_efficiency must be in (0, 1], got {self.strafe_efficiency}"
            )

    @property
    def max_linear(self) -> float:
        """Maximum achievable linear speed (m/s)."""
        return self.wheel_radius * self.max_wheel_omega


# ---------------------------------------------------------------------------
# Domain randomisation
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class DomainRandomConfig:
    """Parameters that inject physics / sensor noise for domain randomisation.

    All noise sources default to 0 (disabled) so the base env is deterministic
    and backwards-compatible; ``fp_std_pos`` has a non-zero default but takes
    effect only once ``fp_rate`` is above 0.  Enable them gradually to close
    the sim-to-real gap.
    """

    fp_rate: float = 0.0
    """Mean number of false-positive detections injected per step (Poisson λ).

    FPs are placed near static features (walls, posts) with positional noise
    ``fp_std_pos``, mimicking the spurious clusters Webots' physical LiDAR
    returns from 3D geometry.
    """

    fp_std_pos: float = 0.3
    """Positional standard deviation (metres) of injected false-positive
    clusters."""

    wheel_slip_std: float = 0.0
    """Gaussian noise standard deviation (rad/s) added to each wheel speed
    before kinematic integration.
    Models real-world wheel slip and motor variation.
    Suggested starting value: 0.05.
    """

    compass_noise_std: float = 0.0
    """Gaussian noise standard deviation (radians) added to the heading reading
    each step.

    Models magnetometer drift in Webots.  Suggested starting value: 0.02.
    """

    def __post_init__(self) -> None:
        if self.fp_rate < 0.0:
            raise ValueError(f"fp_rate must be ≥ 0, got {self.fp_rate}")
        if self.wheel_slip_std < 0.0:
            raise ValueError(
                f"wheel_slip_std must be ≥ 0, got {self.wheel_slip_std}"
            )
        if self.compass_noise_std < 0.0:
            raise ValueError(
                f"compass_noise_std must be ≥ 0, got {self.compass_noise_std}"
            )


# ---------------------------------------------------------------------------
# Aggregate config
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class HerdingConfig:
    """Root configuration object passed to
    :class:`~training.herding_env.HerdingEnv`.

    Sub-configs default to the original simulation parameters so that
    ``HerdingEnv()`` and ``HerdingEnv(herding_cfg=HerdingConfig())`` produce
    identical behaviour.
    """

    lidar: LidarConfig = field(default_factory=LidarConfig)
    detection: DetectionConfig = field(default_factory=DetectionConfig)
    tracker: TrackerConfig = field(default_factory=TrackerConfig)
    robot: RobotConfig = field(default_factory=RobotConfig)
    domain_random: DomainRandomConfig = field(default_factory=DomainRandomConfig)

    def replace(self, **kwargs) -> "HerdingConfig":
        """Return a new config with selected top-level sub-configs replaced.

        Example::

            cfg = HERDING_WEBOTS.replace(
                domain_random=DomainRandomConfig(fp_rate=2.0, wheel_slip_std=0.05)
            )
        """
        return replace(self, **kwargs)


# ---------------------------------------------------------------------------
# Named full-pipeline presets
# ---------------------------------------------------------------------------

HERDING_DEFAULT = HerdingConfig()
"""Original simulation defaults: zero behaviour change."""

HERDING_WEBOTS = HerdingConfig(
    lidar=LIDAR_WEBOTS,
    detection=DetectionConfig(wall_reject=0.5, static_reject=1.2),
    tracker=TrackerConfig(
        forget_steps=300,
        max_new_tracks_per_step=1,
        pen_latch_depth=2.0,
        predict_steps=180,
        consensus_k=3,
        consensus_radius_m=0.3,
        consensus_max_age=20,
    ),
    robot=RobotConfig(action_smooth=0.55),
)
"""Webots-matched training preset.

Changes vs HERDING_DEFAULT:

* LiDAR: 180 rays / 140° FOV matching ShepherdDog.proto hardware
* Detection: wall_reject kept at 0.5 m (original default; static_reject
  handles post FPs; 1.0 m was too aggressive near the south gate);
  static_reject 0.8 → 1.2 m
* Tracker:
    - consensus_k=3, radius=0.3 m, max_age=20 (~320 ms window): a new
      detection must be confirmed by two more nearby detections within a
      tight 0.3 m radius to promote.  Real sheep barely move frame-to-frame
      (≪0.05 m/step) so they easily self-confirm while the dog is rotating
      across them; wall-return phantoms whose cluster centroid jitters by
      more than 0.3 m as the dog moves can't accumulate three nearby hits
      and decay as separate candidates.
    - forget_steps=300 (~4.8 s) + predict_steps=180 (~2.9 s): once a real
      sheep is confirmed, it lives in tracker memory long enough for the
      policy (trained on 360° full-visibility obs) to plan while the dog
      sweeps a sparse cone across the field.  Set short enough that any
      phantom that does leak through promotion dies after the dog walks away
      from the wall that created it.
    - max_new_tracks_per_step=1 still rate-caps spawn bursts.
    - pen_latch_depth 0.0 → 2.0 m
* Robot: action_smooth 0.0 → 0.55 (matches Webots controller EMA)
"""

HERDING_MEC_WEBOTS = HerdingConfig(
    lidar=LIDAR_WEBOTS,
    detection=DetectionConfig(wall_reject=0.5, static_reject=1.2),
    tracker=TrackerConfig(
        forget_steps=300,
        max_new_tracks_per_step=1,
        pen_latch_depth=2.0,
        predict_steps=180,
        consensus_k=3,
        consensus_radius_m=0.3,
        consensus_max_age=20,
    ),
    robot=RobotConfig(
        action_smooth=0.55,
        strafe_efficiency=0.26,
        strafe_to_forward_bleed=-0.40,
    ),
)
"""Mecanum + 140° LiDAR preset.

Mirrors HERDING_WEBOTS but with mecanum-specific kinematic scaling
(``strafe_efficiency`` and ``strafe_to_forward_bleed``) applied to the gym
forward-kinematics formula.  The Webots controller reads these same values
via ``RobotConfig`` and feeds them through the Supervisor velocity
injection, so gym and Webots produce identical body motion.  Diff-drive
ignores both fields.
"""

HERDING_MEC_WEBOTS_360 = HerdingConfig(
    lidar=LIDAR_WEBOTS_360,
    # Looser detection thresholds for the wider FOV: the 360° scan
    # catches far walls, gate posts and pen rails the 140° front cone
    # never sees, so the cluster/feature filters need slightly more
    # margin to keep promotion rates similar.
    detection=DetectionConfig(wall_reject=0.6, static_reject=1.2),
    tracker=TrackerConfig(
        forget_steps=300,
        max_new_tracks_per_step=2,  # 360° gives more candidates per step
        pen_latch_depth=3.0,
        predict_steps=180,
        consensus_k=3,
        consensus_radius_m=0.3,
        consensus_max_age=20,
    ),
    robot=RobotConfig(
        action_smooth=0.55,
        strafe_efficiency=0.26,
        strafe_to_forward_bleed=-0.40,
    ),
)
"""Mecanum + 360° LiDAR preset (the deployable mecanum target).

The 360° FOV gives the policy perception coverage in every direction,
which matches the omnidirectional motion the mecanum chassis can produce.
Used for both gym training and Webots deployment so the trained policy
sees the same observation geometry it will face at deploy time.
"""
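
"""Usage sketch (illustrative only; not part of the configuration API).

Because every sub-config is frozen, adjusting a single field of a preset
means building a new object.  The sketch below is one possible way to do
that without losing the preset's other tuned values, by nesting
``dataclasses.replace``.  ``TrackerSketch`` and ``ConfigSketch`` are
hypothetical two-field stand-ins for ``TrackerConfig`` and ``HerdingConfig``
so the snippet runs on its own; the field values are borrowed from the
presets above.

```python
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class TrackerSketch:
    # Hypothetical stand-in for TrackerConfig (two fields only).
    forget_steps: int = 200
    predict_steps: int = 120


@dataclass(frozen=True)
class ConfigSketch:
    # Hypothetical stand-in for HerdingConfig (one sub-config only).
    tracker: TrackerSketch = field(default_factory=TrackerSketch)


# Stand-in for a tuned preset such as HERDING_WEBOTS.
preset = ConfigSketch(tracker=TrackerSketch(forget_steps=300, predict_steps=180))

# Rebuilding the sub-config from scratch resets predict_steps to its default.
rebuilt = replace(preset, tracker=TrackerSketch(forget_steps=60))

# Nesting replace() keeps every preset field that is not named explicitly.
patched = replace(preset, tracker=replace(preset.tracker, forget_steps=60))

print(rebuilt.tracker)
print(patched.tracker)
```

The same nesting should carry over to the real classes, for example
``HERDING_WEBOTS.replace(tracker=replace(HERDING_WEBOTS.tracker, forget_steps=60))``,
assuming the module-level ``replace`` imported from ``dataclasses`` is in scope.
"""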