""" 2D herding environment for PPO training (Gymnasium-compatible). The dog agent (action: 2D velocity vector) must herd n_sheep into the quarantine pen. Sheep dynamics mirror the Webots controller exactly: flee (quadratic ramp), separation (inverse-distance), cohesion, wall avoidance, and wander. Coordinate system matches the Webots world file: field : x ∈ [-15, 15], y ∈ [-15, 15] pen : x ∈ [10, 13], y ∈ [-15, -8] (SE corner, open north) Observation (16-dim, fixed regardless of n_sheep): dog position (2), flock COM relative to dog (2), top-3 farthest active sheep relative to dog (6), pen relative to COM (2), pen relative to farthest sheep (2), flock radius (1), fraction penned (1). Permutation-invariant by design: curriculum stages share the same obs dim so VecNormalize statistics transfer as n_sheep advances. """ import numpy as np import gymnasium as gym from gymnasium import spaces class HerdingEnv(gym.Env): metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30} # ----------------------------------------------------------------------- # World constants — must match Webots world file # ----------------------------------------------------------------------- MAX_SHEEP = 10 FIELD = 15.0 # half-size; positions ∈ [-FIELD, FIELD] PEN_X = (10.0, 13.0) PEN_Y = (-15.0, -8.0) PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32) PEN_ENTRY = np.array([11.5, -8.0], dtype=np.float32) # north entrance face center # ----------------------------------------------------------------------- # Dynamics — calibrated to match Webots robot specs # ----------------------------------------------------------------------- DOG_SPEED = 2.5 # m/s SHEEP_FLEE_V = 0.65 # m/s SHEEP_WANDER_V = 0.20 # m/s DT = 0.1 # seconds per step # Boid parameters — identical to sheep.py FLEE_DIST = 7.0 SEPARATION_DIST = 2.5 COHESION_DIST = 8.0 WALL_MARGIN = 3.5 # ----------------------------------------------------------------------- # Reward weights (simple per-sheep progress — no phases, no gating) # ----------------------------------------------------------------------- W_PER_SHEEP = 2.0 # progress: sum of per-sheep distance-to-pen reductions W_ALIGN = 0.05 # gated on action magnitude — dog only earns it when moving. # Without gating this created a sit-still trap from n_sheep≥2. W_PEN_BONUS = 10.0 # per sheep penned W_COMPLETE = 100.0 # all sheep penned W_STEP_COST = 0.02 # time penalty — strong enough to punish doing nothing W_COMPACT = 0.0 # reward for flock-radius reduction (off by default) W_WALL_TOUCH = 0.01 # per-sheep, per-step penalty when an active sheep is # pinned against the outside of a pen W/E wall. Kept # small ( 0.1: away = away / d offset = away * self.np_random.uniform(2.0, self.FLEE_DIST * 0.8) self.dog_pos = np.clip( (ref + offset).astype(np.float32), -self.FIELD, self.FIELD ) else: self.dog_pos = self.np_random.uniform( -self.FIELD * 0.8, self.FIELD * 0.8, size=(2,) ).astype(np.float32) self.wander_ang = self.np_random.uniform( -np.pi, np.pi, size=(self.MAX_SHEEP,) ).astype(np.float32) # Initialise per-sheep pen-distance sum for progress reward active = ~self.penned[:self.n_sheep] target = self.PEN_ENTRY if self.ENTRY_AWARE else self.PEN_CENTER if active.any(): self._prev_pen_dist_sum = float( np.linalg.norm( self.sheep_pos[:self.n_sheep][active] - target, axis=1 ).sum() ) com0 = self.sheep_pos[:self.n_sheep][active].mean(axis=0) self._prev_radius = float( np.linalg.norm(self.sheep_pos[:self.n_sheep][active] - com0, axis=1).max() ) else: self._prev_pen_dist_sum = 0.0 self._prev_radius = 0.0 return self._obs(), {} def step(self, action): self._step_count += 1 act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0) old_dog = self.dog_pos.copy() new_dog = np.clip( self.dog_pos + act * self.DOG_SPEED * self.DT, -self.FIELD, self.FIELD ) # Pen wall collision — mirrors Webots geometry. West (x=PEN_X[0]) and # east (x=PEN_X[1]) walls block the dog within the pen's y-range. # North face (y=PEN_Y[1]=-8) is open. South is the field edge. px0, px1 = self.PEN_X py0, py1 = self.PEN_Y if py0 < new_dog[1] < py1: if old_dog[0] < px0 <= new_dog[0]: new_dog[0] = px0 - 1e-3 elif old_dog[0] > px0 >= new_dog[0]: new_dog[0] = px0 + 1e-3 if old_dog[0] > px1 >= new_dog[0]: new_dog[0] = px1 + 1e-3 elif old_dog[0] < px1 <= new_dog[0]: new_dog[0] = px1 - 1e-3 self.dog_pos = new_dog.astype(np.float32) for i in range(self.n_sheep): if self.penned[i]: continue self.sheep_pos[i] = self._step_sheep(i) if self._in_pen(self.sheep_pos[i]): self.penned[i] = True n_penned = int(self.penned[:self.n_sheep].sum()) newly_penned = n_penned - self._prev_penned self._prev_penned = n_penned reward, rcomps = self._reward(n_penned, newly_penned, act) terminated = n_penned == self.n_sheep truncated = self._step_count >= self.max_steps info = {"n_penned": n_penned, "n_sheep": self.n_sheep, "rcomps": rcomps} if self.render_mode == "human": self.render() return self._obs(), float(reward), terminated, truncated, info def render(self): import matplotlib.pyplot as plt import matplotlib.patches as mpatches if self._fig is None: plt.ion() self._fig, self._ax = plt.subplots(figsize=(6, 6)) ax = self._ax ax.clear() ax.set_xlim(-16, 16); ax.set_ylim(-16, 16) ax.set_aspect("equal"); ax.set_facecolor("#dcedc8") ax.add_patch(mpatches.Rectangle( (-15, -15), 30, 30, fill=False, edgecolor="#795548", linewidth=2 )) pw = self.PEN_X[1] - self.PEN_X[0] ph = self.PEN_Y[1] - self.PEN_Y[0] ax.add_patch(mpatches.Rectangle( (self.PEN_X[0], self.PEN_Y[0]), pw, ph, facecolor="#ffe082", edgecolor="#795548", linewidth=2 )) ax.text(11.5, -11.5, "pen", ha="center", va="center", fontsize=8, color="#795548") com, radius, _ = self._flock_stats() ax.add_patch(plt.Circle(com, radius, color="steelblue", fill=False, linestyle="--", linewidth=1)) ax.plot(*com, "+", color="steelblue", markersize=10) for i in range(self.n_sheep): if i >= self.n_sheep: continue color = "deeppink" if self.penned[i] else "white" ax.plot(*self.sheep_pos[i], "o", color=color, markersize=11, markeredgecolor="#555", markeredgewidth=1.5) ax.plot(*self.dog_pos, "s", color="#4e342e", markersize=13, markeredgecolor="black", markeredgewidth=1.5) ax.set_title( f"step {self._step_count} | " f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep} | " f"r={radius:.1f}m", fontsize=11 ) self._fig.canvas.draw() self._fig.canvas.flush_events() plt.pause(0.001) def close(self): if self._fig is not None: import matplotlib.pyplot as plt plt.close(self._fig) self._fig = None # ------------------------------------------------------------------ # Internals # ------------------------------------------------------------------ def _in_pen(self, pos: np.ndarray) -> bool: return (self.PEN_X[0] < pos[0] < self.PEN_X[1] and self.PEN_Y[0] < pos[1] < self.PEN_Y[1]) def _flock_stats(self): """Return (COM, radius, mean_dispersion) over active sheep.""" active_mask = ~self.penned[:self.n_sheep] if not active_mask.any(): return self.PEN_CENTER.copy(), 0.0, 0.0 pts = self.sheep_pos[:self.n_sheep][active_mask] com = pts.mean(axis=0) dists = np.linalg.norm(pts - com, axis=1) return com, float(dists.max()), float(dists.mean()) def _obs(self) -> np.ndarray: com, radius, _ = self._flock_stats() active_mask = ~self.penned[:self.n_sheep] if active_mask.any(): pts = self.sheep_pos[:self.n_sheep][active_mask] dists = np.linalg.norm(pts - com, axis=1) sorted_idx = np.argsort(dists)[::-1] # farthest first # Top-3 stragglers; pad with COM when fewer active sheep exist def nth(n): return pts[sorted_idx[n]] if len(sorted_idx) > n else com far1, far2, far3 = nth(0), nth(1), nth(2) else: far1 = far2 = far3 = self.PEN_CENTER.copy() S = self.FIELD D = 2 * self.FIELD # far1/far2/far3 expressed relative to COM, not dog. # For 1 sheep: far1-COM = far2-COM = far3-COM = [0,0] → cleanly ignorable. # For 3+ sheep: non-zero vectors tell the dog where each straggler is # within the group, without conflicting with weights trained on 1 sheep. # Pen reference for the policy. Aligned with the reward target so the # policy isn't forced to learn an implicit offset between what it sees # ("pen is here") and what it's rewarded for ("get sheep close to here"). pen_ref = self.PEN_ENTRY if self.ENTRY_AWARE else self.PEN_CENTER return np.array([ self.dog_pos[0] / S, self.dog_pos[1] / S, (com[0] - self.dog_pos[0]) / D, (com[1] - self.dog_pos[1]) / D, (far1[0] - com[0]) / D, (far1[1] - com[1]) / D, (far2[0] - com[0]) / D, (far2[1] - com[1]) / D, (far3[0] - com[0]) / D, (far3[1] - com[1]) / D, (pen_ref[0] - com[0]) / D, (pen_ref[1] - com[1]) / D, (pen_ref[0] - far1[0]) / D, (pen_ref[1] - far1[1]) / D, radius / D, active_mask.sum() / self.n_sheep, ], dtype=np.float32) def _reward(self, n_penned: int, newly_penned: int, action: np.ndarray): active = ~self.penned[:self.n_sheep] # Per-sheep progress toward pen: fires whenever any sheep moves closer. # Naturally rewards keeping the flock together and pushing toward pen: # dog behind flock → all sheep flee toward pen → all contribute positive reward. # Dog from wrong side → sheep scatter away from pen → negative reward. target = self.PEN_ENTRY if self.ENTRY_AWARE else self.PEN_CENTER if active.any(): pen_dists = np.linalg.norm( self.sheep_pos[:self.n_sheep][active] - target, axis=1 ) cur_sum = float(pen_dists.sum()) r_progress = (self._prev_pen_dist_sum - cur_sum) * self.W_PER_SHEEP self._prev_pen_dist_sum = cur_sum else: r_progress = 0.0 com, _, _ = self._flock_stats() com_dist = float(np.linalg.norm(com - target)) d_dog_com = float(np.linalg.norm(self.dog_pos - com)) if d_dog_com > 0.1 and com_dist > 0.1: pen_dir = (target - com) / com_dist dog_dir = (self.dog_pos - com) / d_dog_com cosine = -float(np.dot(pen_dir, dog_dir)) if self.ALIGN_SHAPE == "standoff": IDEAL = 0.5 * (self.SEPARATION_DIST + self.FLEE_DIST) HALF = self.FLEE_DIST - IDEAL proximity = max(0.0, 1.0 - abs(d_dog_com - IDEAL) / HALF) else: # "near" proximity = max(0.0, 1.0 - d_dog_com / self.FLEE_DIST) move_gate = (min(1.0, float(np.linalg.norm(action))) if self.ALIGN_GATED else 1.0) alignment = cosine * proximity * move_gate * self.W_ALIGN else: alignment = 0.0 # Wall-touch penalty: count active sheep pinned against outside W/E pen walls. if self.W_WALL_TOUCH and active.any(): pts = self.sheep_pos[:self.n_sheep][active] px0, px1 = self.PEN_X py0, py1 = self.PEN_Y in_y = (pts[:, 1] > py0) & (pts[:, 1] < py1) near_w = (pts[:, 0] < px0) & (pts[:, 0] > px0 - self.WALL_TOUCH_BUFFER) near_e = (pts[:, 0] > px1) & (pts[:, 0] < px1 + self.WALL_TOUCH_BUFFER) n_touch = int(((near_w | near_e) & in_y).sum()) r_wall_touch = -n_touch * self.W_WALL_TOUCH else: r_wall_touch = 0.0 # Compactness shaping: reward decreases in flock radius (active sheep only) if self.W_COMPACT and active.any(): cur_radius = float(np.linalg.norm( self.sheep_pos[:self.n_sheep][active] - com, axis=1 ).max()) r_compact = (self._prev_radius - cur_radius) * self.W_COMPACT self._prev_radius = cur_radius else: r_compact = 0.0 r_pen_bonus = newly_penned * self.W_PEN_BONUS r_step_cost = -self.W_STEP_COST r_complete = self.W_COMPLETE if n_penned == self.n_sheep else 0.0 reward = (r_progress + alignment + r_compact + r_wall_touch + r_pen_bonus + r_step_cost + r_complete) rcomps = { "progress": float(r_progress), "alignment": float(alignment), "compact": float(r_compact), "wall_touch": float(r_wall_touch), "pen_bonus": float(r_pen_bonus), "step_cost": float(r_step_cost), "complete": float(r_complete), } return reward, rcomps def _step_sheep(self, i: int) -> np.ndarray: """Apply one timestep of boid dynamics to sheep i (mirrors sheep.py).""" old_pos = self.sheep_pos[i].copy() # saved for pen wall collision check pos = old_pos.copy() fx, fy = 0.0, 0.0 fleeing = False # Flee from dog — quadratic ramp diff = self.dog_pos - pos dist = float(np.linalg.norm(diff)) if 0.01 < dist < self.FLEE_DIST: t = 1.0 - dist / self.FLEE_DIST s = t * t * 5.0 fx -= (diff[0] / dist) * s fy -= (diff[1] / dist) * s fleeing = True # Separation (inverse-distance) + Cohesion cx, cy, cn = 0.0, 0.0, 0 for j in range(self.n_sheep): if j == i or self.penned[j]: continue dv = self.sheep_pos[j] - pos dj = float(np.linalg.norm(dv)) if 0.3 < dj < self.COHESION_DIST: cx += self.sheep_pos[j][0] cy += self.sheep_pos[j][1] cn += 1 if 0.05 < dj < self.SEPARATION_DIST: push = (self.SEPARATION_DIST - dj) / dj fx -= (dv[0] / dj) * push * 2.5 fy -= (dv[1] / dj) * push * 2.5 if cn > 0: w = 0.08 if fleeing else 0.15 fx += (cx / cn - pos[0]) * w fy += (cy / cn - pos[1]) * w # Wall avoidance m, F = self.WALL_MARGIN, self.FIELD if pos[0] < -F + m: fx += ((-F + m - pos[0]) / m) * 6.0 if pos[0] > F - m: fx -= ((pos[0] - (F - m)) / m) * 6.0 if pos[1] < -F + m: fy += ((-F + m - pos[1]) / m) * 6.0 if pos[1] > F - m: fy -= ((pos[1] - (F - m)) / m) * 6.0 # Hard-stop clamp: mirrors sheep.py — zero any force driving further # into the wall within 0.5 m so the flee force cannot pin the sheep. HS = 0.5 if pos[0] < -F + HS and fx < 0: fx = 0.0 if pos[0] > F - HS and fx > 0: fx = 0.0 if pos[1] < -F + HS and fy < 0: fy = 0.0 if pos[1] > F - HS and fy > 0: fy = 0.0 # Wander — suppressed while fleeing if not fleeing: if self.np_random.random() < 0.02: self.wander_ang[i] += float(self.np_random.uniform(-0.6, 0.6)) fx += float(np.cos(self.wander_ang[i])) * 0.5 fy += float(np.sin(self.wander_ang[i])) * 0.5 # Integrate force = np.array([fx, fy]) mag = float(np.linalg.norm(force)) if mag > 0.01: top_speed = self.SHEEP_FLEE_V if fleeing else self.SHEEP_WANDER_V speed = min(top_speed, mag * 0.3) pos = np.clip(pos + (force / mag) * speed * self.DT, -self.FIELD, self.FIELD) # Pen solid wall collision — mirrors Webots geometry. # The pen has THREE solid walls: west (x=PEN_X[0]), east (x=PEN_X[1]), # south (y=PEN_Y[0]). The NORTH face (y=PEN_Y[1]=-8) is the open entrance. # Sheep may only enter through the north face; crossing a solid wall is blocked. px0, px1 = self.PEN_X[0], self.PEN_X[1] py0, py1 = self.PEN_Y[0], self.PEN_Y[1] entered_from_north = ( old_pos[1] >= py1 and pos[1] < py1 and px0 < pos[0] < px1 ) if not entered_from_north: # Block crossing through west wall from outside if old_pos[0] < px0 <= pos[0] and py0 < pos[1] < py1: pos = np.array([px0 - 1e-3, pos[1]], dtype=np.float32) # Block crossing through east wall from outside if old_pos[0] > px1 >= pos[0] and py0 < pos[1] < py1: pos = np.array([px1 + 1e-3, pos[1]], dtype=np.float32) return pos.astype(np.float32)