Files
TIR_PROJ/training/herding_env.py
T
Johnny Fernandes ee77c8606c Gym mecanum kinematics matching to Webots roller-hinge proto
Mecanum proto rewrite in b3cf990 made the wheels truly omnidirectional
in Webots, but with asymmetric slip: forward command produces ~89% of
textbook speed while strafe produces only ~38% plus a consistent
~28% backward bleed-through. v1 BC/RL trained on perfect mecanum
gym kinematics could not herd the new dynamics. To unblock that:

* `mecanum_kinematics_step` gains two parameters that scale the
  realised motion to match a deployed-platform calibration:
    - strafe_efficiency  ∈ (0, 1]  default 1.0
    - strafe_to_forward_bleed     default 0.0
  Forward motion is untouched (textbook X-pattern continues to apply
  to vx_body); only the lateral channel is scaled and bleed is added.
* `RobotConfig` exposes both as drive-config fields with the same
  pass-through defaults so existing diff-drive code and existing
  mecanum training pipelines see no behaviour change.
* `HERDING_MEC_WEBOTS` preset bakes in the values measured against the
  current Webots mecanum proto (strafe_efficiency=0.4,
  strafe_to_forward_bleed=-0.28). Training mecanum BC/RL with this
  preset produces policies that compensate for the imperfect
  physical mecanum at deploy.
* `HerdingEnv` plumbs `RobotConfig.strafe_*` through to
  `mecanum_kinematics_step` so the preset takes effect.
* tools/gen_mecanum_wheels.py is added so the proto's 32 roller
  hinges can be regenerated by editing a single set of constants
  rather than hand-editing 1500+ lines of VRML.

Tests:
* 4 new mecanum_kinematics_step tests (default pass-through, strafe
  scaling, backward bleed, forward unaffected by strafe params).
* 3 new RobotConfig tests (defaults, validation, preset shape).
* Sanity check: gym strafe with HERDING_MEC_WEBOTS over 100 steps
  reproduces the Webots calibration to 2 decimal places.

126 unit tests pass (was 120).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-17 01:09:47 +00:00

569 lines
24 KiB
Python

"""Gymnasium environment for the shepherd-dog herding task.
Single-agent: the dog is the policy; sheep are env-controlled flocking
agents (``herding.world.flocking_sim``). Kinematics match the proto specs
(``herding.world.diffdrive``) so a policy trained here transfers to Webots
without re-tuning.
* **Action** (differential): ``Box(-1, 1, (2,))`` — ``(vx, vy)`` intent.
* **Action** (mecanum): ``Box(-1, 1, (3,))`` — ``(vx, vy, omega)`` intent.
* **Observation**: ``Box(-inf, inf, (32·K,))`` from ``herding.perception.obs.build_obs``
with optional frame stacking K (concatenated oldest → newest).
* **Reset**: ``options["n_sheep"]`` overrides flock size; otherwise
sampled uniformly from ``[1, max_n_sheep]``.
* **Reward**: dense shaping (per-sheep distance progress, time
penalty, Strömbom-imitation cosine bonus) + sparse pen/done jackpots.
Weights live as class attributes on :class:`HerdingEnv`.
"""
from __future__ import annotations
import math
import random
from typing import Optional
import gymnasium as gym
import numpy as np
from gymnasium import spaces
from herding.world.diffdrive import (
heading_speed_to_wheels, kinematics_step,
mecanum_kinematics_step, velocity_to_mecanum_wheels, velocity_to_wheels,
)
from herding.world.flocking_sim import (
FLEE_SPEED, MAX_SPEED, WANDER_SPEED, compute_heading_speed,
)
from herding.world.geometry import (
DOG_MAX_LINEAR, DOG_MAX_WHEEL_OMEGA,
DOG_SOUTH_LIMIT, DOG_WHEEL_BASE, DOG_WHEEL_BASE_X, DOG_WHEEL_BASE_Y,
DOG_WHEEL_RADIUS, FIELD_SHAPE, FIELD_ROUND_R, FIELD_X, FIELD_Y,
GATE_X, GATE_Y, MAX_SHEEP,
PEN_ENTRY, PEN_X, PEN_Y,
SHEEP_MAX_WHEEL_OMEGA, SHEEP_WHEEL_BASE, SHEEP_WHEEL_RADIUS,
WEBOTS_DT, clip_to_field, is_penned_position,
)
from herding.perception.lidar_perception import detections_from_scan
from herding.perception.lidar_sim import simulate_scan
from herding.perception.obs import OBS_DIM, build_obs
from herding.perception.sheep_tracker import SheepTracker
from herding.control.strombom import compute_action as strombom_action
from herding.config import HerdingConfig
class HerdingEnv(gym.Env):
"""Single-agent shepherd-dog herding env.
Each step is one Webots ``basicTimeStep`` (16 ms). Episodes terminate
when all sheep are penned, or after ``max_steps`` steps (truncation).
"""
metadata = {"render_modes": []}
# Reward weights. Sparse jackpots (W_PEN_DELTA, W_DONE) dominate;
# dense shaping (W_PROGRESS on Δ mean-distance-to-pen) provides the
# gradient; W_IMITATE adds a small cosine bonus toward the analytic
# teacher's action; W_TIME is a per-step penalty (0 by default).
W_PEN_DELTA = 100.0
W_PROGRESS = 20.0
W_IMITATE = 0.5
W_TIME = 0.0
W_WALL = 0.0
W_COLLISION = 0.0
W_DONE = 500.0
# In-env action EMA. 0 = none; the Webots controller applies its own
# EMA at inference, so the policy needn't learn smoothness.
ACTION_SMOOTH = 0.0
DEFAULT_MAX_STEPS = 5000
COLLISION_DIST = 0.30
def __init__(
self,
n_sheep: Optional[int] = None,
max_n_sheep: int = MAX_SHEEP,
max_steps: int = DEFAULT_MAX_STEPS,
difficulty: float = 0.0,
seed: Optional[int] = None,
use_lidar: bool = True,
frame_stack: int = 1,
drive_mode: str = "differential",
herding_cfg: Optional[HerdingConfig] = None,
):
super().__init__()
# Store the config; fall back to defaults when None.
self._herding_cfg = herding_cfg
# Apply robot config overrides — these shadow the class attributes
# so that per-instance configuration is possible without touching
# the class-level defaults used by unconfigured instances.
if herding_cfg is not None:
self.ACTION_SMOOTH = herding_cfg.robot.action_smooth
# ``use_lidar=True`` (default): obs and imitation-reward teacher
# see only LiDAR-perceived positions via a tracker, matching the
# Webots controller. ``False`` exposes ground truth for ablation.
self._use_lidar = bool(use_lidar)
tracker_cfg = herding_cfg.tracker if herding_cfg is not None else None
self._tracker = SheepTracker(tracker_cfg=tracker_cfg) if self._use_lidar else None
self._np_rng_lidar: Optional[np.random.Generator] = None
# Frame stacking: the policy receives the last K obs concatenated,
# giving a memoryless MLP temporal context. K=1 → single frame.
self._frame_stack = max(1, int(frame_stack))
self._frame_buffer: list[np.ndarray] = []
# Drive mode: "differential" (2-wheel) or "mecanum" (4-wheel omni).
self._drive_mode = drive_mode.lower()
if self._drive_mode not in ("differential", "mecanum"):
raise ValueError(f"Unknown drive_mode: {drive_mode!r}")
action_dim = 3 if self._drive_mode == "mecanum" else 2
self.action_space = spaces.Box(-1.0, 1.0, shape=(action_dim,),
dtype=np.float32)
self._single_obs_dim = OBS_DIM
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf,
shape=(OBS_DIM * self._frame_stack,), dtype=np.float32,
)
# n_sheep=None → sample uniformly from [1, max_n_sheep] each reset.
self._fixed_n_sheep = n_sheep
self._max_n_sheep = max_n_sheep
self.max_steps = max_steps
# difficulty ∈ [0, 1]: 0 = sheep spawn near the gate (easy);
# 1 = sheep spawn anywhere in the field (deployment distribution).
self._difficulty = float(difficulty)
self._initial_seed = seed
self._initial_seed_used = False
# Env-owned RNG for wander jitter, re-seeded from np_random on reset.
self._py_rng = random.Random()
self._action_dim = action_dim
# State (initialised in reset)
self.dog_x = self.dog_y = self.dog_heading = 0.0
self.sheep_x = np.zeros(0, dtype=np.float32)
self.sheep_y = np.zeros(0, dtype=np.float32)
self.sheep_h = np.zeros(0, dtype=np.float32)
self.sheep_penned = np.zeros(0, dtype=bool)
self.sheep_wander = np.zeros(0, dtype=np.float32)
self.prev_action = np.zeros(self._action_dim, dtype=np.float32)
self.smoothed_action = np.zeros(self._action_dim, dtype=np.float32)
self.steps = 0
self.n_sheep = 0
self.prev_n_penned = 0
self.prev_d_pen = 0.0
self.prev_radius = 0.0
# --- Public knobs ---
def set_max_n_sheep(self, value: int) -> None:
self._max_n_sheep = int(np.clip(value, 1, MAX_SHEEP))
def set_difficulty(self, value: float) -> None:
self._difficulty = float(np.clip(value, 0.0, 1.0))
def set_imitate_weight(self, value: float) -> None:
"""Override the instance W_IMITATE — used to disable Strömbom
imitation during PPO fine-tune."""
self.W_IMITATE = float(value)
def set_time_weight(self, value: float) -> None:
"""Override the instance W_TIME — set negative to penalise step
count and encourage faster time-to-pen during PPO fine-tune."""
self.W_TIME = float(value)
# --- gym API ---
def reset(self, *, seed=None, options=None):
if (seed is None and self._initial_seed is not None
and not self._initial_seed_used):
seed = self._initial_seed
self._initial_seed_used = True
super().reset(seed=seed)
self._py_rng.seed(int(self.np_random.integers(0, 2**31 - 1)))
opts = options or {}
if "n_sheep" in opts and opts["n_sheep"] is not None:
self.n_sheep = int(opts["n_sheep"])
elif self._fixed_n_sheep is not None:
self.n_sheep = int(self._fixed_n_sheep)
else:
self.n_sheep = int(self.np_random.integers(1, self._max_n_sheep + 1))
# Dog spawns near origin with random heading.
self.dog_x = float(self.np_random.uniform(-2.5, 2.5))
self.dog_y = float(self.np_random.uniform(-2.5, 2.5))
self.dog_heading = float(self.np_random.uniform(-math.pi, math.pi))
# Sheep spawn region linearly interpolates with difficulty:
# 0 → small box near the gate, 1 → full field.
d = self._difficulty
if FIELD_SHAPE == "field_round":
# Round field: spawn in a sector near the gate (south),
# expanding to the full circle at difficulty=1.
spawn_r_lo = 3.0
spawn_r_hi = d * FIELD_ROUND_R * 0.8 + (1.0 - d) * 6.0
# Angle spread around south (±60° at d=0, full circle at d=1).
half_angle = math.radians(60) + d * math.radians(120)
angle_lo = math.pi / 2.0 - half_angle # from south = -π/2
angle_hi = math.pi / 2.0 + half_angle
else:
sx_lo = 7.0 - d * 20.0
sx_hi = 14.0 - d * 1.0
sy_lo = -12.0 + d * 0.0
sy_hi = -6.0 + d * 19.0
sxs, sys_, shs, sws = [], [], [], []
for _ in range(self.n_sheep):
for _try in range(100):
if FIELD_SHAPE == "field_round":
r_spawn = float(self.np_random.uniform(spawn_r_lo, spawn_r_hi))
a_spawn = float(self.np_random.uniform(angle_lo, angle_hi))
sx = r_spawn * math.cos(a_spawn)
sy = -r_spawn * math.sin(a_spawn)
else:
sx = float(self.np_random.uniform(sx_lo, sx_hi))
sy = float(self.np_random.uniform(sy_lo, sy_hi))
# Reject if too close to the dog or another sheep, or
# already in the gate column (would start "penned").
if math.hypot(sx - self.dog_x, sy - self.dog_y) < 3.0:
continue
if any(math.hypot(sx - x, sy - y) < 1.5
for x, y in zip(sxs, sys_)):
continue
if PEN_X[0] <= sx <= PEN_X[1] and sy < -8.0:
continue
break
sxs.append(sx); sys_.append(sy)
shs.append(float(self.np_random.uniform(-math.pi, math.pi)))
sws.append(float(self.np_random.uniform(-math.pi, math.pi)))
self.sheep_x = np.asarray(sxs, dtype=np.float32)
self.sheep_y = np.asarray(sys_, dtype=np.float32)
self.sheep_h = np.asarray(shs, dtype=np.float32)
self.sheep_wander = np.asarray(sws, dtype=np.float32)
self.sheep_penned = np.zeros(self.n_sheep, dtype=bool)
self.prev_action = np.zeros(self._action_dim, dtype=np.float32)
self.smoothed_action = np.zeros(self._action_dim, dtype=np.float32)
self.steps = 0
self.prev_n_penned = 0
self.prev_d_pen, self.prev_radius = self._flock_metrics()
if self._tracker is not None:
self._tracker.reset()
self._np_rng_lidar = np.random.default_rng(
int(self.np_random.integers(0, 2**31 - 1)))
self._update_tracker()
self._frame_buffer = []
obs = self._build_obs()
info = {"n_sheep": self.n_sheep}
return obs, info
def step(self, action):
action = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0)
self.smoothed_action = (
self.ACTION_SMOOTH * self.prev_action
+ (1.0 - self.ACTION_SMOOTH) * action
)
self.prev_action = self.smoothed_action.copy()
vx, vy = float(self.smoothed_action[0]), float(self.smoothed_action[1])
omega = float(self.smoothed_action[2]) if self._action_dim >= 3 else 0.0
# Domain randomisation: compass (heading) noise.
dr = (self._herding_cfg.domain_random
if self._herding_cfg is not None else None)
slip_std = dr.wheel_slip_std if dr is not None else 0.0
if dr is not None and dr.compass_noise_std > 0.0 and self._np_rng_lidar is not None:
self.dog_heading = float(self.dog_heading + self._np_rng_lidar.normal(
0.0, dr.compass_noise_std))
# Safety supervisor — dog stays north of the gate.
if self.dog_y < DOG_SOUTH_LIMIT and vy < 0.0:
vx, vy = 0.0, 1.0
# Step the dog.
if self._drive_mode == "mecanum":
w_fl, w_fr, w_rl, w_rr = velocity_to_mecanum_wheels(
vx, vy, omega, self.dog_heading,
max_linear=DOG_MAX_LINEAR,
wheel_radius=DOG_WHEEL_RADIUS,
lx=DOG_WHEEL_BASE_X / 2.0, ly=DOG_WHEEL_BASE_Y / 2.0,
max_wheel_omega=DOG_MAX_WHEEL_OMEGA,
k_turn=4.0,
wheel_base=DOG_WHEEL_BASE,
)
robot_cfg = (self._herding_cfg.robot
if self._herding_cfg is not None else None)
strafe_efficiency = (robot_cfg.strafe_efficiency
if robot_cfg is not None else 1.0)
strafe_bleed = (robot_cfg.strafe_to_forward_bleed
if robot_cfg is not None else 0.0)
self.dog_x, self.dog_y, self.dog_heading = mecanum_kinematics_step(
self.dog_x, self.dog_y, self.dog_heading,
w_fl, w_fr, w_rl, w_rr,
DOG_WHEEL_RADIUS,
DOG_WHEEL_BASE_X / 2.0, DOG_WHEEL_BASE_Y / 2.0,
WEBOTS_DT,
slip_std=slip_std,
rng=self._np_rng_lidar,
strafe_efficiency=strafe_efficiency,
strafe_to_forward_bleed=strafe_bleed,
)
else:
wL, wR = velocity_to_wheels(
vx, vy, self.dog_heading,
max_linear=DOG_MAX_LINEAR,
wheel_radius=DOG_WHEEL_RADIUS,
max_wheel_omega=DOG_MAX_WHEEL_OMEGA,
k_turn=4.0,
)
self.dog_x, self.dog_y, self.dog_heading = kinematics_step(
self.dog_x, self.dog_y, self.dog_heading,
wL, wR, DOG_WHEEL_RADIUS, DOG_WHEEL_BASE, WEBOTS_DT,
slip_std=slip_std,
rng=self._np_rng_lidar,
)
self.dog_x, self.dog_y = clip_to_field(self.dog_x, self.dog_y, margin=0.3)
# Extra constraint: dog stays north of the gate area.
if self.dog_y < DOG_SOUTH_LIMIT:
self.dog_y = DOG_SOUTH_LIMIT
# Step sheep and update penned flags (GT-based).
for i in range(self.n_sheep):
self._step_one_sheep(i)
for i in range(self.n_sheep):
if (not self.sheep_penned[i]
and is_penned_position(self.sheep_x[i], self.sheep_y[i])):
self.sheep_penned[i] = True
# LiDAR perception runs after sheep move; feeds the obs and the
# imitation reward. Reward/termination still use GT.
if self._tracker is not None:
self._update_tracker()
d_pen, radius = self._flock_metrics()
reward = self._compute_reward(d_pen, radius, action=action)
self.prev_d_pen = d_pen
self.prev_radius = radius
self.prev_n_penned = int(self.sheep_penned.sum())
self.steps += 1
all_penned = bool(self.sheep_penned.all())
terminated = all_penned
truncated = self.steps >= self.max_steps
if all_penned:
reward += self.W_DONE
obs = self._build_obs()
info = {
"n_sheep": self.n_sheep,
"n_penned": self.prev_n_penned,
"is_success": all_penned,
"steps": self.steps,
}
return obs, float(reward), terminated, truncated, info
# --- Internals ---
def _step_one_sheep(self, i: int) -> None:
x, y = float(self.sheep_x[i]), float(self.sheep_y[i])
peers = [(float(self.sheep_x[j]), float(self.sheep_y[j]))
for j in range(self.n_sheep) if j != i]
heading, speed_motor, new_wander = compute_heading_speed(
x, y,
penned=bool(self.sheep_penned[i]),
dog_xy=(self.dog_x, self.dog_y),
peers=peers,
wander_angle=float(self.sheep_wander[i]),
rng=self._py_rng,
)
self.sheep_wander[i] = new_wander
wL, wR = heading_speed_to_wheels(
heading, speed_motor, float(self.sheep_h[i]),
max_wheel_omega=SHEEP_MAX_WHEEL_OMEGA, k_turn=4.0,
)
nx, ny, nh = kinematics_step(
x, y, float(self.sheep_h[i]), wL, wR,
SHEEP_WHEEL_RADIUS, SHEEP_WHEEL_BASE, WEBOTS_DT,
)
# Wall clipping.
if FIELD_SHAPE == "field_round":
in_gate_col = PEN_X[0] <= nx <= PEN_X[1]
if in_gate_col:
# Allow passage through the gate column (south of the wall).
ny = float(np.clip(ny, PEN_Y[0] + 0.2, FIELD_Y[1] - 0.2))
else:
nx, ny = clip_to_field(nx, ny, margin=0.2)
else:
nx = float(np.clip(nx, FIELD_X[0] + 0.2, FIELD_X[1] - 0.2))
in_gate_col = PEN_X[0] <= nx <= PEN_X[1]
if in_gate_col:
ny = float(np.clip(ny, PEN_Y[0] + 0.2, FIELD_Y[1] - 0.2))
else:
ny = float(np.clip(ny, FIELD_Y[0] + 0.2, FIELD_Y[1] - 0.2))
self.sheep_x[i] = nx
self.sheep_y[i] = ny
self.sheep_h[i] = nh
def _flock_metrics(self):
"""Return (per-sheep mean distance to pen, max radius from CoM).
The per-sheep mean (not CoM distance) makes the progress signal
sensitive to stragglers: the dog can't game it by herding most of
the flock and abandoning one outlier.
"""
active_mask = ~self.sheep_penned
if not active_mask.any():
return 0.0, 0.0
xs = self.sheep_x[active_mask]
ys = self.sheep_y[active_mask]
per_sheep_d = np.hypot(xs - PEN_ENTRY[0], ys - PEN_ENTRY[1])
d_pen = float(per_sheep_d.mean())
com_x, com_y = float(xs.mean()), float(ys.mean())
if active_mask.sum() == 1:
radius = 0.0
else:
radius = float(np.hypot(xs - com_x, ys - com_y).max())
return d_pen, radius
def _compute_reward(self, d_pen: float, radius: float, action=None) -> float:
"""Sparse pen jackpot + dense progress shaping + Strömbom imitation."""
n_penned = int(self.sheep_penned.sum())
delta_pen = n_penned - self.prev_n_penned
d_progress = max(-5.0, min(5.0, self.prev_d_pen - d_pen))
r = (self.W_PEN_DELTA * delta_pen
+ self.W_PROGRESS * d_progress
+ self.W_TIME)
if action is not None and self.W_IMITATE > 0.0:
positions = self._perceived_positions()
if positions:
sx, sy, _mode = strombom_action(
(self.dog_x, self.dog_y), positions, PEN_ENTRY,
)
a_norm = math.hypot(float(action[0]), float(action[1]))
s_norm = math.hypot(sx, sy)
if a_norm > 1e-3 and s_norm > 1e-3:
cos_sim = (float(action[0]) * sx + float(action[1]) * sy) / (a_norm * s_norm)
r += self.W_IMITATE * cos_sim
return float(r)
def _build_single_obs(self) -> np.ndarray:
if self._tracker is not None:
# LiDAR mode: the obs sees only the tracker's active set.
active = self._tracker.get_positions()
sheep_xy_list = list(active.values())
sheep_penned_list = [False] * len(sheep_xy_list)
else:
sheep_xy_list = list(zip(self.sheep_x.tolist(), self.sheep_y.tolist()))
sheep_penned_list = self.sheep_penned.tolist()
return build_obs(
(self.dog_x, self.dog_y), self.dog_heading,
sheep_xy_list, sheep_penned_list,
n_max=self._max_n_sheep,
n_expected=self.n_sheep,
)
def _build_obs(self) -> np.ndarray:
single = self._build_single_obs()
if self._frame_stack <= 1:
return single
# On reset the buffer is empty — pad with copies of the first frame.
if not self._frame_buffer:
self._frame_buffer = [single.copy() for _ in range(self._frame_stack)]
else:
self._frame_buffer.append(single)
if len(self._frame_buffer) > self._frame_stack:
self._frame_buffer = self._frame_buffer[-self._frame_stack:]
return np.concatenate(self._frame_buffer, axis=0).astype(np.float32)
# --- LiDAR perception ---
def _all_sheep_xy(self) -> list[tuple[float, float]]:
"""Every sheep, including penned (the LiDAR sees them)."""
return [(float(self.sheep_x[i]), float(self.sheep_y[i]))
for i in range(self.n_sheep)]
def _update_tracker(self) -> None:
lidar_cfg = (self._herding_cfg.lidar
if self._herding_cfg is not None else None)
detection_cfg = (self._herding_cfg.detection
if self._herding_cfg is not None else None)
ranges = simulate_scan(
self.dog_x, self.dog_y, self.dog_heading,
self._all_sheep_xy(),
rng=self._np_rng_lidar,
lidar_cfg=lidar_cfg,
)
detections = detections_from_scan(
ranges, self.dog_x, self.dog_y, self.dog_heading,
detection_cfg=detection_cfg,
lidar_cfg=lidar_cfg,
)
# Domain randomisation: inject false-positive detections near static
# features to mimic the spurious clusters Webots' physical LiDAR
# produces from real 3D geometry (walls, posts, fence rails).
dr = (self._herding_cfg.domain_random
if self._herding_cfg is not None else None)
if dr is not None and dr.fp_rate > 0.0 and self._np_rng_lidar is not None:
detections = list(detections)
detections.extend(
self._sample_false_positives(dr.fp_rate, dr.fp_std_pos))
self._tracker.update(detections)
# Static feature anchor points used for FP injection.
# The rectangular list covers gate posts and field corners; the round
# list covers just the gate posts (the circular wall is handled separately).
_FP_ANCHORS_RECT = (
(10.0, -15.0), (13.0, -15.0), # gate posts
(15.0, 15.0), (15.0, -15.0),
(-15.0, 15.0), (-15.0, -15.0), # field corners
(15.0, 0.0), (-15.0, 0.0), # mid-wall returns
(0.0, 15.0), (0.0, -15.0),
)
_FP_ANCHORS_ROUND = (
(0.0, -15.0), # gate centre
(-1.5, -15.0), (1.5, -15.0), # gate posts
(0.0, 15.0), # north wall
(10.6, -10.6), (-10.6, -10.6), # circular wall quadrants
)
def _sample_false_positives(
self, fp_rate: float, fp_std: float,
) -> list[tuple[float, float]]:
"""Return a list of spurious (x, y) detections for this step."""
from herding.world.geometry import FIELD_SHAPE
anchors = (self._FP_ANCHORS_ROUND
if FIELD_SHAPE == "field_round"
else self._FP_ANCHORS_RECT)
n_fps = int(self._np_rng_lidar.poisson(fp_rate))
if n_fps == 0:
return []
fps = []
chosen = self._np_rng_lidar.integers(0, len(anchors), size=n_fps)
noise = self._np_rng_lidar.normal(0.0, fp_std, size=(n_fps, 2))
for k in range(n_fps):
ax, ay = anchors[chosen[k]]
fps.append((float(ax + noise[k, 0]), float(ay + noise[k, 1])))
return fps
def perceived_positions(self) -> dict[str, tuple[float, float]]:
"""What the controller would "see" this step: tracker output in
LiDAR mode, ground truth in privileged mode. Used by demo
collection and analytic-policy eval so the teacher runs on the
same perception the deployed controller has.
"""
if self._tracker is not None:
return self._tracker.get_positions()
return {f"s{i}": (float(self.sheep_x[i]), float(self.sheep_y[i]))
for i in range(self.n_sheep) if not self.sheep_penned[i]}
_perceived_positions = perceived_positions