Files
TIR_PROJ/training/herding_env.py
T
Johnny Fernandes 1af7d03ce2 Mimic webots physics
2026-04-26 18:22:26 +01:00

614 lines
28 KiB
Python

"""
2D herding environment for PPO training (Gymnasium-compatible).
The dog agent (action: 2D velocity vector) must herd n_sheep into the
quarantine pen. Sheep dynamics mirror the Webots controller exactly:
flee (quadratic ramp), separation (inverse-distance), cohesion, wall
avoidance, and wander.
Coordinate system matches the Webots world file:
field : x ∈ [-15, 15], y ∈ [-15, 15]
pen : x ∈ [10, 13], y ∈ [-15, -8] (SE corner, open north)
Observation (16-dim, fixed regardless of n_sheep):
dog position (2), flock COM relative to dog (2), top-3 farthest active
sheep relative to dog (6), pen relative to COM (2), pen relative to
farthest sheep (2), flock radius (1), fraction penned (1).
Permutation-invariant by design: curriculum stages share the same obs dim
so VecNormalize statistics transfer as n_sheep advances.
"""
import numpy as np
import gymnasium as gym
from gymnasium import spaces
class HerdingEnv(gym.Env):
metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 30}
# -----------------------------------------------------------------------
# World constants — must match Webots world file
# -----------------------------------------------------------------------
MAX_SHEEP = 10
FIELD = 15.0 # half-size; positions ∈ [-FIELD, FIELD]
PEN_X = (10.0, 13.0)
PEN_Y = (-15.0, -8.0)
PEN_CENTER = np.array([11.5, -11.5], dtype=np.float32)
PEN_ENTRY = np.array([11.5, -8.0], dtype=np.float32) # north entrance face center
# -----------------------------------------------------------------------
# Dynamics — calibrated to match Webots robot specs
# -----------------------------------------------------------------------
DOG_SPEED = 2.5 # m/s
SHEEP_FLEE_V = 0.65 # m/s
SHEEP_WANDER_V = 0.20 # m/s
DT = 0.1 # seconds per step
# Wheeled dog dynamics — mirror the Webots controller's drive():
# forward speed gated by cos(heading_error); turn rate proportional to
# error. Without this, the env treats the dog as a particle that can
# change direction instantly, producing policies that bang-bang and don't
# transfer to the wheeled Webots robot.
DOG_K_TURN = 4.0 # rad/s per rad (heading-error gain)
DOG_MAX_TURN_RATE = 6.0 # rad/s (cap on turn rate)
DOG_STOP_THRESHOLD = 0.05 # ||action|| below this → dog stops in place
# Boid parameters — identical to sheep.py
FLEE_DIST = 7.0
SEPARATION_DIST = 2.5
COHESION_DIST = 8.0
WALL_MARGIN = 3.5
# -----------------------------------------------------------------------
# Reward weights (simple per-sheep progress — no phases, no gating)
# -----------------------------------------------------------------------
W_PER_SHEEP = 2.0 # progress: sum of per-sheep distance-to-pen reductions
W_ALIGN = 0.05 # gated on action magnitude — dog only earns it when moving.
# Without gating this created a sit-still trap from n_sheep≥2.
W_PEN_BONUS = 10.0 # per sheep penned
W_COMPLETE = 100.0 # all sheep penned
W_STEP_COST = 0.02 # time penalty — strong enough to punish doing nothing
W_SOUTH = 0.01 # per-sheep per-metre penalty for active sheep below the pen
# entrance (y < PEN_Y[1]=-8). Keeps the dog from letting
# sheep drift into the dead zone below the open face where
# they must reverse direction (north) to enter — hard to
# recover. 0.01 ≈ half step_cost per metre below per sheep.
W_COMPACT = 0.0 # reward for flock-radius reduction (off by default)
W_WALL_TOUCH = 0.01 # per-sheep max penalty at wall surface. Linear ramp
# within WALL_TOUCH_BUFFER. Covers field outer walls and
# pen W/E/S walls. Kept small (≈ step_cost/2) so it
# nudges away from walls without dominating progress.
WALL_TOUCH_BUFFER = 0.4 # metres from wall where penalty starts ramping
ALIGN_SHAPE = "standoff" # "standoff" (peaks at IDEAL) | "near" (peaks at 0)
ALIGN_GATED = True # gate alignment on action magnitude
ENTRY_AWARE = False # When True, targets PEN_ENTRY (entrance face) instead
# of PEN_CENTER for progress/obs. Intended to fix wall-
# corralling but collapsed n_sheep≥2 success rate.
# The wall-touch gradient penalty handles wall avoidance
# without breaking the core herding signal.
# Initial sheep spawn: first sheep placed anywhere; rest within CLUSTER_RADIUS
# of it. Set to None for legacy uniform-scatter behaviour.
# Cluster radius ≤ COHESION_DIST (8m) so boid cohesion keeps the flock together.
INIT_CLUSTER_RADIUS = 5.0
def __init__(self, n_sheep: int = 1, max_steps: int = 2000,
render_mode: str = None, random_n_sheep: bool = False,
reward_cfg: dict = None):
super().__init__()
assert 1 <= n_sheep <= self.MAX_SHEEP
self.n_sheep = n_sheep
self.max_steps = max_steps
self.render_mode = render_mode
self.random_n_sheep = random_n_sheep # if True, randomise n_sheep each reset
# Override class-default reward weights / shape with per-instance config
# so sweeps can ship configs into subprocess envs via pickled make_env.
if reward_cfg:
for k, v in reward_cfg.items():
if not hasattr(self.__class__, k):
raise ValueError(f"unknown reward_cfg key: {k}")
setattr(self, k, v)
# Fixed 18-dim observation regardless of n_sheep:
# dog_pos(2) + rel_com(2) + rel_far1(2) + rel_far2(2) + rel_far3(2)
# + com_to_pen(2) + far1_to_pen(2) + radius(1) + frac_penned(1)
# + cos(heading)(1) + sin(heading)(1) ← new, for wheeled dynamics
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf, shape=(18,), dtype=np.float32
)
# Action: desired velocity (vx, vy) ∈ [-1, 1]², scaled by DOG_SPEED
self.action_space = spaces.Box(
low=-1.0, high=1.0, shape=(2,), dtype=np.float32
)
# Runtime state (populated by reset)
self._step_count = 0
self._prev_penned = 0
self._prev_pen_dist_sum = 0.0
self.dog_pos = np.zeros(2, dtype=np.float32)
self.dog_heading = 0.0 # radians, world frame
self.sheep_pos = np.zeros((self.MAX_SHEEP, 2), dtype=np.float32)
self.penned = np.ones(self.MAX_SHEEP, dtype=bool)
self.wander_ang = np.zeros(self.MAX_SHEEP, dtype=np.float32)
self._fig = None
# ------------------------------------------------------------------
# Curriculum interface
# ------------------------------------------------------------------
def set_n_sheep(self, n: int):
"""Advance curriculum difficulty; takes effect on next reset()."""
assert 1 <= n <= self.MAX_SHEEP
self.n_sheep = n
# ------------------------------------------------------------------
# Gymnasium API
# ------------------------------------------------------------------
def reset(self, seed=None, options=None):
super().reset(seed=seed)
self._step_count = 0
self._prev_penned = 0
if self.random_n_sheep:
self.n_sheep = int(self.np_random.integers(1, self.MAX_SHEEP + 1))
# Active sheep (0 .. n_sheep-1): random non-pen positions
self.sheep_pos[:] = self.PEN_CENTER
self.penned[:] = True
# Spawn first sheep anywhere; subsequent sheep clustered around it
# so boid cohesion (active within 8m) keeps the flock together.
# Without clustering, sheep can start 25m apart and never coalesce —
# task becomes intractable for n_sheep ≥ 2.
placed = 0
cluster_center = None
radius = self.INIT_CLUSTER_RADIUS
while placed < self.n_sheep:
if placed == 0 or radius is None:
p = self.np_random.uniform(-12.0, 12.0, size=(2,)).astype(np.float32)
else:
offset = self.np_random.uniform(-radius, radius, size=(2,))
p = (cluster_center + offset).astype(np.float32)
p = np.clip(p, -12.0, 12.0)
if not self._in_pen(p):
self.sheep_pos[placed] = p
self.penned[placed] = False
if placed == 0:
cluster_center = p.copy()
placed += 1
# Dog: 50% of resets start already behind the flock (anti-pen side,
# within flee range) to give early training aligned experiences.
# Use the flock COM as the reference (not sheep[0]) so the bias
# generalizes from 1-sheep to multi-sheep without putting the dog
# in front of or inside the flock.
if self.np_random.random() < 0.5:
active_pts = self.sheep_pos[:self.n_sheep][~self.penned[:self.n_sheep]]
ref = active_pts.mean(axis=0) if len(active_pts) else self.sheep_pos[0]
away = ref - self.PEN_CENTER
d = float(np.linalg.norm(away))
if d > 0.1:
away = away / d
offset = away * self.np_random.uniform(2.0, self.FLEE_DIST * 0.8)
self.dog_pos = np.clip(
(ref + offset).astype(np.float32), -self.FIELD, self.FIELD
)
else:
self.dog_pos = self.np_random.uniform(
-self.FIELD * 0.8, self.FIELD * 0.8, size=(2,)
).astype(np.float32)
# Random initial heading so the policy learns to handle any orientation.
self.dog_heading = float(self.np_random.uniform(-np.pi, np.pi))
self.wander_ang = self.np_random.uniform(
-np.pi, np.pi, size=(self.MAX_SHEEP,)
).astype(np.float32)
# Initialise per-sheep pen-distance sum for progress reward
active = ~self.penned[:self.n_sheep]
target = self.PEN_ENTRY if self.ENTRY_AWARE else self.PEN_CENTER
if active.any():
self._prev_pen_dist_sum = float(
np.linalg.norm(
self.sheep_pos[:self.n_sheep][active] - target, axis=1
).sum()
)
com0 = self.sheep_pos[:self.n_sheep][active].mean(axis=0)
self._prev_radius = float(
np.linalg.norm(self.sheep_pos[:self.n_sheep][active] - com0, axis=1).max()
)
else:
self._prev_pen_dist_sum = 0.0
self._prev_radius = 0.0
return self._obs(), {}
def step(self, action):
self._step_count += 1
act = np.clip(np.asarray(action, dtype=np.float32), -1.0, 1.0)
old_dog = self.dog_pos.copy()
# Wheeled-dog kinematics — mirrors the Webots controller's drive():
# interpret (vx, vy) as a desired velocity vector in world frame; the
# dog turns toward it at a limited rate, and forward speed is gated
# by cos(heading_error). Bang-bang policies still produce smooth
# motion (the dog can't sidestep — it has to turn first).
act_mag = float(np.linalg.norm(act))
if act_mag < self.DOG_STOP_THRESHOLD:
# Below threshold the Webots dog stops; treat the same way here.
new_dog = self.dog_pos.copy()
else:
target_heading = float(np.arctan2(act[1], act[0]))
err = target_heading - self.dog_heading
# Wrap to (-pi, pi]
err = (err + np.pi) % (2 * np.pi) - np.pi
turn_rate = np.clip(self.DOG_K_TURN * err,
-self.DOG_MAX_TURN_RATE,
self.DOG_MAX_TURN_RATE)
self.dog_heading = float(
((self.dog_heading + turn_rate * self.DT) + np.pi)
% (2 * np.pi) - np.pi
)
target_speed = act_mag * self.DOG_SPEED
fwd_speed = target_speed * max(0.0, float(np.cos(err)))
step_vec = np.array([np.cos(self.dog_heading),
np.sin(self.dog_heading)], dtype=np.float32)
new_dog = np.clip(
self.dog_pos + step_vec * fwd_speed * self.DT,
-self.FIELD, self.FIELD,
)
# Pen wall collision — west and east pen walls block the dog within
# the pen's y-range. North face is open, south is the field edge.
px0, px1 = self.PEN_X
py0, py1 = self.PEN_Y
if py0 < new_dog[1] < py1:
if old_dog[0] < px0 <= new_dog[0]:
new_dog[0] = px0 - 1e-3
elif old_dog[0] > px0 >= new_dog[0]:
new_dog[0] = px0 + 1e-3
if old_dog[0] > px1 >= new_dog[0]:
new_dog[0] = px1 + 1e-3
elif old_dog[0] < px1 <= new_dog[0]:
new_dog[0] = px1 - 1e-3
self.dog_pos = new_dog.astype(np.float32)
for i in range(self.n_sheep):
if self.penned[i]:
continue
self.sheep_pos[i] = self._step_sheep(i)
if self._in_pen(self.sheep_pos[i]):
self.penned[i] = True
n_penned = int(self.penned[:self.n_sheep].sum())
newly_penned = n_penned - self._prev_penned
self._prev_penned = n_penned
reward, rcomps = self._reward(n_penned, newly_penned, act)
terminated = n_penned == self.n_sheep
truncated = self._step_count >= self.max_steps
info = {"n_penned": n_penned, "n_sheep": self.n_sheep,
"rcomps": rcomps}
if self.render_mode == "human":
self.render()
return self._obs(), float(reward), terminated, truncated, info
def render(self):
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
if self._fig is None:
plt.ion()
self._fig, self._ax = plt.subplots(figsize=(6, 6))
ax = self._ax
ax.clear()
ax.set_xlim(-16, 16); ax.set_ylim(-16, 16)
ax.set_aspect("equal"); ax.set_facecolor("#dcedc8")
ax.add_patch(mpatches.Rectangle(
(-15, -15), 30, 30, fill=False, edgecolor="#795548", linewidth=2
))
pw = self.PEN_X[1] - self.PEN_X[0]
ph = self.PEN_Y[1] - self.PEN_Y[0]
ax.add_patch(mpatches.Rectangle(
(self.PEN_X[0], self.PEN_Y[0]), pw, ph,
facecolor="#ffe082", edgecolor="#795548", linewidth=2
))
ax.text(11.5, -11.5, "pen", ha="center", va="center",
fontsize=8, color="#795548")
com, radius, _ = self._flock_stats()
ax.add_patch(plt.Circle(com, radius, color="steelblue",
fill=False, linestyle="--", linewidth=1))
ax.plot(*com, "+", color="steelblue", markersize=10)
for i in range(self.n_sheep):
if i >= self.n_sheep:
continue
color = "deeppink" if self.penned[i] else "white"
ax.plot(*self.sheep_pos[i], "o", color=color, markersize=11,
markeredgecolor="#555", markeredgewidth=1.5)
ax.plot(*self.dog_pos, "s", color="#4e342e", markersize=13,
markeredgecolor="black", markeredgewidth=1.5)
ax.set_title(
f"step {self._step_count} | "
f"penned {int(self.penned[:self.n_sheep].sum())}/{self.n_sheep} | "
f"r={radius:.1f}m",
fontsize=11
)
self._fig.canvas.draw()
self._fig.canvas.flush_events()
plt.pause(0.001)
def close(self):
if self._fig is not None:
import matplotlib.pyplot as plt
plt.close(self._fig)
self._fig = None
# ------------------------------------------------------------------
# Internals
# ------------------------------------------------------------------
def _in_pen(self, pos: np.ndarray) -> bool:
return (self.PEN_X[0] < pos[0] < self.PEN_X[1] and
self.PEN_Y[0] < pos[1] < self.PEN_Y[1])
def _flock_stats(self):
"""Return (COM, radius, mean_dispersion) over active sheep."""
active_mask = ~self.penned[:self.n_sheep]
if not active_mask.any():
return self.PEN_CENTER.copy(), 0.0, 0.0
pts = self.sheep_pos[:self.n_sheep][active_mask]
com = pts.mean(axis=0)
dists = np.linalg.norm(pts - com, axis=1)
return com, float(dists.max()), float(dists.mean())
def _obs(self) -> np.ndarray:
com, radius, _ = self._flock_stats()
active_mask = ~self.penned[:self.n_sheep]
if active_mask.any():
pts = self.sheep_pos[:self.n_sheep][active_mask]
dists = np.linalg.norm(pts - com, axis=1)
sorted_idx = np.argsort(dists)[::-1] # farthest first
# Top-3 stragglers; pad with COM when fewer active sheep exist
def nth(n):
return pts[sorted_idx[n]] if len(sorted_idx) > n else com
far1, far2, far3 = nth(0), nth(1), nth(2)
else:
far1 = far2 = far3 = self.PEN_CENTER.copy()
S = self.FIELD
D = 2 * self.FIELD
# far1/far2/far3 expressed relative to COM, not dog.
# For 1 sheep: far1-COM = far2-COM = far3-COM = [0,0] → cleanly ignorable.
# For 3+ sheep: non-zero vectors tell the dog where each straggler is
# within the group, without conflicting with weights trained on 1 sheep.
# Pen reference for the policy. Aligned with the reward target so the
# policy isn't forced to learn an implicit offset between what it sees
# ("pen is here") and what it's rewarded for ("get sheep close to here").
pen_ref = self.PEN_ENTRY if self.ENTRY_AWARE else self.PEN_CENTER
return np.array([
self.dog_pos[0] / S, self.dog_pos[1] / S,
(com[0] - self.dog_pos[0]) / D, (com[1] - self.dog_pos[1]) / D,
(far1[0] - com[0]) / D, (far1[1] - com[1]) / D,
(far2[0] - com[0]) / D, (far2[1] - com[1]) / D,
(far3[0] - com[0]) / D, (far3[1] - com[1]) / D,
(pen_ref[0] - com[0]) / D, (pen_ref[1] - com[1]) / D,
(pen_ref[0] - far1[0]) / D, (pen_ref[1] - far1[1]) / D,
radius / D,
active_mask.sum() / self.n_sheep,
float(np.cos(self.dog_heading)),
float(np.sin(self.dog_heading)),
], dtype=np.float32)
def _reward(self, n_penned: int, newly_penned: int, action: np.ndarray):
active = ~self.penned[:self.n_sheep]
# Per-sheep progress toward pen: fires whenever any sheep moves closer.
# Naturally rewards keeping the flock together and pushing toward pen:
# dog behind flock → all sheep flee toward pen → all contribute positive reward.
# Dog from wrong side → sheep scatter away from pen → negative reward.
target = self.PEN_ENTRY if self.ENTRY_AWARE else self.PEN_CENTER
if active.any():
pen_dists = np.linalg.norm(
self.sheep_pos[:self.n_sheep][active] - target, axis=1
)
cur_sum = float(pen_dists.sum())
r_progress = (self._prev_pen_dist_sum - cur_sum) * self.W_PER_SHEEP
self._prev_pen_dist_sum = cur_sum
else:
r_progress = 0.0
com, _, _ = self._flock_stats()
com_dist = float(np.linalg.norm(com - target))
d_dog_com = float(np.linalg.norm(self.dog_pos - com))
if d_dog_com > 0.1 and com_dist > 0.1:
pen_dir = (target - com) / com_dist
dog_dir = (self.dog_pos - com) / d_dog_com
cosine = -float(np.dot(pen_dir, dog_dir))
if self.ALIGN_SHAPE == "standoff":
IDEAL = 0.5 * (self.SEPARATION_DIST + self.FLEE_DIST)
HALF = self.FLEE_DIST - IDEAL
proximity = max(0.0, 1.0 - abs(d_dog_com - IDEAL) / HALF)
else: # "near"
proximity = max(0.0, 1.0 - d_dog_com / self.FLEE_DIST)
move_gate = (min(1.0, float(np.linalg.norm(action)))
if self.ALIGN_GATED else 1.0)
alignment = cosine * proximity * move_gate * self.W_ALIGN
else:
alignment = 0.0
# Wall-touch penalty: distance-based gradient covering ALL solid surfaces
# the sheep can hit — the four field outer walls (always present) plus
# the three solid pen walls (west, east, south). Linearly ramps from 0
# at buffer edge to W_WALL_TOUCH at the wall surface. Goal: sheep should
# never end up pinned against any wall (transfer concern: Webots fences
# have pillars that can physically trap sheep).
if self.W_WALL_TOUCH and active.any():
pts = self.sheep_pos[:self.n_sheep][active]
px0, px1 = self.PEN_X
py0, py1 = self.PEN_Y
F = self.FIELD
buf = self.WALL_TOUCH_BUFFER
far = buf + 1.0
# Field outer walls — sheep is always inside [-F, F]^2.
d_fw = pts[:, 0] - (-F) # distance to west field wall
d_fe = F - pts[:, 0] # east field wall
d_fs = pts[:, 1] - (-F) # south field wall
d_fn = F - pts[:, 1] # north field wall
# Pen W/E/S walls — only relevant approached from outside.
d_pw = np.where((pts[:, 0] < px0) & (pts[:, 1] > py0) & (pts[:, 1] < py1),
px0 - pts[:, 0], far)
d_pe = np.where((pts[:, 0] > px1) & (pts[:, 1] > py0) & (pts[:, 1] < py1),
pts[:, 0] - px1, far)
d_ps = np.where((pts[:, 1] < py0) & (pts[:, 0] > px0) & (pts[:, 0] < px1),
py0 - pts[:, 1], far)
d_min = np.minimum.reduce([d_fw, d_fe, d_fs, d_fn, d_pw, d_pe, d_ps])
penalties = np.maximum(0.0, 1.0 - d_min / buf) * self.W_WALL_TOUCH
r_wall_touch = -float(penalties.sum())
else:
r_wall_touch = 0.0
# South penalty: discourage active sheep from drifting below the pen
# entrance (y < PEN_Y[1]) while OUTSIDE the pen's x-range. Sheep at
# y<-8 with x∈[PEN_X] are entering through the gate — that's desired.
# The dead zone is y<-8 and x outside [PEN_X]: stuck against pen walls,
# must reverse direction (north) to reach the entrance — hard to recover.
if self.W_SOUTH and active.any():
pts = self.sheep_pos[:self.n_sheep][active]
depth = np.maximum(0.0, self.PEN_Y[1] - pts[:, 1])
outside_pen_x = (pts[:, 0] < self.PEN_X[0]) | (pts[:, 0] > self.PEN_X[1])
r_south = -float((depth * outside_pen_x).sum()) * self.W_SOUTH
else:
r_south = 0.0
# Compactness shaping: reward decreases in flock radius (active sheep only)
if self.W_COMPACT and active.any():
cur_radius = float(np.linalg.norm(
self.sheep_pos[:self.n_sheep][active] - com, axis=1
).max())
r_compact = (self._prev_radius - cur_radius) * self.W_COMPACT
self._prev_radius = cur_radius
else:
r_compact = 0.0
r_pen_bonus = newly_penned * self.W_PEN_BONUS
r_step_cost = -self.W_STEP_COST
r_complete = self.W_COMPLETE if n_penned == self.n_sheep else 0.0
reward = (r_progress + alignment + r_south + r_compact + r_wall_touch
+ r_pen_bonus + r_step_cost + r_complete)
rcomps = {
"progress": float(r_progress),
"alignment": float(alignment),
"south": float(r_south),
"compact": float(r_compact),
"wall_touch": float(r_wall_touch),
"pen_bonus": float(r_pen_bonus),
"step_cost": float(r_step_cost),
"complete": float(r_complete),
}
return reward, rcomps
def _step_sheep(self, i: int) -> np.ndarray:
"""Apply one timestep of boid dynamics to sheep i (mirrors sheep.py)."""
old_pos = self.sheep_pos[i].copy() # saved for pen wall collision check
pos = old_pos.copy()
fx, fy = 0.0, 0.0
fleeing = False
# Flee from dog — quadratic ramp
diff = self.dog_pos - pos
dist = float(np.linalg.norm(diff))
if 0.01 < dist < self.FLEE_DIST:
t = 1.0 - dist / self.FLEE_DIST
s = t * t * 5.0
fx -= (diff[0] / dist) * s
fy -= (diff[1] / dist) * s
fleeing = True
# Separation (inverse-distance) + Cohesion
cx, cy, cn = 0.0, 0.0, 0
for j in range(self.n_sheep):
if j == i or self.penned[j]:
continue
dv = self.sheep_pos[j] - pos
dj = float(np.linalg.norm(dv))
if 0.3 < dj < self.COHESION_DIST:
cx += self.sheep_pos[j][0]
cy += self.sheep_pos[j][1]
cn += 1
if 0.05 < dj < self.SEPARATION_DIST:
push = (self.SEPARATION_DIST - dj) / dj
fx -= (dv[0] / dj) * push * 2.5
fy -= (dv[1] / dj) * push * 2.5
if cn > 0:
w = 0.08 if fleeing else 0.15
fx += (cx / cn - pos[0]) * w
fy += (cy / cn - pos[1]) * w
# Wall avoidance
m, F = self.WALL_MARGIN, self.FIELD
if pos[0] < -F + m: fx += ((-F + m - pos[0]) / m) * 6.0
if pos[0] > F - m: fx -= ((pos[0] - (F - m)) / m) * 6.0
if pos[1] < -F + m: fy += ((-F + m - pos[1]) / m) * 6.0
if pos[1] > F - m: fy -= ((pos[1] - (F - m)) / m) * 6.0
# Hard-stop clamp: mirrors sheep.py — zero any force driving further
# into the wall within 0.5 m so the flee force cannot pin the sheep.
HS = 0.5
if pos[0] < -F + HS and fx < 0: fx = 0.0
if pos[0] > F - HS and fx > 0: fx = 0.0
if pos[1] < -F + HS and fy < 0: fy = 0.0
if pos[1] > F - HS and fy > 0: fy = 0.0
# Wander — suppressed while fleeing
if not fleeing:
if self.np_random.random() < 0.02:
self.wander_ang[i] += float(self.np_random.uniform(-0.6, 0.6))
fx += float(np.cos(self.wander_ang[i])) * 0.5
fy += float(np.sin(self.wander_ang[i])) * 0.5
# Integrate
force = np.array([fx, fy])
mag = float(np.linalg.norm(force))
if mag > 0.01:
top_speed = self.SHEEP_FLEE_V if fleeing else self.SHEEP_WANDER_V
speed = min(top_speed, mag * 0.3)
pos = np.clip(pos + (force / mag) * speed * self.DT,
-self.FIELD, self.FIELD)
# Pen solid wall collision — mirrors Webots geometry.
# The pen has THREE solid walls: west (x=PEN_X[0]), east (x=PEN_X[1]),
# south (y=PEN_Y[0]). The NORTH face (y=PEN_Y[1]=-8) is the open entrance.
# Sheep may only enter through the north face; crossing a solid wall is blocked.
px0, px1 = self.PEN_X[0], self.PEN_X[1]
py0, py1 = self.PEN_Y[0], self.PEN_Y[1]
entered_from_north = (
old_pos[1] >= py1 and pos[1] < py1 and px0 < pos[0] < px1
)
if not entered_from_north:
# Block crossing through west wall from outside
if old_pos[0] < px0 <= pos[0] and py0 < pos[1] < py1:
pos = np.array([px0 - 1e-3, pos[1]], dtype=np.float32)
# Block crossing through east wall from outside
if old_pos[0] > px1 >= pos[0] and py0 < pos[1] < py1:
pos = np.array([px1 + 1e-3, pos[1]], dtype=np.float32)
return pos.astype(np.float32)