 scripts/run_all_methods.py | 305 ++++++++++++++++++++++++++--------------
 1 file changed, 203 insertions(+), 102 deletions(-)
diff --git a/scripts/run_all_methods.py b/scripts/run_all_methods.py
index b240645..73a0a87 100644
--- a/scripts/run_all_methods.py
+++ b/scripts/run_all_methods.py
@@ -1,7 +1,8 @@
"""Unified evaluation pipeline: all methods, all per-user data saved.
-Runs Base, UPH, PEFT baselines, and ICL baselines in one script.
-Saves complete per-user data (predictions, references, scores, metadata) for ALL methods.
+CRASH-SAFE: Each example is appended to a JSONL file immediately after
+computation. If the process is killed, all completed examples are preserved.
+Already-complete methods are automatically skipped on re-run.
Usage:
python scripts/run_all_methods.py --task review --setting user --device cuda:0
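
The crash-safety pattern described above is append-on-compute plus count-on-resume: each finished example becomes one JSONL line, and a restart counts lines to find where to continue. A minimal standalone sketch of the same idea (`compute` and `items` are hypothetical stand-ins, not names from this script):

    import json, os

    def run_resumable(items, progress_path):
        done = 0
        if os.path.exists(progress_path):
            with open(progress_path) as f:
                done = sum(1 for line in f if line.strip())  # completed examples
        for i in range(done, len(items)):
            result = compute(items[i])              # hypothetical per-example work
            with open(progress_path, 'a') as f:     # append immediately: a kill
                f.write(json.dumps(result) + '\n')  # loses at most one example
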
@@ -81,16 +82,99 @@ def generate_greedy(wrapper, prompt, max_new_tokens=512, min_new_tokens=128):
return wrapper.tokenizer.decode(outputs[0, input_ids.shape[1]:], skip_special_tokens=True)
-class MethodRunner:
- """Encapsulates running a single method across all examples."""
+# ─── Incremental saving ──────────────────────────────────────────────
+
+def get_method_dir(output_dir, task, setting, K, method_name, d=64):
+ """Get the output directory for a method."""
+ exp_dir = os.path.join(output_dir, f"{task}_{setting}_K{K}")
+ method_label = f"uph_d{d}" if method_name == 'uph' and d != 64 else method_name
+ return os.path.join(exp_dir, method_label), method_label
+
+
+def is_method_complete(method_dir, N):
+ """Check if a method already has a complete per_user.json."""
+ path = os.path.join(method_dir, 'per_user.json')
+ if not os.path.exists(path):
+ return False
+ try:
+ with open(path) as f:
+ data = json.load(f)
+ return len(data.get('per_user', [])) >= N
+ except (OSError, ValueError):  # missing, partial, or malformed file counts as incomplete
+ return False
+
+
+def append_jsonl(path, entry):
+ """Append one JSON entry to a JSONL file (crash-safe)."""
+ with open(path, 'a') as f:
+ f.write(json.dumps(entry, default=str) + '\n')
+
+
+def read_jsonl(path):
+ """Read all entries from a JSONL file."""
+ entries = []
+ if os.path.exists(path):
+ with open(path) as f:
+ for line in f:
+ line = line.strip()
+ if line:
+ entries.append(json.loads(line))
+ return entries
+
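
A quick round-trip with the two JSONL helpers above (the path is illustrative):

    path = '/tmp/progress.jsonl'
    append_jsonl(path, {'example_id': 0, 'metrics': {'rougeL': 0.21}})
    append_jsonl(path, {'example_id': 1, 'metrics': {'rougeL': 0.34}})
    entries = read_jsonl(path)
    assert len(entries) == 2 and entries[1]['example_id'] == 1
    # start_idx = len(entries), so a re-run appends from example 2 onward.
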
+
+def finalize_method(method_dir, method_label, per_user, task, setting, K, d=64):
+ """Write final per_user.json from completed per-user list."""
+ agg = {
+ 'rougeL': float(np.mean([u['metrics']['rougeL'] for u in per_user])),
+ 'meteor': float(np.mean([u['metrics']['meteor'] for u in per_user])),
+ 'sfd_nolen': float(np.mean([u['metrics']['sfd_nolen'] for u in per_user])),
+ 'avg_len': float(np.mean([u['metrics']['length'] for u in per_user])),
+ }
+ save_data = {
+ 'per_user': per_user,
+ 'aggregate': agg,
+ 'num_examples': len(per_user),
+ 'task': task, 'setting': setting, 'K': K,
+ 'method': method_label,
+ 'decode_policy': 'greedy, min=128, max=512',
+ }
+ if 'uph' in method_label:
+ save_data['d'] = d
+ path = os.path.join(method_dir, 'per_user.json')
+ with open(path, 'w') as f:
+ json.dump(save_data, f, indent=2, default=str)
+ print(f" Saved: {path} ({len(per_user)} examples)")
+
+# ─── Method runners ──────────────────────────────────────────────────
+
+class MethodRunner:
+ """Runs a single method across all examples, with incremental JSONL saving and resume support."""
def __init__(self, wrapper, device, dense_retriever=None, uph_d=64):
self.wrapper = wrapper
self.device = device
self.dense_retriever = dense_retriever
self.uph_d = uph_d
- def run(self, method_name, examples, support_sets, references, support_texts, N):
+ def _make_entry(self, ex, ref, stexts, K, pred, timing, extra=None):
+ metrics = compute_per_user_metrics(pred, ref, stexts)
+ entry = {
+ 'example_id': ex['example_id'],
+ 'user_id': ex['user_id'],
+ 'prediction': pred,
+ 'reference': ref,
+ 'support_texts': stexts,
+ 'K': K,
+ 'metrics': metrics,
+ **timing,
+ }
+ if extra:
+ entry.update(extra)
+ return entry
+
+ def run(self, method_name, examples, support_sets, references, support_texts,
+ N, method_dir, method_label, task, setting, K, d=64):
+ """Run a method with incremental JSONL saving. Returns per_user list."""
+
dispatch = {
'base': self._run_base,
'uph': self._run_uph,
@@ -98,128 +182,150 @@ class MethodRunner:
'bm25_top1': self._run_bm25_top1,
'dense_top1': self._run_dense_top1,
'profile_based': self._run_profile_based,
- 'lora': lambda *a: self._run_peft(*a, config=get_lora_config(rank=8), lr=1e-4, desc='LoRA r=8'),
- 'tiny_lora': lambda *a: self._run_peft(*a, config=get_tiny_lora_config(rank=1), lr=1e-4, desc='Tiny LoRA r=1'),
- 'vera': lambda *a: self._run_peft(*a, config=get_vera_config(rank=256), lr=1e-3, desc='VeRA r=256'),
- 'prompt_tuning_5': lambda *a: self._run_peft(*a, config=get_prompt_tuning_config(5), lr=1e-3, desc='PromptTuning L=5', steps=100),
- 'prompt_tuning_10': lambda *a: self._run_peft(*a, config=get_prompt_tuning_config(10), lr=1e-3, desc='PromptTuning L=10', steps=100),
- 'prompt_tuning_20': lambda *a: self._run_peft(*a, config=get_prompt_tuning_config(20), lr=1e-3, desc='PromptTuning L=20', steps=100),
- 'prefix_tuning_5': lambda *a: self._run_peft(*a, config=get_prefix_tuning_config(5), lr=5e-4, desc='PrefixTuning L=5', steps=100),
- 'prefix_tuning_10': lambda *a: self._run_peft(*a, config=get_prefix_tuning_config(10), lr=5e-4, desc='PrefixTuning L=10', steps=100),
+ 'lora': lambda *a, **kw: self._run_peft(*a, config=get_lora_config(rank=8), lr=1e-4, desc='LoRA r=8', **kw),
+ 'tiny_lora': lambda *a, **kw: self._run_peft(*a, config=get_tiny_lora_config(rank=1), lr=1e-4, desc='Tiny LoRA r=1', **kw),
+ 'vera': lambda *a, **kw: self._run_peft(*a, config=get_vera_config(rank=256), lr=1e-3, desc='VeRA r=256', **kw),
+ 'prompt_tuning_5': lambda *a, **kw: self._run_peft(*a, config=get_prompt_tuning_config(5), lr=1e-3, desc='PromptTuning L=5', steps=100, **kw),
+ 'prompt_tuning_10': lambda *a, **kw: self._run_peft(*a, config=get_prompt_tuning_config(10), lr=1e-3, desc='PromptTuning L=10', steps=100, **kw),
+ 'prompt_tuning_20': lambda *a, **kw: self._run_peft(*a, config=get_prompt_tuning_config(20), lr=1e-3, desc='PromptTuning L=20', steps=100, **kw),
+ 'prefix_tuning_5': lambda *a, **kw: self._run_peft(*a, config=get_prefix_tuning_config(5), lr=5e-4, desc='PrefixTuning L=5', steps=100, **kw),
+ 'prefix_tuning_10': lambda *a, **kw: self._run_peft(*a, config=get_prefix_tuning_config(10), lr=5e-4, desc='PrefixTuning L=10', steps=100, **kw),
}
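
The switch from `lambda *a:` to `lambda *a, **kw:` in the dispatch table above is what lets the PEFT entries receive `jsonl_path`, `start_idx`, and `existing`: the old form accepted only positional arguments, so any keyword call would raise. A minimal illustration (`target` is a hypothetical function, not from this script):

    def target(x, *, flag=None):
        return (x, flag)

    old = lambda *a: target(*a)              # drops keyword support entirely
    new = lambda *a, **kw: target(*a, **kw)  # forwards keywords through

    print(new(1, flag='resume'))             # (1, 'resume')
    # old(1, flag='resume') raises TypeError: unexpected keyword argument
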
if method_name not in dispatch:
print(f"Unknown method: {method_name}")
return []
- print(f"\n--- {method_name} ---")
- per_user = dispatch[method_name](examples, support_sets, references, support_texts, N)
+ os.makedirs(method_dir, exist_ok=True)
+ jsonl_path = os.path.join(method_dir, 'progress.jsonl')
+
+ # Resume: check how many examples already done
+ existing = read_jsonl(jsonl_path)
+ start_idx = len(existing)
+
+ if start_idx >= N:
+ print(f"\n--- {method_name} --- SKIPPED (already {start_idx}/{N} done)")
+ per_user = existing[:N]
+ else:
+ if start_idx > 0:
+ print(f"\n--- {method_name} --- RESUMING from {start_idx}/{N}")
+ else:
+ print(f"\n--- {method_name} ---")
+
+ per_user = dispatch[method_name](
+ examples, support_sets, references, support_texts, N,
+ jsonl_path=jsonl_path, start_idx=start_idx, existing=existing,
+ )
avg_rl = np.mean([u['metrics']['rougeL'] for u in per_user])
avg_sfd = np.mean([u['metrics']['sfd_nolen'] for u in per_user])
print(f" Mean R-L: {avg_rl:.4f}, SFD_-len: {avg_sfd:.4f}")
+
+ # Write final per_user.json
+ finalize_method(method_dir, method_label, per_user, task, setting, K, d)
return per_user
- def _make_per_user_entry(self, ex, ref, stexts, K, pred, timing, extra=None):
- metrics = compute_per_user_metrics(pred, ref, stexts)
- entry = {
- 'example_id': ex['example_id'],
- 'user_id': ex['user_id'],
- 'prediction': pred,
- 'reference': ref,
- 'support_texts': stexts,
- 'K': K,
- 'metrics': metrics,
- **timing,
- }
- if extra:
- entry.update(extra)
- return entry
+ # --- Individual method runners ---
+ # All accept jsonl_path, start_idx, existing for resume support
- def _run_base(self, examples, support_sets, references, support_texts, N):
- per_user = []
- for i, ex in enumerate(examples):
+ def _run_base(self, examples, support_sets, references, support_texts, N,
+ jsonl_path, start_idx, existing):
+ per_user = list(existing)
+ for i in range(start_idx, N):
+ ex = examples[i]
t0 = time.time()
prompt = build_query_prompt(ex['query_input'], ex['task'])
pred = generate_greedy(self.wrapper, prompt)
- entry = self._make_per_user_entry(
+ entry = self._make_entry(
ex, references[i], support_texts[i], len(support_sets[i]),
pred, {'gen_time': time.time() - t0}
)
per_user.append(entry)
+ append_jsonl(jsonl_path, entry)
if (i + 1) % 40 == 0:
print(f" {i+1}/{N}")
return per_user
- def _run_prompt_all_k(self, examples, support_sets, references, support_texts, N):
- per_user = []
- for i, (ex, support) in enumerate(zip(examples, support_sets)):
+ def _run_prompt_all_k(self, examples, support_sets, references, support_texts, N,
+ jsonl_path, start_idx, existing):
+ per_user = list(existing)
+ for i in range(start_idx, N):
+ ex, support = examples[i], support_sets[i]
t0 = time.time()
prompt = build_prompt_with_examples(ex['query_input'], support, ex['task'])
pred = generate_greedy(self.wrapper, prompt)
- entry = self._make_per_user_entry(
+ entry = self._make_entry(
ex, references[i], support_texts[i], len(support),
pred, {'gen_time': time.time() - t0}
)
per_user.append(entry)
+ append_jsonl(jsonl_path, entry)
if (i + 1) % 40 == 0:
print(f" {i+1}/{N}")
return per_user
- def _run_bm25_top1(self, examples, support_sets, references, support_texts, N):
- per_user = []
- for i, (ex, support) in enumerate(zip(examples, support_sets)):
+ def _run_bm25_top1(self, examples, support_sets, references, support_texts, N,
+ jsonl_path, start_idx, existing):
+ per_user = list(existing)
+ for i in range(start_idx, N):
+ ex, support = examples[i], support_sets[i]
t0 = time.time()
selected = bm25_select_top1(ex['query_input'], support)
prompt = build_prompt_with_examples(ex['query_input'], selected, ex['task'])
pred = generate_greedy(self.wrapper, prompt)
- entry = self._make_per_user_entry(
+ entry = self._make_entry(
ex, references[i], support_texts[i], len(support),
pred, {'gen_time': time.time() - t0}
)
per_user.append(entry)
+ append_jsonl(jsonl_path, entry)
if (i + 1) % 40 == 0:
print(f" {i+1}/{N}")
return per_user
- def _run_dense_top1(self, examples, support_sets, references, support_texts, N):
+ def _run_dense_top1(self, examples, support_sets, references, support_texts, N,
+ jsonl_path, start_idx, existing):
if self.dense_retriever is None:
self.dense_retriever = DenseRetriever(device='cpu')
- per_user = []
- for i, (ex, support) in enumerate(zip(examples, support_sets)):
+ per_user = list(existing)
+ for i in range(start_idx, N):
+ ex, support = examples[i], support_sets[i]
t0 = time.time()
selected = self.dense_retriever.retrieve_top_k(ex['query_input'], support, k=1)
prompt = build_prompt_with_examples(ex['query_input'], selected, ex['task'])
pred = generate_greedy(self.wrapper, prompt)
- entry = self._make_per_user_entry(
+ entry = self._make_entry(
ex, references[i], support_texts[i], len(support),
pred, {'gen_time': time.time() - t0}
)
per_user.append(entry)
+ append_jsonl(jsonl_path, entry)
if (i + 1) % 40 == 0:
print(f" {i+1}/{N}")
return per_user
- def _run_profile_based(self, examples, support_sets, references, support_texts, N):
- per_user = []
- for i, (ex, support) in enumerate(zip(examples, support_sets)):
+ def _run_profile_based(self, examples, support_sets, references, support_texts, N,
+ jsonl_path, start_idx, existing):
+ per_user = list(existing)
+ for i in range(start_idx, N):
+ ex, support = examples[i], support_sets[i]
t0 = time.time()
- # Step 1: Generate user profile summary from support examples
profile = generate_profile(self.wrapper, support, ex['task'])
- # Step 2: Generate conditioned on profile
prompt = build_profile_conditioned_prompt(ex['query_input'], profile, ex['task'])
pred = generate_greedy(self.wrapper, prompt)
- entry = self._make_per_user_entry(
+ entry = self._make_entry(
ex, references[i], support_texts[i], len(support),
pred, {'gen_time': time.time() - t0},
extra={'profile_summary': profile},
)
per_user.append(entry)
+ append_jsonl(jsonl_path, entry)
if (i + 1) % 40 == 0:
print(f" {i+1}/{N}")
return per_user
- def _run_uph(self, examples, support_sets, references, support_texts, N):
+ def _run_uph(self, examples, support_sets, references, support_texts, N,
+ jsonl_path, start_idx, existing):
d = self.uph_d
H = self.wrapper.hidden_size
uncond = UnconditionalHead(H, d=d, alpha=0.1, basis_seed=42).to(self.device)
@@ -228,8 +334,9 @@ class MethodRunner:
if hasattr(self.wrapper.model.lm_head, 'bias') and self.wrapper.model.lm_head.bias is not None:
lm_head_bias = self.wrapper.model.lm_head.bias.data
- per_user = []
- for i, (ex, support) in enumerate(zip(examples, support_sets)):
+ per_user = list(existing)
+ for i in range(start_idx, N):
+ ex, support = examples[i], support_sets[i]
t0 = time.time()
cached_h = cache_support_hidden_states(self.wrapper, support, ex['task'])
if not cached_h:
@@ -253,23 +360,27 @@ class MethodRunner:
del cached_h, theta
torch.cuda.empty_cache()
- entry = self._make_per_user_entry(
+ entry = self._make_entry(
ex, references[i], support_texts[i], len(support),
pred, {'adapt_time': time.time() - t0}
)
per_user.append(entry)
+ append_jsonl(jsonl_path, entry)
if (i + 1) % 40 == 0:
avg_rl = np.mean([u['metrics']['rougeL'] for u in per_user])
print(f" {i+1}/{N} (avg R-L: {avg_rl:.4f})")
return per_user
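
The explicit `del cached_h, theta` plus `torch.cuda.empty_cache()` above frees per-example activations before the next iteration, but only on the success path; if adaptation raises, the tensors stay alive until garbage collection. A try/finally variant guarantees cleanup either way (a sketch; `fit_head` and `generate_with_head` are hypothetical stand-ins for the adaptation and decode steps):

    import torch

    def adapt_one(wrapper, support, task):
        cached_h = theta = None
        try:
            cached_h = cache_support_hidden_states(wrapper, support, task)
            theta = fit_head(cached_h)        # hypothetical adaptation step
            return generate_with_head(theta)  # hypothetical decode step
        finally:
            # Runs even if adaptation raises, so the next example starts clean.
            del cached_h, theta
            torch.cuda.empty_cache()
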
def _run_peft(self, examples, support_sets, references, support_texts, N,
- config, lr, desc, steps=30):
+ config, lr, desc, steps=30, jsonl_path=None, start_idx=0, existing=None):
+ if existing is None:
+ existing = []
baseline = PEFTBaseline(self.wrapper, config)
- print(f" Trainable params: {baseline.n_params:,} ({baseline.n_bytes:,} bytes), steps={steps}, lr={lr}")
+ print(f" {desc}: {baseline.n_params:,} params ({baseline.n_bytes:,} bytes), steps={steps}, lr={lr}")
- per_user = []
- for i, (ex, support) in enumerate(zip(examples, support_sets)):
+ per_user = list(existing)
+ for i in range(start_idx, N):
+ ex, support = examples[i], support_sets[i]
t0 = time.time()
pred = baseline.adapt_and_generate(
support_items=support,
@@ -278,12 +389,13 @@ class MethodRunner:
lr=lr, steps=steps,
max_new_tokens=512, min_new_tokens=128,
)
- entry = self._make_per_user_entry(
+ entry = self._make_entry(
ex, references[i], support_texts[i], len(support),
pred, {'adapt_time': time.time() - t0},
extra={'n_params': baseline.n_params, 'n_bytes': baseline.n_bytes},
)
per_user.append(entry)
+ append_jsonl(jsonl_path, entry)
if (i + 1) % 20 == 0:
avg_rl = np.mean([u['metrics']['rougeL'] for u in per_user])
avg_t = np.mean([u['adapt_time'] for u in per_user])
@@ -293,6 +405,8 @@ class MethodRunner:
return per_user
+# ─── Main ────────────────────────────────────────────────────────────
+
def paired_test(scores_a, scores_b, name_a, name_b, metric_name):
a, b = np.array(scores_a), np.array(scores_b)
diff = a - b
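
`paired_test` operates on the per-example difference vector; SciPy's `ttest_rel` computes the same paired t-test directly. A sketch consistent with `diff = a - b` above:

    import numpy as np
    from scipy import stats

    def paired_test_sketch(scores_a, scores_b):
        a, b = np.array(scores_a), np.array(scores_b)
        t_stat, t_pval = stats.ttest_rel(a, b)  # paired t-test over matched examples
        return {'mean_diff': float((a - b).mean()),
                't_stat': float(t_stat), 't_pval': float(t_pval)}
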
@@ -344,7 +458,7 @@ def main():
else:
methods = [m.strip() for m in args.methods.split(',')]
- print(f"=== Unified Eval: {task}_{setting}, N={N}, K={K} ===")
+ print(f"=== Unified Eval: {task}_{setting}, N={N}, K={K}, d={args.d} ===")
print(f"Methods: {methods}")
print(f"Decode: greedy, min=128, max=512")
@@ -361,7 +475,24 @@ def main():
all_per_user = {}
for method in methods:
- per_user = runner.run(method, examples, support_sets, references, support_texts, N)
+ method_dir, method_label = get_method_dir(
+ args.output_dir, task, setting, K, method, args.d
+ )
+
+ # Skip if already complete
+ if is_method_complete(method_dir, N):
+ print(f"\n--- {method} --- COMPLETE (loading from disk)")
+ with open(os.path.join(method_dir, 'per_user.json')) as f:
+ data = json.load(f)
+ all_per_user[method] = data['per_user'][:N]
+ avg_rl = np.mean([u['metrics']['rougeL'] for u in all_per_user[method]])
+ print(f" Mean R-L: {avg_rl:.4f}")
+ continue
+
+ per_user = runner.run(
+ method, examples, support_sets, references, support_texts,
+ N, method_dir, method_label, task, setting, K, args.d,
+ )
all_per_user[method] = per_user
# Summary table
@@ -369,6 +500,8 @@ def main():
print(f"{'Method':<15} {'R-L':<8} {'METEOR':<8} {'SFD_-len':<9} {'Len':<6}")
print("-" * 90)
for method in methods:
+ if method not in all_per_user:
+ continue
pu = all_per_user[method]
rl = np.mean([u['metrics']['rougeL'] for u in pu])
mt = np.mean([u['metrics']['meteor'] for u in pu])
@@ -377,15 +510,15 @@ def main():
print(f"{method:<15} {rl:<8.4f} {mt:<8.4f} {sf:<9.4f} {ln:<6.0f}")
# Significance tests (UPH vs all others)
+ sig_results = {}
if 'uph' in all_per_user:
print("\n" + "=" * 90)
print("Significance (UPH vs each, paired t-test p-value)")
print("=" * 90)
uph_rl = [u['metrics']['rougeL'] for u in all_per_user['uph']]
uph_sf = [u['metrics']['sfd_nolen'] for u in all_per_user['uph']]
- sig_results = {}
for method in methods:
- if method == 'uph':
+ if method == 'uph' or method not in all_per_user:
continue
other_rl = [u['metrics']['rougeL'] for u in all_per_user[method]]
other_sf = [u['metrics']['sfd_nolen'] for u in all_per_user[method]]
@@ -395,43 +528,12 @@ def main():
print(f" vs {method:<12} R-L: diff={rl_t['mean_diff']:+.4f} p={rl_t['t_pval']:.2e} "
f"SFD: diff={sf_t['mean_diff']:+.4f} p={sf_t['t_pval']:.2e}")
- # Save per-method data in separate directories
- # Structure: output_dir/task_setting_K{K}/{method}/per_user.json
- # For d ablation: uph method saved as uph_d{d}
+ # Save summary
exp_dir = os.path.join(args.output_dir, f"{task}_{setting}_K{K}")
- os.makedirs(exp_dir, exist_ok=True)
-
- for method in methods:
- # Use uph_d{d} as directory name for d ablation
- method_label = f"uph_d{args.d}" if method == 'uph' and args.d != 64 else method
- method_dir = os.path.join(exp_dir, method_label)
- os.makedirs(method_dir, exist_ok=True)
-
- pu = all_per_user[method]
- agg_m = {
- 'rougeL': float(np.mean([u['metrics']['rougeL'] for u in pu])),
- 'meteor': float(np.mean([u['metrics']['meteor'] for u in pu])),
- 'sfd_nolen': float(np.mean([u['metrics']['sfd_nolen'] for u in pu])),
- 'avg_len': float(np.mean([u['metrics']['length'] for u in pu])),
- }
-
- save_meta = {
- 'per_user': pu,
- 'aggregate': agg_m,
- 'num_examples': N, 'task': task, 'setting': setting, 'K': K,
- 'method': method_label,
- 'decode_policy': 'greedy, min=128, max=512',
- }
- if method == 'uph':
- save_meta['d'] = args.d
- with open(os.path.join(method_dir, 'per_user.json'), 'w') as f:
- json.dump(save_meta, f, indent=2, default=str)
-
- print(f" Saved: {method_dir}/per_user.json")
-
- # Also save a combined summary (aggregate only, no per-user data)
summary = {}
for method in methods:
+ if method not in all_per_user:
+ continue
pu = all_per_user[method]
summary[method] = {
'rougeL': float(np.mean([u['metrics']['rougeL'] for u in pu])),
@@ -443,13 +545,12 @@ def main():
with open(summary_path, 'w') as f:
json.dump({
'aggregate': summary,
- 'significance': sig_results if 'uph' in all_per_user else {},
+ 'significance': sig_results,
'num_examples': N, 'task': task, 'setting': setting, 'K': K,
'methods': methods,
}, f, indent=2, default=str)
- print(f"\nPer-method data: {exp_dir}/{{method}}/per_user.json")
- print(f"Summary: {summary_path}")
+ print(f"\nSummary: {summary_path}")
if __name__ == '__main__':