"""Sheep flocking controller (Webots).

Each sheep emits its GPS position every 3 steps and listens for the dog's
position and peer-sheep positions.  The behavioural step is delegated to
:func:`herding.world.flocking_sim.compute_heading_speed` so the env and
Webots use identical sheep dynamics.

A sheep latches penned the first time it crosses the gate plane south; the
wool turns pink (via the exposed ``woolColor`` PROTO field) and the dynamics
switch to in-pen containment.
"""

import math
import os
import random
import sys
import zlib

# --- Make the shared herding/ package importable from this controller dir ---
_HERE = os.path.dirname(os.path.abspath(__file__))
_PROJECT_ROOT = os.path.normpath(os.path.join(_HERE, "..", ".."))
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)

from controller import Supervisor

from herding.world.diffdrive import heading_speed_to_wheels
from herding.world.flocking_sim import MAX_SPEED, compute_heading_speed
from herding.world.geometry import (
    SHEEP_MAX_WHEEL_OMEGA,
    is_penned,
)

# --- Devices ---
robot = Supervisor()
timestep = int(robot.getBasicTimeStep())
name = robot.getName()
self_node = robot.getSelf()


# Seed Python's RNG (shared with the dog controller) so a fixed
# HERDING_SEED produces reproducible runs. Each sheep mixes its name
# into the seed so the flock isn't all identical.
def _read_runtime_cfg():
    """Read ``herding_runtime.cfg`` from the project root into a dict.

    The file is a list of ``KEY = value`` lines.  Blank lines, lines that
    start with ``#`` and lines without ``=`` are skipped.  Keys are
    upper-cased and both sides are stripped of surrounding whitespace.

    Returns:
        dict mapping upper-cased keys to string values.  A missing,
        unreadable or undecodable file yields whatever was parsed before
        the failure (an empty dict if nothing was); this is deliberately
        best-effort because the file is optional.
    """
    cfg_path = os.path.join(_PROJECT_ROOT, "herding_runtime.cfg")
    out = {}
    try:
        with open(cfg_path, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                k, _, v = line.partition("=")
                out[k.strip().upper()] = v.strip()
    except (OSError, UnicodeDecodeError):
        # Optional config: a missing or unreadable file is not an error.
        pass
    return out


def _stable_name_hash(text):
    """Return a 31-bit digest of *text* that is identical in every process.

    The built-in ``hash()`` is salted per interpreter process for strings,
    so it cannot be used to derive a reproducible seed.
    """
    return zlib.crc32(text.encode("utf-8")) & 0x7FFFFFFF


_rt = _read_runtime_cfg()
_seed_raw = (os.environ.get("HERDING_SEED") or _rt.get("HERDING_SEED") or "").strip()
if _seed_raw:
    try:
        _seed_base = int(_seed_raw)
    except ValueError:
        # Keep running unseeded, but make the misconfiguration visible.
        print(
            f"[{name}] ignoring non-integer HERDING_SEED={_seed_raw!r}",
            file=sys.stderr,
        )
    else:
        # XOR with a stable digest of the name so different sheep have
        # different seeds but the flock as a whole is deterministic for a
        # given seed.
        random.seed(_seed_base ^ _stable_name_hash(name))

left_motor = robot.getDevice("left wheel motor")
right_motor = robot.getDevice("right wheel motor")
# Infinite target position puts the motors in velocity-control mode.
left_motor.setPosition(float("inf"))
right_motor.setPosition(float("inf"))
left_motor.setVelocity(0.0)
right_motor.setVelocity(0.0)
MOTOR_MAX = min(left_motor.getMaxVelocity(), SHEEP_MAX_WHEEL_OMEGA)

gps = robot.getDevice("gps")
gps.enable(timestep)
compass = robot.getDevice("compass")
compass.enable(timestep)
receiver = robot.getDevice("receiver")
receiver.enable(timestep)
emitter = robot.getDevice("emitter")


# --- Helpers ---
def bearing():
    """World-frame heading (0 = east, π/2 = north)."""
    n = compass.getValues()
    return math.atan2(n[0], n[1])


def drive(heading, speed_motor):
    """Command the wheels to steer toward *heading* at *speed_motor*.

    The requested speed is capped at ``MAX_SPEED`` and the conversion to
    left/right wheel velocities is delegated to
    ``heading_speed_to_wheels`` using the current compass bearing.
    """
    left_w, right_w = heading_speed_to_wheels(
        heading, min(speed_motor, MAX_SPEED), bearing(), MOTOR_MAX, k_turn=4.0
    )
    left_motor.setVelocity(left_w)
    right_motor.setVelocity(right_w)


def paint_pink():
    """Switch the sheep's wool to pink via the exposed PROTO field."""
    self_node.getField("woolColor").setSFColor([1.0, 0.55, 0.72])


def _parse_xy(x_text, y_text):
    """Parse two message fields into an ``(x, y)`` tuple of finite floats.

    Returns ``None`` when either field is not a number or is NaN/inf, so a
    single corrupt packet cannot crash the controller or poison the
    distance comparisons.
    """
    try:
        px, py = float(x_text), float(y_text)
    except ValueError:
        return None
    if not (math.isfinite(px) and math.isfinite(py)):
        return None
    return (px, py)


# --- State ---
wander_angle = random.uniform(-math.pi, math.pi)
step_count = 0
dogs = {}   # name → (x, y); supports the dual-dog setup
peers = {}  # name → (x, y); periodically pruned
penned = False

# Safety net for differential-drive sheep pinned against a wall.
_prev_x, _prev_y = None, None
_stuck_count = 0
STUCK_STEPS = 20   # consecutive low-motion steps before recovery kicks in
STUCK_DIST = 0.05  # per-step displacement (GPS units) below which a step counts as "not moving"

# --- Main loop ---
while robot.step(timestep) != -1:
    step_count += 1
    pos = gps.getValues()
    x, y = pos[0], pos[1]

    # Penned is a one-way latch: once set it is never cleared.
    if not penned and is_penned(x, y):
        penned = True
        paint_pink()

    # Stale peers get dropped periodically so a peer that's gone silent
    # doesn't permanently distort the local CoM. Dogs are pruned too —
    # otherwise a temporarily-silent dog stays in `dogs` forever and
    # the closest-dog flee target stops being accurate.
    if step_count % 30 == 0:
        peers.clear()
        dogs.clear()

    while receiver.getQueueLength() > 0:
        msg = receiver.getString()
        receiver.nextPacket()
        parts = msg.split(":")
        kind = parts[0]
        # Legacy single-dog message: "dog:x:y".
        # Dual-dog message:          "dog:NAME:x:y".
        if kind == "dog" and len(parts) == 3:
            xy = _parse_xy(parts[1], parts[2])
            if xy is not None:
                dogs["ShepherdDog"] = xy
        elif kind == "dog" and len(parts) >= 4:
            xy = _parse_xy(parts[2], parts[3])
            if xy is not None:
                dogs[parts[1]] = xy
        elif kind == "sheep" and len(parts) >= 4 and parts[1] != name:
            # Own broadcasts are ignored via the name check above.
            xy = _parse_xy(parts[2], parts[3])
            if xy is not None:
                peers[parts[1]] = xy

    # Flee target = closest known dog; the flocking heuristic only needs
    # one (vx, vy) repulsion vector regardless of how many dogs are out
    # there. With two dogs at orthogonal axes, the sheep will see one of
    # them as nearest at any moment and react to it; the other dog's
    # influence enters through the sheep that does react to it pushing
    # this sheep in turn (Reynolds peer-repulsion).
    if dogs:
        dog_xy = min(dogs.values(), key=lambda d: math.hypot(d[0] - x, d[1] - y))
    else:
        dog_xy = None

    heading, speed, wander_angle = compute_heading_speed(
        x=x,
        y=y,
        penned=penned,
        dog_xy=dog_xy,
        peers=peers,
        wander_angle=wander_angle,
    )

    # Stuck-against-wall recovery: drive toward the field centre.
    # NOTE(review): this override also applies once the sheep is penned —
    # confirm that steering a penned sheep toward the origin is intended.
    if _prev_x is not None:
        moved = math.hypot(x - _prev_x, y - _prev_y)
        _stuck_count = _stuck_count + 1 if moved < STUCK_DIST else 0
        if _stuck_count >= STUCK_STEPS:
            heading = math.atan2(-y, -x)
            speed = MAX_SPEED
            _stuck_count = 0
    _prev_x, _prev_y = x, y

    drive(heading, speed)

    # Broadcast own position every third step.
    if step_count % 3 == 0:
        emitter.send(f"sheep:{name}:{x:.4f}:{y:.4f}")