Diffstat (limited to 'scripts')
-rw-r--r--  scripts/__pycache__/parse_course_prereqs.cpython-312.pyc  bin 0 -> 10335 bytes
-rw-r--r--  scripts/analyze_prereqs.py       120
-rw-r--r--  scripts/build_final_parsed.py    107
-rw-r--r--  scripts/build_graph_assets.py    359
-rw-r--r--  scripts/fetch_uiuc_courses.py    230
-rw-r--r--  scripts/parse_course_prereqs.py  190
-rw-r--r--  scripts/reduce_and_cluster.py    153
-rw-r--r--  scripts/scrape.js                 63
-rw-r--r--  scripts/validate_courses.py       31
9 files changed, 1190 insertions, 63 deletions
diff --git a/scripts/__pycache__/parse_course_prereqs.cpython-312.pyc b/scripts/__pycache__/parse_course_prereqs.cpython-312.pyc
new file mode 100644
index 0000000..b085808
--- /dev/null
+++ b/scripts/__pycache__/parse_course_prereqs.cpython-312.pyc
Binary files differ
diff --git a/scripts/analyze_prereqs.py b/scripts/analyze_prereqs.py
new file mode 100644
index 0000000..7c580f7
--- /dev/null
+++ b/scripts/analyze_prereqs.py
@@ -0,0 +1,120 @@
+#!/usr/bin/env python3
+import argparse
+import json
+import os
+import re
+from typing import Any, Dict, Iterable, List
+
+
+COURSE_TOKEN = re.compile(r"\b([A-Z]{2,4})\s*(\d{2,3}[A-Z]?)\b")
+NONE_PATTERNS = [
+ re.compile(r"^\s*none\.?\s*$", re.IGNORECASE),
+ re.compile(r"no prerequisites", re.IGNORECASE),
+ re.compile(r"prerequisite[s]?:\s*none\b", re.IGNORECASE),
+]
+
+
+def is_none_text(text: str) -> bool:
+ t = text.strip()
+ return any(p.search(t) for p in NONE_PATTERNS)
+
+
+def extract_course_refs(text: str) -> List[str]:
+ refs = []
+ for m in COURSE_TOKEN.finditer(text):
+ subject, number = m.group(1), m.group(2)
+ refs.append(f"{subject} {number}")
+ return refs
+
+
+NON_COURSE_KEYWORDS = [
+ r"consent", r"permission", r"approval",
+ r"standing", r"senior", r"junior", r"sophomore", r"freshman",
+ r"major", r"minor", r"program", r"restricted", r"enrollment", r"enrolled",
+ r"gpa", r"grade", r"minimum", r"credit hour", r"credits?", r"hours?",
+ r"registration", r"concurrent", r"co-requisite", r"corequisite",
+ r"department", r"instructor",
+]
+
+def has_non_course_requirements(text: str) -> bool:
+ t = text.lower()
+ return any(re.search(k, t) for k in NON_COURSE_KEYWORDS)
+
+
+def is_course_only(text: str) -> bool:
+ t = text.strip()
+ if has_non_course_requirements(t):
+ return False
+ # Remove course tokens, then see if any nontrivial tokens remain besides basic connectors
+ placeholder = COURSE_TOKEN.sub("COURSE", t)
+ # Remove conjunctions and punctuation
+ simplified = re.sub(r"[(),.;]", " ", placeholder)
+ simplified = re.sub(r"\b(and|or|and/or|either|both|one of|two of|with|credit in)\b", " ", simplified, flags=re.IGNORECASE)
+ # Remove common quantifiers
+ simplified = re.sub(r"\b(at\s+least)\b", " ", simplified, flags=re.IGNORECASE)
+ # Collapse whitespace
+ simplified = re.sub(r"\s+", " ", simplified).strip()
+ # If empty or only words like COURSE left, treat as course-only
+ return simplified == "" or re.fullmatch(r"(COURSE\s*)+", simplified) is not None
+
+
+def analyze(courses: Iterable[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
+ results = {
+ "none": [],
+ "course_only": [],
+ "remaining": [],
+ }
+
+ for c in courses:
+ prereq = c.get("prerequisites") or ""
+ if not prereq.strip():
+ results["none"].append(c)
+ continue
+ if is_none_text(prereq):
+ results["none"].append(c)
+ continue
+ if is_course_only(prereq):
+ results["course_only"].append({
+ "index": c.get("index"),
+ "name": c.get("name"),
+ "prerequisites": prereq,
+ "courses": extract_course_refs(prereq),
+ })
+ else:
+ results["remaining"].append({
+ "index": c.get("index"),
+ "name": c.get("name"),
+ "prerequisites": prereq,
+ })
+ return results
+
+
+def main() -> int:
+ ap = argparse.ArgumentParser(description="Analyze UIUC course prerequisite text")
+ ap.add_argument("input", default="data/courses.json", nargs="?", help="Input courses JSON array")
+ ap.add_argument("--outdir", default="data/analysis", help="Output directory")
+ args = ap.parse_args()
+
+ with open(args.input, "r", encoding="utf-8") as f:
+ data = json.load(f)
+
+ res = analyze(data)
+
+    os.makedirs(args.outdir, exist_ok=True)
+ with open(os.path.join(args.outdir, "none.json"), "w", encoding="utf-8") as f:
+ json.dump(res["none"], f, ensure_ascii=False, indent=2)
+ with open(os.path.join(args.outdir, "course_only.json"), "w", encoding="utf-8") as f:
+ json.dump(res["course_only"], f, ensure_ascii=False, indent=2)
+ with open(os.path.join(args.outdir, "remaining.json"), "w", encoding="utf-8") as f:
+ json.dump(res["remaining"], f, ensure_ascii=False, indent=2)
+
+ print(f"none: {len(res['none'])}")
+ print(f"course_only: {len(res['course_only'])}")
+ print(f"remaining: {len(res['remaining'])}")
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
+
+
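A quick way to sanity-check the classification heuristics above is to call the helpers on a few representative strings. A minimal sketch, assuming it is run from inside scripts/ so the module imports directly; the prerequisite texts are made-up examples:

    # sketch: exercise the analyze_prereqs helpers on hypothetical prerequisite strings
    from analyze_prereqs import is_none_text, is_course_only, extract_course_refs

    print(is_none_text("None."))                         # True  -> bucketed as "none"
    print(is_course_only("CS 173 and CS 225."))          # True  -> bucketed as "course_only"
    print(is_course_only("CS 225; junior standing."))    # False -> "standing" is a non-course keyword
    print(extract_course_refs("MATH 241 or MATH 257"))   # ['MATH 241', 'MATH 257']
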
diff --git a/scripts/build_final_parsed.py b/scripts/build_final_parsed.py
new file mode 100644
index 0000000..0d34a03
--- /dev/null
+++ b/scripts/build_final_parsed.py
@@ -0,0 +1,107 @@
+#!/usr/bin/env python3
+import argparse
+import json
+import os
+import re
+import sys
+from typing import Any, Dict, List
+
+
+# Ensure we can import sibling script
+SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+if SCRIPT_DIR not in sys.path:
+ sys.path.insert(0, SCRIPT_DIR)
+
+from parse_course_prereqs import parse_prereq_text # type: ignore
+
+
+COURSE_TOKEN = re.compile(r"\b([A-Z]{2,4})\s*(\d{2,3}[A-Z]?)\b")
+CLAUSE_SPLIT_RE = re.compile(r";+")
+
+NON_COURSE_KEYWORDS = [
+ r"consent", r"permission", r"approval",
+ r"standing", r"senior", r"junior", r"sophomore", r"freshman",
+ r"major", r"minor", r"program", r"restricted", r"enrollment", r"enrolled",
+ r"gpa", r"grade", r"minimum", r"credit hour", r"credits?", r"hours?",
+ r"registration", r"concurrent", r"co-requisite", r"corequisite",
+ r"department", r"instructor",
+]
+
+
+def has_course_token(s: str) -> bool:
+ return COURSE_TOKEN.search(s) is not None
+
+
+def detect_flags(text: str) -> List[str]:
+ t = text.lower()
+ flags: List[str] = []
+ mapping = [
+ (r"consent|permission|approval", "CONSENT"),
+ (r"standing|senior|junior|sophomore|freshman", "STANDING"),
+ (r"major|minor|program|restricted|enrollment|enrolled", "MAJOR_OR_PROGRAM"),
+ (r"gpa|grade|minimum", "GRADE_OR_GPA"),
+ (r"concurrent|co-requisite|corequisite", "COREQ_ALLOWED"),
+ (r"department|instructor", "DEPT_OR_INSTRUCTOR"),
+ ]
+ for pat, name in mapping:
+ if re.search(pat, t):
+ flags.append(name)
+ return sorted(set(flags))
+
+
+def main() -> int:
+ ap = argparse.ArgumentParser(description="Build final parsed JSON for all courses")
+ ap.add_argument("input", nargs="?", default="data/courses.json", help="Input courses.json")
+ ap.add_argument("--output", default="data/courses_parsed.json", help="Output JSON path")
+ args = ap.parse_args()
+
+ with open(args.input, "r", encoding="utf-8") as f:
+ courses = json.load(f)
+
+ out: List[Dict[str, Any]] = []
+ stats = {"total": 0, "hard_nonempty": 0, "coreq_nonempty": 0}
+ for c in courses:
+ stats["total"] += 1
+ raw = (c.get("prerequisites") or "").strip()
+ ast = parse_prereq_text(raw)
+
+ hard = ast.get("hard") if isinstance(ast, dict) else {"op": "EMPTY"}
+ coreq_ok = ast.get("coreq_ok") if isinstance(ast, dict) else {"op": "EMPTY"}
+ if hard and hard.get("op") != "EMPTY":
+ stats["hard_nonempty"] += 1
+ if coreq_ok and coreq_ok.get("op") != "EMPTY":
+ stats["coreq_nonempty"] += 1
+
+ # Capture non-course clauses for reference
+ notes: List[str] = []
+ if raw:
+ clauses = [s.strip() for s in CLAUSE_SPLIT_RE.split(raw) if s.strip()]
+ for cl in clauses:
+ if not has_course_token(cl) or detect_flags(cl):
+ notes.append(cl)
+
+ out.append({
+ "index": c.get("index"),
+ "name": c.get("name"),
+ "description": c.get("description"),
+ "prerequisites": {
+ "raw": raw or None,
+ "hard": hard,
+ "coreq_ok": coreq_ok,
+ "flags": detect_flags(raw) if raw else [],
+ "notes": notes,
+ },
+ })
+
+ os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True)
+ with open(args.output, "w", encoding="utf-8") as f:
+ json.dump(out, f, ensure_ascii=False, indent=2)
+
+ print(json.dumps(stats))
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
+
+
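The flag detection and note capture are easiest to see on a single clause. A minimal sketch, assuming it is run from inside scripts/ (so the sibling parse_course_prereqs.py is importable as well); the clause is invented for illustration:

    # sketch: which flags a hypothetical mixed clause would produce
    from build_final_parsed import detect_flags, has_course_token

    clause = "CS 225; consent of instructor; junior standing"
    print(detect_flags(clause))                  # ['CONSENT', 'DEPT_OR_INSTRUCTOR', 'STANDING']
    print(has_course_token("junior standing"))   # False -> that clause would be kept in "notes"
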
diff --git a/scripts/build_graph_assets.py b/scripts/build_graph_assets.py
new file mode 100644
index 0000000..6f27f43
--- /dev/null
+++ b/scripts/build_graph_assets.py
@@ -0,0 +1,359 @@
+#!/usr/bin/env python3
+import argparse
+import json
+import math
+import os
+from typing import Any, Dict, List, Optional, Tuple
+
+import networkx as nx
+
+
+def collect_courses_from_ast(ast: Dict[str, Any]) -> List[str]:
+ out: List[str] = []
+ def walk(node: Any) -> None:
+ if not isinstance(node, dict):
+ return
+ op = node.get("op")
+ if op == "COURSE" and node.get("course"):
+ out.append(node["course"])
+ for child in node.get("items", []) or []:
+ walk(child)
+ walk(ast)
+ # Unique order-preserving
+ seen = set()
+ uniq: List[str] = []
+ for c in out:
+ if c not in seen:
+ seen.add(c)
+ uniq.append(c)
+ return uniq
+
+
+def build_graph(courses: List[Dict[str, Any]], include_coreq: bool = True) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
+ nodes_map: Dict[str, Dict[str, Any]] = {}
+ edges: List[Dict[str, Any]] = []
+
+    def ensure_node(course_id: str, label: Optional[str] = None) -> None:
+ if course_id not in nodes_map:
+ nodes_map[course_id] = {"id": course_id, "label": label or course_id, "subject": course_id.split()[0] if ' ' in course_id else None}
+
+ for c in courses:
+ idx = c.get("index")
+ name = c.get("name")
+ ensure_node(idx, name)
+ pr = c.get("prerequisites") or {}
+ hard = pr.get("hard") or {"op": "EMPTY"}
+ coreq = pr.get("coreq_ok") or {"op": "EMPTY"}
+ for pre in collect_courses_from_ast(hard):
+ ensure_node(pre)
+ edges.append({"source": pre, "target": idx, "kind": "hard"})
+ if include_coreq:
+ for pre in collect_courses_from_ast(coreq):
+ ensure_node(pre)
+ edges.append({"source": pre, "target": idx, "kind": "coreq"})
+
+ nodes = [ {"id": n["id"], "label": n["label"], "subject": n.get("subject")} for n in nodes_map.values() ]
+ return nodes, edges
+
+
+def compute_positions(
+ nodes: List[Dict[str, Any]],
+ edges: List[Dict[str, Any]],
+ seed: int = 42,
+ layout: str = "spring",
+ iterations: int = 100,
+ component_wise: bool = False,
+ # SMACOF (MDS) options
+ mds_backend: str = "auto", # auto|sklearn|cuml
+ mds_max_iter: int = 300,
+ mds_eps: float = 1e-3,
+ mds_verbose: int = 1,
+ # Overlap resolution options
+ resolve_overlap: bool = False,
+ node_size_px: float = 6.0,
+ min_dist_mul: float = 1.5,
+ overlap_max_iters: int = 60,
+ overlap_step: float = 0.5,
+) -> Dict[str, Dict[str, float]]:
+ # Use a force-directed layout over an undirected graph for a compact web-like layout
+ G = nx.Graph()
+ for n in nodes:
+ G.add_node(n["id"])
+ for e in edges:
+ G.add_edge(e["source"], e["target"]) # undirected for layout
+
+ def layout_graph(graph: nx.Graph) -> Dict[str, Tuple[float, float]]:
+ if layout == "drl":
+ try:
+ import igraph as ig # type: ignore
+ except Exception as e:
+ raise RuntimeError("python-igraph is required for DRL/OpenOrd-like layout; pip install python-igraph") from e
+ nodes_list = list(graph.nodes())
+ index_of = {v: i for i, v in enumerate(nodes_list)}
+ g = ig.Graph()
+ g.add_vertices(len(nodes_list))
+ g.vs["name"] = nodes_list
+ # unique edges only
+ edge_idx = set()
+ for u, v in graph.edges():
+ iu, iv = index_of[u], index_of[v]
+ if iu == iv:
+ continue
+ a, b = (iu, iv) if iu < iv else (iv, iu)
+ if (a, b) not in edge_idx:
+ edge_idx.add((a, b))
+ if edge_idx:
+ g.add_edges(list(edge_idx))
+ # DRL (OpenOrd-style) is good for community separation
+ lay = g.layout_drl()
+ coords = [[float(x), float(y)] for x, y in lay]
+ return {nodes_list[i]: (coords[i][0], coords[i][1]) for i in range(len(nodes_list))}
+ if layout == "fa2":
+ try:
+ from fa2 import ForceAtlas2 # type: ignore
+ except Exception as e:
+ raise RuntimeError("fa2 is required for ForceAtlas2 layout; pip install fa2") from e
+ fa = ForceAtlas2(
+ # LinLog energy model emphasizes community separation
+ linLogMode=True,
+ gravity=1.0,
+ strongGravityMode=True,
+ scalingRatio=2.0,
+ outboundAttractionDistribution=False,
+ barnesHutOptimize=True,
+ barnesHutTheta=1.2,
+ jitterTolerance=1.0,
+ edgeWeightInfluence=1.0,
+ adjustSizes=False,
+ verbose=False,
+ )
+ pos = fa.forceatlas2_networkx_layout(graph, pos=None, iterations=max(300, iterations))
+ return {n: (float(xy[0]), float(xy[1])) for n, xy in pos.items()}
+ if layout == "smacof":
+ try:
+ import numpy as np
+ except Exception as e:
+ raise RuntimeError("NumPy is required for smacof layout") from e
+
+ nodes_list = list(graph.nodes())
+ n = len(nodes_list)
+ if n == 0:
+ return {}
+ if n == 1:
+ return {nodes_list[0]: (0.0, 0.0)}
+
+ # Compute all-pairs shortest path distances (undirected)
+ index_of = {v: i for i, v in enumerate(nodes_list)}
+ D = np.full((n, n), 0.0, dtype=np.float32)
+ large = 1e6
+ for i in range(n):
+ for j in range(n):
+ if i != j:
+ D[i, j] = large
+ for src, lengths in nx.all_pairs_shortest_path_length(graph):
+ i = index_of[src]
+ for dst, d in lengths.items():
+ j = index_of[dst]
+ if i != j:
+ D[i, j] = float(d)
+ D[j, i] = float(d)
+
+ # Replace remaining large distances with max finite distance * 1.5
+ finite = D[D < large]
+ maxd = float(finite.max()) if finite.size else 1.0
+ D[D >= large] = maxd * 1.5
+
+ backend_used = None
+ coords = None
+ if mds_backend in ("auto", "cuml"):
+ try:
+ from cuml.manifold import MDS as cuMDS # type: ignore
+ backend_used = "cuml"
+ print("[smacof] using cuML MDS (GPU) ...")
+ m = cuMDS(n_components=2, dissimilarity='precomputed', max_iter=mds_max_iter, random_state=seed, verbose=bool(mds_verbose))
+ coords = m.fit_transform(D)
+ try:
+ coords = coords.get() # convert cupy to numpy if needed
+ except Exception:
+ pass
+ except Exception:
+ if mds_backend == "cuml":
+ raise
+ if coords is None:
+ from sklearn.manifold import MDS
+ backend_used = "sklearn"
+ print("[smacof] using scikit-learn MDS (CPU) ...")
+ # verbose prints per-iteration stress
+ mds = MDS(n_components=2, dissimilarity='precomputed', metric=True, random_state=seed, n_init=1, max_iter=mds_max_iter, eps=mds_eps, verbose=mds_verbose)
+ coords = mds.fit_transform(D)
+ print(f"[smacof] backend={backend_used} done. shape={coords.shape}")
+
+ return {nodes_list[i]: (float(coords[i, 0]), float(coords[i, 1])) for i in range(n)}
+
+ if layout == "random":
+ return nx.random_layout(graph, dim=2, seed=seed)
+ if layout == "kk":
+ return nx.kamada_kawai_layout(graph, dim=2)
+ if layout == "none":
+ return {n: (0.0, 0.0) for n in graph.nodes}
+ # default: spring
+ try:
+ return nx.spring_layout(graph, seed=seed, dim=2, iterations=iterations)
+ except ModuleNotFoundError:
+ # SciPy not installed – use kamada_kawai instead
+ return nx.kamada_kawai_layout(graph, dim=2)
+ except Exception:
+ return nx.kamada_kawai_layout(graph, dim=2)
+
+ if component_wise:
+ pos_raw: Dict[str, Tuple[float, float]] = {}
+ for comp in nx.connected_components(G):
+ sub = G.subgraph(comp)
+ local = layout_graph(sub)
+ pos_raw.update(local)
+ else:
+ pos_raw = layout_graph(G)
+
+ # Normalize positions to a fixed range for consistent initial viewport
+ xs = [p[0] for p in pos_raw.values()]
+ ys = [p[1] for p in pos_raw.values()]
+ min_x, max_x = (min(xs), max(xs)) if xs else (0.0, 1.0)
+ min_y, max_y = (min(ys), max(ys)) if ys else (0.0, 1.0)
+ span_x = max(max_x - min_x, 1e-6)
+ span_y = max(max_y - min_y, 1e-6)
+
+    # Scale onto a large square canvas, keeping linear scaling to preserve community geometry
+    # (works well for ForceAtlas2 and SMACOF alike); an earlier revision mapped SMACOF output onto a disk.
+ scale = 6000.0
+ out: Dict[str, Dict[str, float]] = {}
+ for node_id, (x, y) in pos_raw.items():
+ x01 = (x - min_x) / span_x # 0..1
+ y01 = (y - min_y) / span_y # 0..1
+ out[node_id] = {"x": (x01 - 0.5) * scale, "y": (y01 - 0.5) * scale}
+
+ if resolve_overlap and out:
+ # Simple grid-based overlap removal with minimal displacement
+ target_dist = max(1.0, node_size_px * min_dist_mul)
+ cell = target_dist
+ node_ids = list(out.keys())
+ for _ in range(overlap_max_iters):
+ # Build spatial hash
+ grid: Dict[Tuple[int,int], List[str]] = {}
+ for nid in node_ids:
+ p = out[nid]
+ gx = int(math.floor(p["x"] / cell))
+ gy = int(math.floor(p["y"] / cell))
+ grid.setdefault((gx, gy), []).append(nid)
+
+ moved = 0.0
+ disp: Dict[str, Tuple[float,float]] = {}
+ for nid in node_ids:
+ px = out[nid]["x"]; py = out[nid]["y"]
+ gx = int(math.floor(px / cell)); gy = int(math.floor(py / cell))
+ # check neighbors cells
+ for dx in (-1,0,1):
+ for dy in (-1,0,1):
+ cell_nodes = grid.get((gx+dx, gy+dy), [])
+ for mid in cell_nodes:
+ if mid <= nid: # avoid double count and self
+ continue
+ qx = out[mid]["x"]; qy = out[mid]["y"]
+ vx = qx - px; vy = qy - py
+ dist = math.hypot(vx, vy)
+ if dist < target_dist and dist > 1e-6:
+ overlap = target_dist - dist
+ ux = vx / dist; uy = vy / dist
+ mx = -ux * (overlap * 0.5)
+ my = -uy * (overlap * 0.5)
+ disp[nid] = (disp.get(nid, (0.0,0.0))[0] + mx, disp.get(nid, (0.0,0.0))[1] + my)
+ disp[mid] = (disp.get(mid, (0.0,0.0))[0] - mx, disp.get(mid, (0.0,0.0))[1] - my)
+ if not disp:
+ break
+ for nid, (dx, dy) in disp.items():
+ out[nid]["x"] += dx * overlap_step
+ out[nid]["y"] += dy * overlap_step
+ moved += abs(dx) + abs(dy)
+ if moved < 1e-3:
+ break
+ return out
+
+
+def main() -> int:
+ ap = argparse.ArgumentParser(description="Build slim graph assets and preset positions")
+ ap.add_argument("input", nargs="?", default="data/courses_parsed.json", help="Input parsed courses JSON")
+ ap.add_argument("--graph-out", default="data/graph.json", help="Output graph JSON (nodes, edges)")
+ ap.add_argument("--pos-out", default="data/positions.json", help="Output positions JSON (node -> {x,y})")
+ ap.add_argument("--pos-out-alt", nargs='*', default=[], help="Additional positions to generate in the form layout:name (e.g., kk:positions_kk.json spring:positions_spring.json)")
+ ap.add_argument("--hard-only", action="store_true", help="Only include hard prerequisite edges (exclude coreq)")
+ ap.add_argument("--layout", choices=["spring","kk","random","none","smacof","fa2","drl"], default="fa2", help="Layout algorithm for positions")
+ ap.add_argument("--iterations", type=int, default=60, help="Iterations for spring layout (lower is faster)")
+ ap.add_argument("--component-wise", action="store_true", help="Layout each connected component separately (can be faster)")
+ # Overlap options
+ ap.add_argument("--resolve-overlap", action="store_true", help="Run overlap removal post-process")
+ ap.add_argument("--node-size", type=float, default=6.0, help="Node visual diameter in px (for spacing)")
+ ap.add_argument("--min-dist-mul", type=float, default=1.5, help="Minimum center distance multiplier of node size")
+ # SMACOF options
+ ap.add_argument("--mds-backend", choices=["auto","sklearn","cuml"], default="auto", help="Backend for SMACOF (stress majorization)")
+ ap.add_argument("--mds-max-iter", type=int, default=300, help="Max iterations for SMACOF")
+ ap.add_argument("--mds-eps", type=float, default=1e-3, help="Convergence tolerance for SMACOF")
+ ap.add_argument("--mds-verbose", type=int, default=1, help="Verbosity for SMACOF (>=1 prints per-iteration stress)")
+ args = ap.parse_args()
+
+ with open(args.input, "r", encoding="utf-8") as f:
+ courses = json.load(f)
+
+ nodes, edges = build_graph(courses, include_coreq=not args.hard_only)
+
+ os.makedirs(os.path.dirname(args.graph_out) or ".", exist_ok=True)
+ with open(args.graph_out, "w", encoding="utf-8") as f:
+ json.dump({"nodes": nodes, "edges": edges}, f, ensure_ascii=False, indent=2)
+
+ print(f"building positions: nodes={len(nodes)} edges={len(edges)} layout={args.layout} iter={args.iterations} component_wise={args.component_wise}")
+ pos = compute_positions(
+ nodes, edges,
+ layout=args.layout,
+ iterations=args.iterations,
+ component_wise=args.component_wise,
+ mds_backend=args.mds_backend,
+ mds_max_iter=args.mds_max_iter,
+ mds_eps=args.mds_eps,
+ mds_verbose=args.mds_verbose,
+ resolve_overlap=args.resolve_overlap,
+ node_size_px=args.node_size,
+ min_dist_mul=args.min_dist_mul,
+ )
+ with open(args.pos_out, "w", encoding="utf-8") as f:
+ json.dump(pos, f, ensure_ascii=False, indent=2)
+
+ # Optionally generate additional layouts
+ for spec in args.pos_out_alt:
+ try:
+ lay, path = spec.split(":", 1)
+ except ValueError:
+ print(f"[warn] invalid --pos-out-alt spec: {spec}")
+ continue
+ try:
+ alt = compute_positions(
+ nodes, edges,
+ layout=lay,
+ iterations=args.iterations,
+ component_wise=args.component_wise,
+ mds_backend=args.mds_backend,
+ mds_max_iter=args.mds_max_iter,
+ mds_eps=args.mds_eps,
+ mds_verbose=args.mds_verbose,
+ )
+ with open(path, "w", encoding="utf-8") as f:
+ json.dump(alt, f, ensure_ascii=False, indent=2)
+ print(f"wrote alt positions: {lay} -> {path}")
+ except Exception as e:
+ print(f"[warn] failed alt positions {lay}: {e}")
+
+ print(f"nodes: {len(nodes)}, edges: {len(edges)}, positions: {len(pos)}")
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
+
+
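Edge construction hinges on collect_courses_from_ast, which flattens an AND/OR tree into a de-duplicated course list; each listed course then gets an edge pointing at the dependent course. A minimal sketch with a hand-written AST in the shape produced by parse_course_prereqs.py (networkx must be installed, since the module imports it at the top):

    # sketch: flattening a hypothetical prerequisite AST into prerequisite course ids
    from build_graph_assets import collect_courses_from_ast

    ast = {"op": "AND", "items": [
        {"op": "COURSE", "course": "CS 173"},
        {"op": "OR", "items": [
            {"op": "COURSE", "course": "CS 125"},
            {"op": "COURSE", "course": "ECE 220"},
            {"op": "COURSE", "course": "CS 125"},   # duplicate on purpose
        ]},
    ]}
    print(collect_courses_from_ast(ast))   # ['CS 173', 'CS 125', 'ECE 220']
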
diff --git a/scripts/fetch_uiuc_courses.py b/scripts/fetch_uiuc_courses.py
new file mode 100644
index 0000000..0f38fdd
--- /dev/null
+++ b/scripts/fetch_uiuc_courses.py
@@ -0,0 +1,230 @@
+#!/usr/bin/env python3
+import argparse
+import concurrent.futures
+import json
+import os
+import re
+import sys
+import time
+from dataclasses import dataclass, asdict
+from typing import Dict, List, Optional, Tuple
+
+import requests
+from xml.etree import ElementTree as ET
+
+
+BASE_URL = "https://courses.illinois.edu/cisapp/explorer/catalog"
+
+
+@dataclass
+class CourseRecord:
+ index: str
+ name: Optional[str]
+ description: Optional[str]
+ prerequisites: Optional[str]
+
+
+def parse_xml(content: bytes) -> ET.Element:
+ try:
+ return ET.fromstring(content)
+ except ET.ParseError as exc:
+ raise RuntimeError(f"Failed to parse XML: {exc}")
+
+
+def fetch(session: requests.Session, url: str) -> bytes:
+ resp = session.get(url, timeout=30)
+ if resp.status_code != 200:
+ raise RuntimeError(f"GET {url} -> {resp.status_code}")
+ return resp.content
+
+
+def get_subject_ids(session: requests.Session, year: str, term: str) -> List[str]:
+ url = f"{BASE_URL}/{year}/{term}.xml"
+ root = parse_xml(fetch(session, url))
+ subjects = []
+ for node in root.findall(".//subject"):
+ node_id = node.attrib.get("id")
+ if node_id:
+ subjects.append(node_id)
+ return subjects
+
+
+def get_course_numbers_for_subject(session: requests.Session, year: str, term: str, subject: str) -> List[str]:
+ url = f"{BASE_URL}/{year}/{term}/{subject}.xml"
+ root = parse_xml(fetch(session, url))
+ courses = []
+ for node in root.findall(".//course"):
+ node_id = node.attrib.get("id")
+ if node_id:
+ courses.append(node_id)
+ return courses
+
+
+def extract_prerequisite_text(root: ET.Element) -> Optional[str]:
+ # Prefer explicitly labeled prerequisite elements if present
+ for tag in ["prerequisites", "prerequisite", "Prerequisites", "Prerequisite"]:
+ found = root.find(f".//{tag}")
+ if found is not None and (found.text and found.text.strip()):
+ return found.text.strip()
+
+ # Fallback: courseSectionInformation often contains "Prerequisite:" free text
+ csi = root.find(".//courseSectionInformation")
+ if csi is not None and csi.text:
+ text = csi.text.strip()
+ match = re.search(r"Prerequisite[s]?:\s*(.*)$", text, flags=re.IGNORECASE | re.DOTALL)
+ if match:
+ return match.group(1).strip()
+
+ # As a last resort, scan description for a Prerequisite sentence
+ desc = root.find(".//description")
+ if desc is not None and desc.text:
+ text = desc.text.strip()
+ match = re.search(r"Prerequisite[s]?:\s*(.*)$", text, flags=re.IGNORECASE | re.DOTALL)
+ if match:
+ return match.group(1).strip()
+
+ return None
+
+
+def get_course_details(session: requests.Session, year: str, term: str, subject: str, course_number: str) -> CourseRecord:
+ url = f"{BASE_URL}/{year}/{term}/{subject}/{course_number}.xml"
+ root = parse_xml(fetch(session, url))
+
+ # Title/name may be in <label> or <title>
+ name = None
+ label_node = root.find(".//label")
+ if label_node is not None and label_node.text:
+ name = label_node.text.strip()
+ else:
+ title_node = root.find(".//title")
+ if title_node is not None and title_node.text:
+ name = title_node.text.strip()
+
+ description = None
+ desc_node = root.find(".//description")
+ if desc_node is not None and desc_node.text:
+ description = desc_node.text.strip()
+
+ prerequisites_text = extract_prerequisite_text(root)
+
+ return CourseRecord(
+ index=f"{subject} {course_number}",
+ name=name,
+ description=description,
+ prerequisites=prerequisites_text,
+ )
+
+
+def try_year_term(session: requests.Session, year: str, term: str) -> bool:
+ url = f"{BASE_URL}/{year}/{term}.xml"
+ resp = session.get(url, timeout=15)
+ return resp.status_code == 200
+
+
+def detect_default_year_term(session: requests.Session) -> Tuple[str, str]:
+ # Try a few common combinations in likely order
+ current_year = time.gmtime().tm_year
+ candidate_terms = ["fall", "summer", "spring", "winter"]
+ candidates: List[Tuple[str, str]] = []
+ # Current year candidates first
+ for term in candidate_terms:
+ candidates.append((str(current_year), term))
+ # Then previous year
+ for term in candidate_terms:
+ candidates.append((str(current_year - 1), term))
+
+ for year, term in candidates:
+ if try_year_term(session, year, term):
+ return year, term
+ # Fallback to a known historical term
+ return "2024", "fall"
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description="Fetch UIUC course catalog into JSON")
+ parser.add_argument("--year", default=None, help="Catalog year, e.g. 2025")
+ parser.add_argument("--term", default=None, help="Term, e.g. fall|spring|summer|winter")
+ parser.add_argument("--subject", default=None, help="Limit to a single subject (e.g., CS)")
+ parser.add_argument("--max-workers", type=int, default=12, help="Max concurrent requests")
+ parser.add_argument("--output", default="data/courses.json", help="Output JSON path")
+ parser.add_argument("--sleep", type=float, default=0.0, help="Optional per-request sleep seconds")
+ args = parser.parse_args()
+
+ session = requests.Session()
+ session.headers.update({"Accept": "application/xml, text/xml;q=0.9, */*;q=0.8", "User-Agent": "uiuc-course-scraper/1.0"})
+
+ year = args.year
+ term = args.term
+ if not year or not term:
+ year, term = detect_default_year_term(session)
+ print(f"[info] Using detected catalog: {year} {term}")
+ else:
+ print(f"[info] Using catalog: {year} {term}")
+
+ try:
+ subject_ids = [args.subject] if args.subject else get_subject_ids(session, year, term)
+ except Exception as exc:
+ print(f"[error] Failed to get subjects for {year} {term}: {exc}")
+ return 1
+
+ print(f"[info] Found {len(subject_ids)} subject(s)")
+
+ all_course_records: List[CourseRecord] = []
+
+ def process_subject(subject_id: str) -> List[CourseRecord]:
+ try:
+ if args.sleep:
+ time.sleep(args.sleep)
+ course_numbers = get_course_numbers_for_subject(session, year, term, subject_id)
+ except Exception as exc_subj:
+ print(f"[warn] Failed to list courses for {subject_id}: {exc_subj}")
+ return []
+
+ subject_records: List[CourseRecord] = []
+ for course_number in course_numbers:
+ try:
+ if args.sleep:
+ time.sleep(args.sleep)
+ record = get_course_details(session, year, term, subject_id, course_number)
+ subject_records.append(record)
+ except Exception as exc_course:
+ print(f"[warn] Failed details for {subject_id} {course_number}: {exc_course}")
+ continue
+ return subject_records
+
+ with concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor:
+ future_to_subject: Dict[concurrent.futures.Future, str] = {}
+ for subject_id in subject_ids:
+ future = executor.submit(process_subject, subject_id)
+ future_to_subject[future] = subject_id
+ for future in concurrent.futures.as_completed(future_to_subject):
+ subject_id = future_to_subject[future]
+ try:
+ subject_records = future.result()
+ all_course_records.extend(subject_records)
+ print(f"[info] {subject_id}: {len(subject_records)} course(s)")
+ except Exception as exc:
+ print(f"[warn] Subject {subject_id} failed: {exc}")
+
+    # Sort deterministically: subject, then numeric part of the course number, then full index
+    def sort_key(r: CourseRecord) -> Tuple[str, int, str]:
+        subject, _, number = r.index.partition(" ")
+        digits = re.sub(r"[^0-9]", "", number)
+        return (subject, int(digits) if digits else 0, r.index)
+    all_course_records.sort(key=sort_key)
+
+ # Serialize to JSON array of objects
+    output_path = args.output
+    os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
+
+ with open(output_path, "w", encoding="utf-8") as f:
+ json.dump([asdict(r) for r in all_course_records], f, ensure_ascii=False, indent=2)
+
+ print(f"[done] Wrote {len(all_course_records)} courses -> {output_path}")
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())
+
+
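extract_prerequisite_text prefers an explicit prerequisite element and otherwise falls back to the "Prerequisite:" free text in courseSectionInformation or the description. A minimal sketch against a hand-written XML fragment (the real CIS explorer response is larger and may carry namespaces; requests must be installed because the module imports it at the top):

    # sketch: fallback extraction from a hypothetical courseSectionInformation element
    from xml.etree import ElementTree as ET
    from fetch_uiuc_courses import extract_prerequisite_text

    xml = """<course id="CS 225">
      <label>Data Structures</label>
      <courseSectionInformation>Prerequisite: CS 125 or ECE 220; CS 173 or MATH 213.</courseSectionInformation>
    </course>"""
    print(extract_prerequisite_text(ET.fromstring(xml)))
    # -> CS 125 or ECE 220; CS 173 or MATH 213.
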
diff --git a/scripts/parse_course_prereqs.py b/scripts/parse_course_prereqs.py
new file mode 100644
index 0000000..609303c
--- /dev/null
+++ b/scripts/parse_course_prereqs.py
@@ -0,0 +1,190 @@
+#!/usr/bin/env python3
+import argparse
+import json
+import os
+import re
+from typing import Any, Dict, List, Optional, Tuple
+
+
+COURSE_RE = re.compile(r"\b([A-Z]{2,4})\s*(\d{2,3}[A-Z]?)\b")
+
+# Clause boundaries: semicolons are strong AND separators at UIUC
+CLAUSE_SPLIT_RE = re.compile(r";+")
+
+
+def find_course_spans(text: str) -> List[Tuple[str, int, int]]:
+ spans: List[Tuple[str, int, int]] = []
+ for m in COURSE_RE.finditer(text):
+ course = f"{m.group(1)} {m.group(2)}"
+ spans.append((course, m.start(), m.end()))
+ return spans
+
+
+def normalize_space(s: str) -> str:
+ return re.sub(r"\s+", " ", s).strip()
+
+
+def parse_clause_into_group(clause: str) -> Dict[str, Any]:
+ clause_clean = normalize_space(clause)
+ courses = find_course_spans(clause_clean)
+ if not courses:
+ return {"op": "EMPTY"}
+
+ # Detect "one of" window: treat everything until boundary as OR
+ one_of_match = re.search(r"\b(one of|any of)\b", clause_clean, flags=re.IGNORECASE)
+ if one_of_match:
+ # Take all courses in the clause as OR if they appear after the phrase
+ start_idx = one_of_match.end()
+ or_list = [c for (c, s, e) in courses if s >= start_idx]
+ if or_list:
+ # Also include any course tokens that appear BEFORE the one-of phrase as separate AND terms
+ prior_courses = [c for (c, s, e) in courses if s < start_idx]
+ items: List[Dict[str, Any]] = []
+ for c in prior_courses:
+ items.append({"op": "COURSE", "course": c})
+ items.append({"op": "OR", "items": [{"op": "COURSE", "course": c} for c in or_list]})
+ return {"op": "AND", "items": items} if len(items) > 1 else items[0]
+
+ # Otherwise, infer connectors between adjacent course tokens
+ # Build pairwise connectors from text between tokens
+ connectors: List[str] = []
+ for i in range(len(courses) - 1):
+ _, _, end_prev = courses[i]
+ _, start_next, _ = courses[i + 1]
+ between = clause_clean[end_prev:start_next].lower()
+ if "and/or" in between:
+ connectors.append("OR")
+ elif re.search(r"\band\b", between):
+ connectors.append("AND")
+ elif re.search(r"\bor\b", between):
+ connectors.append("OR")
+ else:
+            # No explicit conjunction between the two tokens: record comma-only
+            # separation as LIST and decide AND vs. OR after scanning the clause
+ if "," in between:
+ connectors.append("LIST")
+ else:
+ connectors.append("UNKNOWN")
+
+ course_items = [{"op": "COURSE", "course": c} for (c, _, _) in courses]
+
+ # If there is any explicit AND, group AND chunks; otherwise treat as OR if any OR, else LIST->OR
+ if "AND" in connectors and "OR" not in connectors:
+ return {"op": "AND", "items": course_items}
+ if "OR" in connectors and "AND" not in connectors:
+ return {"op": "OR", "items": course_items}
+ if "AND" not in connectors and "OR" not in connectors:
+        # All connectors are LIST/UNKNOWN: treat comma-only separation as OR
+        # (as in "A, B, or C"); otherwise fall back to AND over the courses
+ if any(k == "LIST" for k in connectors):
+ return {"op": "OR", "items": course_items}
+ return {"op": "AND", "items": course_items} if len(course_items) > 1 else course_items[0]
+
+ # Mixed AND and OR: build small AST by splitting on commas and respecting local conjunctions
+ # Simple heuristic: split clause by commas, parse each segment for explicit AND/OR
+ segments = [normalize_space(s) for s in re.split(r",+", clause_clean) if normalize_space(s)]
+ subitems: List[Dict[str, Any]] = []
+ for seg in segments:
+ seg_courses = find_course_spans(seg)
+ if not seg_courses:
+ continue
+ if re.search(r"\band\b", seg.lower()) and not re.search(r"\bor\b", seg.lower()):
+ subitems.append({"op": "AND", "items": [{"op": "COURSE", "course": c} for (c, _, _) in seg_courses]})
+ elif re.search(r"\bor\b", seg.lower()) and not re.search(r"\band\b", seg.lower()):
+ subitems.append({"op": "OR", "items": [{"op": "COURSE", "course": c} for (c, _, _) in seg_courses]})
+ else:
+ # ambiguous within segment; default to OR
+ subitems.append({"op": "OR", "items": [{"op": "COURSE", "course": c} for (c, _, _) in seg_courses]})
+
+ if not subitems:
+ subitems = [{"op": "COURSE", "course": c} for (c, _, _) in courses]
+
+ # Combine segments with AND if split by semicolons at higher level; here stay at clause level
+ # For mixed case within one clause, default to OR-over-segments unless explicit AND dominates
+ and_count = sum(1 for s in subitems if s.get("op") == "AND")
+ or_count = sum(1 for s in subitems if s.get("op") == "OR")
+ if and_count and not or_count:
+ return {"op": "AND", "items": subitems}
+ if or_count and not and_count:
+ return {"op": "OR", "items": subitems}
+ # Mixed: wrap in AND of items that are groups; treat OR groups as single requirements groups
+ return {"op": "AND", "items": subitems}
+
+
+def parse_prereq_text(text: str) -> Dict[str, Any]:
+ # Split by semicolons into top-level AND clauses
+ clauses = [normalize_space(c) for c in CLAUSE_SPLIT_RE.split(text) if normalize_space(c)]
+ if not clauses:
+ return {"hard": {"op": "EMPTY"}, "coreq_ok": {"op": "EMPTY"}}
+
+ def is_coreq_clause(c: str) -> bool:
+ c_low = c.lower()
+ return (
+ ("concurrent" in c_low) or
+ ("co-requisite" in c_low) or
+ ("corequisite" in c_low) or
+ re.search(r"credit\s+or\s+concurrent\s+(enrollment|registration)\s+in", c_low) is not None
+ )
+
+ hard_groups: List[Dict[str, Any]] = []
+ coreq_groups: List[Dict[str, Any]] = []
+ for clause in clauses:
+ grp = parse_clause_into_group(clause)
+ if grp.get("op") == "EMPTY":
+ continue
+ if is_coreq_clause(clause):
+ coreq_groups.append(grp)
+ else:
+ hard_groups.append(grp)
+
+ def fold(groups: List[Dict[str, Any]]) -> Dict[str, Any]:
+ if not groups:
+ return {"op": "EMPTY"}
+ if len(groups) == 1:
+ return groups[0]
+ return {"op": "AND", "items": groups}
+
+ return {"hard": fold(hard_groups), "coreq_ok": fold(coreq_groups)}
+
+
+def main() -> int:
+ ap = argparse.ArgumentParser(description="Parse course-only prerequisite text into AND/OR groups")
+ ap.add_argument("input", default="data/analysis/course_only.json", nargs="?", help="Input JSON array of course-only prereqs")
+ ap.add_argument("--output", default="data/parsed/course_only_parsed.json", help="Output JSON path")
+ ap.add_argument("--unparsed-output", default="data/parsed/course_only_unparsed.json", help="Unparsed/empty output JSON path")
+ args = ap.parse_args()
+
+ with open(args.input, "r", encoding="utf-8") as f:
+ data = json.load(f)
+
+ parsed: List[Dict[str, Any]] = []
+ unparsed: List[Dict[str, Any]] = []
+
+ for item in data:
+ raw = item.get("prerequisites") or ""
+ ast = parse_prereq_text(raw)
+ record = {
+ "index": item.get("index"),
+ "name": item.get("name"),
+ "raw": raw,
+ "ast": ast,
+ }
+ # Consider unparsed only if both hard and coreq_ok are EMPTY
+ if (isinstance(ast, dict) and ast.get("hard", {}).get("op") == "EMPTY" and ast.get("coreq_ok", {}).get("op") == "EMPTY"):
+ unparsed.append(record)
+ else:
+ parsed.append(record)
+
+    os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True)
+    os.makedirs(os.path.dirname(args.unparsed_output) or ".", exist_ok=True)
+ with open(args.output, "w", encoding="utf-8") as f:
+ json.dump(parsed, f, ensure_ascii=False, indent=2)
+ with open(args.unparsed_output, "w", encoding="utf-8") as f:
+ json.dump(unparsed, f, ensure_ascii=False, indent=2)
+
+ print(f"parsed: {len(parsed)}")
+ print(f"unparsed: {len(unparsed)}")
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
+
+
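The split into "hard" and "coreq_ok" trees is the core of the parser: semicolon clauses that mention concurrent enrollment land in coreq_ok, and everything else is ANDed into hard. A minimal sketch on a made-up prerequisite string, run from inside scripts/:

    # sketch: hard vs. coreq_ok split for a hypothetical prerequisite string
    import json
    from parse_course_prereqs import parse_prereq_text

    text = "CS 125 or ECE 220; credit or concurrent registration in MATH 241"
    ast = parse_prereq_text(text)
    print(json.dumps(ast, indent=2))
    # ast["hard"]     -> {"op": "OR", "items": [COURSE CS 125, COURSE ECE 220]}
    # ast["coreq_ok"] -> {"op": "COURSE", "course": "MATH 241"}
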
diff --git a/scripts/reduce_and_cluster.py b/scripts/reduce_and_cluster.py
new file mode 100644
index 0000000..a6913bb
--- /dev/null
+++ b/scripts/reduce_and_cluster.py
@@ -0,0 +1,153 @@
+#!/usr/bin/env python3
+import argparse
+import json
+import os
+from typing import Any, Dict, List, Set, Tuple
+
+import networkx as nx
+
+
+def load_graph(path: str) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
+ with open(path, "r", encoding="utf-8") as f:
+ data = json.load(f)
+ return data.get("nodes", []), data.get("edges", [])
+
+
+def directed_hard_graph(nodes: List[Dict[str, Any]], edges: List[Dict[str, Any]]) -> nx.DiGraph:
+ G = nx.DiGraph()
+ for n in nodes:
+ G.add_node(n["id"], **n)
+ for e in edges:
+ if e.get("kind") == "hard":
+ G.add_edge(e["source"], e["target"])
+ # drop self-loops
+ G.remove_edges_from(nx.selfloop_edges(G))
+ return G
+
+
+def transitive_reduction_with_scc(G: nx.DiGraph) -> nx.DiGraph:
+ # Collapse strongly connected components to ensure DAG for TR
+ sccs: List[Set[str]] = list(nx.strongly_connected_components(G))
+ comp_id_of: Dict[str, int] = {}
+ for i, comp in enumerate(sccs):
+ for v in comp:
+ comp_id_of[v] = i
+
+ # Build component DAG
+ CG = nx.DiGraph()
+ for i in range(len(sccs)):
+ CG.add_node(i)
+ original_cross_edges: Dict[Tuple[int, int], List[Tuple[str, str]]] = {}
+ for u, v in G.edges():
+ cu, cv = comp_id_of[u], comp_id_of[v]
+ if cu != cv:
+ CG.add_edge(cu, cv)
+ original_cross_edges.setdefault((cu, cv), []).append((u, v))
+
+ # Transitive reduction on component DAG
+ TR_CG = nx.transitive_reduction(CG) if CG.number_of_edges() else CG
+
+ # Build reduced graph: keep all intra-SCC edges; between SCCs keep one representative per reduced edge
+ R = nx.DiGraph()
+ R.add_nodes_from(G.nodes(data=True))
+
+ # Keep intra-SCC edges (within each component)
+ for i, comp in enumerate(sccs):
+ if len(comp) == 1:
+ continue
+ for u in comp:
+ for v in G.successors(u):
+ if comp_id_of[v] == i:
+ R.add_edge(u, v)
+
+ # For each edge in reduced component graph, keep one representative original edge
+ for cu, cv in TR_CG.edges():
+ reps = original_cross_edges.get((cu, cv), [])
+ if not reps:
+ continue
+ # choose deterministically: first sorted
+ u, v = sorted(reps)[0]
+ R.add_edge(u, v)
+
+ return R
+
+
+def detect_communities_undirected(R: nx.DiGraph) -> Dict[str, int]:
+ UG = R.to_undirected()
+ # Greedy modularity communities (built-in, no extra deps)
+ communities = list(nx.algorithms.community.greedy_modularity_communities(UG))
+ node_to_comm: Dict[str, int] = {}
+ for cid, comm in enumerate(communities):
+ for v in comm:
+ node_to_comm[v] = cid
+ # Isolated nodes not included
+ for v in R.nodes():
+ node_to_comm.setdefault(v, -1)
+ return node_to_comm
+
+
+def palette(n: int) -> List[str]:
+ base = [
+ "#1f77b4", "#ff7f0e", "#2ca02c", "#d62728", "#9467bd",
+ "#8c564b", "#e377c2", "#7f7f7f", "#bcbd22", "#17becf",
+ ]
+ if n <= len(base):
+ return base[:n]
+ colors = []
+ for i in range(n):
+ colors.append(base[i % len(base)])
+ return colors
+
+
+def write_outputs(R: nx.DiGraph, node_to_comm: Dict[str, int], graph_out: str, comm_out: str) -> None:
+ # Prepare node list with community and color
+ max_comm = max(node_to_comm.values()) if node_to_comm else -1
+ colors = palette(max_comm + 1)
+ nodes: List[Dict[str, Any]] = []
+ for v, data in R.nodes(data=True):
+ cid = node_to_comm.get(v, -1)
+ color = colors[cid] if cid >= 0 else "#4f46e5"
+ nodes.append({
+ "id": v,
+ "label": data.get("label") or v,
+ "community": cid,
+ "color": color,
+ "subject": data.get("subject"),
+ })
+
+ edges: List[Dict[str, Any]] = []
+ for u, v in R.edges():
+ edges.append({"source": u, "target": v, "kind": "hard"})
+
+ os.makedirs(os.path.dirname(graph_out) or ".", exist_ok=True)
+ with open(graph_out, "w", encoding="utf-8") as f:
+ json.dump({"nodes": nodes, "edges": edges}, f, ensure_ascii=False, indent=2)
+
+ # communities summary
+ comm_map: Dict[int, List[str]] = {}
+ for node, cid in node_to_comm.items():
+ comm_map.setdefault(cid, []).append(node)
+ with open(comm_out, "w", encoding="utf-8") as f:
+ json.dump({str(k): v for k, v in sorted(comm_map.items())}, f, ensure_ascii=False, indent=2)
+
+
+def main() -> int:
+ ap = argparse.ArgumentParser(description="Transitive reduction + community detection pipeline")
+ ap.add_argument("input", nargs="?", default="data/graph.json", help="Input graph.json (nodes, edges)")
+ ap.add_argument("--graph-out", default="data/graph_reduced.json", help="Output reduced graph with communities")
+ ap.add_argument("--comm-out", default="data/communities.json", help="Output communities membership")
+ args = ap.parse_args()
+
+ nodes, edges = load_graph(args.input)
+ G = directed_hard_graph(nodes, edges)
+ R = transitive_reduction_with_scc(G)
+ node_to_comm = detect_communities_undirected(R)
+ write_outputs(R, node_to_comm, args.graph_out, args.comm_out)
+ print(f"reduced_nodes={R.number_of_nodes()} reduced_edges={R.number_of_edges()} communities={max(node_to_comm.values())+1}")
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
+
+
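The SCC-aware transitive reduction keeps cycles intact and only prunes redundant cross-component edges, so a prerequisite implied by a longer chain disappears. A minimal sketch on a hypothetical three-course chain (networkx required):

    # sketch: the edge CS 125 -> CS 374 is implied by the chain and gets pruned
    from reduce_and_cluster import directed_hard_graph, transitive_reduction_with_scc

    nodes = [{"id": "CS 125"}, {"id": "CS 225"}, {"id": "CS 374"}]
    edges = [
        {"source": "CS 125", "target": "CS 225", "kind": "hard"},
        {"source": "CS 225", "target": "CS 374", "kind": "hard"},
        {"source": "CS 125", "target": "CS 374", "kind": "hard"},  # redundant
    ]
    R = transitive_reduction_with_scc(directed_hard_graph(nodes, edges))
    print(sorted(R.edges()))   # [('CS 125', 'CS 225'), ('CS 225', 'CS 374')]
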
diff --git a/scripts/scrape.js b/scripts/scrape.js
deleted file mode 100644
index d4b2ecb..0000000
--- a/scripts/scrape.js
+++ /dev/null
@@ -1,63 +0,0 @@
-import { XMLParser } from "fast-xml-parser";
-import fs from "fs/promises";
-import path from "path";
-import dns from "node:dns";
-
-import { ProxyAgent } from "undici";
-
-dns.setDefaultResultOrder("ipv4first");
-
-const proxy = process.env.HTTPS_PROXY || process.env.https_proxy || process.env.HTTP_PROXY || process.env.http_proxy;
-const dispatcher = proxy ? new ProxyAgent(proxy) : undefined;
-
-const [ , , YEAR = "2025", TERM = "fall" ] = process.argv;
-// UIUC's API uses XML namespaces (e.g. `<ns2:term>`). In order for the
-// returned object to have plain keys like `term` and `subject`, we instruct
-// fast-xml-parser to strip the namespace prefixes.
-const parser = new XMLParser({ ignoreAttributes: false, removeNSPrefix: true });
-const BASE = `https://courses.illinois.edu/cisapp/explorer`;
-
-async function getXML(url) {
- const res = await fetch(url, { dispatcher });
- if (!res.ok) throw new Error(`Request failed: ${res.status} ${url}`);
- return parser.parse(await res.text());
-
-}
-
-async function scrapeSchedule(year, term) {
- const catalog = {};
- const termRoot = await getXML(`${BASE}/schedule/${year}/${term}.xml`);
-
- const subjects = termRoot.term?.subjects?.subject;
- if (!subjects) throw new Error(`Unexpected XML structure for ${year} ${term}`);
-
- const subjHrefs = Array.isArray(subjects) ? subjects.map(s => s['@_href']) : [subjects['@_href']];
-
- for (const subjURL of subjHrefs) {
- const subjXML = await getXML(subjURL);
-
- const courses = subjXML.subject?.courses?.course || [];
-
- const courseList = Array.isArray(courses) ? courses : [courses];
- for (const c of courseList) {
- const courseURL = c['@_href'];
- const courseXML = await getXML(courseURL);
- const id = courseXML.course['@_id'];
- const desc = courseXML.course.description ?? "";
- const m = desc.match(/Prerequisite[s]?:\s*([^.;]*)/i);
- if (!m) continue;
- const prereqs = m[1]
- .match(/[A-Z]{2,4}\s?\d{2,3}[A-Z]?/g)
- ?.map(s => s.replace(/\s+/, "")) ?? [];
- if (prereqs.length) catalog[id.replace(/\s+/, "")] = prereqs;
- }
- await new Promise(r => setTimeout(r, 300));
- }
- return catalog;
-}
-
-const data = await scrapeSchedule(YEAR, TERM);
-const outDir = path.resolve("data");
-await fs.mkdir(outDir, { recursive: true });
-await fs.writeFile(path.join(outDir, `catalog_${YEAR}_${TERM}.json`), JSON.stringify(data, null, 2));
-console.log(`Saved ${Object.keys(data).length} courses`);
diff --git a/scripts/validate_courses.py b/scripts/validate_courses.py
new file mode 100644
index 0000000..acff4cd
--- /dev/null
+++ b/scripts/validate_courses.py
@@ -0,0 +1,31 @@
+#!/usr/bin/env python3
+import json
+import sys
+from jsonschema import Draft202012Validator
+
+
+def main() -> int:
+ if len(sys.argv) != 3:
+ print("usage: validate_courses.py <schema.json> <data.json>")
+ return 2
+
+ schema_path, data_path = sys.argv[1], sys.argv[2]
+ with open(schema_path, "r", encoding="utf-8") as f:
+ schema = json.load(f)
+ with open(data_path, "r", encoding="utf-8") as f:
+ data = json.load(f)
+
+ validator = Draft202012Validator(schema)
+    # Spot-check: for a JSON array, only the first record is validated against the schema
+    errors = list(validator.iter_errors(data[0] if isinstance(data, list) and data else data))
+ if errors:
+ for err in errors:
+ print(f"error: {err.message} at {list(err.path)}")
+ return 1
+ print("ok")
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())
+
+
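validate_courses.py only spot-checks the first record of an array; if whole-catalog validation is wanted, a variant along these lines would iterate over every record (a sketch, not part of the diff above; the schema path is an assumption):

    # sketch: validate every course record instead of only the first one
    import json
    import sys
    from jsonschema import Draft202012Validator

    with open("data/course.schema.json", encoding="utf-8") as f:   # assumed schema path
        schema = json.load(f)
    with open("data/courses.json", encoding="utf-8") as f:
        data = json.load(f)

    validator = Draft202012Validator(schema)
    error_count = 0
    for i, record in enumerate(data):
        for err in validator.iter_errors(record):
            error_count += 1
            print(f"record {i}: {err.message} at {list(err.path)}")
    print("ok" if error_count == 0 else f"{error_count} error(s)")
    sys.exit(0 if error_count == 0 else 1)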