Checkpoint 8

This commit is contained in:
Johnny Fernandes
2026-05-12 22:41:03 +01:00
parent a01a5c9cef
commit 5c2ee4bba5
31 changed files with 3189 additions and 380 deletions
+27 -7
View File
@@ -33,7 +33,11 @@ class ActiveScanTeacher:
Call signature::
vx, vy, mode = teacher(dog_xy, dog_heading, sheep_positions, pen_target)
vx, vy, omega, mode = teacher(dog_xy, dog_heading, sheep_positions,
pen_target, drive_mode="differential")
``omega`` is the yaw-rate intent (mecanum only); 0.0 for differential
drive and during blind exploration phases.
"""
def __init__(self, base_action_fn, initial_scan_steps: int = INITIAL_SCAN_STEPS):
@@ -62,7 +66,8 @@ class ActiveScanTeacher:
return 0.0, 0.0
return EXPLORE_SPEED * dx / d, EXPLORE_SPEED * dy / d
def __call__(self, dog_xy, dog_heading, sheep_positions, pen_target):
def __call__(self, dog_xy, dog_heading, sheep_positions, pen_target,
drive_mode="differential"):
self.step += 1
n_visible = len(sheep_positions)
@@ -75,7 +80,7 @@ class ActiveScanTeacher:
if self.step <= self.initial_scan:
vx, vy = self._scan_action(dog_heading)
self.last_action = (vx, vy)
return vx, vy, "scan_initial"
return vx, vy, 0.0, "scan_initial"
# Phase 2: walk-to-centre after a sustained empty tracker.
if self.empty_streak >= EMPTY_DEBOUNCE_STEPS:
@@ -87,16 +92,31 @@ class ActiveScanTeacher:
vx, vy = ex, ey
mode = "explore"
self.last_action = (vx, vy)
return vx, vy, mode
return vx, vy, 0.0, mode
# Phase 2b: brief tracker blink — hold the previous action.
if n_visible == 0:
vx, vy = self.last_action
return vx, vy, "hold"
return vx, vy, 0.0, "hold"
# Phase 3: hand off to the underlying analytic teacher, then
# apply the shared near-sheep speed modulation.
vx, vy, mode = self.base(dog_xy, sheep_positions, pen_target)
# Handle both old-style (dog_xy, sheep, pen) and new-style
# (dog_xy, heading, sheep, pen, drive_mode) teachers.
try:
result = self.base(dog_xy, dog_heading, sheep_positions,
pen_target, drive_mode)
except TypeError:
try:
result = self.base(dog_xy, dog_heading, sheep_positions,
pen_target)
except TypeError:
result = self.base(dog_xy, sheep_positions, pen_target)
if len(result) == 4:
vx, vy, omega, mode = result
else:
vx, vy, mode = result
omega = 0.0
vx, vy = modulate_speed_near_sheep(vx, vy, dog_xy, sheep_positions)
self.last_action = (vx, vy)
return vx, vy, mode
return vx, vy, omega, mode
+187
View File
@@ -0,0 +1,187 @@
"""Universal shepherd teacher — Strömbom core + mecanum omega + straggler recovery.
The core collect/drive logic is **identical** to :mod:`strombom` (same
``F_FACTOR``, ``DELTA_COLLECT``, ``DELTA_DRIVE`` thresholds and target
computation) so it inherits the proven ~100 % success rate at n ≤ 8.
Two additions make it useful as a universal teacher:
1. **Omega for mecanum.** When ``drive_mode="mecanum"``, the teacher
outputs a non-zero ``omega`` channel so the dog **faces the
direction of travel**. During collect the dog faces the target
sheep; during drive it faces the pen. This gives the BC student a
real rotation signal to learn from.
2. **Last-straggler recovery.** When exactly one sheep remains active
and it is near the gate, the dog positions itself behind that
straggler (opposite the gate) and pushes it straight through. This
handles the edge case where the last sheep circles the gate posts.
Call signature::
vx, vy, omega, mode = compute_action(
dog_xy, dog_heading, sheep_positions, pen_target,
drive_mode="differential",
)
For differential drive ``omega`` is always 0.0 and can be ignored.
"""
import math
from herding.world.geometry import (
PEN_ENTRY, GATE_X, GATE_Y, in_pen,
)
# ---------------------------------------------------------------------------
# Tuning constants — match Strömbom exactly for proven success rates.
# ---------------------------------------------------------------------------
F_FACTOR = 4.0 # collect/drive threshold scaled by √n
DELTA_COLLECT = 1.5 # standoff behind the furthest sheep
DELTA_DRIVE = 2.0 # standoff behind flock CoM
# Omega gain for mecanum (how strongly the dog turns to face target)
OMEGA_GAIN = 0.6
# Recovery: push the last straggler straight through the gate.
RECOVERY_GATE_DIST = 6.0 # only when straggler is this close to gate centre
RECOVERY_PUSH_DIST = 1.2 # stand-off behind straggler, away from gate
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _unit(x, y):
d = math.hypot(x, y)
if d < 1e-6:
return 0.0, 0.0
return x / d, y / d
def _is_active(x, y) -> bool:
return (not in_pen(x, y)) and y > GATE_Y
def _angle_diff(a, b):
"""Signed shortest angular difference a - b, in [-π, π]."""
return math.atan2(math.sin(a - b), math.cos(a - b))
def _gate_center():
"""Centre of the gate opening."""
return (0.5 * (GATE_X[0] + GATE_X[1]), GATE_Y)
# ---------------------------------------------------------------------------
# Core teacher
# ---------------------------------------------------------------------------
def compute_action(dog_xy, dog_heading, sheep_positions,
pen_target=PEN_ENTRY, drive_mode="differential"):
"""Return ``(vx, vy, omega, mode)``.
Parameters
----------
dog_xy : (float, float)
Dog position in world frame.
dog_heading : float
Dog heading in world frame (rad), 0 = +x axis.
sheep_positions : dict[str, (float, float)]
Visible sheep positions.
pen_target : (float, float)
Centre of the pen gate (defaults to geometry.PEN_ENTRY).
drive_mode : str
``"differential"`` or ``"mecanum"``.
Returns
-------
vx, vy : float
Velocity intent in [-1, 1].
omega : float
Yaw intent in [-1, 1] (0 for differential).
mode : str
Phase label: ``"idle"``, ``"collect"``, ``"drive"``, ``"recovery"``.
"""
active = [(x, y) for (x, y) in sheep_positions.values()
if _is_active(x, y)]
if not active:
return 0.0, 0.0, 0.0, "idle"
n = len(active)
com_x = sum(p[0] for p in active) / n
com_y = sum(p[1] for p in active) / n
dists = [math.hypot(p[0] - com_x, p[1] - com_y) for p in active]
radius = max(dists)
# ---- Last-straggler recovery (single sheep circling near gate) ----
gc = _gate_center()
if n == 1:
sx, sy = active[0]
d_to_gate = math.hypot(sx - gc[0], sy - gc[1])
if d_to_gate < RECOVERY_GATE_DIST:
dx_g = sx - gc[0]
dy_g = sy - gc[1]
d_g = math.hypot(dx_g, dy_g)
if d_g > 0.3:
ux, uy = dx_g / d_g, dy_g / d_g
else:
ux, uy = 0.0, 1.0
tx = sx + RECOVERY_PUSH_DIST * ux
ty = sy + RECOVERY_PUSH_DIST * uy
ax, ay = _unit(tx - dog_xy[0], ty - dog_xy[1])
mode = "recovery"
face_target = (sx, sy)
omega = 0.0
if drive_mode == "mecanum":
desired = math.atan2(
face_target[1] - dog_xy[1],
face_target[0] - dog_xy[0],
)
err = _angle_diff(desired, dog_heading)
omega = max(-1.0, min(1.0, OMEGA_GAIN * err / math.pi))
return ax, ay, omega, mode
# ---- Standard Strömbom collect/drive (proven core) ----
if radius > F_FACTOR * math.sqrt(n):
# Collect: aim behind the furthest sheep, opposite the CoM.
idx = max(range(n), key=lambda i: dists[i])
sx, sy = active[idx]
ux, uy = _unit(sx - com_x, sy - com_y)
tx, ty = sx + DELTA_COLLECT * ux, sy + DELTA_COLLECT * uy
mode = "collect"
face_target = (sx, sy)
else:
# Drive: aim behind the CoM, opposite the pen.
ux, uy = _unit(com_x - pen_target[0], com_y - pen_target[1])
tx, ty = com_x + DELTA_DRIVE * ux, com_y + DELTA_DRIVE * uy
mode = "drive"
face_target = pen_target
ax, ay = _unit(tx - dog_xy[0], ty - dog_xy[1])
# ---- Omega (mecanum only) ----
omega = 0.0
if drive_mode == "mecanum" and mode != "idle":
desired_heading = math.atan2(
face_target[1] - dog_xy[1],
face_target[0] - dog_xy[0],
)
err = _angle_diff(desired_heading, dog_heading)
omega = max(-1.0, min(1.0, OMEGA_GAIN * err / math.pi))
return ax, ay, omega, mode
def compute_action_diff(dog_xy, dog_heading, sheep_positions,
pen_target=PEN_ENTRY):
"""Compatibility wrapper returning ``(vx, vy, mode)`` — same as Strömbom.
Use this when plugging into existing differential-drive code that
doesn't expect omega.
"""
vx, vy, _omega, mode = compute_action(
dog_xy, dog_heading, sheep_positions, pen_target,
drive_mode="differential",
)
return vx, vy, mode
+132 -50
View File
@@ -24,9 +24,14 @@ import math
import numpy as np
from herding.world.geometry import FIELD_X, FIELD_Y, GATE_Y, PEN_X, PEN_Y
from herding.world.geometry import (
FIELD_SHAPE, FIELD_ROUND_R,
FIELD_X, FIELD_Y, GATE_X, GATE_Y,
PEN_X, PEN_Y,
)
from herding.perception.lidar_sim import (
LIDAR_FOV, LIDAR_MAX_RANGE, LIDAR_N_RAYS, SHEEP_RADIUS, ray_angles,
LIDAR_FOV, LIDAR_MAX_RANGE, LIDAR_N_RAYS, SHEEP_RADIUS, POST_RADIUS,
ray_angles,
)
@@ -35,16 +40,94 @@ MAX_CLUSTER_SPAN = 1.5 # m — wider clusters are walls / structures
RANGE_HIT_EPS = 0.05 # m — hit if range < max_range - eps
WALL_REJECT = 0.5 # m — drop detections this close to a known wall line
# Sheep-sized static features (gate posts, corner pillars). A cluster
# centred within STATIC_REJECT of any of these is never a sheep.
_STATIC_FEATURES = (
# Multi-peak splitting: within a single cluster, if the range profile
# has a local dip (i.e. the range increases then decreases) deeper than
# SPLIT_RANGE_GAP, the cluster is split into two detections.
SPLIT_RANGE_GAP = 0.20 # m — range increase that triggers a split
# Sheep-sized static features. A cluster centred within STATIC_REJECT of
# any of these is never a sheep.
_STATIC_FEATURES_RECT = (
( 10.0, -15.0), ( 13.0, -15.0), # gate posts
( 15.0, 15.0), ( 15.0, -15.0),
(-15.0, 15.0), (-15.0, -15.0), # field corners
)
_STATIC_FEATURES_ROUND = (
(GATE_X[0], GATE_Y),
(GATE_X[1], GATE_Y),
)
STATIC_REJECT = 0.8
def _get_static_features():
if FIELD_SHAPE == "field_round":
return _STATIC_FEATURES_ROUND
return _STATIC_FEATURES_RECT
_STATIC_FEATURES = _get_static_features()
def _in_field_region(cx: float, cy: float) -> bool:
"""Check if a detection is inside the field (with small margin)."""
if FIELD_SHAPE == "field_round":
r = math.hypot(cx, cy)
return r < FIELD_ROUND_R + 0.2
return (FIELD_X[0] - 0.2 < cx < FIELD_X[1] + 0.2 and
FIELD_Y[0] - 0.2 < cy < FIELD_Y[1] + 0.2)
def _near_wall(cx: float, cy: float) -> bool:
"""True if the detection is too close to a wall to be a sheep."""
if FIELD_SHAPE == "field_round":
r = math.hypot(cx, cy)
return r > FIELD_ROUND_R - WALL_REJECT
return (
cx > FIELD_X[1] - WALL_REJECT or cx < FIELD_X[0] + WALL_REJECT or
cy > FIELD_Y[1] - WALL_REJECT or
(cy < FIELD_Y[0] + WALL_REJECT and not (PEN_X[0] <= cx <= PEN_X[1]))
)
def _split_cluster_by_range(
points: list[tuple[float, float]],
range_vals: list[float],
) -> list[list[tuple[float, float]]]:
"""Split a cluster at range-profile local maxima (gaps between sheep).
When two sheep are close, the LiDAR sees them as one arc, but the
range profile has a local peak between them (the ray passes between
the two discs). This function finds those peaks and splits.
"""
if len(points) < 4:
return [points]
# Find the minimum range in the cluster (closest point to dog).
r_min = min(range_vals)
# Find the maximum range (the dip/gap between sheep).
r_max = max(range_vals)
# If the range variation is small, it's a single target.
if r_max - r_min < SPLIT_RANGE_GAP:
return [points]
# Find the split point: the index with the maximum range.
split_idx = range_vals.index(r_max)
if split_idx <= 1 or split_idx >= len(points) - 2:
return [points]
# Split into two sub-clusters.
left = points[:split_idx]
right = points[split_idx + 1:]
# Recursively split each half.
result = []
for sub_pts, sub_ranges in [
(left, range_vals[:split_idx]),
(right, range_vals[split_idx + 1:]),
]:
if len(sub_pts) >= 1:
result.extend(_split_cluster_by_range(sub_pts, sub_ranges))
return result if result else [points]
def detections_from_scan(
ranges: np.ndarray,
dog_x: float, dog_y: float, dog_heading: float,
@@ -64,63 +147,62 @@ def detections_from_scan(
# Walk rays in angular order; a large jump between consecutive
# world-frame hit points closes the current cluster.
clusters: list[list[tuple[float, float]]] = []
current: list[tuple[float, float]] = []
prev: tuple[float, float] | None = None
# Store (x, y, range) per hit ray for multi-peak splitting.
clusters: list[list[tuple[float, float, float]]] = []
current: list[tuple[float, float, float]] = []
prev_xy: tuple[float, float] | None = None
for i in range(n_rays):
if not bool(hit[i]):
if current:
clusters.append(current)
current = []
prev = None
prev_xy = None
continue
pt = (float(px[i]), float(py[i]))
if prev is not None and math.hypot(pt[0] - prev[0], pt[1] - prev[1]) > GAP_THRESHOLD:
pt = (float(px[i]), float(py[i]), float(ranges[i]))
if prev_xy is not None and math.hypot(pt[0] - prev_xy[0], pt[1] - prev_xy[1]) > GAP_THRESHOLD:
clusters.append(current)
current = []
current.append(pt)
prev = pt
prev_xy = (pt[0], pt[1])
if current:
clusters.append(current)
detections: list[tuple[float, float]] = []
for cluster in clusters:
xs = [p[0] for p in cluster]
ys = [p[1] for p in cluster]
cx, cy = sum(xs) / len(xs), sum(ys) / len(ys)
span = math.hypot(max(xs) - min(xs), max(ys) - min(ys))
if span > MAX_CLUSTER_SPAN:
continue
# Rays hit the front edge of the sheep; offset outward by
# SHEEP_RADIUS along the dog→cluster direction to estimate the
# centre.
dx, dy = cx - dog_x, cy - dog_y
d = math.hypot(dx, dy)
if d > 1e-3:
cx += SHEEP_RADIUS * dx / d
cy += SHEEP_RADIUS * dy / d
# Region filter: in-field clusters, plus a narrow strip south of
# the gate so sheep mid-crossing get latched penned. Detections
# deeper into the pen are dropped — pen posts and rails would
# otherwise generate phantom penned tracks.
in_main = (FIELD_X[0] - 0.2 < cx < FIELD_X[1] + 0.2 and
FIELD_Y[0] - 0.2 < cy < FIELD_Y[1] + 0.2)
in_gate_strip = (PEN_X[0] - 0.2 < cx < PEN_X[1] + 0.2 and
GATE_Y - 1.0 < cy < GATE_Y + 0.2)
if not (in_main or in_gate_strip):
continue
# Known sheep-sized static features.
if any(math.hypot(cx - fx, cy - fy) < STATIC_REJECT
for fx, fy in _STATIC_FEATURES):
continue
# Wall-proximity filter — sheep can't get this close to a wall,
# so detections right at the wall line are structure noise.
near_field_wall = (
cx > FIELD_X[1] - WALL_REJECT or cx < FIELD_X[0] + WALL_REJECT or
cy > FIELD_Y[1] - WALL_REJECT or
(cy < FIELD_Y[0] + WALL_REJECT and not (PEN_X[0] <= cx <= PEN_X[1]))
)
if near_field_wall:
continue
detections.append((cx, cy))
points_xy = [(p[0], p[1]) for p in cluster]
range_vals = [p[2] for p in cluster]
# Multi-peak splitting.
if len(cluster) >= 4:
sub_clusters = _split_cluster_by_range(points_xy, range_vals)
else:
sub_clusters = [points_xy]
for sub in sub_clusters:
if len(sub) < 1:
continue
xs = [p[0] for p in sub]
ys = [p[1] for p in sub]
cx, cy = sum(xs) / len(xs), sum(ys) / len(ys)
span = math.hypot(max(xs) - min(xs), max(ys) - min(ys))
if span > MAX_CLUSTER_SPAN:
continue
# Rays hit the front edge of the sheep; offset outward by
# SHEEP_RADIUS along the dog→cluster direction.
dx, dy = cx - dog_x, cy - dog_y
d = math.hypot(dx, dy)
if d > 1e-3:
cx += SHEEP_RADIUS * dx / d
cy += SHEEP_RADIUS * dy / d
in_main = _in_field_region(cx, cy)
in_gate_strip = (PEN_X[0] - 0.2 < cx < PEN_X[1] + 0.2 and
GATE_Y - 1.0 < cy < GATE_Y + 0.2)
if not (in_main or in_gate_strip):
continue
if any(math.hypot(cx - fx, cy - fy) < STATIC_REJECT
for fx, fy in _STATIC_FEATURES):
continue
if _near_wall(cx, cy):
continue
detections.append((cx, cy))
return detections
+101 -30
View File
@@ -1,7 +1,8 @@
"""Fast 2D LiDAR simulator for the Gymnasium env.
Raycasts against sheep (discs) and static world geometry (axis-aligned
walls + gate posts) so the env reproduces the false-positive cluster
Raycasts against sheep (discs) and static world geometry. For rectangular
fields this is axis-aligned walls + gate posts; for round fields it is a
circular wall + gate posts. The env reproduces the false-positive cluster
distribution Webots produces from real 3D geometry.
Returns a range array matching the Webots Lidar device:
@@ -15,49 +16,96 @@ import math
import numpy as np
from herding.world.geometry import (
FIELD_SHAPE, FIELD_ROUND_R,
FIELD_X, FIELD_Y,
GATE_X, GATE_Y,
PEN_X, PEN_Y,
)
# Match protos/ShepherdDog.proto Lidar device.
LIDAR_N_RAYS = 180
LIDAR_FOV = 2.44 # rad ≈ 140°
# Match protos/ShepherdDog.proto Lidar device — extended to 360° for
# full situational awareness. The original Webots device is 140° FOV /
# 180 rays; we use 360 rays for full-circle coverage.
LIDAR_N_RAYS = 360
LIDAR_FOV = 2.0 * math.pi # 360° full circle
LIDAR_MAX_RANGE = 12.0
LIDAR_NOISE = 0.005 # m, gaussian std
# Sheep cross-section in the LiDAR plane (horizontal cylinder approx).
SHEEP_RADIUS = 0.30
POST_RADIUS = 0.25
# --- Static world geometry — mirrors worlds/field.wbt ---
# Vertical walls: (x, y_min, y_max).
_VERTICAL_WALLS = (
# ---------------------------------------------------------------------------
# Rectangular-field static geometry
# ---------------------------------------------------------------------------
_VERTICAL_WALLS_RECT = (
( 15.0, -15.0, 15.0), # field east
(-15.0, -15.0, 15.0), # field west
( 10.0, -22.0, -15.0), # pen west
( 13.0, -22.0, -15.0), # pen east
)
# Horizontal walls: (y, x_min, x_max). South wall has a 3 m gap at the gate.
_HORIZONTAL_WALLS = (
_HORIZONTAL_WALLS_RECT = (
( 15.0, -15.0, 15.0), # field north
(-15.0, -15.0, 10.0), # field south-west of gate
(-15.0, 13.0, 15.0), # field south-east of gate
(-22.0, 10.0, 13.0), # pen south
)
# Gate posts + field corner pillars, treated as discs at LiDAR height.
_POSTS_XY = np.array([
_POSTS_RECT = np.array([
( 10.0, -15.0), ( 13.0, -15.0),
( 15.0, 15.0), ( 15.0, -15.0),
(-15.0, 15.0), (-15.0, -15.0),
], dtype=np.float64)
POST_RADIUS = 0.25
# ---------------------------------------------------------------------------
# Round-field static geometry
# ---------------------------------------------------------------------------
# Circular wall with gate gap. Gate posts at the edges of the gate gap.
_gate_cx = 0.5 * (GATE_X[0] + GATE_X[1])
_POSTS_ROUND = np.array([
(GATE_X[0], GATE_Y),
(GATE_X[1], GATE_Y),
], dtype=np.float64)
# Pen walls for round field
_VERTICAL_WALLS_ROUND = (
(GATE_X[0], PEN_Y[0], GATE_Y), # pen west
(GATE_X[1], PEN_Y[0], GATE_Y), # pen east
)
_HORIZONTAL_WALLS_ROUND = (
(PEN_Y[0], GATE_X[0], GATE_X[1]), # pen south
)
def _build_static_geometry():
"""Select the correct static geometry for the active field shape."""
if FIELD_SHAPE == "field_round":
return (
_VERTICAL_WALLS_ROUND,
_HORIZONTAL_WALLS_ROUND,
_POSTS_ROUND,
FIELD_ROUND_R,
)
return (
_VERTICAL_WALLS_RECT,
_HORIZONTAL_WALLS_RECT,
_POSTS_RECT,
None, # no circular wall
)
_VERTS, _HORIZS, _POSTS, _CIRC_R = _build_static_geometry()
# ---------------------------------------------------------------------------
# Ray helpers
# ---------------------------------------------------------------------------
def ray_angles(n: int = LIDAR_N_RAYS, fov: float = LIDAR_FOV) -> np.ndarray:
"""Local-frame ray angles, CCW from forward, sweeping +fov/2 → -fov/2.
Matches Webots' default Lidar sweep direction.
"""
"""Local-frame ray angles, CCW from forward, sweeping +fov/2 → -fov/2."""
return np.linspace(fov / 2.0, -fov / 2.0, n, dtype=np.float64)
@@ -78,7 +126,7 @@ def _raycast_static(
safe_sin = np.where(np.abs(sin_w) < 1e-9, 1e-9, sin_w)
# Vertical walls (x = const)
for wx, ymin, ymax in _VERTICAL_WALLS:
for wx, ymin, ymax in _VERTS:
t = (wx - ox) / safe_cos
y_at = oy + t * sin_w
valid = (t > EPS) & (y_at >= ymin - EPS) & (y_at <= ymax + EPS)
@@ -86,19 +134,47 @@ def _raycast_static(
np.minimum(best, cand, out=best)
# Horizontal walls (y = const)
for wy, xmin, xmax in _HORIZONTAL_WALLS:
for wy, xmin, xmax in _HORIZS:
t = (wy - oy) / safe_sin
x_at = ox + t * cos_w
valid = (t > EPS) & (x_at >= xmin - EPS) & (x_at <= xmax + EPS)
cand = np.where(valid, t, np.inf)
np.minimum(best, cand, out=best)
# Circular wall (round field only)
if _CIRC_R is not None:
# Ray: P(t) = O + t·D. ||P(t)||² = R²
# t² - 2t(O·D) + (||O||² - R²) = 0
# a = 1 (rays are unit), b = -2(O·D), c = ||O||² - R²
a = 1.0 # cos_w² + sin_w² = 1
b = -(ox * cos_w + oy * sin_w)
c = ox * ox + oy * oy - _CIRC_R * _CIRC_R
disc = b * b - a * c
valid_disc = disc >= 0.0
sqrt_disc = np.sqrt(np.maximum(disc, 0.0))
# Two intersection candidates: t = (-b ± sqrt(disc)) / a
t1 = -b - sqrt_disc
t2 = -b + sqrt_disc
# We want the smallest positive t.
t1_valid = valid_disc & (t1 > EPS)
t2_valid = valid_disc & (t2 > EPS)
t_circ = np.where(t1_valid, t1, np.where(t2_valid, t2, np.inf))
# Exclude rays that hit the gate gap: the hit point must not lie
# in the gate column (between GATE_X and above GATE_Y).
hx = ox + t_circ * cos_w
hy = oy + t_circ * sin_w
in_gate = ((hx > GATE_X[0]) & (hx < GATE_X[1]) &
(hy > GATE_Y - 2.0) & (hy < GATE_Y + 2.0))
t_circ = np.where(in_gate, np.inf, t_circ)
np.minimum(best, t_circ, out=best)
# Posts (treat as discs)
if _POSTS_XY.size:
px = _POSTS_XY[:, 0] - ox
py = _POSTS_XY[:, 1] - oy
t_post = np.outer(px, cos_w) + np.outer(py, sin_w) # (P, N)
d2 = (px ** 2 + py ** 2)[:, None] # (P, 1)
if _POSTS.size:
px = _POSTS[:, 0] - ox
py = _POSTS[:, 1] - oy
t_post = np.outer(px, cos_w) + np.outer(py, sin_w)
d2 = (px ** 2 + py ** 2)[:, None]
perp2 = d2 - t_post ** 2
R2 = POST_RADIUS ** 2
hit = (perp2 < R2) & (t_post > 0.0)
@@ -121,16 +197,12 @@ def simulate_scan(
``sheep_xy`` is every sheep (penned or active) in the scene.
"""
n_rays = _ANGLES.shape[0]
ch, sh = math.cos(dog_heading), math.sin(dog_heading)
cos_w = ch * _COS - sh * _SIN
sin_w = sh * _COS + ch * _SIN
# Walls + posts
best = _raycast_static(dog_x, dog_y, cos_w, sin_w)
# Sheep discs
if sheep_xy:
sx = np.asarray([p[0] for p in sheep_xy], dtype=np.float64) - dog_x
sy = np.asarray([p[1] for p in sheep_xy], dtype=np.float64) - dog_y
@@ -144,7 +216,6 @@ def simulate_scan(
nearest = candidate.min(axis=0)
np.minimum(best, nearest, out=best)
# Entries with no hit stay at inf → clipped to max_range, matching Webots.
ranges = np.minimum(best, max_range).astype(np.float32)
return _add_noise(ranges, noise, rng, max_range)
+6 -12
View File
@@ -26,14 +26,15 @@ import math
import numpy as np
from herding.world.geometry import (
FIELD_X, FIELD_Y, PEN_ENTRY, MAX_SHEEP,
PEN_ENTRY, MAX_SHEEP, distance_to_wall,
)
OBS_DIM = 32
def build_obs(dog_xy, dog_heading, sheep_xy_list, sheep_penned_list,
n_max: int = MAX_SHEEP) -> np.ndarray:
n_max: int = MAX_SHEEP,
n_expected: int | None = None) -> np.ndarray:
"""Assemble the dog policy's observation vector.
Parameters
@@ -43,6 +44,7 @@ def build_obs(dog_xy, dog_heading, sheep_xy_list, sheep_penned_list,
sheep_xy_list : iterable of (x, y) for ALL known sheep
sheep_penned_list : parallel iterable of bool — True if sheep is penned
n_max : maximum supported flock size used for the count normaliser
n_expected : unused, kept for API compatibility.
"""
dog_x, dog_y = dog_xy
obs = np.zeros(OBS_DIM, dtype=np.float32)
@@ -89,16 +91,8 @@ def build_obs(dog_xy, dog_heading, sheep_xy_list, sheep_penned_list,
obs[15] = float(rel[far_idx, 0]) / 15.0
obs[16] = float(rel[far_idx, 1]) / 15.0
min_sheep_wall = min(
float(np.min(arr[:, 0] - FIELD_X[0])),
float(np.min(FIELD_X[1] - arr[:, 0])),
float(np.min(arr[:, 1] - FIELD_Y[0])),
float(np.min(FIELD_Y[1] - arr[:, 1])),
)
min_dog_wall = min(
dog_x - FIELD_X[0], FIELD_X[1] - dog_x,
dog_y - FIELD_Y[0], FIELD_Y[1] - dog_y,
)
min_sheep_wall = float(min(distance_to_wall(sx, sy) for sx, sy in active))
min_dog_wall = distance_to_wall(dog_x, dog_y)
obs[17] = min_sheep_wall / 15.0
obs[18] = float(min_dog_wall) / 15.0
obs[19] = n / n_max
+118 -42
View File
@@ -6,6 +6,14 @@ reappear off-position), plus memory of last-seen positions for sheep
out of FOV. Output is ``{name: (x, y)}`` — Strömbom / Sequential
consume it directly.
When **predictive mode** is enabled (the default), tracks carry a
constant-velocity state ``(vx, vy)`` estimated from the last two
observations. While a track is occluded its position is extrapolated
using this velocity for up to ``PREDICT_STEPS`` frames, keeping the
teacher's CoM estimate stable during brief losses. After prediction
expires, the track falls back to its last-seen position (static memory)
until ``FORGET_STEPS`` deletes it entirely.
A track is marked penned once its estimated position crosses the gate
plane south (``is_penned_position``). Penned tracks are excluded from
``get_positions`` and kept indefinitely.
@@ -25,16 +33,71 @@ PENNED_GATE_M = 4.0 # m — gate for matching detections to existing penne
FORGET_STEPS = 200 # ~3.2 s — delete stale active tracks (penned ones kept forever)
MAX_ACTIVE_TRACKS = MAX_SHEEP
# Predictive tracking constants.
PREDICT_STEPS = 120 # ~1.9 s — extrapolate velocity this many frames
VELOCITY_CLAMP = 1.0 # m/s — max predicted speed (sheep max is ~0.78 m/s)
class Track:
"""Single track with position, velocity, and age."""
__slots__ = ("x", "y", "vx", "vy", "last_seen", "penned")
def __init__(self, x: float, y: float, step: int, penned: bool = False):
self.x = x
self.y = y
self.vx = 0.0
self.vy = 0.0
self.last_seen = step
self.penned = penned
@property
def age(self) -> int:
"""Not-a-property in the hot loop — callers pass current step."""
raise NotImplementedError
def predicted_position(self, current_step: int) -> tuple[float, float]:
"""Extrapolated position using constant velocity, clamped."""
dt = current_step - self.last_seen
if dt <= 0 or dt > PREDICT_STEPS:
return self.x, self.y
speed = math.hypot(self.vx, self.vy)
if speed < 1e-4:
return self.x, self.y
# Clamp extrapolation distance.
max_d = VELOCITY_CLAMP * dt * 0.016 # steps → seconds
d = min(speed * dt * 0.016, max_d)
return (
self.x + d * (self.vx / speed),
self.y + d * (self.vy / speed),
)
def update(self, x: float, y: float, step: int) -> None:
"""Absorb a new detection and re-estimate velocity."""
dt = step - self.last_seen
if dt > 0:
dt_s = dt * 0.016 # steps → seconds
new_vx = (x - self.x) / dt_s
new_vy = (y - self.y) / dt_s
# Exponential smoothing on velocity.
alpha = 0.6
self.vx = alpha * new_vx + (1.0 - alpha) * self.vx
self.vy = alpha * new_vy + (1.0 - alpha) * self.vy
self.x = x
self.y = y
self.last_seen = step
class SheepTracker:
"""Online tracker with NN association and forgetful memory.
"""Online tracker with NN association, prediction, and forgetful memory.
Each track stores ``(x, y, last_seen_step, penned)``.
Each track is a :class:`Track` with position, velocity estimate,
last-seen step, and penned flag.
"""
def __init__(self, gate: float = GATE_M):
self.gate = gate
self._tracks: dict[int, tuple[float, float, int, bool]] = {}
self._tracks: dict[int, Track] = {}
self._next_id = 0
self.step = 0
@@ -50,13 +113,14 @@ class SheepTracker:
det_used: set[int] = set()
updated_tids: set[int] = set()
# Pass 1 — match active tracks within the primary gate. Oldest-
# seen tracks bind first so a re-emerging long-lost sheep keeps
# its old ID instead of being grabbed by a fresh neighbour.
active_tids = [tid for tid, t in self._tracks.items() if not t[3]]
active_tids.sort(key=lambda tid: self._tracks[tid][2])
# Pass 1 — match active tracks within the primary gate.
# Use predicted positions for matching, oldest-first.
active_tids = [tid for tid, t in self._tracks.items() if not t.penned]
active_tids.sort(key=lambda tid: self._tracks[tid].last_seen)
for tid in active_tids:
tx, ty, _, _ = self._tracks[tid]
track = self._tracks[tid]
# Use predicted position for matching.
tx, ty = track.predicted_position(self.step)
best_j, best_d = -1, self.gate
for j, (dx, dy) in enumerate(detections):
if j in det_used:
@@ -67,20 +131,18 @@ class SheepTracker:
best_j = j
if best_j >= 0:
dx, dy = detections[best_j]
self._tracks[tid] = (dx, dy, self.step, False)
track.update(dx, dy, self.step)
det_used.add(best_j)
updated_tids.add(tid)
# Pass 1b — re-acquisition. Sheep flee at ~0.6 m/s, so over a
# 12 s occlusion the same sheep may reappear outside the primary
# gate. Allow rebinding within a wider gate for stale-enough
# tracks; otherwise phantom tracks accumulate and corrupt CoM.
# Pass 1b — re-acquisition with wider gate for stale tracks.
for tid in active_tids:
if tid in updated_tids:
continue
tx, ty, last, _ = self._tracks[tid]
if (self.step - last) < REACQUIRE_MIN_AGE:
track = self._tracks[tid]
if (self.step - track.last_seen) < REACQUIRE_MIN_AGE:
continue
tx, ty = track.predicted_position(self.step)
best_j, best_d = -1, REACQUIRE_GATE_M
for j, (dx, dy) in enumerate(detections):
if j in det_used:
@@ -91,53 +153,52 @@ class SheepTracker:
best_j = j
if best_j >= 0:
dx, dy = detections[best_j]
self._tracks[tid] = (dx, dy, self.step, False)
track.update(dx, dy, self.step)
det_used.add(best_j)
updated_tids.add(tid)
# Pass 2 — match remaining detections to penned tracks.
penned_tids = [tid for tid, t in self._tracks.items() if t[3]]
penned_tids = [tid for tid, t in self._tracks.items() if t.penned]
for tid in penned_tids:
tx, ty, _, _ = self._tracks[tid]
track = self._tracks[tid]
best_j, best_d = -1, PENNED_GATE_M
for j, (dx, dy) in enumerate(detections):
if j in det_used:
continue
d = math.hypot(dx - tx, dy - ty)
d = math.hypot(dx - track.x, dy - track.y)
if d < best_d:
best_d = d
best_j = j
if best_j >= 0:
dx, dy = detections[best_j]
self._tracks[tid] = (dx, dy, self.step, True)
track.update(dx, dy, self.step)
det_used.add(best_j)
# Spawn new tracks for unmatched detections. Born "penned" if
# the detection already sits inside the pen geometry.
# Spawn new tracks for unmatched detections.
for j, (dx, dy) in enumerate(detections):
if j in det_used:
continue
penned = in_pen(dx, dy) or is_penned_position(dx, dy)
self._tracks[self._next_id] = (dx, dy, self.step, penned)
self._tracks[self._next_id] = Track(dx, dy, self.step, penned)
self._next_id += 1
# Promote active tracks whose current estimate crosses the gate.
for tid, (tx, ty, last, penned) in list(self._tracks.items()):
if penned:
for track in self._tracks.values():
if track.penned:
continue
if is_penned_position(tx, ty):
self._tracks[tid] = (tx, ty, last, True)
px, py = track.predicted_position(self.step)
if is_penned_position(px, py):
track.penned = True
# Forget stale active tracks; penned tracks live forever.
for tid, (tx, ty, last, penned) in list(self._tracks.items()):
if penned:
continue
if (self.step - last) > FORGET_STEPS:
del self._tracks[tid]
stale = [tid for tid, t in self._tracks.items()
if not t.penned and (self.step - t.last_seen) > FORGET_STEPS]
for tid in stale:
del self._tracks[tid]
# Hard cap on the active set — drop the oldest-seen overflow.
active = [(tid, last) for tid, (_, _, last, p) in self._tracks.items()
if not p]
active = [(tid, t.last_seen) for tid, t in self._tracks.items()
if not t.penned]
if len(active) > MAX_ACTIVE_TRACKS:
active.sort(key=lambda kv: kv[1])
for tid, _ in active[: len(active) - MAX_ACTIVE_TRACKS]:
@@ -146,16 +207,31 @@ class SheepTracker:
return self.get_positions()
def get_positions(self) -> dict[str, tuple[float, float]]:
"""Active (not-penned) tracks as a ``{name: (x, y)}`` dict."""
return {f"t{tid}": (x, y)
for tid, (x, y, _, penned) in self._tracks.items()
if not penned}
"""Active (not-penned) tracks as a ``{name: (x, y)}`` dict.
For tracks currently being predicted (occluded but within
PREDICT_STEPS), returns the extrapolated position so the teacher
sees a smooth estimate.
"""
result = {}
for tid, track in self._tracks.items():
if track.penned:
continue
px, py = track.predicted_position(self.step)
result[f"t{tid}"] = (px, py)
return result
def get_penned_set(self) -> set[str]:
return {f"t{tid}" for tid, (_, _, _, penned) in self._tracks.items() if penned}
return {f"t{tid}" for tid, t in self._tracks.items() if t.penned}
def n_active(self) -> int:
return sum(1 for _, _, _, penned in self._tracks.values() if not penned)
return sum(1 for t in self._tracks.values() if not t.penned)
def n_penned(self) -> int:
return sum(1 for _, _, _, penned in self._tracks.values() if penned)
return sum(1 for t in self._tracks.values() if t.penned)
def n_predicted(self) -> int:
"""Number of active tracks currently being extrapolated (not directly observed)."""
return sum(1 for t in self._tracks.values()
if not t.penned and (self.step - t.last_seen) > 0
and (self.step - t.last_seen) <= PREDICT_STEPS)
+130 -1
View File
@@ -1,4 +1,5 @@
"""Differential-drive kinematics, shared by the env and Webots controllers.
"""Differential-drive and mecanum kinematics, shared by the env and Webots
controllers.
First-order rigid-body model — no slip, wheel-accel limits, or contact
forces. Webots' ODE physics handles those at inference; the env stays
@@ -59,3 +60,131 @@ def heading_speed_to_wheels(heading, speed_motor, h, max_wheel_omega,
left = max(-max_wheel_omega, min(max_wheel_omega, fwd - turn))
right = max(-max_wheel_omega, min(max_wheel_omega, fwd + turn))
return left, right
# ---------------------------------------------------------------------------
# Mecanum (4-wheel omnidirectional) kinematics
# ---------------------------------------------------------------------------
def mecanum_kinematics_step(x, y, h, w_fl, w_fr, w_rl, w_rr,
wheel_radius, lx, ly, dt):
"""Integrate one step of mecanum forward kinematics.
Parameters
----------
x, y : robot position (m)
h : robot heading (rad), 0 = +x axis
w_fl, w_fr, w_rl, w_rr : wheel angular velocities (rad/s)
wheel_radius : wheel radius (m)
lx : half the front-to-back axle distance (m)
ly : half the left-to-right axle distance (m)
dt : timestep (s)
Returns (new_x, new_y, new_h).
"""
r = wheel_radius
vx_body = (w_fl + w_fr + w_rl + w_rr) * r / 4.0
vy_body = (-w_fl + w_fr + w_rl - w_rr) * r / 4.0
omega = (-w_fl + w_fr - w_rl + w_rr) * r / (4.0 * (lx + ly))
cos_h = math.cos(h)
sin_h = math.sin(h)
vx_world = vx_body * cos_h - vy_body * sin_h
vy_world = vx_body * sin_h + vy_body * cos_h
new_x = x + vx_world * dt
new_y = y + vy_world * dt
new_h = math.atan2(math.sin(h + omega * dt), math.cos(h + omega * dt))
return new_x, new_y, new_h
def mecanum_inverse(vx_body, vy_body, omega, wheel_radius, lx, ly,
max_wheel_omega):
"""Mecanum inverse kinematics: body-frame velocities to 4 wheel speeds.
Parameters
----------
vx_body, vy_body : desired body-frame linear velocities (m/s)
omega : desired yaw rate (rad/s)
wheel_radius : wheel radius (m)
lx : half front-to-back axle distance (m)
ly : half left-to-right axle distance (m)
max_wheel_omega : wheel angular velocity clamp (rad/s)
Returns (w_fl, w_fr, w_rl, w_rr).
"""
r = wheel_radius
k = lx + ly
w_fl = (vx_body - vy_body - k * omega) / r
w_fr = (vx_body + vy_body + k * omega) / r
w_rl = (vx_body + vy_body - k * omega) / r
w_rr = (vx_body - vy_body + k * omega) / r
scale = max(abs(w_fl), abs(w_fr), abs(w_rl), abs(w_rr), 1e-9)
if scale > max_wheel_omega:
ratio = max_wheel_omega / scale
w_fl *= ratio
w_fr *= ratio
w_rl *= ratio
w_rr *= ratio
return w_fl, w_fr, w_rl, w_rr
def velocity_to_mecanum_wheels(vx, vy, omega, h, max_linear, wheel_radius,
lx, ly, max_wheel_omega,
k_turn=4.0, wheel_base=0.28):
"""Convert world-frame (vx, vy, omega) action in [-1, 1]^3 to 4 wheel speeds.
Truly holonomic interpretation: (vx, vy) is the desired *world-frame*
velocity (magnitude up to ``max_linear`` m/s) and ``omega`` is the
desired yaw rate (independent of motion direction). The dog can
crab-walk and rotate at the same time.
This matches the universal teacher's signal: drive toward a standoff
point while facing the sheep / pen separately. With the older
non-holonomic version, ``omega`` from the teacher fought against the
forward-only kinematics and dropped success rates instead of helping.
Parameters
----------
vx, vy : desired world-frame velocity intent in [-1, 1] (clamped on
magnitude to ≤ 1)
omega : desired yaw rate intent in [-1, 1]
h : current heading (rad), 0 = +x
max_linear : max linear speed (m/s)
wheel_radius : wheel radius (m)
lx, ly : half axle distances (m)
max_wheel_omega : wheel angular velocity clamp (rad/s)
k_turn : unused (kept for signature compatibility)
wheel_base : unused (kept for signature compatibility)
Returns (w_fl, w_fr, w_rl, w_rr).
"""
# Clamp the action magnitude in the (vx, vy) unit disk.
norm = math.hypot(vx, vy)
if norm > 1.0:
vx /= norm
vy /= norm
# World-frame velocity → body-frame velocity (rotate by -h).
vx_world = vx * max_linear
vy_world = vy * max_linear
cos_h = math.cos(h)
sin_h = math.sin(h)
vx_body = cos_h * vx_world + sin_h * vy_world
vy_body = -sin_h * vx_world + cos_h * vy_world
# Yaw rate: omega ∈ [-1, 1] maps to ± max_linear / (lx + ly) — same
# peak yaw as the old "omega_extra" channel, but used directly
# rather than added to a heading-tracker.
yaw_max = max_linear / max(lx + ly, 1e-6)
omega_rad = omega * yaw_max
if abs(vx_body) < 1e-3 and abs(vy_body) < 1e-3 and abs(omega_rad) < 1e-3:
return 0.0, 0.0, 0.0, 0.0
return mecanum_inverse(
vx_body, vy_body, omega_rad,
wheel_radius, lx, ly, max_wheel_omega,
)
+40 -23
View File
@@ -27,9 +27,10 @@ import math
import random
from herding.world.geometry import (
FIELD_SHAPE, FIELD_ROUND_R,
FIELD_X, FIELD_Y,
PEN_X, PEN_Y,
GATE_X,
GATE_X, GATE_Y,
)
# Speeds are in wheel rad/s (motor units); m/s = speed * SHEEP_WHEEL_RADIUS.
@@ -131,33 +132,49 @@ def compute_heading_speed(x, y, penned, dog_xy, peers, wander_angle, rng=None):
fx -= (ddx / d) * push * 2.5
fy -= (ddy / d) * push * 2.5
# Wall soft repulsion (south wall absent inside the gate column).
if x < FIELD_X[0] + WALL_MARGIN:
fx += ((FIELD_X[0] + WALL_MARGIN - x) / WALL_MARGIN) * 6.0
if x > FIELD_X[1] - WALL_MARGIN:
fx -= ((x - (FIELD_X[1] - WALL_MARGIN)) / WALL_MARGIN) * 6.0
if y > FIELD_Y[1] - WALL_MARGIN:
fy -= ((y - (FIELD_Y[1] - WALL_MARGIN)) / WALL_MARGIN) * 6.0
if y < FIELD_Y[0] + WALL_MARGIN and not (GATE_X[0] <= x <= GATE_X[1]):
fy += ((FIELD_Y[0] + WALL_MARGIN - y) / WALL_MARGIN) * 6.0
# Wall soft repulsion.
if FIELD_SHAPE == "field_round":
r = math.hypot(x, y)
wall_d = FIELD_ROUND_R - r
in_gate_col = (GATE_X[0] <= x <= GATE_X[1]
and y < GATE_Y + WALL_MARGIN)
if wall_d < WALL_MARGIN and r > 1e-6 and not in_gate_col:
gain = ((WALL_MARGIN - wall_d) / WALL_MARGIN) * 6.0
fx -= (x / r) * gain
fy -= (y / r) * gain
# Hard escape band.
if wall_d < WALL_HARD_MARGIN and not in_gate_col:
hgain = WALL_HARD_GAIN * (1.0 - wall_d / WALL_HARD_MARGIN)
fx -= (x / r) * hgain
fy -= (y / r) * hgain
else:
# Rectangular: south wall absent inside the gate column.
if x < FIELD_X[0] + WALL_MARGIN:
fx += ((FIELD_X[0] + WALL_MARGIN - x) / WALL_MARGIN) * 6.0
if x > FIELD_X[1] - WALL_MARGIN:
fx -= ((x - (FIELD_X[1] - WALL_MARGIN)) / WALL_MARGIN) * 6.0
if y > FIELD_Y[1] - WALL_MARGIN:
fy -= ((y - (FIELD_Y[1] - WALL_MARGIN)) / WALL_MARGIN) * 6.0
if y < FIELD_Y[0] + WALL_MARGIN and not (GATE_X[0] <= x <= GATE_X[1]):
fy += ((FIELD_Y[0] + WALL_MARGIN - y) / WALL_MARGIN) * 6.0
# Hard escape band — overrides everything else near a wall.
m, g = WALL_HARD_MARGIN, WALL_HARD_GAIN
if x - FIELD_X[0] < m:
fx = max(fx, g * (1.0 - (x - FIELD_X[0]) / m))
if FIELD_X[1] - x < m:
fx = min(fx, -g * (1.0 - (FIELD_X[1] - x) / m))
if FIELD_Y[1] - y < m:
fy = min(fy, -g * (1.0 - (FIELD_Y[1] - y) / m))
if (y - FIELD_Y[0] < m) and not (GATE_X[0] <= x <= GATE_X[1]):
fy = max(fy, g * (1.0 - (y - FIELD_Y[0]) / m))
if not fleeing:
if random.random() < 0.02:
wander_angle += random.uniform(-0.6, 0.6)
if rnd.random() < 0.02:
wander_angle += rnd.uniform(-0.6, 0.6)
fx += math.cos(wander_angle) * 0.5
fy += math.sin(wander_angle) * 0.5
# Hard escape band — overrides everything else near a wall.
m, g = WALL_HARD_MARGIN, WALL_HARD_GAIN
if x - FIELD_X[0] < m:
fx = max(fx, g * (1.0 - (x - FIELD_X[0]) / m))
if FIELD_X[1] - x < m:
fx = min(fx, -g * (1.0 - (FIELD_X[1] - x) / m))
if FIELD_Y[1] - y < m:
fy = min(fy, -g * (1.0 - (FIELD_Y[1] - y) / m))
if (not penned) and (y - FIELD_Y[0] < m) and not (GATE_X[0] <= x <= GATE_X[1]):
fy = max(fy, g * (1.0 - (y - FIELD_Y[0]) / m))
heading = math.atan2(fy, fx)
mag = math.hypot(fx, fy)
speed = max(WANDER_SPEED, min(FLEE_SPEED, mag * 3.0))
+114 -7
View File
@@ -4,20 +4,35 @@ Coordinates are metres; (0, 0) is the field centre, +x east, +y north.
These constants mirror ``worlds/field.wbt`` and the proto files — if
the world changes, this file is the single point of update.
field +y north
field (rectangular)
+-----------+
| |
| |
| ...... |
+---||||----+ y = -15 (south wall, 3 m gate at x [10, 13])
+---||||----+ y = -15 (south wall, 3 m gate at x in [10, 13])
||||
|pen| y [-22, -15]
|pen| y in [-22, -15]
+---+
field_round (circular, R = 15 m)
.---.
/ ... \\
| ..... | gate at south, x in [-1.83, 1.83]
\\ ... /
'-+-' pen y in [-22, -15]
"""
import os
import math
# Field (square, stone-walled)
# ---------------------------------------------------------------------------
# Field shape selection — controlled by HERDING_WORLD env var at runtime.
# Defaults to "field" (rectangular). The launcher writes it into the
# runtime cfg so the controller can pick it up too.
# ---------------------------------------------------------------------------
FIELD_SHAPE = (os.environ.get("HERDING_WORLD", "field")).lower()
# ==================== Rectangular field (field.wbt) ====================
FIELD_X = (-15.0, 15.0)
FIELD_Y = (-15.0, 15.0)
FIELD_INSIDE_MARGIN = 0.5
@@ -32,12 +47,67 @@ PEN_ENTRY = (0.5 * (PEN_X[0] + PEN_X[1]), -15.0)
GATE_X = PEN_X
GATE_Y = -15.0
# ==================== Round field (field_round.wbt) ====================
FIELD_ROUND_R = 15.0
FIELD_ROUND_PEN_X = (-1.5, 1.5)
FIELD_ROUND_PEN_Y = (-22.0, -15.0)
FIELD_ROUND_PEN_CENTER = (
0.5 * (FIELD_ROUND_PEN_X[0] + FIELD_ROUND_PEN_X[1]),
0.5 * (FIELD_ROUND_PEN_Y[0] + FIELD_ROUND_PEN_Y[1]),
)
FIELD_ROUND_PEN_ENTRY = (0.0, -15.0)
FIELD_ROUND_GATE_X = FIELD_ROUND_PEN_X
FIELD_ROUND_GATE_Y = -15.0
# ==================== Active geometry (resolved at import) ===============
# Rectangular defaults are already assigned above. Override for round.
if FIELD_SHAPE == "field_round":
PEN_X = FIELD_ROUND_PEN_X
PEN_Y = FIELD_ROUND_PEN_Y
PEN_CENTER = FIELD_ROUND_PEN_CENTER
PEN_ENTRY = FIELD_ROUND_PEN_ENTRY
GATE_X = FIELD_ROUND_GATE_X
GATE_Y = FIELD_ROUND_GATE_Y
def configure(shape: str) -> None:
"""Switch the active field geometry at runtime.
Call this **before** importing any other ``herding.*`` module that
depends on the constants below (flocking_sim, lidar_sim, obs, etc.).
The import-time env-var path (``HERDING_WORLD``) still works; this
function is for scripts that need to choose the world via a CLI flag.
"""
global FIELD_SHAPE, PEN_X, PEN_Y, PEN_CENTER, PEN_ENTRY, GATE_X, GATE_Y
shape = shape.lower()
FIELD_SHAPE = shape
if shape == "field_round":
PEN_X = FIELD_ROUND_PEN_X
PEN_Y = FIELD_ROUND_PEN_Y
PEN_CENTER = FIELD_ROUND_PEN_CENTER
PEN_ENTRY = FIELD_ROUND_PEN_ENTRY
GATE_X = FIELD_ROUND_GATE_X
GATE_Y = FIELD_ROUND_GATE_Y
else:
PEN_X = (10.0, 13.0)
PEN_Y = (-22.0, -15.0)
PEN_CENTER = (0.5 * (PEN_X[0] + PEN_X[1]), 0.5 * (PEN_Y[0] + PEN_Y[1]))
PEN_ENTRY = (0.5 * (PEN_X[0] + PEN_X[1]), -15.0)
GATE_X = PEN_X
GATE_Y = -15.0
# Dog spec — protos/ShepherdDog.proto
DOG_WHEEL_RADIUS = 0.038 # m
DOG_WHEEL_BASE = 0.28 # m, axle-to-axle
DOG_MAX_WHEEL_OMEGA = 70.0 # rad/s
DOG_MAX_LINEAR = DOG_WHEEL_RADIUS * DOG_MAX_WHEEL_OMEGA # ≈ 2.66 m/s
# Dog mecanum spec — 4-wheel omnidirectional layout
DOG_WHEEL_BASE_X = 0.28 # m, front-to-back axle distance
DOG_WHEEL_BASE_Y = 0.28 # m, left-to-right axle distance
# Sheep spec — protos/Sheep.proto
SHEEP_WHEEL_RADIUS = 0.031 # m
SHEEP_WHEEL_BASE = 0.20 # m
@@ -58,21 +128,58 @@ def in_pen(x: float, y: float) -> bool:
def in_field(x: float, y: float, margin: float = 0.0) -> bool:
if FIELD_SHAPE == "field_round":
r = FIELD_ROUND_R - margin
return x * x + y * y <= r * r
return (FIELD_X[0] + margin <= x <= FIELD_X[1] - margin
and FIELD_Y[0] + margin <= y <= FIELD_Y[1] - margin)
def in_gate_corridor(x: float, y: float, margin: float = 0.0) -> bool:
"""True if (x, y) lies in the column of the gate (between field and pen)."""
return (PEN_X[0] - margin <= x <= PEN_X[1] + margin
return (GATE_X[0] - margin <= x <= GATE_X[1] + margin
and PEN_Y[0] - margin <= y <= GATE_Y + margin)
def is_penned_position(x: float, y: float, latch_margin: float = 0.2) -> bool:
"""True iff (x, y) is in the gate column and south of the gate line."""
return (PEN_X[0] - latch_margin <= x <= PEN_X[1] + latch_margin
return (GATE_X[0] - latch_margin <= x <= GATE_X[1] + latch_margin
and y <= GATE_Y)
def distance_to_pen_entry(x: float, y: float) -> float:
return math.hypot(x - PEN_ENTRY[0], y - PEN_ENTRY[1])
def distance_to_wall(x: float, y: float) -> float:
"""Shortest distance from (x, y) to the nearest field wall.
For a rectangular field this is the minimum Manhattan distance to the
four bounding walls. For a round field it is ``R - sqrt(x²+y²)``.
Returns a negative value if the point is outside the field.
"""
if FIELD_SHAPE == "field_round":
return FIELD_ROUND_R - math.hypot(x, y)
return min(
x - FIELD_X[0], FIELD_X[1] - x,
y - FIELD_Y[0], FIELD_Y[1] - y,
)
def clip_to_field(x: float, y: float, margin: float = 0.2) -> tuple[float, float]:
"""Clip (x, y) inside the field boundary with a small margin.
For round fields the point is projected radially inward if it exceeds
the circular boundary.
"""
if FIELD_SHAPE == "field_round":
r = math.hypot(x, y)
limit = FIELD_ROUND_R - margin
if r > limit and r > 1e-6:
scale = limit / r
return x * scale, y * scale
return x, y
return (
max(FIELD_X[0] + margin, min(FIELD_X[1] - margin, x)),
max(FIELD_Y[0] + margin, min(FIELD_Y[1] - margin, y)),
)