diff --git a/CHANGELOG.md b/CHANGELOG.md index 364b37d2124..37b84e6319c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,7 @@ Docs: https://docs.openclaw.ai - Providers/GPT-5: move the GPT-5 prompt overlay into the shared provider runtime so compatible GPT-5 models receive the same behavior and heartbeat guidance through OpenAI, OpenRouter, OpenCode, Codex, and other GPT providers; add `agents.defaults.promptOverlays.gpt5.personality` as the global friendly-style toggle while keeping the OpenAI plugin setting as a fallback. - Providers/xAI: add image generation, text-to-speech, and speech-to-text support, including `grok-imagine-image` / `grok-imagine-image-pro`, reference-image edits, six live xAI voices, MP3/WAV/PCM/G.711 TTS formats, `grok-stt` audio transcription, and xAI realtime transcription for Voice Call streaming. (#68694) Thanks @KateWilkins. - Providers/STT: add Voice Call streaming transcription for Deepgram, ElevenLabs, and Mistral, alongside the existing OpenAI and xAI realtime STT paths; ElevenLabs also gains Scribe v2 batch audio transcription for inbound media. -- Plugin SDK/realtime transcription: add a shared WebSocket session helper for streaming STT providers, covering audio queueing, ready handshakes, proxy capture, reconnects, and close flushing so provider plugins can register realtime transcription with much less transport boilerplate. +- Plugin SDK/STT: share realtime transcription WebSocket transport and multipart batch transcription form helpers across bundled STT providers, reducing provider plugin boilerplate while preserving proxy capture, reconnects, audio queueing, close flushing, upload filename normalization, and ready handshakes. - Models/commands: add `/models add <provider> <modelId>` so you can register a model from chat and use it without restarting the gateway; keep `/models` as a simple provider browser while adding clearer add guidance and copy-friendly command examples. (#70211) Thanks @Takhoffman. 
- Pi/models: update the bundled pi packages to `0.68.1` and let the OpenCode Go catalog come from pi instead of plugin-maintained model aliases, adding the refreshed `opencode-go/kimi-k2.6`, Qwen, GLM, MiMo, and MiniMax entries. - CLI/doctor plugins: lazy-load doctor plugin paths and prefer installed plugin `dist/*` runtime entries over source-adjacent JavaScript fallbacks, reducing the measured `doctor --non-interactive` runtime by about 74% while keeping cold doctor startup on built plugin artifacts. (#69840) Thanks @gumadeiras. diff --git a/docs/plugins/sdk-migration.md b/docs/plugins/sdk-migration.md index ced0f07f5e9..fdb6f4b24e8 100644 --- a/docs/plugins/sdk-migration.md +++ b/docs/plugins/sdk-migration.md @@ -279,7 +279,7 @@ Current bundled provider examples: | `plugin-sdk/provider-model-shared` | Shared provider model/replay helpers | `ProviderReplayFamily`, `buildProviderReplayFamilyHooks`, `normalizeModelCompat`, shared replay-policy builders, provider-endpoint helpers, and model-id normalization helpers | | `plugin-sdk/provider-catalog-shared` | Shared provider catalog helpers | `findCatalogTemplate`, `buildSingleProviderApiKeyCatalog`, `supportsNativeStreamingUsageCompat`, `applyProviderNativeStreamingUsageCompat` | | `plugin-sdk/provider-onboard` | Provider onboarding patches | Onboarding config helpers | - | `plugin-sdk/provider-http` | Provider HTTP helpers | Generic provider HTTP/endpoint capability helpers | + | `plugin-sdk/provider-http` | Provider HTTP helpers | Generic provider HTTP/endpoint capability helpers, including audio transcription multipart form helpers | | `plugin-sdk/provider-web-fetch` | Provider web-fetch helpers | Web-fetch provider registration/cache helpers | | `plugin-sdk/provider-web-search-config-contract` | Provider web-search config helpers | Narrow web-search config/credential helpers for providers that do not need plugin-enable wiring | | `plugin-sdk/provider-web-search-contract` | Provider web-search contract helpers | 
Narrow web-search config/credential contract helpers such as `createWebSearchProviderContractFields`, `enablePluginInConfig`, `resolveProviderWebSearchPluginConfig`, and scoped credential setters/getters | diff --git a/docs/plugins/sdk-overview.md b/docs/plugins/sdk-overview.md index 5ce0e772c75..479a3821380 100644 --- a/docs/plugins/sdk-overview.md +++ b/docs/plugins/sdk-overview.md @@ -137,7 +137,7 @@ explicitly promotes one as public. | `plugin-sdk/provider-auth` | `createProviderApiKeyAuthMethod`, `ensureApiKeyFromOptionEnvOrPrompt`, `upsertAuthProfile`, `upsertApiKeyProfile`, `writeOAuthCredentials` | | `plugin-sdk/provider-model-shared` | `ProviderReplayFamily`, `buildProviderReplayFamilyHooks`, `normalizeModelCompat`, shared replay-policy builders, provider-endpoint helpers, and model-id normalization helpers such as `normalizeNativeXaiModelId` | | `plugin-sdk/provider-catalog-shared` | `findCatalogTemplate`, `buildSingleProviderApiKeyCatalog`, `supportsNativeStreamingUsageCompat`, `applyProviderNativeStreamingUsageCompat` | - | `plugin-sdk/provider-http` | Generic provider HTTP/endpoint capability helpers | + | `plugin-sdk/provider-http` | Generic provider HTTP/endpoint capability helpers, including audio transcription multipart form helpers | | `plugin-sdk/provider-web-fetch-contract` | Narrow web-fetch config/selection contract helpers such as `enablePluginInConfig` and `WebFetchProviderPlugin` | | `plugin-sdk/provider-web-fetch` | Web-fetch provider registration/cache helpers | | `plugin-sdk/provider-web-search-config-contract` | Narrow web-search config/credential helpers for providers that do not need plugin-enable wiring | diff --git a/docs/plugins/sdk-provider-plugins.md b/docs/plugins/sdk-provider-plugins.md index 07d8d24f541..dcb7546ec8f 100644 --- a/docs/plugins/sdk-provider-plugins.md +++ b/docs/plugins/sdk-provider-plugins.md @@ -716,6 +716,17 @@ API key auth, and dynamic model resolution. 
as `maxInputImages`, `maxInputVideos`, and `maxDurationSeconds` are not enough to advertise transform-mode support or disabled modes cleanly. + Prefer the shared WebSocket helper for streaming STT providers. It keeps + proxy capture, reconnect backoff, close flushing, ready handshakes, audio + queueing, and close-event diagnostics consistent across providers while + leaving provider code responsible for only the upstream event mapping. + + Batch STT providers that POST multipart audio should use + `buildAudioTranscriptionFormData(...)` from + `openclaw/plugin-sdk/provider-http` together with the provider HTTP request + helpers. The form helper normalizes upload filenames, including AAC uploads + that need an M4A-style filename for compatible transcription APIs. + Music-generation providers should follow the same pattern: `generate` for prompt-only generation and `edit` for reference-image-based generation. Flat aggregate fields such as `maxInputImages`, diff --git a/docs/plugins/voice-call.md b/docs/plugins/voice-call.md index 331a5f9d977..13438d5fff2 100644 --- a/docs/plugins/voice-call.md +++ b/docs/plugins/voice-call.md @@ -155,7 +155,8 @@ Current runtime behavior: - `streaming.provider` is optional. If unset, Voice Call uses the first registered realtime transcription provider. -- Bundled realtime transcription providers include OpenAI (`openai`) and xAI +- Bundled realtime transcription providers include Deepgram (`deepgram`), + ElevenLabs (`elevenlabs`), Mistral (`mistral`), OpenAI (`openai`), and xAI (`xai`), registered by their provider plugins. - Provider-owned raw config lives under `streaming.providers.<providerId>`. 
- If `streaming.provider` points at an unregistered provider, or no realtime diff --git a/extensions/deepgram/audio.live.test.ts b/extensions/deepgram/audio.live.test.ts index 3a0ff252bd8..887ead405ca 100644 --- a/extensions/deepgram/audio.live.test.ts +++ b/extensions/deepgram/audio.live.test.ts @@ -1,10 +1,8 @@ import { describe, expect, it } from "vitest"; import { isLiveTestEnabled } from "../../src/agents/live-test-helpers.js"; import { - normalizeTranscriptForMatch, - streamAudioForLiveTest, + runRealtimeSttLiveTest, synthesizeElevenLabsLiveSpeech, - waitForLiveExpectation, } from "../../test/helpers/stt-live-audio.js"; import { transcribeDeepgramAudio } from "./audio.js"; import { buildDeepgramRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js"; @@ -62,37 +60,15 @@ describeLive("deepgram live", () => { outputFormat: "ulaw_8000", timeoutMs: 30_000, }); - const transcripts: string[] = []; - const partials: string[] = []; - const errors: Error[] = []; - const session = provider.createSession({ + + await runRealtimeSttLiveTest({ + provider, providerConfig: { apiKey: DEEPGRAM_KEY, language: "en-US", endpointingMs: 500, }, - onPartial: (partial) => partials.push(partial), - onTranscript: (transcript) => transcripts.push(transcript), - onError: (error) => errors.push(error), + audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]), }); - - try { - await session.connect(); - await streamAudioForLiveTest({ - audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]), - sendAudio: (chunk) => session.sendAudio(chunk), - }); - - await waitForLiveExpectation(() => { - if (errors[0]) { - throw errors[0]; - } - expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw"); - }, 60_000); - } finally { - session.close(); - } - - expect(partials.length + transcripts.length).toBeGreaterThan(0); }, 90_000); }); diff --git a/extensions/elevenlabs/elevenlabs.live.test.ts 
b/extensions/elevenlabs/elevenlabs.live.test.ts index 3eff0b29d99..9a69efdebe9 100644 --- a/extensions/elevenlabs/elevenlabs.live.test.ts +++ b/extensions/elevenlabs/elevenlabs.live.test.ts @@ -2,9 +2,8 @@ import { describe, expect, it } from "vitest"; import { isLiveTestEnabled } from "../../src/agents/live-test-helpers.js"; import { normalizeTranscriptForMatch, - streamAudioForLiveTest, + runRealtimeSttLiveTest, synthesizeElevenLabsLiveSpeech, - waitForLiveExpectation, } from "../../test/helpers/stt-live-audio.js"; import { elevenLabsMediaUnderstandingProvider } from "./media-understanding-provider.js"; import { buildElevenLabsRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js"; @@ -45,10 +44,9 @@ describeLive("elevenlabs plugin live", () => { outputFormat: "ulaw_8000", timeoutMs: 30_000, }); - const transcripts: string[] = []; - const partials: string[] = []; - const errors: Error[] = []; - const session = provider.createSession({ + + await runRealtimeSttLiveTest({ + provider, providerConfig: { apiKey: ELEVENLABS_KEY, audioFormat: "ulaw_8000", @@ -56,29 +54,8 @@ describeLive("elevenlabs plugin live", () => { commitStrategy: "vad", languageCode: "en", }, - onPartial: (partial) => partials.push(partial), - onTranscript: (transcript) => transcripts.push(transcript), - onError: (error) => errors.push(error), + audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]), + closeBeforeWait: true, }); - - try { - await session.connect(); - await streamAudioForLiveTest({ - audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]), - sendAudio: (chunk) => session.sendAudio(chunk), - }); - session.close(); - - await waitForLiveExpectation(() => { - if (errors[0]) { - throw errors[0]; - } - expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw"); - }, 60_000); - } finally { - session.close(); - } - - expect(partials.length + transcripts.length).toBeGreaterThan(0); }, 90_000); 
}); diff --git a/extensions/elevenlabs/media-understanding-provider.test.ts b/extensions/elevenlabs/media-understanding-provider.test.ts index 4362ee91bb4..711831031ef 100644 --- a/extensions/elevenlabs/media-understanding-provider.test.ts +++ b/extensions/elevenlabs/media-understanding-provider.test.ts @@ -31,12 +31,11 @@ describe("elevenLabsMediaUnderstandingProvider", () => { expect(result).toEqual({ text: "hello", model: "scribe_v2" }); expect(fetchMock).toHaveBeenCalledWith( "https://api.elevenlabs.io/v1/speech-to-text", - expect.objectContaining({ - method: "POST", - headers: { "xi-api-key": "eleven-key" }, - }), + expect.objectContaining({ method: "POST" }), ); const init = fetchMock.mock.calls[0]?.[1] as RequestInit; + const headers = new Headers(init.headers); + expect(headers.get("xi-api-key")).toBe("eleven-key"); const form = init.body as FormData; expect(form.get("model_id")).toBe("scribe_v2"); expect(form.get("language_code")).toBe("en"); diff --git a/extensions/elevenlabs/media-understanding-provider.ts b/extensions/elevenlabs/media-understanding-provider.ts index a26a04975f2..2c415b41597 100644 --- a/extensions/elevenlabs/media-understanding-provider.ts +++ b/extensions/elevenlabs/media-understanding-provider.ts @@ -1,51 +1,19 @@ -import path from "node:path"; import type { AudioTranscriptionRequest, AudioTranscriptionResult, MediaUnderstandingProvider, } from "openclaw/plugin-sdk/media-understanding"; -import { normalizeElevenLabsBaseUrl } from "./shared.js"; +import { + assertOkOrThrowHttpError, + buildAudioTranscriptionFormData, + postTranscriptionRequest, + resolveProviderHttpRequestConfig, + requireTranscriptionText, +} from "openclaw/plugin-sdk/provider-http"; +import { DEFAULT_ELEVENLABS_BASE_URL, normalizeElevenLabsBaseUrl } from "./shared.js"; const DEFAULT_ELEVENLABS_STT_MODEL = "scribe_v2"; -function resolveUploadFileName(fileName?: string, mime?: string): string { - const trimmed = fileName?.trim(); - const baseName = trimmed ? 
path.basename(trimmed) : "audio"; - const lowerMime = mime?.trim().toLowerCase(); - - if (/\.aac$/i.test(baseName)) { - return `${baseName.slice(0, -4) || "audio"}.m4a`; - } - if (!path.extname(baseName) && lowerMime === "audio/aac") { - return `${baseName || "audio"}.m4a`; - } - return baseName; -} - -async function readErrorDetail(res: Response): Promise { - const text = (await res.text()).trim(); - if (!text) { - return undefined; - } - try { - const json = JSON.parse(text) as { - detail?: { message?: string; detail?: string; status?: string; code?: string }; - message?: string; - error?: string; - }; - return ( - json.message ?? - json.detail?.message ?? - json.detail?.detail ?? - json.error ?? - json.detail?.status ?? - json.detail?.code - ); - } catch { - return text.slice(0, 300); - } -} - export async function transcribeElevenLabsAudio( req: AudioTranscriptionRequest, ): Promise { @@ -56,46 +24,51 @@ export async function transcribeElevenLabsAudio( } const model = req.model?.trim() || DEFAULT_ELEVENLABS_STT_MODEL; - const controller = new AbortController(); - const timeout = setTimeout(() => controller.abort(), req.timeoutMs); - - try { - const form = new FormData(); - const bytes = new Uint8Array(req.buffer); - const blob = new Blob([bytes], { type: req.mime ?? 
"application/octet-stream" }); - form.append("file", blob, resolveUploadFileName(req.fileName, req.mime)); - form.append("model_id", model); - if (req.language?.trim()) { - form.append("language_code", req.language.trim()); - } - if (req.prompt?.trim()) { - form.append("prompt", req.prompt.trim()); - } - - const res = await fetchFn(`${normalizeElevenLabsBaseUrl(req.baseUrl)}/v1/speech-to-text`, { - method: "POST", - headers: { + const { baseUrl, allowPrivateNetwork, headers, dispatcherPolicy } = + resolveProviderHttpRequestConfig({ + baseUrl: normalizeElevenLabsBaseUrl(req.baseUrl), + defaultBaseUrl: DEFAULT_ELEVENLABS_BASE_URL, + headers: req.headers, + request: req.request, + defaultHeaders: { "xi-api-key": apiKey, }, - body: form, - signal: controller.signal, + provider: "elevenlabs", + api: "elevenlabs-speech-to-text", + capability: "audio", + transport: "media-understanding", }); + const form = buildAudioTranscriptionFormData({ + buffer: req.buffer, + fileName: req.fileName, + mime: req.mime, + fields: { + model_id: model, + language_code: req.language, + prompt: req.prompt, + }, + }); + const { response, release } = await postTranscriptionRequest({ + url: `${baseUrl}/v1/speech-to-text`, + headers, + body: form, + timeoutMs: req.timeoutMs, + fetchFn, + allowPrivateNetwork, + dispatcherPolicy, + auditContext: "elevenlabs speech-to-text", + }); - if (!res.ok) { - const detail = await readErrorDetail(res); - throw new Error( - `ElevenLabs audio transcription failed (${res.status})${detail ? 
`: ${detail}` : ""}`, - ); - } - - const payload = (await res.json()) as { text?: string }; - const text = payload.text?.trim(); - if (!text) { - throw new Error("ElevenLabs audio transcription response missing text"); - } + try { + await assertOkOrThrowHttpError(response, "ElevenLabs audio transcription failed"); + const payload = (await response.json()) as { text?: string }; + const text = requireTranscriptionText( + payload.text, + "ElevenLabs audio transcription response missing text", + ); return { text, model }; } finally { - clearTimeout(timeout); + await release(); } } diff --git a/extensions/mistral/.openclaw-runtime-deps.json b/extensions/mistral/.openclaw-runtime-deps.json new file mode 100644 index 00000000000..b762204638d --- /dev/null +++ b/extensions/mistral/.openclaw-runtime-deps.json @@ -0,0 +1,3 @@ +{ + "specs": ["ws@^8.20.0"] +} diff --git a/extensions/mistral/mistral.live.test.ts b/extensions/mistral/mistral.live.test.ts index 6aec8fd1c1e..b09e759db35 100644 --- a/extensions/mistral/mistral.live.test.ts +++ b/extensions/mistral/mistral.live.test.ts @@ -2,9 +2,8 @@ import { describe, expect, it } from "vitest"; import { isLiveTestEnabled } from "../../src/agents/live-test-helpers.js"; import { normalizeTranscriptForMatch, - streamAudioForLiveTest, + runRealtimeSttLiveTest, synthesizeElevenLabsLiveSpeech, - waitForLiveExpectation, } from "../../test/helpers/stt-live-audio.js"; import { mistralMediaUnderstandingProvider } from "./media-understanding-provider.js"; import { buildMistralRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js"; @@ -46,39 +45,17 @@ describeLive("mistral plugin live", () => { outputFormat: "ulaw_8000", timeoutMs: 30_000, }); - const transcripts: string[] = []; - const partials: string[] = []; - const errors: Error[] = []; - const session = provider.createSession({ + + await runRealtimeSttLiveTest({ + provider, providerConfig: { apiKey: MISTRAL_KEY, sampleRate: 8000, encoding: "pcm_mulaw", 
targetStreamingDelayMs: 800, }, - onPartial: (partial) => partials.push(partial), - onTranscript: (transcript) => transcripts.push(transcript), - onError: (error) => errors.push(error), + audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]), + closeBeforeWait: true, }); - - try { - await session.connect(); - await streamAudioForLiveTest({ - audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]), - sendAudio: (chunk) => session.sendAudio(chunk), - }); - session.close(); - - await waitForLiveExpectation(() => { - if (errors[0]) { - throw errors[0]; - } - expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw"); - }, 60_000); - } finally { - session.close(); - } - - expect(partials.length + transcripts.length).toBeGreaterThan(0); }, 90_000); }); diff --git a/extensions/openai/openai.live.test.ts b/extensions/openai/openai.live.test.ts index 0f9334d3158..8d6e7f92e51 100644 --- a/extensions/openai/openai.live.test.ts +++ b/extensions/openai/openai.live.test.ts @@ -13,6 +13,7 @@ import { registerProviderPlugin, requireRegisteredProvider, } from "../../test/helpers/plugins/provider-registration.js"; +import { runRealtimeSttLiveTest } from "../../test/helpers/stt-live-audio.js"; import plugin from "./index.js"; const OPENAI_API_KEY = process.env.OPENAI_API_KEY ?? 
""; @@ -140,21 +141,6 @@ async function createTempAgentDir(): Promise { return await fs.mkdtemp(path.join(os.tmpdir(), "openai-plugin-live-")); } -async function waitForLiveExpectation(expectation: () => void, timeoutMs = 30_000) { - const started = Date.now(); - let lastError: unknown; - while (Date.now() - started < timeoutMs) { - try { - expectation(); - return; - } catch (error) { - lastError = error; - await new Promise((resolve) => setTimeout(resolve, 100)); - } - } - throw lastError; -} - function normalizeTranscriptForMatch(value: string): string { return value.toLowerCase().replace(/[^a-z0-9]+/g, ""); } @@ -339,40 +325,19 @@ describeLive("openai plugin live", () => { expect(telephony.outputFormat).toBe("pcm"); expect(telephony.sampleRate).toBe(24_000); - const transcripts: string[] = []; - const partials: string[] = []; - const errors: Error[] = []; - const session = realtimeProvider.createSession({ + const speech = convertPcm24kToMulaw8k(telephony.audioBuffer); + const silence = Buffer.alloc(8_000, 0xff); + const audio = Buffer.concat([silence.subarray(0, 4_000), speech, silence]); + const { transcripts, partials } = await runRealtimeSttLiveTest({ + provider: realtimeProvider, providerConfig: { apiKey: OPENAI_API_KEY, language: "en", silenceDurationMs: 500, }, - onPartial: (partial) => partials.push(partial), - onTranscript: (transcript) => transcripts.push(transcript), - onError: (error) => errors.push(error), + audio, }); - try { - await session.connect(); - const speech = convertPcm24kToMulaw8k(telephony.audioBuffer); - const silence = Buffer.alloc(8_000, 0xff); - const audio = Buffer.concat([silence.subarray(0, 4_000), speech, silence]); - for (let offset = 0; offset < audio.byteLength; offset += 160) { - session.sendAudio(audio.subarray(offset, offset + 160)); - await new Promise((resolve) => setTimeout(resolve, 5)); - } - - await waitForLiveExpectation(() => { - if (errors[0]) { - throw errors[0]; - } - 
expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw"); - }, 60_000); - } finally { - session.close(); - } - const normalized = transcripts.join(" ").toLowerCase(); const compact = normalizeTranscriptForMatch(normalized); expect(compact).toContain("openclaw"); diff --git a/extensions/openai/realtime-transcription-provider.ts b/extensions/openai/realtime-transcription-provider.ts index 7208427d485..db37e431eb3 100644 --- a/extensions/openai/realtime-transcription-provider.ts +++ b/extensions/openai/realtime-transcription-provider.ts @@ -1,20 +1,14 @@ -import { randomUUID } from "node:crypto"; import { - captureWsEvent, - createDebugProxyWebSocketAgent, - resolveDebugProxySettings, -} from "openclaw/plugin-sdk/proxy-capture"; -import type { - RealtimeTranscriptionProviderConfig, - RealtimeTranscriptionProviderPlugin, - RealtimeTranscriptionSession, - RealtimeTranscriptionSessionCreateRequest, + createRealtimeTranscriptionWebSocketSession, + type RealtimeTranscriptionProviderConfig, + type RealtimeTranscriptionProviderPlugin, + type RealtimeTranscriptionSession, + type RealtimeTranscriptionSessionCreateRequest, + type RealtimeTranscriptionWebSocketTransport, } from "openclaw/plugin-sdk/realtime-transcription"; import { normalizeResolvedSecretInputString } from "openclaw/plugin-sdk/secret-input"; -import WebSocket from "ws"; import { asFiniteNumber, - captureOpenAIRealtimeWsClose, readRealtimeErrorDetail, resolveOpenAIProviderConfigRecord, trimToUndefined, @@ -45,6 +39,11 @@ type RealtimeEvent = { error?: unknown; }; +const OPENAI_REALTIME_TRANSCRIPTION_URL = "wss://api.openai.com/v1/realtime?intent=transcription"; +const OPENAI_REALTIME_TRANSCRIPTION_CONNECT_TIMEOUT_MS = 10_000; +const OPENAI_REALTIME_TRANSCRIPTION_MAX_RECONNECT_ATTEMPTS = 5; +const OPENAI_REALTIME_TRANSCRIPTION_RECONNECT_DELAY_MS = 1000; + function normalizeProviderConfig( config: RealtimeTranscriptionProviderConfig, ): OpenAIRealtimeTranscriptionProviderConfig { @@ -67,228 
+66,84 @@ function normalizeProviderConfig( }; } -class OpenAIRealtimeTranscriptionSession implements RealtimeTranscriptionSession { - private static readonly MAX_RECONNECT_ATTEMPTS = 5; - private static readonly RECONNECT_DELAY_MS = 1000; - private static readonly CONNECT_TIMEOUT_MS = 10_000; +function createOpenAIRealtimeTranscriptionSession( + config: OpenAIRealtimeTranscriptionSessionConfig, +): RealtimeTranscriptionSession { + let pendingTranscript = ""; - private ws: WebSocket | null = null; - private connected = false; - private closed = false; - private reconnectAttempts = 0; - private pendingTranscript = ""; - private readonly flowId = randomUUID(); - - constructor(private readonly config: OpenAIRealtimeTranscriptionSessionConfig) {} - - async connect(): Promise { - this.closed = false; - this.reconnectAttempts = 0; - await this.doConnect(); - } - - sendAudio(audio: Buffer): void { - if (this.ws?.readyState !== WebSocket.OPEN) { - return; - } - this.sendEvent({ - type: "input_audio_buffer.append", - audio: audio.toString("base64"), - }); - } - - close(): void { - this.closed = true; - this.connected = false; - if (this.ws) { - this.ws.close(1000, "Transcription session closed"); - this.ws = null; - } - } - - isConnected(): boolean { - return this.connected; - } - - private async doConnect(): Promise { - await new Promise((resolve, reject) => { - const url = "wss://api.openai.com/v1/realtime?intent=transcription"; - const debugProxy = resolveDebugProxySettings(); - const proxyAgent = createDebugProxyWebSocketAgent(debugProxy); - this.ws = new WebSocket(url, { - headers: { - Authorization: `Bearer ${this.config.apiKey}`, - "OpenAI-Beta": "realtime=v1", - }, - ...(proxyAgent ? 
{ agent: proxyAgent } : {}), - }); - - const connectTimeout = setTimeout(() => { - reject(new Error("OpenAI realtime transcription connection timeout")); - }, OpenAIRealtimeTranscriptionSession.CONNECT_TIMEOUT_MS); - - this.ws.on("open", () => { - clearTimeout(connectTimeout); - this.connected = true; - this.reconnectAttempts = 0; - captureWsEvent({ - url, - direction: "local", - kind: "ws-open", - flowId: this.flowId, - meta: { - provider: "openai", - capability: "realtime-transcription", - }, - }); - this.sendEvent({ - type: "transcription_session.update", - session: { - input_audio_format: "g711_ulaw", - input_audio_transcription: { - model: this.config.model, - ...(this.config.language ? { language: this.config.language } : {}), - ...(this.config.prompt ? { prompt: this.config.prompt } : {}), - }, - turn_detection: { - type: "server_vad", - threshold: this.config.vadThreshold, - prefix_padding_ms: 300, - silence_duration_ms: this.config.silenceDurationMs, - }, - }, - }); - resolve(); - }); - - this.ws.on("message", (data: Buffer) => { - captureWsEvent({ - url, - direction: "inbound", - kind: "ws-frame", - flowId: this.flowId, - payload: data, - meta: { - provider: "openai", - capability: "realtime-transcription", - }, - }); - try { - this.handleEvent(JSON.parse(data.toString()) as RealtimeEvent); - } catch (error) { - this.config.onError?.(error instanceof Error ? error : new Error(String(error))); - } - }); - - this.ws.on("error", (error) => { - captureWsEvent({ - url, - direction: "local", - kind: "error", - flowId: this.flowId, - errorText: error instanceof Error ? error.message : String(error), - meta: { - provider: "openai", - capability: "realtime-transcription", - }, - }); - if (!this.connected) { - clearTimeout(connectTimeout); - reject(error); - return; - } - this.config.onError?.(error instanceof Error ? 
error : new Error(String(error))); - }); - - this.ws.on("close", (code, reasonBuffer) => { - captureOpenAIRealtimeWsClose({ - url, - flowId: this.flowId, - capability: "realtime-transcription", - code, - reasonBuffer, - }); - this.connected = false; - if (this.closed) { - return; - } - void this.attemptReconnect(); - }); - }); - } - - private async attemptReconnect(): Promise { - if (this.closed) { - return; - } - if (this.reconnectAttempts >= OpenAIRealtimeTranscriptionSession.MAX_RECONNECT_ATTEMPTS) { - this.config.onError?.(new Error("OpenAI realtime transcription reconnect limit reached")); - return; - } - this.reconnectAttempts += 1; - const delay = - OpenAIRealtimeTranscriptionSession.RECONNECT_DELAY_MS * 2 ** (this.reconnectAttempts - 1); - await new Promise((resolve) => setTimeout(resolve, delay)); - if (this.closed) { - return; - } - try { - await this.doConnect(); - } catch (error) { - this.config.onError?.(error instanceof Error ? error : new Error(String(error))); - await this.attemptReconnect(); - } - } - - private handleEvent(event: RealtimeEvent): void { + const handleEvent = (event: RealtimeEvent) => { switch (event.type) { case "conversation.item.input_audio_transcription.delta": if (event.delta) { - this.pendingTranscript += event.delta; - this.config.onPartial?.(this.pendingTranscript); + pendingTranscript += event.delta; + config.onPartial?.(pendingTranscript); } return; case "conversation.item.input_audio_transcription.completed": if (event.transcript) { - this.config.onTranscript?.(event.transcript); + config.onTranscript?.(event.transcript); } - this.pendingTranscript = ""; + pendingTranscript = ""; return; case "input_audio_buffer.speech_started": - this.pendingTranscript = ""; - this.config.onSpeechStart?.(); + pendingTranscript = ""; + config.onSpeechStart?.(); return; case "error": { const detail = readRealtimeErrorDetail(event.error); - this.config.onError?.(new Error(detail)); + config.onError?.(new Error(detail)); return; } default: 
return; } - } + }; - private sendEvent(event: unknown): void { - if (this.ws?.readyState === WebSocket.OPEN) { - const payload = JSON.stringify(event); - captureWsEvent({ - url: "wss://api.openai.com/v1/realtime?intent=transcription", - direction: "outbound", - kind: "ws-frame", - flowId: this.flowId, - payload, - meta: { - provider: "openai", - capability: "realtime-transcription", + return createRealtimeTranscriptionWebSocketSession({ + providerId: "openai", + callbacks: config, + url: OPENAI_REALTIME_TRANSCRIPTION_URL, + headers: { + Authorization: `Bearer ${config.apiKey}`, + "OpenAI-Beta": "realtime=v1", + }, + readyOnOpen: true, + connectTimeoutMs: OPENAI_REALTIME_TRANSCRIPTION_CONNECT_TIMEOUT_MS, + maxReconnectAttempts: OPENAI_REALTIME_TRANSCRIPTION_MAX_RECONNECT_ATTEMPTS, + reconnectDelayMs: OPENAI_REALTIME_TRANSCRIPTION_RECONNECT_DELAY_MS, + connectTimeoutMessage: "OpenAI realtime transcription connection timeout", + reconnectLimitMessage: "OpenAI realtime transcription reconnect limit reached", + sendAudio: (audio, transport) => { + transport.sendJson({ + type: "input_audio_buffer.append", + audio: audio.toString("base64"), + }); + }, + onOpen: (transport: RealtimeTranscriptionWebSocketTransport) => { + transport.sendJson({ + type: "transcription_session.update", + session: { + input_audio_format: "g711_ulaw", + input_audio_transcription: { + model: config.model, + ...(config.language ? { language: config.language } : {}), + ...(config.prompt ? 
{ prompt: config.prompt } : {}), + }, + turn_detection: { + type: "server_vad", + threshold: config.vadThreshold, + prefix_padding_ms: 300, + silence_duration_ms: config.silenceDurationMs, + }, }, }); - this.ws.send(payload); - } - } + }, + onMessage: handleEvent, + }); } export function buildOpenAIRealtimeTranscriptionProvider(): RealtimeTranscriptionProviderPlugin { @@ -306,7 +161,7 @@ export function buildOpenAIRealtimeTranscriptionProvider(): RealtimeTranscriptio if (!apiKey) { throw new Error("OpenAI API key missing"); } - return new OpenAIRealtimeTranscriptionSession({ + return createOpenAIRealtimeTranscriptionSession({ ...req, apiKey, language: config.language, diff --git a/extensions/xai/.openclaw-runtime-deps.json b/extensions/xai/.openclaw-runtime-deps.json new file mode 100644 index 00000000000..8b34547b4bb --- /dev/null +++ b/extensions/xai/.openclaw-runtime-deps.json @@ -0,0 +1,3 @@ +{ + "specs": ["@mariozechner/pi-ai@0.68.1", "@sinclair/typebox@0.34.49", "ws@^8.20.0"] +} diff --git a/extensions/xai/realtime-transcription-provider.ts b/extensions/xai/realtime-transcription-provider.ts index 23b369f1d16..bae22582b29 100644 --- a/extensions/xai/realtime-transcription-provider.ts +++ b/extensions/xai/realtime-transcription-provider.ts @@ -1,18 +1,13 @@ -import { randomUUID } from "node:crypto"; import { - captureWsEvent, - createDebugProxyWebSocketAgent, - resolveDebugProxySettings, -} from "openclaw/plugin-sdk/proxy-capture"; -import type { - RealtimeTranscriptionProviderConfig, - RealtimeTranscriptionProviderPlugin, - RealtimeTranscriptionSession, - RealtimeTranscriptionSessionCreateRequest, + createRealtimeTranscriptionWebSocketSession, + type RealtimeTranscriptionProviderConfig, + type RealtimeTranscriptionProviderPlugin, + type RealtimeTranscriptionSession, + type RealtimeTranscriptionSessionCreateRequest, + type RealtimeTranscriptionWebSocketTransport, } from "openclaw/plugin-sdk/realtime-transcription"; import { normalizeResolvedSecretInputString } 
from "openclaw/plugin-sdk/secret-input"; import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; -import WebSocket from "ws"; import { XAI_BASE_URL } from "./model-definitions.js"; type XaiRealtimeTranscriptionEncoding = "pcm" | "mulaw" | "alaw"; @@ -160,307 +155,86 @@ function readTranscriptText(event: XaiRealtimeTranscriptionEvent): string | unde return normalizeOptionalString(event.text ?? event.transcript); } -class XaiRealtimeTranscriptionSession implements RealtimeTranscriptionSession { - private ws: WebSocket | null = null; - private connected = false; - private ready = false; - private closed = false; - private reconnectAttempts = 0; - private queuedAudio: Buffer[] = []; - private queuedBytes = 0; - private closeTimer: ReturnType | undefined; - private lastTranscript: string | undefined; - private speechStarted = false; - private reconnecting = false; - private readonly flowId = randomUUID(); +function createXaiRealtimeTranscriptionSession( + config: XaiRealtimeTranscriptionSessionConfig, +): RealtimeTranscriptionSession { + let lastTranscript: string | undefined; + let speechStarted = false; - constructor(private readonly config: XaiRealtimeTranscriptionSessionConfig) {} - - async connect(): Promise { - this.closed = false; - this.reconnectAttempts = 0; - await this.doConnect(); - } - - sendAudio(audio: Buffer): void { - if (this.closed) { + const emitTranscript = (text: string) => { + if (text === lastTranscript) { return; } - if (this.ws?.readyState === WebSocket.OPEN && this.ready) { - this.sendAudioFrame(audio); + lastTranscript = text; + config.onTranscript?.(text); + }; + + const handleEvent = ( + event: XaiRealtimeTranscriptionEvent, + transport: RealtimeTranscriptionWebSocketTransport, + ) => { + if (event.type === "transcript.created") { + transport.markReady(); return; } - this.queueAudio(audio); - } - - close(): void { - this.closed = true; - this.connected = false; - this.queuedAudio = []; - this.queuedBytes = 0; - if 
(!this.ws || this.ws.readyState !== WebSocket.OPEN) { - this.forceClose(); + if (!transport.isReady() && event.type === "error") { + transport.failConnect(new Error(readErrorDetail(event.error ?? event.message))); return; } - this.sendEvent({ type: "audio.done" }); - this.closeTimer = setTimeout(() => this.forceClose(), XAI_REALTIME_STT_CLOSE_TIMEOUT_MS); - } - - isConnected(): boolean { - return this.connected; - } - - private async doConnect(): Promise { - await new Promise((resolve, reject) => { - const url = toXaiRealtimeWsUrl(this.config); - const debugProxy = resolveDebugProxySettings(); - const proxyAgent = createDebugProxyWebSocketAgent(debugProxy); - let settled = false; - let opened = false; - const finishConnect = () => { - if (settled) { - return; - } - settled = true; - clearTimeout(connectTimeout); - this.ready = true; - this.flushQueuedAudio(); - resolve(); - }; - const failConnect = (error: Error) => { - if (settled) { - return; - } - settled = true; - clearTimeout(connectTimeout); - this.config.onError?.(error); - this.closed = true; - this.forceClose(); - reject(error); - }; - this.ready = false; - this.ws = new WebSocket(url, { - headers: { - Authorization: `Bearer ${this.config.apiKey}`, - }, - ...(proxyAgent ? 
{ agent: proxyAgent } : {}), - }); - - const connectTimeout = setTimeout(() => { - failConnect(new Error("xAI realtime transcription connection timeout")); - }, XAI_REALTIME_STT_CONNECT_TIMEOUT_MS); - - this.ws.on("open", () => { - opened = true; - this.connected = true; - this.reconnectAttempts = 0; - captureWsEvent({ - url, - direction: "local", - kind: "ws-open", - flowId: this.flowId, - meta: { provider: "xai", capability: "realtime-transcription" }, - }); - }); - - this.ws.on("message", (data: Buffer) => { - captureWsEvent({ - url, - direction: "inbound", - kind: "ws-frame", - flowId: this.flowId, - payload: data, - meta: { provider: "xai", capability: "realtime-transcription" }, - }); - try { - const event = JSON.parse(data.toString()) as XaiRealtimeTranscriptionEvent; - if (event.type === "transcript.created") { - finishConnect(); - return; - } - if (!this.ready && event.type === "error") { - failConnect(new Error(readErrorDetail(event.error ?? event.message))); - return; - } - this.handleEvent(event); - } catch (error) { - this.config.onError?.(error instanceof Error ? error : new Error(String(error))); - } - }); - - this.ws.on("error", (error) => { - captureWsEvent({ - url, - direction: "local", - kind: "error", - flowId: this.flowId, - errorText: error instanceof Error ? error.message : String(error), - meta: { provider: "xai", capability: "realtime-transcription" }, - }); - if (!this.ready) { - failConnect(error instanceof Error ? error : new Error(String(error))); - return; - } - this.config.onError?.(error instanceof Error ? 
error : new Error(String(error))); - }); - - this.ws.on("close", () => { - clearTimeout(connectTimeout); - this.connected = false; - this.ready = false; - if (this.closeTimer) { - clearTimeout(this.closeTimer); - this.closeTimer = undefined; - } - if (this.closed) { - return; - } - if (!opened || !settled) { - return; - } - void this.attemptReconnect(); - }); - }); - } - - private async attemptReconnect(): Promise { - if (this.closed) { - return; - } - if (this.reconnecting) { - return; - } - if (this.reconnectAttempts >= XAI_REALTIME_STT_MAX_RECONNECT_ATTEMPTS) { - this.config.onError?.(new Error("xAI realtime transcription reconnect limit reached")); - return; - } - this.reconnectAttempts += 1; - const delay = XAI_REALTIME_STT_RECONNECT_DELAY_MS * 2 ** (this.reconnectAttempts - 1); - this.reconnecting = true; - try { - await new Promise((resolve) => setTimeout(resolve, delay)); - if (this.closed) { - return; - } - await this.doConnect(); - } catch { - if (!this.closed) { - this.reconnecting = false; - await this.attemptReconnect(); - return; - } - } finally { - this.reconnecting = false; - } - } - - private handleEvent(event: XaiRealtimeTranscriptionEvent): void { switch (event.type) { case "transcript.partial": { const text = readTranscriptText(event); if (!text) { return; } - if (!this.speechStarted) { - this.speechStarted = true; - this.config.onSpeechStart?.(); + if (!speechStarted) { + speechStarted = true; + config.onSpeechStart?.(); } if (event.is_final && event.speech_final) { - this.emitTranscript(text); - this.speechStarted = false; + emitTranscript(text); + speechStarted = false; return; } - this.config.onPartial?.(text); + config.onPartial?.(text); return; } case "transcript.done": { const text = readTranscriptText(event); if (text) { - this.emitTranscript(text); + emitTranscript(text); } - this.forceClose(); + transport.closeNow(); return; } case "error": - this.config.onError?.(new Error(readErrorDetail(event.error ?? 
event.message))); + config.onError?.(new Error(readErrorDetail(event.error ?? event.message))); return; default: return; } - } + }; - private emitTranscript(text: string): void { - if (text === this.lastTranscript) { - return; - } - this.lastTranscript = text; - this.config.onTranscript?.(text); - } - - private queueAudio(audio: Buffer): void { - if (audio.byteLength === 0) { - return; - } - this.queuedAudio.push(Buffer.from(audio)); - this.queuedBytes += audio.byteLength; - while (this.queuedBytes > XAI_REALTIME_STT_MAX_QUEUED_BYTES && this.queuedAudio.length > 0) { - const dropped = this.queuedAudio.shift(); - this.queuedBytes -= dropped?.byteLength ?? 0; - } - } - - private flushQueuedAudio(): void { - for (const audio of this.queuedAudio) { - this.sendAudioFrame(audio); - } - this.queuedAudio = []; - this.queuedBytes = 0; - } - - private sendAudioFrame(audio: Buffer): void { - if (this.ws?.readyState !== WebSocket.OPEN) { - this.queueAudio(audio); - return; - } - captureWsEvent({ - url: toXaiRealtimeWsUrl(this.config), - direction: "outbound", - kind: "ws-frame", - flowId: this.flowId, - payload: audio, - meta: { provider: "xai", capability: "realtime-transcription" }, - }); - this.ws.send(audio); - } - - private sendEvent(event: unknown): void { - if (this.ws?.readyState !== WebSocket.OPEN) { - return; - } - const payload = JSON.stringify(event); - captureWsEvent({ - url: toXaiRealtimeWsUrl(this.config), - direction: "outbound", - kind: "ws-frame", - flowId: this.flowId, - payload, - meta: { provider: "xai", capability: "realtime-transcription" }, - }); - this.ws.send(payload); - } - - private forceClose(): void { - if (this.closeTimer) { - clearTimeout(this.closeTimer); - this.closeTimer = undefined; - } - this.connected = false; - this.ready = false; - if (this.ws) { - this.ws.close(1000, "Transcription session closed"); - this.ws = null; - } - } + return createRealtimeTranscriptionWebSocketSession({ + providerId: "xai", + callbacks: config, + url: () => 
toXaiRealtimeWsUrl(config), + headers: { Authorization: `Bearer ${config.apiKey}` }, + connectTimeoutMs: XAI_REALTIME_STT_CONNECT_TIMEOUT_MS, + closeTimeoutMs: XAI_REALTIME_STT_CLOSE_TIMEOUT_MS, + maxReconnectAttempts: XAI_REALTIME_STT_MAX_RECONNECT_ATTEMPTS, + reconnectDelayMs: XAI_REALTIME_STT_RECONNECT_DELAY_MS, + maxQueuedBytes: XAI_REALTIME_STT_MAX_QUEUED_BYTES, + connectTimeoutMessage: "xAI realtime transcription connection timeout", + reconnectLimitMessage: "xAI realtime transcription reconnect limit reached", + sendAudio: (audio, transport) => { + transport.sendBinary(audio); + }, + onClose: (transport) => { + transport.sendJson({ type: "audio.done" }); + }, + onMessage: handleEvent, + }); } export function buildXaiRealtimeTranscriptionProvider(): RealtimeTranscriptionProviderPlugin { @@ -478,7 +252,7 @@ export function buildXaiRealtimeTranscriptionProvider(): RealtimeTranscriptionPr if (!apiKey) { throw new Error("xAI API key missing"); } - return new XaiRealtimeTranscriptionSession({ + return createXaiRealtimeTranscriptionSession({ ...req, apiKey, baseUrl: normalizeXaiRealtimeBaseUrl(config.baseUrl), diff --git a/extensions/xai/stt.ts b/extensions/xai/stt.ts index 4394be1bcf3..c1c1bf3309b 100644 --- a/extensions/xai/stt.ts +++ b/extensions/xai/stt.ts @@ -5,6 +5,7 @@ import type { } from "openclaw/plugin-sdk/media-understanding"; import { assertOkOrThrowHttpError, + buildAudioTranscriptionFormData, postTranscriptionRequest, resolveProviderHttpRequestConfig, requireTranscriptionText, @@ -41,19 +42,17 @@ export async function transcribeXaiAudio( transport: "media-understanding", }); - const form = new FormData(); - const blob = new Blob([new Uint8Array(params.buffer)], { - type: params.mime ?? 
"application/octet-stream", - }); - form.append("file", blob, params.fileName || "audio"); const model = normalizeOptionalString(params.model); - if (model) { - form.append("model", model); - } const language = normalizeOptionalString(params.language); - if (language) { - form.append("language", language); - } + const form = buildAudioTranscriptionFormData({ + buffer: params.buffer, + fileName: params.fileName, + mime: params.mime, + fields: { + model, + language, + }, + }); const { response, release } = await postTranscriptionRequest({ url: `${baseUrl}/stt`, diff --git a/extensions/xai/xai.live.test.ts b/extensions/xai/xai.live.test.ts index ec8ce396f51..ed282df57a6 100644 --- a/extensions/xai/xai.live.test.ts +++ b/extensions/xai/xai.live.test.ts @@ -8,6 +8,7 @@ import { registerProviderPlugin, requireRegisteredProvider, } from "../../test/helpers/plugins/provider-registration.js"; +import { runRealtimeSttLiveTest } from "../../test/helpers/stt-live-audio.js"; import plugin from "./index.js"; import { XAI_DEFAULT_STT_MODEL } from "./stt.js"; @@ -66,21 +67,6 @@ const registerXaiPlugin = () => name: "xAI Provider", }); -async function waitForLiveExpectation(expectation: () => void, timeoutMs = 30_000) { - const started = Date.now(); - let lastError: unknown; - while (Date.now() - started < timeoutMs) { - try { - expectation(); - return; - } catch (error) { - lastError = error; - await new Promise((resolve) => setTimeout(resolve, 100)); - } - } - throw lastError; -} - function normalizeTranscriptForMatch(value: string): string { return value.toLowerCase().replace(/[^a-z0-9]+/g, ""); } @@ -216,10 +202,9 @@ describeLive("xai plugin live", () => { expect(telephony.outputFormat).toBe("pcm"); expect(telephony.sampleRate).toBe(24_000); - const transcripts: string[] = []; - const partials: string[] = []; - const errors: Error[] = []; - const session = realtimeProvider.createSession({ + const chunkSize = Math.max(1, Math.floor(telephony.sampleRate * 2 * 0.1)); + const { 
transcripts, partials } = await runRealtimeSttLiveTest({ + provider: realtimeProvider, providerConfig: { apiKey: XAI_API_KEY, baseUrl: "https://api.x.ai/v1", @@ -229,26 +214,12 @@ describeLive("xai plugin live", () => { endpointingMs: 500, language: "en", }, - onPartial: (partial) => partials.push(partial), - onTranscript: (transcript) => transcripts.push(transcript), - onError: (error) => errors.push(error), + audio: telephony.audioBuffer, + chunkSize, + delayMs: 20, + closeBeforeWait: true, }); - await session.connect(); - const audio = telephony.audioBuffer; - const chunkSize = Math.max(1, Math.floor(telephony.sampleRate * 2 * 0.1)); - for (let offset = 0; offset < audio.byteLength; offset += chunkSize) { - session.sendAudio(audio.subarray(offset, offset + chunkSize)); - await new Promise((resolve) => setTimeout(resolve, 20)); - } - session.close(); - - await waitForLiveExpectation(() => { - if (errors[0]) { - throw errors[0]; - } - expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw"); - }, 60_000); const normalized = transcripts.join(" ").toLowerCase(); const compact = normalizeTranscriptForMatch(normalized); expect(compact).toContain("openclaw"); diff --git a/scripts/test-projects.test-support.mjs b/scripts/test-projects.test-support.mjs index 70cb4819054..bfcee88de2e 100644 --- a/scripts/test-projects.test-support.mjs +++ b/scripts/test-projects.test-support.mjs @@ -15,6 +15,7 @@ import { isFeishuExtensionRoot } from "../test/vitest/vitest.extension-feishu-pa import { isIrcExtensionRoot } from "../test/vitest/vitest.extension-irc-paths.mjs"; import { isMatrixExtensionRoot } from "../test/vitest/vitest.extension-matrix-paths.mjs"; import { isMattermostExtensionRoot } from "../test/vitest/vitest.extension-mattermost-paths.mjs"; +import { isMediaExtensionRoot } from "../test/vitest/vitest.extension-media-paths.mjs"; import { isMemoryExtensionRoot } from "../test/vitest/vitest.extension-memory-paths.mjs"; import { 
isMessagingExtensionRoot } from "../test/vitest/vitest.extension-messaging-paths.mjs"; import { isMsTeamsExtensionRoot } from "../test/vitest/vitest.extension-msteams-paths.mjs"; @@ -73,6 +74,7 @@ const EXTENSION_IMESSAGE_VITEST_CONFIG = "test/vitest/vitest.extension-imessage. const EXTENSION_IRC_VITEST_CONFIG = "test/vitest/vitest.extension-irc.config.ts"; const EXTENSION_LINE_VITEST_CONFIG = "test/vitest/vitest.extension-line.config.ts"; const EXTENSION_MATTERMOST_VITEST_CONFIG = "test/vitest/vitest.extension-mattermost.config.ts"; +const EXTENSION_MEDIA_VITEST_CONFIG = "test/vitest/vitest.extension-media.config.ts"; const EXTENSION_MATRIX_VITEST_CONFIG = "test/vitest/vitest.extension-matrix.config.ts"; const EXTENSION_MEMORY_VITEST_CONFIG = "test/vitest/vitest.extension-memory.config.ts"; const EXTENSION_MSTEAMS_VITEST_CONFIG = "test/vitest/vitest.extension-msteams.config.ts"; @@ -143,6 +145,7 @@ const VITEST_CONFIG_BY_KIND = { extensionLine: EXTENSION_LINE_VITEST_CONFIG, extensionMatrix: EXTENSION_MATRIX_VITEST_CONFIG, extensionMattermost: EXTENSION_MATTERMOST_VITEST_CONFIG, + extensionMedia: EXTENSION_MEDIA_VITEST_CONFIG, extensionMemory: EXTENSION_MEMORY_VITEST_CONFIG, extensionMessaging: EXTENSION_MESSAGING_VITEST_CONFIG, extensionMsTeams: EXTENSION_MSTEAMS_VITEST_CONFIG, @@ -582,6 +585,9 @@ function classifyTarget(arg, cwd) { if (isMatrixExtensionRoot(extensionRoot)) { return "extensionMatrix"; } + if (isMediaExtensionRoot(extensionRoot)) { + return "extensionMedia"; + } if (isMemoryExtensionRoot(extensionRoot)) { return "extensionMemory"; } @@ -841,6 +847,7 @@ export function buildVitestRunPlans( "extensionWhatsApp", "extensionZalo", "extensionMatrix", + "extensionMedia", "extensionMemory", "extensionMsTeams", "extensionMessaging", diff --git a/src/media-understanding/openai-compatible-audio.ts b/src/media-understanding/openai-compatible-audio.ts index 284d004bc83..6ae7cf8a37c 100644 --- a/src/media-understanding/openai-compatible-audio.ts +++ 
b/src/media-understanding/openai-compatible-audio.ts @@ -1,6 +1,6 @@ -import path from "node:path"; import { assertOkOrThrowHttpError, + buildAudioTranscriptionFormData, postTranscriptionRequest, resolveProviderHttpRequestConfig, requireTranscriptionText, @@ -18,20 +18,6 @@ function resolveModel(model: string | undefined, fallback: string): string { return trimmed || fallback; } -function resolveUploadFileName(fileName?: string, mime?: string): string { - const trimmed = fileName?.trim(); - const baseName = trimmed ? path.basename(trimmed) : "audio"; - const lowerMime = mime?.trim().toLowerCase(); - - if (/\.aac$/i.test(baseName)) { - return `${baseName.slice(0, -4) || "audio"}.m4a`; - } - if (!path.extname(baseName) && lowerMime === "audio/aac") { - return `${baseName || "audio"}.m4a`; - } - return baseName; -} - export async function transcribeOpenAiCompatibleAudio( params: OpenAiCompatibleAudioParams, ): Promise { @@ -53,20 +39,16 @@ export async function transcribeOpenAiCompatibleAudio( const url = `${baseUrl}/audio/transcriptions`; const model = resolveModel(params.model, params.defaultModel); - const form = new FormData(); - const fileName = resolveUploadFileName(params.fileName, params.mime); - const bytes = new Uint8Array(params.buffer); - const blob = new Blob([bytes], { - type: params.mime ?? 
"application/octet-stream", + const form = buildAudioTranscriptionFormData({ + buffer: params.buffer, + fileName: params.fileName, + mime: params.mime, + fields: { + model, + language: params.language, + prompt: params.prompt, + }, }); - form.append("file", blob, fileName); - form.append("model", model); - if (params.language?.trim()) { - form.append("language", params.language.trim()); - } - if (params.prompt?.trim()) { - form.append("prompt", params.prompt.trim()); - } const { response: res, release } = await postTranscriptionRequest({ url, diff --git a/src/media-understanding/shared.ts b/src/media-understanding/shared.ts index 6e1df75ad8f..785b77e62f8 100644 --- a/src/media-understanding/shared.ts +++ b/src/media-understanding/shared.ts @@ -1,3 +1,4 @@ +import path from "node:path"; import type { ProviderRequestCapability, ProviderRequestTransport, @@ -22,6 +23,41 @@ const MAX_ERROR_RESPONSE_BYTES = 4096; const DEFAULT_GUARDED_HTTP_TIMEOUT_MS = 60_000; const MAX_AUDIT_CONTEXT_CHARS = 80; +export function resolveAudioTranscriptionUploadFileName(fileName?: string, mime?: string): string { + const trimmed = fileName?.trim(); + const baseName = trimmed ? path.basename(trimmed) : "audio"; + const lowerMime = mime?.trim().toLowerCase(); + + if (/\.aac$/i.test(baseName)) { + return `${baseName.slice(0, -4) || "audio"}.m4a`; + } + if (!path.extname(baseName) && lowerMime === "audio/aac") { + return `${baseName || "audio"}.m4a`; + } + return baseName; +} + +export function buildAudioTranscriptionFormData(params: { + buffer: Buffer; + fileName?: string; + mime?: string; + fields?: Record; +}): FormData { + const form = new FormData(); + const bytes = new Uint8Array(params.buffer); + const blob = new Blob([bytes], { + type: params.mime ?? "application/octet-stream", + }); + form.append("file", blob, resolveAudioTranscriptionUploadFileName(params.fileName, params.mime)); + for (const [name, value] of Object.entries(params.fields ?? 
{})) { + const text = typeof value === "string" ? value.trim() : value == null ? "" : String(value); + if (text) { + form.append(name, text); + } + } + return form; +} + export type ProviderOperationDeadline = { deadlineAtMs?: number; label: string; diff --git a/src/plugin-sdk/provider-http.ts b/src/plugin-sdk/provider-http.ts index 60c3841f62e..9847a72e92e 100644 --- a/src/plugin-sdk/provider-http.ts +++ b/src/plugin-sdk/provider-http.ts @@ -3,6 +3,7 @@ export { assertOkOrThrowHttpError, + buildAudioTranscriptionFormData, createProviderOperationDeadline, fetchWithTimeout, fetchWithTimeoutGuarded, @@ -12,6 +13,7 @@ export { postTranscriptionRequest, resolveProviderOperationTimeoutMs, resolveProviderHttpRequestConfig, + resolveAudioTranscriptionUploadFileName, requireTranscriptionText, waitProviderOperationPollInterval, } from "../media-understanding/shared.js"; diff --git a/src/realtime-transcription/websocket-session.ts b/src/realtime-transcription/websocket-session.ts index f6efc8662b0..f6c2f0c674e 100644 --- a/src/realtime-transcription/websocket-session.ts +++ b/src/realtime-transcription/websocket-session.ts @@ -246,10 +246,11 @@ class WebSocketRealtimeTranscriptionSession implements RealtimeTranscript this.emitError(normalized); }); - this.ws.on("close", () => { + this.ws.on("close", (code, reasonBuffer) => { if (connectTimeout) { clearTimeout(connectTimeout); } + this.captureClose(code, reasonBuffer); this.connected = false; this.ready = false; if (this.closeTimer) { @@ -393,6 +394,21 @@ class WebSocketRealtimeTranscriptionSession implements RealtimeTranscript meta: { provider: this.options.providerId, capability: "realtime-transcription" }, }); } + + private captureClose(code: number, reasonBuffer: Buffer): void { + captureWsEvent({ + url: this.currentUrl, + direction: "local", + kind: "ws-close", + flowId: this.flowId, + closeCode: code, + meta: { + provider: this.options.providerId, + capability: "realtime-transcription", + reason: reasonBuffer.length > 0 
? reasonBuffer.toString("utf8") : undefined, + }, + }); + } } export function createRealtimeTranscriptionWebSocketSession( diff --git a/test/helpers/stt-live-audio.ts b/test/helpers/stt-live-audio.ts index d03b5d99497..e9c87d13a76 100644 --- a/test/helpers/stt-live-audio.ts +++ b/test/helpers/stt-live-audio.ts @@ -1,3 +1,9 @@ +import type { + RealtimeTranscriptionProviderConfig, + RealtimeTranscriptionProviderPlugin, +} from "openclaw/plugin-sdk/realtime-transcription"; +import { expect } from "vitest"; + const DEFAULT_ELEVENLABS_BASE_URL = "https://api.elevenlabs.io"; const DEFAULT_ELEVENLABS_VOICE_ID = "pMsXgVXv3BLzUgSXRplE"; const DEFAULT_ELEVENLABS_TTS_MODEL_ID = "eleven_multilingual_v2"; @@ -75,3 +81,50 @@ export async function streamAudioForLiveTest(params: { await new Promise((resolve) => setTimeout(resolve, delayMs)); } } + +export async function runRealtimeSttLiveTest(params: { + provider: RealtimeTranscriptionProviderPlugin; + providerConfig: RealtimeTranscriptionProviderConfig; + audio: Buffer; + expectedNormalizedText?: string; + timeoutMs?: number; + closeBeforeWait?: boolean; + chunkSize?: number; + delayMs?: number; +}): Promise<{ transcripts: string[]; partials: string[]; errors: Error[] }> { + const transcripts: string[] = []; + const partials: string[] = []; + const errors: Error[] = []; + const expected = params.expectedNormalizedText ?? 
"openclaw"; + const session = params.provider.createSession({ + providerConfig: params.providerConfig, + onPartial: (partial) => partials.push(partial), + onTranscript: (transcript) => transcripts.push(transcript), + onError: (error) => errors.push(error), + }); + + try { + await session.connect(); + await streamAudioForLiveTest({ + audio: params.audio, + sendAudio: (chunk) => session.sendAudio(chunk), + chunkSize: params.chunkSize, + delayMs: params.delayMs, + }); + if (params.closeBeforeWait) { + session.close(); + } + + await waitForLiveExpectation(() => { + if (errors[0]) { + throw errors[0]; + } + expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain(expected); + }, params.timeoutMs ?? 60_000); + } finally { + session.close(); + } + + expect(partials.length + transcripts.length).toBeGreaterThan(0); + return { transcripts, partials, errors }; +} diff --git a/test/vitest/vitest.scoped-config.ts b/test/vitest/vitest.scoped-config.ts index 342a4d096cc..93eeb9bcf9a 100644 --- a/test/vitest/vitest.scoped-config.ts +++ b/test/vitest/vitest.scoped-config.ts @@ -68,6 +68,7 @@ const SCOPED_PROJECT_GROUP_ORDER_BY_NAME = new Map( "extension-line", "extension-mattermost", "extension-matrix", + "extension-media", "extension-memory", "extension-messaging", "extension-msteams",