diff options
| author | blackhao <13851610112@163.com> | 2025-08-22 02:51:50 -0500 |
|---|---|---|
| committer | blackhao <13851610112@163.com> | 2025-08-22 02:51:50 -0500 |
| commit | 4aab4087dc97906d0b9890035401175cdaab32d4 (patch) | |
| tree | 4e2e9d88a711ec5b1cfa02e8ac72a55183b99123 /scripts | |
| parent | afa8f50d1d21c721dabcb31ad244610946ab65a3 (diff) | |
2.0
Diffstat (limited to 'scripts')
| -rw-r--r-- | scripts/__pycache__/parse_course_prereqs.cpython-312.pyc | bin | 0 -> 10335 bytes | |||
| -rw-r--r-- | scripts/analyze_prereqs.py | 120 | ||||
| -rw-r--r-- | scripts/build_final_parsed.py | 107 | ||||
| -rw-r--r-- | scripts/build_graph_assets.py | 359 | ||||
| -rw-r--r-- | scripts/fetch_uiuc_courses.py | 230 | ||||
| -rw-r--r-- | scripts/parse_course_prereqs.py | 190 | ||||
| -rw-r--r-- | scripts/reduce_and_cluster.py | 153 | ||||
| -rw-r--r-- | scripts/scrape.js | 63 | ||||
| -rw-r--r-- | scripts/validate_courses.py | 31 |
9 files changed, 1190 insertions, 63 deletions
diff --git a/scripts/__pycache__/parse_course_prereqs.cpython-312.pyc b/scripts/__pycache__/parse_course_prereqs.cpython-312.pyc Binary files differnew file mode 100644 index 0000000..b085808 --- /dev/null +++ b/scripts/__pycache__/parse_course_prereqs.cpython-312.pyc diff --git a/scripts/analyze_prereqs.py b/scripts/analyze_prereqs.py new file mode 100644 index 0000000..7c580f7 --- /dev/null +++ b/scripts/analyze_prereqs.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +import argparse +import json +import re +from typing import Any, Dict, Iterable, List + + +COURSE_TOKEN = re.compile(r"\b([A-Z]{2,4})\s*(\d{2,3}[A-Z]?)\b") +NONE_PATTERNS = [ + re.compile(r"^\s*none\.?\s*$", re.IGNORECASE), + re.compile(r"no prerequisites", re.IGNORECASE), + re.compile(r"prerequisite[s]?:\s*none\b", re.IGNORECASE), +] + + +def is_none_text(text: str) -> bool: + t = text.strip() + return any(p.search(t) for p in NONE_PATTERNS) + + +def extract_course_refs(text: str) -> List[str]: + refs = [] + for m in COURSE_TOKEN.finditer(text): + subject, number = m.group(1), m.group(2) + refs.append(f"{subject} {number}") + return refs + + +NON_COURSE_KEYWORDS = [ + r"consent", r"permission", r"approval", + r"standing", r"senior", r"junior", r"sophomore", r"freshman", + r"major", r"minor", r"program", r"restricted", r"enrollment", r"enrolled", + r"gpa", r"grade", r"minimum", r"credit hour", r"credits?", r"hours?", + r"registration", r"concurrent", r"co-requisite", r"corequisite", + r"department", r"instructor", +] + +def has_non_course_requirements(text: str) -> bool: + t = text.lower() + return any(re.search(k, t) for k in NON_COURSE_KEYWORDS) + + +def is_course_only(text: str) -> bool: + t = text.strip() + if has_non_course_requirements(t): + return False + # Remove course tokens, then see if any nontrivial tokens remain besides basic connectors + placeholder = COURSE_TOKEN.sub("COURSE", t) + # Remove conjunctions and punctuation + simplified = re.sub(r"[(),.;]", " ", placeholder) + simplified = 
re.sub(r"\b(and|or|and/or|either|both|one of|two of|with|credit in)\b", " ", simplified, flags=re.IGNORECASE) + # Remove common quantifiers + simplified = re.sub(r"\b(at\s+least)\b", " ", simplified, flags=re.IGNORECASE) + # Collapse whitespace + simplified = re.sub(r"\s+", " ", simplified).strip() + # If empty or only words like COURSE left, treat as course-only + return simplified == "" or re.fullmatch(r"(COURSE\s*)+", simplified) is not None + + +def analyze(courses: Iterable[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]: + results = { + "none": [], + "course_only": [], + "remaining": [], + } + + for c in courses: + prereq = c.get("prerequisites") or "" + if not prereq.strip(): + results["none"].append(c) + continue + if is_none_text(prereq): + results["none"].append(c) + continue + if is_course_only(prereq): + results["course_only"].append({ + "index": c.get("index"), + "name": c.get("name"), + "prerequisites": prereq, + "courses": extract_course_refs(prereq), + }) + else: + results["remaining"].append({ + "index": c.get("index"), + "name": c.get("name"), + "prerequisites": prereq, + }) + return results + + +def main() -> int: + ap = argparse.ArgumentParser(description="Analyze UIUC course prerequisite text") + ap.add_argument("input", default="data/courses.json", nargs="?", help="Input courses JSON array") + ap.add_argument("--outdir", default="data/analysis", help="Output directory") + args = ap.parse_args() + + with open(args.input, "r", encoding="utf-8") as f: + data = json.load(f) + + res = analyze(data) + + import os + os.makedirs(args.outdir, exist_ok=True) + with open(os.path.join(args.outdir, "none.json"), "w", encoding="utf-8") as f: + json.dump(res["none"], f, ensure_ascii=False, indent=2) + with open(os.path.join(args.outdir, "course_only.json"), "w", encoding="utf-8") as f: + json.dump(res["course_only"], f, ensure_ascii=False, indent=2) + with open(os.path.join(args.outdir, "remaining.json"), "w", encoding="utf-8") as f: + 
json.dump(res["remaining"], f, ensure_ascii=False, indent=2) + + print(f"none: {len(res['none'])}") + print(f"course_only: {len(res['course_only'])}") + print(f"remaining: {len(res['remaining'])}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) + + diff --git a/scripts/build_final_parsed.py b/scripts/build_final_parsed.py new file mode 100644 index 0000000..0d34a03 --- /dev/null +++ b/scripts/build_final_parsed.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +import argparse +import json +import os +import re +import sys +from typing import Any, Dict, List + + +# Ensure we can import sibling script +SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +if SCRIPT_DIR not in sys.path: + sys.path.insert(0, SCRIPT_DIR) + +from parse_course_prereqs import parse_prereq_text # type: ignore + + +COURSE_TOKEN = re.compile(r"\b([A-Z]{2,4})\s*(\d{2,3}[A-Z]?)\b") +CLAUSE_SPLIT_RE = re.compile(r";+") + +NON_COURSE_KEYWORDS = [ + r"consent", r"permission", r"approval", + r"standing", r"senior", r"junior", r"sophomore", r"freshman", + r"major", r"minor", r"program", r"restricted", r"enrollment", r"enrolled", + r"gpa", r"grade", r"minimum", r"credit hour", r"credits?", r"hours?", + r"registration", r"concurrent", r"co-requisite", r"corequisite", + r"department", r"instructor", +] + + +def has_course_token(s: str) -> bool: + return COURSE_TOKEN.search(s) is not None + + +def detect_flags(text: str) -> List[str]: + t = text.lower() + flags: List[str] = [] + mapping = [ + (r"consent|permission|approval", "CONSENT"), + (r"standing|senior|junior|sophomore|freshman", "STANDING"), + (r"major|minor|program|restricted|enrollment|enrolled", "MAJOR_OR_PROGRAM"), + (r"gpa|grade|minimum", "GRADE_OR_GPA"), + (r"concurrent|co-requisite|corequisite", "COREQ_ALLOWED"), + (r"department|instructor", "DEPT_OR_INSTRUCTOR"), + ] + for pat, name in mapping: + if re.search(pat, t): + flags.append(name) + return sorted(set(flags)) + + +def main() -> int: + ap = 
argparse.ArgumentParser(description="Build final parsed JSON for all courses") + ap.add_argument("input", nargs="?", default="data/courses.json", help="Input courses.json") + ap.add_argument("--output", default="data/courses_parsed.json", help="Output JSON path") + args = ap.parse_args() + + with open(args.input, "r", encoding="utf-8") as f: + courses = json.load(f) + + out: List[Dict[str, Any]] = [] + stats = {"total": 0, "hard_nonempty": 0, "coreq_nonempty": 0} + for c in courses: + stats["total"] += 1 + raw = (c.get("prerequisites") or "").strip() + ast = parse_prereq_text(raw) + + hard = ast.get("hard") if isinstance(ast, dict) else {"op": "EMPTY"} + coreq_ok = ast.get("coreq_ok") if isinstance(ast, dict) else {"op": "EMPTY"} + if hard and hard.get("op") != "EMPTY": + stats["hard_nonempty"] += 1 + if coreq_ok and coreq_ok.get("op") != "EMPTY": + stats["coreq_nonempty"] += 1 + + # Capture non-course clauses for reference + notes: List[str] = [] + if raw: + clauses = [s.strip() for s in CLAUSE_SPLIT_RE.split(raw) if s.strip()] + for cl in clauses: + if not has_course_token(cl) or detect_flags(cl): + notes.append(cl) + + out.append({ + "index": c.get("index"), + "name": c.get("name"), + "description": c.get("description"), + "prerequisites": { + "raw": raw or None, + "hard": hard, + "coreq_ok": coreq_ok, + "flags": detect_flags(raw) if raw else [], + "notes": notes, + }, + }) + + os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True) + with open(args.output, "w", encoding="utf-8") as f: + json.dump(out, f, ensure_ascii=False, indent=2) + + print(json.dumps(stats)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) + + diff --git a/scripts/build_graph_assets.py b/scripts/build_graph_assets.py new file mode 100644 index 0000000..6f27f43 --- /dev/null +++ b/scripts/build_graph_assets.py @@ -0,0 +1,359 @@ +#!/usr/bin/env python3 +import argparse +import json +import math +import os +from typing import Any, Dict, List, Tuple + +import 
networkx as nx + + +def collect_courses_from_ast(ast: Dict[str, Any]) -> List[str]: + out: List[str] = [] + def walk(node: Any) -> None: + if not isinstance(node, dict): + return + op = node.get("op") + if op == "COURSE" and node.get("course"): + out.append(node["course"]) + for child in node.get("items", []) or []: + walk(child) + walk(ast) + # Unique order-preserving + seen = set() + uniq: List[str] = [] + for c in out: + if c not in seen: + seen.add(c) + uniq.append(c) + return uniq + + +def build_graph(courses: List[Dict[str, Any]], include_coreq: bool = True) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: + nodes_map: Dict[str, Dict[str, Any]] = {} + edges: List[Dict[str, Any]] = [] + + def ensure_node(course_id: str, label: str = None) -> None: + if course_id not in nodes_map: + nodes_map[course_id] = {"id": course_id, "label": label or course_id, "subject": course_id.split()[0] if ' ' in course_id else None} + + for c in courses: + idx = c.get("index") + name = c.get("name") + ensure_node(idx, name) + pr = c.get("prerequisites") or {} + hard = pr.get("hard") or {"op": "EMPTY"} + coreq = pr.get("coreq_ok") or {"op": "EMPTY"} + for pre in collect_courses_from_ast(hard): + ensure_node(pre) + edges.append({"source": pre, "target": idx, "kind": "hard"}) + if include_coreq: + for pre in collect_courses_from_ast(coreq): + ensure_node(pre) + edges.append({"source": pre, "target": idx, "kind": "coreq"}) + + nodes = [ {"id": n["id"], "label": n["label"], "subject": n.get("subject")} for n in nodes_map.values() ] + return nodes, edges + + +def compute_positions( + nodes: List[Dict[str, Any]], + edges: List[Dict[str, Any]], + seed: int = 42, + layout: str = "spring", + iterations: int = 100, + component_wise: bool = False, + # SMACOF (MDS) options + mds_backend: str = "auto", # auto|sklearn|cuml + mds_max_iter: int = 300, + mds_eps: float = 1e-3, + mds_verbose: int = 1, + # Overlap resolution options + resolve_overlap: bool = False, + node_size_px: float = 6.0, + 
min_dist_mul: float = 1.5, + overlap_max_iters: int = 60, + overlap_step: float = 0.5, +) -> Dict[str, Dict[str, float]]: + # Use a force-directed layout over an undirected graph for a compact web-like layout + G = nx.Graph() + for n in nodes: + G.add_node(n["id"]) + for e in edges: + G.add_edge(e["source"], e["target"]) # undirected for layout + + def layout_graph(graph: nx.Graph) -> Dict[str, Tuple[float, float]]: + if layout == "drl": + try: + import igraph as ig # type: ignore + except Exception as e: + raise RuntimeError("python-igraph is required for DRL/OpenOrd-like layout; pip install python-igraph") from e + nodes_list = list(graph.nodes()) + index_of = {v: i for i, v in enumerate(nodes_list)} + g = ig.Graph() + g.add_vertices(len(nodes_list)) + g.vs["name"] = nodes_list + # unique edges only + edge_idx = set() + for u, v in graph.edges(): + iu, iv = index_of[u], index_of[v] + if iu == iv: + continue + a, b = (iu, iv) if iu < iv else (iv, iu) + if (a, b) not in edge_idx: + edge_idx.add((a, b)) + if edge_idx: + g.add_edges(list(edge_idx)) + # DRL (OpenOrd-style) is good for community separation + lay = g.layout_drl() + coords = [[float(x), float(y)] for x, y in lay] + return {nodes_list[i]: (coords[i][0], coords[i][1]) for i in range(len(nodes_list))} + if layout == "fa2": + try: + from fa2 import ForceAtlas2 # type: ignore + except Exception as e: + raise RuntimeError("fa2 is required for ForceAtlas2 layout; pip install fa2") from e + fa = ForceAtlas2( + # LinLog energy model emphasizes community separation + linLogMode=True, + gravity=1.0, + strongGravityMode=True, + scalingRatio=2.0, + outboundAttractionDistribution=False, + barnesHutOptimize=True, + barnesHutTheta=1.2, + jitterTolerance=1.0, + edgeWeightInfluence=1.0, + adjustSizes=False, + verbose=False, + ) + pos = fa.forceatlas2_networkx_layout(graph, pos=None, iterations=max(300, iterations)) + return {n: (float(xy[0]), float(xy[1])) for n, xy in pos.items()} + if layout == "smacof": + try: + import 
numpy as np + except Exception as e: + raise RuntimeError("NumPy is required for smacof layout") from e + + nodes_list = list(graph.nodes()) + n = len(nodes_list) + if n == 0: + return {} + if n == 1: + return {nodes_list[0]: (0.0, 0.0)} + + # Compute all-pairs shortest path distances (undirected) + index_of = {v: i for i, v in enumerate(nodes_list)} + D = np.full((n, n), 0.0, dtype=np.float32) + large = 1e6 + for i in range(n): + for j in range(n): + if i != j: + D[i, j] = large + for src, lengths in nx.all_pairs_shortest_path_length(graph): + i = index_of[src] + for dst, d in lengths.items(): + j = index_of[dst] + if i != j: + D[i, j] = float(d) + D[j, i] = float(d) + + # Replace remaining large distances with max finite distance * 1.5 + finite = D[D < large] + maxd = float(finite.max()) if finite.size else 1.0 + D[D >= large] = maxd * 1.5 + + backend_used = None + coords = None + if mds_backend in ("auto", "cuml"): + try: + from cuml.manifold import MDS as cuMDS # type: ignore + backend_used = "cuml" + print("[smacof] using cuML MDS (GPU) ...") + m = cuMDS(n_components=2, dissimilarity='precomputed', max_iter=mds_max_iter, random_state=seed, verbose=bool(mds_verbose)) + coords = m.fit_transform(D) + try: + coords = coords.get() # convert cupy to numpy if needed + except Exception: + pass + except Exception: + if mds_backend == "cuml": + raise + if coords is None: + from sklearn.manifold import MDS + backend_used = "sklearn" + print("[smacof] using scikit-learn MDS (CPU) ...") + # verbose prints per-iteration stress + mds = MDS(n_components=2, dissimilarity='precomputed', metric=True, random_state=seed, n_init=1, max_iter=mds_max_iter, eps=mds_eps, verbose=mds_verbose) + coords = mds.fit_transform(D) + print(f"[smacof] backend={backend_used} done. 
shape={coords.shape}") + + return {nodes_list[i]: (float(coords[i, 0]), float(coords[i, 1])) for i in range(n)} + + if layout == "random": + return nx.random_layout(graph, dim=2, seed=seed) + if layout == "kk": + return nx.kamada_kawai_layout(graph, dim=2) + if layout == "none": + return {n: (0.0, 0.0) for n in graph.nodes} + # default: spring + try: + return nx.spring_layout(graph, seed=seed, dim=2, iterations=iterations) + except ModuleNotFoundError: + # SciPy not installed – use kamada_kawai instead + return nx.kamada_kawai_layout(graph, dim=2) + except Exception: + return nx.kamada_kawai_layout(graph, dim=2) + + if component_wise: + pos_raw: Dict[str, Tuple[float, float]] = {} + for comp in nx.connected_components(G): + sub = G.subgraph(comp) + local = layout_graph(sub) + pos_raw.update(local) + else: + pos_raw = layout_graph(G) + + # Normalize positions to a fixed range for consistent initial viewport + xs = [p[0] for p in pos_raw.values()] + ys = [p[1] for p in pos_raw.values()] + min_x, max_x = (min(xs), max(xs)) if xs else (0.0, 1.0) + min_y, max_y = (min(ys), max(ys)) if ys else (0.0, 1.0) + span_x = max(max_x - min_x, 1e-6) + span_y = max(max_y - min_y, 1e-6) + + # Scale to a large square canvas by default; for SMACOF earlier we used disk mapping. + # Here keep linear scaling to preserve community geometry (good for ForceAtlas2/SMACOF alike). 
+ scale = 6000.0 + out: Dict[str, Dict[str, float]] = {} + for node_id, (x, y) in pos_raw.items(): + x01 = (x - min_x) / span_x # 0..1 + y01 = (y - min_y) / span_y # 0..1 + out[node_id] = {"x": (x01 - 0.5) * scale, "y": (y01 - 0.5) * scale} + + if resolve_overlap and out: + # Simple grid-based overlap removal with minimal displacement + target_dist = max(1.0, node_size_px * min_dist_mul) + cell = target_dist + node_ids = list(out.keys()) + for _ in range(overlap_max_iters): + # Build spatial hash + grid: Dict[Tuple[int,int], List[str]] = {} + for nid in node_ids: + p = out[nid] + gx = int(math.floor(p["x"] / cell)) + gy = int(math.floor(p["y"] / cell)) + grid.setdefault((gx, gy), []).append(nid) + + moved = 0.0 + disp: Dict[str, Tuple[float,float]] = {} + for nid in node_ids: + px = out[nid]["x"]; py = out[nid]["y"] + gx = int(math.floor(px / cell)); gy = int(math.floor(py / cell)) + # check neighbors cells + for dx in (-1,0,1): + for dy in (-1,0,1): + cell_nodes = grid.get((gx+dx, gy+dy), []) + for mid in cell_nodes: + if mid <= nid: # avoid double count and self + continue + qx = out[mid]["x"]; qy = out[mid]["y"] + vx = qx - px; vy = qy - py + dist = math.hypot(vx, vy) + if dist < target_dist and dist > 1e-6: + overlap = target_dist - dist + ux = vx / dist; uy = vy / dist + mx = -ux * (overlap * 0.5) + my = -uy * (overlap * 0.5) + disp[nid] = (disp.get(nid, (0.0,0.0))[0] + mx, disp.get(nid, (0.0,0.0))[1] + my) + disp[mid] = (disp.get(mid, (0.0,0.0))[0] - mx, disp.get(mid, (0.0,0.0))[1] - my) + if not disp: + break + for nid, (dx, dy) in disp.items(): + out[nid]["x"] += dx * overlap_step + out[nid]["y"] += dy * overlap_step + moved += abs(dx) + abs(dy) + if moved < 1e-3: + break + return out + + +def main() -> int: + ap = argparse.ArgumentParser(description="Build slim graph assets and preset positions") + ap.add_argument("input", nargs="?", default="data/courses_parsed.json", help="Input parsed courses JSON") + ap.add_argument("--graph-out", 
default="data/graph.json", help="Output graph JSON (nodes, edges)") + ap.add_argument("--pos-out", default="data/positions.json", help="Output positions JSON (node -> {x,y})") + ap.add_argument("--pos-out-alt", nargs='*', default=[], help="Additional positions to generate in the form layout:name (e.g., kk:positions_kk.json spring:positions_spring.json)") + ap.add_argument("--hard-only", action="store_true", help="Only include hard prerequisite edges (exclude coreq)") + ap.add_argument("--layout", choices=["spring","kk","random","none","smacof","fa2","drl"], default="fa2", help="Layout algorithm for positions") + ap.add_argument("--iterations", type=int, default=60, help="Iterations for spring layout (lower is faster)") + ap.add_argument("--component-wise", action="store_true", help="Layout each connected component separately (can be faster)") + # Overlap options + ap.add_argument("--resolve-overlap", action="store_true", help="Run overlap removal post-process") + ap.add_argument("--node-size", type=float, default=6.0, help="Node visual diameter in px (for spacing)") + ap.add_argument("--min-dist-mul", type=float, default=1.5, help="Minimum center distance multiplier of node size") + # SMACOF options + ap.add_argument("--mds-backend", choices=["auto","sklearn","cuml"], default="auto", help="Backend for SMACOF (stress majorization)") + ap.add_argument("--mds-max-iter", type=int, default=300, help="Max iterations for SMACOF") + ap.add_argument("--mds-eps", type=float, default=1e-3, help="Convergence tolerance for SMACOF") + ap.add_argument("--mds-verbose", type=int, default=1, help="Verbosity for SMACOF (>=1 prints per-iteration stress)") + args = ap.parse_args() + + with open(args.input, "r", encoding="utf-8") as f: + courses = json.load(f) + + nodes, edges = build_graph(courses, include_coreq=not args.hard_only) + + os.makedirs(os.path.dirname(args.graph_out) or ".", exist_ok=True) + with open(args.graph_out, "w", encoding="utf-8") as f: + json.dump({"nodes": nodes, 
"edges": edges}, f, ensure_ascii=False, indent=2) + + print(f"building positions: nodes={len(nodes)} edges={len(edges)} layout={args.layout} iter={args.iterations} component_wise={args.component_wise}") + pos = compute_positions( + nodes, edges, + layout=args.layout, + iterations=args.iterations, + component_wise=args.component_wise, + mds_backend=args.mds_backend, + mds_max_iter=args.mds_max_iter, + mds_eps=args.mds_eps, + mds_verbose=args.mds_verbose, + resolve_overlap=args.resolve_overlap, + node_size_px=args.node_size, + min_dist_mul=args.min_dist_mul, + ) + with open(args.pos_out, "w", encoding="utf-8") as f: + json.dump(pos, f, ensure_ascii=False, indent=2) + + # Optionally generate additional layouts + for spec in args.pos_out_alt: + try: + lay, path = spec.split(":", 1) + except ValueError: + print(f"[warn] invalid --pos-out-alt spec: {spec}") + continue + try: + alt = compute_positions( + nodes, edges, + layout=lay, + iterations=args.iterations, + component_wise=args.component_wise, + mds_backend=args.mds_backend, + mds_max_iter=args.mds_max_iter, + mds_eps=args.mds_eps, + mds_verbose=args.mds_verbose, + ) + with open(path, "w", encoding="utf-8") as f: + json.dump(alt, f, ensure_ascii=False, indent=2) + print(f"wrote alt positions: {lay} -> {path}") + except Exception as e: + print(f"[warn] failed alt positions {lay}: {e}") + + print(f"nodes: {len(nodes)}, edges: {len(edges)}, positions: {len(pos)}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) + + diff --git a/scripts/fetch_uiuc_courses.py b/scripts/fetch_uiuc_courses.py new file mode 100644 index 0000000..0f38fdd --- /dev/null +++ b/scripts/fetch_uiuc_courses.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python3 +import argparse +import concurrent.futures +import json +import re +import sys +import time +from dataclasses import dataclass, asdict +from typing import Dict, List, Optional, Tuple + +import requests +from xml.etree import ElementTree as ET + + +BASE_URL = 
"https://courses.illinois.edu/cisapp/explorer/catalog" + + +@dataclass +class CourseRecord: + index: str + name: Optional[str] + description: Optional[str] + prerequisites: Optional[str] + + +def parse_xml(content: bytes) -> ET.Element: + try: + return ET.fromstring(content) + except ET.ParseError as exc: + raise RuntimeError(f"Failed to parse XML: {exc}") + + +def fetch(session: requests.Session, url: str) -> bytes: + resp = session.get(url, timeout=30) + if resp.status_code != 200: + raise RuntimeError(f"GET {url} -> {resp.status_code}") + return resp.content + + +def get_subject_ids(session: requests.Session, year: str, term: str) -> List[str]: + url = f"{BASE_URL}/{year}/{term}.xml" + root = parse_xml(fetch(session, url)) + subjects = [] + for node in root.findall(".//subject"): + node_id = node.attrib.get("id") + if node_id: + subjects.append(node_id) + return subjects + + +def get_course_numbers_for_subject(session: requests.Session, year: str, term: str, subject: str) -> List[str]: + url = f"{BASE_URL}/{year}/{term}/{subject}.xml" + root = parse_xml(fetch(session, url)) + courses = [] + for node in root.findall(".//course"): + node_id = node.attrib.get("id") + if node_id: + courses.append(node_id) + return courses + + +def extract_prerequisite_text(root: ET.Element) -> Optional[str]: + # Prefer explicitly labeled prerequisite elements if present + for tag in ["prerequisites", "prerequisite", "Prerequisites", "Prerequisite"]: + found = root.find(f".//{tag}") + if found is not None and (found.text and found.text.strip()): + return found.text.strip() + + # Fallback: courseSectionInformation often contains "Prerequisite:" free text + csi = root.find(".//courseSectionInformation") + if csi is not None and csi.text: + text = csi.text.strip() + match = re.search(r"Prerequisite[s]?:\s*(.*)$", text, flags=re.IGNORECASE | re.DOTALL) + if match: + return match.group(1).strip() + + # As a last resort, scan description for a Prerequisite sentence + desc = 
root.find(".//description") + if desc is not None and desc.text: + text = desc.text.strip() + match = re.search(r"Prerequisite[s]?:\s*(.*)$", text, flags=re.IGNORECASE | re.DOTALL) + if match: + return match.group(1).strip() + + return None + + +def get_course_details(session: requests.Session, year: str, term: str, subject: str, course_number: str) -> CourseRecord: + url = f"{BASE_URL}/{year}/{term}/{subject}/{course_number}.xml" + root = parse_xml(fetch(session, url)) + + # Title/name may be in <label> or <title> + name = None + label_node = root.find(".//label") + if label_node is not None and label_node.text: + name = label_node.text.strip() + else: + title_node = root.find(".//title") + if title_node is not None and title_node.text: + name = title_node.text.strip() + + description = None + desc_node = root.find(".//description") + if desc_node is not None and desc_node.text: + description = desc_node.text.strip() + + prerequisites_text = extract_prerequisite_text(root) + + return CourseRecord( + index=f"{subject} {course_number}", + name=name, + description=description, + prerequisites=prerequisites_text, + ) + + +def try_year_term(session: requests.Session, year: str, term: str) -> bool: + url = f"{BASE_URL}/{year}/{term}.xml" + resp = session.get(url, timeout=15) + return resp.status_code == 200 + + +def detect_default_year_term(session: requests.Session) -> Tuple[str, str]: + # Try a few common combinations in likely order + current_year = time.gmtime().tm_year + candidate_terms = ["fall", "summer", "spring", "winter"] + candidates: List[Tuple[str, str]] = [] + # Current year candidates first + for term in candidate_terms: + candidates.append((str(current_year), term)) + # Then previous year + for term in candidate_terms: + candidates.append((str(current_year - 1), term)) + + for year, term in candidates: + if try_year_term(session, year, term): + return year, term + # Fallback to a known historical term + return "2024", "fall" + + +def main() -> int: + 
parser = argparse.ArgumentParser(description="Fetch UIUC course catalog into JSON") + parser.add_argument("--year", default=None, help="Catalog year, e.g. 2025") + parser.add_argument("--term", default=None, help="Term, e.g. fall|spring|summer|winter") + parser.add_argument("--subject", default=None, help="Limit to a single subject (e.g., CS)") + parser.add_argument("--max-workers", type=int, default=12, help="Max concurrent requests") + parser.add_argument("--output", default="data/courses.json", help="Output JSON path") + parser.add_argument("--sleep", type=float, default=0.0, help="Optional per-request sleep seconds") + args = parser.parse_args() + + session = requests.Session() + session.headers.update({"Accept": "application/xml, text/xml;q=0.9, */*;q=0.8", "User-Agent": "uiuc-course-scraper/1.0"}) + + year = args.year + term = args.term + if not year or not term: + year, term = detect_default_year_term(session) + print(f"[info] Using detected catalog: {year} {term}") + else: + print(f"[info] Using catalog: {year} {term}") + + try: + subject_ids = [args.subject] if args.subject else get_subject_ids(session, year, term) + except Exception as exc: + print(f"[error] Failed to get subjects for {year} {term}: {exc}") + return 1 + + print(f"[info] Found {len(subject_ids)} subject(s)") + + all_course_records: List[CourseRecord] = [] + + def process_subject(subject_id: str) -> List[CourseRecord]: + try: + if args.sleep: + time.sleep(args.sleep) + course_numbers = get_course_numbers_for_subject(session, year, term, subject_id) + except Exception as exc_subj: + print(f"[warn] Failed to list courses for {subject_id}: {exc_subj}") + return [] + + subject_records: List[CourseRecord] = [] + for course_number in course_numbers: + try: + if args.sleep: + time.sleep(args.sleep) + record = get_course_details(session, year, term, subject_id, course_number) + subject_records.append(record) + except Exception as exc_course: + print(f"[warn] Failed details for {subject_id} 
{course_number}: {exc_course}") + continue + return subject_records + + with concurrent.futures.ThreadPoolExecutor(max_workers=args.max_workers) as executor: + future_to_subject: Dict[concurrent.futures.Future, str] = {} + for subject_id in subject_ids: + future = executor.submit(process_subject, subject_id) + future_to_subject[future] = subject_id + for future in concurrent.futures.as_completed(future_to_subject): + subject_id = future_to_subject[future] + try: + subject_records = future.result() + all_course_records.extend(subject_records) + print(f"[info] {subject_id}: {len(subject_records)} course(s)") + except Exception as exc: + print(f"[warn] Subject {subject_id} failed: {exc}") + + # Sort deterministically + all_course_records.sort(key=lambda r: (r.index.split()[0], int(re.sub(r"[^0-9]", "", r.index.split()[1])) if len(r.index.split()) > 1 and re.search(r"\d", r.index.split()[1]) else r.index)) + + # Serialize to JSON array of objects + output_path = args.output + output_dir = output_path.rsplit("/", 1)[0] if "/" in output_path else "." 
+ try: + import os + os.makedirs(output_dir, exist_ok=True) + except Exception: + pass + + with open(output_path, "w", encoding="utf-8") as f: + json.dump([asdict(r) for r in all_course_records], f, ensure_ascii=False, indent=2) + + print(f"[done] Wrote {len(all_course_records)} courses -> {output_path}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) + + diff --git a/scripts/parse_course_prereqs.py b/scripts/parse_course_prereqs.py new file mode 100644 index 0000000..609303c --- /dev/null +++ b/scripts/parse_course_prereqs.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +import argparse +import json +import re +from typing import Any, Dict, List, Optional, Tuple + + +COURSE_RE = re.compile(r"\b([A-Z]{2,4})\s*(\d{2,3}[A-Z]?)\b") + +# Clause boundaries: semicolons are strong AND separators at UIUC +CLAUSE_SPLIT_RE = re.compile(r";+") + + +def find_course_spans(text: str) -> List[Tuple[str, int, int]]: + spans: List[Tuple[str, int, int]] = [] + for m in COURSE_RE.finditer(text): + course = f"{m.group(1)} {m.group(2)}" + spans.append((course, m.start(), m.end())) + return spans + + +def normalize_space(s: str) -> str: + return re.sub(r"\s+", " ", s).strip() + + +def parse_clause_into_group(clause: str) -> Dict[str, Any]: + clause_clean = normalize_space(clause) + courses = find_course_spans(clause_clean) + if not courses: + return {"op": "EMPTY"} + + # Detect "one of" window: treat everything until boundary as OR + one_of_match = re.search(r"\b(one of|any of)\b", clause_clean, flags=re.IGNORECASE) + if one_of_match: + # Take all courses in the clause as OR if they appear after the phrase + start_idx = one_of_match.end() + or_list = [c for (c, s, e) in courses if s >= start_idx] + if or_list: + # Also include any course tokens that appear BEFORE the one-of phrase as separate AND terms + prior_courses = [c for (c, s, e) in courses if s < start_idx] + items: List[Dict[str, Any]] = [] + for c in prior_courses: + items.append({"op": "COURSE", "course": c}) + 
items.append({"op": "OR", "items": [{"op": "COURSE", "course": c} for c in or_list]}) + return {"op": "AND", "items": items} if len(items) > 1 else items[0] + + # Otherwise, infer connectors between adjacent course tokens + # Build pairwise connectors from text between tokens + connectors: List[str] = [] + for i in range(len(courses) - 1): + _, _, end_prev = courses[i] + _, start_next, _ = courses[i + 1] + between = clause_clean[end_prev:start_next].lower() + if "and/or" in between: + connectors.append("OR") + elif re.search(r"\band\b", between): + connectors.append("AND") + elif re.search(r"\bor\b", between): + connectors.append("OR") + else: + # Default: comma-only separation; lean towards OR if followed by or earlier in span + if "," in between: + connectors.append("LIST") + else: + connectors.append("UNKNOWN") + + course_items = [{"op": "COURSE", "course": c} for (c, _, _) in courses] + + # If there is any explicit AND, group AND chunks; otherwise treat as OR if any OR, else LIST->OR + if "AND" in connectors and "OR" not in connectors: + return {"op": "AND", "items": course_items} + if "OR" in connectors and "AND" not in connectors: + return {"op": "OR", "items": course_items} + if "AND" not in connectors and "OR" not in connectors: + # All LIST/UNKNOWN: choose OR as a safer default for admissions like "A, B, or C" where last token has or + if any(k == "LIST" for k in connectors): + return {"op": "OR", "items": course_items} + return {"op": "AND", "items": course_items} if len(course_items) > 1 else course_items[0] + + # Mixed AND and OR: build small AST by splitting on commas and respecting local conjunctions + # Simple heuristic: split clause by commas, parse each segment for explicit AND/OR + segments = [normalize_space(s) for s in re.split(r",+", clause_clean) if normalize_space(s)] + subitems: List[Dict[str, Any]] = [] + for seg in segments: + seg_courses = find_course_spans(seg) + if not seg_courses: + continue + if re.search(r"\band\b", seg.lower()) and 
def parse_prereq_text(text: str) -> Dict[str, Any]:
    """Split raw prerequisite text into hard and co-requisite requirement trees.

    The text is cut into clauses with ``CLAUSE_SPLIT_RE``; every non-blank
    clause is converted into a group by ``parse_clause_into_group``. Clauses
    that mention concurrent enrollment or co-requisites are collected under
    ``"coreq_ok"``; all other clauses go under ``"hard"``. When one side has
    several groups they are combined with a top-level AND.

    Args:
        text: Raw prerequisite description. ``None`` or an empty string is
            treated as "no prerequisites".

    Returns:
        ``{"hard": <group>, "coreq_ok": <group>}``. A side without any usable
        clause is ``{"op": "EMPTY"}``.
    """
    normalized = (normalize_space(part) for part in CLAUSE_SPLIT_RE.split(text or ""))
    clauses = [clause for clause in normalized if clause]
    if not clauses:
        return {"hard": {"op": "EMPTY"}, "coreq_ok": {"op": "EMPTY"}}

    # Phrases such as "credit or concurrent enrollment in ..." always contain
    # the word "concurrent", so plain substring checks cover them as well.
    coreq_markers = ("concurrent", "co-requisite", "corequisite")

    def is_coreq_clause(clause: str) -> bool:
        """Return True when the clause allows concurrent enrollment."""
        lowered = clause.lower()
        return any(marker in lowered for marker in coreq_markers)

    hard_groups: List[Dict[str, Any]] = []
    coreq_groups: List[Dict[str, Any]] = []
    for clause in clauses:
        group = parse_clause_into_group(clause)
        if group.get("op") == "EMPTY":
            # Clause yielded nothing usable; it contributes to neither side.
            continue
        target = coreq_groups if is_coreq_clause(clause) else hard_groups
        target.append(group)

    def fold(groups: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Collapse a list of groups into one node (EMPTY, the group, or AND)."""
        if not groups:
            return {"op": "EMPTY"}
        if len(groups) == 1:
            return groups[0]
        return {"op": "AND", "items": groups}

    return {"hard": fold(hard_groups), "coreq_ok": fold(coreq_groups)}


def main() -> int:
    """Parse every record of the input JSON and write parsed/unparsed files.

    Reads a JSON array of records (each with ``index``, ``name`` and
    ``prerequisites``), runs ``parse_prereq_text`` on the prerequisite text
    and writes two JSON arrays: records with at least one non-empty side to
    ``--output`` and records where both sides are EMPTY to
    ``--unparsed-output``.

    Returns:
        Process exit code (always 0 on success).
    """
    import os

    ap = argparse.ArgumentParser(description="Parse course-only prerequisite text into AND/OR groups")
    ap.add_argument("input", default="data/analysis/course_only.json", nargs="?", help="Input JSON array of course-only prereqs")
    ap.add_argument("--output", default="data/parsed/course_only_parsed.json", help="Output JSON path")
    ap.add_argument("--unparsed-output", default="data/parsed/course_only_unparsed.json", help="Unparsed/empty output JSON path")
    args = ap.parse_args()

    with open(args.input, "r", encoding="utf-8") as f:
        data = json.load(f)

    parsed: List[Dict[str, Any]] = []
    unparsed: List[Dict[str, Any]] = []

    for item in data:
        raw = item.get("prerequisites") or ""
        ast = parse_prereq_text(raw)
        record = {
            "index": item.get("index"),
            "name": item.get("name"),
            "raw": raw,
            "ast": ast,
        }
        # A record counts as unparsed only if both hard and coreq_ok are EMPTY.
        both_empty = (
            ast.get("hard", {}).get("op") == "EMPTY"
            and ast.get("coreq_ok", {}).get("op") == "EMPTY"
        )
        (unparsed if both_empty else parsed).append(record)

    # Create the directories of the paths actually being written, so custom
    # --output / --unparsed-output locations work too (not only data/parsed).
    for out_path in (args.output, args.unparsed_output):
        os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)

    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(parsed, f, ensure_ascii=False, indent=2)
    with open(args.unparsed_output, "w", encoding="utf-8") as f:
        json.dump(unparsed, f, ensure_ascii=False, indent=2)

    print(f"parsed: {len(parsed)}")
    print(f"unparsed: {len(unparsed)}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
#!/usr/bin/env python3
"""Transitive reduction and community detection for a course graph.

Reads a ``graph.json`` with ``nodes`` and ``edges``, keeps only edges whose
``kind`` is ``"hard"``, removes redundant edges via a transitive reduction
that tolerates cycles, assigns every node to a community and writes the
reduced graph plus a community membership file.
"""
import argparse
import json
import os
from typing import Any, Dict, List, Set, Tuple

import networkx as nx


def load_graph(path: str) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Load ``(nodes, edges)`` from a JSON file; missing keys give empty lists."""
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    return data.get("nodes", []), data.get("edges", [])


def directed_hard_graph(nodes: List[Dict[str, Any]], edges: List[Dict[str, Any]]) -> nx.DiGraph:
    """Build a directed graph from the nodes and the ``kind == "hard"`` edges.

    Every node dict is stored as the node's attributes (keyed by its ``id``).
    Self-loops are removed.
    """
    G = nx.DiGraph()
    for node in nodes:
        G.add_node(node["id"], **node)
    G.add_edges_from(
        (edge["source"], edge["target"]) for edge in edges if edge.get("kind") == "hard"
    )
    # drop self-loops
    G.remove_edges_from(nx.selfloop_edges(G))
    return G


def transitive_reduction_with_scc(G: nx.DiGraph) -> nx.DiGraph:
    """Return a transitively reduced copy of ``G`` that tolerates cycles.

    ``nx.transitive_reduction`` only accepts DAGs, so strongly connected
    components are collapsed first and the reduction runs on the component
    graph. The result keeps all nodes (with attributes), every edge inside a
    multi-node component, and one representative original edge for each edge
    that survives the reduction of the component graph.
    """
    sccs: List[Set[str]] = list(nx.strongly_connected_components(G))
    comp_id_of: Dict[str, int] = {
        node: comp_id for comp_id, comp in enumerate(sccs) for node in comp
    }

    # Component graph: one node per SCC, one edge per connected SCC pair.
    CG = nx.DiGraph()
    CG.add_nodes_from(range(len(sccs)))
    original_cross_edges: Dict[Tuple[int, int], List[Tuple[str, str]]] = {}
    for u, v in G.edges():
        cu, cv = comp_id_of[u], comp_id_of[v]
        if cu != cv:
            CG.add_edge(cu, cv)
            original_cross_edges.setdefault((cu, cv), []).append((u, v))

    # Transitive reduction on the component DAG (skipped when it has no edges).
    TR_CG = nx.transitive_reduction(CG) if CG.number_of_edges() else CG

    R = nx.DiGraph()
    R.add_nodes_from(G.nodes(data=True))

    # Keep intra-SCC edges; single-node components have none (no self-loops).
    for comp_id, comp in enumerate(sccs):
        if len(comp) == 1:
            continue
        for u in comp:
            for v in G.successors(u):
                if comp_id_of[v] == comp_id:
                    R.add_edge(u, v)

    # For each surviving component edge keep one original edge; the smallest
    # (source, target) pair is chosen so the output is deterministic.
    for cu, cv in TR_CG.edges():
        reps = original_cross_edges.get((cu, cv), [])
        if reps:
            R.add_edge(*min(reps))

    return R


def detect_communities_undirected(R: nx.DiGraph) -> Dict[str, int]:
    """Map every node of ``R`` to a community id.

    Communities come from greedy modularity maximisation on the undirected
    view of ``R``; ids follow the order in which the algorithm returns the
    communities. Any node the algorithm did not assign gets ``-1``.
    """
    UG = R.to_undirected()
    # Greedy modularity communities (built-in, no extra deps)
    communities = list(nx.algorithms.community.greedy_modularity_communities(UG))
    node_to_comm: Dict[str, int] = {
        node: cid for cid, comm in enumerate(communities) for node in comm
    }
    for node in R.nodes():
        node_to_comm.setdefault(node, -1)
    return node_to_comm


def palette(n: int) -> List[str]:
    """Return ``n`` hex colors, cycling through a fixed 10-color base palette.

    A non-positive ``n`` yields an empty list.
    """
    base = [
        "#1f77b4", "#ff7f0e", "#2ca02c", "#d62728", "#9467bd",
        "#8c564b", "#e377c2", "#7f7f7f", "#bcbd22", "#17becf",
    ]
    return [base[i % len(base)] for i in range(max(n, 0))]


def write_outputs(R: nx.DiGraph, node_to_comm: Dict[str, int], graph_out: str, comm_out: str) -> None:
    """Write the reduced graph and the community membership as JSON files.

    Args:
        R: Reduced graph; node attributes ``label`` and ``subject`` are read.
        node_to_comm: Community id per node (``-1`` meaning unassigned).
        graph_out: Path of the graph JSON (``nodes`` with community/color,
            ``edges`` all marked ``kind: "hard"``).
        comm_out: Path of the JSON mapping community id -> list of node ids.
    """
    max_comm = max(node_to_comm.values(), default=-1)
    colors = palette(max_comm + 1)

    nodes: List[Dict[str, Any]] = []
    for v, data in R.nodes(data=True):
        cid = node_to_comm.get(v, -1)
        nodes.append({
            "id": v,
            "label": data.get("label") or v,
            "community": cid,
            # Unassigned nodes get a fixed fallback color.
            "color": colors[cid] if cid >= 0 else "#4f46e5",
            "subject": data.get("subject"),
        })

    edges: List[Dict[str, Any]] = [
        {"source": u, "target": v, "kind": "hard"} for u, v in R.edges()
    ]

    # Both outputs may live in directories that do not exist yet.
    for out_path in (graph_out, comm_out):
        os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)

    with open(graph_out, "w", encoding="utf-8") as f:
        json.dump({"nodes": nodes, "edges": edges}, f, ensure_ascii=False, indent=2)

    # communities summary
    comm_map: Dict[int, List[str]] = {}
    for node, cid in node_to_comm.items():
        comm_map.setdefault(cid, []).append(node)
    with open(comm_out, "w", encoding="utf-8") as f:
        json.dump({str(k): v for k, v in sorted(comm_map.items())}, f, ensure_ascii=False, indent=2)


def main() -> int:
    """Run the reduction + community pipeline from the command line."""
    ap = argparse.ArgumentParser(description="Transitive reduction + community detection pipeline")
    ap.add_argument("input", nargs="?", default="data/graph.json", help="Input graph.json (nodes, edges)")
    ap.add_argument("--graph-out", default="data/graph_reduced.json", help="Output reduced graph with communities")
    ap.add_argument("--comm-out", default="data/communities.json", help="Output communities membership")
    args = ap.parse_args()

    nodes, edges = load_graph(args.input)
    G = directed_hard_graph(nodes, edges)
    R = transitive_reduction_with_scc(G)
    node_to_comm = detect_communities_undirected(R)
    write_outputs(R, node_to_comm, args.graph_out, args.comm_out)
    # default=-1 keeps the summary working for a graph without any nodes.
    n_communities = max(node_to_comm.values(), default=-1) + 1
    print(f"reduced_nodes={R.number_of_nodes()} reduced_edges={R.number_of_edges()} communities={n_communities}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
-const parser = new XMLParser({ ignoreAttributes: false, removeNSPrefix: true }); -const BASE = `https://courses.illinois.edu/cisapp/explorer`; - -async function getXML(url) { - const res = await fetch(url, { dispatcher }); - if (!res.ok) throw new Error(`Request failed: ${res.status} ${url}`); - return parser.parse(await res.text()); - -} - -async function scrapeSchedule(year, term) { - const catalog = {}; - const termRoot = await getXML(`${BASE}/schedule/${year}/${term}.xml`); - - const subjects = termRoot.term?.subjects?.subject; - if (!subjects) throw new Error(`Unexpected XML structure for ${year} ${term}`); - - const subjHrefs = Array.isArray(subjects) ? subjects.map(s => s['@_href']) : [subjects['@_href']]; - - for (const subjURL of subjHrefs) { - const subjXML = await getXML(subjURL); - - const courses = subjXML.subject?.courses?.course || []; - - const courseList = Array.isArray(courses) ? courses : [courses]; - for (const c of courseList) { - const courseURL = c['@_href']; - const courseXML = await getXML(courseURL); - const id = courseXML.course['@_id']; - const desc = courseXML.course.description ?? ""; - const m = desc.match(/Prerequisite[s]?:\s*([^.;]*)/i); - if (!m) continue; - const prereqs = m[1] - .match(/[A-Z]{2,4}\s?\d{2,3}[A-Z]?/g) - ?.map(s => s.replace(/\s+/, "")) ?? 
#!/usr/bin/env python3
"""Validate a course data file against a JSON Schema (draft 2020-12)."""
import json
import sys
from jsonschema import Draft202012Validator


def main() -> int:
    """Validate ``<data.json>`` against ``<schema.json>``.

    When the data file holds a non-empty JSON array, every element is
    validated against the schema (not just the first one) and reported
    error paths start with the element's index. Any other value, including
    an empty array, is validated as a single document.

    Returns:
        0 when everything validates, 1 when at least one error was found,
        2 on wrong command-line usage.
    """
    if len(sys.argv) != 3:
        print("usage: validate_courses.py <schema.json> <data.json>")
        return 2

    schema_path, data_path = sys.argv[1], sys.argv[2]
    with open(schema_path, "r", encoding="utf-8") as f:
        schema = json.load(f)
    with open(data_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    validator = Draft202012Validator(schema)
    is_record_list = isinstance(data, list) and bool(data)
    records = data if is_record_list else [data]

    error_count = 0
    for idx, record in enumerate(records):
        for err in validator.iter_errors(record):
            # Prefix the record index so the failing element can be located.
            location = [idx, *err.path] if is_record_list else list(err.path)
            print(f"error: {err.message} at {location}")
            error_count += 1

    if error_count:
        return 1
    print("ok")
    return 0


if __name__ == "__main__":
    sys.exit(main())
