From 5207d081d459fe9cd797955ef3e2d139e3d72a93 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 13 Apr 2026 15:04:07 +0100 Subject: [PATCH] refactor(line): share replay dedupe guard --- extensions/line/src/bot-handlers.ts | 108 ++++++---------------------- 1 file changed, 20 insertions(+), 88 deletions(-) diff --git a/extensions/line/src/bot-handlers.ts b/extensions/line/src/bot-handlers.ts index 704c519f1fb..98917868ab0 100644 --- a/extensions/line/src/bot-handlers.ts +++ b/extensions/line/src/bot-handlers.ts @@ -18,6 +18,7 @@ import { upsertChannelPairingRequest, } from "openclaw/plugin-sdk/conversation-runtime"; import { evaluateMatchedGroupAccessForPolicy } from "openclaw/plugin-sdk/group-access"; +import { createClaimableDedupe, type ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; import { DEFAULT_GROUP_HISTORY_LIMIT, clearHistoryEntriesIfEnabled, @@ -84,40 +85,13 @@ export interface LineHandlerContext { const LINE_WEBHOOK_REPLAY_WINDOW_MS = 10 * 60 * 1000; const LINE_WEBHOOK_REPLAY_MAX_ENTRIES = 4096; -const LINE_WEBHOOK_REPLAY_PRUNE_INTERVAL_MS = 1000; -export type LineWebhookReplayCache = { - seenEvents: Map; - inFlightEvents: Map>; - lastPruneAtMs: number; -}; +export type LineWebhookReplayCache = ClaimableDedupe; export function createLineWebhookReplayCache(): LineWebhookReplayCache { - return { - seenEvents: new Map(), - inFlightEvents: new Map>(), - lastPruneAtMs: 0, - }; -} - -function pruneLineWebhookReplayCache(cache: LineWebhookReplayCache, nowMs: number): void { - const minSeenAt = nowMs - LINE_WEBHOOK_REPLAY_WINDOW_MS; - for (const [key, seenAt] of cache.seenEvents) { - if (seenAt < minSeenAt) { - cache.seenEvents.delete(key); - } - } - - if (cache.seenEvents.size > LINE_WEBHOOK_REPLAY_MAX_ENTRIES) { - const deleteCount = cache.seenEvents.size - LINE_WEBHOOK_REPLAY_MAX_ENTRIES; - let deleted = 0; - for (const key of cache.seenEvents.keys()) { - if (deleted >= deleteCount) { - break; - } - cache.seenEvents.delete(key); - deleted += 1; - } - } + return createClaimableDedupe({ + ttlMs: LINE_WEBHOOK_REPLAY_WINDOW_MS, + memoryMaxSize: LINE_WEBHOOK_REPLAY_MAX_ENTRIES, + }); } function buildLineWebhookReplayKey( @@ -155,16 +129,9 @@ function buildLineWebhookReplayKey( type LineReplayCandidate = { key: string; eventId: string; - seenAtMs: number; cache: LineWebhookReplayCache; }; -type LineInFlightReplayResult = { - promise: Promise; - resolve: () => void; - reject: (err: unknown) => void; -}; - function getLineReplayCandidate( event: WebhookEvent, context: LineHandlerContext, @@ -174,51 +141,22 @@ function getLineReplayCandidate( if (!replay || !cache) { return null; } - - const nowMs = Date.now(); - if ( - nowMs - cache.lastPruneAtMs >= LINE_WEBHOOK_REPLAY_PRUNE_INTERVAL_MS || - cache.seenEvents.size >= LINE_WEBHOOK_REPLAY_MAX_ENTRIES - ) { - pruneLineWebhookReplayCache(cache, nowMs); - cache.lastPruneAtMs = nowMs; - } - return { key: replay.key, eventId: replay.eventId, seenAtMs: nowMs, cache }; + return { key: replay.key, eventId: replay.eventId, cache }; } -function shouldSkipLineReplayEvent( +async function claimLineReplayEvent( candidate: LineReplayCandidate, -): { skip: true; inFlightResult?: Promise } | { skip: false } { - const inFlightResult = candidate.cache.inFlightEvents.get(candidate.key); - if (inFlightResult) { +): Promise<{ skip: true; inFlightResult?: Promise } | { skip: false }> { + const claim = await candidate.cache.claim(candidate.key); + if (claim.kind === "claimed") { + return { skip: false }; + } + if (claim.kind === "inflight") { logVerbose(`line: skipped in-flight replayed webhook event ${candidate.eventId}`); - return { skip: true, inFlightResult }; + return { skip: true, inFlightResult: claim.pending.then(() => undefined) }; } - if (candidate.cache.seenEvents.has(candidate.key)) { - logVerbose(`line: skipped replayed webhook event ${candidate.eventId}`); - return { skip: true }; - } - return { skip: false }; -} - -function markLineReplayEventInFlight(candidate: LineReplayCandidate): LineInFlightReplayResult { - let resolve!: () => void; - let reject!: (err: unknown) => void; - const promise = new Promise((resolvePromise, rejectPromise) => { - resolve = resolvePromise; - reject = rejectPromise; - }); - void promise.catch(() => {}); - candidate.cache.inFlightEvents.set(candidate.key, promise); - return { promise, resolve, reject }; -} - -function clearLineReplayEventInFlight(candidate: LineReplayCandidate): void { - candidate.cache.inFlightEvents.delete(candidate.key); -} - -function rememberLineReplayEvent(candidate: LineReplayCandidate): void { - candidate.cache.seenEvents.set(candidate.key, candidate.seenAtMs); + logVerbose(`line: skipped replayed webhook event ${candidate.eventId}`); + return { skip: true }; } function resolveLineGroupConfig(params: { @@ -639,7 +577,7 @@ export async function handleLineWebhookEvents( let firstError: unknown; for (const event of events) { const replayCandidate = getLineReplayCandidate(event, context); - const replaySkip = replayCandidate ? shouldSkipLineReplayEvent(replayCandidate) : null; + const replaySkip = replayCandidate ? await claimLineReplayEvent(replayCandidate) : null; if (replaySkip?.skip) { if (replaySkip.inFlightResult) { try { @@ -651,9 +589,6 @@ export async function handleLineWebhookEvents( } continue; } - const inFlightReservation = replayCandidate - ? markLineReplayEventInFlight(replayCandidate) - : null; try { switch (event.type) { case "message": @@ -678,14 +613,11 @@ export async function handleLineWebhookEvents( logVerbose(`line: unhandled event type: ${(event as WebhookEvent).type}`); } if (replayCandidate) { - rememberLineReplayEvent(replayCandidate); - inFlightReservation?.resolve(); - clearLineReplayEventInFlight(replayCandidate); + await replayCandidate.cache.commit(replayCandidate.key); } } catch (err) { if (replayCandidate) { - inFlightReservation?.reject(err); - clearLineReplayEventInFlight(replayCandidate); + replayCandidate.cache.release(replayCandidate.key, { error: err }); } context.runtime.error?.(danger(`line: event handler failed: ${String(err)}`)); firstError ??= err;