"""Build notebooks/recursive_reasoning_chaos.ipynb via nbformat. Self-contained playground: (1) analytically-tractable transient-chaos toy (no GPU), (2) load a trained TRM/HRM from HuggingFace, (3) extended rollout showing TRM=escapable transient chaos vs HRM=trapped chaotic attractor. HF_REPO is filled in after the upload step. """ import nbformat as nbf from pathlib import Path HF_REPO = "blackhao0426/recursive-reasoning-chaos" # HF account (GitHub is YurenHao0426) nb = nbf.v4.new_notebook() C = [] def md(t): C.append(nbf.v4.new_markdown_cell(t)) def code(t): C.append(nbf.v4.new_code_cell(t)) md(f"""# Recursive Reasoning Failures are Chaotic — and it's *transient chaos* Small recursive reasoners (HRM, TRM) iterate a latent state to solve puzzles (Sudoku, Maze). Measured along the inference trajectory, **failed examples are more chaotic** (higher finite-time Lyapunov exponent / latent drift) than successful ones, in the *same* trained network. This notebook lets you reproduce and play with the mechanism: 1. **Toy model** (pure numpy, no GPU) — *transient chaos*: chaotic search of latent space until the trajectory escapes into the solution basin. Failures = not-yet-escaped trajectories. 2. **Real trained model** loaded from HuggingFace (`{HF_REPO}`). 3. **Extended rollout** — run the recurrence far beyond its training budget. Both architectures' failures sit on a chaotic *saddle* (transient chaos), not a wrong fixed point — they just escape at very different rates: **TRM** failures mostly escape and self-correct given enough compute; **HRM** failures are far more strongly trapped (most keep churning). 4. **Basin accessibility** — restart a trapped puzzle from perturbed initial latent states. A small kick frees most of TRM's (IC-determined, large basin); a hard core of HRM's never escapes any nearby initial condition (input-determined). Companion analysis repo: `github.com/YurenHao0426/recursive-reasoning-dynamics`.""") md("## 0. 
Setup") code("""# minimal deps; torch+einops+pydantic are enough to load these models (TRM-Sudoku is MLP-mixer, # no FlashAttention needed -> runs on any GPU, even CPU). %pip install -q torch einops pydantic huggingface_hub numpy matplotlib import numpy as np, matplotlib.pyplot as plt, torch print("torch", torch.__version__, "| cuda", torch.cuda.is_available())""") md("""## 1. The toy model — transient chaos (no GPU, runs in seconds) A trajectory chaotically *searches* `[0,1]` (logistic map, λ=ln2≈+0.69) until it lands within `eps` of the solution `s` (the "puzzle"), then it converges (λ=ln0.5<0). At a fixed readout time `T`: **captured = success** (FTLE low), **still searching = failure** (FTLE high). The escape time is ~geometric (chaotic-saddle signature) and the FTLE separation is purely a *finite-time* effect — it vanishes as `T→∞` because everyone eventually escapes.""") code("""def run_toy(n=20000, T=16, eps=0.04, seed=0): rg = np.random.default_rng(seed) s = rg.uniform(0.15, 0.85, n); x = rg.uniform(0, 1, n) captured = np.zeros(n, bool); logd = np.zeros(n) for t in range(T): search = ~captured ld = np.where(search, np.log(np.abs(4*(1-2*x))+1e-12), np.log(0.5)) xn = np.where(search, 4*x*(1-x), s + 0.5*(x-s)) captured |= search & (np.abs(xn-s) < eps); x = xn; logd += ld ftle = logd / T success = captured & (np.abs(x-s) < 0.05) return ftle, success def auc(score, y): p, n = score[y==1], score[y==0]; a=np.concatenate([p,n]); o=np.argsort(a) r=np.empty(len(a)); r[o]=np.arange(1,len(a)+1) return (r[:len(p)].sum()-len(p)*(len(p)+1)/2)/(len(p)*len(n)) ftle, succ = run_toy(T=16) print(f"success rate {succ.mean():.2f} | FTLE success {np.median(ftle[succ]):+.3f} vs failure {np.median(ftle[~succ]):+.3f}") print(f"AUC(-FTLE -> success) = {auc(-ftle, succ.astype(int)):.3f} (failure more chaotic)") fig,ax=plt.subplots(1,2,figsize=(11,4)) b=np.linspace(-0.5,0.75,50) ax[0].hist(ftle[succ],b,alpha=.6,color='g',density=True,label='success'); 
ax[0].hist(ftle[~succ],b,alpha=.6,color='r',density=True,label='failure')
ax[0].set_title('toy: failure more chaotic'); ax[0].set_xlabel('finite-time Lyapunov exp'); ax[0].legend()
# run the toy once per readout time T and reuse the result for both curves
Ts=[4,8,16,32,64,128,256]; runs=[run_toy(T=T) for T in Ts]
A=[auc(-f,ok.astype(int)) for f,ok in runs]; R=[ok.mean() for f,ok in runs]
ax[1].plot(Ts,A,'o-',label='AUC(-FTLE->success)'); ax[1].plot(Ts,R,'s--',label='success rate'); ax[1].set_xscale('log')
ax[1].set_xlabel('readout time T'); ax[1].set_title('finite-time: separation vanishes as T->inf'); ax[1].legend()
plt.tight_layout(); plt.show()""")

md(f"""## 2. Load a trained model from HuggingFace

Downloads the model code + checkpoint + config from `{HF_REPO}`. `MODEL` ∈ {{`trm_sudoku`, `hrm_sudoku`}}.""")

code(f"""import sys, yaml, json
from pathlib import Path
from huggingface_hub import snapshot_download
HF_REPO = "{HF_REPO}"
MODEL = "trm_sudoku"   # or "hrm_sudoku"
root = Path(snapshot_download(HF_REPO))
# TRM and HRM ship separate `models/` packages -> put the right one on the path.
# (To switch MODEL, restart the kernel: Python caches the `models` package name.)
sys.path.insert(0, str(root / ("code_trm" if MODEL.startswith("trm") else "code_hrm")))
cfg = yaml.safe_load((root / MODEL / "all_config.yaml").read_text())
meta = json.loads((root / "data" / "sudoku_meta.json").read_text())
arch = dict(cfg["arch"])
arch.update(batch_size=64, seq_len=meta["seq_len"], vocab_size=meta["vocab_size"],
            num_puzzle_identifiers=meta["num_puzzle_identifiers"], causal=False)
if MODEL.startswith("trm"):
    from models.recursive_reasoning.trm import TinyRecursiveReasoningModel_ACTV1 as M
else:
    from models.hrm.hrm_act_v1 import HierarchicalReasoningModel_ACTV1 as M
model = M(arch)
sd = torch.load(root / MODEL / "weights.pt", map_location="cpu", weights_only=True)
# strip the torch.compile / wrapper prefixes from the checkpoint keys before loading
model.load_state_dict({{k.replace("_orig_mod.","").replace("model.",""): v for k,v in sd.items()}}, strict=False)
dev = "cuda" if torch.cuda.is_available() else "cpu"; model.to(dev).eval()
inner = model.inner
inp = np.load(root/"data"/"sudoku_test_inputs.npy"); lab = np.load(root/"data"/"sudoku_test_labels.npy")
pid = np.load(root/"data"/"sudoku_test_pid.npy")
print(f"loaded {{MODEL}}: hidden={{inner.config.hidden_size}}, H_cycles={{inner.config.H_cycles}}, L_cycles={{inner.config.L_cycles}}, test puzzles={{len(inp)}}")""")

md("""## 3. Extended rollout: the mechanism

Run the recurrence for `n_seg` segments (far past the 16-segment training budget) and watch the fate of trajectories that fail at segment 16.
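
The "fate" of those failures can be summarized by a single number, the per-segment escape rate. The following is a minimal sketch, assuming escape times are roughly geometric as in the toy model. It uses synthetic escape times as stand-in data (not model output), and `fit_escape_rate` is a hypothetical helper that is not part of the released code. It fits the slope of the log-survival curve, which is a straight line when escape is exponential.

```python
import numpy as np

def fit_escape_rate(survival, t0=0):
    # survival[k] = fraction of trajectories still unsolved after t0 + k segments
    # (hypothetical helper; assumes survival ~ exp(-kappa * t) wherever it is > 0)
    survival = np.asarray(survival, dtype=float)
    t = t0 + np.arange(len(survival))
    keep = survival > 0                      # log is undefined once everyone has escaped
    slope, intercept = np.polyfit(t[keep], np.log(survival[keep]), 1)
    return -slope                            # kappa, in units of 1/segment

# synthetic stand-in data: geometric escape times with per-segment escape probability p
rng = np.random.default_rng(0)
p = 0.05
escape_time = rng.geometric(p, size=20000)   # segment at which each trajectory escapes (>= 1)
horizon = 60
survival = np.array([(escape_time > t).mean() for t in range(horizon)])

kappa = fit_escape_rate(survival)
print('fitted kappa', kappa, '| expected -ln(1-p)', -np.log(1 - p))
```

On real rollout output the same fit could be applied to the list `S` computed in the next cell, with `t0=16`; a clearly curved log-survival plot would indicate that a single escape rate is not a good description.
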
Restart the kernel and re-run from section 2 with `MODEL="hrm_sudoku"` to see the contrast.""")

code("""def extended_rollout(inp, lab, pid, n=256, n_seg=128, seed=0):
    rng=np.random.default_rng(seed); idx=rng.choice(len(inp), n, replace=False)
    pe=inner.puzzle_emb_len; sf=inner.config.seq_len+pe; hid=inner.config.hidden_size
    is_hrm = hasattr(inner, "H_level")
    X=torch.tensor(inp[idx].astype(np.int32),device=dev); Y=torch.tensor(lab[idx].astype(np.int32),device=dev)
    P=torch.tensor(pid[idx].astype(np.int32),device=dev)
    EX=[]; DR=[]
    with torch.no_grad():
        zH=inner.H_init.unsqueeze(0).expand(n,sf,hid).clone().to(inner.forward_dtype)
        zL=inner.L_init.unsqueeze(0).expand(n,sf,hid).clone().to(inner.forward_dtype)
        si=dict(cos_sin=inner.rotary_emb() if hasattr(inner,"rotary_emb") else None)
        emb=inner._input_embeddings(X,P); m=Y>0; prev=None
        for _ in range(n_seg):
            for _h in range(inner.config.H_cycles):
                for _l in range(inner.config.L_cycles):
                    zL=inner.L_level(zL, zH+emb, **si)
                zH=(inner.H_level if is_hrm else inner.L_level)(zH, zL, **si)
            p=inner.lm_head(zH)[:,pe:].float().argmax(-1)
            EX.append(((p==Y)|~m).all(-1).float().cpu().numpy())   # exact match on all supervised cells
            # per-example latent drift: L2 norm of the change in z_H since the previous segment
            # (norm(dim=1) keeps one value per puzzle; norm(1) would return a single scalar L1 norm)
            DR.append((torch.zeros(n) if prev is None else (zH-prev).float().flatten(1).norm(dim=1).cpu()).numpy())
            prev=zH.detach()
    return np.stack(EX,1), np.stack(DR,1)

ex, dr = extended_rollout(inp, lab, pid, n=256, n_seg=128)
T=ex.shape[1]; fail=ex[:,15]==0; nf=fail.sum()
print(f"accuracy @16={ex[:,15].mean():.3f}  @{T}={ex[:,-1].mean():.3f}")
print(f"of {nf} step-16 failures: self-resolve to CORRECT by seg{T}: {(fail&(ex[:,-1]==1)).sum()/nf*100:.0f}%")
ld=dr[:,-4:].mean(1)
print(f"median latent drift -- failures {np.median(ld[fail]):.1f} vs successes {np.median(ld[ex[:,15]==1]):.1f}")
fig,ax=plt.subplots(1,2,figsize=(11,4))
ax[0].plot(range(1,T+1), ex.mean(0)); ax[0].axvline(16,ls='--',c='gray'); ax[0].set_xscale('log')
ax[0].set_xlabel('inference segments'); ax[0].set_ylabel('accuracy'); ax[0].set_title('accuracy vs compute')
# fraction of step-16 failures that have never been correct in the first s segments
S=[(fail&(ex[:,:s].max(1)==0)).sum()/nf for s in range(16,T+1)]
ax[1].plot(range(16,T+1),S); ax[1].set_yscale('log'); ax[1].set_xlabel('segments'); ax[1].set_ylabel('frac failures still unsolved')
ax[1].set_title('escape from chaotic set (straight line on log-y = exponential escape)'); plt.tight_layout(); plt.show()""")

md("""## 4. Basin accessibility: input-determined or initial-condition-determined?

The puzzle is re-injected at *every* segment (`z_H + input_embeddings`), so perturbing only the **initial** latent
state `z0` is a clean initial-condition change that leaves the input intact. Restart each step-16 failure `K` times from
`z0 + sigma*noise`: if a small kick frees it (some restart solves), the solution basin is large and accessible, i.e.
*initial-condition-determined*; if no nearby IC escapes, the trapping is *input-determined*. TRM: a small kick frees
most. HRM: a hard core escapes from no nearby IC. (GPU: seconds. Laptop/CPU with TRM: a couple of minutes; lower `n`/`K` to speed it up.)""")

code("""def perturb_z0(inp, lab, pid, n=96, K=8, sigmas=(0.0, 0.1, 0.3, 1.0), n_seg=48, readout=16, seed=0):
    rng = np.random.default_rng(seed); idx = rng.choice(len(inp), n, replace=False)
    pe = inner.puzzle_emb_len; sf = inner.config.seq_len + pe; hid = inner.config.hidden_size
    is_hrm = hasattr(inner, "H_level") and getattr(inner, "H_level", None) is not None
    Hup = inner.H_level if is_hrm else inner.L_level   # weight-tied TRM reuses L_level
    sc = float(inner.H_init.float().std()); g = torch.Generator(device=dev).manual_seed(seed)
    X = torch.tensor(inp[idx].astype(np.int32), device=dev); Y = torch.tensor(lab[idx].astype(np.int32), device=dev)
    P = torch.tensor(pid[idx].astype(np.int32), device=dev)
    si = dict(cos_sin=inner.rotary_emb() if hasattr(inner, "rotary_emb") else None)
    solve = np.zeros((n, len(sigmas), K), bool); base = None
    with torch.no_grad():
        emb0 = inner._input_embeddings(X, P); m0 = Y > 0
        for sj, sg in enumerate(sigmas):
            # K restarts per puzzle, laid out as consecutive rows of the batch
            emb = emb0.repeat_interleave(K, 0); Yr = Y.repeat_interleave(K, 0); mr = m0.repeat_interleave(K, 0); B = n * K
            zH = inner.H_init.unsqueeze(0).expand(B, sf, hid).clone().to(inner.forward_dtype)
            zL = inner.L_init.unsqueeze(0).expand(B, sf, hid).clone().to(inner.forward_dtype)
            if sg > 0:   # noise scaled relative to the std of H_init
                zH = (zH.float() + sg*sc*torch.randn(zH.shape, generator=g, device=dev)).to(inner.forward_dtype)
                zL = (zL.float() + sg*sc*torch.randn(zL.shape, generator=g, device=dev)).to(inner.forward_dtype)
            solved = torch.zeros(B, dtype=torch.bool, device=dev)
            for s in range(n_seg):
                for _h in range(inner.config.H_cycles):
                    for _l in range(inner.config.L_cycles):
                        zL = inner.L_level(zL, zH + emb, **si)
                    zH = Hup(zH, zL, **si)
                ok = ((inner.lm_head(zH)[:, pe:].float().argmax(-1) == Yr) | ~mr).all(-1); solved |= ok
                # baseline = correctness at the readout segment for the first entry of sigmas (0.0 by default)
                if sj == 0 and s == readout - 1:
                    base = ok.view(n, K)[:, 0].cpu().numpy()
            solve[:, sj] = solved.view(n, K).cpu().numpy()   # solved at any segment up to n_seg
    return solve, base, np.array(sigmas)

solve, base, sg = perturb_z0(inp, lab, pid)
fail = ~base; nf = int(fail.sum())
print(f"{nf} of {len(base)} puzzles fail@16; freeing them by restarting from a perturbed IC:")
# sigma=0.0 is the unperturbed baseline: it only measures what the extra segments alone achieve
for j, s in enumerate(sg):
    sub = solve[fail, j]; print(f"  sigma={s:.1f}: single-restart={sub.mean():.2f}  best-of-K={sub.any(1).mean():.2f}")
plt.figure(figsize=(6, 4))
plt.plot(sg, [solve[fail, j].mean() for j in range(len(sg))], 'o--', label='single restart')
plt.plot(sg, [solve[fail, j].any(1).mean() for j in range(len(sg))], 's-', label='best-of-K')
plt.xlabel('relative IC noise sigma'); plt.ylabel('solve rate (failing puzzles)')
plt.title('basin accessibility: does a restart free a trapped puzzle?'); plt.legend(); plt.grid(alpha=.3); plt.show()""")

md("""## What this shows

- **TRM**: step-16 failures *escape* the chaotic transient and resolve to the correct answer (≈0 settle to a wrong
  answer) → a chaotic **saddle** + one solution fixed point. More compute solves more puzzles.
- **HRM**: failures escape too, but *much* more slowly; most are still churning at this horizon.
  Out to 4000 segments the never-correct fraction keeps decaying (≈0.87→0.77), so it is a **strongly trapping chaotic
  saddle**, NOT a strict attractor. The per-segment escape-rate gap (~5×) is mostly compute per segment: TRM
  evaluates its recurrent module 21×/segment vs HRM 6×, so per module evaluation the gap is only ~1.6×.
- **Neither settles to a wrong fixed point**: the "spurious fixed point" reading from 2D PCA is an artifact of
  projecting high-dimensional chaotic wandering onto two axes.

Try: change `MODEL`, `n_seg`, `eps` (toy); compare TRM vs HRM escape curves.""")

nb["cells"] = C
out = Path(__file__).resolve().parent / "recursive_reasoning_chaos.ipynb"
nbf.write(nb, str(out))
print("wrote", out, f"({len(C)} cells)")