Files
TIR_PROJ/training/herding_env.py
T
2026-04-23 19:22:39 +01:00

372 lines
14 KiB
Python

"""
2D herding environment for PPO training (Gymnasium-compatible).
The dog agent (action: 2D velocity vector) must herd n_sheep into the
quarantine pen. Sheep dynamics mirror the Webots controller exactly:
flee (quadratic ramp), separation (inverse-distance), cohesion, wall
avoidance, and wander.
Coordinate system matches the Webots world file:
field : x ∈ [-15, 15], y ∈ [-15, 15]
pen : x ∈ [10, 13], y ∈ [-15, -8] (SE corner, open north)
Observation (13-dim, fixed regardless of n_sheep):
dog position (2), flock COM relative to dog (2), farthest active sheep
relative to dog (2), pen relative to COM (2), pen relative to farthest
sheep (2), flock radius (1), mean dispersion (1), fraction still active (1).
Permutation-invariant by design: curriculum stages share the same obs dim
so VecNormalize statistics transfer as n_sheep advances.
"""
import numpy as np
import gymnasium as gym
from gymnasium import spaces
class HerdingEnv(gym.Env):
    """Gymnasium environment in which a dog herds sheep into a pen.

    The agent controls the dog with a 2D velocity command. Each active sheep
    is moved by a boid-style rule set (flee, separation, cohesion, wall
    avoidance, wander) implemented in :meth:`_step_sheep`; per the comments
    in this file the parameters are intended to mirror the Webots ``sheep.py``
    controller.

    The observation is a fixed 13-dim vector (see :meth:`_obs`) independent of
    ``n_sheep``. Slots ``n_sheep .. MAX_SHEEP-1`` of the state arrays are
    unused for the episode and are parked at the pen centre, marked as penned.
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30}

    # -----------------------------------------------------------------------
    # World constants — must match Webots world file
    # -----------------------------------------------------------------------
    MAX_SHEEP = 10
    FIELD = 15.0  # half-size; positions ∈ [-FIELD, FIELD]
    PEN_X = (10.0, 13.0)
    PEN_Y = (-15.0, -8.0)
    PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32)

    # -----------------------------------------------------------------------
    # Dynamics — calibrated to match Webots robot specs
    # -----------------------------------------------------------------------
    DOG_SPEED = 2.5        # m/s
    SHEEP_FLEE_V = 0.65    # m/s
    SHEEP_WANDER_V = 0.20  # m/s
    DT = 0.1               # seconds per step

    # Boid parameters — identical to sheep.py
    FLEE_DIST = 7.0
    SEPARATION_DIST = 2.5
    COHESION_DIST = 8.0
    WALL_MARGIN = 3.5

    # -----------------------------------------------------------------------
    # Reward weights (progress-based potential shaping + sparse bonuses)
    # -----------------------------------------------------------------------
    W_DRIVE = 2.0        # flock COM moved toward pen (per metre, per step)
    W_COLLECT = 1.0      # flock radius shrank (per metre, per step)
    W_PEN_BONUS = 5.0    # per sheep penned
    W_COMPLETE = 20.0    # all sheep penned
    W_STEP_COST = 0.002  # time penalty

    def __init__(self, n_sheep: int = 1, max_steps: int = 2000,
                 render_mode: "str | None" = None):
        """Create the environment.

        Args:
            n_sheep: Number of sheep to herd, in ``[1, MAX_SHEEP]``.
            max_steps: Step count at which the episode is truncated.
            render_mode: ``"human"`` renders after every step; ``"rgb_array"``
                makes :meth:`render` return an image; ``None`` disables
                automatic rendering.
        """
        super().__init__()
        assert 1 <= n_sheep <= self.MAX_SHEEP, (
            f"n_sheep must be in [1, {self.MAX_SHEEP}]"
        )
        self.n_sheep = n_sheep
        self.max_steps = max_steps
        self.render_mode = render_mode

        # Fixed 13-dim observation regardless of n_sheep:
        #   dog_pos(2) + rel_com(2) + rel_far(2) + com_to_pen(2)
        #   + far_to_pen(2) + radius(1) + mean_disp(1) + frac_active(1)
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(13,), dtype=np.float32
        )
        # Action: desired velocity (vx, vy) ∈ [-1, 1]², scaled by DOG_SPEED
        self.action_space = spaces.Box(
            low=-1.0, high=1.0, shape=(2,), dtype=np.float32
        )

        # Runtime state (populated by reset)
        self._step_count = 0
        self._prev_penned = 0
        self._prev_com_dist = 0.0  # COM-to-pen distance at previous step
        self._prev_radius = 0.0    # flock radius at previous step
        self.dog_pos = np.zeros(2, dtype=np.float32)
        self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32)
        self.penned = np.ones(self.MAX_SHEEP, dtype=bool)
        self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32)

        # Curriculum bookkeeping: a change requested mid-episode is parked
        # here and applied by the next reset().
        self._pending_n_sheep = None
        self._episode_active = False

        # Lazily created matplotlib handles (see render()).
        self._fig = None
        self._ax = None

    # ------------------------------------------------------------------
    # Curriculum interface
    # ------------------------------------------------------------------
    def set_n_sheep(self, n: int):
        """Advance curriculum difficulty; takes effect on next reset().

        While an episode is running the new value is only recorded, because
        ``step()`` slices the state arrays with ``self.n_sheep`` and the
        unused slots are marked as penned: changing the count mid-episode
        would be counted as sheep entering (or leaving) the pen. When no
        episode is running the value is applied immediately.
        """
        assert 1 <= n <= self.MAX_SHEEP, (
            f"n must be in [1, {self.MAX_SHEEP}]"
        )
        if self._episode_active:
            self._pending_n_sheep = n
        else:
            self.n_sheep = n
            self._pending_n_sheep = None

    # ------------------------------------------------------------------
    # Gymnasium API
    # ------------------------------------------------------------------
    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        Applies any pending curriculum change, places ``n_sheep`` sheep
        uniformly in ``[-12, 12]²`` outside the pen, places the dog, and
        primes the previous-step values used by the progress rewards.
        ``options`` is accepted for API compatibility and is not used.
        """
        super().reset(seed=seed)

        if self._pending_n_sheep is not None:
            self.n_sheep = self._pending_n_sheep
            self._pending_n_sheep = None
        self._episode_active = True

        self._step_count = 0
        self._prev_penned = 0

        # Park every slot in the pen, then activate the first n_sheep slots
        # at random non-pen positions (rejection sampling).
        self.sheep_pos[:] = self.PEN_CENTER
        self.penned[:] = True
        placed = 0
        while placed < self.n_sheep:
            candidate = self.np_random.uniform(
                -12.0, 12.0, size=(2,)
            ).astype(np.float32)
            if self._in_pen(candidate):
                continue
            self.sheep_pos[placed] = candidate
            self.penned[placed] = False
            placed += 1

        # Dog: 50% of resets start already behind the flock (anti-pen side,
        # within flee range) to give early training aligned experiences.
        if self.np_random.random() < 0.5:
            ref = self.sheep_pos[0]
            away = ref - self.PEN_CENTER
            d = float(np.linalg.norm(away))
            if d > 0.1:
                away = away / d
            offset = away * self.np_random.uniform(2.0, self.FLEE_DIST * 0.8)
            self.dog_pos = np.clip(
                (ref + offset).astype(np.float32), -self.FIELD, self.FIELD
            )
        else:
            self.dog_pos = self.np_random.uniform(
                -self.FIELD * 0.8, self.FIELD * 0.8, size=(2,)
            ).astype(np.float32)

        self.wander_ang = self.np_random.uniform(
            -np.pi, np.pi, size=(self.MAX_SHEEP,)
        ).astype(np.float32)

        # Initialise previous-step values for progress rewards
        com, radius, _ = self._flock_stats()
        self._prev_com_dist = float(np.linalg.norm(com - self.PEN_CENTER))
        self._prev_radius = radius
        return self._obs(), {}

    def step(self, action):
        """Advance the simulation by ``DT`` seconds.

        Args:
            action: 2-vector; each component is clipped to ``[-1, 1]`` and
                scaled by ``DOG_SPEED``.

        Returns:
            ``(obs, reward, terminated, truncated, info)`` where
            ``terminated`` means every sheep is penned and ``truncated``
            means ``max_steps`` was reached.
        """
        self._step_count += 1

        act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0)
        self.dog_pos = np.clip(
            self.dog_pos + act * self.DOG_SPEED * self.DT,
            -self.FIELD, self.FIELD
        )

        # Sheep are updated in index order, so sheep i already sees the new
        # positions of sheep 0..i-1 within the same step.
        for i in range(self.n_sheep):
            if self.penned[i]:
                continue
            self.sheep_pos[i] = self._step_sheep(i)
            if self._in_pen(self.sheep_pos[i]):
                self.penned[i] = True

        n_penned = int(self.penned[:self.n_sheep].sum())
        newly_penned = n_penned - self._prev_penned
        self._prev_penned = n_penned

        reward = self._reward(n_penned, newly_penned)
        terminated = n_penned == self.n_sheep
        truncated = self._step_count >= self.max_steps
        self._episode_active = not (terminated or truncated)
        info = {"n_penned": n_penned, "n_sheep": self.n_sheep}

        if self.render_mode == "human":
            self.render()
        return self._obs(), float(reward), terminated, truncated, info

    def render(self):
        """Draw the current state with matplotlib.

        Returns:
            An ``(H, W, 3)`` uint8 RGB array when ``render_mode`` is
            ``"rgb_array"`` (drawn off-screen); otherwise ``None`` after
            updating an interactive pyplot window.
        """
        import matplotlib.patches as mpatches

        offscreen = self.render_mode == "rgb_array"
        if self._fig is None:
            if offscreen:
                # Agg canvas: no window, pixels are read back after draw().
                from matplotlib.backends.backend_agg import FigureCanvasAgg
                from matplotlib.figure import Figure
                self._fig = Figure(figsize=(6, 6))
                FigureCanvasAgg(self._fig)
                self._ax = self._fig.add_subplot(111)
            else:
                import matplotlib.pyplot as plt
                plt.ion()
                self._fig, self._ax = plt.subplots(figsize=(6, 6))

        ax = self._ax
        ax.clear()
        ax.set_xlim(-16, 16)
        ax.set_ylim(-16, 16)
        ax.set_aspect("equal")
        ax.set_facecolor("#dcedc8")

        # Field boundary
        ax.add_patch(mpatches.Rectangle(
            (-15, -15), 30, 30, fill=False, edgecolor="#795548", linewidth=2
        ))
        # Pen
        pen_w = self.PEN_X[1] - self.PEN_X[0]
        pen_h = self.PEN_Y[1] - self.PEN_Y[0]
        ax.add_patch(mpatches.Rectangle(
            (self.PEN_X[0], self.PEN_Y[0]), pen_w, pen_h,
            facecolor="#ffe082", edgecolor="#795548", linewidth=2
        ))
        ax.text(11.5, -11.5, "pen", ha="center", va="center",
                fontsize=8, color="#795548")

        # Flock centre of mass and enclosing radius
        com, radius, _ = self._flock_stats()
        ax.add_patch(mpatches.Circle(com, radius, color="steelblue",
                                     fill=False, linestyle="--", linewidth=1))
        ax.plot(*com, "+", color="steelblue", markersize=10)

        for i in range(self.n_sheep):
            color = "deeppink" if self.penned[i] else "white"
            ax.plot(*self.sheep_pos[i], "o", color=color, markersize=11,
                    markeredgecolor="#555", markeredgewidth=1.5)
        ax.plot(*self.dog_pos, "s", color="#4e342e", markersize=13,
                markeredgecolor="black", markeredgewidth=1.5)

        ax.set_title(
            f"step {self._step_count} | "
            f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep} | "
            f"r={radius:.1f}m",
            fontsize=11
        )

        self._fig.canvas.draw()
        if offscreen:
            rgba = np.asarray(self._fig.canvas.buffer_rgba())
            return rgba[..., :3].copy()

        import matplotlib.pyplot as plt
        self._fig.canvas.flush_events()
        plt.pause(0.001)
        return None

    def close(self):
        """Release the matplotlib figure, if one was created."""
        if self._fig is not None:
            import matplotlib.pyplot as plt
            plt.close(self._fig)
            self._fig = None
            self._ax = None

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------
    def _in_pen(self, pos: np.ndarray) -> bool:
        """Return True if ``pos`` lies strictly inside the pen rectangle."""
        return bool(self.PEN_X[0] < pos[0] < self.PEN_X[1] and
                    self.PEN_Y[0] < pos[1] < self.PEN_Y[1])

    def _flock_stats(self):
        """Return (COM, radius, mean_dispersion) over active sheep.

        ``radius`` is the largest and ``mean_dispersion`` the mean distance
        of an active sheep from the centre of mass. With no active sheep the
        result is ``(PEN_CENTER, 0.0, 0.0)``.
        """
        active_mask = ~self.penned[:self.n_sheep]
        if not active_mask.any():
            return self.PEN_CENTER.copy(), 0.0, 0.0
        pts = self.sheep_pos[:self.n_sheep][active_mask]
        com = pts.mean(axis=0)
        dists = np.linalg.norm(pts - com, axis=1)
        return com, float(dists.max()), float(dists.mean())

    def _obs(self) -> np.ndarray:
        """Build the 13-dim float32 observation.

        Layout: dog position / FIELD (2), COM relative to dog (2), farthest
        active sheep relative to dog (2), pen relative to COM (2), pen
        relative to farthest sheep (2), flock radius (1), mean dispersion
        (1), fraction of sheep still active (1). All relative quantities and
        distances are divided by ``2 * FIELD``.
        """
        com, radius, mean_disp = self._flock_stats()
        active_mask = ~self.penned[:self.n_sheep]

        # Farthest active sheep from COM (outlier the dog needs to chase)
        if active_mask.any():
            pts = self.sheep_pos[:self.n_sheep][active_mask]
            far = pts[int(np.argmax(np.linalg.norm(pts - com, axis=1)))]
        else:
            far = self.PEN_CENTER.copy()

        pos_scale = self.FIELD      # normalisation scale for positions
        rel_scale = 2 * self.FIELD  # relative vectors can span the field
        obs = np.concatenate([
            self.dog_pos / pos_scale,               # dog abs pos
            (com - self.dog_pos) / rel_scale,       # COM relative to dog
            (far - self.dog_pos) / rel_scale,       # farthest relative to dog
            (self.PEN_CENTER - com) / rel_scale,    # COM to pen
            (self.PEN_CENTER - far) / rel_scale,    # farthest to pen
            [
                radius / rel_scale,                 # flock compactness
                mean_disp / rel_scale,              # mean spread
                active_mask.sum() / self.n_sheep,   # fraction still active
            ],
        ])
        return obs.astype(np.float32)

    def _reward(self, n_penned: int, newly_penned: int) -> float:
        """Compute the step reward and update the stored progress baselines.

        Sum of: COM progress toward the pen (``W_DRIVE``), reduction of the
        flock radius (``W_COLLECT``), a bonus per newly penned sheep, a
        constant step cost, and a completion bonus when all sheep are penned.
        """
        com, radius, _ = self._flock_stats()
        com_dist = float(np.linalg.norm(com - self.PEN_CENTER))

        # Progress rewards: positive when flock moves toward pen or compacts.
        # NOTE(review): COM and radius are taken over *active* sheep only, so
        # they jump when a sheep is penned and that jump is rewarded or
        # penalised here as if it were motion — confirm this is intended.
        drive_progress = (self._prev_com_dist - com_dist) * self.W_DRIVE
        collect_progress = (self._prev_radius - radius) * self.W_COLLECT
        self._prev_com_dist = com_dist
        self._prev_radius = radius

        reward = drive_progress + collect_progress
        reward += newly_penned * self.W_PEN_BONUS
        reward -= self.W_STEP_COST
        if n_penned == self.n_sheep:
            reward += self.W_COMPLETE
        return reward

    def _step_sheep(self, i: int) -> np.ndarray:
        """Apply one timestep of boid dynamics to sheep i (mirrors sheep.py).

        Accumulates a steering force from flee, separation, cohesion, wall
        avoidance and wander terms, then moves the sheep along that force at
        a capped speed. Returns the new float32 position; ``self.sheep_pos``
        is not modified here (``self.wander_ang[i]`` may be).
        """
        pos = self.sheep_pos[i].copy()
        fx, fy = 0.0, 0.0
        fleeing = False

        # Flee from dog — quadratic ramp
        diff = self.dog_pos - pos
        dist = float(np.linalg.norm(diff))
        if 0.01 < dist < self.FLEE_DIST:
            t = 1.0 - dist / self.FLEE_DIST
            s = t * t * 5.0
            fx -= (diff[0] / dist) * s
            fy -= (diff[1] / dist) * s
            fleeing = True

        # Separation (inverse-distance) + Cohesion
        cx, cy, cn = 0.0, 0.0, 0
        for j in range(self.n_sheep):
            if j == i or self.penned[j]:
                continue
            dv = self.sheep_pos[j] - pos
            dj = float(np.linalg.norm(dv))
            if 0.3 < dj < self.COHESION_DIST:
                cx += self.sheep_pos[j][0]
                cy += self.sheep_pos[j][1]
                cn += 1
            if 0.05 < dj < self.SEPARATION_DIST:
                push = (self.SEPARATION_DIST - dj) / dj
                fx -= (dv[0] / dj) * push * 2.5
                fy -= (dv[1] / dj) * push * 2.5
        if cn > 0:
            # Cohesion pulls toward the neighbours' mean; weaker when fleeing.
            w = 0.08 if fleeing else 0.15
            fx += (cx / cn - pos[0]) * w
            fy += (cy / cn - pos[1]) * w

        # Wall avoidance
        m, F = self.WALL_MARGIN, self.FIELD
        if pos[0] < -F + m:
            fx += ((-F + m - pos[0]) / m) * 6.0
        if pos[0] > F - m:
            fx -= ((pos[0] - (F - m)) / m) * 6.0
        if pos[1] < -F + m:
            fy += ((-F + m - pos[1]) / m) * 6.0
        if pos[1] > F - m:
            fy -= ((pos[1] - (F - m)) / m) * 6.0

        # Hard-stop clamp: mirrors sheep.py — zero any force driving further
        # into the wall within 0.5 m so the flee force cannot pin the sheep.
        HS = 0.5
        if pos[0] < -F + HS and fx < 0:
            fx = 0.0
        if pos[0] > F - HS and fx > 0:
            fx = 0.0
        if pos[1] < -F + HS and fy < 0:
            fy = 0.0
        if pos[1] > F - HS and fy > 0:
            fy = 0.0

        # Wander — suppressed while fleeing
        if not fleeing:
            if self.np_random.random() < 0.02:
                self.wander_ang[i] += float(self.np_random.uniform(-0.6, 0.6))
            fx += float(np.cos(self.wander_ang[i])) * 0.5
            fy += float(np.sin(self.wander_ang[i])) * 0.5

        # Integrate: move along the force direction at a capped speed.
        force = np.array([fx, fy])
        mag = float(np.linalg.norm(force))
        if mag > 0.01:
            top_speed = self.SHEEP_FLEE_V if fleeing else self.SHEEP_WANDER_V
            speed = min(top_speed, mag * 0.3)
            pos = np.clip(pos + (force / mag) * speed * self.DT,
                          -self.FIELD, self.FIELD)
        return pos.astype(np.float32)