#!/usr/bin/env python3
"""Audit run-result JSON files and write CSV / Markdown summaries.

The script scans ``*.json`` reports in a runs directory, normalises each one
into a :class:`Record`, keeps one record per (dataset, view, label, seed),
and writes per-cell aggregates, seed-paired deltas against the matching
``classic`` run, delta summaries, a coverage table and a Markdown report.
"""
from __future__ import annotations

import argparse
import csv
import json
import math
import statistics
import sys
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any

# Make the directory one level above this file's folder importable so that
# the lazy ``rrog.runspecs`` import in expected_specs() can resolve.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

HIGHER_BETTER = {"accuracy", "ap", "auc", "f1", "mrr", "rocauc"}
LOWER_BETTER = {"mae", "raw-mae", "rmse", "mae_sum"}


def metric_direction(metric: str) -> int:
    """Return +1 when larger metric values are better, -1 when smaller are.

    Matching is case-insensitive. Any name containing ``mae`` or ``rmse`` is
    treated as lower-is-better; unknown metrics default to +1.
    """
    metric = metric.lower()
    if metric in HIGHER_BETTER:
        return 1
    if metric in LOWER_BETTER or "mae" in metric or "rmse" in metric:
        return -1
    return 1


def mean(xs: list[float]) -> float:
    """Return the arithmetic mean of *xs*, or NaN for an empty list."""
    return sum(xs) / len(xs) if xs else math.nan


def stdev(xs: list[float]) -> float:
    """Return the sample standard deviation, or 0.0 with fewer than two values."""
    return statistics.stdev(xs) if len(xs) > 1 else 0.0


def median(xs: list[float]) -> float:
    """Return the median of *xs*, or NaN for an empty list."""
    return statistics.median(xs) if xs else math.nan


def fmt(x: float | None, digits: int = 4) -> str:
    """Format *x* with *digits* decimals; ``None`` and NaN become ``""``."""
    if x is None:
        return ""
    if isinstance(x, float) and math.isnan(x):
        return ""
    return f"{x:.{digits}f}"


def read_json(path: Path) -> dict[str, Any] | None:
    """Load *path* as JSON and return it only if the top level is an object.

    Unreadable, undecodable or malformed files, and files whose top-level
    value is not a dict, yield ``None``.
    """
    try:
        with path.open(encoding="utf-8") as f:
            obj = json.load(f)
    except (OSError, UnicodeDecodeError, json.JSONDecodeError):
        return None
    return obj if isinstance(obj, dict) else None


def none_or_float(value: Any) -> float | None:
    """Return ``float(value)``, or ``None`` when *value* is None or unparseable."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _int_field(rep: dict[str, Any], key: str, default: int) -> int:
    """Return ``rep[key]`` as an int, or *default* if missing or unparseable.

    A stored ``0`` is kept as ``0``. This matters for ``T``, where 0 denotes
    the classic / view-only configurations and must not become the default.
    """
    try:
        return int(rep.get(key, default))
    except (TypeError, ValueError, OverflowError):
        return default


def score_from_split(rep: dict[str, Any], split: str, metric: str) -> float | None:
    """Return ``rep[split][metric]`` as a float, or ``None`` if unavailable.

    ``None`` is returned when the split is absent, is not a mapping (for
    example a bare number), lacks *metric*, or holds a non-numeric value.
    """
    scores = rep.get(split)
    if not isinstance(scores, dict):
        return None
    return none_or_float(scores.get(metric))


def ogb_compute_label(rep: dict[str, Any]) -> str:
    """Build the compute label string for a standard (non-ZINC) report.

    ``classic`` with ``T == 0`` and ``n_sup == 1`` is labelled ``"classic"``;
    other configurations get a label that encodes their settings. A positive
    ``ema`` appends ``+ema<value>``.

    Raises:
        KeyError: if *rep* has no ``"compute"`` entry.
    """
    compute = str(rep["compute"])
    t = _int_field(rep, "T", -1)
    n_sup = _int_field(rep, "n_sup", -1)
    if compute == "classic" and t == 0 and n_sup == 1:
        label = "classic"
    elif compute == "classic" and t == 0:
        label = f"view-only-T{t}-ns{n_sup}"
    elif compute == "rrog-act":
        target = str(rep.get("halt_target", ""))
        if target == "loss":
            target += f"{float(rep.get('halt_loss_threshold', 0.0) or 0.0):g}"
        label = (
            f"rrog-act-{rep.get('act_train_mode', 'stream')}-T{rep.get('T')}-ns{rep.get('n_sup')}"
            f"-hm{rep.get('halt_max_steps')}-min{rep.get('halt_min_steps')}"
            f"-{target}-lq{float(rep.get('lam_q', 0.0) or 0.0):g}"
            f"-hex{float(rep.get('halt_exploration_prob', 0.0) or 0.0):g}"
            f"-qw{rep.get('q_warmup_epochs', 0)}"
        )
    else:
        label = f"{compute}-T{rep.get('T')}-ns{rep.get('n_sup')}"
    ema = float(rep.get("ema", 0.0) or 0.0)
    if ema > 0:
        label += f"+ema{ema:g}"
    return label


def ogb_compute_family(rep: dict[str, Any]) -> str:
    """Return the compute family for a standard (non-ZINC) report.

    Non-baseline ``classic`` runs map to ``"view-only"`` (``T == 0``) or
    ``"classic-nonbaseline"``; everything else keeps its ``compute`` value.

    Raises:
        KeyError: if *rep* has no ``"compute"`` entry.
    """
    compute = str(rep["compute"])
    t = _int_field(rep, "T", -1)
    n_sup = _int_field(rep, "n_sup", -1)
    if compute == "classic" and not (t == 0 and n_sup == 1):
        return "view-only" if t == 0 else "classic-nonbaseline"
    return compute


def zinc_compute_label(rep: dict[str, Any]) -> tuple[str, str]:
    """Return ``(compute_family, compute_label)`` for a ZINC report."""
    t = _int_field(rep, "T", -1)
    n_sup = _int_field(rep, "n_sup", -1)
    if rep.get("act"):
        return "rrog-act", f"rrog-act-T{t}-ns{n_sup}"
    if t == 0 and n_sup == 1:
        return "classic", "classic"
    if t == 0:
        return "view-only", f"view-only-T{t}-ns{n_sup}"
    label = f"fixed-rrog-T{t}-ns{n_sup}"
    if rep.get("loss_mode") == "trace":
        label += "+trace"
    return "fixed-rrog", label


@dataclass(frozen=True)
class Record:
    """One normalised run result parsed from a single JSON report."""

    dataset: str
    view: str
    compute_family: str
    compute_label: str
    seed: int
    metric: str
    direction: int  # +1 higher-is-better, -1 lower-is-better
    val: float
    test: float
    epochs: int
    hidden: int
    T: int
    n_sup: int
    ep: int
    source: str  # path of the JSON file the record came from
    adaptive_test: float | None = None
    adaptive_steps: float | None = None
    fixed_steps: float | None = None

    @property
    def key(self) -> tuple[str, str, str, int]:
        """Identity used to deduplicate runs."""
        return (self.dataset, self.view, self.compute_label, self.seed)

    @property
    def rank_key(self) -> tuple[int, int, int]:
        """Ordering used to choose among duplicates; the larger tuple wins."""
        return (self.epochs, self.hidden, self.ep)


def standard_record(path: Path, rep: dict[str, Any]) -> Record | None:
    """Parse a standard report into a :class:`Record`.

    Returns ``None`` when a required key is missing or when the ``val`` /
    ``test`` split has no usable score for the report's metric.
    """
    required = {"dataset", "view", "compute", "seed", "metric", "val", "test"}
    if not required.issubset(rep):
        return None
    metric = str(rep["metric"])
    val = score_from_split(rep, "val", metric)
    test = score_from_split(rep, "test", metric)
    if val is None or test is None:
        return None
    adaptive_test = score_from_split(rep, "test_adaptive", metric)
    return Record(
        dataset=str(rep["dataset"]).lower(),
        view=str(rep["view"]),
        compute_family=ogb_compute_family(rep),
        compute_label=ogb_compute_label(rep),
        seed=int(rep["seed"]),
        metric=metric,
        direction=metric_direction(metric),
        val=val,
        test=test,
        adaptive_test=adaptive_test,
        adaptive_steps=none_or_float(rep.get("adaptive_steps")),
        fixed_steps=none_or_float(rep.get("fixed_steps")),
        epochs=_int_field(rep, "epochs", 0),
        hidden=_int_field(rep, "hidden", 0),
        # T == 0 and n_sup == 0 are real values, so they must not be
        # replaced by the -1 "unknown" default.
        T=_int_field(rep, "T", -1),
        n_sup=_int_field(rep, "n_sup", -1),
        ep=_int_field(rep, "ep", 0),
        source=str(path),
    )


def zinc_record(path: Path, rep: dict[str, Any]) -> Record | None:
    """Parse a ``ZINC-cycle56`` report into a :class:`Record`.

    Only reports with ``K == 1``, ``select == "none"`` and ``sigma == 0`` are
    accepted. ``val`` / ``test`` are the sums of the ``val_mae`` /
    ``test_mae`` sequences. Returns ``None`` for any other report, or when
    ``seed`` or the MAE entries are missing.
    """
    if rep.get("dataset") != "ZINC-cycle56":
        return None
    # A missing or null sigma counts as 0.0.
    sigma = float(rep.get("sigma") or 0.0)
    if rep.get("K") != 1 or rep.get("select") != "none" or sigma != 0.0:
        return None
    if "val_mae" not in rep or "test_mae" not in rep or "seed" not in rep:
        return None
    family, label = zinc_compute_label(rep)
    val = float(sum(rep["val_mae"]))
    test = float(sum(rep["test_mae"]))
    return Record(
        dataset="zinc-cycle56",
        view=str(rep.get("view", "gin")),
        compute_family=family,
        compute_label=label,
        seed=int(rep["seed"]),
        metric="mae_sum",
        direction=-1,
        val=val,
        test=test,
        epochs=_int_field(rep, "epochs", 0),
        hidden=_int_field(rep, "hidden", 0),
        T=_int_field(rep, "T", -1),
        n_sup=_int_field(rep, "n_sup", -1),
        ep=_int_field(rep, "ep", 0),
        source=str(path),
    )


def load_records(runs_dir: Path, min_epochs: int) -> tuple[list[Record], int]:
    """Load, filter and deduplicate records from ``runs_dir/*.json``.

    Returns:
        ``(records, raw_count)``: the sorted surviving records, and the
        number of JSON files that parsed to an object (before filtering).
    """
    candidates: dict[tuple[str, str, str, int], Record] = {}
    raw_count = 0
    for path in sorted(runs_dir.glob("*.json")):
        rep = read_json(path)
        if rep is None:
            continue
        raw_count += 1
        rec = standard_record(path, rep) or zinc_record(path, rep)
        if rec is None or rec.epochs < min_epochs:
            continue
        old = candidates.get(rec.key)
        # Among duplicates keep the run with the greatest rank_key.
        if old is None or rec.rank_key > old.rank_key:
            candidates[rec.key] = rec
    ordered = sorted(
        candidates.values(),
        key=lambda r: (r.dataset, r.view, r.compute_label, r.seed),
    )
    return ordered, raw_count


def group_cells(records: list[Record]) -> dict[tuple[str, str, str], list[Record]]:
    """Group records by ``(dataset, view, compute_label)``."""
    cells: dict[tuple[str, str, str], list[Record]] = defaultdict(list)
    for rec in records:
        cells[(rec.dataset, rec.view, rec.compute_label)].append(rec)
    return dict(cells)


def summarize_records(records: list[Record]) -> dict[str, Any]:
    """Aggregate one cell's records into a summary row.

    *records* must be non-empty; descriptive fields are taken from the first
    record.
    """
    vals = [r.val for r in records]
    tests = [r.test for r in records]
    adaptive = [r.adaptive_test for r in records if r.adaptive_test is not None]
    steps = [r.adaptive_steps for r in records if r.adaptive_steps is not None]
    first = records[0]
    by_seed = sorted(records, key=lambda x: x.seed)
    return {
        "dataset": first.dataset,
        "view": first.view,
        "compute_family": first.compute_family,
        "compute_label": first.compute_label,
        "metric": first.metric,
        "n": len(records),
        "seeds": " ".join(str(r.seed) for r in by_seed),
        "epochs_min": min(r.epochs for r in records),
        "epochs_max": max(r.epochs for r in records),
        "hidden": first.hidden,
        "T": first.T,
        "n_sup": first.n_sup,
        "val_mean": mean(vals),
        "val_std": stdev(vals),
        "test_mean": mean(tests),
        "test_std": stdev(tests),
        "adaptive_test_mean": mean(adaptive) if adaptive else math.nan,
        "adaptive_steps_mean": mean(steps) if steps else math.nan,
        "sources": " ".join(r.source for r in by_seed),
    }


def paired_deltas(records: list[Record]) -> tuple[list[dict[str, Any]], list[Record]]:
    """Pair each non-classic record with the classic run of the same seed.

    Deltas are multiplied by the metric direction, so a positive delta always
    means the non-classic run is better than its baseline.

    Returns:
        ``(rows, unpaired)``: sorted delta rows, and the non-classic records
        that have no classic run with the same dataset, view and seed.
    """
    classic: dict[tuple[str, str, int], Record] = {}
    for rec in records:
        if rec.compute_label == "classic" and rec.compute_family == "classic":
            classic[(rec.dataset, rec.view, rec.seed)] = rec

    rows: list[dict[str, Any]] = []
    unpaired: list[Record] = []
    for rec in records:
        if rec.compute_label == "classic":
            continue
        base = classic.get((rec.dataset, rec.view, rec.seed))
        if base is None:
            unpaired.append(rec)
            continue
        adaptive_delta = None
        if rec.adaptive_test is not None:
            adaptive_delta = rec.direction * (rec.adaptive_test - base.test)
        rows.append({
            "dataset": rec.dataset,
            "view": rec.view,
            "compute_family": rec.compute_family,
            "compute_label": rec.compute_label,
            "seed": rec.seed,
            "metric": rec.metric,
            "direction": rec.direction,
            "base_val": base.val,
            "base_test": base.test,
            "val": rec.val,
            "test": rec.test,
            "adaptive_test": rec.adaptive_test,
            "val_delta": rec.direction * (rec.val - base.val),
            "test_delta": rec.direction * (rec.test - base.test),
            "adaptive_test_delta": adaptive_delta,
            "adaptive_steps": rec.adaptive_steps,
            "fixed_steps": rec.fixed_steps,
            "epochs": rec.epochs,
            "hidden": rec.hidden,
            "T": rec.T,
            "n_sup": rec.n_sup,
            "source": rec.source,
            "base_source": base.source,
        })
    rows.sort(key=lambda r: (r["dataset"], r["view"], r["compute_label"], r["seed"]))
    return rows, unpaired


def summarize_deltas(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Aggregate paired-delta rows by ``(compute_family, compute_label)``."""
    grouped: dict[tuple[str, str], list[dict[str, Any]]] = defaultdict(list)
    for row in rows:
        grouped[(row["compute_family"], row["compute_label"])].append(row)

    out: list[dict[str, Any]] = []
    for (family, label), group in sorted(grouped.items()):
        test_d = [float(r["test_delta"]) for r in group]
        val_d = [float(r["val_delta"]) for r in group]
        adaptive_d = [
            float(r["adaptive_test_delta"])
            for r in group
            if r["adaptive_test_delta"] is not None
        ]
        steps = [
            float(r["adaptive_steps"])
            for r in group
            if r["adaptive_steps"] is not None
        ]
        out.append({
            "compute_family": family,
            "compute_label": label,
            "n": len(group),
            "test_mean_delta": mean(test_d),
            "test_median_delta": median(test_d),
            "test_positive": sum(x > 0 for x in test_d),
            "test_negative": sum(x < 0 for x in test_d),
            "val_mean_delta": mean(val_d),
            "val_positive": sum(x > 0 for x in val_d),
            "adaptive_mean_delta": mean(adaptive_d) if adaptive_d else math.nan,
            "adaptive_positive": sum(x > 0 for x in adaptive_d) if adaptive_d else "",
            "adaptive_negative": sum(x < 0 for x in adaptive_d) if adaptive_d else "",
            "adaptive_steps_mean": mean(steps) if steps else math.nan,
        })
    return out


def expected_specs() -> list[tuple[str, str, str]]:
    """Return the sorted ``(task, view, compute)`` triples from ``rrog.runspecs``.

    Deliberately best-effort: if the module cannot be imported for any
    reason, an empty list is returned and coverage reporting is empty.
    """
    try:
        from rrog.runspecs import RUN_SPECS
    except Exception:
        return []
    return sorted({(s.task.lower(), s.view, s.compute) for s in RUN_SPECS})


def coverage_rows(
    records: list[Record],
    expected_seeds: set[int],
    coverage_families: set[str],
) -> list[dict[str, Any]]:
    """Compare loaded records against the expected run matrix.

    Each expected (dataset, view, family) gets a row with status
    ``"missing"``, ``"missing-seeds"`` or ``"ok"``. A non-empty
    *coverage_families* restricts the check to those families.
    """
    by_family: dict[tuple[str, str, str], list[Record]] = defaultdict(list)
    for rec in records:
        by_family[(rec.dataset, rec.view, rec.compute_family)].append(rec)

    rows: list[dict[str, Any]] = []
    for dataset, view, family in expected_specs():
        if coverage_families and family not in coverage_families:
            continue
        present = by_family.get((dataset, view, family), [])
        seeds = sorted({r.seed for r in present})
        labels = sorted({r.compute_label for r in present})
        missing_seeds = sorted(expected_seeds.difference(seeds)) if expected_seeds else []
        if not present:
            status = "missing"
        elif missing_seeds:
            status = "missing-seeds"
        else:
            status = "ok"
        rows.append({
            "dataset": dataset,
            "view": view,
            "compute_family": family,
            "status": status,
            "n_runs": len(present),
            "seeds": " ".join(map(str, seeds)),
            "missing_seeds": " ".join(map(str, missing_seeds)),
            "labels": " | ".join(labels),
        })
    return rows


def write_csv(path: Path, rows: list[dict[str, Any]], fields: list[str]) -> None:
    """Write *rows* to *path* as CSV with header *fields*.

    Parent directories are created as needed; row keys not listed in
    *fields* are ignored.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(rows)


def md_table(headers: list[str], rows: list[list[str]]) -> str:
    """Render *headers* and *rows* as a Markdown pipe table."""
    out = [
        "| " + " | ".join(headers) + " |",
        "| " + " | ".join(["---"] * len(headers)) + " |",
    ]
    out.extend("| " + " | ".join(row) + " |" for row in rows)
    return "\n".join(out)


def build_markdown(
    records: list[Record],
    raw_count: int,
    cells: list[dict[str, Any]],
    deltas: list[dict[str, Any]],
    delta_summary: list[dict[str, Any]],
    coverage: list[dict[str, Any]],
    unpaired: list[Record],
    digits: int,
) -> str:
    """Assemble the Markdown audit report and return it as one string.

    Tables of individual cases are capped at the 30 lowest deltas (40 rows
    for missing cells and unpaired runs); optional sections are omitted when
    they would be empty.
    """
    missing = [r for r in coverage if r["status"] == "missing"]
    missing_seeds = [r for r in coverage if r["status"] == "missing-seeds"]
    negative = sorted(deltas, key=lambda r: float(r["test_delta"]))[:30]
    adaptive_negative = sorted(
        [r for r in deltas if r["adaptive_test_delta"] is not None],
        key=lambda r: float(r["adaptive_test_delta"]),
    )[:30]
    val_pos_test_neg = [
        r for r in deltas
        if float(r["val_delta"]) > 0 and float(r["test_delta"]) < 0
    ]

    lines = [
        "# Result Audit",
        "",
        f"Generated: {datetime.now().isoformat(timespec='seconds')}",
        "",
        "## Scope",
        "",
        f"- Raw JSON files scanned: {raw_count}",
        f"- Deduplicated runs after min-epoch filter: {len(records)}",
        f"- Aggregated cells: {len(cells)}",
        f"- Paired non-classic deltas vs matching classic: {len(deltas)}",
        f"- Unpaired non-classic runs: {len(unpaired)}",
        f"- Missing expected cells: {len(missing)}",
        f"- Cells missing expected seeds: {len(missing_seeds)}",
        "",
        "## Delta Summary",
        "",
        md_table(
            [
                "family",
                "label",
                "n",
                "test mean",
                "test median",
                "test +/-",
                "val mean",
                "adaptive mean",
                "steps",
            ],
            [
                [
                    r["compute_family"],
                    r["compute_label"],
                    str(r["n"]),
                    fmt(r["test_mean_delta"], digits),
                    fmt(r["test_median_delta"], digits),
                    f"{r['test_positive']}/{r['test_negative']}",
                    fmt(r["val_mean_delta"], digits),
                    fmt(r["adaptive_mean_delta"], digits),
                    fmt(r["adaptive_steps_mean"], 2),
                ]
                for r in delta_summary
            ],
        ),
        "",
        "## Worst Fixed-Test Deltas",
        "",
        md_table(
            ["dataset", "view", "label", "seed", "metric", "val delta", "test delta", "source"],
            [
                [
                    r["dataset"],
                    r["view"],
                    r["compute_label"],
                    str(r["seed"]),
                    r["metric"],
                    fmt(r["val_delta"], digits),
                    fmt(r["test_delta"], digits),
                    Path(r["source"]).name,
                ]
                for r in negative
            ],
        ),
        "",
    ]

    if adaptive_negative:
        lines.extend([
            "## Worst Adaptive-Test Deltas",
            "",
            md_table(
                ["dataset", "view", "label", "seed", "metric", "adaptive delta", "steps", "source"],
                [
                    [
                        r["dataset"],
                        r["view"],
                        r["compute_label"],
                        str(r["seed"]),
                        r["metric"],
                        fmt(r["adaptive_test_delta"], digits),
                        fmt(r["adaptive_steps"], 2),
                        Path(r["source"]).name,
                    ]
                    for r in adaptive_negative
                ],
            ),
            "",
        ])

    if val_pos_test_neg:
        lines.extend([
            "## Val-Positive Test-Negative Cases",
            "",
            md_table(
                ["dataset", "view", "label", "seed", "val delta", "test delta"],
                [
                    [
                        r["dataset"],
                        r["view"],
                        r["compute_label"],
                        str(r["seed"]),
                        fmt(r["val_delta"], digits),
                        fmt(r["test_delta"], digits),
                    ]
                    for r in sorted(val_pos_test_neg, key=lambda x: float(x["test_delta"]))[:30]
                ],
            ),
            "",
        ])

    if missing[:40]:
        lines.extend([
            "## First Missing Expected Cells",
            "",
            md_table(
                ["dataset", "view", "family"],
                [[r["dataset"], r["view"], r["compute_family"]] for r in missing[:40]],
            ),
            "",
        ])

    if unpaired[:40]:
        lines.extend([
            "## First Unpaired Non-Classic Runs",
            "",
            md_table(
                ["dataset", "view", "label", "seed", "source"],
                [
                    [r.dataset, r.view, r.compute_label, str(r.seed), Path(r.source).name]
                    for r in unpaired[:40]
                ],
            ),
            "",
        ])

    lines.extend([
        "## Files",
        "",
        "- `analysis/result_cells.csv`: one row per deduplicated dataset/backbone/compute cell.",
        "- `analysis/paired_deltas.csv`: seed-paired deltas against matching classic baselines.",
        "- `analysis/delta_summary.csv`: aggregate delta statistics by compute label.",
        "- `analysis/coverage.csv`: expected matrix coverage from `rrog.runspecs`.",
        "",
    ])
    return "\n".join(lines)


def _split_tokens(text: str) -> list[str]:
    """Split a comma- and/or whitespace-separated option into its tokens."""
    return text.replace(",", " ").split()


def main() -> None:
    """Parse CLI arguments, run the audit and write all output files."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--runs-dir", default="runs")
    ap.add_argument("--out-dir", default="analysis")
    ap.add_argument("--min-epochs", type=int, default=10)
    ap.add_argument("--expected-seeds", default="0")
    ap.add_argument("--coverage-families", default="classic fixed-rrog rrog-act")
    ap.add_argument("--digits", type=int, default=4)
    args = ap.parse_args()

    runs_dir = Path(args.runs_dir)
    out_dir = Path(args.out_dir)
    expected_seeds = {int(tok) for tok in _split_tokens(args.expected_seeds)}
    coverage_families = set(_split_tokens(args.coverage_families))

    records, raw_count = load_records(runs_dir, args.min_epochs)
    grouped = group_cells(records)
    cells = [summarize_records(recs) for _, recs in sorted(grouped.items())]
    deltas, unpaired = paired_deltas(records)
    delta_summary = summarize_deltas(deltas)
    coverage = coverage_rows(records, expected_seeds, coverage_families)

    write_csv(out_dir / "result_cells.csv", cells, [
        "dataset", "view", "compute_family", "compute_label", "metric", "n",
        "seeds", "epochs_min", "epochs_max", "hidden", "T", "n_sup",
        "val_mean", "val_std", "test_mean", "test_std",
        "adaptive_test_mean", "adaptive_steps_mean", "sources",
    ])
    write_csv(out_dir / "paired_deltas.csv", deltas, [
        "dataset", "view", "compute_family", "compute_label", "seed", "metric",
        "direction", "base_val", "base_test", "val", "test", "adaptive_test",
        "val_delta", "test_delta", "adaptive_test_delta", "adaptive_steps",
        "fixed_steps", "epochs", "hidden", "T", "n_sup", "source", "base_source",
    ])
    write_csv(out_dir / "delta_summary.csv", delta_summary, [
        "compute_family", "compute_label", "n", "test_mean_delta",
        "test_median_delta", "test_positive", "test_negative",
        "val_mean_delta", "val_positive", "adaptive_mean_delta",
        "adaptive_positive", "adaptive_negative", "adaptive_steps_mean",
    ])
    write_csv(out_dir / "coverage.csv", coverage, [
        "dataset", "view", "compute_family", "status", "n_runs", "seeds",
        "missing_seeds", "labels",
    ])

    report = build_markdown(
        records,
        raw_count,
        cells,
        deltas,
        delta_summary,
        coverage,
        unpaired,
        args.digits,
    )
    (out_dir / "result_audit.md").write_text(report, encoding="utf-8")
    print(f"wrote {out_dir / 'result_audit.md'}")
    print(f"records={len(records)} cells={len(cells)} paired_deltas={len(deltas)}")


if __name__ == "__main__":
    main()