summaryrefslogtreecommitdiff
path: root/mcp_dispatcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'mcp_dispatcher.py')
-rw-r--r--mcp_dispatcher.py515
1 files changed, 515 insertions, 0 deletions
diff --git a/mcp_dispatcher.py b/mcp_dispatcher.py
new file mode 100644
index 0000000..8148f58
--- /dev/null
+++ b/mcp_dispatcher.py
@@ -0,0 +1,515 @@
+"""
+Claude Bridge - Dispatcher MCP Server
+调度端 MCP 工具,供私人服务器上的 Claude Code 使用
+"""
+
+import os
+import subprocess
+
+import httpx
+from mcp.server.fastmcp import FastMCP
+
+mcp = FastMCP("dispatcher")
+
+BROKER_URL = os.environ.get("BROKER_URL", "http://127.0.0.1:8000")
+API_SECRET = os.environ["API_SECRET"]
+
+HEADERS = {"Authorization": f"Bearer {API_SECRET}"}
+
+
+def _detect_dispatcher_id() -> str:
+ """Detect user_id from env or tmux session name (dispatcher-XXXXXX)"""
+ uid = os.environ.get("DISPATCHER_USER_ID", "")
+ if uid:
+ return uid
+ # 从 tmux session 名推断
+ try:
+ result = subprocess.run(
+ ["tmux", "list-panes", "-a", "-F", "#{pane_pid} #{session_name}"],
+ capture_output=True, timeout=3, text=True,
+ )
+ if result.returncode == 0:
+ pane_map = {}
+ for line in result.stdout.strip().split("\n"):
+ parts = line.split(" ", 1)
+ if len(parts) == 2:
+ pane_map[parts[0]] = parts[1]
+ check_pid = os.getpid()
+ for _ in range(50):
+ if str(check_pid) in pane_map:
+ session = pane_map[str(check_pid)]
+ if session.startswith("dispatcher-"):
+ return session[len("dispatcher-"):]
+ elif session == "dispatcher":
+ return "owner"
+ return ""
+ try:
+ with open(f"/proc/{check_pid}/stat") as f:
+ stat = f.read()
+ ppid = int(stat.split(")")[1].split()[1])
+ if ppid <= 1:
+ break
+ check_pid = ppid
+ except Exception:
+ break
+ except Exception:
+ pass
+ return "owner" # fallback
+
+
+DISPATCHER_ID = _detect_dispatcher_id()
+print(f"[dispatcher-mcp] Detected dispatcher_id: {DISPATCHER_ID}", flush=True)
+
+
+@mcp.tool()
+async def check_user_messages() -> str:
+ """检查 Telegram 是否有新的用户消息。有新消息时返回消息内容,没有则返回空。"""
+ async with httpx.AsyncClient() as client:
+ resp = await client.get(f"{BROKER_URL}/messages/new", headers=HEADERS, timeout=10)
+ resp.raise_for_status()
+ data = resp.json()
+
+ msgs = data.get("messages", [])
+ if not msgs:
+ return "没有新消息。"
+
+ lines = []
+ for m in msgs:
+ lines.append(f"用户消息: {m['text']}")
+ return "\n".join(lines)
+
+
+@mcp.tool()
+async def send_task_to_lab(task_description: str, target: str = "") -> str:
+ """创建一个任务派发给实验室服务器执行。
+ task_description: 详细的任务描述,包含实验室 Claude 需要知道的所有信息。
+ target: 目标 session 名。可选值: claude, claude2, claude3。留空则任意空闲 session 可领取。
+ """
+ async with httpx.AsyncClient() as client:
+ resp = await client.post(
+ f"{BROKER_URL}/tasks",
+ headers=HEADERS,
+ json={"content": task_description, "target": target, "dispatcher_id": DISPATCHER_ID},
+ timeout=10,
+ )
+ resp.raise_for_status()
+ task = resp.json()
+
+ target_info = f" → {task['target']}" if task.get('target') else ""
+ return f"任务已派发。ID: {task['id']}{target_info}"
+
+
+@mcp.tool()
+async def send_message_to_lab(message: str, target: str = "") -> str:
+ """向实验室 session 发送消息(btw 模式)。不打断当前工作,消息会在下一次工具调用后注入 context。
+ 适合询问进度、补充信息、轻量级指令等。
+ message: 消息内容
+ target: 目标 session 名。可选值: claude, claude2, claude3。留空则所有 session 可见。
+ """
+ async with httpx.AsyncClient() as client:
+ resp = await client.post(
+ f"{BROKER_URL}/tasks",
+ headers=HEADERS,
+ json={"content": message, "target": target, "type": "message", "dispatcher_id": DISPATCHER_ID},
+ timeout=10,
+ )
+ resp.raise_for_status()
+ task = resp.json()
+
+ target_info = f" → {task['target']}" if task.get('target') else ""
+ return f"消息已发送{target_info}。ID: {task['id']}"
+
+
+@mcp.tool()
+async def list_workers() -> str:
+ """List all active workers and their status (heartbeat, bound channel)."""
+ async with httpx.AsyncClient() as client:
+ # Get workers
+ resp = await client.get(f"{BROKER_URL}/workers", headers=HEADERS, timeout=10)
+ resp.raise_for_status()
+ workers = resp.json().get("workers", [])
+ # Get heartbeats
+ hb_resp = await client.get(f"{BROKER_URL}/heartbeat", headers=HEADERS, timeout=10)
+ hb_resp.raise_for_status()
+ heartbeats = hb_resp.json()
+
+ if not workers:
+ return "No active workers."
+
+ lines = []
+ for w in workers:
+ session = w["session_name"]
+ hb = heartbeats.get(session, {})
+ status = "🟢 online" if hb.get("alive") else ("🔴 offline" if hb else "⚪ no heartbeat")
+ channel = w.get("channel_name", "")
+ ch_info = f" → #{channel}" if channel else ""
+ lines.append(f" {session} ({w['host']}:{w['path']}){ch_info} [{status}]")
+ return "\n".join(lines)
+
+
+@mcp.tool()
+async def check_task_status(task_id: str) -> str:
+ """查询某个任务的执行状态和结果。"""
+ async with httpx.AsyncClient() as client:
+ resp = await client.get(f"{BROKER_URL}/tasks/{task_id}", headers=HEADERS, timeout=10)
+ resp.raise_for_status()
+ task = resp.json()
+
+ status = task["status"]
+ if status == "done":
+ return f"任务 {task_id} 已完成。\n结果:\n{task['result']}"
+ elif status == "failed":
+ return f"任务 {task_id} 失败。\n原因:\n{task['result']}"
+ elif status == "running":
+ return f"任务 {task_id} 正在执行中..."
+ else:
+ return f"任务 {task_id} 等待领取中。"
+
+
+@mcp.tool()
+async def list_all_tasks(status: str = "") -> str:
+ """列出所有任务。可选按状态过滤: pending, running, done, failed"""
+ async with httpx.AsyncClient() as client:
+ params = {}
+ if status:
+ params["status"] = status
+ resp = await client.get(
+ f"{BROKER_URL}/tasks", headers=HEADERS, params=params, timeout=10
+ )
+ resp.raise_for_status()
+ data = resp.json()
+
+ tasks = data.get("tasks", [])
+ if not tasks:
+ return "没有任务。"
+
+ lines = []
+ for t in tasks:
+ lines.append(f"[{t['id']}] status={t['status']} | {t['content'][:80]}")
+ return "\n".join(lines)
+
+
+@mcp.tool()
+async def send_telegram_message(message: str) -> str:
+ """通过 Telegram 给用户发送消息。用于汇报结果、确认收到指令等。"""
+ async with httpx.AsyncClient() as client:
+ resp = await client.post(
+ f"{BROKER_URL}/telegram/send",
+ headers=HEADERS,
+ json={"message": message},
+ timeout=15,
+ )
+ resp.raise_for_status()
+
+ return "Telegram 消息已发送。"
+
+
+@mcp.tool()
+async def switch_worker_project(session: str, new_directory: str) -> str:
+ """切换实验室 worker 的工作项目。会执行: /exit → cd 新目录 → 复制 CLAUDE.md → claude --continue。
+ session: 目标 session 名 (claude, claude2, claude3)
+ new_directory: 新项目的绝对路径 (如 /home/yurenh2/new-project)
+ """
+ async with httpx.AsyncClient() as client:
+ resp = await client.post(
+ f"{BROKER_URL}/commands",
+ headers=HEADERS,
+ json={
+ "target": session,
+ "action": "switch_project",
+ "params": {"directory": new_directory},
+ },
+ timeout=10,
+ )
+ resp.raise_for_status()
+ data = resp.json()
+
+ return f"切换命令已下发。ID: {data['id']}。{session} 将切换到 {new_directory},完成后会收到通知。"
+
+
+@mcp.tool()
+async def restart_worker(session: str) -> str:
+ """重启实验室 worker 的 Claude Code session(在同一目录 /exit + claude --continue)。
+ session: 目标 session 名 (claude, claude2, claude3)
+ """
+ async with httpx.AsyncClient() as client:
+ resp = await client.post(
+ f"{BROKER_URL}/commands",
+ headers=HEADERS,
+ json={
+ "target": session,
+ "action": "restart",
+ "params": {},
+ },
+ timeout=10,
+ )
+ resp.raise_for_status()
+ data = resp.json()
+
+ return f"重启命令已下发。ID: {data['id']}。{session} 将重启,完成后会收到通知。"
+
+
+@mcp.tool()
+async def send_slack_message(channel: str, message: str, thread_ts: str = "") -> str:
+ """通过 Slack 发送消息。用于回复来自 Slack 的消息。
+ channel: Slack channel ID(从消息标签 channel=xxx 中获取)
+ message: 消息内容
+ thread_ts: 可选,回复到指定 thread
+ """
+ async with httpx.AsyncClient() as client:
+ resp = await client.post(
+ f"{BROKER_URL}/slack/send",
+ headers=HEADERS,
+ json={"channel": channel, "message": message, "thread_ts": thread_ts},
+ timeout=15,
+ )
+ resp.raise_for_status()
+ return "Slack 消息已发送。"
+
+
+@mcp.tool()
+async def check_context_usage(session: str = "dispatcher") -> str:
+ """查看指定 session 的 context 使用情况。
+ session: tmux session 名。默认 dispatcher。可填 claude/claude2/claude3 查 lab worker(需要 lab 端可达)。
+ 注意:会在目标 session 执行 /context 命令,可能短暂干扰该 session。
+ """
+ import subprocess, time, re
+
+ try:
+ # 发送 /context 命令
+ subprocess.run(["tmux", "send-keys", "-t", session, "/context", "Enter"], timeout=3)
+ # 等输出渲染
+ time.sleep(3)
+ # 抓取 pane 内容
+ output = subprocess.check_output(
+ ["tmux", "capture-pane", "-t", session, "-p", "-S", "-50"],
+ timeout=3
+ ).decode()
+
+ # 解析关键信息
+ lines = output.split("\n")
+ info = []
+ for line in lines:
+ # 匹配 "239k/1000k" 格式
+ m = re.search(r'(\d+k)/(\d+k)', line)
+ if m:
+ info.append(f"Total: {m.group(1)}/{m.group(2)}")
+ # 匹配百分比行
+ if "tokens (" in line and "%" in line:
+ clean = re.sub(r'[⛁⛶⛀⛝\s]+', '', line).strip()
+ if clean:
+ info.append(clean)
+ # 匹配 Messages/Free space 等
+ for keyword in ["Messages:", "Free space:", "System prompt:", "Autocompact"]:
+ if keyword in line:
+ # 提取数字部分
+ m2 = re.search(r'([\d.]+k?\s*tokens?\s*\([\d.]+%\))', line)
+ if m2:
+ info.append(f"{keyword} {m2.group(1)}")
+
+ if info:
+ return f"[{session}] Context:\n" + "\n".join(info)
+ else:
+ return f"[{session}] 无法解析 /context 输出,原始内容尾部:\n" + "\n".join(lines[-15:])
+ except Exception as e:
+ return f"错误: {e}"
+
+
+@mcp.tool()
+async def send_file_to_user(filename: str, caption: str = "") -> str:
+ """通过 Telegram 给用户发送 broker 上的文件。
+ filename: broker 文件存储中的文件名(用 list_files 查看可用文件)。
+ caption: 文件说明。
+ """
+ async with httpx.AsyncClient(timeout=60) as client:
+ resp = await client.post(
+ f"{BROKER_URL}/telegram/send_file",
+ headers=HEADERS,
+ data={"filename": filename, "caption": caption},
+ )
+ resp.raise_for_status()
+ return "文件已通过 Telegram 发送给用户。"
+
+
+@mcp.tool()
+async def upload_local_file_to_broker(file_path: str) -> str:
+ """把 dispatcher 本地文件上传到 broker 文件存储,供 lab worker 下载或发给用户。
+ file_path: dispatcher 服务器上的本地文件路径。
+ """
+ import os
+ if not os.path.exists(file_path):
+ return f"文件不存在: {file_path}"
+ filename = os.path.basename(file_path)
+ async with httpx.AsyncClient(timeout=60) as client:
+ with open(file_path, "rb") as f:
+ resp = await client.post(
+ f"{BROKER_URL}/files/upload",
+ headers=HEADERS,
+ files={"file": (filename, f)},
+ data={"filename": filename},
+ )
+ resp.raise_for_status()
+ data = resp.json()
+ return f"已上传: {data['filename']} ({data['size']} bytes)"
+
+
+@mcp.tool()
+async def download_file_from_broker(filename: str, save_path: str = "") -> str:
+ """从 broker 文件存储下载文件到 dispatcher 本地。
+ filename: broker 上的文件名。
+ save_path: 保存路径。留空则保存到当前目录。
+ """
+ import os
+ if not save_path:
+ save_path = os.path.join(os.getcwd(), filename)
+ async with httpx.AsyncClient(timeout=60) as client:
+ resp = await client.get(
+ f"{BROKER_URL}/files/{filename}",
+ headers=HEADERS,
+ )
+ resp.raise_for_status()
+ with open(save_path, "wb") as f:
+ f.write(resp.content)
+ return f"已下载到: {save_path} ({len(resp.content)} bytes)"
+
+
+@mcp.tool()
+async def list_files() -> str:
+ """列出 broker 文件存储中的所有文件。"""
+ async with httpx.AsyncClient() as client:
+ resp = await client.get(f"{BROKER_URL}/files", headers=HEADERS, timeout=10)
+ resp.raise_for_status()
+ data = resp.json()
+ files = data.get("files", [])
+ if not files:
+ return "没有文件。"
+ lines = [f" {f['name']} ({f['size']} bytes)" for f in files]
+ return "\n".join(lines)
+
+
+@mcp.tool()
+async def schedule_at(action: str, time_str: str) -> str:
+ """在指定时刻触发一个动作。到时间后会收到 [定时任务 ID] action 的消息。
+ action: 到时间后要执行的指令/提醒内容
+ time_str: 时间,格式 "2026-03-30 14:00" 或 "14:00"(当天)。时区为服务器时区。
+ """
+ import datetime
+ now = datetime.datetime.now()
+ try:
+ if len(time_str) <= 5:
+ t = datetime.datetime.strptime(time_str, "%H:%M").replace(year=now.year, month=now.month, day=now.day)
+ if t < now:
+ t += datetime.timedelta(days=1)
+ else:
+ t = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M")
+ except ValueError:
+ return f"时间格式错误: {time_str},请用 'HH:MM' 或 'YYYY-MM-DD HH:MM'"
+
+ async with httpx.AsyncClient() as client:
+ resp = await client.post(f"{BROKER_URL}/schedules", headers=HEADERS, json={
+ "action": action, "trigger_at": t.timestamp(), "dispatcher_id": DISPATCHER_ID,
+ }, timeout=10)
+ resp.raise_for_status()
+ data = resp.json()
+ return f"定时任务已创建。ID: {data['id']},触发时间: {data['trigger_at']}"
+
+
+@mcp.tool()
+async def schedule_delay(action: str, minutes: float) -> str:
+ """延迟指定分钟后触发一个动作。
+ action: 到时间后要执行的指令/提醒内容
+ minutes: 延迟分钟数
+ """
+ async with httpx.AsyncClient() as client:
+ resp = await client.post(f"{BROKER_URL}/schedules", headers=HEADERS, json={
+ "action": action, "delay_seconds": minutes * 60, "dispatcher_id": DISPATCHER_ID,
+ }, timeout=10)
+ resp.raise_for_status()
+ data = resp.json()
+ return f"定时任务已创建。ID: {data['id']},触发时间: {data['trigger_at']}"
+
+
+@mcp.tool()
+async def schedule_loop(action: str, interval_minutes: float) -> str:
+ """创建循环定时任务,每隔固定时间重复触发。
+ action: 每次触发时要执行的指令/提醒内容
+ interval_minutes: 间隔分钟数
+ """
+ async with httpx.AsyncClient() as client:
+ resp = await client.post(f"{BROKER_URL}/schedules", headers=HEADERS, json={
+ "action": action, "delay_seconds": interval_minutes * 60, "repeat_seconds": interval_minutes * 60, "dispatcher_id": DISPATCHER_ID,
+ }, timeout=10)
+ resp.raise_for_status()
+ data = resp.json()
+ return f"循环任务已创建。ID: {data['id']},首次触发: {data['trigger_at']},间隔: {interval_minutes} 分钟"
+
+
+@mcp.tool()
+async def list_schedules() -> str:
+ """列出所有活跃的定时任务。"""
+ import datetime
+ async with httpx.AsyncClient() as client:
+ resp = await client.get(f"{BROKER_URL}/schedules", headers=HEADERS, timeout=10)
+ resp.raise_for_status()
+ data = resp.json()
+ schedules = data.get("schedules", [])
+ if not schedules:
+ return "没有定时任务。"
+ lines = []
+ for s in schedules:
+ t = datetime.datetime.fromtimestamp(s["trigger_at"]).strftime("%Y-%m-%d %H:%M:%S")
+ repeat = f",每 {s['repeat_seconds']/60:.0f} 分钟" if s["repeat_seconds"] > 0 else ""
+ lines.append(f"[{s['id']}] {t}{repeat} | {s['action'][:80]}")
+ return "\n".join(lines)
+
+
+@mcp.tool()
+async def cancel_schedule(schedule_id: str) -> str:
+ """取消一个定时任务。"""
+ async with httpx.AsyncClient() as client:
+ resp = await client.delete(f"{BROKER_URL}/schedules/{schedule_id}", headers=HEADERS, timeout=10)
+ resp.raise_for_status()
+ return f"定时任务 {schedule_id} 已取消。"
+
+
+@mcp.tool()
+async def ask_expert(question: str) -> str:
+ """向 GPT-Pro 专家提问。注意:GPT-Pro 回复非常慢(可能几分钟),调用后立即返回请求 ID,不要等待结果。
+ 结果完成后会自动推送通知,届时用 get_expert_answer 查看。
+ question: 要问的问题,提供充分上下文。
+ """
+ async with httpx.AsyncClient() as client:
+ resp = await client.post(
+ f"{BROKER_URL}/expert/ask",
+ headers=HEADERS,
+ json={"question": question},
+ timeout=10,
+ )
+ resp.raise_for_status()
+ data = resp.json()
+
+ return f"已提交给 GPT-Pro,请求 ID: {data['id']}。回复可能需要几分钟,完成后会自动通知你,不要等待。"
+
+
+@mcp.tool()
+async def get_expert_answer(request_id: str) -> str:
+ """查看 GPT-Pro 专家的回复结果。"""
+ async with httpx.AsyncClient() as client:
+ resp = await client.get(
+ f"{BROKER_URL}/expert/{request_id}",
+ headers=HEADERS,
+ timeout=10,
+ )
+ resp.raise_for_status()
+ data = resp.json()
+
+ if data["status"] == "done":
+ return f"GPT-Pro 回复:\n{data['answer']}"
+ elif data["status"] == "error":
+ return f"GPT-Pro 请求失败: {data['answer']}"
+ else:
+ return f"GPT-Pro 仍在思考中... (状态: {data['status']})"
+
+
+if __name__ == "__main__":
+ mcp.run(transport="stdio")