refactor: share realtime voice bridge sessions

This commit is contained in:
Peter Steinberger
2026-04-24 01:42:16 +01:00
parent 958afeb397
commit 57e139100b
5 changed files with 278 additions and 51 deletions

View File

@@ -296,7 +296,7 @@ Current bundled provider examples:
| `plugin-sdk/speech` | Speech helpers | Speech provider types plus provider-facing directive, registry, and validation helpers |
| `plugin-sdk/speech-core` | Shared speech core | Speech provider types, registry, directives, normalization |
| `plugin-sdk/realtime-transcription` | Realtime transcription helpers | Provider types, registry helpers, and shared WebSocket session helper |
| `plugin-sdk/realtime-voice` | Realtime voice helpers | Provider types and registry helpers |
| `plugin-sdk/realtime-voice` | Realtime voice helpers | Provider types, registry helpers, and bridge session helpers |
| `plugin-sdk/image-generation-core` | Shared image-generation core | Image-generation types, failover, auth, and registry helpers |
| `plugin-sdk/music-generation` | Music-generation helpers | Music-generation provider/request/result types |
| `plugin-sdk/music-generation-core` | Shared music-generation core | Music-generation types, failover helpers, provider lookup, and model-ref parsing |

View File

@@ -2,10 +2,11 @@ import { randomUUID } from "node:crypto";
import http from "node:http";
import type { Duplex } from "node:stream";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import type {
RealtimeVoiceBridge,
RealtimeVoiceProviderConfig,
RealtimeVoiceProviderPlugin,
import {
createRealtimeVoiceBridgeSession,
type RealtimeVoiceBridgeSession,
type RealtimeVoiceProviderConfig,
type RealtimeVoiceProviderPlugin,
} from "openclaw/plugin-sdk/realtime-voice";
import WebSocket, { WebSocketServer } from "ws";
import type { VoiceCallRealtimeConfig } from "../config.js";
@@ -59,16 +60,7 @@ type CallRegistration = {
initialGreetingInstructions?: string;
};
type ActiveRealtimeVoiceBridge = Pick<
RealtimeVoiceBridge,
| "connect"
| "sendAudio"
| "setMediaTimestamp"
| "submitToolResult"
| "acknowledgeMark"
| "close"
| "triggerGreeting"
>;
type ActiveRealtimeVoiceBridge = RealtimeVoiceBridgeSession;
export class RealtimeCallHandler {
private readonly toolHandlers = new Map<string, ToolHandlerFn>();
@@ -254,34 +246,30 @@ export class RealtimeCallHandler {
this.endCallInManager(callSid, callId, reason);
};
const bridgeRef: { current?: ActiveRealtimeVoiceBridge } = {};
const bridge = this.realtimeProvider.createBridge({
const bridge = createRealtimeVoiceBridgeSession({
provider: this.realtimeProvider,
providerConfig: this.providerConfig,
instructions: this.config.instructions,
tools: this.config.tools,
onAudio: (muLaw) => {
if (ws.readyState !== WebSocket.OPEN) {
return;
}
ws.send(
JSON.stringify({
event: "media",
streamSid,
media: { payload: muLaw.toString("base64") },
}),
);
},
onClearAudio: () => {
if (ws.readyState !== WebSocket.OPEN) {
return;
}
ws.send(JSON.stringify({ event: "clear", streamSid }));
},
onMark: (markName) => {
if (ws.readyState !== WebSocket.OPEN) {
return;
}
ws.send(JSON.stringify({ event: "mark", streamSid, mark: { name: markName } }));
initialGreetingInstructions,
triggerGreetingOnReady: true,
audioSink: {
isOpen: () => ws.readyState === WebSocket.OPEN,
sendAudio: (muLaw) => {
ws.send(
JSON.stringify({
event: "media",
streamSid,
media: { payload: muLaw.toString("base64") },
}),
);
},
clearAudio: () => {
ws.send(JSON.stringify({ event: "clear", streamSid }));
},
sendMark: (markName) => {
ws.send(JSON.stringify({ event: "mark", streamSid, mark: { name: markName } }));
},
},
onTranscript: (role, text, isFinal) => {
if (!isFinal) {
@@ -309,22 +297,15 @@ export class RealtimeCallHandler {
text,
});
},
onToolCall: (toolEvent) => {
const activeBridge = bridgeRef.current;
if (!activeBridge) {
return;
}
onToolCall: (toolEvent, session) => {
void this.executeToolCall(
activeBridge,
session,
callId,
toolEvent.callId || toolEvent.itemId,
toolEvent.name,
toolEvent.args,
);
},
onReady: () => {
bridgeRef.current?.triggerGreeting?.(initialGreetingInstructions);
},
onError: (error) => {
console.error("[voice-call] realtime voice error:", error.message);
},
@@ -348,8 +329,6 @@ export class RealtimeCallHandler {
},
});
bridgeRef.current = bridge;
bridge.connect().catch((error: Error) => {
console.error("[voice-call] Failed to connect realtime bridge:", error);
bridge.close();

View File

@@ -18,3 +18,9 @@ export {
listRealtimeVoiceProviders,
normalizeRealtimeVoiceProviderId,
} from "../realtime-voice/provider-registry.js";
// Shared bridge-session runtime: wraps a provider bridge behind a safe session facade.
export {
  createRealtimeVoiceBridgeSession,
  type RealtimeVoiceAudioSink,
  type RealtimeVoiceBridgeSession,
  type RealtimeVoiceBridgeSessionParams,
} from "../realtime-voice/session-runtime.js";

View File

@@ -0,0 +1,133 @@
import { describe, expect, it, vi } from "vitest";
import type { RealtimeVoiceProviderPlugin } from "../plugins/types.js";
import type { RealtimeVoiceBridge } from "./provider-types.js";
import { createRealtimeVoiceBridgeSession } from "./session-runtime.js";
/**
 * Builds a fully mocked RealtimeVoiceBridge for the tests below.
 * Pass `overrides` to replace individual members per test case.
 */
function makeBridge(overrides: Partial<RealtimeVoiceBridge> = {}): RealtimeVoiceBridge {
  const defaults: RealtimeVoiceBridge = {
    acknowledgeMark: vi.fn(),
    close: vi.fn(),
    connect: vi.fn(async () => {}),
    isConnected: vi.fn(() => true),
    sendAudio: vi.fn(),
    setMediaTimestamp: vi.fn(),
    submitToolResult: vi.fn(),
    triggerGreeting: vi.fn(),
  };
  return { ...defaults, ...overrides };
}
describe("realtime voice bridge session runtime", () => {
  it("routes provider output through an open audio sink", () => {
    // Capture the callback bundle the session hands to the provider.
    let callbacks: Parameters<RealtimeVoiceProviderPlugin["createBridge"]>[0] | undefined;
    const bridge = makeBridge();
    const provider: RealtimeVoiceProviderPlugin = {
      id: "test",
      label: "Test",
      isConfigured: () => true,
      createBridge: (request) => {
        callbacks = request;
        return bridge;
      },
    };
    const sendAudio = vi.fn();
    const clearAudio = vi.fn();
    const sendMark = vi.fn();
    createRealtimeVoiceBridgeSession({
      provider,
      providerConfig: {},
      audioSink: {
        isOpen: () => true,
        sendAudio,
        clearAudio,
        sendMark,
      },
    });
    // Provider-originated events must be forwarded to the sink verbatim.
    callbacks?.onAudio(Buffer.from([1, 2]));
    callbacks?.onClearAudio();
    callbacks?.onMark?.("mark-1");
    expect(sendAudio).toHaveBeenCalledWith(Buffer.from([1, 2]));
    expect(clearAudio).toHaveBeenCalled();
    expect(sendMark).toHaveBeenCalledWith("mark-1");
  });

  // Fixed typo in the test title: "passes tool calls the active session" -> "to the active session".
  it("passes tool calls to the active session and triggers initial greeting on ready", () => {
    let callbacks: Parameters<RealtimeVoiceProviderPlugin["createBridge"]>[0] | undefined;
    const bridge = makeBridge();
    const provider: RealtimeVoiceProviderPlugin = {
      id: "test",
      label: "Test",
      isConfigured: () => true,
      createBridge: (request) => {
        callbacks = request;
        return bridge;
      },
    };
    const onToolCall = vi.fn();
    const session = createRealtimeVoiceBridgeSession({
      provider,
      providerConfig: {},
      audioSink: { sendAudio: vi.fn() },
      initialGreetingInstructions: "Say hello",
      triggerGreetingOnReady: true,
      onToolCall,
    });
    const event = {
      itemId: "item-1",
      callId: "call-1",
      name: "lookup",
      args: { q: "test" },
    };
    callbacks?.onReady?.();
    callbacks?.onToolCall?.(event);
    // onReady fires the configured greeting; tool calls arrive with the session attached.
    expect(bridge.triggerGreeting).toHaveBeenCalledWith("Say hello");
    expect(onToolCall).toHaveBeenCalledWith(event, session);
  });

  it("does not expose session callbacks until the provider returns its bridge", () => {
    let callbacks: Parameters<RealtimeVoiceProviderPlugin["createBridge"]>[0] | undefined;
    const bridge = makeBridge();
    const onReady = vi.fn();
    const onToolCall = vi.fn();
    const event = {
      itemId: "item-1",
      callId: "call-1",
      name: "lookup",
      args: {},
    };
    const provider: RealtimeVoiceProviderPlugin = {
      id: "test",
      label: "Test",
      isConfigured: () => true,
      createBridge: (request) => {
        callbacks = request;
        // Fire synchronously, before createBridge has returned: these must be dropped.
        request.onReady?.();
        request.onToolCall?.(event);
        return bridge;
      },
    };
    const session = createRealtimeVoiceBridgeSession({
      provider,
      providerConfig: {},
      audioSink: { sendAudio: vi.fn() },
      onReady,
      onToolCall,
    });
    expect(onReady).not.toHaveBeenCalled();
    expect(onToolCall).not.toHaveBeenCalled();
    // Once the bridge is in place the same callbacks are delivered with the session.
    callbacks?.onReady?.();
    callbacks?.onToolCall?.(event);
    expect(onReady).toHaveBeenCalledWith(session);
    expect(onToolCall).toHaveBeenCalledWith(event, session);
  });
});

View File

@@ -0,0 +1,109 @@
import type { RealtimeVoiceProviderPlugin } from "../plugins/types.js";
import type {
RealtimeVoiceBridge,
RealtimeVoiceCloseReason,
RealtimeVoiceProviderConfig,
RealtimeVoiceRole,
RealtimeVoiceTool,
RealtimeVoiceToolCallEvent,
} from "./provider-types.js";
/**
 * Destination for audio/control events produced by the provider bridge
 * (e.g. a telephony media WebSocket).
 */
export type RealtimeVoiceAudioSink = {
  /** Whether the sink can currently accept events; omitted means always open. */
  isOpen?: () => boolean;
  /** Deliver an audio chunk (presumably mu-law encoded, per the param name — confirm with providers). */
  sendAudio: (muLaw: Buffer) => void;
  /** Drop any audio the sink has buffered but not yet played. */
  clearAudio?: () => void;
  /** Deliver a named playback marker for later acknowledgement. */
  sendMark?: (markName: string) => void;
};
/**
 * Facade over a RealtimeVoiceBridge. Every member defers to the underlying
 * bridge and throws if the bridge has not been created yet.
 */
export type RealtimeVoiceBridgeSession = {
  /** The underlying provider bridge; accessing it before creation throws. */
  bridge: RealtimeVoiceBridge;
  acknowledgeMark(): void;
  close(): void;
  connect(): Promise<void>;
  sendAudio(audio: Buffer): void;
  sendUserMessage(text: string): void;
  setMediaTimestamp(ts: number): void;
  submitToolResult(callId: string, result: unknown): void;
  triggerGreeting(instructions?: string): void;
};
/** Parameters for createRealtimeVoiceBridgeSession. */
export type RealtimeVoiceBridgeSessionParams = {
  /** Provider plugin whose createBridge() supplies the underlying bridge. */
  provider: RealtimeVoiceProviderPlugin;
  providerConfig: RealtimeVoiceProviderConfig;
  /** Where provider audio/clear/mark events are routed (gated by isOpen). */
  audioSink: RealtimeVoiceAudioSink;
  instructions?: string;
  /** Greeting text used when triggerGreetingOnReady is set. */
  initialGreetingInstructions?: string;
  /** When true, triggerGreeting(initialGreetingInstructions) fires on the bridge's ready event. */
  triggerGreetingOnReady?: boolean;
  tools?: RealtimeVoiceTool[];
  onTranscript?: (role: RealtimeVoiceRole, text: string, isFinal: boolean) => void;
  /** Receives the session so handlers can submit tool results back. */
  onToolCall?: (event: RealtimeVoiceToolCallEvent, session: RealtimeVoiceBridgeSession) => void;
  /** Invoked once the bridge reports ready (never before createBridge returns). */
  onReady?: (session: RealtimeVoiceBridgeSession) => void;
  onError?: (error: Error) => void;
  onClose?: (reason: RealtimeVoiceCloseReason) => void;
};
/**
 * Creates a provider bridge and wraps it in a RealtimeVoiceBridgeSession facade.
 *
 * Provider callbacks that fire synchronously from createBridge() (before the
 * bridge reference is stored) are dropped, so consumers never observe a
 * half-constructed session. Audio/clear/mark output is forwarded to the
 * audio sink only while the sink reports itself open.
 */
export function createRealtimeVoiceBridgeSession(
  params: RealtimeVoiceBridgeSessionParams,
): RealtimeVoiceBridgeSession {
  // Assigned exactly once, after the provider returns its bridge.
  let activeBridge: RealtimeVoiceBridge | undefined;

  const getBridge = (): RealtimeVoiceBridge => {
    if (!activeBridge) {
      throw new Error("Realtime voice bridge is not ready");
    }
    return activeBridge;
  };

  // Thin pass-through facade; every member resolves the bridge lazily so a
  // too-early call produces a clear error instead of a crash on undefined.
  const session: RealtimeVoiceBridgeSession = {
    get bridge() {
      return getBridge();
    },
    acknowledgeMark: () => getBridge().acknowledgeMark(),
    close: () => getBridge().close(),
    connect: () => getBridge().connect(),
    sendAudio: (audio) => getBridge().sendAudio(audio),
    sendUserMessage: (text) => getBridge().sendUserMessage?.(text),
    setMediaTimestamp: (ts) => getBridge().setMediaTimestamp(ts),
    submitToolResult: (callId, result) => getBridge().submitToolResult(callId, result),
    triggerGreeting: (instructions) => getBridge().triggerGreeting?.(instructions),
  };

  // A sink without isOpen() is treated as always writable.
  const sinkIsOpen = (): boolean => params.audioSink.isOpen?.() ?? true;

  activeBridge = params.provider.createBridge({
    providerConfig: params.providerConfig,
    instructions: params.instructions,
    tools: params.tools,
    onAudio: (muLaw) => {
      if (!sinkIsOpen()) {
        return;
      }
      params.audioSink.sendAudio(muLaw);
    },
    onClearAudio: () => {
      if (!sinkIsOpen()) {
        return;
      }
      params.audioSink.clearAudio?.();
    },
    onMark: (markName) => {
      if (!sinkIsOpen()) {
        return;
      }
      params.audioSink.sendMark?.(markName);
    },
    onTranscript: params.onTranscript,
    onToolCall: (event) => {
      // Drop events raised synchronously from createBridge() itself.
      if (activeBridge) {
        params.onToolCall?.(event, session);
      }
    },
    onReady: () => {
      if (!activeBridge) {
        return;
      }
      if (params.triggerGreetingOnReady) {
        activeBridge.triggerGreeting?.(params.initialGreetingInstructions);
      }
      params.onReady?.(session);
    },
    onError: params.onError,
    onClose: params.onClose,
  });

  return session;
}