# path: root/analysis_2x2/analyze_clv.py
# blob: b4ae12ae2ddf126a563c56c70f59af2fa862ddef
"""Decisive test for (b): does the leading-CLV GEOMETRY predict correctness beyond
drift + q_halt (which already fully explain the scalar spectrum)?

For each geometric feature (leading-CLV H-fraction, token participation ratio, answer-token PR,
readout alignment): raw AUC, partial corr with correctness | (drift+q_halt), and the multivariate
held-out ridge-AUC GAIN from adding all CLV features on top of drift+q_halt.
Gain ~ 0  => geometry also reducible (honest pivot to (a)).
Gain > 0  => a non-reducible dynamical-geometry signal (chase further).
"""
from __future__ import annotations
import sys, glob
import numpy as np
# Shared generator with a fixed seed. ridge_cv_auc draws its fold permutation from it,
# so every call advances the state and successive calls use different fold splits.
rng = np.random.default_rng(0)


def resid(y, X):
    """Return the residual of ``y`` after a least-squares fit on ``X`` plus an intercept.

    Args:
        y: 1-D response vector.
        X: 2-D regressor matrix with one row per entry of ``y``.

    Returns:
        ``y`` minus its least-squares projection onto the columns of ``X``
        augmented with a constant column.
    """
    intercept = np.ones((len(y), 1))
    design = np.concatenate([X, intercept], 1)
    # lstsq returns (solution, residuals, rank, singular values); only the solution is needed.
    coef = np.linalg.lstsq(design, y, rcond=None)[0]
    fitted = design @ coef
    return y - fitted


def pcorr(a, b, Z):
    """Return the partial correlation of ``a`` and ``b`` given the controls ``Z``.

    Both vectors are cast to float, regressed on ``Z`` (plus an intercept) via
    ``resid``, and the Pearson correlation of the two residual vectors is returned.

    Args:
        a: 1-D array.
        b: 1-D array of the same length as ``a``.
        Z: 2-D control matrix with one row per sample.

    Returns:
        The off-diagonal entry of the 2x2 correlation matrix, as a Python float.
    """
    res_a = resid(a.astype(float), Z)
    res_b = resid(b.astype(float), Z)
    corr = np.corrcoef(res_a, res_b)
    return float(corr[0, 1])


def auc(score, y):
    """Return the ROC AUC of ``score`` against binary labels ``y``.

    Computed from the rank-sum (Mann-Whitney U) statistic of the positive class.
    Tied scores receive the average of the ranks they span, so a tie between a
    positive and a negative sample counts as half a correctly ordered pair and
    an uninformative constant score yields 0.5.

    Args:
        score: 1-D array-like of scores; higher means "more likely positive".
        y: 1-D array-like of labels; entries equal to 1 are positives and
            entries equal to 0 are negatives (other values are ignored).

    Returns:
        The AUC as a float, or NaN when either class has no samples.
    """
    score = np.asarray(score)
    y = np.asarray(y)
    pos, neg = score[y == 1], score[y == 0]
    n_pos, n_neg = len(pos), len(neg)
    if n_pos == 0 or n_neg == 0:
        return float("nan")
    pooled = np.concatenate([pos, neg])
    # Mid-ranks: every distinct value gets the mean of the 1-based ranks its
    # duplicates occupy. A plain argsort would hand ties arbitrary distinct
    # ranks and bias the statistic by the input order.
    _, inverse, counts = np.unique(pooled, return_inverse=True, return_counts=True)
    last_rank = np.cumsum(counts)
    mid_rank = last_rank - (counts - 1) / 2.0
    ranks = mid_rank[inverse]
    # Positives occupy the first n_pos slots of the pooled array.
    u_stat = ranks[:n_pos].sum() - n_pos * (n_pos + 1) / 2
    return float(u_stat / (n_pos * n_neg))


def ridge_cv_auc(X, y, folds=5, lam=1.0):
    """Return the mean held-out AUC of a ridge regression of ``y`` on ``X``.

    Columns of ``X`` are standardized using statistics of all rows, the rows
    are shuffled with the module-level ``rng``, and fold ``k`` takes every
    ``folds``-th shuffled index starting at ``k`` as its test set. The ridge
    penalty ``lam`` is applied to every coefficient, including the bias column.

    Args:
        X: 2-D feature matrix, one row per sample.
        y: 1-D target vector aligned with the rows of ``X``.
        folds: number of cross-validation folds.
        lam: ridge penalty strength.

    Returns:
        The NaN-ignoring mean of the per-fold AUCs, as a float.
    """
    n = len(y)
    order = rng.permutation(n)
    Xs = (X - X.mean(0)) / (X.std(0) + 1e-8)

    def with_bias(rows):
        # Standardized features for the given rows plus a trailing column of ones.
        return np.concatenate([Xs[rows], np.ones((len(rows), 1))], 1)

    fold_aucs = []
    for k in range(folds):
        test_rows = order[k::folds]
        train_rows = np.setdiff1d(order, test_rows)
        A = with_bias(train_rows)
        gram = A.T @ A + lam * np.eye(A.shape[1])
        w = np.linalg.solve(gram, A.T @ y[train_rows])
        held_out = with_bias(test_rows) @ w
        fold_aucs.append(auc(held_out, y[test_rows]))
    return float(np.nanmean(fold_aucs))


def go(tag, npz):
    """Print the CLV-geometry report for one saved run.

    Reads the arrays ``exact_correct``, ``drift_zH``, ``drift_zL``, ``q_halt``,
    ``q_continue`` and the four ``clv_*`` features from the archive, then prints:

    * per feature (first column only): raw AUC against correctness and the
      partial correlation with correctness given the drift + q_halt controls;
    * held-out ridge AUCs for the CLV features alone, the controls alone, and
      controls + CLV features, together with the gain of the last over the
      controls-only baseline.

    Args:
        tag: label printed in the report header.
        npz: path to the ``.npz`` archive.

    Returns:
        None; the report is written to stdout.
    """
    clv_names = ["clv_hfrac", "clv_pr", "clv_pr_ans", "clv_readout"]
    # np.load keeps the archive's file handle open until it is closed, so pull
    # every array out inside the context manager and release the file promptly.
    with np.load(npz) as d:
        y = d["exact_correct"].astype(float)
        dr = np.concatenate([d["drift_zH"], d["drift_zL"]], 1).astype(float)
        qh = np.concatenate([d["q_halt"], d["q_continue"]], 1).astype(float)
        clv = {nm: d[nm].astype(float) for nm in clv_names}
    ctrl = np.concatenate([dr, qh], 1)
    # Univariate statistics use only the first column of each CLV feature.
    feats = {nm: arr[:, 0] for nm, arr in clv.items()}
    clv_all = np.concatenate([clv[nm] for nm in clv_names], 1)
    print(f"\n=== {tag}  (n={len(y)}, acc={y.mean():.3f}) ===")
    print(f"{'feature':14s} {'rawAUC':>7} {'partial|drift+qhalt':>20}")
    for nm, f in feats.items():
        print(f"{nm:14s} {auc(f, y):>7.3f} {pcorr(f, y, ctrl):>20.3f}")
    # Call order is kept as-is: each ridge_cv_auc call draws a fresh permutation
    # from the shared generator, so reordering would change the reported numbers.
    base = ridge_cv_auc(ctrl, y)
    plus = ridge_cv_auc(np.concatenate([ctrl, clv_all], 1), y)
    clv_alone = ridge_cv_auc(clv_all, y)
    print(f"held-out ridge AUC: CLV-alone={clv_alone:.3f} | drift+qhalt={base:.3f} | +CLV-geom={plus:.3f}  GAIN={plus-base:+.3f}")
    print("  -> GAIN ~0 means geometry reduces to convergence+confidence; >0 means non-reducible signal")


if __name__ == "__main__":
    # Report on each expected result archive; archives that do not exist yet
    # are listed as pending instead of raising.
    CLV = "/home/yurenh2/rrm/research/flossing/analysis_2x2/clv"
    runs = [
        ("Sudoku TRM", f"{CLV}/trm_geom_step58590_n512.npz"),
        ("Sudoku HRM", f"{CLV}/hrm_geom_step26040_n512.npz"),
    ]
    for tag, pat in runs:
        g = glob.glob(pat)
        if not g:
            print(f"[pending] {tag}: {pat} not found yet")
            continue
        go(tag, g[0])