Files
2026-04-23 11:35:15 +01:00

354 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
2D herding environment for PPO training (Gymnasium-compatible).
The dog agent (action: 2D velocity vector) must herd n_sheep into the
quarantine pen. Sheep dynamics mirror the Webots controller exactly:
flee (quadratic ramp), separation (inverse-distance), cohesion, wall
avoidance, and wander.
Coordinate system matches the Webots world file:
field : x ∈ [-15, 15], y ∈ [-15, 15]
pen : x ∈ [10, 13], y ∈ [-15, -8] (SE corner, open north)
Observation is always sized for MAX_SHEEP (currently 5) regardless of
how many sheep are active. Inactive slots are pre-penned at the pen
centre with flag=1. This keeps the model input dimension fixed across
curriculum stages so VecNormalize statistics are preserved throughout.
"""
import numpy as np
import gymnasium as gym
from gymnasium import spaces
class HerdingEnv(gym.Env):
# Gymnasium environment in which one dog agent herds up to MAX_SHEEP sheep
# into a fixed rectangular pen. Everything in this section is a class-level
# constant shared by all instances.
# Render modes advertised to Gymnasium and the nominal frame rate.
metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30}
# -----------------------------------------------------------------------
# World constants — must match Webots world file
# -----------------------------------------------------------------------
# Number of sheep slots in the observation; n_sheep may never exceed it.
MAX_SHEEP = 5
FIELD = 15.0 # half-size in metres; positions ∈ [-FIELD, FIELD] on both axes
PEN_X = (10.0, 13.0) # quarantine pen x bounds (min, max)
PEN_Y = (-15.0, -8.0) # quarantine pen y bounds (min, max)
# Midpoint of PEN_X and PEN_Y; used as the shaping target and as the
# parking position for inactive sheep slots.
PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32)
# -----------------------------------------------------------------------
# Dynamics — calibrated to match Webots robot specs
# wheel radius 0.031 m; sheep FLEE_SPEED 20 rad/s → 0.62 m/s
# wheel radius 0.038 m; dog maxVelocity 70 rad/s → 2.66 m/s
# NOTE(review): the constants below are close to, not equal to, the
# derived figures above — confirm the rounding is intentional.
# -----------------------------------------------------------------------
DOG_SPEED = 2.5 # m/s per axis at |action| = 1
SHEEP_FLEE_V = 0.65 # m/s, speed cap while a sheep is fleeing the dog
SHEEP_WANDER_V = 0.20 # m/s, speed cap while a sheep is not fleeing
DT = 0.1 # seconds of simulated time per step()
# Boid parameters — identical to sheep.py
FLEE_DIST = 7.0 # dog closer than this triggers the flee force
SEPARATION_DIST = 2.5 # flock-mates closer than this repel each other
COHESION_DIST = 8.0 # flock-mates closer than this attract each other
WALL_MARGIN = 3.5 # distance from a field edge at which wall push starts
# -----------------------------------------------------------------------
# Reward weights
# -----------------------------------------------------------------------
W_ALIGN = 0.4 # dense: dog on anti-pen side of each unpenned sheep
W_SHAPING = 0.5 # dense: penalty on mean unpenned-sheep distance to pen
W_APPROACH = 0.1 # dense: penalty on dog distance to nearest unpenned sheep
W_PEN_BONUS = 5.0 # sparse: per sheep newly penned this step
W_COMPLETE = 20.0 # bonus when ALL active sheep are penned
W_STEP_COST = 0.002 # penalty per step (encourages efficiency)
def __init__(self, n_sheep: int = 1, max_steps: int = 2000,
             render_mode: "str | None" = None):
    """Build the environment and allocate its fixed-size state buffers.

    Args:
        n_sheep: Number of active sheep, between 1 and MAX_SHEEP inclusive.
        max_steps: Step count at which step() reports truncation.
        render_mode: None or a mode name; when it equals "human", step()
            calls render() after every transition.

    Raises:
        AssertionError: if n_sheep is outside [1, MAX_SHEEP].
    """
    super().__init__()
    assert 1 <= n_sheep <= self.MAX_SHEEP, (
        f"n_sheep must be in [1, {self.MAX_SHEEP}], got {n_sheep}"
    )
    self.n_sheep = n_sheep
    self.max_steps = max_steps
    self.render_mode = render_mode

    # Observation: dog(x,y) + MAX_SHEEP×sheep(x,y) + MAX_SHEEP×penned.
    # The size depends on MAX_SHEEP only, never on n_sheep, so it stays
    # fixed across all curriculum stages.
    obs_dim = 2 + 2 * self.MAX_SHEEP + self.MAX_SHEEP
    self.observation_space = spaces.Box(
        low=-1.0, high=1.0, shape=(obs_dim,), dtype=np.float32
    )

    # Action: desired velocity (vx, vy) ∈ [-1, 1]², scaled by DOG_SPEED.
    self.action_space = spaces.Box(
        low=-1.0, high=1.0, shape=(2,), dtype=np.float32
    )

    # Runtime state (populated by reset). Every slot starts flagged as
    # penned so an un-reset environment exposes no active sheep.
    self._step_count = 0
    self._prev_penned = 0
    self.dog_pos = np.zeros(2, dtype=np.float32)
    self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32)
    self.penned = np.ones(self.MAX_SHEEP, dtype=bool)
    self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32)

    # Matplotlib handles are created lazily by render(); declare both
    # here so the attributes always exist.
    self._fig = None
    self._ax = None
# ------------------------------------------------------------------
# Curriculum interface
# ------------------------------------------------------------------
def set_n_sheep(self, n: int):
    """Request a new flock size; it is applied by the next reset().

    The value is only staged here. Changing ``n_sheep`` in the middle of
    an episode would make step() count the pre-penned inactive slots as
    freshly penned sheep (spurious pen bonus) and could end the episode
    early, so the running episode keeps its current flock size.

    Args:
        n: Number of active sheep, between 1 and MAX_SHEEP inclusive.

    Raises:
        AssertionError: if n is outside [1, MAX_SHEEP].
    """
    assert 1 <= n <= self.MAX_SHEEP, (
        f"n must be in [1, {self.MAX_SHEEP}], got {n}"
    )
    self._pending_n_sheep = n

# ------------------------------------------------------------------
# Gymnasium API
# ------------------------------------------------------------------
def reset(self, seed=None, options=None):
    """Start a new episode and return ``(observation, info)``.

    Applies any flock size staged by set_n_sheep(), scatters the active
    sheep outside the pen, places the dog, and re-draws wander headings.

    Args:
        seed: Forwarded to the base-class reset to seed ``np_random``.
        options: Accepted for API compatibility; not used.

    Returns:
        The initial observation and an empty info dict.
    """
    super().reset(seed=seed)

    # Apply a curriculum change requested since the previous reset.
    pending = getattr(self, "_pending_n_sheep", None)
    if pending is not None:
        self.n_sheep = pending
        self._pending_n_sheep = None

    self._step_count = 0
    self._prev_penned = 0

    # Park every slot at the pen centre, flagged penned. Inactive slots
    # (n_sheep .. MAX_SHEEP-1) stay that way for the whole episode.
    self.sheep_pos[:] = self.PEN_CENTER
    self.penned[:] = True

    # Active sheep (0 .. n_sheep-1): rejection-sample positions that are
    # not inside the pen.
    placed = 0
    while placed < self.n_sheep:
        p = self.np_random.uniform(-12.0, 12.0, size=(2,)).astype(np.float32)
        if not self._in_pen(p):
            self.sheep_pos[placed] = p
            self.penned[placed] = False
            placed += 1

    # Dog: 50 % of the time start on the anti-pen side of the first
    # active sheep (slot 0), within flee range, so early training gets
    # aligned starts; the other 50 % is fully random for generalisation.
    if self.np_random.random() < 0.5:
        ref = self.sheep_pos[0]
        away = ref - self.PEN_CENTER  # points from the pen towards the sheep
        dist = float(np.linalg.norm(away))
        if dist > 0.1:
            away = away / dist
        offset = away * self.np_random.uniform(2.0, self.FLEE_DIST * 0.8)
        self.dog_pos = np.clip(
            (ref + offset).astype(np.float32), -self.FIELD, self.FIELD
        )
    else:
        self.dog_pos = self.np_random.uniform(
            -self.FIELD * 0.8, self.FIELD * 0.8, size=(2,)
        ).astype(np.float32)

    # Fresh wander heading for every slot.
    self.wander_ang = self.np_random.uniform(
        -np.pi, np.pi, size=(self.MAX_SHEEP,)
    ).astype(np.float32)
    return self._obs(), {}
def step(self, action):
    """Advance the simulation by one DT and return the Gymnasium 5-tuple.

    Args:
        action: Two numbers (vx, vy); each is clipped to [-1, 1] and
            scaled by DOG_SPEED.

    Returns:
        ``(observation, reward, terminated, truncated, info)`` where
        ``terminated`` means every active sheep is penned and
        ``truncated`` means ``max_steps`` was reached.
    """
    self._step_count += 1

    # Dog motion: the axes are clipped independently, so a zero action
    # leaves the dog where it is.
    velocity = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0)
    target = self.dog_pos + velocity * self.DOG_SPEED * self.DT
    self.dog_pos = np.clip(target, -self.FIELD, self.FIELD)

    # Sheep motion: only active, still-free sheep move. A sheep is
    # frozen from the step in which it first lands inside the pen.
    for idx in range(self.n_sheep):
        if not self.penned[idx]:
            self.sheep_pos[idx] = self._step_sheep(idx)
            if self._in_pen(self.sheep_pos[idx]):
                self.penned[idx] = True

    n_penned = int(np.count_nonzero(self.penned[:self.n_sheep]))
    newly_penned = n_penned - self._prev_penned
    self._prev_penned = n_penned

    reward = self._reward(n_penned, newly_penned)
    terminated = n_penned == self.n_sheep
    truncated = self._step_count >= self.max_steps
    info = {"n_penned": n_penned, "n_sheep": self.n_sheep}

    if self.render_mode == "human":
        self.render()
    return self._obs(), float(reward), terminated, truncated, info
def render(self):
    """Draw the field, pen, active sheep and dog with matplotlib.

    Returns:
        In ``"rgb_array"`` mode, an ``(H, W, 3)`` uint8 RGB frame drawn
        on an off-screen Agg canvas (no window is opened). In every
        other mode the scene is drawn in an interactive window and
        ``None`` is returned.
    """
    import matplotlib.patches as mpatches

    offscreen = self.render_mode == "rgb_array"

    if self._fig is None:
        if offscreen:
            # Build the figure without pyplot so no GUI backend is needed.
            from matplotlib.backends.backend_agg import FigureCanvasAgg
            from matplotlib.figure import Figure
            self._fig = Figure(figsize=(6, 6))
            FigureCanvasAgg(self._fig)  # attaches itself as fig.canvas
            self._ax = self._fig.add_subplot(111)
        else:
            import matplotlib.pyplot as plt
            plt.ion()
            self._fig, self._ax = plt.subplots(figsize=(6, 6))

    ax = self._ax
    half = self.FIELD
    ax.clear()
    # One metre of padding around the field on every side.
    ax.set_xlim(-(half + 1), half + 1)
    ax.set_ylim(-(half + 1), half + 1)
    ax.set_aspect("equal")
    ax.set_facecolor("#dcedc8")

    # Field boundary
    ax.add_patch(mpatches.Rectangle(
        (-half, -half), 2 * half, 2 * half,
        fill=False, edgecolor="#795548", linewidth=2
    ))

    # Pen
    pw = self.PEN_X[1] - self.PEN_X[0]
    ph = self.PEN_Y[1] - self.PEN_Y[0]
    ax.add_patch(mpatches.Rectangle(
        (self.PEN_X[0], self.PEN_Y[0]), pw, ph,
        facecolor="#ffe082", edgecolor="#795548", linewidth=2
    ))
    ax.text(float(self.PEN_CENTER[0]), float(self.PEN_CENTER[1]), "pen",
            ha="center", va="center", fontsize=8, color="#795548")

    # Sheep — only the active slots; penned ones are drawn in pink.
    for i in range(self.n_sheep):
        color = "deeppink" if self.penned[i] else "white"
        ax.plot(*self.sheep_pos[i], "o", color=color, markersize=11,
                markeredgecolor="#555", markeredgewidth=1.5)

    # Dog
    ax.plot(*self.dog_pos, "s", color="#4e342e", markersize=13,
            markeredgecolor="black", markeredgewidth=1.5)

    ax.set_title(
        f"step {self._step_count} | "
        f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep}",
        fontsize=11
    )

    self._fig.canvas.draw()
    if offscreen:
        # Drop the alpha channel and copy so the caller owns the frame.
        return np.asarray(self._fig.canvas.buffer_rgba())[..., :3].copy()

    import matplotlib.pyplot as plt
    self._fig.canvas.flush_events()
    plt.pause(0.001)
    return None
def close(self):
    """Close the matplotlib figure, if render() ever created one."""
    if self._fig is None:
        return
    # Imported lazily so matplotlib is only needed when rendering was used.
    import matplotlib.pyplot as plt
    plt.close(self._fig)
    self._fig = None
# ------------------------------------------------------------------
# Internals
# ------------------------------------------------------------------
def _in_pen(self, pos: np.ndarray) -> bool:
    """Return whether ``pos`` lies strictly inside the pen rectangle.

    Points exactly on a pen edge count as outside.
    """
    x_lo, x_hi = self.PEN_X
    y_lo, y_hi = self.PEN_Y
    x, y = pos[0], pos[1]
    return x_lo < x < x_hi and y_lo < y < y_hi
def _obs(self) -> np.ndarray:
    """Assemble the flat float32 observation vector.

    Layout: dog (x, y), then (x, y) for every sheep slot, then one penned
    flag per slot. Positions are multiplied by 1 / FIELD.
    """
    inv_field = 1.0 / self.FIELD
    dog_xy = self.dog_pos * inv_field                 # 2
    sheep_xy = (self.sheep_pos * inv_field).ravel()   # 2 * MAX_SHEEP
    penned_flags = self.penned.astype(np.float32)     # MAX_SHEEP
    return np.hstack((dog_xy, sheep_xy, penned_flags)).astype(np.float32)
def _reward(self, n_penned: int, newly_penned: int) -> float:
    """Compute the reward for the transition that just happened.

    Dense terms (shaping, approach, alignment) are evaluated over the
    active sheep that are not yet penned; sparse terms pay for sheep
    penned this step and for completing the task.

    Args:
        n_penned: Active sheep currently penned.
        newly_penned: Change in that count since the previous step.

    Returns:
        The weighted sum of all terms minus the per-step cost.
    """
    shaping = approach = alignment = 0.0

    loose = ~self.penned[:self.n_sheep]
    if loose.any():
        loose_pos = self.sheep_pos[:self.n_sheep][loose]
        to_pen = np.linalg.norm(loose_pos - self.PEN_CENTER, axis=1)
        to_dog = np.linalg.norm(loose_pos - self.dog_pos, axis=1)

        # Sheep-to-pen shaping: negative mean distance, normalised by the
        # field width.
        shaping = -(to_pen.mean() / (2 * self.FIELD))
        # Approach: dog penalised for being far from the nearest sheep.
        approach = -(to_dog.min() / (2 * self.FIELD))

        # Alignment: when the dog is opposite the pen relative to a
        # sheep, that sheep flees toward the pen. Each sheep scores a
        # cosine in [-1, 1], scaled by a proximity gate.
        scores = []
        for xy, d_pen, d_dog in zip(loose_pos, to_pen, to_dog):
            # Skip degenerate geometry where a direction is undefined.
            if not (d_pen < 0.1 or d_dog < 0.1):
                toward_pen = (self.PEN_CENTER - xy) / d_pen   # sheep → pen
                toward_dog = (self.dog_pos - xy) / d_dog      # sheep → dog
                # +1 → dog directly behind the sheep, -1 → dog on pen side
                behind = -float(np.dot(toward_pen, toward_dog))
                # Gate falls linearly from 1 at the sheep to 0 at
                # FLEE_DIST, and is 0 beyond it.
                gate = max(0.0, 1.0 - d_dog / self.FLEE_DIST)
                scores.append(behind * gate)
        if scores:
            alignment = float(np.mean(scores))

    total = shaping * self.W_SHAPING
    total += approach * self.W_APPROACH
    total += alignment * self.W_ALIGN
    total += newly_penned * self.W_PEN_BONUS
    total -= self.W_STEP_COST
    if n_penned == self.n_sheep:
        total += self.W_COMPLETE
    return total
def _step_sheep(self, i: int) -> np.ndarray:
    """Return sheep ``i``'s position after one timestep of boid dynamics.

    Forces are summed in this order: flee from the dog, separation and
    cohesion with unpenned active flock-mates, wall avoidance, and wander
    (only when not fleeing). The stored position is not modified; the
    wander heading of sheep ``i`` may be.
    """
    here = self.sheep_pos[i].copy()
    force_x, force_y = 0.0, 0.0
    fleeing = False

    # Flee from dog — quadratic ramp (mirrors sheep.py)
    to_dog = self.dog_pos - here
    dog_range = float(np.linalg.norm(to_dog))
    if 0.01 < dog_range < self.FLEE_DIST:
        ramp = 1.0 - dog_range / self.FLEE_DIST
        strength = ramp * ramp * 5.0
        force_x -= (to_dog[0] / dog_range) * strength
        force_y -= (to_dog[1] / dog_range) * strength
        fleeing = True

    # Separation (inverse-distance) + cohesion towards the mean position
    # of nearby flock-mates.
    sum_x, sum_y, neighbours = 0.0, 0.0, 0
    for other in range(self.n_sheep):
        if other != i and not self.penned[other]:
            mate = self.sheep_pos[other]
            gap_vec = mate - here
            gap = float(np.linalg.norm(gap_vec))
            if 0.3 < gap < self.COHESION_DIST:
                sum_x += mate[0]
                sum_y += mate[1]
                neighbours += 1
            if 0.05 < gap < self.SEPARATION_DIST:
                push = (self.SEPARATION_DIST - gap) / gap
                force_x -= (gap_vec[0] / gap) * push * 2.5
                force_y -= (gap_vec[1] / gap) * push * 2.5
    if neighbours > 0:
        # Cohesion is weaker while the sheep is fleeing.
        pull = 0.08 if fleeing else 0.15
        force_x += (sum_x / neighbours - here[0]) * pull
        force_y += (sum_y / neighbours - here[1]) * pull

    # Wall avoidance: linear push that starts WALL_MARGIN from an edge.
    margin, half = self.WALL_MARGIN, self.FIELD
    if here[0] < -half + margin:
        force_x += ((-half + margin - here[0]) / margin) * 6.0
    if here[0] > half - margin:
        force_x -= ((here[0] - (half - margin)) / margin) * 6.0
    if here[1] < -half + margin:
        force_y += ((-half + margin - here[1]) / margin) * 6.0
    if here[1] > half - margin:
        force_y -= ((here[1] - (half - margin)) / margin) * 6.0

    # Wander — suppressed while fleeing. The heading is occasionally
    # nudged; the random draws happen only on this branch.
    if not fleeing:
        if self.np_random.random() < 0.02:
            self.wander_ang[i] += float(self.np_random.uniform(-0.6, 0.6))
        heading = self.wander_ang[i]
        force_x += float(np.cos(heading)) * 0.5
        force_y += float(np.sin(heading)) * 0.5

    # Integrate: move along the net force direction at a capped speed.
    net = np.array([force_x, force_y])
    magnitude = float(np.linalg.norm(net))
    if magnitude > 0.01:
        cap = self.SHEEP_FLEE_V if fleeing else self.SHEEP_WANDER_V
        speed = min(cap, magnitude * 0.3)
        here = np.clip(here + (net / magnitude) * speed * self.DT,
                       -self.FIELD, self.FIELD)
    return here.astype(np.float32)