diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 382c4046c93..0312f7a979a 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -81de442c9e0c902621f316297af3ad6c48a07c0c6bbfa5ca984419cfdb7f4292 plugin-sdk-api-baseline.json -136836d047bd0fb0723047945fdbd931a9128552ff49e8a27937d54dba0e9709 plugin-sdk-api-baseline.jsonl +ae28566c922ce79527943b069abc199de28e3898ec08eea12c4ff6050795f276 plugin-sdk-api-baseline.json +79446b23832949553b23e7cf92be37b81c69d123fc09bed6f8fc04bd98e9257d plugin-sdk-api-baseline.jsonl diff --git a/extensions/discord/src/monitor/message-handler.process.ts b/extensions/discord/src/monitor/message-handler.process.ts index 217a4b78d8c..311ec7b15f8 100644 --- a/extensions/discord/src/monitor/message-handler.process.ts +++ b/extensions/discord/src/monitor/message-handler.process.ts @@ -13,7 +13,10 @@ import { } from "openclaw/plugin-sdk/channel-reply-pipeline"; import { resolveChannelStreamingBlockEnabled } from "openclaw/plugin-sdk/channel-streaming"; import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime"; -import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch"; +import { + hasFinalInboundReplyDispatch, + runPreparedInboundReplyTurn, +} from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { resolveMarkdownTableMode } from "openclaw/plugin-sdk/markdown-table-runtime"; import { getAgentScopedMediaLocalRoots } from "openclaw/plugin-sdk/media-runtime"; import { resolveChunkMode } from "openclaw/plugin-sdk/reply-chunking"; @@ -643,7 +646,7 @@ export async function processDiscordMessage( return; } - if (!dispatchResult?.queuedFinal) { + if (!hasFinalInboundReplyDispatch(dispatchResult)) { if (isGuildMessage) { clearHistoryEntriesIfEnabled({ historyMap: guildHistories, diff --git a/extensions/imessage/src/monitor/monitor-provider.ts b/extensions/imessage/src/monitor/monitor-provider.ts index d5505434175..b7e0cebd522 100644 --- a/extensions/imessage/src/monitor/monitor-provider.ts +++ b/extensions/imessage/src/monitor/monitor-provider.ts @@ -12,7 +12,10 @@ import { } from "openclaw/plugin-sdk/conversation-runtime"; import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime"; import { normalizeScpRemoteHost } from "openclaw/plugin-sdk/host-runtime"; -import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch"; +import { + hasFinalInboundReplyDispatch, + runPreparedInboundReplyTurn, +} from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { isInboundPathAllowed, kindFromMime } from "openclaw/plugin-sdk/media-runtime"; import { clearHistoryEntriesIfEnabled, @@ -487,9 +490,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P }, }), }); - const queuedFinal = dispatchResult.queuedFinal; - - if (!queuedFinal) { + if (!hasFinalInboundReplyDispatch(dispatchResult)) { if (decision.isGroup && decision.historyKey) { clearHistoryEntriesIfEnabled({ historyMap: groupHistories, diff --git a/extensions/line/src/monitor.ts b/extensions/line/src/monitor.ts index da9fb60d97b..00380f12f5d 100644 --- a/extensions/line/src/monitor.ts +++ b/extensions/line/src/monitor.ts @@ -2,7 +2,10 @@ import type { webhook } from "@line/bot-sdk"; import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types"; import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime"; -import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch"; +import { + hasFinalInboundReplyDispatch, + runPreparedInboundReplyTurn, +} from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { dispatchReplyWithBufferedBlockDispatcher, chunkMarkdownText, @@ -306,9 +309,7 @@ export async function monitorLineProvider( }, }), }); - const queuedFinal = dispatchResult.queuedFinal; - - if (!queuedFinal) { + if (!hasFinalInboundReplyDispatch(dispatchResult)) { logVerbose(`line: no response generated for message from ${ctxPayload.From}`); } } catch (err) { diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index 39c388cde29..87c2ee6d9eb 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -3,6 +3,7 @@ import { evaluateSupplementalContextVisibility, resolveChannelContextVisibilityMode, } from "openclaw/plugin-sdk/context-visibility-runtime"; +import { hasFinalInboundReplyDispatch } from "openclaw/plugin-sdk/inbound-reply-dispatch"; import type { GetReplyOptions } from "openclaw/plugin-sdk/reply-runtime"; import { loadSessionStore, @@ -1963,7 +1964,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam if (isRoom && triggerSnapshot) { roomHistoryTracker.consumeHistory(_route.agentId, roomId, triggerSnapshot, _messageId); } - if (!queuedFinal) { + if (!hasFinalInboundReplyDispatch({ queuedFinal, counts })) { await commitInboundEventIfClaimed(); return; } diff --git a/extensions/msteams/src/monitor-handler/message-handler.ts b/extensions/msteams/src/monitor-handler/message-handler.ts index e0756803f9b..cc7f276ee82 100644 --- a/extensions/msteams/src/monitor-handler/message-handler.ts +++ b/extensions/msteams/src/monitor-handler/message-handler.ts @@ -11,7 +11,11 @@ import { shouldIncludeSupplementalContext, } from "openclaw/plugin-sdk/context-visibility-runtime"; import { evaluateSenderGroupAccessForPolicy } from "openclaw/plugin-sdk/group-access"; -import { dispatchReplyFromConfigWithSettledDispatcher } from "openclaw/plugin-sdk/inbound-reply-dispatch"; +import { + dispatchReplyFromConfigWithSettledDispatcher, + hasFinalInboundReplyDispatch, + resolveInboundReplyDispatchCounts, +} from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { buildPendingHistoryContextFromMap, clearHistoryEntriesIfEnabled, @@ -864,11 +868,12 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { }), }); const queuedFinal = dispatchResult?.queuedFinal ?? false; - const counts = dispatchResult?.counts ?? { tool: 0, block: 0, final: 0 }; + const counts = resolveInboundReplyDispatchCounts(dispatchResult); + const hasFinalResponse = hasFinalInboundReplyDispatch(dispatchResult); log.info("dispatch complete", { queuedFinal, counts }); - if (!queuedFinal) { + if (!hasFinalResponse) { if (isRoomish && historyKey) { clearHistoryEntriesIfEnabled({ historyMap: conversationHistories, diff --git a/extensions/signal/src/monitor/event-handler.ts b/extensions/signal/src/monitor/event-handler.ts index 43cbf9dd7e6..e05da655040 100644 --- a/extensions/signal/src/monitor/event-handler.ts +++ b/extensions/signal/src/monitor/event-handler.ts @@ -25,7 +25,10 @@ import { toInternalMessageReceivedContext, triggerInternalHook, } from "openclaw/plugin-sdk/hook-runtime"; -import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch"; +import { + hasFinalInboundReplyDispatch, + runPreparedInboundReplyTurn, +} from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { kindFromMime } from "openclaw/plugin-sdk/media-runtime"; import { buildPendingHistoryContextFromMap, @@ -351,8 +354,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { } }, }); - const queuedFinal = dispatchResult?.queuedFinal ?? false; - if (!queuedFinal) { + if (!hasFinalInboundReplyDispatch(dispatchResult)) { if (entry.isGroup && historyKey) { clearHistoryEntriesIfEnabled({ historyMap: deps.groupHistories, diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index 74ada3fb910..c70624a6c4a 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -18,7 +18,10 @@ import { resolveChannelStreamingPreviewToolProgress, } from "openclaw/plugin-sdk/channel-streaming"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; -import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch"; +import { + hasVisibleInboundReplyDispatch, + runPreparedInboundReplyTurn, +} from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { resolveAgentOutboundIdentity } from "openclaw/plugin-sdk/outbound-runtime"; import { clearHistoryEntriesIfEnabled } from "openclaw/plugin-sdk/reply-history"; import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; @@ -1099,12 +1102,13 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag } } - const anyReplyDelivered = - observedReplyDelivery || - queuedFinal || - streamFallbackDelivered || - (counts.block ?? 0) > 0 || - (counts.final ?? 0) > 0; + const anyReplyDelivered = hasVisibleInboundReplyDispatch( + { queuedFinal, counts }, + { + observedReplyDelivery, + fallbackDelivered: streamFallbackDelivered, + }, + ); if (statusReactionsEnabled) { if (dispatchError) { diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index d72a0e8461a..c6e0c10d65d 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -17,7 +17,10 @@ import type { TelegramAccountConfig, } from "openclaw/plugin-sdk/config-types"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; -import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch"; +import { + hasFinalInboundReplyDispatch, + runPreparedInboundReplyTurn, +} from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { createOutboundPayloadPlan, projectOutboundPayloadPlanForDelivery, @@ -1306,7 +1309,13 @@ export const dispatchTelegramMessage = async ({ }); } - const hasFinalResponse = queuedFinal || sentFallback || deliverySummary.delivered; + const hasFinalResponse = hasFinalInboundReplyDispatch( + { queuedFinal }, + { + fallbackDelivered: sentFallback, + deliverySummaryDelivered: deliverySummary.delivered, + }, + ); if (statusReactionController && !hasFinalResponse) { void finalizeTelegramStatusReaction({ outcome: "error", hasFinalResponse: false }).catch( diff --git a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.ts b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.ts index 6a01d0f7cd7..f1e2cdc6a13 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.ts @@ -1,3 +1,4 @@ +import { hasVisibleInboundReplyDispatch } from "openclaw/plugin-sdk/inbound-reply-dispatch"; import { type DeliverableWhatsAppOutboundPayload, normalizeWhatsAppOutboundPayload, @@ -388,8 +389,7 @@ export async function dispatchWhatsAppBufferedReply(params: { }, }); - const didQueueVisibleReply = - queuedFinal || counts.tool > 0 || counts.block > 0 || counts.final > 0; + const didQueueVisibleReply = hasVisibleInboundReplyDispatch({ queuedFinal, counts }); if (!didQueueVisibleReply) { if (params.shouldClearGroupHistory) { params.groupHistories.set(params.groupHistoryKey, []); diff --git a/src/channels/plugins/contracts/test-helpers.ts b/src/channels/plugins/contracts/test-helpers.ts index 742c515e08b..d940f043681 100644 --- a/src/channels/plugins/contracts/test-helpers.ts +++ b/src/channels/plugins/contracts/test-helpers.ts @@ -1,8 +1,15 @@ import { expect, type Mock } from "vitest"; +import type { DispatchFromConfigResult } from "../../../auto-reply/reply/dispatch-from-config.types.js"; import type { MsgContext } from "../../../auto-reply/templating.js"; import { normalizeChatType } from "../../chat-type.js"; import { resolveConversationLabel } from "../../conversation-label.js"; import { validateSenderIdentity } from "../../sender-identity.js"; +import { + hasFinalChannelTurnDispatch, + hasVisibleChannelTurnDispatch, + resolveChannelTurnDispatchCounts, + type ChannelTurnDispatchResultLike, +} from "../../turn/dispatch-result.js"; // oxlint-disable-next-line typescript/no-unnecessary-type-parameters -- Test helper preserves channel send mock arg types. export function primeChannelOutboundSendMock( @@ -33,3 +40,20 @@ export function expectChannelInboundContextContract(ctx: MsgContext) { expect(label).toBeTruthy(); } } + +export function expectChannelTurnDispatchResultContract( + result: ChannelTurnDispatchResultLike, + expected: { + visible: boolean; + final?: boolean; + counts?: Partial; + }, +) { + expect(hasVisibleChannelTurnDispatch(result)).toBe(expected.visible); + if (expected.final !== undefined) { + expect(hasFinalChannelTurnDispatch(result)).toBe(expected.final); + } + if (expected.counts) { + expect(resolveChannelTurnDispatchCounts(result)).toMatchObject(expected.counts); + } +} diff --git a/src/channels/turn/dispatch-result.ts b/src/channels/turn/dispatch-result.ts new file mode 100644 index 00000000000..4be1d37b117 --- /dev/null +++ b/src/channels/turn/dispatch-result.ts @@ -0,0 +1,62 @@ +import type { ReplyDispatchKind } from "../../auto-reply/reply/reply-dispatcher.types.js"; + +export type ChannelTurnDispatchResultLike = + | { + queuedFinal?: boolean; + counts?: Partial>; + } + | null + | undefined; + +export type ChannelTurnVisibleDeliverySignals = { + observedReplyDelivery?: boolean; + fallbackDelivered?: boolean; + deliverySummaryDelivered?: boolean; +}; + +export const EMPTY_CHANNEL_TURN_DISPATCH_COUNTS: Record = { + tool: 0, + block: 0, + final: 0, +}; + +export function resolveChannelTurnDispatchCounts( + result: ChannelTurnDispatchResultLike, +): Record { + return { + ...EMPTY_CHANNEL_TURN_DISPATCH_COUNTS, + ...result?.counts, + }; +} + +export function hasVisibleChannelTurnDispatch( + result: ChannelTurnDispatchResultLike, + signals: ChannelTurnVisibleDeliverySignals = {}, +): boolean { + const counts = resolveChannelTurnDispatchCounts(result); + return ( + signals.observedReplyDelivery === true || + signals.fallbackDelivered === true || + signals.deliverySummaryDelivered === true || + result?.queuedFinal === true || + counts.tool > 0 || + counts.block > 0 || + counts.final > 0 + ); +} + +export function hasFinalChannelTurnDispatch( + result: ChannelTurnDispatchResultLike, + signals: Pick< + ChannelTurnVisibleDeliverySignals, + "fallbackDelivered" | "deliverySummaryDelivered" + > = {}, +): boolean { + const counts = resolveChannelTurnDispatchCounts(result); + return ( + signals.fallbackDelivered === true || + signals.deliverySummaryDelivered === true || + result?.queuedFinal === true || + counts.final > 0 + ); +} diff --git a/src/channels/turn/kernel.test.ts b/src/channels/turn/kernel.test.ts index a4af11f8f8c..37df367f9be 100644 --- a/src/channels/turn/kernel.test.ts +++ b/src/channels/turn/kernel.test.ts @@ -6,6 +6,9 @@ import type { RecordInboundSession } from "../session.types.js"; import { createNoopChannelTurnDeliveryAdapter, dispatchAssembledChannelTurn, + hasFinalChannelTurnDispatch, + hasVisibleChannelTurnDispatch, + resolveChannelTurnDispatchCounts, runPreparedChannelTurn, runChannelTurn, } from "./kernel.js"; @@ -84,6 +87,7 @@ describe("channel turn kernel", () => { it("runs prepared dispatches after recording session metadata", async () => { const events: string[] = []; + const log = vi.fn(); const recordInboundSession = createRecordInboundSession(events); const runDispatch = vi.fn(async () => { events.push("dispatch"); @@ -100,6 +104,8 @@ describe("channel turn kernel", () => { ctxPayload: createCtx(), recordInboundSession, runDispatch, + log, + messageId: "msg-1", record: { onRecordError: vi.fn(), }, @@ -107,11 +113,24 @@ describe("channel turn kernel", () => { expect(events).toEqual(["record", "dispatch"]); expect(result.dispatchResult?.queuedFinal).toBe(true); + expect(log).toHaveBeenCalledWith( + expect.objectContaining({ stage: "record", event: "start", messageId: "msg-1" }), + ); + expect(log).toHaveBeenCalledWith( + expect.objectContaining({ stage: "record", event: "done", messageId: "msg-1" }), + ); + expect(log).toHaveBeenCalledWith( + expect.objectContaining({ stage: "dispatch", event: "start", messageId: "msg-1" }), + ); + expect(log).toHaveBeenCalledWith( + expect.objectContaining({ stage: "dispatch", event: "done", messageId: "msg-1" }), + ); }); it("cleans up pre-created dispatchers when session recording fails", async () => { const events: string[] = []; const recordError = new Error("session store failed"); + const log = vi.fn(); const recordInboundSession = vi.fn(async () => { events.push("record"); throw recordError; @@ -130,6 +149,7 @@ describe("channel turn kernel", () => { recordInboundSession, onPreDispatchFailure, runDispatch, + log, record: { onRecordError: vi.fn(), }, @@ -139,6 +159,33 @@ describe("channel turn kernel", () => { expect(events).toEqual(["record", "cleanup"]); expect(runDispatch).not.toHaveBeenCalled(); expect(onPreDispatchFailure).toHaveBeenCalledWith(recordError); + expect(log).toHaveBeenCalledWith(expect.objectContaining({ stage: "record", event: "error" })); + }); + + it("normalizes visible dispatch checks", () => { + expect(hasVisibleChannelTurnDispatch(undefined)).toBe(false); + expect( + hasVisibleChannelTurnDispatch({ + queuedFinal: false, + counts: { tool: 1, block: 0, final: 0 }, + }), + ).toBe(true); + expect( + hasVisibleChannelTurnDispatch(undefined, { + observedReplyDelivery: true, + }), + ).toBe(true); + expect( + hasFinalChannelTurnDispatch({ + queuedFinal: false, + counts: { tool: 1, block: 0, final: 0 }, + }), + ).toBe(false); + expect(resolveChannelTurnDispatchCounts(undefined)).toEqual({ + tool: 0, + block: 0, + final: 0, + }); }); it("drops when ingest returns null", async () => { diff --git a/src/channels/turn/kernel.ts b/src/channels/turn/kernel.ts index e018c053c4d..ebc4f7e88de 100644 --- a/src/channels/turn/kernel.ts +++ b/src/channels/turn/kernel.ts @@ -14,6 +14,14 @@ import type { RunChannelTurnParams, RunResolvedChannelTurnParams, } from "./types.js"; +export { + EMPTY_CHANNEL_TURN_DISPATCH_COUNTS, + hasFinalChannelTurnDispatch, + hasVisibleChannelTurnDispatch, + resolveChannelTurnDispatchCounts, + type ChannelTurnDispatchResultLike, + type ChannelTurnVisibleDeliverySignals, +} from "./dispatch-result.js"; export type { AccessFacts, AssembledChannelTurn, @@ -100,6 +108,9 @@ export async function dispatchAssembledChannelTurn( ctxPayload: params.ctxPayload, recordInboundSession: params.recordInboundSession, record: params.record, + admission: params.admission, + log: params.log, + messageId: params.messageId, runDispatch: async () => await params.dispatchReplyWithBufferedBlockDispatcher({ ctx: params.ctxPayload, @@ -122,6 +133,17 @@ export async function runPreparedChannelTurn< >( params: PreparedChannelTurn, ): Promise> { + const admission = params.admission ?? ({ kind: "dispatch" } as const); + emit({ + ...params, + event: { + stage: "record", + event: "start", + messageId: params.messageId, + sessionKey: params.ctxPayload.SessionKey ?? params.routeSessionKey, + admission: admission.kind, + }, + }); try { await params.recordInboundSession({ storePath: params.storePath, @@ -133,7 +155,28 @@ export async function runPreparedChannelTurn< onRecordError: params.record?.onRecordError ?? (() => undefined), trackSessionMetaTask: params.record?.trackSessionMetaTask, }); + emit({ + ...params, + event: { + stage: "record", + event: "done", + messageId: params.messageId, + sessionKey: params.ctxPayload.SessionKey ?? params.routeSessionKey, + admission: admission.kind, + }, + }); } catch (err) { + emit({ + ...params, + event: { + stage: "record", + event: "error", + messageId: params.messageId, + sessionKey: params.ctxPayload.SessionKey ?? params.routeSessionKey, + admission: admission.kind, + error: err, + }, + }); try { await params.onPreDispatchFailure?.(err); } catch { @@ -142,10 +185,46 @@ export async function runPreparedChannelTurn< throw err; } - const dispatchResult = await params.runDispatch(); + emit({ + ...params, + event: { + stage: "dispatch", + event: "start", + messageId: params.messageId, + sessionKey: params.ctxPayload.SessionKey ?? params.routeSessionKey, + admission: admission.kind, + }, + }); + let dispatchResult: TDispatchResult; + try { + dispatchResult = await params.runDispatch(); + } catch (err) { + emit({ + ...params, + event: { + stage: "dispatch", + event: "error", + messageId: params.messageId, + sessionKey: params.ctxPayload.SessionKey ?? params.routeSessionKey, + admission: admission.kind, + error: err, + }, + }); + throw err; + } + emit({ + ...params, + event: { + stage: "dispatch", + event: "done", + messageId: params.messageId, + sessionKey: params.ctxPayload.SessionKey ?? params.routeSessionKey, + admission: admission.kind, + }, + }); return { - admission: { kind: "dispatch" }, + admission, dispatched: true, ctxPayload: params.ctxPayload, routeSessionKey: params.routeSessionKey, @@ -239,25 +318,21 @@ export async function runChannelTurn( ? { ...resolved, delivery: createNoopChannelTurnDeliveryAdapter(), + admission, + log: params.log, + messageId: input.id, } - : resolved, + : { + ...resolved, + admission, + log: params.log, + messageId: input.id, + }, ); result = { ...dispatchResult, admission, }; - - emit({ - ...params, - accountId: resolved.accountId ?? params.accountId, - event: { - stage: "dispatch", - event: "done", - messageId: input.id, - sessionKey: resolved.routeSessionKey, - admission: admission.kind, - }, - }); } catch (err) { const failedResult: ChannelTurnResult = { admission, @@ -274,12 +349,11 @@ export async function runChannelTurn( ...params, accountId: resolved.accountId ?? params.accountId, event: { - stage: "dispatch", - event: "error", + stage: "finalize", + event: "done", messageId: input.id, sessionKey: resolved.routeSessionKey, admission: admission.kind, - error: err, }, }); throw err; diff --git a/src/channels/turn/types.ts b/src/channels/turn/types.ts index 5c75849392a..5714c6c5c47 100644 --- a/src/channels/turn/types.ts +++ b/src/channels/turn/types.ts @@ -209,6 +209,9 @@ export type AssembledChannelTurn = { replyOptions?: Omit; replyResolver?: GetReplyFromConfig; record?: ChannelTurnRecordOptions; + admission?: Extract; + log?: (event: ChannelTurnLogEvent) => void; + messageId?: string; }; export type PreparedChannelTurn = { @@ -221,6 +224,9 @@ export type PreparedChannelTurn = { record?: ChannelTurnRecordOptions; onPreDispatchFailure?: (err: unknown) => void | Promise; runDispatch: () => Promise; + admission?: Extract; + log?: (event: ChannelTurnLogEvent) => void; + messageId?: string; }; export type ChannelTurnResolved = AssembledChannelTurn & { @@ -259,7 +265,7 @@ export type ChannelTurnResult = { }; export type DispatchedChannelTurnResult = { - admission: Extract; + admission: Extract; dispatched: true; ctxPayload: MsgContext; routeSessionKey: string; diff --git a/src/plugin-sdk/channel-contract-testing.test.ts b/src/plugin-sdk/channel-contract-testing.test.ts new file mode 100644 index 00000000000..19a1341108b --- /dev/null +++ b/src/plugin-sdk/channel-contract-testing.test.ts @@ -0,0 +1,18 @@ +import { expectChannelTurnDispatchResultContract } from "openclaw/plugin-sdk/channel-contract-testing"; +import { describe, it } from "vitest"; + +describe("channel contract testing helpers", () => { + it("asserts shared channel turn dispatch visibility", () => { + expectChannelTurnDispatchResultContract( + { + queuedFinal: false, + counts: { tool: 0, block: 1, final: 0 }, + }, + { + visible: true, + final: false, + counts: { block: 1 }, + }, + ); + }); +}); diff --git a/src/plugin-sdk/channel-contract-testing.ts b/src/plugin-sdk/channel-contract-testing.ts index 9104ff87b2d..a2f2ce24dd9 100644 --- a/src/plugin-sdk/channel-contract-testing.ts +++ b/src/plugin-sdk/channel-contract-testing.ts @@ -1,5 +1,6 @@ export { expectChannelInboundContextContract, + expectChannelTurnDispatchResultContract, primeChannelOutboundSendMock, } from "../channels/plugins/contracts/test-helpers.js"; export { buildDispatchInboundCaptureMock } from "../channels/plugins/contracts/inbound-testkit.js"; diff --git a/src/plugin-sdk/inbound-reply-dispatch.test.ts b/src/plugin-sdk/inbound-reply-dispatch.test.ts index 21abe7adb29..d02c94a25b8 100644 --- a/src/plugin-sdk/inbound-reply-dispatch.test.ts +++ b/src/plugin-sdk/inbound-reply-dispatch.test.ts @@ -3,7 +3,12 @@ import type { DispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/rep import type { FinalizedMsgContext } from "../auto-reply/templating.js"; import type { RecordInboundSession } from "../channels/session.types.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; -import { recordInboundSessionAndDispatchReply } from "./inbound-reply-dispatch.js"; +import { + hasFinalInboundReplyDispatch, + hasVisibleInboundReplyDispatch, + recordInboundSessionAndDispatchReply, + resolveInboundReplyDispatchCounts, +} from "./inbound-reply-dispatch.js"; describe("recordInboundSessionAndDispatchReply", () => { it("delegates record and dispatch through the channel turn kernel once", async () => { @@ -64,4 +69,30 @@ describe("recordInboundSessionAndDispatchReply", () => { replyToId: undefined, }); }); + + it("exports shared visible reply dispatch helpers", () => { + expect(hasVisibleInboundReplyDispatch(undefined)).toBe(false); + expect( + hasVisibleInboundReplyDispatch({ + queuedFinal: false, + counts: { tool: 0, block: 1, final: 0 }, + }), + ).toBe(true); + expect( + hasFinalInboundReplyDispatch({ + queuedFinal: false, + counts: { tool: 0, block: 1, final: 0 }, + }), + ).toBe(false); + expect( + hasFinalInboundReplyDispatch(undefined, { + fallbackDelivered: true, + }), + ).toBe(true); + expect(resolveInboundReplyDispatchCounts(undefined)).toEqual({ + tool: 0, + block: 0, + final: 0, + }); + }); }); diff --git a/src/plugin-sdk/inbound-reply-dispatch.ts b/src/plugin-sdk/inbound-reply-dispatch.ts index d83cf47f4db..50d36606f45 100644 --- a/src/plugin-sdk/inbound-reply-dispatch.ts +++ b/src/plugin-sdk/inbound-reply-dispatch.ts @@ -7,7 +7,12 @@ import { import type { DispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.types.js"; import type { ReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.types.js"; import type { FinalizedMsgContext } from "../auto-reply/templating.js"; -import { dispatchAssembledChannelTurn, runPreparedChannelTurn } from "../channels/turn/kernel.js"; +import { + hasFinalChannelTurnDispatch, + hasVisibleChannelTurnDispatch, + resolveChannelTurnDispatchCounts, + runPreparedChannelTurn, +} from "../channels/turn/kernel.js"; import type { PreparedChannelTurn } from "../channels/turn/types.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { createChannelReplyPipeline } from "./channel-reply-pipeline.js"; @@ -28,6 +33,12 @@ export async function runPreparedInboundReplyTurn( return await runPreparedChannelTurn(params); } +export { + hasFinalChannelTurnDispatch as hasFinalInboundReplyDispatch, + hasVisibleChannelTurnDispatch as hasVisibleInboundReplyDispatch, + resolveChannelTurnDispatchCounts as resolveInboundReplyDispatchCounts, +}; + /** Run `dispatchReplyFromConfig` with a dispatcher that always gets its settled callback. */ export async function dispatchReplyFromConfigWithSettledDispatcher(params: { cfg: OpenClawConfig; @@ -134,27 +145,29 @@ export async function recordInboundSessionAndDispatchReply(params: { }); const deliver = createNormalizedOutboundDeliverer(params.deliver); - await dispatchAssembledChannelTurn({ - cfg: params.cfg, + await runPreparedChannelTurn({ channel: params.channel, accountId: params.accountId, - agentId: params.agentId, routeSessionKey: params.routeSessionKey, storePath: params.storePath, ctxPayload: params.ctxPayload, recordInboundSession: params.recordInboundSession, - dispatchReplyWithBufferedBlockDispatcher: params.dispatchReplyWithBufferedBlockDispatcher, - delivery: { - deliver, - onError: params.onDispatchError, - }, - dispatcherOptions: replyPipeline, - replyOptions: { - ...params.replyOptions, - onModelSelected, - }, record: { onRecordError: params.onRecordError, }, + runDispatch: async () => + await params.dispatchReplyWithBufferedBlockDispatcher({ + ctx: params.ctxPayload, + cfg: params.cfg, + dispatcherOptions: { + ...replyPipeline, + deliver, + onError: params.onDispatchError, + }, + replyOptions: { + ...params.replyOptions, + onModelSelected, + }, + }), }); } diff --git a/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts b/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts index 2dd1e856c82..4a3f63ddea8 100644 --- a/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts +++ b/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts @@ -104,7 +104,7 @@ export function createPluginRuntimeMock(overrides: DeepPartial = replyResolver: params.replyResolver, }); return { - admission: { kind: "dispatch" as const }, + admission: params.admission ?? { kind: "dispatch" as const }, dispatched: true, ctxPayload: params.ctxPayload, routeSessionKey: params.routeSessionKey, @@ -135,7 +135,7 @@ export function createPluginRuntimeMock(overrides: DeepPartial = } const dispatchResult = await params.runDispatch(); return { - admission: { kind: "dispatch" as const }, + admission: params.admission ?? { kind: "dispatch" as const }, dispatched: true, ctxPayload: params.ctxPayload, routeSessionKey: params.routeSessionKey, @@ -178,10 +178,19 @@ export function createPluginRuntimeMock(overrides: DeepPartial = }; } const resolved = await params.adapter.resolveTurn(input, eventClass, preflight ?? {}); - const dispatchResult = await dispatchAssembledChannelTurnMock(resolved); + const admission = + resolved.admission ?? preflight.admission ?? ({ kind: "dispatch" } as const); + const dispatchResult = await dispatchAssembledChannelTurnMock({ + ...resolved, + admission, + delivery: + admission.kind === "observeOnly" + ? { deliver: async () => ({ visibleReplySent: false }) } + : resolved.delivery, + }); const result = { ...dispatchResult, - admission: resolved.admission ?? preflight.admission ?? dispatchResult.admission, + admission, }; await params.adapter.onFinalize?.(result); return result; diff --git a/src/plugin-sdk/testing.ts b/src/plugin-sdk/testing.ts index 0896cabcb5e..c79bcb1d6d1 100644 --- a/src/plugin-sdk/testing.ts +++ b/src/plugin-sdk/testing.ts @@ -13,6 +13,7 @@ export { } from "../channels/ack-reactions.js"; export { expectChannelInboundContextContract, + expectChannelTurnDispatchResultContract, primeChannelOutboundSendMock, } from "../channels/plugins/contracts/test-helpers.js"; export {