Files
TIR_PROJ/training/herding_env.py
T
2026-04-24 16:12:16 +01:00

448 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
2D herding environment for PPO training (Gymnasium-compatible).
The dog agent (action: 2D velocity vector) must herd n_sheep into the
quarantine pen. Sheep dynamics mirror the Webots controller exactly:
flee (quadratic ramp), separation (inverse-distance), cohesion, wall
avoidance, and wander.
Coordinate system matches the Webots world file:
field : x ∈ [-15, 15], y ∈ [-15, 15]
pen : x ∈ [10, 13], y ∈ [-15, -8] (SE corner, open north)
Observation (16-dim, fixed regardless of n_sheep):
dog position (2), flock COM relative to dog (2), top-3 farthest active
sheep relative to dog (6), pen relative to COM (2), pen relative to
farthest sheep (2), flock radius (1), fraction of sheep not yet penned (1).
Permutation-invariant by design: curriculum stages share the same obs dim
so VecNormalize statistics transfer as n_sheep advances.
"""
import numpy as np
import gymnasium as gym
from gymnasium import spaces
class HerdingEnv(gym.Env):
    """2D sheep-herding task: one dog must drive ``n_sheep`` into a pen.

    The dog is the agent (action: 2D velocity command). Sheep follow simple
    boid-style rules implemented in ``_step_sheep``. The observation is a
    fixed 16-vector regardless of ``n_sheep`` (layout in ``_obs``).

    Sheep slots ``n_sheep .. MAX_SHEEP-1`` are padding: they are parked at
    ``PEN_CENTER`` with ``penned=True`` and never simulated.
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30}

    # -----------------------------------------------------------------------
    # World constants — must match Webots world file
    # -----------------------------------------------------------------------
    MAX_SHEEP = 10
    FIELD = 15.0  # half-size; positions ∈ [-FIELD, FIELD]
    PEN_X = (10.0, 13.0)
    PEN_Y = (-15.0, -8.0)
    PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32)

    # -----------------------------------------------------------------------
    # Dynamics — calibrated to match Webots robot specs
    # -----------------------------------------------------------------------
    DOG_SPEED = 2.5        # m/s
    SHEEP_FLEE_V = 0.65    # m/s
    SHEEP_WANDER_V = 0.20  # m/s
    DT = 0.1               # seconds per step

    # Boid parameters — identical to sheep.py
    FLEE_DIST = 7.0
    SEPARATION_DIST = 2.5
    COHESION_DIST = 8.0
    WALL_MARGIN = 3.5

    # -----------------------------------------------------------------------
    # Reward weights (two-phase: collect first, then drive)
    # -----------------------------------------------------------------------
    W_DRIVE = 2.0        # progress: COM moved toward pen (only when compact)
    W_COLLECT = 4.0      # progress: radius shrank (2× stronger when scattered)
    W_HERD_POS = 1.5     # progress: dog moved toward ideal herding position behind far1
    W_ALIGN = 0.5        # position: dog on anti-pen side of COM (compact only)
    W_PEN_BONUS = 10.0   # per sheep penned
    W_COMPLETE = 100.0   # all sheep penned
    W_STEP_COST = 0.002  # time penalty
    DRIVE_GATE_RADIUS = 5.0  # flock must compact below this (m) before drive reward fires

    def __init__(self, n_sheep: int = 1, max_steps: int = 2000,
                 render_mode: "str | None" = None, random_n_sheep: bool = False):
        """Create the environment.

        Args:
            n_sheep: Number of active sheep, in ``1..MAX_SHEEP``.
            max_steps: Episode is truncated after this many steps.
            render_mode: ``"human"`` draws an interactive window on every
                step; ``"rgb_array"`` makes ``render()`` return a frame.
            random_n_sheep: If True, ``n_sheep`` is re-drawn uniformly from
                ``1..MAX_SHEEP`` on every ``reset()``.

        Raises:
            ValueError: If ``n_sheep`` is outside ``1..MAX_SHEEP``.
        """
        super().__init__()
        # Explicit raise rather than assert: asserts vanish under ``python -O``.
        if not 1 <= n_sheep <= self.MAX_SHEEP:
            raise ValueError(
                f"n_sheep must be in [1, {self.MAX_SHEEP}], got {n_sheep}"
            )
        self.n_sheep = n_sheep
        self.max_steps = max_steps
        self.render_mode = render_mode
        self.random_n_sheep = random_n_sheep  # if True, randomise n_sheep each reset
        # Curriculum value staged by set_n_sheep(); applied by the next reset().
        self._pending_n_sheep = None

        # Fixed 16-dim observation regardless of n_sheep:
        #   dog_pos(2) + rel_com(2) + rel_far1(2) + rel_far2(2) + rel_far3(2)
        #   + com_to_pen(2) + far1_to_pen(2) + radius(1) + frac_active(1)
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(16,), dtype=np.float32
        )
        # Action: desired velocity (vx, vy) ∈ [-1, 1]², scaled by DOG_SPEED
        self.action_space = spaces.Box(
            low=-1.0, high=1.0, shape=(2,), dtype=np.float32
        )

        # Runtime state (populated by reset)
        self._step_count = 0
        self._prev_penned = 0
        self._prev_com_dist = 0.0
        self._prev_radius = 0.0
        self._prev_dog_to_ideal = 0.0
        self.dog_pos = np.zeros(2, dtype=np.float32)
        self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32)
        self.penned = np.ones(self.MAX_SHEEP, dtype=bool)
        self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32)
        self._fig = None
        self._ax = None

    # ------------------------------------------------------------------
    # Curriculum interface
    # ------------------------------------------------------------------
    def set_n_sheep(self, n: int):
        """Advance curriculum difficulty; takes effect on next reset().

        The value is only staged here. Writing ``self.n_sheep`` directly
        would change the slice that ``step``/``_reward``/``_obs`` operate on
        in the middle of an episode, so padding sheep (already flagged as
        penned) would be counted as newly penned and pay a spurious bonus.

        Raises:
            ValueError: If ``n`` is outside ``1..MAX_SHEEP``.
        """
        if not 1 <= n <= self.MAX_SHEEP:
            raise ValueError(
                f"n must be in [1, {self.MAX_SHEEP}], got {n}"
            )
        self._pending_n_sheep = int(n)

    # ------------------------------------------------------------------
    # Gymnasium API
    # ------------------------------------------------------------------
    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        ``options`` is accepted for API compatibility and ignored.
        """
        super().reset(seed=seed)
        self._step_count = 0
        self._prev_penned = 0

        # Apply any curriculum change staged since the previous reset.
        if self._pending_n_sheep is not None:
            self.n_sheep = self._pending_n_sheep
            self._pending_n_sheep = None
        if self.random_n_sheep:
            self.n_sheep = int(self.np_random.integers(1, self.MAX_SHEEP + 1))

        # Park every slot in the pen, then place the active sheep
        # (0 .. n_sheep-1) at random non-pen positions by rejection sampling.
        self.sheep_pos[:] = self.PEN_CENTER
        self.penned[:] = True
        placed = 0
        while placed < self.n_sheep:
            p = self.np_random.uniform(-12.0, 12.0, size=(2,)).astype(np.float32)
            if not self._in_pen(p):
                self.sheep_pos[placed] = p
                self.penned[placed] = False
                placed += 1

        # Dog: 50% of resets start already behind the flock (anti-pen side,
        # within flee range) to give early training aligned experiences.
        if self.np_random.random() < 0.5:
            ref = self.sheep_pos[0]
            away = ref - self.PEN_CENTER
            d = float(np.linalg.norm(away))
            if d > 0.1:
                away = away / d
            offset = away * self.np_random.uniform(2.0, self.FLEE_DIST * 0.8)
            self.dog_pos = np.clip(
                (ref + offset).astype(np.float32), -self.FIELD, self.FIELD
            )
        else:
            self.dog_pos = self.np_random.uniform(
                -self.FIELD * 0.8, self.FIELD * 0.8, size=(2,)
            ).astype(np.float32)

        self.wander_ang = self.np_random.uniform(
            -np.pi, np.pi, size=(self.MAX_SHEEP,)
        ).astype(np.float32)

        # Initialise previous-step values for progress rewards
        com, radius, _ = self._flock_stats()
        self._prev_com_dist = float(np.linalg.norm(com - self.PEN_CENTER))
        self._prev_radius = radius
        dog_to_ideal = self._dog_to_ideal_dist(com)
        self._prev_dog_to_ideal = 0.0 if dog_to_ideal is None else dog_to_ideal

        return self._obs(), {}

    def step(self, action):
        """Advance one ``DT`` and return ``(obs, reward, terminated, truncated, info)``.

        The action is clipped to [-1, 1] per component and scaled by
        ``DOG_SPEED``. The dog moves first; each active sheep then moves in
        index order, so later sheep see the already-updated positions of
        earlier ones.
        """
        self._step_count += 1

        act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0)
        self.dog_pos = np.clip(
            self.dog_pos + act * self.DOG_SPEED * self.DT,
            -self.FIELD, self.FIELD
        )

        for i in range(self.n_sheep):
            if self.penned[i]:
                continue
            self.sheep_pos[i] = self._step_sheep(i)
            if self._in_pen(self.sheep_pos[i]):
                self.penned[i] = True

        n_penned = int(self.penned[:self.n_sheep].sum())
        newly_penned = n_penned - self._prev_penned
        self._prev_penned = n_penned

        reward = self._reward(n_penned, newly_penned)
        terminated = bool(n_penned == self.n_sheep)
        truncated = bool(self._step_count >= self.max_steps)
        info = {"n_penned": n_penned, "n_sheep": self.n_sheep}

        if self.render_mode == "human":
            self.render()
        return self._obs(), float(reward), terminated, truncated, info

    def render(self):
        """Draw the current state.

        With ``render_mode == "rgb_array"`` the scene is drawn off-screen and
        returned as an ``(H, W, 3)`` uint8 array. In every other mode an
        interactive matplotlib window is updated and ``None`` is returned.
        """
        import matplotlib.patches as mpatches

        as_array = self.render_mode == "rgb_array"
        plt = None
        if not as_array:
            import matplotlib.pyplot as plt

        if self._fig is None:
            if as_array:
                # Off-screen Agg canvas: no window and no GUI backend needed.
                from matplotlib.backends.backend_agg import FigureCanvasAgg
                from matplotlib.figure import Figure
                self._fig = Figure(figsize=(6, 6))
                FigureCanvasAgg(self._fig)
                self._ax = self._fig.add_subplot(111)
            else:
                plt.ion()
                self._fig, self._ax = plt.subplots(figsize=(6, 6))

        ax = self._ax
        ax.clear()
        ax.set_xlim(-16, 16)
        ax.set_ylim(-16, 16)
        ax.set_aspect("equal")
        ax.set_facecolor("#dcedc8")

        # Field boundary
        ax.add_patch(mpatches.Rectangle(
            (-15, -15), 30, 30, fill=False, edgecolor="#795548", linewidth=2
        ))
        # Pen
        pw = self.PEN_X[1] - self.PEN_X[0]
        ph = self.PEN_Y[1] - self.PEN_Y[0]
        ax.add_patch(mpatches.Rectangle(
            (self.PEN_X[0], self.PEN_Y[0]), pw, ph,
            facecolor="#ffe082", edgecolor="#795548", linewidth=2
        ))
        ax.text(11.5, -11.5, "pen", ha="center", va="center",
                fontsize=8, color="#795548")

        # Flock centre of mass and enclosing radius (active sheep only)
        com, radius, _ = self._flock_stats()
        ax.add_patch(mpatches.Circle(com, radius, color="steelblue",
                                     fill=False, linestyle="--", linewidth=1))
        ax.plot(*com, "+", color="steelblue", markersize=10)

        for i in range(self.n_sheep):
            color = "deeppink" if self.penned[i] else "white"
            ax.plot(*self.sheep_pos[i], "o", color=color, markersize=11,
                    markeredgecolor="#555", markeredgewidth=1.5)
        ax.plot(*self.dog_pos, "s", color="#4e342e", markersize=13,
                markeredgecolor="black", markeredgewidth=1.5)

        ax.set_title(
            f"step {self._step_count} | "
            f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep} | "
            f"r={radius:.1f}m",
            fontsize=11
        )

        self._fig.canvas.draw()
        if as_array:
            # buffer_rgba() exposes the canvas as RGBA; drop alpha and copy so
            # the returned frame does not alias the live canvas buffer.
            return np.asarray(self._fig.canvas.buffer_rgba())[..., :3].copy()
        self._fig.canvas.flush_events()
        plt.pause(0.001)
        return None

    def close(self):
        """Release the render figure, if one was created."""
        if self._fig is not None:
            if self.render_mode != "rgb_array":
                # Only pyplot-managed figures need an explicit close.
                import matplotlib.pyplot as plt
                plt.close(self._fig)
            self._fig = None
            self._ax = None

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------
    def _in_pen(self, pos: np.ndarray) -> bool:
        """Return True if ``pos`` lies strictly inside the pen rectangle."""
        x, y = pos[0], pos[1]
        return bool(self.PEN_X[0] < x < self.PEN_X[1]
                    and self.PEN_Y[0] < y < self.PEN_Y[1])

    def _flock_stats(self):
        """Return (COM, radius, mean_dispersion) over active sheep.

        With no active sheep the result is ``(PEN_CENTER copy, 0.0, 0.0)``.
        """
        active_mask = ~self.penned[:self.n_sheep]
        if not active_mask.any():
            return self.PEN_CENTER.copy(), 0.0, 0.0
        pts = self.sheep_pos[:self.n_sheep][active_mask]
        com = pts.mean(axis=0)
        dists = np.linalg.norm(pts - com, axis=1)
        return com, float(dists.max()), float(dists.mean())

    def _farthest_active(self, com: np.ndarray):
        """Return the active sheep position farthest from ``com``, or None."""
        active_mask = ~self.penned[:self.n_sheep]
        if not active_mask.any():
            return None
        pts = self.sheep_pos[:self.n_sheep][active_mask]
        return pts[int(np.argmax(np.linalg.norm(pts - com, axis=1)))]

    def _dog_to_ideal_dist(self, com: np.ndarray):
        """Return the dog's distance to the ideal herding spot, or None if no sheep is active."""
        far1 = self._farthest_active(com)
        if far1 is None:
            return None
        ideal = self._ideal_herd_pos(com, far1)
        return float(np.linalg.norm(self.dog_pos - ideal))

    def _obs(self) -> np.ndarray:
        """Build the 16-dim float32 observation.

        Layout: dog position / FIELD (2); then, each divided by 2*FIELD:
        COM - dog (2), far1 - dog (2), far2 - dog (2), far3 - dog (2),
        pen - COM (2), pen - far1 (2), flock radius (1); and finally the
        fraction of the ``n_sheep`` sheep that are still active (1).
        """
        com, radius, _ = self._flock_stats()
        active_mask = ~self.penned[:self.n_sheep]

        if active_mask.any():
            pts = self.sheep_pos[:self.n_sheep][active_mask]
            dists = np.linalg.norm(pts - com, axis=1)
            order = np.argsort(dists)[::-1]  # farthest first
            # Top-3 stragglers; pad with COM when fewer active sheep exist
            far1, far2, far3 = (
                pts[order[k]] if len(order) > k else com for k in range(3)
            )
        else:
            far1 = far2 = far3 = self.PEN_CENTER.copy()

        S = self.FIELD
        D = 2 * self.FIELD
        dog = self.dog_pos
        pen = self.PEN_CENTER
        return np.array([
            dog[0] / S, dog[1] / S,
            (com[0] - dog[0]) / D, (com[1] - dog[1]) / D,
            (far1[0] - dog[0]) / D, (far1[1] - dog[1]) / D,
            (far2[0] - dog[0]) / D, (far2[1] - dog[1]) / D,
            (far3[0] - dog[0]) / D, (far3[1] - dog[1]) / D,
            (pen[0] - com[0]) / D, (pen[1] - com[1]) / D,
            (pen[0] - far1[0]) / D, (pen[1] - far1[1]) / D,
            radius / D,
            # Fraction still ACTIVE (1.0 at reset, 0.0 when all are penned).
            active_mask.sum() / self.n_sheep,
        ], dtype=np.float32)

    def _ideal_herd_pos(self, com: np.ndarray, far1: np.ndarray) -> np.ndarray:
        """
        Target position for the dog to push far1 toward COM:
        0.8 * FLEE_DIST beyond far1 on the outward radial line from COM,
        clipped to the field. From here, the dog's approach causes far1 to
        flee inward.
        """
        d = far1 - com
        d_norm = float(np.linalg.norm(d))
        if d_norm > 0.5:
            direction = d / d_norm
        else:
            # Sheep all together — use anti-pen direction instead
            to_pen = self.PEN_CENTER - com
            tp = float(np.linalg.norm(to_pen))
            direction = -(to_pen / tp) if tp > 0.1 else np.array([0.0, -1.0], dtype=np.float32)
        target = far1 + direction * self.FLEE_DIST * 0.8
        return np.clip(target, -self.FIELD, self.FIELD).astype(np.float32)

    def _reward(self, n_penned: int, newly_penned: int) -> float:
        """Compute the shaped reward for the step that just happened.

        Also rolls the ``_prev_*`` baselines forward, so it must be called
        exactly once per step.
        """
        com, radius, _ = self._flock_stats()
        com_dist = float(np.linalg.norm(com - self.PEN_CENTER))
        scattered = radius > self.DRIVE_GATE_RADIUS

        # NOTE(review): COM and radius are taken over active sheep only, so
        # both deltas jump on the step a sheep is penned — confirm intended.
        drive_delta = self._prev_com_dist - com_dist
        collect_delta = self._prev_radius - radius
        self._prev_com_dist = com_dist
        self._prev_radius = radius

        # Collect: always active, 2× stronger when scattered.
        r_collect = collect_delta * self.W_COLLECT * (2.0 if scattered else 1.0)
        # Drive: only when compact — prevents rewarding COM movement while scattered.
        r_drive = 0.0 if scattered else drive_delta * self.W_DRIVE

        # Herding-position reward: guides dog to the ideal position BEHIND far1
        # (on the outward radial, 0.8 * FLEE_DIST beyond far1 from COM).
        # From there, advancing toward COM pushes far1 inward.
        # Pays out in scatter phase only; the baseline is refreshed whenever
        # any sheep is active so re-entering scatter phase starts from a
        # current value instead of a stale one.
        r_herd_pos = 0.0
        cur_dog_to_ideal = self._dog_to_ideal_dist(com)
        if cur_dog_to_ideal is not None:
            if scattered:
                r_herd_pos = (self._prev_dog_to_ideal - cur_dog_to_ideal) * self.W_HERD_POS
            self._prev_dog_to_ideal = cur_dog_to_ideal

        # Alignment: dog on anti-pen side of COM — only in drive phase.
        # Disabled when scattered: chasing a straggler on the pen side would be
        # wrongly penalised otherwise.
        d_dog_com = float(np.linalg.norm(self.dog_pos - com))
        if not scattered and d_dog_com > 0.1 and com_dist > 0.1:
            pen_dir = (self.PEN_CENTER - com) / com_dist
            dog_dir = (self.dog_pos - com) / d_dog_com
            cosine = -float(np.dot(pen_dir, dog_dir))
            proximity = max(0.0, 1.0 - d_dog_com / self.FLEE_DIST)
            alignment = cosine * proximity * self.W_ALIGN
        else:
            alignment = 0.0

        reward = r_drive + r_collect + r_herd_pos + alignment
        reward += newly_penned * self.W_PEN_BONUS
        reward -= self.W_STEP_COST
        if n_penned == self.n_sheep:
            reward += self.W_COMPLETE
        return reward

    def _step_sheep(self, i: int) -> np.ndarray:
        """Apply one timestep of boid dynamics to sheep i (mirrors sheep.py).

        Returns the new float32 position; ``self.sheep_pos`` is not written
        here. May advance ``self.wander_ang[i]`` and consume random numbers.
        """
        pos = self.sheep_pos[i].copy()
        fx, fy = 0.0, 0.0
        fleeing = False

        # Flee from dog — quadratic ramp
        diff = self.dog_pos - pos
        dist = float(np.linalg.norm(diff))
        if 0.01 < dist < self.FLEE_DIST:
            t = 1.0 - dist / self.FLEE_DIST
            s = t * t * 5.0
            fx -= (diff[0] / dist) * s
            fy -= (diff[1] / dist) * s
            fleeing = True

        # Separation (inverse-distance) + Cohesion, over other active sheep
        cx, cy, cn = 0.0, 0.0, 0
        for j in range(self.n_sheep):
            if j == i or self.penned[j]:
                continue
            dv = self.sheep_pos[j] - pos
            dj = float(np.linalg.norm(dv))
            if 0.3 < dj < self.COHESION_DIST:
                cx += self.sheep_pos[j][0]
                cy += self.sheep_pos[j][1]
                cn += 1
            if 0.05 < dj < self.SEPARATION_DIST:
                push = (self.SEPARATION_DIST - dj) / dj
                fx -= (dv[0] / dj) * push * 2.5
                fy -= (dv[1] / dj) * push * 2.5
        if cn > 0:
            # Cohesion is weaker while fleeing.
            w = 0.08 if fleeing else 0.15
            fx += (cx / cn - pos[0]) * w
            fy += (cy / cn - pos[1]) * w

        # Wall avoidance
        m, F = self.WALL_MARGIN, self.FIELD
        if pos[0] < -F + m:
            fx += ((-F + m - pos[0]) / m) * 6.0
        if pos[0] > F - m:
            fx -= ((pos[0] - (F - m)) / m) * 6.0
        if pos[1] < -F + m:
            fy += ((-F + m - pos[1]) / m) * 6.0
        if pos[1] > F - m:
            fy -= ((pos[1] - (F - m)) / m) * 6.0

        # Hard-stop clamp: mirrors sheep.py — zero any force driving further
        # into the wall within 0.5 m so the flee force cannot pin the sheep.
        HS = 0.5
        if pos[0] < -F + HS and fx < 0:
            fx = 0.0
        if pos[0] > F - HS and fx > 0:
            fx = 0.0
        if pos[1] < -F + HS and fy < 0:
            fy = 0.0
        if pos[1] > F - HS and fy > 0:
            fy = 0.0

        # Wander — suppressed while fleeing
        if not fleeing:
            if self.np_random.random() < 0.02:
                self.wander_ang[i] += float(self.np_random.uniform(-0.6, 0.6))
            fx += float(np.cos(self.wander_ang[i])) * 0.5
            fy += float(np.sin(self.wander_ang[i])) * 0.5

        # Integrate: move along the net force direction at a capped speed
        force = np.array([fx, fy])
        mag = float(np.linalg.norm(force))
        if mag > 0.01:
            top_speed = self.SHEEP_FLEE_V if fleeing else self.SHEEP_WANDER_V
            speed = min(top_speed, mag * 0.3)
            pos = np.clip(pos + (force / mag) * speed * self.DT,
                          -self.FIELD, self.FIELD)
        return pos.astype(np.float32)