409 lines
16 KiB
Python
409 lines
16 KiB
Python
"""
|
|
2D herding environment for PPO training (Gymnasium-compatible).
|
|
|
|
The dog agent (action: 2D velocity vector) must herd n_sheep into the
|
|
quarantine pen. Sheep dynamics mirror the Webots controller exactly:
|
|
flee (quadratic ramp), separation (inverse-distance), cohesion, wall
|
|
avoidance, and wander.
|
|
|
|
Coordinate system matches the Webots world file:
|
|
field : x ∈ [-15, 15], y ∈ [-15, 15]
|
|
pen : x ∈ [10, 13], y ∈ [-15, -8] (SE corner, open north)
|
|
|
|
Observation (16-dim, fixed regardless of n_sheep):
  dog position (2), flock COM relative to dog (2), the three active sheep
  farthest from the COM, each expressed relative to the COM and padded
  with zeros when fewer than three are active (6), pen relative to COM (2),
  pen relative to the farthest sheep (2), flock radius (1), fraction of
  sheep still active, i.e. not yet penned (1).
|
|
|
|
Permutation-invariant by design: curriculum stages share the same obs dim
|
|
so VecNormalize statistics transfer as n_sheep advances.
|
|
"""
|
|
|
|
import numpy as np
|
|
import gymnasium as gym
|
|
from gymnasium import spaces
|
|
|
|
|
|
class HerdingEnv(gym.Env):
    """Gymnasium environment in which a dog agent herds sheep into a pen.

    The agent controls the dog through a 2D velocity command. Each sheep
    follows boid-style rules (flee from the dog, separation, cohesion, wall
    avoidance, wander). An episode terminates when every sheep of the
    current stage is inside the pen rectangle and is truncated after
    ``max_steps`` steps.

    Observation layout (16 float32 values, see ``_obs``):

    ====== ==============================================================
    index  content
    ====== ==============================================================
    0-1    dog position / FIELD
    2-3    (flock COM - dog) / (2 * FIELD)
    4-9    three active sheep farthest from the COM, each relative to the
           COM and divided by 2 * FIELD (zeros when fewer are active)
    10-11  (pen centre - COM) / (2 * FIELD)
    12-13  (pen centre - farthest sheep) / (2 * FIELD)
    14     flock radius / (2 * FIELD)
    15     fraction of the stage's sheep that are still active (not penned)
    ====== ==============================================================
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30}

    # -----------------------------------------------------------------------
    # World constants — must match Webots world file
    # -----------------------------------------------------------------------
    MAX_SHEEP = 10
    FIELD = 15.0                 # half-size; positions ∈ [-FIELD, FIELD]
    PEN_X = (10.0, 13.0)
    PEN_Y = (-15.0, -8.0)
    PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32)

    # -----------------------------------------------------------------------
    # Dynamics — calibrated to match Webots robot specs
    # -----------------------------------------------------------------------
    DOG_SPEED = 2.5              # m/s
    SHEEP_FLEE_V = 0.65          # m/s
    SHEEP_WANDER_V = 0.20        # m/s
    DT = 0.1                     # seconds per step

    # Boid parameters — identical to sheep.py
    FLEE_DIST = 7.0
    SEPARATION_DIST = 2.5
    COHESION_DIST = 8.0
    WALL_MARGIN = 3.5

    # -----------------------------------------------------------------------
    # Reward weights (simple per-sheep progress — no phases, no gating)
    # -----------------------------------------------------------------------
    W_PER_SHEEP = 2.0            # progress: sum of per-sheep distance-to-pen reductions
    W_SCATTER_PEN = 0.5          # penalty per metre the active flock radius exceeds threshold
    SCATTER_THRESH = 8.0         # metres — allow natural spread, penalise excessive scatter
    W_PEN_BONUS = 10.0           # per sheep penned
    W_COMPLETE = 100.0           # all sheep penned
    W_STEP_COST = 0.02           # time penalty — strong enough to punish doing nothing

    def __init__(self, n_sheep: int = 1, max_steps: int = 2000,
                 render_mode: "str | None" = None,
                 random_n_sheep: bool = False):
        """Create the environment.

        Args:
            n_sheep: Number of sheep to herd, between 1 and ``MAX_SHEEP``.
            max_steps: Step count after which an episode is truncated.
            render_mode: ``"human"``, ``"rgb_array"`` or ``None``.
            random_n_sheep: If True, ``reset`` draws a fresh ``n_sheep``
                uniformly from 1..``MAX_SHEEP`` for every episode.
        """
        super().__init__()
        assert 1 <= n_sheep <= self.MAX_SHEEP
        self.n_sheep = n_sheep
        self.max_steps = max_steps
        self.render_mode = render_mode
        self.random_n_sheep = random_n_sheep   # if True, randomise n_sheep each reset

        # Value queued by set_n_sheep(); applied (and cleared) by reset() so
        # the stage size never changes in the middle of an episode.
        self._pending_n_sheep = None

        # Fixed 16-dim observation regardless of n_sheep:
        #   dog_pos(2) + rel_com(2) + rel_far1(2) + rel_far2(2) + rel_far3(2)
        #   + com_to_pen(2) + far1_to_pen(2) + radius(1) + frac_active(1)
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(16,), dtype=np.float32
        )

        # Action: desired velocity (vx, vy) ∈ [-1, 1]², scaled by DOG_SPEED
        self.action_space = spaces.Box(
            low=-1.0, high=1.0, shape=(2,), dtype=np.float32
        )

        # Runtime state (populated by reset)
        self._step_count = 0
        self._prev_penned = 0
        self._prev_pen_dist_sum = 0.0
        self.dog_pos = np.zeros(2, dtype=np.float32)
        self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32)
        self.penned = np.ones(self.MAX_SHEEP, dtype=bool)
        self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32)

        # Lazily created matplotlib figure / axes used by render().
        self._fig = None
        self._ax = None

    # ------------------------------------------------------------------
    # Curriculum interface
    # ------------------------------------------------------------------

    def set_n_sheep(self, n: int):
        """Advance curriculum difficulty; takes effect on next reset().

        The value is only queued here. Writing ``self.n_sheep`` immediately
        would change the slice used by ``step``/``_reward``/``_obs`` in the
        middle of a running episode and corrupt the penned-count and
        progress bookkeeping.
        """
        assert 1 <= n <= self.MAX_SHEEP
        self._pending_n_sheep = n

    # ------------------------------------------------------------------
    # Gymnasium API
    # ------------------------------------------------------------------

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        Args:
            seed: Forwarded to ``gym.Env.reset`` to seed ``self.np_random``.
            options: Accepted for API compatibility; not used.
        """
        super().reset(seed=seed)
        self._step_count = 0
        self._prev_penned = 0

        # Apply a stage change requested through set_n_sheep().
        if self._pending_n_sheep is not None:
            self.n_sheep = self._pending_n_sheep
            self._pending_n_sheep = None

        if self.random_n_sheep:
            self.n_sheep = int(self.np_random.integers(1, self.MAX_SHEEP + 1))

        # Every slot starts parked at the pen centre and flagged penned;
        # slots 0 .. n_sheep-1 are then activated at random non-pen positions.
        self.sheep_pos[:] = self.PEN_CENTER
        self.penned[:] = True

        placed = 0
        while placed < self.n_sheep:
            p = self.np_random.uniform(-12.0, 12.0, size=(2,)).astype(np.float32)
            if not self._in_pen(p):
                self.sheep_pos[placed] = p
                self.penned[placed] = False
                placed += 1

        # Dog: 50% of resets start already behind the flock (anti-pen side,
        # within flee range) to give early training aligned experiences.
        if self.np_random.random() < 0.5:
            ref = self.sheep_pos[0]
            away = ref - self.PEN_CENTER
            d = float(np.linalg.norm(away))
            if d > 0.1:
                away = away / d
            offset = away * self.np_random.uniform(2.0, self.FLEE_DIST * 0.8)
            self.dog_pos = np.clip(
                (ref + offset).astype(np.float32), -self.FIELD, self.FIELD
            )
        else:
            self.dog_pos = self.np_random.uniform(
                -self.FIELD * 0.8, self.FIELD * 0.8, size=(2,)
            ).astype(np.float32)

        self.wander_ang = self.np_random.uniform(
            -np.pi, np.pi, size=(self.MAX_SHEEP,)
        ).astype(np.float32)

        # Baseline for the progress term of the reward.
        self._prev_pen_dist_sum = float(self._active_pen_dists().sum())

        return self._obs(), {}

    def step(self, action):
        """Advance the simulation by ``DT`` seconds.

        Args:
            action: Desired dog velocity; each component is clipped to
                [-1, 1] and scaled by ``DOG_SPEED``.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` where
            ``info`` carries ``n_penned`` and ``n_sheep``.
        """
        self._step_count += 1

        act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0)
        self.dog_pos = np.clip(
            self.dog_pos + act * self.DOG_SPEED * self.DT,
            -self.FIELD, self.FIELD
        )

        # Sheep are updated one after another, in place, so a sheep with a
        # higher index already sees the new positions of lower indices.
        for i in range(self.n_sheep):
            if self.penned[i]:
                continue
            self.sheep_pos[i] = self._step_sheep(i)
            if self._in_pen(self.sheep_pos[i]):
                self.penned[i] = True

        n_penned = int(self.penned[:self.n_sheep].sum())
        newly_penned = n_penned - self._prev_penned
        self._prev_penned = n_penned

        reward = self._reward(n_penned, newly_penned)
        terminated = n_penned == self.n_sheep
        truncated = self._step_count >= self.max_steps
        info = {"n_penned": n_penned, "n_sheep": self.n_sheep}

        if self.render_mode == "human":
            self.render()

        return self._obs(), float(reward), terminated, truncated, info

    def render(self):
        """Draw the current state with matplotlib.

        Returns:
            In ``"rgb_array"`` mode, an ``(H, W, 3)`` uint8 array rendered
            off-screen. In any other mode the frame is shown in an
            interactive window and ``None`` is returned.
        """
        import matplotlib.patches as mpatches

        offscreen = self.render_mode == "rgb_array"

        if self._fig is None:
            if offscreen:
                # Pure Agg figure: no GUI window and no pyplot state needed.
                from matplotlib.backends.backend_agg import FigureCanvasAgg
                from matplotlib.figure import Figure

                self._fig = Figure(figsize=(6, 6))
                FigureCanvasAgg(self._fig)
                self._ax = self._fig.add_subplot(111)
            else:
                import matplotlib.pyplot as plt

                plt.ion()
                self._fig, self._ax = plt.subplots(figsize=(6, 6))

        ax = self._ax
        ax.clear()
        ax.set_xlim(-16, 16)
        ax.set_ylim(-16, 16)
        ax.set_aspect("equal")
        ax.set_facecolor("#dcedc8")

        # Field boundary and pen rectangle.
        ax.add_patch(mpatches.Rectangle(
            (-15, -15), 30, 30, fill=False, edgecolor="#795548", linewidth=2
        ))
        pw = self.PEN_X[1] - self.PEN_X[0]
        ph = self.PEN_Y[1] - self.PEN_Y[0]
        ax.add_patch(mpatches.Rectangle(
            (self.PEN_X[0], self.PEN_Y[0]), pw, ph,
            facecolor="#ffe082", edgecolor="#795548", linewidth=2
        ))
        ax.text(11.5, -11.5, "pen", ha="center", va="center",
                fontsize=8, color="#795548")

        # Flock centre of mass and enclosing radius.
        com, radius, _ = self._flock_stats()
        ax.add_patch(mpatches.Circle(com, radius, color="steelblue",
                                     fill=False, linestyle="--", linewidth=1))
        ax.plot(*com, "+", color="steelblue", markersize=10)

        for i in range(self.n_sheep):
            color = "deeppink" if self.penned[i] else "white"
            ax.plot(*self.sheep_pos[i], "o", color=color, markersize=11,
                    markeredgecolor="#555", markeredgewidth=1.5)

        ax.plot(*self.dog_pos, "s", color="#4e342e", markersize=13,
                markeredgecolor="black", markeredgewidth=1.5)

        ax.set_title(
            f"step {self._step_count} | "
            f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep} | "
            f"r={radius:.1f}m",
            fontsize=11
        )
        self._fig.canvas.draw()

        if offscreen:
            # Drop the alpha channel; copy so the frame outlives the canvas.
            return np.asarray(self._fig.canvas.buffer_rgba())[..., :3].copy()

        import matplotlib.pyplot as plt

        self._fig.canvas.flush_events()
        plt.pause(0.001)
        return None

    def close(self):
        """Release the matplotlib figure, if one was created."""
        if self._fig is not None:
            import matplotlib.pyplot as plt

            plt.close(self._fig)
            self._fig = None
            self._ax = None

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    def _in_pen(self, pos: np.ndarray) -> bool:
        """Return True if ``pos`` lies strictly inside the pen rectangle."""
        return bool(self.PEN_X[0] < pos[0] < self.PEN_X[1] and
                    self.PEN_Y[0] < pos[1] < self.PEN_Y[1])

    def _active_pen_dists(self) -> np.ndarray:
        """Distances from each active (un-penned) sheep to the pen centre.

        Returns an empty array when no sheep is active.
        """
        active = ~self.penned[:self.n_sheep]
        return np.linalg.norm(
            self.sheep_pos[:self.n_sheep][active] - self.PEN_CENTER, axis=1
        )

    def _flock_stats(self):
        """Return (COM, radius, mean_dispersion) over active sheep.

        With no active sheep the pen centre and zero spread are returned.
        """
        active_mask = ~self.penned[:self.n_sheep]
        if not active_mask.any():
            return self.PEN_CENTER.copy(), 0.0, 0.0
        pts = self.sheep_pos[:self.n_sheep][active_mask]
        com = pts.mean(axis=0)
        dists = np.linalg.norm(pts - com, axis=1)
        return com, float(dists.max()), float(dists.mean())

    def _obs(self) -> np.ndarray:
        """Build the 16-dim float32 observation described on the class."""
        com, radius, _ = self._flock_stats()
        active_mask = ~self.penned[:self.n_sheep]

        if active_mask.any():
            pts = self.sheep_pos[:self.n_sheep][active_mask]
            dists = np.linalg.norm(pts - com, axis=1)
            sorted_idx = np.argsort(dists)[::-1]   # farthest first

            # Top-3 stragglers; pad with COM when fewer active sheep exist
            def nth(n):
                return pts[sorted_idx[n]] if len(sorted_idx) > n else com

            far1, far2, far3 = nth(0), nth(1), nth(2)
        else:
            far1 = far2 = far3 = self.PEN_CENTER.copy()

        S = self.FIELD
        D = 2 * self.FIELD

        # far1/far2/far3 expressed relative to COM, not dog.
        # For 1 sheep: far1-COM = far2-COM = far3-COM = [0,0] → cleanly ignorable.
        # For 3+ sheep: non-zero vectors tell the dog where each straggler is
        # within the group, without conflicting with weights trained on 1 sheep.
        #
        # NOTE(review): the last component is the fraction of sheep still
        # ACTIVE (1 - fraction penned). Older comments called it "fraction
        # penned"; the value is kept as-is so existing policies and
        # normalisation statistics remain valid.
        return np.array([
            self.dog_pos[0] / S, self.dog_pos[1] / S,
            (com[0] - self.dog_pos[0]) / D, (com[1] - self.dog_pos[1]) / D,
            (far1[0] - com[0]) / D, (far1[1] - com[1]) / D,
            (far2[0] - com[0]) / D, (far2[1] - com[1]) / D,
            (far3[0] - com[0]) / D, (far3[1] - com[1]) / D,
            (self.PEN_CENTER[0] - com[0]) / D, (self.PEN_CENTER[1] - com[1]) / D,
            (self.PEN_CENTER[0] - far1[0]) / D, (self.PEN_CENTER[1] - far1[1]) / D,
            radius / D,
            active_mask.sum() / self.n_sheep,
        ], dtype=np.float32)

    def _reward(self, n_penned: int, newly_penned: int) -> float:
        """Compute the reward for the step that has just been simulated.

        Terms: progress of active sheep toward the pen centre, a scatter
        penalty above ``SCATTER_THRESH``, a bonus per newly penned sheep, a
        constant step cost and a completion bonus.

        Args:
            n_penned: Sheep currently penned among the first ``n_sheep``.
            newly_penned: Change in ``n_penned`` since the previous step.
        """
        # Per-sheep progress toward pen: fires whenever any sheep moves closer.
        # Naturally rewards keeping the flock together and pushing toward pen:
        #   dog behind flock → all sheep flee toward pen → all contribute positive reward.
        #   Dog from wrong side → sheep scatter away from pen → negative reward.
        #
        # NOTE(review): the sum only covers active sheep, so on the step a
        # sheep is penned its whole previous distance leaves the sum and is
        # credited as progress; when the last sheep is penned the term is 0.
        # Left unchanged because it is part of the reward policies train on.
        pen_dists = self._active_pen_dists()
        if pen_dists.size:
            cur_sum = float(pen_dists.sum())
            r_progress = (self._prev_pen_dist_sum - cur_sum) * self.W_PER_SHEEP
            self._prev_pen_dist_sum = cur_sum
        else:
            r_progress = 0.0

        # Soft scatter penalty: discourages abandoning the remaining active flock.
        # Only fires when radius exceeds threshold so normal spread isn't punished.
        _, radius, _ = self._flock_stats()
        r_scatter = -max(0.0, radius - self.SCATTER_THRESH) * self.W_SCATTER_PEN

        reward = r_progress + r_scatter
        reward += newly_penned * self.W_PEN_BONUS
        reward -= self.W_STEP_COST
        if n_penned == self.n_sheep:
            reward += self.W_COMPLETE
        return reward

    def _step_sheep(self, i: int) -> np.ndarray:
        """Apply one timestep of boid dynamics to sheep i (mirrors sheep.py).

        Accumulates a steering force from flee, separation, cohesion, wall
        and pen-wall avoidance and wander, then moves the sheep along that
        force at a capped speed.

        Returns:
            The new float32 position, clipped to the field. ``sheep_pos``
            is not modified here; ``wander_ang[i]`` may be.
        """
        pos = self.sheep_pos[i].copy()
        fx, fy = 0.0, 0.0
        fleeing = False

        # Flee from dog — quadratic ramp
        diff = self.dog_pos - pos
        dist = float(np.linalg.norm(diff))
        if 0.01 < dist < self.FLEE_DIST:
            t = 1.0 - dist / self.FLEE_DIST
            s = t * t * 5.0
            fx -= (diff[0] / dist) * s
            fy -= (diff[1] / dist) * s
            fleeing = True

        # Separation (inverse-distance) + Cohesion
        cx, cy, cn = 0.0, 0.0, 0
        for j in range(self.n_sheep):
            if j == i or self.penned[j]:
                continue
            dv = self.sheep_pos[j] - pos
            dj = float(np.linalg.norm(dv))
            if 0.3 < dj < self.COHESION_DIST:
                cx += self.sheep_pos[j][0]
                cy += self.sheep_pos[j][1]
                cn += 1
            if 0.05 < dj < self.SEPARATION_DIST:
                push = (self.SEPARATION_DIST - dj) / dj
                fx -= (dv[0] / dj) * push * 2.5
                fy -= (dv[1] / dj) * push * 2.5
        if cn > 0:
            # Cohesion pulls toward the neighbours' mean, weaker while fleeing.
            w = 0.08 if fleeing else 0.15
            fx += (cx / cn - pos[0]) * w
            fy += (cy / cn - pos[1]) * w

        # Wall avoidance
        m, F = self.WALL_MARGIN, self.FIELD
        if pos[0] < -F + m:
            fx += ((-F + m - pos[0]) / m) * 6.0
        if pos[0] > F - m:
            fx -= ((pos[0] - (F - m)) / m) * 6.0
        if pos[1] < -F + m:
            fy += ((-F + m - pos[1]) / m) * 6.0
        if pos[1] > F - m:
            fy -= ((pos[1] - (F - m)) / m) * 6.0

        # Pen exterior wall avoidance — mirrors sheep.py addition.
        # Prevents sheep getting pinned against the pen side/back walls when fleeing.
        #
        # NOTE(review): the two side-wall tests bound y but leave x open on
        # the outer side, so they fire for every sheep in the pen's y band
        # (e.g. any x < px0 + EM) with a push that grows with distance from
        # the wall. Confirm against sheep.py whether an outer x bound was
        # intended; behaviour is kept unchanged here.
        EM = 1.2
        px0, px1 = self.PEN_X[0], self.PEN_X[1]
        py0, py1 = self.PEN_Y[0], self.PEN_Y[1]
        if py0 - EM < pos[1] < py1 and pos[0] < px0 + EM:
            fx -= ((px0 + EM - pos[0]) / EM) * 8.0
        if py0 - EM < pos[1] < py1 and pos[0] > px1 - EM:
            fx += ((pos[0] - (px1 - EM)) / EM) * 8.0
        if pos[1] < py0 + EM and px0 < pos[0] < px1:
            fy += ((py0 + EM - pos[1]) / EM) * 8.0

        # Hard-stop clamp: mirrors sheep.py — zero any force driving further
        # into the wall within 0.5 m so the flee force cannot pin the sheep.
        HS = 0.5
        if pos[0] < -F + HS and fx < 0:
            fx = 0.0
        if pos[0] > F - HS and fx > 0:
            fx = 0.0
        if pos[1] < -F + HS and fy < 0:
            fy = 0.0
        if pos[1] > F - HS and fy > 0:
            fy = 0.0

        # Wander — suppressed while fleeing
        if not fleeing:
            # Heading changes only occasionally (2% of steps).
            if self.np_random.random() < 0.02:
                self.wander_ang[i] += float(self.np_random.uniform(-0.6, 0.6))
            fx += float(np.cos(self.wander_ang[i])) * 0.5
            fy += float(np.sin(self.wander_ang[i])) * 0.5

        # Integrate
        force = np.array([fx, fy])
        mag = float(np.linalg.norm(force))
        if mag > 0.01:
            # Speed scales with force magnitude up to the mode's top speed.
            top_speed = self.SHEEP_FLEE_V if fleeing else self.SHEEP_WANDER_V
            speed = min(top_speed, mag * 0.3)
            pos = np.clip(pos + (force / mag) * speed * self.DT,
                          -self.FIELD, self.FIELD)

        return pos.astype(np.float32)
|