diff options
| author | YurenHao0426 <blackhao0426@gmail.com> | 2026-02-09 11:00:39 -0600 |
|---|---|---|
| committer | YurenHao0426 <blackhao0426@gmail.com> | 2026-02-09 11:00:39 -0600 |
| commit | 13ddc8dc583d8b1355909970cb8c27f85b7d3c8b (patch) | |
| tree | 073534138604c1c49021ca7e334322262129f6ac /tests/test_predictor.py | |
Initial implementation: DAGFormer Phase 1
- olmo_graph.py: Modified OLMo2-1B forward with per-head routing via 256x256 adjacency matrix A
- Proportional attribution for post-norm decomposition
- All 6 GPU sanity checks pass (baseline diff = 0.000001)
- predictor.py: Qwen3-Embedding encoder + MLP decoder + Gumbel-Sigmoid + cascading gate
- pipeline.py: End-to-end glue (predictor -> A -> OLMo -> NLL)
- trainer.py: Full training loop with DDP, gradient accumulation, eval, checkpointing
- dolma.py: Streaming Dolma v1.7 with sequence packing
- 43/43 unit tests pass
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat (limited to 'tests/test_predictor.py')
| -rw-r--r-- | tests/test_predictor.py | 206 |
1 files changed, 206 insertions, 0 deletions
diff --git a/tests/test_predictor.py b/tests/test_predictor.py new file mode 100644 index 0000000..00a4124 --- /dev/null +++ b/tests/test_predictor.py @@ -0,0 +1,206 @@ +"""Tests for the structure predictor components (no GPU or model loading required).""" + +import pytest +import torch +import torch.nn as nn + +from src.model.predictor import ( + PredictorMLP, + cascading_gate, + gumbel_sigmoid, +) +from src.model.olmo_graph import create_block_upper_triangular_mask + + +class TestPredictorMLP: + """Test MLP decoder shapes and gradient flow.""" + + def setup_method(self): + self.batch = 2 + self.input_dim = 1024 # Qwen embed_dim + self.hidden_dim = 256 # small for testing + self.rank = 8 + self.mlp = PredictorMLP(self.input_dim, self.hidden_dim, self.rank) + + def test_output_shape(self): + e = torch.randn(self.batch, self.input_dim) + Z = self.mlp(e) + assert Z.shape == (self.batch, 256, 256) + + def test_low_rank_structure(self): + """Z = UV^T should have rank <= r.""" + e = torch.randn(1, self.input_dim) + Z = self.mlp(e) + Z_2d = Z.squeeze(0) + # SVD to check effective rank + S = torch.linalg.svdvals(Z_2d) + # Values beyond rank r should be ~0 (up to numerical precision) + assert S[self.rank:].abs().max() < 1e-4, \ + f"Z has effective rank > {self.rank}: max singular value beyond rank = {S[self.rank:].abs().max()}" + + def test_gradient_flow(self): + e = torch.randn(self.batch, self.input_dim) + Z = self.mlp(e) + loss = Z.sum() + loss.backward() + for name, p in self.mlp.named_parameters(): + assert p.grad is not None, f"No gradient for {name}" + assert p.grad.abs().sum() > 0, f"Zero gradient for {name}" + + def test_batch_independence(self): + """Different inputs should produce different outputs.""" + e1 = torch.randn(1, self.input_dim) + e2 = torch.randn(1, self.input_dim) + Z1 = self.mlp(e1) + Z2 = self.mlp(e2) + assert not torch.allclose(Z1, Z2), "Different inputs produced identical Z" + + +class TestGumbelSigmoid: + """Test Gumbel-Sigmoid in all 3 modes.""" + + def setup_method(self): + self.batch = 2 + mask = create_block_upper_triangular_mask() + # Create Z_masked with valid structure + Z = torch.randn(self.batch, 256, 256) + self.Z_masked = Z * mask.unsqueeze(0) + (-1e9) * (1 - mask.unsqueeze(0)) + self.tau = 2.0 + + def test_train_mode_range(self): + A = gumbel_sigmoid(self.Z_masked, self.tau, mode="train") + assert A.shape == (self.batch, 256, 256) + assert (A >= 0).all() and (A <= 1).all(), "Train mode values out of [0, 1]" + + def test_train_mode_stochastic(self): + """Two calls with same input should give different results (stochastic).""" + A1 = gumbel_sigmoid(self.Z_masked, self.tau, mode="train") + A2 = gumbel_sigmoid(self.Z_masked, self.tau, mode="train") + assert not torch.allclose(A1, A2), "Train mode is deterministic (should be stochastic)" + + def test_eval_soft_range(self): + A = gumbel_sigmoid(self.Z_masked, self.tau, mode="eval_soft") + assert (A >= 0).all() and (A <= 1).all(), "Eval soft values out of [0, 1]" + + def test_eval_soft_deterministic(self): + A1 = gumbel_sigmoid(self.Z_masked, self.tau, mode="eval_soft") + A2 = gumbel_sigmoid(self.Z_masked, self.tau, mode="eval_soft") + assert torch.allclose(A1, A2), "Eval soft is not deterministic" + + def test_eval_hard_binary(self): + A = gumbel_sigmoid(self.Z_masked, self.tau, mode="eval_hard") + unique_values = A.unique() + assert all(v in [0.0, 1.0] for v in unique_values), \ + f"Eval hard should produce binary 0/1, got {unique_values}" + + def test_eval_hard_deterministic(self): + A1 = gumbel_sigmoid(self.Z_masked, self.tau, mode="eval_hard") + A2 = gumbel_sigmoid(self.Z_masked, self.tau, mode="eval_hard") + assert torch.allclose(A1, A2), "Eval hard is not deterministic" + + def test_invalid_positions_zero(self): + """Invalid positions (same/backward layer) should be ~0 in all modes.""" + mask = create_block_upper_triangular_mask() + invalid_mask = (1 - mask).bool() + for mode in ["train", "eval_soft", "eval_hard"]: + A = gumbel_sigmoid(self.Z_masked, self.tau, mode=mode) + invalid_vals = A[0][invalid_mask] + assert (invalid_vals < 1e-6).all(), \ + f"Invalid positions not zero in {mode}: max={invalid_vals.max()}" + + def test_unknown_mode_raises(self): + with pytest.raises(ValueError): + gumbel_sigmoid(self.Z_masked, self.tau, mode="unknown") + + def test_temperature_effect(self): + """Lower temperature → sharper distribution (closer to binary).""" + A_high_tau = gumbel_sigmoid(self.Z_masked, tau=10.0, mode="eval_soft") + A_low_tau = gumbel_sigmoid(self.Z_masked, tau=0.1, mode="eval_soft") + mask = create_block_upper_triangular_mask().bool() + # Low tau should be more extreme (values closer to 0 or 1) + valid_high = A_high_tau[0][mask] + valid_low = A_low_tau[0][mask] + # Measure "sharpness": distance from 0.5 + sharp_high = (valid_high - 0.5).abs().mean() + sharp_low = (valid_low - 0.5).abs().mean() + assert sharp_low > sharp_high, \ + f"Lower tau should be sharper: sharp_low={sharp_low:.4f}, sharp_high={sharp_high:.4f}" + + def test_gradient_through_train_mode(self): + """Gradients should flow through Gumbel-Sigmoid in train mode.""" + Z = torch.randn(1, 256, 256, requires_grad=True) + mask = create_block_upper_triangular_mask() + Z_masked = Z * mask + (-1e9) * (1 - mask) + A = gumbel_sigmoid(Z_masked, tau=2.0, mode="train") + loss = A.sum() + loss.backward() + assert Z.grad is not None + # Gradients should be nonzero at valid positions + valid_grads = Z.grad[0][mask.bool()] + assert (valid_grads != 0).any(), "No nonzero gradients at valid positions" + + +class TestCascadingGate: + """Test cascading activation gate.""" + + def setup_method(self): + self.batch = 2 + + def test_output_shape(self): + A = torch.rand(self.batch, 256, 256) + A_gated = cascading_gate(A, k=5.0, hard=False) + assert A_gated.shape == A.shape + + def test_soft_mode_range(self): + A = torch.rand(self.batch, 256, 256) + A_gated = cascading_gate(A, k=5.0, hard=False) + assert (A_gated >= 0).all() and (A_gated <= 1).all() + + def test_hard_mode_kills_disconnected(self): + """Nodes with no incoming edges should have all outgoing edges zeroed.""" + A = torch.zeros(1, 256, 256) + # Only set edges from node 0 to node 16 (layer 0 → layer 1) + A[0, 0, 16] = 1.0 + A_gated = cascading_gate(A, k=5.0, hard=True) + # Node 0 has no incoming edges → its outgoing should be zeroed + assert A_gated[0, 0, 16] == 0.0, "Node 0 has no incoming but wasn't gated to 0" + # Node 16 has incoming from node 0 (but node 0 was gated to 0) + # In one-pass mode, inc uses ORIGINAL A, so node 16 has inc > 0 + + def test_hard_mode_preserves_connected(self): + """Nodes with incoming edges keep their outgoing edges.""" + A = torch.zeros(1, 256, 256) + # Set edges: node 0→16, node 16→32 + A[0, 0, 16] = 1.0 + A[0, 16, 32] = 1.0 + A_gated = cascading_gate(A, k=5.0, hard=True) + # Node 16 has incoming (from 0) → g_16 = 1 → outgoing preserved + assert A_gated[0, 16, 32] == 1.0 + + def test_soft_mode_differentiable(self): + A = torch.rand(1, 256, 256, requires_grad=True) + A_gated = cascading_gate(A, k=5.0, hard=False) + loss = A_gated.sum() + loss.backward() + assert A.grad is not None + assert A.grad.abs().sum() > 0 + + def test_all_zeros_all_killed(self): + """If A is all zeros, cascading gate should keep it all zeros.""" + A = torch.zeros(1, 256, 256) + A_gated = cascading_gate(A, k=5.0, hard=True) + assert (A_gated == 0).all() + + def test_one_pass_uses_original(self): + """Verify cascading gate uses original A for incoming sums (one-pass).""" + # If it were iterative, node 0 being gated off would affect node 16's incoming + # But one-pass uses original A, so node 16's incoming is computed from original + A = torch.zeros(1, 256, 256) + A[0, 0, 16] = 1.0 # 0 → 16 + A[0, 16, 32] = 1.0 # 16 → 32 + + A_gated = cascading_gate(A, k=5.0, hard=True) + # One-pass: inc[16] = A[:,16].sum() = A[0,16] = 1.0 (from original A) + # g[16] = (inc[16] > 0) = 1.0 + # So A_gated[16, 32] = A[16, 32] * g[16] = 1.0 * 1.0 = 1.0 + assert A_gated[0, 16, 32] == 1.0 |
