""" 2D herding environment for PPO training (Gymnasium-compatible). The dog agent (action: 2D velocity vector) must herd n_sheep into the quarantine pen. Sheep dynamics mirror the Webots controller exactly: flee (quadratic ramp), separation (inverse-distance), cohesion, wall avoidance, and wander. Coordinate system matches the Webots world file: field : x ∈ [-15, 15], y ∈ [-15, 15] pen : x ∈ [10, 13], y ∈ [-15, -8] (SE corner, open north) Observation (13-dim, fixed regardless of n_sheep): dog position (2), flock COM relative to dog (2), farthest active sheep relative to dog (2), pen relative to COM (2), pen relative to farthest sheep (2), flock radius (1), mean dispersion (1), fraction penned (1). Permutation-invariant by design: curriculum stages share the same obs dim so VecNormalize statistics transfer as n_sheep advances. """ import numpy as np import gymnasium as gym from gymnasium import spaces class HerdingEnv(gym.Env): metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30} # ----------------------------------------------------------------------- # World constants — must match Webots world file # ----------------------------------------------------------------------- MAX_SHEEP = 10 FIELD = 15.0 # half-size; positions ∈ [-FIELD, FIELD] PEN_X = (10.0, 13.0) PEN_Y = (-15.0, -8.0) PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32) # ----------------------------------------------------------------------- # Dynamics — calibrated to match Webots robot specs # ----------------------------------------------------------------------- DOG_SPEED = 2.5 # m/s SHEEP_FLEE_V = 0.65 # m/s SHEEP_WANDER_V = 0.20 # m/s DT = 0.1 # seconds per step # Boid parameters — identical to sheep.py FLEE_DIST = 7.0 SEPARATION_DIST = 2.5 COHESION_DIST = 8.0 WALL_MARGIN = 3.5 # ----------------------------------------------------------------------- # Reward weights (progress-based potential shaping + sparse bonuses) # ----------------------------------------------------------------------- W_DRIVE = 2.0 # progress: flock COM moved toward pen W_COLLECT = 2.0 # progress: flock radius shrank (was 0.5 — must match W_DRIVE) W_ALIGN = 0.5 # position: dog on anti-pen side of flock COM W_PEN_BONUS = 10.0 # per sheep penned (was 5.0) W_COMPLETE = 100.0 # all sheep penned (was 20.0 — must dominate dense rewards) W_STEP_COST = 0.002 # time penalty def __init__(self, n_sheep: int = 1, max_steps: int = 2000, render_mode: str = None, random_n_sheep: bool = False): super().__init__() assert 1 <= n_sheep <= self.MAX_SHEEP self.n_sheep = n_sheep self.max_steps = max_steps self.render_mode = render_mode self.random_n_sheep = random_n_sheep # if True, randomise n_sheep each reset # Fixed 13-dim observation regardless of n_sheep: # dog_pos(2) + rel_com(2) + rel_far(2) + com_to_pen(2) # + far_to_pen(2) + radius(1) + mean_disp(1) + frac_penned(1) self.observation_space = spaces.Box( low=-np.inf, high=np.inf, shape=(13,), dtype=np.float32 ) # Action: desired velocity (vx, vy) ∈ [-1, 1]², scaled by DOG_SPEED self.action_space = spaces.Box( low=-1.0, high=1.0, shape=(2,), dtype=np.float32 ) # Runtime state (populated by reset) self._step_count = 0 self._prev_penned = 0 self._prev_com_dist = 0.0 # COM-to-pen distance at previous step self._prev_radius = 0.0 # flock radius at previous step self.dog_pos = np.zeros(2, dtype=np.float32) self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32) self.penned = np.ones(self.MAX_SHEEP, dtype=bool) self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32) self._fig = None # ------------------------------------------------------------------ # Curriculum interface # ------------------------------------------------------------------ def set_n_sheep(self, n: int): """Advance curriculum difficulty; takes effect on next reset().""" assert 1 <= n <= self.MAX_SHEEP self.n_sheep = n # ------------------------------------------------------------------ # Gymnasium API # ------------------------------------------------------------------ def reset(self, seed=None, options=None): super().reset(seed=seed) self._step_count = 0 self._prev_penned = 0 if self.random_n_sheep: self.n_sheep = int(self.np_random.integers(1, self.MAX_SHEEP + 1)) # Active sheep (0 .. n_sheep-1): random non-pen positions self.sheep_pos[:] = self.PEN_CENTER self.penned[:] = True placed = 0 while placed < self.n_sheep: p = self.np_random.uniform(-12.0, 12.0, size=(2,)).astype(np.float32) if not self._in_pen(p): self.sheep_pos[placed] = p self.penned[placed] = False placed += 1 # Dog: 50% of resets start already behind the flock (anti-pen side, # within flee range) to give early training aligned experiences. if self.np_random.random() < 0.5: ref = self.sheep_pos[0] away = ref - self.PEN_CENTER d = float(np.linalg.norm(away)) if d > 0.1: away = away / d offset = away * self.np_random.uniform(2.0, self.FLEE_DIST * 0.8) self.dog_pos = np.clip( (ref + offset).astype(np.float32), -self.FIELD, self.FIELD ) else: self.dog_pos = self.np_random.uniform( -self.FIELD * 0.8, self.FIELD * 0.8, size=(2,) ).astype(np.float32) self.wander_ang = self.np_random.uniform( -np.pi, np.pi, size=(self.MAX_SHEEP,) ).astype(np.float32) # Initialise previous-step values for progress rewards com, radius, _ = self._flock_stats() self._prev_com_dist = float(np.linalg.norm(com - self.PEN_CENTER)) self._prev_radius = radius return self._obs(), {} def step(self, action): self._step_count += 1 act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0) self.dog_pos = np.clip( self.dog_pos + act * self.DOG_SPEED * self.DT, -self.FIELD, self.FIELD ) for i in range(self.n_sheep): if self.penned[i]: continue self.sheep_pos[i] = self._step_sheep(i) if self._in_pen(self.sheep_pos[i]): self.penned[i] = True n_penned = int(self.penned[:self.n_sheep].sum()) newly_penned = n_penned - self._prev_penned self._prev_penned = n_penned reward = self._reward(n_penned, newly_penned) terminated = n_penned == self.n_sheep truncated = self._step_count >= self.max_steps info = {"n_penned": n_penned, "n_sheep": self.n_sheep} if self.render_mode == "human": self.render() return self._obs(), float(reward), terminated, truncated, info def render(self): import matplotlib.pyplot as plt import matplotlib.patches as mpatches if self._fig is None: plt.ion() self._fig, self._ax = plt.subplots(figsize=(6, 6)) ax = self._ax ax.clear() ax.set_xlim(-16, 16); ax.set_ylim(-16, 16) ax.set_aspect("equal"); ax.set_facecolor("#dcedc8") ax.add_patch(mpatches.Rectangle( (-15, -15), 30, 30, fill=False, edgecolor="#795548", linewidth=2 )) pw = self.PEN_X[1] - self.PEN_X[0] ph = self.PEN_Y[1] - self.PEN_Y[0] ax.add_patch(mpatches.Rectangle( (self.PEN_X[0], self.PEN_Y[0]), pw, ph, facecolor="#ffe082", edgecolor="#795548", linewidth=2 )) ax.text(11.5, -11.5, "pen", ha="center", va="center", fontsize=8, color="#795548") com, radius, _ = self._flock_stats() ax.add_patch(plt.Circle(com, radius, color="steelblue", fill=False, linestyle="--", linewidth=1)) ax.plot(*com, "+", color="steelblue", markersize=10) for i in range(self.n_sheep): if i >= self.n_sheep: continue color = "deeppink" if self.penned[i] else "white" ax.plot(*self.sheep_pos[i], "o", color=color, markersize=11, markeredgecolor="#555", markeredgewidth=1.5) ax.plot(*self.dog_pos, "s", color="#4e342e", markersize=13, markeredgecolor="black", markeredgewidth=1.5) ax.set_title( f"step {self._step_count} | " f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep} | " f"r={radius:.1f}m", fontsize=11 ) self._fig.canvas.draw() self._fig.canvas.flush_events() plt.pause(0.001) def close(self): if self._fig is not None: import matplotlib.pyplot as plt plt.close(self._fig) self._fig = None # ------------------------------------------------------------------ # Internals # ------------------------------------------------------------------ def _in_pen(self, pos: np.ndarray) -> bool: return (self.PEN_X[0] < pos[0] < self.PEN_X[1] and self.PEN_Y[0] < pos[1] < self.PEN_Y[1]) def _flock_stats(self): """Return (COM, radius, mean_dispersion) over active sheep.""" active_mask = ~self.penned[:self.n_sheep] if not active_mask.any(): return self.PEN_CENTER.copy(), 0.0, 0.0 pts = self.sheep_pos[:self.n_sheep][active_mask] com = pts.mean(axis=0) dists = np.linalg.norm(pts - com, axis=1) return com, float(dists.max()), float(dists.mean()) def _obs(self) -> np.ndarray: com, radius, mean_disp = self._flock_stats() active_mask = ~self.penned[:self.n_sheep] # Farthest active sheep from COM (outlier the dog needs to chase) if active_mask.any(): pts = self.sheep_pos[:self.n_sheep][active_mask] idx = int(np.argmax(np.linalg.norm(pts - com, axis=1))) far = pts[idx] else: far = self.PEN_CENTER.copy() S = self.FIELD # normalisation scale for positions D = 2 * self.FIELD # for relative vectors that can span the whole field return np.array([ self.dog_pos[0] / S, self.dog_pos[1] / S, # dog abs pos (com[0] - self.dog_pos[0]) / D, # COM relative to dog (com[1] - self.dog_pos[1]) / D, (far[0] - self.dog_pos[0]) / D, # farthest relative to dog (far[1] - self.dog_pos[1]) / D, (self.PEN_CENTER[0] - com[0]) / D, # COM to pen (self.PEN_CENTER[1] - com[1]) / D, (self.PEN_CENTER[0] - far[0]) / D, # farthest to pen (self.PEN_CENTER[1] - far[1]) / D, radius / D, # flock compactness mean_disp / D, # mean spread active_mask.sum() / self.n_sheep, # fraction still active ], dtype=np.float32) def _reward(self, n_penned: int, newly_penned: int) -> float: com, radius, _ = self._flock_stats() com_dist = float(np.linalg.norm(com - self.PEN_CENTER)) # Progress rewards: positive when state improves drive_progress = (self._prev_com_dist - com_dist) * self.W_DRIVE collect_progress = (self._prev_radius - radius) * self.W_COLLECT self._prev_com_dist = com_dist self._prev_radius = radius # Alignment: reward dog for being on the anti-pen side of the flock # COM, gated by proximity so only nearby positioning counts. # +1 = dog directly behind flock, -1 = dog on pen side (wrong). d_dog_com = float(np.linalg.norm(self.dog_pos - com)) if d_dog_com > 0.1 and com_dist > 0.1: pen_dir = (self.PEN_CENTER - com) / com_dist # COM → pen dog_dir = (self.dog_pos - com) / d_dog_com # COM → dog cosine = -float(np.dot(pen_dir, dog_dir)) # +1 when opposite proximity = max(0.0, 1.0 - d_dog_com / self.FLEE_DIST) alignment = cosine * proximity * self.W_ALIGN else: alignment = 0.0 reward = drive_progress + collect_progress + alignment reward += newly_penned * self.W_PEN_BONUS reward -= self.W_STEP_COST if n_penned == self.n_sheep: reward += self.W_COMPLETE return reward def _step_sheep(self, i: int) -> np.ndarray: """Apply one timestep of boid dynamics to sheep i (mirrors sheep.py).""" pos = self.sheep_pos[i].copy() fx, fy = 0.0, 0.0 fleeing = False # Flee from dog — quadratic ramp diff = self.dog_pos - pos dist = float(np.linalg.norm(diff)) if 0.01 < dist < self.FLEE_DIST: t = 1.0 - dist / self.FLEE_DIST s = t * t * 5.0 fx -= (diff[0] / dist) * s fy -= (diff[1] / dist) * s fleeing = True # Separation (inverse-distance) + Cohesion cx, cy, cn = 0.0, 0.0, 0 for j in range(self.n_sheep): if j == i or self.penned[j]: continue dv = self.sheep_pos[j] - pos dj = float(np.linalg.norm(dv)) if 0.3 < dj < self.COHESION_DIST: cx += self.sheep_pos[j][0] cy += self.sheep_pos[j][1] cn += 1 if 0.05 < dj < self.SEPARATION_DIST: push = (self.SEPARATION_DIST - dj) / dj fx -= (dv[0] / dj) * push * 2.5 fy -= (dv[1] / dj) * push * 2.5 if cn > 0: w = 0.08 if fleeing else 0.15 fx += (cx / cn - pos[0]) * w fy += (cy / cn - pos[1]) * w # Wall avoidance m, F = self.WALL_MARGIN, self.FIELD if pos[0] < -F + m: fx += ((-F + m - pos[0]) / m) * 6.0 if pos[0] > F - m: fx -= ((pos[0] - (F - m)) / m) * 6.0 if pos[1] < -F + m: fy += ((-F + m - pos[1]) / m) * 6.0 if pos[1] > F - m: fy -= ((pos[1] - (F - m)) / m) * 6.0 # Hard-stop clamp: mirrors sheep.py — zero any force driving further # into the wall within 0.5 m so the flee force cannot pin the sheep. HS = 0.5 if pos[0] < -F + HS and fx < 0: fx = 0.0 if pos[0] > F - HS and fx > 0: fx = 0.0 if pos[1] < -F + HS and fy < 0: fy = 0.0 if pos[1] > F - HS and fy > 0: fy = 0.0 # Wander — suppressed while fleeing if not fleeing: if self.np_random.random() < 0.02: self.wander_ang[i] += float(self.np_random.uniform(-0.6, 0.6)) fx += float(np.cos(self.wander_ang[i])) * 0.5 fy += float(np.sin(self.wander_ang[i])) * 0.5 # Integrate force = np.array([fx, fy]) mag = float(np.linalg.norm(force)) if mag > 0.01: top_speed = self.SHEEP_FLEE_V if fleeing else self.SHEEP_WANDER_V speed = min(top_speed, mag * 0.3) pos = np.clip(pos + (force / mag) * speed * self.DT, -self.FIELD, self.FIELD) return pos.astype(np.float32)