Skip to content

Commit 5649bff

Browse files
committed
Refactor batching logic
1 parent 4212c9e commit 5649bff

File tree

2 files changed

+85
-69
lines changed

2 files changed

+85
-69
lines changed

src/__tests__/dataloader.test.js

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -106,6 +106,24 @@ describe('Primary API', () => {
106106
expect(loadCalls).toEqual([ [ 1 ] ]);
107107
});
108108

109+
it('coalesces identical requests across sized batches', async () => {
110+
const [ identityLoader, loadCalls ] = idLoader<number>({ maxBatchSize: 2 });
111+
112+
const promise1a = identityLoader.load(1);
113+
const promise2 = identityLoader.load(2);
114+
const promise1b = identityLoader.load(1);
115+
const promise3 = identityLoader.load(3);
116+
117+
const [ value1a, value2, value1b, value3 ] =
118+
await Promise.all([ promise1a, promise2, promise1b, promise3 ]);
119+
expect(value1a).toBe(1);
120+
expect(value2).toBe(2);
121+
expect(value1b).toBe(1);
122+
expect(value3).toBe(3);
123+
124+
expect(loadCalls).toEqual([ [ 1, 2 ], [ 3 ] ]);
125+
});
126+
109127
it('caches repeated requests', async () => {
110128
const [ identityLoader, loadCalls ] = idLoader<string>();
111129

src/index.js

Lines changed: 67 additions & 69 deletions
Original file line number | Diff line number | Diff line change
@@ -54,14 +54,14 @@ class DataLoader<K, V, C = K> {
5454
this._batchLoadFn = batchLoadFn;
5555
this._options = options;
5656
this._promiseCache = getValidCacheMap(options);
57-
this._queue = [];
57+
this._batch = null;
5858
}
5959

6060
// Private
6161
_batchLoadFn: BatchLoadFn<K, V>;
6262
_options: ?Options<K, V, C>;
6363
_promiseCache: ?CacheMap<C, Promise<V>>;
64-
_queue: LoaderQueue<K, V>;
64+
_batch: Batch<K, V> | null;
6565

6666
/**
6767
* Loads a key, returning a `Promise` for the value represented by that key.
@@ -76,7 +76,7 @@ class DataLoader<K, V, C = K> {
7676

7777
// Determine options
7878
var options = this._options;
79-
var shouldBatch = !options || options.batch !== false;
79+
var batch = getCurrentBatch(this);
8080
var cache = this._promiseCache;
8181
var cacheKey = getCacheKey(options, key);
8282

@@ -88,23 +88,11 @@ class DataLoader<K, V, C = K> {
8888
}
8989
}
9090

91-
// Otherwise, produce a new Promise for this value.
91+
// Otherwise, produce a new Promise for this key, and enqueue it to be
92+
// dispatched along with the current batch.
93+
batch.keys.push(key);
9294
var promise = new Promise((resolve, reject) => {
93-
// Enqueue this Promise to be dispatched.
94-
this._queue.push({ key, resolve, reject });
95-
96-
// Determine if a dispatch of this queue should be scheduled.
97-
// A single dispatch should be scheduled per queue at the time when the
98-
// queue changes from "empty" to "full".
99-
if (this._queue.length === 1) {
100-
if (shouldBatch) {
101-
// If batching, schedule a task to dispatch the queue.
102-
enqueuePostPromiseJob(() => dispatchQueue(this));
103-
} else {
104-
// Otherwise dispatch the (queue of one) immediately.
105-
dispatchQueue(this);
106-
}
107-
}
95+
batch.callbacks.push({ resolve, reject });
10896
});
10997

11098
// If caching, cache this promise.
@@ -234,43 +222,61 @@ var enqueuePostPromiseJob =
234222
// Private: cached resolved Promise instance
235223
var resolvedPromise;
236224

237-
// Private: given the current state of a Loader instance, perform a batch load
238-
// from its current queue.
239-
function dispatchQueue<K, V>(loader: DataLoader<K, V, any>) {
240-
// Take the current loader queue, replacing it with an empty queue.
241-
var queue = loader._queue;
242-
loader._queue = [];
243-
244-
// If a maxBatchSize was provided and the queue is longer, then segment the
245-
// queue into multiple batches, otherwise treat the queue as a single batch.
246-
var maxBatchSize = loader._options && loader._options.maxBatchSize;
247-
if (maxBatchSize && maxBatchSize > 0 && maxBatchSize < queue.length) {
248-
for (var i = 0; i < queue.length / maxBatchSize; i++) {
249-
dispatchQueueBatch(
250-
loader,
251-
queue.slice(i * maxBatchSize, (i + 1) * maxBatchSize)
252-
);
253-
}
254-
} else {
255-
dispatchQueueBatch(loader, queue);
225+
// Private: Describes a batch of requests
226+
type Batch<K, V> = {
227+
hasDispatched: boolean,
228+
keys: Array<K>,
229+
callbacks: Array<{
230+
resolve: (value: V) => void;
231+
reject: (error: Error) => void;
232+
}>
233+
}
234+
235+
// Private: Either returns the current batch, or creates and schedules a
236+
// dispatch of a new batch for the given loader.
237+
function getCurrentBatch<K, V>(loader: DataLoader<K, V, any>): Batch<K, V> {
238+
var options = loader._options;
239+
var maxBatchSize =
240+
(options && options.maxBatchSize) ||
241+
(options && options.batch === false ? 1 : 0);
242+
243+
// If there is an existing batch which has not yet dispatched and is within
244+
// the limit of the batch size, then return it.
245+
var existingBatch = loader._batch;
246+
if (
247+
existingBatch !== null &&
248+
!existingBatch.hasDispatched &&
249+
(maxBatchSize === 0 || existingBatch.keys.length < maxBatchSize)
250+
) {
251+
return existingBatch;
256252
}
253+
254+
// Otherwise, create a new batch for this loader.
255+
var newBatch = { hasDispatched: false, keys: [], callbacks: [] };
256+
257+
// Store it on the loader so it may be reused.
258+
loader._batch = newBatch;
259+
260+
// Then schedule a task to dispatch this batch of requests.
261+
enqueuePostPromiseJob(() => dispatchBatch(loader, newBatch));
262+
263+
return newBatch;
257264
}
258265

259-
function dispatchQueueBatch<K, V>(
266+
function dispatchBatch<K, V>(
260267
loader: DataLoader<K, V, any>,
261-
queue: LoaderQueue<K, V>
268+
batch: Batch<K, V>
262269
) {
263-
// Collect all keys to be loaded in this dispatch
264-
var keys = queue.map(({ key }) => key);
270+
// Mark this batch as having been dispatched.
271+
batch.hasDispatched = true;
265272

266-
// Call the provided batchLoadFn for this loader with the loader queue's keys.
267-
var batchLoadFn = loader._batchLoadFn;
268-
// Call with the loader as the `this` context.
269-
var batchPromise = batchLoadFn.call(loader, keys);
273+
// Call the provided batchLoadFn for this loader with the batch's keys and
274+
// with the loader as the `this` context.
275+
var batchPromise = loader._batchLoadFn(batch.keys);
270276

271277
// Assert the expected response from batchLoadFn
272278
if (!batchPromise || typeof batchPromise.then !== 'function') {
273-
return failedDispatch(loader, queue, new TypeError(
279+
return failedDispatch(loader, batch, new TypeError(
274280
'DataLoader must be constructed with a function which accepts ' +
275281
'Array<key> and returns Promise<Array<value>>, but the function did ' +
276282
`not return a Promise: ${String(batchPromise)}.`
@@ -288,41 +294,40 @@ function dispatchQueueBatch<K, V>(
288294
`not return a Promise of an Array: ${String(values)}.`
289295
);
290296
}
291-
if (values.length !== keys.length) {
297+
if (values.length !== batch.keys.length) {
292298
throw new TypeError(
293299
'DataLoader must be constructed with a function which accepts ' +
294300
'Array<key> and returns Promise<Array<value>>, but the function did ' +
295301
'not return a Promise of an Array of the same length as the Array ' +
296302
'of keys.' +
297-
`\n\nKeys:\n${String(keys)}` +
303+
`\n\nKeys:\n${String(batch.keys)}` +
298304
`\n\nValues:\n${String(values)}`
299305
);
300306
}
301307

302-
// Step through the values, resolving or rejecting each Promise in the
303-
// loaded queue.
304-
queue.forEach(({ resolve, reject }, index) => {
305-
var value = values[index];
308+
// Step through values, resolving or rejecting each Promise in the batch.
309+
for (var i = 0; i < batch.callbacks.length; i++) {
310+
var value = values[i];
306311
if (value instanceof Error) {
307-
reject(value);
312+
batch.callbacks[i].reject(value);
308313
} else {
309-
resolve(value);
314+
batch.callbacks[i].resolve(value);
310315
}
311-
});
312-
}).catch(error => failedDispatch(loader, queue, error));
316+
}
317+
}).catch(error => failedDispatch(loader, batch, error));
313318
}
314319

315320
// Private: do not cache individual loads if the entire batch dispatch fails,
316321
// but still reject each request so they do not hang.
317322
function failedDispatch<K, V>(
318323
loader: DataLoader<K, V, any>,
319-
queue: LoaderQueue<K, V>,
324+
batch: Batch<K, V>,
320325
error: Error
321326
) {
322-
queue.forEach(({ key, reject }) => {
323-
loader.clear(key);
324-
reject(error);
325-
});
327+
for (var i = 0; i < batch.keys.length; i++) {
328+
loader.clear(batch.keys[i]);
329+
batch.callbacks[i].reject(error);
330+
}
326331
}
327332

328333
// Private: produce a cache key for a given key (and options)
@@ -357,13 +362,6 @@ function getValidCacheMap<K, V, C>(
357362
return cacheMap;
358363
}
359364

360-
// Private
361-
type LoaderQueue<K, V> = Array<{
362-
key: K;
363-
resolve: (value: V) => void;
364-
reject: (error: Error) => void;
365-
}>;
366-
367365
// Private
368366
function isArrayLike(x: mixed): boolean {
369367
return (

0 commit comments

Comments
 (0)