refactor(channels): share route identity keys

This commit is contained in:
Peter Steinberger
2026-04-27 22:35:15 +01:00
parent 3eec9e4642
commit f368d3b49f
14 changed files with 190 additions and 35 deletions

View File

@@ -1,7 +1,7 @@
import { afterEach, describe, expect, it } from "vitest";
import { importFreshModule } from "../../../test/helpers/import-fresh.js";
import type { MsgContext } from "../templating.js";
import { resetInboundDedupe } from "./inbound-dedupe.js";
import { buildInboundDedupeKey, resetInboundDedupe } from "./inbound-dedupe.js";
const sharedInboundContext: MsgContext = {
Provider: "discord",
@@ -41,6 +41,20 @@ describe("inbound dedupe", () => {
}
});
it("deduplicates inbound messages with equivalent numeric and string thread ids", () => {
expect(
buildInboundDedupeKey({
...sharedInboundContext,
MessageThreadId: 77,
}),
).toBe(
buildInboundDedupeKey({
...sharedInboundContext,
MessageThreadId: "77",
}),
);
});
it("shares claim/release state across distinct module instances", async () => {
const inboundA = await importFreshModule<typeof import("./inbound-dedupe.js")>(
import.meta.url,

View File

@@ -1,3 +1,4 @@
import { channelRouteIdentityKey } from "../../channels/route/ref.js";
import { logVerbose, shouldLogVerbose } from "../../globals.js";
import { resolveGlobalDedupeCache, type DedupeCache } from "../../infra/dedupe.js";
import { parseAgentSessionKey } from "../../sessions/session-key-utils.js";
@@ -68,11 +69,13 @@ export function buildInboundDedupeKey(ctx: MsgContext): string | null {
}
const sessionScope = resolveInboundDedupeSessionScope(ctx);
const accountId = normalizeOptionalString(ctx.AccountId) ?? "";
const threadId =
ctx.MessageThreadId !== undefined && ctx.MessageThreadId !== null
? String(ctx.MessageThreadId)
: "";
return [provider, accountId, sessionScope, peerId, threadId, messageId].filter(Boolean).join("|");
const routeKey = channelRouteIdentityKey({
channel: provider,
to: peerId,
accountId,
threadId: ctx.MessageThreadId,
});
return JSON.stringify([sessionScope, routeKey, messageId]);
}
export function shouldSkipDuplicateInbound(

View File

@@ -90,6 +90,36 @@ describe("followup queue deduplication", () => {
expect(calls[0]?.prompt).toContain("[Queued messages while agent was busy]");
});
it("deduplicates message ids when numeric and string thread ids share a route", async () => {
const key = `test-dedup-thread-normalized-${Date.now()}`;
const first = enqueueFollowupRun(
key,
createRun({
prompt: "first",
messageId: "same-id",
originatingChannel: "telegram",
originatingTo: "-100123",
originatingThreadId: 42.9,
}),
collectSettings,
);
expect(first).toBe(true);
const second = enqueueFollowupRun(
key,
createRun({
prompt: "second",
messageId: "same-id",
originatingChannel: "telegram",
originatingTo: "-100123",
originatingThreadId: "42",
}),
collectSettings,
);
expect(second).toBe(false);
});
it("deduplicates same message_id after queue drain restarts", async () => {
const key = `test-dedup-after-drain-${Date.now()}`;
const { calls, done, runFollowup } = createFollowupCollector();

View File

@@ -1,3 +1,4 @@
import { channelRouteIdentityKey } from "../../../channels/route/ref.js";
import { resolveGlobalDedupeCache } from "../../../infra/dedupe.js";
import { normalizeOptionalString } from "../../../shared/string-coerce.js";
import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js";
@@ -16,6 +17,15 @@ const RECENT_QUEUE_MESSAGE_IDS = resolveGlobalDedupeCache(RECENT_QUEUE_MESSAGE_I
maxSize: 10_000,
});
function followupRouteIdentityKey(run: FollowupRun): string {
return channelRouteIdentityKey({
channel: run.originatingChannel,
to: run.originatingTo,
accountId: run.originatingAccountId,
threadId: run.originatingThreadId,
});
}
function buildRecentMessageIdKey(run: FollowupRun, queueKey: string): string | undefined {
const messageId = normalizeOptionalString(run.messageId);
if (!messageId) {
@@ -23,15 +33,7 @@ function buildRecentMessageIdKey(run: FollowupRun, queueKey: string): string | u
}
// Use JSON tuple serialization to avoid delimiter-collision edge cases when
// channel/to/account values contain "|" characters.
return JSON.stringify([
"queue",
queueKey,
run.originatingChannel ?? "",
run.originatingTo ?? "",
run.originatingAccountId ?? "",
run.originatingThreadId == null ? "" : String(run.originatingThreadId),
messageId,
]);
return JSON.stringify(["queue", queueKey, followupRouteIdentityKey(run), messageId]);
}
function isRunAlreadyQueued(
@@ -39,11 +41,8 @@ function isRunAlreadyQueued(
items: FollowupRun[],
allowPromptFallback = false,
): boolean {
const hasSameRouting = (item: FollowupRun) =>
item.originatingChannel === run.originatingChannel &&
item.originatingTo === run.originatingTo &&
item.originatingAccountId === run.originatingAccountId &&
item.originatingThreadId === run.originatingThreadId;
const routeKey = followupRouteIdentityKey(run);
const hasSameRouting = (item: FollowupRun) => followupRouteIdentityKey(item) === routeKey;
const messageId = normalizeOptionalString(run.messageId);
if (messageId) {

View File

@@ -1,5 +1,6 @@
import { describe, expect, it } from "vitest";
import {
channelRouteIdentityKey,
channelRouteKey,
channelRoutesMatchExact,
channelRoutesShareConversation,
@@ -41,6 +42,24 @@ describe("channel route refs", () => {
expect(channelRouteKey(route)).toBe("telegram|-100123||42");
});
it("builds a stable identity key from route-like input", () => {
expect(
channelRouteIdentityKey({
channel: " Telegram ",
to: " -100123 ",
accountId: " Work ",
threadId: 42.9,
}),
).toBe(
channelRouteIdentityKey({
channel: "telegram",
to: "-100123",
accountId: "work",
threadId: "42",
}),
);
});
it("matches exact routes when numeric and string thread ids are equivalent", () => {
expect(
channelRoutesMatchExact({

View File

@@ -36,6 +36,11 @@ export type ChannelRouteRefInput = {
threadSource?: ChannelRouteThreadSource;
};
export type ChannelRouteTargetInput = Pick<
ChannelRouteRefInput,
"channel" | "accountId" | "to" | "rawTo" | "chatType" | "threadId"
>;
export function normalizeRouteThreadId(value: unknown): string | number | undefined {
return normalizeOptionalThreadValue(value);
}
@@ -92,6 +97,22 @@ export function channelRouteThreadId(route?: ChannelRouteRef): string | number |
return route?.thread?.id;
}
export function normalizeChannelRouteTarget(
input?: ChannelRouteTargetInput | null,
): ChannelRouteRef | undefined {
return input ? normalizeChannelRouteRef(input) : undefined;
}
export function channelRouteIdentityKey(input?: ChannelRouteTargetInput | null): string {
const route = normalizeChannelRouteTarget(input);
return JSON.stringify([
route?.channel ?? "",
route?.target?.to ?? "",
route?.accountId ?? "",
stringifyRouteThreadId(route?.thread?.id) ?? "",
]);
}
function threadIdsEqual(left?: string | number, right?: string | number): boolean {
const normalizedLeft = stringifyRouteThreadId(left);
const normalizedRight = stringifyRouteThreadId(right);

View File

@@ -28,4 +28,18 @@ describe("buildChannelApprovalNativeTargetKey", () => {
}),
);
});
it("normalizes numeric thread ids through the shared route key", () => {
expect(
buildChannelApprovalNativeTargetKey({
to: "telegram:-100123",
threadId: 42.9,
}),
).toBe(
buildChannelApprovalNativeTargetKey({
to: " telegram:-100123 ",
threadId: "42",
}),
);
});
});

View File

@@ -1,8 +1,9 @@
import type { ChannelApprovalNativeTarget } from "../channels/plugins/approval-native.types.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { channelRouteIdentityKey } from "../channels/route/ref.js";
export function buildChannelApprovalNativeTargetKey(target: ChannelApprovalNativeTarget): string {
return `${normalizeOptionalString(target.to) ?? ""}\u0000${
target.threadId == null ? "" : (normalizeOptionalString(String(target.threadId)) ?? "")
}`;
return channelRouteIdentityKey({
to: target.to,
threadId: target.threadId,
});
}

View File

@@ -211,7 +211,12 @@ const TARGETS_CFG = makeTargetsCfg([{ channel: "slack", to: "U123" }]);
function createForwarder(params: {
cfg: OpenClawConfig;
deliver?: ReturnType<typeof vi.fn>;
resolveSessionTarget?: () => { channel: string; to: string } | null;
resolveSessionTarget?: () => {
channel: string;
to: string;
accountId?: string;
threadId?: string | number;
} | null;
}) {
const deliver = params.deliver ?? vi.fn().mockResolvedValue([]);
const deps: NonNullable<Parameters<typeof createExecApprovalForwarder>[0]> = {
@@ -362,6 +367,32 @@ describe("exec approval forwarder", () => {
expect(deliver).toHaveBeenCalledTimes(2);
});
it("deduplicates session and explicit approval targets through normalized route identity", async () => {
vi.useFakeTimers();
const cfg = {
approvals: {
exec: {
enabled: true,
mode: "both",
targets: [{ channel: "telegram", to: "-100999", accountId: "bot", threadId: "77" }],
},
},
} as OpenClawConfig;
const { deliver, forwarder } = createForwarder({
cfg,
resolveSessionTarget: () => ({
channel: "telegram",
to: "-100999",
accountId: "bot",
threadId: 77,
}),
});
await expect(forwarder.handleRequested(baseRequest)).resolves.toBe(true);
expect(deliver).toHaveBeenCalledTimes(1);
});
it("calls outbound beforeDeliverPayload before exec approval delivery", async () => {
const beforeDeliverPayload = vi.fn();
setActivePluginRegistry(

View File

@@ -3,6 +3,7 @@ import {
getLoadedChannelPlugin,
resolveChannelApprovalAdapter,
} from "../channels/plugins/index.js";
import { channelRouteIdentityKey } from "../channels/route/ref.js";
import { getRuntimeConfig } from "../config/config.js";
import type {
ExecApprovalForwardingConfig,
@@ -169,9 +170,12 @@ function shouldForwardRoute(params: {
function buildTargetKey(target: ExecApprovalForwardTarget): string {
const channel = normalizeMessageChannel(target.channel) ?? target.channel;
const accountId = target.accountId ?? "";
const threadId = target.threadId ?? "";
return [channel, target.to, accountId, threadId].join(":");
return channelRouteIdentityKey({
channel,
to: target.to,
accountId: target.accountId,
threadId: target.threadId,
});
}
function buildSyntheticApprovalRequest(routeRequest: ApprovalRouteRequest): ExecApprovalRequest {

View File

@@ -13,6 +13,11 @@ describe("resolveConversationIdFromTargets", () => {
params: { threadId: 123456789, targets: ["channel:987654321"] },
expected: "123456789",
},
{
name: "truncates decimal numeric thread ids",
params: { threadId: 42.9, targets: ["channel:987654321"] },
expected: "42",
},
{
name: "falls back when the thread id is blank",
params: { threadId: " ", targets: ["channel:987654321"] },

View File

@@ -1,3 +1,4 @@
import { stringifyRouteThreadId } from "../../channels/route/ref.js";
import {
normalizeLowercaseStringOrEmpty,
normalizeOptionalString,
@@ -16,8 +17,7 @@ export function resolveConversationIdFromTargets(params: {
threadId?: string | number;
targets: Array<string | undefined | null>;
}): string | undefined {
const threadId =
params.threadId != null ? normalizeOptionalString(String(params.threadId)) : undefined;
const threadId = stringifyRouteThreadId(params.threadId);
if (threadId) {
return threadId;
}

View File

@@ -120,6 +120,23 @@ describe("system events (session routing)", () => {
expect(peekSystemEvents(key)).toEqual(["second"]);
});
it("matches consumed delivery contexts through normalized route identity", () => {
const key = "agent:main:test-consume-route-context";
enqueueSystemEvent("first", {
sessionKey: key,
deliveryContext: {
channel: "telegram",
to: "-100123",
threadId: 42.9,
},
});
const inspected = peekSystemEventEntries(key);
inspected[0].deliveryContext!.threadId = "42";
expect(consumeSystemEventEntries(key, inspected).map((entry) => entry.text)).toEqual(["first"]);
expect(peekSystemEvents(key)).toEqual([]);
});
it("resolves the newest effective delivery context from queued events", () => {
const key = "agent:main:test-delivery-context";
enqueueSystemEvent("Restarted", {

View File

@@ -2,6 +2,7 @@
// prefixed to the next prompt. We intentionally avoid persistence to keep
// events ephemeral. Events are session-scoped and require an explicit key.
import { channelRouteIdentityKey } from "../channels/route/ref.js";
import { resolveGlobalMap } from "../shared/global-singleton.js";
import {
normalizeOptionalLowercaseString,
@@ -135,11 +136,7 @@ function areDeliveryContextsEqual(left?: DeliveryContext, right?: DeliveryContex
if (!left || !right) {
return false;
}
return (
(left.channel ?? undefined) === (right.channel ?? undefined) &&
(left.to ?? undefined) === (right.to ?? undefined) &&
(left.threadId ?? undefined) === (right.threadId ?? undefined)
);
return channelRouteIdentityKey(left) === channelRouteIdentityKey(right);
}
function areSystemEventsEqual(left: SystemEvent, right: SystemEvent): boolean {