1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
#!/usr/bin/env python3
import argparse
import json
import os
from typing import Any, Dict, List, Set, Tuple
import networkx as nx
def load_graph(path: str) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Read a graph JSON document and return its node and edge records.

    Args:
        path: Location of a UTF-8 encoded JSON file whose top level is an
            object.

    Returns:
        A ``(nodes, edges)`` pair read from the ``"nodes"`` and ``"edges"``
        keys. A key that is absent yields an empty list.
    """
    with open(path, "r", encoding="utf-8") as handle:
        payload = json.load(handle)
    node_records = payload.get("nodes", [])
    edge_records = payload.get("edges", [])
    return node_records, edge_records
def directed_hard_graph(nodes: List[Dict[str, Any]], edges: List[Dict[str, Any]]) -> nx.DiGraph:
    """Build a directed graph that contains only the "hard" edges.

    Args:
        nodes: Node records. Each must carry an ``"id"`` key; every key of the
            record (``"id"`` included) is stored as a node attribute.
        edges: Edge records. Only records whose ``"kind"`` equals ``"hard"``
            are used, and each of those must carry ``"source"`` and
            ``"target"``.

    Returns:
        A ``DiGraph`` holding every listed node and every hard edge except
        self-loops. Endpoints that appear only in an edge record are added as
        nodes without attributes.
    """
    graph = nx.DiGraph()
    for record in nodes:
        graph.add_node(record["id"], **record)

    hard_pairs = [
        (edge["source"], edge["target"])
        for edge in edges
        if edge.get("kind") == "hard"
    ]
    graph.add_edges_from(hard_pairs)

    # Self-loops are inserted first and stripped afterwards, so a node that is
    # only mentioned by a self-loop still ends up in the graph.
    loops = nx.selfloop_edges(graph)
    graph.remove_edges_from(loops)
    return graph
def transitive_reduction_with_scc(G: nx.DiGraph) -> nx.DiGraph:
    """Transitively reduce ``G`` even when it contains cycles.

    ``nx.transitive_reduction`` is applied to a graph of strongly connected
    components (one node per component) rather than to ``G`` itself, so cycles
    in ``G`` are collapsed before the reduction runs.

    Args:
        G: Directed input graph.

    Returns:
        A new ``DiGraph`` with the same nodes and node attributes as ``G``.
        Edges between members of the same multi-node component are all kept.
        For each edge that survives reduction of the component graph, exactly
        one original edge is kept: the smallest ``(source, target)`` pair
        among the original edges joining those two components.
    """
    components: List[Set[str]] = list(nx.strongly_connected_components(G))
    owner: Dict[str, int] = {
        node: idx for idx, members in enumerate(components) for node in members
    }

    # Component-level graph, plus a record of which original edges produced
    # each component-to-component edge.
    condensed = nx.DiGraph()
    condensed.add_nodes_from(range(len(components)))
    crossing: Dict[Tuple[int, int], List[Tuple[str, str]]] = {}
    for src, dst in G.edges():
        pair = (owner[src], owner[dst])
        if pair[0] == pair[1]:
            continue
        condensed.add_edge(*pair)
        crossing.setdefault(pair, []).append((src, dst))

    # The reduction call is skipped when the component graph has no edges.
    if condensed.number_of_edges():
        reduced_components = nx.transitive_reduction(condensed)
    else:
        reduced_components = condensed

    result = nx.DiGraph()
    result.add_nodes_from(G.nodes(data=True))

    # Inside a multi-node component every edge is retained.
    for idx, members in enumerate(components):
        if len(members) > 1:
            result.add_edges_from(
                (src, dst)
                for src in members
                for dst in G.successors(src)
                if owner[dst] == idx
            )

    # Between components, keep a single deterministic representative per
    # surviving component edge.
    for pair in reduced_components.edges():
        candidates = crossing.get(pair, [])
        if candidates:
            result.add_edge(*min(candidates))
    return result
def detect_communities_undirected(R: nx.DiGraph) -> Dict[str, int]:
    """Assign every node of ``R`` to a community, ignoring edge direction.

    Communities come from NetworkX's greedy modularity algorithm run on the
    undirected view of ``R``.

    Args:
        R: Directed graph whose nodes should be grouped.

    Returns:
        A mapping from node to community index, numbered from 0 in the order
        the algorithm returns the communities. Any node the algorithm leaves
        out is mapped to ``-1``. When ``R`` has no edges, each node becomes
        its own community, numbered in node order.
    """
    UG = R.to_undirected()

    if UG.number_of_edges() == 0:
        # Modularity is undefined without edges, so the algorithm is not
        # called at all; every node is trivially its own community.
        # NOTE(review): some NetworkX releases appear to divide by the edge
        # count in this situation - confirm against the pinned version.
        return {v: cid for cid, v in enumerate(UG.nodes())}

    # Greedy modularity communities (built-in, no extra deps)
    communities = nx.algorithms.community.greedy_modularity_communities(UG)
    node_to_comm: Dict[str, int] = {
        v: cid for cid, comm in enumerate(communities) for v in comm
    }

    # Fallback for any node not covered by a returned community.
    for v in R.nodes():
        node_to_comm.setdefault(v, -1)
    return node_to_comm
def palette(n: int) -> List[str]:
    """Return ``n`` hex colour strings, cycling through a fixed base palette.

    Args:
        n: Number of colours wanted.

    Returns:
        A list of length ``n`` whose entry ``i`` is ``base[i % len(base)]``,
        so colours repeat once the ten base colours are used up. A zero or
        negative ``n`` gives an empty list.
    """
    base = [
        "#1f77b4", "#ff7f0e", "#2ca02c", "#d62728", "#9467bd",
        "#8c564b", "#e377c2", "#7f7f7f", "#bcbd22", "#17becf",
    ]
    # range() is empty for n <= 0, so a negative count yields no colours
    # instead of slicing from the end of ``base``.
    return [base[i % len(base)] for i in range(n)]
def write_outputs(R: nx.DiGraph, node_to_comm: Dict[str, int], graph_out: str, comm_out: str) -> None:
    """Write the reduced graph and the community membership as JSON files.

    Args:
        R: Reduced graph. Each node's ``"label"`` and ``"subject"`` attributes
            are copied into the output when present.
        node_to_comm: Community index per node; a negative index means "no
            community".
        graph_out: Destination of the graph JSON (``{"nodes": [...],
            "edges": [...]}``).
        comm_out: Destination of the membership JSON, mapping each community
            index (as a string) to the list of its nodes.

    The parent directory of each output path is created if it does not exist.
    """
    # One colour per community index 0..max; an empty mapping needs none.
    max_comm = max(node_to_comm.values(), default=-1)
    colors = palette(max_comm + 1)

    nodes: List[Dict[str, Any]] = []
    for v, data in R.nodes(data=True):
        cid = node_to_comm.get(v, -1)
        nodes.append({
            "id": v,
            # A missing or empty label falls back to the node id.
            "label": data.get("label") or v,
            "community": cid,
            # Nodes without a community get a fixed fallback colour.
            "color": colors[cid] if cid >= 0 else "#4f46e5",
            "subject": data.get("subject"),
        })

    edges: List[Dict[str, Any]] = [
        {"source": u, "target": v, "kind": "hard"} for u, v in R.edges()
    ]

    os.makedirs(os.path.dirname(graph_out) or ".", exist_ok=True)
    with open(graph_out, "w", encoding="utf-8") as f:
        json.dump({"nodes": nodes, "edges": edges}, f, ensure_ascii=False, indent=2)

    # communities summary
    comm_map: Dict[int, List[str]] = {}
    for node, cid in node_to_comm.items():
        comm_map.setdefault(cid, []).append(node)

    # comm_out may live in a different directory than graph_out, so its parent
    # has to be created as well.
    os.makedirs(os.path.dirname(comm_out) or ".", exist_ok=True)
    with open(comm_out, "w", encoding="utf-8") as f:
        json.dump({str(k): v for k, v in sorted(comm_map.items())}, f, ensure_ascii=False, indent=2)
def main() -> int:
    """Run the pipeline from the command line.

    Parses the arguments, loads the input graph, builds the hard-edge digraph,
    reduces it, detects communities, writes both output files and prints a
    one-line summary.

    Returns:
        ``0`` on completion.
    """
    ap = argparse.ArgumentParser(description="Transitive reduction + community detection pipeline")
    ap.add_argument("input", nargs="?", default="data/graph.json", help="Input graph.json (nodes, edges)")
    ap.add_argument("--graph-out", default="data/graph_reduced.json", help="Output reduced graph with communities")
    ap.add_argument("--comm-out", default="data/communities.json", help="Output communities membership")
    args = ap.parse_args()

    nodes, edges = load_graph(args.input)
    G = directed_hard_graph(nodes, edges)
    R = transitive_reduction_with_scc(G)
    node_to_comm = detect_communities_undirected(R)
    write_outputs(R, node_to_comm, args.graph_out, args.comm_out)

    # default=-1 keeps the summary working for a graph with no nodes, where
    # the mapping is empty and a bare max() would raise ValueError.
    community_count = max(node_to_comm.values(), default=-1) + 1
    print(f"reduced_nodes={R.number_of_nodes()} reduced_edges={R.number_of_edges()} communities={community_count}")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)
|