Skip to content

Commit 51176de

Browse files
committed
feat: add Redis 8.2 deletion policies to XADD commands
1 parent b54588b commit 51176de

File tree

3 files changed

+165
-11
lines changed

3 files changed

+165
-11
lines changed

packages/client/lib/commands/XADD.spec.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { strict as assert } from 'node:assert';
22
import testUtils, { GLOBAL } from '../test-utils';
33
import XADD from './XADD';
44
import { parseArgs } from './generic-transformers';
5+
import { STREAM_DELETION_POLICY } from './common-stream.types';
56

67
describe('XADD', () => {
78
describe('transformArguments', () => {
@@ -78,6 +79,37 @@ describe('XADD', () => {
7879
['XADD', 'key', '1000', 'LIMIT', '1', '*', 'field', 'value']
7980
);
8081
});
82+
83+
it('with TRIM.policy', () => {
84+
assert.deepEqual(
85+
parseArgs(XADD, 'key', '*', {
86+
field: 'value'
87+
}, {
88+
TRIM: {
89+
threshold: 1000,
90+
policy: STREAM_DELETION_POLICY.DELREF
91+
}
92+
}),
93+
['XADD', 'key', '1000', 'DELREF', '*', 'field', 'value']
94+
);
95+
});
96+
97+
it('with all TRIM options', () => {
98+
assert.deepEqual(
99+
parseArgs(XADD, 'key', '*', {
100+
field: 'value'
101+
}, {
102+
TRIM: {
103+
strategy: 'MAXLEN',
104+
strategyModifier: '~',
105+
threshold: 1000,
106+
limit: 100,
107+
policy: STREAM_DELETION_POLICY.ACKED
108+
}
109+
}),
110+
['XADD', 'key', 'MAXLEN', '~', '1000', 'LIMIT', '100', 'ACKED', '*', 'field', 'value']
111+
);
112+
});
81113
});
82114

83115
testUtils.testAll('xAdd', async client => {
@@ -91,4 +123,52 @@ describe('XADD', () => {
91123
client: GLOBAL.SERVERS.OPEN,
92124
cluster: GLOBAL.CLUSTERS.OPEN
93125
});
126+
127+
testUtils.testAll(
128+
'xAdd with TRIM policy',
129+
async (client) => {
130+
assert.equal(
131+
typeof await client.xAdd('{tag}key', '*',
132+
{ field: 'value' },
133+
{
134+
TRIM: {
135+
strategy: 'MAXLEN',
136+
threshold: 1000,
137+
policy: STREAM_DELETION_POLICY.KEEPREF
138+
}
139+
}
140+
),
141+
'string'
142+
);
143+
},
144+
{
145+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
146+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
147+
}
148+
);
149+
150+
testUtils.testAll(
151+
'xAdd with all TRIM options',
152+
async (client) => {
153+
assert.equal(
154+
typeof await client.xAdd('{tag}key2', '*',
155+
{ field: 'value' },
156+
{
157+
TRIM: {
158+
strategy: 'MAXLEN',
159+
strategyModifier: '~',
160+
threshold: 1000,
161+
limit: 10,
162+
policy: STREAM_DELETION_POLICY.DELREF
163+
}
164+
}
165+
),
166+
'string'
167+
);
168+
},
169+
{
170+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
171+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
172+
}
173+
);
94174
});

packages/client/lib/commands/XADD.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { CommandParser } from '../client/parser';
22
import { RedisArgument, BlobStringReply, Command } from '../RESP/types';
3+
import { StreamDeletionPolicy } from './common-stream.types';
34
import { Tail } from './generic-transformers';
45

56
/**
@@ -10,13 +11,16 @@ import { Tail } from './generic-transformers';
1011
* @property TRIM.strategyModifier - Exact ('=') or approximate ('~') trimming
1112
* @property TRIM.threshold - Maximum stream length or minimum ID to retain
1213
* @property TRIM.limit - Maximum number of entries to trim in one call
14+
* @property TRIM.policy - Policy to apply when trimming entries (optional, defaults to KEEPREF)
1315
*/
1416
export interface XAddOptions {
1517
TRIM?: {
1618
strategy?: 'MAXLEN' | 'MINID';
1719
strategyModifier?: '=' | '~';
1820
threshold: number;
1921
limit?: number;
22+
/** added in 8.2 */
23+
policy?: StreamDeletionPolicy;
2024
};
2125
}
2226

@@ -58,6 +62,10 @@ export function parseXAddArguments(
5862
if (options.TRIM.limit) {
5963
parser.push('LIMIT', options.TRIM.limit.toString());
6064
}
65+
66+
if (options.TRIM.policy) {
67+
parser.push(options.TRIM.policy);
68+
}
6169
}
6270

6371
parser.push(id);

packages/client/lib/commands/XADD_NOMKSTREAM.spec.ts

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { strict as assert } from 'node:assert';
22
import testUtils, { GLOBAL } from '../test-utils';
33
import XADD_NOMKSTREAM from './XADD_NOMKSTREAM';
44
import { parseArgs } from './generic-transformers';
5+
import { STREAM_DELETION_POLICY } from './common-stream.types';
56

67
describe('XADD NOMKSTREAM', () => {
78
testUtils.isVersionGreaterThanHook([6, 2]);
@@ -80,17 +81,82 @@ describe('XADD NOMKSTREAM', () => {
8081
['XADD', 'key', 'NOMKSTREAM', '1000', 'LIMIT', '1', '*', 'field', 'value']
8182
);
8283
});
83-
});
8484

85-
testUtils.testAll('xAddNoMkStream', async client => {
86-
assert.equal(
87-
await client.xAddNoMkStream('key', '*', {
88-
field: 'value'
89-
}),
90-
null
91-
);
92-
}, {
93-
client: GLOBAL.SERVERS.OPEN,
94-
cluster: GLOBAL.CLUSTERS.OPEN
85+
it('with TRIM.policy', () => {
86+
assert.deepEqual(
87+
parseArgs(XADD_NOMKSTREAM, 'key', '*', {
88+
field: 'value'
89+
}, {
90+
TRIM: {
91+
threshold: 1000,
92+
policy: STREAM_DELETION_POLICY.DELREF
93+
}
94+
}),
95+
['XADD', 'key', 'NOMKSTREAM', '1000', 'DELREF', '*', 'field', 'value']
96+
);
97+
});
98+
99+
it('with all TRIM options', () => {
100+
assert.deepEqual(
101+
parseArgs(XADD_NOMKSTREAM, 'key', '*', {
102+
field: 'value'
103+
}, {
104+
TRIM: {
105+
strategy: 'MAXLEN',
106+
strategyModifier: '~',
107+
threshold: 1000,
108+
limit: 100,
109+
policy: STREAM_DELETION_POLICY.ACKED
110+
}
111+
}),
112+
['XADD', 'key', 'NOMKSTREAM', 'MAXLEN', '~', '1000', 'LIMIT', '100', 'ACKED', '*', 'field', 'value']
113+
);
114+
});
95115
});
116+
117+
testUtils.testAll(
118+
'xAddNoMkStream - null when stream does not exist',
119+
async (client) => {
120+
assert.equal(
121+
await client.xAddNoMkStream('{tag}nonexistent-stream', '*', {
122+
field: 'value'
123+
}),
124+
null
125+
);
126+
},
127+
{
128+
client: GLOBAL.SERVERS.OPEN,
129+
cluster: GLOBAL.CLUSTERS.OPEN,
130+
}
131+
);
132+
133+
testUtils.testAll(
134+
'xAddNoMkStream - with all TRIM options',
135+
async (client) => {
136+
const streamKey = '{tag}stream';
137+
138+
// Create stream and add some messages
139+
await client.xAdd(streamKey, '*', { field: 'value1' });
140+
141+
// Use NOMKSTREAM with all TRIM options
142+
const messageId = await client.xAddNoMkStream(streamKey, '*',
143+
{ field: 'value2' },
144+
{
145+
TRIM: {
146+
strategyModifier: '~',
147+
limit: 1,
148+
strategy: 'MAXLEN',
149+
threshold: 2,
150+
policy: STREAM_DELETION_POLICY.DELREF
151+
}
152+
}
153+
);
154+
155+
assert.equal(typeof messageId, 'string');
156+
},
157+
{
158+
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 2] },
159+
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
160+
}
161+
);
96162
});

0 commit comments

Comments
 (0)