1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
|
#!/usr/bin/env python3
import argparse
import json
import math
import os
from typing import Any, Dict, List, Tuple
import networkx as nx
def collect_courses_from_ast(ast: Dict[str, Any]) -> List[str]:
out: List[str] = []
def walk(node: Any) -> None:
if not isinstance(node, dict):
return
op = node.get("op")
if op == "COURSE" and node.get("course"):
out.append(node["course"])
for child in node.get("items", []) or []:
walk(child)
walk(ast)
# Unique order-preserving
seen = set()
uniq: List[str] = []
for c in out:
if c not in seen:
seen.add(c)
uniq.append(c)
return uniq
def build_graph(courses: List[Dict[str, Any]], include_coreq: bool = True) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
nodes_map: Dict[str, Dict[str, Any]] = {}
edges: List[Dict[str, Any]] = []
def ensure_node(course_id: str, label: str = None) -> None:
if course_id not in nodes_map:
nodes_map[course_id] = {"id": course_id, "label": label or course_id, "subject": course_id.split()[0] if ' ' in course_id else None}
for c in courses:
idx = c.get("index")
name = c.get("name")
ensure_node(idx, name)
pr = c.get("prerequisites") or {}
hard = pr.get("hard") or {"op": "EMPTY"}
coreq = pr.get("coreq_ok") or {"op": "EMPTY"}
for pre in collect_courses_from_ast(hard):
ensure_node(pre)
edges.append({"source": pre, "target": idx, "kind": "hard"})
if include_coreq:
for pre in collect_courses_from_ast(coreq):
ensure_node(pre)
edges.append({"source": pre, "target": idx, "kind": "coreq"})
nodes = [ {"id": n["id"], "label": n["label"], "subject": n.get("subject")} for n in nodes_map.values() ]
return nodes, edges
def compute_positions(
nodes: List[Dict[str, Any]],
edges: List[Dict[str, Any]],
seed: int = 42,
layout: str = "spring",
iterations: int = 100,
component_wise: bool = False,
# SMACOF (MDS) options
mds_backend: str = "auto", # auto|sklearn|cuml
mds_max_iter: int = 300,
mds_eps: float = 1e-3,
mds_verbose: int = 1,
# Overlap resolution options
resolve_overlap: bool = False,
node_size_px: float = 6.0,
min_dist_mul: float = 1.5,
overlap_max_iters: int = 60,
overlap_step: float = 0.5,
) -> Dict[str, Dict[str, float]]:
# Use a force-directed layout over an undirected graph for a compact web-like layout
G = nx.Graph()
for n in nodes:
G.add_node(n["id"])
for e in edges:
G.add_edge(e["source"], e["target"]) # undirected for layout
def layout_graph(graph: nx.Graph) -> Dict[str, Tuple[float, float]]:
if layout == "drl":
try:
import igraph as ig # type: ignore
except Exception as e:
raise RuntimeError("python-igraph is required for DRL/OpenOrd-like layout; pip install python-igraph") from e
nodes_list = list(graph.nodes())
index_of = {v: i for i, v in enumerate(nodes_list)}
g = ig.Graph()
g.add_vertices(len(nodes_list))
g.vs["name"] = nodes_list
# unique edges only
edge_idx = set()
for u, v in graph.edges():
iu, iv = index_of[u], index_of[v]
if iu == iv:
continue
a, b = (iu, iv) if iu < iv else (iv, iu)
if (a, b) not in edge_idx:
edge_idx.add((a, b))
if edge_idx:
g.add_edges(list(edge_idx))
# DRL (OpenOrd-style) is good for community separation
lay = g.layout_drl()
coords = [[float(x), float(y)] for x, y in lay]
return {nodes_list[i]: (coords[i][0], coords[i][1]) for i in range(len(nodes_list))}
if layout == "fa2":
try:
from fa2 import ForceAtlas2 # type: ignore
except Exception as e:
raise RuntimeError("fa2 is required for ForceAtlas2 layout; pip install fa2") from e
fa = ForceAtlas2(
# LinLog energy model emphasizes community separation
linLogMode=True,
gravity=1.0,
strongGravityMode=True,
scalingRatio=2.0,
outboundAttractionDistribution=False,
barnesHutOptimize=True,
barnesHutTheta=1.2,
jitterTolerance=1.0,
edgeWeightInfluence=1.0,
adjustSizes=False,
verbose=False,
)
pos = fa.forceatlas2_networkx_layout(graph, pos=None, iterations=max(300, iterations))
return {n: (float(xy[0]), float(xy[1])) for n, xy in pos.items()}
if layout == "smacof":
try:
import numpy as np
except Exception as e:
raise RuntimeError("NumPy is required for smacof layout") from e
nodes_list = list(graph.nodes())
n = len(nodes_list)
if n == 0:
return {}
if n == 1:
return {nodes_list[0]: (0.0, 0.0)}
# Compute all-pairs shortest path distances (undirected)
index_of = {v: i for i, v in enumerate(nodes_list)}
D = np.full((n, n), 0.0, dtype=np.float32)
large = 1e6
for i in range(n):
for j in range(n):
if i != j:
D[i, j] = large
for src, lengths in nx.all_pairs_shortest_path_length(graph):
i = index_of[src]
for dst, d in lengths.items():
j = index_of[dst]
if i != j:
D[i, j] = float(d)
D[j, i] = float(d)
# Replace remaining large distances with max finite distance * 1.5
finite = D[D < large]
maxd = float(finite.max()) if finite.size else 1.0
D[D >= large] = maxd * 1.5
backend_used = None
coords = None
if mds_backend in ("auto", "cuml"):
try:
from cuml.manifold import MDS as cuMDS # type: ignore
backend_used = "cuml"
print("[smacof] using cuML MDS (GPU) ...")
m = cuMDS(n_components=2, dissimilarity='precomputed', max_iter=mds_max_iter, random_state=seed, verbose=bool(mds_verbose))
coords = m.fit_transform(D)
try:
coords = coords.get() # convert cupy to numpy if needed
except Exception:
pass
except Exception:
if mds_backend == "cuml":
raise
if coords is None:
from sklearn.manifold import MDS
backend_used = "sklearn"
print("[smacof] using scikit-learn MDS (CPU) ...")
# verbose prints per-iteration stress
mds = MDS(n_components=2, dissimilarity='precomputed', metric=True, random_state=seed, n_init=1, max_iter=mds_max_iter, eps=mds_eps, verbose=mds_verbose)
coords = mds.fit_transform(D)
print(f"[smacof] backend={backend_used} done. shape={coords.shape}")
return {nodes_list[i]: (float(coords[i, 0]), float(coords[i, 1])) for i in range(n)}
if layout == "random":
return nx.random_layout(graph, dim=2, seed=seed)
if layout == "kk":
return nx.kamada_kawai_layout(graph, dim=2)
if layout == "none":
return {n: (0.0, 0.0) for n in graph.nodes}
# default: spring
try:
return nx.spring_layout(graph, seed=seed, dim=2, iterations=iterations)
except ModuleNotFoundError:
# SciPy not installed – use kamada_kawai instead
return nx.kamada_kawai_layout(graph, dim=2)
except Exception:
return nx.kamada_kawai_layout(graph, dim=2)
if component_wise:
pos_raw: Dict[str, Tuple[float, float]] = {}
for comp in nx.connected_components(G):
sub = G.subgraph(comp)
local = layout_graph(sub)
pos_raw.update(local)
else:
pos_raw = layout_graph(G)
# Normalize positions to a fixed range for consistent initial viewport
xs = [p[0] for p in pos_raw.values()]
ys = [p[1] for p in pos_raw.values()]
min_x, max_x = (min(xs), max(xs)) if xs else (0.0, 1.0)
min_y, max_y = (min(ys), max(ys)) if ys else (0.0, 1.0)
span_x = max(max_x - min_x, 1e-6)
span_y = max(max_y - min_y, 1e-6)
# Scale to a large square canvas by default; for SMACOF earlier we used disk mapping.
# Here keep linear scaling to preserve community geometry (good for ForceAtlas2/SMACOF alike).
scale = 6000.0
out: Dict[str, Dict[str, float]] = {}
for node_id, (x, y) in pos_raw.items():
x01 = (x - min_x) / span_x # 0..1
y01 = (y - min_y) / span_y # 0..1
out[node_id] = {"x": (x01 - 0.5) * scale, "y": (y01 - 0.5) * scale}
if resolve_overlap and out:
# Simple grid-based overlap removal with minimal displacement
target_dist = max(1.0, node_size_px * min_dist_mul)
cell = target_dist
node_ids = list(out.keys())
for _ in range(overlap_max_iters):
# Build spatial hash
grid: Dict[Tuple[int,int], List[str]] = {}
for nid in node_ids:
p = out[nid]
gx = int(math.floor(p["x"] / cell))
gy = int(math.floor(p["y"] / cell))
grid.setdefault((gx, gy), []).append(nid)
moved = 0.0
disp: Dict[str, Tuple[float,float]] = {}
for nid in node_ids:
px = out[nid]["x"]; py = out[nid]["y"]
gx = int(math.floor(px / cell)); gy = int(math.floor(py / cell))
# check neighbors cells
for dx in (-1,0,1):
for dy in (-1,0,1):
cell_nodes = grid.get((gx+dx, gy+dy), [])
for mid in cell_nodes:
if mid <= nid: # avoid double count and self
continue
qx = out[mid]["x"]; qy = out[mid]["y"]
vx = qx - px; vy = qy - py
dist = math.hypot(vx, vy)
if dist < target_dist and dist > 1e-6:
overlap = target_dist - dist
ux = vx / dist; uy = vy / dist
mx = -ux * (overlap * 0.5)
my = -uy * (overlap * 0.5)
disp[nid] = (disp.get(nid, (0.0,0.0))[0] + mx, disp.get(nid, (0.0,0.0))[1] + my)
disp[mid] = (disp.get(mid, (0.0,0.0))[0] - mx, disp.get(mid, (0.0,0.0))[1] - my)
if not disp:
break
for nid, (dx, dy) in disp.items():
out[nid]["x"] += dx * overlap_step
out[nid]["y"] += dy * overlap_step
moved += abs(dx) + abs(dy)
if moved < 1e-3:
break
return out
def main() -> int:
ap = argparse.ArgumentParser(description="Build slim graph assets and preset positions")
ap.add_argument("input", nargs="?", default="data/courses_parsed.json", help="Input parsed courses JSON")
ap.add_argument("--graph-out", default="data/graph.json", help="Output graph JSON (nodes, edges)")
ap.add_argument("--pos-out", default="data/positions.json", help="Output positions JSON (node -> {x,y})")
ap.add_argument("--pos-out-alt", nargs='*', default=[], help="Additional positions to generate in the form layout:name (e.g., kk:positions_kk.json spring:positions_spring.json)")
ap.add_argument("--hard-only", action="store_true", help="Only include hard prerequisite edges (exclude coreq)")
ap.add_argument("--layout", choices=["spring","kk","random","none","smacof","fa2","drl"], default="fa2", help="Layout algorithm for positions")
ap.add_argument("--iterations", type=int, default=60, help="Iterations for spring layout (lower is faster)")
ap.add_argument("--component-wise", action="store_true", help="Layout each connected component separately (can be faster)")
# Overlap options
ap.add_argument("--resolve-overlap", action="store_true", help="Run overlap removal post-process")
ap.add_argument("--node-size", type=float, default=6.0, help="Node visual diameter in px (for spacing)")
ap.add_argument("--min-dist-mul", type=float, default=1.5, help="Minimum center distance multiplier of node size")
# SMACOF options
ap.add_argument("--mds-backend", choices=["auto","sklearn","cuml"], default="auto", help="Backend for SMACOF (stress majorization)")
ap.add_argument("--mds-max-iter", type=int, default=300, help="Max iterations for SMACOF")
ap.add_argument("--mds-eps", type=float, default=1e-3, help="Convergence tolerance for SMACOF")
ap.add_argument("--mds-verbose", type=int, default=1, help="Verbosity for SMACOF (>=1 prints per-iteration stress)")
args = ap.parse_args()
with open(args.input, "r", encoding="utf-8") as f:
courses = json.load(f)
nodes, edges = build_graph(courses, include_coreq=not args.hard_only)
os.makedirs(os.path.dirname(args.graph_out) or ".", exist_ok=True)
with open(args.graph_out, "w", encoding="utf-8") as f:
json.dump({"nodes": nodes, "edges": edges}, f, ensure_ascii=False, indent=2)
print(f"building positions: nodes={len(nodes)} edges={len(edges)} layout={args.layout} iter={args.iterations} component_wise={args.component_wise}")
pos = compute_positions(
nodes, edges,
layout=args.layout,
iterations=args.iterations,
component_wise=args.component_wise,
mds_backend=args.mds_backend,
mds_max_iter=args.mds_max_iter,
mds_eps=args.mds_eps,
mds_verbose=args.mds_verbose,
resolve_overlap=args.resolve_overlap,
node_size_px=args.node_size,
min_dist_mul=args.min_dist_mul,
)
with open(args.pos_out, "w", encoding="utf-8") as f:
json.dump(pos, f, ensure_ascii=False, indent=2)
# Optionally generate additional layouts
for spec in args.pos_out_alt:
try:
lay, path = spec.split(":", 1)
except ValueError:
print(f"[warn] invalid --pos-out-alt spec: {spec}")
continue
try:
alt = compute_positions(
nodes, edges,
layout=lay,
iterations=args.iterations,
component_wise=args.component_wise,
mds_backend=args.mds_backend,
mds_max_iter=args.mds_max_iter,
mds_eps=args.mds_eps,
mds_verbose=args.mds_verbose,
)
with open(path, "w", encoding="utf-8") as f:
json.dump(alt, f, ensure_ascii=False, indent=2)
print(f"wrote alt positions: {lay} -> {path}")
except Exception as e:
print(f"[warn] failed alt positions {lay}: {e}")
print(f"nodes: {len(nodes)}, edges: {len(edges)}, positions: {len(pos)}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
|