diff --git a/docs/docs.json b/docs/docs.json index 57a9b05bead..bc2a3ba2896 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -1211,6 +1211,7 @@ "plugins/sdk-subpaths", "plugins/sdk-entrypoints", "plugins/sdk-runtime", + "plugins/sdk-channel-turn", "plugins/sdk-agent-harness", "plugins/sdk-setup", "plugins/sdk-testing", diff --git a/docs/plugins/sdk-channel-plugins.md b/docs/plugins/sdk-channel-plugins.md index 0d767819844..f645d2203d9 100644 --- a/docs/plugins/sdk-channel-plugins.md +++ b/docs/plugins/sdk-channel-plugins.md @@ -681,6 +681,9 @@ Write colocated tests in `src/channel.test.ts`: TTS, STT, media, subagent via api.runtime + + Shared inbound turn lifecycle: ingest, resolve, record, dispatch, finalize + diff --git a/docs/plugins/sdk-channel-turn.md b/docs/plugins/sdk-channel-turn.md new file mode 100644 index 00000000000..97cca37cd97 --- /dev/null +++ b/docs/plugins/sdk-channel-turn.md @@ -0,0 +1,393 @@ +--- +summary: "runtime.channel.turn -- the shared inbound turn kernel that bundled and third-party channel plugins use to record, dispatch, and finalize agent turns" +title: "Channel turn kernel" +sidebarTitle: "Channel turn" +read_when: + - You are building a channel plugin and want the shared inbound turn lifecycle + - You are migrating a channel monitor off hand-rolled record/dispatch glue + - You need to understand admission, ingest, classify, preflight, resolve, record, dispatch, and finalize stages +--- + +The channel turn kernel is the shared inbound state machine that turns a normalized platform event into an agent turn. Channel plugins provide the platform facts and the delivery callback. Core owns the orchestration: ingest, classify, preflight, resolve, authorize, assemble, record, dispatch, and finalize. + +Use this when your plugin is on the inbound message hot path. For non-message events (slash commands, modals, button interactions, lifecycle events, reactions, voice state), keep them plugin-local. The kernel only owns events that may become an agent text turn. + + + The kernel is reached through the injected plugin runtime as `runtime.channel.turn.*`. The plugin runtime type is exported from `openclaw/plugin-sdk/core`, so third-party native plugins can use these entry points the same way bundled channel plugins do. + + +## Why a shared kernel + +Channel plugins repeat the same inbound flow: normalize, route, gate, build a context, record session metadata, dispatch the agent turn, finalize delivery state. Without a shared kernel, a change to mention gating, tool-only visible replies, session metadata, pending history, or dispatch finalization has to be applied per channel. + +The kernel keeps four concepts deliberately separate: + +- `ConversationFacts`: where the message came from +- `RouteFacts`: which agent and session should process it +- `ReplyPlanFacts`: where visible replies should go +- `MessageFacts`: what body and supplemental context the agent should see + +Slack DMs, Telegram topics, Matrix threads, and Feishu topic sessions all distinguish these in practice. Treating them as one identifier causes drift over time. + +## Stage lifecycle + +The kernel runs the same fixed pipeline regardless of channel: + +1. `ingest` -- adapter converts a raw platform event into `NormalizedTurnInput` +2. `classify` -- adapter declares whether this event can start an agent turn +3. `preflight` -- adapter does dedupe, self-echo, hydration, debounce, decryption, partial fact prefill +4. `resolve` -- adapter returns a fully assembled turn (route, reply plan, message, delivery) +5. `authorize` -- DM, group, mention, and command policy applied to the assembled facts +6. `assemble` -- `FinalizedMsgContext` built from the facts via `buildContext` +7. `record` -- inbound session metadata and last route persisted +8. `dispatch` -- agent turn executed through the buffered block dispatcher +9. `finalize` -- adapter `onFinalize` runs even on dispatch error + +Each stage emits a structured log event when a `log` callback is supplied. See [Observability](#observability). + +## Admission kinds + +The kernel does not throw when a turn is gated. It returns a `ChannelTurnAdmission`: + +| Kind | When | +| ------------- | -------------------------------------------------------------------------------------------------------------------------------------------- | +| `dispatch` | Turn is admitted. Agent turn runs and the visible reply path is exercised. | +| `observeOnly` | Turn runs end-to-end but the delivery adapter sends nothing visible. Used for broadcast observer agents and other passive multi-agent flows. | +| `handled` | A platform event was consumed locally (lifecycle, reaction, button, modal). Kernel skips dispatch. | +| `drop` | Skip path. Optionally `recordHistory: true` keeps the message in pending group history so a future mention has context. | + +Admission can come from `classify` (event class said it cannot start a turn), from `preflight` (dedupe, self-echo, missing mention with history record), or from `resolveTurn` itself. + +## Entry points + +The runtime exposes three preferred entry points so adapters can opt in at the level that matches the channel. + +```typescript +runtime.channel.turn.run(...) // adapter-driven full pipeline +runtime.channel.turn.runPrepared(...) // channel owns dispatch; kernel runs record + finalize +runtime.channel.turn.buildContext(...) // pure facts to FinalizedMsgContext mapping +``` + +Two older runtime helpers remain available for Plugin SDK compatibility: + +```typescript +runtime.channel.turn.runResolved(...) // deprecated compatibility alias; prefer run +runtime.channel.turn.dispatchAssembled(...) // deprecated compatibility alias; prefer run or runPrepared +``` + +### run + +Use when your channel can express its inbound flow as a `ChannelTurnAdapter`. The adapter has callbacks for `ingest`, optional `classify`, optional `preflight`, mandatory `resolveTurn`, and optional `onFinalize`. + +```typescript +await runtime.channel.turn.run({ + channel: "tlon", + accountId, + raw: platformEvent, + adapter: { + ingest(raw) { + return { + id: raw.messageId, + timestamp: raw.timestamp, + rawText: raw.body, + textForAgent: raw.body, + }; + }, + classify(input) { + return { kind: "message", canStartAgentTurn: input.rawText.length > 0 }; + }, + async preflight(input, eventClass) { + if (await isDuplicate(input.id)) { + return { admission: { kind: "drop", reason: "dedupe" } }; + } + return {}; + }, + resolveTurn(input) { + return buildAssembledTurn(input); + }, + onFinalize(result) { + clearPendingGroupHistory(result); + }, + }, +}); +``` + +`run` is the right shape when the channel has small adapter logic and benefits from owning the lifecycle through hooks. + +### runPrepared + +Use when the channel has a complex local dispatcher with previews, retries, edits, or thread bootstrap that must stay channel-owned. The kernel still records the inbound session before dispatch and surfaces a uniform `DispatchedChannelTurnResult`. + +```typescript +const { dispatchResult } = await runtime.channel.turn.runPrepared({ + channel: "matrix", + accountId, + routeSessionKey, + storePath, + ctxPayload, + recordInboundSession, + record: { + onRecordError, + updateLastRoute, + }, + onPreDispatchFailure: async (err) => { + await stopStatusReactions(); + }, + runDispatch: async () => { + return await runMatrixOwnedDispatcher(); + }, +}); +``` + +Rich channels (Matrix, Mattermost, Microsoft Teams, Feishu, QQ Bot) use `runPrepared` because their dispatcher orchestrates platform-specific behavior the kernel must not learn about. + +### buildContext + +A pure function that maps fact bundles into `FinalizedMsgContext`. Use it when your channel hand-rolls part of the pipeline but wants consistent context shape. + +```typescript +const ctxPayload = runtime.channel.turn.buildContext({ + channel: "googlechat", + accountId, + messageId, + timestamp, + from, + sender, + conversation, + route, + reply, + message, + access, + media, + supplemental, +}); +``` + +`buildContext` is also useful inside `resolveTurn` callbacks when assembling a turn for `run`. + + + Deprecated SDK helpers such as `dispatchInboundReplyWithBase` still bridge through an assembled-turn helper. New plugin code should use `run` or `runPrepared`. + + +## Fact types + +The facts the kernel consumes from your adapter are platform-agnostic. Translate platform objects into these shapes before handing them to the kernel. + +### NormalizedTurnInput + +| Field | Purpose | +| ----------------- | ---------------------------------------------------------------------------- | +| `id` | Stable message id used for dedupe and logs | +| `timestamp` | Optional epoch ms | +| `rawText` | Body as received from the platform | +| `textForAgent` | Optional cleaned body for the agent (mention strip, typing trim) | +| `textForCommands` | Optional body used for `/command` parsing | +| `raw` | Optional pass-through reference for adapter callbacks that need the original | + +### ChannelEventClass + +| Field | Purpose | +| ---------------------- | ----------------------------------------------------------------------- | +| `kind` | `message`, `command`, `interaction`, `reaction`, `lifecycle`, `unknown` | +| `canStartAgentTurn` | If false the kernel returns `{ kind: "handled" }` | +| `requiresImmediateAck` | Hint for adapters that need to ACK before dispatch | + +### SenderFacts + +| Field | Purpose | +| -------------- | -------------------------------------------------------------- | +| `id` | Stable platform sender id | +| `name` | Display name | +| `username` | Handle if distinct from `name` | +| `tag` | Discord-style discriminator or platform tag | +| `roles` | Role ids, used for member-role allowlist matching | +| `isBot` | True when the sender is a known bot (kernel uses for dropping) | +| `isSelf` | True when the sender is the configured agent itself | +| `displayLabel` | Pre-rendered label for envelope text | + +### ConversationFacts + +| Field | Purpose | +| ----------------- | -------------------------------------------------------------------- | +| `kind` | `direct`, `group`, or `channel` | +| `id` | Conversation id used for routing | +| `label` | Human label for the envelope | +| `spaceId` | Optional outer space identifier (Slack workspace, Matrix homeserver) | +| `parentId` | Outer conversation id when this is a thread | +| `threadId` | Thread id when this message is inside a thread | +| `nativeChannelId` | Platform-native channel id when different from the routing id | +| `routePeer` | Peer used for `resolveAgentRoute` lookup | + +### RouteFacts + +| Field | Purpose | +| ----------------------- | ---------------------------------------------------------- | +| `agentId` | Agent that should handle this turn | +| `accountId` | Optional override (multi-account channels) | +| `routeSessionKey` | Session key used for routing | +| `dispatchSessionKey` | Session key used at dispatch when different from route key | +| `persistedSessionKey` | Session key written to persisted session metadata | +| `parentSessionKey` | Parent for branched/threaded sessions | +| `modelParentSessionKey` | Model-side parent for branched sessions | +| `mainSessionKey` | Main DM owner pin for direct conversations | +| `createIfMissing` | Allow record step to create a missing session row | + +### ReplyPlanFacts + +| Field | Purpose | +| ------------------------- | ------------------------------------------------------- | +| `to` | Logical reply target written into context `To` | +| `originatingTo` | Originating context target (`OriginatingTo`) | +| `nativeChannelId` | Platform-native channel id for delivery | +| `replyTarget` | Final visible-reply destination if it differs from `to` | +| `deliveryTarget` | Lower-level delivery override | +| `replyToId` | Quoted/anchored message id | +| `replyToIdFull` | Full-form quoted id when the platform has both | +| `messageThreadId` | Thread id at delivery time | +| `threadParentId` | Parent message id of the thread | +| `sourceReplyDeliveryMode` | `thread`, `reply`, `channel`, `direct`, or `none` | + +### AccessFacts + +`AccessFacts` carries the booleans the authorize stage needs. Identity matching stays in the channel: the kernel only consumes the result. + +| Field | Purpose | +| ---------- | ------------------------------------------------------------------------- | +| `dm` | DM allow/pairing/deny decision and `allowFrom` list | +| `group` | Group policy, route allow, sender allow, allowlist, mention requirement | +| `commands` | Command authorization across configured authorizers | +| `mentions` | Whether mention detection is possible and whether the agent was mentioned | + +### MessageFacts + +| Field | Purpose | +| ---------------- | -------------------------------------------------------------- | +| `body` | Final envelope body (formatted) | +| `rawBody` | Raw inbound body | +| `bodyForAgent` | Body the agent sees | +| `commandBody` | Body used for command parsing | +| `envelopeFrom` | Pre-rendered sender label for the envelope | +| `senderLabel` | Optional override for the rendered sender | +| `preview` | Short redacted preview for logs | +| `inboundHistory` | Recent inbound history entries when the channel keeps a buffer | + +### SupplementalContextFacts + +Supplemental context covers quote, forwarded, and thread-bootstrap context. The kernel applies the configured `contextVisibility` policy. The channel adapter only provides facts and `senderAllowed` flags so cross-channel policy stays consistent. + +### InboundMediaFacts + +Media is fact-shaped. Platform download, auth, SSRF policy, CDN rules, and decryption stay channel-local. The kernel maps facts into `MediaPath`, `MediaUrl`, `MediaType`, `MediaPaths`, `MediaUrls`, `MediaTypes`, and `MediaTranscribedIndexes`. + +## Adapter contract + +For full `run`, the adapter shape is: + +```typescript +type ChannelTurnAdapter = { + ingest(raw: TRaw): Promise | NormalizedTurnInput | null; + classify?(input: NormalizedTurnInput): Promise | ChannelEventClass; + preflight?( + input: NormalizedTurnInput, + eventClass: ChannelEventClass, + ): Promise; + resolveTurn( + input: NormalizedTurnInput, + eventClass: ChannelEventClass, + preflight: PreflightFacts, + ): Promise | ChannelTurnResolved; + onFinalize?(result: ChannelTurnResult): Promise | void; +}; +``` + +`resolveTurn` returns a `ChannelTurnResolved`, which is an `AssembledChannelTurn` with an optional admission kind. Returning `{ admission: { kind: "observeOnly" } }` runs the turn without producing visible output. The adapter still owns the delivery callback; it just becomes a no-op for that turn. + +`onFinalize` runs on every result, including dispatch errors. Use it to clear pending group history, remove ack reactions, stop status indicators, and flush local state. + +## Delivery adapter + +The kernel does not call the platform directly. The channel hands the kernel a `ChannelTurnDeliveryAdapter`: + +```typescript +type ChannelTurnDeliveryAdapter = { + deliver(payload: ReplyPayload, info: ChannelDeliveryInfo): Promise; + onError?(err: unknown, info: { kind: string }): void; +}; + +type ChannelDeliveryResult = { + messageIds?: string[]; + threadId?: string; + replyToId?: string; + visibleReplySent?: boolean; +}; +``` + +`deliver` is called once per buffered reply chunk. Return platform message ids when the channel has them so the dispatcher can preserve thread anchors and edit later chunks. For observe-only turns, return `{ visibleReplySent: false }` or use `createNoopChannelTurnDeliveryAdapter()`. + +## Record options + +The record stage wraps `recordInboundSession`. Most channels can use the defaults. Override via `record`: + +```typescript +record: { + groupResolution, + createIfMissing: true, + updateLastRoute, + onRecordError: (err) => log.warn("record failed", err), + trackSessionMetaTask: (task) => pendingTasks.push(task), +} +``` + +The dispatcher waits for the record stage. If record throws, the kernel runs `onPreDispatchFailure` (when provided to `runPrepared`) and rethrows. + +## Observability + +Each stage emits a structured event when a `log` callback is supplied: + +```typescript +await runtime.channel.turn.run({ + channel: "twitch", + accountId, + raw, + adapter, + log: (event) => { + runtime.log?.debug?.(`turn.${event.stage}:${event.event}`, { + channel: event.channel, + accountId: event.accountId, + messageId: event.messageId, + sessionKey: event.sessionKey, + admission: event.admission, + reason: event.reason, + }); + }, +}); +``` + +Logged stages: `ingest`, `classify`, `preflight`, `resolve`, `authorize`, `assemble`, `record`, `dispatch`, `finalize`. Avoid logging raw bodies; use `MessageFacts.preview` for short redacted previews. + +## What stays channel-local + +The kernel owns orchestration. The channel still owns: + +- Platform transports (gateway, REST, websocket, polling, webhooks) +- Identity resolution and display-name matching +- Native commands, slash commands, autocomplete, modals, buttons, voice state +- Card, modal, and adaptive-card rendering +- Media auth, CDN rules, encrypted media, transcription +- Edit, reaction, redaction, and presence APIs +- Backfill and platform-side history fetch +- Pairing flows that require platform-specific verification + +If two channels start needing the same helper for one of these, extract a shared SDK helper instead of pushing it into the kernel. + +## Stability + +`runtime.channel.turn.*` is part of the public plugin runtime surface. The fact types (`SenderFacts`, `ConversationFacts`, `RouteFacts`, `ReplyPlanFacts`, `AccessFacts`, `MessageFacts`, `SupplementalContextFacts`, `InboundMediaFacts`) and admission shapes (`ChannelTurnAdmission`, `ChannelEventClass`) are reachable through `PluginRuntime` from `openclaw/plugin-sdk/core`. + +Backward compatibility rules apply: new fact fields are additive, admission kinds are not renamed, and the entry point names stay stable. New channel needs that require a non-additive change must go through the plugin SDK migration process. + +## Related + +- [Building channel plugins](/plugins/sdk-channel-plugins) for the broader channel plugin contract +- [Plugin runtime helpers](/plugins/sdk-runtime) for other `runtime.*` surfaces +- [Plugin internals](/plugins/architecture-internals) for load pipeline and registry mechanics diff --git a/extensions/feishu/src/comment-handler.test.ts b/extensions/feishu/src/comment-handler.test.ts index 5c8875e2ab0..8f0748fe38a 100644 --- a/extensions/feishu/src/comment-handler.test.ts +++ b/extensions/feishu/src/comment-handler.test.ts @@ -135,8 +135,6 @@ function createTestRuntime(overrides?: { }, turn: { runPrepared: runPrepared as unknown as PluginRuntime["channel"]["turn"]["runPrepared"], - dispatchAssembled: - vi.fn() as unknown as PluginRuntime["channel"]["turn"]["dispatchAssembled"], }, pairing: { readAllowFromStore: vi.fn(overrides?.readAllowFromStore ?? (async () => [])), diff --git a/extensions/googlechat/src/monitor.ts b/extensions/googlechat/src/monitor.ts index 25fad685113..5e744289938 100644 --- a/extensions/googlechat/src/monitor.ts +++ b/extensions/googlechat/src/monitor.ts @@ -295,60 +295,62 @@ async function processMessageWithPipeline(params: { accountId: route.accountId, }); - await core.channel.turn.runResolved({ + await core.channel.turn.run({ channel: "googlechat", accountId: route.accountId, raw: message, - input: { - id: message.name ?? spaceId, - timestamp: event.eventTime ? Date.parse(event.eventTime) : undefined, - rawText: rawBody, - textForAgent: rawBody, - textForCommands: rawBody, - raw: message, + adapter: { + ingest: () => ({ + id: message.name ?? spaceId, + timestamp: event.eventTime ? Date.parse(event.eventTime) : undefined, + rawText: rawBody, + textForAgent: rawBody, + textForCommands: rawBody, + raw: message, + }), + resolveTurn: () => ({ + cfg: config, + channel: "googlechat", + accountId: route.accountId, + agentId: route.agentId, + routeSessionKey: route.sessionKey, + storePath, + ctxPayload, + recordInboundSession: core.channel.session.recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher: + core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, + delivery: { + deliver: async (payload) => { + await deliverGoogleChatReply({ + payload, + account, + spaceId, + runtime, + core, + config, + statusSink, + typingMessageName, + }); + // Only use typing message for first delivery + typingMessageName = undefined; + }, + onError: (err, info) => { + runtime.error?.( + `[${account.accountId}] Google Chat ${info.kind} reply failed: ${String(err)}`, + ); + }, + }, + dispatcherOptions: replyPipeline, + replyOptions: { + onModelSelected, + }, + record: { + onRecordError: (err) => { + runtime.error?.(`googlechat: failed updating session meta: ${String(err)}`); + }, + }, + }), }, - resolveTurn: () => ({ - cfg: config, - channel: "googlechat", - accountId: route.accountId, - agentId: route.agentId, - routeSessionKey: route.sessionKey, - storePath, - ctxPayload, - recordInboundSession: core.channel.session.recordInboundSession, - dispatchReplyWithBufferedBlockDispatcher: - core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, - delivery: { - deliver: async (payload) => { - await deliverGoogleChatReply({ - payload, - account, - spaceId, - runtime, - core, - config, - statusSink, - typingMessageName, - }); - // Only use typing message for first delivery - typingMessageName = undefined; - }, - onError: (err, info) => { - runtime.error?.( - `[${account.accountId}] Google Chat ${info.kind} reply failed: ${String(err)}`, - ); - }, - }, - dispatcherOptions: replyPipeline, - replyOptions: { - onModelSelected, - }, - record: { - onRecordError: (err) => { - runtime.error?.(`googlechat: failed updating session meta: ${String(err)}`); - }, - }, - }), }); } diff --git a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts index 76ae6822b2e..e8f03fb62f7 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts @@ -230,7 +230,6 @@ export function createMatrixHandlerTestHarness( }, turn: { runPrepared, - dispatchAssembled: vi.fn(), }, reactions: { shouldAckReaction: options.shouldAckReaction ?? (() => false), diff --git a/extensions/mattermost/src/mattermost/monitor.inbound-system-event.test.ts b/extensions/mattermost/src/mattermost/monitor.inbound-system-event.test.ts index fdad21388db..565130203cf 100644 --- a/extensions/mattermost/src/mattermost/monitor.inbound-system-event.test.ts +++ b/extensions/mattermost/src/mattermost/monitor.inbound-system-event.test.ts @@ -253,7 +253,6 @@ function createRuntimeCore(cfg: OpenClawConfig) { }, turn: { runPrepared, - dispatchAssembled: vi.fn(), }, text: { chunkMarkdownTextWithMode: (text: string) => [text], diff --git a/extensions/msteams/src/monitor-handler.test-helpers.ts b/extensions/msteams/src/monitor-handler.test-helpers.ts index 7ae4b0d2f16..2787f4f7909 100644 --- a/extensions/msteams/src/monitor-handler.test-helpers.ts +++ b/extensions/msteams/src/monitor-handler.test-helpers.ts @@ -91,8 +91,6 @@ export function installMSTeamsTestRuntime(options: MSTeamsTestRuntimeOptions = { }, turn: { runPrepared: runPrepared as unknown as PluginRuntime["channel"]["turn"]["runPrepared"], - dispatchAssembled: - vi.fn() as unknown as PluginRuntime["channel"]["turn"]["dispatchAssembled"], }, }, } as unknown as PluginRuntime); diff --git a/extensions/synology-chat/src/channel.test-mocks.ts b/extensions/synology-chat/src/channel.test-mocks.ts index a62d2ebc95f..9201072cd80 100644 --- a/extensions/synology-chat/src/channel.test-mocks.ts +++ b/extensions/synology-chat/src/channel.test-mocks.ts @@ -169,45 +169,7 @@ vi.mock("./runtime.js", () => ({ routeSessionKey: resolved.routeSessionKey, }; }), - runResolved: vi.fn(async (params) => { - const input = - typeof params.input === "function" ? await params.input(params.raw) : params.input; - if (!input) { - return { admission: { kind: "drop", reason: "ingest-null" }, dispatched: false }; - } - const resolved = await params.resolveTurn(input, { - kind: "message", - canStartAgentTurn: true, - }); - const dispatchResult = await resolved.dispatchReplyWithBufferedBlockDispatcher({ - ctx: resolved.ctxPayload, - cfg: mockRuntimeConfig, - dispatcherOptions: { - ...resolved.dispatcherOptions, - deliver: resolved.delivery.deliver, - onError: resolved.delivery.onError, - }, - }); - return { - admission: { kind: "dispatch" }, - dispatched: true, - dispatchResult, - ctxPayload: resolved.ctxPayload, - routeSessionKey: resolved.routeSessionKey, - }; - }), buildContext: buildChannelTurnContextMock, - dispatchAssembled: vi.fn(async (params) => ({ - dispatchResult: await params.dispatchReplyWithBufferedBlockDispatcher({ - ctx: params.ctxPayload, - cfg: mockRuntimeConfig, - dispatcherOptions: { - ...params.dispatcherOptions, - deliver: params.delivery.deliver, - onError: params.delivery.onError, - }, - }), - })), }, }, })), diff --git a/extensions/synology-chat/src/inbound-turn.ts b/extensions/synology-chat/src/inbound-turn.ts index 88e028c89ad..7f8921547f4 100644 --- a/extensions/synology-chat/src/inbound-turn.ts +++ b/extensions/synology-chat/src/inbound-turn.ts @@ -72,96 +72,98 @@ export async function dispatchSynologyChatInboundTurn(params: { userId: params.msg.from, }); - await resolved.rt.channel.turn.runResolved({ + await resolved.rt.channel.turn.run({ channel: CHANNEL_ID, accountId: params.account.accountId, raw: params.msg, - input: (msg) => ({ - id: `${params.account.accountId}:${msg.from}`, - timestamp: Date.now(), - rawText: msg.body, - textForAgent: msg.body, - textForCommands: msg.body, - raw: msg, - }), - resolveTurn: (input) => { - const chatKind = - params.msg.chatType === "group" || params.msg.chatType === "channel" - ? params.msg.chatType - : "direct"; - const msgCtx = resolved.rt.channel.turn.buildContext({ - channel: CHANNEL_ID, - accountId: params.account.accountId, - timestamp: input.timestamp, - from: `synology-chat:${params.msg.from}`, - sender: { - id: params.msg.from, - name: params.msg.senderName, - }, - conversation: { - kind: chatKind, - id: params.msg.from, - label: params.msg.senderName || params.msg.from, - routePeer: { - kind: "direct", - id: params.msg.from, - }, - }, - route: { - agentId: resolved.route.agentId, + adapter: { + ingest: (msg) => ({ + id: `${params.account.accountId}:${msg.from}`, + timestamp: Date.now(), + rawText: msg.body, + textForAgent: msg.body, + textForCommands: msg.body, + raw: msg, + }), + resolveTurn: (input) => { + const chatKind = + params.msg.chatType === "group" || params.msg.chatType === "channel" + ? params.msg.chatType + : "direct"; + const msgCtx = resolved.rt.channel.turn.buildContext({ + channel: CHANNEL_ID, accountId: params.account.accountId, - routeSessionKey: resolved.sessionKey, - dispatchSessionKey: resolved.sessionKey, - }, - reply: { - to: `synology-chat:${params.msg.from}`, - originatingTo: `synology-chat:${params.msg.from}`, - }, - message: { - rawBody: input.rawText, - commandBody: input.textForCommands, - bodyForAgent: input.textForAgent, - envelopeFrom: params.msg.senderName, - }, - extra: { - ChatType: params.msg.chatType, - CommandAuthorized: params.msg.commandAuthorized, - }, - }); - const storePath = resolved.rt.channel.session.resolveStorePath(currentCfg.session?.store, { - agentId: resolved.route.agentId, - }); - return { - cfg: currentCfg, - channel: CHANNEL_ID, - accountId: params.account.accountId, - agentId: resolved.route.agentId, - routeSessionKey: resolved.route.sessionKey, - storePath, - ctxPayload: msgCtx, - recordInboundSession: resolved.rt.channel.session.recordInboundSession, - dispatchReplyWithBufferedBlockDispatcher: - resolved.rt.channel.reply.dispatchReplyWithBufferedBlockDispatcher, - delivery: { - deliver: async (payload) => { - await deliverSynologyChatReply({ - account: params.account, - sendUserId, - payload, - }); + timestamp: input.timestamp, + from: `synology-chat:${params.msg.from}`, + sender: { + id: params.msg.from, + name: params.msg.senderName, }, - }, - dispatcherOptions: { - onReplyStart: () => { - params.log?.info?.(`Agent reply started for ${params.msg.from}`); + conversation: { + kind: chatKind, + id: params.msg.from, + label: params.msg.senderName || params.msg.from, + routePeer: { + kind: "direct", + id: params.msg.from, + }, }, - }, - record: { - onRecordError: (err) => { - params.log?.info?.(`Session metadata update failed for ${params.msg.from}`, err); + route: { + agentId: resolved.route.agentId, + accountId: params.account.accountId, + routeSessionKey: resolved.sessionKey, + dispatchSessionKey: resolved.sessionKey, }, - }, - }; + reply: { + to: `synology-chat:${params.msg.from}`, + originatingTo: `synology-chat:${params.msg.from}`, + }, + message: { + rawBody: input.rawText, + commandBody: input.textForCommands, + bodyForAgent: input.textForAgent, + envelopeFrom: params.msg.senderName, + }, + extra: { + ChatType: params.msg.chatType, + CommandAuthorized: params.msg.commandAuthorized, + }, + }); + const storePath = resolved.rt.channel.session.resolveStorePath(currentCfg.session?.store, { + agentId: resolved.route.agentId, + }); + return { + cfg: currentCfg, + channel: CHANNEL_ID, + accountId: params.account.accountId, + agentId: resolved.route.agentId, + routeSessionKey: resolved.route.sessionKey, + storePath, + ctxPayload: msgCtx, + recordInboundSession: resolved.rt.channel.session.recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher: + resolved.rt.channel.reply.dispatchReplyWithBufferedBlockDispatcher, + delivery: { + deliver: async (payload) => { + await deliverSynologyChatReply({ + account: params.account, + sendUserId, + payload, + }); + }, + }, + dispatcherOptions: { + onReplyStart: () => { + params.log?.info?.(`Agent reply started for ${params.msg.from}`); + }, + }, + record: { + onRecordError: (err) => { + params.log?.info?.(`Session metadata update failed for ${params.msg.from}`, err); + }, + }, + }; + }, }, }); diff --git a/extensions/twitch/src/monitor.ts b/extensions/twitch/src/monitor.ts index 06500f444c7..5cf976bc460 100644 --- a/extensions/twitch/src/monitor.ts +++ b/extensions/twitch/src/monitor.ts @@ -51,126 +51,128 @@ async function processTwitchMessage(params: { const { message, account, accountId, config, runtime, core, statusSink } = params; const cfg = config as OpenClawConfig; - await core.channel.turn.runResolved({ + await core.channel.turn.run({ channel: "twitch", accountId, raw: message, - input: (incoming) => ({ - id: incoming.id ?? `${incoming.channel}:${incoming.timestamp?.getTime() ?? Date.now()}`, - timestamp: incoming.timestamp?.getTime(), - rawText: incoming.message, - textForAgent: incoming.message, - textForCommands: incoming.message, - raw: incoming, - }), - resolveTurn: (input) => { - const route = core.channel.routing.resolveAgentRoute({ - cfg, - channel: "twitch", - accountId, - peer: { - kind: "group", - id: message.channel, - }, - }); - const senderId = message.userId ?? message.username; - const fromLabel = message.displayName ?? message.username; - const body = core.channel.reply.formatAgentEnvelope({ - channel: "Twitch", - from: fromLabel, - timestamp: input.timestamp, - envelope: core.channel.reply.resolveEnvelopeFormatOptions(cfg), - body: input.rawText, - }); - const ctxPayload = core.channel.turn.buildContext({ - channel: "twitch", - accountId, - messageId: input.id, - timestamp: input.timestamp, - from: `twitch:user:${senderId}`, - sender: { - id: senderId, - name: fromLabel, - username: message.username, - }, - conversation: { - kind: "group", - id: message.channel, - label: message.channel, - routePeer: { + adapter: { + ingest: (incoming) => ({ + id: incoming.id ?? `${incoming.channel}:${incoming.timestamp?.getTime() ?? Date.now()}`, + timestamp: incoming.timestamp?.getTime(), + rawText: incoming.message, + textForAgent: incoming.message, + textForCommands: incoming.message, + raw: incoming, + }), + resolveTurn: (input) => { + const route = core.channel.routing.resolveAgentRoute({ + cfg, + channel: "twitch", + accountId, + peer: { kind: "group", id: message.channel, }, - }, - route: { + }); + const senderId = message.userId ?? message.username; + const fromLabel = message.displayName ?? message.username; + const body = core.channel.reply.formatAgentEnvelope({ + channel: "Twitch", + from: fromLabel, + timestamp: input.timestamp, + envelope: core.channel.reply.resolveEnvelopeFormatOptions(cfg), + body: input.rawText, + }); + const ctxPayload = core.channel.turn.buildContext({ + channel: "twitch", + accountId, + messageId: input.id, + timestamp: input.timestamp, + from: `twitch:user:${senderId}`, + sender: { + id: senderId, + name: fromLabel, + username: message.username, + }, + conversation: { + kind: "group", + id: message.channel, + label: message.channel, + routePeer: { + kind: "group", + id: message.channel, + }, + }, + route: { + agentId: route.agentId, + accountId: route.accountId, + routeSessionKey: route.sessionKey, + }, + reply: { + to: `twitch:channel:${message.channel}`, + originatingTo: `twitch:channel:${message.channel}`, + }, + message: { + body, + rawBody: input.rawText, + bodyForAgent: input.textForAgent, + commandBody: input.textForCommands, + envelopeFrom: fromLabel, + }, + }); + const storePath = core.channel.session.resolveStorePath(cfg.session?.store, { + agentId: route.agentId, + }); + const tableMode = core.channel.text.resolveMarkdownTableMode({ + cfg, + channel: "twitch", + accountId, + }); + const { onModelSelected, ...replyPipeline } = createChannelReplyPipeline({ + cfg, + agentId: route.agentId, + channel: "twitch", + accountId, + }); + return { + cfg, + channel: "twitch", + accountId, agentId: route.agentId, - accountId: route.accountId, routeSessionKey: route.sessionKey, - }, - reply: { - to: `twitch:channel:${message.channel}`, - originatingTo: `twitch:channel:${message.channel}`, - }, - message: { - body, - rawBody: input.rawText, - bodyForAgent: input.textForAgent, - commandBody: input.textForCommands, - envelopeFrom: fromLabel, - }, - }); - const storePath = core.channel.session.resolveStorePath(cfg.session?.store, { - agentId: route.agentId, - }); - const tableMode = core.channel.text.resolveMarkdownTableMode({ - cfg, - channel: "twitch", - accountId, - }); - const { onModelSelected, ...replyPipeline } = createChannelReplyPipeline({ - cfg, - agentId: route.agentId, - channel: "twitch", - accountId, - }); - return { - cfg, - channel: "twitch", - accountId, - agentId: route.agentId, - routeSessionKey: route.sessionKey, - storePath, - ctxPayload, - recordInboundSession: core.channel.session.recordInboundSession, - dispatchReplyWithBufferedBlockDispatcher: - core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, - delivery: { - deliver: async (payload) => { - await deliverTwitchReply({ - payload, - channel: message.channel, - account, - accountId, - config, - tableMode, - runtime, - statusSink, - }); + storePath, + ctxPayload, + recordInboundSession: core.channel.session.recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher: + core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, + delivery: { + deliver: async (payload) => { + await deliverTwitchReply({ + payload, + channel: message.channel, + account, + accountId, + config, + tableMode, + runtime, + statusSink, + }); + }, + onError: (err, info) => { + runtime.error?.(`Twitch ${info.kind} reply failed: ${String(err)}`); + }, }, - onError: (err, info) => { - runtime.error?.(`Twitch ${info.kind} reply failed: ${String(err)}`); + dispatcherOptions: replyPipeline, + replyOptions: { + onModelSelected, }, - }, - dispatcherOptions: replyPipeline, - replyOptions: { - onModelSelected, - }, - record: { - onRecordError: (err) => { - runtime.error?.(`Failed updating session meta: ${String(err)}`); + record: { + onRecordError: (err) => { + runtime.error?.(`Failed updating session meta: ${String(err)}`); + }, }, - }, - }; + }; + }, }, }); } diff --git a/extensions/zalo/src/monitor.ts b/extensions/zalo/src/monitor.ts index 28afba75469..2bcf0236813 100644 --- a/extensions/zalo/src/monitor.ts +++ b/extensions/zalo/src/monitor.ts @@ -667,63 +667,67 @@ async function processMessageWithPipeline(params: ZaloMessagePipelineParams): Pr }, }); - await core.channel.turn.runResolved({ + await core.channel.turn.run({ channel: "zalo", accountId: account.accountId, raw: message, - input: { - id: message_id, - timestamp: date ? date * 1000 : undefined, - rawText: rawBody, - textForAgent: rawBody, - textForCommands: rawBody, - raw: message, + adapter: { + ingest: () => ({ + id: message_id, + timestamp: date ? date * 1000 : undefined, + rawText: rawBody, + textForAgent: rawBody, + textForCommands: rawBody, + raw: message, + }), + resolveTurn: () => ({ + cfg: config, + channel: "zalo", + accountId: account.accountId, + agentId: route.agentId, + routeSessionKey: route.sessionKey, + storePath, + ctxPayload, + recordInboundSession: core.channel.session.recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher: + core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, + delivery: { + deliver: async (payload) => { + await deliverZaloReply({ + payload, + token, + chatId, + runtime, + core, + config, + webhookUrl: params.webhookUrl, + webhookPath: params.webhookPath, + proxyUrl: account.config.proxy, + mediaMaxBytes: params.mediaMaxMb * 1024 * 1024, + canHostMedia: params.canHostMedia, + accountId: account.accountId, + statusSink, + fetcher, + tableMode, + }); + }, + onError: (err, info) => { + runtime.error?.( + `[${account.accountId}] Zalo ${info.kind} reply failed: ${String(err)}`, + ); + }, + }, + dispatcherOptions: replyPipeline, + replyOptions: { + onModelSelected, + }, + record: { + onRecordError: (err) => { + runtime.error?.(`zalo: failed updating session meta: ${String(err)}`); + }, + }, + }), }, - resolveTurn: () => ({ - cfg: config, - channel: "zalo", - accountId: account.accountId, - agentId: route.agentId, - routeSessionKey: route.sessionKey, - storePath, - ctxPayload, - recordInboundSession: core.channel.session.recordInboundSession, - dispatchReplyWithBufferedBlockDispatcher: - core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, - delivery: { - deliver: async (payload) => { - await deliverZaloReply({ - payload, - token, - chatId, - runtime, - core, - config, - webhookUrl: params.webhookUrl, - webhookPath: params.webhookPath, - proxyUrl: account.config.proxy, - mediaMaxBytes: params.mediaMaxMb * 1024 * 1024, - canHostMedia: params.canHostMedia, - accountId: account.accountId, - statusSink, - fetcher, - tableMode, - }); - }, - onError: (err, info) => { - runtime.error?.(`[${account.accountId}] Zalo ${info.kind} reply failed: ${String(err)}`); - }, - }, - dispatcherOptions: replyPipeline, - replyOptions: { - onModelSelected, - }, - record: { - onRecordError: (err) => { - runtime.error?.(`zalo: failed updating session meta: ${String(err)}`); - }, - }, - }), }); } diff --git a/extensions/zalo/src/test-support/lifecycle-test-support.ts b/extensions/zalo/src/test-support/lifecycle-test-support.ts index 1ece16c9568..1b4d346976b 100644 --- a/extensions/zalo/src/test-support/lifecycle-test-support.ts +++ b/extensions/zalo/src/test-support/lifecycle-test-support.ts @@ -278,90 +278,8 @@ export function createImageLifecycleCore() { dispatchResult, }; }) as unknown as PluginRuntime["channel"]["turn"]["run"], - runResolved: vi.fn( - async (params: Parameters[0]) => { - const input = - typeof params.input === "function" ? await params.input(params.raw) : params.input; - if (!input) { - return { - admission: { kind: "drop" as const, reason: "ingest-null" }, - dispatched: false, - }; - } - const resolved = await params.resolveTurn( - input, - { - kind: "message", - canStartAgentTurn: true, - }, - {}, - ); - await resolved.recordInboundSession({ - storePath: resolved.storePath, - sessionKey: resolved.ctxPayload.SessionKey ?? resolved.routeSessionKey, - ctx: resolved.ctxPayload, - groupResolution: resolved.record?.groupResolution, - createIfMissing: resolved.record?.createIfMissing, - updateLastRoute: resolved.record?.updateLastRoute, - onRecordError: resolved.record?.onRecordError ?? (() => undefined), - }); - const dispatchResult = await resolved.dispatchReplyWithBufferedBlockDispatcher({ - ctx: resolved.ctxPayload, - cfg: resolved.cfg, - dispatcherOptions: { - ...resolved.dispatcherOptions, - deliver: async (payload, info) => { - await resolved.delivery.deliver(payload, info); - }, - onError: resolved.delivery.onError, - }, - replyOptions: resolved.replyOptions, - replyResolver: resolved.replyResolver, - }); - return { - admission: { kind: "dispatch" as const }, - dispatched: true, - ctxPayload: resolved.ctxPayload, - routeSessionKey: resolved.routeSessionKey, - dispatchResult, - }; - }, - ) as unknown as PluginRuntime["channel"]["turn"]["runResolved"], buildContext: buildChannelTurnContextMock as unknown as PluginRuntime["channel"]["turn"]["buildContext"], - dispatchAssembled: vi.fn( - async (turn: Parameters[0]) => { - await turn.recordInboundSession({ - storePath: turn.storePath, - sessionKey: turn.ctxPayload.SessionKey ?? turn.routeSessionKey, - ctx: turn.ctxPayload, - groupResolution: turn.record?.groupResolution, - createIfMissing: turn.record?.createIfMissing, - updateLastRoute: turn.record?.updateLastRoute, - onRecordError: turn.record?.onRecordError ?? (() => undefined), - }); - const dispatchResult = await turn.dispatchReplyWithBufferedBlockDispatcher({ - ctx: turn.ctxPayload, - cfg: turn.cfg, - dispatcherOptions: { - ...turn.dispatcherOptions, - deliver: async (payload, info) => { - await turn.delivery.deliver(payload, info); - }, - onError: turn.delivery.onError, - }, - replyOptions: turn.replyOptions, - replyResolver: turn.replyResolver, - }); - return { - admission: { kind: "dispatch" as const }, - dispatched: true, - ctxPayload: turn.ctxPayload, - routeSessionKey: turn.routeSessionKey, - dispatchResult, - }; - }, - ) as unknown as PluginRuntime["channel"]["turn"]["dispatchAssembled"], }, commands: { shouldComputeCommandAuthorized: vi.fn( diff --git a/extensions/zalouser/src/monitor.group-gating.test.ts b/extensions/zalouser/src/monitor.group-gating.test.ts index 6f91ed0b3a4..f4252a52d2f 100644 --- a/extensions/zalouser/src/monitor.group-gating.test.ts +++ b/extensions/zalouser/src/monitor.group-gating.test.ts @@ -89,39 +89,40 @@ function installRuntime(params: { const readSessionUpdatedAt = vi.fn( (_params?: { storePath: string; sessionKey: string }): number | undefined => undefined, ); - const dispatchAssembled = vi.fn( - async (turn: Parameters[0]) => { - await turn.recordInboundSession({ - storePath: turn.storePath, - sessionKey: turn.ctxPayload.SessionKey ?? turn.routeSessionKey, - ctx: turn.ctxPayload, - groupResolution: turn.record?.groupResolution, - createIfMissing: turn.record?.createIfMissing, - updateLastRoute: turn.record?.updateLastRoute, - onRecordError: turn.record?.onRecordError ?? (() => undefined), - }); - const dispatchResult = await turn.dispatchReplyWithBufferedBlockDispatcher({ - ctx: turn.ctxPayload, - cfg: turn.cfg, - dispatcherOptions: { - ...turn.dispatcherOptions, - deliver: async (payload, info) => { - await turn.delivery.deliver(payload, info); - }, - onError: turn.delivery.onError, + type ResolvedTurn = Awaited< + ReturnType[0]["adapter"]["resolveTurn"]> + >; + const dispatchAssembled = vi.fn(async (turn: ResolvedTurn) => { + await turn.recordInboundSession({ + storePath: turn.storePath, + sessionKey: turn.ctxPayload.SessionKey ?? turn.routeSessionKey, + ctx: turn.ctxPayload, + groupResolution: turn.record?.groupResolution, + createIfMissing: turn.record?.createIfMissing, + updateLastRoute: turn.record?.updateLastRoute, + onRecordError: turn.record?.onRecordError ?? (() => undefined), + }); + const dispatchResult = await turn.dispatchReplyWithBufferedBlockDispatcher({ + ctx: turn.ctxPayload, + cfg: turn.cfg, + dispatcherOptions: { + ...turn.dispatcherOptions, + deliver: async (payload, info) => { + await turn.delivery.deliver(payload, info); }, - replyOptions: turn.replyOptions, - replyResolver: turn.replyResolver, - }); - return { - admission: { kind: "dispatch" as const }, - dispatched: true, - ctxPayload: turn.ctxPayload, - routeSessionKey: turn.routeSessionKey, - dispatchResult, - }; - }, - ); + onError: turn.delivery.onError, + }, + replyOptions: turn.replyOptions, + replyResolver: turn.replyResolver, + }); + return { + admission: { kind: "dispatch" as const }, + dispatched: true, + ctxPayload: turn.ctxPayload, + routeSessionKey: turn.routeSessionKey, + dispatchResult, + }; + }); const runTurn = vi.fn(async (params: Parameters[0]) => { const input = await params.adapter.ingest(params.raw); if (!input) { @@ -137,27 +138,6 @@ function installRuntime(params: { ); return await dispatchAssembled(resolved); }); - const runResolvedTurn = vi.fn( - async (params: Parameters[0]) => { - const input = - typeof params.input === "function" ? await params.input(params.raw) : params.input; - if (!input) { - return { - admission: { kind: "drop" as const, reason: "ingest-null" }, - dispatched: false, - }; - } - const resolved = await params.resolveTurn( - input, - { - kind: "message", - canStartAgentTurn: true, - }, - {}, - ); - return await dispatchAssembled(resolved); - }, - ); const buildContext = vi.fn( (params: Parameters[0]) => ({ @@ -264,10 +244,7 @@ function installRuntime(params: { }, turn: { run: runTurn as unknown as PluginRuntime["channel"]["turn"]["run"], - runResolved: runResolvedTurn as unknown as PluginRuntime["channel"]["turn"]["runResolved"], buildContext: buildContext as unknown as PluginRuntime["channel"]["turn"]["buildContext"], - dispatchAssembled: - dispatchAssembled as unknown as PluginRuntime["channel"]["turn"]["dispatchAssembled"], }, text: { resolveMarkdownTableMode: vi.fn(() => "code"), diff --git a/extensions/zalouser/src/monitor.ts b/extensions/zalouser/src/monitor.ts index 3c83d1195bf..878f4d0e124 100644 --- a/extensions/zalouser/src/monitor.ts +++ b/extensions/zalouser/src/monitor.ts @@ -671,64 +671,66 @@ async function processMessage( }, }); - await core.channel.turn.runResolved({ + await core.channel.turn.run({ channel: "zalouser", accountId: account.accountId, raw: message, - input: { - id: messageSid ?? `${message.timestampMs}`, - timestamp: message.timestampMs, - rawText: rawBody, - textForAgent: rawBody, - textForCommands: commandBody, - raw: message, - }, - resolveTurn: () => ({ - cfg: config, - channel: "zalouser", - accountId: account.accountId, - agentId: route.agentId, - routeSessionKey: route.sessionKey, - storePath, - ctxPayload, - recordInboundSession: core.channel.session.recordInboundSession, - dispatchReplyWithBufferedBlockDispatcher: - core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, - delivery: { - deliver: async (payload) => { - await deliverZalouserReply({ - payload: payload as { text?: string; mediaUrls?: string[]; mediaUrl?: string }, - profile: account.profile, - chatId, - isGroup, - runtime, - core, - config, - accountId: account.accountId, - statusSink, - tableMode: core.channel.text.resolveMarkdownTableMode({ - cfg: config, - channel: "zalouser", + adapter: { + ingest: () => ({ + id: messageSid ?? `${message.timestampMs}`, + timestamp: message.timestampMs, + rawText: rawBody, + textForAgent: rawBody, + textForCommands: commandBody, + raw: message, + }), + resolveTurn: () => ({ + cfg: config, + channel: "zalouser", + accountId: account.accountId, + agentId: route.agentId, + routeSessionKey: route.sessionKey, + storePath, + ctxPayload, + recordInboundSession: core.channel.session.recordInboundSession, + dispatchReplyWithBufferedBlockDispatcher: + core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, + delivery: { + deliver: async (payload) => { + await deliverZalouserReply({ + payload: payload as { text?: string; mediaUrls?: string[]; mediaUrl?: string }, + profile: account.profile, + chatId, + isGroup, + runtime, + core, + config, accountId: account.accountId, - }), - }); + statusSink, + tableMode: core.channel.text.resolveMarkdownTableMode({ + cfg: config, + channel: "zalouser", + accountId: account.accountId, + }), + }); + }, + onError: (err, info) => { + runtime.error( + `[${account.accountId}] Zalouser ${info.kind} reply failed: ${String(err)}`, + ); + }, }, - onError: (err, info) => { - runtime.error( - `[${account.accountId}] Zalouser ${info.kind} reply failed: ${String(err)}`, - ); + dispatcherOptions: replyPipeline, + replyOptions: { + onModelSelected, }, - }, - dispatcherOptions: replyPipeline, - replyOptions: { - onModelSelected, - }, - record: { - onRecordError: (err) => { - runtime.error?.(`zalouser: failed updating session meta: ${String(err)}`); + record: { + onRecordError: (err) => { + runtime.error?.(`zalouser: failed updating session meta: ${String(err)}`); + }, }, - }, - }), + }), + }, }); if (isGroup && historyKey) { clearHistoryEntriesIfEnabled({ diff --git a/src/channels/turn/kernel.test.ts b/src/channels/turn/kernel.test.ts index e4325bd14b0..a4af11f8f8c 100644 --- a/src/channels/turn/kernel.test.ts +++ b/src/channels/turn/kernel.test.ts @@ -195,8 +195,9 @@ describe("channel turn kernel", () => { expect(resolveTurn).not.toHaveBeenCalled(); }); - it("runs observe-only preflights through resolve, record, dispatch, and finalize", async () => { + it("runs observe-only preflights through resolve, record, dispatch, and finalize without visible delivery", async () => { const events: string[] = []; + const deliver = vi.fn(); const onFinalize = vi.fn(); const result = await runChannelTurn({ channel: "test", @@ -213,7 +214,7 @@ describe("channel turn kernel", () => { ctxPayload: createCtx({ SessionKey: "agent:observer:test:peer" }), recordInboundSession: createRecordInboundSession(events), dispatchReplyWithBufferedBlockDispatcher: createDispatch(events), - delivery: createNoopChannelTurnDeliveryAdapter(), + delivery: { deliver }, record: { onRecordError: vi.fn(), }, @@ -228,6 +229,7 @@ describe("channel turn kernel", () => { }); expect(result.dispatched).toBe(true); expect(events).toEqual(["record", "dispatch"]); + expect(deliver).not.toHaveBeenCalled(); expect(onFinalize).toHaveBeenCalledWith( expect.objectContaining({ admission: { kind: "observeOnly", reason: "broadcast-observer" }, diff --git a/src/channels/turn/kernel.ts b/src/channels/turn/kernel.ts index 4abda92bf2f..e018c053c4d 100644 --- a/src/channels/turn/kernel.ts +++ b/src/channels/turn/kernel.ts @@ -234,7 +234,14 @@ export async function runChannelTurn( const admission = resolved.admission ?? preflightAdmission ?? ({ kind: "dispatch" } as const); let result: ChannelTurnResult; try { - const dispatchResult = await dispatchAssembledChannelTurn(resolved); + const dispatchResult = await dispatchAssembledChannelTurn( + admission.kind === "observeOnly" + ? { + ...resolved, + delivery: createNoopChannelTurnDeliveryAdapter(), + } + : resolved, + ); result = { ...dispatchResult, admission, diff --git a/src/plugins/runtime/types-channel.ts b/src/plugins/runtime/types-channel.ts index 8db2c944497..dd1a9b4f439 100644 --- a/src/plugins/runtime/types-channel.ts +++ b/src/plugins/runtime/types-channel.ts @@ -153,9 +153,11 @@ export type PluginRuntimeChannel = { }; turn: { run: typeof import("../../channels/turn/kernel.js").runChannelTurn; + /** @deprecated Prefer `run(...)`. */ runResolved: typeof import("../../channels/turn/kernel.js").runResolvedChannelTurn; buildContext: typeof import("../../channels/turn/kernel.js").buildChannelTurnContext; runPrepared: typeof import("../../channels/turn/kernel.js").runPreparedChannelTurn; + /** @deprecated Prefer `run(...)` or `runPrepared(...)`. */ dispatchAssembled: typeof import("../../channels/turn/kernel.js").dispatchAssembledChannelTurn; }; threadBindings: {