""" Claude Bridge - Dispatcher MCP Server 调度端 MCP 工具,供私人服务器上的 Claude Code 使用 """ import os import subprocess import httpx from mcp.server.fastmcp import FastMCP mcp = FastMCP("dispatcher") BROKER_URL = os.environ.get("BROKER_URL", "http://127.0.0.1:8000") API_SECRET = os.environ["API_SECRET"] HEADERS = {"Authorization": f"Bearer {API_SECRET}"} def _detect_dispatcher_id() -> str: """Detect user_id from env or tmux session name (dispatcher-XXXXXX)""" uid = os.environ.get("DISPATCHER_USER_ID", "") if uid: return uid # 从 tmux session 名推断 try: result = subprocess.run( ["tmux", "list-panes", "-a", "-F", "#{pane_pid} #{session_name}"], capture_output=True, timeout=3, text=True, ) if result.returncode == 0: pane_map = {} for line in result.stdout.strip().split("\n"): parts = line.split(" ", 1) if len(parts) == 2: pane_map[parts[0]] = parts[1] check_pid = os.getpid() for _ in range(50): if str(check_pid) in pane_map: session = pane_map[str(check_pid)] if session.startswith("dispatcher-"): return session[len("dispatcher-"):] elif session == "dispatcher": return "owner" return "" try: with open(f"/proc/{check_pid}/stat") as f: stat = f.read() ppid = int(stat.split(")")[1].split()[1]) if ppid <= 1: break check_pid = ppid except Exception: break except Exception: pass return "owner" # fallback DISPATCHER_ID = _detect_dispatcher_id() print(f"[dispatcher-mcp] Detected dispatcher_id: {DISPATCHER_ID}", flush=True) @mcp.tool() async def check_user_messages() -> str: """检查 Telegram 是否有新的用户消息。有新消息时返回消息内容,没有则返回空。""" async with httpx.AsyncClient() as client: resp = await client.get(f"{BROKER_URL}/messages/new", headers=HEADERS, timeout=10) resp.raise_for_status() data = resp.json() msgs = data.get("messages", []) if not msgs: return "没有新消息。" lines = [] for m in msgs: lines.append(f"用户消息: {m['text']}") return "\n".join(lines) @mcp.tool() async def send_task_to_lab(task_description: str, target: str = "") -> str: """创建一个任务派发给实验室服务器执行。 task_description: 详细的任务描述,包含实验室 Claude 需要知道的所有信息。 
target: 目标 session 名。可选值: claude, claude2, claude3。留空则任意空闲 session 可领取。 """ async with httpx.AsyncClient() as client: resp = await client.post( f"{BROKER_URL}/tasks", headers=HEADERS, json={"content": task_description, "target": target, "dispatcher_id": DISPATCHER_ID}, timeout=10, ) resp.raise_for_status() task = resp.json() target_info = f" → {task['target']}" if task.get('target') else "" return f"任务已派发。ID: {task['id']}{target_info}" @mcp.tool() async def send_message_to_lab(message: str, target: str = "") -> str: """向实验室 session 发送消息(btw 模式)。不打断当前工作,消息会在下一次工具调用后注入 context。 适合询问进度、补充信息、轻量级指令等。 message: 消息内容 target: 目标 session 名。可选值: claude, claude2, claude3。留空则所有 session 可见。 """ async with httpx.AsyncClient() as client: resp = await client.post( f"{BROKER_URL}/tasks", headers=HEADERS, json={"content": message, "target": target, "type": "message", "dispatcher_id": DISPATCHER_ID}, timeout=10, ) resp.raise_for_status() task = resp.json() target_info = f" → {task['target']}" if task.get('target') else "" return f"消息已发送{target_info}。ID: {task['id']}" @mcp.tool() async def list_workers() -> str: """List all active workers and their status (heartbeat, bound channel).""" async with httpx.AsyncClient() as client: # Get workers resp = await client.get(f"{BROKER_URL}/workers", headers=HEADERS, timeout=10) resp.raise_for_status() workers = resp.json().get("workers", []) # Get heartbeats hb_resp = await client.get(f"{BROKER_URL}/heartbeat", headers=HEADERS, timeout=10) hb_resp.raise_for_status() heartbeats = hb_resp.json() if not workers: return "No active workers." 
lines = [] for w in workers: session = w["session_name"] hb = heartbeats.get(session, {}) status = "🟢 online" if hb.get("alive") else ("🔴 offline" if hb else "⚪ no heartbeat") channel = w.get("channel_name", "") ch_info = f" → #{channel}" if channel else "" lines.append(f" {session} ({w['host']}:{w['path']}){ch_info} [{status}]") return "\n".join(lines) @mcp.tool() async def check_task_status(task_id: str) -> str: """查询某个任务的执行状态和结果。""" async with httpx.AsyncClient() as client: resp = await client.get(f"{BROKER_URL}/tasks/{task_id}", headers=HEADERS, timeout=10) resp.raise_for_status() task = resp.json() status = task["status"] if status == "done": return f"任务 {task_id} 已完成。\n结果:\n{task['result']}" elif status == "failed": return f"任务 {task_id} 失败。\n原因:\n{task['result']}" elif status == "running": return f"任务 {task_id} 正在执行中..." else: return f"任务 {task_id} 等待领取中。" @mcp.tool() async def list_all_tasks(status: str = "") -> str: """列出所有任务。可选按状态过滤: pending, running, done, failed""" async with httpx.AsyncClient() as client: params = {} if status: params["status"] = status resp = await client.get( f"{BROKER_URL}/tasks", headers=HEADERS, params=params, timeout=10 ) resp.raise_for_status() data = resp.json() tasks = data.get("tasks", []) if not tasks: return "没有任务。" lines = [] for t in tasks: lines.append(f"[{t['id']}] status={t['status']} | {t['content'][:80]}") return "\n".join(lines) @mcp.tool() async def send_telegram_message(message: str) -> str: """通过 Telegram 给用户发送消息。用于汇报结果、确认收到指令等。""" async with httpx.AsyncClient() as client: resp = await client.post( f"{BROKER_URL}/telegram/send", headers=HEADERS, json={"message": message}, timeout=15, ) resp.raise_for_status() return "Telegram 消息已发送。" @mcp.tool() async def switch_worker_project(session: str, new_directory: str) -> str: """切换实验室 worker 的工作项目。会执行: /exit → cd 新目录 → 复制 CLAUDE.md → claude --continue。 session: 目标 session 名 (claude, claude2, claude3) new_directory: 新项目的绝对路径 (如 /home/yurenh2/new-project) """ async with 
httpx.AsyncClient() as client: resp = await client.post( f"{BROKER_URL}/commands", headers=HEADERS, json={ "target": session, "action": "switch_project", "params": {"directory": new_directory}, }, timeout=10, ) resp.raise_for_status() data = resp.json() return f"切换命令已下发。ID: {data['id']}。{session} 将切换到 {new_directory},完成后会收到通知。" @mcp.tool() async def restart_worker(session: str) -> str: """重启实验室 worker 的 Claude Code session(在同一目录 /exit + claude --continue)。 session: 目标 session 名 (claude, claude2, claude3) """ async with httpx.AsyncClient() as client: resp = await client.post( f"{BROKER_URL}/commands", headers=HEADERS, json={ "target": session, "action": "restart", "params": {}, }, timeout=10, ) resp.raise_for_status() data = resp.json() return f"重启命令已下发。ID: {data['id']}。{session} 将重启,完成后会收到通知。" @mcp.tool() async def send_slack_message(channel: str, message: str, thread_ts: str = "") -> str: """通过 Slack 发送消息。用于回复来自 Slack 的消息。 channel: Slack channel ID(从消息标签 channel=xxx 中获取) message: 消息内容 thread_ts: 可选,回复到指定 thread """ async with httpx.AsyncClient() as client: resp = await client.post( f"{BROKER_URL}/slack/send", headers=HEADERS, json={"channel": channel, "message": message, "thread_ts": thread_ts}, timeout=15, ) resp.raise_for_status() return "Slack 消息已发送。" @mcp.tool() async def send_file_to_slack(filename: str, channel: str, caption: str = "") -> str: """Send a file from broker storage to a Slack channel. filename: file name in broker storage (use list_files to see available files) channel: Slack channel ID caption: optional description """ async with httpx.AsyncClient(timeout=60) as client: resp = await client.post( f"{BROKER_URL}/slack/send_file", headers=HEADERS, data={"filename": filename, "channel": channel, "caption": caption}, ) resp.raise_for_status() return "File sent to Slack." 
# Run `/context` inside a tmux session and scrape the rendered pane for token
# usage figures. The docstring is the tool description shown to the MCP client
# and is therefore kept verbatim.
@mcp.tool()
async def check_context_usage(session: str = "dispatcher") -> str:
    """查看指定 session 的 context 使用情况。

    session: tmux session 名。默认 dispatcher。可填 claude/claude2/claude3 查 lab worker(需要 lab 端可达)。
    注意:会在目标 session 执行 /context 命令,可能短暂干扰该 session。
    """
    import asyncio
    import re
    import subprocess

    # Compiled once instead of on every pane line.
    total_re = re.compile(r'(\d+k)/(\d+k)')
    glyphs_re = re.compile(r'[⛁⛶⛀⛝\s]+')
    tokens_re = re.compile(r'([\d.]+k?\s*tokens?\s*\([\d.]+%\))')
    keywords = ["Messages:", "Free space:", "System prompt:", "Autocompact"]

    try:
        # Type the /context command into the target session.
        subprocess.run(["tmux", "send-keys", "-t", session, "/context", "Enter"], timeout=3)
        # Give the UI time to render. This must be a non-blocking sleep:
        # time.sleep() here would stall the whole event loop for 3 seconds.
        await asyncio.sleep(3)
        # Grab the last 50 lines of the pane. Undecodable bytes are replaced
        # rather than failing the whole call.
        output = subprocess.check_output(
            ["tmux", "capture-pane", "-t", session, "-p", "-S", "-50"], timeout=3
        ).decode(errors="replace")

        # Extract the interesting figures.
        lines = output.split("\n")
        info = []
        for line in lines:
            # Matches the "239k/1000k" form.
            m = total_re.search(line)
            if m:
                info.append(f"Total: {m.group(1)}/{m.group(2)}")
            # Percentage line: strip the bar glyphs and all whitespace.
            if "tokens (" in line and "%" in line:
                clean = glyphs_re.sub('', line).strip()
                if clean:
                    info.append(clean)
            # Messages / Free space / ... rows.
            for keyword in keywords:
                if keyword in line:
                    # Pull out the "<n> tokens (<p>%)" part.
                    m2 = tokens_re.search(line)
                    if m2:
                        info.append(f"{keyword} {m2.group(1)}")
        if info:
            return f"[{session}] Context:\n" + "\n".join(info)
        else:
            # Nothing recognised: hand back the tail of the raw pane instead.
            return f"[{session}] 无法解析 /context 输出,原始内容尾部:\n" + "\n".join(lines[-15:])
    except Exception as e:
        return f"错误: {e}"


# Ask the broker to send one of its stored files to the user via Telegram.
# The fields are sent form-encoded (data=), not as JSON.
@mcp.tool()
async def send_file_to_user(filename: str, caption: str = "") -> str:
    """通过 Telegram 给用户发送 broker 上的文件。

    filename: broker 文件存储中的文件名(用 list_files 查看可用文件)。
    caption: 文件说明。
    """
    async with httpx.AsyncClient(timeout=60) as client:
        resp = await client.post(
            f"{BROKER_URL}/telegram/send_file",
            headers=HEADERS,
            data={"filename": filename, "caption": caption},
        )
        resp.raise_for_status()
    return "文件已通过 Telegram 发送给用户。"


# Upload a local file to broker storage (multipart POST /files/upload); the
# stored name is the local file's basename.
@mcp.tool()
async def upload_local_file_to_broker(file_path: str) -> str:
    """把 dispatcher 本地文件上传到 broker 文件存储,供 lab worker 下载或发给用户。

    file_path: dispatcher 服务器上的本地文件路径。
    """
    import os
    # isfile (not exists): a directory path would otherwise pass the check and
    # then blow up in open() with IsADirectoryError.
    if not os.path.isfile(file_path):
        return f"文件不存在: {file_path}"
    filename = os.path.basename(file_path)
    async with httpx.AsyncClient(timeout=60) as client:
        with open(file_path, "rb") as f:
            resp = await client.post(
                f"{BROKER_URL}/files/upload",
                headers=HEADERS,
                files={"file": (filename, f)},
                data={"filename": filename},
            )
        resp.raise_for_status()
        data = resp.json()
    return f"已上传: {data['filename']} ({data['size']} bytes)"


# Download a broker file (GET /files/<filename>) and write it to disk.
@mcp.tool()
async def download_file_from_broker(filename: str, save_path: str = "") -> str:
    """从 broker 文件存储下载文件到 dispatcher 本地。

    filename: broker 上的文件名。
    save_path: 保存路径。留空则保存到当前目录。
    """
    import os
    if not save_path:
        # Use only the final path component so that a name such as "../x" or
        # "/abs/x" cannot land outside the current directory. An explicit
        # save_path is still honoured as given.
        save_path = os.path.join(os.getcwd(), os.path.basename(filename))
    async with httpx.AsyncClient(timeout=60) as client:
        resp = await client.get(
            f"{BROKER_URL}/files/{filename}",
            headers=HEADERS,
        )
        resp.raise_for_status()
    with open(save_path, "wb") as f:
        f.write(resp.content)
    return f"已下载到: {save_path} ({len(resp.content)} bytes)"


# List the files held in broker storage (GET /files), one "name (size)" line
# per file.
@mcp.tool()
async def list_files() -> str:
    """列出 broker 文件存储中的所有文件。"""
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"{BROKER_URL}/files", headers=HEADERS, timeout=10)
        resp.raise_for_status()
        data = resp.json()
    files = data.get("files", [])
    if not files:
        return "没有文件。"
    return "\n".join(f" {entry['name']} ({entry['size']} bytes)" for entry in files)


# Schedule a one-shot action at an absolute local time (POST /schedules with
# trigger_at as a Unix timestamp).
@mcp.tool()
async def schedule_at(action: str, time_str: str) -> str:
    """在指定时刻触发一个动作。到时间后会收到 [定时任务 ID] action 的消息。

    action: 到时间后要执行的指令/提醒内容
    time_str: 时间,格式 "2026-03-30 14:00" 或 "14:00"(当天)。时区为服务器时区。
    """
    import datetime
    now = datetime.datetime.now()
    # Surrounding whitespace would defeat the length-based format detection.
    time_str = time_str.strip()
    try:
        if len(time_str) <= 5:
            # "HH:MM": today at that time, or tomorrow if it has already passed.
            t = datetime.datetime.strptime(time_str, "%H:%M").replace(
                year=now.year, month=now.month, day=now.day
            )
            if t < now:
                t += datetime.timedelta(days=1)
        else:
            t = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M")
    except ValueError:
        return f"时间格式错误: {time_str},请用 'HH:MM' 或 'YYYY-MM-DD HH:MM'"

    async with httpx.AsyncClient() as client:
        resp = await client.post(f"{BROKER_URL}/schedules", headers=HEADERS, json={
            "action": action,
            "trigger_at": t.timestamp(),
            "dispatcher_id": DISPATCHER_ID,
        }, timeout=10)
        resp.raise_for_status()
        data = resp.json()
    return f"定时任务已创建。ID: {data['id']},触发时间: {data['trigger_at']}"


# Schedule a one-shot action after a delay; minutes are converted to seconds
# for the broker's delay_seconds field.
@mcp.tool()
async def schedule_delay(action: str, minutes: float) -> str:
    """延迟指定分钟后触发一个动作。

    action: 到时间后要执行的指令/提醒内容
    minutes: 延迟分钟数
    """
    async with httpx.AsyncClient() as client:
        resp = await client.post(f"{BROKER_URL}/schedules", headers=HEADERS, json={
            "action": action,
            "delay_seconds": minutes * 60,
            "dispatcher_id": DISPATCHER_ID,
        }, timeout=10)
        resp.raise_for_status()
        data = resp.json()
    return f"定时任务已创建。ID: {data['id']},触发时间: {data['trigger_at']}"


# Schedule a repeating action: the interval is sent both as the initial delay
# and as the repeat period.
@mcp.tool()
async def schedule_loop(action: str, interval_minutes: float) -> str:
    """创建循环定时任务,每隔固定时间重复触发。

    action: 每次触发时要执行的指令/提醒内容
    interval_minutes: 间隔分钟数
    """
    interval_seconds = interval_minutes * 60
    async with httpx.AsyncClient() as client:
        resp = await client.post(f"{BROKER_URL}/schedules", headers=HEADERS, json={
            "action": action,
            "delay_seconds": interval_seconds,
            "repeat_seconds": interval_seconds,
            "dispatcher_id": DISPATCHER_ID,
        }, timeout=10)
        resp.raise_for_status()
        data = resp.json()
    return f"循环任务已创建。ID: {data['id']},首次触发: {data['trigger_at']},间隔: {interval_minutes} 分钟"


# List schedules (GET /schedules) with the trigger time rendered in local time
# and the repeat interval, if any, in minutes.
@mcp.tool()
async def list_schedules() -> str:
    """列出所有活跃的定时任务。"""
    import datetime
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"{BROKER_URL}/schedules", headers=HEADERS, timeout=10)
        resp.raise_for_status()
        data = resp.json()
    schedules = data.get("schedules", [])
    if not schedules:
        return "没有定时任务。"
    lines = []
    for s in schedules:
        t = datetime.datetime.fromtimestamp(s["trigger_at"]).strftime("%Y-%m-%d %H:%M:%S")
        # A missing or null repeat_seconds is treated as "does not repeat"
        # instead of raising KeyError/TypeError.
        repeat_seconds = s.get("repeat_seconds") or 0
        repeat = f",每 {repeat_seconds/60:.0f} 分钟" if repeat_seconds > 0 else ""
        lines.append(f"[{s['id']}] {t}{repeat} | {s['action'][:80]}")
    return "\n".join(lines)


# Cancel a schedule by id (DELETE /schedules/<id>).
@mcp.tool()
async def cancel_schedule(schedule_id: str) -> str:
    """取消一个定时任务。"""
    async with httpx.AsyncClient() as client:
        resp = await client.delete(f"{BROKER_URL}/schedules/{schedule_id}", headers=HEADERS, timeout=10)
        resp.raise_for_status()
    return f"定时任务 {schedule_id} 已取消。"
# Submit a question to the broker's expert endpoint (POST /expert/ask) and
# return immediately with the request id from the reply. The docstring is the
# tool description shown to the MCP client and is therefore kept verbatim.
@mcp.tool()
async def ask_expert(question: str) -> str:
    """向 GPT-Pro 专家提问。注意:GPT-Pro 回复非常慢(可能几分钟),调用后立即返回请求 ID,不要等待结果。
    结果完成后会自动推送通知,届时用 get_expert_answer 查看。

    question: 要问的问题,提供充分上下文。
    """
    endpoint = f"{BROKER_URL}/expert/ask"
    payload = {"question": question}
    async with httpx.AsyncClient() as http:
        reply = await http.post(endpoint, headers=HEADERS, json=payload, timeout=10)
        reply.raise_for_status()
        ticket = reply.json()
    return f"已提交给 GPT-Pro,请求 ID: {ticket['id']}。回复可能需要几分钟,完成后会自动通知你,不要等待。"


# Fetch the state of an expert request (GET /expert/<id>): the answer when
# done, the failure text on error, otherwise a "still thinking" notice that
# includes the raw status.
@mcp.tool()
async def get_expert_answer(request_id: str) -> str:
    """查看 GPT-Pro 专家的回复结果。"""
    endpoint = f"{BROKER_URL}/expert/{request_id}"
    async with httpx.AsyncClient() as http:
        reply = await http.get(endpoint, headers=HEADERS, timeout=10)
        reply.raise_for_status()
        info = reply.json()

    state = info["status"]
    if state == "done":
        return f"GPT-Pro 回复:\n{info['answer']}"
    if state == "error":
        return f"GPT-Pro 请求失败: {info['answer']}"
    return f"GPT-Pro 仍在思考中... (状态: {state})"


# Script entry point: serve the registered tools over stdio.
if __name__ == "__main__":
    mcp.run(transport="stdio")