Skip to content

Commit 8b3afe3

Browse files
committed
Fix CPA usage queue channel
1 parent 288708d commit 8b3afe3

7 files changed

Lines changed: 135 additions & 11 deletions

File tree

.env.example

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ SQLITE_PATH=/home/cliproxy/cpa-usage/data/app.db
2323
# --- Redis queue (CPA usage stream) ---
2424
# host:port. Default: derived from CPA_BASE_URL host + 8317.
2525
REDIS_QUEUE_ADDR=
26+
# RESP channel. CPA v7+ expects "usage"; v6.10 ignores the key.
27+
REDIS_QUEUE_KEY=usage
2628
REDIS_QUEUE_BATCH_SIZE=1000
2729
REDIS_QUEUE_IDLE_INTERVAL=1s
2830
REDIS_QUEUE_ERROR_BACKOFF=10s
@@ -42,4 +44,3 @@ LOG_FILE_ENABLED=true
4244
# Relative to the working directory (systemd: /home/user/cpa-usage).
4345
LOG_DIR=/home/cliproxy/cpa-usage/logs
4446
LOG_RETENTION_DAYS=7
45-

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ All configuration is via environment variables (also see `.env.example`):
6868
| `STORAGE_DRIVER` | `sqlite` | Only `sqlite` in v1 |
6969
| `SQLITE_PATH` | `./data/app.db` | Resolved against the process working directory |
7070
| `REDIS_QUEUE_ADDR` || Defaults to `<cpa-host>:8317` |
71+
| `REDIS_QUEUE_KEY` | `usage` | RESP channel name; CPA v7+ rejects the old `queue` key |
7172
| `REDIS_QUEUE_BATCH_SIZE` | `1000` | |
7273
| `REDIS_QUEUE_IDLE_INTERVAL` | `1s` | |
7374
| `REDIS_QUEUE_ERROR_BACKOFF` | `10s` | |

cpa-usage-install.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ CPA_MANAGEMENT_KEY=
152152
APP_PORT=${APP_PORT_DEFAULT}
153153
APP_BASE_PATH=${APP_BASE_PATH_DEFAULT}
154154
TZ=Asia/Shanghai
155+
REDIS_QUEUE_KEY=usage
155156
SQLITE_PATH=${DATA_DIR}/app.db
156157
LOG_DIR=${LOGS_DIR}
157158
EOF

internal/app/app.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ func New(cfg *config.Config, build BuildInfo) (*App, error) {
7979
BaseURL: cfg.CPABaseURL,
8080
OverrideAddr: cfg.RedisQueueAddr,
8181
ManagementKey: cfg.CPAManagementKey,
82+
QueueKey: cfg.RedisQueueKey,
8283
Timeout: cfg.RequestTimeout,
8384
BatchSize: cfg.RedisQueueBatch,
8485
})
@@ -152,6 +153,7 @@ func (a *App) Run(ctx context.Context) error {
152153
"port": a.cfg.AppPort,
153154
"sqlite": a.cfg.SQLitePath,
154155
"redis_addr": a.cfg.RedisQueueAddr,
156+
"redis_key": a.cfg.RedisQueueKey,
155157
"tz": a.cfg.TZ,
156158
}).Info("cpa-usage starting")
157159

internal/config/config.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@ type Config struct {
2424
StorageDriver string
2525
SQLitePath string
2626

27-
RedisQueueAddr string
28-
RedisQueueBatch int
29-
RedisIdleInterval time.Duration
30-
RedisErrorBackoff time.Duration
31-
MetadataInterval time.Duration
27+
RedisQueueAddr string
28+
RedisQueueKey string
29+
RedisQueueBatch int
30+
RedisIdleInterval time.Duration
31+
RedisErrorBackoff time.Duration
32+
MetadataInterval time.Duration
3233

3334
TZ string
3435
LogLevel string
@@ -40,10 +41,10 @@ type Config struct {
4041
LogBodyMaxBytes int64
4142
LogHeaderMaxBytes int64
4243

43-
AuthEnabled bool
44-
LoginPassword string
45-
SessionTTL time.Duration
46-
CookieName string
44+
AuthEnabled bool
45+
LoginPassword string
46+
SessionTTL time.Duration
47+
CookieName string
4748
}
4849

4950
// Load reads the configuration from environment variables.
@@ -57,6 +58,7 @@ func Load() (*Config, error) {
5758
StorageDriver: strOr("STORAGE_DRIVER", "sqlite"),
5859
SQLitePath: strOr("SQLITE_PATH", "./data/app.db"),
5960
RedisQueueAddr: strings.TrimSpace(os.Getenv("REDIS_QUEUE_ADDR")),
61+
RedisQueueKey: strOr("REDIS_QUEUE_KEY", "usage"),
6062
RedisQueueBatch: intOr("REDIS_QUEUE_BATCH_SIZE", 1000),
6163
RedisIdleInterval: durationOr("REDIS_QUEUE_IDLE_INTERVAL", time.Second),
6264
RedisErrorBackoff: durationOr("REDIS_QUEUE_ERROR_BACKOFF", 10*time.Second),

internal/cpa/redis_queue_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package cpa
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"errors"
7+
"fmt"
8+
"net"
9+
"reflect"
10+
"testing"
11+
"time"
12+
)
13+
14+
func TestNewRedisQueueDefaultsToUsageChannel(t *testing.T) {
15+
q := NewRedisQueue(RedisQueueConfig{
16+
BaseURL: "http://127.0.0.1:8317",
17+
ManagementKey: "secret",
18+
})
19+
if q.queueKey != RedisUsageQueueKey {
20+
t.Fatalf("queueKey = %q, want %q", q.queueKey, RedisUsageQueueKey)
21+
}
22+
if RedisUsageQueueKey != "usage" {
23+
t.Fatalf("RedisUsageQueueKey = %q, want usage", RedisUsageQueueKey)
24+
}
25+
}
26+
27+
func TestPopUsageSendsUsageChannelByDefault(t *testing.T) {
28+
addr, commands, stop := startRESPUsageServer(t)
29+
defer stop()
30+
31+
q := NewRedisQueue(RedisQueueConfig{
32+
OverrideAddr: addr,
33+
ManagementKey: "secret",
34+
Timeout: time.Second,
35+
BatchSize: 2,
36+
})
37+
got, err := q.PopUsage(context.Background())
38+
if err != nil {
39+
t.Fatalf("PopUsage returned error: %v", err)
40+
}
41+
if want := []string{`{"id":1}`, `{"id":2}`}; !reflect.DeepEqual(got, want) {
42+
t.Fatalf("PopUsage = %#v, want %#v", got, want)
43+
}
44+
assertCommand(t, <-commands, []string{RedisAuthCommand, "secret"})
45+
assertCommand(t, <-commands, []string{RedisLPopCommand, RedisUsageQueueKey, "2"})
46+
}
47+
48+
func startRESPUsageServer(t *testing.T) (addr string, commands <-chan []string, stop func()) {
49+
t.Helper()
50+
51+
ln, err := net.Listen("tcp", "127.0.0.1:0")
52+
if err != nil {
53+
t.Fatalf("listen: %v", err)
54+
}
55+
cmdCh := make(chan []string, 2)
56+
done := make(chan error, 1)
57+
58+
go func() {
59+
conn, err := ln.Accept()
60+
if err != nil {
61+
done <- err
62+
return
63+
}
64+
defer conn.Close()
65+
66+
reader := bufio.NewReader(conn)
67+
auth, err := readRESPValue(reader)
68+
if err != nil {
69+
done <- err
70+
return
71+
}
72+
cmdCh <- auth.strings()
73+
if _, err := fmt.Fprint(conn, "+OK\r\n"); err != nil {
74+
done <- err
75+
return
76+
}
77+
78+
pop, err := readRESPValue(reader)
79+
if err != nil {
80+
done <- err
81+
return
82+
}
83+
cmdCh <- pop.strings()
84+
if _, err := fmt.Fprint(conn, "*2\r\n$8\r\n{\"id\":1}\r\n$8\r\n{\"id\":2}\r\n"); err != nil {
85+
done <- err
86+
return
87+
}
88+
done <- nil
89+
}()
90+
91+
stop = func() {
92+
_ = ln.Close()
93+
select {
94+
case err := <-done:
95+
if err != nil && !isClosedNetErr(err) {
96+
t.Fatalf("server error: %v", err)
97+
}
98+
case <-time.After(2 * time.Second):
99+
t.Fatal("timeout waiting for RESP test server")
100+
}
101+
}
102+
return ln.Addr().String(), cmdCh, stop
103+
}
104+
105+
func assertCommand(t *testing.T, got, want []string) {
106+
t.Helper()
107+
if !reflect.DeepEqual(got, want) {
108+
t.Fatalf("command = %#v, want %#v", got, want)
109+
}
110+
}
111+
112+
func isClosedNetErr(err error) bool {
113+
if err == nil {
114+
return false
115+
}
116+
return errors.Is(err, net.ErrClosed)
117+
}

internal/cpa/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ const (
2222
RedisDefaultPort = "8317"
2323
RedisAuthCommand = "AUTH"
2424
RedisLPopCommand = "LPOP"
25-
RedisUsageQueueKey = "queue"
25+
RedisUsageQueueKey = "usage"
2626
)
2727

2828
// AuthFile mirrors a single entry from /v0/management/auth-files.

0 commit comments

Comments
 (0)