dd5ac669e5
Today's session worked across the full Webots delivery stack — found and
fixed a cluster of bugs blocking the BC/RL transfer, then explored
training-side mitigations for the residual perception gap.
Bug fixes:
- Makefile FP_RATE default 2.0 → 0.0: BC demos used fp_rate=0 but RL
fine-tune defaulted to fp_rate=2, poisoning the BC obs distribution
and stalling PPO at 0% success across 1.46M+ steps.
- controllers/{shepherd_dog,sheep}/runtime.ini: Webots was launching
controllers under system python3 (no numpy) and they were crashing
silently. Pinned to the conda tir env.
- herding/config.py HERDING_WEBOTS preset: pen_latch_depth 0.5 → 2.0,
max_new_tracks_per_step 3 → 1, static_reject 0.8 → 1.2. Stops phantom
FPs near the gate from latching as permanently-penned tracks.
- herding/perception/sheep_tracker.py: penned tracks now decay at
forget_steps × 8 instead of living forever. Adds get_positions
min_freshness filter for deploy-time use.
Training/eval matches deployment:
- training/bc/collect.py: --dagger-policy flag for DAgger rollouts
(policy drives, teacher labels) + --use-webots-preset for matched
140° tracker + DR config.
- controllers/shepherd_dog/shepherd_dog.py: scan-fallback (0, 0.6) when
BC/RL sees empty sheep_positions — recovers from FOV gaps.
Tooling:
- tools/dagger_round.sh: one-shot DAgger round (collect + concat + bc).
- tools/webots_sweep_gt.sh: full sweep with HERDING_USE_GT=1 for the
perception-gap diagnosis matrix.
- protos/ShepherdDog360.proto: 360° FOV variant for the FOV-ablation
comparison. Canonical proto stays at 140° per project spec.
Artifacts: v1 BC/RL policies for all 4 (drive × world) combos trained
in clean gym (success: diff/field 90-100%, diff/round 58%, mec/field
60-100%, mec/round 50-100%). DAgger r1/r2 BCs for diff/field show
12%→38% progression on gym HERDING_WEBOTS proxy but did not close
to actual Webots LiDAR (0/5 throughout). Next: LSTM policy or
learned tracker per the project-state memory.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
256 lines
8.6 KiB
Python
256 lines
8.6 KiB
Python
"""Fast 2D LiDAR simulator for the Gymnasium env.
|
|
|
|
Raycasts against sheep (discs) and static world geometry. For rectangular
|
|
fields this is axis-aligned walls + gate posts; for round fields it is a
|
|
circular wall + gate posts.
|
|
|
|
The module-level constants (``LIDAR_N_RAYS``, ``LIDAR_FOV``, etc.) reflect
|
|
the original 360°/360-ray oracle configuration. Pass a
|
|
:class:`~herding.config.LidarConfig` to :func:`simulate_scan` to use a
|
|
different spec (e.g. :data:`~herding.config.LIDAR_WEBOTS` for 180-ray/140°
|
|
matching the ShepherdDog.proto hardware).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import math
|
|
from typing import TYPE_CHECKING
|
|
|
|
import numpy as np
|
|
|
|
if TYPE_CHECKING:
|
|
from herding.config import LidarConfig
|
|
|
|
from herding.world.geometry import (
|
|
FIELD_SHAPE, FIELD_ROUND_R,
|
|
FIELD_X, FIELD_Y,
|
|
GATE_X, GATE_Y,
|
|
PEN_X, PEN_Y,
|
|
)
|
|
|
|
|
|
# Match protos/ShepherdDog.proto Lidar device — extended to 360° for
|
|
# full situational awareness. The original Webots device is 140° FOV /
|
|
# 180 rays; we use 360 rays for full-circle coverage.
|
|
LIDAR_N_RAYS = 360
|
|
LIDAR_FOV = 2.0 * math.pi # 360° full circle
|
|
LIDAR_MAX_RANGE = 12.0
|
|
LIDAR_NOISE = 0.005 # m, gaussian std
|
|
|
|
# Sheep cross-section in the LiDAR plane (horizontal cylinder approx).
|
|
SHEEP_RADIUS = 0.30
|
|
POST_RADIUS = 0.25
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Rectangular-field static geometry
|
|
# ---------------------------------------------------------------------------
|
|
_VERTICAL_WALLS_RECT = (
|
|
( 15.0, -15.0, 15.0), # field east
|
|
(-15.0, -15.0, 15.0), # field west
|
|
( 10.0, -22.0, -15.0), # pen west
|
|
( 13.0, -22.0, -15.0), # pen east
|
|
)
|
|
|
|
_HORIZONTAL_WALLS_RECT = (
|
|
( 15.0, -15.0, 15.0), # field north
|
|
(-15.0, -15.0, 10.0), # field south-west of gate
|
|
(-15.0, 13.0, 15.0), # field south-east of gate
|
|
(-22.0, 10.0, 13.0), # pen south
|
|
)
|
|
|
|
_POSTS_RECT = np.array([
|
|
( 10.0, -15.0), ( 13.0, -15.0),
|
|
( 15.0, 15.0), ( 15.0, -15.0),
|
|
(-15.0, 15.0), (-15.0, -15.0),
|
|
], dtype=np.float64)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Round-field static geometry
|
|
# ---------------------------------------------------------------------------
|
|
# Circular wall with gate gap. Gate posts at the edges of the gate gap.
|
|
_gate_cx = 0.5 * (GATE_X[0] + GATE_X[1])
|
|
_POSTS_ROUND = np.array([
|
|
(GATE_X[0], GATE_Y),
|
|
(GATE_X[1], GATE_Y),
|
|
], dtype=np.float64)
|
|
|
|
# Pen walls for round field
|
|
_VERTICAL_WALLS_ROUND = (
|
|
(GATE_X[0], PEN_Y[0], GATE_Y), # pen west
|
|
(GATE_X[1], PEN_Y[0], GATE_Y), # pen east
|
|
)
|
|
_HORIZONTAL_WALLS_ROUND = (
|
|
(PEN_Y[0], GATE_X[0], GATE_X[1]), # pen south
|
|
)
|
|
|
|
|
|
def _build_static_geometry():
|
|
"""Select the correct static geometry for the active field shape."""
|
|
if FIELD_SHAPE == "field_round":
|
|
return (
|
|
_VERTICAL_WALLS_ROUND,
|
|
_HORIZONTAL_WALLS_ROUND,
|
|
_POSTS_ROUND,
|
|
FIELD_ROUND_R,
|
|
)
|
|
return (
|
|
_VERTICAL_WALLS_RECT,
|
|
_HORIZONTAL_WALLS_RECT,
|
|
_POSTS_RECT,
|
|
None, # no circular wall
|
|
)
|
|
|
|
|
|
_VERTS, _HORIZS, _POSTS, _CIRC_R = _build_static_geometry()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Ray helpers
|
|
# ---------------------------------------------------------------------------
|
|
def ray_angles(n: int = LIDAR_N_RAYS, fov: float = LIDAR_FOV) -> np.ndarray:
|
|
"""Local-frame ray angles, CCW from forward, sweeping +fov/2 → -fov/2."""
|
|
return np.linspace(fov / 2.0, -fov / 2.0, n, dtype=np.float64)
|
|
|
|
|
|
_ANGLES = ray_angles()
|
|
_COS = np.cos(_ANGLES)
|
|
_SIN = np.sin(_ANGLES)
|
|
|
|
|
|
def _raycast_static(
|
|
ox: float, oy: float, cos_w: np.ndarray, sin_w: np.ndarray,
|
|
) -> np.ndarray:
|
|
"""Per-ray distance to the nearest wall or post hit (∞ if none)."""
|
|
n_rays = cos_w.shape[0]
|
|
best = np.full(n_rays, np.inf, dtype=np.float64)
|
|
|
|
EPS = 1e-3
|
|
safe_cos = np.where(np.abs(cos_w) < 1e-9, 1e-9, cos_w)
|
|
safe_sin = np.where(np.abs(sin_w) < 1e-9, 1e-9, sin_w)
|
|
|
|
# Vertical walls (x = const)
|
|
for wx, ymin, ymax in _VERTS:
|
|
t = (wx - ox) / safe_cos
|
|
y_at = oy + t * sin_w
|
|
valid = (t > EPS) & (y_at >= ymin - EPS) & (y_at <= ymax + EPS)
|
|
cand = np.where(valid, t, np.inf)
|
|
np.minimum(best, cand, out=best)
|
|
|
|
# Horizontal walls (y = const)
|
|
for wy, xmin, xmax in _HORIZS:
|
|
t = (wy - oy) / safe_sin
|
|
x_at = ox + t * cos_w
|
|
valid = (t > EPS) & (x_at >= xmin - EPS) & (x_at <= xmax + EPS)
|
|
cand = np.where(valid, t, np.inf)
|
|
np.minimum(best, cand, out=best)
|
|
|
|
# Circular wall (round field only)
|
|
if _CIRC_R is not None:
|
|
# Ray: P(t) = O + t·D. ||P(t)||² = R²
|
|
# t² - 2t(O·D) + (||O||² - R²) = 0
|
|
# a = 1 (rays are unit), b = -2(O·D), c = ||O||² - R²
|
|
a = 1.0 # cos_w² + sin_w² = 1
|
|
b = -(ox * cos_w + oy * sin_w)
|
|
c = ox * ox + oy * oy - _CIRC_R * _CIRC_R
|
|
disc = b * b - a * c
|
|
valid_disc = disc >= 0.0
|
|
sqrt_disc = np.sqrt(np.maximum(disc, 0.0))
|
|
# Two intersection candidates: t = (-b ± sqrt(disc)) / a
|
|
t1 = -b - sqrt_disc
|
|
t2 = -b + sqrt_disc
|
|
# We want the smallest positive t.
|
|
t1_valid = valid_disc & (t1 > EPS)
|
|
t2_valid = valid_disc & (t2 > EPS)
|
|
t_circ = np.where(t1_valid, t1, np.where(t2_valid, t2, np.inf))
|
|
|
|
# Exclude rays that hit the gate gap: the hit point must not lie
|
|
# in the gate column (between GATE_X and above GATE_Y).
|
|
hx = ox + t_circ * cos_w
|
|
hy = oy + t_circ * sin_w
|
|
in_gate = ((hx > GATE_X[0]) & (hx < GATE_X[1]) &
|
|
(hy > GATE_Y - 2.0) & (hy < GATE_Y + 2.0))
|
|
t_circ = np.where(in_gate, np.inf, t_circ)
|
|
np.minimum(best, t_circ, out=best)
|
|
|
|
# Posts (treat as discs)
|
|
if _POSTS.size:
|
|
px = _POSTS[:, 0] - ox
|
|
py = _POSTS[:, 1] - oy
|
|
t_post = np.outer(px, cos_w) + np.outer(py, sin_w)
|
|
d2 = (px ** 2 + py ** 2)[:, None]
|
|
perp2 = d2 - t_post ** 2
|
|
R2 = POST_RADIUS ** 2
|
|
hit = (perp2 < R2) & (t_post > 0.0)
|
|
half = np.sqrt(np.clip(R2 - perp2, 0.0, None))
|
|
cand = np.where(hit, t_post - half, np.inf)
|
|
nearest = cand.min(axis=0)
|
|
np.minimum(best, nearest, out=best)
|
|
|
|
return best
|
|
|
|
|
|
def simulate_scan(
|
|
dog_x: float, dog_y: float, dog_heading: float,
|
|
sheep_xy: list[tuple[float, float]],
|
|
noise: float = LIDAR_NOISE,
|
|
max_range: float = LIDAR_MAX_RANGE,
|
|
rng: np.random.Generator | None = None,
|
|
lidar_cfg: "LidarConfig | None" = None,
|
|
) -> np.ndarray:
|
|
"""Return a (N,) float32 range array. No-hit entries equal ``max_range``.
|
|
|
|
``sheep_xy`` is every sheep (penned or active) in the scene.
|
|
|
|
Pass ``lidar_cfg`` to override the module-level defaults for a single
|
|
call (e.g. to use :data:`~herding.config.LIDAR_WEBOTS`).
|
|
"""
|
|
if lidar_cfg is not None:
|
|
n_rays = lidar_cfg.n_rays
|
|
fov = lidar_cfg.fov_rad
|
|
max_range = lidar_cfg.max_range
|
|
noise = lidar_cfg.noise_std
|
|
sheep_r2 = lidar_cfg.sheep_radius ** 2
|
|
angles = ray_angles(n_rays, fov)
|
|
ch, sh = math.cos(dog_heading), math.sin(dog_heading)
|
|
cos_w = ch * np.cos(angles) - sh * np.sin(angles)
|
|
sin_w = sh * np.cos(angles) + ch * np.sin(angles)
|
|
else:
|
|
sheep_r2 = SHEEP_RADIUS ** 2
|
|
ch, sh = math.cos(dog_heading), math.sin(dog_heading)
|
|
cos_w = ch * _COS - sh * _SIN
|
|
sin_w = sh * _COS + ch * _SIN
|
|
|
|
best = _raycast_static(dog_x, dog_y, cos_w, sin_w)
|
|
|
|
if sheep_xy:
|
|
sx = np.asarray([p[0] for p in sheep_xy], dtype=np.float64) - dog_x
|
|
sy = np.asarray([p[1] for p in sheep_xy], dtype=np.float64) - dog_y
|
|
t = np.outer(sx, cos_w) + np.outer(sy, sin_w)
|
|
s_dist2 = (sx ** 2 + sy ** 2)[:, None]
|
|
perp2 = s_dist2 - t ** 2
|
|
hit = (perp2 < sheep_r2) & (t > 0.0)
|
|
half = np.sqrt(np.clip(sheep_r2 - perp2, 0.0, None))
|
|
candidate = np.where(hit, t - half, np.inf)
|
|
nearest = candidate.min(axis=0)
|
|
np.minimum(best, nearest, out=best)
|
|
|
|
ranges = np.minimum(best, max_range).astype(np.float32)
|
|
return _add_noise(ranges, noise, rng, max_range)
|
|
|
|
|
|
def _add_noise(ranges: np.ndarray, sigma: float,
|
|
rng: np.random.Generator | None, max_range: float) -> np.ndarray:
|
|
if sigma <= 0.0:
|
|
return ranges
|
|
if rng is None:
|
|
rng = np.random.default_rng()
|
|
hit_mask = ranges < max_range - 1e-3
|
|
n_hit = int(hit_mask.sum())
|
|
if n_hit:
|
|
ranges = ranges.copy()
|
|
ranges[hit_mask] += rng.normal(0.0, sigma, size=n_hit).astype(np.float32)
|
|
np.clip(ranges, 0.0, max_range, out=ranges)
|
|
return ranges
|