summaryrefslogtreecommitdiff
path: root/poll_and_notify.sh
diff options
context:
space:
mode:
Diffstat (limited to 'poll_and_notify.sh')
-rwxr-xr-xpoll_and_notify.sh102
1 files changed, 82 insertions, 20 deletions
diff --git a/poll_and_notify.sh b/poll_and_notify.sh
index b32a273..8f8775c 100755
--- a/poll_and_notify.sh
+++ b/poll_and_notify.sh
@@ -1,6 +1,5 @@
#!/bin/bash
-# 极简 poller:唤醒闲置的 Claude session
-# 从 .sessions 文件读取 session 列表(不再硬编码)
+# Poller: wake idle sessions, deliver messages, notify tasks
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
@@ -9,25 +8,88 @@ if [ -f "$SCRIPT_DIR/.env" ]; then
fi
export BROKER_URL API_SECRET
-SESSIONS_FILE="$SCRIPT_DIR/.sessions"
-[ -f "$SESSIONS_FILE" ] || exit 0
+python3 << 'PYEOF'
+import json, os, time, subprocess, urllib.request
-while IFS=$'\t' read -r SESSION PROJECT_DIR; do
- [ -z "$SESSION" ] && continue
- tmux has-session -t "$SESSION" 2>/dev/null || continue
+BROKER_URL = os.environ.get("BROKER_URL", "")
+API_SECRET = os.environ.get("API_SECRET", "")
+SESSIONS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)) if '__file__' in dir() else os.getcwd(), ".sessions")
- # 发心跳(每次 poll 都发,不依赖 Claude 活跃)
- curl -sf -X POST -H "Authorization: Bearer $API_SECRET" -H "Content-Type: application/json" \
- -d "{\"session\": \"$SESSION\", \"host\": \"$(hostname)\"}" \
- "$BROKER_URL/heartbeat" >/dev/null 2>&1 || true
+if not BROKER_URL or not API_SECRET:
+ exit(0)
- COUNT=$(curl -sf -H "Authorization: Bearer $API_SECRET" \
- "$BROKER_URL/tasks/pending?target=$SESSION" 2>/dev/null \
- | python3 -c "import sys,json; print(len(json.load(sys.stdin).get('tasks',[])))" 2>/dev/null \
- || echo "0")
+def api_get(path):
+ req = urllib.request.Request(f"{BROKER_URL}{path}", headers={"Authorization": f"Bearer {API_SECRET}"})
+ with urllib.request.urlopen(req, timeout=15) as resp:
+ return json.loads(resp.read())
- if [ "$COUNT" -gt 0 ]; then
- tmux send-keys -t "$SESSION" "请检查新任务" Enter
- echo "$(date): poked $SESSION ($COUNT pending)"
- fi
-done < "$SESSIONS_FILE"
+def api_post(path, data=None):
+ body = json.dumps(data).encode() if data else b""
+ req = urllib.request.Request(f"{BROKER_URL}{path}", data=body, headers={"Authorization": f"Bearer {API_SECRET}", "Content-Type": "application/json"}, method="POST")
+ with urllib.request.urlopen(req, timeout=15) as resp:
+ return json.loads(resp.read())
+
+def tmux_send(session, text):
+ subprocess.run(["tmux", "send-keys", "-t", session, text, "Enter"], timeout=5, capture_output=True)
+
+def tmux_exists(session):
+ return subprocess.run(["tmux", "has-session", "-t", session], capture_output=True).returncode == 0
+
+# Read sessions
+try:
+ with open(SESSIONS_FILE) as f:
+ sessions = []
+ for line in f:
+ parts = line.strip().split("\t")
+ if parts and parts[0]:
+ sessions.append(parts[0])
+except FileNotFoundError:
+ exit(0)
+
+import socket
+hostname = socket.gethostname()
+
+for session in sessions:
+ if not tmux_exists(session):
+ continue
+
+ # Heartbeat
+ try:
+ api_post("/heartbeat", {"session": session, "host": hostname})
+ except Exception:
+ pass
+
+ # Get pending items
+ try:
+ data = api_get(f"/tasks/pending?target={session}")
+ except Exception:
+ continue
+
+ items = data.get("tasks", [])
+ if not items:
+ continue
+
+ tasks = [i for i in items if i.get("type", "task") == "task"]
+ messages = [i for i in items if i.get("type") == "message"]
+
+ # Messages: claim + send content directly
+ for m in messages:
+ try:
+ api_post(f"/tasks/{m['id']}/claim")
+ except Exception:
+ continue # already claimed
+ content = m["content"].replace("\n", " ").replace("\r", "")
+ if len(content) > 400:
+ content = content[:400] + "..."
+ tmux_send(session, f"[Bridge btw — reply with reply_to_dispatcher] {content}")
+ try:
+ api_post(f"/tasks/{m['id']}/result", {"result": "delivered via poller"})
+ except Exception:
+ pass
+ print(f"{time.strftime('%c')}: [{session}] message {m['id']} delivered")
+
+ # Tasks: just poke
+ if tasks:
+ tmux_send(session, "请检查新任务")
+ print(f"{time.strftime('%c')}: [{session}] poked ({len(tasks)} task(s))")
+PYEOF