"""Cluster a 2D LiDAR scan into world-frame sheep position estimates. Pipeline: ranges (N,) → hit mask → world-frame points │ ▼ adjacency clustering (gap > GAP_THRESHOLD starts a new cluster, walking rays in angular order) │ ▼ centroid + span + region + structure filters │ ▼ list of (x, y) detections The downstream tracker handles association across frames. """ from __future__ import annotations import math from typing import TYPE_CHECKING import numpy as np if TYPE_CHECKING: from herding.config import DetectionConfig, LidarConfig from herding.world.geometry import ( FIELD_SHAPE, FIELD_ROUND_R, FIELD_X, FIELD_Y, GATE_X, GATE_Y, PEN_X, PEN_Y, ) from herding.perception.lidar_sim import ( LIDAR_FOV, LIDAR_MAX_RANGE, LIDAR_N_RAYS, SHEEP_RADIUS, POST_RADIUS, ray_angles, ) GAP_THRESHOLD = 0.6 # m — adjacent ray-points farther apart start a new cluster MAX_CLUSTER_SPAN = 1.5 # m — wider clusters are walls / structures RANGE_HIT_EPS = 0.05 # m — hit if range < max_range - eps WALL_REJECT = 0.5 # m — drop detections this close to a known wall line # Multi-peak splitting: within a single cluster, if the range profile # has a local dip (i.e. the range increases then decreases) deeper than # SPLIT_RANGE_GAP, the cluster is split into two detections. SPLIT_RANGE_GAP = 0.20 # m — range increase that triggers a split # Sheep-sized static features. A cluster centred within STATIC_REJECT of # any of these is never a sheep. 
# NOTE(review): the rectangular-field features are literal coordinates while
# the round-field ones are derived from GATE_X / GATE_Y — confirm the literals
# still agree with herding.world.geometry.
_STATIC_FEATURES_RECT = (
    (10.0, -15.0), (13.0, -15.0),      # gate posts
    (15.0, 15.0), (15.0, -15.0),
    (-15.0, 15.0), (-15.0, -15.0),     # field corners
)
_STATIC_FEATURES_ROUND = (
    (GATE_X[0], GATE_Y), (GATE_X[1], GATE_Y),
)
# Rejection radius around each static feature (same units as the
# world-frame detection coordinates).
STATIC_REJECT = 0.8


def _get_static_features():
    """Return the static-feature table matching the configured field shape."""
    if FIELD_SHAPE == "field_round":
        return _STATIC_FEATURES_ROUND
    return _STATIC_FEATURES_RECT


# Resolved once at import time; later changes to FIELD_SHAPE are not seen.
_STATIC_FEATURES = _get_static_features()


def _in_field_region(cx: float, cy: float) -> bool:
    """Check if a detection is inside the field (with a 0.2 margin)."""
    if FIELD_SHAPE == "field_round":
        r = math.hypot(cx, cy)
        return r < FIELD_ROUND_R + 0.2
    return (FIELD_X[0] - 0.2 < cx < FIELD_X[1] + 0.2
            and FIELD_Y[0] - 0.2 < cy < FIELD_Y[1] + 0.2)


def _near_wall(cx: float, cy: float, wall_reject: float = WALL_REJECT) -> bool:
    """True if the detection is too close to a wall to be a sheep.

    Round field: the point lies within ``wall_reject`` of the circle of
    radius ``FIELD_ROUND_R`` (or beyond it).  Rectangular field: the point
    lies within ``wall_reject`` of any side, except that the low-y side is
    ignored for x inside ``PEN_X`` (presumably the gate opening — confirm
    against the geometry module).
    """
    if FIELD_SHAPE == "field_round":
        r = math.hypot(cx, cy)
        return r > FIELD_ROUND_R - wall_reject
    return (
        cx > FIELD_X[1] - wall_reject
        or cx < FIELD_X[0] + wall_reject
        or cy > FIELD_Y[1] - wall_reject
        or (cy < FIELD_Y[0] + wall_reject
            and not (PEN_X[0] <= cx <= PEN_X[1]))
    )


def _split_cluster_by_range(
    points: list[tuple[float, float]],
    range_vals: list[float],
    split_range_gap: float = SPLIT_RANGE_GAP,
) -> list[list[tuple[float, float]]]:
    """Split a cluster at interior range-profile peaks (gaps between sheep).

    When two sheep are close, the LiDAR sees them as one arc, but the range
    profile rises and falls again between them.  The cluster is cut at the
    interior ray whose range stands highest above the lowest range on *both*
    of its sides, provided that two-sided prominence is at least
    ``split_range_gap``.  The peak ray itself is dropped and each half is
    split again recursively.

    Using two-sided prominence (rather than the global maximum) matters
    because the grazing rays at the outer edges of a disc are usually the
    longest returns in the cluster; they must not mask a real interior peak,
    and a profile that merely ramps up on one side must not be cut.

    Args:
        points: Cluster points in angular order.
        range_vals: Range of each point, parallel to ``points``.
        split_range_gap: Minimum two-sided prominence that triggers a split.

    Returns:
        A list of sub-clusters (each a list of points).  If no split
        applies, the single-element list ``[points]``.
    """
    n = len(points)
    # A split drops the peak ray and must leave at least two rays on each
    # side, so fewer than five rays can never be split.
    if n < 5:
        return [points]

    # lowest_before[i] = min(range_vals[:i]); lowest_after[i] = min(range_vals[i+1:])
    lowest_before = [math.inf] * n
    for i in range(1, n):
        lowest_before[i] = min(lowest_before[i - 1], range_vals[i - 1])
    lowest_after = [math.inf] * n
    for i in range(n - 2, -1, -1):
        lowest_after[i] = min(lowest_after[i + 1], range_vals[i + 1])

    # Candidate peaks exclude the two outermost rays on each side.
    best_idx = -1
    best_prominence = -math.inf
    for i in range(2, n - 2):
        prominence = range_vals[i] - max(lowest_before[i], lowest_after[i])
        if prominence > best_prominence:
            best_idx, best_prominence = i, prominence

    if best_idx < 0 or best_prominence < split_range_gap:
        return [points]

    left = _split_cluster_by_range(
        points[:best_idx], range_vals[:best_idx], split_range_gap)
    right = _split_cluster_by_range(
        points[best_idx + 1:], range_vals[best_idx + 1:], split_range_gap)
    return left + right


def _cluster_adjacent_hits(
    px: np.ndarray,
    py: np.ndarray,
    ranges: np.ndarray,
    hit: np.ndarray,
    gap_thr: float,
) -> list[list[tuple[float, float, float]]]:
    """Group consecutive hit rays into clusters of ``(x, y, range)`` tuples.

    Rays are walked in index order.  A non-hit ray, or a jump larger than
    ``gap_thr`` between consecutive hit points, closes the current cluster.
    """
    clusters: list[list[tuple[float, float, float]]] = []
    current: list[tuple[float, float, float]] = []
    for is_hit, x, y, r in zip(hit.tolist(), px.tolist(),
                               py.tolist(), ranges.tolist()):
        if not is_hit:
            if current:
                clusters.append(current)
                current = []
            continue
        if current and math.hypot(x - current[-1][0],
                                  y - current[-1][1]) > gap_thr:
            clusters.append(current)
            current = []
        current.append((x, y, r))
    if current:
        clusters.append(current)
    return clusters


def detections_from_scan(
    ranges: np.ndarray,
    dog_x: float,
    dog_y: float,
    dog_heading: float,
    max_range: float = LIDAR_MAX_RANGE,
    detection_cfg: "DetectionConfig | None" = None,
    lidar_cfg: "LidarConfig | None" = None,
) -> list[tuple[float, float]]:
    """Return list of (x, y) world-frame sheep position estimates.

    Pass ``detection_cfg`` to override clustering/filtering thresholds, or
    ``lidar_cfg`` to inform the function of a non-default FOV (the number of
    rays and FOV are inferred from the length of ``ranges`` and
    ``lidar_cfg.fov_rad`` respectively).

    Args:
        ranges: Per-ray range readings in angular order.  Flattened to 1-D;
            non-finite or negative entries are treated as misses.
        dog_x, dog_y: Sensor position in the world frame.
        dog_heading: Sensor heading, added to the ray angles.
        max_range: Range at or beyond which (minus ``range_hit_eps``) a ray
            counts as a miss.  Ignored when ``lidar_cfg`` is given:
            ``lidar_cfg.max_range`` takes precedence.
        detection_cfg: Optional threshold overrides.
        lidar_cfg: Optional sensor parameters (``sheep_radius``,
            ``fov_rad``, ``max_range``).

    Returns:
        One ``(x, y)`` tuple per accepted cluster; empty if there are no
        rays or no cluster survives the filters.
    """
    # Resolve parameters — fall back to module-level constants when no cfg.
    if detection_cfg is not None:
        gap_thr = detection_cfg.gap_threshold
        max_span = detection_cfg.max_cluster_span
        hit_eps = detection_cfg.range_hit_eps
        split_gap = detection_cfg.split_range_gap
        wall_rej = detection_cfg.wall_reject
        static_rej = detection_cfg.static_reject
    else:
        gap_thr = GAP_THRESHOLD
        max_span = MAX_CLUSTER_SPAN
        hit_eps = RANGE_HIT_EPS
        split_gap = SPLIT_RANGE_GAP
        wall_rej = WALL_REJECT
        static_rej = STATIC_REJECT

    sheep_r = lidar_cfg.sheep_radius if lidar_cfg is not None else SHEEP_RADIUS
    fov = lidar_cfg.fov_rad if lidar_cfg is not None else LIDAR_FOV
    if lidar_cfg is not None:
        max_range = lidar_cfg.max_range

    # Flatten so 0-d and (N, 1) inputs behave like a plain (N,) scan.
    ranges = np.asarray(ranges, dtype=np.float32).reshape(-1)
    n_rays = ranges.shape[0]
    if n_rays == 0:
        return []

    angles = ray_angles(n_rays, fov)
    # A hit must be a real distance: NaN / ±inf / negative readings would
    # otherwise yield non-finite or behind-the-sensor points.
    hit = (np.isfinite(ranges)
           & (ranges >= 0.0)
           & (ranges < max_range - hit_eps))
    world_a = dog_heading + angles
    px = dog_x + ranges * np.cos(world_a)
    py = dog_y + ranges * np.sin(world_a)

    # Walk rays in angular order; a large jump between consecutive
    # world-frame hit points closes the current cluster.
    # NOTE(review): clusters are not merged across the first/last ray, so
    # with a full-circle FOV an object on the seam yields two clusters —
    # confirm whether a 360° sensor is ever configured.
    clusters = _cluster_adjacent_hits(px, py, ranges, hit, gap_thr)

    detections: list[tuple[float, float]] = []
    for cluster in clusters:
        points_xy = [(p[0], p[1]) for p in cluster]
        range_vals = [p[2] for p in cluster]

        # Multi-peak splitting (a no-op for clusters too short to split).
        sub_clusters = _split_cluster_by_range(points_xy, range_vals, split_gap)

        for sub in sub_clusters:
            if not sub:
                continue
            xs = [p[0] for p in sub]
            ys = [p[1] for p in sub]
            cx, cy = sum(xs) / len(xs), sum(ys) / len(ys)

            # Bounding-box diagonal; wide clusters are walls / structures.
            span = math.hypot(max(xs) - min(xs), max(ys) - min(ys))
            if span > max_span:
                continue

            # Rays hit the front edge of the sheep; offset outward by
            # sheep_radius along the dog→cluster direction.
            dx, dy = cx - dog_x, cy - dog_y
            d = math.hypot(dx, dy)
            if d > 1e-3:
                cx += sheep_r * dx / d
                cy += sheep_r * dy / d

            # Keep only detections inside the field or in the strip just
            # below GATE_Y spanning PEN_X.
            in_main = _in_field_region(cx, cy)
            in_gate_strip = (PEN_X[0] - 0.2 < cx < PEN_X[1] + 0.2
                             and GATE_Y - 1.0 < cy < GATE_Y + 0.2)
            if not (in_main or in_gate_strip):
                continue

            if any(math.hypot(cx - fx, cy - fy) < static_rej
                   for fx, fy in _STATIC_FEATURES):
                continue

            if _near_wall(cx, cy, wall_rej):
                continue

            detections.append((cx, cy))

    return detections