Checkpoint 4

This commit is contained in:
Johnny Fernandes
2026-05-11 00:42:52 +01:00
parent 2a6db038df
commit 6688325d89
26 changed files with 2018 additions and 503 deletions
+104 -8
View File
@@ -69,7 +69,10 @@ from herding.geometry import (
SHEEP_MAX_WHEEL_OMEGA, SHEEP_WHEEL_BASE, SHEEP_WHEEL_RADIUS,
WEBOTS_DT, is_penned_position,
)
from herding.lidar_perception import detections_from_scan
from herding.lidar_sim import simulate_scan
from herding.obs import OBS_DIM, build_obs
from herding.sheep_tracker import SheepTracker
from herding.strombom import compute_action as strombom_action
@@ -130,11 +133,30 @@ class HerdingEnv(gym.Env):
max_steps: int = DEFAULT_MAX_STEPS,
difficulty: float = 0.0,
seed: Optional[int] = None,
use_lidar: bool = True,
frame_stack: int = 1,
):
super().__init__()
# When True (default), the obs and the imitation-reward teacher
# see only LiDAR-perceived sheep positions through a tracker —
# matching what the Webots controller has access to. When False,
# both consume ground-truth positions (legacy "privileged" mode,
# kept for ablation).
self._use_lidar = bool(use_lidar)
self._tracker = SheepTracker() if self._use_lidar else None
self._np_rng_lidar: Optional[np.random.Generator] = None
# Frame stacking: the policy receives the last K single-frame
# observations concatenated. Lets a memoryless MLP integrate
# information across time, partly compensating for the limited
# LiDAR FOV. K=1 reproduces the legacy single-frame obs.
self._frame_stack = max(1, int(frame_stack))
self._frame_buffer: list[np.ndarray] = []
self.action_space = spaces.Box(-1.0, 1.0, shape=(2,), dtype=np.float32)
self._single_obs_dim = OBS_DIM
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf, shape=(OBS_DIM,), dtype=np.float32,
low=-np.inf, high=np.inf,
shape=(OBS_DIM * self._frame_stack,), dtype=np.float32,
)
# If n_sheep is None, env will sample uniformly from [1, max_n_sheep]
@@ -243,6 +265,16 @@ class HerdingEnv(gym.Env):
self.prev_n_penned = 0
self.prev_d_pen, self.prev_radius = self._flock_metrics()
if self._tracker is not None:
self._tracker.reset()
self._np_rng_lidar = np.random.default_rng(
int(self.np_random.integers(0, 2**31 - 1)))
# Prime the tracker with one scan so the first obs isn't empty.
self._update_tracker()
# Clear the frame stack — the next _build_obs will repopulate.
self._frame_buffer = []
obs = self._build_obs()
info = {"n_sheep": self.n_sheep}
return obs, info
@@ -289,6 +321,12 @@ class HerdingEnv(gym.Env):
and is_penned_position(self.sheep_x[i], self.sheep_y[i])):
self.sheep_penned[i] = True
# --- Run LiDAR perception on this step's state (after sheep have
# moved). Updates the tracker that obs and the imitation-
# reward teacher consume. Reward / termination still use GT. ---
if self._tracker is not None:
self._update_tracker()
# --- Reward, termination ---
d_pen, radius = self._flock_metrics()
reward = self._compute_reward(d_pen, radius, action=action)
@@ -395,10 +433,7 @@ class HerdingEnv(gym.Env):
r = self.W_PEN_DELTA * delta_pen + self.W_PROGRESS * d_progress
if action is not None and self.W_IMITATE > 0.0:
positions = {
f"s{i}": (float(self.sheep_x[i]), float(self.sheep_y[i]))
for i in range(self.n_sheep) if not self.sheep_penned[i]
}
positions = self._perceived_positions()
if positions:
sx, sy, _mode = strombom_action(
(self.dog_x, self.dog_y), positions, PEN_ENTRY,
@@ -411,11 +446,72 @@ class HerdingEnv(gym.Env):
return float(r)
def _build_obs(self) -> np.ndarray:
sheep_xy_list = list(zip(self.sheep_x.tolist(), self.sheep_y.tolist()))
sheep_penned_list = self.sheep_penned.tolist()
def _build_single_obs(self) -> np.ndarray:
    """Build one (un-stacked) observation frame via ``build_obs``.

    LiDAR mode (``self._tracker`` is set): the sheep list is the
    tracker's current positions and every entry is flagged as not
    penned, so penned tracks never reach the observation.
    Privileged mode (``self._tracker is None``): the sheep list and
    penned flags come straight from the ground-truth arrays.
    """
    tracker = self._tracker
    if tracker is None:
        # Ground truth: pair up the x / y arrays and pass the real
        # penned flags through unchanged.
        sheep_xy_list = list(
            zip(self.sheep_x.tolist(), self.sheep_y.tolist())
        )
        sheep_penned_list = self.sheep_penned.tolist()
    else:
        # Only the tracker's active set is visible; penned tracks are
        # intentionally left out (per the original note, this mirrors
        # the earlier receiver-based behaviour where penned sheep
        # stopped contributing to the symbolic obs).
        tracked = tracker.get_positions()
        sheep_xy_list = list(tracked.values())
        sheep_penned_list = [False] * len(sheep_xy_list)

    dog_xy = (self.dog_x, self.dog_y)
    return build_obs(
        dog_xy,
        self.dog_heading,
        sheep_xy_list,
        sheep_penned_list,
        n_max=self._max_n_sheep,
    )
def _build_obs(self) -> np.ndarray:
    """Return the policy observation, frame-stacked when configured.

    With ``self._frame_stack <= 1`` the single frame from
    ``_build_single_obs`` is returned as-is and the frame buffer is
    left untouched. Otherwise the newest frame is pushed into
    ``self._frame_buffer`` (side effect), the buffer is trimmed to the
    last ``self._frame_stack`` frames, and the frames are concatenated
    oldest -> newest into one float32 vector.
    """
    frame = self._build_single_obs()
    depth = self._frame_stack
    if depth <= 1:
        return frame

    if self._frame_buffer:
        self._frame_buffer.append(frame)
        surplus = len(self._frame_buffer) - depth
        if surplus > 0:
            # Drop the oldest frames so exactly `depth` remain.
            self._frame_buffer = self._frame_buffer[surplus:]
    else:
        # Empty buffer (fresh reset): replicate the first frame so the
        # stacked observation is full-length from the very first step.
        self._frame_buffer = [frame.copy() for _ in range(depth)]

    stacked = np.concatenate(self._frame_buffer, axis=0)
    return stacked.astype(np.float32)
# ------------------------------------------------------------------
# LiDAR perception helpers
# ------------------------------------------------------------------
def _all_sheep_xy(self) -> list[tuple[float, float]]:
    """Return ``(x, y)`` float pairs for the first ``n_sheep`` sheep.

    Penned sheep are deliberately included: the list feeds the LiDAR
    simulation, and the LiDAR sees them too.
    """
    coords: list[tuple[float, float]] = []
    for idx in range(self.n_sheep):
        point = (float(self.sheep_x[idx]), float(self.sheep_y[idx]))
        coords.append(point)
    return coords
def _update_tracker(self) -> None:
    """Run one simulated LiDAR scan and feed the result to the tracker.

    The scan is taken from the dog's current pose over every sheep
    (penned ones included), converted to detections from that same
    pose, and passed to ``self._tracker.update``. There is no ``None``
    check here: the call sites visible in this file only invoke it
    when ``self._tracker is not None``.
    """
    scan = simulate_scan(
        self.dog_x,
        self.dog_y,
        self.dog_heading,
        self._all_sheep_xy(),
        rng=self._np_rng_lidar,
    )
    hits = detections_from_scan(
        scan,
        self.dog_x,
        self.dog_y,
        self.dog_heading,
    )
    self._tracker.update(hits)
def perceived_positions(self) -> dict[str, tuple[float, float]]:
    """Return the sheep positions the controller would 'see' this step.

    LiDAR mode: whatever ``self._tracker.get_positions()`` returns
    (the tracker's active set).
    Privileged mode: ground-truth positions of the sheep that are not
    penned, keyed ``"s<index>"``.

    Used by ``training.eval`` and ``tools.collect_demos`` so analytic
    teachers run on the same perception the deployed controller has.
    """
    tracker = self._tracker
    if tracker is not None:
        return tracker.get_positions()

    visible: dict[str, tuple[float, float]] = {}
    for idx in range(self.n_sheep):
        if self.sheep_penned[idx]:
            continue
        visible[f"s{idx}"] = (
            float(self.sheep_x[idx]),
            float(self.sheep_y[idx]),
        )
    return visible
# Internal alias of ``perceived_positions``: lets the imitation-reward
# path fetch positions without knowing whether the env is in LiDAR or
# privileged (ground-truth) mode.
_perceived_positions = perceived_positions