Skip to content

Commit 63e8b97

Browse files
authored
Merge pull request #1505 from dgageot/usage-tracking
Add rate limits to the token_usage event
2 parents ec1aaa0 + 662810b commit 63e8b97

File tree

7 files changed

+106
-41
lines changed

7 files changed

+106
-41
lines changed

pkg/chat/chat.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,13 @@ type MessageStreamChoice struct {
114114

115115
// MessageStreamResponse represents a streaming response from the model
116116
type MessageStreamResponse struct {
117-
ID string `json:"id"`
118-
Object string `json:"object"`
119-
Created int64 `json:"created"`
120-
Model string `json:"model"`
121-
Choices []MessageStreamChoice `json:"choices"`
122-
Usage *Usage `json:"usage,omitempty"`
117+
ID string `json:"id"`
118+
Object string `json:"object"`
119+
Created int64 `json:"created"`
120+
Model string `json:"model"`
121+
Choices []MessageStreamChoice `json:"choices"`
122+
Usage *Usage `json:"usage,omitempty"`
123+
RateLimit *RateLimit `json:"rate_limit,omitempty"`
123124
}
124125

125126
type Usage struct {
@@ -130,6 +131,13 @@ type Usage struct {
130131
ReasoningTokens int64 `json:"reasoning_tokens,omitempty"`
131132
}
132133

134+
type RateLimit struct {
135+
Limit int64 `json:"limit,omitempty"`
136+
Remaining int64 `json:"remaining,omitempty"`
137+
Reset int64 `json:"reset,omitempty"`
138+
RetryAfter int64 `json:"retry_after,omitempty"`
139+
}
140+
133141
// MessageStream interface represents a stream of chat completions
134142
type MessageStream interface {
135143
// Recv gets the next completion chunk

pkg/model/provider/anthropic/adapter.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"io"
88
"net/http"
9+
"strconv"
910
"strings"
1011

1112
"github.com/anthropics/anthropic-sdk-go"
@@ -23,14 +24,16 @@ type streamAdapter struct {
2324
toolCall bool
2425
toolID string
2526
// For single retry on context length error
26-
retryFn func() *streamAdapter
27-
retried bool
27+
retryFn func() *streamAdapter
28+
retried bool
29+
getResponseTrailer func() http.Header
2830
}
2931

30-
func newStreamAdapter(stream *ssestream.Stream[anthropic.MessageStreamEventUnion], trackUsage bool) *streamAdapter {
32+
func (c *Client) newStreamAdapter(stream *ssestream.Stream[anthropic.MessageStreamEventUnion], trackUsage bool) *streamAdapter {
3133
return &streamAdapter{
32-
stream: stream,
33-
trackUsage: trackUsage,
34+
stream: stream,
35+
trackUsage: trackUsage,
36+
getResponseTrailer: c.getResponseTrailer,
3437
}
3538
}
3639

@@ -163,11 +166,30 @@ func (a *streamAdapter) Recv() (chat.MessageStreamResponse, error) {
163166
} else {
164167
response.Choices[0].FinishReason = chat.FinishReasonStop
165168
}
169+
170+
// MessageStopEvent is the last event. Let's drain the response to get the trailing headers.
171+
trailers := a.getResponseTrailer()
172+
if trailers.Get("X-RateLimit-Limit") != "" {
173+
response.RateLimit = &chat.RateLimit{
174+
Limit: parseHeaderInt64(trailers.Get("X-RateLimit-Limit")),
175+
Remaining: parseHeaderInt64(trailers.Get("X-RateLimit-Remaining")),
176+
Reset: parseHeaderInt64(trailers.Get("X-RateLimit-Reset")),
177+
RetryAfter: parseHeaderInt64(trailers.Get("Retry-After")),
178+
}
179+
}
166180
}
167181

168182
return response, nil
169183
}
170184

185+
func parseHeaderInt64(headerValue string) int64 {
186+
value, err := strconv.ParseInt(headerValue, 10, 64)
187+
if err != nil {
188+
return 0
189+
}
190+
return value
191+
}
192+
171193
// Close closes the stream
172194
func (a *streamAdapter) Close() {
173195
if a.stream != nil {

pkg/model/provider/anthropic/beta_adapter.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"io"
66
"log/slog"
7+
"net/http"
78

89
"github.com/anthropics/anthropic-sdk-go"
910
"github.com/anthropics/anthropic-sdk-go/packages/ssestream"
@@ -14,18 +15,22 @@ import (
1415

1516
// betaStreamAdapter adapts the Anthropic Beta stream to our interface
1617
type betaStreamAdapter struct {
17-
stream *ssestream.Stream[anthropic.BetaRawMessageStreamEventUnion]
18-
toolCall bool
19-
toolID string
18+
stream *ssestream.Stream[anthropic.BetaRawMessageStreamEventUnion]
19+
trackUsage bool
20+
toolCall bool
21+
toolID string
2022
// For single retry on context length error
21-
retryFn func() *betaStreamAdapter
22-
retried bool
23+
retryFn func() *betaStreamAdapter
24+
retried bool
25+
getResponseTrailer func() http.Header
2326
}
2427

2528
// newBetaStreamAdapter creates a new Beta stream adapter
26-
func newBetaStreamAdapter(stream *ssestream.Stream[anthropic.BetaRawMessageStreamEventUnion]) *betaStreamAdapter {
29+
func (c *Client) newBetaStreamAdapter(stream *ssestream.Stream[anthropic.BetaRawMessageStreamEventUnion], trackUsage bool) *betaStreamAdapter {
2730
return &betaStreamAdapter{
28-
stream: stream,
31+
stream: stream,
32+
trackUsage: trackUsage,
33+
getResponseTrailer: c.getResponseTrailer,
2934
}
3035
}
3136

@@ -111,11 +116,13 @@ func (a *betaStreamAdapter) Recv() (chat.MessageStreamResponse, error) {
111116
return response, fmt.Errorf("unknown delta type: %T", deltaVariant)
112117
}
113118
case anthropic.BetaRawMessageDeltaEvent:
114-
response.Usage = &chat.Usage{
115-
InputTokens: eventVariant.Usage.InputTokens,
116-
OutputTokens: eventVariant.Usage.OutputTokens,
117-
CachedInputTokens: eventVariant.Usage.CacheReadInputTokens,
118-
CacheWriteTokens: eventVariant.Usage.CacheCreationInputTokens,
119+
if a.trackUsage {
120+
response.Usage = &chat.Usage{
121+
InputTokens: eventVariant.Usage.InputTokens,
122+
OutputTokens: eventVariant.Usage.OutputTokens,
123+
CachedInputTokens: eventVariant.Usage.CacheReadInputTokens,
124+
CacheWriteTokens: eventVariant.Usage.CacheCreationInputTokens,
125+
}
119126
}
120127
case anthropic.BetaRawMessageStopEvent:
121128
if a.toolCall {

pkg/model/provider/anthropic/beta_client.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ func (c *Client) createBetaStream(
103103
"message_count", len(params.Messages))
104104

105105
stream := client.Beta.Messages.NewStreaming(ctx, params)
106-
ad := newBetaStreamAdapter(stream)
106+
trackUsage := c.ModelConfig.TrackUsage == nil || *c.ModelConfig.TrackUsage
107+
ad := c.newBetaStreamAdapter(stream, trackUsage)
107108

108109
// Set up single retry for context length errors
109110
ad.retryFn = func() *betaStreamAdapter {
@@ -120,7 +121,7 @@ func (c *Client) createBetaStream(
120121
slog.Warn("Retrying with clamped max_tokens after context length error", "original", maxTokens, "clamped", newMaxTokens, "used", used)
121122
retryParams := params
122123
retryParams.MaxTokens = newMaxTokens
123-
return newBetaStreamAdapter(client.Beta.Messages.NewStreaming(ctx, retryParams))
124+
return c.newBetaStreamAdapter(client.Beta.Messages.NewStreaming(ctx, retryParams), trackUsage)
124125
}
125126

126127
slog.Debug("Anthropic Beta API chat completion stream created successfully", "model", c.ModelConfig.Model)

pkg/model/provider/anthropic/client.go

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66
"encoding/json"
77
"errors"
88
"fmt"
9+
"io"
910
"log/slog"
11+
"net/http"
1012
"net/url"
1113
"strings"
1214

@@ -27,7 +29,20 @@ import (
2729
// It holds the anthropic client and model config
2830
type Client struct {
2931
base.Config
30-
clientFn func(context.Context) (anthropic.Client, error)
32+
clientFn func(context.Context) (anthropic.Client, error)
33+
lastHTTPResponse *http.Response
34+
}
35+
36+
func (c *Client) getResponseTrailer() http.Header {
37+
if c.lastHTTPResponse == nil {
38+
return nil
39+
}
40+
41+
if c.lastHTTPResponse.Body != nil {
42+
_, _ = io.Copy(io.Discard, c.lastHTTPResponse.Body)
43+
}
44+
45+
return c.lastHTTPResponse.Trailer
3146
}
3247

3348
// adjustMaxTokensForThinking checks if max_tokens needs adjustment for thinking_budget.
@@ -116,7 +131,14 @@ func NewClient(ctx context.Context, cfg *latest.ModelConfig, env environment.Pro
116131
}
117132
}
118133

119-
var clientFn func(context.Context) (anthropic.Client, error)
134+
anthropicClient := &Client{
135+
Config: base.Config{
136+
ModelConfig: *cfg,
137+
ModelOptions: globalOptions,
138+
Env: env,
139+
},
140+
}
141+
120142
if gateway := globalOptions.Gateway(); gateway == "" {
121143
authToken, _ := env.Get(ctx, "ANTHROPIC_API_KEY")
122144
if authToken == "" {
@@ -132,7 +154,7 @@ func NewClient(ctx context.Context, cfg *latest.ModelConfig, env environment.Pro
132154
requestOptions = append(requestOptions, option.WithBaseURL(cfg.BaseURL))
133155
}
134156
client := anthropic.NewClient(requestOptions...)
135-
clientFn = func(context.Context) (anthropic.Client, error) {
157+
anthropicClient.clientFn = func(context.Context) (anthropic.Client, error) {
136158
return client, nil
137159
}
138160
} else {
@@ -143,7 +165,7 @@ func NewClient(ctx context.Context, cfg *latest.ModelConfig, env environment.Pro
143165
}
144166

145167
// When using a Gateway, tokens are short-lived.
146-
clientFn = func(ctx context.Context) (anthropic.Client, error) {
168+
anthropicClient.clientFn = func(ctx context.Context) (anthropic.Client, error) {
147169
// Query a fresh auth token each time the client is used
148170
authToken, _ := env.Get(ctx, environment.DockerDesktopTokenEnv)
149171
if authToken == "" {
@@ -168,6 +190,7 @@ func NewClient(ctx context.Context, cfg *latest.ModelConfig, env environment.Pro
168190
}
169191

170192
client := anthropic.NewClient(
193+
option.WithResponseInto(&anthropicClient.lastHTTPResponse),
171194
option.WithAuthToken(authToken),
172195
option.WithAPIKey(authToken),
173196
option.WithBaseURL(baseURL),
@@ -180,14 +203,7 @@ func NewClient(ctx context.Context, cfg *latest.ModelConfig, env environment.Pro
180203

181204
slog.Debug("Anthropic client created successfully", "model", cfg.Model)
182205

183-
return &Client{
184-
Config: base.Config{
185-
ModelConfig: *cfg,
186-
ModelOptions: globalOptions,
187-
Env: env,
188-
},
189-
clientFn: clientFn,
190-
}, nil
206+
return anthropicClient, nil
191207
}
192208

193209
// CreateChatCompletionStream creates a streaming chat completion request
@@ -304,7 +320,7 @@ func (c *Client) CreateChatCompletionStream(
304320

305321
stream := client.Messages.NewStreaming(ctx, params, betaHeader)
306322
trackUsage := c.ModelConfig.TrackUsage == nil || *c.ModelConfig.TrackUsage
307-
ad := newStreamAdapter(stream, trackUsage)
323+
ad := c.newStreamAdapter(stream, trackUsage)
308324

309325
// Set up single retry for context length errors
310326
ad.retryFn = func() *streamAdapter {
@@ -321,7 +337,7 @@ func (c *Client) CreateChatCompletionStream(
321337
slog.Warn("Retrying with clamped max_tokens after context length error", "original max_tokens", maxTokens, "clamped max_tokens", newMaxTokens, "used tokens", used)
322338
retryParams := params
323339
retryParams.MaxTokens = newMaxTokens
324-
return newStreamAdapter(client.Messages.NewStreaming(ctx, retryParams, betaHeader), trackUsage)
340+
return c.newStreamAdapter(client.Messages.NewStreaming(ctx, retryParams, betaHeader), trackUsage)
325341
}
326342

327343
slog.Debug("Anthropic chat completion stream created successfully", "model", c.ModelConfig.Model)

pkg/runtime/event.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ type Usage struct {
208208
// It embeds chat.Usage and adds Cost and Model fields.
209209
type MessageUsage struct {
210210
chat.Usage
211+
chat.RateLimit
211212
Cost float64
212213
Model string
213214
}

pkg/runtime/runtime.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ type streamResult struct {
191191
Stopped bool
192192
ActualModel string // The actual model used (may differ from configured model with routing)
193193
Usage *chat.Usage // Token usage for this stream
194+
RateLimit *chat.RateLimit
194195
}
195196

196197
type Opt func(*LocalRuntime)
@@ -946,6 +947,9 @@ func (r *LocalRuntime) RunStream(ctx context.Context, sess *session.Session) <-c
946947
Model: messageModel,
947948
}
948949
}
950+
if res.RateLimit != nil {
951+
msgUsage.RateLimit = *res.RateLimit
952+
}
949953

950954
sess.AddMessage(session.NewAgentMessage(a, &assistantMessage))
951955
r.saveSession(ctx, sess)
@@ -1119,8 +1123,9 @@ func (r *LocalRuntime) handleStream(ctx context.Context, stream chat.MessageStre
11191123
var actualModel string
11201124
var actualModelEventEmitted bool
11211125
var messageUsage *chat.Usage
1122-
modelID := getAgentModelID(a)
1126+
var messageRateLimit *chat.RateLimit
11231127

1128+
modelID := getAgentModelID(a)
11241129
toolCallIndex := make(map[string]int) // toolCallID -> index in toolCalls slice
11251130
emittedPartial := make(map[string]bool) // toolCallID -> whether we've emitted a partial event
11261131
toolDefMap := make(map[string]tools.Tool, len(agentTools))
@@ -1138,7 +1143,6 @@ func (r *LocalRuntime) handleStream(ctx context.Context, stream chat.MessageStre
11381143
}
11391144

11401145
if response.Usage != nil {
1141-
// Capture the usage for this specific message
11421146
messageUsage = response.Usage
11431147

11441148
if m != nil && m.Cost != nil {
@@ -1159,6 +1163,10 @@ func (r *LocalRuntime) handleStream(ctx context.Context, stream chat.MessageStre
11591163
telemetry.RecordTokenUsage(ctx, modelName, sess.InputTokens, sess.OutputTokens, sess.Cost)
11601164
}
11611165

1166+
if response.RateLimit != nil {
1167+
messageRateLimit = response.RateLimit
1168+
}
1169+
11621170
if len(response.Choices) == 0 {
11631171
continue
11641172
}
@@ -1195,6 +1203,7 @@ func (r *LocalRuntime) handleStream(ctx context.Context, stream chat.MessageStre
11951203
Stopped: true,
11961204
ActualModel: actualModel,
11971205
Usage: messageUsage,
1206+
RateLimit: messageRateLimit,
11981207
}, nil
11991208
}
12001209

@@ -1267,6 +1276,7 @@ func (r *LocalRuntime) handleStream(ctx context.Context, stream chat.MessageStre
12671276
Stopped: stoppedDueToNoOutput,
12681277
ActualModel: actualModel,
12691278
Usage: messageUsage,
1279+
RateLimit: messageRateLimit,
12701280
}, nil
12711281
}
12721282

0 commit comments

Comments (0)