Files
TIR_PROJ/training/herding_env.py
T
Johnny Fernandes 287743709a Approach refinement
2026-04-26 02:02:25 +01:00

544 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
2D herding environment for PPO training (Gymnasium-compatible).
The dog agent (action: 2D velocity vector) must herd n_sheep into the
quarantine pen. Sheep dynamics mirror the Webots controller exactly:
flee (quadratic ramp), separation (inverse-distance), cohesion, wall
avoidance, and wander.
Coordinate system matches the Webots world file:
field : x ∈ [-15, 15], y ∈ [-15, 15]
pen : x ∈ [10, 13], y ∈ [-15, -8] (SE corner, open north)
Observation (16-dim, fixed regardless of n_sheep):
dog position (2), flock COM relative to dog (2), top-3 farthest active
sheep relative to dog (6), pen relative to COM (2), pen relative to
farthest sheep (2), flock radius (1), fraction penned (1).
Permutation-invariant by design: curriculum stages share the same obs dim
so VecNormalize statistics transfer as n_sheep advances.
"""
import numpy as np
import gymnasium as gym
from gymnasium import spaces
class HerdingEnv(gym.Env):
metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30}
# -----------------------------------------------------------------------
# World constants — must match Webots world file
# -----------------------------------------------------------------------
MAX_SHEEP = 10
FIELD = 15.0 # half-size; positions ∈ [-FIELD, FIELD]
PEN_X = (10.0, 13.0)
PEN_Y = (-15.0, -8.0)
PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32)
PEN_ENTRY = np.array([11.5, -8.0], dtype=np.float32) # north entrance face center
# -----------------------------------------------------------------------
# Dynamics — calibrated to match Webots robot specs
# -----------------------------------------------------------------------
DOG_SPEED = 2.5 # m/s
SHEEP_FLEE_V = 0.65 # m/s
SHEEP_WANDER_V = 0.20 # m/s
DT = 0.1 # seconds per step
# Boid parameters — identical to sheep.py
FLEE_DIST = 7.0
SEPARATION_DIST = 2.5
COHESION_DIST = 8.0
WALL_MARGIN = 3.5
# -----------------------------------------------------------------------
# Reward weights (simple per-sheep progress — no phases, no gating)
# -----------------------------------------------------------------------
W_PER_SHEEP = 2.0 # progress: sum of per-sheep distance-to-pen reductions
W_ALIGN = 0.05 # gated on action magnitude — dog only earns it when moving.
# Without gating this created a sit-still trap from n_sheep≥2.
W_PEN_BONUS = 10.0 # per sheep penned
W_COMPLETE = 100.0 # all sheep penned
W_STEP_COST = 0.02 # time penalty — strong enough to punish doing nothing
W_COMPACT = 0.0 # reward for flock-radius reduction (off by default)
W_WALL_TOUCH = 0.04 # per-sheep max penalty at wall surface. Linear ramp
# within WALL_TOUCH_BUFFER nudges the agent to avoid
# pinning sheep against pen walls. 0.04 ≈ 2× step_cost
# — noticeable but never dominates progress reward.
WALL_TOUCH_BUFFER = 0.3 # metres from wall where penalty starts ramping
ALIGN_SHAPE = "standoff" # "standoff" (peaks at IDEAL) | "near" (peaks at 0)
ALIGN_GATED = True # gate alignment on action magnitude
ENTRY_AWARE = False # When True, targets PEN_ENTRY (entrance face) instead
# of PEN_CENTER for progress/obs. Intended to fix wall-
# corralling but collapsed n_sheep≥2 success rate.
# The wall-touch gradient penalty handles wall avoidance
# without breaking the core herding signal.
# Initial sheep spawn: first sheep placed anywhere; rest within CLUSTER_RADIUS
# of it. Set to None for legacy uniform-scatter behaviour.
# Cluster radius ≤ COHESION_DIST (8m) so boid cohesion keeps the flock together.
INIT_CLUSTER_RADIUS = 5.0
def __init__(self, n_sheep: int = 1, max_steps: int = 2000,
render_mode: str = None, random_n_sheep: bool = False,
reward_cfg: dict = None):
super().__init__()
assert 1 <= n_sheep <= self.MAX_SHEEP
self.n_sheep = n_sheep
self.max_steps = max_steps
self.render_mode = render_mode
self.random_n_sheep = random_n_sheep # if True, randomise n_sheep each reset
# Override class-default reward weights / shape with per-instance config
# so sweeps can ship configs into subprocess envs via pickled make_env.
if reward_cfg:
for k, v in reward_cfg.items():
if not hasattr(self.__class__, k):
raise ValueError(f"unknown reward_cfg key: {k}")
setattr(self, k, v)
# Fixed 16-dim observation regardless of n_sheep:
# dog_pos(2) + rel_com(2) + rel_far1(2) + rel_far2(2) + rel_far3(2)
# + com_to_pen(2) + far1_to_pen(2) + radius(1) + frac_penned(1)
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf, shape=(16,), dtype=np.float32
)
# Action: desired velocity (vx, vy) ∈ [-1, 1]², scaled by DOG_SPEED
self.action_space = spaces.Box(
low=-1.0, high=1.0, shape=(2,), dtype=np.float32
)
# Runtime state (populated by reset)
self._step_count = 0
self._prev_penned = 0
self._prev_pen_dist_sum = 0.0
self.dog_pos = np.zeros(2, dtype=np.float32)
self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32)
self.penned = np.ones(self.MAX_SHEEP, dtype=bool)
self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32)
self._fig = None
# ------------------------------------------------------------------
# Curriculum interface
# ------------------------------------------------------------------
def set_n_sheep(self, n: int):
"""Advance curriculum difficulty; takes effect on next reset()."""
assert 1 <= n <= self.MAX_SHEEP
self.n_sheep = n
# ------------------------------------------------------------------
# Gymnasium API
# ------------------------------------------------------------------
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self._step_count = 0
self._prev_penned = 0
if self.random_n_sheep:
self.n_sheep = int(self.np_random.integers(1, self.MAX_SHEEP + 1))
# Active sheep (0 .. n_sheep-1): random non-pen positions
self.sheep_pos[:] = self.PEN_CENTER
self.penned[:] = True
# Spawn first sheep anywhere; subsequent sheep clustered around it
# so boid cohesion (active within 8m) keeps the flock together.
# Without clustering, sheep can start 25m apart and never coalesce —
# task becomes intractable for n_sheep ≥ 2.
placed = 0
cluster_center = None
radius = self.INIT_CLUSTER_RADIUS
while placed < self.n_sheep:
if placed == 0 or radius is None:
p = self.np_random.uniform(-12.0, 12.0, size=(2,)).astype(np.float32)
else:
offset = self.np_random.uniform(-radius, radius, size=(2,))
p = (cluster_center + offset).astype(np.float32)
p = np.clip(p, -12.0, 12.0)
if not self._in_pen(p):
self.sheep_pos[placed] = p
self.penned[placed] = False
if placed == 0:
cluster_center = p.copy()
placed += 1
# Dog: 50% of resets start already behind the flock (anti-pen side,
# within flee range) to give early training aligned experiences.
# Use the flock COM as the reference (not sheep[0]) so the bias
# generalizes from 1-sheep to multi-sheep without putting the dog
# in front of or inside the flock.
if self.np_random.random() < 0.5:
active_pts = self.sheep_pos[:self.n_sheep][~self.penned[:self.n_sheep]]
ref = active_pts.mean(axis=0) if len(active_pts) else self.sheep_pos[0]
away = ref - self.PEN_CENTER
d = float(np.linalg.norm(away))
if d > 0.1:
away = away / d
offset = away * self.np_random.uniform(2.0, self.FLEE_DIST * 0.8)
self.dog_pos = np.clip(
(ref + offset).astype(np.float32), -self.FIELD, self.FIELD
)
else:
self.dog_pos = self.np_random.uniform(
-self.FIELD * 0.8, self.FIELD * 0.8, size=(2,)
).astype(np.float32)
self.wander_ang = self.np_random.uniform(
-np.pi, np.pi, size=(self.MAX_SHEEP,)
).astype(np.float32)
# Initialise per-sheep pen-distance sum for progress reward
active = ~self.penned[:self.n_sheep]
target = self.PEN_ENTRY if self.ENTRY_AWARE else self.PEN_CENTER
if active.any():
self._prev_pen_dist_sum = float(
np.linalg.norm(
self.sheep_pos[:self.n_sheep][active] - target, axis=1
).sum()
)
com0 = self.sheep_pos[:self.n_sheep][active].mean(axis=0)
self._prev_radius = float(
np.linalg.norm(self.sheep_pos[:self.n_sheep][active] - com0, axis=1).max()
)
else:
self._prev_pen_dist_sum = 0.0
self._prev_radius = 0.0
return self._obs(), {}
def step(self, action):
self._step_count += 1
act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0)
old_dog = self.dog_pos.copy()
new_dog = np.clip(
self.dog_pos + act * self.DOG_SPEED * self.DT,
-self.FIELD, self.FIELD
)
# Pen wall collision — mirrors Webots geometry. West (x=PEN_X[0]) and
# east (x=PEN_X[1]) walls block the dog within the pen's y-range.
# North face (y=PEN_Y[1]=-8) is open. South is the field edge.
px0, px1 = self.PEN_X
py0, py1 = self.PEN_Y
if py0 < new_dog[1] < py1:
if old_dog[0] < px0 <= new_dog[0]:
new_dog[0] = px0 - 1e-3
elif old_dog[0] > px0 >= new_dog[0]:
new_dog[0] = px0 + 1e-3
if old_dog[0] > px1 >= new_dog[0]:
new_dog[0] = px1 + 1e-3
elif old_dog[0] < px1 <= new_dog[0]:
new_dog[0] = px1 - 1e-3
self.dog_pos = new_dog.astype(np.float32)
for i in range(self.n_sheep):
if self.penned[i]:
continue
self.sheep_pos[i] = self._step_sheep(i)
if self._in_pen(self.sheep_pos[i]):
self.penned[i] = True
n_penned = int(self.penned[:self.n_sheep].sum())
newly_penned = n_penned - self._prev_penned
self._prev_penned = n_penned
reward, rcomps = self._reward(n_penned, newly_penned, act)
terminated = n_penned == self.n_sheep
truncated = self._step_count >= self.max_steps
info = {"n_penned": n_penned, "n_sheep": self.n_sheep,
"rcomps": rcomps}
if self.render_mode == "human":
self.render()
return self._obs(), float(reward), terminated, truncated, info
def render(self):
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
if self._fig is None:
plt.ion()
self._fig, self._ax = plt.subplots(figsize=(6, 6))
ax = self._ax
ax.clear()
ax.set_xlim(-16, 16); ax.set_ylim(-16, 16)
ax.set_aspect("equal"); ax.set_facecolor("#dcedc8")
ax.add_patch(mpatches.Rectangle(
(-15, -15), 30, 30, fill=False, edgecolor="#795548", linewidth=2
))
pw = self.PEN_X[1] - self.PEN_X[0]
ph = self.PEN_Y[1] - self.PEN_Y[0]
ax.add_patch(mpatches.Rectangle(
(self.PEN_X[0], self.PEN_Y[0]), pw, ph,
facecolor="#ffe082", edgecolor="#795548", linewidth=2
))
ax.text(11.5, -11.5, "pen", ha="center", va="center",
fontsize=8, color="#795548")
com, radius, _ = self._flock_stats()
ax.add_patch(plt.Circle(com, radius, color="steelblue",
fill=False, linestyle="--", linewidth=1))
ax.plot(*com, "+", color="steelblue", markersize=10)
for i in range(self.n_sheep):
if i >= self.n_sheep:
continue
color = "deeppink" if self.penned[i] else "white"
ax.plot(*self.sheep_pos[i], "o", color=color, markersize=11,
markeredgecolor="#555", markeredgewidth=1.5)
ax.plot(*self.dog_pos, "s", color="#4e342e", markersize=13,
markeredgecolor="black", markeredgewidth=1.5)
ax.set_title(
f"step {self._step_count} | "
f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep} | "
f"r={radius:.1f}m",
fontsize=11
)
self._fig.canvas.draw()
self._fig.canvas.flush_events()
plt.pause(0.001)
def close(self):
if self._fig is not None:
import matplotlib.pyplot as plt
plt.close(self._fig)
self._fig = None
# ------------------------------------------------------------------
# Internals
# ------------------------------------------------------------------
def _in_pen(self, pos: np.ndarray) -> bool:
return (self.PEN_X[0] < pos[0] < self.PEN_X[1] and
self.PEN_Y[0] < pos[1] < self.PEN_Y[1])
def _flock_stats(self):
"""Return (COM, radius, mean_dispersion) over active sheep."""
active_mask = ~self.penned[:self.n_sheep]
if not active_mask.any():
return self.PEN_CENTER.copy(), 0.0, 0.0
pts = self.sheep_pos[:self.n_sheep][active_mask]
com = pts.mean(axis=0)
dists = np.linalg.norm(pts - com, axis=1)
return com, float(dists.max()), float(dists.mean())
def _obs(self) -> np.ndarray:
com, radius, _ = self._flock_stats()
active_mask = ~self.penned[:self.n_sheep]
if active_mask.any():
pts = self.sheep_pos[:self.n_sheep][active_mask]
dists = np.linalg.norm(pts - com, axis=1)
sorted_idx = np.argsort(dists)[::-1] # farthest first
# Top-3 stragglers; pad with COM when fewer active sheep exist
def nth(n):
return pts[sorted_idx[n]] if len(sorted_idx) > n else com
far1, far2, far3 = nth(0), nth(1), nth(2)
else:
far1 = far2 = far3 = self.PEN_CENTER.copy()
S = self.FIELD
D = 2 * self.FIELD
# far1/far2/far3 expressed relative to COM, not dog.
# For 1 sheep: far1-COM = far2-COM = far3-COM = [0,0] → cleanly ignorable.
# For 3+ sheep: non-zero vectors tell the dog where each straggler is
# within the group, without conflicting with weights trained on 1 sheep.
# Pen reference for the policy. Aligned with the reward target so the
# policy isn't forced to learn an implicit offset between what it sees
# ("pen is here") and what it's rewarded for ("get sheep close to here").
pen_ref = self.PEN_ENTRY if self.ENTRY_AWARE else self.PEN_CENTER
return np.array([
self.dog_pos[0] / S, self.dog_pos[1] / S,
(com[0] - self.dog_pos[0]) / D, (com[1] - self.dog_pos[1]) / D,
(far1[0] - com[0]) / D, (far1[1] - com[1]) / D,
(far2[0] - com[0]) / D, (far2[1] - com[1]) / D,
(far3[0] - com[0]) / D, (far3[1] - com[1]) / D,
(pen_ref[0] - com[0]) / D, (pen_ref[1] - com[1]) / D,
(pen_ref[0] - far1[0]) / D, (pen_ref[1] - far1[1]) / D,
radius / D,
active_mask.sum() / self.n_sheep,
], dtype=np.float32)
def _reward(self, n_penned: int, newly_penned: int, action: np.ndarray):
active = ~self.penned[:self.n_sheep]
# Per-sheep progress toward pen: fires whenever any sheep moves closer.
# Naturally rewards keeping the flock together and pushing toward pen:
# dog behind flock → all sheep flee toward pen → all contribute positive reward.
# Dog from wrong side → sheep scatter away from pen → negative reward.
target = self.PEN_ENTRY if self.ENTRY_AWARE else self.PEN_CENTER
if active.any():
pen_dists = np.linalg.norm(
self.sheep_pos[:self.n_sheep][active] - target, axis=1
)
cur_sum = float(pen_dists.sum())
r_progress = (self._prev_pen_dist_sum - cur_sum) * self.W_PER_SHEEP
self._prev_pen_dist_sum = cur_sum
else:
r_progress = 0.0
com, _, _ = self._flock_stats()
com_dist = float(np.linalg.norm(com - target))
d_dog_com = float(np.linalg.norm(self.dog_pos - com))
if d_dog_com > 0.1 and com_dist > 0.1:
pen_dir = (target - com) / com_dist
dog_dir = (self.dog_pos - com) / d_dog_com
cosine = -float(np.dot(pen_dir, dog_dir))
if self.ALIGN_SHAPE == "standoff":
IDEAL = 0.5 * (self.SEPARATION_DIST + self.FLEE_DIST)
HALF = self.FLEE_DIST - IDEAL
proximity = max(0.0, 1.0 - abs(d_dog_com - IDEAL) / HALF)
else: # "near"
proximity = max(0.0, 1.0 - d_dog_com / self.FLEE_DIST)
move_gate = (min(1.0, float(np.linalg.norm(action)))
if self.ALIGN_GATED else 1.0)
alignment = cosine * proximity * move_gate * self.W_ALIGN
else:
alignment = 0.0
# Wall-touch penalty: distance-based gradient covering all 3 solid pen
# walls (west, east, south). Linearly ramps from 0 at buffer edge to
# W_WALL_TOUCH at the wall surface — gives the agent a smooth signal
# to avoid pinning sheep against walls.
if self.W_WALL_TOUCH and active.any():
pts = self.sheep_pos[:self.n_sheep][active]
px0, px1 = self.PEN_X
py0, py1 = self.PEN_Y
buf = self.WALL_TOUCH_BUFFER
far = buf + 1.0
d_w = np.where((pts[:, 0] < px0) & (pts[:, 1] > py0) & (pts[:, 1] < py1),
px0 - pts[:, 0], far)
d_e = np.where((pts[:, 0] > px1) & (pts[:, 1] > py0) & (pts[:, 1] < py1),
pts[:, 0] - px1, far)
d_s = np.where((pts[:, 1] < py0) & (pts[:, 0] > px0) & (pts[:, 0] < px1),
py0 - pts[:, 1], far)
d_min = np.minimum(np.minimum(d_w, d_e), d_s)
penalties = np.maximum(0.0, 1.0 - d_min / buf) * self.W_WALL_TOUCH
r_wall_touch = -float(penalties.sum())
else:
r_wall_touch = 0.0
# Compactness shaping: reward decreases in flock radius (active sheep only)
if self.W_COMPACT and active.any():
cur_radius = float(np.linalg.norm(
self.sheep_pos[:self.n_sheep][active] - com, axis=1
).max())
r_compact = (self._prev_radius - cur_radius) * self.W_COMPACT
self._prev_radius = cur_radius
else:
r_compact = 0.0
r_pen_bonus = newly_penned * self.W_PEN_BONUS
r_step_cost = -self.W_STEP_COST
r_complete = self.W_COMPLETE if n_penned == self.n_sheep else 0.0
reward = (r_progress + alignment + r_compact + r_wall_touch
+ r_pen_bonus + r_step_cost + r_complete)
rcomps = {
"progress": float(r_progress),
"alignment": float(alignment),
"compact": float(r_compact),
"wall_touch": float(r_wall_touch),
"pen_bonus": float(r_pen_bonus),
"step_cost": float(r_step_cost),
"complete": float(r_complete),
}
return reward, rcomps
def _step_sheep(self, i: int) -> np.ndarray:
"""Apply one timestep of boid dynamics to sheep i (mirrors sheep.py)."""
old_pos = self.sheep_pos[i].copy() # saved for pen wall collision check
pos = old_pos.copy()
fx, fy = 0.0, 0.0
fleeing = False
# Flee from dog — quadratic ramp
diff = self.dog_pos - pos
dist = float(np.linalg.norm(diff))
if 0.01 < dist < self.FLEE_DIST:
t = 1.0 - dist / self.FLEE_DIST
s = t * t * 5.0
fx -= (diff[0] / dist) * s
fy -= (diff[1] / dist) * s
fleeing = True
# Separation (inverse-distance) + Cohesion
cx, cy, cn = 0.0, 0.0, 0
for j in range(self.n_sheep):
if j == i or self.penned[j]:
continue
dv = self.sheep_pos[j] - pos
dj = float(np.linalg.norm(dv))
if 0.3 < dj < self.COHESION_DIST:
cx += self.sheep_pos[j][0]
cy += self.sheep_pos[j][1]
cn += 1
if 0.05 < dj < self.SEPARATION_DIST:
push = (self.SEPARATION_DIST - dj) / dj
fx -= (dv[0] / dj) * push * 2.5
fy -= (dv[1] / dj) * push * 2.5
if cn > 0:
w = 0.08 if fleeing else 0.15
fx += (cx / cn - pos[0]) * w
fy += (cy / cn - pos[1]) * w
# Wall avoidance
m, F = self.WALL_MARGIN, self.FIELD
if pos[0] < -F + m: fx += ((-F + m - pos[0]) / m) * 6.0
if pos[0] > F - m: fx -= ((pos[0] - (F - m)) / m) * 6.0
if pos[1] < -F + m: fy += ((-F + m - pos[1]) / m) * 6.0
if pos[1] > F - m: fy -= ((pos[1] - (F - m)) / m) * 6.0
# Hard-stop clamp: mirrors sheep.py — zero any force driving further
# into the wall within 0.5 m so the flee force cannot pin the sheep.
HS = 0.5
if pos[0] < -F + HS and fx < 0: fx = 0.0
if pos[0] > F - HS and fx > 0: fx = 0.0
if pos[1] < -F + HS and fy < 0: fy = 0.0
if pos[1] > F - HS and fy > 0: fy = 0.0
# Wander — suppressed while fleeing
if not fleeing:
if self.np_random.random() < 0.02:
self.wander_ang[i] += float(self.np_random.uniform(-0.6, 0.6))
fx += float(np.cos(self.wander_ang[i])) * 0.5
fy += float(np.sin(self.wander_ang[i])) * 0.5
# Integrate
force = np.array([fx, fy])
mag = float(np.linalg.norm(force))
if mag > 0.01:
top_speed = self.SHEEP_FLEE_V if fleeing else self.SHEEP_WANDER_V
speed = min(top_speed, mag * 0.3)
pos = np.clip(pos + (force / mag) * speed * self.DT,
-self.FIELD, self.FIELD)
# Pen solid wall collision — mirrors Webots geometry.
# The pen has THREE solid walls: west (x=PEN_X[0]), east (x=PEN_X[1]),
# south (y=PEN_Y[0]). The NORTH face (y=PEN_Y[1]=-8) is the open entrance.
# Sheep may only enter through the north face; crossing a solid wall is blocked.
px0, px1 = self.PEN_X[0], self.PEN_X[1]
py0, py1 = self.PEN_Y[0], self.PEN_Y[1]
entered_from_north = (
old_pos[1] >= py1 and pos[1] < py1 and px0 < pos[0] < px1
)
if not entered_from_north:
# Block crossing through west wall from outside
if old_pos[0] < px0 <= pos[0] and py0 < pos[1] < py1:
pos = np.array([px0 - 1e-3, pos[1]], dtype=np.float32)
# Block crossing through east wall from outside
if old_pos[0] > px1 >= pos[0] and py0 < pos[1] < py1:
pos = np.array([px1 + 1e-3, pos[1]], dtype=np.float32)
return pos.astype(np.float32)