From ffbfaa3977e7a4df8bf45f1f8c10f7bc40d0000f Mon Sep 17 00:00:00 2001 From: Johnny Fernandes Date: Thu, 23 Apr 2026 11:51:52 +0100 Subject: [PATCH] A more classical approach --- training/evaluate.py | 9 +- training/herding_env.py | 184 +++++++++++++++++++++------------------- 2 files changed, 101 insertions(+), 92 deletions(-) diff --git a/training/evaluate.py b/training/evaluate.py index 43853b5..6fe7560 100644 --- a/training/evaluate.py +++ b/training/evaluate.py @@ -94,9 +94,8 @@ def main(): # Access the underlying HerdingEnv for dispersion calculation inner = env.envs[0] if hasattr(env, "envs") else env.venv.envs[0] if not inner.penned[:inner.n_sheep].all(): - ep_dispersion.append( - pairwise_mean(inner.sheep_pos, inner.n_sheep) - ) + _, radius, _ = inner._flock_stats() + ep_dispersion.append(radius) if first_ep and render_mode == "human": pass # render() is called inside step() @@ -134,8 +133,8 @@ def main(): f" ({sum(successes)}/{args.episodes})") print(f" Time-to-pen : {mean_ttp:.1f} steps/sheep" f" (successful episodes only)") - print(f" Flock dispersion: {mean_disp:.2f} m" - f" (mean pairwise distance while active)") + print(f" Flock radius : {mean_disp:.2f} m" + f" (max sheep-to-COM distance while active)") print("=" * 50) diff --git a/training/herding_env.py b/training/herding_env.py index a1d334d..90d3aa9 100644 --- a/training/herding_env.py +++ b/training/herding_env.py @@ -10,10 +10,13 @@ Coordinate system matches the Webots world file: field : x ∈ [-15, 15], y ∈ [-15, 15] pen : x ∈ [10, 13], y ∈ [-15, -8] (SE corner, open north) -Observation is always sized for MAX_SHEEP (currently 5) regardless of -how many sheep are active. Inactive slots are pre-penned at the pen -centre with flag=1. This keeps the model input dimension fixed across -curriculum stages so VecNormalize statistics are preserved throughout. 
+Observation (13-dim, fixed regardless of n_sheep): + dog position (2), flock COM relative to dog (2), farthest active sheep + relative to dog (2), pen relative to COM (2), pen relative to farthest + sheep (2), flock radius (1), mean dispersion (1), fraction still active (1). + +Permutation-invariant by design: curriculum stages share the same obs dim +so VecNormalize statistics transfer as n_sheep advances. """ import numpy as np @@ -27,16 +30,14 @@ class HerdingEnv(gym.Env): # ----------------------------------------------------------------------- # World constants — must match Webots world file # ----------------------------------------------------------------------- - MAX_SHEEP = 5 - FIELD = 15.0 # half-size; positions ∈ [-FIELD, FIELD] - PEN_X = (10.0, 13.0) # quarantine pen x bounds - PEN_Y = (-15.0, -8.0) # quarantine pen y bounds + MAX_SHEEP = 5 + FIELD = 15.0 # half-size; positions ∈ [-FIELD, FIELD] + PEN_X = (10.0, 13.0) + PEN_Y = (-15.0, -8.0) PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32) # ----------------------------------------------------------------------- # Dynamics — calibrated to match Webots robot specs - # wheel radius 0.031 m; sheep FLEE_SPEED 20 rad/s → 0.62 m/s - # wheel radius 0.038 m; dog maxVelocity 70 rad/s → 2.66 m/s # ----------------------------------------------------------------------- DOG_SPEED = 2.5 # m/s SHEEP_FLEE_V = 0.65 # m/s @@ -50,28 +51,27 @@ class HerdingEnv(gym.Env): WALL_MARGIN = 3.5 # ----------------------------------------------------------------------- - # Reward weights + # Reward weights (progress-based potential shaping + sparse bonuses) # ----------------------------------------------------------------------- - W_ALIGN = 0.4 # dense: dog on anti-pen side of each active sheep - W_SHAPING = 0.5 # dense: mean sheep distance to pen - W_APPROACH = 0.1 # dense: dog within flee range of nearest sheep - W_PEN_BONUS = 5.0 # sparse: per sheep successfully penned - W_COMPLETE = 20.0 # bonus when ALL active sheep are 
penned - W_STEP_COST = 0.002 # penalty per step (encourages efficiency) + W_DRIVE = 2.0 # flock COM moved toward pen (per metre, per step) + W_COLLECT = 1.0 # flock radius shrank (per metre, per step) + W_PEN_BONUS = 5.0 # per sheep penned + W_COMPLETE = 20.0 # all sheep penned + W_STEP_COST = 0.002 # time penalty def __init__(self, n_sheep: int = 1, max_steps: int = 2000, render_mode: str = None): super().__init__() assert 1 <= n_sheep <= self.MAX_SHEEP - self.n_sheep = n_sheep - self.max_steps = max_steps + self.n_sheep = n_sheep + self.max_steps = max_steps self.render_mode = render_mode - # Observation: dog(x,y) + MAX_SHEEP×sheep(x,y) + MAX_SHEEP×penned - # Fixed size across all curriculum stages. - obs_dim = 2 + 2 * self.MAX_SHEEP + self.MAX_SHEEP + # Fixed 13-dim observation regardless of n_sheep: + # dog_pos(2) + rel_com(2) + rel_far(2) + com_to_pen(2) + # + far_to_pen(2) + radius(1) + mean_disp(1) + frac_active(1) self.observation_space = spaces.Box( - low=-1.0, high=1.0, shape=(obs_dim,), dtype=np.float32 + low=-np.inf, high=np.inf, shape=(13,), dtype=np.float32 ) # Action: desired velocity (vx, vy) ∈ [-1, 1]², scaled by DOG_SPEED @@ -82,12 +82,14 @@ class HerdingEnv(gym.Env): # Runtime state (populated by reset) self._step_count = 0 self._prev_penned = 0 + self._prev_com_dist = 0.0 # COM-to-pen distance at previous step + self._prev_radius = 0.0 # flock radius at previous step self.dog_pos = np.zeros(2, dtype=np.float32) self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32) self.penned = np.ones(self.MAX_SHEEP, dtype=bool) self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32) - self._fig = None # lazy matplotlib figure + self._fig = None # ------------------------------------------------------------------ # Curriculum interface @@ -119,16 +121,14 @@ class HerdingEnv(gym.Env): self.penned[placed] = False placed += 1 - # Dog: 50 % of the time start already on the anti-pen side of the - # nearest sheep (within flee range) so early training 
gets aligned - # starts; the other 50 % is fully random to ensure generalisation. + # Dog: 50% of resets start already behind the flock (anti-pen side, + # within flee range) to give early training aligned experiences. if self.np_random.random() < 0.5: - # Place dog behind the first active sheep relative to the pen - ref = self.sheep_pos[0] - away = ref - self.PEN_CENTER # sheep→anti-pen - dist = float(np.linalg.norm(away)) - if dist > 0.1: - away = away / dist + ref = self.sheep_pos[0] + away = ref - self.PEN_CENTER + d = float(np.linalg.norm(away)) + if d > 0.1: + away = away / d offset = away * self.np_random.uniform(2.0, self.FLEE_DIST * 0.8) self.dog_pos = np.clip( (ref + offset).astype(np.float32), -self.FIELD, self.FIELD @@ -138,25 +138,26 @@ class HerdingEnv(gym.Env): -self.FIELD * 0.8, self.FIELD * 0.8, size=(2,) ).astype(np.float32) - # Inactive slots (n_sheep .. MAX_SHEEP-1): already at pen centre, penned=True - self.wander_ang = self.np_random.uniform( -np.pi, np.pi, size=(self.MAX_SHEEP,) ).astype(np.float32) + # Initialise previous-step values for progress rewards + com, radius, _ = self._flock_stats() + self._prev_com_dist = float(np.linalg.norm(com - self.PEN_CENTER)) + self._prev_radius = radius + return self._obs(), {} def step(self, action): self._step_count += 1 - # Move dog — clip each axis independently so the agent can idle act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0) self.dog_pos = np.clip( self.dog_pos + act * self.DOG_SPEED * self.DT, -self.FIELD, self.FIELD ) - # Step sheep dynamics for i in range(self.n_sheep): if self.penned[i]: continue @@ -188,16 +189,12 @@ class HerdingEnv(gym.Env): ax = self._ax ax.clear() - ax.set_xlim(-16, 16) - ax.set_ylim(-16, 16) - ax.set_aspect("equal") - ax.set_facecolor("#dcedc8") + ax.set_xlim(-16, 16); ax.set_ylim(-16, 16) + ax.set_aspect("equal"); ax.set_facecolor("#dcedc8") - # Field boundary ax.add_patch(mpatches.Rectangle( (-15, -15), 30, 30, fill=False, edgecolor="#795548", 
linewidth=2 )) - # Pen pw = self.PEN_X[1] - self.PEN_X[0] ph = self.PEN_Y[1] - self.PEN_Y[0] ax.add_patch(mpatches.Rectangle( @@ -207,21 +204,25 @@ class HerdingEnv(gym.Env): ax.text(11.5, -11.5, "pen", ha="center", va="center", fontsize=8, color="#795548") - # Sheep - for i in range(self.MAX_SHEEP): + com, radius, _ = self._flock_stats() + ax.add_patch(plt.Circle(com, radius, color="steelblue", + fill=False, linestyle="--", linewidth=1)) + ax.plot(*com, "+", color="steelblue", markersize=10) + + for i in range(self.n_sheep): if i >= self.n_sheep: - continue # inactive slot — not shown + continue color = "deeppink" if self.penned[i] else "white" ax.plot(*self.sheep_pos[i], "o", color=color, markersize=11, markeredgecolor="#555", markeredgewidth=1.5) - # Dog ax.plot(*self.dog_pos, "s", color="#4e342e", markersize=13, markeredgecolor="black", markeredgewidth=1.5) ax.set_title( f"step {self._step_count} | " - f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep}", + f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep} | " + f"r={radius:.1f}m", fontsize=11 ) self._fig.canvas.draw() @@ -242,49 +243,58 @@ class HerdingEnv(gym.Env): return (self.PEN_X[0] < pos[0] < self.PEN_X[1] and self.PEN_Y[0] < pos[1] < self.PEN_Y[1]) + def _flock_stats(self): + """Return (COM, radius, mean_dispersion) over active sheep.""" + active_mask = ~self.penned[:self.n_sheep] + if not active_mask.any(): + return self.PEN_CENTER.copy(), 0.0, 0.0 + pts = self.sheep_pos[:self.n_sheep][active_mask] + com = pts.mean(axis=0) + dists = np.linalg.norm(pts - com, axis=1) + return com, float(dists.max()), float(dists.mean()) + def _obs(self) -> np.ndarray: - scale = 1.0 / self.FIELD - return np.concatenate([ - self.dog_pos * scale, # 2 - (self.sheep_pos * scale).flatten(), # 2 * MAX_SHEEP - self.penned.astype(np.float32), # MAX_SHEEP - ]).astype(np.float32) + com, radius, mean_disp = self._flock_stats() + active_mask = ~self.penned[:self.n_sheep] + + # Farthest active sheep from COM 
(outlier the dog needs to chase) + if active_mask.any(): + pts = self.sheep_pos[:self.n_sheep][active_mask] + idx = int(np.argmax(np.linalg.norm(pts - com, axis=1))) + far = pts[idx] + else: + far = self.PEN_CENTER.copy() + + S = self.FIELD # normalisation scale for positions + D = 2 * self.FIELD # for relative vectors that can span the whole field + + return np.array([ + self.dog_pos[0] / S, self.dog_pos[1] / S, # dog abs pos + (com[0] - self.dog_pos[0]) / D, # COM relative to dog + (com[1] - self.dog_pos[1]) / D, + (far[0] - self.dog_pos[0]) / D, # farthest relative to dog + (far[1] - self.dog_pos[1]) / D, + (self.PEN_CENTER[0] - com[0]) / D, # COM to pen + (self.PEN_CENTER[1] - com[1]) / D, + (self.PEN_CENTER[0] - far[0]) / D, # farthest to pen + (self.PEN_CENTER[1] - far[1]) / D, + radius / D, # flock compactness + mean_disp / D, # mean spread + active_mask.sum() / self.n_sheep, # fraction still active + ], dtype=np.float32) def _reward(self, n_penned: int, newly_penned: int) -> float: - active_mask = ~self.penned[:self.n_sheep] - if active_mask.any(): - active_pos = self.sheep_pos[:self.n_sheep][active_mask] - dists_pen = np.linalg.norm(active_pos - self.PEN_CENTER, axis=1) - dists_dog = np.linalg.norm(active_pos - self.dog_pos, axis=1) + com, radius, _ = self._flock_stats() + com_dist = float(np.linalg.norm(com - self.PEN_CENTER)) - # Sheep-to-pen shaping - shaping = -(dists_pen.mean() / (2 * self.FIELD)) + # Progress rewards: positive when flock moves toward pen or compacts + drive_progress = (self._prev_com_dist - com_dist) * self.W_DRIVE + collect_progress = (self._prev_radius - radius) * self.W_COLLECT - # Approach: dog penalised for being far from nearest sheep - approach = -(dists_dog.min() / (2 * self.FIELD)) + self._prev_com_dist = com_dist + self._prev_radius = radius - # Alignment: reward dog for being on the anti-pen side of each sheep. - # When the dog is opposite the pen relative to a sheep, that sheep - # flees toward the pen. 
Score ∈ [-1, 1] per sheep, weighted by - # a proximity gate so only nearby dogs count. - align_scores = [] - for s_pos, d_pen, d_dog in zip(active_pos, dists_pen, dists_dog): - if d_pen < 0.1 or d_dog < 0.1: - continue - pen_dir = (self.PEN_CENTER - s_pos) / d_pen # sheep → pen - dog_dir = (self.dog_pos - s_pos) / d_dog # sheep → dog - # cos(angle): +1 → dog behind sheep, -1 → dog on pen side - cosine = -float(np.dot(pen_dir, dog_dir)) - # gate: full credit inside flee range, fades beyond - proximity = max(0.0, 1.0 - d_dog / self.FLEE_DIST) - align_scores.append(cosine * proximity) - alignment = float(np.mean(align_scores)) if align_scores else 0.0 - else: - shaping = approach = alignment = 0.0 - - reward = shaping * self.W_SHAPING - reward += approach * self.W_APPROACH - reward += alignment * self.W_ALIGN + reward = drive_progress + collect_progress reward += newly_penned * self.W_PEN_BONUS reward -= self.W_STEP_COST if n_penned == self.n_sheep: @@ -292,12 +302,12 @@ class HerdingEnv(gym.Env): return reward def _step_sheep(self, i: int) -> np.ndarray: - """Apply one timestep of boid dynamics to sheep i.""" + """Apply one timestep of boid dynamics to sheep i (mirrors sheep.py).""" pos = self.sheep_pos[i].copy() fx, fy = 0.0, 0.0 fleeing = False - # Flee from dog — quadratic ramp (mirrors sheep.py) + # Flee from dog — quadratic ramp diff = self.dog_pos - pos dist = float(np.linalg.norm(diff)) if 0.01 < dist < self.FLEE_DIST: