"""Gymnasium environment for the shepherd-dog herding task. Single-agent: the dog is the policy; sheep are env-controlled flocking agents (``herding.world.flocking_sim``). Kinematics match the proto specs (``herding.world.diffdrive``) so a policy trained here transfers to Webots without re-tuning. * **Action** (differential): ``Box(-1, 1, (2,))`` — ``(vx, vy)`` intent. * **Action** (mecanum): ``Box(-1, 1, (3,))`` — ``(vx, vy, omega)`` intent. * **Observation**: ``Box(-inf, inf, (32·K,))`` from ``herding.perception.obs.build_obs`` with optional frame stacking K (concatenated oldest → newest). * **Reset**: ``options["n_sheep"]`` overrides flock size; otherwise sampled uniformly from ``[1, max_n_sheep]``. * **Reward**: dense shaping (per-sheep distance progress, time penalty, Strömbom-imitation cosine bonus) + sparse pen/done jackpots. Weights live as class attributes on :class:`HerdingEnv`. """ from __future__ import annotations import math import random from typing import Optional import gymnasium as gym import numpy as np from gymnasium import spaces from herding.world.diffdrive import ( heading_speed_to_wheels, kinematics_step, mecanum_kinematics_step, velocity_to_mecanum_wheels, velocity_to_wheels, ) from herding.world.flocking_sim import ( FLEE_SPEED, MAX_SPEED, WANDER_SPEED, compute_heading_speed, ) from herding.world.geometry import ( DOG_MAX_LINEAR, DOG_MAX_WHEEL_OMEGA, DOG_SOUTH_LIMIT, DOG_WHEEL_BASE, DOG_WHEEL_BASE_X, DOG_WHEEL_BASE_Y, DOG_WHEEL_RADIUS, FIELD_SHAPE, FIELD_ROUND_R, FIELD_X, FIELD_Y, GATE_X, GATE_Y, MAX_SHEEP, PEN_ENTRY, PEN_X, PEN_Y, SHEEP_MAX_WHEEL_OMEGA, SHEEP_WHEEL_BASE, SHEEP_WHEEL_RADIUS, WEBOTS_DT, clip_to_field, is_penned_position, ) from herding.perception.lidar_perception import detections_from_scan from herding.perception.lidar_sim import simulate_scan from herding.perception.obs import OBS_DIM, build_obs from herding.perception.sheep_tracker import SheepTracker from herding.control.strombom import compute_action as strombom_action class HerdingEnv(gym.Env): """Single-agent shepherd-dog herding env. Each step is one Webots ``basicTimeStep`` (16 ms). Episodes terminate when all sheep are penned, or after ``max_steps`` steps (truncation). """ metadata = {"render_modes": []} # Reward weights. Sparse jackpots (W_PEN_DELTA, W_DONE) dominate; # dense shaping (W_PROGRESS on Δ mean-distance-to-pen) provides the # gradient; W_IMITATE adds a small cosine bonus toward the analytic # teacher's action; W_TIME is a per-step penalty (0 by default). W_PEN_DELTA = 100.0 W_PROGRESS = 20.0 W_IMITATE = 0.5 W_TIME = 0.0 W_WALL = 0.0 W_COLLISION = 0.0 W_DONE = 500.0 # In-env action EMA. 0 = none; the Webots controller applies its own # EMA at inference, so the policy needn't learn smoothness. ACTION_SMOOTH = 0.0 DEFAULT_MAX_STEPS = 5000 COLLISION_DIST = 0.30 def __init__( self, n_sheep: Optional[int] = None, max_n_sheep: int = MAX_SHEEP, max_steps: int = DEFAULT_MAX_STEPS, difficulty: float = 0.0, seed: Optional[int] = None, use_lidar: bool = True, frame_stack: int = 1, drive_mode: str = "differential", ): super().__init__() # ``use_lidar=True`` (default): obs and imitation-reward teacher # see only LiDAR-perceived positions via a tracker, matching the # Webots controller. ``False`` exposes ground truth for ablation. self._use_lidar = bool(use_lidar) self._tracker = SheepTracker() if self._use_lidar else None self._np_rng_lidar: Optional[np.random.Generator] = None # Frame stacking: the policy receives the last K obs concatenated, # giving a memoryless MLP temporal context. K=1 → single frame. self._frame_stack = max(1, int(frame_stack)) self._frame_buffer: list[np.ndarray] = [] # Drive mode: "differential" (2-wheel) or "mecanum" (4-wheel omni). self._drive_mode = drive_mode.lower() if self._drive_mode not in ("differential", "mecanum"): raise ValueError(f"Unknown drive_mode: {drive_mode!r}") action_dim = 3 if self._drive_mode == "mecanum" else 2 self.action_space = spaces.Box(-1.0, 1.0, shape=(action_dim,), dtype=np.float32) self._single_obs_dim = OBS_DIM self.observation_space = spaces.Box( low=-np.inf, high=np.inf, shape=(OBS_DIM * self._frame_stack,), dtype=np.float32, ) # n_sheep=None → sample uniformly from [1, max_n_sheep] each reset. self._fixed_n_sheep = n_sheep self._max_n_sheep = max_n_sheep self.max_steps = max_steps # difficulty ∈ [0, 1]: 0 = sheep spawn near the gate (easy); # 1 = sheep spawn anywhere in the field (deployment distribution). self._difficulty = float(difficulty) self._initial_seed = seed self._initial_seed_used = False # Env-owned RNG for wander jitter, re-seeded from np_random on reset. self._py_rng = random.Random() self._action_dim = action_dim # State (initialised in reset) self.dog_x = self.dog_y = self.dog_heading = 0.0 self.sheep_x = np.zeros(0, dtype=np.float32) self.sheep_y = np.zeros(0, dtype=np.float32) self.sheep_h = np.zeros(0, dtype=np.float32) self.sheep_penned = np.zeros(0, dtype=bool) self.sheep_wander = np.zeros(0, dtype=np.float32) self.prev_action = np.zeros(self._action_dim, dtype=np.float32) self.smoothed_action = np.zeros(self._action_dim, dtype=np.float32) self.steps = 0 self.n_sheep = 0 self.prev_n_penned = 0 self.prev_d_pen = 0.0 self.prev_radius = 0.0 # --- Public knobs --- def set_max_n_sheep(self, value: int) -> None: self._max_n_sheep = int(np.clip(value, 1, MAX_SHEEP)) def set_difficulty(self, value: float) -> None: self._difficulty = float(np.clip(value, 0.0, 1.0)) def set_imitate_weight(self, value: float) -> None: """Override the instance W_IMITATE — used to disable Strömbom imitation during PPO fine-tune.""" self.W_IMITATE = float(value) def set_time_weight(self, value: float) -> None: """Override the instance W_TIME — set negative to penalise step count and encourage faster time-to-pen during PPO fine-tune.""" self.W_TIME = float(value) # --- gym API --- def reset(self, *, seed=None, options=None): if (seed is None and self._initial_seed is not None and not self._initial_seed_used): seed = self._initial_seed self._initial_seed_used = True super().reset(seed=seed) self._py_rng.seed(int(self.np_random.integers(0, 2**31 - 1))) opts = options or {} if "n_sheep" in opts and opts["n_sheep"] is not None: self.n_sheep = int(opts["n_sheep"]) elif self._fixed_n_sheep is not None: self.n_sheep = int(self._fixed_n_sheep) else: self.n_sheep = int(self.np_random.integers(1, self._max_n_sheep + 1)) # Dog spawns near origin with random heading. self.dog_x = float(self.np_random.uniform(-2.5, 2.5)) self.dog_y = float(self.np_random.uniform(-2.5, 2.5)) self.dog_heading = float(self.np_random.uniform(-math.pi, math.pi)) # Sheep spawn region linearly interpolates with difficulty: # 0 → small box near the gate, 1 → full field. d = self._difficulty if FIELD_SHAPE == "field_round": # Round field: spawn in a sector near the gate (south), # expanding to the full circle at difficulty=1. spawn_r_lo = 3.0 spawn_r_hi = d * FIELD_ROUND_R * 0.8 + (1.0 - d) * 6.0 # Angle spread around south (±60° at d=0, full circle at d=1). half_angle = math.radians(60) + d * math.radians(120) angle_lo = math.pi / 2.0 - half_angle # from south = -π/2 angle_hi = math.pi / 2.0 + half_angle else: sx_lo = 7.0 - d * 20.0 sx_hi = 14.0 - d * 1.0 sy_lo = -12.0 + d * 0.0 sy_hi = -6.0 + d * 19.0 sxs, sys_, shs, sws = [], [], [], [] for _ in range(self.n_sheep): for _try in range(100): if FIELD_SHAPE == "field_round": r_spawn = float(self.np_random.uniform(spawn_r_lo, spawn_r_hi)) a_spawn = float(self.np_random.uniform(angle_lo, angle_hi)) sx = r_spawn * math.cos(a_spawn) sy = -r_spawn * math.sin(a_spawn) else: sx = float(self.np_random.uniform(sx_lo, sx_hi)) sy = float(self.np_random.uniform(sy_lo, sy_hi)) # Reject if too close to the dog or another sheep, or # already in the gate column (would start "penned"). if math.hypot(sx - self.dog_x, sy - self.dog_y) < 3.0: continue if any(math.hypot(sx - x, sy - y) < 1.5 for x, y in zip(sxs, sys_)): continue if PEN_X[0] <= sx <= PEN_X[1] and sy < -8.0: continue break sxs.append(sx); sys_.append(sy) shs.append(float(self.np_random.uniform(-math.pi, math.pi))) sws.append(float(self.np_random.uniform(-math.pi, math.pi))) self.sheep_x = np.asarray(sxs, dtype=np.float32) self.sheep_y = np.asarray(sys_, dtype=np.float32) self.sheep_h = np.asarray(shs, dtype=np.float32) self.sheep_wander = np.asarray(sws, dtype=np.float32) self.sheep_penned = np.zeros(self.n_sheep, dtype=bool) self.prev_action = np.zeros(self._action_dim, dtype=np.float32) self.smoothed_action = np.zeros(self._action_dim, dtype=np.float32) self.steps = 0 self.prev_n_penned = 0 self.prev_d_pen, self.prev_radius = self._flock_metrics() if self._tracker is not None: self._tracker.reset() self._np_rng_lidar = np.random.default_rng( int(self.np_random.integers(0, 2**31 - 1))) self._update_tracker() self._frame_buffer = [] obs = self._build_obs() info = {"n_sheep": self.n_sheep} return obs, info def step(self, action): action = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0) self.smoothed_action = ( self.ACTION_SMOOTH * self.prev_action + (1.0 - self.ACTION_SMOOTH) * action ) self.prev_action = self.smoothed_action.copy() vx, vy = float(self.smoothed_action[0]), float(self.smoothed_action[1]) omega = float(self.smoothed_action[2]) if self._action_dim >= 3 else 0.0 # Safety supervisor — dog stays north of the gate. if self.dog_y < DOG_SOUTH_LIMIT and vy < 0.0: vx, vy = 0.0, 1.0 # Step the dog. if self._drive_mode == "mecanum": w_fl, w_fr, w_rl, w_rr = velocity_to_mecanum_wheels( vx, vy, omega, self.dog_heading, max_linear=DOG_MAX_LINEAR, wheel_radius=DOG_WHEEL_RADIUS, lx=DOG_WHEEL_BASE_X / 2.0, ly=DOG_WHEEL_BASE_Y / 2.0, max_wheel_omega=DOG_MAX_WHEEL_OMEGA, k_turn=4.0, wheel_base=DOG_WHEEL_BASE, ) self.dog_x, self.dog_y, self.dog_heading = mecanum_kinematics_step( self.dog_x, self.dog_y, self.dog_heading, w_fl, w_fr, w_rl, w_rr, DOG_WHEEL_RADIUS, DOG_WHEEL_BASE_X / 2.0, DOG_WHEEL_BASE_Y / 2.0, WEBOTS_DT, ) else: wL, wR = velocity_to_wheels( vx, vy, self.dog_heading, max_linear=DOG_MAX_LINEAR, wheel_radius=DOG_WHEEL_RADIUS, max_wheel_omega=DOG_MAX_WHEEL_OMEGA, k_turn=4.0, ) self.dog_x, self.dog_y, self.dog_heading = kinematics_step( self.dog_x, self.dog_y, self.dog_heading, wL, wR, DOG_WHEEL_RADIUS, DOG_WHEEL_BASE, WEBOTS_DT, ) self.dog_x, self.dog_y = clip_to_field(self.dog_x, self.dog_y, margin=0.3) # Extra constraint: dog stays north of the gate area. if self.dog_y < DOG_SOUTH_LIMIT: self.dog_y = DOG_SOUTH_LIMIT # Step sheep and update penned flags (GT-based). for i in range(self.n_sheep): self._step_one_sheep(i) for i in range(self.n_sheep): if (not self.sheep_penned[i] and is_penned_position(self.sheep_x[i], self.sheep_y[i])): self.sheep_penned[i] = True # LiDAR perception runs after sheep move; feeds the obs and the # imitation reward. Reward/termination still use GT. if self._tracker is not None: self._update_tracker() d_pen, radius = self._flock_metrics() reward = self._compute_reward(d_pen, radius, action=action) self.prev_d_pen = d_pen self.prev_radius = radius self.prev_n_penned = int(self.sheep_penned.sum()) self.steps += 1 all_penned = bool(self.sheep_penned.all()) terminated = all_penned truncated = self.steps >= self.max_steps if all_penned: reward += self.W_DONE obs = self._build_obs() info = { "n_sheep": self.n_sheep, "n_penned": self.prev_n_penned, "is_success": all_penned, "steps": self.steps, } return obs, float(reward), terminated, truncated, info # --- Internals --- def _step_one_sheep(self, i: int) -> None: x, y = float(self.sheep_x[i]), float(self.sheep_y[i]) peers = [(float(self.sheep_x[j]), float(self.sheep_y[j])) for j in range(self.n_sheep) if j != i] heading, speed_motor, new_wander = compute_heading_speed( x, y, penned=bool(self.sheep_penned[i]), dog_xy=(self.dog_x, self.dog_y), peers=peers, wander_angle=float(self.sheep_wander[i]), rng=self._py_rng, ) self.sheep_wander[i] = new_wander wL, wR = heading_speed_to_wheels( heading, speed_motor, float(self.sheep_h[i]), max_wheel_omega=SHEEP_MAX_WHEEL_OMEGA, k_turn=4.0, ) nx, ny, nh = kinematics_step( x, y, float(self.sheep_h[i]), wL, wR, SHEEP_WHEEL_RADIUS, SHEEP_WHEEL_BASE, WEBOTS_DT, ) # Wall clipping. if FIELD_SHAPE == "field_round": in_gate_col = PEN_X[0] <= nx <= PEN_X[1] if in_gate_col: # Allow passage through the gate column (south of the wall). ny = float(np.clip(ny, PEN_Y[0] + 0.2, FIELD_Y[1] - 0.2)) else: nx, ny = clip_to_field(nx, ny, margin=0.2) else: nx = float(np.clip(nx, FIELD_X[0] + 0.2, FIELD_X[1] - 0.2)) in_gate_col = PEN_X[0] <= nx <= PEN_X[1] if in_gate_col: ny = float(np.clip(ny, PEN_Y[0] + 0.2, FIELD_Y[1] - 0.2)) else: ny = float(np.clip(ny, FIELD_Y[0] + 0.2, FIELD_Y[1] - 0.2)) self.sheep_x[i] = nx self.sheep_y[i] = ny self.sheep_h[i] = nh def _flock_metrics(self): """Return (per-sheep mean distance to pen, max radius from CoM). The per-sheep mean (not CoM distance) makes the progress signal sensitive to stragglers: the dog can't game it by herding most of the flock and abandoning one outlier. """ active_mask = ~self.sheep_penned if not active_mask.any(): return 0.0, 0.0 xs = self.sheep_x[active_mask] ys = self.sheep_y[active_mask] per_sheep_d = np.hypot(xs - PEN_ENTRY[0], ys - PEN_ENTRY[1]) d_pen = float(per_sheep_d.mean()) com_x, com_y = float(xs.mean()), float(ys.mean()) if active_mask.sum() == 1: radius = 0.0 else: radius = float(np.hypot(xs - com_x, ys - com_y).max()) return d_pen, radius def _compute_reward(self, d_pen: float, radius: float, action=None) -> float: """Sparse pen jackpot + dense progress shaping + Strömbom imitation.""" n_penned = int(self.sheep_penned.sum()) delta_pen = n_penned - self.prev_n_penned d_progress = max(-5.0, min(5.0, self.prev_d_pen - d_pen)) r = (self.W_PEN_DELTA * delta_pen + self.W_PROGRESS * d_progress + self.W_TIME) if action is not None and self.W_IMITATE > 0.0: positions = self._perceived_positions() if positions: sx, sy, _mode = strombom_action( (self.dog_x, self.dog_y), positions, PEN_ENTRY, ) a_norm = math.hypot(float(action[0]), float(action[1])) s_norm = math.hypot(sx, sy) if a_norm > 1e-3 and s_norm > 1e-3: cos_sim = (float(action[0]) * sx + float(action[1]) * sy) / (a_norm * s_norm) r += self.W_IMITATE * cos_sim return float(r) def _build_single_obs(self) -> np.ndarray: if self._tracker is not None: # LiDAR mode: the obs sees only the tracker's active set. active = self._tracker.get_positions() sheep_xy_list = list(active.values()) sheep_penned_list = [False] * len(sheep_xy_list) else: sheep_xy_list = list(zip(self.sheep_x.tolist(), self.sheep_y.tolist())) sheep_penned_list = self.sheep_penned.tolist() return build_obs( (self.dog_x, self.dog_y), self.dog_heading, sheep_xy_list, sheep_penned_list, n_max=self._max_n_sheep, n_expected=self.n_sheep, ) def _build_obs(self) -> np.ndarray: single = self._build_single_obs() if self._frame_stack <= 1: return single # On reset the buffer is empty — pad with copies of the first frame. if not self._frame_buffer: self._frame_buffer = [single.copy() for _ in range(self._frame_stack)] else: self._frame_buffer.append(single) if len(self._frame_buffer) > self._frame_stack: self._frame_buffer = self._frame_buffer[-self._frame_stack:] return np.concatenate(self._frame_buffer, axis=0).astype(np.float32) # --- LiDAR perception --- def _all_sheep_xy(self) -> list[tuple[float, float]]: """Every sheep, including penned (the LiDAR sees them).""" return [(float(self.sheep_x[i]), float(self.sheep_y[i])) for i in range(self.n_sheep)] def _update_tracker(self) -> None: ranges = simulate_scan( self.dog_x, self.dog_y, self.dog_heading, self._all_sheep_xy(), rng=self._np_rng_lidar, ) detections = detections_from_scan( ranges, self.dog_x, self.dog_y, self.dog_heading, ) self._tracker.update(detections) def perceived_positions(self) -> dict[str, tuple[float, float]]: """What the controller would "see" this step: tracker output in LiDAR mode, ground truth in privileged mode. Used by demo collection and analytic-policy eval so the teacher runs on the same perception the deployed controller has. """ if self._tracker is not None: return self._tracker.get_positions() return {f"s{i}": (float(self.sheep_x[i]), float(self.sheep_y[i])) for i in range(self.n_sheep) if not self.sheep_penned[i]} _perceived_positions = perceived_positions