Files
TIR_PROJ/controllers/sheep/sheep.py
T
Johnny Fernandes cfbf4a0267 Dual-shepherd axis-split (HERDING_NDOGS=2)
The launcher can now spawn two `ShepherdDog` robots, each masked to a
single axis of motion, so the herding workload is split orthogonally.

Mechanic:
* `HERDING_NDOGS=2` (default 1) tells `tools/run_webots.sh` to replace
  the single-dog node in the generated test world with two copies:
  - `ShepherdDogX` at (-4, -10), `customData "axis=x"`
  - `ShepherdDogY` at (+4, -10), `customData "axis=y"`
  Each spawn position sits south of the field interior so the pair
  doesn't collide with starting sheep.
* `controllers/shepherd_dog/shepherd_dog.py` reads `getCustomData()`
  at startup; when `axis=x|y` it zeroes the off-axis component of every
  action *after* speed modulation and *before* EMA smoothing. With
  `customData` empty the controller behaves identically to single-dog
  mode, so all existing launches are unaffected.
* The dog's emitter line now carries the robot's name
  (`dog:ShepherdDogX:x:y`), and `controllers/sheep/sheep.py` keeps a
  `dogs` dict keyed by name, picking the closest one each step for
  its flee target. Single-dog runs still use the legacy two-field
  `dog:x:y` format thanks to a length check.
* `HERDING_NDOGS` is written into `herding_runtime.cfg` and exported
  to subprocesses so future tooling can read it.

Verified behaviour in Webots smoke tests (HERDING_NDOGS=2, strombom,
diff/field, 5 sheep): both dogs spawn with the expected names and
axis tags, the dual-dog status print appears, each dog acts only on
its assigned axis early in the trial, and the masking is internally
consistent. The pair stalls before penning under pure axis-split
because each dog reaches its drive standoff and then has only one
degree of freedom — useful research finding for the write-up;
coordination strategy (shared CoM, role-switching, etc.) is future
work.

126 pytest cases still pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-17 02:35:38 +00:00

149 lines
4.9 KiB
Python

"""Sheep flocking controller (Webots).
Each sheep emits its GPS position every 3 steps and listens for the
dog's position and peer-sheep positions. The behavioural step is
delegated to :func:`herding.world.flocking_sim.compute_heading_speed`
so the env and Webots use identical sheep dynamics.
A sheep latches penned the first time it crosses the gate plane south;
the wool turns pink (via the exposed ``woolColor`` PROTO field) and
the dynamics switch to in-pen containment.
"""
import math
import os
import random
import sys
# --- Make the shared herding/ package importable from this controller dir ---
_HERE = os.path.dirname(os.path.abspath(__file__))
_PROJECT_ROOT = os.path.normpath(os.path.join(_HERE, "..", ".."))
if _PROJECT_ROOT not in sys.path:
sys.path.insert(0, _PROJECT_ROOT)
from controller import Supervisor
from herding.world.diffdrive import heading_speed_to_wheels
from herding.world.flocking_sim import MAX_SPEED, compute_heading_speed
from herding.world.geometry import (
SHEEP_MAX_WHEEL_OMEGA,
is_penned,
)
# --- Devices ---
robot = Supervisor()
timestep = int(robot.getBasicTimeStep())
name = robot.getName()
self_node = robot.getSelf()
left_motor = robot.getDevice("left wheel motor")
right_motor = robot.getDevice("right wheel motor")
left_motor.setPosition(float("inf"))
right_motor.setPosition(float("inf"))
left_motor.setVelocity(0.0)
right_motor.setVelocity(0.0)
MOTOR_MAX = min(left_motor.getMaxVelocity(), SHEEP_MAX_WHEEL_OMEGA)
gps = robot.getDevice("gps"); gps.enable(timestep)
compass = robot.getDevice("compass"); compass.enable(timestep)
receiver = robot.getDevice("receiver"); receiver.enable(timestep)
emitter = robot.getDevice("emitter")
# --- Helpers ---
def bearing():
"""World-frame heading (0 = east, π/2 = north)."""
n = compass.getValues()
return math.atan2(n[0], n[1])
def drive(heading, speed_motor):
left_w, right_w = heading_speed_to_wheels(
heading, min(speed_motor, MAX_SPEED), bearing(), MOTOR_MAX, k_turn=4.0
)
left_motor.setVelocity(left_w)
right_motor.setVelocity(right_w)
def paint_pink():
"""Switch the sheep's wool to pink via the exposed PROTO field."""
self_node.getField("woolColor").setSFColor([1.0, 0.55, 0.72])
# --- State ---
wander_angle = random.uniform(-math.pi, math.pi)
step_count = 0
dogs = {} # name → (x, y); supports the dual-dog setup
peers = {} # name → (x, y); periodically pruned
penned = False
# Safety net for differential-drive sheep pinned against a wall.
_prev_x, _prev_y = None, None
_stuck_count = 0
STUCK_STEPS = 20
STUCK_DIST = 0.05
# --- Main loop ---
while robot.step(timestep) != -1:
step_count += 1
pos = gps.getValues()
x, y = pos[0], pos[1]
if not penned and is_penned(x, y):
penned = True
paint_pink()
# Stale peers get dropped periodically so a peer that's gone silent
# doesn't permanently distort the local CoM. Dogs are pruned too —
# otherwise a temporarily-silent dog stays in `dogs` forever and
# the closest-dog flee target stops being accurate.
if step_count % 30 == 0:
peers.clear()
dogs.clear()
while receiver.getQueueLength() > 0:
msg = receiver.getString()
receiver.nextPacket()
parts = msg.split(":")
# Legacy single-dog message: "dog:x:y".
# Dual-dog message: "dog:NAME:x:y".
if parts[0] == "dog" and len(parts) == 3:
dogs["ShepherdDog"] = (float(parts[1]), float(parts[2]))
elif parts[0] == "dog" and len(parts) >= 4:
dogs[parts[1]] = (float(parts[2]), float(parts[3]))
elif parts[0] == "sheep" and len(parts) >= 4 and parts[1] != name:
peers[parts[1]] = (float(parts[2]), float(parts[3]))
# Flee target = closest known dog; the flocking heuristic only needs
# one (vx, vy) repulsion vector regardless of how many dogs are out
# there. With two dogs at orthogonal axes, the sheep will see one of
# them as nearest at any moment and react to it; the other dog's
# influence enters through the sheep that does react to it pushing
# this sheep in turn (Reynolds peer-repulsion).
if dogs:
closest = min(dogs.values(), key=lambda d: math.hypot(d[0] - x, d[1] - y))
dog_xy = closest
else:
dog_xy = None
heading, speed, wander_angle = compute_heading_speed(
x=x, y=y, penned=penned, dog_xy=dog_xy, peers=peers,
wander_angle=wander_angle,
)
# Stuck-against-wall recovery: drive toward the field centre.
if _prev_x is not None:
moved = math.hypot(x - _prev_x, y - _prev_y)
_stuck_count = _stuck_count + 1 if moved < STUCK_DIST else 0
if _stuck_count >= STUCK_STEPS:
heading = math.atan2(-y, -x)
speed = MAX_SPEED
_stuck_count = 0
_prev_x, _prev_y = x, y
drive(heading, speed)
if step_count % 3 == 0:
emitter.send(f"sheep:{name}:{x:.4f}:{y:.4f}")