Skip to content

Commit ef0e24c

Browse files
authored
Gracefully handle event stream reading errors and cancellations (#1121)
1 parent eace48b commit ef0e24c

File tree

3 files changed

+126
-2
lines changed

3 files changed

+126
-2
lines changed

.changeset/twenty-beans-grab.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@graphql-tools/executor-http': patch
3+
---
4+
5+
Gracefully handle event stream reading errors and cancellations

packages/executors/http/src/handleEventStreamResponse.ts

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
import { ExecutionResult, inspect } from '@graphql-tools/utils';
1+
import {
2+
createGraphQLError,
3+
ExecutionResult,
4+
inspect,
5+
} from '@graphql-tools/utils';
26
import { Repeater } from '@repeaterjs/repeater';
37
import { TextDecoder } from '@whatwg-node/fetch';
48
import { createResultForAbort } from './utils';
@@ -48,7 +52,24 @@ export function handleEventStreamResponse(
4852
if (!body?.locked) {
4953
return stop();
5054
}
51-
const { done, value: chunk } = await reader.read();
55+
let done: boolean, chunk: Uint8Array<ArrayBufferLike> | undefined;
56+
try {
57+
const result = await reader.read();
58+
done = result.done;
59+
chunk = result.value;
60+
} catch (err) {
61+
if (signal?.aborted) {
62+
await push(createResultForAbort(signal.reason));
63+
return stop();
64+
}
65+
const errErr = err instanceof Error ? err : new Error(String(err));
66+
await push({
67+
errors: [
68+
createGraphQLError(errErr.message, { originalError: errErr }),
69+
],
70+
});
71+
return stop();
72+
}
5273
if (done) {
5374
return stop();
5475
}

packages/executors/http/tests/handleEventStreamResponse.test.ts

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,102 @@ describe('handleEventStreamResponse', () => {
119119
value: undefined,
120120
});
121121
});
122+
123+
it.skipIf(
124+
// we skip bun because we cant cancel the stream while reading it (it's locked)
125+
// however, the same test from nodejs applies in bun
126+
globalThis.Bun,
127+
)('should gracefully report stream cancel with aborted signal', async () => {
128+
const ctrl = new AbortController();
129+
const readableStream = new ReadableStream<Uint8Array>({
130+
start() {
131+
// dont enqueue anything, to hang on iterator.next()
132+
},
133+
});
134+
135+
const response = new Response(readableStream);
136+
const asyncIterable = handleEventStreamResponse(
137+
response,
138+
undefined,
139+
ctrl.signal,
140+
);
141+
const iterator = asyncIterable[Symbol.asyncIterator]();
142+
143+
Promise.resolve().then(() => {
144+
ctrl.abort(); // we abort
145+
readableStream.cancel(); // then cancel
146+
// so that the error reported is the abort error
147+
});
148+
149+
await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
150+
{
151+
"done": false,
152+
"value": {
153+
"errors": [
154+
[GraphQLError: This operation was aborted],
155+
],
156+
},
157+
}
158+
`);
159+
160+
await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
161+
{
162+
"done": true,
163+
"value": undefined,
164+
}
165+
`);
166+
167+
await expect(iterator.return()).resolves.toMatchInlineSnapshot(`
168+
{
169+
"done": true,
170+
"value": undefined,
171+
}
172+
`);
173+
});
174+
175+
it.skipIf(
176+
// we skip bun because we cant cancel the stream while reading it (it's locked)
177+
// however, the same test from nodejs applies in bun
178+
globalThis.Bun,
179+
)('should gracefully report stream errors', async () => {
180+
const readableStream = new ReadableStream<Uint8Array>({
181+
start() {
182+
// dont enqueue anything, to hang on iterator.next()
183+
},
184+
});
185+
186+
const response = new Response(readableStream);
187+
const asyncIterable = handleEventStreamResponse(response);
188+
const iterator = asyncIterable[Symbol.asyncIterator]();
189+
190+
const originalError = new Error('Oops!');
191+
Promise.resolve().then(() => {
192+
readableStream.cancel(originalError); // this will throw in reader.read()
193+
});
194+
195+
const { value, done } = await iterator.next();
196+
expect(done).toBeFalsy();
197+
expect(value).toMatchInlineSnapshot(`
198+
{
199+
"errors": [
200+
[GraphQLError: Oops!],
201+
],
202+
}
203+
`);
204+
expect(value.errors[0].originalError).toBe(originalError);
205+
206+
await expect(iterator.next()).resolves.toMatchInlineSnapshot(`
207+
{
208+
"done": true,
209+
"value": undefined,
210+
}
211+
`);
212+
213+
await expect(iterator.return()).resolves.toMatchInlineSnapshot(`
214+
{
215+
"done": true,
216+
"value": undefined,
217+
}
218+
`);
219+
});
122220
});

0 commit comments

Comments
 (0)