diff options
Diffstat (limited to 'mcp_dispatcher.py')
| -rw-r--r-- | mcp_dispatcher.py | 515 |
1 files changed, 515 insertions, 0 deletions
diff --git a/mcp_dispatcher.py b/mcp_dispatcher.py new file mode 100644 index 0000000..8148f58 --- /dev/null +++ b/mcp_dispatcher.py @@ -0,0 +1,515 @@ +""" +Claude Bridge - Dispatcher MCP Server +调度端 MCP 工具,供私人服务器上的 Claude Code 使用 +""" + +import os +import subprocess + +import httpx +from mcp.server.fastmcp import FastMCP + +mcp = FastMCP("dispatcher") + +BROKER_URL = os.environ.get("BROKER_URL", "http://127.0.0.1:8000") +API_SECRET = os.environ["API_SECRET"] + +HEADERS = {"Authorization": f"Bearer {API_SECRET}"} + + +def _detect_dispatcher_id() -> str: + """Detect user_id from env or tmux session name (dispatcher-XXXXXX)""" + uid = os.environ.get("DISPATCHER_USER_ID", "") + if uid: + return uid + # 从 tmux session 名推断 + try: + result = subprocess.run( + ["tmux", "list-panes", "-a", "-F", "#{pane_pid} #{session_name}"], + capture_output=True, timeout=3, text=True, + ) + if result.returncode == 0: + pane_map = {} + for line in result.stdout.strip().split("\n"): + parts = line.split(" ", 1) + if len(parts) == 2: + pane_map[parts[0]] = parts[1] + check_pid = os.getpid() + for _ in range(50): + if str(check_pid) in pane_map: + session = pane_map[str(check_pid)] + if session.startswith("dispatcher-"): + return session[len("dispatcher-"):] + elif session == "dispatcher": + return "owner" + return "" + try: + with open(f"/proc/{check_pid}/stat") as f: + stat = f.read() + ppid = int(stat.split(")")[1].split()[1]) + if ppid <= 1: + break + check_pid = ppid + except Exception: + break + except Exception: + pass + return "owner" # fallback + + +DISPATCHER_ID = _detect_dispatcher_id() +print(f"[dispatcher-mcp] Detected dispatcher_id: {DISPATCHER_ID}", flush=True) + + +@mcp.tool() +async def check_user_messages() -> str: + """检查 Telegram 是否有新的用户消息。有新消息时返回消息内容,没有则返回空。""" + async with httpx.AsyncClient() as client: + resp = await client.get(f"{BROKER_URL}/messages/new", headers=HEADERS, timeout=10) + resp.raise_for_status() + data = resp.json() + + msgs = data.get("messages", []) + if not msgs: + return "没有新消息。" + + lines = [] + for m in msgs: + lines.append(f"用户消息: {m['text']}") + return "\n".join(lines) + + +@mcp.tool() +async def send_task_to_lab(task_description: str, target: str = "") -> str: + """创建一个任务派发给实验室服务器执行。 + task_description: 详细的任务描述,包含实验室 Claude 需要知道的所有信息。 + target: 目标 session 名。可选值: claude, claude2, claude3。留空则任意空闲 session 可领取。 + """ + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{BROKER_URL}/tasks", + headers=HEADERS, + json={"content": task_description, "target": target, "dispatcher_id": DISPATCHER_ID}, + timeout=10, + ) + resp.raise_for_status() + task = resp.json() + + target_info = f" → {task['target']}" if task.get('target') else "" + return f"任务已派发。ID: {task['id']}{target_info}" + + +@mcp.tool() +async def send_message_to_lab(message: str, target: str = "") -> str: + """向实验室 session 发送消息(btw 模式)。不打断当前工作,消息会在下一次工具调用后注入 context。 + 适合询问进度、补充信息、轻量级指令等。 + message: 消息内容 + target: 目标 session 名。可选值: claude, claude2, claude3。留空则所有 session 可见。 + """ + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{BROKER_URL}/tasks", + headers=HEADERS, + json={"content": message, "target": target, "type": "message", "dispatcher_id": DISPATCHER_ID}, + timeout=10, + ) + resp.raise_for_status() + task = resp.json() + + target_info = f" → {task['target']}" if task.get('target') else "" + return f"消息已发送{target_info}。ID: {task['id']}" + + +@mcp.tool() +async def list_workers() -> str: + """List all active workers and their status (heartbeat, bound channel).""" + async with httpx.AsyncClient() as client: + # Get workers + resp = await client.get(f"{BROKER_URL}/workers", headers=HEADERS, timeout=10) + resp.raise_for_status() + workers = resp.json().get("workers", []) + # Get heartbeats + hb_resp = await client.get(f"{BROKER_URL}/heartbeat", headers=HEADERS, timeout=10) + hb_resp.raise_for_status() + heartbeats = hb_resp.json() + + if not workers: + return "No active workers." + + lines = [] + for w in workers: + session = w["session_name"] + hb = heartbeats.get(session, {}) + status = "🟢 online" if hb.get("alive") else ("🔴 offline" if hb else "⚪ no heartbeat") + channel = w.get("channel_name", "") + ch_info = f" → #{channel}" if channel else "" + lines.append(f" {session} ({w['host']}:{w['path']}){ch_info} [{status}]") + return "\n".join(lines) + + +@mcp.tool() +async def check_task_status(task_id: str) -> str: + """查询某个任务的执行状态和结果。""" + async with httpx.AsyncClient() as client: + resp = await client.get(f"{BROKER_URL}/tasks/{task_id}", headers=HEADERS, timeout=10) + resp.raise_for_status() + task = resp.json() + + status = task["status"] + if status == "done": + return f"任务 {task_id} 已完成。\n结果:\n{task['result']}" + elif status == "failed": + return f"任务 {task_id} 失败。\n原因:\n{task['result']}" + elif status == "running": + return f"任务 {task_id} 正在执行中..." + else: + return f"任务 {task_id} 等待领取中。" + + +@mcp.tool() +async def list_all_tasks(status: str = "") -> str: + """列出所有任务。可选按状态过滤: pending, running, done, failed""" + async with httpx.AsyncClient() as client: + params = {} + if status: + params["status"] = status + resp = await client.get( + f"{BROKER_URL}/tasks", headers=HEADERS, params=params, timeout=10 + ) + resp.raise_for_status() + data = resp.json() + + tasks = data.get("tasks", []) + if not tasks: + return "没有任务。" + + lines = [] + for t in tasks: + lines.append(f"[{t['id']}] status={t['status']} | {t['content'][:80]}") + return "\n".join(lines) + + +@mcp.tool() +async def send_telegram_message(message: str) -> str: + """通过 Telegram 给用户发送消息。用于汇报结果、确认收到指令等。""" + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{BROKER_URL}/telegram/send", + headers=HEADERS, + json={"message": message}, + timeout=15, + ) + resp.raise_for_status() + + return "Telegram 消息已发送。" + + +@mcp.tool() +async def switch_worker_project(session: str, new_directory: str) -> str: + """切换实验室 worker 的工作项目。会执行: /exit → cd 新目录 → 复制 CLAUDE.md → claude --continue。 + session: 目标 session 名 (claude, claude2, claude3) + new_directory: 新项目的绝对路径 (如 /home/yurenh2/new-project) + """ + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{BROKER_URL}/commands", + headers=HEADERS, + json={ + "target": session, + "action": "switch_project", + "params": {"directory": new_directory}, + }, + timeout=10, + ) + resp.raise_for_status() + data = resp.json() + + return f"切换命令已下发。ID: {data['id']}。{session} 将切换到 {new_directory},完成后会收到通知。" + + +@mcp.tool() +async def restart_worker(session: str) -> str: + """重启实验室 worker 的 Claude Code session(在同一目录 /exit + claude --continue)。 + session: 目标 session 名 (claude, claude2, claude3) + """ + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{BROKER_URL}/commands", + headers=HEADERS, + json={ + "target": session, + "action": "restart", + "params": {}, + }, + timeout=10, + ) + resp.raise_for_status() + data = resp.json() + + return f"重启命令已下发。ID: {data['id']}。{session} 将重启,完成后会收到通知。" + + +@mcp.tool() +async def send_slack_message(channel: str, message: str, thread_ts: str = "") -> str: + """通过 Slack 发送消息。用于回复来自 Slack 的消息。 + channel: Slack channel ID(从消息标签 channel=xxx 中获取) + message: 消息内容 + thread_ts: 可选,回复到指定 thread + """ + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{BROKER_URL}/slack/send", + headers=HEADERS, + json={"channel": channel, "message": message, "thread_ts": thread_ts}, + timeout=15, + ) + resp.raise_for_status() + return "Slack 消息已发送。" + + +@mcp.tool() +async def check_context_usage(session: str = "dispatcher") -> str: + """查看指定 session 的 context 使用情况。 + session: tmux session 名。默认 dispatcher。可填 claude/claude2/claude3 查 lab worker(需要 lab 端可达)。 + 注意:会在目标 session 执行 /context 命令,可能短暂干扰该 session。 + """ + import subprocess, time, re + + try: + # 发送 /context 命令 + subprocess.run(["tmux", "send-keys", "-t", session, "/context", "Enter"], timeout=3) + # 等输出渲染 + time.sleep(3) + # 抓取 pane 内容 + output = subprocess.check_output( + ["tmux", "capture-pane", "-t", session, "-p", "-S", "-50"], + timeout=3 + ).decode() + + # 解析关键信息 + lines = output.split("\n") + info = [] + for line in lines: + # 匹配 "239k/1000k" 格式 + m = re.search(r'(\d+k)/(\d+k)', line) + if m: + info.append(f"Total: {m.group(1)}/{m.group(2)}") + # 匹配百分比行 + if "tokens (" in line and "%" in line: + clean = re.sub(r'[⛁⛶⛀⛝\s]+', '', line).strip() + if clean: + info.append(clean) + # 匹配 Messages/Free space 等 + for keyword in ["Messages:", "Free space:", "System prompt:", "Autocompact"]: + if keyword in line: + # 提取数字部分 + m2 = re.search(r'([\d.]+k?\s*tokens?\s*\([\d.]+%\))', line) + if m2: + info.append(f"{keyword} {m2.group(1)}") + + if info: + return f"[{session}] Context:\n" + "\n".join(info) + else: + return f"[{session}] 无法解析 /context 输出,原始内容尾部:\n" + "\n".join(lines[-15:]) + except Exception as e: + return f"错误: {e}" + + +@mcp.tool() +async def send_file_to_user(filename: str, caption: str = "") -> str: + """通过 Telegram 给用户发送 broker 上的文件。 + filename: broker 文件存储中的文件名(用 list_files 查看可用文件)。 + caption: 文件说明。 + """ + async with httpx.AsyncClient(timeout=60) as client: + resp = await client.post( + f"{BROKER_URL}/telegram/send_file", + headers=HEADERS, + data={"filename": filename, "caption": caption}, + ) + resp.raise_for_status() + return "文件已通过 Telegram 发送给用户。" + + +@mcp.tool() +async def upload_local_file_to_broker(file_path: str) -> str: + """把 dispatcher 本地文件上传到 broker 文件存储,供 lab worker 下载或发给用户。 + file_path: dispatcher 服务器上的本地文件路径。 + """ + import os + if not os.path.exists(file_path): + return f"文件不存在: {file_path}" + filename = os.path.basename(file_path) + async with httpx.AsyncClient(timeout=60) as client: + with open(file_path, "rb") as f: + resp = await client.post( + f"{BROKER_URL}/files/upload", + headers=HEADERS, + files={"file": (filename, f)}, + data={"filename": filename}, + ) + resp.raise_for_status() + data = resp.json() + return f"已上传: {data['filename']} ({data['size']} bytes)" + + +@mcp.tool() +async def download_file_from_broker(filename: str, save_path: str = "") -> str: + """从 broker 文件存储下载文件到 dispatcher 本地。 + filename: broker 上的文件名。 + save_path: 保存路径。留空则保存到当前目录。 + """ + import os + if not save_path: + save_path = os.path.join(os.getcwd(), filename) + async with httpx.AsyncClient(timeout=60) as client: + resp = await client.get( + f"{BROKER_URL}/files/{filename}", + headers=HEADERS, + ) + resp.raise_for_status() + with open(save_path, "wb") as f: + f.write(resp.content) + return f"已下载到: {save_path} ({len(resp.content)} bytes)" + + +@mcp.tool() +async def list_files() -> str: + """列出 broker 文件存储中的所有文件。""" + async with httpx.AsyncClient() as client: + resp = await client.get(f"{BROKER_URL}/files", headers=HEADERS, timeout=10) + resp.raise_for_status() + data = resp.json() + files = data.get("files", []) + if not files: + return "没有文件。" + lines = [f" {f['name']} ({f['size']} bytes)" for f in files] + return "\n".join(lines) + + +@mcp.tool() +async def schedule_at(action: str, time_str: str) -> str: + """在指定时刻触发一个动作。到时间后会收到 [定时任务 ID] action 的消息。 + action: 到时间后要执行的指令/提醒内容 + time_str: 时间,格式 "2026-03-30 14:00" 或 "14:00"(当天)。时区为服务器时区。 + """ + import datetime + now = datetime.datetime.now() + try: + if len(time_str) <= 5: + t = datetime.datetime.strptime(time_str, "%H:%M").replace(year=now.year, month=now.month, day=now.day) + if t < now: + t += datetime.timedelta(days=1) + else: + t = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M") + except ValueError: + return f"时间格式错误: {time_str},请用 'HH:MM' 或 'YYYY-MM-DD HH:MM'" + + async with httpx.AsyncClient() as client: + resp = await client.post(f"{BROKER_URL}/schedules", headers=HEADERS, json={ + "action": action, "trigger_at": t.timestamp(), "dispatcher_id": DISPATCHER_ID, + }, timeout=10) + resp.raise_for_status() + data = resp.json() + return f"定时任务已创建。ID: {data['id']},触发时间: {data['trigger_at']}" + + +@mcp.tool() +async def schedule_delay(action: str, minutes: float) -> str: + """延迟指定分钟后触发一个动作。 + action: 到时间后要执行的指令/提醒内容 + minutes: 延迟分钟数 + """ + async with httpx.AsyncClient() as client: + resp = await client.post(f"{BROKER_URL}/schedules", headers=HEADERS, json={ + "action": action, "delay_seconds": minutes * 60, "dispatcher_id": DISPATCHER_ID, + }, timeout=10) + resp.raise_for_status() + data = resp.json() + return f"定时任务已创建。ID: {data['id']},触发时间: {data['trigger_at']}" + + +@mcp.tool() +async def schedule_loop(action: str, interval_minutes: float) -> str: + """创建循环定时任务,每隔固定时间重复触发。 + action: 每次触发时要执行的指令/提醒内容 + interval_minutes: 间隔分钟数 + """ + async with httpx.AsyncClient() as client: + resp = await client.post(f"{BROKER_URL}/schedules", headers=HEADERS, json={ + "action": action, "delay_seconds": interval_minutes * 60, "repeat_seconds": interval_minutes * 60, "dispatcher_id": DISPATCHER_ID, + }, timeout=10) + resp.raise_for_status() + data = resp.json() + return f"循环任务已创建。ID: {data['id']},首次触发: {data['trigger_at']},间隔: {interval_minutes} 分钟" + + +@mcp.tool() +async def list_schedules() -> str: + """列出所有活跃的定时任务。""" + import datetime + async with httpx.AsyncClient() as client: + resp = await client.get(f"{BROKER_URL}/schedules", headers=HEADERS, timeout=10) + resp.raise_for_status() + data = resp.json() + schedules = data.get("schedules", []) + if not schedules: + return "没有定时任务。" + lines = [] + for s in schedules: + t = datetime.datetime.fromtimestamp(s["trigger_at"]).strftime("%Y-%m-%d %H:%M:%S") + repeat = f",每 {s['repeat_seconds']/60:.0f} 分钟" if s["repeat_seconds"] > 0 else "" + lines.append(f"[{s['id']}] {t}{repeat} | {s['action'][:80]}") + return "\n".join(lines) + + +@mcp.tool() +async def cancel_schedule(schedule_id: str) -> str: + """取消一个定时任务。""" + async with httpx.AsyncClient() as client: + resp = await client.delete(f"{BROKER_URL}/schedules/{schedule_id}", headers=HEADERS, timeout=10) + resp.raise_for_status() + return f"定时任务 {schedule_id} 已取消。" + + +@mcp.tool() +async def ask_expert(question: str) -> str: + """向 GPT-Pro 专家提问。注意:GPT-Pro 回复非常慢(可能几分钟),调用后立即返回请求 ID,不要等待结果。 + 结果完成后会自动推送通知,届时用 get_expert_answer 查看。 + question: 要问的问题,提供充分上下文。 + """ + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{BROKER_URL}/expert/ask", + headers=HEADERS, + json={"question": question}, + timeout=10, + ) + resp.raise_for_status() + data = resp.json() + + return f"已提交给 GPT-Pro,请求 ID: {data['id']}。回复可能需要几分钟,完成后会自动通知你,不要等待。" + + +@mcp.tool() +async def get_expert_answer(request_id: str) -> str: + """查看 GPT-Pro 专家的回复结果。""" + async with httpx.AsyncClient() as client: + resp = await client.get( + f"{BROKER_URL}/expert/{request_id}", + headers=HEADERS, + timeout=10, + ) + resp.raise_for_status() + data = resp.json() + + if data["status"] == "done": + return f"GPT-Pro 回复:\n{data['answer']}" + elif data["status"] == "error": + return f"GPT-Pro 请求失败: {data['answer']}" + else: + return f"GPT-Pro 仍在思考中... (状态: {data['status']})" + + +if __name__ == "__main__": + mcp.run(transport="stdio") |
