summaryrefslogtreecommitdiff
path: root/broker.py
diff options
context:
space:
mode:
Diffstat (limited to 'broker.py')
-rw-r--r--broker.py122
1 files changed, 113 insertions, 9 deletions
diff --git a/broker.py b/broker.py
index ee073fb..0fd9721 100644
--- a/broker.py
+++ b/broker.py
@@ -485,7 +485,8 @@ def _start_slack_listener():
user_id = payload.get("user_id", "")
channel_name = payload.get("channel_name", "")
- print(f"[Slack] Slash command: {command} text={cmd_text} channel={channel}")
+ ch_type = "directmessage" if channel_name == "directmessage" else "channel"
+ print(f"[Slack] Slash command: {command} text={cmd_text} channel={channel} type={ch_type}")
if command == "/help":
slack_client.chat_postMessage(channel=channel, text=(
@@ -508,11 +509,11 @@ def _start_slack_listener():
elif command == "/init-worker":
_handle_init_worker(channel, channel_name, user_id, cmd_text)
elif command == "/meta":
- _handle_meta(channel)
+ _handle_meta(channel, user_id, ch_type)
elif command == "/restart-worker":
- _handle_worker_command(channel, "restart", user_id)
+ _handle_worker_command(channel, "restart", user_id, cmd_text, ch_type)
elif command == "/stop-worker":
- _handle_worker_command(channel, "stop", user_id)
+ _handle_worker_command(channel, "stop", user_id, cmd_text, ch_type)
elif command == "/unbind":
_handle_unbind(channel, user_id)
return
@@ -562,6 +563,33 @@ def _start_slack_listener():
# 去掉 @mention 标记
text = text.replace(f"<@{bot_user_id}>", "").strip()
+ # 处理文件附件
+ files = event.get("files", [])
+ file_tags = []
+ for f in files:
+ try:
+ file_url = f.get("url_private_download") or f.get("url_private", "")
+ file_name = f.get("name", "unknown")
+ file_type = f.get("mimetype", "")
+ if file_url:
+ import urllib.request as _ur
+ dl_req = _ur.Request(file_url, headers={"Authorization": f"Bearer {SLACK_BOT_TOKEN}"})
+ with _ur.urlopen(dl_req, timeout=60) as resp:
+ content = resp.read()
+ safe_name = f"{uuid.uuid4().hex[:8]}_{file_name}"
+ save_path = os.path.join(FILES_DIR, safe_name)
+ with open(save_path, "wb") as fh:
+ fh.write(content)
+ if file_type and file_type.startswith("image/"):
+ file_tags.append(f"[image] {safe_name}")
+ else:
+ file_tags.append(f"[file] {safe_name}")
+ print(f"[Slack] File saved: {safe_name} ({len(content)} bytes)")
+ except Exception as e:
+ print(f"[Slack] File download failed: {e}")
+ if file_tags:
+ text = " ".join(file_tags) + (" " + text if text else "")
+
user_name = _get_user_name(user_id)
# 获取 channel 名
@@ -691,8 +719,36 @@ def _start_slack_listener():
)
print(f"[Register] Created user {user_id} ({user_name}) with dispatcher {session_name}")
- def _handle_meta(channel):
- """处理 /meta 命令 — 显示 channel 绑定的 worker 元数据"""
+ def _handle_meta(channel, slack_user_id="", channel_type=""):
+ """处理 /meta — channel 里显示 worker 信息,DM 里显示 dispatcher + 所有 worker"""
+ # DM 模式:显示 dispatcher 信息 + 所有 worker
+ if channel_type == "directmessage":
+ user_rec = _get_user_by_slack(slack_user_id)
+ if not user_rec:
+ slack_client.chat_postMessage(channel=channel, text="Not registered. Use `/register` first.")
+ return
+ lines = [f"*Your Dispatcher*", f"• Session: `{user_rec['dispatcher_session']}`", f"• Status: {'active' if user_rec['status'] == 'active' else 'inactive'}", ""]
+ db = get_db()
+ workers = db.execute("""
+ SELECT w.session_name, w.host, w.path, cb.channel_name
+ FROM workers w LEFT JOIN channel_bindings cb ON cb.worker_id = w.id
+ WHERE w.status = 'active' ORDER BY w.created_at
+ """).fetchall()
+ db.close()
+ if workers:
+ lines.append("*All Workers*")
+ for w in workers:
+ session = w["session_name"]
+ hb = heartbeats.get(session, {})
+ status = "🟢" if hb.get("last_seen", 0) > time.time() - HEARTBEAT_TIMEOUT else ("🔴" if hb else "⚪")
+ ch = f" → #{w['channel_name']}" if w["channel_name"] else " (unbound)"
+ lines.append(f"• `{session}` {status} {w['host']}:{w['path']}{ch}")
+ else:
+ lines.append("No active workers.")
+ slack_client.chat_postMessage(channel=channel, text="\n".join(lines))
+ return
+
+ # Channel 模式:显示绑定的 worker 信息
binding = _get_channel_binding(channel)
if not binding:
slack_client.chat_postMessage(channel=channel, text="No worker bound to this channel. Use `/init-worker host:path` first.")
@@ -745,11 +801,39 @@ def _start_slack_listener():
return True # 没记录创建者,放行
return row["created_by"] == slack_user_id
- def _handle_worker_command(channel, action, slack_user_id=""):
- """处理 /restart-worker 和 /stop-worker"""
+ def _handle_worker_command(channel, action, slack_user_id="", cmd_text="", channel_type=""):
+ """处理 /restart-worker 和 /stop-worker。Channel 里操作绑定的 worker,DM 里接受 session name 参数。"""
+ if channel_type == "directmessage" and cmd_text:
+ # DM 模式:参数是 session name
+ session_name = cmd_text.strip()
+ db = get_db()
+ w = db.execute("SELECT * FROM workers WHERE session_name = ? AND status = 'active'", (session_name,)).fetchone()
+ db.close()
+ if not w:
+ slack_client.chat_postMessage(channel=channel, text=f"Worker `{session_name}` not found.")
+ return
+ cmd_id = uuid.uuid4().hex[:8]
+ db = get_db()
+ db.execute(
+ "INSERT INTO commands (id, target, action, params, status, created_at) VALUES (?, ?, ?, '{}', 'pending', ?)",
+ (cmd_id, session_name, action, time.time()),
+ )
+ if action == "stop":
+ db.execute("UPDATE workers SET status = 'inactive' WHERE session_name = ?", (session_name,))
+ db.execute("DELETE FROM channel_bindings WHERE worker_id = ?", (w["id"],))
+ db.commit()
+ db.close()
+ emoji = "🔄" if action == "restart" else "🛑"
+ slack_client.chat_postMessage(channel=channel, text=f"{emoji} {action.title()}ing worker `{session_name}` on {w['host']}...")
+ return
+
+ # Channel 模式
binding = _get_channel_binding(channel)
if not binding:
- slack_client.chat_postMessage(channel=channel, text="No worker bound to this channel.")
+ if channel_type == "directmessage":
+ slack_client.chat_postMessage(channel=channel, text=f"Usage in DM: `/{action}-worker session_name`")
+ else:
+ slack_client.chat_postMessage(channel=channel, text="No worker bound to this channel.")
return
if not _check_pm(channel, slack_user_id):
slack_client.chat_postMessage(channel=channel, text="Permission denied. Only the PM (who ran `/init-worker`) can do this.")
@@ -1181,6 +1265,26 @@ def send_slack_message(body: SlackSend):
return {"ok": True}
+@app.post("/slack/send_file", dependencies=[Depends(verify_token)])
+def send_slack_file(filename: str = Form(...), channel: str = Form(...), caption: str = Form("")):
+ """Send a file to a Slack channel"""
+ if not _slack_handler:
+ raise HTTPException(503, "Slack not configured")
+ file_path = os.path.join(FILES_DIR, filename)
+ if not os.path.exists(file_path):
+ raise HTTPException(404, "File not found")
+ try:
+ _slack_handler.files_upload_v2(
+ channel=channel,
+ file=file_path,
+ title=filename,
+ initial_comment=caption or "",
+ )
+ except Exception as e:
+ raise HTTPException(502, f"Slack API error: {e}")
+ return {"ok": True}
+
+
# --- Log / Reply (实验室 → dispatcher) ---
class LogEntry(BaseModel):