"""Env-side evaluation of analytic or learned policies. Reports success rate, mean steps and mean penned per flock size for ``n_sheep ∈ 1..max_flock`` across ``--n-seeds`` seeds each. Usage:: python -m training.eval --policy training/runs/rl --n-seeds 10 python -m training.eval --policy strombom """ from __future__ import annotations import argparse import os from pathlib import Path from statistics import mean import numpy as np # Configure field geometry before other herding imports read it at module level. from herding.world.geometry import configure_from_args as _configure_from_args _configure_from_args() from herding.world.geometry import MAX_SHEEP, PEN_ENTRY from herding.control.sequential import compute_action as sequential_action from herding.control.strombom import compute_action as strombom_action from training.herding_env import HerdingEnv def rollout(env: HerdingEnv, predict_fn, max_steps: int) -> dict: obs, _ = env.reset() for t in range(max_steps): action = predict_fn(env, obs) obs, _r, terminated, truncated, info = env.step(action) if terminated or truncated: return { "success": bool(info.get("is_success", False)), "steps": info.get("steps", t + 1), "n_penned": info.get("n_penned", 0), } return {"success": False, "steps": max_steps, "n_penned": int(env.sheep_penned.sum())} def make_analytic_predictor(action_fn, drive_mode: str = "differential"): """Wrap an analytic teacher so it runs on the env's exposed perception (tracker in LiDAR mode, GT in privileged mode).""" def _predict(env, _obs): positions = env.perceived_positions() vx, vy, _mode = action_fn((env.dog_x, env.dog_y), positions, PEN_ENTRY) if drive_mode == "mecanum": return np.array([vx, vy, 0.0], dtype=np.float32) return np.array([vx, vy], dtype=np.float32) return _predict def make_strombom_predictor(drive_mode: str = "differential"): return make_analytic_predictor(strombom_action, drive_mode) def make_policy_predictor(model, vecnorm, recurrent: bool = False): state = {"lstm": None, "first": True} def _predict(_env, obs): obs_b = np.asarray(obs, dtype=np.float32).reshape(1, -1) if vecnorm is not None: obs_b = vecnorm.normalize_obs(obs_b) if recurrent: episode_start = np.array([state["first"]], dtype=bool) action, new_state = model.predict( obs_b, state=state["lstm"], episode_start=episode_start, deterministic=True, ) state["lstm"] = new_state state["first"] = False else: action, _ = model.predict(obs_b, deterministic=True) return action[0] return _predict def _reset_recurrent(predict_fn): """Reset the recurrent state between episodes.""" # The closure stores `state` dict; reach in via __closure__. for cell in predict_fn.__closure__ or []: if isinstance(cell.cell_contents, dict) and "lstm" in cell.cell_contents: cell.cell_contents["lstm"] = None cell.cell_contents["first"] = True return def main(): parser = argparse.ArgumentParser() parser.add_argument("--policy", required=True, help="'strombom', 'sequential', or path to a " "policy directory / zip.") parser.add_argument("--n-seeds", type=int, default=10) parser.add_argument("--max-steps", type=int, default=5000) parser.add_argument("--max-flock", type=int, default=MAX_SHEEP) parser.add_argument("--difficulty", type=float, default=1.0, help="0 = sheep spawn near the gate (easy); " "1 = full field (deployment distribution).") parser.add_argument("--drive-mode", default="differential", choices=["differential", "mecanum"], help="Drive mode for the dog robot.") parser.add_argument("--world", default=None, choices=["field", "field_round"], help="World shape. If not set, uses HERDING_WORLD " "env var or defaults to 'field'.") args = parser.parse_args() drive_mode = args.drive_mode frame_stack = 1 if args.policy == "strombom": predict = make_analytic_predictor(strombom_action, drive_mode) elif args.policy == "sequential": predict = make_analytic_predictor(sequential_action, drive_mode) else: from stable_baselines3 import PPO run = Path(args.policy) if run.is_file(): zip_path = run else: for name in ("policy.zip", "final.zip"): if (run / name).exists(): zip_path = run / name break else: raise FileNotFoundError( f"No checkpoint found in {run} " f"(tried policy.zip, final.zip)" ) # Try RecurrentPPO first (sb3-contrib) for LSTM policies, then # fall back to PPO for MLP policies. recurrent = False model = None try: from sb3_contrib import RecurrentPPO model = RecurrentPPO.load(str(zip_path), device="auto") recurrent = True print(f"[eval] loaded RecurrentPPO (LSTM) policy") except Exception: model = PPO.load(str(zip_path), device="auto") from herding.perception.obs import OBS_DIM as _SINGLE policy_obs_dim = int(model.observation_space.shape[0]) if policy_obs_dim % _SINGLE == 0 and policy_obs_dim // _SINGLE >= 1: frame_stack = policy_obs_dim // _SINGLE if frame_stack > 1: print(f"[eval] policy expects frame_stack={frame_stack}") vecnorm = None vn_path = run / "vecnormalize.pkl" if not vn_path.exists() and run.parent.name != "best": vn_path = run.parent / "vecnormalize.pkl" if vn_path.exists(): import pickle with open(vn_path, "rb") as f: vecnorm = pickle.load(f) vecnorm.training = False vecnorm.norm_reward = False predict = make_policy_predictor(model, vecnorm, recurrent=recurrent) # Infer drive_mode from policy action dim if using a learned policy. if args.policy not in ("strombom", "sequential"): policy_action_dim = int(model.action_space.shape[0]) if policy_action_dim == 2 and drive_mode == "mecanum": drive_mode = "differential" print(f"[eval] policy has 2D actions — overriding drive_mode " f"to differential") elif policy_action_dim == 3 and drive_mode == "differential": drive_mode = "mecanum" print(f"[eval] policy has 3D actions — overriding drive_mode " f"to mecanum") print(f"{'n_sheep':>8} {'success%':>10} {'mean_steps':>12} {'mean_penned':>12}") print("-" * 46) for n in range(1, args.max_flock + 1): successes, steps, penned = [], [], [] for seed in range(args.n_seeds): env = HerdingEnv(n_sheep=n, max_steps=args.max_steps, difficulty=args.difficulty, seed=seed, frame_stack=frame_stack, drive_mode=drive_mode) _reset_recurrent(predict) r = rollout(env, predict, args.max_steps) successes.append(int(r["success"])) steps.append(r["steps"]) penned.append(r["n_penned"]) sr = 100.0 * mean(successes) ms = mean(steps) mp = mean(penned) print(f"{n:>8d} {sr:>9.1f}% {ms:>12.0f} {mp:>12.2f}") if __name__ == "__main__": main()