Skip to content

Commit d941ec5

Browse files
authored
Add Redis 8.2 New Stream Commands (#3029)
* chore: update Redis version from 8.2-RC1-pre to 8.2-rc1 * feat: implement XDELEX command for Redis 8.2 * feat: implement XACKDEL command for Redis 8.2 * refactor: create shared stream deletion types for Redis 8.2 commands * feat: add Redis 8.2 deletion policies to XTRIM command * feat: add Redis 8.2 deletion policies to XADD commands * fix: correct XDELEX command method name and test parameter
1 parent ff8319d commit d941ec5

File tree

19 files changed

+746
-22
lines changed

19 files changed

+746
-22
lines changed

.github/workflows/tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ jobs:
2222
fail-fast: false
2323
matrix:
2424
node-version: ["18", "20", "22"]
25-
redis-version: ["rs-7.2.0-v13", "rs-7.4.0-v1", "8.0.2", "8.2-M01-pre"]
25+
redis-version: ["rs-7.4.0-v1", "8.0.2", "8.2-rc1"]
2626
steps:
2727
- uses: actions/checkout@v4
2828
with:

packages/bloom/lib/test-utils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import RedisBloomModules from '.';
44
export default TestUtils.createFromConfig({
55
dockerImageName: 'redislabs/client-libs-test',
66
dockerImageVersionArgument: 'redis-version',
7-
defaultDockerVersion: '8.2-M01-pre'
7+
defaultDockerVersion: '8.2-rc1'
88
});
99

1010
export const GLOBAL = {
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
import { strict as assert } from "node:assert";
2+
import XACKDEL from "./XACKDEL";
3+
import { parseArgs } from "./generic-transformers";
4+
import testUtils, { GLOBAL } from "../test-utils";
5+
import {
6+
STREAM_DELETION_POLICY,
7+
STREAM_DELETION_REPLY_CODES,
8+
} from "./common-stream.types";
9+
10+
describe("XACKDEL", () => {
11+
describe("transformArguments", () => {
12+
it("string - without policy", () => {
13+
assert.deepEqual(parseArgs(XACKDEL, "key", "group", "0-0"), [
14+
"XACKDEL",
15+
"key",
16+
"group",
17+
"IDS",
18+
"1",
19+
"0-0",
20+
]);
21+
});
22+
23+
it("string - with policy", () => {
24+
assert.deepEqual(
25+
parseArgs(
26+
XACKDEL,
27+
"key",
28+
"group",
29+
"0-0",
30+
STREAM_DELETION_POLICY.KEEPREF
31+
),
32+
["XACKDEL", "key", "group", "KEEPREF", "IDS", "1", "0-0"]
33+
);
34+
});
35+
36+
it("array - without policy", () => {
37+
assert.deepEqual(parseArgs(XACKDEL, "key", "group", ["0-0", "1-0"]), [
38+
"XACKDEL",
39+
"key",
40+
"group",
41+
"IDS",
42+
"2",
43+
"0-0",
44+
"1-0",
45+
]);
46+
});
47+
48+
it("array - with policy", () => {
49+
assert.deepEqual(
50+
parseArgs(
51+
XACKDEL,
52+
"key",
53+
"group",
54+
["0-0", "1-0"],
55+
STREAM_DELETION_POLICY.DELREF
56+
),
57+
["XACKDEL", "key", "group", "DELREF", "IDS", "2", "0-0", "1-0"]
58+
);
59+
});
60+
});
61+
62+
testUtils.testAll(
63+
`XACKDEL non-existing key - without policy`,
64+
async (client) => {
65+
const reply = await client.xAckDel("{tag}stream-key", "testgroup", "0-0");
66+
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.NOT_FOUND]);
67+
},
68+
{
69+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
70+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
71+
}
72+
);
73+
74+
testUtils.testAll(
75+
`XACKDEL existing key - without policy`,
76+
async (client) => {
77+
const streamKey = "{tag}stream-key";
78+
const groupName = "testgroup";
79+
80+
// create consumer group, stream and message
81+
await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true });
82+
const messageId = await client.xAdd(streamKey, "*", { field: "value" });
83+
84+
// read message
85+
await client.xReadGroup(groupName, "testconsumer", {
86+
key: streamKey,
87+
id: ">",
88+
});
89+
90+
const reply = await client.xAckDel(streamKey, groupName, messageId);
91+
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]);
92+
},
93+
{
94+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
95+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
96+
}
97+
);
98+
99+
testUtils.testAll(
100+
`XACKDEL existing key - with policy`,
101+
async (client) => {
102+
const streamKey = "{tag}stream-key";
103+
const groupName = "testgroup";
104+
105+
// create consumer group, stream and message
106+
await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true });
107+
const messageId = await client.xAdd(streamKey, "*", { field: "value" });
108+
109+
// read message
110+
await client.xReadGroup(groupName, "testconsumer", {
111+
key: streamKey,
112+
id: ">",
113+
});
114+
115+
const reply = await client.xAckDel(
116+
streamKey,
117+
groupName,
118+
messageId,
119+
STREAM_DELETION_POLICY.DELREF
120+
);
121+
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DELETED]);
122+
},
123+
{
124+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
125+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
126+
}
127+
);
128+
129+
testUtils.testAll(
130+
`XACKDEL acknowledge policy - with consumer group`,
131+
async (client) => {
132+
const streamKey = "{tag}stream-key";
133+
const groupName = "testgroup";
134+
135+
// create consumer groups, stream and message
136+
await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true });
137+
await client.xGroupCreate(streamKey, "some-other-group", "0");
138+
const messageId = await client.xAdd(streamKey, "*", { field: "value" });
139+
140+
// read message
141+
await client.xReadGroup(groupName, "testconsumer", {
142+
key: streamKey,
143+
id: ">",
144+
});
145+
146+
const reply = await client.xAckDel(
147+
streamKey,
148+
groupName,
149+
messageId,
150+
STREAM_DELETION_POLICY.ACKED
151+
);
152+
assert.deepEqual(reply, [STREAM_DELETION_REPLY_CODES.DANGLING_REFS]);
153+
},
154+
{
155+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
156+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
157+
}
158+
);
159+
160+
testUtils.testAll(
161+
`XACKDEL multiple keys`,
162+
async (client) => {
163+
const streamKey = "{tag}stream-key";
164+
const groupName = "testgroup";
165+
166+
// create consumer groups, stream and add messages
167+
await client.xGroupCreate(streamKey, groupName, "0", { MKSTREAM: true });
168+
const messageIds = await Promise.all([
169+
client.xAdd(streamKey, "*", { field: "value1" }),
170+
client.xAdd(streamKey, "*", { field: "value2" }),
171+
]);
172+
173+
// read messages
174+
await client.xReadGroup(groupName, "testconsumer", {
175+
key: streamKey,
176+
id: ">",
177+
});
178+
179+
const reply = await client.xAckDel(
180+
streamKey,
181+
groupName,
182+
[...messageIds, "0-0"],
183+
STREAM_DELETION_POLICY.DELREF
184+
);
185+
assert.deepEqual(reply, [
186+
STREAM_DELETION_REPLY_CODES.DELETED,
187+
STREAM_DELETION_REPLY_CODES.DELETED,
188+
STREAM_DELETION_REPLY_CODES.NOT_FOUND,
189+
]);
190+
},
191+
{
192+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
193+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
194+
}
195+
);
196+
});
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { CommandParser } from "../client/parser";
2+
import { RedisArgument, ArrayReply, Command } from "../RESP/types";
3+
import {
4+
StreamDeletionReplyCode,
5+
StreamDeletionPolicy,
6+
} from "./common-stream.types";
7+
import { RedisVariadicArgument } from "./generic-transformers";
8+
9+
/**
10+
* Acknowledges and deletes one or multiple messages for a stream consumer group
11+
*/
12+
export default {
13+
IS_READ_ONLY: false,
14+
/**
15+
* Constructs the XACKDEL command to acknowledge and delete one or multiple messages for a stream consumer group
16+
*
17+
* @param parser - The command parser
18+
* @param key - The stream key
19+
* @param group - The consumer group name
20+
* @param id - One or more message IDs to acknowledge and delete
21+
* @param policy - Policy to apply when deleting entries (optional, defaults to KEEPREF)
22+
* @returns Array of integers: -1 (not found), 1 (acknowledged and deleted), 2 (acknowledged with dangling refs)
23+
* @see https://redis.io/commands/xackdel/
24+
*/
25+
parseCommand(
26+
parser: CommandParser,
27+
key: RedisArgument,
28+
group: RedisArgument,
29+
id: RedisVariadicArgument,
30+
policy?: StreamDeletionPolicy
31+
) {
32+
parser.push("XACKDEL");
33+
parser.pushKey(key);
34+
parser.push(group);
35+
36+
if (policy) {
37+
parser.push(policy);
38+
}
39+
40+
parser.push("IDS");
41+
parser.pushVariadicWithLength(id);
42+
},
43+
transformReply:
44+
undefined as unknown as () => ArrayReply<StreamDeletionReplyCode>,
45+
} as const satisfies Command;

packages/client/lib/commands/XADD.spec.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { strict as assert } from 'node:assert';
22
import testUtils, { GLOBAL } from '../test-utils';
33
import XADD from './XADD';
44
import { parseArgs } from './generic-transformers';
5+
import { STREAM_DELETION_POLICY } from './common-stream.types';
56

67
describe('XADD', () => {
78
describe('transformArguments', () => {
@@ -78,6 +79,37 @@ describe('XADD', () => {
7879
['XADD', 'key', '1000', 'LIMIT', '1', '*', 'field', 'value']
7980
);
8081
});
82+
83+
it('with TRIM.policy', () => {
84+
assert.deepEqual(
85+
parseArgs(XADD, 'key', '*', {
86+
field: 'value'
87+
}, {
88+
TRIM: {
89+
threshold: 1000,
90+
policy: STREAM_DELETION_POLICY.DELREF
91+
}
92+
}),
93+
['XADD', 'key', '1000', 'DELREF', '*', 'field', 'value']
94+
);
95+
});
96+
97+
it('with all TRIM options', () => {
98+
assert.deepEqual(
99+
parseArgs(XADD, 'key', '*', {
100+
field: 'value'
101+
}, {
102+
TRIM: {
103+
strategy: 'MAXLEN',
104+
strategyModifier: '~',
105+
threshold: 1000,
106+
limit: 100,
107+
policy: STREAM_DELETION_POLICY.ACKED
108+
}
109+
}),
110+
['XADD', 'key', 'MAXLEN', '~', '1000', 'LIMIT', '100', 'ACKED', '*', 'field', 'value']
111+
);
112+
});
81113
});
82114

83115
testUtils.testAll('xAdd', async client => {
@@ -91,4 +123,52 @@ describe('XADD', () => {
91123
client: GLOBAL.SERVERS.OPEN,
92124
cluster: GLOBAL.CLUSTERS.OPEN
93125
});
126+
127+
testUtils.testAll(
128+
'xAdd with TRIM policy',
129+
async (client) => {
130+
assert.equal(
131+
typeof await client.xAdd('{tag}key', '*',
132+
{ field: 'value' },
133+
{
134+
TRIM: {
135+
strategy: 'MAXLEN',
136+
threshold: 1000,
137+
policy: STREAM_DELETION_POLICY.KEEPREF
138+
}
139+
}
140+
),
141+
'string'
142+
);
143+
},
144+
{
145+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
146+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
147+
}
148+
);
149+
150+
testUtils.testAll(
151+
'xAdd with all TRIM options',
152+
async (client) => {
153+
assert.equal(
154+
typeof await client.xAdd('{tag}key2', '*',
155+
{ field: 'value' },
156+
{
157+
TRIM: {
158+
strategy: 'MAXLEN',
159+
strategyModifier: '~',
160+
threshold: 1000,
161+
limit: 10,
162+
policy: STREAM_DELETION_POLICY.DELREF
163+
}
164+
}
165+
),
166+
'string'
167+
);
168+
},
169+
{
170+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
171+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
172+
}
173+
);
94174
});

packages/client/lib/commands/XADD.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { CommandParser } from '../client/parser';
22
import { RedisArgument, BlobStringReply, Command } from '../RESP/types';
3+
import { StreamDeletionPolicy } from './common-stream.types';
34
import { Tail } from './generic-transformers';
45

56
/**
@@ -10,13 +11,16 @@ import { Tail } from './generic-transformers';
1011
* @property TRIM.strategyModifier - Exact ('=') or approximate ('~') trimming
1112
* @property TRIM.threshold - Maximum stream length or minimum ID to retain
1213
* @property TRIM.limit - Maximum number of entries to trim in one call
14+
* @property TRIM.policy - Policy to apply when trimming entries (optional, defaults to KEEPREF)
1315
*/
1416
export interface XAddOptions {
1517
TRIM?: {
1618
strategy?: 'MAXLEN' | 'MINID';
1719
strategyModifier?: '=' | '~';
1820
threshold: number;
1921
limit?: number;
22+
/** added in 8.2 */
23+
policy?: StreamDeletionPolicy;
2024
};
2125
}
2226

@@ -58,6 +62,10 @@ export function parseXAddArguments(
5862
if (options.TRIM.limit) {
5963
parser.push('LIMIT', options.TRIM.limit.toString());
6064
}
65+
66+
if (options.TRIM.policy) {
67+
parser.push(options.TRIM.policy);
68+
}
6169
}
6270

6371
parser.push(id);

0 commit comments

Comments
 (0)