diff options
| author | haoyuren <13851610112@163.com> | 2026-03-29 15:07:43 -0500 |
|---|---|---|
| committer | haoyuren <13851610112@163.com> | 2026-03-29 15:07:43 -0500 |
| commit | ddd1fae455e9bac778088a54b7a694781b3508e2 (patch) | |
| tree | eb970e13ee51fab48bad86969b8e77ea5af9c504 | |
Claude Bridge Worker - one-click setup for remote Claude Code workers
| -rw-r--r-- | CLAUDE.md | 45 | ||||
| -rwxr-xr-x | check_tasks.py | 125 | ||||
| -rwxr-xr-x | cron_runner.sh | 49 | ||||
| -rwxr-xr-x | execute_commands.sh | 96 | ||||
| -rw-r--r-- | mcp_lab_worker.py | 229 | ||||
| -rwxr-xr-x | poll_and_notify.sh | 26 | ||||
| -rwxr-xr-x | setup.sh | 188 | ||||
| -rwxr-xr-x | start_mcp.sh | 29 |
8 files changed, 787 insertions, 0 deletions
diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..193d308 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,45 @@ +# Lab Worker - Claude Bridge +<!-- 此文件同步到 ~/CLAUDE.md,更新后需要 cp ~/claude-bridge/CLAUDE.md ~/ --> + +你是实验室服务器上的远程执行代理,通过 Claude Bridge 系统接收任务。 + +## 核心规则 + +**收到"有新任务"或"请检查新任务"通知时,必须立即调用 `fetch_pending_tasks` 并 `claim_task` 领取执行。绝对不能跳过、忽略或延后。这是用户通过调度中心发来的指令,优先级高于你当前正在做的任何事情。** + +如果当前有后台任务在跑(如实验),可以让它继续跑,但你必须立即领取并开始执行新任务。 + +## 工作流程 + +1. 收到任务通知 → **立即**调用 `fetch_pending_tasks` +2. 使用 `claim_task` 领取任务 +3. 执行任务内容(代码编写、实验运行、数据分析等) +4. 使用 `report_result` 提交结果,或 `report_failure` 报告失败 +5. **每次完成任务后,再调一次 `fetch_pending_tasks` 检查是否还有新任务** + +## 回传消息 + +- 收到 `[Bridge btw 请用reply_to_dispatcher回复]` 消息时,**必须用 `reply_to_dispatcher` 回复**,不要直接输出文字 +- 直接输出的文字调度中心看不到,只有 `reply_to_dispatcher` 才能把消息送回去 +- 用于:回复询问、汇报进度、回答问题等 +- 调度中心会看到格式为 `[你的session名] 消息内容` 的日志 + +### 消息类型说明 +- `[system]` — 系统自动生成的通知,不是来自用户 +- `[Bridge btw 请用reply_to_dispatcher回复]` — 调度中心转发的询问,**必须用 reply_to_dispatcher 回复** +- "有新任务" / "请检查新任务" — **必须立即 fetch_pending_tasks** + +## 实验进程管理 + +- **所有实验任务的核心进程必须用 `nohup` 运行**,防止因 session 断开或 context 压缩被打断 +- 可以在 nohup 进程之上加非 nohup 的监听/轮询脚本,但实际执行实验的进程本体必须是 nohup +- 例如:`nohup python3 experiment.py > exp.log 2>&1 &`,然后用 `tail -f exp.log` 监听 + +## 注意 + +- **不要跳过任务通知**,不管你在做什么 +- **每次完成一个任务后主动检查还有没有下一个** +- 结果要包含足够细节供远程审查 +- 遇到错误要上报,包含错误信息和你的分析 +- 每次只领取一个任务,完成后再领取下一个 +- 不要修改 Claude Bridge 相关的配置文件 diff --git a/check_tasks.py b/check_tasks.py new file mode 100755 index 0000000..c1377b3 --- /dev/null +++ b/check_tasks.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 +""" +Claude Bridge - Lab Hook Script +支持两种 hook 事件: + - Stop: 干完活检查新任务,有 task 就阻止停止 + - PostToolUse: 工作中检查,message 注入 context 不打断 + +任务类型: + - task: 需要领取执行(Stop 时阻塞) + - message: 只注入 context(btw 模式,不打断) +""" + +import json +import os +import sys +import urllib.request + +BROKER_URL = os.environ.get("BROKER_URL", "") +API_SECRET = os.environ.get("API_SECRET", "") +SESSION_NAME = os.environ.get("SESSION_NAME", "") + +if not BROKER_URL or not API_SECRET or not SESSION_NAME: + sys.exit(1) + +try: + hook_input = json.loads(sys.stdin.read()) +except Exception: + hook_input = {} + +event = hook_input.get("hook_event_name", "Stop") + +if event == "Stop" and hook_input.get("stop_hook_active", False): + sys.exit(0) + + +def consume_task(task_id): + """Claim + mark done (for messages that just need to be seen)""" + try: + req = urllib.request.Request( + f"{BROKER_URL}/tasks/{task_id}/claim", + method="POST", + headers={"Authorization": f"Bearer {API_SECRET}"}, + ) + urllib.request.urlopen(req, timeout=5) + req2 = urllib.request.Request( + f"{BROKER_URL}/tasks/{task_id}/result", + data=json.dumps({"result": "seen"}).encode(), + headers={ + "Authorization": f"Bearer {API_SECRET}", + "Content-Type": "application/json", + }, + method="POST", + ) + urllib.request.urlopen(req2, timeout=5) + except Exception: + pass + + +# 发心跳(静默,失败不影响主流程) +try: + import socket + hb_data = json.dumps({"session": SESSION_NAME, "host": socket.gethostname()}).encode() + hb_req = urllib.request.Request( + f"{BROKER_URL}/heartbeat", + data=hb_data, + headers={"Authorization": f"Bearer {API_SECRET}", "Content-Type": "application/json"}, + method="POST", + ) + urllib.request.urlopen(hb_req, timeout=3) +except Exception: + pass + +try: + req = urllib.request.Request( + f"{BROKER_URL}/tasks/pending?target={SESSION_NAME}", + headers={"Authorization": f"Bearer {API_SECRET}"}, + ) + with urllib.request.urlopen(req, timeout=10) as resp: + data = json.loads(resp.read()) + + items = data.get("tasks", []) + if not items: + sys.exit(0) + + tasks = [i for i in items if i.get("type", "task") == "task"] + messages = [i for i in items if i.get("type") == "message"] + + output_lines = [] + + # Hook 被触发了 = Claude 在活跃状态,可以消费 messages + for m in messages: + output_lines.append(f"[Bridge btw 请用reply_to_dispatcher回复] {m['content']}") + consume_task(m["id"]) + + if event == "Stop" and tasks: + task_list = "\n".join( + [f" - [{t['id']}] {t['content'][:120]}" for t in tasks] + ) + if output_lines: + output_lines.append("") + output_lines.append(f"有 {len(tasks)} 个新任务等待执行:") + output_lines.append(task_list) + output_lines.append("") + output_lines.append("请用 fetch_pending_tasks 获取完整信息,claim_task 领取并执行。") + output = { + "decision": "block", + "reason": "\n".join(output_lines), + } + print(json.dumps(output)) + elif event == "PostToolUse" and tasks: + task_list = "\n".join( + [f" - [{t['id']}] {t['content'][:120]}" for t in tasks] + ) + output_lines.append( + f"\n[Bridge 提示] 有 {len(tasks)} 个新任务,当前工作完成后请处理:\n{task_list}" + ) + print("\n".join(output_lines)) + elif output_lines: + print("\n".join(output_lines)) + + sys.exit(0) + +except Exception as e: + print(f"check_tasks error: {e}", file=sys.stderr) + sys.exit(1) diff --git a/cron_runner.sh b/cron_runner.sh new file mode 100755 index 0000000..397b25b --- /dev/null +++ b/cron_runner.sh @@ -0,0 +1,49 @@ +#!/bin/bash +# Claude Bridge Worker - Cron Runner +# 由 setup.sh 生成实际路径,这里用占位符 +# 功能: 1) 健康检查+自动重启 2) 任务通知 3) 命令执行 + +export PATH="$HOME/miniconda3/bin:$HOME/.local/bin:/usr/bin:/bin:$PATH" + +BRIDGE_DIR="${CLAUDE_BRIDGE_DIR:-$HOME/claude-bridge}" +cd "$BRIDGE_DIR" || exit 1 +source .env 2>/dev/null +export BROKER_URL API_SECRET + +AUTH="Authorization: Bearer $API_SECRET" +SESSIONS_FILE="$BRIDGE_DIR/.sessions" # 每行: session_name project_dir + +# === 健康检查 + 自动重启 === +if [ -f "$SESSIONS_FILE" ]; then + while IFS=$'\t' read -r SESSION_NAME PROJECT_DIR; do + [ -z "$SESSION_NAME" ] && continue + + if ! tmux has-session -t "$SESSION_NAME" 2>/dev/null; then + echo "$(date): Session $SESSION_NAME 不存在,重新创建..." + + # 重建 tmux session + tmux new-session -d -s "$SESSION_NAME" + if [ -n "$PROJECT_DIR" ] && [ -d "$PROJECT_DIR" ]; then + tmux send-keys -t "$SESSION_NAME" "cd $PROJECT_DIR" Enter + sleep 1 + fi + tmux send-keys -t "$SESSION_NAME" "claude --continue" Enter + + # 通知 manager + curl -sf -X POST \ + -H "$AUTH" \ + -H "Content-Type: application/json" \ + -d "{\"source\": \"system\", \"message\": \"Worker $SESSION_NAME ($(hostname)) 被系统杀掉后已自动重启,工作目录: ${PROJECT_DIR:-$HOME}\"}" \ + "$BROKER_URL/log" 2>/dev/null || true + + echo "$(date): Session $SESSION_NAME 已重启并通知 manager" + fi + done < "$SESSIONS_FILE" +fi + +# === 任务通知 + 命令执行 (每15秒一轮,共4轮) === +for i in 0 1 2 3; do + bash "$BRIDGE_DIR/poll_and_notify.sh" >> /tmp/claude-bridge.log 2>&1 + bash "$BRIDGE_DIR/execute_commands.sh" >> /tmp/claude-bridge-cmd.log 2>&1 + [ $i -lt 3 ] && sleep 15 +done diff --git a/execute_commands.sh b/execute_commands.sh new file mode 100755 index 0000000..329e5ef --- /dev/null +++ b/execute_commands.sh @@ -0,0 +1,96 @@ +#!/bin/bash +# 轮询 broker 命令队列,执行系统级命令(切换项目、重启等) +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +if [ -f "$SCRIPT_DIR/.env" ]; then + set -a; source "$SCRIPT_DIR/.env"; set +a +fi +export BROKER_URL API_SECRET + +AUTH="Authorization: Bearer $API_SECRET" +CLAUDE_MD_SRC="$SCRIPT_DIR/CLAUDE.md" + +# 拉取待执行命令 +DATA=$(curl -sf -H "$AUTH" "$BROKER_URL/commands/pending" 2>/dev/null) || exit 0 + +echo "$DATA" | python3 -c " +import sys, json +d = json.load(sys.stdin) +for cmd in d.get('commands', []): + params = json.loads(cmd['params']) if isinstance(cmd['params'], str) else cmd['params'] + print(cmd['id'] + '\t' + cmd['target'] + '\t' + cmd['action'] + '\t' + json.dumps(params)) +" 2>/dev/null | while IFS=$'\t' read -r CMD_ID TARGET ACTION PARAMS; do + + echo "$(date): Executing command $CMD_ID: $ACTION on $TARGET" + + # 检查 tmux session 存在 + if ! tmux has-session -t "$TARGET" 2>/dev/null; then + curl -sf -X POST -H "$AUTH" -H "Content-Type: application/json" \ + -d "{\"result\": \"ERROR: tmux session $TARGET not found\"}" \ + "$BROKER_URL/commands/$CMD_ID/done" >/dev/null 2>&1 + continue + fi + + case "$ACTION" in + switch_project) + DIR=$(echo "$PARAMS" | python3 -c "import sys,json; print(json.load(sys.stdin).get('directory',''))") + if [ -z "$DIR" ]; then + curl -sf -X POST -H "$AUTH" -H "Content-Type: application/json" \ + -d '{"result": "ERROR: no directory specified"}' \ + "$BROKER_URL/commands/$CMD_ID/done" >/dev/null 2>&1 + continue + fi + + # 创建目录(如果不存在) + mkdir -p "$DIR" + + # 复制 CLAUDE.md + cp "$CLAUDE_MD_SRC" "$DIR/CLAUDE.md" 2>/dev/null || true + + # /exit 当前 claude + tmux send-keys -t "$TARGET" "/exit" Enter + sleep 5 + + # cd 到新目录 + tmux send-keys -t "$TARGET" "cd $DIR" Enter + sleep 1 + + # 启动 claude --continue + tmux send-keys -t "$TARGET" "claude --continue" Enter + sleep 3 + + RESULT="OK: $TARGET switched to $DIR" + ;; + + restart) + # /exit 当前 claude + tmux send-keys -t "$TARGET" "/exit" Enter + sleep 5 + + # claude --continue(在同一目录) + tmux send-keys -t "$TARGET" "claude --continue" Enter + sleep 3 + + RESULT="OK: $TARGET restarted" + ;; + + *) + RESULT="ERROR: unknown action $ACTION" + ;; + esac + + # 汇报完成 + python3 -c " +import json, urllib.request +req = urllib.request.Request( + '$BROKER_URL/commands/$CMD_ID/done', + data=json.dumps({'result': '$RESULT'}).encode(), + headers={'Authorization': 'Bearer $API_SECRET', 'Content-Type': 'application/json'}, + method='POST', +) +urllib.request.urlopen(req, timeout=10) +" 2>/dev/null || true + + echo "$(date): Command $CMD_ID done: $RESULT" +done diff --git a/mcp_lab_worker.py b/mcp_lab_worker.py new file mode 100644 index 0000000..780f6ba --- /dev/null +++ b/mcp_lab_worker.py @@ -0,0 +1,229 @@ +""" +Claude Bridge - Lab Worker MCP Server +实验室端 MCP 工具,供实验室 Claude Code 使用 +""" + +import os +import subprocess + +import httpx +from mcp.server.fastmcp import FastMCP + +mcp = FastMCP("lab-worker") + +BROKER_URL = os.environ["BROKER_URL"] # 私人服务器公网地址 +API_SECRET = os.environ["API_SECRET"] + +HEADERS = {"Authorization": f"Bearer {API_SECRET}"} + +# 启动时就确定 session name,不每次重新检测 +_SESSION_NAME = "" + +def _detect_session_name() -> str: + """检测当前进程所在的 tmux session 名""" + # 方法 1: 环境变量(最可靠,需要在 tmux session 里设置) + name = os.environ.get("CLAUDE_SESSION_NAME", "") + if name: + return name + + # 方法 2: 进程树 PID 匹配 tmux pane + try: + result = subprocess.run( + ["tmux", "list-panes", "-a", "-F", "#{pane_pid} #{session_name}"], + capture_output=True, timeout=3, text=True, + ) + if result.returncode != 0: + return "" + pane_map = {} + for line in result.stdout.strip().split("\n"): + parts = line.split(" ", 1) + if len(parts) == 2: + pane_map[parts[0]] = parts[1] + + check_pid = os.getpid() + for _ in range(50): + if str(check_pid) in pane_map: + return pane_map[str(check_pid)] + try: + with open(f"/proc/{check_pid}/stat") as f: + stat = f.read() + ppid = int(stat.split(")")[1].split()[1]) + if ppid <= 1: + break + check_pid = ppid + except Exception: + break + except Exception: + pass + return "" + +# 启动时检测一次 +_SESSION_NAME = _detect_session_name() +if _SESSION_NAME: + print(f"[lab-worker] Detected session: {_SESSION_NAME}", flush=True) +else: + print("[lab-worker] WARNING: Could not detect tmux session name!", flush=True) + + +def _get_session_name() -> str: + return _SESSION_NAME + + +@mcp.tool() +async def fetch_pending_tasks() -> str: + """从调度服务器获取属于当前 session 的待执行任务列表。""" + session = _get_session_name() + if not session: + return "错误:无法检测 tmux session 名,拒绝获取任务(防止跨 session 抢任务)。" + params = {"target": session} + async with httpx.AsyncClient() as client: + resp = await client.get(f"{BROKER_URL}/tasks/pending", headers=HEADERS, params=params, timeout=15) + resp.raise_for_status() + data = resp.json() + + tasks = data.get("tasks", []) + if not tasks: + return "当前没有待执行的任务。" + + lines = [] + for t in tasks: + lines.append(f"任务 [{t['id']}]: {t['content']}") + return "\n---\n".join(lines) + + +@mcp.tool() +async def claim_task(task_id: str) -> str: + """领取一个任务,标记为正在执行。必须在开始执行前调用。""" + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{BROKER_URL}/tasks/{task_id}/claim", headers=HEADERS, timeout=10 + ) + if resp.status_code == 200: + return f"已领取任务 {task_id},开始执行。" + return f"领取失败: {resp.text}" + + +@mcp.tool() +async def report_result(task_id: str, result: str) -> str: + """提交任务执行结果。result 应包含详细的执行结果和关键输出。""" + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{BROKER_URL}/tasks/{task_id}/result", + headers=HEADERS, + json={"result": result}, + timeout=15, + ) + if resp.status_code == 200: + return f"任务 {task_id} 结果已提交。" + return f"提交失败: {resp.text}" + + +@mcp.tool() +async def report_failure(task_id: str, reason: str) -> str: + """报告任务执行失败。reason 应包含错误信息和失败原因。""" + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{BROKER_URL}/tasks/{task_id}/fail", + headers=HEADERS, + json={"result": reason}, + timeout=15, + ) + if resp.status_code == 200: + return f"已报告任务 {task_id} 失败。" + return f"报告失败: {resp.text}" + + +@mcp.tool() +async def reply_to_dispatcher(message: str) -> str: + """回传消息给调度中心。用于回复 btw 询问、汇报进度、回答问题等。""" + session = _get_session_name() or "unknown" + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{BROKER_URL}/log", + headers=HEADERS, + json={"source": session, "message": message}, + timeout=10, + ) + if resp.status_code == 200: + return "已回传给调度中心。" + return f"回传失败: {resp.text}" + + +@mcp.tool() +async def upload_file_to_broker(file_path: str) -> str: + """把实验室本地文件上传到 broker 文件存储,供 dispatcher 下载或发给用户。 + file_path: 实验室服务器上的本地文件绝对路径。 + """ + if not os.path.exists(file_path): + return f"文件不存在: {file_path}" + filename = os.path.basename(file_path) + async with httpx.AsyncClient(timeout=120) as client: + with open(file_path, "rb") as f: + resp = await client.post( + f"{BROKER_URL}/files/upload", + headers=HEADERS, + files={"file": (filename, f)}, + data={"filename": filename}, + ) + resp.raise_for_status() + data = resp.json() + return f"已上传: {data['filename']} ({data['size']} bytes)" + + +@mcp.tool() +async def download_file_from_broker(filename: str, save_path: str = "") -> str: + """从 broker 文件存储下载文件到实验室本地。 + filename: broker 上的文件名。 + save_path: 保存路径。留空则保存到当前工作目录。 + """ + if not save_path: + save_path = os.path.join(os.getcwd(), filename) + async with httpx.AsyncClient(timeout=120) as client: + resp = await client.get( + f"{BROKER_URL}/files/{filename}", + headers=HEADERS, + ) + resp.raise_for_status() + with open(save_path, "wb") as f: + f.write(resp.content) + return f"已下载到: {save_path} ({len(resp.content)} bytes)" + + +@mcp.tool() +async def check_context_usage() -> str: + """查看当前 session 的 context 使用情况(通过 /context 命令)。""" + import time, re + + session = _get_session_name() + if not session: + return "无法检测 session 名" + + try: + subprocess.run(["tmux", "send-keys", "-t", session, "/context", "Enter"], timeout=3) + time.sleep(3) + output = subprocess.check_output( + ["tmux", "capture-pane", "-t", session, "-p", "-S", "-50"], + timeout=3 + ).decode() + + info = [] + for line in output.split("\n"): + m = re.search(r'(\d+k)/(\d+k)', line) + if m: + info.append(f"Total: {m.group(1)}/{m.group(2)}") + for keyword in ["Messages:", "Free space:", "Autocompact"]: + if keyword in line: + m2 = re.search(r'([\d.]+k?\s*tokens?\s*\([\d.]+%\))', line) + if m2: + info.append(f"{keyword} {m2.group(1)}") + + if info: + return f"[{session}] Context:\n" + "\n".join(info) + else: + return f"[{session}] 解析失败" + except Exception as e: + return f"错误: {e}" + + +if __name__ == "__main__": + mcp.run(transport="stdio") diff --git a/poll_and_notify.sh b/poll_and_notify.sh new file mode 100755 index 0000000..b012128 --- /dev/null +++ b/poll_and_notify.sh @@ -0,0 +1,26 @@ +#!/bin/bash +# 极简 cron:只负责唤醒闲置的 Claude session +# 有 pending 任务就发一个 "check",hook 会处理剩下的一切 +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +if [ -f "$SCRIPT_DIR/.env" ]; then + set -a; source "$SCRIPT_DIR/.env"; set +a +fi +export BROKER_URL API_SECRET + +for SESSION in claude claude2 claude3; do + tmux has-session -t "$SESSION" 2>/dev/null || continue + + # 问 broker 这个 session 有没有 pending 任务 + COUNT=$(curl -sf -H "Authorization: Bearer $API_SECRET" \ + "$BROKER_URL/tasks/pending?target=$SESSION" 2>/dev/null \ + | python3 -c "import sys,json; print(len(json.load(sys.stdin).get('tasks',[])))" 2>/dev/null \ + || echo "0") + + if [ "$COUNT" -gt 0 ]; then + # 只发一个简短提示,不带任何 content + tmux send-keys -t "$SESSION" "请检查新任务" Enter + echo "$(date): poked $SESSION ($COUNT pending)" + fi +done diff --git a/setup.sh b/setup.sh new file mode 100755 index 0000000..dd57ba5 --- /dev/null +++ b/setup.sh @@ -0,0 +1,188 @@ +#!/bin/bash +# Claude Bridge Worker - 一键部署脚本 +# 用法: bash setup.sh --broker-url URL --api-secret SECRET --session NAME [--project-dir DIR] +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" + +# === 解析参数 === +BROKER_URL="" +API_SECRET="" +SESSION_NAME="" +PROJECT_DIR="" + +while [[ $# -gt 0 ]]; do + case $1 in + --broker-url) BROKER_URL="$2"; shift 2 ;; + --api-secret) API_SECRET="$2"; shift 2 ;; + --session) SESSION_NAME="$2"; shift 2 ;; + --project-dir) PROJECT_DIR="$2"; shift 2 ;; + *) echo "Unknown option: $1"; exit 1 ;; + esac +done + +if [ -z "$BROKER_URL" ] || [ -z "$API_SECRET" ] || [ -z "$SESSION_NAME" ]; then + echo "用法: bash setup.sh --broker-url URL --api-secret SECRET --session NAME [--project-dir DIR]" + echo "" + echo " --broker-url 中转服务器地址 (如 http://131.153.232.145:8000)" + echo " --api-secret API 认证密钥" + echo " --session tmux session 名 (如 claude, claude2, worker1)" + echo " --project-dir 工作目录 (可选,默认 ~/)" + exit 1 +fi + +PROJECT_DIR="${PROJECT_DIR:-$HOME}" +BRIDGE_DIR="$HOME/claude-bridge" + +echo "=== Claude Bridge Worker Setup ===" +echo "Broker: $BROKER_URL" +echo "Session: $SESSION_NAME" +echo "Project: $PROJECT_DIR" +echo "" + +# === 1. 安装文件 === +echo "[1/7] 安装 bridge 文件..." +mkdir -p "$BRIDGE_DIR" +cp "$SCRIPT_DIR"/mcp_lab_worker.py "$BRIDGE_DIR/" +cp "$SCRIPT_DIR"/start_mcp.sh "$BRIDGE_DIR/" +cp "$SCRIPT_DIR"/check_tasks.py "$BRIDGE_DIR/" +cp "$SCRIPT_DIR"/poll_and_notify.sh "$BRIDGE_DIR/" +cp "$SCRIPT_DIR"/execute_commands.sh "$BRIDGE_DIR/" +cp "$SCRIPT_DIR"/cron_runner.sh "$BRIDGE_DIR/" +cp "$SCRIPT_DIR"/CLAUDE.md "$BRIDGE_DIR/" +chmod +x "$BRIDGE_DIR"/*.sh "$BRIDGE_DIR"/check_tasks.py + +# 写 .env +cat > "$BRIDGE_DIR/.env" << EOF +BROKER_URL=$BROKER_URL +API_SECRET=$API_SECRET +EOF + +echo " 文件已安装到 $BRIDGE_DIR" + +# === 2. 安装 Python 依赖 === +echo "[2/7] 安装 Python 依赖..." +pip install httpx "mcp[cli]" 2>&1 | tail -1 + +# === 3. 注册 MCP server === +echo "[3/7] 注册 MCP server..." +# 先删除旧的(如果有) +claude mcp remove lab-worker -s user 2>/dev/null || true +claude mcp add --scope user lab-worker \ + -e "API_SECRET=$API_SECRET" \ + -e "BROKER_URL=$BROKER_URL" \ + -- bash "$BRIDGE_DIR/start_mcp.sh" +echo " MCP server 已注册" + +# === 4. 配置 hooks === +echo "[4/7] 配置 hooks..." +HOOK_CMD="BROKER_URL=$BROKER_URL API_SECRET=$API_SECRET bash $BRIDGE_DIR/check_tasks.py" + +python3 << PYEOF +import json, os + +settings_path = os.path.expanduser("~/.claude/settings.json") +try: + with open(settings_path) as f: + s = json.load(f) +except (FileNotFoundError, json.JSONDecodeError): + s = {} + +# 确保 permissions 存在 +if "permissions" not in s: + s["permissions"] = {"allow": []} +perms = s["permissions"].get("allow", []) + +# 加 MCP tool 权限 +for tool in [ + "mcp__lab-worker__fetch_pending_tasks", + "mcp__lab-worker__claim_task", + "mcp__lab-worker__report_result", + "mcp__lab-worker__report_failure", + "mcp__lab-worker__reply_to_dispatcher", + "mcp__lab-worker__upload_file_to_broker", + "mcp__lab-worker__download_file_from_broker", + "mcp__lab-worker__check_context_usage", +]: + if tool not in perms: + perms.append(tool) +s["permissions"]["allow"] = perms + +# 配置 hooks +hook_cmd = "BROKER_URL=$BROKER_URL API_SECRET=$API_SECRET SESSION_NAME=\$(tmux display-message -p '#S' 2>/dev/null || echo unknown) python3 $BRIDGE_DIR/check_tasks.py" +s["hooks"] = { + "Stop": [{"hooks": [{"type": "command", "command": hook_cmd, "timeout": 15}]}], + "PostToolUse": [{"matcher": "Bash|Edit|Write|Read", "hooks": [{"type": "command", "command": hook_cmd, "timeout": 15}]}], +} + +os.makedirs(os.path.dirname(settings_path), exist_ok=True) +with open(settings_path, "w") as f: + json.dump(s, f, indent=2) +print(" hooks 和权限已配置") +PYEOF + +# === 5. 配置 cron === +echo "[5/7] 配置 cron..." + +# cron_runner.sh 已在 package 中,只需设置环境变量 +export CLAUDE_BRIDGE_DIR="$BRIDGE_DIR" + +# 添加 cron(去重) +CRON_CMD="CLAUDE_BRIDGE_DIR=$BRIDGE_DIR $BRIDGE_DIR/cron_runner.sh" +(crontab -l 2>/dev/null | grep -v claude-bridge; echo "* * * * * $CRON_CMD") | crontab - +echo " cron 已配置 (每15秒轮询 + 自动重启)" + +# === 6. 复制 CLAUDE.md === +echo "[6/7] 部署 CLAUDE.md..." +cp "$BRIDGE_DIR/CLAUDE.md" "$HOME/CLAUDE.md" 2>/dev/null || true +cp "$BRIDGE_DIR/CLAUDE.md" "$HOME/.claude/CLAUDE.md" 2>/dev/null || true +if [ -d "$PROJECT_DIR" ] && [ "$PROJECT_DIR" != "$HOME" ]; then + cp "$BRIDGE_DIR/CLAUDE.md" "$PROJECT_DIR/CLAUDE.md" 2>/dev/null || true +fi +echo " CLAUDE.md 已部署到 home 和项目目录" + +# === 7. 启动 tmux session + Claude Code === +echo "[7/7] 启动 worker..." +if tmux has-session -t "$SESSION_NAME" 2>/dev/null; then + echo " tmux session '$SESSION_NAME' 已存在,跳过创建" +else + tmux new-session -d -s "$SESSION_NAME" + if [ "$PROJECT_DIR" != "$HOME" ]; then + tmux send-keys -t "$SESSION_NAME" "cd $PROJECT_DIR" Enter + sleep 1 + fi + tmux send-keys -t "$SESSION_NAME" "claude" Enter + echo " tmux session '$SESSION_NAME' 已创建并启动 Claude Code" +fi + +# 记录 session 信息(供 cron 健康检查 + 自动重启用) +SESSIONS_FILE="$BRIDGE_DIR/.sessions" +# 去重后追加 +grep -v "^${SESSION_NAME}$(printf '\t')" "$SESSIONS_FILE" 2>/dev/null > "${SESSIONS_FILE}.tmp" || true +echo -e "${SESSION_NAME}\t${PROJECT_DIR}" >> "${SESSIONS_FILE}.tmp" +mv "${SESSIONS_FILE}.tmp" "$SESSIONS_FILE" +echo " session 已注册到 $SESSIONS_FILE" + +# === 8. 向 broker 注册 === +echo "" +echo "向 broker 注册新 worker..." +HOSTNAME=$(hostname) +curl -sf -X POST \ + -H "Authorization: Bearer $API_SECRET" \ + -H "Content-Type: application/json" \ + -d "{\"source\": \"system\", \"message\": \"新 worker 上线: session=$SESSION_NAME, host=$HOSTNAME, dir=$PROJECT_DIR\"}" \ + "$BROKER_URL/log" 2>/dev/null && echo " 已通知 dispatcher" || echo " 通知失败(broker 可能未运行)" + +echo "" +echo "=== 部署完成 ===" +echo "" +echo "Worker 信息:" +echo " Session: $SESSION_NAME" +echo " Project: $PROJECT_DIR" +echo " Bridge: $BRIDGE_DIR" +echo " Broker: $BROKER_URL" +echo "" +echo "常用操作:" +echo " tmux attach -t $SESSION_NAME # 查看 worker" +echo " tmux send-keys -t $SESSION_NAME /exit Enter # 停止 worker" +echo " bash $SCRIPT_DIR/setup.sh ... # 重新部署" diff --git a/start_mcp.sh b/start_mcp.sh new file mode 100755 index 0000000..30cc24e --- /dev/null +++ b/start_mcp.sh @@ -0,0 +1,29 @@ +#!/bin/bash +# MCP server 启动 wrapper:检测 tmux session name 并传给 python +# 用进程树反查 pane PID → session name + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" + +# 检测 session name: 遍历 PID 往上找 +detect_session() { + local check_pid=$$ + local pane_pids + pane_pids=$(tmux list-panes -a -F "#{pane_pid} #{session_name}" 2>/dev/null) || return + + for i in $(seq 1 50); do + local match + match=$(echo "$pane_pids" | grep "^${check_pid} " | head -1 | cut -d' ' -f2) + if [ -n "$match" ]; then + echo "$match" + return + fi + # 获取 PPID + local ppid + ppid=$(awk '{print $4}' /proc/${check_pid}/stat 2>/dev/null) || return + [ "$ppid" -le 1 ] && return + check_pid=$ppid + done +} + +export CLAUDE_SESSION_NAME=$(detect_session) +exec python3 "$SCRIPT_DIR/mcp_lab_worker.py" |
