diff options
| author | Ilan Bigio <ilan@openai.com> | 2024-12-16 13:06:08 -0800 |
|---|---|---|
| committer | Ilan Bigio <ilan@openai.com> | 2024-12-19 16:08:22 -0500 |
| commit | 20009aed53d8864c9204d43a17895168a777d2cc (patch) | |
| tree | 754dded819869bc34a8a2a02c66ea72dac1ccd24 /websocket-server/src/sessionManager.ts | |
Initial commit
Diffstat (limited to 'websocket-server/src/sessionManager.ts')
| -rw-r--r-- | websocket-server/src/sessionManager.ts | 286 |
1 files changed, 286 insertions, 0 deletions
diff --git a/websocket-server/src/sessionManager.ts b/websocket-server/src/sessionManager.ts new file mode 100644 index 0000000..7cf6336 --- /dev/null +++ b/websocket-server/src/sessionManager.ts @@ -0,0 +1,286 @@ +import { RawData, WebSocket } from "ws"; +import functions from "./functionHandlers"; + +const OPENAI_API_KEY = process.env.OPENAI_API_KEY || ""; + +interface Session { + twilioConn?: WebSocket; + frontendConn?: WebSocket; + modelConn?: WebSocket; + streamSid?: string; + saved_config?: any; + lastAssistantItem?: string; + responseStartTimestamp?: number; + latestMediaTimestamp?: number; +} + +let session: Session = {}; + +export function handleCallConnection(ws: WebSocket) { + cleanupConnection(session.twilioConn); + session.twilioConn = ws; + + ws.on("message", handleTwilioMessage); + ws.on("error", ws.close); + ws.on("close", () => { + cleanupConnection(session.modelConn); + cleanupConnection(session.twilioConn); + session.twilioConn = undefined; + session.modelConn = undefined; + session.streamSid = undefined; + session.lastAssistantItem = undefined; + session.responseStartTimestamp = undefined; + session.latestMediaTimestamp = undefined; + if (!session.frontendConn) session = {}; + }); +} + +export function handleFrontendConnection(ws: WebSocket) { + cleanupConnection(session.frontendConn); + session.frontendConn = ws; + + ws.on("message", handleFrontendMessage); + ws.on("close", () => { + cleanupConnection(session.frontendConn); + session.frontendConn = undefined; + if (!session.twilioConn && !session.modelConn) session = {}; + }); +} + +async function handleFunctionCall(item: { name: string; arguments: string }) { + console.log("Handling function call:", item); + const fnDef = functions.find((f) => f.schema.name === item.name); + if (!fnDef) { + throw new Error(`No handler found for function: ${item.name}`); + } + + let args: unknown; + try { + args = JSON.parse(item.arguments); + } catch { + return JSON.stringify({ + error: "Invalid JSON arguments for function call.", + }); + } + + try { + console.log("Calling function:", fnDef.schema.name, args); + const result = await fnDef.handler(args as any); + return result; + } catch (err: any) { + console.error("Error running function:", err); + return JSON.stringify({ + error: `Error running function ${item.name}: ${err.message}`, + }); + } +} + +function handleTwilioMessage(data: RawData) { + const msg = parseMessage(data); + if (!msg) return; + + switch (msg.event) { + case "start": + session.streamSid = msg.start.streamSid; + session.latestMediaTimestamp = 0; + session.lastAssistantItem = undefined; + session.responseStartTimestamp = undefined; + tryConnectModel(); + break; + case "media": + session.latestMediaTimestamp = msg.media.timestamp; + if (isOpen(session.modelConn)) { + jsonSend(session.modelConn, { + type: "input_audio_buffer.append", + audio: msg.media.payload, + }); + } + break; + case "close": + closeAllConnections(); + break; + } +} + +function handleFrontendMessage(data: RawData) { + const msg = parseMessage(data); + if (!msg) return; + + if (isOpen(session.modelConn)) { + jsonSend(session.modelConn, msg); + } + + if (msg.type === "session.update") { + session.saved_config = msg.session; + } +} + +function tryConnectModel() { + if (!session.twilioConn || !session.streamSid) return; + if (isOpen(session.modelConn)) return; + + session.modelConn = new WebSocket( + "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-12-17", + { + headers: { + Authorization: `Bearer ${OPENAI_API_KEY}`, + "OpenAI-Beta": "realtime=v1", + }, + } + ); + + session.modelConn.on("open", () => { + const config = session.saved_config || {}; + jsonSend(session.modelConn, { + type: "session.update", + session: { + modalities: ["text", "audio"], + turn_detection: { type: "server_vad" }, + voice: "ash", + input_audio_transcription: { model: "whisper-1" }, + input_audio_format: "g711_ulaw", + output_audio_format: "g711_ulaw", + ...config, + }, + }); + }); + + session.modelConn.on("message", handleModelMessage); + session.modelConn.on("error", closeModel); + session.modelConn.on("close", closeModel); +} + +function handleModelMessage(data: RawData) { + const event = parseMessage(data); + if (!event) return; + + jsonSend(session.frontendConn, event); + + switch (event.type) { + case "input_audio_buffer.speech_started": + handleTruncation(); + break; + + case "response.audio.delta": + if (session.twilioConn && session.streamSid) { + if (session.responseStartTimestamp === undefined) { + session.responseStartTimestamp = session.latestMediaTimestamp || 0; + } + if (event.item_id) session.lastAssistantItem = event.item_id; + + jsonSend(session.twilioConn, { + event: "media", + streamSid: session.streamSid, + media: { payload: event.delta }, + }); + + jsonSend(session.twilioConn, { + event: "mark", + streamSid: session.streamSid, + }); + } + break; + + case "response.output_item.done": { + const { item } = event; + if (item.type === "function_call") { + handleFunctionCall(item) + .then((output) => { + if (session.modelConn) { + jsonSend(session.modelConn, { + type: "conversation.item.create", + item: { + type: "function_call_output", + call_id: item.call_id, + output: JSON.stringify(output), + }, + }); + jsonSend(session.modelConn, { type: "response.create" }); + } + }) + .catch((err) => { + console.error("Error handling function call:", err); + }); + } + break; + } + } +} + +function handleTruncation() { + if ( + !session.lastAssistantItem || + session.responseStartTimestamp === undefined + ) + return; + + const elapsedMs = + (session.latestMediaTimestamp || 0) - (session.responseStartTimestamp || 0); + const audio_end_ms = elapsedMs > 0 ? elapsedMs : 0; + + if (isOpen(session.modelConn)) { + jsonSend(session.modelConn, { + type: "conversation.item.truncate", + item_id: session.lastAssistantItem, + content_index: 0, + audio_end_ms, + }); + } + + if (session.twilioConn && session.streamSid) { + jsonSend(session.twilioConn, { + event: "clear", + streamSid: session.streamSid, + }); + } + + session.lastAssistantItem = undefined; + session.responseStartTimestamp = undefined; +} + +function closeModel() { + cleanupConnection(session.modelConn); + session.modelConn = undefined; + if (!session.twilioConn && !session.frontendConn) session = {}; +} + +function closeAllConnections() { + if (session.twilioConn) { + session.twilioConn.close(); + session.twilioConn = undefined; + } + if (session.modelConn) { + session.modelConn.close(); + session.modelConn = undefined; + } + if (session.frontendConn) { + session.frontendConn.close(); + session.frontendConn = undefined; + } + session.streamSid = undefined; + session.lastAssistantItem = undefined; + session.responseStartTimestamp = undefined; + session.latestMediaTimestamp = undefined; + session.saved_config = undefined; +} + +function cleanupConnection(ws?: WebSocket) { + if (isOpen(ws)) ws.close(); +} + +function parseMessage(data: RawData): any { + try { + return JSON.parse(data.toString()); + } catch { + return null; + } +} + +function jsonSend(ws: WebSocket | undefined, obj: unknown) { + if (!isOpen(ws)) return; + ws.send(JSON.stringify(obj)); +} + +function isOpen(ws?: WebSocket): ws is WebSocket { + return !!ws && ws.readyState === WebSocket.OPEN; +} |
