Files
TIR_PROJ/tests/test_perception.py
T
Johnny Fernandes 7ab69ab0f3 Rename multi-segment functions to two-concept names; polish docstrings
Naming pass: rename functions whose third+ segment is redundant or
implementation-detail, sticking to the codebase's preferred
``noun_verb`` / ``verb_noun`` two-concept idiom. Renames are atomic
across definitions, callers, and tests.

  is_penned_position        →  is_penned
  modulate_speed_near_sheep →  modulate_speed
  mecanum_kinematics_step   →  mecanum_step
  policy_forward_mean       →  forward_mean

Two-concept patterns like ``velocity_to_wheels`` / ``detections_from_scan``
/ ``make_strombom_predictor`` are left alone — they're idiomatic
converters / factories that read as a single concept, and the longer
form aids grep-ability.

Docstring polish:
* ``herding/config.py`` header drops the "previously lived as a
  module-level literal" historical framing — we ship as a single
  thing, so the refactor anecdote no longer earns its keep. The
  usage examples now mention both ``HERDING_WEBOTS`` and
  ``HERDING_MEC_WEBOTS`` presets.

126 pytest cases still pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-17 01:58:15 +00:00

252 lines
8.5 KiB
Python

"""LiDAR simulation + perception pipeline + multi-target tracker."""
import math
import numpy as np
import pytest
from herding.perception.lidar_perception import (
STATIC_REJECT, detections_from_scan,
)
from herding.perception.lidar_sim import (
LIDAR_MAX_RANGE, LIDAR_N_RAYS, SHEEP_RADIUS, ray_angles, simulate_scan,
)
from herding.perception.sheep_tracker import (
FORGET_STEPS, GATE_M, MAX_ACTIVE_TRACKS, REACQUIRE_GATE_M,
REACQUIRE_MIN_AGE, SheepTracker,
)
# ---------------------------------------------------------------------------
# Sim
# ---------------------------------------------------------------------------
def test_simulate_scan_shape_and_dtype():
    """A scan is a 1-D float32 array holding one range per LiDAR ray."""
    sheep_positions = [(5.0, 0.0)]
    scan = simulate_scan(0.0, 0.0, 0.0, sheep_positions, noise=0.0)
    assert scan.dtype == np.float32
    assert scan.shape == (LIDAR_N_RAYS,)
def test_simulate_scan_no_sheep_far_from_walls():
    """With no sheep and the dog at the origin, every ray reads max range."""
    # The walls are at least 15 m from the origin (east/west at ±15), which
    # is beyond LIDAR_MAX_RANGE (12 m), so nothing can produce a return.
    scan = simulate_scan(0.0, 0.0, 0.0, [], noise=0.0)
    assert np.all(scan == LIDAR_MAX_RANGE)
def test_simulate_scan_sheep_in_front_returns_centre_hit():
    """A sheep 5 m dead ahead is seen by the centre ray at its near surface."""
    scan = simulate_scan(0.0, 0.0, 0.0, [(5.0, 0.0)], noise=0.0)
    # The ray stops at the sheep's surface, one SHEEP_RADIUS short of its
    # centre.
    expected_range = 5.0 - SHEEP_RADIUS
    centre_range = float(scan[LIDAR_N_RAYS // 2])
    assert math.isclose(centre_range, expected_range, abs_tol=0.01)
def test_simulate_scan_sheep_behind_dog_not_hit():
    """A sheep directly behind the dog shows up in the scan.

    NOTE: despite the ``_not_hit`` suffix in the name, this test asserts
    that the sheep IS hit: with a 360° field of view, a sheep behind the
    dog produces a return.
    """
    scan = simulate_scan(0.0, 0.0, 0.0, [(-5.0, 0.0)], noise=0.0)
    # At least one ray must fall short of max range ...
    assert np.any(scan < LIDAR_MAX_RANGE)
    # ... and the nearest return must be about 5 m away (sheep at distance 5).
    nearest = float(scan.min())
    assert nearest < 5.3
def test_simulate_scan_wall_hit():
    """Facing the north wall from 1 m away, the centre ray reads about 1 m."""
    heading_north = math.pi / 2
    # Dog at y = 14, i.e. 1 m south of the north wall; no sheep in the scene.
    scan = simulate_scan(0.0, 14.0, heading_north, [], noise=0.0)
    centre_range = float(scan[LIDAR_N_RAYS // 2])
    assert math.isclose(centre_range, 1.0, abs_tol=0.01)
# ---------------------------------------------------------------------------
# Perception
# ---------------------------------------------------------------------------
def test_detections_recover_sheep_position():
    """Each simulated sheep is recovered as a detection within 10 cm."""
    truths = [(5.0, 0.0), (3.0, 1.0)]
    scan = simulate_scan(0.0, 0.0, 0.0, truths, noise=0.0)
    detections = detections_from_scan(scan, 0.0, 0.0, 0.0)
    assert len(detections) == 2

    def _error(detection, truth):
        # Planar distance between a detection and a ground-truth position.
        return math.hypot(detection[0] - truth[0], detection[1] - truth[1])

    # Centroid bias is corrected to within ~5 cm, so a 10 cm bound is
    # enough for every sheep to have a matching detection.
    for truth in truths:
        assert any(_error(d, truth) < 0.1 for d in detections)
def test_detections_filter_gate_post():
    """No detection is reported within STATIC_REJECT of a gate post.

    The scene contains no sheep and the dog looks at the gate, so any
    return from a post must be dropped by the static-feature filter.
    Detections elsewhere (if any) are not constrained by this test.
    """
    dog_pose = (11.5, -10.0, -math.pi / 2)
    # Positions the filter must keep clear: (10, -15) and (13, -15).
    gate_posts = ((10.0, -15.0), (13.0, -15.0))
    scan = simulate_scan(*dog_pose, [], noise=0.0)
    detections = detections_from_scan(scan, *dog_pose)
    for cx, cy in detections:
        for post_x, post_y in gate_posts:
            assert math.hypot(cx - post_x, cy - post_y) > STATIC_REJECT
def test_detections_empty_scan_returns_nothing():
    """A zero-length scan yields an empty detection list."""
    empty_scan = np.array([], dtype=np.float32)
    detections = detections_from_scan(empty_scan, 0.0, 0.0, 0.0)
    assert detections == []
# ---------------------------------------------------------------------------
# Tracker
# ---------------------------------------------------------------------------
def test_tracker_creates_track_for_new_detection():
    """The first detection fed to a fresh tracker opens one active track."""
    tracker = SheepTracker()
    first_frame = [(5.0, 0.0)]
    tracker.update(first_frame)
    assert tracker.n_active() == 1
def test_tracker_associates_close_detections():
    """A small movement within the gate keeps the same track."""
    tracker = SheepTracker()
    # Two consecutive frames, 0.5 m apart along x.
    for frame in ([(5.0, 0.0)], [(5.5, 0.0)]):
        tracker.update(frame)
    assert tracker.n_active() == 1
def test_tracker_spawns_new_track_far_detection():
    """A detection well outside the gate starts a second track."""
    tracker = SheepTracker()
    near, far = (5.0, 0.0), (-5.0, 0.0)  # 10 m apart, well outside the gate
    tracker.update([near])
    tracker.update([far])
    assert tracker.n_active() == 2
def test_tracker_reacquisition_for_stale_track():
    """A stale track rebinds through the wider re-acquisition gate.

    The detection reappears farther away than the primary gate allows but
    still inside the re-acquisition gate, so it must rejoin the existing
    track instead of spawning a duplicate.
    """
    tracker = SheepTracker()
    tracker.update([(0.0, 0.0)])
    # Starve the track of detections until it counts as stale.
    for _ in range(REACQUIRE_MIN_AGE):
        tracker.update([])
    # Midway between the two gate radii: outside GATE_M, inside
    # REACQUIRE_GATE_M.
    reappear_x = (GATE_M + REACQUIRE_GATE_M) / 2.0
    tracker.update([(reappear_x, 0.0)])
    assert tracker.n_active() == 1
def test_tracker_forgets_stale_tracks():
    """A track unseen for more than FORGET_STEPS updates is dropped."""
    tracker = SheepTracker()
    tracker.update([(0.0, 0.0)])
    empty_frames = FORGET_STEPS + 1
    for _ in range(empty_frames):
        tracker.update([])
    assert tracker.n_active() == 0
def test_tracker_penned_position_promotes_track():
    """A track spawned inside the pen column counts as penned, not active."""
    in_pen = (11.5, -16.0)  # is_penned is True for this point
    tracker = SheepTracker()
    tracker.update([in_pen])
    assert tracker.n_penned() == 1
    assert tracker.n_active() == 0
def test_tracker_penned_tracks_persist():
    """Penned tracks survive well past the normal forget horizon."""
    tracker = SheepTracker()
    tracker.update([(11.5, -16.0)])
    # Twice the forget horizon with no detections at all.
    empty_frames = FORGET_STEPS * 2
    for _ in range(empty_frames):
        tracker.update([])
    # Penned tracks are not forgotten.
    assert tracker.n_penned() == 1
def test_tracker_caps_active_set():
    """The active set never grows beyond MAX_ACTIVE_TRACKS."""
    tracker = SheepTracker()
    # Feed more distinct detections than the cap allows, one per frame,
    # each spaced wider than the gate so none can associate with another.
    n_spawns = MAX_ACTIVE_TRACKS + 5
    for index in range(n_spawns):
        tracker.update([(index * (GATE_M + 1.0), 0.0)])
    assert tracker.n_active() <= MAX_ACTIVE_TRACKS
def test_tracker_reset_clears_state():
    """reset() drops all active tracks and rewinds the step counter to 0."""
    tracker = SheepTracker()
    tracker.update([(0.0, 0.0)])
    tracker.reset()
    active_after_reset = tracker.n_active()
    assert active_after_reset == 0
    assert tracker.step == 0
# ---------------------------------------------------------------------------
# Consensus promotion
# ---------------------------------------------------------------------------
def _tracker_with_consensus(k: int = 3, radius: float = 0.5, max_age: int = 8):
    """Build a SheepTracker with consensus promotion configured.

    Args:
        k: Passed as ``consensus_k``.
        radius: Passed as ``consensus_radius_m``.
        max_age: Passed as ``consensus_max_age``.

    Returns:
        A ``SheepTracker`` constructed with the resulting ``TrackerConfig``.
    """
    from herding.config import TrackerConfig

    consensus_cfg = TrackerConfig(
        consensus_k=k,
        consensus_radius_m=radius,
        consensus_max_age=max_age,
    )
    return SheepTracker(tracker_cfg=consensus_cfg)
def test_consensus_default_disabled():
    """With consensus_k=1 (default) the first detection is immediately visible."""
    tracker = SheepTracker()
    tracker.update([(5.0, 0.0)])
    # Visible both in the active count and in the reported positions.
    visible = tracker.get_positions()
    assert tracker.n_active() == 1
    assert len(visible) == 1
def test_consensus_hides_one_shot_detection():
    """K>=2: a single sighting stays a hidden candidate."""
    tracker = _tracker_with_consensus(k=3)
    tracker.update([(5.0, 0.0)])
    # Held as a candidate: not promoted to active, not reported.
    assert tracker.n_candidate() == 1
    assert tracker.n_active() == 0
    assert tracker.get_positions() == {}
def test_consensus_promotes_after_k_matches():
    """A sheep seen in K consecutive frames is promoted and reported."""
    required_matches = 3
    tracker = _tracker_with_consensus(k=required_matches)
    sighting = [(5.0, 0.0)]
    for _ in range(required_matches):
        tracker.update(sighting)
    # The candidate has been converted into exactly one active track.
    assert tracker.n_candidate() == 0
    assert tracker.n_active() == 1
    assert len(tracker.get_positions()) == 1
def test_consensus_candidate_expires_quickly():
    """A candidate that fails to re-confirm within consensus_max_age dies."""
    candidate_max_age = 5
    tracker = _tracker_with_consensus(k=3, max_age=candidate_max_age)
    tracker.update([(5.0, 0.0)])
    assert tracker.n_candidate() == 1
    # One more empty frame than the max age, so the candidate has expired.
    for _ in range(candidate_max_age + 1):
        tracker.update([])
    assert tracker.n_active() == 0
    assert tracker.n_candidate() == 0
def test_consensus_tracker_does_not_promote_phantom_pen():
    """A lone detection in the pen column must not latch as penned.

    While the detection is still only a candidate it is neither promoted
    nor counted as penned, and it vanishes once it expires.
    """
    tracker = _tracker_with_consensus(k=3)
    phantom = (11.5, -16.0)  # gate-area false positive, inside the pen column
    tracker.update([phantom])
    # Just a candidate at this point.
    assert tracker.n_candidate() == 1
    assert tracker.n_penned() == 0
    # Ten empty frames outlast the helper's default max_age of 8.
    for _ in range(10):
        tracker.update([])
    assert tracker.n_candidate() == 0
    assert tracker.n_penned() == 0
def test_consensus_distinguishes_real_sheep_from_phantom():
    """Steady detections promote; jittering phantom detections never do.

    The real sheep is detected at the same place every frame. The phantom
    reappears each frame at a position more than consensus_radius from
    its previous one, so it must stay out of get_positions even while
    individual candidates are still inside the max-age window.
    """
    tracker = _tracker_with_consensus(k=3, radius=0.4, max_age=4)
    real_sheep = (5.0, 0.0)
    phantom_path = [(10.0, 5.0), (10.5, 5.6), (11.1, 5.0), (10.0, 5.7)]
    for phantom in phantom_path:
        tracker.update([real_sheep, phantom])

    reported = tracker.get_positions()
    assert len(reported) == 1
    # The single reported track must be the real sheep.
    (real_xy,) = reported.values()
    assert math.hypot(real_xy[0] - 5.0, real_xy[1]) < 0.5

    # Keep feeding only the real sheep until the candidate window has
    # elapsed; every phantom candidate must have died by then.
    for _ in range(8):
        tracker.update([real_sheep])
    assert tracker.n_candidate() == 0
    assert len(tracker.get_positions()) == 1