From 634150c9831fec2220b044294d8a205b4fa5f166 Mon Sep 17 00:00:00 2001 From: Johnny Fernandes Date: Thu, 30 Apr 2026 03:33:42 +0100 Subject: [PATCH] Removing deduplication system of configs --- pipeline/orchestrator.py | 85 ---------------------------------------- 1 file changed, 85 deletions(-) diff --git a/pipeline/orchestrator.py b/pipeline/orchestrator.py index e1cbd44..93212fe 100644 --- a/pipeline/orchestrator.py +++ b/pipeline/orchestrator.py @@ -399,86 +399,6 @@ class EphemeralVastRunner: return "generator/run.py", "generator/outputs" return "classifier/run.py", "classifier/outputs" - # Merge two dicts recursively (override wins on leaf keys) - @staticmethod - def _deep_merge_dicts(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]: - result = base.copy() - for key, value in override.items(): - if key in result and isinstance(result[key], dict) and isinstance(value, dict): - result[key] = EphemeralVastRunner._deep_merge_dicts(result[key], value) - else: - result[key] = value - return result - - # Recursively resolve "extends" references in JSON configs with cycle detection - def _load_config_with_extends(self, config_path: Path, seen: set[Path] | None = None) -> dict[str, Any]: - if seen is None: - seen = set() - resolved = config_path.resolve() - if resolved in seen: - raise ValueError(f"Circular config inheritance detected at: {config_path}") - seen.add(resolved) - with open(config_path, encoding="utf-8") as fh: - cfg = json.load(fh) - base_ref = cfg.pop("extends", None) - if not base_ref: - seen.remove(resolved) - return cfg - base_path = (config_path.parent / base_ref).resolve() - base_cfg = self._load_config_with_extends(base_path, seen=seen) - seen.remove(resolved) - return self._deep_merge_dicts(base_cfg, cfg) - - # Resolve a config the same way the runners do: extends chain first, - # then shared.json overlaid underneath. Mirrors load_config() in - # classifier/src/utils/config.py and generator/src/utils/config.py — keep - # in sync if merge semantics ever change. - def _load_config_merged(self, config_path: Path) -> dict[str, Any]: - cfg = self._load_config_with_extends(config_path) - shared_path = config_path.parent.parent / "shared.json" - if shared_path.exists(): - with open(shared_path, encoding="utf-8") as fh: - shared_cfg = json.load(fh) - cfg = self._deep_merge_dicts(shared_cfg, cfg) - return cfg - - # Return a stable signature used to detect duplicate training configs - def _normalized_config_signature(self, config_path: Path) -> str: - cfg = self._load_config_merged(config_path) - # run_name should not influence whether two configs are equivalent to train. - cfg.pop("run_name", None) - return json.dumps(cfg, sort_keys=True, separators=(",", ":")) - - # Detect configs that are pure extends (only run_name + extends, no new training settings) - # These are pointers to an already-trained experiment and should be skipped - def _is_pure_extend(self, config_path: Path) -> str | None: - with open(config_path, encoding="utf-8") as fh: - raw = json.load(fh) - if "extends" not in raw: - return None - non_meta = {k for k in raw if k not in ("run_name", "extends")} - if non_meta: - return None - base_path = (config_path.parent / raw["extends"]).resolve() - return str(base_path.relative_to(self.project_root)) - - def _dedupe_training_configs(self, config_paths: list[Path]) -> list[Path]: - seen: dict[str, Path] = {} - deduped: list[Path] = [] - for cp in config_paths: - pure_extends = self._is_pure_extend(cp) - if pure_extends is not None: - print(f"Skipping {cp.name} (pure extend of {pure_extends})") - continue - sig = self._normalized_config_signature(cp) - if sig in seen: - first = seen[sig] - print(f"Skipping duplicate config {cp.name} (same training settings as {first.name})") - continue - seen[sig] = cp - deduped.append(cp) - return deduped - # ── remote directory checks ─────────────────────────────────────── # Check whether a directory exists on the remote host @@ -639,11 +559,6 @@ class EphemeralVastRunner: raise FileNotFoundError(f"Config not found: {cp}") resolved.append(cp) - resolved = self._dedupe_training_configs(resolved) - # Abort early if all configs were duplicates - if not resolved: - raise ValueError("No unique configs to run after deduplication.") - n = len(resolved) _, first_output_root = self._module_for_config(resolved[0]) local_output_root = self.project_root / first_output_root