Sheep training flock _ improver
This commit is contained in:
+44
-8
@@ -59,9 +59,13 @@ class HerdingEnv(gym.Env):
|
||||
W_PEN_BONUS = 10.0 # per sheep penned
|
||||
W_COMPLETE = 100.0 # all sheep penned
|
||||
W_STEP_COST = 0.02 # time penalty — strong enough to punish doing nothing
|
||||
W_COMPACT = 0.0 # reward for flock-radius reduction (off by default)
|
||||
ALIGN_SHAPE = "standoff" # "standoff" (peaks at IDEAL) | "near" (peaks at 0)
|
||||
ALIGN_GATED = True # gate alignment on action magnitude
|
||||
|
||||
def __init__(self, n_sheep: int = 1, max_steps: int = 2000,
|
||||
render_mode: str = None, random_n_sheep: bool = False):
|
||||
render_mode: str = None, random_n_sheep: bool = False,
|
||||
reward_cfg: dict = None):
|
||||
super().__init__()
|
||||
assert 1 <= n_sheep <= self.MAX_SHEEP
|
||||
self.n_sheep = n_sheep
|
||||
@@ -69,6 +73,14 @@ class HerdingEnv(gym.Env):
|
||||
self.render_mode = render_mode
|
||||
self.random_n_sheep = random_n_sheep # if True, randomise n_sheep each reset
|
||||
|
||||
# Override class-default reward weights / shape with per-instance config
|
||||
# so sweeps can ship configs into subprocess envs via pickled make_env.
|
||||
if reward_cfg:
|
||||
for k, v in reward_cfg.items():
|
||||
if not hasattr(self.__class__, k):
|
||||
raise ValueError(f"unknown reward_cfg key: {k}")
|
||||
setattr(self, k, v)
|
||||
|
||||
# Fixed 16-dim observation regardless of n_sheep:
|
||||
# dog_pos(2) + rel_com(2) + rel_far1(2) + rel_far2(2) + rel_far3(2)
|
||||
# + com_to_pen(2) + far1_to_pen(2) + radius(1) + frac_penned(1)
|
||||
@@ -127,8 +139,12 @@ class HerdingEnv(gym.Env):
|
||||
|
||||
# Dog: 50% of resets start already behind the flock (anti-pen side,
|
||||
# within flee range) to give early training aligned experiences.
|
||||
# Use the flock COM as the reference (not sheep[0]) so the bias
|
||||
# generalizes from 1-sheep to multi-sheep without putting the dog
|
||||
# in front of or inside the flock.
|
||||
if self.np_random.random() < 0.5:
|
||||
ref = self.sheep_pos[0]
|
||||
active_pts = self.sheep_pos[:self.n_sheep][~self.penned[:self.n_sheep]]
|
||||
ref = active_pts.mean(axis=0) if len(active_pts) else self.sheep_pos[0]
|
||||
away = ref - self.PEN_CENTER
|
||||
d = float(np.linalg.norm(away))
|
||||
if d > 0.1:
|
||||
@@ -154,8 +170,13 @@ class HerdingEnv(gym.Env):
|
||||
self.sheep_pos[:self.n_sheep][active] - self.PEN_CENTER, axis=1
|
||||
).sum()
|
||||
)
|
||||
com0 = self.sheep_pos[:self.n_sheep][active].mean(axis=0)
|
||||
self._prev_radius = float(
|
||||
np.linalg.norm(self.sheep_pos[:self.n_sheep][active] - com0, axis=1).max()
|
||||
)
|
||||
else:
|
||||
self._prev_pen_dist_sum = 0.0
|
||||
self._prev_radius = 0.0
|
||||
|
||||
return self._obs(), {}
|
||||
|
||||
@@ -322,22 +343,37 @@ class HerdingEnv(gym.Env):
|
||||
pen_dir = (self.PEN_CENTER - com) / com_dist
|
||||
dog_dir = (self.dog_pos - com) / d_dog_com
|
||||
cosine = -float(np.dot(pen_dir, dog_dir))
|
||||
proximity = max(0.0, 1.0 - d_dog_com / self.FLEE_DIST)
|
||||
# Gate on action magnitude (when ALIGN_GATED is set): only paid when the dog is actually moving.
|
||||
# Without this, parking on the anti-pen side farms +0.03/step against
|
||||
# the -0.02 step_cost and the policy collapses to sit-still.
|
||||
move_gate = min(1.0, float(np.linalg.norm(action)))
|
||||
if self.ALIGN_SHAPE == "standoff":
|
||||
IDEAL = 0.5 * (self.SEPARATION_DIST + self.FLEE_DIST)
|
||||
HALF = self.FLEE_DIST - IDEAL
|
||||
proximity = max(0.0, 1.0 - abs(d_dog_com - IDEAL) / HALF)
|
||||
else: # "near"
|
||||
proximity = max(0.0, 1.0 - d_dog_com / self.FLEE_DIST)
|
||||
move_gate = (min(1.0, float(np.linalg.norm(action)))
|
||||
if self.ALIGN_GATED else 1.0)
|
||||
alignment = cosine * proximity * move_gate * self.W_ALIGN
|
||||
else:
|
||||
alignment = 0.0
|
||||
|
||||
# Compactness shaping: reward step-to-step reductions in flock radius (active sheep only)
|
||||
if self.W_COMPACT and active.any():
|
||||
cur_radius = float(np.linalg.norm(
|
||||
self.sheep_pos[:self.n_sheep][active] - com, axis=1
|
||||
).max())
|
||||
r_compact = (self._prev_radius - cur_radius) * self.W_COMPACT
|
||||
self._prev_radius = cur_radius
|
||||
else:
|
||||
r_compact = 0.0
|
||||
|
||||
r_pen_bonus = newly_penned * self.W_PEN_BONUS
|
||||
r_step_cost = -self.W_STEP_COST
|
||||
r_complete = self.W_COMPLETE if n_penned == self.n_sheep else 0.0
|
||||
reward = r_progress + alignment + r_pen_bonus + r_step_cost + r_complete
|
||||
reward = (r_progress + alignment + r_compact + r_pen_bonus
|
||||
+ r_step_cost + r_complete)
|
||||
rcomps = {
|
||||
"progress": float(r_progress),
|
||||
"alignment": float(alignment),
|
||||
"compact": float(r_compact),
|
||||
"pen_bonus": float(r_pen_bonus),
|
||||
"step_cost": float(r_step_cost),
|
||||
"complete": float(r_complete),
|
||||
|
||||
Reference in New Issue
Block a user