refactor(line): share replay dedupe guard

This commit is contained in:
Vincent Koc
2026-04-13 15:04:07 +01:00
parent 028434a00f
commit 5207d081d4

View File

@@ -18,6 +18,7 @@ import {
upsertChannelPairingRequest,
} from "openclaw/plugin-sdk/conversation-runtime";
import { evaluateMatchedGroupAccessForPolicy } from "openclaw/plugin-sdk/group-access";
import { createClaimableDedupe, type ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
import {
DEFAULT_GROUP_HISTORY_LIMIT,
clearHistoryEntriesIfEnabled,
@@ -84,40 +85,13 @@ export interface LineHandlerContext {
const LINE_WEBHOOK_REPLAY_WINDOW_MS = 10 * 60 * 1000;
const LINE_WEBHOOK_REPLAY_MAX_ENTRIES = 4096;
const LINE_WEBHOOK_REPLAY_PRUNE_INTERVAL_MS = 1000;
export type LineWebhookReplayCache = {
seenEvents: Map<string, number>;
inFlightEvents: Map<string, Promise<void>>;
lastPruneAtMs: number;
};
export type LineWebhookReplayCache = ClaimableDedupe;
export function createLineWebhookReplayCache(): LineWebhookReplayCache {
return {
seenEvents: new Map<string, number>(),
inFlightEvents: new Map<string, Promise<void>>(),
lastPruneAtMs: 0,
};
}
function pruneLineWebhookReplayCache(cache: LineWebhookReplayCache, nowMs: number): void {
const minSeenAt = nowMs - LINE_WEBHOOK_REPLAY_WINDOW_MS;
for (const [key, seenAt] of cache.seenEvents) {
if (seenAt < minSeenAt) {
cache.seenEvents.delete(key);
}
}
if (cache.seenEvents.size > LINE_WEBHOOK_REPLAY_MAX_ENTRIES) {
const deleteCount = cache.seenEvents.size - LINE_WEBHOOK_REPLAY_MAX_ENTRIES;
let deleted = 0;
for (const key of cache.seenEvents.keys()) {
if (deleted >= deleteCount) {
break;
}
cache.seenEvents.delete(key);
deleted += 1;
}
}
return createClaimableDedupe({
ttlMs: LINE_WEBHOOK_REPLAY_WINDOW_MS,
memoryMaxSize: LINE_WEBHOOK_REPLAY_MAX_ENTRIES,
});
}
function buildLineWebhookReplayKey(
@@ -155,16 +129,9 @@ function buildLineWebhookReplayKey(
type LineReplayCandidate = {
key: string;
eventId: string;
seenAtMs: number;
cache: LineWebhookReplayCache;
};
type LineInFlightReplayResult = {
promise: Promise<void>;
resolve: () => void;
reject: (err: unknown) => void;
};
function getLineReplayCandidate(
event: WebhookEvent,
context: LineHandlerContext,
@@ -174,51 +141,22 @@ function getLineReplayCandidate(
if (!replay || !cache) {
return null;
}
const nowMs = Date.now();
if (
nowMs - cache.lastPruneAtMs >= LINE_WEBHOOK_REPLAY_PRUNE_INTERVAL_MS ||
cache.seenEvents.size >= LINE_WEBHOOK_REPLAY_MAX_ENTRIES
) {
pruneLineWebhookReplayCache(cache, nowMs);
cache.lastPruneAtMs = nowMs;
}
return { key: replay.key, eventId: replay.eventId, seenAtMs: nowMs, cache };
return { key: replay.key, eventId: replay.eventId, cache };
}
function shouldSkipLineReplayEvent(
async function claimLineReplayEvent(
candidate: LineReplayCandidate,
): { skip: true; inFlightResult?: Promise<void> } | { skip: false } {
const inFlightResult = candidate.cache.inFlightEvents.get(candidate.key);
if (inFlightResult) {
): Promise<{ skip: true; inFlightResult?: Promise<void> } | { skip: false }> {
const claim = await candidate.cache.claim(candidate.key);
if (claim.kind === "claimed") {
return { skip: false };
}
if (claim.kind === "inflight") {
logVerbose(`line: skipped in-flight replayed webhook event ${candidate.eventId}`);
return { skip: true, inFlightResult };
return { skip: true, inFlightResult: claim.pending.then(() => undefined) };
}
if (candidate.cache.seenEvents.has(candidate.key)) {
logVerbose(`line: skipped replayed webhook event ${candidate.eventId}`);
return { skip: true };
}
return { skip: false };
}
function markLineReplayEventInFlight(candidate: LineReplayCandidate): LineInFlightReplayResult {
let resolve!: () => void;
let reject!: (err: unknown) => void;
const promise = new Promise<void>((resolvePromise, rejectPromise) => {
resolve = resolvePromise;
reject = rejectPromise;
});
void promise.catch(() => {});
candidate.cache.inFlightEvents.set(candidate.key, promise);
return { promise, resolve, reject };
}
function clearLineReplayEventInFlight(candidate: LineReplayCandidate): void {
candidate.cache.inFlightEvents.delete(candidate.key);
}
function rememberLineReplayEvent(candidate: LineReplayCandidate): void {
candidate.cache.seenEvents.set(candidate.key, candidate.seenAtMs);
logVerbose(`line: skipped replayed webhook event ${candidate.eventId}`);
return { skip: true };
}
function resolveLineGroupConfig(params: {
@@ -639,7 +577,7 @@ export async function handleLineWebhookEvents(
let firstError: unknown;
for (const event of events) {
const replayCandidate = getLineReplayCandidate(event, context);
const replaySkip = replayCandidate ? shouldSkipLineReplayEvent(replayCandidate) : null;
const replaySkip = replayCandidate ? await claimLineReplayEvent(replayCandidate) : null;
if (replaySkip?.skip) {
if (replaySkip.inFlightResult) {
try {
@@ -651,9 +589,6 @@ export async function handleLineWebhookEvents(
}
continue;
}
const inFlightReservation = replayCandidate
? markLineReplayEventInFlight(replayCandidate)
: null;
try {
switch (event.type) {
case "message":
@@ -678,14 +613,11 @@ export async function handleLineWebhookEvents(
logVerbose(`line: unhandled event type: ${(event as WebhookEvent).type}`);
}
if (replayCandidate) {
rememberLineReplayEvent(replayCandidate);
inFlightReservation?.resolve();
clearLineReplayEventInFlight(replayCandidate);
await replayCandidate.cache.commit(replayCandidate.key);
}
} catch (err) {
if (replayCandidate) {
inFlightReservation?.reject(err);
clearLineReplayEventInFlight(replayCandidate);
replayCandidate.cache.release(replayCandidate.key, { error: err });
}
context.runtime.error?.(danger(`line: event handler failed: ${String(err)}`));
firstError ??= err;