A more classical approach

This commit is contained in:
Johnny Fernandes
2026-04-23 11:51:52 +01:00
parent f9c5093211
commit ffbfaa3977
2 changed files with 101 additions and 92 deletions
+4 -5
View File
@@ -94,9 +94,8 @@ def main():
# Access the underlying HerdingEnv for dispersion calculation
inner = env.envs[0] if hasattr(env, "envs") else env.venv.envs[0]
if not inner.penned[:inner.n_sheep].all():
ep_dispersion.append(
pairwise_mean(inner.sheep_pos, inner.n_sheep)
)
_, radius, _ = inner._flock_stats()
ep_dispersion.append(radius)
if first_ep and render_mode == "human":
pass # render() is called inside step()
@@ -134,8 +133,8 @@ def main():
f" ({sum(successes)}/{args.episodes})")
print(f" Time-to-pen : {mean_ttp:.1f} steps/sheep"
f" (successful episodes only)")
print(f" Flock dispersion: {mean_disp:.2f} m"
f" (mean pairwise distance while active)")
print(f" Flock radius : {mean_disp:.2f} m"
f" (max sheep-to-COM distance while active)")
print("=" * 50)
+97 -87
View File
@@ -10,10 +10,13 @@ Coordinate system matches the Webots world file:
field : x ∈ [-15, 15], y ∈ [-15, 15]
pen : x ∈ [10, 13], y ∈ [-15, -8] (SE corner, open north)
Observation is always sized for MAX_SHEEP (currently 5) regardless of
how many sheep are active. Inactive slots are pre-penned at the pen
centre with flag=1. This keeps the model input dimension fixed across
curriculum stages so VecNormalize statistics are preserved throughout.
Observation (13-dim, fixed regardless of n_sheep):
dog position (2), flock COM relative to dog (2), farthest active sheep
relative to dog (2), pen relative to COM (2), pen relative to farthest
sheep (2), flock radius (1), mean dispersion (1), fraction still active (1).
Permutation-invariant and fixed-size by design: curriculum stages share the
same obs dim, so VecNormalize statistics transfer as n_sheep advances.
"""
import numpy as np
@@ -27,16 +30,14 @@ class HerdingEnv(gym.Env):
# -----------------------------------------------------------------------
# World constants — must match Webots world file
# -----------------------------------------------------------------------
MAX_SHEEP = 5
FIELD = 15.0 # half-size; positions ∈ [-FIELD, FIELD]
PEN_X = (10.0, 13.0) # quarantine pen x bounds
PEN_Y = (-15.0, -8.0) # quarantine pen y bounds
MAX_SHEEP = 5
FIELD = 15.0 # half-size; positions ∈ [-FIELD, FIELD]
PEN_X = (10.0, 13.0)
PEN_Y = (-15.0, -8.0)
PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32)
# -----------------------------------------------------------------------
# Dynamics — calibrated to match Webots robot specs
# wheel radius 0.031 m; sheep FLEE_SPEED 20 rad/s → 0.62 m/s
# wheel radius 0.038 m; dog maxVelocity 70 rad/s → 2.66 m/s
# -----------------------------------------------------------------------
DOG_SPEED = 2.5 # m/s
SHEEP_FLEE_V = 0.65 # m/s
@@ -50,28 +51,27 @@ class HerdingEnv(gym.Env):
WALL_MARGIN = 3.5
# -----------------------------------------------------------------------
# Reward weights
# Reward weights (progress-based potential shaping + sparse bonuses)
# -----------------------------------------------------------------------
W_ALIGN = 0.4 # dense: dog on anti-pen side of each active sheep
W_SHAPING = 0.5 # dense: mean sheep distance to pen
W_APPROACH = 0.1 # dense: dog within flee range of nearest sheep
W_PEN_BONUS = 5.0 # sparse: per sheep successfully penned
W_COMPLETE = 20.0 # bonus when ALL active sheep are penned
W_STEP_COST = 0.002 # penalty per step (encourages efficiency)
W_DRIVE = 2.0 # flock COM moved toward pen (per metre, per step)
W_COLLECT = 1.0 # flock radius shrank (per metre, per step)
W_PEN_BONUS = 5.0 # per sheep penned
W_COMPLETE = 20.0 # all sheep penned
W_STEP_COST = 0.002 # time penalty
def __init__(self, n_sheep: int = 1, max_steps: int = 2000,
render_mode: str = None):
super().__init__()
assert 1 <= n_sheep <= self.MAX_SHEEP
self.n_sheep = n_sheep
self.max_steps = max_steps
self.n_sheep = n_sheep
self.max_steps = max_steps
self.render_mode = render_mode
# Observation: dog(x,y) + MAX_SHEEP×sheep(x,y) + MAX_SHEEP×penned
# Fixed size across all curriculum stages.
obs_dim = 2 + 2 * self.MAX_SHEEP + self.MAX_SHEEP
# Fixed 13-dim observation regardless of n_sheep:
# dog_pos(2) + rel_com(2) + rel_far(2) + com_to_pen(2)
# + far_to_pen(2) + radius(1) + mean_disp(1) + frac_active(1)
self.observation_space = spaces.Box(
low=-1.0, high=1.0, shape=(obs_dim,), dtype=np.float32
low=-np.inf, high=np.inf, shape=(13,), dtype=np.float32
)
# Action: desired velocity (vx, vy) ∈ [-1, 1]², scaled by DOG_SPEED
@@ -82,12 +82,14 @@ class HerdingEnv(gym.Env):
# Runtime state (populated by reset)
self._step_count = 0
self._prev_penned = 0
self._prev_com_dist = 0.0 # COM-to-pen distance at previous step
self._prev_radius = 0.0 # flock radius at previous step
self.dog_pos = np.zeros(2, dtype=np.float32)
self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32)
self.penned = np.ones(self.MAX_SHEEP, dtype=bool)
self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32)
self._fig = None # lazy matplotlib figure
self._fig = None
# ------------------------------------------------------------------
# Curriculum interface
@@ -119,16 +121,14 @@ class HerdingEnv(gym.Env):
self.penned[placed] = False
placed += 1
# Dog: 50 % of the time start already on the anti-pen side of the
# nearest sheep (within flee range) so early training gets aligned
# starts; the other 50 % is fully random to ensure generalisation.
# Dog: 50% of resets start already behind the sheep in slot 0 (anti-pen
# side, within flee range) to give early training aligned experiences.
if self.np_random.random() < 0.5:
# Place dog behind the first active sheep relative to the pen
ref = self.sheep_pos[0]
away = ref - self.PEN_CENTER # sheep→anti-pen
dist = float(np.linalg.norm(away))
if dist > 0.1:
away = away / dist
ref = self.sheep_pos[0]
away = ref - self.PEN_CENTER
d = float(np.linalg.norm(away))
if d > 0.1:
away = away / d
offset = away * self.np_random.uniform(2.0, self.FLEE_DIST * 0.8)
self.dog_pos = np.clip(
(ref + offset).astype(np.float32), -self.FIELD, self.FIELD
@@ -138,25 +138,26 @@ class HerdingEnv(gym.Env):
-self.FIELD * 0.8, self.FIELD * 0.8, size=(2,)
).astype(np.float32)
# Inactive slots (n_sheep .. MAX_SHEEP-1): already at pen centre, penned=True
self.wander_ang = self.np_random.uniform(
-np.pi, np.pi, size=(self.MAX_SHEEP,)
).astype(np.float32)
# Initialise previous-step values for progress rewards
com, radius, _ = self._flock_stats()
self._prev_com_dist = float(np.linalg.norm(com - self.PEN_CENTER))
self._prev_radius = radius
return self._obs(), {}
def step(self, action):
self._step_count += 1
# Move dog — clip each axis independently so the agent can idle
act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0)
self.dog_pos = np.clip(
self.dog_pos + act * self.DOG_SPEED * self.DT,
-self.FIELD, self.FIELD
)
# Step sheep dynamics
for i in range(self.n_sheep):
if self.penned[i]:
continue
@@ -188,16 +189,12 @@ class HerdingEnv(gym.Env):
ax = self._ax
ax.clear()
ax.set_xlim(-16, 16)
ax.set_ylim(-16, 16)
ax.set_aspect("equal")
ax.set_facecolor("#dcedc8")
ax.set_xlim(-16, 16); ax.set_ylim(-16, 16)
ax.set_aspect("equal"); ax.set_facecolor("#dcedc8")
# Field boundary
ax.add_patch(mpatches.Rectangle(
(-15, -15), 30, 30, fill=False, edgecolor="#795548", linewidth=2
))
# Pen
pw = self.PEN_X[1] - self.PEN_X[0]
ph = self.PEN_Y[1] - self.PEN_Y[0]
ax.add_patch(mpatches.Rectangle(
@@ -207,21 +204,25 @@ class HerdingEnv(gym.Env):
ax.text(11.5, -11.5, "pen", ha="center", va="center",
fontsize=8, color="#795548")
# Sheep
for i in range(self.MAX_SHEEP):
com, radius, _ = self._flock_stats()
ax.add_patch(plt.Circle(com, radius, color="steelblue",
fill=False, linestyle="--", linewidth=1))
ax.plot(*com, "+", color="steelblue", markersize=10)
for i in range(self.n_sheep):
if i >= self.n_sheep:
continue # inactive slot — not shown
continue
color = "deeppink" if self.penned[i] else "white"
ax.plot(*self.sheep_pos[i], "o", color=color, markersize=11,
markeredgecolor="#555", markeredgewidth=1.5)
# Dog
ax.plot(*self.dog_pos, "s", color="#4e342e", markersize=13,
markeredgecolor="black", markeredgewidth=1.5)
ax.set_title(
f"step {self._step_count} | "
f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep}",
f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep} | "
f"r={radius:.1f}m",
fontsize=11
)
self._fig.canvas.draw()
@@ -242,49 +243,58 @@ class HerdingEnv(gym.Env):
return (self.PEN_X[0] < pos[0] < self.PEN_X[1] and
self.PEN_Y[0] < pos[1] < self.PEN_Y[1])
def _flock_stats(self):
    """Summarise the sheep that have not been penned yet.

    Only the first ``n_sheep`` slots are considered, and of those only the
    ones whose ``penned`` flag is False.

    Returns a 3-tuple ``(com, radius, mean_dispersion)``:
        com             -- mean position of the un-penned sheep
        radius          -- largest sheep-to-COM distance
        mean_dispersion -- average sheep-to-COM distance
    When every considered sheep is penned, the result is a copy of
    ``PEN_CENTER`` with both distances equal to 0.0.
    """
    n = self.n_sheep
    unpenned = ~self.penned[:n]
    if unpenned.any():
        flock = self.sheep_pos[:n][unpenned]
        centre = flock.mean(axis=0)
        offsets = np.linalg.norm(flock - centre, axis=1)
        return centre, float(offsets.max()), float(offsets.mean())
    # No sheep left outside the pen: hand back a copy so callers cannot
    # mutate the shared PEN_CENTER constant.
    return self.PEN_CENTER.copy(), 0.0, 0.0
def _obs(self) -> np.ndarray:
scale = 1.0 / self.FIELD
return np.concatenate([
self.dog_pos * scale, # 2
(self.sheep_pos * scale).flatten(), # 2 * MAX_SHEEP
self.penned.astype(np.float32), # MAX_SHEEP
]).astype(np.float32)
com, radius, mean_disp = self._flock_stats()
active_mask = ~self.penned[:self.n_sheep]
# Farthest active sheep from COM (outlier the dog needs to chase)
if active_mask.any():
pts = self.sheep_pos[:self.n_sheep][active_mask]
idx = int(np.argmax(np.linalg.norm(pts - com, axis=1)))
far = pts[idx]
else:
far = self.PEN_CENTER.copy()
S = self.FIELD # normalisation scale for positions
D = 2 * self.FIELD # for relative vectors that can span the whole field
return np.array([
self.dog_pos[0] / S, self.dog_pos[1] / S, # dog abs pos
(com[0] - self.dog_pos[0]) / D, # COM relative to dog
(com[1] - self.dog_pos[1]) / D,
(far[0] - self.dog_pos[0]) / D, # farthest relative to dog
(far[1] - self.dog_pos[1]) / D,
(self.PEN_CENTER[0] - com[0]) / D, # COM to pen
(self.PEN_CENTER[1] - com[1]) / D,
(self.PEN_CENTER[0] - far[0]) / D, # farthest to pen
(self.PEN_CENTER[1] - far[1]) / D,
radius / D, # flock compactness
mean_disp / D, # mean spread
active_mask.sum() / self.n_sheep, # fraction still active
], dtype=np.float32)
def _reward(self, n_penned: int, newly_penned: int) -> float:
active_mask = ~self.penned[:self.n_sheep]
if active_mask.any():
active_pos = self.sheep_pos[:self.n_sheep][active_mask]
dists_pen = np.linalg.norm(active_pos - self.PEN_CENTER, axis=1)
dists_dog = np.linalg.norm(active_pos - self.dog_pos, axis=1)
com, radius, _ = self._flock_stats()
com_dist = float(np.linalg.norm(com - self.PEN_CENTER))
# Sheep-to-pen shaping
shaping = -(dists_pen.mean() / (2 * self.FIELD))
# Progress rewards: positive when flock moves toward pen or compacts
drive_progress = (self._prev_com_dist - com_dist) * self.W_DRIVE
collect_progress = (self._prev_radius - radius) * self.W_COLLECT
# Approach: dog penalised for being far from nearest sheep
approach = -(dists_dog.min() / (2 * self.FIELD))
self._prev_com_dist = com_dist
self._prev_radius = radius
# Alignment: reward dog for being on the anti-pen side of each sheep.
# When the dog is opposite the pen relative to a sheep, that sheep
# flees toward the pen. Score ∈ [-1, 1] per sheep, weighted by
# a proximity gate so only nearby dogs count.
align_scores = []
for s_pos, d_pen, d_dog in zip(active_pos, dists_pen, dists_dog):
if d_pen < 0.1 or d_dog < 0.1:
continue
pen_dir = (self.PEN_CENTER - s_pos) / d_pen # sheep → pen
dog_dir = (self.dog_pos - s_pos) / d_dog # sheep → dog
# cos(angle): +1 → dog behind sheep, -1 → dog on pen side
cosine = -float(np.dot(pen_dir, dog_dir))
# gate: full credit inside flee range, fades beyond
proximity = max(0.0, 1.0 - d_dog / self.FLEE_DIST)
align_scores.append(cosine * proximity)
alignment = float(np.mean(align_scores)) if align_scores else 0.0
else:
shaping = approach = alignment = 0.0
reward = shaping * self.W_SHAPING
reward += approach * self.W_APPROACH
reward += alignment * self.W_ALIGN
reward = drive_progress + collect_progress
reward += newly_penned * self.W_PEN_BONUS
reward -= self.W_STEP_COST
if n_penned == self.n_sheep:
@@ -292,12 +302,12 @@ class HerdingEnv(gym.Env):
return reward
def _step_sheep(self, i: int) -> np.ndarray:
"""Apply one timestep of boid dynamics to sheep i."""
"""Apply one timestep of boid dynamics to sheep i (mirrors sheep.py)."""
pos = self.sheep_pos[i].copy()
fx, fy = 0.0, 0.0
fleeing = False
# Flee from dog — quadratic ramp (mirrors sheep.py)
# Flee from dog — quadratic ramp
diff = self.dog_pos - pos
dist = float(np.linalg.norm(diff))
if 0.01 < dist < self.FLEE_DIST: