Webots sim-to-real fixes, DAgger pipeline, 360° proto variant
Today's session worked across the full Webots delivery stack — found and
fixed a cluster of bugs blocking the BC/RL transfer, then explored
training-side mitigations for the residual perception gap.
Bug fixes:
- Makefile FP_RATE default 2.0 → 0.0: BC demos used fp_rate=0 but RL
fine-tune defaulted to fp_rate=2, poisoning the BC obs distribution
and stalling PPO at 0% success across 1.46M+ steps.
- controllers/{shepherd_dog,sheep}/runtime.ini: Webots was launching
controllers under system python3 (no numpy) and they were crashing
silently. Pinned to the conda tir env.
- herding/config.py HERDING_WEBOTS preset: pen_latch_depth 0.5 → 2.0,
max_new_tracks_per_step 3 → 1, static_reject 0.8 → 1.2. Stops phantom
FPs near the gate from latching as permanently-penned tracks.
- herding/perception/sheep_tracker.py: penned tracks now decay at
forget_steps × 8 instead of living forever. Adds get_positions
min_freshness filter for deploy-time use.
Training/eval matches deployment:
- training/bc/collect.py: --dagger-policy flag for DAgger rollouts
(policy drives, teacher labels) + --use-webots-preset for matched
140° tracker + DR config.
- controllers/shepherd_dog/shepherd_dog.py: scan-fallback (0, 0.6) when
BC/RL sees empty sheep_positions — recovers from FOV gaps.
Tooling:
- tools/dagger_round.sh: one-shot DAgger round (collect + concat + bc).
- tools/webots_sweep_gt.sh: full sweep with HERDING_USE_GT=1 for the
perception-gap diagnosis matrix.
- protos/ShepherdDog360.proto: 360° FOV variant for the FOV-ablation
comparison. Canonical proto stays at 140° per project spec.
Artifacts: v1 BC/RL policies for all 4 (drive × world) combos trained
in clean gym (success: diff/field 90-100%, diff/round 58%, mec/field
60-100%, mec/round 50-100%). DAgger r1/r2 BCs for diff/field show
12%→38% progression on gym HERDING_WEBOTS proxy but did not close
to actual Webots LiDAR (0/5 throughout). Next: LSTM policy or
learned tracker per the project-state memory.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,335 @@
|
||||
"""Central configuration dataclasses for the herding simulation.
|
||||
|
||||
Every tunable constant that previously lived as a module-level literal in
|
||||
perception/lidar_sim.py, perception/lidar_perception.py,
|
||||
perception/sheep_tracker.py, world/geometry.py, or training/herding_env.py
|
||||
is now represented here as a field with its original default value.
|
||||
|
||||
Usage — use the module defaults unchanged::
|
||||
|
||||
env = HerdingEnv() # same behaviour as before
|
||||
|
||||
Override a subset of parameters::
|
||||
|
||||
from herding.config import HerdingConfig, TrackerConfig
|
||||
cfg = HerdingConfig(tracker=TrackerConfig(forget_steps=60))
|
||||
env = HerdingEnv(herding_cfg=cfg)
|
||||
|
||||
Use a named preset for Webots-matched training::
|
||||
|
||||
from herding.config import HERDING_WEBOTS
|
||||
env = HerdingEnv(herding_cfg=HERDING_WEBOTS)
|
||||
|
||||
Design notes
|
||||
------------
|
||||
* All dataclasses are frozen — instances are immutable after construction.
|
||||
* This module must not import from other ``herding.*`` packages to avoid
|
||||
import cycles. Field-geometry constants (pen coordinates, field size)
|
||||
stay in ``herding.world.geometry`` because they depend on the world
|
||||
variant selected at runtime via ``HERDING_WORLD``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from dataclasses import dataclass, field, replace
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# LiDAR hardware spec
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class LidarConfig:
|
||||
"""Parameters of the simulated / physical LiDAR sensor.
|
||||
|
||||
The two canonical presets are :data:`LIDAR_FULL` (360°, oracle mode)
|
||||
and :data:`LIDAR_WEBOTS` (140°/180-ray, matches the ShepherdDog proto).
|
||||
"""
|
||||
|
||||
n_rays: int = 360
|
||||
"""Number of rays in the scan."""
|
||||
|
||||
fov_rad: float = 2.0 * math.pi
|
||||
"""Full field-of-view in radians, centred on the robot's forward axis."""
|
||||
|
||||
max_range: float = 12.0
|
||||
"""Maximum detectable range in metres."""
|
||||
|
||||
noise_std: float = 0.005
|
||||
"""Gaussian standard deviation (metres) applied to each hit reading."""
|
||||
|
||||
sheep_radius: float = 0.30
|
||||
"""Effective disc radius of a sheep in the 2-D LiDAR plane (metres)."""
|
||||
|
||||
post_radius: float = 0.25
|
||||
"""Effective disc radius of gate / corner posts (metres)."""
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.n_rays < 1:
|
||||
raise ValueError(f"n_rays must be ≥ 1, got {self.n_rays}")
|
||||
if not (0.0 < self.fov_rad <= 2.0 * math.pi):
|
||||
raise ValueError(f"fov_rad must be in (0, 2π], got {self.fov_rad:.4f}")
|
||||
if self.max_range <= 0.0:
|
||||
raise ValueError(f"max_range must be > 0, got {self.max_range}")
|
||||
|
||||
|
||||
# Named presets -----------------------------------------------------------
|
||||
|
||||
LIDAR_FULL = LidarConfig(
|
||||
n_rays=360,
|
||||
fov_rad=2.0 * math.pi,
|
||||
)
|
||||
"""360° full-circle scan — oracle / ablation mode."""
|
||||
|
||||
LIDAR_WEBOTS = LidarConfig(
|
||||
n_rays=180,
|
||||
fov_rad=math.radians(140.0),
|
||||
)
|
||||
"""Matches the ShepherdDog.proto Lidar device (180 rays, 140° FOV).
|
||||
|
||||
Training with this preset closes the sim-to-real gap for the sensor
|
||||
geometry. Because the observation is built from tracker output (not raw
|
||||
rays), a policy trained here can be deployed on a wider-FOV LiDAR (e.g.
|
||||
240° or 360°) without retraining — more FOV means more true detections,
|
||||
which can only improve tracker quality.
|
||||
"""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Cluster-detection pipeline
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DetectionConfig:
|
||||
"""Parameters for the LiDAR-scan → detection clustering pipeline."""
|
||||
|
||||
gap_threshold: float = 0.6
|
||||
"""Adjacent hit-points farther apart than this (metres) start a new cluster."""
|
||||
|
||||
max_cluster_span: float = 1.5
|
||||
"""Clusters wider than this (metres) are rejected as walls / structures."""
|
||||
|
||||
range_hit_eps: float = 0.05
|
||||
"""A ray is considered a hit if ``range < max_range - range_hit_eps``."""
|
||||
|
||||
split_range_gap: float = 0.20
|
||||
"""Range increase within a cluster that triggers a multi-peak split."""
|
||||
|
||||
wall_reject: float = 0.5
|
||||
"""Drop detections within this distance (metres) of any field wall."""
|
||||
|
||||
static_reject: float = 0.8
|
||||
"""Drop detections within this distance (metres) of known static features
|
||||
(gate posts, field corners)."""
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.wall_reject < 0.0:
|
||||
raise ValueError(f"wall_reject must be ≥ 0, got {self.wall_reject}")
|
||||
if self.static_reject < 0.0:
|
||||
raise ValueError(f"static_reject must be ≥ 0, got {self.static_reject}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Multi-target tracker
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TrackerConfig:
|
||||
"""Parameters for the nearest-neighbour sheep tracker."""
|
||||
|
||||
gate_m: float = 2.5
|
||||
"""Primary NN association gate in metres (recently observed tracks)."""
|
||||
|
||||
reacquire_gate_m: float = 4.5
|
||||
"""Wider gate used when re-acquiring tracks stale for ≥ ``reacquire_min_age`` steps."""
|
||||
|
||||
reacquire_min_age: int = 20
|
||||
"""Minimum staleness (steps) before the wider re-acquisition gate activates."""
|
||||
|
||||
penned_gate_m: float = 4.0
|
||||
"""Gate for matching new detections to already-penned tracks."""
|
||||
|
||||
forget_steps: int = 200
|
||||
"""Delete an active track that has not been observed for this many steps (~3.2 s)."""
|
||||
|
||||
predict_steps: int = 120
|
||||
"""Extrapolate a track's position using constant velocity for this many steps (~1.9 s)."""
|
||||
|
||||
velocity_clamp: float = 1.0
|
||||
"""Maximum predicted speed (m/s) used during extrapolation."""
|
||||
|
||||
max_new_tracks_per_step: int = 10
|
||||
"""Maximum number of new tracks that may be spawned in a single step.
|
||||
|
||||
Capping this limits the damage from LiDAR false-positive bursts (e.g.
|
||||
wall reflections in Webots) that would otherwise flood the track set.
|
||||
The default (10 = MAX_SHEEP) preserves the original behaviour; reduce
|
||||
to 2–3 for Webots deployment robustness.
|
||||
"""
|
||||
|
||||
pen_latch_depth: float = 0.0
|
||||
"""Minimum depth past the gate line (metres) before a track is latched
|
||||
as penned. 0.0 = original behaviour (latch at y ≤ GATE_Y). Increase
|
||||
to 0.5 for Webots to prevent gate-hardware LiDAR reflections near y=-15
|
||||
from permanently consuming tracker slots as false "penned" sheep.
|
||||
"""
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.forget_steps < 1:
|
||||
raise ValueError(f"forget_steps must be ≥ 1, got {self.forget_steps}")
|
||||
if self.max_new_tracks_per_step < 1:
|
||||
raise ValueError(
|
||||
f"max_new_tracks_per_step must be ≥ 1, got {self.max_new_tracks_per_step}"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Robot physical specification
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class RobotConfig:
|
||||
"""Physical parameters of the shepherd-dog robot.
|
||||
|
||||
Values mirror ``protos/ShepherdDog.proto`` and ``protos/ShepherdDogMecanum.proto``.
|
||||
"""
|
||||
|
||||
wheel_radius: float = 0.038
|
||||
"""Wheel radius in metres."""
|
||||
|
||||
wheel_base: float = 0.28
|
||||
"""Axle-to-axle distance for differential drive (metres)."""
|
||||
|
||||
wheel_base_x: float = 0.28
|
||||
"""Front-to-back axle distance for mecanum drive (metres)."""
|
||||
|
||||
wheel_base_y: float = 0.28
|
||||
"""Left-to-right axle distance for mecanum drive (metres)."""
|
||||
|
||||
max_wheel_omega: float = 70.0
|
||||
"""Maximum wheel angular velocity (rad/s)."""
|
||||
|
||||
action_smooth: float = 0.0
|
||||
"""Exponential moving-average coefficient applied to actions inside the env.
|
||||
|
||||
``0.0`` means no smoothing (gym default).
|
||||
``0.55`` matches the hard-coded EMA in ``shepherd_dog.py`` — use this
|
||||
when training so the policy learns to act through the same filter it
|
||||
sees at deployment.
|
||||
"""
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if not (0.0 <= self.action_smooth < 1.0):
|
||||
raise ValueError(
|
||||
f"action_smooth must be in [0, 1), got {self.action_smooth}"
|
||||
)
|
||||
|
||||
@property
|
||||
def max_linear(self) -> float:
|
||||
"""Maximum achievable linear speed (m/s)."""
|
||||
return self.wheel_radius * self.max_wheel_omega
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Domain randomisation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DomainRandomConfig:
|
||||
"""Parameters that inject physics / sensor noise for domain randomisation.
|
||||
|
||||
All values default to 0 (disabled) so the base env is deterministic and
|
||||
backwards-compatible. Enable them gradually to close the sim-to-real gap.
|
||||
"""
|
||||
|
||||
fp_rate: float = 0.0
|
||||
"""Mean number of false-positive detections injected per step (Poisson λ).
|
||||
|
||||
FPs are placed near static features (walls, posts) with positional
|
||||
noise ``fp_std_pos``, mimicking the spurious clusters Webots' physical
|
||||
LiDAR returns from 3D geometry.
|
||||
"""
|
||||
|
||||
fp_std_pos: float = 0.3
|
||||
"""Positional standard deviation (metres) of injected false-positive clusters."""
|
||||
|
||||
wheel_slip_std: float = 0.0
|
||||
"""Gaussian noise standard deviation (rad/s) added to each wheel speed
|
||||
before kinematic integration. Models real-world wheel slip and motor
|
||||
variation. Suggested starting value: 0.05.
|
||||
"""
|
||||
|
||||
compass_noise_std: float = 0.0
|
||||
"""Gaussian noise standard deviation (radians) added to the heading
|
||||
reading each step. Models magnetometer drift in Webots.
|
||||
Suggested starting value: 0.02.
|
||||
"""
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
if self.fp_rate < 0.0:
|
||||
raise ValueError(f"fp_rate must be ≥ 0, got {self.fp_rate}")
|
||||
if self.wheel_slip_std < 0.0:
|
||||
raise ValueError(f"wheel_slip_std must be ≥ 0, got {self.wheel_slip_std}")
|
||||
if self.compass_noise_std < 0.0:
|
||||
raise ValueError(f"compass_noise_std must be ≥ 0, got {self.compass_noise_std}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Aggregate config
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class HerdingConfig:
|
||||
"""Root configuration object passed to :class:`~training.herding_env.HerdingEnv`.
|
||||
|
||||
Sub-configs default to the original simulation parameters so that
|
||||
``HerdingEnv()`` and ``HerdingEnv(herding_cfg=HerdingConfig())`` produce
|
||||
identical behaviour.
|
||||
"""
|
||||
|
||||
lidar: LidarConfig = field(default_factory=LidarConfig)
|
||||
detection: DetectionConfig = field(default_factory=DetectionConfig)
|
||||
tracker: TrackerConfig = field(default_factory=TrackerConfig)
|
||||
robot: RobotConfig = field(default_factory=RobotConfig)
|
||||
domain_random: DomainRandomConfig = field(default_factory=DomainRandomConfig)
|
||||
|
||||
def replace(self, **kwargs) -> "HerdingConfig":
|
||||
"""Return a new config with selected top-level sub-configs replaced.
|
||||
|
||||
Example::
|
||||
|
||||
cfg = HERDING_WEBOTS.replace(
|
||||
domain_random=DomainRandomConfig(fp_rate=2.0, wheel_slip_std=0.05)
|
||||
)
|
||||
"""
|
||||
return replace(self, **kwargs)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Named full-pipeline presets
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
HERDING_DEFAULT = HerdingConfig()
|
||||
"""Original simulation defaults — zero behaviour change."""
|
||||
|
||||
HERDING_WEBOTS = HerdingConfig(
|
||||
lidar=LIDAR_WEBOTS,
|
||||
detection=DetectionConfig(wall_reject=0.5, static_reject=1.2),
|
||||
tracker=TrackerConfig(
|
||||
forget_steps=120,
|
||||
max_new_tracks_per_step=1,
|
||||
pen_latch_depth=2.0,
|
||||
),
|
||||
robot=RobotConfig(action_smooth=0.55),
|
||||
)
|
||||
"""Webots-matched training preset.
|
||||
|
||||
Changes vs HERDING_DEFAULT:
|
||||
* LiDAR: 180 rays / 140° FOV matching ShepherdDog.proto hardware
|
||||
* Detection: wall_reject kept at 0.5 m (original default; static_reject
|
||||
handles post FPs; 1.0 m was too aggressive near the south gate)
|
||||
* Tracker: forget_steps 200 → 60 (~1 s ghost-track lifetime)
|
||||
max_new_tracks_per_step 10 → 3 (rate-caps FP flooding)
|
||||
* Robot: action_smooth 0.0 → 0.55 (matches Webots controller EMA)
|
||||
"""
|
||||
@@ -1,9 +1,23 @@
|
||||
"""Sequential "pin-and-push" shepherd-dog controller.
|
||||
"""Adaptive sequential shepherd-dog controller.
|
||||
|
||||
Single-target alternative to Strömbom: each step, target the sheep
|
||||
closest to the pen, park behind it, drive it through; once it latches
|
||||
penned the next-closest sheep becomes the target. Naturally queues
|
||||
the flock through a narrow gate.
|
||||
Three-phase strategy:
|
||||
|
||||
1. **Collect** (flock scattered): Strömbom collect — park behind the
|
||||
furthest sheep and push it toward the CoM. Identical to the
|
||||
Strömbom heuristic; keeps the flock together.
|
||||
|
||||
2. **Drive** (flock compact, >STRAGGLER_THRESHOLD active): Strömbom
|
||||
drive — park behind the CoM relative to the pen and push the whole
|
||||
group through the gate.
|
||||
|
||||
3. **Targeted** (≤STRAGGLER_THRESHOLD sheep remain active): single-
|
||||
target push on the sheep closest to the pen entry. Safe to isolate
|
||||
individual sheep once the flock is nearly exhausted.
|
||||
|
||||
The original pure pin-and-push (Phase 3 only) caused flock scatter in
|
||||
Webots physics whenever the dog tried to isolate a sheep while others
|
||||
were still spread across the field. Phases 1–2 handle the bulk of
|
||||
herding with flock-aware Strömbom logic; Phase 3 cleans up stragglers.
|
||||
"""
|
||||
|
||||
import math
|
||||
@@ -11,64 +25,103 @@ import math
|
||||
from herding.world.geometry import GATE_Y, PEN_ENTRY, in_pen
|
||||
|
||||
|
||||
DELTA_DRIVE = 1.5 # standoff behind the target sheep
|
||||
APPROACH_GAIN = 1.0 # action magnitude scale (1 = full speed)
|
||||
F_FACTOR = 4.0 # collect/drive threshold: radius > F_FACTOR·√n
|
||||
DELTA_COLLECT = 1.5 # standoff behind the furthest sheep (collect)
|
||||
DELTA_DRIVE = 2.0 # standoff behind CoM (drive)
|
||||
DELTA_TARGET = 1.5 # standoff behind single target sheep (targeted)
|
||||
STRAGGLER_THRESHOLD = 2 # switch to targeted push when ≤ this many active
|
||||
|
||||
|
||||
def _unit(x, y):
|
||||
def _unit(x: float, y: float):
|
||||
d = math.hypot(x, y)
|
||||
if d < 1e-6:
|
||||
return 0.0, 0.0
|
||||
return x / d, y / d
|
||||
|
||||
|
||||
def _is_active(x, y) -> bool:
|
||||
def _is_active(x: float, y: float) -> bool:
|
||||
return (not in_pen(x, y)) and y > GATE_Y
|
||||
|
||||
|
||||
def compute_action(dog_xy, sheep_positions, pen_target=PEN_ENTRY):
|
||||
"""Return ``(vx, vy, mode)`` — same call signature as Strömbom."""
|
||||
active = [(name, x, y) for name, (x, y) in sheep_positions.items()
|
||||
if _is_active(x, y)]
|
||||
"""Return ``(vx, vy, mode)`` — same signature as Strömbom."""
|
||||
active = [(x, y) for (x, y) in sheep_positions.values() if _is_active(x, y)]
|
||||
if not active:
|
||||
return 0.0, 0.0, "idle"
|
||||
|
||||
name, sx, sy = min(
|
||||
active,
|
||||
key=lambda s: math.hypot(s[1] - pen_target[0], s[2] - pen_target[1]),
|
||||
)
|
||||
n = len(active)
|
||||
com_x = sum(p[0] for p in active) / n
|
||||
com_y = sum(p[1] for p in active) / n
|
||||
dists = [math.hypot(p[0] - com_x, p[1] - com_y) for p in active]
|
||||
radius = max(dists)
|
||||
|
||||
ux, uy = _unit(sx - pen_target[0], sy - pen_target[1])
|
||||
tx = sx + DELTA_DRIVE * ux
|
||||
ty = sy + DELTA_DRIVE * uy
|
||||
if n <= STRAGGLER_THRESHOLD:
|
||||
# Targeted: push the sheep closest to the pen entry individually.
|
||||
sx, sy = min(active,
|
||||
key=lambda p: math.hypot(p[0] - pen_target[0],
|
||||
p[1] - pen_target[1]))
|
||||
ux, uy = _unit(sx - pen_target[0], sy - pen_target[1])
|
||||
tx, ty = sx + DELTA_TARGET * ux, sy + DELTA_TARGET * uy
|
||||
mode = "targeted"
|
||||
|
||||
elif radius > F_FACTOR * math.sqrt(n):
|
||||
# Collect: aim behind the furthest sheep from the CoM.
|
||||
idx = max(range(n), key=lambda i: dists[i])
|
||||
sx, sy = active[idx]
|
||||
ux, uy = _unit(sx - com_x, sy - com_y)
|
||||
tx, ty = sx + DELTA_COLLECT * ux, sy + DELTA_COLLECT * uy
|
||||
mode = "collect"
|
||||
|
||||
else:
|
||||
# Drive: push the whole compact flock toward the gate.
|
||||
ux, uy = _unit(com_x - pen_target[0], com_y - pen_target[1])
|
||||
tx, ty = com_x + DELTA_DRIVE * ux, com_y + DELTA_DRIVE * uy
|
||||
mode = "drive"
|
||||
|
||||
ax, ay = _unit(tx - dog_xy[0], ty - dog_xy[1])
|
||||
return APPROACH_GAIN * ax, APPROACH_GAIN * ay, f"drive:{name}"
|
||||
return ax, ay, mode
|
||||
|
||||
|
||||
def compute_action_debug(dog_xy, sheep_positions, pen_target=PEN_ENTRY):
|
||||
"""``compute_action`` plus a debug dict (target, drive point)."""
|
||||
active = [(name, x, y) for name, (x, y) in sheep_positions.items()
|
||||
if _is_active(x, y)]
|
||||
"""``compute_action`` plus a debug dict."""
|
||||
active = [(x, y) for (x, y) in sheep_positions.values() if _is_active(x, y)]
|
||||
if not active:
|
||||
return 0.0, 0.0, "idle", {
|
||||
"n_active": 0, "target_name": "",
|
||||
"target_x": 0.0, "target_y": 0.0,
|
||||
"drive_x": dog_xy[0], "drive_y": dog_xy[1],
|
||||
"n_active": 0, "phase": "idle", "radius": 0.0, "threshold": 0.0,
|
||||
"com_x": 0.0, "com_y": 0.0,
|
||||
"target_x": dog_xy[0], "target_y": dog_xy[1],
|
||||
}
|
||||
|
||||
name, sx, sy = min(
|
||||
active,
|
||||
key=lambda s: math.hypot(s[1] - pen_target[0], s[2] - pen_target[1]),
|
||||
)
|
||||
n = len(active)
|
||||
com_x = sum(p[0] for p in active) / n
|
||||
com_y = sum(p[1] for p in active) / n
|
||||
dists = [math.hypot(p[0] - com_x, p[1] - com_y) for p in active]
|
||||
radius = max(dists)
|
||||
threshold = F_FACTOR * math.sqrt(n)
|
||||
|
||||
if n <= STRAGGLER_THRESHOLD:
|
||||
sx, sy = min(active,
|
||||
key=lambda p: math.hypot(p[0] - pen_target[0],
|
||||
p[1] - pen_target[1]))
|
||||
ux, uy = _unit(sx - pen_target[0], sy - pen_target[1])
|
||||
tx, ty = sx + DELTA_TARGET * ux, sy + DELTA_TARGET * uy
|
||||
mode = "targeted"
|
||||
|
||||
elif radius > threshold:
|
||||
idx = max(range(n), key=lambda i: dists[i])
|
||||
sx, sy = active[idx]
|
||||
ux, uy = _unit(sx - com_x, sy - com_y)
|
||||
tx, ty = sx + DELTA_COLLECT * ux, sy + DELTA_COLLECT * uy
|
||||
mode = "collect"
|
||||
|
||||
else:
|
||||
ux, uy = _unit(com_x - pen_target[0], com_y - pen_target[1])
|
||||
tx, ty = com_x + DELTA_DRIVE * ux, com_y + DELTA_DRIVE * uy
|
||||
mode = "drive"
|
||||
|
||||
ux, uy = _unit(sx - pen_target[0], sy - pen_target[1])
|
||||
tx = sx + DELTA_DRIVE * ux
|
||||
ty = sy + DELTA_DRIVE * uy
|
||||
ax, ay = _unit(tx - dog_xy[0], ty - dog_xy[1])
|
||||
|
||||
return APPROACH_GAIN * ax, APPROACH_GAIN * ay, f"drive:{name}", {
|
||||
"n_active": len(active), "target_name": name,
|
||||
"target_x": sx, "target_y": sy,
|
||||
"drive_x": tx, "drive_y": ty,
|
||||
return ax, ay, mode, {
|
||||
"n_active": n, "phase": mode, "radius": radius, "threshold": threshold,
|
||||
"com_x": com_x, "com_y": com_y,
|
||||
"target_x": tx, "target_y": ty,
|
||||
}
|
||||
|
||||
@@ -21,9 +21,13 @@ The downstream tracker handles association across frames.
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import numpy as np
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from herding.config import DetectionConfig, LidarConfig
|
||||
|
||||
from herding.world.geometry import (
|
||||
FIELD_SHAPE, FIELD_ROUND_R,
|
||||
FIELD_X, FIELD_Y, GATE_X, GATE_Y,
|
||||
@@ -79,21 +83,22 @@ def _in_field_region(cx: float, cy: float) -> bool:
|
||||
FIELD_Y[0] - 0.2 < cy < FIELD_Y[1] + 0.2)
|
||||
|
||||
|
||||
def _near_wall(cx: float, cy: float) -> bool:
|
||||
def _near_wall(cx: float, cy: float, wall_reject: float = WALL_REJECT) -> bool:
|
||||
"""True if the detection is too close to a wall to be a sheep."""
|
||||
if FIELD_SHAPE == "field_round":
|
||||
r = math.hypot(cx, cy)
|
||||
return r > FIELD_ROUND_R - WALL_REJECT
|
||||
return r > FIELD_ROUND_R - wall_reject
|
||||
return (
|
||||
cx > FIELD_X[1] - WALL_REJECT or cx < FIELD_X[0] + WALL_REJECT or
|
||||
cy > FIELD_Y[1] - WALL_REJECT or
|
||||
(cy < FIELD_Y[0] + WALL_REJECT and not (PEN_X[0] <= cx <= PEN_X[1]))
|
||||
cx > FIELD_X[1] - wall_reject or cx < FIELD_X[0] + wall_reject or
|
||||
cy > FIELD_Y[1] - wall_reject or
|
||||
(cy < FIELD_Y[0] + wall_reject and not (PEN_X[0] <= cx <= PEN_X[1]))
|
||||
)
|
||||
|
||||
|
||||
def _split_cluster_by_range(
|
||||
points: list[tuple[float, float]],
|
||||
range_vals: list[float],
|
||||
split_range_gap: float = SPLIT_RANGE_GAP,
|
||||
) -> list[list[tuple[float, float]]]:
|
||||
"""Split a cluster at range-profile local maxima (gaps between sheep).
|
||||
|
||||
@@ -108,7 +113,7 @@ def _split_cluster_by_range(
|
||||
# Find the maximum range (the dip/gap between sheep).
|
||||
r_max = max(range_vals)
|
||||
# If the range variation is small, it's a single target.
|
||||
if r_max - r_min < SPLIT_RANGE_GAP:
|
||||
if r_max - r_min < split_range_gap:
|
||||
return [points]
|
||||
# Find the split point: the index with the maximum range.
|
||||
split_idx = range_vals.index(r_max)
|
||||
@@ -124,7 +129,7 @@ def _split_cluster_by_range(
|
||||
(right, range_vals[split_idx + 1:]),
|
||||
]:
|
||||
if len(sub_pts) >= 1:
|
||||
result.extend(_split_cluster_by_range(sub_pts, sub_ranges))
|
||||
result.extend(_split_cluster_by_range(sub_pts, sub_ranges, split_range_gap))
|
||||
return result if result else [points]
|
||||
|
||||
|
||||
@@ -132,14 +137,43 @@ def detections_from_scan(
|
||||
ranges: np.ndarray,
|
||||
dog_x: float, dog_y: float, dog_heading: float,
|
||||
max_range: float = LIDAR_MAX_RANGE,
|
||||
detection_cfg: "DetectionConfig | None" = None,
|
||||
lidar_cfg: "LidarConfig | None" = None,
|
||||
) -> list[tuple[float, float]]:
|
||||
"""Return list of (x, y) world-frame sheep position estimates."""
|
||||
"""Return list of (x, y) world-frame sheep position estimates.
|
||||
|
||||
Pass ``detection_cfg`` to override clustering/filtering thresholds, or
|
||||
``lidar_cfg`` to inform the function of a non-default FOV (the number of
|
||||
rays and FOV are inferred from the length of ``ranges`` and
|
||||
``lidar_cfg.fov_rad`` respectively).
|
||||
"""
|
||||
# Resolve parameters — fall back to module-level constants when no cfg.
|
||||
if detection_cfg is not None:
|
||||
gap_thr = detection_cfg.gap_threshold
|
||||
max_span = detection_cfg.max_cluster_span
|
||||
hit_eps = detection_cfg.range_hit_eps
|
||||
split_gap = detection_cfg.split_range_gap
|
||||
wall_rej = detection_cfg.wall_reject
|
||||
static_rej = detection_cfg.static_reject
|
||||
else:
|
||||
gap_thr = GAP_THRESHOLD
|
||||
max_span = MAX_CLUSTER_SPAN
|
||||
hit_eps = RANGE_HIT_EPS
|
||||
split_gap = SPLIT_RANGE_GAP
|
||||
wall_rej = WALL_REJECT
|
||||
static_rej = STATIC_REJECT
|
||||
|
||||
sheep_r = lidar_cfg.sheep_radius if lidar_cfg is not None else SHEEP_RADIUS
|
||||
fov = lidar_cfg.fov_rad if lidar_cfg is not None else LIDAR_FOV
|
||||
if lidar_cfg is not None:
|
||||
max_range = lidar_cfg.max_range
|
||||
|
||||
ranges = np.asarray(ranges, dtype=np.float32)
|
||||
n_rays = ranges.shape[0]
|
||||
if n_rays == 0:
|
||||
return []
|
||||
angles = ray_angles(n_rays, LIDAR_FOV)
|
||||
hit = ranges < max_range - RANGE_HIT_EPS
|
||||
angles = ray_angles(n_rays, fov)
|
||||
hit = ranges < max_range - hit_eps
|
||||
|
||||
world_a = dog_heading + angles
|
||||
px = dog_x + ranges * np.cos(world_a)
|
||||
@@ -159,7 +193,7 @@ def detections_from_scan(
|
||||
prev_xy = None
|
||||
continue
|
||||
pt = (float(px[i]), float(py[i]), float(ranges[i]))
|
||||
if prev_xy is not None and math.hypot(pt[0] - prev_xy[0], pt[1] - prev_xy[1]) > GAP_THRESHOLD:
|
||||
if prev_xy is not None and math.hypot(pt[0] - prev_xy[0], pt[1] - prev_xy[1]) > gap_thr:
|
||||
clusters.append(current)
|
||||
current = []
|
||||
current.append(pt)
|
||||
@@ -174,7 +208,7 @@ def detections_from_scan(
|
||||
|
||||
# Multi-peak splitting.
|
||||
if len(cluster) >= 4:
|
||||
sub_clusters = _split_cluster_by_range(points_xy, range_vals)
|
||||
sub_clusters = _split_cluster_by_range(points_xy, range_vals, split_gap)
|
||||
else:
|
||||
sub_clusters = [points_xy]
|
||||
|
||||
@@ -185,24 +219,24 @@ def detections_from_scan(
|
||||
ys = [p[1] for p in sub]
|
||||
cx, cy = sum(xs) / len(xs), sum(ys) / len(ys)
|
||||
span = math.hypot(max(xs) - min(xs), max(ys) - min(ys))
|
||||
if span > MAX_CLUSTER_SPAN:
|
||||
if span > max_span:
|
||||
continue
|
||||
# Rays hit the front edge of the sheep; offset outward by
|
||||
# SHEEP_RADIUS along the dog→cluster direction.
|
||||
# sheep_radius along the dog→cluster direction.
|
||||
dx, dy = cx - dog_x, cy - dog_y
|
||||
d = math.hypot(dx, dy)
|
||||
if d > 1e-3:
|
||||
cx += SHEEP_RADIUS * dx / d
|
||||
cy += SHEEP_RADIUS * dy / d
|
||||
cx += sheep_r * dx / d
|
||||
cy += sheep_r * dy / d
|
||||
in_main = _in_field_region(cx, cy)
|
||||
in_gate_strip = (PEN_X[0] - 0.2 < cx < PEN_X[1] + 0.2 and
|
||||
GATE_Y - 1.0 < cy < GATE_Y + 0.2)
|
||||
if not (in_main or in_gate_strip):
|
||||
continue
|
||||
if any(math.hypot(cx - fx, cy - fy) < STATIC_REJECT
|
||||
if any(math.hypot(cx - fx, cy - fy) < static_rej
|
||||
for fx, fy in _STATIC_FEATURES):
|
||||
continue
|
||||
if _near_wall(cx, cy):
|
||||
if _near_wall(cx, cy, wall_rej):
|
||||
continue
|
||||
detections.append((cx, cy))
|
||||
return detections
|
||||
|
||||
@@ -2,20 +2,25 @@
|
||||
|
||||
Raycasts against sheep (discs) and static world geometry. For rectangular
|
||||
fields this is axis-aligned walls + gate posts; for round fields it is a
|
||||
circular wall + gate posts. The env reproduces the false-positive cluster
|
||||
distribution Webots produces from real 3D geometry.
|
||||
circular wall + gate posts.
|
||||
|
||||
Returns a range array matching the Webots Lidar device:
|
||||
180 rays, 140° FOV centred on forward, 12 m max range, 5 mm noise.
|
||||
See ``protos/ShepherdDog.proto``.
|
||||
The module-level constants (``LIDAR_N_RAYS``, ``LIDAR_FOV``, etc.) reflect
|
||||
the original 360°/360-ray oracle configuration. Pass a
|
||||
:class:`~herding.config.LidarConfig` to :func:`simulate_scan` to use a
|
||||
different spec (e.g. :data:`~herding.config.LIDAR_WEBOTS` for 180-ray/140°
|
||||
matching the ShepherdDog.proto hardware).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import numpy as np
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from herding.config import LidarConfig
|
||||
|
||||
from herding.world.geometry import (
|
||||
FIELD_SHAPE, FIELD_ROUND_R,
|
||||
FIELD_X, FIELD_Y,
|
||||
@@ -192,14 +197,30 @@ def simulate_scan(
|
||||
noise: float = LIDAR_NOISE,
|
||||
max_range: float = LIDAR_MAX_RANGE,
|
||||
rng: np.random.Generator | None = None,
|
||||
lidar_cfg: "LidarConfig | None" = None,
|
||||
) -> np.ndarray:
|
||||
"""Return a (N,) float32 range array. No-hit entries equal ``max_range``.
|
||||
|
||||
``sheep_xy`` is every sheep (penned or active) in the scene.
|
||||
|
||||
Pass ``lidar_cfg`` to override the module-level defaults for a single
|
||||
call (e.g. to use :data:`~herding.config.LIDAR_WEBOTS`).
|
||||
"""
|
||||
ch, sh = math.cos(dog_heading), math.sin(dog_heading)
|
||||
cos_w = ch * _COS - sh * _SIN
|
||||
sin_w = sh * _COS + ch * _SIN
|
||||
if lidar_cfg is not None:
|
||||
n_rays = lidar_cfg.n_rays
|
||||
fov = lidar_cfg.fov_rad
|
||||
max_range = lidar_cfg.max_range
|
||||
noise = lidar_cfg.noise_std
|
||||
sheep_r2 = lidar_cfg.sheep_radius ** 2
|
||||
angles = ray_angles(n_rays, fov)
|
||||
ch, sh = math.cos(dog_heading), math.sin(dog_heading)
|
||||
cos_w = ch * np.cos(angles) - sh * np.sin(angles)
|
||||
sin_w = sh * np.cos(angles) + ch * np.sin(angles)
|
||||
else:
|
||||
sheep_r2 = SHEEP_RADIUS ** 2
|
||||
ch, sh = math.cos(dog_heading), math.sin(dog_heading)
|
||||
cos_w = ch * _COS - sh * _SIN
|
||||
sin_w = sh * _COS + ch * _SIN
|
||||
|
||||
best = _raycast_static(dog_x, dog_y, cos_w, sin_w)
|
||||
|
||||
@@ -209,9 +230,8 @@ def simulate_scan(
|
||||
t = np.outer(sx, cos_w) + np.outer(sy, sin_w)
|
||||
s_dist2 = (sx ** 2 + sy ** 2)[:, None]
|
||||
perp2 = s_dist2 - t ** 2
|
||||
R2 = SHEEP_RADIUS ** 2
|
||||
hit = (perp2 < R2) & (t > 0.0)
|
||||
half = np.sqrt(np.clip(R2 - perp2, 0.0, None))
|
||||
hit = (perp2 < sheep_r2) & (t > 0.0)
|
||||
half = np.sqrt(np.clip(sheep_r2 - perp2, 0.0, None))
|
||||
candidate = np.where(hit, t - half, np.inf)
|
||||
nearest = candidate.min(axis=0)
|
||||
np.minimum(best, nearest, out=best)
|
||||
|
||||
@@ -22,6 +22,10 @@ plane south (``is_penned_position``). Penned tracks are excluded from
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from herding.config import TrackerConfig
|
||||
|
||||
from herding.world.geometry import MAX_SHEEP, in_pen, is_penned_position
|
||||
|
||||
@@ -56,16 +60,21 @@ class Track:
|
||||
"""Not-a-property in the hot loop — callers pass current step."""
|
||||
raise NotImplementedError
|
||||
|
||||
def predicted_position(self, current_step: int) -> tuple[float, float]:
|
||||
def predicted_position(
|
||||
self,
|
||||
current_step: int,
|
||||
predict_steps: int = PREDICT_STEPS,
|
||||
velocity_clamp: float = VELOCITY_CLAMP,
|
||||
) -> tuple[float, float]:
|
||||
"""Extrapolated position using constant velocity, clamped."""
|
||||
dt = current_step - self.last_seen
|
||||
if dt <= 0 or dt > PREDICT_STEPS:
|
||||
if dt <= 0 or dt > predict_steps:
|
||||
return self.x, self.y
|
||||
speed = math.hypot(self.vx, self.vy)
|
||||
if speed < 1e-4:
|
||||
return self.x, self.y
|
||||
# Clamp extrapolation distance.
|
||||
max_d = VELOCITY_CLAMP * dt * 0.016 # steps → seconds
|
||||
max_d = velocity_clamp * dt * 0.016 # steps → seconds
|
||||
d = min(speed * dt * 0.016, max_d)
|
||||
return (
|
||||
self.x + d * (self.vx / speed),
|
||||
@@ -93,10 +102,36 @@ class SheepTracker:
|
||||
|
||||
Each track is a :class:`Track` with position, velocity estimate,
|
||||
last-seen step, and penned flag.
|
||||
|
||||
Pass a :class:`~herding.config.TrackerConfig` to override any
|
||||
module-level defaults without changing this file.
|
||||
"""
|
||||
|
||||
def __init__(self, gate: float = GATE_M):
|
||||
self.gate = gate
|
||||
def __init__(
|
||||
self,
|
||||
gate: float = GATE_M,
|
||||
tracker_cfg: "TrackerConfig | None" = None,
|
||||
):
|
||||
if tracker_cfg is not None:
|
||||
self.gate = tracker_cfg.gate_m
|
||||
self._reacquire_gate = tracker_cfg.reacquire_gate_m
|
||||
self._reacquire_min_age = tracker_cfg.reacquire_min_age
|
||||
self._penned_gate = tracker_cfg.penned_gate_m
|
||||
self._forget_steps = tracker_cfg.forget_steps
|
||||
self._predict_steps = tracker_cfg.predict_steps
|
||||
self._velocity_clamp = tracker_cfg.velocity_clamp
|
||||
self._max_new_per_step = tracker_cfg.max_new_tracks_per_step
|
||||
self._pen_latch_depth = tracker_cfg.pen_latch_depth
|
||||
else:
|
||||
self.gate = gate
|
||||
self._reacquire_gate = REACQUIRE_GATE_M
|
||||
self._reacquire_min_age = REACQUIRE_MIN_AGE
|
||||
self._penned_gate = PENNED_GATE_M
|
||||
self._forget_steps = FORGET_STEPS
|
||||
self._predict_steps = PREDICT_STEPS
|
||||
self._velocity_clamp = VELOCITY_CLAMP
|
||||
self._max_new_per_step = MAX_ACTIVE_TRACKS
|
||||
self._pen_latch_depth = 0.0
|
||||
self._tracks: dict[int, Track] = {}
|
||||
self._next_id = 0
|
||||
self.step = 0
|
||||
@@ -119,8 +154,8 @@ class SheepTracker:
|
||||
active_tids.sort(key=lambda tid: self._tracks[tid].last_seen)
|
||||
for tid in active_tids:
|
||||
track = self._tracks[tid]
|
||||
# Use predicted position for matching.
|
||||
tx, ty = track.predicted_position(self.step)
|
||||
tx, ty = track.predicted_position(
|
||||
self.step, self._predict_steps, self._velocity_clamp)
|
||||
best_j, best_d = -1, self.gate
|
||||
for j, (dx, dy) in enumerate(detections):
|
||||
if j in det_used:
|
||||
@@ -140,10 +175,11 @@ class SheepTracker:
|
||||
if tid in updated_tids:
|
||||
continue
|
||||
track = self._tracks[tid]
|
||||
if (self.step - track.last_seen) < REACQUIRE_MIN_AGE:
|
||||
if (self.step - track.last_seen) < self._reacquire_min_age:
|
||||
continue
|
||||
tx, ty = track.predicted_position(self.step)
|
||||
best_j, best_d = -1, REACQUIRE_GATE_M
|
||||
tx, ty = track.predicted_position(
|
||||
self.step, self._predict_steps, self._velocity_clamp)
|
||||
best_j, best_d = -1, self._reacquire_gate
|
||||
for j, (dx, dy) in enumerate(detections):
|
||||
if j in det_used:
|
||||
continue
|
||||
@@ -161,7 +197,7 @@ class SheepTracker:
|
||||
penned_tids = [tid for tid, t in self._tracks.items() if t.penned]
|
||||
for tid in penned_tids:
|
||||
track = self._tracks[tid]
|
||||
best_j, best_d = -1, PENNED_GATE_M
|
||||
best_j, best_d = -1, self._penned_gate
|
||||
for j, (dx, dy) in enumerate(detections):
|
||||
if j in det_used:
|
||||
continue
|
||||
@@ -174,25 +210,35 @@ class SheepTracker:
|
||||
track.update(dx, dy, self.step)
|
||||
det_used.add(best_j)
|
||||
|
||||
# Spawn new tracks for unmatched detections.
|
||||
# Spawn new tracks for unmatched detections — rate-capped.
|
||||
spawned = 0
|
||||
for j, (dx, dy) in enumerate(detections):
|
||||
if j in det_used:
|
||||
continue
|
||||
penned = in_pen(dx, dy) or is_penned_position(dx, dy)
|
||||
if spawned >= self._max_new_per_step:
|
||||
break
|
||||
penned = self._is_penned(dx, dy)
|
||||
self._tracks[self._next_id] = Track(dx, dy, self.step, penned)
|
||||
self._next_id += 1
|
||||
spawned += 1
|
||||
|
||||
# Promote active tracks whose current estimate crosses the gate.
|
||||
for track in self._tracks.values():
|
||||
if track.penned:
|
||||
continue
|
||||
px, py = track.predicted_position(self.step)
|
||||
if is_penned_position(px, py):
|
||||
px, py = track.predicted_position(
|
||||
self.step, self._predict_steps, self._velocity_clamp)
|
||||
if self._is_penned(px, py):
|
||||
track.penned = True
|
||||
|
||||
# Forget stale active tracks; penned tracks live forever.
|
||||
# Forget stale active tracks; penned tracks decay too but at a
|
||||
# longer horizon (real penned sheep are still observed occasionally
|
||||
# when the dog faces south; pure FPs at gate posts stop being
|
||||
# detected once the dog drives away).
|
||||
penned_forget = self._forget_steps * 8
|
||||
stale = [tid for tid, t in self._tracks.items()
|
||||
if not t.penned and (self.step - t.last_seen) > FORGET_STEPS]
|
||||
if (not t.penned and (self.step - t.last_seen) > self._forget_steps)
|
||||
or (t.penned and (self.step - t.last_seen) > penned_forget)]
|
||||
for tid in stale:
|
||||
del self._tracks[tid]
|
||||
|
||||
@@ -206,18 +252,42 @@ class SheepTracker:
|
||||
|
||||
return self.get_positions()
|
||||
|
||||
def get_positions(self) -> dict[str, tuple[float, float]]:
|
||||
def _is_penned(self, x: float, y: float) -> bool:
|
||||
"""Check whether a position should be considered penned.
|
||||
|
||||
Uses ``pen_latch_depth`` to require the position to be that many
|
||||
metres past the gate line before latching. Increasing the depth
|
||||
prevents gate-area LiDAR false positives (gate hardware reflections
|
||||
at y ≈ -15) from being permanently latched as penned tracks.
|
||||
"""
|
||||
from herding.world.geometry import GATE_Y
|
||||
# Apply depth threshold to both in_pen and is_penned_position so
|
||||
# that any position in the gate column must clear GATE_Y - depth.
|
||||
threshold = GATE_Y - self._pen_latch_depth
|
||||
return (in_pen(x, y) or is_penned_position(x, y)) and y <= threshold
|
||||
|
||||
def get_positions(self, min_freshness: int | None = None) -> dict[str, tuple[float, float]]:
|
||||
"""Active (not-penned) tracks as a ``{name: (x, y)}`` dict.
|
||||
|
||||
For tracks currently being predicted (occluded but within
|
||||
PREDICT_STEPS), returns the extrapolated position so the teacher
|
||||
predict_steps), returns the extrapolated position so the teacher
|
||||
sees a smooth estimate.
|
||||
|
||||
``min_freshness`` (optional, deploy-only): drop tracks whose
|
||||
last_seen is older than ``step - min_freshness``. Real sheep in
|
||||
FOV are detected nearly every step; phantom tracks from sporadic
|
||||
Webots FPs stop being re-observed and decay. Default ``None``
|
||||
preserves training behaviour (extrapolated tracks visible).
|
||||
"""
|
||||
result = {}
|
||||
for tid, track in self._tracks.items():
|
||||
if track.penned:
|
||||
continue
|
||||
px, py = track.predicted_position(self.step)
|
||||
if (min_freshness is not None
|
||||
and self.step - track.last_seen > min_freshness):
|
||||
continue
|
||||
px, py = track.predicted_position(
|
||||
self.step, self._predict_steps, self._velocity_clamp)
|
||||
result[f"t{tid}"] = (px, py)
|
||||
return result
|
||||
|
||||
@@ -234,4 +304,4 @@ class SheepTracker:
|
||||
"""Number of active tracks currently being extrapolated (not directly observed)."""
|
||||
return sum(1 for t in self._tracks.values()
|
||||
if not t.penned and (self.step - t.last_seen) > 0
|
||||
and (self.step - t.last_seen) <= PREDICT_STEPS)
|
||||
and (self.step - t.last_seen) <= self._predict_steps)
|
||||
|
||||
@@ -2,14 +2,22 @@
|
||||
controllers.
|
||||
|
||||
First-order rigid-body model — no slip, wheel-accel limits, or contact
|
||||
forces. Webots' ODE physics handles those at inference; the env stays
|
||||
close enough to first order that a policy trained here transfers.
|
||||
forces by default. Pass ``slip_std`` and an ``rng`` to
|
||||
:func:`kinematics_step` / :func:`mecanum_kinematics_step` to add
|
||||
per-wheel Gaussian speed noise for domain randomisation.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
def kinematics_step(x, y, h, w_left, w_right, wheel_radius, wheel_base, dt):
|
||||
def kinematics_step(x, y, h, w_left, w_right, wheel_radius, wheel_base, dt,
|
||||
slip_std: float = 0.0,
|
||||
rng: Optional[np.random.Generator] = None):
|
||||
"""Integrate one step of differential-drive forward kinematics.
|
||||
|
||||
Inputs
|
||||
@@ -19,9 +27,15 @@ def kinematics_step(x, y, h, w_left, w_right, wheel_radius, wheel_base, dt):
|
||||
w_left, w_right : wheel angular velocities (rad/s)
|
||||
wheel_radius, wheel_base : robot dimensions (m)
|
||||
dt : timestep (s)
|
||||
slip_std : optional Gaussian std (rad/s) added to each wheel speed
|
||||
rng : numpy Generator for slip noise; required when slip_std > 0
|
||||
|
||||
Returns (new_x, new_y, new_h).
|
||||
"""
|
||||
if slip_std > 0.0 and rng is not None:
|
||||
noise = rng.normal(0.0, slip_std, size=2)
|
||||
w_left = w_left + noise[0]
|
||||
w_right = w_right + noise[1]
|
||||
v = (w_right + w_left) * wheel_radius * 0.5
|
||||
omega = (w_right - w_left) * wheel_radius / wheel_base
|
||||
new_x = x + v * math.cos(h) * dt
|
||||
@@ -67,7 +81,9 @@ def heading_speed_to_wheels(heading, speed_motor, h, max_wheel_omega,
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def mecanum_kinematics_step(x, y, h, w_fl, w_fr, w_rl, w_rr,
|
||||
wheel_radius, lx, ly, dt):
|
||||
wheel_radius, lx, ly, dt,
|
||||
slip_std: float = 0.0,
|
||||
rng: Optional[np.random.Generator] = None):
|
||||
"""Integrate one step of mecanum forward kinematics.
|
||||
|
||||
Parameters
|
||||
@@ -79,9 +95,15 @@ def mecanum_kinematics_step(x, y, h, w_fl, w_fr, w_rl, w_rr,
|
||||
lx : half the front-to-back axle distance (m)
|
||||
ly : half the left-to-right axle distance (m)
|
||||
dt : timestep (s)
|
||||
slip_std : optional Gaussian std (rad/s) added to each wheel speed
|
||||
rng : numpy Generator for slip noise; required when slip_std > 0
|
||||
|
||||
Returns (new_x, new_y, new_h).
|
||||
"""
|
||||
if slip_std > 0.0 and rng is not None:
|
||||
noise = rng.normal(0.0, slip_std, size=4)
|
||||
w_fl, w_fr = w_fl + noise[0], w_fr + noise[1]
|
||||
w_rl, w_rr = w_rl + noise[2], w_rr + noise[3]
|
||||
r = wheel_radius
|
||||
vx_body = (w_fl + w_fr + w_rl + w_rr) * r / 4.0
|
||||
vy_body = (-w_fl + w_fr + w_rl - w_rr) * r / 4.0
|
||||
|
||||
@@ -72,6 +72,36 @@ if FIELD_SHAPE == "field_round":
|
||||
GATE_Y = FIELD_ROUND_GATE_Y
|
||||
|
||||
|
||||
def configure_from_args(argv: list[str] | None = None) -> str:
|
||||
"""Parse ``--world`` from *argv* (or ``sys.argv[1:]``), call :func:`configure`,
|
||||
and set ``HERDING_WORLD`` in the environment.
|
||||
|
||||
Returns the resolved world name (``"field"`` or ``"field_round"``).
|
||||
|
||||
Call this at the very top of any script that accepts a ``--world`` flag,
|
||||
*before* importing anything from ``herding.*`` that depends on field
|
||||
geometry. This centralises the pre-parse logic that was previously
|
||||
duplicated in ``bc/collect.py``, ``rl/train.py``, and ``eval.py``::
|
||||
|
||||
from herding.world.geometry import configure_from_args
|
||||
configure_from_args() # reads sys.argv
|
||||
"""
|
||||
import os
|
||||
import sys as _sys
|
||||
args = argv if argv is not None else _sys.argv[1:]
|
||||
world = "field"
|
||||
for i, a in enumerate(args):
|
||||
if a == "--world" and i + 1 < len(args):
|
||||
world = args[i + 1]
|
||||
break
|
||||
if a.startswith("--world="):
|
||||
world = a.split("=", 1)[1]
|
||||
break
|
||||
configure(world)
|
||||
os.environ["HERDING_WORLD"] = world
|
||||
return world
|
||||
|
||||
|
||||
def configure(shape: str) -> None:
|
||||
"""Switch the active field geometry at runtime.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user