From 3574d57ba24cfed11c40679f507f556c84d4be51 Mon Sep 17 00:00:00 2001 From: Johnny Fernandes Date: Fri, 24 Apr 2026 16:30:35 +0100 Subject: [PATCH] Sheep training flock of 10 fix? --- training/herding_env.py | 118 ++++++++++++---------------------------- 1 file changed, 35 insertions(+), 83 deletions(-) diff --git a/training/herding_env.py b/training/herding_env.py index 2f6800d..8d43eb2 100644 --- a/training/herding_env.py +++ b/training/herding_env.py @@ -51,17 +51,13 @@ class HerdingEnv(gym.Env): WALL_MARGIN = 3.5 # ----------------------------------------------------------------------- - # Reward weights (two-phase: collect first, then drive) + # Reward weights (simple per-sheep progress — no phases, no gating) # ----------------------------------------------------------------------- - W_DRIVE = 2.0 # progress: COM moved toward pen (only when compact) - W_COLLECT = 4.0 # progress: radius shrank (2× stronger when scattered) - W_HERD_POS = 1.5 # progress: dog moved toward ideal herding position behind far1 - W_ALIGN = 0.5 # position: dog on anti-pen side of COM (compact only) - W_PEN_BONUS = 10.0 # per sheep penned - W_COMPLETE = 100.0 # all sheep penned - W_STEP_COST = 0.002 # time penalty - - DRIVE_GATE_RADIUS = 5.0 # flock must compact below this (m) before drive reward fires + W_PER_SHEEP = 2.0 # progress: sum of per-sheep distance-to-pen reductions + W_ALIGN = 0.3 # position: dog on anti-pen side of COM (small, directional hint) + W_PEN_BONUS = 10.0 # per sheep penned + W_COMPLETE = 100.0 # all sheep penned + W_STEP_COST = 0.002 # time penalty def __init__(self, n_sheep: int = 1, max_steps: int = 2000, render_mode: str = None, random_n_sheep: bool = False): @@ -85,11 +81,9 @@ class HerdingEnv(gym.Env): ) # Runtime state (populated by reset) - self._step_count = 0 - self._prev_penned = 0 - self._prev_com_dist = 0.0 - self._prev_radius = 0.0 - self._prev_dog_to_ideal = 0.0 + self._step_count = 0 + self._prev_penned = 0 + self._prev_pen_dist_sum = 0.0 self.dog_pos = np.zeros(2, dtype=np.float32) self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32) self.penned = np.ones(self.MAX_SHEEP, dtype=bool) @@ -151,20 +145,16 @@ class HerdingEnv(gym.Env): -np.pi, np.pi, size=(self.MAX_SHEEP,) ).astype(np.float32) - # Initialise previous-step values for progress rewards - com, radius, _ = self._flock_stats() - self._prev_com_dist = float(np.linalg.norm(com - self.PEN_CENTER)) - self._prev_radius = radius - - active_mask = ~self.penned[:self.n_sheep] - if active_mask.any(): - pts = self.sheep_pos[:self.n_sheep][active_mask] - far1 = pts[int(np.argmax(np.linalg.norm(pts - com, axis=1)))] - self._prev_dog_to_ideal = float( - np.linalg.norm(self.dog_pos - self._ideal_herd_pos(com, far1)) + # Initialise per-sheep pen-distance sum for progress reward + active = ~self.penned[:self.n_sheep] + if active.any(): + self._prev_pen_dist_sum = float( + np.linalg.norm( + self.sheep_pos[:self.n_sheep][active] - self.PEN_CENTER, axis=1 + ).sum() ) else: - self._prev_dog_to_ideal = 0.0 + self._prev_pen_dist_sum = 0.0 return self._obs(), {} @@ -302,66 +292,28 @@ class HerdingEnv(gym.Env): active_mask.sum() / self.n_sheep, ], dtype=np.float32) - def _ideal_herd_pos(self, com: np.ndarray, far1: np.ndarray) -> np.ndarray: - """ - Target position for the dog to push far1 toward COM: - just beyond far1 on the outward radial line from COM. - From here, the dog's approach causes far1 to flee inward. - """ - d = far1 - com - d_norm = float(np.linalg.norm(d)) - if d_norm > 0.5: - direction = d / d_norm - else: - # Sheep all together — use anti-pen direction instead - to_pen = self.PEN_CENTER - com - tp = float(np.linalg.norm(to_pen)) - direction = -(to_pen / tp) if tp > 0.1 else np.array([0.0, -1.0], dtype=np.float32) - target = far1 + direction * self.FLEE_DIST * 0.8 - return np.clip(target, -self.FIELD, self.FIELD).astype(np.float32) - def _reward(self, n_penned: int, newly_penned: int) -> float: - com, radius, _ = self._flock_stats() - com_dist = float(np.linalg.norm(com - self.PEN_CENTER)) - scattered = radius > self.DRIVE_GATE_RADIUS + active = ~self.penned[:self.n_sheep] - drive_delta = self._prev_com_dist - com_dist - collect_delta = self._prev_radius - radius - self._prev_com_dist = com_dist - self._prev_radius = radius - - # Collect: always active, 2× stronger when scattered. - r_collect = collect_delta * self.W_COLLECT * (2.0 if scattered else 1.0) - - # Drive: only when compact — prevents rewarding COM movement while scattered. - r_drive = 0.0 if scattered else drive_delta * self.W_DRIVE - - # Herding-position reward: guides dog to the ideal position BEHIND far1 - # (on the outward radial, FLEE_DIST beyond far1 from COM). - # From there, advancing toward COM pushes far1 inward. - # Fires in scatter phase only; gives gradient even during the outward - # navigation arc when raw approach reward would be zero/negative. - active_mask = ~self.penned[:self.n_sheep] - if scattered and active_mask.any(): - pts = self.sheep_pos[:self.n_sheep][active_mask] - far1 = pts[int(np.argmax(np.linalg.norm(pts - com, axis=1)))] - ideal = self._ideal_herd_pos(com, far1) - cur_dog_to_ideal = float(np.linalg.norm(self.dog_pos - ideal)) - r_herd_pos = (self._prev_dog_to_ideal - cur_dog_to_ideal) * self.W_HERD_POS - self._prev_dog_to_ideal = cur_dog_to_ideal + # Per-sheep progress toward pen: fires whenever any sheep moves closer. + # Naturally rewards keeping the flock together and pushing toward pen: + # dog behind flock → all sheep flee toward pen → all contribute positive reward. + # Dog from wrong side → sheep scatter away from pen → negative reward. + if active.any(): + pen_dists = np.linalg.norm( + self.sheep_pos[:self.n_sheep][active] - self.PEN_CENTER, axis=1 + ) + cur_sum = float(pen_dists.sum()) + r_progress = (self._prev_pen_dist_sum - cur_sum) * self.W_PER_SHEEP + self._prev_pen_dist_sum = cur_sum else: - r_herd_pos = 0.0 - if active_mask.any(): - pts = self.sheep_pos[:self.n_sheep][active_mask] - far1 = pts[int(np.argmax(np.linalg.norm(pts - com, axis=1)))] - ideal = self._ideal_herd_pos(com, far1) - self._prev_dog_to_ideal = float(np.linalg.norm(self.dog_pos - ideal)) + r_progress = 0.0 - # Alignment: dog on anti-pen side of COM — only in drive phase. - # Disabled when scattered: chasing a straggler on the pen side would be - # wrongly penalised otherwise. + # Small alignment hint: reward dog for being on anti-pen side of COM. + com, _, _ = self._flock_stats() + com_dist = float(np.linalg.norm(com - self.PEN_CENTER)) d_dog_com = float(np.linalg.norm(self.dog_pos - com)) - if not scattered and d_dog_com > 0.1 and com_dist > 0.1: + if d_dog_com > 0.1 and com_dist > 0.1: pen_dir = (self.PEN_CENTER - com) / com_dist dog_dir = (self.dog_pos - com) / d_dog_com cosine = -float(np.dot(pen_dir, dog_dir)) @@ -370,7 +322,7 @@ class HerdingEnv(gym.Env): else: alignment = 0.0 - reward = r_drive + r_collect + r_herd_pos + alignment + reward = r_progress + alignment reward += newly_penned * self.W_PEN_BONUS reward -= self.W_STEP_COST if n_penned == self.n_sheep: