Mimics webots approach better + debug. Lucky number

This commit is contained in:
Johnny Fernandes
2026-04-26 18:55:53 +01:00
parent 1af7d03ce2
commit deeae3193e
142 changed files with 138 additions and 31306 deletions
+126 -30
View File
@@ -19,6 +19,7 @@ Permutation-invariant by design: curriculum stages share the same obs dim
so VecNormalize statistics transfer as n_sheep advances.
"""
import csv
import numpy as np
import gymnasium as gym
from gymnasium import spaces
@@ -31,7 +32,8 @@ class HerdingEnv(gym.Env):
# World constants — must match Webots world file
# -----------------------------------------------------------------------
MAX_SHEEP = 10
FIELD = 15.0 # half-size; positions ∈ [-FIELD, FIELD]
FIELD = 15.0 # field wall geometry in world file
SHEEP_WALL_INNER = 14.5 # sheep.py wall checks use ±14.5
PEN_X = (10.0, 13.0)
PEN_Y = (-15.0, -8.0)
PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32)
@@ -41,17 +43,22 @@ class HerdingEnv(gym.Env):
# Dynamics — calibrated to match Webots robot specs
# -----------------------------------------------------------------------
DOG_SPEED = 2.5 # m/s
SHEEP_FLEE_V = 0.65 # m/s
SHEEP_WANDER_V = 0.20 # m/s
SHEEP_FLEE_V = 0.62 # m/s (20 rad/s * 0.031 m wheel radius in sheep.py)
SHEEP_WANDER_V = 0.093 # m/s (3 rad/s * 0.031 m wheel radius in sheep.py)
DT = 0.1 # seconds per step
# Wheeled dog dynamics — mirror the Webots controller's drive():
# forward speed gated by cos(heading_error); turn rate proportional to
# error. Without this, the env treats the dog as a particle that can
# change direction instantly, producing policies that bang-bang and don't
# transfer to the wheeled Webots robot.
DOG_K_TURN = 4.0 # rad/s per rad (heading-error gain)
DOG_MAX_TURN_RATE = 6.0 # rad/s (cap on turn rate)
# Differential-drive dog dynamics — mirrors shepherd_dog_rl.py drive():
# speed_ms = ||a|| * DOG_SPEED
# err = wrap(target_heading - heading)
# fwd_ms = speed_ms * max(0, cos(err))
# fwd_rad = fwd_ms / DOG_WHEEL_R
# turn = DOG_K_TURN * err
# l = clamp(fwd_rad - turn), r = clamp(fwd_rad + turn)
# Then integrated as unicycle kinematics using wheel geometry.
DOG_K_TURN = 4.0 # rad/s per rad (matches Webots controller)
DOG_WHEEL_R = 0.038 # m (ShepherdDog.proto wheel radius)
DOG_AXLE_TRACK = 0.28 # m (wheel anchors at y=±0.14 in proto)
DOG_MOTOR_MAX = 70.0 # rad/s (ShepherdDog.proto motor maxVelocity)
DOG_STOP_THRESHOLD = 0.05 # ||action|| below this → dog stops in place
# Boid parameters — identical to sheep.py
@@ -135,6 +142,15 @@ class HerdingEnv(gym.Env):
self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32)
self._fig = None
# Differential-drive debug CSV for sim/Webots parity checks.
# Always on by design.
self._dog_debug_file = open("dog_debug.csv", "w", newline="")
self._dog_debug_writer = csv.writer(self._dog_debug_file)
self._dog_debug_writer.writerow([
"step", "act_x", "act_y", "act_mag", "heading", "target_heading",
"heading_err", "fwd_speed", "left_w", "right_w", "v", "w",
"dog_x", "dog_y",
])
# ------------------------------------------------------------------
# Curriculum interface
@@ -234,12 +250,19 @@ class HerdingEnv(gym.Env):
act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0)
old_dog = self.dog_pos.copy()
dog_dbg = {
"target_heading": float(self.dog_heading),
"err": 0.0,
"fwd_speed": 0.0,
"left_w": 0.0,
"right_w": 0.0,
"v": 0.0,
"w": 0.0,
}
# Wheeled-dog kinematics — mirrors the Webots controller's drive():
# interpret (vx, vy) as a desired velocity vector in world frame; the
# dog turns toward it at a limited rate, and forward speed is gated
# by cos(heading_error). Bang-bang policies still produce smooth
# motion (the dog can't sidestep — it has to turn first).
# Differential-drive kinematics — mirrors Webots drive():
# action -> desired heading/speed -> wheel angular velocities (with
# saturation) -> body linear/angular velocity via wheel geometry.
act_mag = float(np.linalg.norm(act))
if act_mag < self.DOG_STOP_THRESHOLD:
# Below threshold the Webots dog stops; treat the same way here.
@@ -249,19 +272,36 @@ class HerdingEnv(gym.Env):
err = target_heading - self.dog_heading
# Wrap to (-pi, pi]
err = (err + np.pi) % (2 * np.pi) - np.pi
turn_rate = np.clip(self.DOG_K_TURN * err,
-self.DOG_MAX_TURN_RATE,
self.DOG_MAX_TURN_RATE)
self.dog_heading = float(
((self.dog_heading + turn_rate * self.DT) + np.pi)
% (2 * np.pi) - np.pi
)
target_speed = act_mag * self.DOG_SPEED
fwd_speed = target_speed * max(0.0, float(np.cos(err)))
step_vec = np.array([np.cos(self.dog_heading),
np.sin(self.dog_heading)], dtype=np.float32)
fwd_rad = fwd_speed / self.DOG_WHEEL_R
turn = self.DOG_K_TURN * err
left_w = np.clip(fwd_rad - turn, -self.DOG_MOTOR_MAX, self.DOG_MOTOR_MAX)
right_w = np.clip(fwd_rad + turn, -self.DOG_MOTOR_MAX, self.DOG_MOTOR_MAX)
v = self.DOG_WHEEL_R * 0.5 * (right_w + left_w)
w = (self.DOG_WHEEL_R / self.DOG_AXLE_TRACK) * (right_w - left_w)
dog_dbg.update({
"target_heading": target_heading,
"err": float(err),
"fwd_speed": float(fwd_speed),
"left_w": float(left_w),
"right_w": float(right_w),
"v": float(v),
"w": float(w),
})
self.dog_heading = float(
((self.dog_heading + w * self.DT) + np.pi) % (2 * np.pi) - np.pi
)
step_vec = np.array(
[np.cos(self.dog_heading), np.sin(self.dog_heading)],
dtype=np.float32
)
new_dog = np.clip(
self.dog_pos + step_vec * fwd_speed * self.DT,
self.dog_pos + step_vec * v * self.DT,
-self.FIELD, self.FIELD,
)
@@ -281,8 +321,6 @@ class HerdingEnv(gym.Env):
self.dog_pos = new_dog.astype(np.float32)
for i in range(self.n_sheep):
if self.penned[i]:
continue
self.sheep_pos[i] = self._step_sheep(i)
if self._in_pen(self.sheep_pos[i]):
self.penned[i] = True
@@ -295,7 +333,18 @@ class HerdingEnv(gym.Env):
terminated = n_penned == self.n_sheep
truncated = self._step_count >= self.max_steps
info = {"n_penned": n_penned, "n_sheep": self.n_sheep,
"rcomps": rcomps}
"rcomps": rcomps, "dog_dyn": dog_dbg}
self._dog_debug_writer.writerow([
self._step_count,
float(act[0]), float(act[1]), act_mag,
float(self.dog_heading), dog_dbg["target_heading"], dog_dbg["err"],
dog_dbg["fwd_speed"], dog_dbg["left_w"], dog_dbg["right_w"],
dog_dbg["v"], dog_dbg["w"],
float(self.dog_pos[0]), float(self.dog_pos[1]),
])
if self._step_count % 200 == 0:
self._dog_debug_file.flush()
if self.render_mode == "human":
self.render()
@@ -357,6 +406,7 @@ class HerdingEnv(gym.Env):
import matplotlib.pyplot as plt
plt.close(self._fig)
self._fig = None
self._dog_debug_file.close()
# ------------------------------------------------------------------
# Internals
@@ -529,6 +579,41 @@ class HerdingEnv(gym.Env):
old_pos = self.sheep_pos[i].copy() # saved for pen wall collision check
pos = old_pos.copy()
fx, fy = 0.0, 0.0
if self.penned[i]:
# Webots latch behavior: once in pen, sheep keep moving under
# confinement + penned-sheep separation + wander.
pm = 0.8 # PEN_MARGIN in sheep.py
px0, px1 = self.PEN_X
py0, py1 = self.PEN_Y
x, y = float(pos[0]), float(pos[1])
if x < px0 + pm: fx += ((px0 + pm - x) / pm) * 15.0
if x > px1 - pm: fx -= ((x - (px1 - pm)) / pm) * 15.0
if y < py0 + pm: fy += ((py0 + pm - y) / pm) * 15.0
if y > py1 - pm: fy -= ((y - (py1 - pm)) / pm) * 15.0
for j in range(self.n_sheep):
if j == i or not self.penned[j]:
continue
dv = self.sheep_pos[j] - pos
dj = float(np.linalg.norm(dv))
if 0.05 < dj < self.SEPARATION_DIST:
push = (self.SEPARATION_DIST - dj) / dj
fx -= (dv[0] / dj) * push * 2.5
fy -= (dv[1] / dj) * push * 2.5
if self.np_random.random() < 0.02:
self.wander_ang[i] += float(self.np_random.uniform(-0.6, 0.6))
fx += float(np.cos(self.wander_ang[i])) * 0.5
fy += float(np.sin(self.wander_ang[i])) * 0.5
force = np.array([fx, fy], dtype=np.float32)
mag = float(np.linalg.norm(force))
if mag > 0.01:
speed = min(self.SHEEP_FLEE_V, mag * 0.3)
pos = np.clip(pos + (force / mag) * speed * self.DT,
-self.FIELD, self.FIELD)
return pos.astype(np.float32)
fleeing = False
# Flee from dog — quadratic ramp
@@ -536,11 +621,19 @@ class HerdingEnv(gym.Env):
dist = float(np.linalg.norm(diff))
if 0.01 < dist < self.FLEE_DIST:
t = 1.0 - dist / self.FLEE_DIST
s = t * t * 5.0
s = t * t * 20.0
fx -= (diff[0] / dist) * s
fy -= (diff[1] / dist) * s
fleeing = True
# Repel unpenned sheep from pen side-wall exteriors (sheep.py PEN_EXT_MARGIN).
if self.PEN_Y[0] < pos[1] < self.PEN_Y[1]:
pem = 0.8
if self.PEN_X[0] - pem < pos[0] < self.PEN_X[0]:
fx -= ((pos[0] - (self.PEN_X[0] - pem)) / pem) * 6.0
if self.PEN_X[1] < pos[0] < self.PEN_X[1] + pem:
fx += ((self.PEN_X[1] + pem - pos[0]) / pem) * 6.0
# Separation (inverse-distance) + Cohesion
cx, cy, cn = 0.0, 0.0, 0
for j in range(self.n_sheep):
@@ -562,7 +655,7 @@ class HerdingEnv(gym.Env):
fy += (cy / cn - pos[1]) * w
# Wall avoidance
m, F = self.WALL_MARGIN, self.FIELD
m, F = self.WALL_MARGIN, self.SHEEP_WALL_INNER
if pos[0] < -F + m: fx += ((-F + m - pos[0]) / m) * 6.0
if pos[0] > F - m: fx -= ((pos[0] - (F - m)) / m) * 6.0
if pos[1] < -F + m: fy += ((-F + m - pos[1]) / m) * 6.0
@@ -609,5 +702,8 @@ class HerdingEnv(gym.Env):
# Block crossing through east wall from outside
if old_pos[0] > px1 >= pos[0] and py0 < pos[1] < py1:
pos = np.array([px1 + 1e-3, pos[1]], dtype=np.float32)
# Block crossing through south wall from outside
if old_pos[1] < py0 <= pos[1] and px0 < pos[0] < px1:
pos = np.array([pos[0], py0 - 1e-3], dtype=np.float32)
return pos.astype(np.float32)