1c197e0ff7
Two changes that together raise diff/round gym success ~52%→88% (BC)
and ~68%→88% (RL) without retraining; diff/field stays at 100%.
* TrackerConfig.consensus_k default 1 → 3 (radius 0.5 m, max_age 15
frames). The same candidate-promotion mechanism that closed the
Webots LiDAR gap also filters gym tracker phantoms — they show up
on the round field where sheep run further between detection
cycles than GATE_M, so each new position spawns a fresh track
while the stale one persists in memory. SheepTracker() called with
no tracker_cfg keeps the legacy pass-through behaviour for
backwards compatibility.
* Strömbom + universal teachers now detect when the natural
"behind the flock" drive target leaves the curved boundary and
fall back to pushing the flock radially inward toward the centre.
Breaks the wall-circling pattern that previously trapped both the
analytical baselines and the trained policies.
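The radial fallback in the second bullet can be sketched roughly as follows. This is a minimal illustration, not the teachers' actual code: `drive_target`, `standoff`, `centre`, and `radius` are hypothetical names, and the round field is assumed to be a circle around `centre`.

```python
import math


def _unit(dx, dy):
    """Normalise a 2-D vector; a zero vector stays zero."""
    n = math.hypot(dx, dy)
    return (0.0, 0.0) if n < 1e-9 else (dx / n, dy / n)


def drive_target(flock, goal, standoff=1.5, centre=(0.0, 0.0), radius=10.0):
    """Return ((x, y), used_fallback) for the dog's drive position.

    All parameter names and defaults are illustrative assumptions.
    """
    # Natural target: behind the flock as seen from the goal.
    ux, uy = _unit(flock[0] - goal[0], flock[1] - goal[1])
    tx, ty = flock[0] + standoff * ux, flock[1] + standoff * uy
    if math.hypot(tx - centre[0], ty - centre[1]) <= radius:
        return (tx, ty), False

    # Fallback: stand on the outer side of the flock so the push points
    # radially inward, toward the field centre.
    ux, uy = _unit(flock[0] - centre[0], flock[1] - centre[1])
    tx, ty = flock[0] + standoff * ux, flock[1] + standoff * uy

    # Clamp the fallback point onto the boundary if it still lies outside.
    d = math.hypot(tx - centre[0], ty - centre[1])
    if d > radius:
        s = radius / d
        tx = centre[0] + (tx - centre[0]) * s
        ty = centre[1] + (ty - centre[1]) * s
    return (tx, ty), True
```

With the flock in mid-field the natural target is used unchanged; with the flock pressed against the wall the function switches to the inward push instead of circling the boundary.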
A/B numbers (n_sheep ∈ {1,2,3,5,10}, 5 seeds each, max_steps=15000):
diff/field bc: baseline 100% consensus 100%
diff/field rl: baseline 100% consensus 100%
diff/round bc: baseline 52% consensus 88%
diff/round rl: baseline 68% consensus 88%
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
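The candidate-promotion mechanism described in the first bullet can be illustrated with a small stand-alone filter. This is a sketch under assumptions, not the `SheepTracker` implementation: `ConsensusFilter` and `Candidate` are hypothetical names, and association with confirmed tracks is reduced to a nearest-neighbour check inside the same radius.

```python
import math
from dataclasses import dataclass


@dataclass
class Candidate:
    x: float
    y: float
    hits: int = 1  # detections matched so far, including the first
    age: int = 0   # steps since the last match


class ConsensusFilter:
    """Hypothetical stand-in for the tracker's candidate stage."""

    def __init__(self, k=3, radius_m=0.5, max_age=15):
        self.k, self.radius_m, self.max_age = k, radius_m, max_age
        self.candidates = []
        self.confirmed = []

    def _nearest(self, pool, x, y):
        """Closest item in pool within radius_m, or None."""
        best, best_d = None, self.radius_m
        for item in pool:
            d = math.hypot(item.x - x, item.y - y)
            if d <= best_d:
                best, best_d = item, d
        return best

    def step(self, detections):
        """Feed one frame of (x, y) detections; return confirmed positions."""
        for c in self.candidates:
            c.age += 1
        for x, y in detections:
            track = self._nearest(self.confirmed, x, y)
            if track is None:
                track = self._nearest(self.candidates, x, y)
            if track is None:
                # Unmatched detection: start a new candidate, not a track.
                self.candidates.append(Candidate(x, y))
                continue
            track.x, track.y = x, y
            track.hits += 1
            track.age = 0
        # Promote candidates with k matches; drop the ones that went stale.
        self.confirmed.extend(c for c in self.candidates if c.hits >= self.k)
        self.candidates = [
            c for c in self.candidates
            if c.hits < self.k and c.age < self.max_age
        ]
        return [(c.x, c.y) for c in self.confirmed]
```

A slowly moving detection is promoted on its third frame, whereas a detection that jumps by more than `radius_m` each frame only spawns fresh candidates that later expire. With `k=1` every detection is promoted immediately, which corresponds to the pass-through behaviour.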
387 lines
14 KiB
Python
"""Central configuration dataclasses for the herding simulation.

Every tunable constant that previously lived as a module-level literal in
perception/lidar_sim.py, perception/lidar_perception.py,
perception/sheep_tracker.py, world/geometry.py, or training/herding_env.py
is now represented here as a field with its original default value.

Usage: use the module defaults unchanged::

    env = HerdingEnv()  # same behaviour as before

Override a subset of parameters::

    from herding.config import HerdingConfig, TrackerConfig
    cfg = HerdingConfig(tracker=TrackerConfig(forget_steps=60))
    env = HerdingEnv(herding_cfg=cfg)

Use a named preset for Webots-matched training::

    from herding.config import HERDING_WEBOTS
    env = HerdingEnv(herding_cfg=HERDING_WEBOTS)

Design notes
------------
* All dataclasses are frozen: instances are immutable after construction.
* This module must not import from other ``herding.*`` packages to avoid
  import cycles.  Field-geometry constants (pen coordinates, field size)
  stay in ``herding.world.geometry`` because they depend on the world
  variant selected at runtime via ``HERDING_WORLD``.
"""

from __future__ import annotations

import math
from dataclasses import dataclass, field, replace


# ---------------------------------------------------------------------------
# LiDAR hardware spec
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class LidarConfig:
    """Parameters of the simulated / physical LiDAR sensor.

    The two canonical presets are :data:`LIDAR_FULL` (360°, oracle mode)
    and :data:`LIDAR_WEBOTS` (140°/180-ray, matches the ShepherdDog proto).
    """

    n_rays: int = 360
    """Number of rays in the scan."""

    fov_rad: float = 2.0 * math.pi
    """Full field-of-view in radians, centred on the robot's forward axis."""

    max_range: float = 12.0
    """Maximum detectable range in metres."""

    noise_std: float = 0.005
    """Gaussian standard deviation (metres) applied to each hit reading."""

    sheep_radius: float = 0.30
    """Effective disc radius of a sheep in the 2-D LiDAR plane (metres)."""

    post_radius: float = 0.25
    """Effective disc radius of gate / corner posts (metres)."""

    def __post_init__(self) -> None:
        if self.n_rays < 1:
            raise ValueError(f"n_rays must be ≥ 1, got {self.n_rays}")
        if not (0.0 < self.fov_rad <= 2.0 * math.pi):
            raise ValueError(f"fov_rad must be in (0, 2π], got {self.fov_rad:.4f}")
        if self.max_range <= 0.0:
            raise ValueError(f"max_range must be > 0, got {self.max_range}")


# Named presets -----------------------------------------------------------

LIDAR_FULL = LidarConfig(
    n_rays=360,
    fov_rad=2.0 * math.pi,
)
"""360° full-circle scan (oracle / ablation mode)."""

LIDAR_WEBOTS = LidarConfig(
    n_rays=180,
    fov_rad=math.radians(140.0),
)
"""Matches the ShepherdDog.proto Lidar device (180 rays, 140° FOV).

Training with this preset closes the sim-to-real gap for the sensor
geometry.  Because the observation is built from tracker output (not raw
rays), a policy trained here can be deployed on a wider-FOV LiDAR (e.g.
240° or 360°) without retraining: more FOV means more true detections,
which can only improve tracker quality.
"""


# ---------------------------------------------------------------------------
# Cluster-detection pipeline
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class DetectionConfig:
    """Parameters for the LiDAR-scan → detection clustering pipeline."""

    gap_threshold: float = 0.6
    """Adjacent hit-points farther apart than this (metres) start a new cluster."""

    max_cluster_span: float = 1.5
    """Clusters wider than this (metres) are rejected as walls / structures."""

    range_hit_eps: float = 0.05
    """A ray is considered a hit if ``range < max_range - range_hit_eps``."""

    split_range_gap: float = 0.20
    """Range increase within a cluster that triggers a multi-peak split."""

    wall_reject: float = 0.5
    """Drop detections within this distance (metres) of any field wall."""

    static_reject: float = 0.8
    """Drop detections within this distance (metres) of known static features
    (gate posts, field corners)."""

    def __post_init__(self) -> None:
        if self.wall_reject < 0.0:
            raise ValueError(f"wall_reject must be ≥ 0, got {self.wall_reject}")
        if self.static_reject < 0.0:
            raise ValueError(f"static_reject must be ≥ 0, got {self.static_reject}")


# ---------------------------------------------------------------------------
# Multi-target tracker
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class TrackerConfig:
    """Parameters for the nearest-neighbour sheep tracker."""

    gate_m: float = 2.5
    """Primary NN association gate in metres (recently observed tracks)."""

    reacquire_gate_m: float = 4.5
    """Wider gate used when re-acquiring tracks stale for ≥ ``reacquire_min_age`` steps."""

    reacquire_min_age: int = 20
    """Minimum staleness (steps) before the wider re-acquisition gate activates."""

    penned_gate_m: float = 4.0
    """Gate for matching new detections to already-penned tracks."""

    forget_steps: int = 200
    """Delete an active track that has not been observed for this many steps (~3.2 s)."""

    predict_steps: int = 120
    """Extrapolate a track's position using constant velocity for this many steps (~1.9 s)."""

    velocity_clamp: float = 1.0
    """Maximum predicted speed (m/s) used during extrapolation."""

    max_new_tracks_per_step: int = 10
    """Maximum number of new tracks that may be spawned in a single step.

    Capping this limits the damage from LiDAR false-positive bursts (e.g.
    wall reflections in Webots) that would otherwise flood the track set.
    The default (10 = MAX_SHEEP) preserves the original behaviour; reduce
    to 1–3 for Webots deployment robustness (``HERDING_WEBOTS`` uses 1).
    """

    pen_latch_depth: float = 0.0
    """Minimum depth past the gate line (metres) before a track is latched
    as penned.  0.0 = original behaviour (latch at y ≤ GATE_Y).  Increase
    it for Webots (``HERDING_WEBOTS`` uses 2.0) to prevent gate-hardware
    LiDAR reflections near y=-15 from permanently consuming tracker slots
    as false "penned" sheep.
    """

    consensus_k: int = 3
    """New tracks must accumulate this many matches before they appear in
    ``get_positions``.  ``1`` disables the candidate stage entirely;
    ``3`` (default) requires three nearby confirmations within
    ``consensus_max_age`` and reliably filters single-shot detection
    splits / out-of-range stragglers that confuse the policy on the
    round field, while real sheep promote in ~50 ms (3 frames).
    """

    consensus_radius_m: float = 0.5
    """Maximum distance (metres) between successive matches for a candidate
    to age toward promotion.  Tighter than ``gate_m`` so wall-cluster
    centroid jitter cannot keep a phantom alive.  Real sheep move
    ≪ 0.05 m / step at max speed so this gate is very loose for them.
    """

    consensus_max_age: int = 15
    """A candidate that has not been matched for this many steps is dropped.
    Short enough that a one-shot phantom can't keep itself alive, long
    enough that a real sheep glimpsed again after a short gap still
    accumulates its confirmations.
    """

    def __post_init__(self) -> None:
        if self.forget_steps < 1:
            raise ValueError(f"forget_steps must be ≥ 1, got {self.forget_steps}")
        if self.max_new_tracks_per_step < 1:
            raise ValueError(
                f"max_new_tracks_per_step must be ≥ 1, got {self.max_new_tracks_per_step}"
            )
        if self.consensus_k < 1:
            raise ValueError(f"consensus_k must be ≥ 1, got {self.consensus_k}")
        if self.consensus_radius_m <= 0.0:
            raise ValueError(
                f"consensus_radius_m must be > 0, got {self.consensus_radius_m}"
            )
        if self.consensus_max_age < 1:
            raise ValueError(
                f"consensus_max_age must be ≥ 1, got {self.consensus_max_age}"
            )


# ---------------------------------------------------------------------------
# Robot physical specification
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class RobotConfig:
    """Physical parameters of the shepherd-dog robot.

    Values mirror ``protos/ShepherdDog.proto`` and ``protos/ShepherdDogMecanum.proto``.
    """

    wheel_radius: float = 0.038
    """Wheel radius in metres."""

    wheel_base: float = 0.28
    """Axle-to-axle distance for differential drive (metres)."""

    wheel_base_x: float = 0.28
    """Front-to-back axle distance for mecanum drive (metres)."""

    wheel_base_y: float = 0.28
    """Left-to-right axle distance for mecanum drive (metres)."""

    max_wheel_omega: float = 70.0
    """Maximum wheel angular velocity (rad/s)."""

    action_smooth: float = 0.0
    """Exponential moving-average coefficient applied to actions inside the env.

    ``0.0`` means no smoothing (gym default).
    ``0.55`` matches the hard-coded EMA in ``shepherd_dog.py``; use this
    when training so the policy learns to act through the same filter it
    sees at deployment.
    """

    def __post_init__(self) -> None:
        if not (0.0 <= self.action_smooth < 1.0):
            raise ValueError(
                f"action_smooth must be in [0, 1), got {self.action_smooth}"
            )

    @property
    def max_linear(self) -> float:
        """Maximum achievable linear speed (m/s)."""
        return self.wheel_radius * self.max_wheel_omega


# ---------------------------------------------------------------------------
# Domain randomisation
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class DomainRandomConfig:
    """Parameters that inject physics / sensor noise for domain randomisation.

    All noise sources default to 0 (disabled) so the base env is
    deterministic and backwards-compatible; ``fp_std_pos`` has a non-zero
    default but only takes effect once ``fp_rate`` > 0.  Enable them
    gradually to close the sim-to-real gap.
    """

    fp_rate: float = 0.0
    """Mean number of false-positive detections injected per step (Poisson λ).

    FPs are placed near static features (walls, posts) with positional
    noise ``fp_std_pos``, mimicking the spurious clusters Webots' physical
    LiDAR returns from 3D geometry.
    """

    fp_std_pos: float = 0.3
    """Positional standard deviation (metres) of injected false-positive clusters."""

    wheel_slip_std: float = 0.0
    """Gaussian noise standard deviation (rad/s) added to each wheel speed
    before kinematic integration.  Models real-world wheel slip and motor
    variation.  Suggested starting value: 0.05.
    """

    compass_noise_std: float = 0.0
    """Gaussian noise standard deviation (radians) added to the heading
    reading each step.  Models magnetometer drift in Webots.
    Suggested starting value: 0.02.
    """

    def __post_init__(self) -> None:
        if self.fp_rate < 0.0:
            raise ValueError(f"fp_rate must be ≥ 0, got {self.fp_rate}")
        if self.wheel_slip_std < 0.0:
            raise ValueError(f"wheel_slip_std must be ≥ 0, got {self.wheel_slip_std}")
        if self.compass_noise_std < 0.0:
            raise ValueError(f"compass_noise_std must be ≥ 0, got {self.compass_noise_std}")


# ---------------------------------------------------------------------------
# Aggregate config
# ---------------------------------------------------------------------------

@dataclass(frozen=True)
class HerdingConfig:
    """Root configuration object passed to :class:`~training.herding_env.HerdingEnv`.

    Sub-configs default to the original simulation parameters so that
    ``HerdingEnv()`` and ``HerdingEnv(herding_cfg=HerdingConfig())`` produce
    identical behaviour.
    """

    lidar: LidarConfig = field(default_factory=LidarConfig)
    detection: DetectionConfig = field(default_factory=DetectionConfig)
    tracker: TrackerConfig = field(default_factory=TrackerConfig)
    robot: RobotConfig = field(default_factory=RobotConfig)
    domain_random: DomainRandomConfig = field(default_factory=DomainRandomConfig)

    def replace(self, **kwargs) -> "HerdingConfig":
        """Return a new config with selected top-level sub-configs replaced.

        Example::

            cfg = HERDING_WEBOTS.replace(
                domain_random=DomainRandomConfig(fp_rate=2.0, wheel_slip_std=0.05)
            )
        """
        # ``replace`` below resolves to the module-level ``dataclasses.replace``
        # import, not to this method, so the call does not recurse.
        return replace(self, **kwargs)


# ---------------------------------------------------------------------------
# Named full-pipeline presets
# ---------------------------------------------------------------------------

HERDING_DEFAULT = HerdingConfig()
"""Module defaults.  Matches the original simulation parameters except
``TrackerConfig.consensus_k``, which now defaults to 3 (previously 1).
"""

HERDING_WEBOTS = HerdingConfig(
    lidar=LIDAR_WEBOTS,
    detection=DetectionConfig(wall_reject=0.5, static_reject=1.2),
    tracker=TrackerConfig(
        forget_steps=300,
        max_new_tracks_per_step=1,
        pen_latch_depth=2.0,
        predict_steps=180,
        consensus_k=3,
        consensus_radius_m=0.3,
        consensus_max_age=20,
    ),
    robot=RobotConfig(action_smooth=0.55),
)
"""Webots-matched training preset.

Changes vs HERDING_DEFAULT:
* LiDAR: 180 rays / 140° FOV matching ShepherdDog.proto hardware
* Detection: wall_reject kept at 0.5 m (the original default; 1.0 m was
  too aggressive near the south gate); static_reject 0.8 → 1.2 m to
  handle post FPs
* Tracker:
  - consensus_k=3, radius=0.3 m, max_age=20 (~320 ms window): a new
    detection must be confirmed by two more nearby detections within
    a tight 0.3 m radius to promote.  Real sheep barely move
    frame-to-frame (≪0.05 m/step) so they easily self-confirm while
    the dog is rotating across them; wall-return phantoms whose
    cluster centroid jitters by more than 0.3 m as the dog moves
    can't accumulate three nearby hits and decay as separate
    candidates.
  - forget_steps=300 (~4.8 s) + predict_steps=180 (~2.9 s): once a
    real sheep is confirmed, it lives in tracker memory long enough
    for the policy (trained on 360° full-visibility obs) to plan
    while the dog sweeps a sparse cone across the field.  Still short
    enough that any phantom that does leak through promotion dies
    after the dog walks away from the wall that created it.
  - max_new_tracks_per_step=1 still rate-caps spawn bursts.
  - pen_latch_depth=2.0 m: a track must be this far past the gate line
    before it is latched as penned (see ``TrackerConfig.pen_latch_depth``).
* Robot: action_smooth 0.0 → 0.55 (matches Webots controller EMA)
"""
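`HerdingConfig.replace` swaps whole sub-configs, so changing a single tracker field while keeping the rest of a preset needs a nested `dataclasses.replace`. The sketch below shows the pattern on trimmed stand-in classes (only two fields are kept, and `WEBOTS_LIKE` is a hypothetical preset name), so it runs without the `herding` package installed.

```python
from dataclasses import FrozenInstanceError, dataclass, field, replace


@dataclass(frozen=True)
class TrackerConfig:  # trimmed stand-in, not the full class above
    forget_steps: int = 200
    consensus_k: int = 3

    def __post_init__(self) -> None:
        if self.consensus_k < 1:
            raise ValueError(f"consensus_k must be >= 1, got {self.consensus_k}")


@dataclass(frozen=True)
class HerdingConfig:  # trimmed stand-in with a single sub-config
    tracker: TrackerConfig = field(default_factory=TrackerConfig)


# Hypothetical preset with one non-default tracker field.
WEBOTS_LIKE = HerdingConfig(tracker=TrackerConfig(forget_steps=300))

# Nested replace: copy the preset's tracker, change one field, keep the rest.
tuned = replace(WEBOTS_LIKE, tracker=replace(WEBOTS_LIKE.tracker, consensus_k=5))

# Frozen instances reject in-place mutation.
try:
    tuned.tracker.consensus_k = 1
    mutated = True
except FrozenInstanceError:
    mutated = False

# replace() builds the copy through __init__, so __post_init__ validation
# runs again on the new values.
try:
    replace(tuned.tracker, consensus_k=0)
    validated = False
except ValueError:
    validated = True
```

The original preset is left untouched, which is what makes sharing module-level presets such as `HERDING_WEBOTS` between experiments safe.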