"""
2D herding environment for PPO training (Gymnasium-compatible).

The dog agent (action: 2D velocity vector) must herd n_sheep into the
quarantine pen.  Sheep dynamics mirror the Webots controller exactly:
flee (quadratic ramp), separation (inverse-distance), cohesion,
wall avoidance, and wander.

Coordinate system matches the Webots world file:
    field : x ∈ [-15, 15],  y ∈ [-15, 15]
    pen   : x ∈ [10, 13],   y ∈ [-15, -8]   (SE corner, open north)

Observation is always sized for MAX_SHEEP (currently 5) regardless of how
many sheep are active.  Inactive slots are pre-penned at the pen centre with
flag=1.  This keeps the model input dimension fixed across curriculum stages
so VecNormalize statistics are preserved throughout.
"""

from typing import Optional

import numpy as np
import gymnasium as gym
from gymnasium import spaces


class HerdingEnv(gym.Env):
    """Dog-herds-sheep environment with a fixed-size observation vector.

    Observation (float32, length ``2 + 3 * MAX_SHEEP``):
        dog (x, y), then MAX_SHEEP sheep (x, y) pairs, then MAX_SHEEP
        penned flags.  Positions are divided by ``FIELD``.

    Action (float32, length 2):
        desired dog velocity in [-1, 1]², scaled by ``DOG_SPEED``.

    An episode terminates when every active sheep is penned and is truncated
    after ``max_steps`` steps.
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30}

    # -----------------------------------------------------------------------
    # World constants — must match Webots world file
    # -----------------------------------------------------------------------
    MAX_SHEEP = 5
    FIELD = 15.0                 # half-size; positions ∈ [-FIELD, FIELD]
    PEN_X = (10.0, 13.0)         # quarantine pen x bounds
    PEN_Y = (-15.0, -8.0)        # quarantine pen y bounds
    PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32)

    # -----------------------------------------------------------------------
    # Dynamics — calibrated to match Webots robot specs
    #   wheel radius 0.031 m; sheep FLEE_SPEED 20 rad/s → 0.62 m/s
    #   wheel radius 0.038 m; dog maxVelocity 70 rad/s  → 2.66 m/s
    # -----------------------------------------------------------------------
    DOG_SPEED = 2.5              # m/s
    SHEEP_FLEE_V = 0.65          # m/s
    SHEEP_WANDER_V = 0.20        # m/s
    DT = 0.1                     # seconds per step

    # Boid parameters — identical to sheep.py
    FLEE_DIST = 7.0
    SEPARATION_DIST = 2.5
    COHESION_DIST = 8.0
    WALL_MARGIN = 3.5

    # -----------------------------------------------------------------------
    # Reward weights
    # -----------------------------------------------------------------------
    W_APPROACH = 0.3     # dense: dog distance to nearest active sheep
    W_SHAPING = 0.5      # dense: mean sheep distance to pen (was 0.01)
    W_PEN_BONUS = 5.0    # sparse: per sheep successfully penned
    W_COMPLETE = 20.0    # bonus when ALL active sheep are penned
    W_STEP_COST = 0.002  # penalty per step (encourages efficiency)

    def __init__(self, n_sheep: int = 1, max_steps: int = 2000,
                 render_mode: Optional[str] = None):
        """Create the environment.

        Args:
            n_sheep: number of active sheep, 1..MAX_SHEEP.
            max_steps: step count at which the episode is truncated.
            render_mode: ``"human"``, ``"rgb_array"`` or ``None``.
        """
        super().__init__()
        assert 1 <= n_sheep <= self.MAX_SHEEP, "n_sheep must be in 1..MAX_SHEEP"

        self.n_sheep = n_sheep
        self.max_steps = max_steps
        self.render_mode = render_mode

        # Value requested through set_n_sheep(); applied by the next reset()
        # so the sheep count never changes in the middle of an episode.
        self._pending_n_sheep = None

        # Observation: dog(x,y) + MAX_SHEEP×sheep(x,y) + MAX_SHEEP×penned
        # Fixed size across all curriculum stages.
        obs_dim = 2 + 2 * self.MAX_SHEEP + self.MAX_SHEEP
        self.observation_space = spaces.Box(
            low=-1.0, high=1.0, shape=(obs_dim,), dtype=np.float32
        )

        # Action: desired velocity (vx, vy) ∈ [-1, 1]², scaled by DOG_SPEED
        self.action_space = spaces.Box(
            low=-1.0, high=1.0, shape=(2,), dtype=np.float32
        )

        # Runtime state (populated by reset)
        self._step_count = 0
        self._prev_penned = 0
        self.dog_pos = np.zeros(2, dtype=np.float32)
        self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32)
        self.penned = np.ones(self.MAX_SHEEP, dtype=bool)
        self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32)

        # Lazy matplotlib figure / axes, created on the first render() call
        self._fig = None
        self._ax = None

    # ------------------------------------------------------------------
    # Curriculum interface
    # ------------------------------------------------------------------

    def set_n_sheep(self, n: int):
        """Advance curriculum difficulty; takes effect on next reset().

        The new count is only stored here.  ``self.n_sheep`` keeps its
        current value until reset() runs, because step() counts
        ``penned[:n_sheep]`` and the inactive slots are pre-penned: changing
        the count mid-episode would register them as newly penned sheep and
        pay out W_PEN_BONUS for them.
        """
        assert 1 <= n <= self.MAX_SHEEP, "n must be in 1..MAX_SHEEP"
        self._pending_n_sheep = n

    # ------------------------------------------------------------------
    # Gymnasium API
    # ------------------------------------------------------------------

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        Applies any sheep count queued by set_n_sheep(), then places the dog
        and the active sheep at random positions.  ``info`` is an empty dict.
        ``options`` is accepted for API compatibility and not used.
        """
        super().reset(seed=seed)

        if self._pending_n_sheep is not None:
            self.n_sheep = self._pending_n_sheep
            self._pending_n_sheep = None

        self._step_count = 0
        self._prev_penned = 0

        # Dog: random start in the open field (not near the pen)
        self.dog_pos = self.np_random.uniform(
            -8.0, 5.0, size=(2,)
        ).astype(np.float32)

        # Active sheep (0 .. n_sheep-1): random non-pen positions
        self.sheep_pos[:] = self.PEN_CENTER  # default all to pen centre
        self.penned[:] = True
        placed = 0
        while placed < self.n_sheep:
            # Rejection sampling: redraw whenever the sample lands in the pen
            p = self.np_random.uniform(
                -12.0, 12.0, size=(2,)
            ).astype(np.float32)
            if not self._in_pen(p):
                self.sheep_pos[placed] = p
                self.penned[placed] = False
                placed += 1
        # Inactive slots (n_sheep .. MAX_SHEEP-1): already at pen centre,
        # penned=True

        self.wander_ang = self.np_random.uniform(
            -np.pi, np.pi, size=(self.MAX_SHEEP,)
        ).astype(np.float32)

        return self._obs(), {}

    def step(self, action):
        """Advance the simulation by DT seconds.

        Args:
            action: length-2 array-like; each component is clipped to
                [-1, 1] and scaled by DOG_SPEED.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` where
            ``info`` holds ``"n_penned"`` and ``"n_sheep"``.
        """
        self._step_count += 1

        # Move dog — clip each axis independently so the agent can idle
        act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0)
        self.dog_pos = np.clip(
            self.dog_pos + act * self.DOG_SPEED * self.DT,
            -self.FIELD, self.FIELD
        )

        # Step sheep dynamics.  Sheep are updated in index order, so sheep i
        # sees the already-updated positions of sheep 0 .. i-1.
        for i in range(self.n_sheep):
            if self.penned[i]:
                continue
            self.sheep_pos[i] = self._step_sheep(i)
            if self._in_pen(self.sheep_pos[i]):
                self.penned[i] = True

        n_penned = int(self.penned[:self.n_sheep].sum())
        newly_penned = n_penned - self._prev_penned
        self._prev_penned = n_penned

        reward = self._reward(n_penned, newly_penned)
        terminated = n_penned == self.n_sheep
        truncated = self._step_count >= self.max_steps

        info = {"n_penned": n_penned, "n_sheep": self.n_sheep}

        if self.render_mode == "human":
            self.render()

        return self._obs(), float(reward), terminated, truncated, info

    def render(self):
        """Draw the current state with matplotlib.

        With ``render_mode == "rgb_array"`` the frame is drawn on an
        off-screen Agg canvas and returned as an ``(H, W, 3)`` uint8 array.
        In every other mode an interactive pyplot window is updated and
        ``None`` is returned.
        """
        import matplotlib.patches as mpatches

        offscreen = self.render_mode == "rgb_array"

        if self._fig is None:
            if offscreen:
                # Explicit Agg canvas: works headless and guarantees that
                # canvas.buffer_rgba() is available.
                from matplotlib.backends.backend_agg import FigureCanvasAgg
                from matplotlib.figure import Figure

                self._fig = Figure(figsize=(6, 6))
                FigureCanvasAgg(self._fig)  # attaches itself as fig.canvas
                self._ax = self._fig.add_subplot(111)
            else:
                import matplotlib.pyplot as plt

                plt.ion()
                self._fig, self._ax = plt.subplots(figsize=(6, 6))

        ax = self._ax
        ax.clear()
        half = self.FIELD
        ax.set_xlim(-(half + 1), half + 1)
        ax.set_ylim(-(half + 1), half + 1)
        ax.set_aspect("equal")
        ax.set_facecolor("#dcedc8")

        # Field boundary
        ax.add_patch(mpatches.Rectangle(
            (-half, -half), 2 * half, 2 * half,
            fill=False, edgecolor="#795548", linewidth=2
        ))

        # Pen
        pw = self.PEN_X[1] - self.PEN_X[0]
        ph = self.PEN_Y[1] - self.PEN_Y[0]
        ax.add_patch(mpatches.Rectangle(
            (self.PEN_X[0], self.PEN_Y[0]), pw, ph,
            facecolor="#ffe082", edgecolor="#795548", linewidth=2
        ))
        ax.text(11.5, -11.5, "pen", ha="center", va="center",
                fontsize=8, color="#795548")

        # Sheep — only active slots are shown
        for i in range(self.n_sheep):
            color = "deeppink" if self.penned[i] else "white"
            ax.plot(*self.sheep_pos[i], "o", color=color, markersize=11,
                    markeredgecolor="#555", markeredgewidth=1.5)

        # Dog
        ax.plot(*self.dog_pos, "s", color="#4e342e", markersize=13,
                markeredgecolor="black", markeredgewidth=1.5)

        ax.set_title(
            f"step {self._step_count}  |  "
            f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep}",
            fontsize=11
        )

        self._fig.canvas.draw()

        if offscreen:
            # buffer_rgba() exposes the canvas as (H, W, 4) uint8; drop the
            # alpha channel and copy so the caller owns the frame.
            rgba = np.asarray(self._fig.canvas.buffer_rgba())
            return rgba[..., :3].copy()

        import matplotlib.pyplot as plt

        self._fig.canvas.flush_events()
        plt.pause(0.001)
        return None

    def close(self):
        """Release the matplotlib figure, if one was created."""
        if self._fig is not None:
            import matplotlib.pyplot as plt

            plt.close(self._fig)
            self._fig = None
            self._ax = None

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    def _in_pen(self, pos: np.ndarray) -> bool:
        """Return True when ``pos`` lies strictly inside the pen rectangle."""
        return bool(
            self.PEN_X[0] < pos[0] < self.PEN_X[1]
            and self.PEN_Y[0] < pos[1] < self.PEN_Y[1]
        )

    def _obs(self) -> np.ndarray:
        """Build the flat float32 observation vector."""
        scale = 1.0 / self.FIELD
        return np.concatenate([
            self.dog_pos * scale,                    # 2
            (self.sheep_pos * scale).flatten(),      # 2 * MAX_SHEEP
            self.penned.astype(np.float32),          # MAX_SHEEP
        ]).astype(np.float32)

    def _reward(self, n_penned: int, newly_penned: int) -> float:
        """Compute the reward for the current step.

        Args:
            n_penned: active sheep currently penned.
            newly_penned: change in ``n_penned`` since the previous step.

        Returns:
            Weighted sum of the dense shaping and approach terms, the
            per-sheep pen bonus, the step cost and, when every active sheep
            is penned, the completion bonus.
        """
        active_mask = ~self.penned[:self.n_sheep]

        if active_mask.any():
            active_pos = self.sheep_pos[:self.n_sheep][active_mask]

            # Sheep-to-pen shaping: encourages moving sheep toward pen.
            # Normalised by the field width (2 * FIELD); the pen is off
            # centre, so the term spans roughly [-1.25, 0], not [-1, 0].
            dists_pen = np.linalg.norm(active_pos - self.PEN_CENTER, axis=1)
            shaping = -(dists_pen.mean() / (2 * self.FIELD))

            # Dog-to-nearest-sheep approach: incentivises the dog to stay
            # within flee range (FLEE_DIST=7m) rather than wandering away.
            # Spans roughly [-1.41, 0] (field diagonal / field width).
            dists_dog = np.linalg.norm(active_pos - self.dog_pos, axis=1)
            approach = -(dists_dog.min() / (2 * self.FIELD))
        else:
            shaping = approach = 0.0

        reward = shaping * self.W_SHAPING
        reward += approach * self.W_APPROACH
        reward += newly_penned * self.W_PEN_BONUS
        reward -= self.W_STEP_COST

        if n_penned == self.n_sheep:
            reward += self.W_COMPLETE

        return reward

    def _step_sheep(self, i: int) -> np.ndarray:
        """Apply one timestep of boid dynamics to sheep i.

        Sums flee, separation, cohesion, wall-avoidance and wander forces,
        then moves the sheep along the resulting direction at a capped
        speed.  Returns the new float32 position; ``self.sheep_pos`` is not
        modified here (``self.wander_ang[i]`` may be).
        """
        pos = self.sheep_pos[i].copy()
        fx, fy = 0.0, 0.0
        fleeing = False

        # Flee from dog — quadratic ramp (mirrors sheep.py)
        diff = self.dog_pos - pos
        dist = float(np.linalg.norm(diff))
        if 0.01 < dist < self.FLEE_DIST:
            t = 1.0 - dist / self.FLEE_DIST
            s = t * t * 5.0
            fx -= (diff[0] / dist) * s
            fy -= (diff[1] / dist) * s
            fleeing = True

        # Separation (inverse-distance) + Cohesion
        cx, cy, cn = 0.0, 0.0, 0
        for j in range(self.n_sheep):
            if j == i or self.penned[j]:
                continue
            dv = self.sheep_pos[j] - pos
            dj = float(np.linalg.norm(dv))
            if 0.3 < dj < self.COHESION_DIST:
                cx += self.sheep_pos[j][0]
                cy += self.sheep_pos[j][1]
                cn += 1
            if 0.05 < dj < self.SEPARATION_DIST:
                push = (self.SEPARATION_DIST - dj) / dj
                fx -= (dv[0] / dj) * push * 2.5
                fy -= (dv[1] / dj) * push * 2.5
        if cn > 0:
            # Pull toward the neighbours' centroid; weaker while fleeing
            w = 0.08 if fleeing else 0.15
            fx += (cx / cn - pos[0]) * w
            fy += (cy / cn - pos[1]) * w

        # Wall avoidance
        m, F = self.WALL_MARGIN, self.FIELD
        if pos[0] < -F + m:
            fx += ((-F + m - pos[0]) / m) * 6.0
        if pos[0] > F - m:
            fx -= ((pos[0] - (F - m)) / m) * 6.0
        if pos[1] < -F + m:
            fy += ((-F + m - pos[1]) / m) * 6.0
        if pos[1] > F - m:
            fy -= ((pos[1] - (F - m)) / m) * 6.0

        # Wander — suppressed while fleeing
        if not fleeing:
            if self.np_random.random() < 0.02:
                self.wander_ang[i] += float(
                    self.np_random.uniform(-0.6, 0.6)
                )
            fx += float(np.cos(self.wander_ang[i])) * 0.5
            fy += float(np.sin(self.wander_ang[i])) * 0.5

        # Integrate: move along the force direction, speed capped by mode
        force = np.array([fx, fy])
        mag = float(np.linalg.norm(force))
        if mag > 0.01:
            top_speed = self.SHEEP_FLEE_V if fleeing else self.SHEEP_WANDER_V
            speed = min(top_speed, mag * 0.3)
            pos = np.clip(
                pos + (force / mag) * speed * self.DT,
                -self.FIELD, self.FIELD
            )

        return pos.astype(np.float32)