"""2D herding environment for PPO training (Gymnasium-compatible).

The dog agent (action: 2D velocity vector) must herd ``n_sheep`` into the
quarantine pen. Sheep dynamics are intended to mirror the Webots controller
(``sheep.py``): flee (quadratic ramp), separation (inverse-distance),
cohesion, wall avoidance, and wander.

Coordinate system (matches the Webots world file):
    field : x in [-15, 15], y in [-15, 15]
    pen   : x in [10, 13],  y in [-15, -8]   (SE corner, open north)

Observation (17-dim, fixed regardless of ``n_sheep``):
    [0:2]   dog position                         / FIELD
    [2:4]   flock COM relative to dog            / (2 * FIELD)
    [4:6]   farthest active sheep rel. to dog    / (2 * FIELD)
    [6:8]   2nd-farthest active sheep rel. dog   / (2 * FIELD)
    [8:10]  3rd-farthest active sheep rel. dog   / (2 * FIELD)
    [10:12] pen centre relative to COM           / (2 * FIELD)
    [12:14] pen centre relative to farthest      / (2 * FIELD)
    [14]    flock radius                         / (2 * FIELD)
    [15]    mean dispersion                      / (2 * FIELD)
    [16]    fraction of sheep penned

"Farthest" is measured from the flock COM. The observation is
permutation-invariant by design: curriculum stages share the same obs dim so
VecNormalize statistics transfer as ``n_sheep`` advances.
"""

from typing import Optional

import numpy as np
import gymnasium as gym
from gymnasium import spaces


class HerdingEnv(gym.Env):
    """Single-agent sheep-herding task on a square field with a corner pen.

    The agent controls the dog's velocity. An episode terminates when every
    active sheep has entered the pen rectangle and is truncated after
    ``max_steps`` steps.
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30}

    # -----------------------------------------------------------------------
    # World constants — must match Webots world file
    # -----------------------------------------------------------------------
    MAX_SHEEP = 10
    FIELD = 15.0  # half-size; positions ∈ [-FIELD, FIELD]
    PEN_X = (10.0, 13.0)
    PEN_Y = (-15.0, -8.0)
    PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32)

    # Length of the vector produced by _obs(); must agree with observation_space.
    OBS_DIM = 17

    # -----------------------------------------------------------------------
    # Dynamics — calibrated to match Webots robot specs
    # -----------------------------------------------------------------------
    DOG_SPEED = 2.5        # m/s
    SHEEP_FLEE_V = 0.65    # m/s
    SHEEP_WANDER_V = 0.20  # m/s
    DT = 0.1               # seconds per step

    # Boid parameters — identical to sheep.py
    FLEE_DIST = 7.0
    SEPARATION_DIST = 2.5
    COHESION_DIST = 8.0
    WALL_MARGIN = 3.5

    # -----------------------------------------------------------------------
    # Reward weights (two-phase: collect first, then drive)
    # -----------------------------------------------------------------------
    W_DRIVE = 2.0          # progress: COM moved toward pen (only when compact)
    W_COLLECT = 4.0        # progress: radius shrank (2× stronger when scattered)
    W_ALIGN = 0.5          # position: dog on anti-pen side of COM
    W_COMPACT_BONUS = 0.0  # disabled: 0.1/step over 4000 steps = 400 >> W_COMPLETE=100
    W_PEN_BONUS = 10.0     # per sheep penned
    W_COMPLETE = 100.0     # all sheep penned
    W_STEP_COST = 0.002    # time penalty
    DRIVE_GATE_RADIUS = 5.0  # flock must compact below this (m) before drive reward fires

    def __init__(self, n_sheep: int = 1, max_steps: int = 2000,
                 render_mode: Optional[str] = None,
                 random_n_sheep: bool = False):
        """Create the environment.

        Args:
            n_sheep: Number of active sheep, 1..MAX_SHEEP.
            max_steps: Step count after which an episode is truncated.
            render_mode: ``"human"``, ``"rgb_array"`` or ``None``.
            random_n_sheep: If True, ``n_sheep`` is re-drawn uniformly from
                1..MAX_SHEEP on every reset.
        """
        super().__init__()
        assert 1 <= n_sheep <= self.MAX_SHEEP, (
            f"n_sheep must be in [1, {self.MAX_SHEEP}], got {n_sheep}"
        )
        self.n_sheep = n_sheep
        self.max_steps = max_steps
        self.render_mode = render_mode
        self.random_n_sheep = random_n_sheep  # if True, randomise n_sheep each reset

        # Curriculum change requested via set_n_sheep(); applied by reset().
        self._pending_n_sheep = None

        # Fixed 17-dim observation regardless of n_sheep:
        #   dog_pos(2) + rel_com(2) + rel_far1(2) + rel_far2(2) + rel_far3(2)
        #   + com_to_pen(2) + far1_to_pen(2) + radius(1) + dispersion(1)
        #   + frac_penned(1)
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(self.OBS_DIM,), dtype=np.float32
        )
        # Action: desired velocity (vx, vy) ∈ [-1, 1]², scaled by DOG_SPEED
        self.action_space = spaces.Box(
            low=-1.0, high=1.0, shape=(2,), dtype=np.float32
        )

        # Runtime state (populated by reset)
        self._step_count = 0
        self._prev_penned = 0
        self._prev_com_dist = 0.0  # COM-to-pen distance at previous step
        self._prev_radius = 0.0    # flock radius at previous step
        self.dog_pos = np.zeros(2, dtype=np.float32)
        self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32)
        self.penned = np.ones(self.MAX_SHEEP, dtype=bool)
        self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32)
        self._fig = None

    # ------------------------------------------------------------------
    # Curriculum interface
    # ------------------------------------------------------------------

    def set_n_sheep(self, n: int):
        """Advance curriculum difficulty; takes effect on next reset().

        The value is only recorded here. Changing ``self.n_sheep`` in the
        middle of an episode would make step() count sheep that were parked
        in the pen by reset() as newly penned and pay a spurious pen bonus.
        """
        assert 1 <= n <= self.MAX_SHEEP, (
            f"n must be in [1, {self.MAX_SHEEP}], got {n}"
        )
        self._pending_n_sheep = n

    # ------------------------------------------------------------------
    # Gymnasium API
    # ------------------------------------------------------------------

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        ``options`` is accepted for API compatibility and is not used.
        """
        super().reset(seed=seed)
        self._step_count = 0
        self._prev_penned = 0

        # Apply a curriculum change requested since the previous reset.
        if self._pending_n_sheep is not None:
            self.n_sheep = self._pending_n_sheep
            self._pending_n_sheep = None

        if self.random_n_sheep:
            self.n_sheep = int(self.np_random.integers(1, self.MAX_SHEEP + 1))

        # Park every slot in the pen, then place the active sheep
        # (0 .. n_sheep-1) at random non-pen positions by rejection sampling.
        self.sheep_pos[:] = self.PEN_CENTER
        self.penned[:] = True
        placed = 0
        while placed < self.n_sheep:
            p = self.np_random.uniform(-12.0, 12.0, size=(2,)).astype(np.float32)
            if not self._in_pen(p):
                self.sheep_pos[placed] = p
                self.penned[placed] = False
                placed += 1

        # Dog: 50% of resets start already behind the flock (anti-pen side,
        # within flee range) to give early training aligned experiences.
        if self.np_random.random() < 0.5:
            ref = self.sheep_pos[0]
            away = ref - self.PEN_CENTER
            d = float(np.linalg.norm(away))
            if d > 0.1:
                away = away / d
            offset = away * self.np_random.uniform(2.0, self.FLEE_DIST * 0.8)
            self.dog_pos = np.clip(
                (ref + offset).astype(np.float32), -self.FIELD, self.FIELD
            )
        else:
            self.dog_pos = self.np_random.uniform(
                -self.FIELD * 0.8, self.FIELD * 0.8, size=(2,)
            ).astype(np.float32)

        self.wander_ang = self.np_random.uniform(
            -np.pi, np.pi, size=(self.MAX_SHEEP,)
        ).astype(np.float32)

        # Initialise previous-step values for progress rewards
        com, radius, _ = self._flock_stats()
        self._prev_com_dist = float(np.linalg.norm(com - self.PEN_CENTER))
        self._prev_radius = radius

        return self._obs(), {}

    def step(self, action):
        """Advance the simulation by DT seconds.

        Args:
            action: 2-vector; clipped to [-1, 1] and scaled by DOG_SPEED.

        Returns:
            ``(obs, reward, terminated, truncated, info)`` where ``info``
            carries ``n_penned`` and ``n_sheep``.
        """
        self._step_count += 1

        act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0)
        self.dog_pos = np.clip(
            self.dog_pos + act * self.DOG_SPEED * self.DT,
            -self.FIELD, self.FIELD
        )

        # Sheep are updated in index order, so sheep i sees the already-moved
        # positions of sheep 0..i-1 for this step.
        for i in range(self.n_sheep):
            if self.penned[i]:
                continue
            self.sheep_pos[i] = self._step_sheep(i)
            if self._in_pen(self.sheep_pos[i]):
                self.penned[i] = True

        n_penned = int(self.penned[:self.n_sheep].sum())
        newly_penned = n_penned - self._prev_penned
        self._prev_penned = n_penned

        reward = self._reward(n_penned, newly_penned)
        terminated = n_penned == self.n_sheep
        truncated = self._step_count >= self.max_steps
        info = {"n_penned": n_penned, "n_sheep": self.n_sheep}

        if self.render_mode == "human":
            self.render()

        return self._obs(), float(reward), terminated, truncated, info

    def render(self):
        """Draw the current state with matplotlib.

        Returns:
            An ``(H, W, 3)`` uint8 RGB array when ``render_mode`` is
            ``"rgb_array"``; otherwise ``None`` after updating the
            interactive figure.
        """
        import matplotlib.pyplot as plt
        import matplotlib.patches as mpatches

        as_array = self.render_mode == "rgb_array"

        if self._fig is None:
            if not as_array:
                plt.ion()
            self._fig, self._ax = plt.subplots(figsize=(6, 6))

        ax = self._ax
        ax.clear()
        ax.set_xlim(-16, 16)
        ax.set_ylim(-16, 16)
        ax.set_aspect("equal")
        ax.set_facecolor("#dcedc8")

        # Field boundary
        ax.add_patch(mpatches.Rectangle(
            (-15, -15), 30, 30, fill=False, edgecolor="#795548", linewidth=2
        ))

        # Pen
        pw = self.PEN_X[1] - self.PEN_X[0]
        ph = self.PEN_Y[1] - self.PEN_Y[0]
        ax.add_patch(mpatches.Rectangle(
            (self.PEN_X[0], self.PEN_Y[0]), pw, ph,
            facecolor="#ffe082", edgecolor="#795548", linewidth=2
        ))
        ax.text(11.5, -11.5, "pen", ha="center", va="center",
                fontsize=8, color="#795548")

        # Flock COM and radius
        com, radius, _ = self._flock_stats()
        ax.add_patch(plt.Circle(com, radius, color="steelblue", fill=False,
                                linestyle="--", linewidth=1))
        ax.plot(*com, "+", color="steelblue", markersize=10)

        # Sheep (pink once penned) and dog
        for i in range(self.n_sheep):
            color = "deeppink" if self.penned[i] else "white"
            ax.plot(*self.sheep_pos[i], "o", color=color, markersize=11,
                    markeredgecolor="#555", markeredgewidth=1.5)

        ax.plot(*self.dog_pos, "s", color="#4e342e", markersize=13,
                markeredgecolor="black", markeredgewidth=1.5)

        ax.set_title(
            f"step {self._step_count} | "
            f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep} | "
            f"r={radius:.1f}m",
            fontsize=11
        )

        self._fig.canvas.draw()

        if as_array:
            # NOTE: buffer_rgba() is provided by Agg-based canvases.
            rgba = np.asarray(self._fig.canvas.buffer_rgba())
            return rgba[..., :3].copy()

        self._fig.canvas.flush_events()
        plt.pause(0.001)
        return None

    def close(self):
        """Close the matplotlib figure, if one was opened."""
        if self._fig is not None:
            import matplotlib.pyplot as plt
            plt.close(self._fig)
            self._fig = None

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    def _in_pen(self, pos: np.ndarray) -> bool:
        """Return True if ``pos`` lies strictly inside the pen rectangle."""
        return bool(self.PEN_X[0] < pos[0] < self.PEN_X[1]
                    and self.PEN_Y[0] < pos[1] < self.PEN_Y[1])

    def _flock_stats(self):
        """Return (COM, radius, mean_dispersion) over active sheep.

        With no active sheep the COM is reported as the pen centre and both
        distances as 0.0.
        """
        active_mask = ~self.penned[:self.n_sheep]
        if not active_mask.any():
            return self.PEN_CENTER.copy(), 0.0, 0.0
        pts = self.sheep_pos[:self.n_sheep][active_mask]
        com = pts.mean(axis=0)
        dists = np.linalg.norm(pts - com, axis=1)
        return com, float(dists.max()), float(dists.mean())

    def _obs(self) -> np.ndarray:
        """Build the OBS_DIM-long float32 observation vector.

        The layout is listed in the module docstring. Positions are divided
        by FIELD, relative vectors and distances by 2 * FIELD.
        """
        com, radius, dispersion = self._flock_stats()
        active_mask = ~self.penned[:self.n_sheep]

        if active_mask.any():
            pts = self.sheep_pos[:self.n_sheep][active_mask]
            dists = np.linalg.norm(pts - com, axis=1)
            sorted_idx = np.argsort(dists)[::-1]  # farthest first

            # Top-3 stragglers; pad with COM when fewer active sheep exist
            def nth(n):
                return pts[sorted_idx[n]] if len(sorted_idx) > n else com

            far1, far2, far3 = nth(0), nth(1), nth(2)
        else:
            far1 = far2 = far3 = self.PEN_CENTER.copy()

        frac_penned = self.penned[:self.n_sheep].sum() / self.n_sheep

        S = self.FIELD
        D = 2 * self.FIELD
        return np.array([
            self.dog_pos[0] / S,
            self.dog_pos[1] / S,
            (com[0] - self.dog_pos[0]) / D,
            (com[1] - self.dog_pos[1]) / D,
            (far1[0] - self.dog_pos[0]) / D,
            (far1[1] - self.dog_pos[1]) / D,
            (far2[0] - self.dog_pos[0]) / D,
            (far2[1] - self.dog_pos[1]) / D,
            (far3[0] - self.dog_pos[0]) / D,
            (far3[1] - self.dog_pos[1]) / D,
            (self.PEN_CENTER[0] - com[0]) / D,
            (self.PEN_CENTER[1] - com[1]) / D,
            (self.PEN_CENTER[0] - far1[0]) / D,
            (self.PEN_CENTER[1] - far1[1]) / D,
            radius / D,
            dispersion / D,
            frac_penned,
        ], dtype=np.float32)

    def _reward(self, n_penned: int, newly_penned: int) -> float:
        """Compute the shaped reward for the step that just ran.

        Also updates the stored previous COM distance and radius used for
        the progress terms.

        NOTE(review): when a sheep is penned it drops out of the flock
        statistics, so COM and radius can jump within one step and feed the
        drive/collect deltas — confirm this is intended.
        """
        com, radius, _ = self._flock_stats()
        com_dist = float(np.linalg.norm(com - self.PEN_CENTER))

        drive_delta = self._prev_com_dist - com_dist
        collect_delta = self._prev_radius - radius
        self._prev_com_dist = com_dist
        self._prev_radius = radius

        # Alignment: dog on anti-pen side of COM, gated by proximity.
        d_dog_com = float(np.linalg.norm(self.dog_pos - com))
        if d_dog_com > 0.1 and com_dist > 0.1:
            pen_dir = (self.PEN_CENTER - com) / com_dist
            dog_dir = (self.dog_pos - com) / d_dog_com
            cosine = -float(np.dot(pen_dir, dog_dir))
            proximity = max(0.0, 1.0 - d_dog_com / self.FLEE_DIST)
            alignment = cosine * proximity * self.W_ALIGN
        else:
            alignment = 0.0

        scattered = radius > self.DRIVE_GATE_RADIUS

        # Collect always on; 2× scale when scattered to force collect-first.
        r_collect = collect_delta * self.W_COLLECT * (2.0 if scattered else 1.0)

        # Drive only fires when flock is compact — prevents rewarding COM movement
        # while sheep are spread across the field.
        r_drive = 0.0 if scattered else drive_delta * self.W_DRIVE

        # Small sustained reward for maintaining a compact flock.
        r_compact = 0.0 if scattered else self.W_COMPACT_BONUS

        reward = r_drive + r_collect + r_compact + alignment
        reward += newly_penned * self.W_PEN_BONUS
        reward -= self.W_STEP_COST

        if n_penned == self.n_sheep:
            reward += self.W_COMPLETE

        return reward

    def _step_sheep(self, i: int) -> np.ndarray:
        """Apply one timestep of boid dynamics to sheep i (mirrors sheep.py).

        Returns the new float32 position; ``self.sheep_pos`` is not modified
        here. May advance ``self.wander_ang[i]`` and consumes random numbers
        only while the sheep is not fleeing.
        """
        pos = self.sheep_pos[i].copy()
        fx, fy = 0.0, 0.0
        fleeing = False

        # Flee from dog — quadratic ramp
        diff = self.dog_pos - pos
        dist = float(np.linalg.norm(diff))
        if 0.01 < dist < self.FLEE_DIST:
            t = 1.0 - dist / self.FLEE_DIST
            s = t * t * 5.0
            fx -= (diff[0] / dist) * s
            fy -= (diff[1] / dist) * s
            fleeing = True

        # Separation (inverse-distance) + Cohesion
        cx, cy, cn = 0.0, 0.0, 0
        for j in range(self.n_sheep):
            if j == i or self.penned[j]:
                continue
            dv = self.sheep_pos[j] - pos
            dj = float(np.linalg.norm(dv))
            if 0.3 < dj < self.COHESION_DIST:
                cx += self.sheep_pos[j][0]
                cy += self.sheep_pos[j][1]
                cn += 1
            if 0.05 < dj < self.SEPARATION_DIST:
                push = (self.SEPARATION_DIST - dj) / dj
                fx -= (dv[0] / dj) * push * 2.5
                fy -= (dv[1] / dj) * push * 2.5
        if cn > 0:
            # Cohesion pull is weaker while the sheep is fleeing.
            w = 0.08 if fleeing else 0.15
            fx += (cx / cn - pos[0]) * w
            fy += (cy / cn - pos[1]) * w

        # Wall avoidance
        m, F = self.WALL_MARGIN, self.FIELD
        if pos[0] < -F + m:
            fx += ((-F + m - pos[0]) / m) * 6.0
        if pos[0] > F - m:
            fx -= ((pos[0] - (F - m)) / m) * 6.0
        if pos[1] < -F + m:
            fy += ((-F + m - pos[1]) / m) * 6.0
        if pos[1] > F - m:
            fy -= ((pos[1] - (F - m)) / m) * 6.0

        # Hard-stop clamp: mirrors sheep.py — zero any force driving further
        # into the wall within 0.5 m so the flee force cannot pin the sheep.
        HS = 0.5
        if pos[0] < -F + HS and fx < 0:
            fx = 0.0
        if pos[0] > F - HS and fx > 0:
            fx = 0.0
        if pos[1] < -F + HS and fy < 0:
            fy = 0.0
        if pos[1] > F - HS and fy > 0:
            fy = 0.0

        # Wander — suppressed while fleeing
        if not fleeing:
            if self.np_random.random() < 0.02:
                self.wander_ang[i] += float(self.np_random.uniform(-0.6, 0.6))
            fx += float(np.cos(self.wander_ang[i])) * 0.5
            fy += float(np.sin(self.wander_ang[i])) * 0.5

        # Integrate: speed grows with force magnitude up to the mode's cap.
        force = np.array([fx, fy])
        mag = float(np.linalg.norm(force))
        if mag > 0.01:
            top_speed = self.SHEEP_FLEE_V if fleeing else self.SHEEP_WANDER_V
            speed = min(top_speed, mag * 0.3)
            pos = np.clip(pos + (force / mag) * speed * self.DT,
                          -self.FIELD, self.FIELD)

        return pos.astype(np.float32)