diff options
| author | YurenHao0426 <Blackhao0426@gmail.com> | 2026-05-23 12:04:04 -0500 |
|---|---|---|
| committer | YurenHao0426 <Blackhao0426@gmail.com> | 2026-05-23 12:04:04 -0500 |
| commit | fe4d92760f9d9ce9d9f41eb0fe69dd9eadc1534c (patch) | |
| tree | 33c95a3d7bcc5cbfc2233f60e3ed319aa4e07c9c /models/srm/hrm_orth_v1.py | |
| parent | 152821462023690df5d2bf90812e1cb5b1ca7274 (diff) | |
Add HRM-Orth v1 (codex round 2 Q6 pivot)
Patch HRM Block with Lipschitz-bounded ops:
- attention → cosine-normalized softmax attn
- SwiGLU → OrthLinear (Cayley + weak diag scale) + MaxMin + OrthLinear
- rms_norm + add → weighted residual (1-σ(w))·h + σ(w)·f(h)
- Weak orthogonality: diag(s) with s_i ∈ [0.95, 1.0] for compression directions
Keeps HRM ACT framework + H_level/L_level + cycles unchanged.
Predicted +5-7pp vs SRM v1 (codex Q5 decomp):
+1.5-2.5 (remove ReLU rank-kill via MaxMin)
+2.0-3.0 (remove AOL attenuation via Cayley)
+1.0-1.5 (orthogonal residual flow)
Also adds: train_hrm_orth.py trainer, SRM v1 run logs, .gitignore ckpts/.codex
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Diffstat (limited to 'models/srm/hrm_orth_v1.py')
| -rw-r--r-- | models/srm/hrm_orth_v1.py | 371 |
1 files changed, 371 insertions, 0 deletions
diff --git a/models/srm/hrm_orth_v1.py b/models/srm/hrm_orth_v1.py new file mode 100644 index 0000000..70ec561 --- /dev/null +++ b/models/srm/hrm_orth_v1.py @@ -0,0 +1,371 @@ +"""HRM-Orth v1 — orthogonal patch of HRM per codex round 2 recommendation. + +CORE IDEA (codex Q6 pivot, after pure-orthogonal retract Q1): +Keep HRM's H_level/L_level/ACT structure, just patch the inner Block: + - Attention → cosine-normalized attention (≈ Lipschitz-bounded) + - SwiGLU MLP → CayleyOrth linear + MaxMin + CayleyOrth linear + - rms_norm + add → weighted residual: h_new = (1-σ(w)) · h + σ(w) · f(h) + - "Weak orthogonality": diag(s) scaling with most s≈1, some s∈[0.90, 0.97] for compression + +Per codex Q5 decomp: target +5~+7pp over SRM v1 (0.39 → 0.43-0.46). +Per codex Q3: Cayley used (we have it from srm_aol_v1); Householder would be faster but more impl. +""" +from typing import Tuple, List, Dict, Optional +from dataclasses import dataclass +import math + +import torch +import torch.nn.functional as F +from torch import nn +from pydantic import BaseModel + +from models.common import trunc_normal_init_ +from models.layers import rms_norm, SwiGLU, Attention, RotaryEmbedding, CosSin, CastedEmbedding, CastedLinear +from models.sparse_embedding import CastedSparseEmbedding +from models.srm.srm_aol_v1 import CayleyOrthogonal + + +def maxmin(x: torch.Tensor, group: int = 2) -> torch.Tensor: + """1-Lipschitz norm-preserving activation (Anil et al. 2019 GroupSort). + + Pairs adjacent dims; outputs (min, max) per pair. Permutation a.e. → ||∇|| = 1. + Strictly better than ReLU under norm constraints (no rank-kill). + """ + *prefix, d = x.shape + if d % group != 0: + pad = group - (d % group) + x = F.pad(x, (0, pad)) + d = d + pad + xg = x.reshape(*prefix, d // group, group) + sorted_vals, _ = xg.sort(dim=-1) + return sorted_vals.reshape(*prefix, d) + + +def cosine_attention(q: torch.Tensor, k: torch.Tensor, v: torch.Tensor, + tau: float = 8.0) -> torch.Tensor: + """Cosine-normalized softmax attention. Approximately Lipschitz-bounded + (exact bound depends on tau and value norms — see LipsFormer Qi 2023).""" + q = F.normalize(q, dim=-1) + k = F.normalize(k, dim=-1) + attn = (q @ k.transpose(-2, -1)) * tau + attn = attn.softmax(dim=-1) + return attn @ v + + +class OrthLinear(nn.Module): + """Orthogonal linear layer via Cayley. Allows optional row-scaling diag(s) + where s_i ∈ [s_min, 1] to introduce 'weak orthogonality' (codex Q1 fix). + + If s_min < 1, the operator is contractive in some directions: + Lip = max(s) ≤ 1, det = prod(s) ≤ 1 (weak contraction in compressing modes) + """ + def __init__(self, dim: int, s_min: float = 0.95, learn_scale: bool = True): + super().__init__() + self.Q = CayleyOrthogonal(dim) + self.s_min = s_min + # diag scale: sigmoid -> [s_min, 1] + if learn_scale and s_min < 1.0: + self.log_s_raw = nn.Parameter(torch.zeros(dim)) # init sigmoid(0)=0.5 → scale=(s_min+1)/2 + else: + self.register_buffer("log_s_raw", torch.zeros(dim)) + self.learn_scale = learn_scale + + def scale_diag(self) -> torch.Tensor: + if self.s_min >= 1.0 or not self.learn_scale: + return torch.ones_like(self.log_s_raw) + # Affine map sigmoid → [s_min, 1] + return self.s_min + (1.0 - self.s_min) * torch.sigmoid(self.log_s_raw) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + Q = self.Q() # (d, d) orthogonal + s = self.scale_diag().to(Q.dtype) # (d,) in [s_min, 1] + Qs = Q * s.unsqueeze(0) # rescale columns + return F.linear(x, Qs) + + +@dataclass +class HierarchicalReasoningModel_ACTV1InnerCarry: + z_H: torch.Tensor + z_L: torch.Tensor + + +@dataclass +class HierarchicalReasoningModel_ACTV1Carry: + inner_carry: HierarchicalReasoningModel_ACTV1InnerCarry + + steps: torch.Tensor + halted: torch.Tensor + + current_data: Dict[str, torch.Tensor] + + +class HierarchicalReasoningModel_ACTV1Config(BaseModel): + batch_size: int + seq_len: int + puzzle_emb_ndim: int = 0 + num_puzzle_identifiers: int + vocab_size: int + + H_cycles: int + L_cycles: int + + H_layers: int + L_layers: int + + # Transformer config + hidden_size: int + expansion: float + num_heads: int + pos_encodings: str + + rms_norm_eps: float = 1e-5 + rope_theta: float = 10000.0 + + # Halting Q-learning config + halt_max_steps: int + halt_exploration_prob: float + + forward_dtype: str = "bfloat16" + + +class HierarchicalReasoningModel_ACTV1Block(nn.Module): + """Orthogonal-patched HRM Block. + + Replaces (attn + SwiGLU + rms_norm) with (cosine attn + Orth-MLP + weighted residual). + The original class name preserved so the ReasoningModule wrapper is unchanged. + """ + def __init__(self, config: HierarchicalReasoningModel_ACTV1Config) -> None: + super().__init__() + d = config.hidden_size + s_min = getattr(config, "orth_s_min", 0.95) + cosine_tau = getattr(config, "cosine_attn_tau", 8.0) + + # Lipschitz-bounded cosine attention: orthogonal Q/K/V/O projections + self.q_proj = OrthLinear(d, s_min=1.0, learn_scale=False) # strict orth for q/k + self.k_proj = OrthLinear(d, s_min=1.0, learn_scale=False) + self.v_proj = OrthLinear(d, s_min=s_min, learn_scale=True) # weak orth on values + self.o_proj = OrthLinear(d, s_min=s_min, learn_scale=True) + self.cosine_tau = cosine_tau + + # Orth-MLP: OrthLinear -> MaxMin -> OrthLinear (no expansion; uses original d) + self.mlp_in = OrthLinear(d, s_min=s_min, learn_scale=True) + self.mlp_out = OrthLinear(d, s_min=s_min, learn_scale=True) + + # Weighted residual gates (init sigmoid(0)=0.5 → balanced residual) + self.w_attn_logit = nn.Parameter(torch.zeros(())) + self.w_mlp_logit = nn.Parameter(torch.zeros(())) + + def forward(self, cos_sin: CosSin, hidden_states: torch.Tensor) -> torch.Tensor: + # Cosine attention + q = self.q_proj(hidden_states) + k = self.k_proj(hidden_states) + v = self.v_proj(hidden_states) + attn_out = self.o_proj(cosine_attention(q, k, v, tau=self.cosine_tau)) + w_attn = torch.sigmoid(self.w_attn_logit) + hidden_states = (1.0 - w_attn) * hidden_states + w_attn * attn_out + + # Orth-MLP with MaxMin + mlp_out = self.mlp_out(maxmin(self.mlp_in(hidden_states), group=2)) + w_mlp = torch.sigmoid(self.w_mlp_logit) + hidden_states = (1.0 - w_mlp) * hidden_states + w_mlp * mlp_out + return hidden_states + + +class HierarchicalReasoningModel_ACTV1ReasoningModule(nn.Module): + def __init__(self, layers: List[HierarchicalReasoningModel_ACTV1Block]): + super().__init__() + + self.layers = torch.nn.ModuleList(layers) + + def forward(self, hidden_states: torch.Tensor, input_injection: torch.Tensor, **kwargs) -> torch.Tensor: + # Input injection (add) + hidden_states = hidden_states + input_injection + # Layers + for layer in self.layers: + hidden_states = layer(hidden_states=hidden_states, **kwargs) + + return hidden_states + + +class HierarchicalReasoningModel_ACTV1_Inner(nn.Module): + def __init__(self, config: HierarchicalReasoningModel_ACTV1Config) -> None: + super().__init__() + self.config = config + self.forward_dtype = getattr(torch, self.config.forward_dtype) + + # I/O + self.embed_scale = math.sqrt(self.config.hidden_size) + embed_init_std = 1.0 / self.embed_scale + + self.embed_tokens = CastedEmbedding(self.config.vocab_size, self.config.hidden_size, init_std=embed_init_std, cast_to=self.forward_dtype) + self.lm_head = CastedLinear(self.config.hidden_size, self.config.vocab_size, bias=False) + self.q_head = CastedLinear(self.config.hidden_size, 2, bias=True) + + self.puzzle_emb_len = -(self.config.puzzle_emb_ndim // -self.config.hidden_size) # ceil div + if self.config.puzzle_emb_ndim > 0: + # Zero init puzzle embeddings + self.puzzle_emb = CastedSparseEmbedding(self.config.num_puzzle_identifiers, self.config.puzzle_emb_ndim, + batch_size=self.config.batch_size, init_std=0, cast_to=self.forward_dtype) + + # LM Blocks + if self.config.pos_encodings == "rope": + self.rotary_emb = RotaryEmbedding(dim=self.config.hidden_size // self.config.num_heads, + max_position_embeddings=self.config.seq_len + self.puzzle_emb_len, + base=self.config.rope_theta) + elif self.config.pos_encodings == "learned": + self.embed_pos = CastedEmbedding(self.config.seq_len + self.puzzle_emb_len, self.config.hidden_size, init_std=embed_init_std, cast_to=self.forward_dtype) + else: + raise NotImplementedError() + + # Reasoning Layers + self.H_level = HierarchicalReasoningModel_ACTV1ReasoningModule(layers=[HierarchicalReasoningModel_ACTV1Block(self.config) for _i in range(self.config.H_layers)]) + self.L_level = HierarchicalReasoningModel_ACTV1ReasoningModule(layers=[HierarchicalReasoningModel_ACTV1Block(self.config) for _i in range(self.config.L_layers)]) + + # Initial states + self.H_init = nn.Buffer(trunc_normal_init_(torch.empty(self.config.hidden_size, dtype=self.forward_dtype), std=1), persistent=True) + self.L_init = nn.Buffer(trunc_normal_init_(torch.empty(self.config.hidden_size, dtype=self.forward_dtype), std=1), persistent=True) + + # Q head special init + # Init Q to (almost) zero for faster learning during bootstrapping + with torch.no_grad(): + self.q_head.weight.zero_() + self.q_head.bias.fill_(-5) # type: ignore + + def _input_embeddings(self, input: torch.Tensor, puzzle_identifiers: torch.Tensor): + # Token embedding + embedding = self.embed_tokens(input.to(torch.int32)) + + # Puzzle embeddings + if self.config.puzzle_emb_ndim > 0: + puzzle_embedding = self.puzzle_emb(puzzle_identifiers) + + pad_count = self.puzzle_emb_len * self.config.hidden_size - puzzle_embedding.shape[-1] + if pad_count > 0: + puzzle_embedding = F.pad(puzzle_embedding, (0, pad_count)) + + embedding = torch.cat((puzzle_embedding.view(-1, self.puzzle_emb_len, self.config.hidden_size), embedding), dim=-2) + + # Position embeddings + if self.config.pos_encodings == "learned": + # scale by 1/sqrt(2) to maintain forward variance + embedding = 0.707106781 * (embedding + self.embed_pos.embedding_weight.to(self.forward_dtype)) + + # Scale + return self.embed_scale * embedding + + def empty_carry(self, batch_size: int): + return HierarchicalReasoningModel_ACTV1InnerCarry( + z_H=torch.empty(batch_size, self.config.seq_len + self.puzzle_emb_len, self.config.hidden_size, dtype=self.forward_dtype), + z_L=torch.empty(batch_size, self.config.seq_len + self.puzzle_emb_len, self.config.hidden_size, dtype=self.forward_dtype), + ) + + def reset_carry(self, reset_flag: torch.Tensor, carry: HierarchicalReasoningModel_ACTV1InnerCarry): + return HierarchicalReasoningModel_ACTV1InnerCarry( + z_H=torch.where(reset_flag.view(-1, 1, 1), self.H_init, carry.z_H), + z_L=torch.where(reset_flag.view(-1, 1, 1), self.L_init, carry.z_L), + ) + + def forward(self, carry: HierarchicalReasoningModel_ACTV1InnerCarry, batch: Dict[str, torch.Tensor]) -> Tuple[HierarchicalReasoningModel_ACTV1InnerCarry, torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]: + seq_info = dict( + cos_sin=self.rotary_emb() if hasattr(self, "rotary_emb") else None, + ) + + # Input encoding + input_embeddings = self._input_embeddings(batch["inputs"], batch["puzzle_identifiers"]) + + # Forward iterations + with torch.no_grad(): + z_H, z_L = carry.z_H, carry.z_L + + for _H_step in range(self.config.H_cycles): + for _L_step in range(self.config.L_cycles): + if not ((_H_step == self.config.H_cycles - 1) and (_L_step == self.config.L_cycles - 1)): + z_L = self.L_level(z_L, z_H + input_embeddings, **seq_info) + + if not (_H_step == self.config.H_cycles - 1): + z_H = self.H_level(z_H, z_L, **seq_info) + + assert not z_H.requires_grad and not z_L.requires_grad + + # 1-step grad + z_L = self.L_level(z_L, z_H + input_embeddings, **seq_info) + z_H = self.H_level(z_H, z_L, **seq_info) + + # LM Outputs + new_carry = HierarchicalReasoningModel_ACTV1InnerCarry(z_H=z_H.detach(), z_L=z_L.detach()) # New carry no grad + output = self.lm_head(z_H)[:, self.puzzle_emb_len:] + + # Q head + q_logits = self.q_head(z_H[:, 0]).to(torch.float32) + + return new_carry, output, (q_logits[..., 0], q_logits[..., 1]) + + +class HierarchicalReasoningModel_ACTV1(nn.Module): + """ACT wrapper.""" + + def __init__(self, config_dict: dict): + super().__init__() + self.config = HierarchicalReasoningModel_ACTV1Config(**config_dict) + self.inner = HierarchicalReasoningModel_ACTV1_Inner(self.config) + + @property + def puzzle_emb(self): + return self.inner.puzzle_emb + + def initial_carry(self, batch: Dict[str, torch.Tensor]): + batch_size = batch["inputs"].shape[0] + + return HierarchicalReasoningModel_ACTV1Carry( + inner_carry=self.inner.empty_carry(batch_size), # Empty is expected, it will be reseted in first pass as all sequences are halted. + + steps=torch.zeros((batch_size, ), dtype=torch.int32), + halted=torch.ones((batch_size, ), dtype=torch.bool), # Default to halted + + current_data={k: torch.empty_like(v) for k, v in batch.items()} + ) + + def forward(self, carry: HierarchicalReasoningModel_ACTV1Carry, batch: Dict[str, torch.Tensor]) -> Tuple[HierarchicalReasoningModel_ACTV1Carry, Dict[str, torch.Tensor]]: + # Update data, carry (removing halted sequences) + new_inner_carry = self.inner.reset_carry(carry.halted, carry.inner_carry) + + new_steps = torch.where(carry.halted, 0, carry.steps) + + new_current_data = {k: torch.where(carry.halted.view((-1, ) + (1, ) * (batch[k].ndim - 1)), batch[k], v) for k, v in carry.current_data.items()} + + # Forward inner model + new_inner_carry, logits, (q_halt_logits, q_continue_logits) = self.inner(new_inner_carry, new_current_data) + + outputs = { + "logits": logits, + "q_halt_logits": q_halt_logits, + "q_continue_logits": q_continue_logits + } + + with torch.no_grad(): + # Step + new_steps = new_steps + 1 + is_last_step = new_steps >= self.config.halt_max_steps + + halted = is_last_step + + # if training, and ACT is enabled + if self.training and (self.config.halt_max_steps > 1): + # Halt signal + # NOTE: During evaluation, always use max steps, this is to guarantee the same halting steps inside a batch for batching purposes + halted = halted | (q_halt_logits > q_continue_logits) + + # Exploration + min_halt_steps = (torch.rand_like(q_halt_logits) < self.config.halt_exploration_prob) * torch.randint_like(new_steps, low=2, high=self.config.halt_max_steps + 1) + + halted = halted & (new_steps >= min_halt_steps) + + # Compute target Q + # NOTE: No replay buffer and target networks for computing target Q-value. + # As batch_size is large, there're many parallel envs. + # Similar concept as PQN https://arxiv.org/abs/2407.04811 + next_q_halt_logits, next_q_continue_logits = self.inner(new_inner_carry, new_current_data)[-1] + + outputs["target_q_continue"] = torch.sigmoid(torch.where(is_last_step, next_q_halt_logits, torch.maximum(next_q_halt_logits, next_q_continue_logits))) + + return HierarchicalReasoningModel_ACTV1Carry(new_inner_carry, new_steps, halted, new_current_data), outputs |
