Compare commits

...

77 Commits

Author SHA1 Message Date
Johnny Fernandes a2363d882f Trying attention method 2026-04-26 22:28:43 +01:00
Johnny Fernandes 57b1735e1a Mimics webots approach better + debug. Lucky number 2026-04-26 20:36:36 +01:00
Johnny Fernandes deeae3193e Mimics webots approach better + debug. Lucky number 2026-04-26 18:55:53 +01:00
Johnny Fernandes 1af7d03ce2 Mimic webots physics 2026-04-26 18:22:26 +01:00
Johnny Fernandes 8110fc3143 Run n3 2026-04-26 16:42:55 +00:00
Johnny Fernandes ad185b4d7e Approach v4 simpler version 2026-04-26 17:18:20 +01:00
Johnny Fernandes 27fe6d1bf5 Run v3 2026-04-26 16:01:30 +00:00
Johnny Fernandes e2883212c5 Approach v3 w/ south penalty fix 2026-04-26 15:26:24 +01:00
Johnny Fernandes 11e13c6980 Approach v3 w/ south penalty 2026-04-26 14:55:13 +01:00
Johnny Fernandes a561f8a697 Run v2 2026-04-26 13:32:48 +00:00
Johnny Fernandes a44ddb7b08 Approach refinement 2026-04-26 12:59:04 +01:00
Johnny Fernandes acf0810425 Test26_1200 2026-04-26 11:04:23 +00:00
Johnny Fernandes 3cfd6b5e81 Approach refinement 2026-04-26 02:55:14 +01:00
Johnny Fernandes d1aab20322 Approach refinement 2026-04-26 02:19:10 +01:00
Johnny Fernandes 287743709a Approach refinement 2026-04-26 02:02:25 +01:00
Johnny Fernandes 61f8a7db15 Cleanup and new approach 2026-04-26 01:50:01 +01:00
Johnny Fernandes b031473758 Behaviour refinement - fence penalty 2026-04-26 01:09:50 +01:00
Johnny Fernandes 6253850620 Behaviour refinement - fence penalty 2026-04-25 23:42:02 +01:00
Johnny Fernandes 6612dbc1ba Test25_2330 2026-04-25 22:32:06 +00:00
Johnny Fernandes 7b87908410 Behaviour refinement 2026-04-25 21:35:23 +01:00
Johnny Fernandes e302c76886 Test25_2025 2026-04-25 19:25:39 +00:00
Johnny Fernandes 841f5fa520 Test25_2000 2026-04-25 19:17:40 +00:00
Johnny Fernandes 7bfb7d3aae Sheep training flock _ improver 2026-04-25 18:46:41 +01:00
Johnny Fernandes 5005128c07 Test25_1820 2026-04-25 17:19:02 +00:00
Johnny Fernandes 16878c5a0b Sheep training flock _ improver 2026-04-25 18:02:56 +01:00
Johnny Fernandes 75d030cb49 Test25_1800 2026-04-25 17:00:19 +00:00
Johnny Fernandes cc6d72e472 Sheep training flock _ improver 2026-04-25 17:07:03 +01:00
Johnny Fernandes 3a5decb185 Test25_1700 2026-04-25 16:02:10 +00:00
Johnny Fernandes 75c5b7c014 Sheep training flock _ improver 2026-04-25 16:28:15 +01:00
Johnny Fernandes 4350c7d320 Test25_1600 2026-04-25 15:06:06 +00:00
Johnny Fernandes cd7e62b1b2 Sheep training flock _ improver 2026-04-25 13:39:49 +01:00
Johnny Fernandes 9bbef28515 Sheep training flock _ improver 2026-04-25 13:30:37 +01:00
Johnny Fernandes 438fa1be1d Sheep training flock _ improver 2026-04-25 13:24:52 +01:00
Johnny Fernandes e7c1d82f5c Test25_1315 2026-04-25 12:14:36 +00:00
Johnny Fernandes f889dc78cc Sheep training flock _ improver 2026-04-25 12:50:06 +01:00
Johnny Fernandes 19bfac9bd9 Test25_1245 2026-04-25 11:47:37 +00:00
Johnny Fernandes 02b20fbdb4 Sheep training flock _ improver 2026-04-25 12:20:42 +01:00
Johnny Fernandes 433652cb94 Test25_1215 2026-04-25 11:16:12 +00:00
Johnny Fernandes fbe76a0d04 Sheep training flock _ improver 2026-04-25 11:31:39 +01:00
Johnny Fernandes 062de676c9 Test25_0030 2026-04-24 23:37:03 +00:00
Johnny Fernandes 7d5725cc3e Sheep training flock _ improver 2026-04-25 00:18:01 +01:00
Johnny Fernandes 5a61a424ee Test25_0010 2026-04-24 23:10:33 +00:00
Johnny Fernandes c029c3fc6c Sheep training flock _ improver 2026-04-24 23:51:47 +01:00
Johnny Fernandes b77f36b713 Sheep training flock _ improver 2026-04-24 23:38:09 +01:00
Johnny Fernandes 0716c6c3c8 Sheep training flock _ improver 2026-04-24 23:27:05 +01:00
Johnny Fernandes b3251fcca3 Sheep training flock _ improver 2026-04-24 22:46:51 +01:00
Johnny Fernandes d599181d22 Sheep training flock _ improver 2026-04-24 21:29:44 +01:00
Johnny Fernandes 8b54b2a934 Test24_2120 2026-04-24 20:21:53 +00:00
Johnny Fernandes eb29cdf402 Test24_2100 2026-04-24 20:08:25 +00:00
Johnny Fernandes 36b3216c5f Sheep training flock of 10 fix? 2026-04-24 19:05:41 +01:00
Johnny Fernandes 7bb545eab6 Sheep training flock of 10 fix? 2026-04-24 19:03:18 +01:00
Johnny Fernandes efe996a5a9 Test24_1900 2026-04-24 18:00:20 +00:00
Johnny Fernandes 3bac24f406 Sheep training flock of 10 fix? 2026-04-24 18:29:23 +01:00
Johnny Fernandes fc961e651c Sheep training flock of 10 fix? 2026-04-24 18:06:22 +01:00
Johnny Fernandes 65d881aa0f Test24_1800 2026-04-24 17:00:14 +00:00
Johnny Fernandes bf9fe902d9 Sheep training flock of 10 fix? 2026-04-24 17:49:42 +01:00
Johnny Fernandes 4d7f365358 Sheep training flock of 10 fix? 2026-04-24 17:31:11 +01:00
Johnny Fernandes c2da9c10e4 Test24_1725 2026-04-24 16:24:54 +00:00
Johnny Fernandes d8b4e2c042 Sheep training flock of 10 fix? 2026-04-24 17:08:47 +01:00
Johnny Fernandes e0426bf320 Sheep training flock of 10 fix? 2026-04-24 16:46:02 +01:00
Johnny Fernandes 3574d57ba2 Sheep training flock of 10 fix? 2026-04-24 16:30:35 +01:00
Johnny Fernandes 58d773cb7c Sheep training flock of 10 fix? 2026-04-24 16:12:16 +01:00
Johnny Fernandes fe5174e0bd Sheep training flock of 10 fix? 2026-04-24 15:55:15 +01:00
Johnny Fernandes 678d757fe8 Sheep training flock of 10 fix? 2026-04-24 15:24:37 +01:00
Johnny Fernandes 44b2788e78 Sheep training flock of 10 fix? 2026-04-24 15:14:45 +01:00
Johnny Fernandes bdbe8ba1de Sheep training flock of 10 fix? 2026-04-24 15:10:36 +01:00
Johnny Fernandes fcfa2c35c8 Sheep training flock of 10 fix? 2026-04-24 14:54:20 +01:00
Johnny Fernandes 17eb25864e Sheep training flock of 10 fix? 2026-04-24 10:58:36 +01:00
Johnny Fernandes 4189cc8dba Sheep training flock of 10 fix? 2026-04-24 01:59:15 +01:00
Johnny Fernandes 1e3b67d194 Test24_0150 2026-04-24 00:50:17 +00:00
Johnny Fernandes f68dea44da Sheep training flock of 10 fix? 2026-04-23 23:20:23 +01:00
Johnny Fernandes a13f5d0ff0 Sheep training flock of 10 fix? 2026-04-23 20:41:48 +01:00
Johnny Fernandes 81dc2aca01 Sheep training flock of 10 2026-04-23 19:22:39 +01:00
Johnny Fernandes fdac0ae0b0 Shepherd Dog RL 2026-04-23 19:22:14 +01:00
Johnny Fernandes 9e13eb060d Classic approach results 2026-04-23 17:23:57 +00:00
Johnny Fernandes ea6e66b16c Classic approach results 2026-04-23 12:43:47 +00:00
Johnny Fernandes ffbfaa3977 A more classical approach 2026-04-23 11:51:52 +01:00
16 changed files with 2468 additions and 438 deletions
+20 -1
View File
@@ -1,2 +1,21 @@
# Stuff # Stuff
_example/ #_example/
.claude/
# Python
__pycache__/
# Training
training/**/events.out.tfevents.*
training/**/checkpoints/
training/runs/**
!training/runs/.gitkeep
# Controller runtime artefacts
controllers/shepherd_dog_rl/debug*.csv
controllers/shepherd_dog_rl/debug_out*/
controllers/shepherd_dog_rl/final_model*.zip
controllers/shepherd_dog_rl/vecnorm*.pkl
# Optional env parity debug
dog_debug.csv
+20
View File
@@ -133,6 +133,17 @@ while robot.step(timestep) != -1:
fx, fy = 0.0, 0.0 fx, fy = 0.0, 0.0
# Repel unpenned sheep from the exterior of the pen's side walls so they
# don't get pinned by flee forces. Only fires when strictly outside the pen
# (x < PEN_X_MIN or x > PEN_X_MAX) at pen height (y in pen y-range).
# Entrance is open on the north (y > PEN_Y_MAX) — no force there.
PEN_EXT_MARGIN = 0.8
if not penned and PEN_Y_MIN < y < PEN_Y_MAX:
if PEN_X_MIN - PEN_EXT_MARGIN < x < PEN_X_MIN:
fx -= ((x - (PEN_X_MIN - PEN_EXT_MARGIN)) / PEN_EXT_MARGIN) * 6.0
if PEN_X_MAX < x < PEN_X_MAX + PEN_EXT_MARGIN:
fx += ((PEN_X_MAX + PEN_EXT_MARGIN - x) / PEN_EXT_MARGIN) * 6.0
if penned: if penned:
# Inside pen: wander freely, strong boundary forces prevent exit, # Inside pen: wander freely, strong boundary forces prevent exit,
# separation still active to avoid collisions with other penned sheep. # separation still active to avoid collisions with other penned sheep.
@@ -204,6 +215,15 @@ while robot.step(timestep) != -1:
fx += math.cos(wander_angle) * 0.5 fx += math.cos(wander_angle) * 0.5
fy += math.sin(wander_angle) * 0.5 fy += math.sin(wander_angle) * 0.5
# Hard-stop clamp: within 0.5 m of a wall, zero any force component that
# would push further into it. Prevents the flee force from pinning a sheep
# against the boundary when the dog approaches from outside.
HS = 0.5
if x < X_MIN + HS and fx < 0: fx = 0.0
if x > X_MAX - HS and fx > 0: fx = 0.0
if y < Y_MIN + HS and fy < 0: fy = 0.0
if y > Y_MAX - HS and fy > 0: fy = 0.0
heading = math.atan2(fy, fx) heading = math.atan2(fy, fx)
mag = math.hypot(fx, fy) mag = math.hypot(fx, fy)
speed = max(WANDER_SPEED, min(FLEE_SPEED, mag * 3.0)) speed = max(WANDER_SPEED, min(FLEE_SPEED, mag * 3.0))
Binary file not shown.
+153
View File
@@ -0,0 +1,153 @@
"""
Render Webots-side debug trajectory from debug.csv.
The shepherd_dog_rl controller writes per-step state to debug.csv when its
DEBUG_ENABLED flag is True. This script reads it and produces:
trajectory.png — dog path + sheep paths overlaid on the field
obs.png — raw vs VecNormalize-normalised observation values over time
actions.png — vx, vy time series
Run:
python plot_debug.py # uses debug.csv next to this file
python plot_debug.py --csv path/to.csv --out-dir somewhere/
"""
import argparse
import csv
import os
import sys
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import numpy as np
def load_csv(path):
    """Read the debug CSV at *path* into a list of row dicts.

    Each row maps the header column names to the raw string values, as
    produced by ``csv.DictReader``. Terminates the process via ``sys.exit``
    with an ``empty CSV`` message when the file contains no data rows.
    """
    # newline="" leaves newline handling to the csv module, which is what the
    # csv documentation asks for when a file object is handed to a reader.
    with open(path, newline="") as f:
        rows = list(csv.DictReader(f))
    if not rows:
        sys.exit(f"empty CSV: {path}")
    return rows
def parse_floats(s):
    """Convert a ``;``-separated string of numbers into a list of floats.

    Empty fields (including an entirely empty string) are skipped.
    """
    values = []
    for token in s.split(";"):
        if token:
            values.append(float(token))
    return values
def plot_trajectory(rows, out_path):
    """Draw the dog and sheep paths over the field and save the figure.

    rows     : CSV row dicts; reads the ``dog_x``, ``dog_y``, ``sheep_xs``,
               ``sheep_ys`` and ``n_penned`` columns.
    out_path : destination image path passed to ``fig.savefig``.
    """
    fig, ax = plt.subplots(figsize=(7, 7))
    ax.set_xlim(-16, 16)
    ax.set_ylim(-16, 16)
    ax.set_aspect("equal")
    ax.set_facecolor("#dcedc8")

    # Field outline and the pen rectangle.
    wall_colour = "#795548"
    field_outline = mpatches.Rectangle((-15, -15), 30, 30,
                                       fill=False, edgecolor=wall_colour, lw=2)
    pen_patch = mpatches.Rectangle((10, -15), 3, 7,
                                   facecolor="#ffe082", edgecolor=wall_colour, lw=2)
    ax.add_patch(field_outline)
    ax.add_patch(pen_patch)
    ax.text(11.5, -11.5, "pen", ha="center", va="center", fontsize=8)

    # Dog path: square marks the first sample, diamond the last.
    dog_colour = "#4e342e"
    dog_x = [float(row["dog_x"]) for row in rows]
    dog_y = [float(row["dog_y"]) for row in rows]
    ax.plot(dog_x, dog_y, color=dog_colour, lw=1.5, alpha=0.7, label="dog")
    ax.plot(dog_x[0], dog_y[0], "s", color=dog_colour, ms=10)
    ax.plot(dog_x[-1], dog_y[-1], "D", color=dog_colour, ms=10)

    # Sheep — turn the per-step coordinate lists into one track per sheep.
    # The sheep count is taken from the last row; earlier rows may be shorter,
    # in which case that step simply contributes no sample to the track.
    sheep_x_rows = [parse_floats(row["sheep_xs"]) for row in rows]
    sheep_y_rows = [parse_floats(row["sheep_ys"]) for row in rows]
    if sheep_x_rows and sheep_x_rows[-1]:
        palette = ("#e41a1c", "#377eb8", "#4daf4a", "#984ea3", "#ff7f00",
                   "#a65628", "#f781bf", "#999999", "#66c2a5", "#fc8d62")
        for idx in range(len(sheep_x_rows[-1])):
            track_x = [vals[idx] for vals in sheep_x_rows if idx < len(vals)]
            track_y = [vals[idx] for vals in sheep_y_rows if idx < len(vals)]
            if not track_x:
                continue
            colour = palette[idx % len(palette)]
            ax.plot(track_x, track_y, color=colour, lw=0.8, alpha=0.6,
                    label=f"sheep {idx + 1}")
            ax.plot(track_x[0], track_y[0], "o", color=colour, ms=6)
            ax.plot(track_x[-1], track_y[-1], "*", color=colour, ms=10)

    n_in_pen = int(rows[-1]["n_penned"])
    ax.set_title(f"Webots trajectory {len(rows)} steps penned={n_in_pen}",
                 fontsize=12)
    ax.legend(loc="upper left", fontsize=7, ncol=2)
    plt.tight_layout()
    fig.savefig(out_path, dpi=120)
    plt.close(fig)
def plot_actions(rows, out_path):
    """Plot the logged ``vx``/``vy`` actions and their norm against step index.

    rows     : CSV row dicts; reads the ``vx`` and ``vy`` columns.
    out_path : destination image path passed to ``fig.savefig``.
    """
    steps = np.arange(len(rows))
    vx = np.array([float(row["vx"]) for row in rows])
    vy = np.array([float(row["vy"]) for row in rows])
    magnitude = np.sqrt(vx ** 2 + vy ** 2)

    fig, axes = plt.subplots(3, 1, figsize=(12, 7), sharex=True)

    # The two component panels share the same layout; only data, colour and
    # axis label differ.
    components = (
        (axes[0], vx, "tab:red", "vx"),
        (axes[1], vy, "tab:blue", "vy"),
    )
    for axis, series, colour, name in components:
        axis.plot(steps, series, color=colour, lw=0.8)
        axis.set_ylabel(name)
        axis.axhline(0, color="black", lw=0.4)
        axis.set_ylim(-1.1, 1.1)

    # Norm panel with reference lines at sqrt(2) and 1.
    mag_axis = axes[2]
    mag_axis.plot(steps, magnitude, color="tab:purple", lw=0.8)
    mag_axis.set_ylabel("||action||")
    mag_axis.axhline(np.sqrt(2), color="orange", ls="--", lw=1, label="saturated √2")
    mag_axis.axhline(1.0, color="gray", ls="--", lw=1)
    mag_axis.set_xlabel("step")
    mag_axis.legend(fontsize=8)

    fig.suptitle("Webots action time series")
    plt.tight_layout()
    fig.savefig(out_path, dpi=120)
    plt.close(fig)
def plot_obs(rows, out_path):
    """Plot every observation dimension (raw and normalised) against step.

    rows     : CSV row dicts; reads the ``norm_obs`` and ``raw_obs`` columns,
               each a ``;``-separated list of floats.
    out_path : destination image path passed to ``fig.savefig``.

    Returns without writing anything when the normalised observation array
    is empty.
    """
    norm = np.array([parse_floats(r["norm_obs"]) for r in rows])
    raw = np.array([parse_floats(r["raw_obs"]) for r in rows])
    if norm.size == 0:
        return
    n_dims = norm.shape[1]

    # Names for the observation layout written by the controller: 16 flock
    # features followed by cos/sin of the dog heading. The original table
    # stopped at 16 entries, so an 18-wide observation raised IndexError when
    # labelling panel 16. Any dimension beyond the known names gets a generic
    # label instead of crashing.
    known_labels = [
        "dog_x", "dog_y", "com-dog_x", "com-dog_y",
        "far1-com_x", "far1-com_y", "far2-com_x", "far2-com_y",
        "far3-com_x", "far3-com_y", "pen-com_x", "pen-com_y",
        "pen-far1_x", "pen-far1_y", "radius", "frac_active",
        "cos_heading", "sin_heading",
    ]
    labels = [
        known_labels[i] if i < len(known_labels) else f"obs[{i}]"
        for i in range(n_dims)
    ]

    t = np.arange(norm.shape[0])
    fig, axes = plt.subplots(n_dims, 1, figsize=(11, 1.0 * n_dims), sharex=True)
    if n_dims == 1:
        # plt.subplots returns a bare Axes (not an array) for a single panel.
        axes = [axes]
    for i in range(n_dims):
        axes[i].plot(t, raw[:, i], color="tab:gray", lw=0.6, alpha=0.6, label="raw")
        axes[i].plot(t, norm[:, i], color="tab:red", lw=0.8, label="normalised")
        axes[i].set_ylabel(labels[i], fontsize=8)
        axes[i].tick_params(labelsize=7)
        if i == 0:
            axes[i].legend(fontsize=7, loc="upper right")
    axes[-1].set_xlabel("step")
    fig.suptitle("Observation values over time (raw vs VecNormalize-normalised)")
    plt.tight_layout()
    fig.savefig(out_path, dpi=110)
    plt.close(fig)
def main():
    """Parse CLI options, load the debug CSV and write the three figures."""
    script_dir = os.path.dirname(os.path.abspath(__file__))

    parser = argparse.ArgumentParser()
    parser.add_argument("--csv", default=os.path.join(script_dir, "debug.csv"))
    parser.add_argument("--out-dir", default=os.path.join(script_dir, "debug_out"))
    opts = parser.parse_args()

    rows = load_csv(opts.csv)
    os.makedirs(opts.out_dir, exist_ok=True)
    print(f"loaded {len(rows)} rows from {opts.csv}")

    def target(filename):
        # Every figure lands in the chosen output directory.
        return os.path.join(opts.out_dir, filename)

    plot_trajectory(rows, target("trajectory.png"))
    plot_actions(rows, target("actions.png"))
    plot_obs(rows, target("obs.png"))
    print(f"saved trajectory.png + actions.png + obs.png to {opts.out_dir}/")


if __name__ == "__main__":
    main()
@@ -0,0 +1,285 @@
"""
Shepherd Dog RL controller — runs a trained SB3 PPO policy inside Webots.
Setup
-----
1. Copy your trained files into this directory:
controllers/shepherd_dog_rl/final_model.zip
controllers/shepherd_dog_rl/vecnorm.pkl
2. In field.wbt, set the ShepherdDog robot's controller field to
"shepherd_dog_rl". You can do this in the Webots GUI:
click the robot → Controller → shepherd_dog_rl
3. Optional: set controllerArgs to ["5"] (number of sheep) if it differs
from the default of 3.
The controller reads GPS (dog position), Compass (dog heading) and Receiver
(sheep broadcasts), builds the same 18-dim flock observation the training env
used, normalises it with the saved VecNormalize stats, and converts the
(vx, vy) policy output into differential wheel speeds.
Debug logging
-------------
Leave DEBUG_ENABLED = True (below) to write a per-step CSV (dog pos, sheep
positions, raw obs, normalised obs, action) to debug.csv alongside this
script; set it to False to disable logging. Use plot_debug.py to render
trajectories from it.
"""
import sys
import os
import math
import struct
import numpy as np
# ── make training code importable ───────────────────────────────────────────
# Directory holding this controller script.
_HERE = os.path.dirname(os.path.abspath(__file__))
# <this dir>/../../training — prepended to sys.path so that the
# `from herding_env import HerdingEnv` import below can resolve.
_TRAINING = os.path.join(_HERE, "..", "..", "training")
sys.path.insert(0, _TRAINING)
from controller import Robot
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from herding_env import HerdingEnv
# ── constants (must match herding_env.py) ───────────────────────────────────
# FIELD scales positions in build_obs(): dog position is divided by FIELD and
# relative offsets by 2 * FIELD.
FIELD = 15.0
# Stand-in for the flock COM / farthest sheep when no sheep is active.
PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32)
# Open-interval pen bounds tested by in_pen().
PEN_X = (10.0, 13.0)
PEN_Y = (-15.0, -8.0)
DOG_SPEED = 2.5 # m/s
WHEEL_R = 0.038 # wheel radius (metres) — from ShepherdDog.proto
K_TURN = 4.0 # heading-error gain (rad/s per rad)
# Ear animation: amplitude of the sinusoidal ear position target and the
# velocity handed to the ear motors each step.
EAR_AMPLITUDE = 0.35
EAR_RATE = 8.0
# ── model paths ─────────────────────────────────────────────────────────────
# All artefacts are expected next to this script.
MODEL_PATH = os.path.join(_HERE, "final_model.zip")
VECNORM_PATH = os.path.join(_HERE, "vecnorm.pkl")
DEBUG_CSV = os.path.join(_HERE, "debug.csv")
DEBUG_ENABLED = True # set False to disable debug.csv logging
# ── action smoothing ─────────────────────────────────────────────────────────
# EMA on policy output to suppress the rapid oscillation (vx/vy flipping
# between -1 and +1 every step) that stalls the physical dog. 0 = no
# smoothing (raw policy), 1 = frozen. 0.3 keeps ~30% of previous action.
ACTION_SMOOTH = 0.3
# EMA state; overwritten in place by the main loop after every policy step.
prev_action = np.zeros(2, dtype=np.float32)
def norm_angle(a: float) -> float:
    """Wrap angle *a* (radians) into [-pi, pi] by adding/subtracting 2*pi."""
    while True:
        if a > math.pi:
            a -= 2 * math.pi
        elif a < -math.pi:
            a += 2 * math.pi
        else:
            return a
def in_pen(x: float, y: float) -> bool:
    """Return True when (x, y) lies strictly inside the PEN_X / PEN_Y rectangle."""
    x_lo, x_hi = PEN_X[0], PEN_X[1]
    y_lo, y_hi = PEN_Y[0], PEN_Y[1]
    inside_x = x_lo < x < x_hi
    inside_y = y_lo < y < y_hi
    return inside_x and inside_y
def build_obs(dog_pos: np.ndarray,
              sheep_dict: dict,
              n_sheep: int,
              dog_heading: float = 0.0) -> np.ndarray:
    """
    Assemble the 18-value flock observation as a float32 vector.

    dog_pos:     dog (x, y) position.
    sheep_dict:  {name: (x, y)} for every known sheep, penned or not; only
                 sheep outside the pen contribute to the flock statistics.
    n_sheep:     nominal flock size, used as the denominator of the
                 active fraction.
    dog_heading: dog's current world-frame heading in radians.

    Layout: dog position / FIELD (2), COM - dog (2), three farthest-from-COM
    active sheep relative to COM (6), pen - COM (2), pen - farthest sheep (2),
    flock radius (1), active fraction (1), cos/sin of heading (2). Relative
    offsets and the radius are divided by 2 * FIELD.
    """
    span = 2 * FIELD

    # Keep only the sheep that are still outside the pen.
    free = np.array(
        [xy for xy in sheep_dict.values() if not in_pen(*xy)],
        dtype=np.float32
    )
    n_free = len(free)

    if n_free == 0:
        # No active sheep: everything collapses onto the pen centre.
        com = PEN_CENTER.copy()
        radius = 0.0
        far1 = far2 = far3 = PEN_CENTER.copy()
    else:
        com = free.mean(axis=0)
        spread = np.linalg.norm(free - com, axis=1)
        order = np.argsort(spread)[::-1]          # farthest first
        radius = float(spread[order[0]])
        # Fewer than three active sheep: missing slots fall back to the COM.
        far1, far2, far3 = (
            free[order[rank]] if rank < len(order) else com
            for rank in range(3)
        )

    def rel(target, origin):
        # Offset target - origin, scaled by the field span.
        return ((target[0] - origin[0]) / span, (target[1] - origin[1]) / span)

    active_fraction = n_free / max(n_sheep, 1)

    return np.array([
        dog_pos[0] / FIELD, dog_pos[1] / FIELD,
        *rel(com, dog_pos),
        *rel(far1, com),
        *rel(far2, com),
        *rel(far3, com),
        *rel(PEN_CENTER, com),
        *rel(PEN_CENTER, far1),
        radius / span,
        active_fraction,
        math.cos(dog_heading), math.sin(dog_heading),
    ], dtype=np.float32)
# ── Webots setup ─────────────────────────────────────────────────────────────
# Module-level initialisation: acquire the robot and its devices, load the
# policy and normalisation statistics, and open the optional debug log.
robot = Robot()
timestep = int(robot.getBasicTimeStep())
# Drive motors
# Position target set to +inf and velocity to 0 so the wheels are driven
# purely through setVelocity() calls in drive().
left_motor = robot.getDevice("left wheel motor")
right_motor = robot.getDevice("right wheel motor")
left_motor.setPosition(float("inf"))
right_motor.setPosition(float("inf"))
left_motor.setVelocity(0.0)
right_motor.setVelocity(0.0)
# Wheel-speed clamp used by drive().
MOTOR_MAX = left_motor.getMaxVelocity()
# Sensors
gps = robot.getDevice("gps"); gps.enable(timestep)
compass = robot.getDevice("compass"); compass.enable(timestep)
receiver = robot.getDevice("receiver"); receiver.enable(timestep)
emitter = robot.getDevice("emitter")
# Cosmetic
left_ear = robot.getDevice("left ear motor")
right_ear = robot.getDevice("right ear motor")
left_ear.setPosition(float("inf")); right_ear.setPosition(float("inf"))
left_ear.setVelocity(0.0); right_ear.setVelocity(0.0)
ear_phase = 0.0
# Number of sheep (from controllerArgs or default)
# Falls back to 3 when the argument is missing or not an integer.
try:
    n_sheep = int(sys.argv[1])
except (IndexError, ValueError):
    n_sheep = 3
# ── Load model ───────────────────────────────────────────────────────────────
print(f"[RL dog] Loading model from {MODEL_PATH}")
print(f"[RL dog] Loading vecnorm from {VECNORM_PATH}")
# VecNormalize.load() needs a vec env to wrap; this one is only passed to the
# loader and is not stepped anywhere in this script.
dummy_env = DummyVecEnv([lambda: HerdingEnv(n_sheep=n_sheep)])
vecnorm = VecNormalize.load(VECNORM_PATH, dummy_env)
# Inference only: turn off training mode and reward normalisation.
vecnorm.training = False
vecnorm.norm_reward = False
model = PPO.load(MODEL_PATH, device="cpu")
print(f"[RL dog] Model loaded — running with n_sheep={n_sheep}")
# ── Runtime state ─────────────────────────────────────────────────────────────
sheep_positions: dict = {} # {name: (x, y)} — updated every step from receiver
step_count = 0
# Debug CSV — written every step when DEBUG_ENABLED is True
debug_file = None
if DEBUG_ENABLED:
    import csv
    debug_file = open(DEBUG_CSV, "w", newline="")
    debug_writer = csv.writer(debug_file)
    # Header row; the main loop writes one matching row per step.
    debug_writer.writerow([
        "step", "dog_x", "dog_y", "heading",
        "sheep_xs", "sheep_ys", "n_active", "n_penned",
        "raw_obs", "norm_obs", "vx", "vy",
    ])
    print(f"[RL dog] DEBUG logging to {DEBUG_CSV}")
def bearing() -> float:
    """Return the robot's current world-frame heading in radians.

    Computed as atan2 of the first two compass components.
    """
    reading = compass.getValues()
    first, second = reading[0], reading[1]
    return math.atan2(first, second)
def drive(action_vx: float, action_vy: float) -> None:
    """Turn a (vx, vy) policy action into left/right wheel velocities.

    The action norm (times DOG_SPEED) sets the target speed and its direction
    the target heading. Forward speed is scaled by max(0, cos(heading error))
    and a proportional turn term (K_TURN * error) is applied differentially.
    Both wheel commands are clamped to +/-MOTOR_MAX. Below 0.05 m/s the
    wheels are stopped.
    """
    def clamp(value):
        # Limit a wheel command to the motor's velocity range.
        return max(-MOTOR_MAX, min(MOTOR_MAX, value))

    speed_ms = math.sqrt(action_vx ** 2 + action_vy ** 2) * DOG_SPEED
    if speed_ms < 0.05:
        for motor in (left_motor, right_motor):
            motor.setVelocity(0.0)
        return

    heading_err = norm_angle(math.atan2(action_vy, action_vx) - bearing())
    forward_rad = speed_ms * max(0.0, math.cos(heading_err)) / WHEEL_R
    turn_rad = K_TURN * heading_err          # rad/s correction

    left_motor.setVelocity(clamp(forward_rad - turn_rad))
    right_motor.setVelocity(clamp(forward_rad + turn_rad))
# ── Main loop ─────────────────────────────────────────────────────────────────
# Runs one policy step per Webots step until robot.step() returns -1.
# Wrapped in try/finally so the debug CSV is always closed: rows are only
# flushed every 200 steps, so without the close the tail of the log could be
# lost when the controller stops.
try:
    while robot.step(timestep) != -1:
        step_count += 1

        # 1. Drain receiver — update sheep position table
        while receiver.getQueueLength() > 0:
            try:
                msg = receiver.getString()
                parts = msg.split(":")
                if parts[0] == "sheep" and len(parts) == 4:
                    sheep_positions[parts[1]] = (float(parts[2]), float(parts[3]))
            except Exception:
                # Deliberate best-effort: an unreadable or malformed packet is
                # dropped so one bad message cannot stop the controller.
                pass
            # Always advance the queue, even after a bad packet, otherwise
            # the drain loop would never terminate.
            receiver.nextPacket()

        # 2. Dog GPS
        gps_vals = gps.getValues()
        dog_pos = np.array([gps_vals[0], gps_vals[1]], dtype=np.float32)

        # 3. Build and normalise observation (heading from compass).
        # The heading is read once and reused for the debug row below.
        heading = bearing()
        raw_obs = build_obs(dog_pos, sheep_positions, n_sheep,
                            dog_heading=heading)
        obs_norm = vecnorm.normalize_obs(raw_obs[np.newaxis])  # (1, len(raw_obs))

        # 4. Policy inference + smoothing
        action, _ = model.predict(obs_norm, deterministic=True)
        raw_a = np.array([float(action[0][0]), float(action[0][1])], dtype=np.float32)
        if ACTION_SMOOTH > 0:
            # Exponential moving average; prev_action carries the state.
            smoothed = ACTION_SMOOTH * prev_action + (1.0 - ACTION_SMOOTH) * raw_a
            prev_action[:] = smoothed
            vx, vy = float(smoothed[0]), float(smoothed[1])
        else:
            vx, vy = float(raw_a[0]), float(raw_a[1])

        # 5. Drive
        drive(vx, vy)

        # 6. Broadcast dog position so sheep can compute flee forces
        emitter.send(f"dog:{dog_pos[0]:.4f}:{dog_pos[1]:.4f}")

        # 7. Ear animation
        ear_phase += 0.12
        ep = EAR_AMPLITUDE * math.sin(ear_phase)
        left_ear.setVelocity(EAR_RATE); right_ear.setVelocity(EAR_RATE)
        left_ear.setPosition( ep); right_ear.setPosition(-ep)

        # Periodic status
        if step_count % 100 == 0:
            n_in_pen = sum(1 for x, y in sheep_positions.values() if in_pen(x, y))
            print(f"[RL dog] step={step_count} known_sheep={len(sheep_positions)}"
                  f" penned={n_in_pen}/{n_sheep} dog=({dog_pos[0]:.2f},{dog_pos[1]:.2f})"
                  f" action=({vx:.2f}, {vy:.2f})")

        # Debug CSV row
        if debug_file is not None:
            n_active = sum(1 for x, y in sheep_positions.values() if not in_pen(x, y))
            n_in_pen = len(sheep_positions) - n_active
            debug_writer.writerow([
                step_count, f"{dog_pos[0]:.4f}", f"{dog_pos[1]:.4f}",
                f"{heading:.4f}",
                ";".join(f"{v[0]:.3f}" for v in sheep_positions.values()),
                ";".join(f"{v[1]:.3f}" for v in sheep_positions.values()),
                n_active, n_in_pen,
                ";".join(f"{x:.4f}" for x in raw_obs),
                ";".join(f"{x:.4f}" for x in obs_norm[0]),
                f"{vx:.4f}", f"{vy:.4f}",
            ])
            if step_count % 200 == 0:
                debug_file.flush()
finally:
    if debug_file is not None:
        debug_file.close()
Binary file not shown.
+14
View File
@@ -0,0 +1,14 @@
{
"W_PER_SHEEP": 2.0,
"W_ALIGN": 0.05,
"W_PEN_BONUS": 10.0,
"W_COMPLETE": 100.0,
"W_STEP_COST": 0.02,
"W_COMPACT": 0.0,
"W_WALL_TOUCH": 0.0,
"WALL_TOUCH_BUFFER": 0.4,
"ALIGN_SHAPE": "standoff",
"ALIGN_GATED": true,
"ENTRY_AWARE": true,
"ent_coef": 0.02
}
-143
View File
@@ -1,143 +0,0 @@
"""
Evaluation script for a trained herding policy.
Runs N episodes and reports the three project metrics:
1. Success rate — fraction of episodes where all sheep are penned
2. Time-to-pen — mean steps across successful episodes (per sheep)
3. Flock dispersion — mean pairwise distance among active sheep, averaged
over all timesteps (lower = tighter herding)
Usage
-----
python evaluate.py --model runs/ppo_herding/best_model/best_model.zip \
--vecnorm runs/ppo_herding/vecnorm.pkl \
--n-sheep 5 --episodes 100
Add --render to watch the first episode in a matplotlib window.
"""
import argparse
import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from herding_env import HerdingEnv
def make_single_env(n_sheep: int, max_steps: int, render_mode: str = None):
    """Return a zero-argument factory that builds one configured HerdingEnv.

    The factory form is what DummyVecEnv expects in its list of env makers.
    """
    return lambda: HerdingEnv(n_sheep=n_sheep,
                              max_steps=max_steps,
                              render_mode=render_mode)
def pairwise_mean(positions: np.ndarray, n_active: int) -> float:
    """Average distance over all unordered pairs of the first n_active rows.

    Returns 0.0 when fewer than two rows are requested, since no pair exists.
    """
    if n_active < 2:
        return 0.0
    pts = positions[:n_active]
    gaps = [
        float(np.linalg.norm(pts[a] - pts[b]))
        for a in range(n_active)
        for b in range(a + 1, n_active)
    ]
    return float(np.mean(gaps))
def parse_args():
    """Define the evaluation CLI and return the parsed ``sys.argv`` options."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--model", required=True,
                        help="Path to saved model .zip")
    parser.add_argument("--vecnorm", default=None,
                        help="Path to VecNormalize stats .pkl (optional)")
    # Plain integer options that differ only in flag name and default.
    for flag, fallback in (("--n-sheep", 1),
                           ("--episodes", 50),
                           ("--max-steps", 2000)):
        parser.add_argument(flag, type=int, default=fallback)
    parser.add_argument("--render", action="store_true",
                        help="Render first episode in matplotlib")
    parser.add_argument("--seed", type=int, default=42)
    return parser.parse_args()
def main():
    """Run the evaluation episodes and print the summary metrics.

    Builds a single-env DummyVecEnv (optionally wrapped with saved
    VecNormalize statistics), loads the PPO model, rolls out
    ``args.episodes`` deterministic episodes, and reports success rate,
    steps-per-sheep for successful episodes, and mean flock dispersion.
    """
    args = parse_args()
    # NOTE(review): args.seed is parsed but never used in this function.
    render_mode = "human" if args.render else None
    raw_env = DummyVecEnv([make_single_env(args.n_sheep, args.max_steps,
                                           render_mode)])
    if args.vecnorm:
        env = VecNormalize.load(args.vecnorm, raw_env)
        # Evaluation only: turn off training mode and reward normalisation.
        env.training = False
        env.norm_reward = False
    else:
        env = raw_env
    model = PPO.load(args.model, env=env)
    successes = []
    steps_to_pen = [] # steps for successful episodes
    dispersions = [] # per-episode mean flock dispersion
    for ep in range(args.episodes):
        obs = env.reset()
        done = False
        ep_steps = 0
        ep_dispersion = []
        first_ep = ep == 0
        while not done:
            action, _ = model.predict(obs, deterministic=True)
            obs, _, dones, infos = env.step(action)
            done = dones[0]
            ep_steps += 1
            # Access the underlying HerdingEnv for dispersion calculation
            inner = env.envs[0] if hasattr(env, "envs") else env.venv.envs[0]
            # NOTE(review): pairwise_mean is given inner.n_sheep, so it covers
            # the first n_sheep rows of sheep_pos regardless of penned status
            # — confirm this matches the intended "active sheep" metric.
            if not inner.penned[:inner.n_sheep].all():
                ep_dispersion.append(
                    pairwise_mean(inner.sheep_pos, inner.n_sheep)
                )
            if first_ep and render_mode == "human":
                pass # render() is called inside step()
        # Episode outcome is taken from the info dict of the final step.
        info = infos[0]
        n_penned = info.get("n_penned", 0)
        n_sheep = info.get("n_sheep", args.n_sheep)
        success = n_penned == n_sheep
        successes.append(int(success))
        if success:
            # Normalised per sheep so runs with different flock sizes compare.
            steps_to_pen.append(ep_steps / n_sheep)
        if ep_dispersion:
            dispersions.append(float(np.mean(ep_dispersion)))
        if (ep + 1) % 10 == 0:
            print(f" Episode {ep + 1:>4}/{args.episodes} "
                  f"success={int(success)} steps={ep_steps}")
    env.close()
    # -----------------------------------------------------------------------
    # Report
    # -----------------------------------------------------------------------
    success_rate = float(np.mean(successes))
    # NaN marks a metric with no contributing episodes.
    mean_ttp = float(np.mean(steps_to_pen)) if steps_to_pen else float("nan")
    mean_disp = float(np.mean(dispersions)) if dispersions else float("nan")
    print("\n" + "=" * 50)
    print(f" Model : {args.model}")
    print(f" Sheep : {args.n_sheep}")
    print(f" Episodes : {args.episodes}")
    print("-" * 50)
    print(f" Success rate : {success_rate * 100:.1f}%"
          f" ({sum(successes)}/{args.episodes})")
    print(f" Time-to-pen : {mean_ttp:.1f} steps/sheep"
          f" (successful episodes only)")
    print(f" Flock dispersion: {mean_disp:.2f} m"
          f" (mean pairwise distance while active)")
    print("=" * 50)
if __name__ == "__main__":
    main()
+535 -115
View File
@@ -10,12 +10,16 @@ Coordinate system matches the Webots world file:
field : x ∈ [-15, 15], y ∈ [-15, 15] field : x ∈ [-15, 15], y ∈ [-15, 15]
pen : x ∈ [10, 13], y ∈ [-15, -8] (SE corner, open north) pen : x ∈ [10, 13], y ∈ [-15, -8] (SE corner, open north)
Observation is always sized for MAX_SHEEP (currently 5) regardless of Observation (16-dim, fixed regardless of n_sheep):
how many sheep are active. Inactive slots are pre-penned at the pen dog position (2), flock COM relative to dog (2), top-3 farthest active
centre with flag=1. This keeps the model input dimension fixed across sheep relative to dog (6), pen relative to COM (2), pen relative to
curriculum stages so VecNormalize statistics are preserved throughout. farthest sheep (2), flock radius (1), fraction penned (1).
Permutation-invariant by design: curriculum stages share the same obs dim
so VecNormalize statistics transfer as n_sheep advances.
""" """
import csv
import numpy as np import numpy as np
import gymnasium as gym import gymnasium as gym
from gymnasium import spaces from gymnasium import spaces
@@ -27,22 +31,51 @@ class HerdingEnv(gym.Env):
# ----------------------------------------------------------------------- # -----------------------------------------------------------------------
# World constants — must match Webots world file # World constants — must match Webots world file
# ----------------------------------------------------------------------- # -----------------------------------------------------------------------
MAX_SHEEP = 5 MAX_SHEEP = 10
FIELD = 15.0 # half-size; positions ∈ [-FIELD, FIELD] FIELD = 15.0 # field wall geometry in world file
PEN_X = (10.0, 13.0) # quarantine pen x bounds SHEEP_WALL_INNER = 14.5 # sheep.py wall checks use ±14.5
PEN_Y = (-15.0, -8.0) # quarantine pen y bounds PEN_X = (10.0, 13.0)
PEN_Y = (-15.0, -8.0)
PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32) PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32)
PEN_ENTRY = np.array([11.5, -8.0], dtype=np.float32) # north entrance face center
# ----------------------------------------------------------------------- # -----------------------------------------------------------------------
# Dynamics — calibrated to match Webots robot specs # Dynamics — calibrated to match Webots robot specs
# wheel radius 0.031 m; sheep FLEE_SPEED 20 rad/s → 0.62 m/s
# wheel radius 0.038 m; dog maxVelocity 70 rad/s → 2.66 m/s
# ----------------------------------------------------------------------- # -----------------------------------------------------------------------
DOG_SPEED = 2.5 # m/s DOG_SPEED = 2.5 # m/s
SHEEP_FLEE_V = 0.65 # m/s SHEEP_FLEE_V = 0.62 # m/s (20 rad/s * 0.031 m wheel radius in sheep.py)
SHEEP_WANDER_V = 0.20 # m/s SHEEP_WANDER_V = 0.093 # m/s (3 rad/s * 0.031 m wheel radius in sheep.py)
DT = 0.1 # seconds per step DT = 0.1 # seconds per step
# Differential-drive dog dynamics — mirrors shepherd_dog_rl.py drive():
# speed_ms = ||a|| * DOG_SPEED
# err = wrap(target_heading - heading)
# fwd_ms = speed_ms * max(0, cos(err))
# fwd_rad = fwd_ms / DOG_WHEEL_R
# turn = DOG_K_TURN * err
# l = clamp(fwd_rad - turn), r = clamp(fwd_rad + turn)
# Then integrated as unicycle kinematics using wheel geometry.
DOG_K_TURN = 4.0 # rad/s per rad (matches Webots controller)
DOG_WHEEL_R = 0.038 # m (ShepherdDog.proto wheel radius)
DOG_AXLE_TRACK = 0.28 # m (wheel anchors at y=±0.14 in proto)
DOG_MOTOR_MAX = 70.0 # rad/s (ShepherdDog.proto motor maxVelocity)
DOG_STOP_THRESHOLD = 0.05 # ||action|| below this → dog stops in place
# Differential-drive sheep dynamics — mirrors sheep.py drive():
SHEEP_K_TURN = 4.0 # rad/s per rad heading error (sheep.py k=4.0)
SHEEP_WHEEL_R = 0.031 # m (Sheep.proto wheel radius)
SHEEP_AXLE_TRACK = 0.20 # m (wheel anchors at y=+/-0.10 in proto)
SHEEP_MOTOR_MAX = 22.0 # rad/s (sheep.py MAX_SPEED clamp)
# Sub-stepping: 6 x ~16.7ms ≈ 100ms per env step (Webots basicTimeStep=16ms)
N_SUBSTEPS = 6
# Peer communication lag — sheep broadcast every 3 Webots steps
PEER_BROADCAST_INTERVAL = 3
# Action smoothing EMA alpha; 0 = disabled (smoothing applied at Webots inference)
ACTION_SMOOTH = 0.0
# Boid parameters — identical to sheep.py # Boid parameters — identical to sheep.py
FLEE_DIST = 7.0 FLEE_DIST = 7.0
SEPARATION_DIST = 2.5 SEPARATION_DIST = 2.5
@@ -50,28 +83,62 @@ class HerdingEnv(gym.Env):
WALL_MARGIN = 3.5 WALL_MARGIN = 3.5
# ----------------------------------------------------------------------- # -----------------------------------------------------------------------
# Reward weights # Reward weights (simple per-sheep progress — no phases, no gating)
# ----------------------------------------------------------------------- # -----------------------------------------------------------------------
W_ALIGN = 0.4 # dense: dog on anti-pen side of each active sheep W_PER_SHEEP = 2.0 # progress: sum of per-sheep distance-to-pen reductions
W_SHAPING = 0.5 # dense: mean sheep distance to pen W_ALIGN = 0.05 # gated on action magnitude — dog only earns it when moving.
W_APPROACH = 0.1 # dense: dog within flee range of nearest sheep # Without gating this created a sit-still trap from n_sheep≥2.
W_PEN_BONUS = 5.0 # sparse: per sheep successfully penned W_PEN_BONUS = 10.0 # per sheep penned
W_COMPLETE = 20.0 # bonus when ALL active sheep are penned W_COMPLETE = 100.0 # all sheep penned
W_STEP_COST = 0.002 # penalty per step (encourages efficiency) W_STEP_COST = 0.02 # time penalty — strong enough to punish doing nothing
W_SOUTH = 0.01 # per-sheep per-metre penalty for active sheep below the pen
# entrance (y < PEN_Y[1]=-8). Keeps the dog from letting
# sheep drift into the dead zone below the open face where
# they must reverse direction (north) to enter — hard to
# recover. 0.01 ≈ half step_cost per metre below per sheep.
W_COMPACT = 0.0 # reward for flock-radius reduction (off by default)
W_WALL_TOUCH = 0.01 # per-sheep max penalty at wall surface. Linear ramp
# within WALL_TOUCH_BUFFER. Covers field outer walls and
# pen W/E/S walls. Kept small (≈ step_cost/2) so it
# nudges away from walls without dominating progress.
WALL_TOUCH_BUFFER = 0.4 # metres from wall where penalty starts ramping
ALIGN_SHAPE = "standoff" # "standoff" (peaks at IDEAL) | "near" (peaks at 0)
ALIGN_GATED = True # gate alignment on action magnitude
ENTRY_AWARE = False # When True, targets PEN_ENTRY (entrance face) instead
# of PEN_CENTER for progress/obs. Intended to fix wall-
# corralling but collapsed n_sheep≥2 success rate.
# The wall-touch gradient penalty handles wall avoidance
# without breaking the core herding signal.
# Initial sheep spawn: first sheep placed anywhere; rest within CLUSTER_RADIUS
# of it. Set to None for legacy uniform-scatter behaviour.
# Cluster radius ≤ COHESION_DIST (8m) so boid cohesion keeps the flock together.
INIT_CLUSTER_RADIUS = 5.0
def __init__(self, n_sheep: int = 1, max_steps: int = 2000, def __init__(self, n_sheep: int = 1, max_steps: int = 2000,
render_mode: str = None): render_mode: str = None, random_n_sheep: bool = False,
reward_cfg: dict = None):
super().__init__() super().__init__()
assert 1 <= n_sheep <= self.MAX_SHEEP assert 1 <= n_sheep <= self.MAX_SHEEP
self.n_sheep = n_sheep self.n_sheep = n_sheep
self.max_steps = max_steps self.max_steps = max_steps
self.render_mode = render_mode self.render_mode = render_mode
self.random_n_sheep = random_n_sheep # if True, randomise n_sheep each reset
# Observation: dog(x,y) + MAX_SHEEP×sheep(x,y) + MAX_SHEEP×penned # Override class-default reward weights / shape with per-instance config
# Fixed size across all curriculum stages. # so sweeps can ship configs into subprocess envs via pickled make_env.
obs_dim = 2 + 2 * self.MAX_SHEEP + self.MAX_SHEEP if reward_cfg:
for k, v in reward_cfg.items():
if not hasattr(self.__class__, k):
raise ValueError(f"unknown reward_cfg key: {k}")
setattr(self, k, v)
# Fixed 18-dim observation regardless of n_sheep:
# dog_pos(2) + rel_com(2) + rel_far1(2) + rel_far2(2) + rel_far3(2)
# + com_to_pen(2) + far1_to_pen(2) + radius(1) + frac_active(1)
# + cos(heading)(1) + sin(heading)(1) ← new, for wheeled dynamics
self.observation_space = spaces.Box( self.observation_space = spaces.Box(
low=-1.0, high=1.0, shape=(obs_dim,), dtype=np.float32 low=-np.inf, high=np.inf, shape=(18,), dtype=np.float32
) )
# Action: desired velocity (vx, vy) ∈ [-1, 1]², scaled by DOG_SPEED # Action: desired velocity (vx, vy) ∈ [-1, 1]², scaled by DOG_SPEED
@@ -82,12 +149,26 @@ class HerdingEnv(gym.Env):
# Runtime state (populated by reset) # Runtime state (populated by reset)
self._step_count = 0 self._step_count = 0
self._prev_penned = 0 self._prev_penned = 0
self._prev_pen_dist_sum = 0.0
self.dog_pos = np.zeros(2, dtype=np.float32) self.dog_pos = np.zeros(2, dtype=np.float32)
self.dog_heading = 0.0 # radians, world frame
self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32) self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32)
self.sheep_heading = np.zeros(self.MAX_SHEEP, dtype=np.float32)
self.penned = np.ones(self.MAX_SHEEP, dtype=bool) self.penned = np.ones(self.MAX_SHEEP, dtype=bool)
self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32) self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32)
self._delayed_sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32)
self._prev_action = np.zeros(2, dtype=np.float32)
self._fig = None # lazy matplotlib figure self._fig = None
# Differential-drive debug CSV for sim/Webots parity checks.
# Always on by design.
self._dog_debug_file = open("dog_debug.csv", "w", newline="")
self._dog_debug_writer = csv.writer(self._dog_debug_file)
self._dog_debug_writer.writerow([
"step", "act_x", "act_y", "act_mag", "heading", "target_heading",
"heading_err", "fwd_speed", "left_w", "right_w", "v", "w",
"dog_x", "dog_y",
])
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Curriculum interface # Curriculum interface
@@ -107,28 +188,46 @@ class HerdingEnv(gym.Env):
self._step_count = 0 self._step_count = 0
self._prev_penned = 0 self._prev_penned = 0
if self.random_n_sheep:
self.n_sheep = int(self.np_random.integers(1, self.MAX_SHEEP + 1))
# Active sheep (0 .. n_sheep-1): random non-pen positions # Active sheep (0 .. n_sheep-1): random non-pen positions
self.sheep_pos[:] = self.PEN_CENTER self.sheep_pos[:] = self.PEN_CENTER
self.penned[:] = True self.penned[:] = True
# Spawn first sheep anywhere; subsequent sheep clustered around it
# so boid cohesion (active within 8m) keeps the flock together.
# Without clustering, sheep can start 25m apart and never coalesce —
# task becomes intractable for n_sheep ≥ 2.
placed = 0 placed = 0
cluster_center = None
radius = self.INIT_CLUSTER_RADIUS
while placed < self.n_sheep: while placed < self.n_sheep:
if placed == 0 or radius is None:
p = self.np_random.uniform(-12.0, 12.0, size=(2,)).astype(np.float32) p = self.np_random.uniform(-12.0, 12.0, size=(2,)).astype(np.float32)
else:
offset = self.np_random.uniform(-radius, radius, size=(2,))
p = (cluster_center + offset).astype(np.float32)
p = np.clip(p, -12.0, 12.0)
if not self._in_pen(p): if not self._in_pen(p):
self.sheep_pos[placed] = p self.sheep_pos[placed] = p
self.penned[placed] = False self.penned[placed] = False
if placed == 0:
cluster_center = p.copy()
placed += 1 placed += 1
# Dog: 50 % of the time start already on the anti-pen side of the # Dog: 50% of resets start already behind the flock (anti-pen side,
# nearest sheep (within flee range) so early training gets aligned # within flee range) to give early training aligned experiences.
# starts; the other 50 % is fully random to ensure generalisation. # Use the flock COM as the reference (not sheep[0]) so the bias
# generalizes from 1-sheep to multi-sheep without putting the dog
# in front of or inside the flock.
if self.np_random.random() < 0.5: if self.np_random.random() < 0.5:
# Place dog behind the first active sheep relative to the pen active_pts = self.sheep_pos[:self.n_sheep][~self.penned[:self.n_sheep]]
ref = self.sheep_pos[0] ref = active_pts.mean(axis=0) if len(active_pts) else self.sheep_pos[0]
away = ref - self.PEN_CENTER # sheep→anti-pen away = ref - self.PEN_CENTER
dist = float(np.linalg.norm(away)) d = float(np.linalg.norm(away))
if dist > 0.1: if d > 0.1:
away = away / dist away = away / d
offset = away * self.np_random.uniform(2.0, self.FLEE_DIST * 0.8) offset = away * self.np_random.uniform(2.0, self.FLEE_DIST * 0.8)
self.dog_pos = np.clip( self.dog_pos = np.clip(
(ref + offset).astype(np.float32), -self.FIELD, self.FIELD (ref + offset).astype(np.float32), -self.FIELD, self.FIELD
@@ -138,29 +237,70 @@ class HerdingEnv(gym.Env):
-self.FIELD * 0.8, self.FIELD * 0.8, size=(2,) -self.FIELD * 0.8, self.FIELD * 0.8, size=(2,)
).astype(np.float32) ).astype(np.float32)
# Inactive slots (n_sheep .. MAX_SHEEP-1): already at pen centre, penned=True # Random initial heading so the policy learns to handle any orientation.
self.dog_heading = float(self.np_random.uniform(-np.pi, np.pi))
self.sheep_heading = self.np_random.uniform(
-np.pi, np.pi, size=(self.MAX_SHEEP,)
).astype(np.float32)
self.wander_ang = self.np_random.uniform( self.wander_ang = self.np_random.uniform(
-np.pi, np.pi, size=(self.MAX_SHEEP,) -np.pi, np.pi, size=(self.MAX_SHEEP,)
).astype(np.float32) ).astype(np.float32)
self._delayed_sheep_pos[:self.n_sheep] = self.sheep_pos[:self.n_sheep].copy()
self._prev_action = np.zeros(2, dtype=np.float32)
# Initialise per-sheep pen-distance sum for progress reward
active = ~self.penned[:self.n_sheep]
target = self.PEN_ENTRY if self.ENTRY_AWARE else self.PEN_CENTER
if active.any():
self._prev_pen_dist_sum = float(
np.linalg.norm(
self.sheep_pos[:self.n_sheep][active] - target, axis=1
).sum()
)
com0 = self.sheep_pos[:self.n_sheep][active].mean(axis=0)
self._prev_radius = float(
np.linalg.norm(self.sheep_pos[:self.n_sheep][active] - com0, axis=1).max()
)
else:
self._prev_pen_dist_sum = 0.0
self._prev_radius = 0.0
return self._obs(), {} return self._obs(), {}
def step(self, action): def step(self, action):
self._step_count += 1 self._step_count += 1
# Move dog — clip each axis independently so the agent can idle
act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0) act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0)
self.dog_pos = np.clip(
self.dog_pos + act * self.DOG_SPEED * self.DT,
-self.FIELD, self.FIELD
)
# Step sheep dynamics # Action smoothing EMA — matches shepherd_dog_rl.py ACTION_SMOOTH
if self.ACTION_SMOOTH > 0:
act = self.ACTION_SMOOTH * self._prev_action + (1.0 - self.ACTION_SMOOTH) * act
self._prev_action = act.copy()
act_mag = float(np.linalg.norm(act))
sub_dt = self.DT / self.N_SUBSTEPS
dog_dbg = {
"target_heading": float(self.dog_heading),
"err": 0.0, "fwd_speed": 0.0,
"left_w": 0.0, "right_w": 0.0, "v": 0.0, "w": 0.0,
}
for _sub in range(self.N_SUBSTEPS):
# Snapshot peer positions every 3 sub-steps (mirrors sheep broadcast)
if _sub % self.PEER_BROADCAST_INTERVAL == 0:
self._delayed_sheep_pos[:self.n_sheep] = self.sheep_pos[:self.n_sheep].copy()
# Dog differential-drive sub-step
dbg = self._step_dog_substep(act, sub_dt)
if dbg["v"] != 0.0 or dbg["w"] != 0.0:
dog_dbg = dbg
# Sheep dynamics sub-step
for i in range(self.n_sheep): for i in range(self.n_sheep):
if self.penned[i]: self.sheep_pos[i] = self._step_sheep(i, sub_dt)
continue
self.sheep_pos[i] = self._step_sheep(i)
if self._in_pen(self.sheep_pos[i]): if self._in_pen(self.sheep_pos[i]):
self.penned[i] = True self.penned[i] = True
@@ -168,10 +308,22 @@ class HerdingEnv(gym.Env):
newly_penned = n_penned - self._prev_penned newly_penned = n_penned - self._prev_penned
self._prev_penned = n_penned self._prev_penned = n_penned
reward = self._reward(n_penned, newly_penned) reward, rcomps = self._reward(n_penned, newly_penned, act)
terminated = n_penned == self.n_sheep terminated = n_penned == self.n_sheep
truncated = self._step_count >= self.max_steps truncated = self._step_count >= self.max_steps
info = {"n_penned": n_penned, "n_sheep": self.n_sheep} info = {"n_penned": n_penned, "n_sheep": self.n_sheep,
"rcomps": rcomps, "dog_dyn": dog_dbg}
self._dog_debug_writer.writerow([
self._step_count,
float(act[0]), float(act[1]), act_mag,
float(self.dog_heading), dog_dbg["target_heading"], dog_dbg["err"],
dog_dbg["fwd_speed"], dog_dbg["left_w"], dog_dbg["right_w"],
dog_dbg["v"], dog_dbg["w"],
float(self.dog_pos[0]), float(self.dog_pos[1]),
])
if self._step_count % 200 == 0:
self._dog_debug_file.flush()
if self.render_mode == "human": if self.render_mode == "human":
self.render() self.render()
@@ -188,16 +340,12 @@ class HerdingEnv(gym.Env):
ax = self._ax ax = self._ax
ax.clear() ax.clear()
ax.set_xlim(-16, 16) ax.set_xlim(-16, 16); ax.set_ylim(-16, 16)
ax.set_ylim(-16, 16) ax.set_aspect("equal"); ax.set_facecolor("#dcedc8")
ax.set_aspect("equal")
ax.set_facecolor("#dcedc8")
# Field boundary
ax.add_patch(mpatches.Rectangle( ax.add_patch(mpatches.Rectangle(
(-15, -15), 30, 30, fill=False, edgecolor="#795548", linewidth=2 (-15, -15), 30, 30, fill=False, edgecolor="#795548", linewidth=2
)) ))
# Pen
pw = self.PEN_X[1] - self.PEN_X[0] pw = self.PEN_X[1] - self.PEN_X[0]
ph = self.PEN_Y[1] - self.PEN_Y[0] ph = self.PEN_Y[1] - self.PEN_Y[0]
ax.add_patch(mpatches.Rectangle( ax.add_patch(mpatches.Rectangle(
@@ -207,21 +355,25 @@ class HerdingEnv(gym.Env):
ax.text(11.5, -11.5, "pen", ha="center", va="center", ax.text(11.5, -11.5, "pen", ha="center", va="center",
fontsize=8, color="#795548") fontsize=8, color="#795548")
# Sheep com, radius, _ = self._flock_stats()
for i in range(self.MAX_SHEEP): ax.add_patch(plt.Circle(com, radius, color="steelblue",
fill=False, linestyle="--", linewidth=1))
ax.plot(*com, "+", color="steelblue", markersize=10)
for i in range(self.n_sheep):
if i >= self.n_sheep: if i >= self.n_sheep:
continue # inactive slot — not shown continue
color = "deeppink" if self.penned[i] else "white" color = "deeppink" if self.penned[i] else "white"
ax.plot(*self.sheep_pos[i], "o", color=color, markersize=11, ax.plot(*self.sheep_pos[i], "o", color=color, markersize=11,
markeredgecolor="#555", markeredgewidth=1.5) markeredgecolor="#555", markeredgewidth=1.5)
# Dog
ax.plot(*self.dog_pos, "s", color="#4e342e", markersize=13, ax.plot(*self.dog_pos, "s", color="#4e342e", markersize=13,
markeredgecolor="black", markeredgewidth=1.5) markeredgecolor="black", markeredgewidth=1.5)
ax.set_title( ax.set_title(
f"step {self._step_count} | " f"step {self._step_count} | "
f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep}", f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep} | "
f"r={radius:.1f}m",
fontsize=11 fontsize=11
) )
self._fig.canvas.draw() self._fig.canvas.draw()
@@ -233,6 +385,7 @@ class HerdingEnv(gym.Env):
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
plt.close(self._fig) plt.close(self._fig)
self._fig = None self._fig = None
self._dog_debug_file.close()
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Internals # Internals
@@ -242,81 +395,322 @@ class HerdingEnv(gym.Env):
return (self.PEN_X[0] < pos[0] < self.PEN_X[1] and return (self.PEN_X[0] < pos[0] < self.PEN_X[1] and
self.PEN_Y[0] < pos[1] < self.PEN_Y[1]) self.PEN_Y[0] < pos[1] < self.PEN_Y[1])
def _obs(self) -> np.ndarray: def _sheep_drive(self, i: int, target_heading: float, speed_rad: float,
scale = 1.0 / self.FIELD dt: float) -> np.ndarray:
return np.concatenate([ """Differential-drive integration for sheep i over one sub-step dt.
self.dog_pos * scale, # 2
(self.sheep_pos * scale).flatten(), # 2 * MAX_SHEEP
self.penned.astype(np.float32), # MAX_SHEEP
]).astype(np.float32)
def _reward(self, n_penned: int, newly_penned: int) -> float: Mirrors sheep.py drive(): heading error -> cos(err) forward scaling ->
wheel speeds with saturation -> unicycle kinematics.
"""
heading = float(self.sheep_heading[i])
err = (target_heading - heading + np.pi) % (2 * np.pi) - np.pi
fwd_rad = speed_rad * max(0.0, float(np.cos(err)))
turn = self.SHEEP_K_TURN * err
left_w = np.clip(fwd_rad - turn, -self.SHEEP_MOTOR_MAX, self.SHEEP_MOTOR_MAX)
right_w = np.clip(fwd_rad + turn, -self.SHEEP_MOTOR_MAX, self.SHEEP_MOTOR_MAX)
v = self.SHEEP_WHEEL_R * 0.5 * (right_w + left_w)
w = (self.SHEEP_WHEEL_R / self.SHEEP_AXLE_TRACK) * (right_w - left_w)
self.sheep_heading[i] = float(
((heading + w * dt) + np.pi) % (2 * np.pi) - np.pi
)
step_vec = np.array(
[np.cos(self.sheep_heading[i]), np.sin(self.sheep_heading[i])],
dtype=np.float32
)
return (self.sheep_pos[i] + step_vec * v * dt).astype(np.float32)
def _step_dog_substep(self, act: np.ndarray, dt: float) -> dict:
"""Move the dog one sub-step with differential-drive kinematics.
Returns debug dict with wheel/velocity info.
"""
old_dog = self.dog_pos.copy()
act_mag = float(np.linalg.norm(act))
dog_dbg = {
"target_heading": float(self.dog_heading),
"err": 0.0, "fwd_speed": 0.0,
"left_w": 0.0, "right_w": 0.0, "v": 0.0, "w": 0.0,
}
if act_mag < self.DOG_STOP_THRESHOLD:
return dog_dbg
target_heading = float(np.arctan2(act[1], act[0]))
err = (target_heading - self.dog_heading + np.pi) % (2 * np.pi) - np.pi
target_speed = act_mag * self.DOG_SPEED
fwd_speed = target_speed * max(0.0, float(np.cos(err)))
fwd_rad = fwd_speed / self.DOG_WHEEL_R
turn = self.DOG_K_TURN * err
left_w = np.clip(fwd_rad - turn, -self.DOG_MOTOR_MAX, self.DOG_MOTOR_MAX)
right_w = np.clip(fwd_rad + turn, -self.DOG_MOTOR_MAX, self.DOG_MOTOR_MAX)
v = self.DOG_WHEEL_R * 0.5 * (right_w + left_w)
w = (self.DOG_WHEEL_R / self.DOG_AXLE_TRACK) * (right_w - left_w)
dog_dbg.update({
"target_heading": target_heading, "err": float(err),
"fwd_speed": float(fwd_speed), "left_w": float(left_w),
"right_w": float(right_w), "v": float(v), "w": float(w),
})
self.dog_heading = float(
((self.dog_heading + w * dt) + np.pi) % (2 * np.pi) - np.pi
)
step_vec = np.array(
[np.cos(self.dog_heading), np.sin(self.dog_heading)],
dtype=np.float32
)
new_dog = np.clip(
self.dog_pos + step_vec * v * dt, -self.FIELD, self.FIELD,
)
# Pen wall collision
px0, px1 = self.PEN_X
py0, py1 = self.PEN_Y
if py0 < new_dog[1] < py1:
if old_dog[0] < px0 <= new_dog[0]:
new_dog[0] = px0 - 1e-3
elif old_dog[0] > px0 >= new_dog[0]:
new_dog[0] = px0 + 1e-3
if old_dog[0] > px1 >= new_dog[0]:
new_dog[0] = px1 + 1e-3
elif old_dog[0] < px1 <= new_dog[0]:
new_dog[0] = px1 - 1e-3
self.dog_pos = new_dog.astype(np.float32)
return dog_dbg
def _flock_stats(self):
"""Return (COM, radius, mean_dispersion) over active sheep."""
active_mask = ~self.penned[:self.n_sheep] active_mask = ~self.penned[:self.n_sheep]
if not active_mask.any():
return self.PEN_CENTER.copy(), 0.0, 0.0
pts = self.sheep_pos[:self.n_sheep][active_mask]
com = pts.mean(axis=0)
dists = np.linalg.norm(pts - com, axis=1)
return com, float(dists.max()), float(dists.mean())
def _obs(self) -> np.ndarray:
com, radius, _ = self._flock_stats()
active_mask = ~self.penned[:self.n_sheep]
if active_mask.any(): if active_mask.any():
active_pos = self.sheep_pos[:self.n_sheep][active_mask] pts = self.sheep_pos[:self.n_sheep][active_mask]
dists_pen = np.linalg.norm(active_pos - self.PEN_CENTER, axis=1) dists = np.linalg.norm(pts - com, axis=1)
dists_dog = np.linalg.norm(active_pos - self.dog_pos, axis=1) sorted_idx = np.argsort(dists)[::-1] # farthest first
# Top-3 stragglers; pad with COM when fewer active sheep exist
# Sheep-to-pen shaping def nth(n):
shaping = -(dists_pen.mean() / (2 * self.FIELD)) return pts[sorted_idx[n]] if len(sorted_idx) > n else com
far1, far2, far3 = nth(0), nth(1), nth(2)
# Approach: dog penalised for being far from nearest sheep
approach = -(dists_dog.min() / (2 * self.FIELD))
# Alignment: reward dog for being on the anti-pen side of each sheep.
# When the dog is opposite the pen relative to a sheep, that sheep
# flees toward the pen. Score ∈ [-1, 1] per sheep, weighted by
# a proximity gate so only nearby dogs count.
align_scores = []
for s_pos, d_pen, d_dog in zip(active_pos, dists_pen, dists_dog):
if d_pen < 0.1 or d_dog < 0.1:
continue
pen_dir = (self.PEN_CENTER - s_pos) / d_pen # sheep → pen
dog_dir = (self.dog_pos - s_pos) / d_dog # sheep → dog
# cos(angle): +1 → dog behind sheep, -1 → dog on pen side
cosine = -float(np.dot(pen_dir, dog_dir))
# gate: full credit inside flee range, fades beyond
proximity = max(0.0, 1.0 - d_dog / self.FLEE_DIST)
align_scores.append(cosine * proximity)
alignment = float(np.mean(align_scores)) if align_scores else 0.0
else: else:
shaping = approach = alignment = 0.0 far1 = far2 = far3 = self.PEN_CENTER.copy()
reward = shaping * self.W_SHAPING S = self.FIELD
reward += approach * self.W_APPROACH D = 2 * self.FIELD
reward += alignment * self.W_ALIGN
reward += newly_penned * self.W_PEN_BONUS
reward -= self.W_STEP_COST
if n_penned == self.n_sheep:
reward += self.W_COMPLETE
return reward
def _step_sheep(self, i: int) -> np.ndarray: # far1/far2/far3 expressed relative to COM, not dog.
"""Apply one timestep of boid dynamics to sheep i.""" # For 1 sheep: far1-COM = far2-COM = far3-COM = [0,0] → cleanly ignorable.
pos = self.sheep_pos[i].copy() # For 3+ sheep: non-zero vectors tell the dog where each straggler is
# within the group, without conflicting with weights trained on 1 sheep.
# Pen reference for the policy. Aligned with the reward target so the
# policy isn't forced to learn an implicit offset between what it sees
# ("pen is here") and what it's rewarded for ("get sheep close to here").
pen_ref = self.PEN_ENTRY if self.ENTRY_AWARE else self.PEN_CENTER
return np.array([
self.dog_pos[0] / S, self.dog_pos[1] / S,
(com[0] - self.dog_pos[0]) / D, (com[1] - self.dog_pos[1]) / D,
(far1[0] - com[0]) / D, (far1[1] - com[1]) / D,
(far2[0] - com[0]) / D, (far2[1] - com[1]) / D,
(far3[0] - com[0]) / D, (far3[1] - com[1]) / D,
(pen_ref[0] - com[0]) / D, (pen_ref[1] - com[1]) / D,
(pen_ref[0] - far1[0]) / D, (pen_ref[1] - far1[1]) / D,
radius / D,
active_mask.sum() / self.n_sheep,
float(np.cos(self.dog_heading)),
float(np.sin(self.dog_heading)),
], dtype=np.float32)
def _reward(self, n_penned: int, newly_penned: int, action: np.ndarray):
active = ~self.penned[:self.n_sheep]
# Per-sheep progress toward pen: fires whenever any sheep moves closer.
# Naturally rewards keeping the flock together and pushing toward pen:
# dog behind flock → all sheep flee toward pen → all contribute positive reward.
# Dog from wrong side → sheep scatter away from pen → negative reward.
target = self.PEN_ENTRY if self.ENTRY_AWARE else self.PEN_CENTER
if active.any():
pen_dists = np.linalg.norm(
self.sheep_pos[:self.n_sheep][active] - target, axis=1
)
cur_sum = float(pen_dists.sum())
r_progress = (self._prev_pen_dist_sum - cur_sum) * self.W_PER_SHEEP
self._prev_pen_dist_sum = cur_sum
else:
r_progress = 0.0
com, _, _ = self._flock_stats()
com_dist = float(np.linalg.norm(com - target))
d_dog_com = float(np.linalg.norm(self.dog_pos - com))
if d_dog_com > 0.1 and com_dist > 0.1:
pen_dir = (target - com) / com_dist
dog_dir = (self.dog_pos - com) / d_dog_com
cosine = -float(np.dot(pen_dir, dog_dir))
if self.ALIGN_SHAPE == "standoff":
IDEAL = 0.5 * (self.SEPARATION_DIST + self.FLEE_DIST)
HALF = self.FLEE_DIST - IDEAL
proximity = max(0.0, 1.0 - abs(d_dog_com - IDEAL) / HALF)
else: # "near"
proximity = max(0.0, 1.0 - d_dog_com / self.FLEE_DIST)
move_gate = (min(1.0, float(np.linalg.norm(action)))
if self.ALIGN_GATED else 1.0)
alignment = cosine * proximity * move_gate * self.W_ALIGN
else:
alignment = 0.0
# Wall-touch penalty: distance-based gradient covering ALL solid surfaces
# the sheep can hit — the four field outer walls (always present) plus
# the three solid pen walls (west, east, south). Linearly ramps from 0
# at buffer edge to W_WALL_TOUCH at the wall surface. Goal: sheep should
# never end up pinned against any wall (transfer concern: Webots fences
# have pillars that can physically trap sheep).
if self.W_WALL_TOUCH and active.any():
pts = self.sheep_pos[:self.n_sheep][active]
px0, px1 = self.PEN_X
py0, py1 = self.PEN_Y
F = self.FIELD
buf = self.WALL_TOUCH_BUFFER
far = buf + 1.0
# Field outer walls — sheep is always inside [-F, F]^2.
d_fw = pts[:, 0] - (-F) # distance to west field wall
d_fe = F - pts[:, 0] # east field wall
d_fs = pts[:, 1] - (-F) # south field wall
d_fn = F - pts[:, 1] # north field wall
# Pen W/E/S walls — only relevant approached from outside.
d_pw = np.where((pts[:, 0] < px0) & (pts[:, 1] > py0) & (pts[:, 1] < py1),
px0 - pts[:, 0], far)
d_pe = np.where((pts[:, 0] > px1) & (pts[:, 1] > py0) & (pts[:, 1] < py1),
pts[:, 0] - px1, far)
d_ps = np.where((pts[:, 1] < py0) & (pts[:, 0] > px0) & (pts[:, 0] < px1),
py0 - pts[:, 1], far)
d_min = np.minimum.reduce([d_fw, d_fe, d_fs, d_fn, d_pw, d_pe, d_ps])
penalties = np.maximum(0.0, 1.0 - d_min / buf) * self.W_WALL_TOUCH
r_wall_touch = -float(penalties.sum())
else:
r_wall_touch = 0.0
# South penalty: discourage active sheep from drifting below the pen
# entrance (y < PEN_Y[1]) while OUTSIDE the pen's x-range. Sheep at
# y<-8 with x∈[PEN_X] are entering through the gate — that's desired.
# The dead zone is y<-8 and x outside [PEN_X]: stuck against pen walls,
# must reverse direction (north) to reach the entrance — hard to recover.
if self.W_SOUTH and active.any():
pts = self.sheep_pos[:self.n_sheep][active]
depth = np.maximum(0.0, self.PEN_Y[1] - pts[:, 1])
outside_pen_x = (pts[:, 0] < self.PEN_X[0]) | (pts[:, 0] > self.PEN_X[1])
r_south = -float((depth * outside_pen_x).sum()) * self.W_SOUTH
else:
r_south = 0.0
# Compactness shaping: reward decreases in flock radius (active sheep only)
if self.W_COMPACT and active.any():
cur_radius = float(np.linalg.norm(
self.sheep_pos[:self.n_sheep][active] - com, axis=1
).max())
r_compact = (self._prev_radius - cur_radius) * self.W_COMPACT
self._prev_radius = cur_radius
else:
r_compact = 0.0
r_pen_bonus = newly_penned * self.W_PEN_BONUS
r_step_cost = -self.W_STEP_COST
r_complete = self.W_COMPLETE if n_penned == self.n_sheep else 0.0
reward = (r_progress + alignment + r_south + r_compact + r_wall_touch
+ r_pen_bonus + r_step_cost + r_complete)
rcomps = {
"progress": float(r_progress),
"alignment": float(alignment),
"south": float(r_south),
"compact": float(r_compact),
"wall_touch": float(r_wall_touch),
"pen_bonus": float(r_pen_bonus),
"step_cost": float(r_step_cost),
"complete": float(r_complete),
}
return reward, rcomps
def _step_sheep(self, i: int, sub_dt: float) -> np.ndarray:
"""Apply one sub-step of boid dynamics to sheep i (mirrors sheep.py)."""
old_pos = self.sheep_pos[i].copy()
pos = old_pos.copy()
fx, fy = 0.0, 0.0 fx, fy = 0.0, 0.0
if self.penned[i]:
pm = 0.8 # PEN_MARGIN in sheep.py
px0, px1 = self.PEN_X
py0, py1 = self.PEN_Y
x, y = float(pos[0]), float(pos[1])
if x < px0 + pm: fx += ((px0 + pm - x) / pm) * 15.0
if x > px1 - pm: fx -= ((x - (px1 - pm)) / pm) * 15.0
if y < py0 + pm: fy += ((py0 + pm - y) / pm) * 15.0
if y > py1 - pm: fy -= ((y - (py1 - pm)) / pm) * 15.0
for j in range(self.n_sheep):
if j == i or not self.penned[j]:
continue
dv = self._delayed_sheep_pos[j] - pos
dj = float(np.linalg.norm(dv))
if 0.05 < dj < self.SEPARATION_DIST:
push = (self.SEPARATION_DIST - dj) / dj
fx -= (dv[0] / dj) * push * 2.5
fy -= (dv[1] / dj) * push * 2.5
if self.np_random.random() < 0.02:
self.wander_ang[i] += float(self.np_random.uniform(-0.6, 0.6))
fx += float(np.cos(self.wander_ang[i])) * 0.5
fy += float(np.sin(self.wander_ang[i])) * 0.5
force = np.array([fx, fy], dtype=np.float32)
mag = float(np.linalg.norm(force))
if mag > 0.01:
target_heading = float(np.arctan2(fy, fx))
speed_rad = max(3.0, min(20.0, mag * 3.0))
pos = self._sheep_drive(i, target_heading, speed_rad, sub_dt)
pos = np.clip(pos, -self.FIELD, self.FIELD)
return pos.astype(np.float32)
fleeing = False fleeing = False
# Flee from dog — quadratic ramp (mirrors sheep.py) # Flee from dog — quadratic ramp
diff = self.dog_pos - pos diff = self.dog_pos - pos
dist = float(np.linalg.norm(diff)) dist = float(np.linalg.norm(diff))
if 0.01 < dist < self.FLEE_DIST: if 0.01 < dist < self.FLEE_DIST:
t = 1.0 - dist / self.FLEE_DIST t = 1.0 - dist / self.FLEE_DIST
s = t * t * 5.0 s = t * t * 20.0
fx -= (diff[0] / dist) * s fx -= (diff[0] / dist) * s
fy -= (diff[1] / dist) * s fy -= (diff[1] / dist) * s
fleeing = True fleeing = True
# Separation (inverse-distance) + Cohesion # Repel unpenned sheep from pen side-wall exteriors (sheep.py PEN_EXT_MARGIN).
if self.PEN_Y[0] < pos[1] < self.PEN_Y[1]:
pem = 0.8
if self.PEN_X[0] - pem < pos[0] < self.PEN_X[0]:
fx -= ((pos[0] - (self.PEN_X[0] - pem)) / pem) * 6.0
if self.PEN_X[1] < pos[0] < self.PEN_X[1] + pem:
fx += ((self.PEN_X[1] + pem - pos[0]) / pem) * 6.0
# Separation (inverse-distance) + Cohesion — uses delayed peer positions
cx, cy, cn = 0.0, 0.0, 0 cx, cy, cn = 0.0, 0.0, 0
for j in range(self.n_sheep): for j in range(self.n_sheep):
if j == i or self.penned[j]: if j == i or self.penned[j]:
continue continue
dv = self.sheep_pos[j] - pos dv = self._delayed_sheep_pos[j] - pos
dj = float(np.linalg.norm(dv)) dj = float(np.linalg.norm(dv))
if 0.3 < dj < self.COHESION_DIST: if 0.3 < dj < self.COHESION_DIST:
cx += self.sheep_pos[j][0] cx += self._delayed_sheep_pos[j][0]
cy += self.sheep_pos[j][1] cy += self._delayed_sheep_pos[j][1]
cn += 1 cn += 1
if 0.05 < dj < self.SEPARATION_DIST: if 0.05 < dj < self.SEPARATION_DIST:
push = (self.SEPARATION_DIST - dj) / dj push = (self.SEPARATION_DIST - dj) / dj
@@ -328,12 +722,21 @@ class HerdingEnv(gym.Env):
fy += (cy / cn - pos[1]) * w fy += (cy / cn - pos[1]) * w
# Wall avoidance # Wall avoidance
m, F = self.WALL_MARGIN, self.FIELD m, F = self.WALL_MARGIN, self.SHEEP_WALL_INNER
if pos[0] < -F + m: fx += ((-F + m - pos[0]) / m) * 6.0 if pos[0] < -F + m: fx += ((-F + m - pos[0]) / m) * 6.0
if pos[0] > F - m: fx -= ((pos[0] - (F - m)) / m) * 6.0 if pos[0] > F - m: fx -= ((pos[0] - (F - m)) / m) * 6.0
if pos[1] < -F + m: fy += ((-F + m - pos[1]) / m) * 6.0 if pos[1] < -F + m: fy += ((-F + m - pos[1]) / m) * 6.0
if pos[1] > F - m: fy -= ((pos[1] - (F - m)) / m) * 6.0 if pos[1] > F - m: fy -= ((pos[1] - (F - m)) / m) * 6.0
# Hard-stop clamp: mirrors sheep.py — zero any force driving further
# into the wall within 0.5 m so the flee force cannot pin the sheep.
HS = 0.5
if pos[0] < -F + HS and fx < 0: fx = 0.0
if pos[0] > F - HS and fx > 0: fx = 0.0
if pos[1] < -F + HS and fy < 0: fy = 0.0
if pos[1] > F - HS and fy > 0: fy = 0.0
# Wander — suppressed while fleeing # Wander — suppressed while fleeing
if not fleeing: if not fleeing:
if self.np_random.random() < 0.02: if self.np_random.random() < 0.02:
@@ -341,13 +744,30 @@ class HerdingEnv(gym.Env):
fx += float(np.cos(self.wander_ang[i])) * 0.5 fx += float(np.cos(self.wander_ang[i])) * 0.5
fy += float(np.sin(self.wander_ang[i])) * 0.5 fy += float(np.sin(self.wander_ang[i])) * 0.5
# Integrate # Integrate via differential-drive (mirrors sheep.py speed mapping + drive())
force = np.array([fx, fy]) force = np.array([fx, fy])
mag = float(np.linalg.norm(force)) mag = float(np.linalg.norm(force))
if mag > 0.01: if mag > 0.01:
top_speed = self.SHEEP_FLEE_V if fleeing else self.SHEEP_WANDER_V target_heading = float(np.arctan2(fy, fx))
speed = min(top_speed, mag * 0.3) speed_rad = max(3.0, min(20.0, mag * 3.0)) # sheep.py line 229
pos = np.clip(pos + (force / mag) * speed * self.DT, pos = self._sheep_drive(i, target_heading, speed_rad, sub_dt)
-self.FIELD, self.FIELD) pos = np.clip(pos, -self.FIELD, self.FIELD)
# Pen solid wall collision — mirrors Webots geometry.
px0, px1 = self.PEN_X[0], self.PEN_X[1]
py0, py1 = self.PEN_Y[0], self.PEN_Y[1]
entered_from_north = (
old_pos[1] >= py1 and pos[1] < py1 and px0 < pos[0] < px1
)
if not entered_from_north:
# Block crossing through west wall from outside
if old_pos[0] < px0 <= pos[0] and py0 < pos[1] < py1:
pos = np.array([px0 - 1e-3, pos[1]], dtype=np.float32)
# Block crossing through east wall from outside
if old_pos[0] > px1 >= pos[0] and py0 < pos[1] < py1:
pos = np.array([px1 + 1e-3, pos[1]], dtype=np.float32)
# Block crossing through south wall from outside
if old_pos[1] < py0 <= pos[1] and px0 < pos[0] < px1:
pos = np.array([pos[0], py0 - 1e-3], dtype=np.float32)
return pos.astype(np.float32) return pos.astype(np.float32)
+318
View File
@@ -0,0 +1,318 @@
"""
Parity test: verify 2D training env matches Webots controller implementations.
Tests:
1. Observation building: HerdingEnv._obs() vs shepherd_dog_rl.build_obs()
2. Dog drive: HerdingEnv._step_dog_substep() vs shepherd_dog_rl.drive() math
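3. Sheep drive: HerdingEnv._sheep_drive() vs sheep.py drive() math
4. Dog trajectory: 50 repeated HerdingEnv._step_dog_substep() calls vs the same drive() kinematics integrated step by step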
"""
import sys
import os
import math
import numpy as np
# Make imports work from project root
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, os.path.join(os.path.dirname(__file__)))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "controllers", "shepherd_dog_rl"))
from herding_env import HerdingEnv
# Re-implement the Webots functions standalone (no Webots dependency)
FIELD = 15.0
PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32)
PEN_ENTRY = np.array([11.5, -8.0], dtype=np.float32)
PEN_X = (10.0, 13.0)
PEN_Y = (-15.0, -8.0)
ENTRY_AWARE = True
def webots_build_obs(dog_pos, sheep_positions, n_sheep, dog_heading):
    """Reference (Webots-free) copy of shepherd_dog_rl.py build_obs().

    Args:
        dog_pos: Dog (x, y) position.
        sheep_positions: Sequence of sheep (x, y) positions. A sheep strictly
            inside the PEN_X x PEN_Y rectangle is treated as penned and
            excluded from the flock statistics.
        n_sheep: Total number of sheep, used for the active fraction.
        dog_heading: Dog heading angle (passed to math.cos / math.sin).

    Returns:
        float32 array of 18 features: normalised dog position, dog->centroid
        offset, offsets of the three farthest active sheep from the centroid,
        centroid->pen and farthest->pen offsets, flock radius, active
        fraction, and cos/sin of the heading.
    """
    span = 2 * FIELD

    outside_pen = [
        p for p in sheep_positions
        if not (PEN_X[0] < p[0] < PEN_X[1] and PEN_Y[0] < p[1] < PEN_Y[1])
    ]
    active = np.array(outside_pen, dtype=np.float32)
    count = len(active)

    if count == 0:
        # No free sheep left: every flock reference collapses onto the pen.
        centroid = PEN_CENTER.copy()
        spread = 0.0
        farthest = [PEN_CENTER.copy()] * 3
    else:
        centroid = active.mean(axis=0)
        dists = np.linalg.norm(active - centroid, axis=1)
        order = np.argsort(dists)[::-1]  # farthest first
        spread = float(dists[order[0]])
        farthest = [active[k] for k in order[:3]]
        # Fewer than three active sheep: pad the missing slots with the centroid.
        farthest += [centroid] * (3 - len(farthest))

    frac_active = count / max(n_sheep, 1)
    pen_ref = PEN_ENTRY if ENTRY_AWARE else PEN_CENTER
    far1 = farthest[0]

    feats = [
        dog_pos[0] / FIELD, dog_pos[1] / FIELD,
        (centroid[0] - dog_pos[0]) / span, (centroid[1] - dog_pos[1]) / span,
    ]
    for pt in farthest:
        feats.append((pt[0] - centroid[0]) / span)
        feats.append((pt[1] - centroid[1]) / span)
    feats.extend([
        (pen_ref[0] - centroid[0]) / span, (pen_ref[1] - centroid[1]) / span,
        (pen_ref[0] - far1[0]) / span, (pen_ref[1] - far1[1]) / span,
        spread / span,
        frac_active,
        math.cos(dog_heading), math.sin(dog_heading),
    ])
    return np.array(feats, dtype=np.float32)
def webots_dog_drive(heading, speed_ms, wheel_r=0.038, k_turn=4.0,
                     motor_max=70.0, axle_track=0.28):
    """Reference copy of the shepherd_dog_rl.py drive() kinematics.

    Args:
        heading: Heading error towards the target; wrapped into (-pi, pi].
        speed_ms: Requested linear speed.
        wheel_r: Wheel radius used to convert linear speed to wheel speed.
        k_turn: Proportional gain applied to the heading error.
        motor_max: Symmetric clamp applied to each wheel command.
        axle_track: Distance between the wheels.

    Returns:
        Tuple (v_linear, omega, left_w, right_w).
    """
    heading_err = math.atan2(math.sin(heading), math.cos(heading))

    # Forward drive fades out as the error grows and is zero beyond 90 degrees.
    forward_ms = speed_ms * max(0.0, math.cos(heading_err))
    forward_rad = forward_ms / wheel_r
    steer = k_turn * heading_err

    def _saturate(cmd):
        return max(-motor_max, min(motor_max, cmd))

    left_w = _saturate(forward_rad - steer)
    right_w = _saturate(forward_rad + steer)

    v_linear = wheel_r * 0.5 * (right_w + left_w)
    omega = (wheel_r / axle_track) * (right_w - left_w)
    return v_linear, omega, left_w, right_w
def webots_sheep_drive(heading, speed_rad, wheel_r=0.031, k_turn=4.0,
                       motor_max=22.0, axle_track=0.20):
    """Standalone version of sheep.py drive() kinematics.

    Args:
        heading: Heading error towards the target; wrapped into (-pi, pi].
        speed_rad: Requested wheel speed (already in wheel units, unlike the
            dog variant which takes a linear speed).
        wheel_r: Wheel radius.
        k_turn: Proportional gain applied to the heading error. The default
            of 4.0 reproduces the previously hard-coded gain.
        motor_max: Symmetric clamp applied to each wheel command.
        axle_track: Distance between the wheels.

    Returns:
        Tuple (v_linear, omega, left_w, right_w).
    """
    err = math.atan2(math.sin(heading), math.cos(heading))
    # Forward drive is zero once the error exceeds 90 degrees.
    fwd = speed_rad * max(0.0, math.cos(err))
    # Honour the k_turn argument instead of a local constant so callers can
    # actually vary the steering gain.
    turn = k_turn * err
    l = max(-motor_max, min(motor_max, fwd - turn))
    r = max(-motor_max, min(motor_max, fwd + turn))
    v = wheel_r * 0.5 * (r + l)
    w = (wheel_r / axle_track) * (r - l)
    return v, w, l, r
def test_obs_parity():
    """Compare HerdingEnv._obs() with the standalone Webots observation.

    Places the dog and three sheep (one of them inside the pen) at fixed
    positions, builds both observations and reports the largest element-wise
    difference.

    Returns:
        True when the largest difference is below 1e-6.
    """
    print("=== Test 1: Observation Parity ===")
    env = HerdingEnv(n_sheep=3)
    # Use the same pen reference point as the Webots-side constant.
    env.ENTRY_AWARE = ENTRY_AWARE
    env.reset(seed=42)

    # Overwrite the random reset state with a controlled layout.
    env.dog_pos = np.array([5.0, 3.0], dtype=np.float32)
    env.dog_heading = 1.2
    layout = (
        ((0.0, 0.0), False),
        ((2.0, -1.0), False),
        ((11.5, -11.5), True),  # penned
    )
    for idx, (xy, _) in enumerate(layout):
        env.sheep_pos[idx] = np.array(xy, dtype=np.float32)
    for idx, (_, in_pen) in enumerate(layout):
        env.penned[idx] = in_pen

    obs_2d = env._obs()

    # Same positions, fed through the Webots-style builder.
    sheep_positions = [env.sheep_pos[idx].tolist() for idx in range(3)]
    obs_webots = webots_build_obs(
        env.dog_pos, sheep_positions, 3, env.dog_heading
    )

    max_diff = float(np.max(np.abs(obs_2d - obs_webots)))
    print(f" Max element-wise diff: {max_diff:.2e}")
    if max_diff < 1e-6:
        print(" PASS: Observations match")
        return max_diff < 1e-6

    print(" FAIL: Observations differ!")
    for i in range(18):
        if abs(obs_2d[i] - obs_webots[i]) > 1e-6:
            print(f" dim {i}: 2d={obs_2d[i]:.6f} webots={obs_webots[i]:.6f}")
    return max_diff < 1e-6
def test_dog_drive_parity():
    """Compare one dog sub-step of the 2D env with the Webots drive() math.

    For each (heading error, speed) case the dog heading is reset to 0, an
    action pointing along the target heading is applied for one sub-step, and
    the v / w / wheel values reported by ``_step_dog_substep`` are compared
    with ``webots_dog_drive``.

    Returns:
        True when every case agrees to within 1e-6.
    """
    print("\n=== Test 2: Dog Drive Parity ===")
    env = HerdingEnv(n_sheep=1)
    env.reset(seed=42)
    all_pass = True
    test_cases = [
        # (heading_error, speed_ms) — target_heading relative to current heading
        (0.0, 2.5),   # aligned, full speed
        (0.5, 2.5),   # ~29deg error
        (1.5, 2.5),   # ~86deg error
        (3.14, 2.5),  # ~180deg error — should spin in place
        (0.0, 0.5),   # aligned, slow
        (0.3, 1.0),   # small error, medium speed
    ]
    for heading_err, speed_ms in test_cases:
        env.dog_heading = 0.0
        target_heading = heading_err
        # Action direction encodes the target heading, magnitude the speed
        # as a fraction of the env's DOG_SPEED.
        action = np.array([
            math.cos(target_heading), math.sin(target_heading)
        ], dtype=np.float32) * (speed_ms / env.DOG_SPEED)

        # 2D env step
        dbg = env._step_dog_substep(action, 0.016)

        # Webots equivalent
        v_w, w_w, l_w, r_w = webots_dog_drive(heading_err, speed_ms)

        # Keep (2d, webots) pairs keyed by label so the failure report can
        # look the values up directly; building variable names for eval()
        # raised NameError for "left"/"right".
        pairs = {
            "v": (dbg["v"], v_w),
            "w": (dbg["w"], w_w),
            "left": (dbg["left_w"], l_w),
            "right": (dbg["right_w"], r_w),
        }
        diffs = {k: abs(a - b) for k, (a, b) in pairs.items()}
        max_diff = max(diffs.values())
        ok = max_diff < 1e-6
        status = "PASS" if ok else "FAIL"
        print(f" err={heading_err:.2f} spd={speed_ms:.1f}: {status} (max_diff={max_diff:.2e})")
        if not ok:
            for k, d in diffs.items():
                if d > 1e-6:
                    val_2d, val_w = pairs[k]
                    print(f" {k}: 2d={val_2d:.6f} webots={val_w:.6f}")
            all_pass = False
    return all_pass
def test_sheep_drive_parity():
    """Compare the sheep wheel-command formula with the Webots sheep drive().

    For each (heading error, wheel speed) case the env's ``_sheep_drive`` is
    executed once from heading 0 at the origin, and the left/right wheel
    commands from ``webots_sheep_drive`` are compared with a local
    re-derivation of the same formula.

    NOTE(review): the values compared here are re-derived in this function,
    not read back from the env, so this check only exercises
    ``_sheep_drive`` for crashes. Comparing the returned position or
    ``env.sheep_heading`` against the Webots kinematics would make it a real
    parity test — confirm the env's integration order before adding that.

    Returns:
        True when every case agrees to within 1e-6.
    """
    print("\n=== Test 3: Sheep Drive Parity ===")
    env = HerdingEnv(n_sheep=1)
    env.reset(seed=42)
    all_pass = True
    test_cases = [
        # (heading_error, speed_rad)
        (0.0, 20.0),   # aligned, flee speed
        (0.0, 3.0),    # aligned, wander speed
        (0.5, 15.0),   # moderate error
        (1.57, 10.0),  # 90deg — should spin in place
        (3.14, 20.0),  # 180deg — should spin in place fast
        (0.2, 8.0),    # small error, medium speed
    ]
    for heading_err, speed_rad in test_cases:
        env.sheep_heading[0] = 0.0
        env.sheep_pos[0] = np.array([0.0, 0.0], dtype=np.float32)
        target_heading = heading_err

        # 2D env: run one sub-step (result is not inspected, see NOTE above).
        env._sheep_drive(0, target_heading, speed_rad, 0.016)

        # Webots equivalent
        v_w, w_w, l_w, r_w = webots_sheep_drive(heading_err, speed_rad)

        # Local re-derivation of the wheel commands (gain 4.0, clamp 22.0).
        err_2d = (target_heading - 0.0 + np.pi) % (2 * np.pi) - np.pi
        fwd_2d = speed_rad * max(0.0, math.cos(err_2d))
        turn_2d = 4.0 * err_2d
        l_2d = max(-22.0, min(22.0, fwd_2d - turn_2d))
        r_2d = max(-22.0, min(22.0, fwd_2d + turn_2d))

        diffs = {
            "left": abs(l_2d - l_w),
            "right": abs(r_2d - r_w),
        }
        max_diff = max(diffs.values())
        ok = max_diff < 1e-6
        status = "PASS" if ok else "FAIL"
        print(f" err={heading_err:.2f} spd={speed_rad:.1f}: {status} (max_diff={max_diff:.2e})")
        if not ok:
            for k, d in diffs.items():
                if d > 1e-6:
                    print(f" {k}: 2d={l_2d if k=='left' else r_2d:.6f} webots={l_w if k=='left' else r_w:.6f}")
            all_pass = False
    return all_pass
def test_full_trajectory_parity():
    """Compare a 50-sub-step dog trajectory with hand-integrated kinematics.

    The same fixed action is applied 50 times through
    ``HerdingEnv._step_dog_substep`` while the Webots drive() equations are
    integrated alongside (heading first, then position along the new
    heading). The largest heading and position differences are reported.

    Returns:
        True when the largest position difference stays below 1e-4.
    """
    print("\n=== Test 4: Full Trajectory Parity (dog only) ===")
    # Run 50 steps with a fixed action, compare dog heading/position
    # at each step between 2D env kinematics and pure Webots kinematics.
    env = HerdingEnv(n_sheep=1)
    env.reset(seed=42)
    env.dog_pos = np.array([0.0, 0.0], dtype=np.float32)
    env.dog_heading = 0.0
    env.ENTRY_AWARE = ENTRY_AWARE
    action = np.array([0.8, -0.6], dtype=np.float32)  # magnitude 1.0
    dt = 0.016667  # sub_dt

    # The action never changes, so the commanded speed and target heading are
    # loop invariants.
    speed_ms = 1.0 * 2.5
    target_heading = math.atan2(-0.6, 0.8)

    # Webots-side tracking
    wb_heading = 0.0
    wb_x, wb_y = 0.0, 0.0
    max_heading_diff = 0.0
    max_pos_diff = 0.0
    for _ in range(50):
        # 2D env sub-step
        env._step_dog_substep(action, dt)

        # Webots-side computation
        err = math.atan2(math.sin(target_heading - wb_heading),
                         math.cos(target_heading - wb_heading))
        fwd_ms = speed_ms * max(0.0, math.cos(err))
        fwd_rad = fwd_ms / 0.038
        turn = 4.0 * err
        l = max(-70.0, min(70.0, fwd_rad - turn))
        r = max(-70.0, min(70.0, fwd_rad + turn))
        v = 0.038 * 0.5 * (r + l)
        w = (0.038 / 0.28) * (r - l)
        wb_heading = math.atan2(math.sin(wb_heading + w * dt),
                                math.cos(wb_heading + w * dt))
        wb_x += math.cos(wb_heading) * v * dt
        wb_y += math.sin(wb_heading) * v * dt

        # Wrap the angular difference so two headings on either side of
        # +/-pi are not reported as ~2*pi apart.
        raw_diff = env.dog_heading - wb_heading
        heading_diff = abs(math.atan2(math.sin(raw_diff), math.cos(raw_diff)))
        pos_diff = math.hypot(env.dog_pos[0] - wb_x, env.dog_pos[1] - wb_y)
        max_heading_diff = max(max_heading_diff, heading_diff)
        max_pos_diff = max(max_pos_diff, pos_diff)

    print(f" Max heading diff over 50 steps: {max_heading_diff:.2e} rad")
    print(f" Max position diff over 50 steps: {max_pos_diff:.2e} m")
    # NOTE(review): only the position error gates the result; the heading
    # error is informational.
    ok = max_pos_diff < 1e-4
    print(f" {'PASS' if ok else 'FAIL'}: Trajectories match")
    return ok
if __name__ == "__main__":
    # Run every parity check in order and collect (label, passed) pairs.
    results = [
        ("Obs parity", test_obs_parity()),
        ("Dog drive parity", test_dog_drive_parity()),
        ("Sheep drive parity", test_sheep_drive_parity()),
        ("Trajectory parity", test_full_trajectory_parity()),
    ]

    print("\n" + "=" * 50)
    print("RESULTS")
    print("=" * 50)
    for name, passed in results:
        print(f" {name}: {'PASS' if passed else 'FAIL'}")
    all_pass = all(passed for _, passed in results)
    print(f"\nOverall: {'ALL PASS' if all_pass else 'SOME FAILURES'}")

    # Each test builds its own local env, so there is no module-level `env`
    # to close here (calling env.close() raised NameError). Report the
    # outcome through the exit status instead.
    sys.exit(0 if all_pass else 1)
+1
View File
@@ -0,0 +1 @@
+338 -157
View File
@@ -1,210 +1,391 @@
""" """
PPO training script for the herding task. PPO training for the herding task with curriculum learning.
Usage examples Trains from scratch through a 1→max_sheep curriculum, evaluates after each
-------------- stage, and auto-generates trajectory/timeseries plots plus a summary chart.
# Start fresh with curriculum (1 → 5 sheep):
python train.py --curriculum
# Resume from checkpoint, skip directly to 3 sheep: Usage
python train.py --resume runs/ppo_herding/ckpt_200000_steps.zip --n-sheep 3 -----
python train.py # defaults from config.json
python train.py --config my_config.json --max-sheep 5
python train.py --max-sheep 3 --steps-per-stage 1000000
# Quick smoke-test (no curriculum, single env): Outputs (in runs/<timestamp>/):
python train.py --n-envs 1 --total-steps 50000 config.json resolved config
final_model.zip trained PPO model
vecnorm.pkl VecNormalize statistics
stage_results.json per-stage evaluation metrics
success_rate.png summary bar chart
eval/ trajectory & timeseries plots per sheep count
""" """
import argparse import argparse
import json
import os import os
import time
from copy import deepcopy
import numpy as np import numpy as np
from stable_baselines3 import PPO from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import ( from stable_baselines3.common.callbacks import BaseCallback
BaseCallback, from stable_baselines3.common.vec_env import (
CallbackList, DummyVecEnv,
CheckpointCallback, SubprocVecEnv,
EvalCallback, VecNormalize,
) )
from stable_baselines3.common.vec_env import SubprocVecEnv, VecNormalize
from herding_env import HerdingEnv from herding_env import HerdingEnv
from viz import (
run_and_record,
plot_trajectory,
plot_timeseries,
plot_success_rate,
save_episode_gif,
)
# --------------------------------------------------------------------------- # ── Callbacks ────────────────────────────────────────────────────────────────
# Curriculum callback
# ---------------------------------------------------------------------------
class CurriculumCallback(BaseCallback): class ProgressCallback(BaseCallback):
""" """One-line progress summary every `freq` env steps."""
Advances the curriculum (number of active sheep) when the rolling mean
episode success rate exceeds a threshold.
Success = episode terminated (all sheep penned) rather than truncated. def __init__(self, stage_label: str, freq: int = 100_000):
""" super().__init__()
self.stage_label = stage_label
THRESHOLD = 0.75 # success rate to graduate self.freq = freq
WINDOW = 100 # episodes to average over self._last = 0
MIN_EPISODES = 50 # don't graduate before seeing this many episodes self._ep_returns = []
self._ep_success = []
def __init__(self, start_sheep: int, max_sheep: int, verbose: int = 1): self._total_eps = 0
super().__init__(verbose) self._total_success = 0
self.max_sheep = max_sheep self._cur_ret = None
self._successes = []
self._cur_sheep = start_sheep
def _on_step(self) -> bool: def _on_step(self) -> bool:
for info, done in zip(self.locals["infos"], self.locals["dones"]): rewards = self.locals.get("rewards")
if done: dones = self.locals.get("dones")
truncated = info.get("TimeLimit.truncated", False) infos = self.locals.get("infos", [])
self._successes.append(0 if truncated else 1) if rewards is None or dones is None:
if len(self._successes) > self.WINDOW: return True
self._successes.pop(0) if self._cur_ret is None or len(self._cur_ret) != len(rewards):
self._cur_ret = np.zeros(len(rewards), dtype=np.float64)
if (self._cur_sheep < self.max_sheep self._cur_ret += np.asarray(rewards, dtype=np.float64)
and len(self._successes) >= self.MIN_EPISODES for i, d in enumerate(dones):
and np.mean(self._successes) >= self.THRESHOLD): if not d:
self._cur_sheep += 1 continue
self.training_env.env_method("set_n_sheep", self._cur_sheep) self._ep_returns.append(float(self._cur_ret[i]))
self._successes.clear() info = infos[i] if i < len(infos) else {}
if self.verbose: success = int(info.get("n_penned", 0) == info.get("n_sheep", -1))
print(f"\n[Curriculum] Advanced to {self._cur_sheep} sheep " self._ep_success.append(success)
f"at step {self.num_timesteps}\n") self._total_eps += 1
self._total_success += success
self._cur_ret[i] = 0.0
if len(self._ep_returns) > 50:
self._ep_returns.pop(0)
self._ep_success.pop(0)
if self.num_timesteps - self._last >= self.freq:
self._last = self.num_timesteps
n = len(self._ep_returns)
mean_r = float(np.mean(self._ep_returns)) if n else float("nan")
win_sr = float(np.mean(self._ep_success)) if n else float("nan")
cum_sr = (self._total_success / self._total_eps
if self._total_eps else float("nan"))
print(f" ... [{self.stage_label} | "
f"{self.num_timesteps:>7,} steps | "
f"ret(last {n})={mean_r:+.2f} "
f"win_sr={win_sr*100:.0f}% cum_sr={cum_sr*100:.0f}%]",
flush=True)
return True return True
# --------------------------------------------------------------------------- # ── Environment factory ──────────────────────────────────────────────────────
# Environment factory
# ---------------------------------------------------------------------------
def make_env(n_sheep: int, seed: int, max_steps: int): def make_env(n_sheep, seed, max_steps, reward_cfg=None):
def _init(): def _init():
env = HerdingEnv(n_sheep=n_sheep, max_steps=max_steps) env = HerdingEnv(n_sheep=n_sheep, max_steps=max_steps,
reward_cfg=reward_cfg)
env.reset(seed=seed) env.reset(seed=seed)
return env return env
return _init return _init
# --------------------------------------------------------------------------- # ── Failure-mode classification ──────────────────────────────────────────────
# Main
# --------------------------------------------------------------------------- COMPACT_RADIUS = 5.0
def _classify(ep_radii, ep_com_dists, n_penned, n_sheep):
if n_penned == n_sheep:
return "SUCCESS"
if min(ep_radii) > COMPACT_RADIUS:
return "NEVER_COMPACT"
first = next(i for i, r in enumerate(ep_radii) if r <= COMPACT_RADIUS)
if min(ep_com_dists[first:]) > 3.0:
return "COMPACT_CANT_DRIVE"
if n_penned == 0:
return "DROVE_NO_SHEEP"
return f"PARTIAL_{n_penned}of{n_sheep}"
# ── Evaluation ───────────────────────────────────────────────────────────────
def evaluate(model, vn_template, n_sheep, n_episodes, max_steps,
             reward_cfg=None):
    """Evaluate ``model`` at a fixed sheep count and return a metrics dict.

    A fresh single-env ``VecNormalize`` is built in inference mode and given
    copies of the training normalisation statistics, then ``n_episodes``
    deterministic episodes are rolled out.

    Args:
        model: Policy exposing ``predict(obs, deterministic=True)``.
        vn_template: Training ``VecNormalize`` whose ``obs_rms`` / ``ret_rms``
            are copied.
        n_sheep: Number of sheep in the evaluation env.
        n_episodes: Number of episodes to run.
        max_steps: Episode step limit passed to the env.
        reward_cfg: Optional reward configuration forwarded to the env.

    Returns:
        Dict with ``sr`` (success rate), ``mean_len``, ``mean_min_pen``,
        ``mean_act``, ``failure_modes`` (label -> count) and, when the env
        reports ``rcomps`` in its info dict, ``reward_per_step``.
    """
    raw = DummyVecEnv([make_env(n_sheep, 9999, max_steps, reward_cfg)])
    vn = VecNormalize(raw, norm_obs=True, norm_reward=False, training=False)
    vn.obs_rms = deepcopy(vn_template.obs_rms)
    vn.ret_rms = deepcopy(vn_template.ret_rms)

    successes = 0
    ep_lens = []
    min_pen_list = []
    action_mags = []
    failure_counts = {}
    rc_sums = {}
    rc_n = 0

    # try/finally so the env is closed even if a rollout raises.
    try:
        inner = vn.envs[0]
        for _ in range(n_episodes):
            obs = vn.reset()
            done = False
            steps = 0
            min_pen = float("inf")
            mags = []
            ep_radii = []
            ep_com_dists = []
            while not done:
                # Sample flock statistics BEFORE stepping. The vectorised env
                # auto-resets as soon as an episode ends, so reading the
                # inner env after the terminal step would return the freshly
                # reset state of the next episode and pollute this episode's
                # min-distance and failure-mode statistics.
                com, radius, _ = inner._flock_stats()
                pen_dist = float(np.linalg.norm(com - inner.PEN_CENTER))
                min_pen = min(min_pen, pen_dist)
                ep_radii.append(radius)
                ep_com_dists.append(pen_dist)

                action, _ = model.predict(obs, deterministic=True)
                obs, _, dones, infos = vn.step(action)
                done = dones[0]
                mags.append(float(np.linalg.norm(action[0])))
                steps += 1

                # Accumulate per-component reward terms when the env
                # reports them.
                rc = infos[0].get("rcomps")
                if rc:
                    for k, v in rc.items():
                        rc_sums[k] = rc_sums.get(k, 0.0) + v
                    rc_n += 1

            # infos still holds the terminal step's info dict here.
            n_penned = infos[0].get("n_penned", 0)
            success = n_penned == n_sheep
            successes += int(success)
            ep_lens.append(steps)
            min_pen_list.append(min_pen)
            action_mags.extend(mags)

            mode = _classify(ep_radii, ep_com_dists, n_penned, n_sheep)
            failure_counts[mode] = failure_counts.get(mode, 0) + 1
    finally:
        vn.close()

    result = {
        "sr": successes / n_episodes,
        "mean_len": float(np.mean(ep_lens)),
        "mean_min_pen": float(np.mean(min_pen_list)),
        "mean_act": float(np.mean(action_mags)) if action_mags else 0.0,
        "failure_modes": failure_counts,
    }
    if rc_n > 0:
        result["reward_per_step"] = {k: v / rc_n for k, v in rc_sums.items()}
    return result
# ── CLI ──────────────────────────────────────────────────────────────────────
# Built-in defaults; a JSON config file may override any of these keys.
DEFAULT_CONFIG = dict(
    # Reward weights.
    W_PER_SHEEP=2.0,
    W_ALIGN=0.05,
    W_PEN_BONUS=10.0,
    W_COMPLETE=100.0,
    W_STEP_COST=0.02,
    W_SOUTH=0.01,
    W_COMPACT=0.0,
    W_WALL_TOUCH=0.04,
    WALL_TOUCH_BUFFER=0.3,
    # Reward shaping / observation switches.
    ALIGN_SHAPE="standoff",
    ALIGN_GATED=True,
    ENTRY_AWARE=True,
    # PPO entropy coefficient.
    ent_coef=0.02,
)
def parse_args(): def parse_args():
p = argparse.ArgumentParser() p = argparse.ArgumentParser(
p.add_argument("--n-sheep", type=int, default=1, description="PPO training for herding task with curriculum learning")
help="Starting number of sheep (or fixed count if no curriculum)") p.add_argument("--config", type=str, default=None,
p.add_argument("--max-sheep", type=int, default=5, help="JSON config file (reward weights + ent_coef)")
help="Maximum sheep for curriculum (ignored without --curriculum)") p.add_argument("--max-sheep", type=int, default=10)
p.add_argument("--n-envs", type=int, default=8, p.add_argument("--steps-per-stage", type=int, default=1_500_000)
help="Number of parallel environments") p.add_argument("--n-envs", type=int, default=8)
p.add_argument("--total-steps", type=int, default=5_000_000, p.add_argument("--max-steps", type=int, default=2500)
help="Total environment steps to train for") p.add_argument("--eval-episodes", type=int, default=30)
p.add_argument("--max-steps", type=int, default=2000, p.add_argument("--run-dir", type=str, default=None)
help="Episode step limit inside each env") p.add_argument("--no-gif", action="store_true",
p.add_argument("--curriculum", action="store_true", help="Skip per-stage GIF rendering (PNGs still produced).")
help="Enable automatic curriculum advancement") p.add_argument("--gif-fps", type=int, default=20)
p.add_argument("--resume", type=str, default=None, p.add_argument("--gif-skip", type=int, default=3,
help="Path to a .zip checkpoint to resume training from") help="Keep every Nth frame (smaller GIF; default 3).")
p.add_argument("--run-dir", type=str, default="runs/ppo_herding",
help="Output directory for checkpoints and logs")
p.add_argument("--save-freq", type=int, default=100_000,
help="Checkpoint every N steps (per-env, not total)")
p.add_argument("--eval-freq", type=int, default=50_000,
help="Evaluate every N steps")
p.add_argument("--eval-eps", type=int, default=20,
help="Episodes per evaluation run")
return p.parse_args() return p.parse_args()
# ── Main ─────────────────────────────────────────────────────────────────────
def main(): def main():
args = parse_args() args = parse_args()
os.makedirs(args.run_dir, exist_ok=True)
ckpt_dir = os.path.join(args.run_dir, "checkpoints") # Load config: --config overrides, else auto-load config.json if present
best_dir = os.path.join(args.run_dir, "best_model") cfg = dict(DEFAULT_CONFIG)
norm_path = os.path.join(args.run_dir, "vecnorm.pkl") config_path = args.config
os.makedirs(ckpt_dir, exist_ok=True) if config_path is None and os.path.exists("config.json"):
config_path = "config.json"
if config_path:
with open(config_path) as f:
cfg.update(json.load(f))
print(f"Config loaded from {config_path}")
rcfg = {k: v for k, v in cfg.items() if hasattr(HerdingEnv, k)}
# Run directory
run_dir = args.run_dir or os.path.join(
"runs", time.strftime("%Y%m%d_%H%M%S"))
eval_dir = os.path.join(run_dir, "eval")
os.makedirs(eval_dir, exist_ok=True)
with open(os.path.join(run_dir, "config.json"), "w") as f:
json.dump(cfg, f, indent=2)
print(f"Config: {cfg}")
print(f"Run dir: {run_dir}")
print(f"Curriculum: 1 → {args.max_sheep} sheep, "
f"{args.steps_per_stage:,} steps/stage\n")
# Training envs # Training envs
train_env = SubprocVecEnv([ train_env = SubprocVecEnv([
make_env(args.n_sheep, seed=i, max_steps=args.max_steps) make_env(1, seed=i, max_steps=args.max_steps, reward_cfg=rcfg)
for i in range(args.n_envs) for i in range(args.n_envs)
]) ])
if args.resume and os.path.exists(norm_path): vn = VecNormalize(train_env, norm_obs=True, norm_reward=True,
train_env = VecNormalize.load(norm_path, train_env)
train_env.training = True
train_env.norm_reward = True
else:
train_env = VecNormalize(train_env, norm_obs=True, norm_reward=True,
clip_obs=10.0) clip_obs=10.0)
# Eval env (no reward normalisation, deterministic) # Model — force CPU (PPO with MLP runs faster on CPU than GPU; SB3 warns
eval_env = SubprocVecEnv([ # about this otherwise).
make_env(args.n_sheep, seed=1000 + i, max_steps=args.max_steps) model = PPO(
for i in range(2) "MlpPolicy", vn,
]) learning_rate=3e-4, n_steps=2048, batch_size=256, n_epochs=10,
eval_env = VecNormalize(eval_env, norm_obs=True, norm_reward=False, gamma=0.995, gae_lambda=0.95, clip_range=0.2,
clip_obs=10.0, training=False) ent_coef=cfg.get("ent_coef", 0.02), vf_coef=0.5, max_grad_norm=0.5,
policy_kwargs=dict(net_arch=[256, 256]),
# Callbacks device="cpu",
checkpoint_cb = CheckpointCallback( verbose=0,
save_freq=max(args.save_freq // args.n_envs, 1),
save_path=ckpt_dir,
name_prefix="ckpt",
save_vecnormalize=True,
)
eval_cb = EvalCallback(
eval_env,
best_model_save_path=best_dir,
log_path=args.run_dir,
eval_freq=max(args.eval_freq // args.n_envs, 1),
n_eval_episodes=args.eval_eps,
deterministic=True,
verbose=1,
)
callbacks = [checkpoint_cb, eval_cb]
if args.curriculum:
callbacks.append(CurriculumCallback(start_sheep=args.n_sheep,
max_sheep=args.max_sheep))
callback_list = CallbackList(callbacks)
# Model
ppo_kwargs = dict(
policy = "MlpPolicy",
env = train_env,
learning_rate = 3e-4,
n_steps = 2048,
batch_size = 256,
n_epochs = 10,
gamma = 0.995,
gae_lambda = 0.95,
clip_range = 0.2,
ent_coef = 0.005,
vf_coef = 0.5,
max_grad_norm = 0.5,
policy_kwargs = dict(net_arch=[256, 256]),
tensorboard_log = args.run_dir,
verbose = 1,
) )
if args.resume: # Curriculum training
print(f"Resuming from {args.resume}") stage_results = []
model = PPO.load(args.resume, env=train_env, **{ t0 = time.time()
k: v for k, v in ppo_kwargs.items()
if k not in ("policy", "env")
})
else:
model = PPO(**ppo_kwargs)
try:
for n in range(1, args.max_sheep + 1):
if n == 1:
print(f"\n[Stage n_sheep=1] training {args.steps_per_stage:,} steps")
model.learn( model.learn(
total_timesteps=args.total_steps, total_timesteps=args.steps_per_stage,
callback=callback_list, reset_num_timesteps=True,
reset_num_timesteps=args.resume is None, callback=ProgressCallback("1 sheep", freq=100_000),
tb_log_name="ppo", )
else:
# Mixed transition: half envs stay at n-1, half advance to n,
# for the first half of the stage budget. This prevents the
# n+1 task's noisy early gradients from destroying the n policy
# (catastrophic forgetting) before it has a chance to adapt.
half = max(1, args.n_envs // 2)
for i in range(half):
vn.env_method("set_n_sheep", n - 1, indices=[i])
for i in range(half, args.n_envs):
vn.env_method("set_n_sheep", n, indices=[i])
mix_steps = args.steps_per_stage // 2
full_steps = args.steps_per_stage - mix_steps
print(f"\n[Stage n_sheep={n}] mixed ({n-1}/{n} sheep) "
f"{mix_steps:,} steps")
model.learn(
total_timesteps=mix_steps,
reset_num_timesteps=False,
callback=ProgressCallback(f"{n-1}{n} mix", freq=100_000),
)
vn.env_method("set_n_sheep", n)
print(f"[Stage n_sheep={n}] full ({n} sheep) {full_steps:,} steps")
model.learn(
total_timesteps=full_steps,
reset_num_timesteps=False,
callback=ProgressCallback(f"{n} sheep", freq=100_000),
) )
# Save final artefacts # Evaluate
model.save(os.path.join(args.run_dir, "final_model")) print(f"[Stage n_sheep={n}] evaluating {args.eval_episodes} eps")
train_env.save(norm_path) r = evaluate(model, vn, n, args.eval_episodes, args.max_steps, rcfg)
print(f"\nTraining complete. Artefacts saved to {args.run_dir}/") print(f"[Stage n_sheep={n}] sr={r['sr']*100:.0f}% "
f"mean_len={r['mean_len']:.0f} "
f"mean_min_pen={r['mean_min_pen']:.1f}m "
f"mean_act={r['mean_act']:.2f}")
# Failure-mode breakdown
if r["failure_modes"]:
modes = " ".join(
f"{k}={v}" for k, v in sorted(
r["failure_modes"].items(), key=lambda x: -x[1]))
print(f" failure modes: {modes}")
# Reward breakdown
if "reward_per_step" in r:
rps = r["reward_per_step"]
print(f" reward/step: " + " ".join(
f"{k}={v:+.4f}" for k, v in rps.items()))
# Episode visualisation: trajectory + timeseries + animated GIF
hist = run_and_record(model, vn, n, args.max_steps, rcfg,
seed=1000 + n)
tag = "success" if hist["success"] else "fail"
plot_trajectory(
hist,
os.path.join(eval_dir, f"traj_{n}s_{tag}.png"))
plot_timeseries(
hist,
os.path.join(eval_dir, f"ts_{n}s_{tag}.png"))
if not args.no_gif:
save_episode_gif(
hist,
os.path.join(eval_dir, f"ep_{n}s_{tag}.gif"),
fps=args.gif_fps, skip=args.gif_skip)
r["n_sheep"] = n
stage_results.append(r)
# Save artefacts
model.save(os.path.join(run_dir, "final_model"))
vn.save(os.path.join(run_dir, "vecnorm.pkl"))
with open(os.path.join(run_dir, "stage_results.json"), "w") as f:
json.dump(stage_results, f, indent=2)
finally:
try:
vn.close()
except Exception:
pass
# Summary
elapsed = (time.time() - t0) / 60
print("\n" + "=" * 70)
print(" TRAINING SUMMARY")
print("=" * 70)
for r in stage_results:
print(f" n_sheep={r['n_sheep']} sr={r['sr']*100:>3.0f}% "
f"len={r['mean_len']:>5.0f} min_pen={r['mean_min_pen']:>5.1f}m "
f"act={r['mean_act']:.2f}")
print(f"\n Total time: {elapsed:.1f} min")
print(f" Artefacts: {run_dir}/")
plot_success_rate(stage_results, os.path.join(run_dir, "success_rate.png"))
print(f" Plots: {run_dir}/success_rate.png, {eval_dir}/")
if __name__ == "__main__": if __name__ == "__main__":
+411
View File
@@ -0,0 +1,411 @@
"""
PPO training with attention-based policy (train_at.py).
Key difference from train.py
-----------------------------
- Observation exposes ALL sheep as individual per-sheep tokens rather than
only the top-3 farthest. The policy therefore has complete flock visibility
at any sheep count — no hidden sheep even at n=10.
- A TransformerFeaturesExtractor processes the sheep tokens with multi-head
self-attention (permutation-invariant), then mean-pools over valid tokens
and concatenates the result with global dog/pen features.
- Curriculum transition uses the same mixed-env approach as train.py: half
the envs stay at n-1 for the first half of each new stage to suppress
catastrophic forgetting.
Observation layout (7 + MAX_SHEEP*6 = 67 dims, fixed)
-------------------------------------------------------
Global (7):
dog_x / FIELD, dog_y / FIELD,
cos(heading), sin(heading),
(pen_x - dog_x) / D, (pen_y - dog_y) / D,
n_active / n_sheep
Per sheep i (6):
(sheep_x - dog_x) / D, (sheep_y - dog_y) / D, ← pos rel to dog
(pen_x - sheep_x) / D, (pen_y - sheep_y) / D, ← sheep-to-pen
is_active 1.0 if not penned, else 0.0
is_valid 1.0 if i < n_sheep, else 0.0 (padding sentinel)
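After VecNormalize, is_valid normalises to > 0 for real sheep and to < 0 for
padding tokens whenever that slot's running mean lies strictly inside (0, 1),
so a threshold of 0 separates real from padded without any extra bookkeeping.
NOTE: a slot that has been valid in every sample seen so far (e.g. sheep 0,
which always exists) has a running mean of ~1 and normalises to ~0, so the
threshold is fragile for such slots.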
Usage
-----
python train_at.py # defaults from config.json
python train_at.py --max-sheep 10 --steps-per-stage 2000000
python train_at.py --embed-dim 128 --n-heads 4 --n-layers 3
"""
import argparse
import json
import os
import time
from copy import deepcopy
import numpy as np
import torch
import torch.nn as nn
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv, VecNormalize
from herding_env import HerdingEnv
from train import ProgressCallback, _classify, COMPACT_RADIUS, DEFAULT_CONFIG
from viz import (
run_and_record, plot_trajectory, plot_timeseries,
plot_success_rate, save_episode_gif,
)
# ── Per-sheep token observation environment ───────────────────────────────────
class HerdingEnvAt(HerdingEnv):
    """
    HerdingEnv variant whose observation is a flat vector of global features
    followed by one fixed-size token per sheep slot (MAX_SHEEP slots).

    Only ``observation_space`` and ``_obs`` are overridden here; everything
    else comes from HerdingEnv.
    """

    OBS_GLOBAL = 7  # number of global (dog / pen) features
    OBS_SHEEP = 6   # number of features per sheep token

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Replace the parent's observation space with the flat token layout.
        flat_len = self.OBS_GLOBAL + self.MAX_SHEEP * self.OBS_SHEEP
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(flat_len,), dtype=np.float32
        )

    def _obs(self) -> np.ndarray:
        """Return the flat float32 observation: globals, then sheep tokens."""
        field = self.FIELD
        span = 2.0 * self.FIELD
        target = self.PEN_ENTRY if self.ENTRY_AWARE else self.PEN_CENTER

        still_out = ~self.penned[:self.n_sheep]
        n_out = int(still_out.sum())

        head = np.array([
            self.dog_pos[0] / field,
            self.dog_pos[1] / field,
            float(np.cos(self.dog_heading)),
            float(np.sin(self.dog_heading)),
            (target[0] - self.dog_pos[0]) / span,
            (target[1] - self.dog_pos[1]) / span,
            n_out / max(self.n_sheep, 1),
        ], dtype=np.float32)

        # Slots at index >= n_sheep stay all-zero (is_valid = 0) as padding.
        tokens = np.zeros((self.MAX_SHEEP, self.OBS_SHEEP), dtype=np.float32)
        for idx in range(self.n_sheep):
            sheep = self.sheep_pos[idx]
            tokens[idx] = [
                (sheep[0] - self.dog_pos[0]) / span,   # position relative to dog
                (sheep[1] - self.dog_pos[1]) / span,
                (target[0] - sheep[0]) / span,         # sheep-to-pen offset
                (target[1] - sheep[1]) / span,
                float(not self.penned[idx]),           # is_active
                1.0,                                   # is_valid
            ]

        return np.concatenate([head, tokens.ravel()])
# ── Attention features extractor ──────────────────────────────────────────────
class ShepherdAttentionExtractor(BaseFeaturesExtractor):
    """
    Multi-head self-attention over per-sheep tokens, mean-pooled over valid
    (non-padding) tokens and concatenated with global dog/pen features.

    Tokens are classified from the (normalised) ``is_valid`` feature:
        is_valid_norm > 0  -> real sheep
        is_valid_norm <= 0 -> padding (ignored by attention and pooling)

    If every token of a sample is classified as padding, one slot is left
    visible to attention so the encoder output stays finite; the pooled
    sheep embedding for that sample is all zeros.

    Args:
        observation_space: Flat observation space (globals + sheep tokens).
        embed_dim: Token embedding width; output width is GLOBAL_DIM + embed_dim.
        n_heads: Number of attention heads.
        n_layers: Number of transformer encoder layers.
        ff_dim: Feed-forward width inside each encoder layer.
    """

    GLOBAL_DIM = HerdingEnvAt.OBS_GLOBAL   # 7
    SHEEP_DIM = HerdingEnvAt.OBS_SHEEP     # 6
    MAX_SHEEP = HerdingEnv.MAX_SHEEP       # 10
    VALID_IDX = 5                          # index of is_valid within each token

    def __init__(self, observation_space, embed_dim: int = 64,
                 n_heads: int = 4, n_layers: int = 2, ff_dim: int = 128):
        super().__init__(observation_space,
                         features_dim=self.GLOBAL_DIM + embed_dim)
        self.sheep_embed = nn.Linear(self.SHEEP_DIM, embed_dim)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim, nhead=n_heads, dim_feedforward=ff_dim,
            dropout=0.0, batch_first=True,
        )
        self.transformer = nn.TransformerEncoder(encoder_layer,
                                                 num_layers=n_layers)

    def forward(self, obs: torch.Tensor) -> torch.Tensor:
        """Map a (B, GLOBAL_DIM + MAX_SHEEP*SHEEP_DIM) batch to (B, GLOBAL_DIM + E)."""
        B = obs.shape[0]
        global_feats = obs[:, :self.GLOBAL_DIM]                  # (B, 7)
        tokens = obs[:, self.GLOBAL_DIM:].view(
            B, self.MAX_SHEEP, self.SHEEP_DIM)                   # (B, 10, 6)

        # is_valid after VecNorm: real > 0, padding <= 0
        is_valid_norm = tokens[:, :, self.VALID_IDX]             # (B, 10)
        key_padding_mask = is_valid_norm <= 0.0                  # True -> ignore

        # A fully masked row makes the attention softmax run over an empty
        # set, which can yield NaN. Leave slot 0 visible for such rows; the
        # pooling weights below are untouched, so those rows still pool to 0.
        all_padded = key_padding_mask.all(dim=1)                 # (B,)
        if bool(all_padded.any()):
            key_padding_mask = key_padding_mask.clone()
            key_padding_mask[all_padded, 0] = False

        x = self.sheep_embed(tokens)                             # (B, 10, E)
        x = self.transformer(x, src_key_padding_mask=key_padding_mask)

        # Mean over valid tokens only; clamp avoids division by zero.
        valid_w = (is_valid_norm > 0.0).float().unsqueeze(-1)    # (B, 10, 1)
        pooled = (x * valid_w).sum(1) / valid_w.sum(1).clamp(min=1.0)

        return torch.cat([global_feats, pooled], dim=1)          # (B, 7+E)
# ── Environment factory ───────────────────────────────────────────────────────
def make_env_at(n_sheep, seed, max_steps, reward_cfg=None):
    """Return a zero-argument factory that builds a seeded ``HerdingEnvAt``.

    The environment is only constructed when the returned callable is
    invoked; it is reset once with ``seed`` before being handed back.
    """
    def _build_env():
        herding_env = HerdingEnvAt(
            n_sheep=n_sheep,
            max_steps=max_steps,
            reward_cfg=reward_cfg,
        )
        herding_env.reset(seed=seed)
        return herding_env

    return _build_env
# ── Evaluation ────────────────────────────────────────────────────────────────
def evaluate_at(model, vn_template, n_sheep, n_episodes, max_steps,
                reward_cfg=None):
    """Roll out ``n_episodes`` deterministic episodes and summarise them.

    A fresh single-env ``VecNormalize`` (not training, rewards not
    normalised) is created and given deep copies of ``vn_template``'s
    observation and return statistics.

    Args:
        model: Policy exposing ``predict(obs, deterministic=True)``.
        vn_template: ``VecNormalize`` whose ``obs_rms`` / ``ret_rms`` are copied.
        n_sheep: Sheep count of the evaluation environment.
        n_episodes: Number of episodes to run.
        max_steps: Step limit passed to the environment factory.
        reward_cfg: Optional reward configuration passed to the factory.

    Returns:
        dict with ``sr`` (success fraction), ``mean_len``, ``mean_min_pen``,
        ``mean_act``, ``failure_modes`` (mode → count) and, when the env
        reported ``rcomps`` in its info dict, ``reward_per_step``.
        Averages fall back to 0.0 when no episode was run.
    """
    raw = DummyVecEnv([make_env_at(n_sheep, 9999, max_steps, reward_cfg)])
    vn = VecNormalize(raw, norm_obs=True, norm_reward=False, training=False)
    vn.obs_rms = deepcopy(vn_template.obs_rms)
    vn.ret_rms = deepcopy(vn_template.ret_rms)

    successes = 0
    ep_lens, min_pen_list, action_mags = [], [], []
    failure_counts, rc_sums = {}, {}
    rc_n = 0

    try:
        inner = vn.envs[0]
        for _ in range(n_episodes):
            obs = vn.reset()
            done = False
            steps, min_pen = 0, float("inf")
            mags, ep_radii, ep_com_dists = [], [], []
            while not done:
                action, _ = model.predict(obs, deterministic=True)
                obs, _, dones, infos = vn.step(action)
                done = dones[0]

                # The vectorised env resets itself as soon as an episode
                # ends, so on the terminal step `inner` already holds the
                # NEXT episode's initial state.  Skip that sample; it is only
                # kept when it is the sole sample, so the per-episode lists
                # never reach _classify empty.
                if not done or not ep_radii:
                    com, radius, _ = inner._flock_stats()
                    com_dist = float(np.linalg.norm(com - inner.PEN_CENTER))
                    min_pen = min(min_pen, com_dist)
                    ep_radii.append(radius)
                    ep_com_dists.append(com_dist)

                mags.append(float(np.linalg.norm(action[0])))
                steps += 1

                # Info of the terminal step still belongs to this episode.
                rc = infos[0].get("rcomps")
                if rc:
                    for k, v in rc.items():
                        rc_sums[k] = rc_sums.get(k, 0.0) + v
                    rc_n += 1

            n_penned = infos[0].get("n_penned", 0)
            successes += int(n_penned == n_sheep)
            ep_lens.append(steps)
            min_pen_list.append(min_pen)
            action_mags.extend(mags)
            mode = _classify(ep_radii, ep_com_dists, n_penned, n_sheep)
            failure_counts[mode] = failure_counts.get(mode, 0) + 1
    finally:
        # Release the evaluation env even if a rollout raises.
        vn.close()

    result = {
        "sr": successes / n_episodes if n_episodes > 0 else 0.0,
        "mean_len": float(np.mean(ep_lens)) if ep_lens else 0.0,
        "mean_min_pen": float(np.mean(min_pen_list)) if min_pen_list else 0.0,
        "mean_act": float(np.mean(action_mags)) if action_mags else 0.0,
        "failure_modes": failure_counts,
    }
    if rc_n > 0:
        result["reward_per_step"] = {k: v / rc_n for k, v in rc_sums.items()}
    return result
# ── CLI ───────────────────────────────────────────────────────────────────────
def parse_args():
    """Parse the training script's command-line options from ``sys.argv``.

    Options are registered from a table, in declaration order, so the
    generated ``--help`` output lists them in the same sequence.
    """
    parser = argparse.ArgumentParser(
        description="PPO + attention training for herding task")

    option_table = [
        ("--config", dict(type=str, default=None)),
        ("--max-sheep", dict(type=int, default=10)),
        ("--steps-per-stage", dict(type=int, default=1_500_000)),
        ("--n-envs", dict(type=int, default=8)),
        ("--max-steps", dict(type=int, default=2500)),
        ("--eval-episodes", dict(type=int, default=30)),
        ("--run-dir", dict(type=str, default=None)),
        ("--no-gif", dict(action="store_true")),
        ("--gif-fps", dict(type=int, default=20)),
        ("--gif-skip", dict(type=int, default=3)),
        # Attention architecture
        ("--embed-dim", dict(
            type=int, default=64,
            help="Transformer embedding dimension (default 64)")),
        ("--n-heads", dict(
            type=int, default=4,
            help="Number of attention heads (default 4)")),
        ("--n-layers", dict(
            type=int, default=2,
            help="Number of transformer encoder layers (default 2)")),
        ("--ff-dim", dict(
            type=int, default=128,
            help="Transformer feed-forward dim (default 128)")),
    ]
    for flag, options in option_table:
        parser.add_argument(flag, **options)

    return parser.parse_args()
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    """Train the attention policy through a sheep-count curriculum.

    Steps, in order:
      1. Merge DEFAULT_CONFIG with an optional JSON config (``--config`` or a
         ``config.json`` in the working directory) and write the merged
         config into the run directory.
      2. Build the subprocess training envs, wrap them in VecNormalize and
         create a PPO model that uses ShepherdAttentionExtractor.
      3. For n = 1 .. --max-sheep: train, evaluate with evaluate_at, record
         one episode and render its plots / GIF into ``<run_dir>/eval``.
      4. Save the model, the VecNormalize statistics and the per-stage
         results, then print a summary and plot the success rates.
    """
    args = parse_args()
    cfg = dict(DEFAULT_CONFIG)
    config_path = args.config
    # No explicit --config: fall back to ./config.json when it exists.
    if config_path is None and os.path.exists("config.json"):
        config_path = "config.json"
    if config_path:
        with open(config_path) as f:
            cfg.update(json.load(f))
        print(f"Config loaded from {config_path}")
    # Only keys that are also attributes of HerdingEnv are forwarded to the
    # environments as reward configuration.
    rcfg = {k: v for k, v in cfg.items() if hasattr(HerdingEnv, k)}
    run_dir = args.run_dir or os.path.join(
        "runs", "at_" + time.strftime("%Y%m%d_%H%M%S"))
    eval_dir = os.path.join(run_dir, "eval")
    os.makedirs(eval_dir, exist_ok=True)
    # Persist the merged config next to the artefacts of this run.
    with open(os.path.join(run_dir, "config.json"), "w") as f:
        json.dump(cfg, f, indent=2)
    print(f"Config: {cfg}")
    print(f"Run dir: {run_dir}")
    print(f"Curriculum: 1 → {args.max_sheep} sheep, "
          f"{args.steps_per_stage:,} steps/stage")
    print(f"Transformer: embed={args.embed_dim} heads={args.n_heads} "
          f"layers={args.n_layers} ff={args.ff_dim}\n")
    # All training envs start with a single sheep; env i is seeded with i.
    train_env = SubprocVecEnv([
        make_env_at(1, seed=i, max_steps=args.max_steps, reward_cfg=rcfg)
        for i in range(args.n_envs)
    ])
    vn = VecNormalize(train_env, norm_obs=True, norm_reward=True, clip_obs=10.0)
    model = PPO(
        "MlpPolicy", vn,
        learning_rate=3e-4, n_steps=2048, batch_size=256, n_epochs=10,
        gamma=0.995, gae_lambda=0.95, clip_range=0.2,
        ent_coef=cfg.get("ent_coef", 0.02), vf_coef=0.5, max_grad_norm=0.5,
        policy_kwargs=dict(
            features_extractor_class=ShepherdAttentionExtractor,
            features_extractor_kwargs=dict(
                embed_dim=args.embed_dim,
                n_heads=args.n_heads,
                n_layers=args.n_layers,
                ff_dim=args.ff_dim,
            ),
            net_arch=[256, 256],
        ),
        device="cpu",
        verbose=0,
    )
    stage_results = []
    t0 = time.time()
    try:
        for n in range(1, args.max_sheep + 1):
            if n == 1:
                # First stage: plain training, timestep counter starts at 0.
                print(f"\n[Stage n_sheep=1] training {args.steps_per_stage:,} steps")
                model.learn(
                    total_timesteps=args.steps_per_stage,
                    reset_num_timesteps=True,
                    callback=ProgressCallback("1 sheep", freq=100_000),
                )
            else:
                # Later stages are split in two phases: a "mix" phase where
                # the first `half` envs keep n-1 sheep and the rest use n,
                # then a "full" phase where every env uses n sheep.
                half = max(1, args.n_envs // 2)
                mix_steps = args.steps_per_stage // 2
                full_steps = args.steps_per_stage - mix_steps
                for i in range(half):
                    vn.env_method("set_n_sheep", n - 1, indices=[i])
                for i in range(half, args.n_envs):
                    vn.env_method("set_n_sheep", n, indices=[i])
                print(f"\n[Stage n_sheep={n}] mixed ({n-1}/{n} sheep) "
                      f"{mix_steps:,} steps")
                model.learn(
                    total_timesteps=mix_steps,
                    reset_num_timesteps=False,
                    callback=ProgressCallback(f"{n-1}{n} mix", freq=100_000),
                )
                # No indices given: applied to all envs.
                vn.env_method("set_n_sheep", n)
                print(f"[Stage n_sheep={n}] full ({n} sheep) {full_steps:,} steps")
                model.learn(
                    total_timesteps=full_steps,
                    reset_num_timesteps=False,
                    callback=ProgressCallback(f"{n} sheep", freq=100_000),
                )
            # Evaluate the stage on a separate env using the current
            # normalisation statistics.
            print(f"[Stage n_sheep={n}] evaluating {args.eval_episodes} eps")
            r = evaluate_at(model, vn, n, args.eval_episodes,
                            args.max_steps, rcfg)
            print(f"[Stage n_sheep={n}] sr={r['sr']*100:.0f}% "
                  f"mean_len={r['mean_len']:.0f} "
                  f"mean_min_pen={r['mean_min_pen']:.1f}m "
                  f"mean_act={r['mean_act']:.2f}")
            if r["failure_modes"]:
                # Most frequent mode first.
                modes = " ".join(
                    f"{k}={v}" for k, v in sorted(
                        r["failure_modes"].items(), key=lambda x: -x[1]))
                print(f" failure modes: {modes}")
            if "reward_per_step" in r:
                rps = r["reward_per_step"]
                print(" reward/step: " + " ".join(
                    f"{k}={v:+.4f}" for k, v in rps.items()))
            # One recorded episode per stage, rendered to PNG (and GIF).
            hist = run_and_record(
                model, vn, n, args.max_steps, rcfg,
                seed=1000 + n, make_env_fn=make_env_at,
            )
            tag = "success" if hist["success"] else "fail"
            plot_trajectory(hist, os.path.join(eval_dir, f"traj_{n}s_{tag}.png"))
            plot_timeseries(hist, os.path.join(eval_dir, f"ts_{n}s_{tag}.png"))
            if not args.no_gif:
                save_episode_gif(
                    hist,
                    os.path.join(eval_dir, f"ep_{n}s_{tag}.gif"),
                    fps=args.gif_fps, skip=args.gif_skip)
            r["n_sheep"] = n
            stage_results.append(r)
        # Artefacts are written once, after the last stage has finished.
        model.save(os.path.join(run_dir, "final_model"))
        vn.save(os.path.join(run_dir, "vecnorm.pkl"))
        with open(os.path.join(run_dir, "stage_results.json"), "w") as f:
            json.dump(stage_results, f, indent=2)
    finally:
        # Best-effort shutdown of the training envs; errors raised while
        # closing are deliberately ignored.
        try:
            vn.close()
        except Exception:
            pass
    elapsed = (time.time() - t0) / 60
    print("\n" + "=" * 70)
    print(" TRAINING SUMMARY (attention policy)")
    print("=" * 70)
    for r in stage_results:
        print(f" n_sheep={r['n_sheep']} sr={r['sr']*100:>3.0f}% "
              f"len={r['mean_len']:>5.0f} "
              f"min_pen={r['mean_min_pen']:>5.1f}m "
              f"act={r['mean_act']:.2f}")
    print(f"\n Total time: {elapsed:.1f} min")
    print(f" Artefacts: {run_dir}/")
    plot_success_rate(stage_results, os.path.join(run_dir, "success_rate.png"))
    print(f" Plots: {run_dir}/success_rate.png, {eval_dir}/")
# Run the training entry point only when executed as a script.
if __name__ == "__main__":
    main()
+342
View File
@@ -0,0 +1,342 @@
"""
All visualization for the herding policy: trajectory plots, timeseries plots,
success-rate bar chart, and animated GIFs.
Used both by train.py (auto-rendered after each curriculum stage) and as a CLI
to render a fresh episode against a saved model.
CLI usage:
python viz.py --run-dir runs/v1 --n-sheep 5
python viz.py --run-dir runs/v1 --n-sheep 10 --no-gif
python viz.py --model runs/v1/final_model.zip --vecnorm runs/v1/vecnorm.pkl \\
--n-sheep 3 --out-dir vis_v1_3sheep
"""
import argparse
import os
import json
from copy import deepcopy
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import matplotlib.animation as animation
from matplotlib.collections import LineCollection
import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from herding_env import HerdingEnv
# ── Palette ──────────────────────────────────────────────────────────────────
# Hex colours for sheep; the plotting code indexes this list with
# `i % len(SHEEP_COLORS)`, so colours repeat beyond ten sheep.
SHEEP_COLORS = [
    "#e41a1c", "#377eb8", "#4daf4a", "#984ea3", "#ff7f00",
    "#a65628", "#f781bf", "#999999", "#66c2a5", "#fc8d62",
]
# Colour used for the dog's path, markers and GIF trail.
DOG_COLOR = "#4e342e"
# ── Common drawing primitives ────────────────────────────────────────────────
def draw_field(ax):
    """Draw the static scene on ``ax``: field outline, pen rectangle, label.

    The axes are fixed to [-16, 16] on both axes with equal aspect.
    """
    fence_color = "#795548"

    ax.set_xlim(-16, 16)
    ax.set_ylim(-16, 16)
    ax.set_aspect("equal")
    ax.set_facecolor("#dcedc8")

    # Unfilled 30 x 30 field boundary anchored at (-15, -15).
    boundary = mpatches.Rectangle(
        (-15, -15), 30, 30, fill=False, edgecolor=fence_color, lw=2)
    ax.add_patch(boundary)

    # Filled 3 x 7 pen anchored at (10, -15), labelled at its centre.
    pen = mpatches.Rectangle(
        (10, -15), 3, 7, facecolor="#ffe082", edgecolor=fence_color, lw=2)
    ax.add_patch(pen)
    ax.text(11.5, -11.5, "pen", ha="center", va="center",
            fontsize=8, color=fence_color)
def faded_path(ax, xs, ys, color, lw=1.5, label=None):
    """Draw a polyline whose opacity ramps from 0.15 (start) to 1.0 (end).

    Nothing is drawn when fewer than two points are given.  When ``label``
    is truthy an empty line is added so the path appears in the legend.
    """
    if len(xs) < 2:
        return

    vertices = np.array([xs, ys]).T                      # (n, 2)
    # One segment per consecutive vertex pair: (n-1, 2, 2).
    segments = np.stack([vertices[:-1], vertices[1:]], axis=1)

    base_rgb = tuple(matplotlib.colors.to_rgb(color))
    rgba = [base_rgb + (alpha,)
            for alpha in np.linspace(0.15, 1.0, len(segments))]
    ax.add_collection(LineCollection(segments, colors=rgba, linewidth=lw))

    if label:
        ax.plot([], [], color=color, lw=lw, label=label)
# ── Episode rollout ──────────────────────────────────────────────────────────
def make_eval_env(n_sheep, seed, max_steps, reward_cfg=None):
    """Return a zero-argument factory that builds a seeded ``HerdingEnv``.

    Construction is deferred until the returned callable is invoked; the
    environment is reset once with ``seed`` before being returned.
    """
    def _build_env():
        herding_env = HerdingEnv(
            n_sheep=n_sheep,
            max_steps=max_steps,
            reward_cfg=reward_cfg,
        )
        herding_env.reset(seed=seed)
        return herding_env

    return _build_env
def run_and_record(model, vn_template, n_sheep, max_steps,
                   reward_cfg=None, seed=42, make_env_fn=None):
    """Run one deterministic episode and return full trajectory history.

    A single-env ``VecNormalize`` (not training, rewards not normalised) is
    built from ``make_env_fn`` (default ``make_eval_env``) and given deep
    copies of ``vn_template``'s observation and return statistics.

    The vectorised env resets itself as soon as the episode ends, so after
    the terminal step the inner env already holds the next episode's state.
    For that step the previously recorded frame is repeated instead of
    sampling the reset state; reward and action magnitude are still the real
    terminal-step values.

    Returns:
        dict with per-step lists ``dog_xs``, ``dog_ys``, ``radii``,
        ``action_mags``, ``rewards``; per-sheep per-step lists ``sheep_xs``,
        ``sheep_ys``, ``sheep_penned``, ``pen_dists``; ``penned_at`` (1-based
        step at which each sheep was first seen penned, or None — penning on
        the terminal step itself is not recorded here); and the scalars
        ``n_penned``, ``n_sheep``, ``success`` and ``steps``.
    """
    _factory = make_env_fn or make_eval_env
    raw = DummyVecEnv([_factory(n_sheep, seed, max_steps, reward_cfg)])
    vn = VecNormalize(raw, norm_obs=True, norm_reward=False, training=False)
    vn.obs_rms = deepcopy(vn_template.obs_rms)
    vn.ret_rms = deepcopy(vn_template.ret_rms)

    dog_xs, dog_ys = [], []
    sheep_xs = [[] for _ in range(n_sheep)]
    sheep_ys = [[] for _ in range(n_sheep)]
    sheep_penned = [[] for _ in range(n_sheep)]
    radii = []
    pen_dists = [[] for _ in range(n_sheep)]
    action_mags = []
    rewards = []
    penned_at = [None] * n_sheep
    step = 0
    n_penned = 0

    try:
        obs = vn.reset()
        inner = vn.envs[0]
        done = False
        while not done:
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, dones, infos = vn.step(action)
            done = dones[0]
            step += 1
            rewards.append(float(reward[0]))
            action_mags.append(float(np.linalg.norm(action[0])))

            if done and step > 1:
                # `inner` was auto-reset: its positions / penned flags belong
                # to the next episode.  Repeat the last real frame so every
                # per-step list keeps exactly `step` entries.
                dog_xs.append(dog_xs[-1])
                dog_ys.append(dog_ys[-1])
                radii.append(radii[-1])
                for i in range(n_sheep):
                    sheep_xs[i].append(sheep_xs[i][-1])
                    sheep_ys[i].append(sheep_ys[i][-1])
                    sheep_penned[i].append(sheep_penned[i][-1])
                    pen_dists[i].append(pen_dists[i][-1])
                continue

            # Live frame (or a one-step episode, where no earlier frame
            # exists to fall back on).
            dog_xs.append(float(inner.dog_pos[0]))
            dog_ys.append(float(inner.dog_pos[1]))
            com, radius, _ = inner._flock_stats()
            radii.append(radius)
            for i in range(n_sheep):
                sheep_xs[i].append(float(inner.sheep_pos[i][0]))
                sheep_ys[i].append(float(inner.sheep_pos[i][1]))
                sheep_penned[i].append(bool(inner.penned[i]))
                pen_dists[i].append(
                    float(np.linalg.norm(inner.sheep_pos[i] - inner.PEN_CENTER)))
                if inner.penned[i] and penned_at[i] is None:
                    penned_at[i] = step

        # The terminal info dict still describes the finished episode.
        n_penned = infos[0].get("n_penned", 0)
        if n_penned == n_sheep:
            # Every sheep ended up penned: reflect that in the final frame,
            # whose flags were carried over from the step before.
            for flags in sheep_penned:
                flags[-1] = True
    finally:
        # Release the env even if the rollout raises.
        vn.close()

    return dict(
        dog_xs=dog_xs, dog_ys=dog_ys,
        sheep_xs=sheep_xs, sheep_ys=sheep_ys,
        sheep_penned=sheep_penned,
        radii=radii, pen_dists=pen_dists,
        action_mags=action_mags, rewards=rewards,
        penned_at=penned_at,
        n_penned=n_penned, n_sheep=n_sheep,
        success=n_penned == n_sheep, steps=step,
    )
# ── Static plots ─────────────────────────────────────────────────────────────
def plot_trajectory(hist, out_path):
    """Save a top-down plot of every sheep path and the dog path.

    Each sheep gets a faded path, a circle at its first sample and a star at
    the sample where it was first seen penned (or at its last sample when it
    never was).  The dog gets a faded path with a square at the start and a
    diamond at the end.

    Args:
        hist: History dict as returned by ``run_and_record``.
        out_path: Destination image path.
    """
    fig, ax = plt.subplots(figsize=(7, 7))
    draw_field(ax)
    for i in range(hist["n_sheep"]):
        c = SHEEP_COLORS[i % len(SHEEP_COLORS)]
        xs, ys = hist["sheep_xs"][i], hist["sheep_ys"][i]
        faded_path(ax, xs, ys, c, lw=1.2, label=f"sheep {i+1}")
        ax.plot(xs[0], ys[0], "o", color=c, ms=7, zorder=4)
        penned_step = hist["penned_at"][i]
        if penned_step is None:
            end = -1
        else:
            # penned_at is a 1-based step count, while the sample recorded at
            # step k sits at list index k - 1.  Clamp so a sheep penned on
            # the last recorded step cannot index past the end.
            end = min(max(penned_step - 1, 0), len(xs) - 1)
        ax.plot(xs[end], ys[end], "*", color=c, ms=11, zorder=5)
    faded_path(ax, hist["dog_xs"], hist["dog_ys"], DOG_COLOR, lw=2.0,
               label="dog")
    ax.plot(hist["dog_xs"][0], hist["dog_ys"][0], "s", color=DOG_COLOR,
            ms=10, zorder=5)
    ax.plot(hist["dog_xs"][-1], hist["dog_ys"][-1], "D", color=DOG_COLOR,
            ms=10, zorder=5)
    result = ("SUCCESS" if hist["success"]
              else f"FAIL ({hist['n_penned']}/{hist['n_sheep']})")
    ax.set_title(f"n={hist['n_sheep']} {result} {hist['steps']} steps",
                 fontsize=12)
    ax.legend(loc="upper left", fontsize=8)
    plt.tight_layout()
    fig.savefig(out_path, dpi=120)
    plt.close(fig)
def plot_timeseries(hist, out_path):
    """Save a four-panel time-series figure for one recorded episode.

    Panels, top to bottom: flock radius, per-sheep distance to the pen (with
    a dotted vertical line where each sheep was first seen penned), dog
    action magnitude, and reward per step.  The x axis is the 0-based sample
    index.

    Args:
        hist: History dict as returned by ``run_and_record``.
        out_path: Destination image path.
    """
    t = np.arange(hist["steps"])
    fig, axes = plt.subplots(4, 1, figsize=(12, 10), sharex=True)
    radius_ax, dist_ax, action_ax, reward_ax = axes

    radius_ax.plot(t, hist["radii"], color="steelblue")
    radius_ax.axhline(5.0, color="orange", ls="--", lw=1, label="compact (5m)")
    radius_ax.set_ylabel("flock radius (m)")
    radius_ax.legend(fontsize=8)
    radius_ax.set_title("Flock radius")

    for i in range(hist["n_sheep"]):
        c = SHEEP_COLORS[i % len(SHEEP_COLORS)]
        dist_ax.plot(t, hist["pen_dists"][i], color=c, lw=1,
                     label=f"sheep {i+1}")
        penned_step = hist["penned_at"][i]
        if penned_step is not None:
            # penned_at is a 1-based step count; the matching sample lies at
            # t = step - 1 on this 0-based axis.
            dist_ax.axvline(penned_step - 1, color=c, ls=":", lw=1)
    dist_ax.set_ylabel("dist to pen (m)")
    # ncol must be at least 1, even for an empty flock.
    dist_ax.legend(fontsize=7, ncol=max(1, min(hist["n_sheep"], 5)))
    dist_ax.set_title("Per-sheep distance to pen")

    action_ax.plot(t, hist["action_mags"], color="tomato", lw=1)
    action_ax.axhline(1.0, color="gray", ls="--", lw=1, label="max")
    action_ax.set_ylabel("action ||(vx,vy)||")
    action_ax.set_ylim(0, 1.5)
    action_ax.set_title("Dog action magnitude")
    action_ax.legend(fontsize=8)

    reward_ax.plot(t, hist["rewards"], color="purple", lw=1, alpha=0.7)
    reward_ax.axhline(0, color="black", lw=0.5)
    reward_ax.set_ylabel("reward")
    reward_ax.set_xlabel("step")
    reward_ax.set_title("Reward per step")

    result = ("SUCCESS" if hist["success"]
              else f"FAIL ({hist['n_penned']}/{hist['n_sheep']})")
    fig.suptitle(f"n_sheep={hist['n_sheep']} {result} {hist['steps']} steps",
                 fontsize=13)
    plt.tight_layout()
    fig.savefig(out_path, dpi=120)
    plt.close(fig)
def plot_success_rate(stage_results, out_path):
    """Save a bar chart of evaluation success rate (%) per sheep count.

    Each entry of ``stage_results`` must provide ``n_sheep`` and ``sr`` (a
    fraction).  Every bar is annotated with its rounded percentage and a
    dashed line marks the 90% target.
    """
    sheep_counts = []
    percents = []
    for entry in stage_results:
        sheep_counts.append(entry["n_sheep"])
    for entry in stage_results:
        percents.append(entry["sr"] * 100)

    fig, ax = plt.subplots(figsize=(8, 4))
    rects = ax.bar(sheep_counts, percents, color="steelblue",
                   edgecolor="white")
    ax.set_xlabel("Sheep count")
    ax.set_ylabel("Success rate (%)")
    ax.set_ylim(0, 105)
    ax.axhline(90, color="orange", ls="--", lw=1, label="90% target")

    # Percentage label centred just above each bar.
    for rect, pct in zip(rects, percents):
        label_x = rect.get_x() + rect.get_width() / 2
        label_y = rect.get_height() + 1
        ax.text(label_x, label_y, f"{pct:.0f}%", ha="center", fontsize=9)

    ax.legend()
    ax.set_title("Evaluation success rate per sheep count")
    plt.tight_layout()
    fig.savefig(out_path, dpi=120)
    plt.close(fig)
# ── Animated GIF ─────────────────────────────────────────────────────────────
def save_episode_gif(hist, out_path, fps=20, skip=3):
    """Animate a recorded episode and write it to ``out_path`` as a GIF.

    Only every ``skip``-th sample is rendered (at least every sample), and
    the final sample is always included so the animation ends on the last
    recorded frame.
    """
    n_sheep = hist["n_sheep"]
    stride = max(1, skip)
    frames = list(range(0, hist["steps"], stride))
    final_index = hist["steps"] - 1
    if frames[-1] != final_index:
        frames.append(final_index)

    fig, ax = plt.subplots(figsize=(6, 6))
    draw_field(ax)
    title = ax.text(0, 16.5, "", ha="center", fontsize=11)
    dog_marker = ax.plot([], [], "s", color=DOG_COLOR, ms=12,
                         markeredgecolor="black", markeredgewidth=1.5,
                         zorder=5)[0]
    sheep_markers = [
        ax.plot([], [], "o", color=SHEEP_COLORS[idx % len(SHEEP_COLORS)],
                ms=10, markeredgecolor="#333", markeredgewidth=1,
                zorder=4)[0]
        for idx in range(n_sheep)
    ]
    dog_trail = ax.plot([], [], color=DOG_COLOR, lw=1.0, alpha=0.5)[0]

    def update(k):
        # Refresh title, dog marker/trail and every sheep marker for frame k.
        penned_now = sum(hist['sheep_penned'][i][k] for i in range(n_sheep))
        title.set_text(
            f"n={n_sheep} step {k+1}/{hist['steps']} "
            f"penned {penned_now}/{n_sheep}")
        dog_marker.set_data([hist["dog_xs"][k]], [hist["dog_ys"][k]])
        dog_trail.set_data(hist["dog_xs"][:k+1], hist["dog_ys"][:k+1])
        for idx, marker in enumerate(sheep_markers):
            marker.set_data([hist["sheep_xs"][idx][k]],
                            [hist["sheep_ys"][idx][k]])
            if hist["sheep_penned"][idx][k]:
                marker.set_color("deeppink")
            else:
                marker.set_color(SHEEP_COLORS[idx % len(SHEEP_COLORS)])
        return [title, dog_marker, dog_trail, *sheep_markers]

    anim = animation.FuncAnimation(
        fig, update, frames=frames, interval=1000 / fps, blit=False)
    anim.save(out_path, writer=animation.PillowWriter(fps=fps), dpi=80)
    plt.close(fig)
# ── CLI ──────────────────────────────────────────────────────────────────────
def _resolve_paths(args):
if args.run_dir:
model_path = os.path.join(args.run_dir, "final_model.zip")
vn_path = os.path.join(args.run_dir, "vecnorm.pkl")
cfg_path = os.path.join(args.run_dir, "config.json")
else:
model_path = args.model
vn_path = args.vecnorm
cfg_path = args.config
return model_path, vn_path, cfg_path
def main():
    """CLI entry point: render one fresh episode of a saved policy.

    Loads the PPO model and the VecNormalize statistics (from ``--run-dir``
    or from ``--model`` / ``--vecnorm``), rolls out one episode with
    ``run_and_record`` and writes ``trajectory.png``, ``timeseries.png`` and,
    unless ``--no-gif`` is given, ``episode.gif`` into the output directory
    (default: ``vis_<n>s`` next to the model file).
    """
    p = argparse.ArgumentParser(
        description="Render trajectory + timeseries + GIF for a saved policy.")
    p.add_argument("--run-dir", type=str, default=None,
                   help="Run directory containing final_model.zip + vecnorm.pkl + config.json")
    p.add_argument("--model", type=str, default=None)
    p.add_argument("--vecnorm", type=str, default=None)
    p.add_argument("--config", type=str, default=None)
    p.add_argument("--n-sheep", type=int, default=3)
    p.add_argument("--seed", type=int, default=42)
    p.add_argument("--max-steps", type=int, default=2500)
    p.add_argument("--out-dir", type=str, default=None)
    p.add_argument("--no-gif", action="store_true",
                   help="Skip the animated GIF (PNG-only is faster).")
    p.add_argument("--gif-fps", type=int, default=20)
    p.add_argument("--gif-skip", type=int, default=3)
    args = p.parse_args()

    model_path, vn_path, cfg_path = _resolve_paths(args)
    if not (model_path and vn_path):
        p.error("either --run-dir or both --model and --vecnorm are required")

    # The reward config is optional; only keys that are also attributes of
    # HerdingEnv are forwarded to the environment.
    rcfg = None
    if cfg_path and os.path.exists(cfg_path):
        with open(cfg_path) as f:
            cfg = json.load(f)
        rcfg = {k: v for k, v in cfg.items() if hasattr(HerdingEnv, k)}

    out_dir = args.out_dir or os.path.join(
        os.path.dirname(os.path.abspath(model_path)),
        f"vis_{args.n_sheep}s")
    os.makedirs(out_dir, exist_ok=True)

    print(f"Loading model: {model_path}")
    print(f"Loading vecnorm: {vn_path}")
    model = PPO.load(model_path, device="cpu")
    raw = DummyVecEnv([make_eval_env(args.n_sheep, args.seed, args.max_steps, rcfg)])
    vn = VecNormalize.load(vn_path, raw)
    try:
        # `vn` only serves as the source of normalisation statistics;
        # run_and_record builds (and closes) its own environment.
        print(f"Rolling out n_sheep={args.n_sheep} (seed={args.seed})...")
        hist = run_and_record(model, vn, args.n_sheep, args.max_steps,
                              reward_cfg=rcfg, seed=args.seed)
    finally:
        # Release the template environment once the statistics were copied.
        vn.close()

    result = "SUCCESS" if hist["success"] else f"FAIL ({hist['n_penned']}/{hist['n_sheep']})"
    print(f" {result} in {hist['steps']} steps")
    plot_trajectory(hist, os.path.join(out_dir, "trajectory.png"))
    plot_timeseries(hist, os.path.join(out_dir, "timeseries.png"))
    print(f" saved trajectory.png + timeseries.png to {out_dir}/")
    if not args.no_gif:
        gif_path = os.path.join(out_dir, "episode.gif")
        print(f" rendering GIF (fps={args.gif_fps}, skip={args.gif_skip})...")
        save_episode_gif(hist, gif_path, fps=args.gif_fps, skip=args.gif_skip)
        print(f" saved {gif_path}")
# Run the rendering CLI only when executed as a script.
if __name__ == "__main__":
    main()
+9
View File
@@ -0,0 +1,9 @@
Webots Project File version R2025a
perspectives: 000000ff00000000fd00000002000000010000011c00000298fc0200000001fb0000001400540065007800740045006400690074006f00720100000000000002980000003f00ffffff000000030000084300000238fc0100000001fb0000001a0043006f006e0073006f006c00650041006c006c0041006c006c0100000000000008430000006900ffffff000007250000029800000001000000020000000100000008fc00000000
simulationViewPerspectives: 000000ff000000010000000200000100000006250100000002010000000100
sceneTreePerspectives: 000000ff00000001000000030000001f000000c0000000000100000002010000000200
maximizedDockId: -1
centralWidgetVisible: 1
orthographicViewHeight: 1
textFiles: -1
consoles: Console:All:All
+10 -10
View File
@@ -518,13 +518,13 @@ Sheep {
name "sheep3" name "sheep3"
controller "sheep" controller "sheep"
} }
Sheep { # Sheep {
translation 3.5 1 0.5 # translation 3.5 1 0.5
name "sheep4" # name "sheep4"
controller "sheep" # controller "sheep"
} # }
Sheep { # Sheep {
translation 3.5 -1 0.5 # translation 3.5 -1 0.5
name "sheep5" # name "sheep5"
controller "sheep" # controller "sheep"
} # }