refactor(channels): share turn dispatch results

This commit is contained in:
Peter Steinberger
2026-04-30 02:58:31 +01:00
parent f94d970cee
commit 02ebac6250
21 changed files with 380 additions and 68 deletions

View File

@@ -1,2 +1,2 @@
81de442c9e0c902621f316297af3ad6c48a07c0c6bbfa5ca984419cfdb7f4292 plugin-sdk-api-baseline.json
136836d047bd0fb0723047945fdbd931a9128552ff49e8a27937d54dba0e9709 plugin-sdk-api-baseline.jsonl
ae28566c922ce79527943b069abc199de28e3898ec08eea12c4ff6050795f276 plugin-sdk-api-baseline.json
79446b23832949553b23e7cf92be37b81c69d123fc09bed6f8fc04bd98e9257d plugin-sdk-api-baseline.jsonl

View File

@@ -13,7 +13,10 @@ import {
} from "openclaw/plugin-sdk/channel-reply-pipeline";
import { resolveChannelStreamingBlockEnabled } from "openclaw/plugin-sdk/channel-streaming";
import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime";
import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import {
hasFinalInboundReplyDispatch,
runPreparedInboundReplyTurn,
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { resolveMarkdownTableMode } from "openclaw/plugin-sdk/markdown-table-runtime";
import { getAgentScopedMediaLocalRoots } from "openclaw/plugin-sdk/media-runtime";
import { resolveChunkMode } from "openclaw/plugin-sdk/reply-chunking";
@@ -643,7 +646,7 @@ export async function processDiscordMessage(
return;
}
if (!dispatchResult?.queuedFinal) {
if (!hasFinalInboundReplyDispatch(dispatchResult)) {
if (isGuildMessage) {
clearHistoryEntriesIfEnabled({
historyMap: guildHistories,

View File

@@ -12,7 +12,10 @@ import {
} from "openclaw/plugin-sdk/conversation-runtime";
import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime";
import { normalizeScpRemoteHost } from "openclaw/plugin-sdk/host-runtime";
import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import {
hasFinalInboundReplyDispatch,
runPreparedInboundReplyTurn,
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { isInboundPathAllowed, kindFromMime } from "openclaw/plugin-sdk/media-runtime";
import {
clearHistoryEntriesIfEnabled,
@@ -487,9 +490,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
},
}),
});
const queuedFinal = dispatchResult.queuedFinal;
if (!queuedFinal) {
if (!hasFinalInboundReplyDispatch(dispatchResult)) {
if (decision.isGroup && decision.historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: groupHistories,

View File

@@ -2,7 +2,10 @@ import type { webhook } from "@line/bot-sdk";
import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime";
import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import {
hasFinalInboundReplyDispatch,
runPreparedInboundReplyTurn,
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
import {
dispatchReplyWithBufferedBlockDispatcher,
chunkMarkdownText,
@@ -306,9 +309,7 @@ export async function monitorLineProvider(
},
}),
});
const queuedFinal = dispatchResult.queuedFinal;
if (!queuedFinal) {
if (!hasFinalInboundReplyDispatch(dispatchResult)) {
logVerbose(`line: no response generated for message from ${ctxPayload.From}`);
}
} catch (err) {

View File

@@ -3,6 +3,7 @@ import {
evaluateSupplementalContextVisibility,
resolveChannelContextVisibilityMode,
} from "openclaw/plugin-sdk/context-visibility-runtime";
import { hasFinalInboundReplyDispatch } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import type { GetReplyOptions } from "openclaw/plugin-sdk/reply-runtime";
import {
loadSessionStore,
@@ -1963,7 +1964,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
if (isRoom && triggerSnapshot) {
roomHistoryTracker.consumeHistory(_route.agentId, roomId, triggerSnapshot, _messageId);
}
if (!queuedFinal) {
if (!hasFinalInboundReplyDispatch({ queuedFinal, counts })) {
await commitInboundEventIfClaimed();
return;
}

View File

@@ -11,7 +11,11 @@ import {
shouldIncludeSupplementalContext,
} from "openclaw/plugin-sdk/context-visibility-runtime";
import { evaluateSenderGroupAccessForPolicy } from "openclaw/plugin-sdk/group-access";
import { dispatchReplyFromConfigWithSettledDispatcher } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import {
dispatchReplyFromConfigWithSettledDispatcher,
hasFinalInboundReplyDispatch,
resolveInboundReplyDispatchCounts,
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
import {
buildPendingHistoryContextFromMap,
clearHistoryEntriesIfEnabled,
@@ -864,11 +868,12 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
}),
});
const queuedFinal = dispatchResult?.queuedFinal ?? false;
const counts = dispatchResult?.counts ?? { tool: 0, block: 0, final: 0 };
const counts = resolveInboundReplyDispatchCounts(dispatchResult);
const hasFinalResponse = hasFinalInboundReplyDispatch(dispatchResult);
log.info("dispatch complete", { queuedFinal, counts });
if (!queuedFinal) {
if (!hasFinalResponse) {
if (isRoomish && historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: conversationHistories,

View File

@@ -25,7 +25,10 @@ import {
toInternalMessageReceivedContext,
triggerInternalHook,
} from "openclaw/plugin-sdk/hook-runtime";
import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import {
hasFinalInboundReplyDispatch,
runPreparedInboundReplyTurn,
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { kindFromMime } from "openclaw/plugin-sdk/media-runtime";
import {
buildPendingHistoryContextFromMap,
@@ -351,8 +354,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
}
},
});
const queuedFinal = dispatchResult?.queuedFinal ?? false;
if (!queuedFinal) {
if (!hasFinalInboundReplyDispatch(dispatchResult)) {
if (entry.isGroup && historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: deps.groupHistories,

View File

@@ -18,7 +18,10 @@ import {
resolveChannelStreamingPreviewToolProgress,
} from "openclaw/plugin-sdk/channel-streaming";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import {
hasVisibleInboundReplyDispatch,
runPreparedInboundReplyTurn,
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { resolveAgentOutboundIdentity } from "openclaw/plugin-sdk/outbound-runtime";
import { clearHistoryEntriesIfEnabled } from "openclaw/plugin-sdk/reply-history";
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
@@ -1099,12 +1102,13 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
}
}
const anyReplyDelivered =
observedReplyDelivery ||
queuedFinal ||
streamFallbackDelivered ||
(counts.block ?? 0) > 0 ||
(counts.final ?? 0) > 0;
const anyReplyDelivered = hasVisibleInboundReplyDispatch(
{ queuedFinal, counts },
{
observedReplyDelivery,
fallbackDelivered: streamFallbackDelivered,
},
);
if (statusReactionsEnabled) {
if (dispatchError) {

View File

@@ -17,7 +17,10 @@ import type {
TelegramAccountConfig,
} from "openclaw/plugin-sdk/config-types";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import {
hasFinalInboundReplyDispatch,
runPreparedInboundReplyTurn,
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
import {
createOutboundPayloadPlan,
projectOutboundPayloadPlanForDelivery,
@@ -1306,7 +1309,13 @@ export const dispatchTelegramMessage = async ({
});
}
const hasFinalResponse = queuedFinal || sentFallback || deliverySummary.delivered;
const hasFinalResponse = hasFinalInboundReplyDispatch(
{ queuedFinal },
{
fallbackDelivered: sentFallback,
deliverySummaryDelivered: deliverySummary.delivered,
},
);
if (statusReactionController && !hasFinalResponse) {
void finalizeTelegramStatusReaction({ outcome: "error", hasFinalResponse: false }).catch(

View File

@@ -1,3 +1,4 @@
import { hasVisibleInboundReplyDispatch } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import {
type DeliverableWhatsAppOutboundPayload,
normalizeWhatsAppOutboundPayload,
@@ -388,8 +389,7 @@ export async function dispatchWhatsAppBufferedReply(params: {
},
});
const didQueueVisibleReply =
queuedFinal || counts.tool > 0 || counts.block > 0 || counts.final > 0;
const didQueueVisibleReply = hasVisibleInboundReplyDispatch({ queuedFinal, counts });
if (!didQueueVisibleReply) {
if (params.shouldClearGroupHistory) {
params.groupHistories.set(params.groupHistoryKey, []);

View File

@@ -1,8 +1,15 @@
import { expect, type Mock } from "vitest";
import type { DispatchFromConfigResult } from "../../../auto-reply/reply/dispatch-from-config.types.js";
import type { MsgContext } from "../../../auto-reply/templating.js";
import { normalizeChatType } from "../../chat-type.js";
import { resolveConversationLabel } from "../../conversation-label.js";
import { validateSenderIdentity } from "../../sender-identity.js";
import {
hasFinalChannelTurnDispatch,
hasVisibleChannelTurnDispatch,
resolveChannelTurnDispatchCounts,
type ChannelTurnDispatchResultLike,
} from "../../turn/dispatch-result.js";
// oxlint-disable-next-line typescript/no-unnecessary-type-parameters -- Test helper preserves channel send mock arg types.
export function primeChannelOutboundSendMock<TArgs extends unknown[]>(
@@ -33,3 +40,20 @@ export function expectChannelInboundContextContract(ctx: MsgContext) {
expect(label).toBeTruthy();
}
}
export function expectChannelTurnDispatchResultContract(
result: ChannelTurnDispatchResultLike,
expected: {
visible: boolean;
final?: boolean;
counts?: Partial<DispatchFromConfigResult["counts"]>;
},
) {
expect(hasVisibleChannelTurnDispatch(result)).toBe(expected.visible);
if (expected.final !== undefined) {
expect(hasFinalChannelTurnDispatch(result)).toBe(expected.final);
}
if (expected.counts) {
expect(resolveChannelTurnDispatchCounts(result)).toMatchObject(expected.counts);
}
}

View File

@@ -0,0 +1,62 @@
import type { ReplyDispatchKind } from "../../auto-reply/reply/reply-dispatcher.types.js";
export type ChannelTurnDispatchResultLike =
| {
queuedFinal?: boolean;
counts?: Partial<Record<ReplyDispatchKind, number>>;
}
| null
| undefined;
export type ChannelTurnVisibleDeliverySignals = {
observedReplyDelivery?: boolean;
fallbackDelivered?: boolean;
deliverySummaryDelivered?: boolean;
};
export const EMPTY_CHANNEL_TURN_DISPATCH_COUNTS: Record<ReplyDispatchKind, number> = {
tool: 0,
block: 0,
final: 0,
};
export function resolveChannelTurnDispatchCounts(
result: ChannelTurnDispatchResultLike,
): Record<ReplyDispatchKind, number> {
return {
...EMPTY_CHANNEL_TURN_DISPATCH_COUNTS,
...result?.counts,
};
}
export function hasVisibleChannelTurnDispatch(
result: ChannelTurnDispatchResultLike,
signals: ChannelTurnVisibleDeliverySignals = {},
): boolean {
const counts = resolveChannelTurnDispatchCounts(result);
return (
signals.observedReplyDelivery === true ||
signals.fallbackDelivered === true ||
signals.deliverySummaryDelivered === true ||
result?.queuedFinal === true ||
counts.tool > 0 ||
counts.block > 0 ||
counts.final > 0
);
}
export function hasFinalChannelTurnDispatch(
result: ChannelTurnDispatchResultLike,
signals: Pick<
ChannelTurnVisibleDeliverySignals,
"fallbackDelivered" | "deliverySummaryDelivered"
> = {},
): boolean {
const counts = resolveChannelTurnDispatchCounts(result);
return (
signals.fallbackDelivered === true ||
signals.deliverySummaryDelivered === true ||
result?.queuedFinal === true ||
counts.final > 0
);
}

View File

@@ -6,6 +6,9 @@ import type { RecordInboundSession } from "../session.types.js";
import {
createNoopChannelTurnDeliveryAdapter,
dispatchAssembledChannelTurn,
hasFinalChannelTurnDispatch,
hasVisibleChannelTurnDispatch,
resolveChannelTurnDispatchCounts,
runPreparedChannelTurn,
runChannelTurn,
} from "./kernel.js";
@@ -84,6 +87,7 @@ describe("channel turn kernel", () => {
it("runs prepared dispatches after recording session metadata", async () => {
const events: string[] = [];
const log = vi.fn();
const recordInboundSession = createRecordInboundSession(events);
const runDispatch = vi.fn(async () => {
events.push("dispatch");
@@ -100,6 +104,8 @@ describe("channel turn kernel", () => {
ctxPayload: createCtx(),
recordInboundSession,
runDispatch,
log,
messageId: "msg-1",
record: {
onRecordError: vi.fn(),
},
@@ -107,11 +113,24 @@ describe("channel turn kernel", () => {
expect(events).toEqual(["record", "dispatch"]);
expect(result.dispatchResult?.queuedFinal).toBe(true);
expect(log).toHaveBeenCalledWith(
expect.objectContaining({ stage: "record", event: "start", messageId: "msg-1" }),
);
expect(log).toHaveBeenCalledWith(
expect.objectContaining({ stage: "record", event: "done", messageId: "msg-1" }),
);
expect(log).toHaveBeenCalledWith(
expect.objectContaining({ stage: "dispatch", event: "start", messageId: "msg-1" }),
);
expect(log).toHaveBeenCalledWith(
expect.objectContaining({ stage: "dispatch", event: "done", messageId: "msg-1" }),
);
});
it("cleans up pre-created dispatchers when session recording fails", async () => {
const events: string[] = [];
const recordError = new Error("session store failed");
const log = vi.fn();
const recordInboundSession = vi.fn(async () => {
events.push("record");
throw recordError;
@@ -130,6 +149,7 @@ describe("channel turn kernel", () => {
recordInboundSession,
onPreDispatchFailure,
runDispatch,
log,
record: {
onRecordError: vi.fn(),
},
@@ -139,6 +159,33 @@ describe("channel turn kernel", () => {
expect(events).toEqual(["record", "cleanup"]);
expect(runDispatch).not.toHaveBeenCalled();
expect(onPreDispatchFailure).toHaveBeenCalledWith(recordError);
expect(log).toHaveBeenCalledWith(expect.objectContaining({ stage: "record", event: "error" }));
});
it("normalizes visible dispatch checks", () => {
expect(hasVisibleChannelTurnDispatch(undefined)).toBe(false);
expect(
hasVisibleChannelTurnDispatch({
queuedFinal: false,
counts: { tool: 1, block: 0, final: 0 },
}),
).toBe(true);
expect(
hasVisibleChannelTurnDispatch(undefined, {
observedReplyDelivery: true,
}),
).toBe(true);
expect(
hasFinalChannelTurnDispatch({
queuedFinal: false,
counts: { tool: 1, block: 0, final: 0 },
}),
).toBe(false);
expect(resolveChannelTurnDispatchCounts(undefined)).toEqual({
tool: 0,
block: 0,
final: 0,
});
});
it("drops when ingest returns null", async () => {

View File

@@ -14,6 +14,14 @@ import type {
RunChannelTurnParams,
RunResolvedChannelTurnParams,
} from "./types.js";
export {
EMPTY_CHANNEL_TURN_DISPATCH_COUNTS,
hasFinalChannelTurnDispatch,
hasVisibleChannelTurnDispatch,
resolveChannelTurnDispatchCounts,
type ChannelTurnDispatchResultLike,
type ChannelTurnVisibleDeliverySignals,
} from "./dispatch-result.js";
export type {
AccessFacts,
AssembledChannelTurn,
@@ -100,6 +108,9 @@ export async function dispatchAssembledChannelTurn(
ctxPayload: params.ctxPayload,
recordInboundSession: params.recordInboundSession,
record: params.record,
admission: params.admission,
log: params.log,
messageId: params.messageId,
runDispatch: async () =>
await params.dispatchReplyWithBufferedBlockDispatcher({
ctx: params.ctxPayload,
@@ -122,6 +133,17 @@ export async function runPreparedChannelTurn<
>(
params: PreparedChannelTurn<TDispatchResult>,
): Promise<DispatchedChannelTurnResult<TDispatchResult>> {
const admission = params.admission ?? ({ kind: "dispatch" } as const);
emit({
...params,
event: {
stage: "record",
event: "start",
messageId: params.messageId,
sessionKey: params.ctxPayload.SessionKey ?? params.routeSessionKey,
admission: admission.kind,
},
});
try {
await params.recordInboundSession({
storePath: params.storePath,
@@ -133,7 +155,28 @@ export async function runPreparedChannelTurn<
onRecordError: params.record?.onRecordError ?? (() => undefined),
trackSessionMetaTask: params.record?.trackSessionMetaTask,
});
emit({
...params,
event: {
stage: "record",
event: "done",
messageId: params.messageId,
sessionKey: params.ctxPayload.SessionKey ?? params.routeSessionKey,
admission: admission.kind,
},
});
} catch (err) {
emit({
...params,
event: {
stage: "record",
event: "error",
messageId: params.messageId,
sessionKey: params.ctxPayload.SessionKey ?? params.routeSessionKey,
admission: admission.kind,
error: err,
},
});
try {
await params.onPreDispatchFailure?.(err);
} catch {
@@ -142,10 +185,46 @@ export async function runPreparedChannelTurn<
throw err;
}
const dispatchResult = await params.runDispatch();
emit({
...params,
event: {
stage: "dispatch",
event: "start",
messageId: params.messageId,
sessionKey: params.ctxPayload.SessionKey ?? params.routeSessionKey,
admission: admission.kind,
},
});
let dispatchResult: TDispatchResult;
try {
dispatchResult = await params.runDispatch();
} catch (err) {
emit({
...params,
event: {
stage: "dispatch",
event: "error",
messageId: params.messageId,
sessionKey: params.ctxPayload.SessionKey ?? params.routeSessionKey,
admission: admission.kind,
error: err,
},
});
throw err;
}
emit({
...params,
event: {
stage: "dispatch",
event: "done",
messageId: params.messageId,
sessionKey: params.ctxPayload.SessionKey ?? params.routeSessionKey,
admission: admission.kind,
},
});
return {
admission: { kind: "dispatch" },
admission,
dispatched: true,
ctxPayload: params.ctxPayload,
routeSessionKey: params.routeSessionKey,
@@ -239,25 +318,21 @@ export async function runChannelTurn<TRaw>(
? {
...resolved,
delivery: createNoopChannelTurnDeliveryAdapter(),
admission,
log: params.log,
messageId: input.id,
}
: resolved,
: {
...resolved,
admission,
log: params.log,
messageId: input.id,
},
);
result = {
...dispatchResult,
admission,
};
emit({
...params,
accountId: resolved.accountId ?? params.accountId,
event: {
stage: "dispatch",
event: "done",
messageId: input.id,
sessionKey: resolved.routeSessionKey,
admission: admission.kind,
},
});
} catch (err) {
const failedResult: ChannelTurnResult = {
admission,
@@ -274,12 +349,11 @@ export async function runChannelTurn<TRaw>(
...params,
accountId: resolved.accountId ?? params.accountId,
event: {
stage: "dispatch",
event: "error",
stage: "finalize",
event: "done",
messageId: input.id,
sessionKey: resolved.routeSessionKey,
admission: admission.kind,
error: err,
},
});
throw err;

View File

@@ -209,6 +209,9 @@ export type AssembledChannelTurn = {
replyOptions?: Omit<GetReplyOptions, "onBlockReply">;
replyResolver?: GetReplyFromConfig;
record?: ChannelTurnRecordOptions;
admission?: Extract<ChannelTurnAdmission, { kind: "dispatch" | "observeOnly" }>;
log?: (event: ChannelTurnLogEvent) => void;
messageId?: string;
};
export type PreparedChannelTurn<TDispatchResult = DispatchFromConfigResult> = {
@@ -221,6 +224,9 @@ export type PreparedChannelTurn<TDispatchResult = DispatchFromConfigResult> = {
record?: ChannelTurnRecordOptions;
onPreDispatchFailure?: (err: unknown) => void | Promise<void>;
runDispatch: () => Promise<TDispatchResult>;
admission?: Extract<ChannelTurnAdmission, { kind: "dispatch" | "observeOnly" }>;
log?: (event: ChannelTurnLogEvent) => void;
messageId?: string;
};
export type ChannelTurnResolved = AssembledChannelTurn & {
@@ -259,7 +265,7 @@ export type ChannelTurnResult = {
};
export type DispatchedChannelTurnResult<TDispatchResult = DispatchFromConfigResult> = {
admission: Extract<ChannelTurnAdmission, { kind: "dispatch" }>;
admission: Extract<ChannelTurnAdmission, { kind: "dispatch" | "observeOnly" }>;
dispatched: true;
ctxPayload: MsgContext;
routeSessionKey: string;

View File

@@ -0,0 +1,18 @@
import { expectChannelTurnDispatchResultContract } from "openclaw/plugin-sdk/channel-contract-testing";
import { describe, it } from "vitest";
describe("channel contract testing helpers", () => {
it("asserts shared channel turn dispatch visibility", () => {
expectChannelTurnDispatchResultContract(
{
queuedFinal: false,
counts: { tool: 0, block: 1, final: 0 },
},
{
visible: true,
final: false,
counts: { block: 1 },
},
);
});
});

View File

@@ -1,5 +1,6 @@
export {
expectChannelInboundContextContract,
expectChannelTurnDispatchResultContract,
primeChannelOutboundSendMock,
} from "../channels/plugins/contracts/test-helpers.js";
export { buildDispatchInboundCaptureMock } from "../channels/plugins/contracts/inbound-testkit.js";

View File

@@ -3,7 +3,12 @@ import type { DispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/rep
import type { FinalizedMsgContext } from "../auto-reply/templating.js";
import type { RecordInboundSession } from "../channels/session.types.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { recordInboundSessionAndDispatchReply } from "./inbound-reply-dispatch.js";
import {
hasFinalInboundReplyDispatch,
hasVisibleInboundReplyDispatch,
recordInboundSessionAndDispatchReply,
resolveInboundReplyDispatchCounts,
} from "./inbound-reply-dispatch.js";
describe("recordInboundSessionAndDispatchReply", () => {
it("delegates record and dispatch through the channel turn kernel once", async () => {
@@ -64,4 +69,30 @@ describe("recordInboundSessionAndDispatchReply", () => {
replyToId: undefined,
});
});
it("exports shared visible reply dispatch helpers", () => {
expect(hasVisibleInboundReplyDispatch(undefined)).toBe(false);
expect(
hasVisibleInboundReplyDispatch({
queuedFinal: false,
counts: { tool: 0, block: 1, final: 0 },
}),
).toBe(true);
expect(
hasFinalInboundReplyDispatch({
queuedFinal: false,
counts: { tool: 0, block: 1, final: 0 },
}),
).toBe(false);
expect(
hasFinalInboundReplyDispatch(undefined, {
fallbackDelivered: true,
}),
).toBe(true);
expect(resolveInboundReplyDispatchCounts(undefined)).toEqual({
tool: 0,
block: 0,
final: 0,
});
});
});

View File

@@ -7,7 +7,12 @@ import {
import type { DispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.types.js";
import type { ReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.types.js";
import type { FinalizedMsgContext } from "../auto-reply/templating.js";
import { dispatchAssembledChannelTurn, runPreparedChannelTurn } from "../channels/turn/kernel.js";
import {
hasFinalChannelTurnDispatch,
hasVisibleChannelTurnDispatch,
resolveChannelTurnDispatchCounts,
runPreparedChannelTurn,
} from "../channels/turn/kernel.js";
import type { PreparedChannelTurn } from "../channels/turn/types.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { createChannelReplyPipeline } from "./channel-reply-pipeline.js";
@@ -28,6 +33,12 @@ export async function runPreparedInboundReplyTurn<TDispatchResult>(
return await runPreparedChannelTurn(params);
}
export {
hasFinalChannelTurnDispatch as hasFinalInboundReplyDispatch,
hasVisibleChannelTurnDispatch as hasVisibleInboundReplyDispatch,
resolveChannelTurnDispatchCounts as resolveInboundReplyDispatchCounts,
};
/** Run `dispatchReplyFromConfig` with a dispatcher that always gets its settled callback. */
export async function dispatchReplyFromConfigWithSettledDispatcher(params: {
cfg: OpenClawConfig;
@@ -134,27 +145,29 @@ export async function recordInboundSessionAndDispatchReply(params: {
});
const deliver = createNormalizedOutboundDeliverer(params.deliver);
await dispatchAssembledChannelTurn({
cfg: params.cfg,
await runPreparedChannelTurn({
channel: params.channel,
accountId: params.accountId,
agentId: params.agentId,
routeSessionKey: params.routeSessionKey,
storePath: params.storePath,
ctxPayload: params.ctxPayload,
recordInboundSession: params.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher: params.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver,
onError: params.onDispatchError,
},
dispatcherOptions: replyPipeline,
replyOptions: {
...params.replyOptions,
onModelSelected,
},
record: {
onRecordError: params.onRecordError,
},
runDispatch: async () =>
await params.dispatchReplyWithBufferedBlockDispatcher({
ctx: params.ctxPayload,
cfg: params.cfg,
dispatcherOptions: {
...replyPipeline,
deliver,
onError: params.onDispatchError,
},
replyOptions: {
...params.replyOptions,
onModelSelected,
},
}),
});
}

View File

@@ -104,7 +104,7 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
replyResolver: params.replyResolver,
});
return {
admission: { kind: "dispatch" as const },
admission: params.admission ?? { kind: "dispatch" as const },
dispatched: true,
ctxPayload: params.ctxPayload,
routeSessionKey: params.routeSessionKey,
@@ -135,7 +135,7 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
}
const dispatchResult = await params.runDispatch();
return {
admission: { kind: "dispatch" as const },
admission: params.admission ?? { kind: "dispatch" as const },
dispatched: true,
ctxPayload: params.ctxPayload,
routeSessionKey: params.routeSessionKey,
@@ -178,10 +178,19 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
};
}
const resolved = await params.adapter.resolveTurn(input, eventClass, preflight ?? {});
const dispatchResult = await dispatchAssembledChannelTurnMock(resolved);
const admission =
resolved.admission ?? preflight.admission ?? ({ kind: "dispatch" } as const);
const dispatchResult = await dispatchAssembledChannelTurnMock({
...resolved,
admission,
delivery:
admission.kind === "observeOnly"
? { deliver: async () => ({ visibleReplySent: false }) }
: resolved.delivery,
});
const result = {
...dispatchResult,
admission: resolved.admission ?? preflight.admission ?? dispatchResult.admission,
admission,
};
await params.adapter.onFinalize?.(result);
return result;

View File

@@ -13,6 +13,7 @@ export {
} from "../channels/ack-reactions.js";
export {
expectChannelInboundContextContract,
expectChannelTurnDispatchResultContract,
primeChannelOutboundSendMock,
} from "../channels/plugins/contracts/test-helpers.js";
export {