fix: preserve text part timing in session processor (#21691)

This commit is contained in:
Kit Langton
2026-04-09 10:05:14 -04:00
committed by GitHub
parent c29392d085
commit 58a99916bb
2 changed files with 92 additions and 2 deletions

View File

@@ -352,7 +352,10 @@ export namespace SessionProcessor {
},
{ text: ctx.currentText.text },
)).text
ctx.currentText.time = { start: Date.now(), end: Date.now() }
{
const end = Date.now()
ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end }
}
if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata
yield* session.updatePart(ctx.currentText)
ctx.currentText = undefined

View File

@@ -21,7 +21,7 @@ import { Log } from "../../src/util/log"
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
import { provideTmpdirServer } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
import { reply, TestLLMServer } from "../lib/llm-server"
import { raw, reply, TestLLMServer } from "../lib/llm-server"
Log.init({ print: false })
@@ -218,6 +218,93 @@ it.live("session.processor effect tests capture llm input cleanly", () =>
),
)
it.live("session.processor effect tests preserve text start time", () =>
provideTmpdirServer(
({ dir, llm }) =>
Effect.gen(function* () {
const gate = defer<void>()
const { processors, session, provider } = yield* boot()
yield* llm.push(
raw({
head: [
{
id: "chatcmpl-test",
object: "chat.completion.chunk",
choices: [{ delta: { role: "assistant" } }],
},
{
id: "chatcmpl-test",
object: "chat.completion.chunk",
choices: [{ delta: { content: "hello" } }],
},
],
wait: gate.promise,
tail: [
{
id: "chatcmpl-test",
object: "chat.completion.chunk",
choices: [{ delta: {}, finish_reason: "stop" }],
},
],
}),
)
const chat = yield* session.create({})
const parent = yield* user(chat.id, "hi")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
})
const run = yield* handle
.process({
user: {
id: parent.id,
sessionID: chat.id,
role: "user",
time: parent.time,
agent: parent.agent,
model: { providerID: ref.providerID, modelID: ref.modelID },
} satisfies MessageV2.User,
sessionID: chat.id,
model: mdl,
agent: agent(),
system: [],
messages: [{ role: "user", content: "hi" }],
tools: {},
})
.pipe(Effect.forkChild)
yield* Effect.promise(async () => {
const stop = Date.now() + 500
while (Date.now() < stop) {
const text = MessageV2.parts(msg.id).find((part): part is MessageV2.TextPart => part.type === "text")
if (text?.time?.start) return
await Bun.sleep(10)
}
throw new Error("timed out waiting for text part")
})
yield* Effect.sleep("20 millis")
gate.resolve()
const exit = yield* Fiber.await(run)
const text = MessageV2.parts(msg.id).find((part): part is MessageV2.TextPart => part.type === "text")
expect(Exit.isSuccess(exit)).toBe(true)
expect(text?.text).toBe("hello")
expect(text?.time?.start).toBeDefined()
expect(text?.time?.end).toBeDefined()
if (!text?.time?.start || !text.time.end) return
expect(text.time.start).toBeLessThan(text.time.end)
}),
{ git: true, config: (url) => providerCfg(url) },
),
)
it.live("session.processor effect tests stop after token overflow requests compaction", () =>
provideTmpdirServer(
({ dir, llm }) =>