1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
|
"""
Claude Bridge - Lab Worker MCP Server
实验室端 MCP 工具,供实验室 Claude Code 使用
"""
import os
import subprocess
import httpx
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("lab-worker")
BROKER_URL = os.environ["BROKER_URL"] # 私人服务器公网地址
API_SECRET = os.environ["API_SECRET"]
HEADERS = {"Authorization": f"Bearer {API_SECRET}"}
# 启动时就确定 session name,不每次重新检测
_SESSION_NAME = ""
def _detect_session_name() -> str:
"""检测当前进程所在的 tmux session 名"""
# 方法 1: 环境变量(最可靠,需要在 tmux session 里设置)
name = os.environ.get("CLAUDE_SESSION_NAME", "")
if name:
return name
# 方法 2: 进程树 PID 匹配 tmux pane
try:
result = subprocess.run(
["tmux", "list-panes", "-a", "-F", "#{pane_pid} #{session_name}"],
capture_output=True, timeout=3, text=True,
)
if result.returncode != 0:
return ""
pane_map = {}
for line in result.stdout.strip().split("\n"):
parts = line.split(" ", 1)
if len(parts) == 2:
pane_map[parts[0]] = parts[1]
check_pid = os.getpid()
for _ in range(50):
if str(check_pid) in pane_map:
return pane_map[str(check_pid)]
try:
with open(f"/proc/{check_pid}/stat") as f:
stat = f.read()
ppid = int(stat.split(")")[1].split()[1])
if ppid <= 1:
break
check_pid = ppid
except Exception:
break
except Exception:
pass
return ""
# 启动时检测一次
_SESSION_NAME = _detect_session_name()
if _SESSION_NAME:
print(f"[lab-worker] Detected session: {_SESSION_NAME}", flush=True)
else:
print("[lab-worker] WARNING: Could not detect tmux session name!", flush=True)
def _get_session_name() -> str:
return _SESSION_NAME
@mcp.tool()
async def fetch_pending_tasks() -> str:
"""从调度服务器获取属于当前 session 的待执行任务列表。"""
session = _get_session_name()
if not session:
return "错误:无法检测 tmux session 名,拒绝获取任务(防止跨 session 抢任务)。"
params = {"target": session}
async with httpx.AsyncClient() as client:
resp = await client.get(f"{BROKER_URL}/tasks/pending", headers=HEADERS, params=params, timeout=15)
resp.raise_for_status()
data = resp.json()
tasks = data.get("tasks", [])
if not tasks:
return "当前没有待执行的任务。"
lines = []
for t in tasks:
lines.append(f"任务 [{t['id']}]: {t['content']}")
return "\n---\n".join(lines)
@mcp.tool()
async def claim_task(task_id: str) -> str:
"""领取一个任务,标记为正在执行。必须在开始执行前调用。"""
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{BROKER_URL}/tasks/{task_id}/claim", headers=HEADERS, timeout=10
)
if resp.status_code == 200:
return f"已领取任务 {task_id},开始执行。"
return f"领取失败: {resp.text}"
@mcp.tool()
async def report_result(task_id: str, result: str) -> str:
"""提交任务执行结果。result 应包含详细的执行结果和关键输出。"""
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{BROKER_URL}/tasks/{task_id}/result",
headers=HEADERS,
json={"result": result},
timeout=15,
)
if resp.status_code == 200:
return f"任务 {task_id} 结果已提交。"
return f"提交失败: {resp.text}"
@mcp.tool()
async def report_failure(task_id: str, reason: str) -> str:
"""报告任务执行失败。reason 应包含错误信息和失败原因。"""
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{BROKER_URL}/tasks/{task_id}/fail",
headers=HEADERS,
json={"result": reason},
timeout=15,
)
if resp.status_code == 200:
return f"已报告任务 {task_id} 失败。"
return f"报告失败: {resp.text}"
@mcp.tool()
async def reply_to_dispatcher(message: str) -> str:
"""回传消息给调度中心。用于回复 btw 询问、汇报进度、回答问题等。"""
session = _get_session_name() or "unknown"
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{BROKER_URL}/log",
headers=HEADERS,
json={"source": session, "message": message},
timeout=10,
)
if resp.status_code == 200:
return "已回传给调度中心。"
return f"回传失败: {resp.text}"
@mcp.tool()
async def upload_file_to_broker(file_path: str) -> str:
"""把实验室本地文件上传到 broker 文件存储,供 dispatcher 下载或发给用户。
file_path: 实验室服务器上的本地文件绝对路径。
"""
if not os.path.exists(file_path):
return f"文件不存在: {file_path}"
filename = os.path.basename(file_path)
async with httpx.AsyncClient(timeout=120) as client:
with open(file_path, "rb") as f:
resp = await client.post(
f"{BROKER_URL}/files/upload",
headers=HEADERS,
files={"file": (filename, f)},
data={"filename": filename},
)
resp.raise_for_status()
data = resp.json()
return f"已上传: {data['filename']} ({data['size']} bytes)"
@mcp.tool()
async def download_file_from_broker(filename: str, save_path: str = "") -> str:
"""从 broker 文件存储下载文件到实验室本地。
filename: broker 上的文件名。
save_path: 保存路径。留空则保存到当前工作目录。
"""
if not save_path:
save_path = os.path.join(os.getcwd(), filename)
async with httpx.AsyncClient(timeout=120) as client:
resp = await client.get(
f"{BROKER_URL}/files/{filename}",
headers=HEADERS,
)
resp.raise_for_status()
with open(save_path, "wb") as f:
f.write(resp.content)
return f"已下载到: {save_path} ({len(resp.content)} bytes)"
@mcp.tool()
async def check_context_usage() -> str:
"""查看当前 session 的 context 使用情况(通过 /context 命令)。"""
import time, re
session = _get_session_name()
if not session:
return "无法检测 session 名"
try:
subprocess.run(["tmux", "send-keys", "-t", session, "/context", "Enter"], timeout=3)
time.sleep(3)
output = subprocess.check_output(
["tmux", "capture-pane", "-t", session, "-p", "-S", "-50"],
timeout=3
).decode()
info = []
for line in output.split("\n"):
m = re.search(r'(\d+k)/(\d+k)', line)
if m:
info.append(f"Total: {m.group(1)}/{m.group(2)}")
for keyword in ["Messages:", "Free space:", "Autocompact"]:
if keyword in line:
m2 = re.search(r'([\d.]+k?\s*tokens?\s*\([\d.]+%\))', line)
if m2:
info.append(f"{keyword} {m2.group(1)}")
if info:
return f"[{session}] Context:\n" + "\n".join(info)
else:
return f"[{session}] 解析失败"
except Exception as e:
return f"错误: {e}"
if __name__ == "__main__":
mcp.run(transport="stdio")
|