summaryrefslogtreecommitdiff
path: root/poll_and_notify.sh
blob: 8f8775cdc2c97929dd7ba3a0e90cb8eb271ba076 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/bin/bash
# Poller: wake idle sessions, deliver messages, notify tasks
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
if [ -f "$SCRIPT_DIR/.env" ]; then
    set -a; source "$SCRIPT_DIR/.env"; set +a
fi
export BROKER_URL API_SECRET

python3 << 'PYEOF'
import json, os, time, subprocess, urllib.request

BROKER_URL = os.environ.get("BROKER_URL", "")
API_SECRET = os.environ.get("API_SECRET", "")
SESSIONS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)) if '__file__' in dir() else os.getcwd(), ".sessions")

if not BROKER_URL or not API_SECRET:
    exit(0)

def api_get(path):
    req = urllib.request.Request(f"{BROKER_URL}{path}", headers={"Authorization": f"Bearer {API_SECRET}"})
    with urllib.request.urlopen(req, timeout=15) as resp:
        return json.loads(resp.read())

def api_post(path, data=None):
    body = json.dumps(data).encode() if data else b""
    req = urllib.request.Request(f"{BROKER_URL}{path}", data=body, headers={"Authorization": f"Bearer {API_SECRET}", "Content-Type": "application/json"}, method="POST")
    with urllib.request.urlopen(req, timeout=15) as resp:
        return json.loads(resp.read())

def tmux_send(session, text):
    subprocess.run(["tmux", "send-keys", "-t", session, text, "Enter"], timeout=5, capture_output=True)

def tmux_exists(session):
    return subprocess.run(["tmux", "has-session", "-t", session], capture_output=True).returncode == 0

# Read sessions
try:
    with open(SESSIONS_FILE) as f:
        sessions = []
        for line in f:
            parts = line.strip().split("\t")
            if parts and parts[0]:
                sessions.append(parts[0])
except FileNotFoundError:
    exit(0)

import socket
hostname = socket.gethostname()

for session in sessions:
    if not tmux_exists(session):
        continue

    # Heartbeat
    try:
        api_post("/heartbeat", {"session": session, "host": hostname})
    except Exception:
        pass

    # Get pending items
    try:
        data = api_get(f"/tasks/pending?target={session}")
    except Exception:
        continue

    items = data.get("tasks", [])
    if not items:
        continue

    tasks = [i for i in items if i.get("type", "task") == "task"]
    messages = [i for i in items if i.get("type") == "message"]

    # Messages: claim + send content directly
    for m in messages:
        try:
            api_post(f"/tasks/{m['id']}/claim")
        except Exception:
            continue  # already claimed
        content = m["content"].replace("\n", " ").replace("\r", "")
        if len(content) > 400:
            content = content[:400] + "..."
        tmux_send(session, f"[Bridge btw — reply with reply_to_dispatcher] {content}")
        try:
            api_post(f"/tasks/{m['id']}/result", {"result": "delivered via poller"})
        except Exception:
            pass
        print(f"{time.strftime('%c')}: [{session}] message {m['id']} delivered")

    # Tasks: just poke
    if tasks:
        tmux_send(session, "请检查新任务")
        print(f"{time.strftime('%c')}: [{session}] poked ({len(tasks)} task(s))")
PYEOF