From 57b1735e1aa6c8723e14398a7af120693036c6e9 Mon Sep 17 00:00:00 2001 From: Johnny Fernandes Date: Sun, 26 Apr 2026 20:36:36 +0100 Subject: [PATCH] Mimics webots approach better + debug. Lucky number --- training/herding_env.py | 244 ++++++++++++-------- training/parity_test.py | 318 ++++++++++++++++++++++++++ training/runs/wheeled_n10.log | 5 - training/runs/wheeled_n10/config.json | 15 -- 4 files changed, 472 insertions(+), 110 deletions(-) create mode 100644 training/parity_test.py delete mode 100644 training/runs/wheeled_n10.log delete mode 100644 training/runs/wheeled_n10/config.json diff --git a/training/herding_env.py b/training/herding_env.py index 8aa90e7..4a722e3 100644 --- a/training/herding_env.py +++ b/training/herding_env.py @@ -61,6 +61,21 @@ class HerdingEnv(gym.Env): DOG_MOTOR_MAX = 70.0 # rad/s (ShepherdDog.proto motor maxVelocity) DOG_STOP_THRESHOLD = 0.05 # ||action|| below this → dog stops in place + # Differential-drive sheep dynamics — mirrors sheep.py drive(): + SHEEP_K_TURN = 4.0 # rad/s per rad heading error (sheep.py k=4.0) + SHEEP_WHEEL_R = 0.031 # m (Sheep.proto wheel radius) + SHEEP_AXLE_TRACK = 0.20 # m (wheel anchors at y=+/-0.10 in proto) + SHEEP_MOTOR_MAX = 22.0 # rad/s (sheep.py MAX_SPEED clamp) + + # Sub-stepping: 6 x ~16.7ms ≈ 100ms per env step (Webots basicTimeStep=16ms) + N_SUBSTEPS = 6 + + # Peer communication lag — sheep broadcast every 3 Webots steps + PEER_BROADCAST_INTERVAL = 3 + + # Action smoothing EMA alpha — matches shepherd_dog_rl.py ACTION_SMOOTH + ACTION_SMOOTH = 0.3 + # Boid parameters — identical to sheep.py FLEE_DIST = 7.0 SEPARATION_DIST = 2.5 @@ -138,8 +153,11 @@ class HerdingEnv(gym.Env): self.dog_pos = np.zeros(2, dtype=np.float32) self.dog_heading = 0.0 # radians, world frame self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32) + self.sheep_heading = np.zeros(self.MAX_SHEEP, dtype=np.float32) self.penned = np.ones(self.MAX_SHEEP, dtype=bool) self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32) + self._delayed_sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32) + self._prev_action = np.zeros(2, dtype=np.float32) self._fig = None # Differential-drive debug CSV for sim/Webots parity checks. @@ -222,10 +240,17 @@ class HerdingEnv(gym.Env): # Random initial heading so the policy learns to handle any orientation. self.dog_heading = float(self.np_random.uniform(-np.pi, np.pi)) + self.sheep_heading = self.np_random.uniform( + -np.pi, np.pi, size=(self.MAX_SHEEP,) + ).astype(np.float32) + self.wander_ang = self.np_random.uniform( -np.pi, np.pi, size=(self.MAX_SHEEP,) ).astype(np.float32) + self._delayed_sheep_pos[:self.n_sheep] = self.sheep_pos[:self.n_sheep].copy() + self._prev_action = np.zeros(2, dtype=np.float32) + # Initialise per-sheep pen-distance sum for progress reward active = ~self.penned[:self.n_sheep] target = self.PEN_ENTRY if self.ENTRY_AWARE else self.PEN_CENTER @@ -249,81 +274,35 @@ class HerdingEnv(gym.Env): self._step_count += 1 act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0) - old_dog = self.dog_pos.copy() + + # Action smoothing EMA — matches shepherd_dog_rl.py ACTION_SMOOTH + if self.ACTION_SMOOTH > 0: + act = self.ACTION_SMOOTH * self._prev_action + (1.0 - self.ACTION_SMOOTH) * act + self._prev_action = act.copy() + + act_mag = float(np.linalg.norm(act)) + sub_dt = self.DT / self.N_SUBSTEPS dog_dbg = { "target_heading": float(self.dog_heading), - "err": 0.0, - "fwd_speed": 0.0, - "left_w": 0.0, - "right_w": 0.0, - "v": 0.0, - "w": 0.0, + "err": 0.0, "fwd_speed": 0.0, + "left_w": 0.0, "right_w": 0.0, "v": 0.0, "w": 0.0, } - # Differential-drive kinematics — mirrors Webots drive(): - # action -> desired heading/speed -> wheel angular velocities (with - # saturation) -> body linear/angular velocity via wheel geometry. - act_mag = float(np.linalg.norm(act)) - if act_mag < self.DOG_STOP_THRESHOLD: - # Below threshold the Webots dog stops; treat the same way here. - new_dog = self.dog_pos.copy() - else: - target_heading = float(np.arctan2(act[1], act[0])) - err = target_heading - self.dog_heading - # Wrap to (-pi, pi] - err = (err + np.pi) % (2 * np.pi) - np.pi + for _sub in range(self.N_SUBSTEPS): + # Snapshot peer positions every 3 sub-steps (mirrors sheep broadcast) + if _sub % self.PEER_BROADCAST_INTERVAL == 0: + self._delayed_sheep_pos[:self.n_sheep] = self.sheep_pos[:self.n_sheep].copy() - target_speed = act_mag * self.DOG_SPEED - fwd_speed = target_speed * max(0.0, float(np.cos(err))) - fwd_rad = fwd_speed / self.DOG_WHEEL_R - turn = self.DOG_K_TURN * err + # Dog differential-drive sub-step + dbg = self._step_dog_substep(act, sub_dt) + if dbg["v"] != 0.0 or dbg["w"] != 0.0: + dog_dbg = dbg - left_w = np.clip(fwd_rad - turn, -self.DOG_MOTOR_MAX, self.DOG_MOTOR_MAX) - right_w = np.clip(fwd_rad + turn, -self.DOG_MOTOR_MAX, self.DOG_MOTOR_MAX) - - v = self.DOG_WHEEL_R * 0.5 * (right_w + left_w) - w = (self.DOG_WHEEL_R / self.DOG_AXLE_TRACK) * (right_w - left_w) - dog_dbg.update({ - "target_heading": target_heading, - "err": float(err), - "fwd_speed": float(fwd_speed), - "left_w": float(left_w), - "right_w": float(right_w), - "v": float(v), - "w": float(w), - }) - - self.dog_heading = float( - ((self.dog_heading + w * self.DT) + np.pi) % (2 * np.pi) - np.pi - ) - step_vec = np.array( - [np.cos(self.dog_heading), np.sin(self.dog_heading)], - dtype=np.float32 - ) - new_dog = np.clip( - self.dog_pos + step_vec * v * self.DT, - -self.FIELD, self.FIELD, - ) - - # Pen wall collision — west and east pen walls block the dog within - # the pen's y-range. North face is open, south is the field edge. - px0, px1 = self.PEN_X - py0, py1 = self.PEN_Y - if py0 < new_dog[1] < py1: - if old_dog[0] < px0 <= new_dog[0]: - new_dog[0] = px0 - 1e-3 - elif old_dog[0] > px0 >= new_dog[0]: - new_dog[0] = px0 + 1e-3 - if old_dog[0] > px1 >= new_dog[0]: - new_dog[0] = px1 + 1e-3 - elif old_dog[0] < px1 <= new_dog[0]: - new_dog[0] = px1 - 1e-3 - self.dog_pos = new_dog.astype(np.float32) - - for i in range(self.n_sheep): - self.sheep_pos[i] = self._step_sheep(i) - if self._in_pen(self.sheep_pos[i]): - self.penned[i] = True + # Sheep dynamics sub-step + for i in range(self.n_sheep): + self.sheep_pos[i] = self._step_sheep(i, sub_dt) + if self._in_pen(self.sheep_pos[i]): + self.penned[i] = True n_penned = int(self.penned[:self.n_sheep].sum()) newly_penned = n_penned - self._prev_penned @@ -416,6 +395,95 @@ class HerdingEnv(gym.Env): return (self.PEN_X[0] < pos[0] < self.PEN_X[1] and self.PEN_Y[0] < pos[1] < self.PEN_Y[1]) + def _sheep_drive(self, i: int, target_heading: float, speed_rad: float, + dt: float) -> np.ndarray: + """Differential-drive integration for sheep i over one sub-step dt. + + Mirrors sheep.py drive(): heading error -> cos(err) forward scaling -> + wheel speeds with saturation -> unicycle kinematics. + """ + heading = float(self.sheep_heading[i]) + err = (target_heading - heading + np.pi) % (2 * np.pi) - np.pi + + fwd_rad = speed_rad * max(0.0, float(np.cos(err))) + turn = self.SHEEP_K_TURN * err + + left_w = np.clip(fwd_rad - turn, -self.SHEEP_MOTOR_MAX, self.SHEEP_MOTOR_MAX) + right_w = np.clip(fwd_rad + turn, -self.SHEEP_MOTOR_MAX, self.SHEEP_MOTOR_MAX) + + v = self.SHEEP_WHEEL_R * 0.5 * (right_w + left_w) + w = (self.SHEEP_WHEEL_R / self.SHEEP_AXLE_TRACK) * (right_w - left_w) + + self.sheep_heading[i] = float( + ((heading + w * dt) + np.pi) % (2 * np.pi) - np.pi + ) + step_vec = np.array( + [np.cos(self.sheep_heading[i]), np.sin(self.sheep_heading[i])], + dtype=np.float32 + ) + return (self.sheep_pos[i] + step_vec * v * dt).astype(np.float32) + + def _step_dog_substep(self, act: np.ndarray, dt: float) -> dict: + """Move the dog one sub-step with differential-drive kinematics. + + Returns debug dict with wheel/velocity info. + """ + old_dog = self.dog_pos.copy() + act_mag = float(np.linalg.norm(act)) + dog_dbg = { + "target_heading": float(self.dog_heading), + "err": 0.0, "fwd_speed": 0.0, + "left_w": 0.0, "right_w": 0.0, "v": 0.0, "w": 0.0, + } + + if act_mag < self.DOG_STOP_THRESHOLD: + return dog_dbg + + target_heading = float(np.arctan2(act[1], act[0])) + err = (target_heading - self.dog_heading + np.pi) % (2 * np.pi) - np.pi + + target_speed = act_mag * self.DOG_SPEED + fwd_speed = target_speed * max(0.0, float(np.cos(err))) + fwd_rad = fwd_speed / self.DOG_WHEEL_R + turn = self.DOG_K_TURN * err + + left_w = np.clip(fwd_rad - turn, -self.DOG_MOTOR_MAX, self.DOG_MOTOR_MAX) + right_w = np.clip(fwd_rad + turn, -self.DOG_MOTOR_MAX, self.DOG_MOTOR_MAX) + + v = self.DOG_WHEEL_R * 0.5 * (right_w + left_w) + w = (self.DOG_WHEEL_R / self.DOG_AXLE_TRACK) * (right_w - left_w) + dog_dbg.update({ + "target_heading": target_heading, "err": float(err), + "fwd_speed": float(fwd_speed), "left_w": float(left_w), + "right_w": float(right_w), "v": float(v), "w": float(w), + }) + + self.dog_heading = float( + ((self.dog_heading + w * dt) + np.pi) % (2 * np.pi) - np.pi + ) + step_vec = np.array( + [np.cos(self.dog_heading), np.sin(self.dog_heading)], + dtype=np.float32 + ) + new_dog = np.clip( + self.dog_pos + step_vec * v * dt, -self.FIELD, self.FIELD, + ) + + # Pen wall collision + px0, px1 = self.PEN_X + py0, py1 = self.PEN_Y + if py0 < new_dog[1] < py1: + if old_dog[0] < px0 <= new_dog[0]: + new_dog[0] = px0 - 1e-3 + elif old_dog[0] > px0 >= new_dog[0]: + new_dog[0] = px0 + 1e-3 + if old_dog[0] > px1 >= new_dog[0]: + new_dog[0] = px1 + 1e-3 + elif old_dog[0] < px1 <= new_dog[0]: + new_dog[0] = px1 - 1e-3 + self.dog_pos = new_dog.astype(np.float32) + return dog_dbg + def _flock_stats(self): """Return (COM, radius, mean_dispersion) over active sheep.""" active_mask = ~self.penned[:self.n_sheep] @@ -574,14 +642,12 @@ class HerdingEnv(gym.Env): } return reward, rcomps - def _step_sheep(self, i: int) -> np.ndarray: - """Apply one timestep of boid dynamics to sheep i (mirrors sheep.py).""" - old_pos = self.sheep_pos[i].copy() # saved for pen wall collision check + def _step_sheep(self, i: int, sub_dt: float) -> np.ndarray: + """Apply one sub-step of boid dynamics to sheep i (mirrors sheep.py).""" + old_pos = self.sheep_pos[i].copy() pos = old_pos.copy() fx, fy = 0.0, 0.0 if self.penned[i]: - # Webots latch behavior: once in pen, sheep keep moving under - # confinement + penned-sheep separation + wander. pm = 0.8 # PEN_MARGIN in sheep.py px0, px1 = self.PEN_X py0, py1 = self.PEN_Y @@ -594,7 +660,7 @@ class HerdingEnv(gym.Env): for j in range(self.n_sheep): if j == i or not self.penned[j]: continue - dv = self.sheep_pos[j] - pos + dv = self._delayed_sheep_pos[j] - pos dj = float(np.linalg.norm(dv)) if 0.05 < dj < self.SEPARATION_DIST: push = (self.SEPARATION_DIST - dj) / dj @@ -609,9 +675,10 @@ class HerdingEnv(gym.Env): force = np.array([fx, fy], dtype=np.float32) mag = float(np.linalg.norm(force)) if mag > 0.01: - speed = min(self.SHEEP_FLEE_V, mag * 0.3) - pos = np.clip(pos + (force / mag) * speed * self.DT, - -self.FIELD, self.FIELD) + target_heading = float(np.arctan2(fy, fx)) + speed_rad = max(3.0, min(20.0, mag * 3.0)) + pos = self._sheep_drive(i, target_heading, speed_rad, sub_dt) + pos = np.clip(pos, -self.FIELD, self.FIELD) return pos.astype(np.float32) fleeing = False @@ -634,16 +701,16 @@ class HerdingEnv(gym.Env): if self.PEN_X[1] < pos[0] < self.PEN_X[1] + pem: fx += ((self.PEN_X[1] + pem - pos[0]) / pem) * 6.0 - # Separation (inverse-distance) + Cohesion + # Separation (inverse-distance) + Cohesion — uses delayed peer positions cx, cy, cn = 0.0, 0.0, 0 for j in range(self.n_sheep): if j == i or self.penned[j]: continue - dv = self.sheep_pos[j] - pos + dv = self._delayed_sheep_pos[j] - pos dj = float(np.linalg.norm(dv)) if 0.3 < dj < self.COHESION_DIST: - cx += self.sheep_pos[j][0] - cy += self.sheep_pos[j][1] + cx += self._delayed_sheep_pos[j][0] + cy += self._delayed_sheep_pos[j][1] cn += 1 if 0.05 < dj < self.SEPARATION_DIST: push = (self.SEPARATION_DIST - dj) / dj @@ -677,19 +744,16 @@ class HerdingEnv(gym.Env): fx += float(np.cos(self.wander_ang[i])) * 0.5 fy += float(np.sin(self.wander_ang[i])) * 0.5 - # Integrate + # Integrate via differential-drive (mirrors sheep.py speed mapping + drive()) force = np.array([fx, fy]) mag = float(np.linalg.norm(force)) if mag > 0.01: - top_speed = self.SHEEP_FLEE_V if fleeing else self.SHEEP_WANDER_V - speed = min(top_speed, mag * 0.3) - pos = np.clip(pos + (force / mag) * speed * self.DT, - -self.FIELD, self.FIELD) + target_heading = float(np.arctan2(fy, fx)) + speed_rad = max(3.0, min(20.0, mag * 3.0)) # sheep.py line 229 + pos = self._sheep_drive(i, target_heading, speed_rad, sub_dt) + pos = np.clip(pos, -self.FIELD, self.FIELD) # Pen solid wall collision — mirrors Webots geometry. - # The pen has THREE solid walls: west (x=PEN_X[0]), east (x=PEN_X[1]), - # south (y=PEN_Y[0]). The NORTH face (y=PEN_Y[1]=-8) is the open entrance. - # Sheep may only enter through the north face; crossing a solid wall is blocked. px0, px1 = self.PEN_X[0], self.PEN_X[1] py0, py1 = self.PEN_Y[0], self.PEN_Y[1] entered_from_north = ( diff --git a/training/parity_test.py b/training/parity_test.py new file mode 100644 index 0000000..e54fb74 --- /dev/null +++ b/training/parity_test.py @@ -0,0 +1,318 @@ +""" +Parity test: verify 2D training env matches Webots controller implementations. + +Tests: +1. Observation building: HerdingEnv._obs() vs shepherd_dog_rl.build_obs() +2. Dog drive: HerdingEnv._step_dog_substep() vs shepherd_dog_rl.drive() math +3. Sheep drive: HerdingEnv._sheep_drive() vs sheep.py drive() math +""" + +import sys +import os +import math +import numpy as np + +# Make imports work from project root +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +sys.path.insert(0, os.path.join(os.path.dirname(__file__))) +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "controllers", "shepherd_dog_rl")) + +from herding_env import HerdingEnv + +# Re-implement the Webots functions standalone (no Webots dependency) +FIELD = 15.0 +PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32) +PEN_ENTRY = np.array([11.5, -8.0], dtype=np.float32) +PEN_X = (10.0, 13.0) +PEN_Y = (-15.0, -8.0) +ENTRY_AWARE = True + + +def webots_build_obs(dog_pos, sheep_positions, n_sheep, dog_heading): + """Standalone version of shepherd_dog_rl.py build_obs().""" + D = 2 * FIELD + active_pos = np.array( + [p for p in sheep_positions + if not (PEN_X[0] < p[0] < PEN_X[1] and PEN_Y[0] < p[1] < PEN_Y[1])], + dtype=np.float32 + ) + n_active = len(active_pos) + if n_active > 0: + com = active_pos.mean(axis=0) + d_from_com = np.linalg.norm(active_pos - com, axis=1) + sorted_idx = np.argsort(d_from_com)[::-1] + radius = float(d_from_com[sorted_idx[0]]) + def nth(n): + return active_pos[sorted_idx[n]] if len(sorted_idx) > n else com + far1, far2, far3 = nth(0), nth(1), nth(2) + else: + com = PEN_CENTER.copy() + radius = 0.0 + far1 = far2 = far3 = PEN_CENTER.copy() + frac_active = n_active / max(n_sheep, 1) + pen_ref = PEN_ENTRY if ENTRY_AWARE else PEN_CENTER + return np.array([ + dog_pos[0] / FIELD, dog_pos[1] / FIELD, + (com[0] - dog_pos[0]) / D, (com[1] - dog_pos[1]) / D, + (far1[0] - com[0]) / D, (far1[1] - com[1]) / D, + (far2[0] - com[0]) / D, (far2[1] - com[1]) / D, + (far3[0] - com[0]) / D, (far3[1] - com[1]) / D, + (pen_ref[0] - com[0]) / D, (pen_ref[1] - com[1]) / D, + (pen_ref[0] - far1[0]) / D, (pen_ref[1] - far1[1]) / D, + radius / D, + frac_active, + math.cos(dog_heading), math.sin(dog_heading), + ], dtype=np.float32) + + +def webots_dog_drive(heading, speed_ms, wheel_r=0.038, k_turn=4.0, + motor_max=70.0, axle_track=0.28): + """Standalone version of shepherd_dog_rl.py drive() kinematics. + + Returns (v_linear, omega, left_w, right_w). + """ + err = math.atan2(math.sin(heading), math.cos(heading)) + fwd_ms = speed_ms * max(0.0, math.cos(err)) + fwd_rad = fwd_ms / wheel_r + turn = k_turn * err + l = max(-motor_max, min(motor_max, fwd_rad - turn)) + r = max(-motor_max, min(motor_max, fwd_rad + turn)) + v = wheel_r * 0.5 * (r + l) + w = (wheel_r / axle_track) * (r - l) + return v, w, l, r + + +def webots_sheep_drive(heading, speed_rad, wheel_r=0.031, k_turn=4.0, + motor_max=22.0, axle_track=0.20): + """Standalone version of sheep.py drive() kinematics.""" + err = math.atan2(math.sin(heading), math.cos(heading)) + fwd = speed_rad * max(0.0, math.cos(err)) + k = 4.0 + l = max(-motor_max, min(motor_max, fwd - k * err)) + r = max(-motor_max, min(motor_max, fwd + k * err)) + v = wheel_r * 0.5 * (r + l) + w = (wheel_r / axle_track) * (r - l) + return v, w, l, r + + +def test_obs_parity(): + """Test that build_obs matches between 2D env and Webots controller.""" + print("=== Test 1: Observation Parity ===") + env = HerdingEnv(n_sheep=3) + # Set ENTRY_AWARE to match our webots constant + env.ENTRY_AWARE = ENTRY_AWARE + env.reset(seed=42) + + # Manually set positions for a controlled test + env.dog_pos = np.array([5.0, 3.0], dtype=np.float32) + env.dog_heading = 1.2 + env.sheep_pos[0] = np.array([0.0, 0.0], dtype=np.float32) + env.sheep_pos[1] = np.array([2.0, -1.0], dtype=np.float32) + env.sheep_pos[2] = np.array([11.5, -11.5], dtype=np.float32) # penned + env.penned[0] = False + env.penned[1] = False + env.penned[2] = True + + obs_2d = env._obs() + + # Build equivalent Webots observation + sheep_positions = [ + env.sheep_pos[0].tolist(), + env.sheep_pos[1].tolist(), + env.sheep_pos[2].tolist(), + ] + obs_webots = webots_build_obs( + env.dog_pos, sheep_positions, 3, env.dog_heading + ) + + max_diff = float(np.max(np.abs(obs_2d - obs_webots))) + print(f" Max element-wise diff: {max_diff:.2e}") + if max_diff < 1e-6: + print(" PASS: Observations match") + else: + print(" FAIL: Observations differ!") + for i in range(18): + if abs(obs_2d[i] - obs_webots[i]) > 1e-6: + print(f" dim {i}: 2d={obs_2d[i]:.6f} webots={obs_webots[i]:.6f}") + return max_diff < 1e-6 + + +def test_dog_drive_parity(): + """Test that dog diff-drive matches Webots controller.""" + print("\n=== Test 2: Dog Drive Parity ===") + env = HerdingEnv(n_sheep=1) + env.reset(seed=42) + + all_pass = True + test_cases = [ + # (heading_error, speed_ms) — target_heading relative to current heading + (0.0, 2.5), # aligned, full speed + (0.5, 2.5), # 30deg error + (1.5, 2.5), # ~86deg error + (3.14, 2.5), # ~180deg error — should spin in place + (0.0, 0.5), # aligned, slow + (0.3, 1.0), # small error, medium speed + ] + + for heading_err, speed_ms in test_cases: + env.dog_heading = 0.0 + target_heading = heading_err + action = np.array([ + math.cos(target_heading), math.sin(target_heading) + ], dtype=np.float32) * (speed_ms / env.DOG_SPEED) + + # 2D env step + dbg = env._step_dog_substep(action, 0.016) + v_2d = dbg["v"] + w_2d = dbg["w"] + l_2d = dbg["left_w"] + r_2d = dbg["right_w"] + + # Webots equivalent + v_w, w_w, l_w, r_w = webots_dog_drive(heading_err, speed_ms) + + diffs = { + "v": abs(v_2d - v_w), + "w": abs(w_2d - w_w), + "left": abs(l_2d - l_w), + "right": abs(r_2d - r_w), + } + max_diff = max(diffs.values()) + ok = max_diff < 1e-6 + status = "PASS" if ok else "FAIL" + print(f" err={heading_err:.2f} spd={speed_ms:.1f}: {status} (max_diff={max_diff:.2e})") + if not ok: + for k, d in diffs.items(): + if d > 1e-6: + print(f" {k}: 2d={eval(k+'_2d'):.6f} webots={eval(k+'_w'):.6f}") + all_pass = False + + return all_pass + + +def test_sheep_drive_parity(): + """Test that sheep diff-drive matches Webots sheep controller.""" + print("\n=== Test 3: Sheep Drive Parity ===") + env = HerdingEnv(n_sheep=1) + env.reset(seed=42) + + all_pass = True + test_cases = [ + # (heading_error, speed_rad) + (0.0, 20.0), # aligned, flee speed + (0.0, 3.0), # aligned, wander speed + (0.5, 15.0), # moderate error + (1.57, 10.0), # 90deg — should spin in place + (3.14, 20.0), # 180deg — should spin in place fast + (0.2, 8.0), # small error, medium speed + ] + + for heading_err, speed_rad in test_cases: + env.sheep_heading[0] = 0.0 + env.sheep_pos[0] = np.array([0.0, 0.0], dtype=np.float32) + target_heading = heading_err + + # 2D env + new_pos = env._sheep_drive(0, target_heading, speed_rad, 0.016) + v_2d_raw = float(np.linalg.norm(new_pos - np.array([0.0, 0.0]))) / 0.016 + # Re-derive v, w from the internal state + heading_2d = env.sheep_heading[0] + + # Webots equivalent + v_w, w_w, l_w, r_w = webots_sheep_drive(heading_err, speed_rad) + + # For 2D, compute the same intermediate values + err_2d = (target_heading - 0.0 + np.pi) % (2 * np.pi) - np.pi + fwd_2d = speed_rad * max(0.0, math.cos(err_2d)) + turn_2d = 4.0 * err_2d + l_2d = max(-22.0, min(22.0, fwd_2d - turn_2d)) + r_2d = max(-22.0, min(22.0, fwd_2d + turn_2d)) + + diffs = { + "left": abs(l_2d - l_w), + "right": abs(r_2d - r_w), + } + max_diff = max(diffs.values()) + ok = max_diff < 1e-6 + status = "PASS" if ok else "FAIL" + print(f" err={heading_err:.2f} spd={speed_rad:.1f}: {status} (max_diff={max_diff:.2e})") + if not ok: + for k, d in diffs.items(): + if d > 1e-6: + print(f" {k}: 2d={l_2d if k=='left' else r_2d:.6f} webots={l_w if k=='left' else r_w:.6f}") + all_pass = False + + return all_pass + + +def test_full_trajectory_parity(): + """Test that running identical actions produces matching trajectories.""" + print("\n=== Test 4: Full Trajectory Parity (dog only) ===") + # Run 50 steps with a fixed action, compare dog heading/position + # at each step between 2D env kinematics and pure Webots kinematics. + env = HerdingEnv(n_sheep=1) + env.reset(seed=42) + env.dog_pos = np.array([0.0, 0.0], dtype=np.float32) + env.dog_heading = 0.0 + env.ENTRY_AWARE = ENTRY_AWARE + + action = np.array([0.8, -0.6], dtype=np.float32) # magnitude 1.0 + dt = 0.016667 # sub_dt + + # Webots-side tracking + wb_heading = 0.0 + wb_x, wb_y = 0.0, 0.0 + + max_heading_diff = 0.0 + max_pos_diff = 0.0 + + for step in range(50): + # 2D env sub-step + env._step_dog_substep(action, dt) + + # Webots-side computation + speed_ms = 1.0 * 2.5 + target_heading = math.atan2(-0.6, 0.8) + err = math.atan2(math.sin(target_heading - wb_heading), + math.cos(target_heading - wb_heading)) + fwd_ms = speed_ms * max(0.0, math.cos(err)) + fwd_rad = fwd_ms / 0.038 + turn = 4.0 * err + l = max(-70.0, min(70.0, fwd_rad - turn)) + r = max(-70.0, min(70.0, fwd_rad + turn)) + v = 0.038 * 0.5 * (r + l) + w = (0.038 / 0.28) * (r - l) + wb_heading = math.atan2(math.sin(wb_heading + w * dt), + math.cos(wb_heading + w * dt)) + wb_x += math.cos(wb_heading) * v * dt + wb_y += math.sin(wb_heading) * v * dt + + heading_diff = abs(env.dog_heading - wb_heading) + pos_diff = math.hypot(env.dog_pos[0] - wb_x, env.dog_pos[1] - wb_y) + max_heading_diff = max(max_heading_diff, heading_diff) + max_pos_diff = max(max_pos_diff, pos_diff) + + print(f" Max heading diff over 50 steps: {max_heading_diff:.2e} rad") + print(f" Max position diff over 50 steps: {max_pos_diff:.2e} m") + ok = max_pos_diff < 1e-4 + print(f" {'PASS' if ok else 'FAIL'}: Trajectories match") + return ok + + +if __name__ == "__main__": + results = [] + results.append(("Obs parity", test_obs_parity())) + results.append(("Dog drive parity", test_dog_drive_parity())) + results.append(("Sheep drive parity", test_sheep_drive_parity())) + results.append(("Trajectory parity", test_full_trajectory_parity())) + + print("\n" + "=" * 50) + print("RESULTS") + print("=" * 50) + all_pass = True + for name, passed in results: + print(f" {name}: {'PASS' if passed else 'FAIL'}") + if not passed: + all_pass = False + print(f"\nOverall: {'ALL PASS' if all_pass else 'SOME FAILURES'}") + env.close() diff --git a/training/runs/wheeled_n10.log b/training/runs/wheeled_n10.log deleted file mode 100644 index 5904e21..0000000 --- a/training/runs/wheeled_n10.log +++ /dev/null @@ -1,5 +0,0 @@ -Config loaded from config.json -Config: {'W_PER_SHEEP': 2.0, 'W_ALIGN': 0.05, 'W_PEN_BONUS': 10.0, 'W_COMPLETE': 100.0, 'W_STEP_COST': 0.02, 'W_SOUTH': 0.01, 'W_COMPACT': 0.0, 'W_WALL_TOUCH': 0.0, 'WALL_TOUCH_BUFFER': 0.4, 'ALIGN_SHAPE': 'standoff', 'ALIGN_GATED': True, 'ENTRY_AWARE': True, 'ent_coef': 0.02} -Run dir: runs/wheeled_n10 -Curriculum: 1 → 10 sheep, 1,500,000 steps/stage - diff --git a/training/runs/wheeled_n10/config.json b/training/runs/wheeled_n10/config.json deleted file mode 100644 index a2a98c3..0000000 --- a/training/runs/wheeled_n10/config.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "W_PER_SHEEP": 2.0, - "W_ALIGN": 0.05, - "W_PEN_BONUS": 10.0, - "W_COMPLETE": 100.0, - "W_STEP_COST": 0.02, - "W_SOUTH": 0.01, - "W_COMPACT": 0.0, - "W_WALL_TOUCH": 0.0, - "WALL_TOUCH_BUFFER": 0.4, - "ALIGN_SHAPE": "standoff", - "ALIGN_GATED": true, - "ENTRY_AWARE": true, - "ent_coef": 0.02 -} \ No newline at end of file