Checkpoint 6
This commit is contained in:
@@ -0,0 +1,144 @@
|
||||
"""Cluster a 2D LiDAR scan into world-frame sheep position estimates.
|
||||
|
||||
Pipeline:
|
||||
ranges (N,) ─► hit mask ─► world-frame points
|
||||
│
|
||||
▼
|
||||
adjacency clustering (gap > GAP_THRESHOLD
|
||||
starts a new cluster, walking rays in
|
||||
angular order)
|
||||
│
|
||||
▼
|
||||
centroid + span filter
|
||||
│
|
||||
▼
|
||||
field/pen-corridor filter
|
||||
│
|
||||
▼
|
||||
list of (x, y) detections
|
||||
|
||||
The clusterer is intentionally simple — for ≤10 sheep there is rarely
|
||||
any real ambiguity, and proper DBSCAN would only matter if rays from
|
||||
two adjacent sheep merged. The downstream tracker handles association
|
||||
across frames.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
|
||||
import numpy as np
|
||||
|
||||
from herding.world.geometry import FIELD_X, FIELD_Y, GATE_Y, PEN_X, PEN_Y
|
||||
from herding.perception.lidar_sim import (
|
||||
LIDAR_FOV, LIDAR_MAX_RANGE, LIDAR_N_RAYS, SHEEP_RADIUS, ray_angles,
|
||||
)
|
||||
|
||||
|
||||
GAP_THRESHOLD = 0.6 # m — adjacent ray-points farther apart start new cluster
|
||||
MAX_CLUSTER_SPAN = 1.5 # m — clusters wider than this are likely walls/structures
|
||||
RANGE_HIT_EPS = 0.05 # m — hit if range < max_range - eps
|
||||
WALL_REJECT = 0.5 # m — drop detections this close to a known wall line
|
||||
|
||||
# Known sheep-sized static features. Detections within STATIC_REJECT
|
||||
# of any of these are discarded — these aren't sheep. Mid-pillars on
|
||||
# the field walls are NOT in this list because they're embedded in the
|
||||
# wall (the wall's span filter handles them); listing them here would
|
||||
# only reject real sheep that happened to be near the wall.
|
||||
_STATIC_FEATURES = (
|
||||
# Gate posts (sheep-sized boxes flanking the south-wall opening)
|
||||
( 10.0, -15.0), ( 13.0, -15.0),
|
||||
# Field corner pillars
|
||||
( 15.0, 15.0), ( 15.0, -15.0), (-15.0, 15.0), (-15.0, -15.0),
|
||||
)
|
||||
STATIC_REJECT = 0.8 # m — detection within this of a static feature → drop
|
||||
|
||||
|
||||
def detections_from_scan(
|
||||
ranges: np.ndarray,
|
||||
dog_x: float, dog_y: float, dog_heading: float,
|
||||
max_range: float = LIDAR_MAX_RANGE,
|
||||
) -> list[tuple[float, float]]:
|
||||
"""Return list of (x, y) world-frame sheep position estimates."""
|
||||
ranges = np.asarray(ranges, dtype=np.float32)
|
||||
n_rays = ranges.shape[0]
|
||||
if n_rays == 0:
|
||||
return []
|
||||
angles = ray_angles(n_rays, LIDAR_FOV)
|
||||
hit = ranges < max_range - RANGE_HIT_EPS
|
||||
|
||||
world_a = dog_heading + angles
|
||||
px = dog_x + ranges * np.cos(world_a)
|
||||
py = dog_y + ranges * np.sin(world_a)
|
||||
|
||||
clusters: list[list[tuple[float, float]]] = []
|
||||
current: list[tuple[float, float]] = []
|
||||
prev: tuple[float, float] | None = None
|
||||
for i in range(n_rays):
|
||||
if not bool(hit[i]):
|
||||
if current:
|
||||
clusters.append(current)
|
||||
current = []
|
||||
prev = None
|
||||
continue
|
||||
pt = (float(px[i]), float(py[i]))
|
||||
if prev is not None and math.hypot(pt[0] - prev[0], pt[1] - prev[1]) > GAP_THRESHOLD:
|
||||
clusters.append(current)
|
||||
current = []
|
||||
current.append(pt)
|
||||
prev = pt
|
||||
if current:
|
||||
clusters.append(current)
|
||||
|
||||
detections: list[tuple[float, float]] = []
|
||||
for cluster in clusters:
|
||||
xs = [p[0] for p in cluster]
|
||||
ys = [p[1] for p in cluster]
|
||||
cx, cy = sum(xs) / len(xs), sum(ys) / len(ys)
|
||||
span = math.hypot(max(xs) - min(xs), max(ys) - min(ys))
|
||||
if span > MAX_CLUSTER_SPAN:
|
||||
continue
|
||||
# Surface-to-centre correction: rays hit the front of the sheep,
|
||||
# so the cluster centroid is biased toward the dog by SHEEP_RADIUS.
|
||||
# Push it outward along the dog→cluster direction.
|
||||
dx, dy = cx - dog_x, cy - dog_y
|
||||
d = math.hypot(dx, dy)
|
||||
if d > 1e-3:
|
||||
cx += SHEEP_RADIUS * dx / d
|
||||
cy += SHEEP_RADIUS * dy / d
|
||||
# Keep detections inside the field OR in the gate corridor /
|
||||
# external pen — penned sheep are still worth tracking so the
|
||||
# tracker can latch them as "penned" rather than spawn fresh
|
||||
# tracks each scan.
|
||||
# Accept detections inside the field, plus a narrow strip
|
||||
# immediately south of the gate to catch sheep mid-crossing
|
||||
# (so they get marked penned via is_penned_position before the
|
||||
# track goes stale). Detections deeper into the pen are
|
||||
# dropped entirely — Webots's pen posts and rails would
|
||||
# otherwise produce a torrent of phantom penned tracks that
|
||||
# the tracker can't keep up with.
|
||||
in_main = (FIELD_X[0] - 0.2 < cx < FIELD_X[1] + 0.2 and
|
||||
FIELD_Y[0] - 0.2 < cy < FIELD_Y[1] + 0.2)
|
||||
in_gate_strip = (PEN_X[0] - 0.2 < cx < PEN_X[1] + 0.2 and
|
||||
GATE_Y - 1.0 < cy < GATE_Y + 0.2)
|
||||
if not (in_main or in_gate_strip):
|
||||
continue
|
||||
# Known-static-feature filter: gate posts and corner pillars
|
||||
# show up as sheep-sized clusters but are never sheep.
|
||||
if any(math.hypot(cx - fx, cy - fy) < STATIC_REJECT
|
||||
for fx, fy in _STATIC_FEATURES):
|
||||
continue
|
||||
# Wall-proximity filter: at oblique scan angles, walls produce
|
||||
# multiple short clusters because adjacent ray returns are
|
||||
# spaced just above GAP_THRESHOLD. Sheep can't get within ~0.3 m
|
||||
# of a wall (the env clips them to FIELD_INSIDE), so anything
|
||||
# right at the wall line is structure noise.
|
||||
near_field_wall = (
|
||||
cx > FIELD_X[1] - WALL_REJECT or cx < FIELD_X[0] + WALL_REJECT or
|
||||
cy > FIELD_Y[1] - WALL_REJECT or
|
||||
(cy < FIELD_Y[0] + WALL_REJECT and not (PEN_X[0] <= cx <= PEN_X[1]))
|
||||
)
|
||||
if near_field_wall:
|
||||
continue
|
||||
detections.append((cx, cy))
|
||||
return detections
|
||||
@@ -0,0 +1,193 @@
|
||||
"""Fast 2D LiDAR simulator for the Gymnasium env.
|
||||
|
||||
Raycasts against:
|
||||
* **Sheep** — discs of radius ``SHEEP_RADIUS``.
|
||||
* **Static world geometry** — axis-aligned wall segments and gate
|
||||
posts taken from ``worlds/field.wbt``. Without these, demos
|
||||
collected in-env would never include the false-positive clusters
|
||||
Webots produces from the stone walls and gate-post boxes, and the
|
||||
BC student trained on those demos collapses on deployment.
|
||||
|
||||
Returns a range array matching the Webots Lidar device on the dog
|
||||
(see ``protos/ShepherdDog.proto``: 180 rays, 140° FOV centred on
|
||||
forward, 12 m max range, 5 mm noise).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
# Match protos/ShepherdDog.proto Lidar device.
|
||||
LIDAR_N_RAYS = 180
|
||||
LIDAR_FOV = 2.44 # rad ≈ 140°
|
||||
LIDAR_MAX_RANGE = 12.0
|
||||
LIDAR_NOISE = 0.005 # m, gaussian std
|
||||
|
||||
# Sheep modelled as a vertical cylinder; this is the horizontal-section
|
||||
# radius the LiDAR plane intersects. Tuned to the proto sheep (~0.45 m
|
||||
# body length). The exact value is not load-bearing — the perception
|
||||
# clusterer is range-tolerant.
|
||||
SHEEP_RADIUS = 0.30
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Static world geometry — must match worlds/field.wbt
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Vertical walls: (x, y_min, y_max). Field east/west walls and the two
|
||||
# pen side walls are visible through the open gate.
|
||||
_VERTICAL_WALLS = (
|
||||
( 15.0, -15.0, 15.0), # field east
|
||||
(-15.0, -15.0, 15.0), # field west
|
||||
( 10.0, -22.0, -15.0), # pen west
|
||||
( 13.0, -22.0, -15.0), # pen east
|
||||
)
|
||||
|
||||
# Horizontal walls: (y, x_min, x_max). South wall is split by the 3 m
|
||||
# gate at x ∈ [10, 13]; the pen south wall closes the back of the pen.
|
||||
_HORIZONTAL_WALLS = (
|
||||
( 15.0, -15.0, 15.0), # field north
|
||||
(-15.0, -15.0, 10.0), # field south-west of gate
|
||||
(-15.0, 13.0, 15.0), # field south-east of gate
|
||||
(-22.0, 10.0, 13.0), # pen south
|
||||
)
|
||||
|
||||
# Gate posts and field corner pillars treated as vertical cylinders at
|
||||
# LiDAR height. Radius 0.25 m comes from the 0.44 × 0.44 m boxes in the
|
||||
# wbt — close enough to a circular cross-section for this purpose.
|
||||
_POSTS_XY = np.array([
|
||||
( 10.0, -15.0), # west gate post
|
||||
( 13.0, -15.0), # east gate post
|
||||
( 15.0, 15.0), # NE field corner
|
||||
( 15.0, -15.0), # SE field corner
|
||||
(-15.0, 15.0), # NW field corner
|
||||
(-15.0, -15.0), # SW field corner
|
||||
], dtype=np.float64)
|
||||
POST_RADIUS = 0.25
|
||||
|
||||
|
||||
def ray_angles(n: int = LIDAR_N_RAYS, fov: float = LIDAR_FOV) -> np.ndarray:
|
||||
"""Local-frame ray angles, sweeping from +fov/2 to -fov/2.
|
||||
|
||||
Convention: angle is measured CCW from the dog's forward axis. Ray 0
|
||||
points to the dog's left, last ray to the right. Webots' default
|
||||
Lidar sweep matches this.
|
||||
"""
|
||||
return np.linspace(fov / 2.0, -fov / 2.0, n, dtype=np.float64)
|
||||
|
||||
|
||||
# Cached so we don't rebuild every step.
|
||||
_ANGLES = ray_angles()
|
||||
_COS = np.cos(_ANGLES)
|
||||
_SIN = np.sin(_ANGLES)
|
||||
|
||||
|
||||
def _raycast_static(
|
||||
ox: float, oy: float, cos_w: np.ndarray, sin_w: np.ndarray,
|
||||
) -> np.ndarray:
|
||||
"""Per-ray distance to nearest wall or post hit (∞ if none).
|
||||
|
||||
Walls are axis-aligned line segments; for each ray we compute t at
|
||||
which it crosses the wall's constant-coord plane and check the
|
||||
other coord lies in the segment. Posts are circles; same disc
|
||||
intersection as for sheep.
|
||||
"""
|
||||
n_rays = cos_w.shape[0]
|
||||
best = np.full(n_rays, np.inf, dtype=np.float64)
|
||||
|
||||
EPS = 1e-3
|
||||
safe_cos = np.where(np.abs(cos_w) < 1e-9, 1e-9, cos_w)
|
||||
safe_sin = np.where(np.abs(sin_w) < 1e-9, 1e-9, sin_w)
|
||||
|
||||
# Vertical walls (x = const)
|
||||
for wx, ymin, ymax in _VERTICAL_WALLS:
|
||||
t = (wx - ox) / safe_cos
|
||||
y_at = oy + t * sin_w
|
||||
valid = (t > EPS) & (y_at >= ymin - EPS) & (y_at <= ymax + EPS)
|
||||
cand = np.where(valid, t, np.inf)
|
||||
np.minimum(best, cand, out=best)
|
||||
|
||||
# Horizontal walls (y = const)
|
||||
for wy, xmin, xmax in _HORIZONTAL_WALLS:
|
||||
t = (wy - oy) / safe_sin
|
||||
x_at = ox + t * cos_w
|
||||
valid = (t > EPS) & (x_at >= xmin - EPS) & (x_at <= xmax + EPS)
|
||||
cand = np.where(valid, t, np.inf)
|
||||
np.minimum(best, cand, out=best)
|
||||
|
||||
# Posts (treat as discs)
|
||||
if _POSTS_XY.size:
|
||||
px = _POSTS_XY[:, 0] - ox
|
||||
py = _POSTS_XY[:, 1] - oy
|
||||
t_post = np.outer(px, cos_w) + np.outer(py, sin_w) # (P, N)
|
||||
d2 = (px ** 2 + py ** 2)[:, None] # (P, 1)
|
||||
perp2 = d2 - t_post ** 2
|
||||
R2 = POST_RADIUS ** 2
|
||||
hit = (perp2 < R2) & (t_post > 0.0)
|
||||
half = np.sqrt(np.clip(R2 - perp2, 0.0, None))
|
||||
cand = np.where(hit, t_post - half, np.inf)
|
||||
nearest = cand.min(axis=0)
|
||||
np.minimum(best, nearest, out=best)
|
||||
|
||||
return best
|
||||
|
||||
|
||||
def simulate_scan(
|
||||
dog_x: float, dog_y: float, dog_heading: float,
|
||||
sheep_xy: list[tuple[float, float]],
|
||||
noise: float = LIDAR_NOISE,
|
||||
max_range: float = LIDAR_MAX_RANGE,
|
||||
rng: np.random.Generator | None = None,
|
||||
) -> np.ndarray:
|
||||
"""Return a (N,) float32 range array. No-hit entries equal ``max_range``.
|
||||
|
||||
``sheep_xy`` is the list of (x, y) world positions of every sheep in
|
||||
the scene (penned and active). Static world geometry (walls and
|
||||
posts) is also raycast so demos contain the same false-positive
|
||||
clusters Webots produces.
|
||||
"""
|
||||
n_rays = _ANGLES.shape[0]
|
||||
|
||||
ch, sh = math.cos(dog_heading), math.sin(dog_heading)
|
||||
cos_w = ch * _COS - sh * _SIN
|
||||
sin_w = sh * _COS + ch * _SIN
|
||||
|
||||
# Walls + posts
|
||||
best = _raycast_static(dog_x, dog_y, cos_w, sin_w)
|
||||
|
||||
# Sheep discs
|
||||
if sheep_xy:
|
||||
sx = np.asarray([p[0] for p in sheep_xy], dtype=np.float64) - dog_x
|
||||
sy = np.asarray([p[1] for p in sheep_xy], dtype=np.float64) - dog_y
|
||||
t = np.outer(sx, cos_w) + np.outer(sy, sin_w)
|
||||
s_dist2 = (sx ** 2 + sy ** 2)[:, None]
|
||||
perp2 = s_dist2 - t ** 2
|
||||
R2 = SHEEP_RADIUS ** 2
|
||||
hit = (perp2 < R2) & (t > 0.0)
|
||||
half = np.sqrt(np.clip(R2 - perp2, 0.0, None))
|
||||
candidate = np.where(hit, t - half, np.inf)
|
||||
nearest = candidate.min(axis=0)
|
||||
np.minimum(best, nearest, out=best)
|
||||
|
||||
# Clip to LIDAR_MAX_RANGE; entries that never got a hit stay at inf
|
||||
# → clipped down to max_range like the real Webots device.
|
||||
ranges = np.minimum(best, max_range).astype(np.float32)
|
||||
return _add_noise(ranges, noise, rng, max_range)
|
||||
|
||||
|
||||
def _add_noise(ranges: np.ndarray, sigma: float,
|
||||
rng: np.random.Generator | None, max_range: float) -> np.ndarray:
|
||||
if sigma <= 0.0:
|
||||
return ranges
|
||||
if rng is None:
|
||||
rng = np.random.default_rng()
|
||||
hit_mask = ranges < max_range - 1e-3
|
||||
n_hit = int(hit_mask.sum())
|
||||
if n_hit:
|
||||
ranges = ranges.copy()
|
||||
ranges[hit_mask] += rng.normal(0.0, sigma, size=n_hit).astype(np.float32)
|
||||
np.clip(ranges, 0.0, max_range, out=ranges)
|
||||
return ranges
|
||||
@@ -0,0 +1,197 @@
|
||||
"""Multi-target tracker for LiDAR-detected sheep.
|
||||
|
||||
Greedy nearest-neighbour data association (with a distance gate) across
|
||||
frames, plus a memory of last-seen positions for tracks that fall out
|
||||
of the dog's FOV. Output is a ``{name: (x, y)}`` dict shaped exactly
|
||||
like the receiver-based ``sheep_positions`` used previously by the
|
||||
Webots controller and by the env, so Strömbom and Sequential can
|
||||
consume it unchanged.
|
||||
|
||||
Penned-detection heuristic
|
||||
--------------------------
|
||||
Two ways a track is marked penned:
|
||||
1. Its current estimated position is south of the gate plane and
|
||||
within the gate column (the ``is_penned_position`` test the env
|
||||
already uses on ground truth).
|
||||
2. It hasn't been observed for ``STALE_STEPS`` and its last-seen
|
||||
position was inside the gate-approach band — the dog's LiDAR can
|
||||
only see ~2 m into the pen through the open gate, so a sheep
|
||||
that disappeared near the entry has almost certainly entered.
|
||||
|
||||
Tracks marked penned are excluded from ``get_positions()`` (which is
|
||||
what Strömbom consumes), matching the prior receiver-based behaviour.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
|
||||
from herding.world.geometry import MAX_SHEEP, in_pen, is_penned_position
|
||||
|
||||
|
||||
GATE_M = 2.5 # m — primary NN gate (recent tracks)
|
||||
REACQUIRE_GATE_M = 4.5 # m — wider gate for re-acquiring stale tracks (sheep moved during occlusion)
|
||||
REACQUIRE_MIN_AGE = 20 # steps — only rebind via the wide gate if the track has been stale for this long
|
||||
PENNED_GATE_M = 4.0 # m — wide gate for matching against already-penned tracks; the pen is small (3×7 m) so duplicates are easy without it
|
||||
FORGET_STEPS = 200 # ~3.2 s — delete stale active tracks; tighter than 5 s to limit phantoms but long enough to bridge typical FOV gaps
|
||||
MAX_ACTIVE_TRACKS = MAX_SHEEP # hard cap to the worst-case real flock size
|
||||
# Penned tracks are never forgotten: sheep don't leave the pen, and
|
||||
# losing the track makes the counter oscillate as the same sheep gets
|
||||
# re-detected and counted multiple times.
|
||||
|
||||
|
||||
class SheepTracker:
|
||||
"""Online tracker with NN association and a forgetful memory.
|
||||
|
||||
Each track stores ``(x, y, last_seen_step, penned)``.
|
||||
"""
|
||||
|
||||
def __init__(self, gate: float = GATE_M):
|
||||
self.gate = gate
|
||||
# tid → (x, y, last_seen_step, penned)
|
||||
self._tracks: dict[int, tuple[float, float, int, bool]] = {}
|
||||
self._next_id = 0
|
||||
self.step = 0
|
||||
|
||||
def reset(self) -> None:
|
||||
self._tracks.clear()
|
||||
self._next_id = 0
|
||||
self.step = 0
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Update
|
||||
# ------------------------------------------------------------------
|
||||
def update(self, detections: list[tuple[float, float]]) -> dict[str, tuple[float, float]]:
|
||||
"""Fold a new set of detections in and return active positions."""
|
||||
self.step += 1
|
||||
|
||||
det_used: set[int] = set()
|
||||
updated_tids: set[int] = set()
|
||||
|
||||
# Pass 1: match against ACTIVE tracks first (oldest-seen-first so
|
||||
# a re-emerging long-lost sheep grabs its old ID before a fresh
|
||||
# neighbour does).
|
||||
active_tids = [tid for tid, t in self._tracks.items() if not t[3]]
|
||||
active_tids.sort(key=lambda tid: self._tracks[tid][2])
|
||||
for tid in active_tids:
|
||||
tx, ty, _, _ = self._tracks[tid]
|
||||
best_j, best_d = -1, self.gate
|
||||
for j, (dx, dy) in enumerate(detections):
|
||||
if j in det_used:
|
||||
continue
|
||||
d = math.hypot(dx - tx, dy - ty)
|
||||
if d < best_d:
|
||||
best_d = d
|
||||
best_j = j
|
||||
if best_j >= 0:
|
||||
dx, dy = detections[best_j]
|
||||
self._tracks[tid] = (dx, dy, self.step, False)
|
||||
det_used.add(best_j)
|
||||
updated_tids.add(tid)
|
||||
|
||||
# Pass 1b: re-acquisition with a wider gate for tracks that have
|
||||
# been stale for ≥ REACQUIRE_MIN_AGE steps. Sheep flee at
|
||||
# ~0.6 m/s; over a 1–2 s occlusion (dog rotating or driving)
|
||||
# they move enough that a fresh detection lies outside the
|
||||
# primary GATE_M but is still clearly the same sheep. Without
|
||||
# this, phantom tracks accumulate and corrupt the CoM.
|
||||
for tid in active_tids:
|
||||
if tid in updated_tids:
|
||||
continue
|
||||
tx, ty, last, _ = self._tracks[tid]
|
||||
if (self.step - last) < REACQUIRE_MIN_AGE:
|
||||
continue
|
||||
best_j, best_d = -1, REACQUIRE_GATE_M
|
||||
for j, (dx, dy) in enumerate(detections):
|
||||
if j in det_used:
|
||||
continue
|
||||
d = math.hypot(dx - tx, dy - ty)
|
||||
if d < best_d:
|
||||
best_d = d
|
||||
best_j = j
|
||||
if best_j >= 0:
|
||||
dx, dy = detections[best_j]
|
||||
self._tracks[tid] = (dx, dy, self.step, False)
|
||||
det_used.add(best_j)
|
||||
updated_tids.add(tid)
|
||||
|
||||
# Pass 2: match remaining detections against PENNED tracks with
|
||||
# a tighter gate. Without this, every frame near the gate spawns
|
||||
# a fresh penned track for the same sheep, which under a long
|
||||
# Webots run leads to thousands of phantom penned tracks.
|
||||
penned_tids = [tid for tid, t in self._tracks.items() if t[3]]
|
||||
for tid in penned_tids:
|
||||
tx, ty, _, _ = self._tracks[tid]
|
||||
best_j, best_d = -1, PENNED_GATE_M
|
||||
for j, (dx, dy) in enumerate(detections):
|
||||
if j in det_used:
|
||||
continue
|
||||
d = math.hypot(dx - tx, dy - ty)
|
||||
if d < best_d:
|
||||
best_d = d
|
||||
best_j = j
|
||||
if best_j >= 0:
|
||||
dx, dy = detections[best_j]
|
||||
self._tracks[tid] = (dx, dy, self.step, True)
|
||||
det_used.add(best_j)
|
||||
|
||||
# Unmatched detections → new tracks. A detection that is already
|
||||
# inside the pen is born "penned" so we don't accumulate active
|
||||
# tracks for sheep that arrived in the pen during occlusion.
|
||||
for j, (dx, dy) in enumerate(detections):
|
||||
if j in det_used:
|
||||
continue
|
||||
penned = in_pen(dx, dy) or is_penned_position(dx, dy)
|
||||
self._tracks[self._next_id] = (dx, dy, self.step, penned)
|
||||
self._next_id += 1
|
||||
|
||||
# Promote active tracks to penned ONLY by geometric position
|
||||
# (sheep is in the pen column south of the gate). The previous
|
||||
# "stale + near gate" heuristic was firing on ordinary occlusion
|
||||
# near the gate and creating phantom penned tracks.
|
||||
for tid, (tx, ty, last, penned) in list(self._tracks.items()):
|
||||
if penned:
|
||||
continue
|
||||
if is_penned_position(tx, ty):
|
||||
self._tracks[tid] = (tx, ty, last, True)
|
||||
|
||||
# Forget stale ACTIVE tracks after FORGET_STEPS. Penned tracks
|
||||
# are kept indefinitely — sheep can't escape the pen, so once a
|
||||
# track is marked penned, that sheep is permanently penned.
|
||||
for tid, (tx, ty, last, penned) in list(self._tracks.items()):
|
||||
if penned:
|
||||
continue
|
||||
if (self.step - last) > FORGET_STEPS:
|
||||
del self._tracks[tid]
|
||||
|
||||
# Hard cap on the active set. If we somehow have more than
|
||||
# MAX_ACTIVE_TRACKS active tracks, drop the oldest-seen ones
|
||||
# first — they are most likely false positives from world
|
||||
# geometry (walls, gate posts) the env's raycaster doesn't
|
||||
# model, and a bloated active set wrecks the downstream CoM.
|
||||
active = [(tid, last) for tid, (_, _, last, p) in self._tracks.items()
|
||||
if not p]
|
||||
if len(active) > MAX_ACTIVE_TRACKS:
|
||||
active.sort(key=lambda kv: kv[1]) # oldest-seen first
|
||||
for tid, _ in active[: len(active) - MAX_ACTIVE_TRACKS]:
|
||||
del self._tracks[tid]
|
||||
|
||||
return self.get_positions()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Outputs
|
||||
# ------------------------------------------------------------------
|
||||
def get_positions(self) -> dict[str, tuple[float, float]]:
|
||||
"""Active (not-yet-penned) tracks. Same shape as receiver dict."""
|
||||
return {f"t{tid}": (x, y)
|
||||
for tid, (x, y, _, penned) in self._tracks.items()
|
||||
if not penned}
|
||||
|
||||
def get_penned_set(self) -> set[str]:
|
||||
return {f"t{tid}" for tid, (_, _, _, penned) in self._tracks.items() if penned}
|
||||
|
||||
def n_active(self) -> int:
|
||||
return sum(1 for _, _, _, penned in self._tracks.values() if not penned)
|
||||
|
||||
def n_penned(self) -> int:
|
||||
return sum(1 for _, _, _, penned in self._tracks.values() if penned)
|
||||
Reference in New Issue
Block a user