refactor(channels): route inbound turns through kernel

This commit is contained in:
Peter Steinberger
2026-04-30 04:08:44 +01:00
parent 6e73101df3
commit ffe67e9cdc
31 changed files with 1827 additions and 1389 deletions

View File

@@ -4,7 +4,7 @@ import {
resolveEnvelopeFormatOptions,
} from "openclaw/plugin-sdk/channel-inbound";
import { isDangerousNameMatchingEnabled } from "openclaw/plugin-sdk/dangerous-name-runtime";
import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { runInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { resolveMarkdownTableMode } from "openclaw/plugin-sdk/markdown-table-runtime";
import { getAgentScopedMediaLocalRoots } from "openclaw/plugin-sdk/media-runtime";
import { createNonExitingRuntime, logVerbose } from "openclaw/plugin-sdk/runtime-env";
@@ -270,83 +270,97 @@ export async function dispatchDiscordComponentEvent(params: {
startId: params.replyToId,
});
await runPreparedInboundReplyTurn({
await runInboundReplyTurn({
channel: "discord",
accountId,
routeSessionKey: sessionKey,
storePath,
ctxPayload,
recordInboundSession,
record: {
updateLastRoute: interactionCtx.isDirectMessage
? {
sessionKey: route.mainSessionKey,
channel: "discord",
to:
resolveDiscordComponentOriginatingTo(interactionCtx) ??
`user:${interactionCtx.userId}`,
accountId,
mainDmOwnerPin: pinnedMainDmOwner
? {
ownerRecipient: pinnedMainDmOwner,
senderRecipient: interactionCtx.userId,
onSkip: ({ ownerRecipient, senderRecipient }) => {
logVerbose(
`discord: skip main-session last route for ${senderRecipient} (pinned owner ${ownerRecipient})`,
);
},
}
: undefined,
}
: undefined,
onRecordError: (err) => {
logVerbose(`discord: failed updating component session meta: ${String(err)}`);
},
},
runDispatch: () =>
dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg: ctx.cfg,
replyOptions: { onModelSelected },
dispatcherOptions: {
...replyPipeline,
humanDelay: resolveHumanDelayConfig(ctx.cfg, agentId),
deliver: async (payload) => {
const replyToId = replyReference.use();
await deliverDiscordReply({
cfg: ctx.cfg,
replies: [payload],
target: deliverTarget,
token,
accountId,
rest: interaction.client.rest,
runtime,
replyToId,
replyToMode,
textLimit,
maxLinesPerMessage: resolveDiscordMaxLinesPerMessage({
cfg: ctx.cfg,
discordConfig: ctx.discordConfig,
raw: interaction,
adapter: {
ingest: () => ({
id: interaction.id,
rawText: ctxPayload.RawBody ?? "",
textForAgent: ctxPayload.BodyForAgent,
textForCommands: ctxPayload.CommandBody,
raw: interaction,
}),
resolveTurn: () => ({
channel: "discord",
accountId,
routeSessionKey: sessionKey,
storePath,
ctxPayload,
recordInboundSession,
record: {
updateLastRoute: interactionCtx.isDirectMessage
? {
sessionKey: route.mainSessionKey,
channel: "discord",
to:
resolveDiscordComponentOriginatingTo(interactionCtx) ??
`user:${interactionCtx.userId}`,
accountId,
}),
tableMode,
chunkMode: resolveChunkMode(ctx.cfg, "discord", accountId),
mediaLocalRoots,
});
replyReference.markSent();
},
onReplyStart: async () => {
try {
const { sendTyping } = await loadTypingRuntime();
await sendTyping({ rest: feedbackRest, channelId: typingChannelId });
} catch (err) {
logVerbose(`discord: typing failed for component reply: ${String(err)}`);
}
},
onError: (err) => {
logError(`discord component dispatch failed: ${String(err)}`);
mainDmOwnerPin: pinnedMainDmOwner
? {
ownerRecipient: pinnedMainDmOwner,
senderRecipient: interactionCtx.userId,
onSkip: ({ ownerRecipient, senderRecipient }) => {
logVerbose(
`discord: skip main-session last route for ${senderRecipient} (pinned owner ${ownerRecipient})`,
);
},
}
: undefined,
}
: undefined,
onRecordError: (err) => {
logVerbose(`discord: failed updating component session meta: ${String(err)}`);
},
},
runDispatch: () =>
dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg: ctx.cfg,
replyOptions: { onModelSelected },
dispatcherOptions: {
...replyPipeline,
humanDelay: resolveHumanDelayConfig(ctx.cfg, agentId),
deliver: async (payload) => {
const replyToId = replyReference.use();
await deliverDiscordReply({
cfg: ctx.cfg,
replies: [payload],
target: deliverTarget,
token,
accountId,
rest: interaction.client.rest,
runtime,
replyToId,
replyToMode,
textLimit,
maxLinesPerMessage: resolveDiscordMaxLinesPerMessage({
cfg: ctx.cfg,
discordConfig: ctx.discordConfig,
accountId,
}),
tableMode,
chunkMode: resolveChunkMode(ctx.cfg, "discord", accountId),
mediaLocalRoots,
});
replyReference.markSent();
},
onReplyStart: async () => {
try {
const { sendTyping } = await loadTypingRuntime();
await sendTyping({ rest: feedbackRest, channelId: typingChannelId });
} catch (err) {
logVerbose(`discord: typing failed for component reply: ${String(err)}`);
}
},
onError: (err) => {
logError(`discord component dispatch failed: ${String(err)}`);
},
},
}),
}),
},
});
}

View File

@@ -15,13 +15,12 @@ import { resolveChannelStreamingBlockEnabled } from "openclaw/plugin-sdk/channel
import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime";
import {
hasFinalInboundReplyDispatch,
runPreparedInboundReplyTurn,
runInboundReplyTurn,
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { resolveMarkdownTableMode } from "openclaw/plugin-sdk/markdown-table-runtime";
import { getAgentScopedMediaLocalRoots } from "openclaw/plugin-sdk/media-runtime";
import { resolveChunkMode } from "openclaw/plugin-sdk/reply-chunking";
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-dispatch-runtime";
import { clearHistoryEntriesIfEnabled } from "openclaw/plugin-sdk/reply-history";
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
import { danger, logVerbose, shouldLogVerbose } from "openclaw/plugin-sdk/runtime-env";
import { resolveDiscordMaxLinesPerMessage } from "../accounts.js";
@@ -480,109 +479,135 @@ export async function processDiscordMessage(
await settleDispatchBeforeStart();
return;
}
const preparedResult = await runPreparedInboundReplyTurn({
const preparedResult = await runInboundReplyTurn({
channel: "discord",
accountId: route.accountId,
routeSessionKey: persistedSessionKey,
storePath: turn.storePath,
ctxPayload,
recordInboundSession,
record: turn.record,
onPreDispatchFailure: settleDispatchBeforeStart,
runDispatch: () =>
dispatchInboundMessage({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
abortSignal,
skillFilter: channelConfig?.skills,
sourceReplyDeliveryMode,
disableBlockStreaming: sourceRepliesAreToolOnly
? true
: (draftPreview.disableBlockStreamingForDraft ??
(typeof resolvedBlockStreamingEnabled === "boolean"
? !resolvedBlockStreamingEnabled
: undefined)),
onPartialReply: draftPreview.draftStream
? (payload) => draftPreview.updateFromPartial(payload.text)
: undefined,
onAssistantMessageStart: draftPreview.draftStream
? draftPreview.handleAssistantMessageBoundary
: undefined,
onReasoningEnd: draftPreview.draftStream
? draftPreview.handleAssistantMessageBoundary
: undefined,
onModelSelected,
suppressDefaultToolProgressMessages: draftPreview.previewToolProgressEnabled
? true
: undefined,
onReasoningStream: async () => {
await statusReactions.setThinking();
},
onToolStart: async (payload) => {
if (isProcessAborted(abortSignal)) {
return;
}
await statusReactions.setTool(payload.name);
draftPreview.pushToolProgress(
payload.name ? `tool: ${payload.name}` : "tool running",
);
},
onItemEvent: async (payload) => {
draftPreview.pushToolProgress(
payload.progressText ?? payload.summary ?? payload.title ?? payload.name,
);
},
onPlanUpdate: async (payload) => {
if (payload.phase !== "update") {
return;
}
draftPreview.pushToolProgress(
payload.explanation ?? payload.steps?.[0] ?? "planning",
);
},
onApprovalEvent: async (payload) => {
if (payload.phase !== "requested") {
return;
}
draftPreview.pushToolProgress(
payload.command ? `approval: ${payload.command}` : "approval requested",
);
},
onCommandOutput: async (payload) => {
if (payload.phase !== "end") {
return;
}
draftPreview.pushToolProgress(
payload.name
? `${payload.name}${payload.exitCode === 0 ? " ✓" : payload.exitCode != null ? ` (exit ${payload.exitCode})` : ""}`
: payload.title,
);
},
onPatchSummary: async (payload) => {
if (payload.phase !== "end") {
return;
}
draftPreview.pushToolProgress(payload.summary ?? payload.title ?? "patch applied");
},
onCompactionStart: async () => {
if (isProcessAborted(abortSignal)) {
return;
}
await statusReactions.setCompacting();
},
onCompactionEnd: async () => {
if (isProcessAborted(abortSignal)) {
return;
}
statusReactions.cancelPending();
await statusReactions.setThinking();
},
},
raw: ctx,
adapter: {
ingest: () => ({
id: message.id,
timestamp: message.timestamp ? Date.parse(message.timestamp) : undefined,
rawText: text,
textForAgent: ctxPayload.BodyForAgent,
textForCommands: ctxPayload.CommandBody,
raw: message,
}),
resolveTurn: () => ({
channel: "discord",
accountId: route.accountId,
routeSessionKey: persistedSessionKey,
storePath: turn.storePath,
ctxPayload,
recordInboundSession,
record: turn.record,
history: {
isGroup: isGuildMessage,
historyKey: messageChannelId,
historyMap: guildHistories,
limit: historyLimit,
},
onPreDispatchFailure: settleDispatchBeforeStart,
runDispatch: () =>
dispatchInboundMessage({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
abortSignal,
skillFilter: channelConfig?.skills,
sourceReplyDeliveryMode,
disableBlockStreaming: sourceRepliesAreToolOnly
? true
: (draftPreview.disableBlockStreamingForDraft ??
(typeof resolvedBlockStreamingEnabled === "boolean"
? !resolvedBlockStreamingEnabled
: undefined)),
onPartialReply: draftPreview.draftStream
? (payload) => draftPreview.updateFromPartial(payload.text)
: undefined,
onAssistantMessageStart: draftPreview.draftStream
? draftPreview.handleAssistantMessageBoundary
: undefined,
onReasoningEnd: draftPreview.draftStream
? draftPreview.handleAssistantMessageBoundary
: undefined,
onModelSelected,
suppressDefaultToolProgressMessages: draftPreview.previewToolProgressEnabled
? true
: undefined,
onReasoningStream: async () => {
await statusReactions.setThinking();
},
onToolStart: async (payload) => {
if (isProcessAborted(abortSignal)) {
return;
}
await statusReactions.setTool(payload.name);
draftPreview.pushToolProgress(
payload.name ? `tool: ${payload.name}` : "tool running",
);
},
onItemEvent: async (payload) => {
draftPreview.pushToolProgress(
payload.progressText ?? payload.summary ?? payload.title ?? payload.name,
);
},
onPlanUpdate: async (payload) => {
if (payload.phase !== "update") {
return;
}
draftPreview.pushToolProgress(
payload.explanation ?? payload.steps?.[0] ?? "planning",
);
},
onApprovalEvent: async (payload) => {
if (payload.phase !== "requested") {
return;
}
draftPreview.pushToolProgress(
payload.command ? `approval: ${payload.command}` : "approval requested",
);
},
onCommandOutput: async (payload) => {
if (payload.phase !== "end") {
return;
}
draftPreview.pushToolProgress(
payload.name
? `${payload.name}${payload.exitCode === 0 ? " ✓" : payload.exitCode != null ? ` (exit ${payload.exitCode})` : ""}`
: payload.title,
);
},
onPatchSummary: async (payload) => {
if (payload.phase !== "end") {
return;
}
draftPreview.pushToolProgress(
payload.summary ?? payload.title ?? "patch applied",
);
},
onCompactionStart: async () => {
if (isProcessAborted(abortSignal)) {
return;
}
await statusReactions.setCompacting();
},
onCompactionEnd: async () => {
if (isProcessAborted(abortSignal)) {
return;
}
statusReactions.cancelPending();
await statusReactions.setThinking();
},
},
}),
}),
},
});
if (!preparedResult.dispatched) {
return;
}
dispatchResult = preparedResult.dispatchResult;
if (isProcessAborted(abortSignal)) {
dispatchAborted = true;
@@ -646,27 +671,14 @@ export async function processDiscordMessage(
return;
}
if (!hasFinalInboundReplyDispatch(dispatchResult)) {
if (isGuildMessage) {
clearHistoryEntriesIfEnabled({
historyMap: guildHistories,
historyKey: messageChannelId,
limit: historyLimit,
});
}
const finalDispatchResult = dispatchResult;
if (!finalDispatchResult || !hasFinalInboundReplyDispatch(finalDispatchResult)) {
return;
}
if (shouldLogVerbose()) {
const finalCount = dispatchResult.counts.final;
const finalCount = finalDispatchResult.counts.final;
logVerbose(
`discord: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${replyTarget}`,
);
}
if (isGuildMessage) {
clearHistoryEntriesIfEnabled({
historyMap: guildHistories,
historyKey: messageChannelId,
limit: historyLimit,
});
}
}

View File

@@ -111,6 +111,39 @@ describe("broadcast dispatch", () => {
saveMediaBuffer: mockSaveMediaBuffer,
},
turn: {
run: vi.fn(async (params: Parameters<PluginRuntime["channel"]["turn"]["run"]>[0]) => {
const input = await params.adapter.ingest(params.raw);
if (!input) {
return {
admission: { kind: "drop" as const, reason: "ingest-null" },
dispatched: false,
};
}
const eventClass = {
kind: "message" as const,
canStartAgentTurn: true,
};
const turn = await params.adapter.resolveTurn(input, eventClass, {});
if (!("runDispatch" in turn)) {
throw new Error("feishu broadcast test runtime only supports prepared turns");
}
await turn.recordInboundSession({
storePath: turn.storePath,
sessionKey: turn.ctxPayload.SessionKey ?? turn.routeSessionKey,
ctx: turn.ctxPayload,
groupResolution: turn.record?.groupResolution,
createIfMissing: turn.record?.createIfMissing,
updateLastRoute: turn.record?.updateLastRoute,
onRecordError: turn.record?.onRecordError ?? (() => undefined),
});
return {
admission: { kind: "dispatch" as const },
dispatched: true,
ctxPayload: turn.ctxPayload,
routeSessionKey: turn.routeSessionKey,
dispatchResult: await turn.runDispatch(),
};
}),
runPrepared: vi.fn(
async (turn: Parameters<PluginRuntime["channel"]["turn"]["runPrepared"]>[0]) => {
await turn.recordInboundSession({

View File

@@ -198,6 +198,16 @@ function createFeishuBotRuntime(overrides: DeepPartial<PluginRuntime> = {}): Plu
buildPairingReply: vi.fn(),
},
turn: {
run: vi.fn(async (params) => {
const input = await params.adapter.ingest(params.raw);
const turn = await params.adapter.resolveTurn(input, {
kind: "message",
canStartAgentTurn: true,
});
return {
dispatchResult: await turn.runDispatch(),
};
}),
runPrepared: vi.fn(async (params) => ({
dispatchResult: await params.runDispatch(),
})),

View File

@@ -1312,31 +1312,46 @@ export async function handleFeishuMessage(params: {
log(
`feishu[${account.accountId}]: broadcast active dispatch agent=${agentId} (session=${agentSessionKey})`,
);
await core.channel.turn.runPrepared({
await core.channel.turn.run({
channel: "feishu",
accountId: route.accountId,
routeSessionKey: agentSessionKey,
storePath: agentStorePath,
ctxPayload: agentCtx,
recordInboundSession: core.channel.session.recordInboundSession,
record: agentRecord,
onPreDispatchFailure: () =>
core.channel.reply.settleReplyDispatcher({
dispatcher,
onSettled: () => markDispatchIdle(),
raw: ctx,
adapter: {
ingest: () => ({
id: ctx.messageId,
timestamp: messageCreateTimeMs,
rawText: ctx.content,
textForAgent: agentCtx.BodyForAgent,
textForCommands: agentCtx.CommandBody,
raw: ctx,
}),
runDispatch: () =>
core.channel.reply.withReplyDispatcher({
dispatcher,
onSettled: () => markDispatchIdle(),
run: () =>
core.channel.reply.dispatchReplyFromConfig({
ctx: agentCtx,
cfg,
resolveTurn: () => ({
channel: "feishu",
accountId: route.accountId,
routeSessionKey: agentSessionKey,
storePath: agentStorePath,
ctxPayload: agentCtx,
recordInboundSession: core.channel.session.recordInboundSession,
record: agentRecord,
onPreDispatchFailure: () =>
core.channel.reply.settleReplyDispatcher({
dispatcher,
replyOptions,
onSettled: () => markDispatchIdle(),
}),
runDispatch: () =>
core.channel.reply.withReplyDispatcher({
dispatcher,
onSettled: () => markDispatchIdle(),
run: () =>
core.channel.reply.dispatchReplyFromConfig({
ctx: agentCtx,
cfg,
dispatcher,
replyOptions,
}),
}),
}),
},
});
} else {
// Observer agent: no-op dispatcher (session entry + inference, no Feishu reply).
@@ -1356,24 +1371,39 @@ export async function handleFeishuMessage(params: {
log(
`feishu[${account.accountId}]: broadcast observer dispatch agent=${agentId} (session=${agentSessionKey})`,
);
await core.channel.turn.runPrepared({
await core.channel.turn.run({
channel: "feishu",
accountId: route.accountId,
routeSessionKey: agentSessionKey,
storePath: agentStorePath,
ctxPayload: agentCtx,
recordInboundSession: core.channel.session.recordInboundSession,
record: agentRecord,
runDispatch: () =>
core.channel.reply.withReplyDispatcher({
dispatcher: noopDispatcher,
run: () =>
core.channel.reply.dispatchReplyFromConfig({
ctx: agentCtx,
cfg,
raw: ctx,
adapter: {
ingest: () => ({
id: ctx.messageId,
timestamp: messageCreateTimeMs,
rawText: ctx.content,
textForAgent: agentCtx.BodyForAgent,
textForCommands: agentCtx.CommandBody,
raw: ctx,
}),
resolveTurn: () => ({
channel: "feishu",
accountId: route.accountId,
routeSessionKey: agentSessionKey,
storePath: agentStorePath,
ctxPayload: agentCtx,
recordInboundSession: core.channel.session.recordInboundSession,
record: agentRecord,
runDispatch: () =>
core.channel.reply.withReplyDispatcher({
dispatcher: noopDispatcher,
run: () =>
core.channel.reply.dispatchReplyFromConfig({
ctx: agentCtx,
cfg,
dispatcher: noopDispatcher,
}),
}),
}),
},
});
}
};
@@ -1445,49 +1475,66 @@ export async function handleFeishuMessage(params: {
});
log(`feishu[${account.accountId}]: dispatching to agent (session=${route.sessionKey})`);
const { dispatchResult } = await core.channel.turn.runPrepared({
const turnResult = await core.channel.turn.run({
channel: "feishu",
accountId: route.accountId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
record: {
onRecordError: (err) => {
log(
`feishu[${account.accountId}]: failed to record inbound session ${route.sessionKey}: ${String(err)}`,
);
},
},
onPreDispatchFailure: () =>
core.channel.reply.settleReplyDispatcher({
dispatcher,
onSettled: () => markDispatchIdle(),
raw: ctx,
adapter: {
ingest: () => ({
id: ctx.messageId,
timestamp: messageCreateTimeMs,
rawText: ctx.content,
textForAgent: ctxPayload.BodyForAgent,
textForCommands: ctxPayload.CommandBody,
raw: ctx,
}),
runDispatch: () =>
core.channel.reply.withReplyDispatcher({
dispatcher,
onSettled: () => {
markDispatchIdle();
resolveTurn: () => ({
channel: "feishu",
accountId: route.accountId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
record: {
onRecordError: (err) => {
log(
`feishu[${account.accountId}]: failed to record inbound session ${route.sessionKey}: ${String(err)}`,
);
},
},
run: () =>
core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
history: {
isGroup,
historyKey,
historyMap: chatHistories,
limit: historyLimit,
},
onPreDispatchFailure: () =>
core.channel.reply.settleReplyDispatcher({
dispatcher,
replyOptions,
onSettled: () => markDispatchIdle(),
}),
runDispatch: () =>
core.channel.reply.withReplyDispatcher({
dispatcher,
onSettled: () => {
markDispatchIdle();
},
run: () =>
core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions,
}),
}),
}),
},
});
const { queuedFinal, counts } = dispatchResult;
if (isGroup && historyKey && chatHistories) {
clearHistoryEntriesIfEnabled({
historyMap: chatHistories,
historyKey,
limit: historyLimit,
});
if (!turnResult.dispatched) {
return;
}
const { dispatchResult } = turnResult;
const { queuedFinal, counts } = dispatchResult;
log(
`feishu[${account.accountId}]: dispatch complete (queuedFinal=${queuedFinal}, replies=${counts.final})`,

View File

@@ -134,6 +134,26 @@ function createTestRuntime(overrides?: {
recordInboundSession,
},
turn: {
run: vi.fn(async (params: Parameters<PluginRuntime["channel"]["turn"]["run"]>[0]) => {
const input = await params.adapter.ingest(params.raw);
if (!input) {
return {
admission: { kind: "drop" as const, reason: "ingest-null" },
dispatched: false,
};
}
const eventClass = {
kind: "message" as const,
canStartAgentTurn: true,
};
const turn = await params.adapter.resolveTurn(input, eventClass, {});
if (!("runDispatch" in turn)) {
throw new Error("feishu comment test runtime only supports prepared turns");
}
return await runPrepared(
turn as Parameters<PluginRuntime["channel"]["turn"]["runPrepared"]>[0],
);
}) as unknown as PluginRuntime["channel"]["turn"]["run"],
runPrepared: runPrepared as unknown as PluginRuntime["channel"]["turn"]["runPrepared"],
},
pairing: {

View File

@@ -241,42 +241,58 @@ export async function handleFeishuCommentEvent(
`feishu[${account.accountId}]: dispatching drive comment to agent ` +
`(session=${commentSessionKey} comment=${turn.commentId} type=${turn.noticeType})`,
);
const { dispatchResult } = await core.channel.turn.runPrepared({
const turnResult = await core.channel.turn.run({
channel: "feishu",
accountId: route.accountId,
routeSessionKey: commentSessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
record: {
onRecordError: (err) => {
error(
`feishu[${account.accountId}]: failed to record comment inbound session ${commentSessionKey}: ${String(err)}`,
);
},
},
onPreDispatchFailure: async () => {
dispatchSettledBeforeStart = true;
await core.channel.reply.settleReplyDispatcher({
dispatcher,
onSettled: () => {
markRunComplete();
markDispatchIdle();
raw: turn,
adapter: {
ingest: () => ({
id: turn.messageId,
timestamp: parseTimestampMs(turn.timestamp),
rawText: ctxPayload.RawBody ?? "",
textForAgent: ctxPayload.BodyForAgent,
textForCommands: ctxPayload.CommandBody,
raw: turn,
}),
resolveTurn: () => ({
channel: "feishu",
accountId: route.accountId,
routeSessionKey: commentSessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
record: {
onRecordError: (err) => {
error(
`feishu[${account.accountId}]: failed to record comment inbound session ${commentSessionKey}: ${String(err)}`,
);
},
},
});
},
runDispatch: () =>
core.channel.reply.withReplyDispatcher({
dispatcher,
run: () =>
core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg: effectiveCfg,
onPreDispatchFailure: async () => {
dispatchSettledBeforeStart = true;
await core.channel.reply.settleReplyDispatcher({
dispatcher,
replyOptions,
onSettled: () => {
markRunComplete();
markDispatchIdle();
},
});
},
runDispatch: () =>
core.channel.reply.withReplyDispatcher({
dispatcher,
run: () =>
core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg: effectiveCfg,
dispatcher,
replyOptions,
}),
}),
}),
},
});
const dispatchResult = turnResult.dispatched ? turnResult.dispatchResult : undefined;
const queuedFinal = dispatchResult?.queuedFinal ?? false;
const counts = dispatchResult?.counts ?? { tool: 0, block: 0, final: 0 };
log(

View File

@@ -12,7 +12,7 @@ import {
} from "openclaw/plugin-sdk/conversation-runtime";
import { recordInboundSession } from "openclaw/plugin-sdk/conversation-runtime";
import { normalizeScpRemoteHost } from "openclaw/plugin-sdk/host-runtime";
import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { runInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { isInboundPathAllowed, kindFromMime } from "openclaw/plugin-sdk/media-runtime";
import { DEFAULT_GROUP_HISTORY_LIMIT, type HistoryEntry } from "openclaw/plugin-sdk/reply-history";
import { resolveTextChunkLimit } from "openclaw/plugin-sdk/reply-runtime";
@@ -435,59 +435,74 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
},
});
await runPreparedInboundReplyTurn({
await runInboundReplyTurn({
channel: "imessage",
accountId: decision.route.accountId,
routeSessionKey: decision.route.sessionKey,
storePath,
ctxPayload,
recordInboundSession,
record: {
updateLastRoute:
!decision.isGroup && updateTarget
? {
sessionKey: decision.route.mainSessionKey,
channel: "imessage",
to: updateTarget,
accountId: decision.route.accountId,
mainDmOwnerPin:
pinnedMainDmOwner && decision.senderNormalized
? {
ownerRecipient: pinnedMainDmOwner,
senderRecipient: decision.senderNormalized,
onSkip: ({ ownerRecipient, senderRecipient }) => {
logVerbose(
`imessage: skip main-session last route for ${senderRecipient} (pinned owner ${ownerRecipient})`,
);
},
}
: undefined,
}
: undefined,
onRecordError: (err) => {
logVerbose(`imessage: failed updating session meta: ${String(err)}`);
},
},
history: {
isGroup: decision.isGroup,
historyKey: decision.historyKey,
historyMap: groupHistories,
limit: historyLimit,
},
onPreDispatchFailure: () => settleReplyDispatcher({ dispatcher }),
runDispatch: () =>
dispatchInboundMessage({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
disableBlockStreaming:
typeof accountInfo.config.blockStreaming === "boolean"
? !accountInfo.config.blockStreaming
: undefined,
onModelSelected,
},
raw: decision,
adapter: {
ingest: () => ({
id: ctxPayload.MessageSid ?? `${ctxPayload.From}:${Date.now()}`,
timestamp: typeof ctxPayload.Timestamp === "number" ? ctxPayload.Timestamp : undefined,
rawText: ctxPayload.RawBody ?? "",
textForAgent: ctxPayload.BodyForAgent,
textForCommands: ctxPayload.CommandBody,
raw: decision,
}),
resolveTurn: () => ({
channel: "imessage",
accountId: decision.route.accountId,
routeSessionKey: decision.route.sessionKey,
storePath,
ctxPayload,
recordInboundSession,
record: {
updateLastRoute:
!decision.isGroup && updateTarget
? {
sessionKey: decision.route.mainSessionKey,
channel: "imessage",
to: updateTarget,
accountId: decision.route.accountId,
mainDmOwnerPin:
pinnedMainDmOwner && decision.senderNormalized
? {
ownerRecipient: pinnedMainDmOwner,
senderRecipient: decision.senderNormalized,
onSkip: ({ ownerRecipient, senderRecipient }) => {
logVerbose(
`imessage: skip main-session last route for ${senderRecipient} (pinned owner ${ownerRecipient})`,
);
},
}
: undefined,
}
: undefined,
onRecordError: (err) => {
logVerbose(`imessage: failed updating session meta: ${String(err)}`);
},
},
history: {
isGroup: decision.isGroup,
historyKey: decision.historyKey,
historyMap: groupHistories,
limit: historyLimit,
},
onPreDispatchFailure: () => settleReplyDispatcher({ dispatcher }),
runDispatch: () =>
dispatchInboundMessage({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
disableBlockStreaming:
typeof accountInfo.config.blockStreaming === "boolean"
? !accountInfo.config.blockStreaming
: undefined,
onModelSelected,
},
}),
}),
},
});
}

View File

@@ -231,7 +231,7 @@ export async function monitorLineProvider(
});
const core = getLineRuntime();
const { dispatchResult } = await core.channel.turn.run({
const turnResult = await core.channel.turn.run({
channel: "line",
accountId: route.accountId,
raw: ctx,
@@ -316,6 +316,7 @@ export async function monitorLineProvider(
}),
},
});
const dispatchResult = turnResult.dispatched ? turnResult.dispatchResult : undefined;
if (!hasFinalInboundReplyDispatch(dispatchResult)) {
logVerbose(`line: no response generated for message from ${ctxPayload.From}`);
}

View File

@@ -142,6 +142,28 @@ export function createMatrixHandlerTestHarness(
};
},
);
const run = vi.fn(
async (params: Parameters<MatrixMonitorHandlerParams["core"]["channel"]["turn"]["run"]>[0]) => {
const input = await params.adapter.ingest(params.raw);
if (!input) {
return { admission: { kind: "drop" as const, reason: "ingest-null" }, dispatched: false };
}
const eventClass = (await params.adapter.classify?.(input)) ?? {
kind: "message" as const,
canStartAgentTurn: true,
};
const preflightResult = await params.adapter.preflight?.(input, eventClass);
const preflight =
preflightResult && "kind" in preflightResult
? { admission: preflightResult }
: (preflightResult ?? {});
const turn = await params.adapter.resolveTurn(input, eventClass, preflight);
if ("runDispatch" in turn) {
return await runPrepared(turn);
}
throw new Error("matrix test helper only supports prepared turn dispatch");
},
);
const dmPolicy = options.dmPolicy ?? "open";
const allowFrom = options.allowFrom ?? (dmPolicy === "open" ? ["*"] : []);
const cfgForHandler =
@@ -229,6 +251,7 @@ export function createMatrixHandlerTestHarness(
}),
},
turn: {
run,
runPrepared,
},
reactions: {

View File

@@ -1829,106 +1829,127 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
onIdle: typingCallbacks.onIdle,
});
const { dispatchResult } = await core.channel.turn.runPrepared({
const turnResult = await core.channel.turn.run({
channel: "matrix",
accountId: _route.accountId,
routeSessionKey: _route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
record: {
updateLastRoute: isDirectMessage
? {
sessionKey: _route.mainSessionKey,
channel: "matrix",
to: `room:${roomId}`,
accountId: _route.accountId,
raw: event,
adapter: {
ingest: () => ({
id: _messageId,
rawText: bodyText,
textForAgent: ctxPayload.BodyForAgent,
textForCommands: ctxPayload.CommandBody,
raw: event,
}),
resolveTurn: () => ({
channel: "matrix",
accountId: _route.accountId,
routeSessionKey: _route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
record: {
updateLastRoute: isDirectMessage
? {
sessionKey: _route.mainSessionKey,
channel: "matrix",
to: `room:${roomId}`,
accountId: _route.accountId,
}
: undefined,
onRecordError: (err) => {
logger.warn("failed updating session meta", {
error: String(err),
storePath,
sessionKey: ctxPayload.SessionKey ?? _route.sessionKey,
});
},
},
onPreDispatchFailure: () =>
core.channel.reply.settleReplyDispatcher({
dispatcher,
onSettled: () => {
markRunComplete();
markDispatchIdle();
},
}),
runDispatch: async () => {
if (
sharedDmContextNotice &&
markTrackedRoomIfFirst(sharedDmContextNoticeRooms, roomId)
) {
client
.sendMessage(roomId, {
msgtype: "m.notice",
body: sharedDmContextNotice,
})
.catch((err) => {
logVerboseMessage(
`matrix: failed sending shared DM session notice room=${roomId}: ${String(err)}`,
);
});
}
: undefined,
onRecordError: (err) => {
logger.warn("failed updating session meta", {
error: String(err),
storePath,
sessionKey: ctxPayload.SessionKey ?? _route.sessionKey,
});
},
},
onPreDispatchFailure: () =>
core.channel.reply.settleReplyDispatcher({
dispatcher,
onSettled: () => {
markRunComplete();
markDispatchIdle();
return await core.channel.reply.withReplyDispatcher({
dispatcher,
onSettled: () => {
markDispatchIdle();
},
run: async () => {
try {
return await core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
skillFilter: roomConfig?.skills,
// Keep block streaming enabled when explicitly requested, even
// with draft previews on. The draft remains the live preview
// for the current assistant block, while block deliveries
// finalize completed blocks into their own preserved events.
disableBlockStreaming: !blockStreamingEnabled,
onPartialReply: draftStream
? (payload) => {
latestDraftFullText = payload.text ?? "";
suppressPreviewToolProgressForAnswerText(latestDraftFullText);
updateDraftFromLatestFullText();
}
: undefined,
onBlockReplyQueued: draftStream
? (payload, context) => {
if (payload.isCompactionNotice === true) {
return;
}
queueDraftBlockBoundary(payload, context);
}
: undefined,
// Reset draft boundary bookkeeping on assistant message
// boundaries so post-tool blocks stream from a fresh
// cumulative payload (payload.text resets upstream).
onAssistantMessageStart: draftStream
? () => {
resetDraftBlockOffsets();
resetPreviewToolProgress();
}
: undefined,
...buildPreviewToolProgressReplyOptions(),
onModelSelected,
},
});
} finally {
markRunComplete();
}
},
});
},
}),
runDispatch: async () => {
if (sharedDmContextNotice && markTrackedRoomIfFirst(sharedDmContextNoticeRooms, roomId)) {
client
.sendMessage(roomId, {
msgtype: "m.notice",
body: sharedDmContextNotice,
})
.catch((err) => {
logVerboseMessage(
`matrix: failed sending shared DM session notice room=${roomId}: ${String(err)}`,
);
});
}
return await core.channel.reply.withReplyDispatcher({
dispatcher,
onSettled: () => {
markDispatchIdle();
},
run: async () => {
try {
return await core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
skillFilter: roomConfig?.skills,
// Keep block streaming enabled when explicitly requested, even
// with draft previews on. The draft remains the live preview
// for the current assistant block, while block deliveries
// finalize completed blocks into their own preserved events.
disableBlockStreaming: !blockStreamingEnabled,
onPartialReply: draftStream
? (payload) => {
latestDraftFullText = payload.text ?? "";
suppressPreviewToolProgressForAnswerText(latestDraftFullText);
updateDraftFromLatestFullText();
}
: undefined,
onBlockReplyQueued: draftStream
? (payload, context) => {
if (payload.isCompactionNotice === true) {
return;
}
queueDraftBlockBoundary(payload, context);
}
: undefined,
// Reset draft boundary bookkeeping on assistant message
// boundaries so post-tool blocks stream from a fresh
// cumulative payload (payload.text resets upstream).
onAssistantMessageStart: draftStream
? () => {
resetDraftBlockOffsets();
resetPreviewToolProgress();
}
: undefined,
...buildPreviewToolProgressReplyOptions(),
onModelSelected,
},
});
} finally {
markRunComplete();
}
},
});
},
});
if (!turnResult.dispatched) {
return;
}
const { dispatchResult } = turnResult;
const { queuedFinal, counts } = dispatchResult;
if (finalReplyDeliveryFailed) {
if (retryableReplyDeliveryFailed) {

View File

@@ -69,7 +69,6 @@ import {
buildAgentMediaPayload,
buildModelsProviderData,
buildPendingHistoryContextFromMap,
clearHistoryEntriesIfEnabled,
createChannelPairingController,
createChannelReplyPipeline,
DEFAULT_GROUP_HISTORY_LIMIT,
@@ -1721,74 +1720,95 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
let dispatchSettledBeforeStart = false;
try {
await core.channel.turn.runPrepared({
await core.channel.turn.run({
channel: "mattermost",
accountId: route.accountId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
record: {
updateLastRoute:
kind === "direct"
? {
sessionKey: route.mainSessionKey,
channel: "mattermost",
to,
accountId: route.accountId,
}
: undefined,
onRecordError: (err) => {
logVerboseMessage(
`mattermost: failed updating session meta id=${post.id ?? "unknown"}: ${String(err)}`,
);
},
},
onPreDispatchFailure: async () => {
dispatchSettledBeforeStart = true;
await core.channel.reply.settleReplyDispatcher({
dispatcher,
onSettled: () => {
markRunComplete();
markDispatchIdle();
},
});
},
runDispatch: () =>
core.channel.reply.withReplyDispatcher({
dispatcher,
onSettled: () => {
markDispatchIdle();
},
run: () =>
core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
disableBlockStreaming: true,
onModelSelected,
onPartialReply: (payload) => {
updateDraftFromPartial(payload.text);
},
onAssistantMessageStart: () => {
lastPartialText = "";
},
onReasoningEnd: () => {
lastPartialText = "";
},
onReasoningStream: async () => {
if (!lastPartialText) {
draftStream.update("Thinking…");
raw: post,
adapter: {
ingest: () => ({
id: post.id ?? `${to}:${Date.now()}`,
timestamp: post.create_at ?? undefined,
rawText,
textForAgent: ctxPayload.BodyForAgent,
textForCommands: ctxPayload.CommandBody,
raw: post,
}),
resolveTurn: () => ({
channel: "mattermost",
accountId: route.accountId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
record: {
updateLastRoute:
kind === "direct"
? {
sessionKey: route.mainSessionKey,
channel: "mattermost",
to,
accountId: route.accountId,
}
},
onToolStart: async (payload) => {
draftStream.update(buildMattermostToolStatusText(payload));
},
: undefined,
onRecordError: (err) => {
logVerboseMessage(
`mattermost: failed updating session meta id=${post.id ?? "unknown"}: ${String(err)}`,
);
},
},
history: {
isGroup: Boolean(historyKey),
historyKey: historyKey ?? undefined,
historyMap: channelHistories,
limit: historyLimit,
},
onPreDispatchFailure: async () => {
dispatchSettledBeforeStart = true;
await core.channel.reply.settleReplyDispatcher({
dispatcher,
onSettled: () => {
markRunComplete();
markDispatchIdle();
},
});
},
runDispatch: () =>
core.channel.reply.withReplyDispatcher({
dispatcher,
onSettled: () => {
markDispatchIdle();
},
run: () =>
core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
disableBlockStreaming: true,
onModelSelected,
onPartialReply: (payload) => {
updateDraftFromPartial(payload.text);
},
onAssistantMessageStart: () => {
lastPartialText = "";
},
onReasoningEnd: () => {
lastPartialText = "";
},
onReasoningStream: async () => {
if (!lastPartialText) {
draftStream.update("Thinking…");
}
},
onToolStart: async (payload) => {
draftStream.update(buildMattermostToolStatusText(payload));
},
},
}),
}),
}),
},
});
} finally {
try {
@@ -1800,13 +1820,6 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
markRunComplete();
}
}
if (historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: channelHistories,
historyKey,
limit: historyLimit,
});
}
},
});
if (replayResult === "duplicate") {

View File

@@ -40,6 +40,26 @@ export function installMSTeamsTestRuntime(options: MSTeamsTestRuntimeOptions = {
};
},
);
const run = vi.fn(async (params: Parameters<PluginRuntime["channel"]["turn"]["run"]>[0]) => {
const input = await params.adapter.ingest(params.raw);
if (!input) {
return { admission: { kind: "drop" as const, reason: "ingest-null" }, dispatched: false };
}
const eventClass = (await params.adapter.classify?.(input)) ?? {
kind: "message" as const,
canStartAgentTurn: true,
};
const preflightResult = await params.adapter.preflight?.(input, eventClass);
const preflight =
preflightResult && "kind" in preflightResult
? { admission: preflightResult }
: (preflightResult ?? {});
const turn = await params.adapter.resolveTurn(input, eventClass, preflight);
if ("runDispatch" in turn) {
return await runPrepared(turn);
}
throw new Error("msteams test runtime only supports prepared turn dispatch");
});
setMSTeamsRuntime({
logging: { shouldLogVerbose: () => false },
system: { enqueueSystemEvent: options.enqueueSystemEvent ?? vi.fn() },
@@ -90,6 +110,7 @@ export function installMSTeamsTestRuntime(options: MSTeamsTestRuntimeOptions = {
...(options.resolveStorePath ? { resolveStorePath: options.resolveStorePath } : {}),
},
turn: {
run: run as unknown as PluginRuntime["channel"]["turn"]["run"],
runPrepared: runPrepared as unknown as PluginRuntime["channel"]["turn"]["runPrepared"],
},
},

View File

@@ -18,7 +18,6 @@ import {
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
import {
buildPendingHistoryContextFromMap,
clearHistoryEntriesIfEnabled,
DEFAULT_GROUP_HISTORY_LIMIT,
recordPendingHistoryEntryIfEnabled,
type HistoryEntry,
@@ -840,33 +839,57 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
log.info("dispatching to agent", { sessionKey: route.sessionKey });
try {
const { dispatchResult } = await core.channel.turn.runPrepared({
const turnResult = await core.channel.turn.run({
channel: "msteams",
accountId: route.accountId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
record: {
onRecordError: (err) => {
logVerboseMessage(`msteams: failed updating session meta: ${formatUnknownError(err)}`);
},
},
onPreDispatchFailure: () =>
core.channel.reply.settleReplyDispatcher({
dispatcher,
onSettled: () => markDispatchIdle(),
raw: context,
adapter: {
ingest: () => ({
id: activity.id ?? `${teamsFrom}:${Date.now()}`,
timestamp: timestamp?.getTime(),
rawText: rawBody,
textForAgent: bodyForAgent,
textForCommands: commandBody,
raw: activity,
}),
runDispatch: () =>
dispatchReplyFromConfigWithSettledDispatcher({
cfg,
resolveTurn: () => ({
channel: "msteams",
accountId: route.accountId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
dispatcher,
onSettled: () => markDispatchIdle(),
replyOptions,
configOverride,
recordInboundSession: core.channel.session.recordInboundSession,
record: {
onRecordError: (err) => {
logVerboseMessage(
`msteams: failed updating session meta: ${formatUnknownError(err)}`,
);
},
},
history: {
isGroup: isRoomish,
historyKey,
historyMap: conversationHistories,
limit: historyLimit,
},
onPreDispatchFailure: () =>
core.channel.reply.settleReplyDispatcher({
dispatcher,
onSettled: () => markDispatchIdle(),
}),
runDispatch: () =>
dispatchReplyFromConfigWithSettledDispatcher({
cfg,
ctxPayload,
dispatcher,
onSettled: () => markDispatchIdle(),
replyOptions,
configOverride,
}),
}),
},
});
const dispatchResult = turnResult.dispatched ? turnResult.dispatchResult : undefined;
const queuedFinal = dispatchResult?.queuedFinal ?? false;
const counts = resolveInboundReplyDispatchCounts(dispatchResult);
const hasFinalResponse = hasFinalInboundReplyDispatch(dispatchResult);
@@ -874,26 +897,12 @@ export function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) {
log.info("dispatch complete", { queuedFinal, counts });
if (!hasFinalResponse) {
if (isRoomish && historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: conversationHistories,
historyKey,
limit: historyLimit,
});
}
return;
}
const finalCount = counts.final;
logVerboseMessage(
`msteams: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${teamsTo}`,
);
if (isRoomish && historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: conversationHistories,
historyKey,
limit: historyLimit,
});
}
} catch (err) {
log.error("dispatch failed", { error: formatUnknownError(err) });
runtime.error?.(`msteams dispatch failed: ${formatUnknownError(err)}`);

View File

@@ -69,9 +69,24 @@ function makeRuntime(): GatewayPluginRuntime {
recordInboundSession: vi.fn(async () => undefined),
},
turn: {
runPrepared: vi.fn(async (rawParams: unknown) => {
const params = rawParams as { runDispatch: () => Promise<unknown> };
return { dispatchResult: await params.runDispatch() };
run: vi.fn(async (rawParams: unknown) => {
const params = rawParams as {
raw: unknown;
adapter: {
ingest: (raw: unknown) => unknown;
resolveTurn: (...args: unknown[]) => unknown;
};
};
const input = await params.adapter.ingest(params.raw);
const turn = (await params.adapter.resolveTurn(
input,
{
kind: "message",
canStartAgentTurn: true,
},
{},
)) as { runDispatch: () => Promise<unknown> };
return { dispatchResult: await turn.runDispatch() };
}),
},
text: {

View File

@@ -141,9 +141,24 @@ function makeRuntime(params: {
recordInboundSession: vi.fn(async () => undefined),
},
turn: {
runPrepared: vi.fn(async (rawParams: unknown) => {
const params = rawParams as { runDispatch: () => Promise<unknown> };
return { dispatchResult: await params.runDispatch() };
run: vi.fn(async (rawParams: unknown) => {
const params = rawParams as {
raw: unknown;
adapter: {
ingest: (raw: unknown) => unknown;
resolveTurn: (...args: unknown[]) => unknown;
};
};
const input = await params.adapter.ingest(params.raw);
const turn = (await params.adapter.resolveTurn(
input,
{
kind: "message",
canStartAgentTurn: true,
},
{},
)) as { runDispatch: () => Promise<unknown> };
return { dispatchResult: await turn.runDispatch() };
}),
},
text: {

View File

@@ -10,6 +10,7 @@
* Separated from gateway.ts for testability and to keep handleMessage thin.
*/
import type { FinalizedMsgContext } from "openclaw/plugin-sdk/reply-runtime";
import {
parseAndSendMediaTags,
sendPlainReply,
@@ -224,238 +225,256 @@ export async function dispatchOutbound(
const storePath = runtime.channel.session.resolveStorePath(cfgWithSession.session?.store, {
agentId,
});
const dispatchPromise = runtime.channel.turn.runPrepared({
const dispatchPromise = runtime.channel.turn.run({
channel: "qqbot",
accountId: inbound.route.accountId,
routeSessionKey: inbound.route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: runtime.channel.session.recordInboundSession,
record: {
onRecordError: (err: unknown) => {
log?.error(
`Session metadata update failed: ${err instanceof Error ? err.message : String(err)}`,
);
},
},
runDispatch: () =>
runtime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg,
dispatcherOptions: {
responsePrefix: messagesConfig.responsePrefix,
deliver: async (payload: ReplyDeliverPayload, info: { kind: string }) => {
hasResponse = true;
raw: inbound,
adapter: {
ingest: () => ({
id: ctxPayload.MessageSid ?? `${ctxPayload.From}:${Date.now()}`,
rawText: ctxPayload.RawBody ?? "",
textForAgent: ctxPayload.BodyForAgent,
textForCommands: ctxPayload.CommandBody,
raw: inbound,
}),
resolveTurn: () => ({
channel: "qqbot",
accountId: inbound.route.accountId,
routeSessionKey: inbound.route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: runtime.channel.session.recordInboundSession,
record: {
onRecordError: (err: unknown) => {
log?.error(
`Session metadata update failed: ${err instanceof Error ? err.message : String(err)}`,
);
},
},
runDispatch: () =>
runtime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg,
dispatcherOptions: {
responsePrefix: messagesConfig.responsePrefix,
deliver: async (payload: ReplyDeliverPayload, info: { kind: string }) => {
hasResponse = true;
// ---- Tool deliver ----
if (info.kind === "tool") {
toolDeliverCount++;
const toolText = (payload.text ?? "").trim();
if (toolText) {
toolTexts.push(toolText);
}
if (payload.mediaUrls?.length) {
toolMediaUrls.push(...payload.mediaUrls);
}
if (payload.mediaUrl && !toolMediaUrls.includes(payload.mediaUrl)) {
toolMediaUrls.push(payload.mediaUrl);
}
// ---- Tool deliver ----
if (info.kind === "tool") {
toolDeliverCount++;
const toolText = (payload.text ?? "").trim();
if (toolText) {
toolTexts.push(toolText);
}
if (payload.mediaUrls?.length) {
toolMediaUrls.push(...payload.mediaUrls);
}
if (payload.mediaUrl && !toolMediaUrls.includes(payload.mediaUrl)) {
toolMediaUrls.push(payload.mediaUrl);
}
if (hasBlockResponse && toolMediaUrls.length > 0) {
const urlsToSend = [...toolMediaUrls];
toolMediaUrls.length = 0;
for (const mediaUrl of urlsToSend) {
try {
await sendMedia({
to: qualifiedTarget,
text: "",
mediaUrl,
accountId: account.accountId,
replyToId: event.messageId,
account,
});
} catch {}
}
return;
}
if (toolFallbackSent) {
return;
}
if (toolOnlyTimeoutId) {
if (toolRenewalCount < MAX_TOOL_RENEWALS) {
clearTimeout(toolOnlyTimeoutId);
toolRenewalCount++;
} else {
if (hasBlockResponse && toolMediaUrls.length > 0) {
const urlsToSend = [...toolMediaUrls];
toolMediaUrls.length = 0;
for (const mediaUrl of urlsToSend) {
try {
await sendMedia({
to: qualifiedTarget,
text: "",
mediaUrl,
accountId: account.accountId,
replyToId: event.messageId,
account,
});
} catch {}
}
return;
}
if (toolFallbackSent) {
return;
}
if (toolOnlyTimeoutId) {
if (toolRenewalCount < MAX_TOOL_RENEWALS) {
clearTimeout(toolOnlyTimeoutId);
toolRenewalCount++;
} else {
return;
}
}
toolOnlyTimeoutId = setTimeout(async () => {
if (!hasBlockResponse && !toolFallbackSent) {
toolFallbackSent = true;
try {
await sendToolFallback();
} catch {}
}
}, TOOL_ONLY_TIMEOUT);
return;
}
}
toolOnlyTimeoutId = setTimeout(async () => {
if (!hasBlockResponse && !toolFallbackSent) {
toolFallbackSent = true;
try {
await sendToolFallback();
} catch {}
// ---- Block deliver ----
hasBlockResponse = true;
inbound.typing.keepAlive?.stop();
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
}, TOOL_ONLY_TIMEOUT);
return;
}
// ---- Block deliver ----
hasBlockResponse = true;
inbound.typing.keepAlive?.stop();
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
if (toolOnlyTimeoutId) {
clearTimeout(toolOnlyTimeoutId);
toolOnlyTimeoutId = null;
}
if (streamingController && !streamingController.isTerminalPhase) {
try {
await streamingController.onDeliver(payload);
} catch (err) {
log?.error(
`Streaming deliver error: ${err instanceof Error ? err.message : String(err)}`,
);
}
const replyPreview = (payload.text ?? "").trim();
if (
event.type === "group" &&
(replyPreview === "NO_REPLY" || replyPreview === "[SKIP]")
) {
log?.info(
`Model decided to skip group message (${replyPreview}) from ${event.senderId}`,
);
return;
}
if (streamingController.shouldFallbackToStatic) {
log?.info("Streaming API unavailable, falling back to static for this deliver");
} else {
recordOutbound();
return;
}
}
const quoteRef = event.msgIdx;
let quoteRefUsed = false;
const consumeQuoteRef = (): string | undefined => {
if (quoteRef && !quoteRefUsed) {
quoteRefUsed = true;
return quoteRef;
}
return undefined;
};
let replyText = payload.text ?? "";
const deliverEvent = {
type: event.type,
senderId: event.senderId,
messageId: event.messageId,
channelId: event.channelId,
groupOpenid: event.groupOpenid,
msgIdx: event.msgIdx,
};
const deliverActx = { account, qualifiedTarget, log };
// 1. Media tags
const mediaResult = await parseAndSendMediaTags(
replyText,
deliverEvent,
deliverActx,
sendWithRetry,
consumeQuoteRef,
deliverDeps,
);
if (mediaResult.handled) {
recordOutbound();
return;
}
replyText = mediaResult.normalizedText;
// 2. Structured payload (QQBOT_PAYLOAD:)
const handled = await handleStructuredPayload(
replyCtx,
replyText,
recordOutbound,
replyDeps,
);
if (handled) {
return;
}
// 3. Voice-intent plain text
if (payload.audioAsVoice === true && !payload.mediaUrl && !payload.mediaUrls?.length) {
const sentVoice = await sendTextAsVoiceReply(replyCtx, replyText, replyDeps);
if (sentVoice) {
recordOutbound();
return;
}
}
// 4. Plain text + images/media
await sendPlainReply(
payload,
replyText,
deliverEvent,
deliverActx,
sendWithRetry,
consumeQuoteRef,
toolMediaUrls,
deliverDeps,
);
recordOutbound();
},
onError: async (err: unknown) => {
if (streamingController && !streamingController.isTerminalPhase) {
try {
await streamingController.onError(err);
} catch (streamErr) {
const streamErrMsg =
streamErr instanceof Error ? streamErr.message : String(streamErr);
log?.error(`Streaming onError failed: ${streamErrMsg}`);
}
if (!streamingController.shouldFallbackToStatic) {
return;
}
}
const errMsg = err instanceof Error ? err.message : String(err);
log?.error(`Dispatch error: ${errMsg}`);
hasResponse = true;
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
},
},
replyOptions: {
disableBlockStreaming: useOfficialC2cStream
? true
: (() => {
const s = account.config?.streaming;
if (s === false) {
return true;
if (toolOnlyTimeoutId) {
clearTimeout(toolOnlyTimeoutId);
toolOnlyTimeoutId = null;
}
return typeof s === "object" && s !== null && s.mode === "off";
})(),
...(streamingController
? {
onPartialReply: async (payload: { text?: string }) => {
if (streamingController && !streamingController.isTerminalPhase) {
try {
await streamingController.onPartialReply(payload);
} catch (partialErr) {
await streamingController.onDeliver(payload);
} catch (err) {
log?.error(
`Streaming onPartialReply error: ${partialErr instanceof Error ? partialErr.message : String(partialErr)}`,
`Streaming deliver error: ${err instanceof Error ? err.message : String(err)}`,
);
}
},
}
: {}),
},
const replyPreview = (payload.text ?? "").trim();
if (
event.type === "group" &&
(replyPreview === "NO_REPLY" || replyPreview === "[SKIP]")
) {
log?.info(
`Model decided to skip group message (${replyPreview}) from ${event.senderId}`,
);
return;
}
if (streamingController.shouldFallbackToStatic) {
log?.info("Streaming API unavailable, falling back to static for this deliver");
} else {
recordOutbound();
return;
}
}
const quoteRef = event.msgIdx;
let quoteRefUsed = false;
const consumeQuoteRef = (): string | undefined => {
if (quoteRef && !quoteRefUsed) {
quoteRefUsed = true;
return quoteRef;
}
return undefined;
};
let replyText = payload.text ?? "";
const deliverEvent = {
type: event.type,
senderId: event.senderId,
messageId: event.messageId,
channelId: event.channelId,
groupOpenid: event.groupOpenid,
msgIdx: event.msgIdx,
};
const deliverActx = { account, qualifiedTarget, log };
// 1. Media tags
const mediaResult = await parseAndSendMediaTags(
replyText,
deliverEvent,
deliverActx,
sendWithRetry,
consumeQuoteRef,
deliverDeps,
);
if (mediaResult.handled) {
recordOutbound();
return;
}
replyText = mediaResult.normalizedText;
// 2. Structured payload (QQBOT_PAYLOAD:)
const handled = await handleStructuredPayload(
replyCtx,
replyText,
recordOutbound,
replyDeps,
);
if (handled) {
return;
}
// 3. Voice-intent plain text
if (
payload.audioAsVoice === true &&
!payload.mediaUrl &&
!payload.mediaUrls?.length
) {
const sentVoice = await sendTextAsVoiceReply(replyCtx, replyText, replyDeps);
if (sentVoice) {
recordOutbound();
return;
}
}
// 4. Plain text + images/media
await sendPlainReply(
payload,
replyText,
deliverEvent,
deliverActx,
sendWithRetry,
consumeQuoteRef,
toolMediaUrls,
deliverDeps,
);
recordOutbound();
},
onError: async (err: unknown) => {
if (streamingController && !streamingController.isTerminalPhase) {
try {
await streamingController.onError(err);
} catch (streamErr) {
const streamErrMsg =
streamErr instanceof Error ? streamErr.message : String(streamErr);
log?.error(`Streaming onError failed: ${streamErrMsg}`);
}
if (!streamingController.shouldFallbackToStatic) {
return;
}
}
const errMsg = err instanceof Error ? err.message : String(err);
log?.error(`Dispatch error: ${errMsg}`);
hasResponse = true;
if (timeoutId) {
clearTimeout(timeoutId);
timeoutId = null;
}
},
},
replyOptions: {
disableBlockStreaming: useOfficialC2cStream
? true
: (() => {
const s = account.config?.streaming;
if (s === false) {
return true;
}
return typeof s === "object" && s !== null && s.mode === "off";
})(),
...(streamingController
? {
onPartialReply: async (payload: { text?: string }) => {
try {
await streamingController.onPartialReply(payload);
} catch (partialErr) {
log?.error(
`Streaming onPartialReply error: ${partialErr instanceof Error ? partialErr.message : String(partialErr)}`,
);
}
},
}
: {}),
},
}),
}),
},
});
try {
@@ -493,7 +512,10 @@ export async function dispatchOutbound(
// ============ ctxPayload builder ============
function buildCtxPayload(inbound: InboundContext, runtime: GatewayPluginRuntime): unknown {
function buildCtxPayload(
inbound: InboundContext,
runtime: GatewayPluginRuntime,
): FinalizedMsgContext {
const { event } = inbound;
return runtime.channel.reply.finalizeInboundContext({
Body: inbound.body,
@@ -549,5 +571,5 @@ function buildCtxPayload(inbound: InboundContext, runtime: GatewayPluginRuntime)
ReplyToIsQuote: inbound.replyTo.isQuote,
}
: {}),
});
}) as FinalizedMsgContext;
}

View File

@@ -57,7 +57,7 @@ export interface GatewayPluginRuntime {
recordInboundSession: (params: unknown) => Promise<unknown>;
};
turn: {
runPrepared: (params: unknown) => Promise<unknown>;
run: (params: unknown) => Promise<unknown>;
};
text: {
chunkMarkdownText: (text: string, limit: number) => string[];

View File

@@ -25,7 +25,7 @@ import {
toInternalMessageReceivedContext,
triggerInternalHook,
} from "openclaw/plugin-sdk/hook-runtime";
import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { runInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { kindFromMime } from "openclaw/plugin-sdk/media-runtime";
import {
buildPendingHistoryContextFromMap,
@@ -288,72 +288,85 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
},
});
await runPreparedInboundReplyTurn({
await runInboundReplyTurn({
channel: "signal",
accountId: route.accountId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession,
record: {
updateLastRoute: !entry.isGroup
? {
sessionKey: route.mainSessionKey,
channel: "signal",
to: entry.senderRecipient,
accountId: route.accountId,
mainDmOwnerPin: (() => {
const pinnedOwner = resolvePinnedMainDmOwnerFromAllowlist({
dmScope: deps.cfg.session?.dmScope,
allowFrom: deps.allowFrom,
normalizeEntry: normalizeSignalAllowRecipient,
});
if (!pinnedOwner) {
return undefined;
}
return {
ownerRecipient: pinnedOwner,
senderRecipient: entry.senderRecipient,
onSkip: ({ ownerRecipient, senderRecipient }) => {
logVerbose(
`signal: skip main-session last route for ${senderRecipient} (pinned owner ${ownerRecipient})`,
);
},
};
})(),
}
: undefined,
onRecordError: (err) => {
logVerbose(`signal: failed updating session meta: ${String(err)}`);
},
},
history: {
isGroup: entry.isGroup,
historyKey,
historyMap: deps.groupHistories,
limit: deps.historyLimit,
},
onPreDispatchFailure: () =>
settleReplyDispatcher({
dispatcher,
onSettled: () => markDispatchIdle(),
raw: entry,
adapter: {
ingest: () => ({
id: entry.messageId ?? `${entry.timestamp ?? Date.now()}`,
timestamp: entry.timestamp,
rawText: entry.bodyText,
raw: entry,
}),
runDispatch: async () => {
try {
return await dispatchInboundMessage({
ctx: ctxPayload,
cfg: deps.cfg,
dispatcher,
replyOptions: {
...replyOptions,
disableBlockStreaming:
typeof deps.blockStreaming === "boolean" ? !deps.blockStreaming : undefined,
onModelSelected,
resolveTurn: () => ({
channel: "signal",
accountId: route.accountId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession,
record: {
updateLastRoute: !entry.isGroup
? {
sessionKey: route.mainSessionKey,
channel: "signal",
to: entry.senderRecipient,
accountId: route.accountId,
mainDmOwnerPin: (() => {
const pinnedOwner = resolvePinnedMainDmOwnerFromAllowlist({
dmScope: deps.cfg.session?.dmScope,
allowFrom: deps.allowFrom,
normalizeEntry: normalizeSignalAllowRecipient,
});
if (!pinnedOwner) {
return undefined;
}
return {
ownerRecipient: pinnedOwner,
senderRecipient: entry.senderRecipient,
onSkip: ({ ownerRecipient, senderRecipient }) => {
logVerbose(
`signal: skip main-session last route for ${senderRecipient} (pinned owner ${ownerRecipient})`,
);
},
};
})(),
}
: undefined,
onRecordError: (err) => {
logVerbose(`signal: failed updating session meta: ${String(err)}`);
},
});
} finally {
markDispatchIdle();
}
},
history: {
isGroup: entry.isGroup,
historyKey,
historyMap: deps.groupHistories,
limit: deps.historyLimit,
},
onPreDispatchFailure: () =>
settleReplyDispatcher({
dispatcher,
onSettled: () => markDispatchIdle(),
}),
runDispatch: async () => {
try {
return await dispatchInboundMessage({
ctx: ctxPayload,
cfg: deps.cfg,
dispatcher,
replyOptions: {
...replyOptions,
disableBlockStreaming:
typeof deps.blockStreaming === "boolean" ? !deps.blockStreaming : undefined,
onModelSelected,
},
});
} finally {
markDispatchIdle();
}
},
}),
},
});
}

View File

@@ -19,8 +19,9 @@ import {
} from "openclaw/plugin-sdk/channel-streaming";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import {
type ChannelTurnRecordOptions,
hasVisibleInboundReplyDispatch,
runPreparedInboundReplyTurn,
runInboundReplyTurn,
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { resolveAgentOutboundIdentity } from "openclaw/plugin-sdk/outbound-runtime";
import { clearHistoryEntriesIfEnabled } from "openclaw/plugin-sdk/reply-history";
@@ -987,93 +988,111 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
let counts: { final?: number; block?: number } = {};
let dispatchSettledBeforeStart = false;
try {
const { dispatchResult } = await runPreparedInboundReplyTurn({
const turnResult = await runInboundReplyTurn({
channel: "slack",
accountId: route.accountId,
routeSessionKey: route.sessionKey,
storePath: prepared.turn.storePath,
ctxPayload: prepared.ctxPayload,
recordInboundSession,
record: prepared.turn.record as Parameters<typeof runPreparedInboundReplyTurn>[0]["record"],
onPreDispatchFailure: async () => {
dispatchSettledBeforeStart = true;
await settleReplyDispatcher({
dispatcher,
onSettled: () => markDispatchIdle(),
});
},
runDispatch: () =>
dispatchInboundMessage({
ctx: prepared.ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
skillFilter: prepared.channelConfig?.skills,
sourceReplyDeliveryMode,
hasRepliedRef,
disableBlockStreaming,
onModelSelected,
suppressDefaultToolProgressMessages: previewToolProgressEnabled ? true : undefined,
onPartialReply: useStreaming
? undefined
: !previewStreamingEnabled
? undefined
: async (payload) => {
updateDraftFromPartial(payload.text);
},
onAssistantMessageStart: onDraftBoundary,
onReasoningEnd: onDraftBoundary,
onReasoningStream: statusReactionsEnabled
? async () => {
await statusReactions.setThinking();
}
: undefined,
onToolStart: async (payload) => {
if (statusReactionsEnabled) {
await statusReactions.setTool(payload.name);
}
pushPreviewToolProgress(payload.name ? `tool: ${payload.name}` : "tool running");
},
onItemEvent: async (payload) => {
pushPreviewToolProgress(
payload.progressText ?? payload.summary ?? payload.title ?? payload.name,
);
},
onPlanUpdate: async (payload) => {
if (payload.phase !== "update") {
return;
}
pushPreviewToolProgress(payload.explanation ?? payload.steps?.[0] ?? "planning");
},
onApprovalEvent: async (payload) => {
if (payload.phase !== "requested") {
return;
}
pushPreviewToolProgress(
payload.command ? `approval: ${payload.command}` : "approval requested",
);
},
onCommandOutput: async (payload) => {
if (payload.phase !== "end") {
return;
}
pushPreviewToolProgress(
payload.name
? `${payload.name}${payload.exitCode === 0 ? " ✓" : payload.exitCode != null ? ` (exit ${payload.exitCode})` : ""}`
: payload.title,
);
},
onPatchSummary: async (payload) => {
if (payload.phase !== "end") {
return;
}
pushPreviewToolProgress(payload.summary ?? payload.title ?? "patch applied");
},
},
raw: prepared.message,
adapter: {
ingest: () => ({
id: prepared.message.ts ?? `${prepared.ctxPayload.From}:${Date.now()}`,
timestamp: prepared.message.ts ? Number(prepared.message.ts) * 1000 : undefined,
rawText: prepared.ctxPayload.RawBody ?? "",
textForAgent: prepared.ctxPayload.BodyForAgent,
textForCommands: prepared.ctxPayload.CommandBody,
raw: prepared.message,
}),
resolveTurn: () => ({
channel: "slack",
accountId: route.accountId,
routeSessionKey: route.sessionKey,
storePath: prepared.turn.storePath,
ctxPayload: prepared.ctxPayload,
recordInboundSession,
record: prepared.turn.record as ChannelTurnRecordOptions,
onPreDispatchFailure: async () => {
dispatchSettledBeforeStart = true;
await settleReplyDispatcher({
dispatcher,
onSettled: () => markDispatchIdle(),
});
},
runDispatch: () =>
dispatchInboundMessage({
ctx: prepared.ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
skillFilter: prepared.channelConfig?.skills,
sourceReplyDeliveryMode,
hasRepliedRef,
disableBlockStreaming,
onModelSelected,
suppressDefaultToolProgressMessages: previewToolProgressEnabled ? true : undefined,
onPartialReply: useStreaming
? undefined
: !previewStreamingEnabled
? undefined
: async (payload) => {
updateDraftFromPartial(payload.text);
},
onAssistantMessageStart: onDraftBoundary,
onReasoningEnd: onDraftBoundary,
onReasoningStream: statusReactionsEnabled
? async () => {
await statusReactions.setThinking();
}
: undefined,
onToolStart: async (payload) => {
if (statusReactionsEnabled) {
await statusReactions.setTool(payload.name);
}
pushPreviewToolProgress(payload.name ? `tool: ${payload.name}` : "tool running");
},
onItemEvent: async (payload) => {
pushPreviewToolProgress(
payload.progressText ?? payload.summary ?? payload.title ?? payload.name,
);
},
onPlanUpdate: async (payload) => {
if (payload.phase !== "update") {
return;
}
pushPreviewToolProgress(payload.explanation ?? payload.steps?.[0] ?? "planning");
},
onApprovalEvent: async (payload) => {
if (payload.phase !== "requested") {
return;
}
pushPreviewToolProgress(
payload.command ? `approval: ${payload.command}` : "approval requested",
);
},
onCommandOutput: async (payload) => {
if (payload.phase !== "end") {
return;
}
pushPreviewToolProgress(
payload.name
? `${payload.name}${payload.exitCode === 0 ? " ✓" : payload.exitCode != null ? ` (exit ${payload.exitCode})` : ""}`
: payload.title,
);
},
onPatchSummary: async (payload) => {
if (payload.phase !== "end") {
return;
}
pushPreviewToolProgress(payload.summary ?? payload.title ?? "patch applied");
},
},
}),
}),
},
});
const result = dispatchResult;
if (!turnResult.dispatched) {
return;
}
const result = turnResult.dispatchResult;
queuedFinal = result.queuedFinal;
counts = result.counts;
} catch (err) {

View File

@@ -19,7 +19,7 @@ import type {
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import {
hasFinalInboundReplyDispatch,
runPreparedInboundReplyTurn,
runInboundReplyTurn,
} from "openclaw/plugin-sdk/inbound-reply-dispatch";
import {
createOutboundPayloadPlan,
@@ -844,313 +844,337 @@ export const dispatchTelegramMessage = async ({
});
try {
const { dispatchResult } = await runPreparedInboundReplyTurn({
const turnResult = await runInboundReplyTurn({
channel: "telegram",
accountId: route.accountId,
routeSessionKey: route.sessionKey,
storePath: context.turn.storePath,
ctxPayload,
recordInboundSession: context.turn.recordInboundSession,
record: context.turn.record,
runDispatch: () =>
telegramDeps.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg,
dispatcherOptions: {
...replyPipeline,
beforeDeliver: async (payload) => payload,
deliver: async (payload, info) => {
if (isDispatchSuperseded()) {
return;
}
const clearPendingCompactionReplayBoundaryOnVisibleBoundary = (
didDeliver: boolean,
) => {
if (didDeliver && info.kind !== "final") {
pendingCompactionReplayBoundary = false;
}
};
if (payload.isError === true) {
hadErrorReplyFailureOrSkip = true;
}
if (info.kind === "final") {
await enqueueDraftLaneEvent(async () => {});
}
if (
shouldSuppressLocalTelegramExecApprovalPrompt({
cfg,
accountId: route.accountId,
payload,
})
) {
queuedFinal = true;
return;
}
const previewButtons = (
payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined
)?.buttons;
const split = splitTextIntoLaneSegments(payload.text);
const segments = split.segments;
const reply = resolveSendableOutboundReplyParts(payload);
const _hasMedia = reply.hasMedia;
const flushBufferedFinalAnswer = async () => {
const buffered = reasoningStepState.takeBufferedFinalAnswer();
if (!buffered) {
return;
}
const bufferedButtons = (
buffered.payload.channelData?.telegram as
| { buttons?: TelegramInlineButtons }
| undefined
)?.buttons;
await deliverLaneText({
laneName: "answer",
text: buffered.text,
payload: buffered.payload,
infoKind: "final",
previewButtons: bufferedButtons,
});
reasoningStepState.resetForNextStep();
};
for (const segment of segments) {
if (
segment.lane === "answer" &&
info.kind === "final" &&
reasoningStepState.shouldBufferFinalAnswer()
) {
reasoningStepState.bufferFinalAnswer({
payload,
text: segment.text,
});
continue;
}
if (segment.lane === "reasoning") {
reasoningStepState.noteReasoningHint();
}
const result = await deliverLaneText({
laneName: segment.lane,
text: segment.text,
payload,
infoKind: info.kind,
previewButtons,
allowPreviewUpdateForNonFinal: segment.lane === "reasoning",
});
if (info.kind === "final") {
emitPreviewFinalizedHook(result);
}
if (segment.lane === "reasoning") {
if (result.kind !== "skipped") {
reasoningStepState.noteReasoningDelivered();
await flushBufferedFinalAnswer();
}
continue;
}
if (info.kind === "final") {
if (reasoningLane.hasStreamedMessage) {
activePreviewLifecycleByLane.reasoning = "complete";
retainPreviewOnCleanupByLane.reasoning = true;
}
reasoningStepState.resetForNextStep();
}
}
if (segments.length > 0) {
if (info.kind === "final") {
pendingCompactionReplayBoundary = false;
}
return;
}
if (split.suppressedReasoningOnly) {
if (reply.hasMedia) {
const payloadWithoutSuppressedReasoning =
typeof payload.text === "string" ? { ...payload, text: "" } : payload;
clearPendingCompactionReplayBoundaryOnVisibleBoundary(
await sendPayload(payloadWithoutSuppressedReasoning),
);
}
if (info.kind === "final") {
await flushBufferedFinalAnswer();
pendingCompactionReplayBoundary = false;
}
return;
}
if (info.kind === "final") {
await answerLane.stream?.stop();
await reasoningLane.stream?.stop();
reasoningStepState.resetForNextStep();
}
const canSendAsIs = reply.hasMedia || reply.text.length > 0;
if (!canSendAsIs) {
if (info.kind === "final") {
await flushBufferedFinalAnswer();
pendingCompactionReplayBoundary = false;
}
return;
}
clearPendingCompactionReplayBoundaryOnVisibleBoundary(await sendPayload(payload));
if (info.kind === "final") {
await flushBufferedFinalAnswer();
pendingCompactionReplayBoundary = false;
}
},
onSkip: (payload, info) => {
if (payload.isError === true) {
hadErrorReplyFailureOrSkip = true;
}
if (info.reason !== "silent") {
deliveryState.markNonSilentSkip();
}
},
onError: (err, info) => {
const errorPolicy = resolveTelegramErrorPolicy({
accountConfig: telegramCfg,
groupConfig,
topicConfig,
});
if (isSilentErrorPolicy(errorPolicy.policy)) {
return;
}
if (
errorPolicy.policy === "once" &&
shouldSuppressTelegramError({
scopeKey: buildTelegramErrorScopeKey({
accountId: route.accountId,
chatId,
threadId: threadSpec.id,
}),
cooldownMs: errorPolicy.cooldownMs,
errorMessage: String(err),
})
) {
return;
}
deliveryState.markNonSilentFailure();
runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`));
},
},
replyOptions: {
skillFilter,
disableBlockStreaming,
onPartialReply:
answerLane.stream || reasoningLane.stream
? (payload) =>
enqueueDraftLaneEvent(async () => {
await ingestDraftLaneSegments(payload.text);
})
: undefined,
onReasoningStream: reasoningLane.stream
? (payload) =>
enqueueDraftLaneEvent(async () => {
if (splitReasoningOnNextStream) {
reasoningLane.stream?.forceNewMessage();
resetDraftLaneState(reasoningLane);
splitReasoningOnNextStream = false;
}
await ingestDraftLaneSegments(payload.text);
})
: undefined,
onAssistantMessageStart: answerLane.stream
? () =>
enqueueDraftLaneEvent(async () => {
reasoningStepState.resetForNextStep();
previewToolProgressSuppressed = false;
previewToolProgressLines = [];
if (skipNextAnswerMessageStartRotation) {
skipNextAnswerMessageStartRotation = false;
activePreviewLifecycleByLane.answer = "transient";
retainPreviewOnCleanupByLane.answer = false;
return;
}
if (pendingCompactionReplayBoundary) {
pendingCompactionReplayBoundary = false;
activePreviewLifecycleByLane.answer = "transient";
retainPreviewOnCleanupByLane.answer = false;
return;
}
await rotateAnswerLaneForNewAssistantMessage();
activePreviewLifecycleByLane.answer = "transient";
retainPreviewOnCleanupByLane.answer = false;
})
: undefined,
onReasoningEnd: reasoningLane.stream
? () =>
enqueueDraftLaneEvent(async () => {
splitReasoningOnNextStream = reasoningLane.hasStreamedMessage;
previewToolProgressSuppressed = false;
previewToolProgressLines = [];
})
: undefined,
suppressDefaultToolProgressMessages:
!previewStreamingEnabled || Boolean(answerLane.stream),
onToolStart: async (payload) => {
const toolName = payload.name?.trim();
if (statusReactionController && toolName) {
await statusReactionController.setTool(toolName);
}
pushPreviewToolProgress(toolName ? `tool: ${toolName}` : "tool running");
},
onItemEvent: async (payload) => {
pushPreviewToolProgress(
payload.progressText ?? payload.summary ?? payload.title ?? payload.name,
);
},
onPlanUpdate: async (payload) => {
if (payload.phase !== "update") {
return;
}
pushPreviewToolProgress(payload.explanation ?? payload.steps?.[0] ?? "planning");
},
onApprovalEvent: async (payload) => {
if (payload.phase !== "requested") {
return;
}
pushPreviewToolProgress(
payload.command ? `approval: ${payload.command}` : "approval requested",
);
},
onCommandOutput: async (payload) => {
if (payload.phase !== "end") {
return;
}
pushPreviewToolProgress(
payload.name
? `${payload.name}${payload.exitCode === 0 ? " ✓" : payload.exitCode != null ? ` (exit ${payload.exitCode})` : ""}`
: payload.title,
);
},
onPatchSummary: async (payload) => {
if (payload.phase !== "end") {
return;
}
pushPreviewToolProgress(payload.summary ?? payload.title ?? "patch applied");
},
onCompactionStart:
statusReactionController || answerLane.stream
? async () => {
if (
answerLane.hasStreamedMessage &&
activePreviewLifecycleByLane.answer === "transient"
) {
pendingCompactionReplayBoundary = true;
}
if (statusReactionController) {
await statusReactionController.setCompacting();
}
}
: undefined,
onCompactionEnd: statusReactionController
? async () => {
statusReactionController.cancelPending();
await statusReactionController.setThinking();
}
: undefined,
onModelSelected,
},
raw: context,
adapter: {
ingest: () => ({
id: ctxPayload.MessageSid ?? `${chatId}:${Date.now()}`,
timestamp: typeof ctxPayload.Timestamp === "number" ? ctxPayload.Timestamp : undefined,
rawText: ctxPayload.RawBody ?? "",
textForAgent: ctxPayload.BodyForAgent,
textForCommands: ctxPayload.CommandBody,
raw: context,
}),
resolveTurn: () => ({
channel: "telegram",
accountId: route.accountId,
routeSessionKey: route.sessionKey,
storePath: context.turn.storePath,
ctxPayload,
recordInboundSession: context.turn.recordInboundSession,
record: context.turn.record,
runDispatch: () =>
telegramDeps.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg,
dispatcherOptions: {
...replyPipeline,
beforeDeliver: async (payload) => payload,
deliver: async (payload, info) => {
if (isDispatchSuperseded()) {
return;
}
const clearPendingCompactionReplayBoundaryOnVisibleBoundary = (
didDeliver: boolean,
) => {
if (didDeliver && info.kind !== "final") {
pendingCompactionReplayBoundary = false;
}
};
if (payload.isError === true) {
hadErrorReplyFailureOrSkip = true;
}
if (info.kind === "final") {
await enqueueDraftLaneEvent(async () => {});
}
if (
shouldSuppressLocalTelegramExecApprovalPrompt({
cfg,
accountId: route.accountId,
payload,
})
) {
queuedFinal = true;
return;
}
const previewButtons = (
payload.channelData?.telegram as
| { buttons?: TelegramInlineButtons }
| undefined
)?.buttons;
const split = splitTextIntoLaneSegments(payload.text);
const segments = split.segments;
const reply = resolveSendableOutboundReplyParts(payload);
const _hasMedia = reply.hasMedia;
const flushBufferedFinalAnswer = async () => {
const buffered = reasoningStepState.takeBufferedFinalAnswer();
if (!buffered) {
return;
}
const bufferedButtons = (
buffered.payload.channelData?.telegram as
| { buttons?: TelegramInlineButtons }
| undefined
)?.buttons;
await deliverLaneText({
laneName: "answer",
text: buffered.text,
payload: buffered.payload,
infoKind: "final",
previewButtons: bufferedButtons,
});
reasoningStepState.resetForNextStep();
};
for (const segment of segments) {
if (
segment.lane === "answer" &&
info.kind === "final" &&
reasoningStepState.shouldBufferFinalAnswer()
) {
reasoningStepState.bufferFinalAnswer({
payload,
text: segment.text,
});
continue;
}
if (segment.lane === "reasoning") {
reasoningStepState.noteReasoningHint();
}
const result = await deliverLaneText({
laneName: segment.lane,
text: segment.text,
payload,
infoKind: info.kind,
previewButtons,
allowPreviewUpdateForNonFinal: segment.lane === "reasoning",
});
if (info.kind === "final") {
emitPreviewFinalizedHook(result);
}
if (segment.lane === "reasoning") {
if (result.kind !== "skipped") {
reasoningStepState.noteReasoningDelivered();
await flushBufferedFinalAnswer();
}
continue;
}
if (info.kind === "final") {
if (reasoningLane.hasStreamedMessage) {
activePreviewLifecycleByLane.reasoning = "complete";
retainPreviewOnCleanupByLane.reasoning = true;
}
reasoningStepState.resetForNextStep();
}
}
if (segments.length > 0) {
if (info.kind === "final") {
pendingCompactionReplayBoundary = false;
}
return;
}
if (split.suppressedReasoningOnly) {
if (reply.hasMedia) {
const payloadWithoutSuppressedReasoning =
typeof payload.text === "string" ? { ...payload, text: "" } : payload;
clearPendingCompactionReplayBoundaryOnVisibleBoundary(
await sendPayload(payloadWithoutSuppressedReasoning),
);
}
if (info.kind === "final") {
await flushBufferedFinalAnswer();
pendingCompactionReplayBoundary = false;
}
return;
}
if (info.kind === "final") {
await answerLane.stream?.stop();
await reasoningLane.stream?.stop();
reasoningStepState.resetForNextStep();
}
const canSendAsIs = reply.hasMedia || reply.text.length > 0;
if (!canSendAsIs) {
if (info.kind === "final") {
await flushBufferedFinalAnswer();
pendingCompactionReplayBoundary = false;
}
return;
}
clearPendingCompactionReplayBoundaryOnVisibleBoundary(
await sendPayload(payload),
);
if (info.kind === "final") {
await flushBufferedFinalAnswer();
pendingCompactionReplayBoundary = false;
}
},
onSkip: (payload, info) => {
if (payload.isError === true) {
hadErrorReplyFailureOrSkip = true;
}
if (info.reason !== "silent") {
deliveryState.markNonSilentSkip();
}
},
onError: (err, info) => {
const errorPolicy = resolveTelegramErrorPolicy({
accountConfig: telegramCfg,
groupConfig,
topicConfig,
});
if (isSilentErrorPolicy(errorPolicy.policy)) {
return;
}
if (
errorPolicy.policy === "once" &&
shouldSuppressTelegramError({
scopeKey: buildTelegramErrorScopeKey({
accountId: route.accountId,
chatId,
threadId: threadSpec.id,
}),
cooldownMs: errorPolicy.cooldownMs,
errorMessage: String(err),
})
) {
return;
}
deliveryState.markNonSilentFailure();
runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`));
},
},
replyOptions: {
skillFilter,
disableBlockStreaming,
onPartialReply:
answerLane.stream || reasoningLane.stream
? (payload) =>
enqueueDraftLaneEvent(async () => {
await ingestDraftLaneSegments(payload.text);
})
: undefined,
onReasoningStream: reasoningLane.stream
? (payload) =>
enqueueDraftLaneEvent(async () => {
if (splitReasoningOnNextStream) {
reasoningLane.stream?.forceNewMessage();
resetDraftLaneState(reasoningLane);
splitReasoningOnNextStream = false;
}
await ingestDraftLaneSegments(payload.text);
})
: undefined,
onAssistantMessageStart: answerLane.stream
? () =>
enqueueDraftLaneEvent(async () => {
reasoningStepState.resetForNextStep();
previewToolProgressSuppressed = false;
previewToolProgressLines = [];
if (skipNextAnswerMessageStartRotation) {
skipNextAnswerMessageStartRotation = false;
activePreviewLifecycleByLane.answer = "transient";
retainPreviewOnCleanupByLane.answer = false;
return;
}
if (pendingCompactionReplayBoundary) {
pendingCompactionReplayBoundary = false;
activePreviewLifecycleByLane.answer = "transient";
retainPreviewOnCleanupByLane.answer = false;
return;
}
await rotateAnswerLaneForNewAssistantMessage();
activePreviewLifecycleByLane.answer = "transient";
retainPreviewOnCleanupByLane.answer = false;
})
: undefined,
onReasoningEnd: reasoningLane.stream
? () =>
enqueueDraftLaneEvent(async () => {
splitReasoningOnNextStream = reasoningLane.hasStreamedMessage;
previewToolProgressSuppressed = false;
previewToolProgressLines = [];
})
: undefined,
suppressDefaultToolProgressMessages:
!previewStreamingEnabled || Boolean(answerLane.stream),
onToolStart: async (payload) => {
const toolName = payload.name?.trim();
if (statusReactionController && toolName) {
await statusReactionController.setTool(toolName);
}
pushPreviewToolProgress(toolName ? `tool: ${toolName}` : "tool running");
},
onItemEvent: async (payload) => {
pushPreviewToolProgress(
payload.progressText ?? payload.summary ?? payload.title ?? payload.name,
);
},
onPlanUpdate: async (payload) => {
if (payload.phase !== "update") {
return;
}
pushPreviewToolProgress(
payload.explanation ?? payload.steps?.[0] ?? "planning",
);
},
onApprovalEvent: async (payload) => {
if (payload.phase !== "requested") {
return;
}
pushPreviewToolProgress(
payload.command ? `approval: ${payload.command}` : "approval requested",
);
},
onCommandOutput: async (payload) => {
if (payload.phase !== "end") {
return;
}
pushPreviewToolProgress(
payload.name
? `${payload.name}${payload.exitCode === 0 ? " ✓" : payload.exitCode != null ? ` (exit ${payload.exitCode})` : ""}`
: payload.title,
);
},
onPatchSummary: async (payload) => {
if (payload.phase !== "end") {
return;
}
pushPreviewToolProgress(payload.summary ?? payload.title ?? "patch applied");
},
onCompactionStart:
statusReactionController || answerLane.stream
? async () => {
if (
answerLane.hasStreamedMessage &&
activePreviewLifecycleByLane.answer === "transient"
) {
pendingCompactionReplayBoundary = true;
}
if (statusReactionController) {
await statusReactionController.setCompacting();
}
}
: undefined,
onCompactionEnd: statusReactionController
? async () => {
statusReactionController.cancelPending();
await statusReactionController.setThinking();
}
: undefined,
onModelSelected,
},
}),
}),
},
});
({ queuedFinal } = dispatchResult);
if (!turnResult.dispatched) {
return;
}
({ queuedFinal } = turnResult.dispatchResult);
} catch (err) {
dispatchError = err;
runtime.error?.(danger(`telegram dispatch failed: ${String(err)}`));

View File

@@ -13,7 +13,7 @@ import {
toPluginMessageReceivedEvent,
triggerInternalHook,
} from "openclaw/plugin-sdk/hook-runtime";
import { runPreparedInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { runInboundReplyTurn } from "openclaw/plugin-sdk/inbound-reply-dispatch";
import { getGlobalHookRunner } from "openclaw/plugin-sdk/plugin-runtime";
import { resolveBatchedReplyThreadingPolicy } from "openclaw/plugin-sdk/reply-reference";
import { getPrimaryIdentityId, getSelfIdentity, getSenderIdentity } from "../../identity.js";
@@ -454,52 +454,68 @@ export async function processMessage(params: {
warn: params.replyLogger.warn.bind(params.replyLogger),
});
const { dispatchResult: didSendReply } = await runPreparedInboundReplyTurn({
const turnResult = await runInboundReplyTurn({
channel: "whatsapp",
accountId: params.route.accountId,
routeSessionKey: params.route.sessionKey,
storePath,
ctxPayload,
recordInboundSession,
record: {
onRecordError: (err) => {
params.replyLogger.warn(
{
error: formatError(err),
storePath,
sessionKey: params.route.sessionKey,
},
"failed updating session meta",
);
},
trackSessionMetaTask: (task) => {
trackBackgroundTask(params.backgroundTasks, task);
},
},
runDispatch: () =>
dispatchWhatsAppBufferedReply({
cfg: params.cfg,
connectionId: params.connectionId,
context: ctxPayload,
conversationId,
deliverReply: deliverWebReply,
groupHistories: params.groupHistories,
groupHistoryKey: params.groupHistoryKey,
maxMediaBytes: params.maxMediaBytes,
maxMediaTextChunkLimit: params.maxMediaTextChunkLimit,
msg: params.msg,
onModelSelected,
rememberSentText: params.rememberSentText,
replyLogger: params.replyLogger,
replyPipeline: {
...replyPipeline,
responsePrefix,
},
replyResolver: params.replyResolver,
route: params.route,
shouldClearGroupHistory,
raw: params.msg,
adapter: {
ingest: () => ({
id: params.msg.id ?? `${conversationId}:${Date.now()}`,
timestamp: params.msg.timestamp,
rawText: ctxPayload.RawBody ?? "",
textForAgent: ctxPayload.BodyForAgent,
textForCommands: ctxPayload.CommandBody,
raw: params.msg,
}),
resolveTurn: () => ({
channel: "whatsapp",
accountId: params.route.accountId,
routeSessionKey: params.route.sessionKey,
storePath,
ctxPayload,
recordInboundSession,
record: {
onRecordError: (err) => {
params.replyLogger.warn(
{
error: formatError(err),
storePath,
sessionKey: params.route.sessionKey,
},
"failed updating session meta",
);
},
trackSessionMetaTask: (task) => {
trackBackgroundTask(params.backgroundTasks, task);
},
},
runDispatch: () =>
dispatchWhatsAppBufferedReply({
cfg: params.cfg,
connectionId: params.connectionId,
context: ctxPayload,
conversationId,
deliverReply: deliverWebReply,
groupHistories: params.groupHistories,
groupHistoryKey: params.groupHistoryKey,
maxMediaBytes: params.maxMediaBytes,
maxMediaTextChunkLimit: params.maxMediaTextChunkLimit,
msg: params.msg,
onModelSelected,
rememberSentText: params.rememberSentText,
replyLogger: params.replyLogger,
replyPipeline: {
...replyPipeline,
responsePrefix,
},
replyResolver: params.replyResolver,
route: params.route,
shouldClearGroupHistory,
}),
}),
},
});
const didSendReply = turnResult.dispatched ? turnResult.dispatchResult : false;
removeAckReactionHandleAfterReply({
removeAfterReply: Boolean(params.cfg.messages?.removeAckAfterReply && didSendReply),
ackReaction,

View File

@@ -257,13 +257,23 @@ export function createImageLifecycleCore() {
updateLastRoute: resolved.record?.updateLastRoute,
onRecordError: resolved.record?.onRecordError ?? (() => undefined),
});
if ("runDispatch" in resolved) {
const dispatchResult = await resolved.runDispatch();
return {
admission: { kind: "dispatch" as const },
dispatched: true,
ctxPayload: resolved.ctxPayload,
routeSessionKey: resolved.routeSessionKey,
dispatchResult,
};
}
const dispatchResult = await resolved.dispatchReplyWithBufferedBlockDispatcher({
ctx: resolved.ctxPayload,
cfg: resolved.cfg,
dispatcherOptions: {
...resolved.dispatcherOptions,
deliver: async (payload, info) => {
await resolved.delivery.deliver(payload, info);
deliver: async (...args: Parameters<typeof resolved.delivery.deliver>) => {
await resolved.delivery.deliver(...args);
},
onError: resolved.delivery.onError,
},

View File

@@ -102,13 +102,23 @@ function installRuntime(params: {
updateLastRoute: turn.record?.updateLastRoute,
onRecordError: turn.record?.onRecordError ?? (() => undefined),
});
if ("runDispatch" in turn) {
const dispatchResult = await turn.runDispatch();
return {
admission: { kind: "dispatch" as const },
dispatched: true,
ctxPayload: turn.ctxPayload,
routeSessionKey: turn.routeSessionKey,
dispatchResult,
};
}
const dispatchResult = await turn.dispatchReplyWithBufferedBlockDispatcher({
ctx: turn.ctxPayload,
cfg: turn.cfg,
dispatcherOptions: {
...turn.dispatcherOptions,
deliver: async (payload, info) => {
await turn.delivery.deliver(payload, info);
deliver: async (...args: Parameters<typeof turn.delivery.deliver>) => {
await turn.delivery.deliver(...args);
},
onError: turn.delivery.onError,
},

View File

@@ -310,6 +310,38 @@ describe("channel turn kernel", () => {
);
});
it("runs custom prepared dispatch from a full turn adapter", async () => {
const events: string[] = [];
const result = await runChannelTurn({
channel: "test",
raw: { id: "msg-1", text: "hello" },
adapter: {
ingest: () => ({ id: "msg-1", rawText: "hello" }),
resolveTurn: () => ({
channel: "test",
routeSessionKey: "agent:main:test:peer",
storePath: "/tmp/sessions.json",
ctxPayload: createCtx(),
recordInboundSession: createRecordInboundSession(events),
runDispatch: async () => {
events.push("custom-dispatch");
return {
queuedFinal: true,
counts: { tool: 0, block: 0, final: 1 },
};
},
}),
},
});
expect(events).toEqual(["record", "custom-dispatch"]);
expect(result.dispatched).toBe(true);
if (!result.dispatched) {
throw new Error("expected dispatch");
}
expect(result.dispatchResult.queuedFinal).toBe(true);
});
it("finalizes failed dispatches before rethrowing", async () => {
const onFinalize = vi.fn();
const dispatchError = new Error("dispatch failed");

View File

@@ -9,12 +9,12 @@ import type {
ChannelTurnDeliveryAdapter,
ChannelTurnHistoryFinalizeOptions,
ChannelTurnLogEvent,
ChannelTurnResolved,
ChannelTurnResult,
DispatchedChannelTurnResult,
PreparedChannelTurn,
PreflightFacts,
RunChannelTurnParams,
RunResolvedChannelTurnParams,
} from "./types.js";
export {
EMPTY_CHANNEL_TURN_DISPATCH_COUNTS,
@@ -49,7 +49,6 @@ export type {
ReplyPlanFacts,
RouteFacts,
RunChannelTurnParams,
RunResolvedChannelTurnParams,
SenderFacts,
SupplementalContextFacts,
} from "./types.js";
@@ -143,6 +142,29 @@ export async function dispatchAssembledChannelTurn(
});
}
function isPreparedChannelTurn<TDispatchResult>(
value: ChannelTurnResolved<TDispatchResult>,
): value is PreparedChannelTurn<TDispatchResult> & {
admission?: Extract<ChannelTurnAdmission, { kind: "dispatch" | "observeOnly" }>;
} {
return "runDispatch" in value;
}
async function dispatchResolvedChannelTurn<TDispatchResult>(
params: ChannelTurnResolved<TDispatchResult> & {
admission: Extract<ChannelTurnAdmission, { kind: "dispatch" | "observeOnly" }>;
log?: (event: ChannelTurnLogEvent) => void;
messageId?: string;
},
): Promise<DispatchedChannelTurnResult<TDispatchResult>> {
if (isPreparedChannelTurn(params)) {
return await runPreparedChannelTurn(params);
}
return (await dispatchAssembledChannelTurn(
params,
)) as DispatchedChannelTurnResult<TDispatchResult>;
}
export async function runPreparedChannelTurn<
TDispatchResult = DispatchedChannelTurnResult["dispatchResult"],
>(
@@ -248,9 +270,12 @@ export async function runPreparedChannelTurn<
};
}
export async function runChannelTurn<TRaw>(
params: RunChannelTurnParams<TRaw>,
): Promise<ChannelTurnResult> {
export async function runChannelTurn<
TRaw,
TDispatchResult = DispatchedChannelTurnResult["dispatchResult"],
>(
params: RunChannelTurnParams<TRaw, TDispatchResult>,
): Promise<ChannelTurnResult<TDispatchResult>> {
emit({
...params,
event: { stage: "ingest", event: "start" },
@@ -327,9 +352,9 @@ export async function runChannelTurn<TRaw>(
});
const admission = resolved.admission ?? preflightAdmission ?? ({ kind: "dispatch" } as const);
let result: ChannelTurnResult;
let result: ChannelTurnResult<TDispatchResult>;
try {
const dispatchResult = await dispatchAssembledChannelTurn(
const dispatchResult = await dispatchResolvedChannelTurn(
admission.kind === "observeOnly"
? {
...resolved,
@@ -350,7 +375,7 @@ export async function runChannelTurn<TRaw>(
admission,
};
} catch (err) {
const failedResult: ChannelTurnResult = {
const failedResult: ChannelTurnResult<TDispatchResult> = {
admission,
dispatched: false,
ctxPayload: resolved.ctxPayload,
@@ -406,18 +431,3 @@ export async function runChannelTurn<TRaw>(
return result;
}
export async function runResolvedChannelTurn<TRaw>(
params: RunResolvedChannelTurnParams<TRaw>,
): Promise<ChannelTurnResult> {
return await runChannelTurn({
channel: params.channel,
accountId: params.accountId,
raw: params.raw,
log: params.log,
adapter: {
ingest: (raw) => (typeof params.input === "function" ? params.input(raw) : params.input),
resolveTurn: params.resolveTurn,
},
});
}

View File

@@ -240,9 +240,13 @@ export type PreparedChannelTurn<TDispatchResult = DispatchFromConfigResult> = {
messageId?: string;
};
export type ChannelTurnResolved = AssembledChannelTurn & {
admission?: Extract<ChannelTurnAdmission, { kind: "dispatch" | "observeOnly" }>;
};
export type ChannelTurnResolved<TDispatchResult = DispatchFromConfigResult> =
| (AssembledChannelTurn & {
admission?: Extract<ChannelTurnAdmission, { kind: "dispatch" | "observeOnly" }>;
})
| (PreparedChannelTurn<TDispatchResult> & {
admission?: Extract<ChannelTurnAdmission, { kind: "dispatch" | "observeOnly" }>;
});
export type ChannelTurnStage =
| "ingest"
@@ -267,13 +271,14 @@ export type ChannelTurnLogEvent = {
error?: unknown;
};
export type ChannelTurnResult = {
admission: ChannelTurnAdmission;
dispatched: boolean;
ctxPayload?: MsgContext;
routeSessionKey?: string;
dispatchResult?: DispatchFromConfigResult;
};
export type ChannelTurnResult<TDispatchResult = DispatchFromConfigResult> =
| DispatchedChannelTurnResult<TDispatchResult>
| {
admission: ChannelTurnAdmission;
dispatched: false;
ctxPayload?: MsgContext;
routeSessionKey?: string;
};
export type DispatchedChannelTurnResult<TDispatchResult = DispatchFromConfigResult> = {
admission: Extract<ChannelTurnAdmission, { kind: "dispatch" | "observeOnly" }>;
@@ -283,7 +288,7 @@ export type DispatchedChannelTurnResult<TDispatchResult = DispatchFromConfigResu
dispatchResult: TDispatchResult;
};
export type ChannelTurnAdapter<TRaw> = {
export type ChannelTurnAdapter<TRaw, TDispatchResult = DispatchFromConfigResult> = {
ingest: (raw: TRaw) => Promise<NormalizedTurnInput | null> | NormalizedTurnInput | null;
classify?: (input: NormalizedTurnInput) => Promise<ChannelEventClass> | ChannelEventClass;
preflight?: (
@@ -299,29 +304,14 @@ export type ChannelTurnAdapter<TRaw> = {
input: NormalizedTurnInput,
eventClass: ChannelEventClass,
preflight: PreflightFacts,
) => Promise<ChannelTurnResolved> | ChannelTurnResolved;
onFinalize?: (result: ChannelTurnResult) => Promise<void> | void;
) => Promise<ChannelTurnResolved<TDispatchResult>> | ChannelTurnResolved<TDispatchResult>;
onFinalize?: (result: ChannelTurnResult<TDispatchResult>) => Promise<void> | void;
};
export type RunChannelTurnParams<TRaw> = {
export type RunChannelTurnParams<TRaw, TDispatchResult = DispatchFromConfigResult> = {
channel: string;
accountId?: string;
raw: TRaw;
adapter: ChannelTurnAdapter<TRaw>;
log?: (event: ChannelTurnLogEvent) => void;
};
export type RunResolvedChannelTurnParams<TRaw> = {
channel: string;
accountId?: string;
raw: TRaw;
input:
| NormalizedTurnInput
| ((raw: TRaw) => Promise<NormalizedTurnInput | null> | NormalizedTurnInput | null);
resolveTurn: (
input: NormalizedTurnInput,
eventClass: ChannelEventClass,
preflight: PreflightFacts,
) => Promise<ChannelTurnResolved> | ChannelTurnResolved;
adapter: ChannelTurnAdapter<TRaw, TDispatchResult>;
log?: (event: ChannelTurnLogEvent) => void;
};

View File

@@ -11,9 +11,11 @@ import {
hasFinalChannelTurnDispatch,
hasVisibleChannelTurnDispatch,
resolveChannelTurnDispatchCounts,
runChannelTurn,
runPreparedChannelTurn,
} from "../channels/turn/kernel.js";
import type { PreparedChannelTurn } from "../channels/turn/types.js";
import type { PreparedChannelTurn, RunChannelTurnParams } from "../channels/turn/types.js";
export type { ChannelTurnRecordOptions } from "../channels/turn/types.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { createChannelReplyPipeline } from "./channel-reply-pipeline.js";
import { createNormalizedOutboundDeliverer, type OutboundReplyPayload } from "./reply-payload.js";
@@ -33,6 +35,13 @@ export async function runPreparedInboundReplyTurn<TDispatchResult>(
return await runPreparedChannelTurn(params);
}
/** Run a channel turn through shared ingest, record, dispatch, and finalize ordering. */
export async function runInboundReplyTurn<TRaw, TDispatchResult = DispatchFromConfigResult>(
params: RunChannelTurnParams<TRaw, TDispatchResult>,
) {
return await runChannelTurn(params);
}
export {
hasFinalChannelTurnDispatch as hasFinalInboundReplyDispatch,
hasVisibleChannelTurnDispatch as hasVisibleInboundReplyDispatch,

View File

@@ -78,40 +78,64 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
createTaskFlowSessionMock,
) as unknown as PluginRuntime["tasks"]["managedFlows"]["fromToolContext"],
};
const dispatchAssembledChannelTurnMock = vi.fn(
async (params: Parameters<PluginRuntime["channel"]["turn"]["dispatchAssembled"]>[0]) => {
await params.recordInboundSession({
storePath: params.storePath,
sessionKey: params.ctxPayload.SessionKey ?? params.routeSessionKey,
ctx: params.ctxPayload,
groupResolution: params.record?.groupResolution,
createIfMissing: params.record?.createIfMissing,
updateLastRoute: params.record?.updateLastRoute,
onRecordError: params.record?.onRecordError ?? (() => undefined),
trackSessionMetaTask: params.record?.trackSessionMetaTask,
});
const dispatchResult = await params.dispatchReplyWithBufferedBlockDispatcher({
ctx: params.ctxPayload,
cfg: params.cfg,
const dispatchAssembledChannelTurnMock = vi.fn(async (params: Record<string, unknown>) => {
const ctxPayload = params.ctxPayload as Record<string, unknown>;
const record = params.record as
| Parameters<PluginRuntime["channel"]["turn"]["runPrepared"]>[0]["record"]
| undefined;
const recordInboundSession = params.recordInboundSession as Parameters<
PluginRuntime["channel"]["turn"]["runPrepared"]
>[0]["recordInboundSession"];
const routeSessionKey = params.routeSessionKey as string;
const storePath = params.storePath as string;
const delivery = params.delivery as {
deliver: (payload: unknown, info: unknown) => Promise<unknown>;
onError?: (err: unknown, info: { kind: string }) => void;
};
const ctxSessionKey = ctxPayload.SessionKey;
const sessionKey = typeof ctxSessionKey === "string" ? ctxSessionKey : routeSessionKey;
const dispatchReplyWithBufferedBlockDispatcher =
params.dispatchReplyWithBufferedBlockDispatcher as (params: {
ctx: unknown;
cfg: unknown;
dispatcherOptions: {
...params.dispatcherOptions,
deliver: async (payload, info) => {
await params.delivery.deliver(payload, info);
},
onError: params.delivery.onError,
deliver: (payload: unknown, info: unknown) => Promise<void>;
onError?: (err: unknown, info: { kind: string }) => void;
};
replyOptions?: unknown;
replyResolver?: unknown;
}) => Promise<unknown>;
await recordInboundSession({
storePath,
sessionKey,
ctx: ctxPayload,
groupResolution: record?.groupResolution,
createIfMissing: record?.createIfMissing,
updateLastRoute: record?.updateLastRoute,
onRecordError: record?.onRecordError ?? (() => undefined),
trackSessionMetaTask: record?.trackSessionMetaTask,
});
const dispatchResult = await dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg: params.cfg,
dispatcherOptions: {
...(params.dispatcherOptions as Record<string, unknown> | undefined),
deliver: async (payload, info) => {
await delivery.deliver(payload, info);
},
replyOptions: params.replyOptions,
replyResolver: params.replyResolver,
});
return {
admission: params.admission ?? { kind: "dispatch" as const },
dispatched: true,
ctxPayload: params.ctxPayload,
routeSessionKey: params.routeSessionKey,
dispatchResult,
};
},
) as unknown as PluginRuntime["channel"]["turn"]["dispatchAssembled"];
onError: delivery.onError,
},
replyOptions: params.replyOptions,
replyResolver: params.replyResolver,
});
return {
admission: params.admission ?? { kind: "dispatch" },
dispatched: true,
ctxPayload,
routeSessionKey,
dispatchResult,
};
});
const runPreparedChannelTurnMock = vi.fn(
async (params: Parameters<PluginRuntime["channel"]["turn"]["runPrepared"]>[0]) => {
try {
@@ -180,18 +204,24 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
const resolved = await params.adapter.resolveTurn(input, eventClass, preflight ?? {});
const admission =
resolved.admission ?? preflight.admission ?? ({ kind: "dispatch" } as const);
const dispatchResult = await dispatchAssembledChannelTurnMock({
...resolved,
admission,
delivery:
admission.kind === "observeOnly"
? { deliver: async () => ({ visibleReplySent: false }) }
: resolved.delivery,
});
const dispatchResult =
"runDispatch" in resolved
? await runPreparedChannelTurnMock({
...resolved,
admission,
})
: await dispatchAssembledChannelTurnMock({
...resolved,
admission,
delivery:
admission.kind === "observeOnly"
? { deliver: async () => ({ visibleReplySent: false }) }
: resolved.delivery,
});
const result = {
...dispatchResult,
admission,
};
} as Parameters<NonNullable<typeof params.adapter.onFinalize>>[0];
await params.adapter.onFinalize?.(result);
return result;
},
@@ -233,28 +263,6 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
...params.extra,
}) as ReturnType<PluginRuntime["channel"]["turn"]["buildContext"]>,
) as unknown as PluginRuntime["channel"]["turn"]["buildContext"];
const runResolvedChannelTurnMock = vi.fn(
async (params: Parameters<PluginRuntime["channel"]["turn"]["runResolved"]>[0]) => {
const input =
typeof params.input === "function" ? await params.input(params.raw) : params.input;
if (!input) {
return {
admission: { kind: "drop" as const, reason: "ingest-null" },
dispatched: false,
};
}
return await runChannelTurnMock({
channel: params.channel,
accountId: params.accountId,
raw: params.raw,
log: params.log,
adapter: {
ingest: () => input,
resolveTurn: params.resolveTurn,
},
});
},
) as unknown as PluginRuntime["channel"]["turn"]["runResolved"];
const base: PluginRuntime = {
version: "1.0.0-test",
config: {
@@ -609,10 +617,8 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
},
turn: {
run: runChannelTurnMock,
runResolved: runResolvedChannelTurnMock,
buildContext: buildChannelTurnContextMock,
runPrepared: runPreparedChannelTurnMock,
dispatchAssembled: dispatchAssembledChannelTurnMock,
},
threadBindings: {
setIdleTimeoutBySessionKey:

View File

@@ -52,10 +52,8 @@ import { loadChannelOutboundAdapter } from "../../channels/plugins/outbound/load
import { recordInboundSession } from "../../channels/session.js";
import {
buildChannelTurnContext,
dispatchAssembledChannelTurn,
runChannelTurn,
runPreparedChannelTurn,
runResolvedChannelTurn,
} from "../../channels/turn/kernel.js";
import {
resolveChannelGroupPolicy,
@@ -174,10 +172,8 @@ export function createRuntimeChannel(): PluginRuntime["channel"] {
},
turn: {
run: runChannelTurn,
runResolved: runResolvedChannelTurn,
buildContext: buildChannelTurnContext,
runPrepared: runPreparedChannelTurn,
dispatchAssembled: dispatchAssembledChannelTurn,
},
threadBindings: {
setIdleTimeoutBySessionKey: ({ channelId, targetSessionKey, accountId, idleTimeoutMs }) =>

View File

@@ -153,12 +153,8 @@ export type PluginRuntimeChannel = {
};
turn: {
run: typeof import("../../channels/turn/kernel.js").runChannelTurn;
/** @deprecated Prefer `run(...)`. */
runResolved: typeof import("../../channels/turn/kernel.js").runResolvedChannelTurn;
buildContext: typeof import("../../channels/turn/kernel.js").buildChannelTurnContext;
runPrepared: typeof import("../../channels/turn/kernel.js").runPreparedChannelTurn;
/** @deprecated Prefer `run(...)` or `runPrepared(...)`. */
dispatchAssembled: typeof import("../../channels/turn/kernel.js").dispatchAssembledChannelTurn;
};
threadBindings: {
setIdleTimeoutBySessionKey: (params: {