Files
TIR_PROJ/training/herding_env.py
T
2026-04-24 15:55:15 +01:00

424 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
2D herding environment for PPO training (Gymnasium-compatible).
The dog agent (action: 2D velocity vector) must herd n_sheep into the
quarantine pen. Sheep dynamics mirror the Webots controller exactly:
flee (quadratic ramp), separation (inverse-distance), cohesion, wall
avoidance, and wander.
Coordinate system matches the Webots world file:
field : x ∈ [-15, 15], y ∈ [-15, 15]
pen : x ∈ [10, 13], y ∈ [-15, -8] (SE corner, open north)
Observation (16-dim, fixed regardless of n_sheep):
dog position (2), flock COM relative to dog (2), top-3 farthest active
sheep relative to dog (6), pen relative to COM (2), pen relative to
farthest sheep (2), flock radius (1), fraction penned (1).
Permutation-invariant by design: curriculum stages share the same obs dim
so VecNormalize statistics transfer as n_sheep advances.
"""
import numpy as np
import gymnasium as gym
from gymnasium import spaces
class HerdingEnv(gym.Env):
metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30}
# -----------------------------------------------------------------------
# World constants — must match Webots world file
# -----------------------------------------------------------------------
MAX_SHEEP = 10
FIELD = 15.0 # half-size; positions ∈ [-FIELD, FIELD]
PEN_X = (10.0, 13.0)
PEN_Y = (-15.0, -8.0)
PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32)
# -----------------------------------------------------------------------
# Dynamics — calibrated to match Webots robot specs
# -----------------------------------------------------------------------
DOG_SPEED = 2.5 # m/s
SHEEP_FLEE_V = 0.65 # m/s
SHEEP_WANDER_V = 0.20 # m/s
DT = 0.1 # seconds per step
# Boid parameters — identical to sheep.py
FLEE_DIST = 7.0
SEPARATION_DIST = 2.5
COHESION_DIST = 8.0
WALL_MARGIN = 3.5
# -----------------------------------------------------------------------
# Reward weights (two-phase: collect first, then drive)
# -----------------------------------------------------------------------
W_DRIVE = 2.0 # progress: COM moved toward pen (only when compact)
W_COLLECT = 4.0 # progress: radius shrank (2× stronger when scattered)
W_APPROACH_FAR = 1.0 # progress: dog moved toward farthest straggler (scatter only)
W_ALIGN = 0.5 # position: dog on anti-pen side of COM (compact only)
W_PEN_BONUS = 10.0 # per sheep penned
W_COMPLETE = 100.0 # all sheep penned
W_STEP_COST = 0.002 # time penalty
DRIVE_GATE_RADIUS = 5.0 # flock must compact below this (m) before drive reward fires
def __init__(self, n_sheep: int = 1, max_steps: int = 2000,
render_mode: str = None, random_n_sheep: bool = False):
super().__init__()
assert 1 <= n_sheep <= self.MAX_SHEEP
self.n_sheep = n_sheep
self.max_steps = max_steps
self.render_mode = render_mode
self.random_n_sheep = random_n_sheep # if True, randomise n_sheep each reset
# Fixed 16-dim observation regardless of n_sheep:
# dog_pos(2) + rel_com(2) + rel_far1(2) + rel_far2(2) + rel_far3(2)
# + com_to_pen(2) + far1_to_pen(2) + radius(1) + frac_penned(1)
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf, shape=(16,), dtype=np.float32
)
# Action: desired velocity (vx, vy) ∈ [-1, 1]², scaled by DOG_SPEED
self.action_space = spaces.Box(
low=-1.0, high=1.0, shape=(2,), dtype=np.float32
)
# Runtime state (populated by reset)
self._step_count = 0
self._prev_penned = 0
self._prev_com_dist = 0.0
self._prev_radius = 0.0
self._prev_dog_to_far1 = 0.0
self.dog_pos = np.zeros(2, dtype=np.float32)
self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32)
self.penned = np.ones(self.MAX_SHEEP, dtype=bool)
self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32)
self._fig = None
# ------------------------------------------------------------------
# Curriculum interface
# ------------------------------------------------------------------
def set_n_sheep(self, n: int):
"""Advance curriculum difficulty; takes effect on next reset()."""
assert 1 <= n <= self.MAX_SHEEP
self.n_sheep = n
# ------------------------------------------------------------------
# Gymnasium API
# ------------------------------------------------------------------
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self._step_count = 0
self._prev_penned = 0
if self.random_n_sheep:
self.n_sheep = int(self.np_random.integers(1, self.MAX_SHEEP + 1))
# Active sheep (0 .. n_sheep-1): random non-pen positions
self.sheep_pos[:] = self.PEN_CENTER
self.penned[:] = True
placed = 0
while placed < self.n_sheep:
p = self.np_random.uniform(-12.0, 12.0, size=(2,)).astype(np.float32)
if not self._in_pen(p):
self.sheep_pos[placed] = p
self.penned[placed] = False
placed += 1
# Dog: 50% of resets start already behind the flock (anti-pen side,
# within flee range) to give early training aligned experiences.
if self.np_random.random() < 0.5:
ref = self.sheep_pos[0]
away = ref - self.PEN_CENTER
d = float(np.linalg.norm(away))
if d > 0.1:
away = away / d
offset = away * self.np_random.uniform(2.0, self.FLEE_DIST * 0.8)
self.dog_pos = np.clip(
(ref + offset).astype(np.float32), -self.FIELD, self.FIELD
)
else:
self.dog_pos = self.np_random.uniform(
-self.FIELD * 0.8, self.FIELD * 0.8, size=(2,)
).astype(np.float32)
self.wander_ang = self.np_random.uniform(
-np.pi, np.pi, size=(self.MAX_SHEEP,)
).astype(np.float32)
# Initialise previous-step values for progress rewards
com, radius, _ = self._flock_stats()
self._prev_com_dist = float(np.linalg.norm(com - self.PEN_CENTER))
self._prev_radius = radius
active_mask = ~self.penned[:self.n_sheep]
if active_mask.any():
pts = self.sheep_pos[:self.n_sheep][active_mask]
far1 = pts[int(np.argmax(np.linalg.norm(pts - com, axis=1)))]
self._prev_dog_to_far1 = float(np.linalg.norm(self.dog_pos - far1))
else:
self._prev_dog_to_far1 = 0.0
return self._obs(), {}
def step(self, action):
self._step_count += 1
act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0)
self.dog_pos = np.clip(
self.dog_pos + act * self.DOG_SPEED * self.DT,
-self.FIELD, self.FIELD
)
for i in range(self.n_sheep):
if self.penned[i]:
continue
self.sheep_pos[i] = self._step_sheep(i)
if self._in_pen(self.sheep_pos[i]):
self.penned[i] = True
n_penned = int(self.penned[:self.n_sheep].sum())
newly_penned = n_penned - self._prev_penned
self._prev_penned = n_penned
reward = self._reward(n_penned, newly_penned)
terminated = n_penned == self.n_sheep
truncated = self._step_count >= self.max_steps
info = {"n_penned": n_penned, "n_sheep": self.n_sheep}
if self.render_mode == "human":
self.render()
return self._obs(), float(reward), terminated, truncated, info
def render(self):
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
if self._fig is None:
plt.ion()
self._fig, self._ax = plt.subplots(figsize=(6, 6))
ax = self._ax
ax.clear()
ax.set_xlim(-16, 16); ax.set_ylim(-16, 16)
ax.set_aspect("equal"); ax.set_facecolor("#dcedc8")
ax.add_patch(mpatches.Rectangle(
(-15, -15), 30, 30, fill=False, edgecolor="#795548", linewidth=2
))
pw = self.PEN_X[1] - self.PEN_X[0]
ph = self.PEN_Y[1] - self.PEN_Y[0]
ax.add_patch(mpatches.Rectangle(
(self.PEN_X[0], self.PEN_Y[0]), pw, ph,
facecolor="#ffe082", edgecolor="#795548", linewidth=2
))
ax.text(11.5, -11.5, "pen", ha="center", va="center",
fontsize=8, color="#795548")
com, radius, _ = self._flock_stats()
ax.add_patch(plt.Circle(com, radius, color="steelblue",
fill=False, linestyle="--", linewidth=1))
ax.plot(*com, "+", color="steelblue", markersize=10)
for i in range(self.n_sheep):
if i >= self.n_sheep:
continue
color = "deeppink" if self.penned[i] else "white"
ax.plot(*self.sheep_pos[i], "o", color=color, markersize=11,
markeredgecolor="#555", markeredgewidth=1.5)
ax.plot(*self.dog_pos, "s", color="#4e342e", markersize=13,
markeredgecolor="black", markeredgewidth=1.5)
ax.set_title(
f"step {self._step_count} | "
f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep} | "
f"r={radius:.1f}m",
fontsize=11
)
self._fig.canvas.draw()
self._fig.canvas.flush_events()
plt.pause(0.001)
def close(self):
if self._fig is not None:
import matplotlib.pyplot as plt
plt.close(self._fig)
self._fig = None
# ------------------------------------------------------------------
# Internals
# ------------------------------------------------------------------
def _in_pen(self, pos: np.ndarray) -> bool:
return (self.PEN_X[0] < pos[0] < self.PEN_X[1] and
self.PEN_Y[0] < pos[1] < self.PEN_Y[1])
def _flock_stats(self):
"""Return (COM, radius, mean_dispersion) over active sheep."""
active_mask = ~self.penned[:self.n_sheep]
if not active_mask.any():
return self.PEN_CENTER.copy(), 0.0, 0.0
pts = self.sheep_pos[:self.n_sheep][active_mask]
com = pts.mean(axis=0)
dists = np.linalg.norm(pts - com, axis=1)
return com, float(dists.max()), float(dists.mean())
def _obs(self) -> np.ndarray:
com, radius, _ = self._flock_stats()
active_mask = ~self.penned[:self.n_sheep]
if active_mask.any():
pts = self.sheep_pos[:self.n_sheep][active_mask]
dists = np.linalg.norm(pts - com, axis=1)
sorted_idx = np.argsort(dists)[::-1] # farthest first
# Top-3 stragglers; pad with COM when fewer active sheep exist
def nth(n):
return pts[sorted_idx[n]] if len(sorted_idx) > n else com
far1, far2, far3 = nth(0), nth(1), nth(2)
else:
far1 = far2 = far3 = self.PEN_CENTER.copy()
S = self.FIELD
D = 2 * self.FIELD
return np.array([
self.dog_pos[0] / S, self.dog_pos[1] / S,
(com[0] - self.dog_pos[0]) / D, (com[1] - self.dog_pos[1]) / D,
(far1[0] - self.dog_pos[0]) / D, (far1[1] - self.dog_pos[1]) / D,
(far2[0] - self.dog_pos[0]) / D, (far2[1] - self.dog_pos[1]) / D,
(far3[0] - self.dog_pos[0]) / D, (far3[1] - self.dog_pos[1]) / D,
(self.PEN_CENTER[0] - com[0]) / D, (self.PEN_CENTER[1] - com[1]) / D,
(self.PEN_CENTER[0] - far1[0]) / D, (self.PEN_CENTER[1] - far1[1]) / D,
radius / D,
active_mask.sum() / self.n_sheep,
], dtype=np.float32)
def _reward(self, n_penned: int, newly_penned: int) -> float:
com, radius, _ = self._flock_stats()
com_dist = float(np.linalg.norm(com - self.PEN_CENTER))
scattered = radius > self.DRIVE_GATE_RADIUS
drive_delta = self._prev_com_dist - com_dist
collect_delta = self._prev_radius - radius
self._prev_com_dist = com_dist
self._prev_radius = radius
# Collect: always active, 2× stronger when scattered.
r_collect = collect_delta * self.W_COLLECT * (2.0 if scattered else 1.0)
# Drive: only when compact — prevents rewarding COM movement while scattered.
r_drive = 0.0 if scattered else drive_delta * self.W_DRIVE
# Approach-to-straggler: reward dog for closing on farthest sheep.
# Only in scatter phase so it doesn't override drive positioning.
# Gated on there being active sheep.
active_mask = ~self.penned[:self.n_sheep]
if scattered and active_mask.any():
pts = self.sheep_pos[:self.n_sheep][active_mask]
far1 = pts[int(np.argmax(np.linalg.norm(pts - com, axis=1)))]
cur_dog_to_far1 = float(np.linalg.norm(self.dog_pos - far1))
r_approach = (self._prev_dog_to_far1 - cur_dog_to_far1) * self.W_APPROACH_FAR
self._prev_dog_to_far1 = cur_dog_to_far1
else:
r_approach = 0.0
if active_mask.any():
pts = self.sheep_pos[:self.n_sheep][active_mask]
far1 = pts[int(np.argmax(np.linalg.norm(pts - com, axis=1)))]
self._prev_dog_to_far1 = float(np.linalg.norm(self.dog_pos - far1))
# Alignment: dog on anti-pen side of COM — only in drive phase.
# Disabled when scattered: chasing a straggler on the pen side would be
# wrongly penalised otherwise.
d_dog_com = float(np.linalg.norm(self.dog_pos - com))
if not scattered and d_dog_com > 0.1 and com_dist > 0.1:
pen_dir = (self.PEN_CENTER - com) / com_dist
dog_dir = (self.dog_pos - com) / d_dog_com
cosine = -float(np.dot(pen_dir, dog_dir))
proximity = max(0.0, 1.0 - d_dog_com / self.FLEE_DIST)
alignment = cosine * proximity * self.W_ALIGN
else:
alignment = 0.0
reward = r_drive + r_collect + r_approach + alignment
reward += newly_penned * self.W_PEN_BONUS
reward -= self.W_STEP_COST
if n_penned == self.n_sheep:
reward += self.W_COMPLETE
return reward
def _step_sheep(self, i: int) -> np.ndarray:
"""Apply one timestep of boid dynamics to sheep i (mirrors sheep.py)."""
pos = self.sheep_pos[i].copy()
fx, fy = 0.0, 0.0
fleeing = False
# Flee from dog — quadratic ramp
diff = self.dog_pos - pos
dist = float(np.linalg.norm(diff))
if 0.01 < dist < self.FLEE_DIST:
t = 1.0 - dist / self.FLEE_DIST
s = t * t * 5.0
fx -= (diff[0] / dist) * s
fy -= (diff[1] / dist) * s
fleeing = True
# Separation (inverse-distance) + Cohesion
cx, cy, cn = 0.0, 0.0, 0
for j in range(self.n_sheep):
if j == i or self.penned[j]:
continue
dv = self.sheep_pos[j] - pos
dj = float(np.linalg.norm(dv))
if 0.3 < dj < self.COHESION_DIST:
cx += self.sheep_pos[j][0]
cy += self.sheep_pos[j][1]
cn += 1
if 0.05 < dj < self.SEPARATION_DIST:
push = (self.SEPARATION_DIST - dj) / dj
fx -= (dv[0] / dj) * push * 2.5
fy -= (dv[1] / dj) * push * 2.5
if cn > 0:
w = 0.08 if fleeing else 0.15
fx += (cx / cn - pos[0]) * w
fy += (cy / cn - pos[1]) * w
# Wall avoidance
m, F = self.WALL_MARGIN, self.FIELD
if pos[0] < -F + m: fx += ((-F + m - pos[0]) / m) * 6.0
if pos[0] > F - m: fx -= ((pos[0] - (F - m)) / m) * 6.0
if pos[1] < -F + m: fy += ((-F + m - pos[1]) / m) * 6.0
if pos[1] > F - m: fy -= ((pos[1] - (F - m)) / m) * 6.0
# Hard-stop clamp: mirrors sheep.py — zero any force driving further
# into the wall within 0.5 m so the flee force cannot pin the sheep.
HS = 0.5
if pos[0] < -F + HS and fx < 0: fx = 0.0
if pos[0] > F - HS and fx > 0: fx = 0.0
if pos[1] < -F + HS and fy < 0: fy = 0.0
if pos[1] > F - HS and fy > 0: fy = 0.0
# Wander — suppressed while fleeing
if not fleeing:
if self.np_random.random() < 0.02:
self.wander_ang[i] += float(self.np_random.uniform(-0.6, 0.6))
fx += float(np.cos(self.wander_ang[i])) * 0.5
fy += float(np.sin(self.wander_ang[i])) * 0.5
# Integrate
force = np.array([fx, fy])
mag = float(np.linalg.norm(force))
if mag > 0.01:
top_speed = self.SHEEP_FLEE_V if fleeing else self.SHEEP_WANDER_V
speed = min(top_speed, mag * 0.3)
pos = np.clip(pos + (force / mag) * speed * self.DT,
-self.FIELD, self.FIELD)
return pos.astype(np.float32)