Skip to content

Commit 1e94acc

Browse files
committed
Support new CPA usage queue fields
1 parent 63986b6 commit 1e94acc

14 files changed

Lines changed: 571 additions & 254 deletions

File tree

internal/cpa/types.go

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,21 @@ import (
88

99
// Endpoints used against the CPA management API.
1010
const (
11-
managementAuthFilesEndpoint = "/v0/management/auth-files"
12-
managementExternalKeysEndpoint = "/v0/management/api-keys"
13-
managementGeminiKeysEndpoint = "/v0/management/gemini-api-key"
14-
managementClaudeKeysEndpoint = "/v0/management/claude-api-key"
15-
managementCodexKeysEndpoint = "/v0/management/codex-api-key"
16-
managementVertexKeysEndpoint = "/v0/management/vertex-api-key"
17-
managementOpenAICompatEndpoint = "/v0/management/openai-compatibility"
18-
modelsEndpoint = "/v1/models"
11+
managementAuthFilesEndpoint = "/v0/management/auth-files"
12+
managementExternalKeysEndpoint = "/v0/management/api-keys"
13+
managementGeminiKeysEndpoint = "/v0/management/gemini-api-key"
14+
managementClaudeKeysEndpoint = "/v0/management/claude-api-key"
15+
managementCodexKeysEndpoint = "/v0/management/codex-api-key"
16+
managementVertexKeysEndpoint = "/v0/management/vertex-api-key"
17+
managementOpenAICompatEndpoint = "/v0/management/openai-compatibility"
18+
modelsEndpoint = "/v1/models"
1919

2020
// Redis queue (RESP TCP) constants — multiplexed on CPA's HTTP port + 1 by default (8317).
21-
redisNetwork = "tcp"
22-
RedisDefaultPort = "8317"
23-
RedisAuthCommand = "AUTH"
24-
RedisLPopCommand = "LPOP"
25-
RedisUsageQueueKey = "queue"
21+
redisNetwork = "tcp"
22+
RedisDefaultPort = "8317"
23+
RedisAuthCommand = "AUTH"
24+
RedisLPopCommand = "LPOP"
25+
RedisUsageQueueKey = "queue"
2626
)
2727

2828
// AuthFile mirrors a single entry from /v0/management/auth-files.
@@ -181,25 +181,39 @@ func firstString(raw map[string]any, keys ...string) string {
181181
// CPA encodes timestamps as RFC3339 strings; deserialization tolerates both string
182182
// and object container shapes via Tokens.
183183
type UsageRecord struct {
184-
Timestamp time.Time `json:"timestamp"`
185-
LatencyMs int64 `json:"latency_ms"`
186-
Source string `json:"source"`
187-
AuthIndex string `json:"auth_index"`
188-
Tokens UsageTokens `json:"tokens"`
189-
Failed bool `json:"failed"`
190-
Provider string `json:"provider"`
191-
Model string `json:"model"`
192-
Endpoint string `json:"endpoint"`
193-
AuthType string `json:"auth_type"`
194-
APIKey string `json:"api_key"`
195-
RequestID string `json:"request_id"`
184+
Timestamp time.Time `json:"timestamp"`
185+
LatencyMs int64 `json:"latency_ms"`
186+
TTFTMs int64 `json:"ttft_ms"`
187+
Source string `json:"source"`
188+
AuthIndex string `json:"auth_index"`
189+
Tokens UsageTokens `json:"tokens"`
190+
Failed bool `json:"failed"`
191+
Fail UsageFail `json:"fail"`
192+
ResponseHeaders json.RawMessage `json:"response_headers"`
193+
Provider string `json:"provider"`
194+
Model string `json:"model"`
195+
Alias string `json:"alias"`
196+
Endpoint string `json:"endpoint"`
197+
AuthType string `json:"auth_type"`
198+
APIKey string `json:"api_key"`
199+
RequestID string `json:"request_id"`
200+
ReasoningEffort string `json:"reasoning_effort"`
201+
ServiceTier string `json:"service_tier"`
196202
}
197203

198204
// UsageTokens is the nested token stats object from CPA.
199205
type UsageTokens struct {
200-
InputTokens int64 `json:"input_tokens"`
201-
OutputTokens int64 `json:"output_tokens"`
202-
ReasoningTokens int64 `json:"reasoning_tokens"`
203-
CachedTokens int64 `json:"cached_tokens"`
204-
TotalTokens int64 `json:"total_tokens"`
206+
InputTokens int64 `json:"input_tokens"`
207+
OutputTokens int64 `json:"output_tokens"`
208+
ReasoningTokens int64 `json:"reasoning_tokens"`
209+
CachedTokens int64 `json:"cached_tokens"`
210+
CacheReadTokens int64 `json:"cache_read_tokens"`
211+
CacheCreationTokens int64 `json:"cache_creation_tokens"`
212+
TotalTokens int64 `json:"total_tokens"`
213+
}
214+
215+
// UsageFail holds the failure detail that CPA nests under the "fail" key of a
// usage record.
type UsageFail struct {
	// StatusCode is decoded from "status_code".
	// NOTE(review): presumably the upstream HTTP status — confirm against CPA.
	StatusCode int `json:"status_code"`

	// Body is decoded from "body".
	Body string `json:"body"`
}

internal/ingest/decoder.go

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package ingest
22

33
import (
4+
"bytes"
45
"encoding/json"
56
"fmt"
67
"strings"
@@ -30,25 +31,34 @@ func Decode(message string) (storage.UsageEvent, error) {
3031
ts = time.Now().UTC()
3132
}
3233
return storage.UsageEvent{
33-
EventKey: requestID,
34-
Timestamp: ts.UTC(),
35-
Provider: strings.TrimSpace(rec.Provider),
36-
Model: strings.TrimSpace(rec.Model),
37-
APIGroupKey: resolveAPIGroupKey(rec),
38-
Source: strings.TrimSpace(rec.Source),
39-
AuthIndex: strings.TrimSpace(rec.AuthIndex),
40-
AuthType: strings.TrimSpace(rec.AuthType),
41-
APIKey: strings.TrimSpace(rec.APIKey),
42-
Endpoint: strings.TrimSpace(rec.Endpoint),
43-
RequestID: requestID,
44-
LatencyMs: rec.LatencyMs,
45-
InputTokens: rec.Tokens.InputTokens,
46-
OutputTokens: rec.Tokens.OutputTokens,
47-
ReasoningTokens: rec.Tokens.ReasoningTokens,
48-
CachedTokens: rec.Tokens.CachedTokens,
49-
TotalTokens: rec.Tokens.TotalTokens,
50-
Failed: rec.Failed,
51-
InsertedAt: time.Now().UTC(),
34+
EventKey: requestID,
35+
Timestamp: ts.UTC(),
36+
Provider: strings.TrimSpace(rec.Provider),
37+
Model: strings.TrimSpace(rec.Model),
38+
Alias: strings.TrimSpace(rec.Alias),
39+
APIGroupKey: resolveAPIGroupKey(rec),
40+
Source: strings.TrimSpace(rec.Source),
41+
AuthIndex: strings.TrimSpace(rec.AuthIndex),
42+
AuthType: strings.TrimSpace(rec.AuthType),
43+
APIKey: strings.TrimSpace(rec.APIKey),
44+
Endpoint: strings.TrimSpace(rec.Endpoint),
45+
RequestID: requestID,
46+
LatencyMs: rec.LatencyMs,
47+
TTFTMs: rec.TTFTMs,
48+
InputTokens: rec.Tokens.InputTokens,
49+
OutputTokens: rec.Tokens.OutputTokens,
50+
ReasoningTokens: rec.Tokens.ReasoningTokens,
51+
CachedTokens: rec.Tokens.CachedTokens,
52+
CacheReadTokens: rec.Tokens.CacheReadTokens,
53+
CacheCreationTokens: rec.Tokens.CacheCreationTokens,
54+
TotalTokens: rec.Tokens.TotalTokens,
55+
Failed: rec.Failed,
56+
FailStatusCode: rec.Fail.StatusCode,
57+
FailBody: strings.TrimSpace(rec.Fail.Body),
58+
ResponseHeaders: compactRawJSON(rec.ResponseHeaders),
59+
ReasoningEffort: strings.TrimSpace(rec.ReasoningEffort),
60+
ServiceTier: strings.TrimSpace(rec.ServiceTier),
61+
InsertedAt: time.Now().UTC(),
5262
}, nil
5363
}
5464

@@ -83,3 +93,15 @@ func resolveAPIGroupKey(rec cpa.UsageRecord) string {
8393
}
8494
return "unknown"
8595
}
96+
97+
// compactRawJSON returns raw re-encoded without insignificant whitespace.
//
// It returns "" when raw is empty or whitespace-only, when it is the JSON
// literal null, or when it is not valid JSON.
func compactRawJSON(raw json.RawMessage) string {
	raw = bytes.TrimSpace(raw)
	if len(raw) == 0 || string(raw) == "null" {
		return ""
	}
	// json.Compact validates the input while copying it and reports an error
	// for malformed JSON, so a separate json.Valid pass would only scan the
	// payload a second time.
	var buf bytes.Buffer
	buf.Grow(len(raw))
	if err := json.Compact(&buf, raw); err != nil {
		return ""
	}
	return buf.String()
}

internal/ingest/decoder_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package ingest
2+
3+
import "testing"
4+
5+
func TestDecodeNewUsageQueueFields(t *testing.T) {
6+
raw := `{
7+
"timestamp":"2026-04-25T00:00:00Z",
8+
"latency_ms":1500,
9+
"ttft_ms":320,
10+
"source":"user@example.com",
11+
"auth_index":"0",
12+
"tokens":{
13+
"input_tokens":10,
14+
"output_tokens":20,
15+
"reasoning_tokens":3,
16+
"cached_tokens":4,
17+
"cache_read_tokens":4,
18+
"cache_creation_tokens":5,
19+
"total_tokens":42
20+
},
21+
"failed":true,
22+
"fail":{"status_code":429,"body":" rate limited "},
23+
"response_headers":{"Retry-After":["30"],"X-Upstream-Request-Id":["upstream-req-1"]},
24+
"provider":"openai",
25+
"model":"gpt-5.4",
26+
"alias":"client-gpt",
27+
"endpoint":"POST /v1/chat/completions",
28+
"auth_type":"apikey",
29+
"api_key":"test-key",
30+
"request_id":"ctx-request-id",
31+
"reasoning_effort":"medium",
32+
"service_tier":"priority"
33+
}`
34+
35+
ev, err := Decode(raw)
36+
if err != nil {
37+
t.Fatalf("Decode: %v", err)
38+
}
39+
if ev.EventKey != "ctx-request-id" || ev.RequestID != "ctx-request-id" {
40+
t.Fatalf("request ids = event_key %q request_id %q", ev.EventKey, ev.RequestID)
41+
}
42+
if ev.Alias != "client-gpt" || ev.TTFTMs != 320 {
43+
t.Fatalf("alias/ttft = %q/%d", ev.Alias, ev.TTFTMs)
44+
}
45+
if ev.CacheReadTokens != 4 || ev.CacheCreationTokens != 5 {
46+
t.Fatalf("cache split = read %d creation %d", ev.CacheReadTokens, ev.CacheCreationTokens)
47+
}
48+
if ev.FailStatusCode != 429 || ev.FailBody != "rate limited" {
49+
t.Fatalf("fail = status %d body %q", ev.FailStatusCode, ev.FailBody)
50+
}
51+
if ev.ResponseHeaders != `{"Retry-After":["30"],"X-Upstream-Request-Id":["upstream-req-1"]}` {
52+
t.Fatalf("response headers = %s", ev.ResponseHeaders)
53+
}
54+
if ev.ReasoningEffort != "medium" || ev.ServiceTier != "priority" {
55+
t.Fatalf("reasoning/service tier = %q/%q", ev.ReasoningEffort, ev.ServiceTier)
56+
}
57+
}

internal/ingest/snapshot.go

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,13 @@ type DetailPayload struct {
4949

5050
// TokensPayload mirrors the legacy `TokenStats` JSON.
5151
type TokensPayload struct {
52-
InputTokens int64 `json:"input_tokens"`
53-
OutputTokens int64 `json:"output_tokens"`
54-
ReasoningTokens int64 `json:"reasoning_tokens"`
55-
CachedTokens int64 `json:"cached_tokens"`
56-
TotalTokens int64 `json:"total_tokens"`
52+
InputTokens int64 `json:"input_tokens"`
53+
OutputTokens int64 `json:"output_tokens"`
54+
ReasoningTokens int64 `json:"reasoning_tokens"`
55+
CachedTokens int64 `json:"cached_tokens"`
56+
CacheReadTokens int64 `json:"cache_read_tokens"`
57+
CacheCreationTokens int64 `json:"cache_creation_tokens"`
58+
TotalTokens int64 `json:"total_tokens"`
5759
}
5860

5961
// DecodeSnapshot parses the legacy export envelope from raw JSON bytes.
@@ -92,21 +94,23 @@ func SnapshotToEvents(env *SnapshotEnvelope) []storage.UsageEvent {
9294
continue
9395
}
9496
ev := storage.UsageEvent{
95-
EventKey: snapshotEventKey(apiKey, model, d),
96-
Timestamp: ts.UTC(),
97-
Model: strings.TrimSpace(model),
98-
APIGroupKey: resolveAPIGroup(apiKey),
99-
APIKey: strings.TrimSpace(apiKey),
100-
Source: strings.TrimSpace(d.Source),
101-
AuthIndex: strings.TrimSpace(d.AuthIndex),
102-
LatencyMs: d.LatencyMs,
103-
InputTokens: d.Tokens.InputTokens,
104-
OutputTokens: d.Tokens.OutputTokens,
105-
ReasoningTokens: d.Tokens.ReasoningTokens,
106-
CachedTokens: d.Tokens.CachedTokens,
107-
TotalTokens: d.Tokens.TotalTokens,
108-
Failed: d.Failed,
109-
InsertedAt: now,
97+
EventKey: snapshotEventKey(apiKey, model, d),
98+
Timestamp: ts.UTC(),
99+
Model: strings.TrimSpace(model),
100+
APIGroupKey: resolveAPIGroup(apiKey),
101+
APIKey: strings.TrimSpace(apiKey),
102+
Source: strings.TrimSpace(d.Source),
103+
AuthIndex: strings.TrimSpace(d.AuthIndex),
104+
LatencyMs: d.LatencyMs,
105+
InputTokens: d.Tokens.InputTokens,
106+
OutputTokens: d.Tokens.OutputTokens,
107+
ReasoningTokens: d.Tokens.ReasoningTokens,
108+
CachedTokens: d.Tokens.CachedTokens,
109+
CacheReadTokens: d.Tokens.CacheReadTokens,
110+
CacheCreationTokens: d.Tokens.CacheCreationTokens,
111+
TotalTokens: d.Tokens.TotalTokens,
112+
Failed: d.Failed,
113+
InsertedAt: now,
110114
}
111115
out = append(out, ev)
112116
}

internal/storage/sqlite/migrate.go

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,35 @@ import "time"
44

55
// usageEventModel is the GORM/sqlite-side schema. Rows are deduplicated on event_key.
66
type usageEventModel struct {
7-
ID uint `gorm:"primaryKey"`
8-
EventKey string `gorm:"uniqueIndex;size:128"`
9-
Timestamp time.Time `gorm:"index"`
10-
Provider string `gorm:"size:64;index"`
11-
Model string `gorm:"size:128;index"`
12-
APIGroupKey string `gorm:"size:128;index;column:api_group_key"`
13-
Source string `gorm:"size:256;index"`
14-
AuthIndex string `gorm:"size:64;index"`
15-
AuthType string `gorm:"size:32"`
16-
APIKey string `gorm:"size:128;column:api_key"`
17-
Endpoint string `gorm:"size:128"`
18-
RequestID string `gorm:"size:64;column:request_id;index"`
19-
LatencyMs int64
20-
InputTokens int64
21-
OutputTokens int64
22-
ReasoningTokens int64
23-
CachedTokens int64
24-
TotalTokens int64
25-
Failed bool `gorm:"index"`
26-
InsertedAt time.Time
7+
ID uint `gorm:"primaryKey"`
8+
EventKey string `gorm:"uniqueIndex;size:128"`
9+
Timestamp time.Time `gorm:"index"`
10+
Provider string `gorm:"size:64;index"`
11+
Model string `gorm:"size:128;index"`
12+
Alias string `gorm:"size:128"`
13+
APIGroupKey string `gorm:"size:128;index;column:api_group_key"`
14+
Source string `gorm:"size:256;index"`
15+
AuthIndex string `gorm:"size:64;index"`
16+
AuthType string `gorm:"size:32"`
17+
APIKey string `gorm:"size:128;column:api_key"`
18+
Endpoint string `gorm:"size:128"`
19+
RequestID string `gorm:"size:64;column:request_id;index"`
20+
LatencyMs int64
21+
TTFTMs int64 `gorm:"column:ttft_ms"`
22+
InputTokens int64
23+
OutputTokens int64
24+
ReasoningTokens int64
25+
CachedTokens int64
26+
CacheReadTokens int64
27+
CacheCreationTokens int64
28+
TotalTokens int64
29+
Failed bool `gorm:"index"`
30+
FailStatusCode int
31+
FailBody string
32+
ResponseHeaders string
33+
ReasoningEffort string `gorm:"size:64"`
34+
ServiceTier string `gorm:"size:64"`
35+
InsertedAt time.Time
2736
}
2837

2938
func (usageEventModel) TableName() string { return "usage_events" }

0 commit comments

Comments
 (0)