diff options
| -rw-r--r-- | broker.py | 122 | ||||
| -rw-r--r-- | mcp_dispatcher.py | 17 |
2 files changed, 130 insertions, 9 deletions
@@ -485,7 +485,8 @@ def _start_slack_listener(): user_id = payload.get("user_id", "") channel_name = payload.get("channel_name", "") - print(f"[Slack] Slash command: {command} text={cmd_text} channel={channel}") + ch_type = "directmessage" if channel_name == "directmessage" else "channel" + print(f"[Slack] Slash command: {command} text={cmd_text} channel={channel} type={ch_type}") if command == "/help": slack_client.chat_postMessage(channel=channel, text=( @@ -508,11 +509,11 @@ def _start_slack_listener(): elif command == "/init-worker": _handle_init_worker(channel, channel_name, user_id, cmd_text) elif command == "/meta": - _handle_meta(channel) + _handle_meta(channel, user_id, ch_type) elif command == "/restart-worker": - _handle_worker_command(channel, "restart", user_id) + _handle_worker_command(channel, "restart", user_id, cmd_text, ch_type) elif command == "/stop-worker": - _handle_worker_command(channel, "stop", user_id) + _handle_worker_command(channel, "stop", user_id, cmd_text, ch_type) elif command == "/unbind": _handle_unbind(channel, user_id) return @@ -562,6 +563,33 @@ def _start_slack_listener(): # 去掉 @mention 标记 text = text.replace(f"<@{bot_user_id}>", "").strip() + # 处理文件附件 + files = event.get("files", []) + file_tags = [] + for f in files: + try: + file_url = f.get("url_private_download") or f.get("url_private", "") + file_name = f.get("name", "unknown") + file_type = f.get("mimetype", "") + if file_url: + import urllib.request as _ur + dl_req = _ur.Request(file_url, headers={"Authorization": f"Bearer {SLACK_BOT_TOKEN}"}) + with _ur.urlopen(dl_req, timeout=60) as resp: + content = resp.read() + safe_name = f"{uuid.uuid4().hex[:8]}_{file_name}" + save_path = os.path.join(FILES_DIR, safe_name) + with open(save_path, "wb") as fh: + fh.write(content) + if file_type and file_type.startswith("image/"): + file_tags.append(f"[image] {safe_name}") + else: + file_tags.append(f"[file] {safe_name}") + print(f"[Slack] File saved: {safe_name} ({len(content)} bytes)") + except Exception as e: + print(f"[Slack] File download failed: {e}") + if file_tags: + text = " ".join(file_tags) + (" " + text if text else "") + user_name = _get_user_name(user_id) # 获取 channel 名 @@ -691,8 +719,36 @@ def _start_slack_listener(): ) print(f"[Register] Created user {user_id} ({user_name}) with dispatcher {session_name}") - def _handle_meta(channel): - """处理 /meta 命令 — 显示 channel 绑定的 worker 元数据""" + def _handle_meta(channel, slack_user_id="", channel_type=""): + """处理 /meta — channel 里显示 worker 信息,DM 里显示 dispatcher + 所有 worker""" + # DM 模式:显示 dispatcher 信息 + 所有 worker + if channel_type == "directmessage": + user_rec = _get_user_by_slack(slack_user_id) + if not user_rec: + slack_client.chat_postMessage(channel=channel, text="Not registered. Use `/register` first.") + return + lines = [f"*Your Dispatcher*", f"• Session: `{user_rec['dispatcher_session']}`", f"• Status: {'active' if user_rec['status'] == 'active' else 'inactive'}", ""] + db = get_db() + workers = db.execute(""" + SELECT w.session_name, w.host, w.path, cb.channel_name + FROM workers w LEFT JOIN channel_bindings cb ON cb.worker_id = w.id + WHERE w.status = 'active' ORDER BY w.created_at + """).fetchall() + db.close() + if workers: + lines.append("*All Workers*") + for w in workers: + session = w["session_name"] + hb = heartbeats.get(session, {}) + status = "🟢" if hb.get("last_seen", 0) > time.time() - HEARTBEAT_TIMEOUT else ("🔴" if hb else "⚪") + ch = f" → #{w['channel_name']}" if w["channel_name"] else " (unbound)" + lines.append(f"• `{session}` {status} {w['host']}:{w['path']}{ch}") + else: + lines.append("No active workers.") + slack_client.chat_postMessage(channel=channel, text="\n".join(lines)) + return + + # Channel 模式:显示绑定的 worker 信息 binding = _get_channel_binding(channel) if not binding: slack_client.chat_postMessage(channel=channel, text="No worker bound to this channel. Use `/init-worker host:path` first.") @@ -745,11 +801,39 @@ def _start_slack_listener(): return True # 没记录创建者,放行 return row["created_by"] == slack_user_id - def _handle_worker_command(channel, action, slack_user_id=""): - """处理 /restart-worker 和 /stop-worker""" + def _handle_worker_command(channel, action, slack_user_id="", cmd_text="", channel_type=""): + """处理 /restart-worker 和 /stop-worker。Channel 里操作绑定的 worker,DM 里接受 session name 参数。""" + if channel_type == "directmessage" and cmd_text: + # DM 模式:参数是 session name + session_name = cmd_text.strip() + db = get_db() + w = db.execute("SELECT * FROM workers WHERE session_name = ? AND status = 'active'", (session_name,)).fetchone() + db.close() + if not w: + slack_client.chat_postMessage(channel=channel, text=f"Worker `{session_name}` not found.") + return + cmd_id = uuid.uuid4().hex[:8] + db = get_db() + db.execute( + "INSERT INTO commands (id, target, action, params, status, created_at) VALUES (?, ?, ?, '{}', 'pending', ?)", + (cmd_id, session_name, action, time.time()), + ) + if action == "stop": + db.execute("UPDATE workers SET status = 'inactive' WHERE session_name = ?", (session_name,)) + db.execute("DELETE FROM channel_bindings WHERE worker_id = ?", (w["id"],)) + db.commit() + db.close() + emoji = "🔄" if action == "restart" else "🛑" + slack_client.chat_postMessage(channel=channel, text=f"{emoji} {action.title()}ing worker `{session_name}` on {w['host']}...") + return + + # Channel 模式 binding = _get_channel_binding(channel) if not binding: - slack_client.chat_postMessage(channel=channel, text="No worker bound to this channel.") + if channel_type == "directmessage": + slack_client.chat_postMessage(channel=channel, text=f"Usage in DM: `/{action}-worker session_name`") + else: + slack_client.chat_postMessage(channel=channel, text="No worker bound to this channel.") return if not _check_pm(channel, slack_user_id): slack_client.chat_postMessage(channel=channel, text="Permission denied. Only the PM (who ran `/init-worker`) can do this.") @@ -1181,6 +1265,26 @@ def send_slack_message(body: SlackSend): return {"ok": True} +@app.post("/slack/send_file", dependencies=[Depends(verify_token)]) +def send_slack_file(filename: str = Form(...), channel: str = Form(...), caption: str = Form("")): + """Send a file to a Slack channel""" + if not _slack_handler: + raise HTTPException(503, "Slack not configured") + file_path = os.path.join(FILES_DIR, filename) + if not os.path.exists(file_path): + raise HTTPException(404, "File not found") + try: + _slack_handler.files_upload_v2( + channel=channel, + file=file_path, + title=filename, + initial_comment=caption or "", + ) + except Exception as e: + raise HTTPException(502, f"Slack API error: {e}") + return {"ok": True} + + # --- Log / Reply (实验室 → dispatcher) --- class LogEntry(BaseModel): diff --git a/mcp_dispatcher.py b/mcp_dispatcher.py index 8148f58..4fb52c7 100644 --- a/mcp_dispatcher.py +++ b/mcp_dispatcher.py @@ -268,6 +268,23 @@ async def send_slack_message(channel: str, message: str, thread_ts: str = "") -> @mcp.tool() +async def send_file_to_slack(filename: str, channel: str, caption: str = "") -> str: + """Send a file from broker storage to a Slack channel. + filename: file name in broker storage (use list_files to see available files) + channel: Slack channel ID + caption: optional description + """ + async with httpx.AsyncClient(timeout=60) as client: + resp = await client.post( + f"{BROKER_URL}/slack/send_file", + headers=HEADERS, + data={"filename": filename, "channel": channel, "caption": caption}, + ) + resp.raise_for_status() + return "File sent to Slack." + + +@mcp.tool() async def check_context_usage(session: str = "dispatcher") -> str: """查看指定 session 的 context 使用情况。 session: tmux session 名。默认 dispatcher。可填 claude/claude2/claude3 查 lab worker(需要 lab 端可达)。 |
