| | | |
|---|---|---|
| author | YurenHao0426 <Blackhao0426@gmail.com> | 2026-04-11 16:41:19 -0500 |
| committer | YurenHao0426 <Blackhao0426@gmail.com> | 2026-04-11 16:41:19 -0500 |
| commit | bfdcc36c0e31adfa95410ce87e7da646e0b948fe (patch) | |
| tree | c486cd329fcb81d3019804dfed7f9551c6d43638 /scripts | |
| parent | 112c5d354f36d6ea6e8049cf1aeaebeb9944aa02 (diff) | |
Crash-safe incremental saving: never lose data again
Major rewrite of run_all_methods.py:
- Each example appends to progress.jsonl immediately (crash-safe)
- per_user.json written after each method completes (not at end of script)
- Resume support: re-running skips already-complete methods, resumes
partially-complete ones from the JSONL checkpoint
- is_method_complete() checks existing per_user.json before running
Also includes previous fixes:
- peft_baseline.py: save original model ref, restore in cleanup()
- fit_theta.py: CHUNK_SIZE 128→32 for K=16 OOM fix
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
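
The checkpointing pattern described above is worth seeing in isolation: append one JSON line per finished example, and on restart count the lines already present to find the resume point. Below is a minimal standalone sketch of that idea; the `process_example` callback and the `progress.jsonl` default are illustrative stand-ins, not code from this commit.

```python
import json
import os

def append_jsonl(path, entry):
    """Persist one finished example immediately; a crash loses at most
    the example currently being computed."""
    with open(path, 'a') as f:
        f.write(json.dumps(entry) + '\n')

def read_jsonl(path):
    """Rebuild the list of completed results from the checkpoint."""
    if not os.path.exists(path):
        return []
    with open(path) as f:
        return [json.loads(line) for line in f if line.strip()]

def run_with_checkpoint(examples, process_example, ckpt='progress.jsonl'):
    """Process examples in order, skipping those already checkpointed."""
    done = read_jsonl(ckpt)              # resume point = number of lines
    for ex in examples[len(done):]:
        entry = process_example(ex)      # hypothetical per-example work
        append_jsonl(ckpt, entry)        # write before moving on
        done.append(entry)
    return done
```

Appending line-delimited JSON keeps every write a cheap append that never rewrites earlier results, which is what makes the scheme crash-safe.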
Diffstat (limited to 'scripts')
| | | |
|---|---|---|
| -rw-r--r-- | scripts/run_all_methods.py | 305 |

1 file changed, 203 insertions, 102 deletions
```diff
diff --git a/scripts/run_all_methods.py b/scripts/run_all_methods.py
index b240645..73a0a87 100644
--- a/scripts/run_all_methods.py
+++ b/scripts/run_all_methods.py
@@ -1,7 +1,8 @@
 """Unified evaluation pipeline: all methods, all per-user data saved.
 
-Runs Base, UPH, PEFT baselines, and ICL baselines in one script.
-Saves complete per-user data (predictions, references, scores, metadata) for ALL methods.
+CRASH-SAFE: Each example is appended to a JSONL file immediately after
+computation. If the process is killed, all completed examples are preserved.
+Already-complete methods are automatically skipped on re-run.
 
 Usage:
     python scripts/run_all_methods.py --task review --setting user --device cuda:0
@@ -81,16 +82,99 @@ def generate_greedy(wrapper, prompt, max_new_tokens=512, min_new_tokens=128):
     return wrapper.tokenizer.decode(outputs[0, input_ids.shape[1]:], skip_special_tokens=True)
 
 
-class MethodRunner:
-    """Encapsulates running a single method across all examples."""
+# ─── Incremental saving ──────────────────────────────────────────────
+
+def get_method_dir(output_dir, task, setting, K, method_name, d=64):
+    """Get the output directory for a method."""
+    exp_dir = os.path.join(output_dir, f"{task}_{setting}_K{K}")
+    method_label = f"uph_d{d}" if method_name == 'uph' and d != 64 else method_name
+    return os.path.join(exp_dir, method_label), method_label
+
+
+def is_method_complete(method_dir, N):
+    """Check if a method already has a complete per_user.json."""
+    path = os.path.join(method_dir, 'per_user.json')
+    if not os.path.exists(path):
+        return False
+    try:
+        with open(path) as f:
+            data = json.load(f)
+        return len(data.get('per_user', [])) >= N
+    except:
+        return False
+
+
+def append_jsonl(path, entry):
+    """Append one JSON entry to a JSONL file (crash-safe)."""
+    with open(path, 'a') as f:
+        f.write(json.dumps(entry, default=str) + '\n')
+
+
+def read_jsonl(path):
+    """Read all entries from a JSONL file."""
+    entries = []
+    if os.path.exists(path):
+        with open(path) as f:
+            for line in f:
+                line = line.strip()
+                if line:
+                    entries.append(json.loads(line))
+    return entries
+
+
+def finalize_method(method_dir, method_label, per_user, task, setting, K, d=64):
+    """Write final per_user.json from completed per-user list."""
+    agg = {
+        'rougeL': float(np.mean([u['metrics']['rougeL'] for u in per_user])),
+        'meteor': float(np.mean([u['metrics']['meteor'] for u in per_user])),
+        'sfd_nolen': float(np.mean([u['metrics']['sfd_nolen'] for u in per_user])),
+        'avg_len': float(np.mean([u['metrics']['length'] for u in per_user])),
+    }
+    save_data = {
+        'per_user': per_user,
+        'aggregate': agg,
+        'num_examples': len(per_user),
+        'task': task, 'setting': setting, 'K': K,
+        'method': method_label,
+        'decode_policy': 'greedy, min=128, max=512',
+    }
+    if 'uph' in method_label:
+        save_data['d'] = d
+    path = os.path.join(method_dir, 'per_user.json')
+    with open(path, 'w') as f:
+        json.dump(save_data, f, indent=2, default=str)
+    print(f" Saved: {path} ({len(per_user)} examples)")
+
+# ─── Method runners ──────────────────────────────────────────────────
+
+class MethodRunner:
 
     def __init__(self, wrapper, device, dense_retriever=None, uph_d=64):
         self.wrapper = wrapper
         self.device = device
         self.dense_retriever = dense_retriever
         self.uph_d = uph_d
 
-    def run(self, method_name, examples, support_sets, references, support_texts, N):
+    def _make_entry(self, ex, ref, stexts, K, pred, timing, extra=None):
+        metrics = compute_per_user_metrics(pred, ref, stexts)
+        entry = {
+            'example_id': ex['example_id'],
+            'user_id': ex['user_id'],
+            'prediction': pred,
+            'reference': ref,
+            'support_texts': stexts,
+            'K': K,
+            'metrics': metrics,
+            **timing,
+        }
+        if extra:
+            entry.update(extra)
+        return entry
+
+    def run(self, method_name, examples, support_sets, references, support_texts,
+            N, method_dir, method_label, task, setting, K, d=64):
+        """Run a method with incremental JSONL saving. Returns per_user list."""
+
         dispatch = {
             'base': self._run_base,
             'uph': self._run_uph,
@@ -98,128 +182,150 @@ class MethodRunner:
             'bm25_top1': self._run_bm25_top1,
             'dense_top1': self._run_dense_top1,
             'profile_based': self._run_profile_based,
-            'lora': lambda *a: self._run_peft(*a, config=get_lora_config(rank=8), lr=1e-4, desc='LoRA r=8'),
-            'tiny_lora': lambda *a: self._run_peft(*a, config=get_tiny_lora_config(rank=1), lr=1e-4, desc='Tiny LoRA r=1'),
-            'vera': lambda *a: self._run_peft(*a, config=get_vera_config(rank=256), lr=1e-3, desc='VeRA r=256'),
-            'prompt_tuning_5': lambda *a: self._run_peft(*a, config=get_prompt_tuning_config(5), lr=1e-3, desc='PromptTuning L=5', steps=100),
-            'prompt_tuning_10': lambda *a: self._run_peft(*a, config=get_prompt_tuning_config(10), lr=1e-3, desc='PromptTuning L=10', steps=100),
-            'prompt_tuning_20': lambda *a: self._run_peft(*a, config=get_prompt_tuning_config(20), lr=1e-3, desc='PromptTuning L=20', steps=100),
-            'prefix_tuning_5': lambda *a: self._run_peft(*a, config=get_prefix_tuning_config(5), lr=5e-4, desc='PrefixTuning L=5', steps=100),
-            'prefix_tuning_10': lambda *a: self._run_peft(*a, config=get_prefix_tuning_config(10), lr=5e-4, desc='PrefixTuning L=10', steps=100),
+            'lora': lambda *a, **kw: self._run_peft(*a, config=get_lora_config(rank=8), lr=1e-4, desc='LoRA r=8', **kw),
+            'tiny_lora': lambda *a, **kw: self._run_peft(*a, config=get_tiny_lora_config(rank=1), lr=1e-4, desc='Tiny LoRA r=1', **kw),
+            'vera': lambda *a, **kw: self._run_peft(*a, config=get_vera_config(rank=256), lr=1e-3, desc='VeRA r=256', **kw),
+            'prompt_tuning_5': lambda *a, **kw: self._run_peft(*a, config=get_prompt_tuning_config(5), lr=1e-3, desc='PromptTuning L=5', steps=100, **kw),
+            'prompt_tuning_10': lambda *a, **kw: self._run_peft(*a, config=get_prompt_tuning_config(10), lr=1e-3, desc='PromptTuning L=10', steps=100, **kw),
+            'prompt_tuning_20': lambda *a, **kw: self._run_peft(*a, config=get_prompt_tuning_config(20), lr=1e-3, desc='PromptTuning L=20', steps=100, **kw),
+            'prefix_tuning_5': lambda *a, **kw: self._run_peft(*a, config=get_prefix_tuning_config(5), lr=5e-4, desc='PrefixTuning L=5', steps=100, **kw),
+            'prefix_tuning_10': lambda *a, **kw: self._run_peft(*a, config=get_prefix_tuning_config(10), lr=5e-4, desc='PrefixTuning L=10', steps=100, **kw),
         }
         if method_name not in dispatch:
             print(f"Unknown method: {method_name}")
             return []
 
-        print(f"\n--- {method_name} ---")
-        per_user = dispatch[method_name](examples, support_sets, references, support_texts, N)
+        os.makedirs(method_dir, exist_ok=True)
+        jsonl_path = os.path.join(method_dir, 'progress.jsonl')
+
+        # Resume: check how many examples already done
+        existing = read_jsonl(jsonl_path)
+        start_idx = len(existing)
+
+        if start_idx >= N:
+            print(f"\n--- {method_name} --- SKIPPED (already {start_idx}/{N} done)")
+            per_user = existing[:N]
+        else:
+            if start_idx > 0:
+                print(f"\n--- {method_name} --- RESUMING from {start_idx}/{N}")
+            else:
+                print(f"\n--- {method_name} ---")
+
+            per_user = dispatch[method_name](
+                examples, support_sets, references, support_texts, N,
+                jsonl_path=jsonl_path, start_idx=start_idx, existing=existing,
+            )
 
         avg_rl = np.mean([u['metrics']['rougeL'] for u in per_user])
         avg_sfd = np.mean([u['metrics']['sfd_nolen'] for u in per_user])
         print(f" Mean R-L: {avg_rl:.4f}, SFD_-len: {avg_sfd:.4f}")
+
+        # Write final per_user.json
+        finalize_method(method_dir, method_label, per_user, task, setting, K, d)
         return per_user
 
-    def _make_per_user_entry(self, ex, ref, stexts, K, pred, timing, extra=None):
-        metrics = compute_per_user_metrics(pred, ref, stexts)
-        entry = {
-            'example_id': ex['example_id'],
-            'user_id': ex['user_id'],
-            'prediction': pred,
-            'reference': ref,
-            'support_texts': stexts,
-            'K': K,
-            'metrics': metrics,
-            **timing,
-        }
-        if extra:
-            entry.update(extra)
-        return entry
+    # --- Individual method runners ---
+    # All accept jsonl_path, start_idx, existing for resume support
 
-    def _run_base(self, examples, support_sets, references, support_texts, N):
-        per_user = []
-        for i, ex in enumerate(examples):
+    def _run_base(self, examples, support_sets, references, support_texts, N,
+                  jsonl_path, start_idx, existing):
+        per_user = list(existing)
+        for i in range(start_idx, N):
+            ex = examples[i]
             t0 = time.time()
             prompt = build_query_prompt(ex['query_input'], ex['task'])
             pred = generate_greedy(self.wrapper, prompt)
-            entry = self._make_per_user_entry(
+            entry = self._make_entry(
                 ex, references[i], support_texts[i], len(support_sets[i]),
                 pred, {'gen_time': time.time() - t0}
             )
             per_user.append(entry)
+            append_jsonl(jsonl_path, entry)
             if (i + 1) % 40 == 0:
                 print(f" {i+1}/{N}")
         return per_user
 
-    def _run_prompt_all_k(self, examples, support_sets, references, support_texts, N):
-        per_user = []
-        for i, (ex, support) in enumerate(zip(examples, support_sets)):
+    def _run_prompt_all_k(self, examples, support_sets, references, support_texts, N,
+                          jsonl_path, start_idx, existing):
+        per_user = list(existing)
+        for i in range(start_idx, N):
+            ex, support = examples[i], support_sets[i]
             t0 = time.time()
             prompt = build_prompt_with_examples(ex['query_input'], support, ex['task'])
             pred = generate_greedy(self.wrapper, prompt)
-            entry = self._make_per_user_entry(
+            entry = self._make_entry(
                 ex, references[i], support_texts[i], len(support),
                 pred, {'gen_time': time.time() - t0}
             )
             per_user.append(entry)
+            append_jsonl(jsonl_path, entry)
             if (i + 1) % 40 == 0:
                 print(f" {i+1}/{N}")
         return per_user
 
-    def _run_bm25_top1(self, examples, support_sets, references, support_texts, N):
-        per_user = []
-        for i, (ex, support) in enumerate(zip(examples, support_sets)):
+    def _run_bm25_top1(self, examples, support_sets, references, support_texts, N,
+                       jsonl_path, start_idx, existing):
+        per_user = list(existing)
+        for i in range(start_idx, N):
+            ex, support = examples[i], support_sets[i]
             t0 = time.time()
             selected = bm25_select_top1(ex['query_input'], support)
             prompt = build_prompt_with_examples(ex['query_input'], selected, ex['task'])
             pred = generate_greedy(self.wrapper, prompt)
-            entry = self._make_per_user_entry(
+            entry = self._make_entry(
                 ex, references[i], support_texts[i], len(support),
                 pred, {'gen_time': time.time() - t0}
             )
             per_user.append(entry)
+            append_jsonl(jsonl_path, entry)
             if (i + 1) % 40 == 0:
                 print(f" {i+1}/{N}")
         return per_user
 
-    def _run_dense_top1(self, examples, support_sets, references, support_texts, N):
+    def _run_dense_top1(self, examples, support_sets, references, support_texts, N,
                        jsonl_path, start_idx, existing):
         if self.dense_retriever is None:
             self.dense_retriever = DenseRetriever(device='cpu')
-        per_user = []
-        for i, (ex, support) in enumerate(zip(examples, support_sets)):
+        per_user = list(existing)
+        for i in range(start_idx, N):
+            ex, support = examples[i], support_sets[i]
             t0 = time.time()
             selected = self.dense_retriever.retrieve_top_k(ex['query_input'], support, k=1)
             prompt = build_prompt_with_examples(ex['query_input'], selected, ex['task'])
             pred = generate_greedy(self.wrapper, prompt)
-            entry = self._make_per_user_entry(
+            entry = self._make_entry(
                 ex, references[i], support_texts[i], len(support),
                 pred, {'gen_time': time.time() - t0}
             )
             per_user.append(entry)
+            append_jsonl(jsonl_path, entry)
             if (i + 1) % 40 == 0:
                 print(f" {i+1}/{N}")
         return per_user
 
-    def _run_profile_based(self, examples, support_sets, references, support_texts, N):
-        per_user = []
-        for i, (ex, support) in enumerate(zip(examples, support_sets)):
+    def _run_profile_based(self, examples, support_sets, references, support_texts, N,
                           jsonl_path, start_idx, existing):
+        per_user = list(existing)
+        for i in range(start_idx, N):
+            ex, support = examples[i], support_sets[i]
             t0 = time.time()
-            # Step 1: Generate user profile summary from support examples
             profile = generate_profile(self.wrapper, support, ex['task'])
-            # Step 2: Generate conditioned on profile
             prompt = build_profile_conditioned_prompt(ex['query_input'], profile, ex['task'])
             pred = generate_greedy(self.wrapper, prompt)
-            entry = self._make_per_user_entry(
+            entry = self._make_entry(
                 ex, references[i], support_texts[i], len(support),
                 pred, {'gen_time': time.time() - t0},
                 extra={'profile_summary': profile},
             )
             per_user.append(entry)
+            append_jsonl(jsonl_path, entry)
             if (i + 1) % 40 == 0:
                 print(f" {i+1}/{N}")
         return per_user
 
-    def _run_uph(self, examples, support_sets, references, support_texts, N):
+    def _run_uph(self, examples, support_sets, references, support_texts, N,
                 jsonl_path, start_idx, existing):
         d = self.uph_d
         H = self.wrapper.hidden_size
         uncond = UnconditionalHead(H, d=d, alpha=0.1, basis_seed=42).to(self.device)
@@ -228,8 +334,9 @@ class MethodRunner:
         if hasattr(self.wrapper.model.lm_head, 'bias') and self.wrapper.model.lm_head.bias is not None:
             lm_head_bias = self.wrapper.model.lm_head.bias.data
 
-        per_user = []
-        for i, (ex, support) in enumerate(zip(examples, support_sets)):
+        per_user = list(existing)
+        for i in range(start_idx, N):
+            ex, support = examples[i], support_sets[i]
             t0 = time.time()
             cached_h = cache_support_hidden_states(self.wrapper, support, ex['task'])
             if not cached_h:
@@ -253,23 +360,27 @@ class MethodRunner:
             del cached_h, theta
             torch.cuda.empty_cache()
 
-            entry = self._make_per_user_entry(
+            entry = self._make_entry(
                 ex, references[i], support_texts[i], len(support),
                 pred, {'adapt_time': time.time() - t0}
             )
             per_user.append(entry)
+            append_jsonl(jsonl_path, entry)
             if (i + 1) % 40 == 0:
                 avg_rl = np.mean([u['metrics']['rougeL'] for u in per_user])
                 print(f" {i+1}/{N} (avg R-L: {avg_rl:.4f})")
         return per_user
 
     def _run_peft(self, examples, support_sets, references, support_texts, N,
-                  config, lr, desc, steps=30):
+                  config, lr, desc, steps=30, jsonl_path=None, start_idx=0, existing=None):
+        if existing is None:
+            existing = []
         baseline = PEFTBaseline(self.wrapper, config)
-        print(f" Trainable params: {baseline.n_params:,} ({baseline.n_bytes:,} bytes), steps={steps}, lr={lr}")
+        print(f" {desc}: {baseline.n_params:,} params ({baseline.n_bytes:,} bytes), steps={steps}, lr={lr}")
 
-        per_user = []
-        for i, (ex, support) in enumerate(zip(examples, support_sets)):
+        per_user = list(existing)
+        for i in range(start_idx, N):
+            ex, support = examples[i], support_sets[i]
             t0 = time.time()
             pred = baseline.adapt_and_generate(
                 support_items=support,
@@ -278,12 +389,13 @@ class MethodRunner:
                 lr=lr, steps=steps,
                 max_new_tokens=512, min_new_tokens=128,
             )
-            entry = self._make_per_user_entry(
+            entry = self._make_entry(
                 ex, references[i], support_texts[i], len(support),
                 pred, {'adapt_time': time.time() - t0},
                 extra={'n_params': baseline.n_params, 'n_bytes': baseline.n_bytes},
             )
             per_user.append(entry)
+            append_jsonl(jsonl_path, entry)
             if (i + 1) % 20 == 0:
                 avg_rl = np.mean([u['metrics']['rougeL'] for u in per_user])
                 avg_t = np.mean([u['adapt_time'] for u in per_user])
@@ -293,6 +405,8 @@ class MethodRunner:
         return per_user
 
 
+# ─── Main ────────────────────────────────────────────────────────────
+
 def paired_test(scores_a, scores_b, name_a, name_b, metric_name):
     a, b = np.array(scores_a), np.array(scores_b)
     diff = a - b
@@ -344,7 +458,7 @@ def main():
     else:
         methods = [m.strip() for m in args.methods.split(',')]
 
-    print(f"=== Unified Eval: {task}_{setting}, N={N}, K={K} ===")
+    print(f"=== Unified Eval: {task}_{setting}, N={N}, K={K}, d={args.d} ===")
     print(f"Methods: {methods}")
     print(f"Decode: greedy, min=128, max=512")
 
@@ -361,7 +475,24 @@ def main():
 
     all_per_user = {}
     for method in methods:
-        per_user = runner.run(method, examples, support_sets, references, support_texts, N)
+        method_dir, method_label = get_method_dir(
+            args.output_dir, task, setting, K, method, args.d
+        )
+
+        # Skip if already complete
+        if is_method_complete(method_dir, N):
+            print(f"\n--- {method} --- COMPLETE (loading from disk)")
+            with open(os.path.join(method_dir, 'per_user.json')) as f:
+                data = json.load(f)
+            all_per_user[method] = data['per_user'][:N]
+            avg_rl = np.mean([u['metrics']['rougeL'] for u in all_per_user[method]])
+            print(f" Mean R-L: {avg_rl:.4f}")
+            continue
+
+        per_user = runner.run(
+            method, examples, support_sets, references, support_texts,
+            N, method_dir, method_label, task, setting, K, args.d,
+        )
         all_per_user[method] = per_user
 
     # Summary table
@@ -369,6 +500,8 @@ def main():
     print(f"{'Method':<15} {'R-L':<8} {'METEOR':<8} {'SFD_-len':<9} {'Len':<6}")
     print("-" * 90)
     for method in methods:
+        if method not in all_per_user:
+            continue
         pu = all_per_user[method]
         rl = np.mean([u['metrics']['rougeL'] for u in pu])
         mt = np.mean([u['metrics']['meteor'] for u in pu])
@@ -377,15 +510,15 @@ def main():
         print(f"{method:<15} {rl:<8.4f} {mt:<8.4f} {sf:<9.4f} {ln:<6.0f}")
 
     # Significance tests (UPH vs all others)
+    sig_results = {}
     if 'uph' in all_per_user:
         print("\n" + "=" * 90)
         print("Significance (UPH vs each, paired t-test p-value)")
         print("=" * 90)
        uph_rl = [u['metrics']['rougeL'] for u in all_per_user['uph']]
         uph_sf = [u['metrics']['sfd_nolen'] for u in all_per_user['uph']]
-        sig_results = {}
         for method in methods:
-            if method == 'uph':
+            if method == 'uph' or method not in all_per_user:
                 continue
             other_rl = [u['metrics']['rougeL'] for u in all_per_user[method]]
             other_sf = [u['metrics']['sfd_nolen'] for u in all_per_user[method]]
@@ -395,43 +528,12 @@ def main():
             print(f" vs {method:<12} R-L: diff={rl_t['mean_diff']:+.4f} p={rl_t['t_pval']:.2e} "
                   f"SFD: diff={sf_t['mean_diff']:+.4f} p={sf_t['t_pval']:.2e}")
 
-    # Save per-method data in separate directories
-    # Structure: output_dir/task_setting_K{K}/{method}/per_user.json
-    # For d ablation: uph method saved as uph_d{d}
+    # Save summary
     exp_dir = os.path.join(args.output_dir, f"{task}_{setting}_K{K}")
-    os.makedirs(exp_dir, exist_ok=True)
-
-    for method in methods:
-        # Use uph_d{d} as directory name for d ablation
-        method_label = f"uph_d{args.d}" if method == 'uph' and args.d != 64 else method
-        method_dir = os.path.join(exp_dir, method_label)
-        os.makedirs(method_dir, exist_ok=True)
-
-        pu = all_per_user[method]
-        agg_m = {
-            'rougeL': float(np.mean([u['metrics']['rougeL'] for u in pu])),
-            'meteor': float(np.mean([u['metrics']['meteor'] for u in pu])),
-            'sfd_nolen': float(np.mean([u['metrics']['sfd_nolen'] for u in pu])),
-            'avg_len': float(np.mean([u['metrics']['length'] for u in pu])),
-        }
-
-        save_meta = {
-            'per_user': pu,
-            'aggregate': agg_m,
-            'num_examples': N, 'task': task, 'setting': setting, 'K': K,
-            'method': method_label,
-            'decode_policy': 'greedy, min=128, max=512',
-        }
-        if method == 'uph':
-            save_meta['d'] = args.d
-        with open(os.path.join(method_dir, 'per_user.json'), 'w') as f:
-            json.dump(save_meta, f, indent=2, default=str)
-
-        print(f" Saved: {method_dir}/per_user.json")
-
-    # Also save a combined summary (aggregate only, no per-user data)
     summary = {}
     for method in methods:
+        if method not in all_per_user:
+            continue
         pu = all_per_user[method]
         summary[method] = {
             'rougeL': float(np.mean([u['metrics']['rougeL'] for u in pu])),
@@ -443,13 +545,12 @@ def main():
     with open(summary_path, 'w') as f:
         json.dump({
             'aggregate': summary,
-            'significance': sig_results if 'uph' in all_per_user else {},
+            'significance': sig_results,
             'num_examples': N, 'task': task, 'setting': setting, 'K': K,
             'methods': methods,
        }, f, indent=2, default=str)
 
-    print(f"\nPer-method data: {exp_dir}/{{method}}/per_user.json")
-    print(f"Summary: {summary_path}")
+    print(f"\nSummary: {summary_path}")
 
 
 if __name__ == '__main__':
```
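
One caveat worth flagging for this scheme (an observation about the pattern, not a bug report against the commit): resume counts JSONL lines, and a process killed mid-write can leave a truncated final line that `json.loads` rejects, which would make `read_jsonl` raise on the next run. A tolerant reader, sketched below under that assumption, drops an unparseable trailing line so the resume logic simply redoes that one example.

```python
import json
import os

def read_jsonl_tolerant(path):
    """Like read_jsonl above, but discard a truncated final line left
    by a crash mid-append instead of raising; that example is redone."""
    entries = []
    if not os.path.exists(path):
        return entries
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                entries.append(json.loads(line))
            except json.JSONDecodeError:
                break  # truncated tail: stop here, resume redoes it
    return entries
```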
