Skip to content

Commit 2507a9b

Browse files
authored
In-memory fallback queue and no manual reconnecting of Kafka in Usage service (#6531)
1 parent 457fc3d commit 2507a9b

File tree

7 files changed

+228
-127
lines changed

7 files changed

+228
-127
lines changed

deployment/services/usage.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,13 @@ export function deployUsage({
4545
image,
4646
imagePullSecret: docker.secret,
4747
replicas,
48-
readinessProbe: '/_readiness',
48+
readinessProbe: {
49+
initialDelaySeconds: 10,
50+
periodSeconds: 5,
51+
failureThreshold: 2,
52+
timeoutSeconds: 5,
53+
endpoint: '/_readiness',
54+
},
4955
livenessProbe: '/_health',
5056
startupProbe: '/_health',
5157
availabilityOnEveryNode: true,

packages/services/usage/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
"graphql": "16.9.0",
2323
"kafkajs": "2.2.4",
2424
"lru-cache": "11.0.2",
25+
"p-limit": "6.2.0",
2526
"pino-pretty": "11.3.0",
2627
"zod": "3.24.1"
2728
}

packages/services/usage/src/buffer.ts

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
11
import { randomUUID } from 'node:crypto';
22
import type { ServiceLogger } from '@hive/service-common';
33

4-
export class BufferTooBigError extends Error {
4+
class BufferTooBigError extends Error {
55
constructor(public bytes: number) {
66
super(`Buffer too big: ${bytes}`);
77
}
88
}
99

10-
export function isBufferTooBigError(error: unknown): error is BufferTooBigError {
11-
return error instanceof BufferTooBigError;
12-
}
13-
1410
/**
1511
* @param totalLength Number of all items in a list
1612
* @param numOfChunks How many chunks to split the list into
@@ -142,6 +138,7 @@ export function createKVBuffer<T>(config: {
142138
const { logger } = config;
143139
let buffer: T[] = [];
144140
let timeoutId: ReturnType<typeof setTimeout> | null = null;
141+
145142
const estimator = createEstimator({
146143
logger,
147144
defaultBytesPerUnit: config.limitInBytes / config.size,
@@ -172,11 +169,10 @@ export function createKVBuffer<T>(config: {
172169
reports: readonly T[],
173170
size: number,
174171
batchId: string,
175-
isRetry = false,
172+
isChunkedBuffer = false,
176173
) {
177174
logger.info(`Flushing (reports=%s, bufferSize=%s, id=%s)`, reports.length, size, batchId);
178175
const estimatedSizeInBytes = estimator.estimate(size);
179-
buffer = [];
180176
await config
181177
.sender(reports, estimatedSizeInBytes, batchId, function validateSize(bytes) {
182178
if (!config.useEstimator) {
@@ -203,7 +199,11 @@ export function createKVBuffer<T>(config: {
203199
}
204200
})
205201
.catch(error => {
206-
if (!isRetry && isBufferTooBigError(error)) {
202+
if (isChunkedBuffer) {
203+
return Promise.reject(error);
204+
}
205+
206+
if (error instanceof BufferTooBigError) {
207207
config.onRetry(reports);
208208
logger.info(`Retrying (reports=%s, bufferSize=%s, id=%s)`, reports.length, size, batchId);
209209

@@ -241,17 +241,16 @@ export function createKVBuffer<T>(config: {
241241
});
242242
}
243243

244-
async function send(shouldSchedule = true): Promise<void> {
245-
if (timeoutId !== null) {
246-
clearTimeout(timeoutId);
247-
}
244+
async function send(options: { scheduleNextSend: boolean }): Promise<void> {
245+
const { scheduleNextSend } = options;
248246

249247
if (buffer.length !== 0) {
250248
const reports = buffer.slice();
251249
const size = calculateBufferSize(reports);
252250
const batchId = randomUUID();
253251

254252
try {
253+
buffer = [];
255254
await flushBuffer(reports, size, batchId);
256255
} catch (error) {
257256
logger.error(error);
@@ -262,13 +261,24 @@ export function createKVBuffer<T>(config: {
262261
}
263262
}
264263

265-
if (shouldSchedule) {
264+
if (scheduleNextSend) {
266265
schedule();
267266
}
268267
}
269268

270269
function schedule() {
271-
timeoutId = setTimeout(() => send(true), config.interval);
270+
if (timeoutId !== null) {
271+
clearTimeout(timeoutId);
272+
timeoutId = null;
273+
}
274+
275+
timeoutId = setTimeout(
276+
() =>
277+
void send({
278+
scheduleNextSend: true,
279+
}),
280+
config.interval,
281+
);
272282
}
273283

274284
function add(report: T) {
@@ -278,7 +288,9 @@ export function createKVBuffer<T>(config: {
278288
const estimatedBufferSize = currentBufferSize + estimatedReportSize;
279289

280290
if (currentBufferSize >= config.limitInBytes || estimatedBufferSize >= config.limitInBytes) {
281-
void send(true);
291+
void send({
292+
scheduleNextSend: true,
293+
});
282294
}
283295

284296
if (estimatedReportSize > config.limitInBytes) {
@@ -293,7 +305,9 @@ export function createKVBuffer<T>(config: {
293305
} else {
294306
buffer.push(report);
295307
if (sumOfOperationsSizeInBuffer() >= config.size) {
296-
void send(true);
308+
void send({
309+
scheduleNextSend: true,
310+
});
297311
}
298312
}
299313
}
@@ -309,7 +323,9 @@ export function createKVBuffer<T>(config: {
309323
if (timeoutId) {
310324
clearTimeout(timeoutId);
311325
}
312-
await send(false);
326+
await send({
327+
scheduleNextSend: false,
328+
});
313329
},
314330
};
315331
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import pLimit from 'p-limit';
2+
import type { ServiceLogger } from '@hive/service-common';
3+
4+
// Average message size is ~800 KB,
5+
// so a full queue of 1000 messages holds roughly 800 MB in memory.
6+
const MAX_QUEUE_SIZE = 1000;
7+
8+
/**
 * Creates an in-memory queue that holds messages and retries sending them
 * in the background, one message every 200 ms.
 *
 * Behavior:
 * - `add` appends a message; once `MAX_QUEUE_SIZE` entries are held, the oldest
 *   entry is dropped to make room.
 * - A failed send puts the message back at the end of the queue, except when the
 *   error carries `type === 'MESSAGE_TOO_LARGE'`, in which case the message is
 *   dropped because retrying it cannot succeed.
 * - `stop` halts the background timer, waits for a flush that is already in
 *   progress, and then makes one final attempt to send everything that is left
 *   (at most 10 sends in parallel). Failures during that final attempt are
 *   logged, not retried.
 *
 * @param config.send   Delivers a single message; must reject on failure.
 * @param config.logger Logger used to report dropped and failed messages.
 * @returns Controls for the queue: `start`, `stop`, `add` and `size`.
 */
export function createFallbackQueue(config: {
  send: (msgValue: Buffer<ArrayBufferLike>, numOfOperations: number) => Promise<void>;
  logger: ServiceLogger;
}) {
  const queue: [Buffer<ArrayBufferLike>, number][] = [];

  let timeoutId: ReturnType<typeof setTimeout> | null = null;
  // Set by `stop()` so that a timer callback that is mid-flush does not re-arm the timer.
  let stopped = false;
  // The flush currently running inside the timer callback, if any.
  let inFlight: Promise<void> | null = null;

  /** Takes the oldest message off the queue and tries to send it. Never rejects on send failure. */
  async function flushSingle(): Promise<void> {
    const msg = queue.shift();
    if (!msg) {
      return;
    }

    try {
      const [msgValue, numOfOperations] = msg;
      await config.send(msgValue, numOfOperations);
    } catch (error) {
      if (error instanceof Error && 'type' in error && error.type === 'MESSAGE_TOO_LARGE') {
        config.logger.error('Message too large, dropping message');
        return;
      }

      config.logger.error(
        {
          error: error instanceof Error ? error.message : String(error),
        },
        'Failed to flush message, adding back to fallback queue',
      );
      // Re-queued at the back so one failing message does not block the ones behind it.
      queue.push(msg);
    }
  }

  function clearTimer() {
    if (timeoutId !== null) {
      clearTimeout(timeoutId);
      timeoutId = null;
    }
  }

  function schedule() {
    clearTimer();

    // Without this guard a flush that finishes after `stop()` would restart the loop.
    if (stopped) {
      return;
    }

    timeoutId = setTimeout(async () => {
      timeoutId = null;
      inFlight = flushSingle();
      try {
        await inFlight;
      } finally {
        inFlight = null;
      }
      schedule();
    }, 200);
  }

  return {
    /** Starts (or restarts) the background flush loop. */
    start() {
      stopped = false;
      schedule();
    },
    /**
     * Stops the background loop and makes a final attempt to send every queued message.
     * Resolves once all sends have settled; it never rejects because of a failed send.
     */
    async stop() {
      stopped = true;
      clearTimer();

      // A flush that is mid-send may still push its message back on failure;
      // wait for it so that message is part of the final drain below.
      if (inFlight !== null) {
        await inFlight;
      }

      // Take ownership of everything that is queued so that nothing is sent twice
      // and `size()` reflects reality afterwards.
      const pending = queue.splice(0, queue.length);

      const limit = pLimit(10);
      return Promise.allSettled(
        pending.map(([msgValue, numOfOperations]) =>
          limit(() =>
            config.send(msgValue, numOfOperations).catch(error => {
              config.logger.error(
                {
                  error: error instanceof Error ? error.message : String(error),
                },
                'Failed to flush message before stopping',
              );
            }),
          ),
        ),
      );
    },
    /** Queues a message, dropping the oldest one when the queue is already full. */
    add(msgValue: Buffer<ArrayBufferLike>, numOfOperations: number) {
      if (queue.length >= MAX_QUEUE_SIZE) {
        config.logger.error('Queue is full, dropping oldest message');
        queue.shift();
      }

      queue.push([msgValue, numOfOperations]);
    },
    /** Number of messages currently waiting in the queue. */
    size() {
      return queue.length;
    },
  };
}

packages/services/usage/src/index.ts

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -182,14 +182,14 @@ async function main() {
182182
if (!token) {
183183
httpRequestsWithoutToken.inc();
184184
activeSpan?.recordException('Missing token in request');
185-
void res.status(401).send('Missing token');
185+
await res.status(401).send('Missing token');
186186
return;
187187
}
188188

189189
if (token.length !== 32) {
190190
activeSpan?.recordException('Invalid token');
191191
httpRequestsWithoutToken.inc();
192-
void res.status(401).send('Invalid token');
192+
await res.status(401).send('Invalid token');
193193
return;
194194
}
195195

@@ -205,7 +205,7 @@ async function main() {
205205
httpRequestsWithNonExistingToken.inc();
206206
req.log.info('Token not found (token=%s)', maskedToken);
207207
activeSpan?.recordException('Token not found');
208-
void res.status(401).send('Missing token');
208+
await res.status(401).send('Missing token');
209209
return;
210210
}
211211

@@ -217,7 +217,7 @@ async function main() {
217217
httpRequestsWithNoAccess.inc();
218218
req.log.info('No access (token=%s)', maskedToken);
219219
activeSpan?.recordException('No access');
220-
void res.status(403).send('No access');
220+
await res.status(403).send('No access');
221221
return;
222222
}
223223

@@ -259,13 +259,13 @@ async function main() {
259259
droppedReports
260260
.labels({ targetId: tokenInfo.target, orgId: tokenInfo.organization })
261261
.inc();
262-
authenticatedRequestLogger.info(
262+
authenticatedRequestLogger.debug(
263263
'Rate limited',
264264
maskedToken,
265265
tokenInfo.target,
266266
tokenInfo.organization,
267267
);
268-
void res.status(429).send();
268+
await res.status(429).send();
269269

270270
return;
271271
}
@@ -297,7 +297,7 @@ async function main() {
297297
// 503 - Service Unavailable
298298
// The server is currently unable to handle the request due being not ready.
299299
// This tells the gateway to retry the request and not to drop it.
300-
void res.status(503).send();
300+
await res.status(503).send();
301301
return;
302302
}
303303

@@ -311,11 +311,14 @@ async function main() {
311311
stopTimer({
312312
status: 'success',
313313
});
314-
void res.status(200).send({
314+
await res.status(200).send({
315315
id: result.report.id,
316316
operations: result.operations,
317317
});
318-
} else if (apiVersion === '2') {
318+
return;
319+
}
320+
321+
if (apiVersion === '2') {
319322
activeSpan?.addEvent('using v2');
320323
const result = measureParsing(
321324
() => usageProcessorV2(server.log, req.body, tokenInfo, retentionInfo),
@@ -336,7 +339,7 @@ async function main() {
336339
activeSpan?.recordException(error.path + ': ' + error.message),
337340
);
338341

339-
void res.status(400).send({
342+
await res.status(400).send({
340343
errors: result.errors,
341344
});
342345

@@ -347,18 +350,20 @@ async function main() {
347350
stopTimer({
348351
status: 'success',
349352
});
350-
void res.status(200).send({
353+
await res.status(200).send({
351354
id: result.report.id,
352355
operations: result.operations,
353356
});
354-
} else {
355-
authenticatedRequestLogger.debug("Invalid 'x-api-version' header value.");
356-
stopTimer({
357-
status: 'error',
358-
});
359-
activeSpan?.recordException("Invalid 'x-api-version' header value.");
360-
void res.status(401).send("Invalid 'x-api-version' header value.");
357+
return;
361358
}
359+
360+
authenticatedRequestLogger.debug("Invalid 'x-api-version' header value.");
361+
stopTimer({
362+
status: 'error',
363+
});
364+
activeSpan?.recordException("Invalid 'x-api-version' header value.");
365+
await res.status(401).send("Invalid 'x-api-version' header value.");
366+
return;
362367
} catch (error) {
363368
stopTimer({
364369
status: 'error',
@@ -369,26 +374,26 @@ async function main() {
369374
level: 'error',
370375
});
371376
activeSpan?.recordException(error as Error);
372-
void res.status(500).send();
377+
await res.status(500).send();
373378
}
374379
}),
375380
});
376381

377382
server.route({
378383
method: ['GET', 'HEAD'],
379384
url: '/_health',
380-
handler(_, res) {
381-
void res.status(200).send();
385+
async handler(_, res) {
386+
await res.status(200).send();
382387
},
383388
});
384389

385390
server.route({
386391
method: ['GET', 'HEAD'],
387392
url: '/_readiness',
388-
handler(_, res) {
393+
async handler(_, res) {
389394
const isReady = readiness();
390395
reportReadiness(isReady);
391-
void res.status(isReady ? 200 : 400).send();
396+
await res.status(isReady ? 200 : 400).send();
392397
},
393398
});
394399

0 commit comments

Comments
 (0)