fix(codex): backfill streaming response output

This commit is contained in:
stringer07
2026-04-21 16:03:56 +08:00
parent 8f4a4eabfc
commit bb8408cef5
2 changed files with 103 additions and 2 deletions

View File

@@ -36,6 +36,48 @@ const (
var dataTag = []byte("data:")
// Streamed Codex responses may emit response.output_item.done events while leaving
// response.completed.response.output empty. Keep the stream path aligned with the
// already-patched non-stream path by reconstructing response.output from those items.
func collectCodexOutputItemDone(eventData []byte, outputItemsByIndex map[int64][]byte, outputItemsFallback *[][]byte) {
itemResult := gjson.GetBytes(eventData, "item")
if !itemResult.Exists() || itemResult.Type != gjson.JSON {
return
}
outputIndexResult := gjson.GetBytes(eventData, "output_index")
if outputIndexResult.Exists() {
outputItemsByIndex[outputIndexResult.Int()] = []byte(itemResult.Raw)
return
}
*outputItemsFallback = append(*outputItemsFallback, []byte(itemResult.Raw))
}
func patchCodexCompletedOutput(eventData []byte, outputItemsByIndex map[int64][]byte, outputItemsFallback [][]byte) []byte {
outputResult := gjson.GetBytes(eventData, "response.output")
shouldPatchOutput := (!outputResult.Exists() || !outputResult.IsArray() || len(outputResult.Array()) == 0) && (len(outputItemsByIndex) > 0 || len(outputItemsFallback) > 0)
if !shouldPatchOutput {
return eventData
}
completedDataPatched := eventData
completedDataPatched, _ = sjson.SetRawBytes(completedDataPatched, "response.output", []byte(`[]`))
indexes := make([]int64, 0, len(outputItemsByIndex))
for idx := range outputItemsByIndex {
indexes = append(indexes, idx)
}
sort.Slice(indexes, func(i, j int) bool {
return indexes[i] < indexes[j]
})
for _, idx := range indexes {
completedDataPatched, _ = sjson.SetRawBytes(completedDataPatched, "response.output.-1", outputItemsByIndex[idx])
}
for _, item := range outputItemsFallback {
completedDataPatched, _ = sjson.SetRawBytes(completedDataPatched, "response.output.-1", item)
}
return completedDataPatched
}
// CodexExecutor is a stateless executor for Codex (OpenAI Responses API entrypoint).
// If api_key is unavailable on auth, it falls back to legacy via ClientAdapter.
type CodexExecutor struct {
@@ -414,20 +456,28 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
scanner := bufio.NewScanner(httpResp.Body)
scanner.Buffer(nil, 52_428_800) // 50MB
var param any
outputItemsByIndex := make(map[int64][]byte)
var outputItemsFallback [][]byte
for scanner.Scan() {
line := scanner.Bytes()
helps.AppendAPIResponseChunk(ctx, e.cfg, line)
translatedLine := bytes.Clone(line)
if bytes.HasPrefix(line, dataTag) {
data := bytes.TrimSpace(line[5:])
if gjson.GetBytes(data, "type").String() == "response.completed" {
switch gjson.GetBytes(data, "type").String() {
case "response.output_item.done":
collectCodexOutputItemDone(data, outputItemsByIndex, &outputItemsFallback)
case "response.completed":
if detail, ok := helps.ParseCodexUsage(data); ok {
reporter.Publish(ctx, detail)
}
data = patchCodexCompletedOutput(data, outputItemsByIndex, outputItemsFallback)
translatedLine = append([]byte("data: "), data...)
}
}
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, originalPayload, body, bytes.Clone(line), &param)
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, originalPayload, body, translatedLine, &param)
for i := range chunks {
out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}
}

View File

@@ -1,6 +1,7 @@
package executor
import (
"bytes"
"context"
"net/http"
"net/http/httptest"
@@ -44,3 +45,53 @@ func TestCodexExecutorExecute_EmptyStreamCompletionOutputUsesOutputItemDone(t *t
t.Fatalf("choices.0.message.content = %q, want %q; payload=%s", gotContent, "ok", string(resp.Payload))
}
}
func TestCodexExecutorExecuteStream_EmptyStreamCompletionOutputUsesOutputItemDone(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte("data: {\"type\":\"response.output_item.done\",\"item\":{\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"output_text\",\"text\":\"ok\"}]},\"output_index\":0}\n"))
_, _ = w.Write([]byte("data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp_1\",\"object\":\"response\",\"created_at\":1775555723,\"status\":\"completed\",\"model\":\"gpt-5.4-mini-2026-03-17\",\"output\":[],\"usage\":{\"input_tokens\":8,\"output_tokens\":28,\"total_tokens\":36}}}\n\n"))
}))
defer server.Close()
executor := NewCodexExecutor(&config.Config{})
auth := &cliproxyauth.Auth{Attributes: map[string]string{
"base_url": server.URL,
"api_key": "test",
}}
result, err := executor.ExecuteStream(context.Background(), auth, cliproxyexecutor.Request{
Model: "gpt-5.4-mini",
Payload: []byte(`{"model":"gpt-5.4-mini","input":"Say ok"}`),
}, cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("openai-response"),
Stream: true,
})
if err != nil {
t.Fatalf("ExecuteStream error: %v", err)
}
var completed []byte
for chunk := range result.Chunks {
if chunk.Err != nil {
t.Fatalf("stream chunk error: %v", chunk.Err)
}
payload := bytes.TrimSpace(chunk.Payload)
if !bytes.HasPrefix(payload, []byte("data:")) {
continue
}
data := bytes.TrimSpace(payload[5:])
if gjson.GetBytes(data, "type").String() == "response.completed" {
completed = append([]byte(nil), data...)
}
}
if len(completed) == 0 {
t.Fatal("missing response.completed chunk")
}
gotContent := gjson.GetBytes(completed, "response.output.0.content.0.text").String()
if gotContent != "ok" {
t.Fatalf("response.output[0].content[0].text = %q, want %q; completed=%s", gotContent, "ok", string(completed))
}
}