From 57e139100b36b2ebc03212a29d256c2e14dc6eec Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 24 Apr 2026 01:42:16 +0100 Subject: [PATCH] refactor: share realtime voice bridge sessions --- docs/plugins/sdk-migration.md | 2 +- .../src/webhook/realtime-handler.ts | 79 ++++------- src/plugin-sdk/realtime-voice.ts | 6 + src/realtime-voice/session-runtime.test.ts | 133 ++++++++++++++++++ src/realtime-voice/session-runtime.ts | 109 ++++++++++++++ 5 files changed, 278 insertions(+), 51 deletions(-) create mode 100644 src/realtime-voice/session-runtime.test.ts create mode 100644 src/realtime-voice/session-runtime.ts diff --git a/docs/plugins/sdk-migration.md b/docs/plugins/sdk-migration.md index 5db3f80ca14..de9c4ef912c 100644 --- a/docs/plugins/sdk-migration.md +++ b/docs/plugins/sdk-migration.md @@ -296,7 +296,7 @@ Current bundled provider examples: | `plugin-sdk/speech` | Speech helpers | Speech provider types plus provider-facing directive, registry, and validation helpers | | `plugin-sdk/speech-core` | Shared speech core | Speech provider types, registry, directives, normalization | | `plugin-sdk/realtime-transcription` | Realtime transcription helpers | Provider types, registry helpers, and shared WebSocket session helper | - | `plugin-sdk/realtime-voice` | Realtime voice helpers | Provider types and registry helpers | + | `plugin-sdk/realtime-voice` | Realtime voice helpers | Provider types, registry helpers, and bridge session helpers | | `plugin-sdk/image-generation-core` | Shared image-generation core | Image-generation types, failover, auth, and registry helpers | | `plugin-sdk/music-generation` | Music-generation helpers | Music-generation provider/request/result types | | `plugin-sdk/music-generation-core` | Shared music-generation core | Music-generation types, failover helpers, provider lookup, and model-ref parsing | diff --git a/extensions/voice-call/src/webhook/realtime-handler.ts b/extensions/voice-call/src/webhook/realtime-handler.ts index 
0f37cd18fd9..947a8b86e0b 100644 --- a/extensions/voice-call/src/webhook/realtime-handler.ts +++ b/extensions/voice-call/src/webhook/realtime-handler.ts @@ -2,10 +2,11 @@ import { randomUUID } from "node:crypto"; import http from "node:http"; import type { Duplex } from "node:stream"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; -import type { - RealtimeVoiceBridge, - RealtimeVoiceProviderConfig, - RealtimeVoiceProviderPlugin, +import { + createRealtimeVoiceBridgeSession, + type RealtimeVoiceBridgeSession, + type RealtimeVoiceProviderConfig, + type RealtimeVoiceProviderPlugin, } from "openclaw/plugin-sdk/realtime-voice"; import WebSocket, { WebSocketServer } from "ws"; import type { VoiceCallRealtimeConfig } from "../config.js"; @@ -59,16 +60,7 @@ type CallRegistration = { initialGreetingInstructions?: string; }; -type ActiveRealtimeVoiceBridge = Pick< - RealtimeVoiceBridge, - | "connect" - | "sendAudio" - | "setMediaTimestamp" - | "submitToolResult" - | "acknowledgeMark" - | "close" - | "triggerGreeting" ->; +type ActiveRealtimeVoiceBridge = RealtimeVoiceBridgeSession; export class RealtimeCallHandler { private readonly toolHandlers = new Map(); @@ -254,34 +246,30 @@ export class RealtimeCallHandler { this.endCallInManager(callSid, callId, reason); }; - const bridgeRef: { current?: ActiveRealtimeVoiceBridge } = {}; - const bridge = this.realtimeProvider.createBridge({ + const bridge = createRealtimeVoiceBridgeSession({ + provider: this.realtimeProvider, providerConfig: this.providerConfig, instructions: this.config.instructions, tools: this.config.tools, - onAudio: (muLaw) => { - if (ws.readyState !== WebSocket.OPEN) { - return; - } - ws.send( - JSON.stringify({ - event: "media", - streamSid, - media: { payload: muLaw.toString("base64") }, - }), - ); - }, - onClearAudio: () => { - if (ws.readyState !== WebSocket.OPEN) { - return; - } - ws.send(JSON.stringify({ event: "clear", streamSid })); - }, - onMark: (markName) => { - if 
(ws.readyState !== WebSocket.OPEN) { - return; - } - ws.send(JSON.stringify({ event: "mark", streamSid, mark: { name: markName } })); + initialGreetingInstructions, + triggerGreetingOnReady: true, + audioSink: { + isOpen: () => ws.readyState === WebSocket.OPEN, + sendAudio: (muLaw) => { + ws.send( + JSON.stringify({ + event: "media", + streamSid, + media: { payload: muLaw.toString("base64") }, + }), + ); + }, + clearAudio: () => { + ws.send(JSON.stringify({ event: "clear", streamSid })); + }, + sendMark: (markName) => { + ws.send(JSON.stringify({ event: "mark", streamSid, mark: { name: markName } })); + }, }, onTranscript: (role, text, isFinal) => { if (!isFinal) { @@ -309,22 +297,15 @@ export class RealtimeCallHandler { text, }); }, - onToolCall: (toolEvent) => { - const activeBridge = bridgeRef.current; - if (!activeBridge) { - return; - } + onToolCall: (toolEvent, session) => { void this.executeToolCall( - activeBridge, + session, callId, toolEvent.callId || toolEvent.itemId, toolEvent.name, toolEvent.args, ); }, - onReady: () => { - bridgeRef.current?.triggerGreeting?.(initialGreetingInstructions); - }, onError: (error) => { console.error("[voice-call] realtime voice error:", error.message); }, @@ -348,8 +329,6 @@ export class RealtimeCallHandler { }, }); - bridgeRef.current = bridge; - bridge.connect().catch((error: Error) => { console.error("[voice-call] Failed to connect realtime bridge:", error); bridge.close(); diff --git a/src/plugin-sdk/realtime-voice.ts b/src/plugin-sdk/realtime-voice.ts index 41e2ed77400..720b48d4e5a 100644 --- a/src/plugin-sdk/realtime-voice.ts +++ b/src/plugin-sdk/realtime-voice.ts @@ -18,3 +18,9 @@ export { listRealtimeVoiceProviders, normalizeRealtimeVoiceProviderId, } from "../realtime-voice/provider-registry.js"; +export { + createRealtimeVoiceBridgeSession, + type RealtimeVoiceAudioSink, + type RealtimeVoiceBridgeSession, + type RealtimeVoiceBridgeSessionParams, +} from "../realtime-voice/session-runtime.js"; diff --git 
a/src/realtime-voice/session-runtime.test.ts b/src/realtime-voice/session-runtime.test.ts new file mode 100644 index 00000000000..6afd1378f9e --- /dev/null +++ b/src/realtime-voice/session-runtime.test.ts @@ -0,0 +1,133 @@ +import { describe, expect, it, vi } from "vitest"; +import type { RealtimeVoiceProviderPlugin } from "../plugins/types.js"; +import type { RealtimeVoiceBridge } from "./provider-types.js"; +import { createRealtimeVoiceBridgeSession } from "./session-runtime.js"; + +function makeBridge(overrides: Partial<RealtimeVoiceBridge> = {}): RealtimeVoiceBridge { + return { + acknowledgeMark: vi.fn(), + close: vi.fn(), + connect: vi.fn(async () => {}), + isConnected: vi.fn(() => true), + sendAudio: vi.fn(), + setMediaTimestamp: vi.fn(), + submitToolResult: vi.fn(), + triggerGreeting: vi.fn(), + ...overrides, + }; +} + +describe("realtime voice bridge session runtime", () => { + it("routes provider output through an open audio sink", () => { + let callbacks: Parameters<RealtimeVoiceProviderPlugin["createBridge"]>[0] | undefined; + const bridge = makeBridge(); + const provider: RealtimeVoiceProviderPlugin = { + id: "test", + label: "Test", + isConfigured: () => true, + createBridge: (request) => { + callbacks = request; + return bridge; + }, + }; + const sendAudio = vi.fn(); + const clearAudio = vi.fn(); + const sendMark = vi.fn(); + + createRealtimeVoiceBridgeSession({ + provider, + providerConfig: {}, + audioSink: { + isOpen: () => true, + sendAudio, + clearAudio, + sendMark, + }, + }); + + callbacks?.onAudio(Buffer.from([1, 2])); + callbacks?.onClearAudio(); + callbacks?.onMark?.("mark-1"); + + expect(sendAudio).toHaveBeenCalledWith(Buffer.from([1, 2])); + expect(clearAudio).toHaveBeenCalled(); + expect(sendMark).toHaveBeenCalledWith("mark-1"); + }); + + it("passes tool calls the active session and triggers initial greeting on ready", () => { + let callbacks: Parameters<RealtimeVoiceProviderPlugin["createBridge"]>[0] | undefined; + const bridge = makeBridge(); + const provider: RealtimeVoiceProviderPlugin = { + id: "test", + label: "Test", + isConfigured: () =>
true, + createBridge: (request) => { + callbacks = request; + return bridge; + }, + }; + const onToolCall = vi.fn(); + + const session = createRealtimeVoiceBridgeSession({ + provider, + providerConfig: {}, + audioSink: { sendAudio: vi.fn() }, + initialGreetingInstructions: "Say hello", + triggerGreetingOnReady: true, + onToolCall, + }); + const event = { + itemId: "item-1", + callId: "call-1", + name: "lookup", + args: { q: "test" }, + }; + + callbacks?.onReady?.(); + callbacks?.onToolCall?.(event); + + expect(bridge.triggerGreeting).toHaveBeenCalledWith("Say hello"); + expect(onToolCall).toHaveBeenCalledWith(event, session); + }); + + it("does not expose session callbacks until the provider returns its bridge", () => { + let callbacks: Parameters<RealtimeVoiceProviderPlugin["createBridge"]>[0] | undefined; + const bridge = makeBridge(); + const onReady = vi.fn(); + const onToolCall = vi.fn(); + const event = { + itemId: "item-1", + callId: "call-1", + name: "lookup", + args: {}, + }; + const provider: RealtimeVoiceProviderPlugin = { + id: "test", + label: "Test", + isConfigured: () => true, + createBridge: (request) => { + callbacks = request; + request.onReady?.(); + request.onToolCall?.(event); + return bridge; + }, + }; + + const session = createRealtimeVoiceBridgeSession({ + provider, + providerConfig: {}, + audioSink: { sendAudio: vi.fn() }, + onReady, + onToolCall, + }); + + expect(onReady).not.toHaveBeenCalled(); + expect(onToolCall).not.toHaveBeenCalled(); + + callbacks?.onReady?.(); + callbacks?.onToolCall?.(event); + + expect(onReady).toHaveBeenCalledWith(session); + expect(onToolCall).toHaveBeenCalledWith(event, session); + }); +}); diff --git a/src/realtime-voice/session-runtime.ts b/src/realtime-voice/session-runtime.ts new file mode 100644 index 00000000000..b79feedcfdd --- /dev/null +++ b/src/realtime-voice/session-runtime.ts @@ -0,0 +1,109 @@ +import type { RealtimeVoiceProviderPlugin } from "../plugins/types.js"; +import type { + RealtimeVoiceBridge, + RealtimeVoiceCloseReason, +
RealtimeVoiceProviderConfig, + RealtimeVoiceRole, + RealtimeVoiceTool, + RealtimeVoiceToolCallEvent, +} from "./provider-types.js"; + +export type RealtimeVoiceAudioSink = { + isOpen?: () => boolean; + sendAudio: (muLaw: Buffer) => void; + clearAudio?: () => void; + sendMark?: (markName: string) => void; +}; + +export type RealtimeVoiceBridgeSession = { + bridge: RealtimeVoiceBridge; + acknowledgeMark(): void; + close(): void; + connect(): Promise<void>; + sendAudio(audio: Buffer): void; + sendUserMessage(text: string): void; + setMediaTimestamp(ts: number): void; + submitToolResult(callId: string, result: unknown): void; + triggerGreeting(instructions?: string): void; +}; + +export type RealtimeVoiceBridgeSessionParams = { + provider: RealtimeVoiceProviderPlugin; + providerConfig: RealtimeVoiceProviderConfig; + audioSink: RealtimeVoiceAudioSink; + instructions?: string; + initialGreetingInstructions?: string; + triggerGreetingOnReady?: boolean; + tools?: RealtimeVoiceTool[]; + onTranscript?: (role: RealtimeVoiceRole, text: string, isFinal: boolean) => void; + onToolCall?: (event: RealtimeVoiceToolCallEvent, session: RealtimeVoiceBridgeSession) => void; + onReady?: (session: RealtimeVoiceBridgeSession) => void; + onError?: (error: Error) => void; + onClose?: (reason: RealtimeVoiceCloseReason) => void; +}; + +export function createRealtimeVoiceBridgeSession( + params: RealtimeVoiceBridgeSessionParams, +): RealtimeVoiceBridgeSession { + let bridge: RealtimeVoiceBridge | undefined; + const requireBridge = () => { + if (!bridge) { + throw new Error("Realtime voice bridge is not ready"); + } + return bridge; + }; + const session: RealtimeVoiceBridgeSession = { + get bridge() { + return requireBridge(); + }, + acknowledgeMark: () => requireBridge().acknowledgeMark(), + close: () => requireBridge().close(), + connect: () => requireBridge().connect(), + sendAudio: (audio) => requireBridge().sendAudio(audio), + sendUserMessage: (text) => requireBridge().sendUserMessage?.(text), +
setMediaTimestamp: (ts) => requireBridge().setMediaTimestamp(ts), + submitToolResult: (callId, result) => requireBridge().submitToolResult(callId, result), + triggerGreeting: (instructions) => requireBridge().triggerGreeting?.(instructions), + }; + const canSendAudio = () => params.audioSink.isOpen?.() ?? true; + bridge = params.provider.createBridge({ + providerConfig: params.providerConfig, + instructions: params.instructions, + tools: params.tools, + onAudio: (muLaw) => { + if (canSendAudio()) { + params.audioSink.sendAudio(muLaw); + } + }, + onClearAudio: () => { + if (canSendAudio()) { + params.audioSink.clearAudio?.(); + } + }, + onMark: (markName) => { + if (canSendAudio()) { + params.audioSink.sendMark?.(markName); + } + }, + onTranscript: params.onTranscript, + onToolCall: (event) => { + if (!bridge) { + return; + } + params.onToolCall?.(event, session); + }, + onReady: () => { + if (!bridge) { + return; + } + if (params.triggerGreetingOnReady) { + bridge.triggerGreeting?.(params.initialGreetingInstructions); + } + params.onReady?.(session); + }, + onError: params.onError, + onClose: params.onClose, + }); + + return session; +}