Skip to content

Commit 701453e

Browse files
committed
Add rate-limit visibility feature
1 parent e48716f commit 701453e

File tree

3 files changed

+1235
-4
lines changed

3 files changed

+1235
-4
lines changed

readme.md

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,36 @@ await queue.onPendingZero();
241241
// All running tasks have finished, though the queue may still have items
242242
```
243243

244+
#### .onRateLimit()
245+
246+
Returns a promise that settles when the queue becomes rate-limited due to `intervalCap`. If the queue is already rate-limited, the promise resolves immediately.
247+
248+
Useful for implementing backpressure to prevent memory issues when producers are faster than consumers.
249+
250+
```js
251+
const queue = new PQueue({intervalCap: 5, interval: 1000});
252+
253+
// Add many tasks
254+
for (let index = 0; index < 10; index++) {
255+
queue.add(() => someTask());
256+
}
257+
258+
await queue.onRateLimit();
259+
console.log('Queue is now rate-limited - time for maintenance tasks');
260+
```
261+
262+
#### .onRateLimitCleared()
263+
264+
Returns a promise that settles when the queue is no longer rate-limited. If the queue is not currently rate-limited, the promise resolves immediately.
265+
266+
```js
267+
const queue = new PQueue({intervalCap: 5, interval: 1000});
268+
269+
// Wait for rate limiting to be cleared
270+
await queue.onRateLimitCleared();
271+
console.log('Rate limit cleared - can add more tasks');
272+
```
273+
244274
#### .onSizeLessThan(limit)
245275

246276
Returns a promise that settles when the queue size is less than the given limit: `queue.size < limit`.
@@ -329,6 +359,10 @@ Number of running items (no longer in the queue).
329359

330360
Whether the queue is currently paused.
331361

362+
#### .isRateLimited
363+
364+
Whether the queue is currently rate-limited due to `intervalCap`. Returns `true` when the number of tasks executed in the current interval has reached the `intervalCap` and there are still tasks waiting to be processed.
365+
332366
## Events
333367

334368
#### active
@@ -465,6 +499,56 @@ await queue.add(() => delay(600));
465499
//=> 'Task is completed. Size: 0 Pending: 0'
466500
```
467501

502+
#### rateLimit
503+
504+
Emitted when the queue becomes rate-limited due to `intervalCap`. This happens when the maximum number of tasks allowed per interval has been reached.
505+
506+
Useful for implementing backpressure to prevent memory issues when producers are faster than consumers.
507+
508+
```js
509+
import delay from 'delay';
510+
import PQueue from 'p-queue';
511+
512+
const queue = new PQueue({
513+
intervalCap: 2,
514+
interval: 1000
515+
});
516+
517+
queue.on('rateLimit', () => {
518+
console.log('Queue is rate-limited - processing backlog or maintenance tasks');
519+
});
520+
521+
// Add 3 tasks - third one triggers rate limiting
522+
queue.add(() => delay(100));
523+
queue.add(() => delay(100));
524+
queue.add(() => delay(100));
525+
```
526+
527+
#### rateLimitCleared
528+
529+
Emitted when the queue is no longer rate-limited—either because the interval reset and new tasks can start, or because the backlog was drained.
530+
531+
```js
532+
import delay from 'delay';
533+
import PQueue from 'p-queue';
534+
535+
const queue = new PQueue({
536+
intervalCap: 1,
537+
interval: 1000
538+
});
539+
540+
queue.on('rateLimit', () => {
541+
console.log('Rate limited - waiting for interval to reset');
542+
});
543+
544+
queue.on('rateLimitCleared', () => {
545+
console.log('Rate limit cleared - can process more tasks');
546+
});
547+
548+
queue.add(() => delay(100));
549+
queue.add(() => delay(100)); // This triggers rate limiting
550+
```
551+
468552
## Advanced example
469553

470554
A more advanced example to help you understand the flow.

source/index.ts

Lines changed: 98 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ type Task<TaskResultType> =
88
| ((options: TaskOptions) => PromiseLike<TaskResultType>)
99
| ((options: TaskOptions) => TaskResultType);
1010

11-
type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error' | 'pendingZero';
11+
type EventName = 'active' | 'idle' | 'empty' | 'add' | 'next' | 'completed' | 'error' | 'pendingZero' | 'rateLimit' | 'rateLimitCleared';
1212

1313
/**
1414
Promise queue with concurrency control.
@@ -22,6 +22,9 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
2222

2323
readonly #intervalCap: number;
2424

25+
#rateLimitedInInterval = false;
26+
#rateLimitFlushScheduled = false;
27+
2528
readonly #interval: number;
2629

2730
#intervalEnd = 0;
@@ -84,8 +87,15 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
8487
this.#queue = new options.queueClass!();
8588
this.#queueClass = options.queueClass!;
8689
this.concurrency = options.concurrency!;
90+
91+
if (options.timeout !== undefined && !(Number.isFinite(options.timeout) && options.timeout > 0)) {
92+
throw new TypeError(`Expected \`timeout\` to be a positive finite number, got \`${options.timeout}\` (${typeof options.timeout})`);
93+
}
94+
8795
this.timeout = options.timeout;
8896
this.#isPaused = options.autoStart === false;
97+
98+
this.#setupRateLimitTracking();
8999
}
90100

91101
get #doesIntervalAllowAnother(): boolean {
@@ -108,7 +118,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
108118
}
109119

110120
#onResumeInterval(): void {
111-
this.#onInterval();
121+
this.#onInterval(); // Already schedules update
112122
this.#initializeIntervalIfNeeded();
113123
this.#timeoutId = undefined;
114124
}
@@ -183,6 +193,8 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
183193
return false;
184194
}
185195

196+
let taskStarted = false;
197+
186198
if (!this.#isPaused) {
187199
const canInitializeInterval = !this.#isIntervalPaused;
188200
if (this.#doesIntervalAllowAnother && this.#doesConcurrentAllowAnother) {
@@ -191,6 +203,7 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
191203
// Increment interval count immediately to prevent race conditions
192204
if (!this.#isIntervalIgnored) {
193205
this.#intervalCount++;
206+
this.#scheduleRateLimitUpdate();
194207
}
195208

196209
this.emit('active');
@@ -201,11 +214,11 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
201214
this.#initializeIntervalIfNeeded();
202215
}
203216

204-
return true;
217+
taskStarted = true;
205218
}
206219
}
207220

208-
return false;
221+
return taskStarted;
209222
}
210223

211224
#initializeIntervalIfNeeded(): void {
@@ -229,7 +242,9 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
229242
}
230243

231244
this.#intervalCount = this.#carryoverConcurrencyCount ? this.#pending : 0;
245+
232246
this.#processQueue();
247+
this.#scheduleRateLimitUpdate();
233248
}
234249

235250
/**
@@ -299,6 +314,10 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
299314
Here, the promise function with `id: '🦀'` executes last.
300315
*/
301316
setPriority(id: string, priority: number) {
317+
if (typeof priority !== 'number' || !Number.isFinite(priority)) {
318+
throw new TypeError(`Expected \`priority\` to be a finite number, got \`${priority}\` (${typeof priority})`);
319+
}
320+
302321
this.#queue.setPriority(id, priority);
303322
}
304323

@@ -405,6 +424,8 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
405424
*/
406425
clear(): void {
407426
this.#queue = new this.#queueClass();
427+
// Force synchronous update since clear() should have immediate effect
428+
this.#updateRateLimitState();
408429
}
409430

410431
/**
@@ -464,6 +485,28 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
464485
await this.#onEvent('pendingZero');
465486
}
466487

488+
/**
489+
@returns A promise that settles when the queue becomes rate-limited due to intervalCap.
490+
*/
491+
async onRateLimit(): Promise<void> {
492+
if (this.isRateLimited) {
493+
return;
494+
}
495+
496+
await this.#onEvent('rateLimit');
497+
}
498+
499+
/**
500+
@returns A promise that settles when the queue is no longer rate-limited.
501+
*/
502+
async onRateLimitCleared(): Promise<void> {
503+
if (!this.isRateLimited) {
504+
return;
505+
}
506+
507+
await this.#onEvent('rateLimitCleared');
508+
}
509+
467510
async #onEvent(event: EventName, filter?: () => boolean): Promise<void> {
468511
return new Promise(resolve => {
469512
const listener = () => {
@@ -509,6 +552,57 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
509552
get isPaused(): boolean {
510553
return this.#isPaused;
511554
}
555+
556+
#setupRateLimitTracking(): void {
557+
// Only schedule updates when rate limiting is enabled
558+
if (this.#isIntervalIgnored) {
559+
return;
560+
}
561+
562+
// Wire up to lifecycle events that affect rate limit state
563+
// Only 'add' and 'next' can actually change rate limit state
564+
this.on('add', () => {
565+
if (this.#queue.size > 0) {
566+
this.#scheduleRateLimitUpdate();
567+
}
568+
});
569+
570+
this.on('next', () => {
571+
this.#scheduleRateLimitUpdate();
572+
});
573+
}
574+
575+
#scheduleRateLimitUpdate(): void {
576+
// Skip if rate limiting is not enabled or already scheduled
577+
if (this.#isIntervalIgnored || this.#rateLimitFlushScheduled) {
578+
return;
579+
}
580+
581+
this.#rateLimitFlushScheduled = true;
582+
queueMicrotask(() => {
583+
this.#rateLimitFlushScheduled = false;
584+
this.#updateRateLimitState();
585+
});
586+
}
587+
588+
#updateRateLimitState(): void {
589+
const previous = this.#rateLimitedInInterval;
590+
const shouldBeRateLimited = !this.#isIntervalIgnored
591+
&& this.#intervalCount >= this.#intervalCap
592+
&& this.#queue.size > 0;
593+
594+
if (shouldBeRateLimited !== previous) {
595+
this.#rateLimitedInInterval = shouldBeRateLimited;
596+
this.emit(shouldBeRateLimited ? 'rateLimit' : 'rateLimitCleared');
597+
}
598+
}
599+
600+
/**
601+
Whether the queue is currently rate-limited due to intervalCap.
602+
*/
603+
get isRateLimited(): boolean {
604+
return this.#rateLimitedInInterval;
605+
}
512606
}
513607

514608
export type {Queue} from './queue.js';

0 commit comments

Comments
 (0)