""" 2D herding environment for PPO training (Gymnasium-compatible). The dog agent (action: 2D velocity vector) must herd n_sheep into the quarantine pen. Sheep dynamics mirror the Webots controller exactly: flee (quadratic ramp), separation (inverse-distance), cohesion, wall avoidance, and wander. Coordinate system matches the Webots world file: field : x ∈ [-15, 15], y ∈ [-15, 15] pen : x ∈ [10, 13], y ∈ [-15, -8] (SE corner, open north) Observation is always sized for MAX_SHEEP (currently 5) regardless of how many sheep are active. Inactive slots are pre-penned at the pen centre with flag=1. This keeps the model input dimension fixed across curriculum stages so VecNormalize statistics are preserved throughout. """ import numpy as np import gymnasium as gym from gymnasium import spaces class HerdingEnv(gym.Env): metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30} # ----------------------------------------------------------------------- # World constants — must match Webots world file # ----------------------------------------------------------------------- MAX_SHEEP = 5 FIELD = 15.0 # half-size; positions ∈ [-FIELD, FIELD] PEN_X = (10.0, 13.0) # quarantine pen x bounds PEN_Y = (-15.0, -8.0) # quarantine pen y bounds PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32) # ----------------------------------------------------------------------- # Dynamics — calibrated to match Webots robot specs # wheel radius 0.031 m; sheep FLEE_SPEED 20 rad/s → 0.62 m/s # wheel radius 0.038 m; dog maxVelocity 70 rad/s → 2.66 m/s # ----------------------------------------------------------------------- DOG_SPEED = 2.5 # m/s SHEEP_FLEE_V = 0.65 # m/s SHEEP_WANDER_V = 0.20 # m/s DT = 0.1 # seconds per step # Boid parameters — identical to sheep.py FLEE_DIST = 7.0 SEPARATION_DIST = 2.5 COHESION_DIST = 8.0 WALL_MARGIN = 3.5 # ----------------------------------------------------------------------- # Reward weights # ----------------------------------------------------------------------- W_ALIGN = 0.4 # dense: dog on anti-pen side of each active sheep W_SHAPING = 0.5 # dense: mean sheep distance to pen W_APPROACH = 0.1 # dense: dog within flee range of nearest sheep W_PEN_BONUS = 5.0 # sparse: per sheep successfully penned W_COMPLETE = 20.0 # bonus when ALL active sheep are penned W_STEP_COST = 0.002 # penalty per step (encourages efficiency) def __init__(self, n_sheep: int = 1, max_steps: int = 2000, render_mode: str = None): super().__init__() assert 1 <= n_sheep <= self.MAX_SHEEP self.n_sheep = n_sheep self.max_steps = max_steps self.render_mode = render_mode # Observation: dog(x,y) + MAX_SHEEP×sheep(x,y) + MAX_SHEEP×penned # Fixed size across all curriculum stages. obs_dim = 2 + 2 * self.MAX_SHEEP + self.MAX_SHEEP self.observation_space = spaces.Box( low=-1.0, high=1.0, shape=(obs_dim,), dtype=np.float32 ) # Action: desired velocity (vx, vy) ∈ [-1, 1]², scaled by DOG_SPEED self.action_space = spaces.Box( low=-1.0, high=1.0, shape=(2,), dtype=np.float32 ) # Runtime state (populated by reset) self._step_count = 0 self._prev_penned = 0 self.dog_pos = np.zeros(2, dtype=np.float32) self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32) self.penned = np.ones(self.MAX_SHEEP, dtype=bool) self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32) self._fig = None # lazy matplotlib figure # ------------------------------------------------------------------ # Curriculum interface # ------------------------------------------------------------------ def set_n_sheep(self, n: int): """Advance curriculum difficulty; takes effect on next reset().""" assert 1 <= n <= self.MAX_SHEEP self.n_sheep = n # ------------------------------------------------------------------ # Gymnasium API # ------------------------------------------------------------------ def reset(self, seed=None, options=None): super().reset(seed=seed) self._step_count = 0 self._prev_penned = 0 # Active sheep (0 .. n_sheep-1): random non-pen positions self.sheep_pos[:] = self.PEN_CENTER self.penned[:] = True placed = 0 while placed < self.n_sheep: p = self.np_random.uniform(-12.0, 12.0, size=(2,)).astype(np.float32) if not self._in_pen(p): self.sheep_pos[placed] = p self.penned[placed] = False placed += 1 # Dog: 50 % of the time start already on the anti-pen side of the # nearest sheep (within flee range) so early training gets aligned # starts; the other 50 % is fully random to ensure generalisation. if self.np_random.random() < 0.5: # Place dog behind the first active sheep relative to the pen ref = self.sheep_pos[0] away = ref - self.PEN_CENTER # sheep→anti-pen dist = float(np.linalg.norm(away)) if dist > 0.1: away = away / dist offset = away * self.np_random.uniform(2.0, self.FLEE_DIST * 0.8) self.dog_pos = np.clip( (ref + offset).astype(np.float32), -self.FIELD, self.FIELD ) else: self.dog_pos = self.np_random.uniform( -self.FIELD * 0.8, self.FIELD * 0.8, size=(2,) ).astype(np.float32) # Inactive slots (n_sheep .. MAX_SHEEP-1): already at pen centre, penned=True self.wander_ang = self.np_random.uniform( -np.pi, np.pi, size=(self.MAX_SHEEP,) ).astype(np.float32) return self._obs(), {} def step(self, action): self._step_count += 1 # Move dog — clip each axis independently so the agent can idle act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0) self.dog_pos = np.clip( self.dog_pos + act * self.DOG_SPEED * self.DT, -self.FIELD, self.FIELD ) # Step sheep dynamics for i in range(self.n_sheep): if self.penned[i]: continue self.sheep_pos[i] = self._step_sheep(i) if self._in_pen(self.sheep_pos[i]): self.penned[i] = True n_penned = int(self.penned[:self.n_sheep].sum()) newly_penned = n_penned - self._prev_penned self._prev_penned = n_penned reward = self._reward(n_penned, newly_penned) terminated = n_penned == self.n_sheep truncated = self._step_count >= self.max_steps info = {"n_penned": n_penned, "n_sheep": self.n_sheep} if self.render_mode == "human": self.render() return self._obs(), float(reward), terminated, truncated, info def render(self): import matplotlib.pyplot as plt import matplotlib.patches as mpatches if self._fig is None: plt.ion() self._fig, self._ax = plt.subplots(figsize=(6, 6)) ax = self._ax ax.clear() ax.set_xlim(-16, 16) ax.set_ylim(-16, 16) ax.set_aspect("equal") ax.set_facecolor("#dcedc8") # Field boundary ax.add_patch(mpatches.Rectangle( (-15, -15), 30, 30, fill=False, edgecolor="#795548", linewidth=2 )) # Pen pw = self.PEN_X[1] - self.PEN_X[0] ph = self.PEN_Y[1] - self.PEN_Y[0] ax.add_patch(mpatches.Rectangle( (self.PEN_X[0], self.PEN_Y[0]), pw, ph, facecolor="#ffe082", edgecolor="#795548", linewidth=2 )) ax.text(11.5, -11.5, "pen", ha="center", va="center", fontsize=8, color="#795548") # Sheep for i in range(self.MAX_SHEEP): if i >= self.n_sheep: continue # inactive slot — not shown color = "deeppink" if self.penned[i] else "white" ax.plot(*self.sheep_pos[i], "o", color=color, markersize=11, markeredgecolor="#555", markeredgewidth=1.5) # Dog ax.plot(*self.dog_pos, "s", color="#4e342e", markersize=13, markeredgecolor="black", markeredgewidth=1.5) ax.set_title( f"step {self._step_count} | " f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep}", fontsize=11 ) self._fig.canvas.draw() self._fig.canvas.flush_events() plt.pause(0.001) def close(self): if self._fig is not None: import matplotlib.pyplot as plt plt.close(self._fig) self._fig = None # ------------------------------------------------------------------ # Internals # ------------------------------------------------------------------ def _in_pen(self, pos: np.ndarray) -> bool: return (self.PEN_X[0] < pos[0] < self.PEN_X[1] and self.PEN_Y[0] < pos[1] < self.PEN_Y[1]) def _obs(self) -> np.ndarray: scale = 1.0 / self.FIELD return np.concatenate([ self.dog_pos * scale, # 2 (self.sheep_pos * scale).flatten(), # 2 * MAX_SHEEP self.penned.astype(np.float32), # MAX_SHEEP ]).astype(np.float32) def _reward(self, n_penned: int, newly_penned: int) -> float: active_mask = ~self.penned[:self.n_sheep] if active_mask.any(): active_pos = self.sheep_pos[:self.n_sheep][active_mask] dists_pen = np.linalg.norm(active_pos - self.PEN_CENTER, axis=1) dists_dog = np.linalg.norm(active_pos - self.dog_pos, axis=1) # Sheep-to-pen shaping shaping = -(dists_pen.mean() / (2 * self.FIELD)) # Approach: dog penalised for being far from nearest sheep approach = -(dists_dog.min() / (2 * self.FIELD)) # Alignment: reward dog for being on the anti-pen side of each sheep. # When the dog is opposite the pen relative to a sheep, that sheep # flees toward the pen. Score ∈ [-1, 1] per sheep, weighted by # a proximity gate so only nearby dogs count. align_scores = [] for s_pos, d_pen, d_dog in zip(active_pos, dists_pen, dists_dog): if d_pen < 0.1 or d_dog < 0.1: continue pen_dir = (self.PEN_CENTER - s_pos) / d_pen # sheep → pen dog_dir = (self.dog_pos - s_pos) / d_dog # sheep → dog # cos(angle): +1 → dog behind sheep, -1 → dog on pen side cosine = -float(np.dot(pen_dir, dog_dir)) # gate: full credit inside flee range, fades beyond proximity = max(0.0, 1.0 - d_dog / self.FLEE_DIST) align_scores.append(cosine * proximity) alignment = float(np.mean(align_scores)) if align_scores else 0.0 else: shaping = approach = alignment = 0.0 reward = shaping * self.W_SHAPING reward += approach * self.W_APPROACH reward += alignment * self.W_ALIGN reward += newly_penned * self.W_PEN_BONUS reward -= self.W_STEP_COST if n_penned == self.n_sheep: reward += self.W_COMPLETE return reward def _step_sheep(self, i: int) -> np.ndarray: """Apply one timestep of boid dynamics to sheep i.""" pos = self.sheep_pos[i].copy() fx, fy = 0.0, 0.0 fleeing = False # Flee from dog — quadratic ramp (mirrors sheep.py) diff = self.dog_pos - pos dist = float(np.linalg.norm(diff)) if 0.01 < dist < self.FLEE_DIST: t = 1.0 - dist / self.FLEE_DIST s = t * t * 5.0 fx -= (diff[0] / dist) * s fy -= (diff[1] / dist) * s fleeing = True # Separation (inverse-distance) + Cohesion cx, cy, cn = 0.0, 0.0, 0 for j in range(self.n_sheep): if j == i or self.penned[j]: continue dv = self.sheep_pos[j] - pos dj = float(np.linalg.norm(dv)) if 0.3 < dj < self.COHESION_DIST: cx += self.sheep_pos[j][0] cy += self.sheep_pos[j][1] cn += 1 if 0.05 < dj < self.SEPARATION_DIST: push = (self.SEPARATION_DIST - dj) / dj fx -= (dv[0] / dj) * push * 2.5 fy -= (dv[1] / dj) * push * 2.5 if cn > 0: w = 0.08 if fleeing else 0.15 fx += (cx / cn - pos[0]) * w fy += (cy / cn - pos[1]) * w # Wall avoidance m, F = self.WALL_MARGIN, self.FIELD if pos[0] < -F + m: fx += ((-F + m - pos[0]) / m) * 6.0 if pos[0] > F - m: fx -= ((pos[0] - (F - m)) / m) * 6.0 if pos[1] < -F + m: fy += ((-F + m - pos[1]) / m) * 6.0 if pos[1] > F - m: fy -= ((pos[1] - (F - m)) / m) * 6.0 # Wander — suppressed while fleeing if not fleeing: if self.np_random.random() < 0.02: self.wander_ang[i] += float(self.np_random.uniform(-0.6, 0.6)) fx += float(np.cos(self.wander_ang[i])) * 0.5 fy += float(np.sin(self.wander_ang[i])) * 0.5 # Integrate force = np.array([fx, fy]) mag = float(np.linalg.norm(force)) if mag > 0.01: top_speed = self.SHEEP_FLEE_V if fleeing else self.SHEEP_WANDER_V speed = min(top_speed, mag * 0.3) pos = np.clip(pos + (force / mag) * speed * self.DT, -self.FIELD, self.FIELD) return pos.astype(np.float32)