Mimics webots approach better + debug. Lucky number

This commit is contained in:
Johnny Fernandes
2026-04-26 20:36:36 +01:00
parent deeae3193e
commit 57b1735e1a
4 changed files with 472 additions and 110 deletions
+154 -90
View File
@@ -61,6 +61,21 @@ class HerdingEnv(gym.Env):
DOG_MOTOR_MAX = 70.0 # rad/s (ShepherdDog.proto motor maxVelocity)
DOG_STOP_THRESHOLD = 0.05 # ||action|| below this → dog stops in place
# Differential-drive sheep dynamics — mirrors sheep.py drive():
SHEEP_K_TURN = 4.0 # rad/s per rad heading error (sheep.py k=4.0)
SHEEP_WHEEL_R = 0.031 # m (Sheep.proto wheel radius)
SHEEP_AXLE_TRACK = 0.20 # m (wheel anchors at y=+/-0.10 in proto)
SHEEP_MOTOR_MAX = 22.0 # rad/s (sheep.py MAX_SPEED clamp)
# Sub-stepping: 6 x ~16.7ms ≈ 100ms per env step (Webots basicTimeStep=16ms)
N_SUBSTEPS = 6
# Peer communication lag — sheep broadcast every 3 Webots steps
PEER_BROADCAST_INTERVAL = 3
# Action smoothing EMA alpha — matches shepherd_dog_rl.py ACTION_SMOOTH
ACTION_SMOOTH = 0.3
# Boid parameters — identical to sheep.py
FLEE_DIST = 7.0
SEPARATION_DIST = 2.5
@@ -138,8 +153,11 @@ class HerdingEnv(gym.Env):
self.dog_pos = np.zeros(2, dtype=np.float32)
self.dog_heading = 0.0 # radians, world frame
self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32)
self.sheep_heading = np.zeros(self.MAX_SHEEP, dtype=np.float32)
self.penned = np.ones(self.MAX_SHEEP, dtype=bool)
self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32)
self._delayed_sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32)
self._prev_action = np.zeros(2, dtype=np.float32)
self._fig = None
# Differential-drive debug CSV for sim/Webots parity checks.
@@ -222,10 +240,17 @@ class HerdingEnv(gym.Env):
# Random initial heading so the policy learns to handle any orientation.
self.dog_heading = float(self.np_random.uniform(-np.pi, np.pi))
self.sheep_heading = self.np_random.uniform(
-np.pi, np.pi, size=(self.MAX_SHEEP,)
).astype(np.float32)
self.wander_ang = self.np_random.uniform(
-np.pi, np.pi, size=(self.MAX_SHEEP,)
).astype(np.float32)
self._delayed_sheep_pos[:self.n_sheep] = self.sheep_pos[:self.n_sheep].copy()
self._prev_action = np.zeros(2, dtype=np.float32)
# Initialise per-sheep pen-distance sum for progress reward
active = ~self.penned[:self.n_sheep]
target = self.PEN_ENTRY if self.ENTRY_AWARE else self.PEN_CENTER
@@ -249,81 +274,35 @@ class HerdingEnv(gym.Env):
self._step_count += 1
act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0)
old_dog = self.dog_pos.copy()
# Action smoothing EMA — matches shepherd_dog_rl.py ACTION_SMOOTH
if self.ACTION_SMOOTH > 0:
act = self.ACTION_SMOOTH * self._prev_action + (1.0 - self.ACTION_SMOOTH) * act
self._prev_action = act.copy()
act_mag = float(np.linalg.norm(act))
sub_dt = self.DT / self.N_SUBSTEPS
dog_dbg = {
"target_heading": float(self.dog_heading),
"err": 0.0,
"fwd_speed": 0.0,
"left_w": 0.0,
"right_w": 0.0,
"v": 0.0,
"w": 0.0,
"err": 0.0, "fwd_speed": 0.0,
"left_w": 0.0, "right_w": 0.0, "v": 0.0, "w": 0.0,
}
# Differential-drive kinematics — mirrors Webots drive():
# action -> desired heading/speed -> wheel angular velocities (with
# saturation) -> body linear/angular velocity via wheel geometry.
act_mag = float(np.linalg.norm(act))
if act_mag < self.DOG_STOP_THRESHOLD:
# Below threshold the Webots dog stops; treat the same way here.
new_dog = self.dog_pos.copy()
else:
target_heading = float(np.arctan2(act[1], act[0]))
err = target_heading - self.dog_heading
# Wrap to (-pi, pi]
err = (err + np.pi) % (2 * np.pi) - np.pi
for _sub in range(self.N_SUBSTEPS):
# Snapshot peer positions every 3 sub-steps (mirrors sheep broadcast)
if _sub % self.PEER_BROADCAST_INTERVAL == 0:
self._delayed_sheep_pos[:self.n_sheep] = self.sheep_pos[:self.n_sheep].copy()
target_speed = act_mag * self.DOG_SPEED
fwd_speed = target_speed * max(0.0, float(np.cos(err)))
fwd_rad = fwd_speed / self.DOG_WHEEL_R
turn = self.DOG_K_TURN * err
# Dog differential-drive sub-step
dbg = self._step_dog_substep(act, sub_dt)
if dbg["v"] != 0.0 or dbg["w"] != 0.0:
dog_dbg = dbg
left_w = np.clip(fwd_rad - turn, -self.DOG_MOTOR_MAX, self.DOG_MOTOR_MAX)
right_w = np.clip(fwd_rad + turn, -self.DOG_MOTOR_MAX, self.DOG_MOTOR_MAX)
v = self.DOG_WHEEL_R * 0.5 * (right_w + left_w)
w = (self.DOG_WHEEL_R / self.DOG_AXLE_TRACK) * (right_w - left_w)
dog_dbg.update({
"target_heading": target_heading,
"err": float(err),
"fwd_speed": float(fwd_speed),
"left_w": float(left_w),
"right_w": float(right_w),
"v": float(v),
"w": float(w),
})
self.dog_heading = float(
((self.dog_heading + w * self.DT) + np.pi) % (2 * np.pi) - np.pi
)
step_vec = np.array(
[np.cos(self.dog_heading), np.sin(self.dog_heading)],
dtype=np.float32
)
new_dog = np.clip(
self.dog_pos + step_vec * v * self.DT,
-self.FIELD, self.FIELD,
)
# Pen wall collision — west and east pen walls block the dog within
# the pen's y-range. North face is open, south is the field edge.
px0, px1 = self.PEN_X
py0, py1 = self.PEN_Y
if py0 < new_dog[1] < py1:
if old_dog[0] < px0 <= new_dog[0]:
new_dog[0] = px0 - 1e-3
elif old_dog[0] > px0 >= new_dog[0]:
new_dog[0] = px0 + 1e-3
if old_dog[0] > px1 >= new_dog[0]:
new_dog[0] = px1 + 1e-3
elif old_dog[0] < px1 <= new_dog[0]:
new_dog[0] = px1 - 1e-3
self.dog_pos = new_dog.astype(np.float32)
for i in range(self.n_sheep):
self.sheep_pos[i] = self._step_sheep(i)
if self._in_pen(self.sheep_pos[i]):
self.penned[i] = True
# Sheep dynamics sub-step
for i in range(self.n_sheep):
self.sheep_pos[i] = self._step_sheep(i, sub_dt)
if self._in_pen(self.sheep_pos[i]):
self.penned[i] = True
n_penned = int(self.penned[:self.n_sheep].sum())
newly_penned = n_penned - self._prev_penned
@@ -416,6 +395,95 @@ class HerdingEnv(gym.Env):
return (self.PEN_X[0] < pos[0] < self.PEN_X[1] and
self.PEN_Y[0] < pos[1] < self.PEN_Y[1])
def _sheep_drive(self, i: int, target_heading: float, speed_rad: float,
dt: float) -> np.ndarray:
"""Differential-drive integration for sheep i over one sub-step dt.
Mirrors sheep.py drive(): heading error -> cos(err) forward scaling ->
wheel speeds with saturation -> unicycle kinematics.
"""
heading = float(self.sheep_heading[i])
err = (target_heading - heading + np.pi) % (2 * np.pi) - np.pi
fwd_rad = speed_rad * max(0.0, float(np.cos(err)))
turn = self.SHEEP_K_TURN * err
left_w = np.clip(fwd_rad - turn, -self.SHEEP_MOTOR_MAX, self.SHEEP_MOTOR_MAX)
right_w = np.clip(fwd_rad + turn, -self.SHEEP_MOTOR_MAX, self.SHEEP_MOTOR_MAX)
v = self.SHEEP_WHEEL_R * 0.5 * (right_w + left_w)
w = (self.SHEEP_WHEEL_R / self.SHEEP_AXLE_TRACK) * (right_w - left_w)
self.sheep_heading[i] = float(
((heading + w * dt) + np.pi) % (2 * np.pi) - np.pi
)
step_vec = np.array(
[np.cos(self.sheep_heading[i]), np.sin(self.sheep_heading[i])],
dtype=np.float32
)
return (self.sheep_pos[i] + step_vec * v * dt).astype(np.float32)
def _step_dog_substep(self, act: np.ndarray, dt: float) -> dict:
"""Move the dog one sub-step with differential-drive kinematics.
Returns debug dict with wheel/velocity info.
"""
old_dog = self.dog_pos.copy()
act_mag = float(np.linalg.norm(act))
dog_dbg = {
"target_heading": float(self.dog_heading),
"err": 0.0, "fwd_speed": 0.0,
"left_w": 0.0, "right_w": 0.0, "v": 0.0, "w": 0.0,
}
if act_mag < self.DOG_STOP_THRESHOLD:
return dog_dbg
target_heading = float(np.arctan2(act[1], act[0]))
err = (target_heading - self.dog_heading + np.pi) % (2 * np.pi) - np.pi
target_speed = act_mag * self.DOG_SPEED
fwd_speed = target_speed * max(0.0, float(np.cos(err)))
fwd_rad = fwd_speed / self.DOG_WHEEL_R
turn = self.DOG_K_TURN * err
left_w = np.clip(fwd_rad - turn, -self.DOG_MOTOR_MAX, self.DOG_MOTOR_MAX)
right_w = np.clip(fwd_rad + turn, -self.DOG_MOTOR_MAX, self.DOG_MOTOR_MAX)
v = self.DOG_WHEEL_R * 0.5 * (right_w + left_w)
w = (self.DOG_WHEEL_R / self.DOG_AXLE_TRACK) * (right_w - left_w)
dog_dbg.update({
"target_heading": target_heading, "err": float(err),
"fwd_speed": float(fwd_speed), "left_w": float(left_w),
"right_w": float(right_w), "v": float(v), "w": float(w),
})
self.dog_heading = float(
((self.dog_heading + w * dt) + np.pi) % (2 * np.pi) - np.pi
)
step_vec = np.array(
[np.cos(self.dog_heading), np.sin(self.dog_heading)],
dtype=np.float32
)
new_dog = np.clip(
self.dog_pos + step_vec * v * dt, -self.FIELD, self.FIELD,
)
# Pen wall collision
px0, px1 = self.PEN_X
py0, py1 = self.PEN_Y
if py0 < new_dog[1] < py1:
if old_dog[0] < px0 <= new_dog[0]:
new_dog[0] = px0 - 1e-3
elif old_dog[0] > px0 >= new_dog[0]:
new_dog[0] = px0 + 1e-3
if old_dog[0] > px1 >= new_dog[0]:
new_dog[0] = px1 + 1e-3
elif old_dog[0] < px1 <= new_dog[0]:
new_dog[0] = px1 - 1e-3
self.dog_pos = new_dog.astype(np.float32)
return dog_dbg
def _flock_stats(self):
"""Return (COM, radius, mean_dispersion) over active sheep."""
active_mask = ~self.penned[:self.n_sheep]
@@ -574,14 +642,12 @@ class HerdingEnv(gym.Env):
}
return reward, rcomps
def _step_sheep(self, i: int) -> np.ndarray:
"""Apply one timestep of boid dynamics to sheep i (mirrors sheep.py)."""
old_pos = self.sheep_pos[i].copy() # saved for pen wall collision check
def _step_sheep(self, i: int, sub_dt: float) -> np.ndarray:
"""Apply one sub-step of boid dynamics to sheep i (mirrors sheep.py)."""
old_pos = self.sheep_pos[i].copy()
pos = old_pos.copy()
fx, fy = 0.0, 0.0
if self.penned[i]:
# Webots latch behavior: once in pen, sheep keep moving under
# confinement + penned-sheep separation + wander.
pm = 0.8 # PEN_MARGIN in sheep.py
px0, px1 = self.PEN_X
py0, py1 = self.PEN_Y
@@ -594,7 +660,7 @@ class HerdingEnv(gym.Env):
for j in range(self.n_sheep):
if j == i or not self.penned[j]:
continue
dv = self.sheep_pos[j] - pos
dv = self._delayed_sheep_pos[j] - pos
dj = float(np.linalg.norm(dv))
if 0.05 < dj < self.SEPARATION_DIST:
push = (self.SEPARATION_DIST - dj) / dj
@@ -609,9 +675,10 @@ class HerdingEnv(gym.Env):
force = np.array([fx, fy], dtype=np.float32)
mag = float(np.linalg.norm(force))
if mag > 0.01:
speed = min(self.SHEEP_FLEE_V, mag * 0.3)
pos = np.clip(pos + (force / mag) * speed * self.DT,
-self.FIELD, self.FIELD)
target_heading = float(np.arctan2(fy, fx))
speed_rad = max(3.0, min(20.0, mag * 3.0))
pos = self._sheep_drive(i, target_heading, speed_rad, sub_dt)
pos = np.clip(pos, -self.FIELD, self.FIELD)
return pos.astype(np.float32)
fleeing = False
@@ -634,16 +701,16 @@ class HerdingEnv(gym.Env):
if self.PEN_X[1] < pos[0] < self.PEN_X[1] + pem:
fx += ((self.PEN_X[1] + pem - pos[0]) / pem) * 6.0
# Separation (inverse-distance) + Cohesion
# Separation (inverse-distance) + Cohesion — uses delayed peer positions
cx, cy, cn = 0.0, 0.0, 0
for j in range(self.n_sheep):
if j == i or self.penned[j]:
continue
dv = self.sheep_pos[j] - pos
dv = self._delayed_sheep_pos[j] - pos
dj = float(np.linalg.norm(dv))
if 0.3 < dj < self.COHESION_DIST:
cx += self.sheep_pos[j][0]
cy += self.sheep_pos[j][1]
cx += self._delayed_sheep_pos[j][0]
cy += self._delayed_sheep_pos[j][1]
cn += 1
if 0.05 < dj < self.SEPARATION_DIST:
push = (self.SEPARATION_DIST - dj) / dj
@@ -677,19 +744,16 @@ class HerdingEnv(gym.Env):
fx += float(np.cos(self.wander_ang[i])) * 0.5
fy += float(np.sin(self.wander_ang[i])) * 0.5
# Integrate
# Integrate via differential-drive (mirrors sheep.py speed mapping + drive())
force = np.array([fx, fy])
mag = float(np.linalg.norm(force))
if mag > 0.01:
top_speed = self.SHEEP_FLEE_V if fleeing else self.SHEEP_WANDER_V
speed = min(top_speed, mag * 0.3)
pos = np.clip(pos + (force / mag) * speed * self.DT,
-self.FIELD, self.FIELD)
target_heading = float(np.arctan2(fy, fx))
speed_rad = max(3.0, min(20.0, mag * 3.0)) # sheep.py line 229
pos = self._sheep_drive(i, target_heading, speed_rad, sub_dt)
pos = np.clip(pos, -self.FIELD, self.FIELD)
# Pen solid wall collision — mirrors Webots geometry.
# The pen has THREE solid walls: west (x=PEN_X[0]), east (x=PEN_X[1]),
# south (y=PEN_Y[0]). The NORTH face (y=PEN_Y[1]=-8) is the open entrance.
# Sheep may only enter through the north face; crossing a solid wall is blocked.
px0, px1 = self.PEN_X[0], self.PEN_X[1]
py0, py1 = self.PEN_Y[0], self.PEN_Y[1]
entered_from_north = (