"""LiDAR simulation + perception pipeline + multi-target tracker.""" import math import numpy as np import pytest from herding.perception.lidar_perception import ( STATIC_REJECT, detections_from_scan, ) from herding.perception.lidar_sim import ( LIDAR_MAX_RANGE, LIDAR_N_RAYS, SHEEP_RADIUS, ray_angles, simulate_scan, ) from herding.perception.sheep_tracker import ( FORGET_STEPS, GATE_M, MAX_ACTIVE_TRACKS, REACQUIRE_GATE_M, REACQUIRE_MIN_AGE, SheepTracker, ) # --------------------------------------------------------------------------- # Sim # --------------------------------------------------------------------------- def test_simulate_scan_shape_and_dtype(): ranges = simulate_scan(0.0, 0.0, 0.0, [(5.0, 0.0)], noise=0.0) assert ranges.shape == (LIDAR_N_RAYS,) assert ranges.dtype == np.float32 def test_simulate_scan_no_sheep_far_from_walls(): # Dog at origin, no sheep, walls all ≥ 15 m away → all rays at max. ranges = simulate_scan(0.0, 0.0, 0.0, [], noise=0.0) # Walls (east/west at ±15) are beyond LIDAR_MAX_RANGE=12, so no hits. assert (ranges == LIDAR_MAX_RANGE).all() def test_simulate_scan_sheep_in_front_returns_centre_hit(): # Sheep dead ahead at 5 m. Centre ray should hit ~ 5 - SHEEP_RADIUS. ranges = simulate_scan(0.0, 0.0, 0.0, [(5.0, 0.0)], noise=0.0) centre = ranges[LIDAR_N_RAYS // 2] assert math.isclose(float(centre), 5.0 - SHEEP_RADIUS, abs_tol=0.01) def test_simulate_scan_sheep_behind_dog_not_hit(): # With 360° FOV, a sheep behind the dog IS now hit. ranges = simulate_scan(0.0, 0.0, 0.0, [(-5.0, 0.0)], noise=0.0) assert (ranges < LIDAR_MAX_RANGE).any() # Verify the closest hit is near 5m (sheep at distance 5). assert float(ranges.min()) < 5.3 def test_simulate_scan_wall_hit(): # Dog 1 m south of the north wall, facing north → centre ray ≈ 1 m. ranges = simulate_scan(0.0, 14.0, math.pi / 2, [], noise=0.0) centre = ranges[LIDAR_N_RAYS // 2] assert math.isclose(float(centre), 1.0, abs_tol=0.01) # --------------------------------------------------------------------------- # Perception # --------------------------------------------------------------------------- def test_detections_recover_sheep_position(): sheep = [(5.0, 0.0), (3.0, 1.0)] ranges = simulate_scan(0.0, 0.0, 0.0, sheep, noise=0.0) det = detections_from_scan(ranges, 0.0, 0.0, 0.0) assert len(det) == 2 # Centroid bias is corrected to within ~5 cm. for truth in sheep: assert any(math.hypot(d[0] - truth[0], d[1] - truth[1]) < 0.1 for d in det) def test_detections_filter_gate_post(): # An empty scene at the dog right next to a gate post produces no # detections — the static-feature filter drops the post return. ranges = simulate_scan(11.5, -10.0, -math.pi / 2, [], noise=0.0) det = detections_from_scan(ranges, 11.5, -10.0, -math.pi / 2) for cx, cy in det: assert math.hypot(cx - 10.0, cy + 15.0) > STATIC_REJECT assert math.hypot(cx - 13.0, cy + 15.0) > STATIC_REJECT def test_detections_empty_scan_returns_nothing(): assert detections_from_scan(np.array([], dtype=np.float32), 0.0, 0.0, 0.0) == [] # --------------------------------------------------------------------------- # Tracker # --------------------------------------------------------------------------- def test_tracker_creates_track_for_new_detection(): t = SheepTracker() t.update([(5.0, 0.0)]) assert t.n_active() == 1 def test_tracker_associates_close_detections(): """A small movement within the gate keeps the same track.""" t = SheepTracker() t.update([(5.0, 0.0)]) t.update([(5.5, 0.0)]) assert t.n_active() == 1 def test_tracker_spawns_new_track_far_detection(): t = SheepTracker() t.update([(5.0, 0.0)]) t.update([(-5.0, 0.0)]) # well outside the gate assert t.n_active() == 2 def test_tracker_reacquisition_for_stale_track(): """A stale track within the wider re-acquisition gate rebinds rather than spawning a duplicate.""" t = SheepTracker() t.update([(0.0, 0.0)]) # Let it go stale. for _ in range(REACQUIRE_MIN_AGE): t.update([]) # Re-emerges within REACQUIRE_GATE but outside the primary GATE. offset = (GATE_M + REACQUIRE_GATE_M) / 2.0 t.update([(offset, 0.0)]) assert t.n_active() == 1 def test_tracker_forgets_stale_tracks(): t = SheepTracker() t.update([(0.0, 0.0)]) for _ in range(FORGET_STEPS + 1): t.update([]) assert t.n_active() == 0 def test_tracker_penned_position_promotes_track(): t = SheepTracker() t.update([(11.5, -16.0)]) # spawn inside the pen column # is_penned_position is True for this point. assert t.n_penned() == 1 assert t.n_active() == 0 def test_tracker_penned_tracks_persist(): t = SheepTracker() t.update([(11.5, -16.0)]) for _ in range(FORGET_STEPS * 2): t.update([]) # Penned tracks are not forgotten. assert t.n_penned() == 1 def test_tracker_caps_active_set(): t = SheepTracker() # Spawn more than the cap, each well outside the others' gates. for k in range(MAX_ACTIVE_TRACKS + 5): t.update([(k * (GATE_M + 1.0), 0.0)]) assert t.n_active() <= MAX_ACTIVE_TRACKS def test_tracker_reset_clears_state(): t = SheepTracker() t.update([(0.0, 0.0)]) t.reset() assert t.n_active() == 0 assert t.step == 0 # --------------------------------------------------------------------------- # Consensus promotion # --------------------------------------------------------------------------- def _tracker_with_consensus(k: int = 3, radius: float = 0.5, max_age: int = 8): from herding.config import TrackerConfig return SheepTracker(tracker_cfg=TrackerConfig( consensus_k=k, consensus_radius_m=radius, consensus_max_age=max_age, )) def test_consensus_default_disabled(): """With consensus_k=1 (default) the first detection is immediately visible.""" t = SheepTracker() t.update([(5.0, 0.0)]) assert t.n_active() == 1 assert len(t.get_positions()) == 1 def test_consensus_hides_one_shot_detection(): """K>=2: a single detection that never reappears is filtered out.""" t = _tracker_with_consensus(k=3) t.update([(5.0, 0.0)]) assert t.n_active() == 0 # candidate, not promoted assert t.n_candidate() == 1 assert t.get_positions() == {} def test_consensus_promotes_after_k_matches(): """A real sheep visible for K frames promotes and appears in get_positions.""" t = _tracker_with_consensus(k=3) for _ in range(3): t.update([(5.0, 0.0)]) assert t.n_active() == 1 assert t.n_candidate() == 0 assert len(t.get_positions()) == 1 def test_consensus_candidate_expires_quickly(): """A candidate that fails to re-confirm within consensus_max_age dies.""" t = _tracker_with_consensus(k=3, max_age=5) t.update([(5.0, 0.0)]) assert t.n_candidate() == 1 for _ in range(6): # > max_age empty frames t.update([]) assert t.n_candidate() == 0 assert t.n_active() == 0 def test_consensus_tracker_does_not_promote_phantom_pen(): """A one-shot detection inside the pen column must not latch as penned while it is still a candidate.""" t = _tracker_with_consensus(k=3) t.update([(11.5, -16.0)]) # gate-area FP, inside the pen column # Not promoted, not penned — just a candidate. assert t.n_penned() == 0 assert t.n_candidate() == 1 # And after one expiry window it disappears entirely. for _ in range(10): t.update([]) assert t.n_penned() == 0 assert t.n_candidate() == 0 def test_consensus_distinguishes_real_sheep_from_phantom(): """Real sheep (continuous detections) promote; phantom (intermittent detections at jittered positions outside consensus_radius) does not appear in get_positions even while individual candidates are still within the max-age window.""" t = _tracker_with_consensus(k=3, radius=0.4, max_age=4) # Real sheep visible at (5, 0) every frame; phantom jitters > radius. phantom_positions = [(10.0, 5.0), (10.5, 5.6), (11.1, 5.0), (10.0, 5.7)] for k in range(4): t.update([(5.0, 0.0), phantom_positions[k]]) positions = t.get_positions() assert len(positions) == 1 real_xy = next(iter(positions.values())) assert math.hypot(real_xy[0] - 5.0, real_xy[1]) < 0.5 # And once the candidate window has elapsed, every phantom has died. for _ in range(8): t.update([(5.0, 0.0)]) assert t.n_candidate() == 0 assert len(t.get_positions()) == 1