fix(team-mode): resolve stream target before mutating part-text dedupe state

team-session-streamer was mutating partTextByKey inside extractSegment
BEFORE checking if the stream target could be resolved. If a member
emitted output in the launch-before-mapping window (session created,
events arriving, but runtimeState.members[.].sessionId not yet written),
the first event recorded its cumulative text into the dedupe map but
was silently dropped at the write step because no target existed.
The next cumulative update would then diff against that recorded
prefix and emit only the delta, permanently losing the earlier text.

Reproduction (per Oracle round 8):
  event 1: message.part.updated text='early', mapping not ready
           -> partTextByKey['key']='early', write skipped
  event 2: message.part.updated text='early late', mapping ready
           -> previousText='early', appended=' late'
           -> FIFO received only ' late', 'early' lost forever

Fix: swap the order. resolveStreamTarget first; if no target, return
without touching partTextByKey. Once mapping appears, the next
cumulative update diffs against '' and emits the full current text.

Apply the same ordering to message.part.delta path (orphan/non-text
short-circuits still run before target resolution since they carry
no state mutation).

Regression test in hook.test.ts:
  'does not lose text when runtime state mapping is not yet available'
  -- listActiveTeams returns [] for event 1, then [team] for event 2
  -- assert FIFO write sees full 'early late' cumulative, not just
     ' late' delta

Oracle review blocker: pre-mapping startup gap losing earliest text.
This commit is contained in:
YeonGyu-Kim
2026-04-18 07:09:47 +09:00
parent 17bd24c9ca
commit fa20c2b47c
2 changed files with 64 additions and 10 deletions

View File

@@ -258,4 +258,57 @@ describe("createTeamSessionStreamer", () => {
expect(writeTeamSessionFifoMock).toHaveBeenCalledTimes(1)
expect(writeTeamSessionFifoMock).toHaveBeenCalledWith("/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "orphan full")
})
test("does not lose text when runtime state mapping is not yet available (pre-mapping race)", async () => {
// given
let mappingReady = false
const listActiveTeams = mock(async () => mappingReady ? [{
teamRunId: "11111111-1111-4111-8111-111111111111",
teamName: "team-alpha",
status: "active" as const,
memberCount: 1,
scope: "project" as const,
}] : [])
const loadRuntimeState = mock(async () => createRuntimeState())
const config = TeamModeConfigSchema.parse({ enabled: true, tmux_visualization: true })
const streamer = createTeamSessionStreamer(config, { listActiveTeams, loadRuntimeState })
// when
await streamer.event({
event: {
type: "message.part.updated",
properties: {
part: {
id: "part-race",
sessionID: "member-session",
messageID: "message-z",
type: "text",
text: "early",
},
},
},
})
expect(writeTeamSessionFifoMock).not.toHaveBeenCalled()
mappingReady = true
await streamer.event({
event: {
type: "message.part.updated",
properties: {
part: {
id: "part-race",
sessionID: "member-session",
messageID: "message-z",
type: "text",
text: "early late",
},
},
},
})
// then
expect(writeTeamSessionFifoMock).toHaveBeenCalledTimes(1)
expect(writeTeamSessionFifoMock).toHaveBeenCalledWith("/tmp/omo-team/11111111-1111-4111-8111-111111111111/member-a.fifo", "early late")
})
})

View File

@@ -40,7 +40,7 @@ function extractCumulativeText(part: Part): string | undefined {
return undefined
}
function extractUpdateSegment(
function consumeUpdateSegment(
event: EventMessagePartUpdated,
partTextByKey: Map<string, string>,
): { sessionID: string; text: string } | undefined {
@@ -59,7 +59,7 @@ function extractUpdateSegment(
return { sessionID: part.sessionID, text: appendedText }
}
function extractDeltaSegment(
function consumeDeltaSegment(
event: MessagePartDeltaEvent,
partTextByKey: Map<string, string>,
): { sessionID: string; text: string } | undefined {
@@ -117,10 +117,7 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te
return undefined
}
async function writeSegment(sessionID: string, text: string): Promise<void> {
const target = await resolveStreamTarget(sessionID)
if (!target) return
async function writeSegment(target: TeamSessionStreamTarget, sessionID: string, text: string): Promise<void> {
try {
await writeTeamSessionFifo(target.fifoPath, text)
} catch (error) {
@@ -168,17 +165,21 @@ export function createTeamSessionStreamer(config: TeamModeConfig, stateStore: Te
}
if (event.type === "message.part.delta") {
const segment = extractDeltaSegment(event, partTextByKey)
const target = await resolveStreamTarget(event.properties.sessionID)
if (!target) return
const segment = consumeDeltaSegment(event, partTextByKey)
if (!segment) return
await writeSegment(segment.sessionID, segment.text)
await writeSegment(target, segment.sessionID, segment.text)
return
}
if (event.type !== "message.part.updated") return
const segment = extractUpdateSegment(event, partTextByKey)
const target = await resolveStreamTarget(event.properties.part.sessionID)
if (!target) return
const segment = consumeUpdateSegment(event, partTextByKey)
if (!segment) return
await writeSegment(segment.sessionID, segment.text)
await writeSegment(target, segment.sessionID, segment.text)
},
}
}