fix(team-mode): invalidate in-flight drains on teardown and lifecycle events

Oracle R14 found two teardown races:

1. dispose() did not cancel an already-running scheduler tick. stop()
   only cleared the waiting timer. If run() had already started, its
   finally block re-armed the timer even after dispose, so background
   writes continued after the streamer was supposed to be gone.

2. drainAndWrite pulled the buffer into a local `drained` array and
   kept writing those items across awaits. If session.deleted,
   session.error, or message.part.removed fired mid-await, the
   lifecycle handler only cleared the buffer/state maps. The stale
   items were still written to the FIFO, and partTextByKey.set /
   pendingDeltaBuffer.enqueue restored the cleared dedupe state and
   re-populated cleared buffers.

Scheduler hardening: add a stopped flag that stop() sets and run()
re-checks at entry, between iterations, and in finally. Once stopped,
the scheduler never reschedules or runs a new tick.

Streaming invalidation via per-session/per-part generation counters
(createStreamGeneration). Lifecycle events bump the counter. drainAndWrite
captures per-part generations up front before any await, then after each
write checks the captured generations against current. On mismatch:
session-level mismatch aborts the whole drain; part-level mismatch skips
that item without mutating partTextByKey or re-enqueueing. The re-enqueue
loop on write failure also skips parts whose generations changed.

Split writePendingEvent into attemptWrite (pure: write attempt + preview)
and drainAndWrite (mutation: state + buffer) so the caller can do the
generation check between the write and the mutation.

New regression tests:
- scheduler aborts in-flight run and does not reschedule when stop is
  called during drain
- session.deleted during an in-flight write does not restore cleared
  dedupe state - observable because a later cumulative update would
  repeat the full text instead of just the diff
- message.part.removed during an in-flight write does not restore
  cleared dedupe state for the removed partID
This commit is contained in:
YeonGyu-Kim
2026-04-18 08:40:47 +09:00
parent 9e23e62383
commit b0db2379e8
5 changed files with 254 additions and 12 deletions

View File

@@ -651,4 +651,151 @@ describe("createTeamSessionStreamer", () => {
expect(writeTeamSessionFifoMock).toHaveBeenCalledTimes(1)
expect(writeTeamSessionFifoMock).toHaveBeenCalledWith("/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "fresh")
})
test("does not restore cleared dedupe state when session.deleted arrives during an in-flight write", async () => {
// given
const listActiveTeams = mock(async () => [{
teamRunId: "11111111-1111-4111-8111-111111111111",
teamName: "team-alpha",
status: "active" as const,
memberCount: 1,
scope: "project" as const,
}])
const loadRuntimeState = mock(async () => createRuntimeState())
let releaseSecondWrite: (() => void) | undefined
const secondWriteGate = new Promise<void>((resolve) => {
releaseSecondWrite = resolve
})
let callCount = 0
writeTeamSessionFifoMock.mockImplementation(async () => {
callCount += 1
if (callCount === 2) await secondWriteGate
})
const config = TeamModeConfigSchema.parse({ enabled: true, tmux_visualization: true })
const streamer = createTeamSessionStreamer(config, { listActiveTeams, loadRuntimeState })
// when
await streamer.event({
event: {
type: "message.part.updated",
properties: {
part: { id: "part-a", sessionID: "member-session", messageID: "msg-a", type: "text", text: "A\n" },
},
},
})
const gatedEventPromise = streamer.event({
event: {
type: "message.part.updated",
properties: {
part: { id: "part-a", sessionID: "member-session", messageID: "msg-a", type: "text", text: "A\nB\n" },
},
},
})
await new Promise((resolvePromise) => setTimeout(resolvePromise, 10))
await streamer.event({
event: {
type: "session.deleted",
properties: { info: { id: "member-session" } as never },
},
})
releaseSecondWrite?.()
await gatedEventPromise
await streamer.event({
event: {
type: "message.part.updated",
properties: {
part: { id: "part-a", sessionID: "member-session", messageID: "msg-a", type: "text", text: "A\nB\nC\n" },
},
},
})
streamer.dispose()
// then
expect(writeTeamSessionFifoMock).toHaveBeenCalledTimes(3)
expect(writeTeamSessionFifoMock).toHaveBeenNthCalledWith(1, "/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "A\n")
expect(writeTeamSessionFifoMock).toHaveBeenNthCalledWith(2, "/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "B\n")
expect(writeTeamSessionFifoMock).toHaveBeenNthCalledWith(3, "/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "A\nB\nC\n")
})
test("skips state mutation when message.part.removed arrives for the drained partID during write", async () => {
// given
const listActiveTeams = mock(async () => [{
teamRunId: "11111111-1111-4111-8111-111111111111",
teamName: "team-alpha",
status: "active" as const,
memberCount: 1,
scope: "project" as const,
}])
const loadRuntimeState = mock(async () => createRuntimeState())
let releaseSecondWrite: (() => void) | undefined
const secondWriteGate = new Promise<void>((resolve) => {
releaseSecondWrite = resolve
})
let callCount = 0
writeTeamSessionFifoMock.mockImplementation(async () => {
callCount += 1
if (callCount === 2) await secondWriteGate
})
const config = TeamModeConfigSchema.parse({ enabled: true, tmux_visualization: true })
const streamer = createTeamSessionStreamer(config, { listActiveTeams, loadRuntimeState })
// when
await streamer.event({
event: {
type: "message.part.updated",
properties: {
part: {
id: "part-a",
sessionID: "member-session",
messageID: "msg-a",
type: "text",
text: "first-wave\n",
},
},
},
})
const slowEvent = streamer.event({
event: {
type: "message.part.updated",
properties: {
part: {
id: "part-a",
sessionID: "member-session",
messageID: "msg-a",
type: "text",
text: "first-wave\nin-flight\n",
},
},
},
})
await new Promise((resolvePromise) => setTimeout(resolvePromise, 10))
await streamer.event({
event: {
type: "message.part.removed",
properties: { sessionID: "member-session", messageID: "msg-a", partID: "part-a" },
},
})
releaseSecondWrite?.()
await slowEvent
await streamer.event({
event: {
type: "message.part.updated",
properties: {
part: {
id: "part-a",
sessionID: "member-session",
messageID: "msg-a",
type: "text",
text: "first-wave\nin-flight\n",
},
},
},
})
streamer.dispose()
// then
expect(writeTeamSessionFifoMock).toHaveBeenCalledTimes(3)
expect(writeTeamSessionFifoMock).toHaveBeenNthCalledWith(1, "/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "first-wave\n")
expect(writeTeamSessionFifoMock).toHaveBeenNthCalledWith(3, "/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "first-wave\nin-flight\n")
})
})

View File

@@ -4,11 +4,12 @@ import type { TeamModeConfig } from "../../config/schema/team-mode"
import { getTeamMemberFifoPath } from "../../features/team-mode/team-layout-tmux/fifo-path"
import * as teamStateStore from "../../features/team-mode/team-state-store"
import { log } from "../../shared/logger"
import { clearSessionPartState, previewPendingEvent } from "./apply-pending-event"
import { clearSessionPartState, type PendingEventPreview, previewPendingEvent } from "./apply-pending-event"
import { type MessagePartDeltaEvent, normalizeDeltaEventForBuffer, normalizeUpdateEventForBuffer } from "./event-normalizer"
import { writeTeamSessionFifo } from "./fifo-writer"
import { createPendingDeltaBuffer, type PendingStreamEvent } from "./pending-delta-buffer"
import { createPendingRetryScheduler } from "./pending-retry-scheduler"
import { createStreamGeneration } from "./stream-generation"
type TeamStateStore = Pick<typeof teamStateStore, "listActiveTeams" | "loadRuntimeState">
@@ -23,6 +24,8 @@ type TeamSessionStreamTarget = {
type HookInput = { event: StreamEvent }
type HookImpl = { event: (input: HookInput) => Promise<void>; dispose: () => void }
type AttemptWriteResult = { written: boolean; preview?: PendingEventPreview }
const DROPPABLE_FIFO_ERROR_CODES = new Set(["ENXIO", "ENOENT", "EPIPE"])
const PENDING_RESOLVE_RETRY_MS = 250
@@ -34,6 +37,7 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te
const streamTargetsBySession = new Map<string, TeamSessionStreamTarget>()
const partTextByKey = new Map<string, string>()
const pendingDeltaBuffer = createPendingDeltaBuffer()
const generation = createStreamGeneration()
async function resolveStreamTarget(sessionID: string): Promise<TeamSessionStreamTarget | undefined> {
const cachedTarget = streamTargetsBySession.get(sessionID)
@@ -90,26 +94,41 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te
}
}
async function writePendingEvent(
async function attemptWrite(
target: TeamSessionStreamTarget,
sessionID: string,
pending: PendingStreamEvent,
): Promise<boolean> {
): Promise<AttemptWriteResult> {
const preview = previewPendingEvent(pending, sessionID, partTextByKey)
if (!preview) return true
if (!preview) return { written: true }
const written = await writeSegment(target, sessionID, preview.appendedText)
if (written) partTextByKey.set(preview.partKey, preview.nextState)
else pendingDeltaBuffer.enqueue(sessionID, pending)
return written
return { written, preview: written ? preview : undefined }
}
async function drainAndWrite(target: TeamSessionStreamTarget, sessionID: string): Promise<boolean> {
const drainSessionGen = generation.captureSession(sessionID)
const drained = pendingDeltaBuffer.drain(sessionID)
const drainPartGens = new Map<string, number>()
for (const pending of drained) {
if (!drainPartGens.has(pending.partID)) {
drainPartGens.set(pending.partID, generation.capturePart(sessionID, pending.partID))
}
}
for (let i = 0; i < drained.length; i++) {
const written = await writePendingEvent(target, sessionID, drained[i])
if (!written) {
const pending = drained[i]
const startPartGen = drainPartGens.get(pending.partID) ?? 0
const result = await attemptWrite(target, sessionID, pending)
if (!generation.isSessionCurrent(sessionID, drainSessionGen)) return false
if (!generation.isPartCurrent(sessionID, pending.partID, startPartGen)) continue
if (result.preview) partTextByKey.set(result.preview.partKey, result.preview.nextState)
if (!result.written) {
pendingDeltaBuffer.enqueue(sessionID, pending)
for (let j = i + 1; j < drained.length; j++) {
pendingDeltaBuffer.enqueue(sessionID, drained[j])
const remaining = drained[j]
const remainingStart = drainPartGens.get(remaining.partID) ?? 0
if (generation.isPartCurrent(sessionID, remaining.partID, remainingStart)) {
pendingDeltaBuffer.enqueue(sessionID, remaining)
}
}
return false
}
@@ -127,7 +146,11 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te
})
async function handlePendingEvent(sessionID: string, pending: PendingStreamEvent): Promise<void> {
const sessionGenAtStart = generation.captureSession(sessionID)
const partGenAtStart = generation.capturePart(sessionID, pending.partID)
const target = await resolveStreamTarget(sessionID)
if (!generation.isSessionCurrent(sessionID, sessionGenAtStart)) return
if (!generation.isPartCurrent(sessionID, pending.partID, partGenAtStart)) return
if (!target) {
pendingDeltaBuffer.enqueue(sessionID, pending)
retryScheduler.schedule()
@@ -144,6 +167,7 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te
if (event.type === "session.deleted") {
const sessionID = event.properties.info.id
generation.bumpSession(sessionID)
streamTargetsBySession.delete(sessionID)
pendingDeltaBuffer.clearSession(sessionID)
clearSessionPartState(sessionID, partTextByKey)
@@ -153,6 +177,7 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te
if (event.type === "message.part.removed") {
const { sessionID, partID } = event.properties
generation.bumpPart(sessionID, partID)
pendingDeltaBuffer.removePart(sessionID, partID)
partTextByKey.delete(`${sessionID}:${partID}`)
if (pendingDeltaBuffer.getPendingSessions().length === 0) retryScheduler.stop()
@@ -162,6 +187,7 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te
if (event.type === "session.error") {
const sessionID = event.properties.sessionID
if (sessionID) {
generation.bumpSession(sessionID)
streamTargetsBySession.delete(sessionID)
pendingDeltaBuffer.clearSession(sessionID)
clearSessionPartState(sessionID, partTextByKey)

View File

@@ -70,4 +70,35 @@ describe("createPendingRetryScheduler", () => {
expect(pending.size).toBe(0)
scheduler.stop()
})
test("aborts the in-flight run and does not reschedule when stop is called during drain", async () => {
// given
const pending = new Set(["session-a", "session-b", "session-c"])
let release: (() => void) | undefined
const gate = new Promise<void>((resolve) => {
release = resolve
})
const drained: string[] = []
const drainSession = mock(async (sessionID: string) => {
drained.push(sessionID)
if (sessionID === "session-a") await gate
pending.delete(sessionID)
})
const scheduler = createPendingRetryScheduler({
intervalMs: 10,
getPendingSessions: () => Array.from(pending),
drainSession,
})
// when
scheduler.schedule()
await new Promise((resolvePromise) => setTimeout(resolvePromise, 30))
scheduler.stop()
release?.()
await new Promise((resolvePromise) => setTimeout(resolvePromise, 80))
// then
expect(drained).toEqual(["session-a"])
expect(pending.size).toBe(2)
})
})

View File

@@ -13,12 +13,15 @@ export type PendingRetrySchedulerOptions = {
export function createPendingRetryScheduler(options: PendingRetrySchedulerOptions): PendingRetryScheduler {
let timer: ReturnType<typeof setTimeout> | undefined
let stopped = false
async function run(): Promise<void> {
timer = undefined
if (stopped) return
try {
const sessions = options.getPendingSessions()
for (const sessionID of sessions) {
if (stopped) return
try {
await options.drainSession(sessionID)
} catch (error) {
@@ -30,18 +33,19 @@ export function createPendingRetryScheduler(options: PendingRetrySchedulerOption
}
}
} finally {
if (options.getPendingSessions().length > 0) schedule()
if (!stopped && options.getPendingSessions().length > 0) schedule()
}
}
function schedule(): void {
if (timer) return
if (stopped || timer) return
timer = setTimeout(() => {
void run()
}, options.intervalMs)
}
function stop(): void {
stopped = true
if (!timer) return
clearTimeout(timer)
timer = undefined

View File

@@ -0,0 +1,34 @@
export type StreamGeneration = {
bumpSession: (sessionID: string) => void
bumpPart: (sessionID: string, partID: string) => void
captureSession: (sessionID: string) => number
capturePart: (sessionID: string, partID: string) => number
isSessionCurrent: (sessionID: string, generation: number) => boolean
isPartCurrent: (sessionID: string, partID: string, generation: number) => boolean
}
export function createStreamGeneration(): StreamGeneration {
const sessionGen = new Map<string, number>()
const partGen = new Map<string, number>()
function sessionValue(sessionID: string): number {
return sessionGen.get(sessionID) ?? 0
}
function partValue(sessionID: string, partID: string): number {
return partGen.get(`${sessionID}:${partID}`) ?? 0
}
return {
bumpSession: (sessionID) => {
sessionGen.set(sessionID, sessionValue(sessionID) + 1)
},
bumpPart: (sessionID, partID) => {
partGen.set(`${sessionID}:${partID}`, partValue(sessionID, partID) + 1)
},
captureSession: sessionValue,
capturePart: partValue,
isSessionCurrent: (sessionID, generation) => sessionValue(sessionID) === generation,
isPartCurrent: (sessionID, partID, generation) => partValue(sessionID, partID) === generation,
}
}