diff --git a/extensions/discord/src/monitor/agent-components.dispatch.ts b/extensions/discord/src/monitor/agent-components.dispatch.ts index fa36babcf3b..7fb085dcd9a 100644 --- a/extensions/discord/src/monitor/agent-components.dispatch.ts +++ b/extensions/discord/src/monitor/agent-components.dispatch.ts @@ -4,7 +4,7 @@ import { resolveEnvelopeFormatOptions, } from "openclaw/plugin-sdk/channel-inbound"; import { isDangerousNameMatchingEnabled } from "openclaw/plugin-sdk/dangerous-name-runtime"; -import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch"; +import { runInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { resolveMarkdownTableMode } from "openclaw/plugin-sdk/markdown-table-runtime"; import { getAgentScopedMediaLocalRoots } from "openclaw/plugin-sdk/media-runtime"; import { createNonExitingRuntime, logVerbose } from "openclaw/plugin-sdk/runtime-env"; @@ -270,83 +270,97 @@ export async function dispatchDiscordComponentEvent(params: { startId: params.replyToId, }); - await runPreparedInboundReplyTurn({ + await runInboundReplyTurn({ channel: "discord", accountId, - routeSessionKey: sessionKey, - storePath, - ctxPayload, - recordInboundSession, - record: { - updateLastRoute: interactionCtx.isDirectMessage - ? { - sessionKey: route.mainSessionKey, - channel: "discord", - to: - resolveDiscordComponentOriginatingTo(interactionCtx) ?? - `user:${interactionCtx.userId}`, - accountId, - mainDmOwnerPin: pinnedMainDmOwner - ? { - ownerRecipient: pinnedMainDmOwner, - senderRecipient: interactionCtx.userId, - onSkip: ({ ownerRecipient, senderRecipient }) => { - logVerbose( - `discord: skip main-session last route for ${senderRecipient} (pinned owner ${ownerRecipient})`, - ); - }, - } - : undefined, - } - : undefined, - onRecordError: (err) => { - logVerbose(`discord: failed updating component session meta: ${String(err)}`); - }, - }, - runDispatch: () => - dispatchReplyWithBufferedBlockDispatcher({ - ctx: ctxPayload, - cfg: ctx.cfg, - replyOptions: { onModelSelected }, - dispatcherOptions: { - ...replyPipeline, - humanDelay: resolveHumanDelayConfig(ctx.cfg, agentId), - deliver: async (payload) => { - const replyToId = replyReference.use(); - await deliverDiscordReply({ - cfg: ctx.cfg, - replies: [payload], - target: deliverTarget, - token, - accountId, - rest: interaction.client.rest, - runtime, - replyToId, - replyToMode, - textLimit, - maxLinesPerMessage: resolveDiscordMaxLinesPerMessage({ - cfg: ctx.cfg, - discordConfig: ctx.discordConfig, + raw: interaction, + adapter: { + ingest: () => ({ + id: interaction.id, + rawText: ctxPayload.RawBody ?? "", + textForAgent: ctxPayload.BodyForAgent, + textForCommands: ctxPayload.CommandBody, + raw: interaction, + }), + resolveTurn: () => ({ + channel: "discord", + accountId, + routeSessionKey: sessionKey, + storePath, + ctxPayload, + recordInboundSession, + record: { + updateLastRoute: interactionCtx.isDirectMessage + ? { + sessionKey: route.mainSessionKey, + channel: "discord", + to: + resolveDiscordComponentOriginatingTo(interactionCtx) ?? + `user:${interactionCtx.userId}`, accountId, - }), - tableMode, - chunkMode: resolveChunkMode(ctx.cfg, "discord", accountId), - mediaLocalRoots, - }); - replyReference.markSent(); - }, - onReplyStart: async () => { - try { - const { sendTyping } = await loadTypingRuntime(); - await sendTyping({ rest: feedbackRest, channelId: typingChannelId }); - } catch (err) { - logVerbose(`discord: typing failed for component reply: ${String(err)}`); - } - }, - onError: (err) => { - logError(`discord component dispatch failed: ${String(err)}`); + mainDmOwnerPin: pinnedMainDmOwner + ? { + ownerRecipient: pinnedMainDmOwner, + senderRecipient: interactionCtx.userId, + onSkip: ({ ownerRecipient, senderRecipient }) => { + logVerbose( + `discord: skip main-session last route for ${senderRecipient} (pinned owner ${ownerRecipient})`, + ); + }, + } + : undefined, + } + : undefined, + onRecordError: (err) => { + logVerbose(`discord: failed updating component session meta: ${String(err)}`); }, }, + runDispatch: () => + dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, + cfg: ctx.cfg, + replyOptions: { onModelSelected }, + dispatcherOptions: { + ...replyPipeline, + humanDelay: resolveHumanDelayConfig(ctx.cfg, agentId), + deliver: async (payload) => { + const replyToId = replyReference.use(); + await deliverDiscordReply({ + cfg: ctx.cfg, + replies: [payload], + target: deliverTarget, + token, + accountId, + rest: interaction.client.rest, + runtime, + replyToId, + replyToMode, + textLimit, + maxLinesPerMessage: resolveDiscordMaxLinesPerMessage({ + cfg: ctx.cfg, + discordConfig: ctx.discordConfig, + accountId, + }), + tableMode, + chunkMode: resolveChunkMode(ctx.cfg, "discord", accountId), + mediaLocalRoots, + }); + replyReference.markSent(); + }, + onReplyStart: async () => { + try { + const { sendTyping } = await loadTypingRuntime(); + await sendTyping({ rest: feedbackRest, channelId: typingChannelId }); + } catch (err) { + logVerbose(`discord: typing failed for component reply: ${String(err)}`); + } + }, + onError: (err) => { + logError(`discord component dispatch failed: ${String(err)}`); + }, + }, + }), }), + }, }); } diff --git a/extensions/discord/src/monitor/message-handler.process.ts b/extensions/discord/src/monitor/message-handler.process.ts index 311ec7b15f8..9c660b50481 100644 --- a/extensions/discord/src/monitor/message-handler.process.ts +++ b/extensions/discord/src/monitor/message-handler.process.ts @@ -15,13 +15,12 @@ import { resolveChannelStreamingBlockEnabled } from "openclaw/plugin-sdk/channel import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime"; import { hasFinalInboundReplyDispatch, - runPreparedInboundReplyTurn, + runInboundReplyTurn, } from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { resolveMarkdownTableMode } from "openclaw/plugin-sdk/markdown-table-runtime"; import { getAgentScopedMediaLocalRoots } from "openclaw/plugin-sdk/media-runtime"; import { resolveChunkMode } from "openclaw/plugin-sdk/reply-chunking"; import type { ReplyPayload } from "openclaw/plugin-sdk/reply-dispatch-runtime"; -import { clearHistoryEntriesIfEnabled } from "openclaw/plugin-sdk/reply-history"; import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; import { danger, logVerbose, shouldLogVerbose } from "openclaw/plugin-sdk/runtime-env"; import { resolveDiscordMaxLinesPerMessage } from "../accounts.js"; @@ -480,109 +479,135 @@ export async function processDiscordMessage( await settleDispatchBeforeStart(); return; } - const preparedResult = await runPreparedInboundReplyTurn({ + const preparedResult = await runInboundReplyTurn({ channel: "discord", accountId: route.accountId, - routeSessionKey: persistedSessionKey, - storePath: turn.storePath, - ctxPayload, - recordInboundSession, - record: turn.record, - onPreDispatchFailure: settleDispatchBeforeStart, - runDispatch: () => - dispatchInboundMessage({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions: { - ...replyOptions, - abortSignal, - skillFilter: channelConfig?.skills, - sourceReplyDeliveryMode, - disableBlockStreaming: sourceRepliesAreToolOnly - ? true - : (draftPreview.disableBlockStreamingForDraft ?? - (typeof resolvedBlockStreamingEnabled === "boolean" - ? !resolvedBlockStreamingEnabled - : undefined)), - onPartialReply: draftPreview.draftStream - ? (payload) => draftPreview.updateFromPartial(payload.text) - : undefined, - onAssistantMessageStart: draftPreview.draftStream - ? draftPreview.handleAssistantMessageBoundary - : undefined, - onReasoningEnd: draftPreview.draftStream - ? draftPreview.handleAssistantMessageBoundary - : undefined, - onModelSelected, - suppressDefaultToolProgressMessages: draftPreview.previewToolProgressEnabled - ? true - : undefined, - onReasoningStream: async () => { - await statusReactions.setThinking(); - }, - onToolStart: async (payload) => { - if (isProcessAborted(abortSignal)) { - return; - } - await statusReactions.setTool(payload.name); - draftPreview.pushToolProgress( - payload.name ? `tool: ${payload.name}` : "tool running", - ); - }, - onItemEvent: async (payload) => { - draftPreview.pushToolProgress( - payload.progressText ?? payload.summary ?? payload.title ?? payload.name, - ); - }, - onPlanUpdate: async (payload) => { - if (payload.phase !== "update") { - return; - } - draftPreview.pushToolProgress( - payload.explanation ?? payload.steps?.[0] ?? "planning", - ); - }, - onApprovalEvent: async (payload) => { - if (payload.phase !== "requested") { - return; - } - draftPreview.pushToolProgress( - payload.command ? `approval: ${payload.command}` : "approval requested", - ); - }, - onCommandOutput: async (payload) => { - if (payload.phase !== "end") { - return; - } - draftPreview.pushToolProgress( - payload.name - ? `${payload.name}${payload.exitCode === 0 ? " ✓" : payload.exitCode != null ? ` (exit ${payload.exitCode})` : ""}` - : payload.title, - ); - }, - onPatchSummary: async (payload) => { - if (payload.phase !== "end") { - return; - } - draftPreview.pushToolProgress(payload.summary ?? payload.title ?? "patch applied"); - }, - onCompactionStart: async () => { - if (isProcessAborted(abortSignal)) { - return; - } - await statusReactions.setCompacting(); - }, - onCompactionEnd: async () => { - if (isProcessAborted(abortSignal)) { - return; - } - statusReactions.cancelPending(); - await statusReactions.setThinking(); - }, - }, + raw: ctx, + adapter: { + ingest: () => ({ + id: message.id, + timestamp: message.timestamp ? Date.parse(message.timestamp) : undefined, + rawText: text, + textForAgent: ctxPayload.BodyForAgent, + textForCommands: ctxPayload.CommandBody, + raw: message, }), + resolveTurn: () => ({ + channel: "discord", + accountId: route.accountId, + routeSessionKey: persistedSessionKey, + storePath: turn.storePath, + ctxPayload, + recordInboundSession, + record: turn.record, + history: { + isGroup: isGuildMessage, + historyKey: messageChannelId, + historyMap: guildHistories, + limit: historyLimit, + }, + onPreDispatchFailure: settleDispatchBeforeStart, + runDispatch: () => + dispatchInboundMessage({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + abortSignal, + skillFilter: channelConfig?.skills, + sourceReplyDeliveryMode, + disableBlockStreaming: sourceRepliesAreToolOnly + ? true + : (draftPreview.disableBlockStreamingForDraft ?? + (typeof resolvedBlockStreamingEnabled === "boolean" + ? !resolvedBlockStreamingEnabled + : undefined)), + onPartialReply: draftPreview.draftStream + ? (payload) => draftPreview.updateFromPartial(payload.text) + : undefined, + onAssistantMessageStart: draftPreview.draftStream + ? draftPreview.handleAssistantMessageBoundary + : undefined, + onReasoningEnd: draftPreview.draftStream + ? draftPreview.handleAssistantMessageBoundary + : undefined, + onModelSelected, + suppressDefaultToolProgressMessages: draftPreview.previewToolProgressEnabled + ? true + : undefined, + onReasoningStream: async () => { + await statusReactions.setThinking(); + }, + onToolStart: async (payload) => { + if (isProcessAborted(abortSignal)) { + return; + } + await statusReactions.setTool(payload.name); + draftPreview.pushToolProgress( + payload.name ? `tool: ${payload.name}` : "tool running", + ); + }, + onItemEvent: async (payload) => { + draftPreview.pushToolProgress( + payload.progressText ?? payload.summary ?? payload.title ?? payload.name, + ); + }, + onPlanUpdate: async (payload) => { + if (payload.phase !== "update") { + return; + } + draftPreview.pushToolProgress( + payload.explanation ?? payload.steps?.[0] ?? "planning", + ); + }, + onApprovalEvent: async (payload) => { + if (payload.phase !== "requested") { + return; + } + draftPreview.pushToolProgress( + payload.command ? `approval: ${payload.command}` : "approval requested", + ); + }, + onCommandOutput: async (payload) => { + if (payload.phase !== "end") { + return; + } + draftPreview.pushToolProgress( + payload.name + ? `${payload.name}${payload.exitCode === 0 ? " ✓" : payload.exitCode != null ? ` (exit ${payload.exitCode})` : ""}` + : payload.title, + ); + }, + onPatchSummary: async (payload) => { + if (payload.phase !== "end") { + return; + } + draftPreview.pushToolProgress( + payload.summary ?? payload.title ?? "patch applied", + ); + }, + onCompactionStart: async () => { + if (isProcessAborted(abortSignal)) { + return; + } + await statusReactions.setCompacting(); + }, + onCompactionEnd: async () => { + if (isProcessAborted(abortSignal)) { + return; + } + statusReactions.cancelPending(); + await statusReactions.setThinking(); + }, + }, + }), + }), + }, }); + if (!preparedResult.dispatched) { + return; + } dispatchResult = preparedResult.dispatchResult; if (isProcessAborted(abortSignal)) { dispatchAborted = true; @@ -646,27 +671,14 @@ export async function processDiscordMessage( return; } - if (!hasFinalInboundReplyDispatch(dispatchResult)) { - if (isGuildMessage) { - clearHistoryEntriesIfEnabled({ - historyMap: guildHistories, - historyKey: messageChannelId, - limit: historyLimit, - }); - } + const finalDispatchResult = dispatchResult; + if (!finalDispatchResult || !hasFinalInboundReplyDispatch(finalDispatchResult)) { return; } if (shouldLogVerbose()) { - const finalCount = dispatchResult.counts.final; + const finalCount = finalDispatchResult.counts.final; logVerbose( `discord: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`, ); } - if (isGuildMessage) { - clearHistoryEntriesIfEnabled({ - historyMap: guildHistories, - historyKey: messageChannelId, - limit: historyLimit, - }); - } } diff --git a/extensions/feishu/src/bot.broadcast.test.ts b/extensions/feishu/src/bot.broadcast.test.ts index a85af7984a3..c569fa32196 100644 --- a/extensions/feishu/src/bot.broadcast.test.ts +++ b/extensions/feishu/src/bot.broadcast.test.ts @@ -111,6 +111,39 @@ describe("broadcast dispatch", () => { saveMediaBuffer: mockSaveMediaBuffer, }, turn: { + run: vi.fn(async (params: Parameters[0]) => { + const input = await params.adapter.ingest(params.raw); + if (!input) { + return { + admission: { kind: "drop" as const, reason: "ingest-null" }, + dispatched: false, + }; + } + const eventClass = { + kind: "message" as const, + canStartAgentTurn: true, + }; + const turn = await params.adapter.resolveTurn(input, eventClass, {}); + if (!("runDispatch" in turn)) { + throw new Error("feishu broadcast test runtime only supports prepared turns"); + } + await turn.recordInboundSession({ + storePath: turn.storePath, + sessionKey: turn.ctxPayload.SessionKey ?? turn.routeSessionKey, + ctx: turn.ctxPayload, + groupResolution: turn.record?.groupResolution, + createIfMissing: turn.record?.createIfMissing, + updateLastRoute: turn.record?.updateLastRoute, + onRecordError: turn.record?.onRecordError ?? (() => undefined), + }); + return { + admission: { kind: "dispatch" as const }, + dispatched: true, + ctxPayload: turn.ctxPayload, + routeSessionKey: turn.routeSessionKey, + dispatchResult: await turn.runDispatch(), + }; + }), runPrepared: vi.fn( async (turn: Parameters[0]) => { await turn.recordInboundSession({ diff --git a/extensions/feishu/src/bot.test.ts b/extensions/feishu/src/bot.test.ts index 503e411426f..baf69539883 100644 --- a/extensions/feishu/src/bot.test.ts +++ b/extensions/feishu/src/bot.test.ts @@ -198,6 +198,16 @@ function createFeishuBotRuntime(overrides: DeepPartial = {}): Plu buildPairingReply: vi.fn(), }, turn: { + run: vi.fn(async (params) => { + const input = await params.adapter.ingest(params.raw); + const turn = await params.adapter.resolveTurn(input, { + kind: "message", + canStartAgentTurn: true, + }); + return { + dispatchResult: await turn.runDispatch(), + }; + }), runPrepared: vi.fn(async (params) => ({ dispatchResult: await params.runDispatch(), })), diff --git a/extensions/feishu/src/bot.ts b/extensions/feishu/src/bot.ts index 3d88ad790d0..f22d6434705 100644 --- a/extensions/feishu/src/bot.ts +++ b/extensions/feishu/src/bot.ts @@ -1312,31 +1312,46 @@ export async function handleFeishuMessage(params: { log( `feishu[${account.accountId}]: broadcast active dispatch agent=${agentId} (session=${agentSessionKey})`, ); - await core.channel.turn.runPrepared({ + await core.channel.turn.run({ channel: "feishu", accountId: route.accountId, - routeSessionKey: agentSessionKey, - storePath: agentStorePath, - ctxPayload: agentCtx, - recordInboundSession: core.channel.session.recordInboundSession, - record: agentRecord, - onPreDispatchFailure: () => - core.channel.reply.settleReplyDispatcher({ - dispatcher, - onSettled: () => markDispatchIdle(), + raw: ctx, + adapter: { + ingest: () => ({ + id: ctx.messageId, + timestamp: messageCreateTimeMs, + rawText: ctx.content, + textForAgent: agentCtx.BodyForAgent, + textForCommands: agentCtx.CommandBody, + raw: ctx, }), - runDispatch: () => - core.channel.reply.withReplyDispatcher({ - dispatcher, - onSettled: () => markDispatchIdle(), - run: () => - core.channel.reply.dispatchReplyFromConfig({ - ctx: agentCtx, - cfg, + resolveTurn: () => ({ + channel: "feishu", + accountId: route.accountId, + routeSessionKey: agentSessionKey, + storePath: agentStorePath, + ctxPayload: agentCtx, + recordInboundSession: core.channel.session.recordInboundSession, + record: agentRecord, + onPreDispatchFailure: () => + core.channel.reply.settleReplyDispatcher({ dispatcher, - replyOptions, + onSettled: () => markDispatchIdle(), + }), + runDispatch: () => + core.channel.reply.withReplyDispatcher({ + dispatcher, + onSettled: () => markDispatchIdle(), + run: () => + core.channel.reply.dispatchReplyFromConfig({ + ctx: agentCtx, + cfg, + dispatcher, + replyOptions, + }), }), }), + }, }); } else { // Observer agent: no-op dispatcher (session entry + inference, no Feishu reply). @@ -1356,24 +1371,39 @@ export async function handleFeishuMessage(params: { log( `feishu[${account.accountId}]: broadcast observer dispatch agent=${agentId} (session=${agentSessionKey})`, ); - await core.channel.turn.runPrepared({ + await core.channel.turn.run({ channel: "feishu", accountId: route.accountId, - routeSessionKey: agentSessionKey, - storePath: agentStorePath, - ctxPayload: agentCtx, - recordInboundSession: core.channel.session.recordInboundSession, - record: agentRecord, - runDispatch: () => - core.channel.reply.withReplyDispatcher({ - dispatcher: noopDispatcher, - run: () => - core.channel.reply.dispatchReplyFromConfig({ - ctx: agentCtx, - cfg, + raw: ctx, + adapter: { + ingest: () => ({ + id: ctx.messageId, + timestamp: messageCreateTimeMs, + rawText: ctx.content, + textForAgent: agentCtx.BodyForAgent, + textForCommands: agentCtx.CommandBody, + raw: ctx, + }), + resolveTurn: () => ({ + channel: "feishu", + accountId: route.accountId, + routeSessionKey: agentSessionKey, + storePath: agentStorePath, + ctxPayload: agentCtx, + recordInboundSession: core.channel.session.recordInboundSession, + record: agentRecord, + runDispatch: () => + core.channel.reply.withReplyDispatcher({ dispatcher: noopDispatcher, + run: () => + core.channel.reply.dispatchReplyFromConfig({ + ctx: agentCtx, + cfg, + dispatcher: noopDispatcher, + }), }), }), + }, }); } }; @@ -1445,49 +1475,66 @@ export async function handleFeishuMessage(params: { }); log(`feishu[${account.accountId}]: dispatching to agent (session=${route.sessionKey})`); - const { dispatchResult } = await core.channel.turn.runPrepared({ + const turnResult = await core.channel.turn.run({ channel: "feishu", accountId: route.accountId, - routeSessionKey: route.sessionKey, - storePath, - ctxPayload, - recordInboundSession: core.channel.session.recordInboundSession, - record: { - onRecordError: (err) => { - log( - `feishu[${account.accountId}]: failed to record inbound session ${route.sessionKey}: ${String(err)}`, - ); - }, - }, - onPreDispatchFailure: () => - core.channel.reply.settleReplyDispatcher({ - dispatcher, - onSettled: () => markDispatchIdle(), + raw: ctx, + adapter: { + ingest: () => ({ + id: ctx.messageId, + timestamp: messageCreateTimeMs, + rawText: ctx.content, + textForAgent: ctxPayload.BodyForAgent, + textForCommands: ctxPayload.CommandBody, + raw: ctx, }), - runDispatch: () => - core.channel.reply.withReplyDispatcher({ - dispatcher, - onSettled: () => { - markDispatchIdle(); + resolveTurn: () => ({ + channel: "feishu", + accountId: route.accountId, + routeSessionKey: route.sessionKey, + storePath, + ctxPayload, + recordInboundSession: core.channel.session.recordInboundSession, + record: { + onRecordError: (err) => { + log( + `feishu[${account.accountId}]: failed to record inbound session ${route.sessionKey}: ${String(err)}`, + ); + }, }, - run: () => - core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, + history: { + isGroup, + historyKey, + historyMap: chatHistories, + limit: historyLimit, + }, + onPreDispatchFailure: () => + core.channel.reply.settleReplyDispatcher({ dispatcher, - replyOptions, + onSettled: () => markDispatchIdle(), + }), + runDispatch: () => + core.channel.reply.withReplyDispatcher({ + dispatcher, + onSettled: () => { + markDispatchIdle(); + }, + run: () => + core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions, + }), }), }), + }, }); - const { queuedFinal, counts } = dispatchResult; - - if (isGroup && historyKey && chatHistories) { - clearHistoryEntriesIfEnabled({ - historyMap: chatHistories, - historyKey, - limit: historyLimit, - }); + if (!turnResult.dispatched) { + return; } + const { dispatchResult } = turnResult; + const { queuedFinal, counts } = dispatchResult; log( `feishu[${account.accountId}]: dispatch complete (queuedFinal=${queuedFinal}, replies=${counts.final})`, diff --git a/extensions/feishu/src/comment-handler.test.ts b/extensions/feishu/src/comment-handler.test.ts index 8f0748fe38a..bfb20455c18 100644 --- a/extensions/feishu/src/comment-handler.test.ts +++ b/extensions/feishu/src/comment-handler.test.ts @@ -134,6 +134,26 @@ function createTestRuntime(overrides?: { recordInboundSession, }, turn: { + run: vi.fn(async (params: Parameters[0]) => { + const input = await params.adapter.ingest(params.raw); + if (!input) { + return { + admission: { kind: "drop" as const, reason: "ingest-null" }, + dispatched: false, + }; + } + const eventClass = { + kind: "message" as const, + canStartAgentTurn: true, + }; + const turn = await params.adapter.resolveTurn(input, eventClass, {}); + if (!("runDispatch" in turn)) { + throw new Error("feishu comment test runtime only supports prepared turns"); + } + return await runPrepared( + turn as Parameters[0], + ); + }) as unknown as PluginRuntime["channel"]["turn"]["run"], runPrepared: runPrepared as unknown as PluginRuntime["channel"]["turn"]["runPrepared"], }, pairing: { diff --git a/extensions/feishu/src/comment-handler.ts b/extensions/feishu/src/comment-handler.ts index ce14619f15b..df1a21ba035 100644 --- a/extensions/feishu/src/comment-handler.ts +++ b/extensions/feishu/src/comment-handler.ts @@ -241,42 +241,58 @@ export async function handleFeishuCommentEvent( `feishu[${account.accountId}]: dispatching drive comment to agent ` + `(session=${commentSessionKey} comment=${turn.commentId} type=${turn.noticeType})`, ); - const { dispatchResult } = await core.channel.turn.runPrepared({ + const turnResult = await core.channel.turn.run({ channel: "feishu", accountId: route.accountId, - routeSessionKey: commentSessionKey, - storePath, - ctxPayload, - recordInboundSession: core.channel.session.recordInboundSession, - record: { - onRecordError: (err) => { - error( - `feishu[${account.accountId}]: failed to record comment inbound session ${commentSessionKey}: ${String(err)}`, - ); - }, - }, - onPreDispatchFailure: async () => { - dispatchSettledBeforeStart = true; - await core.channel.reply.settleReplyDispatcher({ - dispatcher, - onSettled: () => { - markRunComplete(); - markDispatchIdle(); + raw: turn, + adapter: { + ingest: () => ({ + id: turn.messageId, + timestamp: parseTimestampMs(turn.timestamp), + rawText: ctxPayload.RawBody ?? "", + textForAgent: ctxPayload.BodyForAgent, + textForCommands: ctxPayload.CommandBody, + raw: turn, + }), + resolveTurn: () => ({ + channel: "feishu", + accountId: route.accountId, + routeSessionKey: commentSessionKey, + storePath, + ctxPayload, + recordInboundSession: core.channel.session.recordInboundSession, + record: { + onRecordError: (err) => { + error( + `feishu[${account.accountId}]: failed to record comment inbound session ${commentSessionKey}: ${String(err)}`, + ); + }, }, - }); - }, - runDispatch: () => - core.channel.reply.withReplyDispatcher({ - dispatcher, - run: () => - core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg: effectiveCfg, + onPreDispatchFailure: async () => { + dispatchSettledBeforeStart = true; + await core.channel.reply.settleReplyDispatcher({ dispatcher, - replyOptions, + onSettled: () => { + markRunComplete(); + markDispatchIdle(); + }, + }); + }, + runDispatch: () => + core.channel.reply.withReplyDispatcher({ + dispatcher, + run: () => + core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg: effectiveCfg, + dispatcher, + replyOptions, + }), }), }), + }, }); + const dispatchResult = turnResult.dispatched ? turnResult.dispatchResult : undefined; const queuedFinal = dispatchResult?.queuedFinal ?? false; const counts = dispatchResult?.counts ?? { tool: 0, block: 0, final: 0 }; log( diff --git a/extensions/imessage/src/monitor/monitor-provider.ts b/extensions/imessage/src/monitor/monitor-provider.ts index 50a56c7c100..c705a0a1bdb 100644 --- a/extensions/imessage/src/monitor/monitor-provider.ts +++ b/extensions/imessage/src/monitor/monitor-provider.ts @@ -12,7 +12,7 @@ import { } from "openclaw/plugin-sdk/conversation-runtime"; import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime"; import { normalizeScpRemoteHost } from "openclaw/plugin-sdk/host-runtime"; -import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch"; +import { runInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { isInboundPathAllowed, kindFromMime } from "openclaw/plugin-sdk/media-runtime"; import { DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry } from "openclaw/plugin-sdk/reply-history"; import { resolveTextChunkLimit } from "openclaw/plugin-sdk/reply-runtime"; @@ -435,59 +435,74 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P }, }); - await runPreparedInboundReplyTurn({ + await runInboundReplyTurn({ channel: "imessage", accountId: decision.route.accountId, - routeSessionKey: decision.route.sessionKey, - storePath, - ctxPayload, - recordInboundSession, - record: { - updateLastRoute: - !decision.isGroup && updateTarget - ? { - sessionKey: decision.route.mainSessionKey, - channel: "imessage", - to: updateTarget, - accountId: decision.route.accountId, - mainDmOwnerPin: - pinnedMainDmOwner && decision.senderNormalized - ? { - ownerRecipient: pinnedMainDmOwner, - senderRecipient: decision.senderNormalized, - onSkip: ({ ownerRecipient, senderRecipient }) => { - logVerbose( - `imessage: skip main-session last route for ${senderRecipient} (pinned owner ${ownerRecipient})`, - ); - }, - } - : undefined, - } - : undefined, - onRecordError: (err) => { - logVerbose(`imessage: failed updating session meta: ${String(err)}`); - }, - }, - history: { - isGroup: decision.isGroup, - historyKey: decision.historyKey, - historyMap: groupHistories, - limit: historyLimit, - }, - onPreDispatchFailure: () => settleReplyDispatcher({ dispatcher }), - runDispatch: () => - dispatchInboundMessage({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions: { - disableBlockStreaming: - typeof accountInfo.config.blockStreaming === "boolean" - ? !accountInfo.config.blockStreaming - : undefined, - onModelSelected, - }, + raw: decision, + adapter: { + ingest: () => ({ + id: ctxPayload.MessageSid ?? `${ctxPayload.From}:${Date.now()}`, + timestamp: typeof ctxPayload.Timestamp === "number" ? ctxPayload.Timestamp : undefined, + rawText: ctxPayload.RawBody ?? "", + textForAgent: ctxPayload.BodyForAgent, + textForCommands: ctxPayload.CommandBody, + raw: decision, }), + resolveTurn: () => ({ + channel: "imessage", + accountId: decision.route.accountId, + routeSessionKey: decision.route.sessionKey, + storePath, + ctxPayload, + recordInboundSession, + record: { + updateLastRoute: + !decision.isGroup && updateTarget + ? { + sessionKey: decision.route.mainSessionKey, + channel: "imessage", + to: updateTarget, + accountId: decision.route.accountId, + mainDmOwnerPin: + pinnedMainDmOwner && decision.senderNormalized + ? { + ownerRecipient: pinnedMainDmOwner, + senderRecipient: decision.senderNormalized, + onSkip: ({ ownerRecipient, senderRecipient }) => { + logVerbose( + `imessage: skip main-session last route for ${senderRecipient} (pinned owner ${ownerRecipient})`, + ); + }, + } + : undefined, + } + : undefined, + onRecordError: (err) => { + logVerbose(`imessage: failed updating session meta: ${String(err)}`); + }, + }, + history: { + isGroup: decision.isGroup, + historyKey: decision.historyKey, + historyMap: groupHistories, + limit: historyLimit, + }, + onPreDispatchFailure: () => settleReplyDispatcher({ dispatcher }), + runDispatch: () => + dispatchInboundMessage({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + disableBlockStreaming: + typeof accountInfo.config.blockStreaming === "boolean" + ? !accountInfo.config.blockStreaming + : undefined, + onModelSelected, + }, + }), + }), + }, }); } diff --git a/extensions/line/src/monitor.ts b/extensions/line/src/monitor.ts index 4894f21739e..5b957f700f5 100644 --- a/extensions/line/src/monitor.ts +++ b/extensions/line/src/monitor.ts @@ -231,7 +231,7 @@ export async function monitorLineProvider( }); const core = getLineRuntime(); - const { dispatchResult } = await core.channel.turn.run({ + const turnResult = await core.channel.turn.run({ channel: "line", accountId: route.accountId, raw: ctx, @@ -316,6 +316,7 @@ export async function monitorLineProvider( }), }, }); + const dispatchResult = turnResult.dispatched ? turnResult.dispatchResult : undefined; if (!hasFinalInboundReplyDispatch(dispatchResult)) { logVerbose(`line: no response generated for message from ${ctxPayload.From}`); } diff --git a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts index e8f03fb62f7..4859873619a 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts @@ -142,6 +142,28 @@ export function createMatrixHandlerTestHarness( }; }, ); + const run = vi.fn( + async (params: Parameters[0]) => { + const input = await params.adapter.ingest(params.raw); + if (!input) { + return { admission: { kind: "drop" as const, reason: "ingest-null" }, dispatched: false }; + } + const eventClass = (await params.adapter.classify?.(input)) ?? { + kind: "message" as const, + canStartAgentTurn: true, + }; + const preflightResult = await params.adapter.preflight?.(input, eventClass); + const preflight = + preflightResult && "kind" in preflightResult + ? { admission: preflightResult } + : (preflightResult ?? {}); + const turn = await params.adapter.resolveTurn(input, eventClass, preflight); + if ("runDispatch" in turn) { + return await runPrepared(turn); + } + throw new Error("matrix test helper only supports prepared turn dispatch"); + }, + ); const dmPolicy = options.dmPolicy ?? "open"; const allowFrom = options.allowFrom ?? (dmPolicy === "open" ? ["*"] : []); const cfgForHandler = @@ -229,6 +251,7 @@ export function createMatrixHandlerTestHarness( }), }, turn: { + run, runPrepared, }, reactions: { diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 87c2ee6d9eb..170159b723c 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -1829,106 +1829,127 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam onIdle: typingCallbacks.onIdle, }); - const { dispatchResult } = await core.channel.turn.runPrepared({ + const turnResult = await core.channel.turn.run({ channel: "matrix", accountId: _route.accountId, - routeSessionKey: _route.sessionKey, - storePath, - ctxPayload, - recordInboundSession: core.channel.session.recordInboundSession, - record: { - updateLastRoute: isDirectMessage - ? { - sessionKey: _route.mainSessionKey, - channel: "matrix", - to: `room:${roomId}`, - accountId: _route.accountId, + raw: event, + adapter: { + ingest: () => ({ + id: _messageId, + rawText: bodyText, + textForAgent: ctxPayload.BodyForAgent, + textForCommands: ctxPayload.CommandBody, + raw: event, + }), + resolveTurn: () => ({ + channel: "matrix", + accountId: _route.accountId, + routeSessionKey: _route.sessionKey, + storePath, + ctxPayload, + recordInboundSession: core.channel.session.recordInboundSession, + record: { + updateLastRoute: isDirectMessage + ? { + sessionKey: _route.mainSessionKey, + channel: "matrix", + to: `room:${roomId}`, + accountId: _route.accountId, + } + : undefined, + onRecordError: (err) => { + logger.warn("failed updating session meta", { + error: String(err), + storePath, + sessionKey: ctxPayload.SessionKey ?? _route.sessionKey, + }); + }, + }, + onPreDispatchFailure: () => + core.channel.reply.settleReplyDispatcher({ + dispatcher, + onSettled: () => { + markRunComplete(); + markDispatchIdle(); + }, + }), + runDispatch: async () => { + if ( + sharedDmContextNotice && + markTrackedRoomIfFirst(sharedDmContextNoticeRooms, roomId) + ) { + client + .sendMessage(roomId, { + msgtype: "m.notice", + body: sharedDmContextNotice, + }) + .catch((err) => { + logVerboseMessage( + `matrix: failed sending shared DM session notice room=${roomId}: ${String(err)}`, + ); + }); } - : undefined, - onRecordError: (err) => { - logger.warn("failed updating session meta", { - error: String(err), - storePath, - sessionKey: ctxPayload.SessionKey ?? _route.sessionKey, - }); - }, - }, - onPreDispatchFailure: () => - core.channel.reply.settleReplyDispatcher({ - dispatcher, - onSettled: () => { - markRunComplete(); - markDispatchIdle(); + + return await core.channel.reply.withReplyDispatcher({ + dispatcher, + onSettled: () => { + markDispatchIdle(); + }, + run: async () => { + try { + return await core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + skillFilter: roomConfig?.skills, + // Keep block streaming enabled when explicitly requested, even + // with draft previews on. The draft remains the live preview + // for the current assistant block, while block deliveries + // finalize completed blocks into their own preserved events. + disableBlockStreaming: !blockStreamingEnabled, + onPartialReply: draftStream + ? (payload) => { + latestDraftFullText = payload.text ?? ""; + suppressPreviewToolProgressForAnswerText(latestDraftFullText); + updateDraftFromLatestFullText(); + } + : undefined, + onBlockReplyQueued: draftStream + ? (payload, context) => { + if (payload.isCompactionNotice === true) { + return; + } + queueDraftBlockBoundary(payload, context); + } + : undefined, + // Reset draft boundary bookkeeping on assistant message + // boundaries so post-tool blocks stream from a fresh + // cumulative payload (payload.text resets upstream). + onAssistantMessageStart: draftStream + ? () => { + resetDraftBlockOffsets(); + resetPreviewToolProgress(); + } + : undefined, + ...buildPreviewToolProgressReplyOptions(), + onModelSelected, + }, + }); + } finally { + markRunComplete(); + } + }, + }); }, }), - runDispatch: async () => { - if (sharedDmContextNotice && markTrackedRoomIfFirst(sharedDmContextNoticeRooms, roomId)) { - client - .sendMessage(roomId, { - msgtype: "m.notice", - body: sharedDmContextNotice, - }) - .catch((err) => { - logVerboseMessage( - `matrix: failed sending shared DM session notice room=${roomId}: ${String(err)}`, - ); - }); - } - - return await core.channel.reply.withReplyDispatcher({ - dispatcher, - onSettled: () => { - markDispatchIdle(); - }, - run: async () => { - try { - return await core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions: { - ...replyOptions, - skillFilter: roomConfig?.skills, - // Keep block streaming enabled when explicitly requested, even - // with draft previews on. The draft remains the live preview - // for the current assistant block, while block deliveries - // finalize completed blocks into their own preserved events. - disableBlockStreaming: !blockStreamingEnabled, - onPartialReply: draftStream - ? (payload) => { - latestDraftFullText = payload.text ?? ""; - suppressPreviewToolProgressForAnswerText(latestDraftFullText); - updateDraftFromLatestFullText(); - } - : undefined, - onBlockReplyQueued: draftStream - ? (payload, context) => { - if (payload.isCompactionNotice === true) { - return; - } - queueDraftBlockBoundary(payload, context); - } - : undefined, - // Reset draft boundary bookkeeping on assistant message - // boundaries so post-tool blocks stream from a fresh - // cumulative payload (payload.text resets upstream). - onAssistantMessageStart: draftStream - ? () => { - resetDraftBlockOffsets(); - resetPreviewToolProgress(); - } - : undefined, - ...buildPreviewToolProgressReplyOptions(), - onModelSelected, - }, - }); - } finally { - markRunComplete(); - } - }, - }); }, }); + if (!turnResult.dispatched) { + return; + } + const { dispatchResult } = turnResult; const { queuedFinal, counts } = dispatchResult; if (finalReplyDeliveryFailed) { if (retryableReplyDeliveryFailed) { diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index d4ca3f5118f..adba19588ae 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -69,7 +69,6 @@ import { buildAgentMediaPayload, buildModelsProviderData, buildPendingHistoryContextFromMap, - clearHistoryEntriesIfEnabled, createChannelPairingController, createChannelReplyPipeline, DEFAULT_GROUP_HISTORY_LIMIT, @@ -1721,74 +1720,95 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} let dispatchSettledBeforeStart = false; try { - await core.channel.turn.runPrepared({ + await core.channel.turn.run({ channel: "mattermost", accountId: route.accountId, - routeSessionKey: route.sessionKey, - storePath, - ctxPayload, - recordInboundSession: core.channel.session.recordInboundSession, - record: { - updateLastRoute: - kind === "direct" - ? { - sessionKey: route.mainSessionKey, - channel: "mattermost", - to, - accountId: route.accountId, - } - : undefined, - onRecordError: (err) => { - logVerboseMessage( - `mattermost: failed updating session meta id=${post.id ?? "unknown"}: ${String(err)}`, - ); - }, - }, - onPreDispatchFailure: async () => { - dispatchSettledBeforeStart = true; - await core.channel.reply.settleReplyDispatcher({ - dispatcher, - onSettled: () => { - markRunComplete(); - markDispatchIdle(); - }, - }); - }, - runDispatch: () => - core.channel.reply.withReplyDispatcher({ - dispatcher, - onSettled: () => { - markDispatchIdle(); - }, - run: () => - core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions: { - ...replyOptions, - disableBlockStreaming: true, - onModelSelected, - onPartialReply: (payload) => { - updateDraftFromPartial(payload.text); - }, - onAssistantMessageStart: () => { - lastPartialText = ""; - }, - onReasoningEnd: () => { - lastPartialText = ""; - }, - onReasoningStream: async () => { - if (!lastPartialText) { - draftStream.update("Thinking…"); + raw: post, + adapter: { + ingest: () => ({ + id: post.id ?? `${to}:${Date.now()}`, + timestamp: post.create_at ?? undefined, + rawText, + textForAgent: ctxPayload.BodyForAgent, + textForCommands: ctxPayload.CommandBody, + raw: post, + }), + resolveTurn: () => ({ + channel: "mattermost", + accountId: route.accountId, + routeSessionKey: route.sessionKey, + storePath, + ctxPayload, + recordInboundSession: core.channel.session.recordInboundSession, + record: { + updateLastRoute: + kind === "direct" + ? { + sessionKey: route.mainSessionKey, + channel: "mattermost", + to, + accountId: route.accountId, } - }, - onToolStart: async (payload) => { - draftStream.update(buildMattermostToolStatusText(payload)); - }, + : undefined, + onRecordError: (err) => { + logVerboseMessage( + `mattermost: failed updating session meta id=${post.id ?? "unknown"}: ${String(err)}`, + ); + }, + }, + history: { + isGroup: Boolean(historyKey), + historyKey: historyKey ?? undefined, + historyMap: channelHistories, + limit: historyLimit, + }, + onPreDispatchFailure: async () => { + dispatchSettledBeforeStart = true; + await core.channel.reply.settleReplyDispatcher({ + dispatcher, + onSettled: () => { + markRunComplete(); + markDispatchIdle(); }, + }); + }, + runDispatch: () => + core.channel.reply.withReplyDispatcher({ + dispatcher, + onSettled: () => { + markDispatchIdle(); + }, + run: () => + core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + disableBlockStreaming: true, + onModelSelected, + onPartialReply: (payload) => { + updateDraftFromPartial(payload.text); + }, + onAssistantMessageStart: () => { + lastPartialText = ""; + }, + onReasoningEnd: () => { + lastPartialText = ""; + }, + onReasoningStream: async () => { + if (!lastPartialText) { + draftStream.update("Thinking…"); + } + }, + onToolStart: async (payload) => { + draftStream.update(buildMattermostToolStatusText(payload)); + }, + }, + }), }), }), + }, }); } finally { try { @@ -1800,13 +1820,6 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} markRunComplete(); } } - if (historyKey) { - clearHistoryEntriesIfEnabled({ - historyMap: channelHistories, - historyKey, - limit: historyLimit, - }); - } }, }); if (replayResult === "duplicate") { diff --git a/extensions/msteams/src/monitor-handler.test-helpers.ts b/extensions/msteams/src/monitor-handler.test-helpers.ts index 2787f4f7909..11a87ef226c 100644 --- a/extensions/msteams/src/monitor-handler.test-helpers.ts +++ b/extensions/msteams/src/monitor-handler.test-helpers.ts @@ -40,6 +40,26 @@ export function installMSTeamsTestRuntime(options: MSTeamsTestRuntimeOptions = { }; }, ); + const run = vi.fn(async (params: Parameters[0]) => { + const input = await params.adapter.ingest(params.raw); + if (!input) { + return { admission: { kind: "drop" as const, reason: "ingest-null" }, dispatched: false }; + } + const eventClass = (await params.adapter.classify?.(input)) ?? { + kind: "message" as const, + canStartAgentTurn: true, + }; + const preflightResult = await params.adapter.preflight?.(input, eventClass); + const preflight = + preflightResult && "kind" in preflightResult + ? { admission: preflightResult } + : (preflightResult ?? {}); + const turn = await params.adapter.resolveTurn(input, eventClass, preflight); + if ("runDispatch" in turn) { + return await runPrepared(turn); + } + throw new Error("msteams test runtime only supports prepared turn dispatch"); + }); setMSTeamsRuntime({ logging: { shouldLogVerbose: () => false }, system: { enqueueSystemEvent: options.enqueueSystemEvent ?? vi.fn() }, @@ -90,6 +110,7 @@ export function installMSTeamsTestRuntime(options: MSTeamsTestRuntimeOptions = { ...(options.resolveStorePath ? { resolveStorePath: options.resolveStorePath } : {}), }, turn: { + run: run as unknown as PluginRuntime["channel"]["turn"]["run"], runPrepared: runPrepared as unknown as PluginRuntime["channel"]["turn"]["runPrepared"], }, }, diff --git a/extensions/msteams/src/monitor-handler/message-handler.ts b/extensions/msteams/src/monitor-handler/message-handler.ts index cc7f276ee82..f9488f48eba 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.ts @@ -18,7 +18,6 @@ import { } from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { buildPendingHistoryContextFromMap, - clearHistoryEntriesIfEnabled, DEFAULT_GROUP_HISTORY_LIMIT, recordPendingHistoryEntryIfEnabled, type HistoryEntry, @@ -840,33 +839,57 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { log.info("dispatching to agent", { sessionKey: route.sessionKey }); try { - const { dispatchResult } = await core.channel.turn.runPrepared({ + const turnResult = await core.channel.turn.run({ channel: "msteams", accountId: route.accountId, - routeSessionKey: route.sessionKey, - storePath, - ctxPayload, - recordInboundSession: core.channel.session.recordInboundSession, - record: { - onRecordError: (err) => { - logVerboseMessage(`msteams: failed updating session meta: ${formatUnknownError(err)}`); - }, - }, - onPreDispatchFailure: () => - core.channel.reply.settleReplyDispatcher({ - dispatcher, - onSettled: () => markDispatchIdle(), + raw: context, + adapter: { + ingest: () => ({ + id: activity.id ?? `${teamsFrom}:${Date.now()}`, + timestamp: timestamp?.getTime(), + rawText: rawBody, + textForAgent: bodyForAgent, + textForCommands: commandBody, + raw: activity, }), - runDispatch: () => - dispatchReplyFromConfigWithSettledDispatcher({ - cfg, + resolveTurn: () => ({ + channel: "msteams", + accountId: route.accountId, + routeSessionKey: route.sessionKey, + storePath, ctxPayload, - dispatcher, - onSettled: () => markDispatchIdle(), - replyOptions, - configOverride, + recordInboundSession: core.channel.session.recordInboundSession, + record: { + onRecordError: (err) => { + logVerboseMessage( + `msteams: failed updating session meta: ${formatUnknownError(err)}`, + ); + }, + }, + history: { + isGroup: isRoomish, + historyKey, + historyMap: conversationHistories, + limit: historyLimit, + }, + onPreDispatchFailure: () => + core.channel.reply.settleReplyDispatcher({ + dispatcher, + onSettled: () => markDispatchIdle(), + }), + runDispatch: () => + dispatchReplyFromConfigWithSettledDispatcher({ + cfg, + ctxPayload, + dispatcher, + onSettled: () => markDispatchIdle(), + replyOptions, + configOverride, + }), }), + }, }); + const dispatchResult = turnResult.dispatched ? turnResult.dispatchResult : undefined; const queuedFinal = dispatchResult?.queuedFinal ?? false; const counts = resolveInboundReplyDispatchCounts(dispatchResult); const hasFinalResponse = hasFinalInboundReplyDispatch(dispatchResult); @@ -874,26 +897,12 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { log.info("dispatch complete", { queuedFinal, counts }); if (!hasFinalResponse) { - if (isRoomish && historyKey) { - clearHistoryEntriesIfEnabled({ - historyMap: conversationHistories, - historyKey, - limit: historyLimit, - }); - } return; } const finalCount = counts.final; logVerboseMessage( `msteams: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${teamsTo}`, ); - if (isRoomish && historyKey) { - clearHistoryEntriesIfEnabled({ - historyMap: conversationHistories, - historyKey, - limit: historyLimit, - }); - } } catch (err) { log.error("dispatch failed", { error: formatUnknownError(err) }); runtime.error?.(`msteams dispatch failed: ${formatUnknownError(err)}`); diff --git a/extensions/qqbot/src/engine/gateway/inbound-pipeline.self-echo.test.ts b/extensions/qqbot/src/engine/gateway/inbound-pipeline.self-echo.test.ts index 29f884e7579..dfb8d38443a 100644 --- a/extensions/qqbot/src/engine/gateway/inbound-pipeline.self-echo.test.ts +++ b/extensions/qqbot/src/engine/gateway/inbound-pipeline.self-echo.test.ts @@ -69,9 +69,24 @@ function makeRuntime(): GatewayPluginRuntime { recordInboundSession: vi.fn(async () => undefined), }, turn: { - runPrepared: vi.fn(async (rawParams: unknown) => { - const params = rawParams as { runDispatch: () => Promise }; - return { dispatchResult: await params.runDispatch() }; + run: vi.fn(async (rawParams: unknown) => { + const params = rawParams as { + raw: unknown; + adapter: { + ingest: (raw: unknown) => unknown; + resolveTurn: (...args: unknown[]) => unknown; + }; + }; + const input = await params.adapter.ingest(params.raw); + const turn = (await params.adapter.resolveTurn( + input, + { + kind: "message", + canStartAgentTurn: true, + }, + {}, + )) as { runDispatch: () => Promise }; + return { dispatchResult: await turn.runDispatch() }; }), }, text: { diff --git a/extensions/qqbot/src/engine/gateway/outbound-dispatch.test.ts b/extensions/qqbot/src/engine/gateway/outbound-dispatch.test.ts index 2af25134627..b7d85c29501 100644 --- a/extensions/qqbot/src/engine/gateway/outbound-dispatch.test.ts +++ b/extensions/qqbot/src/engine/gateway/outbound-dispatch.test.ts @@ -141,9 +141,24 @@ function makeRuntime(params: { recordInboundSession: vi.fn(async () => undefined), }, turn: { - runPrepared: vi.fn(async (rawParams: unknown) => { - const params = rawParams as { runDispatch: () => Promise }; - return { dispatchResult: await params.runDispatch() }; + run: vi.fn(async (rawParams: unknown) => { + const params = rawParams as { + raw: unknown; + adapter: { + ingest: (raw: unknown) => unknown; + resolveTurn: (...args: unknown[]) => unknown; + }; + }; + const input = await params.adapter.ingest(params.raw); + const turn = (await params.adapter.resolveTurn( + input, + { + kind: "message", + canStartAgentTurn: true, + }, + {}, + )) as { runDispatch: () => Promise }; + return { dispatchResult: await turn.runDispatch() }; }), }, text: { diff --git a/extensions/qqbot/src/engine/gateway/outbound-dispatch.ts b/extensions/qqbot/src/engine/gateway/outbound-dispatch.ts index f07f5711eed..51e225a4743 100644 --- a/extensions/qqbot/src/engine/gateway/outbound-dispatch.ts +++ b/extensions/qqbot/src/engine/gateway/outbound-dispatch.ts @@ -10,6 +10,7 @@ * Separated from gateway.ts for testability and to keep handleMessage thin. */ +import type { FinalizedMsgContext } from "openclaw/plugin-sdk/reply-runtime"; import { parseAndSendMediaTags, sendPlainReply, @@ -224,238 +225,256 @@ export async function dispatchOutbound( const storePath = runtime.channel.session.resolveStorePath(cfgWithSession.session?.store, { agentId, }); - const dispatchPromise = runtime.channel.turn.runPrepared({ + const dispatchPromise = runtime.channel.turn.run({ channel: "qqbot", accountId: inbound.route.accountId, - routeSessionKey: inbound.route.sessionKey, - storePath, - ctxPayload, - recordInboundSession: runtime.channel.session.recordInboundSession, - record: { - onRecordError: (err: unknown) => { - log?.error( - `Session metadata update failed: ${err instanceof Error ? err.message : String(err)}`, - ); - }, - }, - runDispatch: () => - runtime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ - ctx: ctxPayload, - cfg, - dispatcherOptions: { - responsePrefix: messagesConfig.responsePrefix, - deliver: async (payload: ReplyDeliverPayload, info: { kind: string }) => { - hasResponse = true; + raw: inbound, + adapter: { + ingest: () => ({ + id: ctxPayload.MessageSid ?? `${ctxPayload.From}:${Date.now()}`, + rawText: ctxPayload.RawBody ?? "", + textForAgent: ctxPayload.BodyForAgent, + textForCommands: ctxPayload.CommandBody, + raw: inbound, + }), + resolveTurn: () => ({ + channel: "qqbot", + accountId: inbound.route.accountId, + routeSessionKey: inbound.route.sessionKey, + storePath, + ctxPayload, + recordInboundSession: runtime.channel.session.recordInboundSession, + record: { + onRecordError: (err: unknown) => { + log?.error( + `Session metadata update failed: ${err instanceof Error ? err.message : String(err)}`, + ); + }, + }, + runDispatch: () => + runtime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, + cfg, + dispatcherOptions: { + responsePrefix: messagesConfig.responsePrefix, + deliver: async (payload: ReplyDeliverPayload, info: { kind: string }) => { + hasResponse = true; - // ---- Tool deliver ---- - if (info.kind === "tool") { - toolDeliverCount++; - const toolText = (payload.text ?? "").trim(); - if (toolText) { - toolTexts.push(toolText); - } - if (payload.mediaUrls?.length) { - toolMediaUrls.push(...payload.mediaUrls); - } - if (payload.mediaUrl && !toolMediaUrls.includes(payload.mediaUrl)) { - toolMediaUrls.push(payload.mediaUrl); - } + // ---- Tool deliver ---- + if (info.kind === "tool") { + toolDeliverCount++; + const toolText = (payload.text ?? "").trim(); + if (toolText) { + toolTexts.push(toolText); + } + if (payload.mediaUrls?.length) { + toolMediaUrls.push(...payload.mediaUrls); + } + if (payload.mediaUrl && !toolMediaUrls.includes(payload.mediaUrl)) { + toolMediaUrls.push(payload.mediaUrl); + } - if (hasBlockResponse && toolMediaUrls.length > 0) { - const urlsToSend = [...toolMediaUrls]; - toolMediaUrls.length = 0; - for (const mediaUrl of urlsToSend) { - try { - await sendMedia({ - to: qualifiedTarget, - text: "", - mediaUrl, - accountId: account.accountId, - replyToId: event.messageId, - account, - }); - } catch {} - } - return; - } - if (toolFallbackSent) { - return; - } - if (toolOnlyTimeoutId) { - if (toolRenewalCount < MAX_TOOL_RENEWALS) { - clearTimeout(toolOnlyTimeoutId); - toolRenewalCount++; - } else { + if (hasBlockResponse && toolMediaUrls.length > 0) { + const urlsToSend = [...toolMediaUrls]; + toolMediaUrls.length = 0; + for (const mediaUrl of urlsToSend) { + try { + await sendMedia({ + to: qualifiedTarget, + text: "", + mediaUrl, + accountId: account.accountId, + replyToId: event.messageId, + account, + }); + } catch {} + } + return; + } + if (toolFallbackSent) { + return; + } + if (toolOnlyTimeoutId) { + if (toolRenewalCount < MAX_TOOL_RENEWALS) { + clearTimeout(toolOnlyTimeoutId); + toolRenewalCount++; + } else { + return; + } + } + toolOnlyTimeoutId = setTimeout(async () => { + if (!hasBlockResponse && !toolFallbackSent) { + toolFallbackSent = true; + try { + await sendToolFallback(); + } catch {} + } + }, TOOL_ONLY_TIMEOUT); return; } - } - toolOnlyTimeoutId = setTimeout(async () => { - if (!hasBlockResponse && !toolFallbackSent) { - toolFallbackSent = true; - try { - await sendToolFallback(); - } catch {} + + // ---- Block deliver ---- + hasBlockResponse = true; + inbound.typing.keepAlive?.stop(); + if (timeoutId) { + clearTimeout(timeoutId); + timeoutId = null; } - }, TOOL_ONLY_TIMEOUT); - return; - } - - // ---- Block deliver ---- - hasBlockResponse = true; - inbound.typing.keepAlive?.stop(); - if (timeoutId) { - clearTimeout(timeoutId); - timeoutId = null; - } - if (toolOnlyTimeoutId) { - clearTimeout(toolOnlyTimeoutId); - toolOnlyTimeoutId = null; - } - - if (streamingController && !streamingController.isTerminalPhase) { - try { - await streamingController.onDeliver(payload); - } catch (err) { - log?.error( - `Streaming deliver error: ${err instanceof Error ? err.message : String(err)}`, - ); - } - - const replyPreview = (payload.text ?? "").trim(); - if ( - event.type === "group" && - (replyPreview === "NO_REPLY" || replyPreview === "[SKIP]") - ) { - log?.info( - `Model decided to skip group message (${replyPreview}) from ${event.senderId}`, - ); - return; - } - - if (streamingController.shouldFallbackToStatic) { - log?.info("Streaming API unavailable, falling back to static for this deliver"); - } else { - recordOutbound(); - return; - } - } - - const quoteRef = event.msgIdx; - let quoteRefUsed = false; - const consumeQuoteRef = (): string | undefined => { - if (quoteRef && !quoteRefUsed) { - quoteRefUsed = true; - return quoteRef; - } - return undefined; - }; - - let replyText = payload.text ?? ""; - const deliverEvent = { - type: event.type, - senderId: event.senderId, - messageId: event.messageId, - channelId: event.channelId, - groupOpenid: event.groupOpenid, - msgIdx: event.msgIdx, - }; - const deliverActx = { account, qualifiedTarget, log }; - - // 1. Media tags - const mediaResult = await parseAndSendMediaTags( - replyText, - deliverEvent, - deliverActx, - sendWithRetry, - consumeQuoteRef, - deliverDeps, - ); - if (mediaResult.handled) { - recordOutbound(); - return; - } - replyText = mediaResult.normalizedText; - - // 2. Structured payload (QQBOT_PAYLOAD:) - const handled = await handleStructuredPayload( - replyCtx, - replyText, - recordOutbound, - replyDeps, - ); - if (handled) { - return; - } - - // 3. Voice-intent plain text - if (payload.audioAsVoice === true && !payload.mediaUrl && !payload.mediaUrls?.length) { - const sentVoice = await sendTextAsVoiceReply(replyCtx, replyText, replyDeps); - if (sentVoice) { - recordOutbound(); - return; - } - } - - // 4. Plain text + images/media - await sendPlainReply( - payload, - replyText, - deliverEvent, - deliverActx, - sendWithRetry, - consumeQuoteRef, - toolMediaUrls, - deliverDeps, - ); - recordOutbound(); - }, - onError: async (err: unknown) => { - if (streamingController && !streamingController.isTerminalPhase) { - try { - await streamingController.onError(err); - } catch (streamErr) { - const streamErrMsg = - streamErr instanceof Error ? streamErr.message : String(streamErr); - log?.error(`Streaming onError failed: ${streamErrMsg}`); - } - if (!streamingController.shouldFallbackToStatic) { - return; - } - } - const errMsg = err instanceof Error ? err.message : String(err); - log?.error(`Dispatch error: ${errMsg}`); - hasResponse = true; - if (timeoutId) { - clearTimeout(timeoutId); - timeoutId = null; - } - }, - }, - replyOptions: { - disableBlockStreaming: useOfficialC2cStream - ? true - : (() => { - const s = account.config?.streaming; - if (s === false) { - return true; + if (toolOnlyTimeoutId) { + clearTimeout(toolOnlyTimeoutId); + toolOnlyTimeoutId = null; } - return typeof s === "object" && s !== null && s.mode === "off"; - })(), - ...(streamingController - ? { - onPartialReply: async (payload: { text?: string }) => { + + if (streamingController && !streamingController.isTerminalPhase) { try { - await streamingController.onPartialReply(payload); - } catch (partialErr) { + await streamingController.onDeliver(payload); + } catch (err) { log?.error( - `Streaming onPartialReply error: ${partialErr instanceof Error ? partialErr.message : String(partialErr)}`, + `Streaming deliver error: ${err instanceof Error ? err.message : String(err)}`, ); } - }, - } - : {}), - }, + + const replyPreview = (payload.text ?? "").trim(); + if ( + event.type === "group" && + (replyPreview === "NO_REPLY" || replyPreview === "[SKIP]") + ) { + log?.info( + `Model decided to skip group message (${replyPreview}) from ${event.senderId}`, + ); + return; + } + + if (streamingController.shouldFallbackToStatic) { + log?.info("Streaming API unavailable, falling back to static for this deliver"); + } else { + recordOutbound(); + return; + } + } + + const quoteRef = event.msgIdx; + let quoteRefUsed = false; + const consumeQuoteRef = (): string | undefined => { + if (quoteRef && !quoteRefUsed) { + quoteRefUsed = true; + return quoteRef; + } + return undefined; + }; + + let replyText = payload.text ?? ""; + const deliverEvent = { + type: event.type, + senderId: event.senderId, + messageId: event.messageId, + channelId: event.channelId, + groupOpenid: event.groupOpenid, + msgIdx: event.msgIdx, + }; + const deliverActx = { account, qualifiedTarget, log }; + + // 1. Media tags + const mediaResult = await parseAndSendMediaTags( + replyText, + deliverEvent, + deliverActx, + sendWithRetry, + consumeQuoteRef, + deliverDeps, + ); + if (mediaResult.handled) { + recordOutbound(); + return; + } + replyText = mediaResult.normalizedText; + + // 2. Structured payload (QQBOT_PAYLOAD:) + const handled = await handleStructuredPayload( + replyCtx, + replyText, + recordOutbound, + replyDeps, + ); + if (handled) { + return; + } + + // 3. Voice-intent plain text + if ( + payload.audioAsVoice === true && + !payload.mediaUrl && + !payload.mediaUrls?.length + ) { + const sentVoice = await sendTextAsVoiceReply(replyCtx, replyText, replyDeps); + if (sentVoice) { + recordOutbound(); + return; + } + } + + // 4. Plain text + images/media + await sendPlainReply( + payload, + replyText, + deliverEvent, + deliverActx, + sendWithRetry, + consumeQuoteRef, + toolMediaUrls, + deliverDeps, + ); + recordOutbound(); + }, + onError: async (err: unknown) => { + if (streamingController && !streamingController.isTerminalPhase) { + try { + await streamingController.onError(err); + } catch (streamErr) { + const streamErrMsg = + streamErr instanceof Error ? streamErr.message : String(streamErr); + log?.error(`Streaming onError failed: ${streamErrMsg}`); + } + if (!streamingController.shouldFallbackToStatic) { + return; + } + } + const errMsg = err instanceof Error ? err.message : String(err); + log?.error(`Dispatch error: ${errMsg}`); + hasResponse = true; + if (timeoutId) { + clearTimeout(timeoutId); + timeoutId = null; + } + }, + }, + replyOptions: { + disableBlockStreaming: useOfficialC2cStream + ? true + : (() => { + const s = account.config?.streaming; + if (s === false) { + return true; + } + return typeof s === "object" && s !== null && s.mode === "off"; + })(), + ...(streamingController + ? { + onPartialReply: async (payload: { text?: string }) => { + try { + await streamingController.onPartialReply(payload); + } catch (partialErr) { + log?.error( + `Streaming onPartialReply error: ${partialErr instanceof Error ? partialErr.message : String(partialErr)}`, + ); + } + }, + } + : {}), + }, + }), }), + }, }); try { @@ -493,7 +512,10 @@ export async function dispatchOutbound( // ============ ctxPayload builder ============ -function buildCtxPayload(inbound: InboundContext, runtime: GatewayPluginRuntime): unknown { +function buildCtxPayload( + inbound: InboundContext, + runtime: GatewayPluginRuntime, +): FinalizedMsgContext { const { event } = inbound; return runtime.channel.reply.finalizeInboundContext({ Body: inbound.body, @@ -549,5 +571,5 @@ function buildCtxPayload(inbound: InboundContext, runtime: GatewayPluginRuntime) ReplyToIsQuote: inbound.replyTo.isQuote, } : {}), - }); + }) as FinalizedMsgContext; } diff --git a/extensions/qqbot/src/engine/gateway/types.ts b/extensions/qqbot/src/engine/gateway/types.ts index 1ce28d84946..ec6ea48456d 100644 --- a/extensions/qqbot/src/engine/gateway/types.ts +++ b/extensions/qqbot/src/engine/gateway/types.ts @@ -57,7 +57,7 @@ export interface GatewayPluginRuntime { recordInboundSession: (params: unknown) => Promise; }; turn: { - runPrepared: (params: unknown) => Promise; + run: (params: unknown) => Promise; }; text: { chunkMarkdownText: (text: string, limit: number) => string[]; diff --git a/extensions/signal/src/monitor/event-handler.ts b/extensions/signal/src/monitor/event-handler.ts index a81b550b6cc..d87764630a1 100644 --- a/extensions/signal/src/monitor/event-handler.ts +++ b/extensions/signal/src/monitor/event-handler.ts @@ -25,7 +25,7 @@ import { toInternalMessageReceivedContext, triggerInternalHook, } from "openclaw/plugin-sdk/hook-runtime"; -import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch"; +import { runInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { kindFromMime } from "openclaw/plugin-sdk/media-runtime"; import { buildPendingHistoryContextFromMap, @@ -288,72 +288,85 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { }, }); - await runPreparedInboundReplyTurn({ + await runInboundReplyTurn({ channel: "signal", accountId: route.accountId, - routeSessionKey: route.sessionKey, - storePath, - ctxPayload, - recordInboundSession, - record: { - updateLastRoute: !entry.isGroup - ? { - sessionKey: route.mainSessionKey, - channel: "signal", - to: entry.senderRecipient, - accountId: route.accountId, - mainDmOwnerPin: (() => { - const pinnedOwner = resolvePinnedMainDmOwnerFromAllowlist({ - dmScope: deps.cfg.session?.dmScope, - allowFrom: deps.allowFrom, - normalizeEntry: normalizeSignalAllowRecipient, - }); - if (!pinnedOwner) { - return undefined; - } - return { - ownerRecipient: pinnedOwner, - senderRecipient: entry.senderRecipient, - onSkip: ({ ownerRecipient, senderRecipient }) => { - logVerbose( - `signal: skip main-session last route for ${senderRecipient} (pinned owner ${ownerRecipient})`, - ); - }, - }; - })(), - } - : undefined, - onRecordError: (err) => { - logVerbose(`signal: failed updating session meta: ${String(err)}`); - }, - }, - history: { - isGroup: entry.isGroup, - historyKey, - historyMap: deps.groupHistories, - limit: deps.historyLimit, - }, - onPreDispatchFailure: () => - settleReplyDispatcher({ - dispatcher, - onSettled: () => markDispatchIdle(), + raw: entry, + adapter: { + ingest: () => ({ + id: entry.messageId ?? `${entry.timestamp ?? Date.now()}`, + timestamp: entry.timestamp, + rawText: entry.bodyText, + raw: entry, }), - runDispatch: async () => { - try { - return await dispatchInboundMessage({ - ctx: ctxPayload, - cfg: deps.cfg, - dispatcher, - replyOptions: { - ...replyOptions, - disableBlockStreaming: - typeof deps.blockStreaming === "boolean" ? !deps.blockStreaming : undefined, - onModelSelected, + resolveTurn: () => ({ + channel: "signal", + accountId: route.accountId, + routeSessionKey: route.sessionKey, + storePath, + ctxPayload, + recordInboundSession, + record: { + updateLastRoute: !entry.isGroup + ? { + sessionKey: route.mainSessionKey, + channel: "signal", + to: entry.senderRecipient, + accountId: route.accountId, + mainDmOwnerPin: (() => { + const pinnedOwner = resolvePinnedMainDmOwnerFromAllowlist({ + dmScope: deps.cfg.session?.dmScope, + allowFrom: deps.allowFrom, + normalizeEntry: normalizeSignalAllowRecipient, + }); + if (!pinnedOwner) { + return undefined; + } + return { + ownerRecipient: pinnedOwner, + senderRecipient: entry.senderRecipient, + onSkip: ({ ownerRecipient, senderRecipient }) => { + logVerbose( + `signal: skip main-session last route for ${senderRecipient} (pinned owner ${ownerRecipient})`, + ); + }, + }; + })(), + } + : undefined, + onRecordError: (err) => { + logVerbose(`signal: failed updating session meta: ${String(err)}`); }, - }); - } finally { - markDispatchIdle(); - } + }, + history: { + isGroup: entry.isGroup, + historyKey, + historyMap: deps.groupHistories, + limit: deps.historyLimit, + }, + onPreDispatchFailure: () => + settleReplyDispatcher({ + dispatcher, + onSettled: () => markDispatchIdle(), + }), + runDispatch: async () => { + try { + return await dispatchInboundMessage({ + ctx: ctxPayload, + cfg: deps.cfg, + dispatcher, + replyOptions: { + ...replyOptions, + disableBlockStreaming: + typeof deps.blockStreaming === "boolean" ? !deps.blockStreaming : undefined, + onModelSelected, + }, + }); + } finally { + markDispatchIdle(); + } + }, + }), }, }); } diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index c70624a6c4a..ceab70cd4da 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -19,8 +19,9 @@ import { } from "openclaw/plugin-sdk/channel-streaming"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { + type ChannelTurnRecordOptions, hasVisibleInboundReplyDispatch, - runPreparedInboundReplyTurn, + runInboundReplyTurn, } from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { resolveAgentOutboundIdentity } from "openclaw/plugin-sdk/outbound-runtime"; import { clearHistoryEntriesIfEnabled } from "openclaw/plugin-sdk/reply-history"; @@ -987,93 +988,111 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag let counts: { final?: number; block?: number } = {}; let dispatchSettledBeforeStart = false; try { - const { dispatchResult } = await runPreparedInboundReplyTurn({ + const turnResult = await runInboundReplyTurn({ channel: "slack", accountId: route.accountId, - routeSessionKey: route.sessionKey, - storePath: prepared.turn.storePath, - ctxPayload: prepared.ctxPayload, - recordInboundSession, - record: prepared.turn.record as Parameters[0]["record"], - onPreDispatchFailure: async () => { - dispatchSettledBeforeStart = true; - await settleReplyDispatcher({ - dispatcher, - onSettled: () => markDispatchIdle(), - }); - }, - runDispatch: () => - dispatchInboundMessage({ - ctx: prepared.ctxPayload, - cfg, - dispatcher, - replyOptions: { - ...replyOptions, - skillFilter: prepared.channelConfig?.skills, - sourceReplyDeliveryMode, - hasRepliedRef, - disableBlockStreaming, - onModelSelected, - suppressDefaultToolProgressMessages: previewToolProgressEnabled ? true : undefined, - onPartialReply: useStreaming - ? undefined - : !previewStreamingEnabled - ? undefined - : async (payload) => { - updateDraftFromPartial(payload.text); - }, - onAssistantMessageStart: onDraftBoundary, - onReasoningEnd: onDraftBoundary, - onReasoningStream: statusReactionsEnabled - ? async () => { - await statusReactions.setThinking(); - } - : undefined, - onToolStart: async (payload) => { - if (statusReactionsEnabled) { - await statusReactions.setTool(payload.name); - } - pushPreviewToolProgress(payload.name ? `tool: ${payload.name}` : "tool running"); - }, - onItemEvent: async (payload) => { - pushPreviewToolProgress( - payload.progressText ?? payload.summary ?? payload.title ?? payload.name, - ); - }, - onPlanUpdate: async (payload) => { - if (payload.phase !== "update") { - return; - } - pushPreviewToolProgress(payload.explanation ?? payload.steps?.[0] ?? "planning"); - }, - onApprovalEvent: async (payload) => { - if (payload.phase !== "requested") { - return; - } - pushPreviewToolProgress( - payload.command ? `approval: ${payload.command}` : "approval requested", - ); - }, - onCommandOutput: async (payload) => { - if (payload.phase !== "end") { - return; - } - pushPreviewToolProgress( - payload.name - ? `${payload.name}${payload.exitCode === 0 ? " ✓" : payload.exitCode != null ? ` (exit ${payload.exitCode})` : ""}` - : payload.title, - ); - }, - onPatchSummary: async (payload) => { - if (payload.phase !== "end") { - return; - } - pushPreviewToolProgress(payload.summary ?? payload.title ?? "patch applied"); - }, - }, + raw: prepared.message, + adapter: { + ingest: () => ({ + id: prepared.message.ts ?? `${prepared.ctxPayload.From}:${Date.now()}`, + timestamp: prepared.message.ts ? Number(prepared.message.ts) * 1000 : undefined, + rawText: prepared.ctxPayload.RawBody ?? "", + textForAgent: prepared.ctxPayload.BodyForAgent, + textForCommands: prepared.ctxPayload.CommandBody, + raw: prepared.message, }), + resolveTurn: () => ({ + channel: "slack", + accountId: route.accountId, + routeSessionKey: route.sessionKey, + storePath: prepared.turn.storePath, + ctxPayload: prepared.ctxPayload, + recordInboundSession, + record: prepared.turn.record as ChannelTurnRecordOptions, + onPreDispatchFailure: async () => { + dispatchSettledBeforeStart = true; + await settleReplyDispatcher({ + dispatcher, + onSettled: () => markDispatchIdle(), + }); + }, + runDispatch: () => + dispatchInboundMessage({ + ctx: prepared.ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + skillFilter: prepared.channelConfig?.skills, + sourceReplyDeliveryMode, + hasRepliedRef, + disableBlockStreaming, + onModelSelected, + suppressDefaultToolProgressMessages: previewToolProgressEnabled ? true : undefined, + onPartialReply: useStreaming + ? undefined + : !previewStreamingEnabled + ? undefined + : async (payload) => { + updateDraftFromPartial(payload.text); + }, + onAssistantMessageStart: onDraftBoundary, + onReasoningEnd: onDraftBoundary, + onReasoningStream: statusReactionsEnabled + ? async () => { + await statusReactions.setThinking(); + } + : undefined, + onToolStart: async (payload) => { + if (statusReactionsEnabled) { + await statusReactions.setTool(payload.name); + } + pushPreviewToolProgress(payload.name ? `tool: ${payload.name}` : "tool running"); + }, + onItemEvent: async (payload) => { + pushPreviewToolProgress( + payload.progressText ?? payload.summary ?? payload.title ?? payload.name, + ); + }, + onPlanUpdate: async (payload) => { + if (payload.phase !== "update") { + return; + } + pushPreviewToolProgress(payload.explanation ?? payload.steps?.[0] ?? "planning"); + }, + onApprovalEvent: async (payload) => { + if (payload.phase !== "requested") { + return; + } + pushPreviewToolProgress( + payload.command ? `approval: ${payload.command}` : "approval requested", + ); + }, + onCommandOutput: async (payload) => { + if (payload.phase !== "end") { + return; + } + pushPreviewToolProgress( + payload.name + ? `${payload.name}${payload.exitCode === 0 ? " ✓" : payload.exitCode != null ? ` (exit ${payload.exitCode})` : ""}` + : payload.title, + ); + }, + onPatchSummary: async (payload) => { + if (payload.phase !== "end") { + return; + } + pushPreviewToolProgress(payload.summary ?? payload.title ?? "patch applied"); + }, + }, + }), + }), + }, }); - const result = dispatchResult; + if (!turnResult.dispatched) { + return; + } + const result = turnResult.dispatchResult; queuedFinal = result.queuedFinal; counts = result.counts; } catch (err) { diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index c6e0c10d65d..24be0903df5 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -19,7 +19,7 @@ import type { import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { hasFinalInboundReplyDispatch, - runPreparedInboundReplyTurn, + runInboundReplyTurn, } from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { createOutboundPayloadPlan, @@ -844,313 +844,337 @@ export const dispatchTelegramMessage = async ({ }); try { - const { dispatchResult } = await runPreparedInboundReplyTurn({ + const turnResult = await runInboundReplyTurn({ channel: "telegram", accountId: route.accountId, - routeSessionKey: route.sessionKey, - storePath: context.turn.storePath, - ctxPayload, - recordInboundSession: context.turn.recordInboundSession, - record: context.turn.record, - runDispatch: () => - telegramDeps.dispatchReplyWithBufferedBlockDispatcher({ - ctx: ctxPayload, - cfg, - dispatcherOptions: { - ...replyPipeline, - beforeDeliver: async (payload) => payload, - deliver: async (payload, info) => { - if (isDispatchSuperseded()) { - return; - } - const clearPendingCompactionReplayBoundaryOnVisibleBoundary = ( - didDeliver: boolean, - ) => { - if (didDeliver && info.kind !== "final") { - pendingCompactionReplayBoundary = false; - } - }; - if (payload.isError === true) { - hadErrorReplyFailureOrSkip = true; - } - if (info.kind === "final") { - await enqueueDraftLaneEvent(async () => {}); - } - if ( - shouldSuppressLocalTelegramExecApprovalPrompt({ - cfg, - accountId: route.accountId, - payload, - }) - ) { - queuedFinal = true; - return; - } - const previewButtons = ( - payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined - )?.buttons; - const split = splitTextIntoLaneSegments(payload.text); - const segments = split.segments; - const reply = resolveSendableOutboundReplyParts(payload); - const _hasMedia = reply.hasMedia; - - const flushBufferedFinalAnswer = async () => { - const buffered = reasoningStepState.takeBufferedFinalAnswer(); - if (!buffered) { - return; - } - const bufferedButtons = ( - buffered.payload.channelData?.telegram as - | { buttons?: TelegramInlineButtons } - | undefined - )?.buttons; - await deliverLaneText({ - laneName: "answer", - text: buffered.text, - payload: buffered.payload, - infoKind: "final", - previewButtons: bufferedButtons, - }); - reasoningStepState.resetForNextStep(); - }; - - for (const segment of segments) { - if ( - segment.lane === "answer" && - info.kind === "final" && - reasoningStepState.shouldBufferFinalAnswer() - ) { - reasoningStepState.bufferFinalAnswer({ - payload, - text: segment.text, - }); - continue; - } - if (segment.lane === "reasoning") { - reasoningStepState.noteReasoningHint(); - } - const result = await deliverLaneText({ - laneName: segment.lane, - text: segment.text, - payload, - infoKind: info.kind, - previewButtons, - allowPreviewUpdateForNonFinal: segment.lane === "reasoning", - }); - if (info.kind === "final") { - emitPreviewFinalizedHook(result); - } - if (segment.lane === "reasoning") { - if (result.kind !== "skipped") { - reasoningStepState.noteReasoningDelivered(); - await flushBufferedFinalAnswer(); - } - continue; - } - if (info.kind === "final") { - if (reasoningLane.hasStreamedMessage) { - activePreviewLifecycleByLane.reasoning = "complete"; - retainPreviewOnCleanupByLane.reasoning = true; - } - reasoningStepState.resetForNextStep(); - } - } - if (segments.length > 0) { - if (info.kind === "final") { - pendingCompactionReplayBoundary = false; - } - return; - } - if (split.suppressedReasoningOnly) { - if (reply.hasMedia) { - const payloadWithoutSuppressedReasoning = - typeof payload.text === "string" ? { ...payload, text: "" } : payload; - clearPendingCompactionReplayBoundaryOnVisibleBoundary( - await sendPayload(payloadWithoutSuppressedReasoning), - ); - } - if (info.kind === "final") { - await flushBufferedFinalAnswer(); - pendingCompactionReplayBoundary = false; - } - return; - } - - if (info.kind === "final") { - await answerLane.stream?.stop(); - await reasoningLane.stream?.stop(); - reasoningStepState.resetForNextStep(); - } - const canSendAsIs = reply.hasMedia || reply.text.length > 0; - if (!canSendAsIs) { - if (info.kind === "final") { - await flushBufferedFinalAnswer(); - pendingCompactionReplayBoundary = false; - } - return; - } - clearPendingCompactionReplayBoundaryOnVisibleBoundary(await sendPayload(payload)); - if (info.kind === "final") { - await flushBufferedFinalAnswer(); - pendingCompactionReplayBoundary = false; - } - }, - onSkip: (payload, info) => { - if (payload.isError === true) { - hadErrorReplyFailureOrSkip = true; - } - if (info.reason !== "silent") { - deliveryState.markNonSilentSkip(); - } - }, - onError: (err, info) => { - const errorPolicy = resolveTelegramErrorPolicy({ - accountConfig: telegramCfg, - groupConfig, - topicConfig, - }); - if (isSilentErrorPolicy(errorPolicy.policy)) { - return; - } - if ( - errorPolicy.policy === "once" && - shouldSuppressTelegramError({ - scopeKey: buildTelegramErrorScopeKey({ - accountId: route.accountId, - chatId, - threadId: threadSpec.id, - }), - cooldownMs: errorPolicy.cooldownMs, - errorMessage: String(err), - }) - ) { - return; - } - deliveryState.markNonSilentFailure(); - runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`)); - }, - }, - replyOptions: { - skillFilter, - disableBlockStreaming, - onPartialReply: - answerLane.stream || reasoningLane.stream - ? (payload) => - enqueueDraftLaneEvent(async () => { - await ingestDraftLaneSegments(payload.text); - }) - : undefined, - onReasoningStream: reasoningLane.stream - ? (payload) => - enqueueDraftLaneEvent(async () => { - if (splitReasoningOnNextStream) { - reasoningLane.stream?.forceNewMessage(); - resetDraftLaneState(reasoningLane); - splitReasoningOnNextStream = false; - } - await ingestDraftLaneSegments(payload.text); - }) - : undefined, - onAssistantMessageStart: answerLane.stream - ? () => - enqueueDraftLaneEvent(async () => { - reasoningStepState.resetForNextStep(); - previewToolProgressSuppressed = false; - previewToolProgressLines = []; - if (skipNextAnswerMessageStartRotation) { - skipNextAnswerMessageStartRotation = false; - activePreviewLifecycleByLane.answer = "transient"; - retainPreviewOnCleanupByLane.answer = false; - return; - } - if (pendingCompactionReplayBoundary) { - pendingCompactionReplayBoundary = false; - activePreviewLifecycleByLane.answer = "transient"; - retainPreviewOnCleanupByLane.answer = false; - return; - } - await rotateAnswerLaneForNewAssistantMessage(); - activePreviewLifecycleByLane.answer = "transient"; - retainPreviewOnCleanupByLane.answer = false; - }) - : undefined, - onReasoningEnd: reasoningLane.stream - ? () => - enqueueDraftLaneEvent(async () => { - splitReasoningOnNextStream = reasoningLane.hasStreamedMessage; - previewToolProgressSuppressed = false; - previewToolProgressLines = []; - }) - : undefined, - suppressDefaultToolProgressMessages: - !previewStreamingEnabled || Boolean(answerLane.stream), - onToolStart: async (payload) => { - const toolName = payload.name?.trim(); - if (statusReactionController && toolName) { - await statusReactionController.setTool(toolName); - } - pushPreviewToolProgress(toolName ? `tool: ${toolName}` : "tool running"); - }, - onItemEvent: async (payload) => { - pushPreviewToolProgress( - payload.progressText ?? payload.summary ?? payload.title ?? payload.name, - ); - }, - onPlanUpdate: async (payload) => { - if (payload.phase !== "update") { - return; - } - pushPreviewToolProgress(payload.explanation ?? payload.steps?.[0] ?? "planning"); - }, - onApprovalEvent: async (payload) => { - if (payload.phase !== "requested") { - return; - } - pushPreviewToolProgress( - payload.command ? `approval: ${payload.command}` : "approval requested", - ); - }, - onCommandOutput: async (payload) => { - if (payload.phase !== "end") { - return; - } - pushPreviewToolProgress( - payload.name - ? `${payload.name}${payload.exitCode === 0 ? " ✓" : payload.exitCode != null ? ` (exit ${payload.exitCode})` : ""}` - : payload.title, - ); - }, - onPatchSummary: async (payload) => { - if (payload.phase !== "end") { - return; - } - pushPreviewToolProgress(payload.summary ?? payload.title ?? "patch applied"); - }, - onCompactionStart: - statusReactionController || answerLane.stream - ? async () => { - if ( - answerLane.hasStreamedMessage && - activePreviewLifecycleByLane.answer === "transient" - ) { - pendingCompactionReplayBoundary = true; - } - if (statusReactionController) { - await statusReactionController.setCompacting(); - } - } - : undefined, - onCompactionEnd: statusReactionController - ? async () => { - statusReactionController.cancelPending(); - await statusReactionController.setThinking(); - } - : undefined, - onModelSelected, - }, + raw: context, + adapter: { + ingest: () => ({ + id: ctxPayload.MessageSid ?? `${chatId}:${Date.now()}`, + timestamp: typeof ctxPayload.Timestamp === "number" ? ctxPayload.Timestamp : undefined, + rawText: ctxPayload.RawBody ?? "", + textForAgent: ctxPayload.BodyForAgent, + textForCommands: ctxPayload.CommandBody, + raw: context, }), + resolveTurn: () => ({ + channel: "telegram", + accountId: route.accountId, + routeSessionKey: route.sessionKey, + storePath: context.turn.storePath, + ctxPayload, + recordInboundSession: context.turn.recordInboundSession, + record: context.turn.record, + runDispatch: () => + telegramDeps.dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, + cfg, + dispatcherOptions: { + ...replyPipeline, + beforeDeliver: async (payload) => payload, + deliver: async (payload, info) => { + if (isDispatchSuperseded()) { + return; + } + const clearPendingCompactionReplayBoundaryOnVisibleBoundary = ( + didDeliver: boolean, + ) => { + if (didDeliver && info.kind !== "final") { + pendingCompactionReplayBoundary = false; + } + }; + if (payload.isError === true) { + hadErrorReplyFailureOrSkip = true; + } + if (info.kind === "final") { + await enqueueDraftLaneEvent(async () => {}); + } + if ( + shouldSuppressLocalTelegramExecApprovalPrompt({ + cfg, + accountId: route.accountId, + payload, + }) + ) { + queuedFinal = true; + return; + } + const previewButtons = ( + payload.channelData?.telegram as + | { buttons?: TelegramInlineButtons } + | undefined + )?.buttons; + const split = splitTextIntoLaneSegments(payload.text); + const segments = split.segments; + const reply = resolveSendableOutboundReplyParts(payload); + const _hasMedia = reply.hasMedia; + + const flushBufferedFinalAnswer = async () => { + const buffered = reasoningStepState.takeBufferedFinalAnswer(); + if (!buffered) { + return; + } + const bufferedButtons = ( + buffered.payload.channelData?.telegram as + | { buttons?: TelegramInlineButtons } + | undefined + )?.buttons; + await deliverLaneText({ + laneName: "answer", + text: buffered.text, + payload: buffered.payload, + infoKind: "final", + previewButtons: bufferedButtons, + }); + reasoningStepState.resetForNextStep(); + }; + + for (const segment of segments) { + if ( + segment.lane === "answer" && + info.kind === "final" && + reasoningStepState.shouldBufferFinalAnswer() + ) { + reasoningStepState.bufferFinalAnswer({ + payload, + text: segment.text, + }); + continue; + } + if (segment.lane === "reasoning") { + reasoningStepState.noteReasoningHint(); + } + const result = await deliverLaneText({ + laneName: segment.lane, + text: segment.text, + payload, + infoKind: info.kind, + previewButtons, + allowPreviewUpdateForNonFinal: segment.lane === "reasoning", + }); + if (info.kind === "final") { + emitPreviewFinalizedHook(result); + } + if (segment.lane === "reasoning") { + if (result.kind !== "skipped") { + reasoningStepState.noteReasoningDelivered(); + await flushBufferedFinalAnswer(); + } + continue; + } + if (info.kind === "final") { + if (reasoningLane.hasStreamedMessage) { + activePreviewLifecycleByLane.reasoning = "complete"; + retainPreviewOnCleanupByLane.reasoning = true; + } + reasoningStepState.resetForNextStep(); + } + } + if (segments.length > 0) { + if (info.kind === "final") { + pendingCompactionReplayBoundary = false; + } + return; + } + if (split.suppressedReasoningOnly) { + if (reply.hasMedia) { + const payloadWithoutSuppressedReasoning = + typeof payload.text === "string" ? { ...payload, text: "" } : payload; + clearPendingCompactionReplayBoundaryOnVisibleBoundary( + await sendPayload(payloadWithoutSuppressedReasoning), + ); + } + if (info.kind === "final") { + await flushBufferedFinalAnswer(); + pendingCompactionReplayBoundary = false; + } + return; + } + + if (info.kind === "final") { + await answerLane.stream?.stop(); + await reasoningLane.stream?.stop(); + reasoningStepState.resetForNextStep(); + } + const canSendAsIs = reply.hasMedia || reply.text.length > 0; + if (!canSendAsIs) { + if (info.kind === "final") { + await flushBufferedFinalAnswer(); + pendingCompactionReplayBoundary = false; + } + return; + } + clearPendingCompactionReplayBoundaryOnVisibleBoundary( + await sendPayload(payload), + ); + if (info.kind === "final") { + await flushBufferedFinalAnswer(); + pendingCompactionReplayBoundary = false; + } + }, + onSkip: (payload, info) => { + if (payload.isError === true) { + hadErrorReplyFailureOrSkip = true; + } + if (info.reason !== "silent") { + deliveryState.markNonSilentSkip(); + } + }, + onError: (err, info) => { + const errorPolicy = resolveTelegramErrorPolicy({ + accountConfig: telegramCfg, + groupConfig, + topicConfig, + }); + if (isSilentErrorPolicy(errorPolicy.policy)) { + return; + } + if ( + errorPolicy.policy === "once" && + shouldSuppressTelegramError({ + scopeKey: buildTelegramErrorScopeKey({ + accountId: route.accountId, + chatId, + threadId: threadSpec.id, + }), + cooldownMs: errorPolicy.cooldownMs, + errorMessage: String(err), + }) + ) { + return; + } + deliveryState.markNonSilentFailure(); + runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`)); + }, + }, + replyOptions: { + skillFilter, + disableBlockStreaming, + onPartialReply: + answerLane.stream || reasoningLane.stream + ? (payload) => + enqueueDraftLaneEvent(async () => { + await ingestDraftLaneSegments(payload.text); + }) + : undefined, + onReasoningStream: reasoningLane.stream + ? (payload) => + enqueueDraftLaneEvent(async () => { + if (splitReasoningOnNextStream) { + reasoningLane.stream?.forceNewMessage(); + resetDraftLaneState(reasoningLane); + splitReasoningOnNextStream = false; + } + await ingestDraftLaneSegments(payload.text); + }) + : undefined, + onAssistantMessageStart: answerLane.stream + ? () => + enqueueDraftLaneEvent(async () => { + reasoningStepState.resetForNextStep(); + previewToolProgressSuppressed = false; + previewToolProgressLines = []; + if (skipNextAnswerMessageStartRotation) { + skipNextAnswerMessageStartRotation = false; + activePreviewLifecycleByLane.answer = "transient"; + retainPreviewOnCleanupByLane.answer = false; + return; + } + if (pendingCompactionReplayBoundary) { + pendingCompactionReplayBoundary = false; + activePreviewLifecycleByLane.answer = "transient"; + retainPreviewOnCleanupByLane.answer = false; + return; + } + await rotateAnswerLaneForNewAssistantMessage(); + activePreviewLifecycleByLane.answer = "transient"; + retainPreviewOnCleanupByLane.answer = false; + }) + : undefined, + onReasoningEnd: reasoningLane.stream + ? () => + enqueueDraftLaneEvent(async () => { + splitReasoningOnNextStream = reasoningLane.hasStreamedMessage; + previewToolProgressSuppressed = false; + previewToolProgressLines = []; + }) + : undefined, + suppressDefaultToolProgressMessages: + !previewStreamingEnabled || Boolean(answerLane.stream), + onToolStart: async (payload) => { + const toolName = payload.name?.trim(); + if (statusReactionController && toolName) { + await statusReactionController.setTool(toolName); + } + pushPreviewToolProgress(toolName ? `tool: ${toolName}` : "tool running"); + }, + onItemEvent: async (payload) => { + pushPreviewToolProgress( + payload.progressText ?? payload.summary ?? payload.title ?? payload.name, + ); + }, + onPlanUpdate: async (payload) => { + if (payload.phase !== "update") { + return; + } + pushPreviewToolProgress( + payload.explanation ?? payload.steps?.[0] ?? "planning", + ); + }, + onApprovalEvent: async (payload) => { + if (payload.phase !== "requested") { + return; + } + pushPreviewToolProgress( + payload.command ? `approval: ${payload.command}` : "approval requested", + ); + }, + onCommandOutput: async (payload) => { + if (payload.phase !== "end") { + return; + } + pushPreviewToolProgress( + payload.name + ? `${payload.name}${payload.exitCode === 0 ? " ✓" : payload.exitCode != null ? ` (exit ${payload.exitCode})` : ""}` + : payload.title, + ); + }, + onPatchSummary: async (payload) => { + if (payload.phase !== "end") { + return; + } + pushPreviewToolProgress(payload.summary ?? payload.title ?? "patch applied"); + }, + onCompactionStart: + statusReactionController || answerLane.stream + ? async () => { + if ( + answerLane.hasStreamedMessage && + activePreviewLifecycleByLane.answer === "transient" + ) { + pendingCompactionReplayBoundary = true; + } + if (statusReactionController) { + await statusReactionController.setCompacting(); + } + } + : undefined, + onCompactionEnd: statusReactionController + ? async () => { + statusReactionController.cancelPending(); + await statusReactionController.setThinking(); + } + : undefined, + onModelSelected, + }, + }), + }), + }, }); - ({ queuedFinal } = dispatchResult); + if (!turnResult.dispatched) { + return; + } + ({ queuedFinal } = turnResult.dispatchResult); } catch (err) { dispatchError = err; runtime.error?.(danger(`telegram dispatch failed: ${String(err)}`)); diff --git a/extensions/whatsapp/src/auto-reply/monitor/process-message.ts b/extensions/whatsapp/src/auto-reply/monitor/process-message.ts index 2bcbaaba203..79caeb846c9 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/process-message.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/process-message.ts @@ -13,7 +13,7 @@ import { toPluginMessageReceivedEvent, triggerInternalHook, } from "openclaw/plugin-sdk/hook-runtime"; -import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch"; +import { runInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { getGlobalHookRunner } from "openclaw/plugin-sdk/plugin-runtime"; import { resolveBatchedReplyThreadingPolicy } from "openclaw/plugin-sdk/reply-reference"; import { getPrimaryIdentityId, getSelfIdentity, getSenderIdentity } from "../../identity.js"; @@ -454,52 +454,68 @@ export async function processMessage(params: { warn: params.replyLogger.warn.bind(params.replyLogger), }); - const { dispatchResult: didSendReply } = await runPreparedInboundReplyTurn({ + const turnResult = await runInboundReplyTurn({ channel: "whatsapp", accountId: params.route.accountId, - routeSessionKey: params.route.sessionKey, - storePath, - ctxPayload, - recordInboundSession, - record: { - onRecordError: (err) => { - params.replyLogger.warn( - { - error: formatError(err), - storePath, - sessionKey: params.route.sessionKey, - }, - "failed updating session meta", - ); - }, - trackSessionMetaTask: (task) => { - trackBackgroundTask(params.backgroundTasks, task); - }, - }, - runDispatch: () => - dispatchWhatsAppBufferedReply({ - cfg: params.cfg, - connectionId: params.connectionId, - context: ctxPayload, - conversationId, - deliverReply: deliverWebReply, - groupHistories: params.groupHistories, - groupHistoryKey: params.groupHistoryKey, - maxMediaBytes: params.maxMediaBytes, - maxMediaTextChunkLimit: params.maxMediaTextChunkLimit, - msg: params.msg, - onModelSelected, - rememberSentText: params.rememberSentText, - replyLogger: params.replyLogger, - replyPipeline: { - ...replyPipeline, - responsePrefix, - }, - replyResolver: params.replyResolver, - route: params.route, - shouldClearGroupHistory, + raw: params.msg, + adapter: { + ingest: () => ({ + id: params.msg.id ?? `${conversationId}:${Date.now()}`, + timestamp: params.msg.timestamp, + rawText: ctxPayload.RawBody ?? "", + textForAgent: ctxPayload.BodyForAgent, + textForCommands: ctxPayload.CommandBody, + raw: params.msg, }), + resolveTurn: () => ({ + channel: "whatsapp", + accountId: params.route.accountId, + routeSessionKey: params.route.sessionKey, + storePath, + ctxPayload, + recordInboundSession, + record: { + onRecordError: (err) => { + params.replyLogger.warn( + { + error: formatError(err), + storePath, + sessionKey: params.route.sessionKey, + }, + "failed updating session meta", + ); + }, + trackSessionMetaTask: (task) => { + trackBackgroundTask(params.backgroundTasks, task); + }, + }, + runDispatch: () => + dispatchWhatsAppBufferedReply({ + cfg: params.cfg, + connectionId: params.connectionId, + context: ctxPayload, + conversationId, + deliverReply: deliverWebReply, + groupHistories: params.groupHistories, + groupHistoryKey: params.groupHistoryKey, + maxMediaBytes: params.maxMediaBytes, + maxMediaTextChunkLimit: params.maxMediaTextChunkLimit, + msg: params.msg, + onModelSelected, + rememberSentText: params.rememberSentText, + replyLogger: params.replyLogger, + replyPipeline: { + ...replyPipeline, + responsePrefix, + }, + replyResolver: params.replyResolver, + route: params.route, + shouldClearGroupHistory, + }), + }), + }, }); + const didSendReply = turnResult.dispatched ? turnResult.dispatchResult : false; removeAckReactionHandleAfterReply({ removeAfterReply: Boolean(params.cfg.messages?.removeAckAfterReply && didSendReply), ackReaction, diff --git a/extensions/zalo/src/test-support/lifecycle-test-support.ts b/extensions/zalo/src/test-support/lifecycle-test-support.ts index 1b4d346976b..c5ae886a036 100644 --- a/extensions/zalo/src/test-support/lifecycle-test-support.ts +++ b/extensions/zalo/src/test-support/lifecycle-test-support.ts @@ -257,13 +257,23 @@ export function createImageLifecycleCore() { updateLastRoute: resolved.record?.updateLastRoute, onRecordError: resolved.record?.onRecordError ?? (() => undefined), }); + if ("runDispatch" in resolved) { + const dispatchResult = await resolved.runDispatch(); + return { + admission: { kind: "dispatch" as const }, + dispatched: true, + ctxPayload: resolved.ctxPayload, + routeSessionKey: resolved.routeSessionKey, + dispatchResult, + }; + } const dispatchResult = await resolved.dispatchReplyWithBufferedBlockDispatcher({ ctx: resolved.ctxPayload, cfg: resolved.cfg, dispatcherOptions: { ...resolved.dispatcherOptions, - deliver: async (payload, info) => { - await resolved.delivery.deliver(payload, info); + deliver: async (...args: Parameters) => { + await resolved.delivery.deliver(...args); }, onError: resolved.delivery.onError, }, diff --git a/extensions/zalouser/src/monitor.group-gating.test.ts b/extensions/zalouser/src/monitor.group-gating.test.ts index f4252a52d2f..4be8361ffb5 100644 --- a/extensions/zalouser/src/monitor.group-gating.test.ts +++ b/extensions/zalouser/src/monitor.group-gating.test.ts @@ -102,13 +102,23 @@ function installRuntime(params: { updateLastRoute: turn.record?.updateLastRoute, onRecordError: turn.record?.onRecordError ?? (() => undefined), }); + if ("runDispatch" in turn) { + const dispatchResult = await turn.runDispatch(); + return { + admission: { kind: "dispatch" as const }, + dispatched: true, + ctxPayload: turn.ctxPayload, + routeSessionKey: turn.routeSessionKey, + dispatchResult, + }; + } const dispatchResult = await turn.dispatchReplyWithBufferedBlockDispatcher({ ctx: turn.ctxPayload, cfg: turn.cfg, dispatcherOptions: { ...turn.dispatcherOptions, - deliver: async (payload, info) => { - await turn.delivery.deliver(payload, info); + deliver: async (...args: Parameters) => { + await turn.delivery.deliver(...args); }, onError: turn.delivery.onError, }, diff --git a/src/channels/turn/kernel.test.ts b/src/channels/turn/kernel.test.ts index 58e10ceb9ec..5ffe58c7598 100644 --- a/src/channels/turn/kernel.test.ts +++ b/src/channels/turn/kernel.test.ts @@ -310,6 +310,38 @@ describe("channel turn kernel", () => { ); }); + it("runs custom prepared dispatch from a full turn adapter", async () => { + const events: string[] = []; + const result = await runChannelTurn({ + channel: "test", + raw: { id: "msg-1", text: "hello" }, + adapter: { + ingest: () => ({ id: "msg-1", rawText: "hello" }), + resolveTurn: () => ({ + channel: "test", + routeSessionKey: "agent:main:test:peer", + storePath: "/tmp/sessions.json", + ctxPayload: createCtx(), + recordInboundSession: createRecordInboundSession(events), + runDispatch: async () => { + events.push("custom-dispatch"); + return { + queuedFinal: true, + counts: { tool: 0, block: 0, final: 1 }, + }; + }, + }), + }, + }); + + expect(events).toEqual(["record", "custom-dispatch"]); + expect(result.dispatched).toBe(true); + if (!result.dispatched) { + throw new Error("expected dispatch"); + } + expect(result.dispatchResult.queuedFinal).toBe(true); + }); + it("finalizes failed dispatches before rethrowing", async () => { const onFinalize = vi.fn(); const dispatchError = new Error("dispatch failed"); diff --git a/src/channels/turn/kernel.ts b/src/channels/turn/kernel.ts index 2e5afe56fd7..b9da060445c 100644 --- a/src/channels/turn/kernel.ts +++ b/src/channels/turn/kernel.ts @@ -9,12 +9,12 @@ import type { ChannelTurnDeliveryAdapter, ChannelTurnHistoryFinalizeOptions, ChannelTurnLogEvent, + ChannelTurnResolved, ChannelTurnResult, DispatchedChannelTurnResult, PreparedChannelTurn, PreflightFacts, RunChannelTurnParams, - RunResolvedChannelTurnParams, } from "./types.js"; export { EMPTY_CHANNEL_TURN_DISPATCH_COUNTS, @@ -49,7 +49,6 @@ export type { ReplyPlanFacts, RouteFacts, RunChannelTurnParams, - RunResolvedChannelTurnParams, SenderFacts, SupplementalContextFacts, } from "./types.js"; @@ -143,6 +142,29 @@ export async function dispatchAssembledChannelTurn( }); } +function isPreparedChannelTurn( + value: ChannelTurnResolved, +): value is PreparedChannelTurn & { + admission?: Extract; +} { + return "runDispatch" in value; +} + +async function dispatchResolvedChannelTurn( + params: ChannelTurnResolved & { + admission: Extract; + log?: (event: ChannelTurnLogEvent) => void; + messageId?: string; + }, +): Promise> { + if (isPreparedChannelTurn(params)) { + return await runPreparedChannelTurn(params); + } + return (await dispatchAssembledChannelTurn( + params, + )) as DispatchedChannelTurnResult; +} + export async function runPreparedChannelTurn< TDispatchResult = DispatchedChannelTurnResult["dispatchResult"], >( @@ -248,9 +270,12 @@ export async function runPreparedChannelTurn< }; } -export async function runChannelTurn( - params: RunChannelTurnParams, -): Promise { +export async function runChannelTurn< + TRaw, + TDispatchResult = DispatchedChannelTurnResult["dispatchResult"], +>( + params: RunChannelTurnParams, +): Promise> { emit({ ...params, event: { stage: "ingest", event: "start" }, @@ -327,9 +352,9 @@ export async function runChannelTurn( }); const admission = resolved.admission ?? preflightAdmission ?? ({ kind: "dispatch" } as const); - let result: ChannelTurnResult; + let result: ChannelTurnResult; try { - const dispatchResult = await dispatchAssembledChannelTurn( + const dispatchResult = await dispatchResolvedChannelTurn( admission.kind === "observeOnly" ? { ...resolved, @@ -350,7 +375,7 @@ export async function runChannelTurn( admission, }; } catch (err) { - const failedResult: ChannelTurnResult = { + const failedResult: ChannelTurnResult = { admission, dispatched: false, ctxPayload: resolved.ctxPayload, @@ -406,18 +431,3 @@ export async function runChannelTurn( return result; } - -export async function runResolvedChannelTurn( - params: RunResolvedChannelTurnParams, -): Promise { - return await runChannelTurn({ - channel: params.channel, - accountId: params.accountId, - raw: params.raw, - log: params.log, - adapter: { - ingest: (raw) => (typeof params.input === "function" ? params.input(raw) : params.input), - resolveTurn: params.resolveTurn, - }, - }); -} diff --git a/src/channels/turn/types.ts b/src/channels/turn/types.ts index b6e60b15b0d..af302f9a2ca 100644 --- a/src/channels/turn/types.ts +++ b/src/channels/turn/types.ts @@ -240,9 +240,13 @@ export type PreparedChannelTurn = { messageId?: string; }; -export type ChannelTurnResolved = AssembledChannelTurn & { - admission?: Extract; -}; +export type ChannelTurnResolved = + | (AssembledChannelTurn & { + admission?: Extract; + }) + | (PreparedChannelTurn & { + admission?: Extract; + }); export type ChannelTurnStage = | "ingest" @@ -267,13 +271,14 @@ export type ChannelTurnLogEvent = { error?: unknown; }; -export type ChannelTurnResult = { - admission: ChannelTurnAdmission; - dispatched: boolean; - ctxPayload?: MsgContext; - routeSessionKey?: string; - dispatchResult?: DispatchFromConfigResult; -}; +export type ChannelTurnResult = + | DispatchedChannelTurnResult + | { + admission: ChannelTurnAdmission; + dispatched: false; + ctxPayload?: MsgContext; + routeSessionKey?: string; + }; export type DispatchedChannelTurnResult = { admission: Extract; @@ -283,7 +288,7 @@ export type DispatchedChannelTurnResult = { +export type ChannelTurnAdapter = { ingest: (raw: TRaw) => Promise | NormalizedTurnInput | null; classify?: (input: NormalizedTurnInput) => Promise | ChannelEventClass; preflight?: ( @@ -299,29 +304,14 @@ export type ChannelTurnAdapter = { input: NormalizedTurnInput, eventClass: ChannelEventClass, preflight: PreflightFacts, - ) => Promise | ChannelTurnResolved; - onFinalize?: (result: ChannelTurnResult) => Promise | void; + ) => Promise> | ChannelTurnResolved; + onFinalize?: (result: ChannelTurnResult) => Promise | void; }; -export type RunChannelTurnParams = { +export type RunChannelTurnParams = { channel: string; accountId?: string; raw: TRaw; - adapter: ChannelTurnAdapter; - log?: (event: ChannelTurnLogEvent) => void; -}; - -export type RunResolvedChannelTurnParams = { - channel: string; - accountId?: string; - raw: TRaw; - input: - | NormalizedTurnInput - | ((raw: TRaw) => Promise | NormalizedTurnInput | null); - resolveTurn: ( - input: NormalizedTurnInput, - eventClass: ChannelEventClass, - preflight: PreflightFacts, - ) => Promise | ChannelTurnResolved; + adapter: ChannelTurnAdapter; log?: (event: ChannelTurnLogEvent) => void; }; diff --git a/src/plugin-sdk/inbound-reply-dispatch.ts b/src/plugin-sdk/inbound-reply-dispatch.ts index 50d36606f45..28c523797e3 100644 --- a/src/plugin-sdk/inbound-reply-dispatch.ts +++ b/src/plugin-sdk/inbound-reply-dispatch.ts @@ -11,9 +11,11 @@ import { hasFinalChannelTurnDispatch, hasVisibleChannelTurnDispatch, resolveChannelTurnDispatchCounts, + runChannelTurn, runPreparedChannelTurn, } from "../channels/turn/kernel.js"; -import type { PreparedChannelTurn } from "../channels/turn/types.js"; +import type { PreparedChannelTurn, RunChannelTurnParams } from "../channels/turn/types.js"; +export type { ChannelTurnRecordOptions } from "../channels/turn/types.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { createChannelReplyPipeline } from "./channel-reply-pipeline.js"; import { createNormalizedOutboundDeliverer, type OutboundReplyPayload } from "./reply-payload.js"; @@ -33,6 +35,13 @@ export async function runPreparedInboundReplyTurn( return await runPreparedChannelTurn(params); } +/** Run a channel turn through shared ingest, record, dispatch, and finalize ordering. */ +export async function runInboundReplyTurn( + params: RunChannelTurnParams, +) { + return await runChannelTurn(params); +} + export { hasFinalChannelTurnDispatch as hasFinalInboundReplyDispatch, hasVisibleChannelTurnDispatch as hasVisibleInboundReplyDispatch, diff --git a/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts b/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts index 4a3f63ddea8..0920b8e95cf 100644 --- a/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts +++ b/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts @@ -78,40 +78,64 @@ export function createPluginRuntimeMock(overrides: DeepPartial = createTaskFlowSessionMock, ) as unknown as PluginRuntime["tasks"]["managedFlows"]["fromToolContext"], }; - const dispatchAssembledChannelTurnMock = vi.fn( - async (params: Parameters[0]) => { - await params.recordInboundSession({ - storePath: params.storePath, - sessionKey: params.ctxPayload.SessionKey ?? params.routeSessionKey, - ctx: params.ctxPayload, - groupResolution: params.record?.groupResolution, - createIfMissing: params.record?.createIfMissing, - updateLastRoute: params.record?.updateLastRoute, - onRecordError: params.record?.onRecordError ?? (() => undefined), - trackSessionMetaTask: params.record?.trackSessionMetaTask, - }); - const dispatchResult = await params.dispatchReplyWithBufferedBlockDispatcher({ - ctx: params.ctxPayload, - cfg: params.cfg, + const dispatchAssembledChannelTurnMock = vi.fn(async (params: Record) => { + const ctxPayload = params.ctxPayload as Record; + const record = params.record as + | Parameters[0]["record"] + | undefined; + const recordInboundSession = params.recordInboundSession as Parameters< + PluginRuntime["channel"]["turn"]["runPrepared"] + >[0]["recordInboundSession"]; + const routeSessionKey = params.routeSessionKey as string; + const storePath = params.storePath as string; + const delivery = params.delivery as { + deliver: (payload: unknown, info: unknown) => Promise; + onError?: (err: unknown, info: { kind: string }) => void; + }; + const ctxSessionKey = ctxPayload.SessionKey; + const sessionKey = typeof ctxSessionKey === "string" ? ctxSessionKey : routeSessionKey; + const dispatchReplyWithBufferedBlockDispatcher = + params.dispatchReplyWithBufferedBlockDispatcher as (params: { + ctx: unknown; + cfg: unknown; dispatcherOptions: { - ...params.dispatcherOptions, - deliver: async (payload, info) => { - await params.delivery.deliver(payload, info); - }, - onError: params.delivery.onError, + deliver: (payload: unknown, info: unknown) => Promise; + onError?: (err: unknown, info: { kind: string }) => void; + }; + replyOptions?: unknown; + replyResolver?: unknown; + }) => Promise; + await recordInboundSession({ + storePath, + sessionKey, + ctx: ctxPayload, + groupResolution: record?.groupResolution, + createIfMissing: record?.createIfMissing, + updateLastRoute: record?.updateLastRoute, + onRecordError: record?.onRecordError ?? (() => undefined), + trackSessionMetaTask: record?.trackSessionMetaTask, + }); + const dispatchResult = await dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, + cfg: params.cfg, + dispatcherOptions: { + ...(params.dispatcherOptions as Record | undefined), + deliver: async (payload, info) => { + await delivery.deliver(payload, info); }, - replyOptions: params.replyOptions, - replyResolver: params.replyResolver, - }); - return { - admission: params.admission ?? { kind: "dispatch" as const }, - dispatched: true, - ctxPayload: params.ctxPayload, - routeSessionKey: params.routeSessionKey, - dispatchResult, - }; - }, - ) as unknown as PluginRuntime["channel"]["turn"]["dispatchAssembled"]; + onError: delivery.onError, + }, + replyOptions: params.replyOptions, + replyResolver: params.replyResolver, + }); + return { + admission: params.admission ?? { kind: "dispatch" }, + dispatched: true, + ctxPayload, + routeSessionKey, + dispatchResult, + }; + }); const runPreparedChannelTurnMock = vi.fn( async (params: Parameters[0]) => { try { @@ -180,18 +204,24 @@ export function createPluginRuntimeMock(overrides: DeepPartial = const resolved = await params.adapter.resolveTurn(input, eventClass, preflight ?? {}); const admission = resolved.admission ?? preflight.admission ?? ({ kind: "dispatch" } as const); - const dispatchResult = await dispatchAssembledChannelTurnMock({ - ...resolved, - admission, - delivery: - admission.kind === "observeOnly" - ? { deliver: async () => ({ visibleReplySent: false }) } - : resolved.delivery, - }); + const dispatchResult = + "runDispatch" in resolved + ? await runPreparedChannelTurnMock({ + ...resolved, + admission, + }) + : await dispatchAssembledChannelTurnMock({ + ...resolved, + admission, + delivery: + admission.kind === "observeOnly" + ? { deliver: async () => ({ visibleReplySent: false }) } + : resolved.delivery, + }); const result = { ...dispatchResult, admission, - }; + } as Parameters>[0]; await params.adapter.onFinalize?.(result); return result; }, @@ -233,28 +263,6 @@ export function createPluginRuntimeMock(overrides: DeepPartial = ...params.extra, }) as ReturnType, ) as unknown as PluginRuntime["channel"]["turn"]["buildContext"]; - const runResolvedChannelTurnMock = vi.fn( - async (params: Parameters[0]) => { - const input = - typeof params.input === "function" ? await params.input(params.raw) : params.input; - if (!input) { - return { - admission: { kind: "drop" as const, reason: "ingest-null" }, - dispatched: false, - }; - } - return await runChannelTurnMock({ - channel: params.channel, - accountId: params.accountId, - raw: params.raw, - log: params.log, - adapter: { - ingest: () => input, - resolveTurn: params.resolveTurn, - }, - }); - }, - ) as unknown as PluginRuntime["channel"]["turn"]["runResolved"]; const base: PluginRuntime = { version: "1.0.0-test", config: { @@ -609,10 +617,8 @@ export function createPluginRuntimeMock(overrides: DeepPartial = }, turn: { run: runChannelTurnMock, - runResolved: runResolvedChannelTurnMock, buildContext: buildChannelTurnContextMock, runPrepared: runPreparedChannelTurnMock, - dispatchAssembled: dispatchAssembledChannelTurnMock, }, threadBindings: { setIdleTimeoutBySessionKey: diff --git a/src/plugins/runtime/runtime-channel.ts b/src/plugins/runtime/runtime-channel.ts index 8e25706dae6..eab284e445b 100644 --- a/src/plugins/runtime/runtime-channel.ts +++ b/src/plugins/runtime/runtime-channel.ts @@ -52,10 +52,8 @@ import { loadChannelOutboundAdapter } from "../../channels/plugins/outbound/load import { recordInboundSession } from "../../channels/session.js"; import { buildChannelTurnContext, - dispatchAssembledChannelTurn, runChannelTurn, runPreparedChannelTurn, - runResolvedChannelTurn, } from "../../channels/turn/kernel.js"; import { resolveChannelGroupPolicy, @@ -174,10 +172,8 @@ export function createRuntimeChannel(): PluginRuntime["channel"] { }, turn: { run: runChannelTurn, - runResolved: runResolvedChannelTurn, buildContext: buildChannelTurnContext, runPrepared: runPreparedChannelTurn, - dispatchAssembled: dispatchAssembledChannelTurn, }, threadBindings: { setIdleTimeoutBySessionKey: ({ channelId, targetSessionKey, accountId, idleTimeoutMs }) => diff --git a/src/plugins/runtime/types-channel.ts b/src/plugins/runtime/types-channel.ts index dd1a9b4f439..a55ab37f968 100644 --- a/src/plugins/runtime/types-channel.ts +++ b/src/plugins/runtime/types-channel.ts @@ -153,12 +153,8 @@ export type PluginRuntimeChannel = { }; turn: { run: typeof import("../../channels/turn/kernel.js").runChannelTurn; - /** @deprecated Prefer `run(...)`. */ - runResolved: typeof import("../../channels/turn/kernel.js").runResolvedChannelTurn; buildContext: typeof import("../../channels/turn/kernel.js").buildChannelTurnContext; runPrepared: typeof import("../../channels/turn/kernel.js").runPreparedChannelTurn; - /** @deprecated Prefer `run(...)` or `runPrepared(...)`. */ - dispatchAssembled: typeof import("../../channels/turn/kernel.js").dispatchAssembledChannelTurn; }; threadBindings: { setIdleTimeoutBySessionKey: (params: {