From 5f5d5936fa61ed451ee8bb491bdc888d8ccaa2e2 Mon Sep 17 00:00:00 2001 From: sususu98 Date: Fri, 24 Apr 2026 15:47:18 +0800 Subject: [PATCH] fix antigravity credits stream fallback --- sdk/cliproxy/auth/antigravity_credits_test.go | 92 +++++++++++++++++++ sdk/cliproxy/auth/conductor.go | 26 ++++-- 2 files changed, 109 insertions(+), 9 deletions(-) diff --git a/sdk/cliproxy/auth/antigravity_credits_test.go b/sdk/cliproxy/auth/antigravity_credits_test.go index 8f59b4c78..38c08dcfb 100644 --- a/sdk/cliproxy/auth/antigravity_credits_test.go +++ b/sdk/cliproxy/auth/antigravity_credits_test.go @@ -1,10 +1,102 @@ package auth import ( + "context" + "fmt" + "net/http" "testing" "time" + + internalconfig "github.com/router-for-me/CLIProxyAPI/v6/internal/config" + "github.com/router-for-me/CLIProxyAPI/v6/internal/registry" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" ) +type antigravityCreditsFallbackExecutor struct { + streamCreditsRequested []bool +} + +func (e *antigravityCreditsFallbackExecutor) Identifier() string { return "antigravity" } + +func (e *antigravityCreditsFallbackExecutor) Execute(context.Context, *Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { + return cliproxyexecutor.Response{}, &Error{HTTPStatus: http.StatusNotImplemented, Message: "Execute not implemented"} +} + +func (e *antigravityCreditsFallbackExecutor) ExecuteStream(ctx context.Context, _ *Auth, req cliproxyexecutor.Request, _ cliproxyexecutor.Options) (*cliproxyexecutor.StreamResult, error) { + creditsRequested := AntigravityCreditsRequested(ctx) + e.streamCreditsRequested = append(e.streamCreditsRequested, creditsRequested) + ch := make(chan cliproxyexecutor.StreamChunk, 1) + if !creditsRequested { + ch <- cliproxyexecutor.StreamChunk{Err: &Error{HTTPStatus: http.StatusTooManyRequests, Message: "quota exhausted"}} + close(ch) + return &cliproxyexecutor.StreamResult{Headers: http.Header{"X-Initial": {req.Model}}, Chunks: ch}, nil + } + ch <- cliproxyexecutor.StreamChunk{Payload: []byte("credits fallback")} + close(ch) + return &cliproxyexecutor.StreamResult{Headers: http.Header{"X-Credits": {req.Model}}, Chunks: ch}, nil +} + +func (e *antigravityCreditsFallbackExecutor) Refresh(_ context.Context, auth *Auth) (*Auth, error) { + return auth, nil +} + +func (e *antigravityCreditsFallbackExecutor) CountTokens(context.Context, *Auth, cliproxyexecutor.Request, cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { + return cliproxyexecutor.Response{}, &Error{HTTPStatus: http.StatusNotImplemented, Message: "CountTokens not implemented"} +} + +func (e *antigravityCreditsFallbackExecutor) HttpRequest(context.Context, *Auth, *http.Request) (*http.Response, error) { + return nil, &Error{HTTPStatus: http.StatusNotImplemented, Message: "HttpRequest not implemented"} +} + +func TestManagerExecuteStream_AntigravityCreditsFallbackAfterBootstrap429(t *testing.T) { + const model = "claude-opus-4-6-thinking" + executor := &antigravityCreditsFallbackExecutor{} + manager := NewManager(nil, nil, nil) + manager.SetConfig(&internalconfig.Config{ + QuotaExceeded: internalconfig.QuotaExceeded{AntigravityCredits: true}, + }) + manager.RegisterExecutor(executor) + registry.GetGlobalRegistry().RegisterClient("ag-credits", "antigravity", []*registry.ModelInfo{{ID: model}}) + t.Cleanup(func() { registry.GetGlobalRegistry().UnregisterClient("ag-credits") }) + if _, errRegister := manager.Register(context.Background(), &Auth{ID: "ag-credits", Provider: "antigravity"}); errRegister != nil { + t.Fatalf("register auth: %v", errRegister) + } + + streamResult, errExecute := manager.ExecuteStream(context.Background(), []string{"antigravity"}, cliproxyexecutor.Request{Model: model}, cliproxyexecutor.Options{}) + if errExecute != nil { + t.Fatalf("execute stream: %v", errExecute) + } + + var payload []byte + for chunk := range streamResult.Chunks { + if chunk.Err != nil { + t.Fatalf("unexpected stream error: %v", chunk.Err) + } + payload = append(payload, chunk.Payload...) + } + if string(payload) != "credits fallback" { + t.Fatalf("payload = %q, want %q", string(payload), "credits fallback") + } + if got := streamResult.Headers.Get("X-Credits"); got != model { + t.Fatalf("X-Credits header = %q, want routed model", got) + } + if len(executor.streamCreditsRequested) != 2 { + t.Fatalf("stream calls = %d, want 2", len(executor.streamCreditsRequested)) + } + if executor.streamCreditsRequested[0] || !executor.streamCreditsRequested[1] { + t.Fatalf("credits flags = %v, want [false true]", executor.streamCreditsRequested) + } +} + +func TestStatusCodeFromError_UnwrapsStreamBootstrap429(t *testing.T) { + bootstrapErr := newStreamBootstrapError(&Error{HTTPStatus: http.StatusTooManyRequests, Message: "quota exhausted"}, nil) + wrappedErr := fmt.Errorf("conductor stream failed: %w", bootstrapErr) + + if status := statusCodeFromError(wrappedErr); status != http.StatusTooManyRequests { + t.Fatalf("statusCodeFromError() = %d, want %d", status, http.StatusTooManyRequests) + } +} + func TestIsAuthBlockedForModel_ClaudeWithCreditsStillBlockedDuringCooldown(t *testing.T) { auth := &Auth{ ID: "ag-1", diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 4d37581a6..05a32ceb2 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -1273,6 +1273,10 @@ func (m *Manager) ExecuteStream(ctx context.Context, providers []string, req cli return result, nil } } + var bootstrapErr *streamBootstrapError + if errors.As(lastErr, &bootstrapErr) && bootstrapErr != nil { + return streamErrorResult(bootstrapErr.Headers(), bootstrapErr.cause), nil + } return nil, lastErr } return nil, &Error{Code: "auth_not_found", Message: "no auth available"} @@ -1446,10 +1450,6 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string for { if maxRetryCredentials > 0 && len(attempted) >= maxRetryCredentials { if lastErr != nil { - var bootstrapErr *streamBootstrapError - if errors.As(lastErr, &bootstrapErr) && bootstrapErr != nil { - return streamErrorResult(bootstrapErr.Headers(), bootstrapErr.cause), nil - } return nil, lastErr } return nil, &Error{Code: "auth_not_found", Message: "no auth available"} @@ -1457,10 +1457,6 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string auth, executor, provider, errPick := m.pickNextMixed(ctx, providers, routeModel, opts, tried) if errPick != nil { if lastErr != nil { - var bootstrapErr *streamBootstrapError - if errors.As(lastErr, &bootstrapErr) && bootstrapErr != nil { - return streamErrorResult(bootstrapErr.Headers(), bootstrapErr.cause), nil - } return nil, lastErr } return nil, errPick @@ -2299,6 +2295,13 @@ func cloneError(err *Error) *Error { } } +func errorString(err error) string { + if err == nil { + return "" + } + return err.Error() +} + func statusCodeFromError(err error) int { if err == nil { return 0 @@ -2965,6 +2968,12 @@ type creditsCandidateEntry struct { } func shouldAttemptAntigravityCreditsFallback(m *Manager, lastErr error, providers []string) bool { + status := statusCodeFromError(lastErr) + log.WithFields(log.Fields{ + "lastErr": errorString(lastErr), + "status": status, + "providers": providers, + }).Debug("shouldAttemptAntigravityCreditsFallback") if m == nil || lastErr == nil { return false } @@ -2984,7 +2993,6 @@ func shouldAttemptAntigravityCreditsFallback(m *Manager, lastErr error, provider if cfg == nil || !cfg.QuotaExceeded.AntigravityCredits { return false } - status := statusCodeFromError(lastErr) switch status { case http.StatusTooManyRequests, http.StatusServiceUnavailable: return true