Skip to content

Commit 99d7371

Browse files
authored
[Flight] Split Streaming from Relay Implemenation (#18260)
* Add ReactFlightServerConfig intermediate This just forwards to the stream version of Flight which is itself forked between Node and W3C streams. The dom-relay goes directly to the Relay config though which allows it to avoid the stream part of Flight. * Separate streaming protocol into the Stream config * Split streaming parts into the ReactFlightServerConfigStream This decouples it so that the Relay implementation doesn't have to encode the JSON to strings. Instead it can be fed the values as JSON objects and do its own encoding. * Split FlightClient into a basic part and a stream part Same split as the server. * Expose lower level async hooks to Relay This requires an external helper file that we'll wire up internally.
1 parent 160505b commit 99d7371

26 files changed

+615
-269
lines changed

packages/react-client/flight.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@
77
* @flow
88
*/
99

10-
export * from './src/ReactFlightClient';
10+
export * from './src/ReactFlightClientStream';

packages/react-client/src/ReactFlightClient.js

Lines changed: 28 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,17 @@
77
* @flow
88
*/
99

10-
import type {Source, StringDecoder} from './ReactFlightClientHostConfig';
11-
12-
import {
13-
supportsBinaryStreams,
14-
createStringDecoder,
15-
readPartialStringChunk,
16-
readFinalStringChunk,
17-
} from './ReactFlightClientHostConfig';
18-
1910
export type ReactModelRoot<T> = {|
2011
model: T,
2112
|};
2213

23-
type JSONValue =
14+
export type JSONValue =
2415
| number
2516
| null
2617
| boolean
2718
| string
28-
| {[key: string]: JSONValue, ...};
19+
| {[key: string]: JSONValue}
20+
| Array<JSONValue>;
2921

3022
const PENDING = 0;
3123
const RESOLVED = 1;
@@ -48,39 +40,23 @@ type ErroredChunk = {|
4840
|};
4941
type Chunk = PendingChunk | ResolvedChunk | ErroredChunk;
5042

51-
type OpaqueResponseWithoutDecoder = {
52-
source: Source,
43+
export type Response = {
5344
partialRow: string,
5445
modelRoot: ReactModelRoot<any>,
5546
chunks: Map<number, Chunk>,
56-
fromJSON: (key: string, value: JSONValue) => any,
57-
...
5847
};
5948

60-
type OpaqueResponse = OpaqueResponseWithoutDecoder & {
61-
stringDecoder: StringDecoder,
62-
...
63-
};
64-
65-
export function createResponse(source: Source): OpaqueResponse {
49+
export function createResponse(): Response {
6650
let modelRoot: ReactModelRoot<any> = ({}: any);
6751
let rootChunk: Chunk = createPendingChunk();
6852
definePendingProperty(modelRoot, 'model', rootChunk);
6953
let chunks: Map<number, Chunk> = new Map();
7054
chunks.set(0, rootChunk);
71-
72-
let response: OpaqueResponse = (({
73-
source,
55+
let response = {
7456
partialRow: '',
7557
modelRoot,
7658
chunks: chunks,
77-
fromJSON: function(key, value) {
78-
return parseFromJSON(response, this, key, value);
79-
},
80-
}: OpaqueResponseWithoutDecoder): any);
81-
if (supportsBinaryStreams) {
82-
response.stringDecoder = createStringDecoder();
83-
}
59+
};
8460
return response;
8561
}
8662

@@ -138,10 +114,7 @@ function resolveChunk(chunk: Chunk, value: mixed): void {
138114

139115
// Report that any missing chunks in the model is now going to throw this
140116
// error upon read. Also notify any pending promises.
141-
export function reportGlobalError(
142-
response: OpaqueResponse,
143-
error: Error,
144-
): void {
117+
export function reportGlobalError(response: Response, error: Error): void {
145118
response.chunks.forEach(chunk => {
146119
// If this chunk was already resolved or errored, it won't
147120
// trigger an error but if it wasn't then we need to
@@ -168,8 +141,8 @@ function definePendingProperty(
168141
});
169142
}
170143

171-
function parseFromJSON(
172-
response: OpaqueResponse,
144+
export function parseModelFromJSON(
145+
response: Response,
173146
targetObj: Object,
174147
key: string,
175148
value: JSONValue,
@@ -195,12 +168,11 @@ function parseFromJSON(
195168
return value;
196169
}
197170

198-
function resolveJSONRow(
199-
response: OpaqueResponse,
171+
export function resolveModelChunk<T>(
172+
response: Response,
200173
id: number,
201-
json: string,
174+
model: T,
202175
): void {
203-
let model = JSON.parse(json, response.fromJSON);
204176
let chunks = response.chunks;
205177
let chunk = chunks.get(id);
206178
if (!chunk) {
@@ -210,88 +182,31 @@ function resolveJSONRow(
210182
}
211183
}
212184

213-
function processFullRow(response: OpaqueResponse, row: string): void {
214-
if (row === '') {
215-
return;
216-
}
217-
let tag = row[0];
218-
switch (tag) {
219-
case 'J': {
220-
let colon = row.indexOf(':', 1);
221-
let id = parseInt(row.substring(1, colon), 16);
222-
let json = row.substring(colon + 1);
223-
resolveJSONRow(response, id, json);
224-
return;
225-
}
226-
case 'E': {
227-
let colon = row.indexOf(':', 1);
228-
let id = parseInt(row.substring(1, colon), 16);
229-
let json = row.substring(colon + 1);
230-
let errorInfo = JSON.parse(json);
231-
let error = new Error(errorInfo.message);
232-
error.stack = errorInfo.stack;
233-
let chunks = response.chunks;
234-
let chunk = chunks.get(id);
235-
if (!chunk) {
236-
chunks.set(id, createErrorChunk(error));
237-
} else {
238-
triggerErrorOnChunk(chunk, error);
239-
}
240-
return;
241-
}
242-
default: {
243-
// Assume this is the root model.
244-
resolveJSONRow(response, 0, row);
245-
return;
246-
}
247-
}
248-
}
249-
250-
export function processStringChunk(
251-
response: OpaqueResponse,
252-
chunk: string,
253-
offset: number,
254-
): void {
255-
let linebreak = chunk.indexOf('\n', offset);
256-
while (linebreak > -1) {
257-
let fullrow = response.partialRow + chunk.substring(offset, linebreak);
258-
processFullRow(response, fullrow);
259-
response.partialRow = '';
260-
offset = linebreak + 1;
261-
linebreak = chunk.indexOf('\n', offset);
262-
}
263-
response.partialRow += chunk.substring(offset);
264-
}
265-
266-
export function processBinaryChunk(
267-
response: OpaqueResponse,
268-
chunk: Uint8Array,
185+
export function resolveErrorChunk(
186+
response: Response,
187+
id: number,
188+
message: string,
189+
stack: string,
269190
): void {
270-
if (!supportsBinaryStreams) {
271-
throw new Error("This environment don't support binary chunks.");
272-
}
273-
let stringDecoder = response.stringDecoder;
274-
let linebreak = chunk.indexOf(10); // newline
275-
while (linebreak > -1) {
276-
let fullrow =
277-
response.partialRow +
278-
readFinalStringChunk(stringDecoder, chunk.subarray(0, linebreak));
279-
processFullRow(response, fullrow);
280-
response.partialRow = '';
281-
chunk = chunk.subarray(linebreak + 1);
282-
linebreak = chunk.indexOf(10); // newline
191+
let error = new Error(message);
192+
error.stack = stack;
193+
let chunks = response.chunks;
194+
let chunk = chunks.get(id);
195+
if (!chunk) {
196+
chunks.set(id, createErrorChunk(error));
197+
} else {
198+
triggerErrorOnChunk(chunk, error);
283199
}
284-
response.partialRow += readPartialStringChunk(stringDecoder, chunk);
285200
}
286201

287-
export function complete(response: OpaqueResponse): void {
202+
export function close(response: Response): void {
288203
// In case there are any remaining unresolved chunks, they won't
289204
// be resolved now. So we need to issue an error to those.
290205
// Ideally we should be able to early bail out if we kept a
291206
// ref count of pending chunks.
292207
reportGlobalError(response, new Error('Connection closed.'));
293208
}
294209

295-
export function getModelRoot<T>(response: OpaqueResponse): ReactModelRoot<T> {
210+
export function getModelRoot<T>(response: Response): ReactModelRoot<T> {
296211
return response.modelRoot;
297212
}

packages/react-client/src/ReactFlightClientHostConfigBrowser.js

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
* @flow
88
*/
99

10-
export type Source = Promise<Response> | ReadableStream | XMLHttpRequest;
11-
1210
export type StringDecoder = TextDecoder;
1311

1412
export const supportsBinaryStreams = true;
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/**
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* This source code is licensed under the MIT license found in the
5+
* LICENSE file in the root directory of this source tree.
6+
*
7+
* @flow
8+
*/
9+
10+
import type {Response as ResponseBase, JSONValue} from './ReactFlightClient';
11+
12+
import type {StringDecoder} from './ReactFlightClientHostConfig';
13+
14+
import {
15+
createResponse as createResponseImpl,
16+
resolveModelChunk,
17+
resolveErrorChunk,
18+
parseModelFromJSON,
19+
} from './ReactFlightClient';
20+
21+
import {
22+
supportsBinaryStreams,
23+
createStringDecoder,
24+
readPartialStringChunk,
25+
readFinalStringChunk,
26+
} from './ReactFlightClientHostConfig';
27+
28+
export type ReactModelRoot<T> = {|
29+
model: T,
30+
|};
31+
32+
type Response = ResponseBase & {
33+
fromJSON: (key: string, value: JSONValue) => any,
34+
stringDecoder: StringDecoder,
35+
};
36+
37+
export function createResponse(): Response {
38+
let response: Response = (createResponseImpl(): any);
39+
response.fromJSON = function(key: string, value: JSONValue) {
40+
return parseModelFromJSON(response, this, key, value);
41+
};
42+
if (supportsBinaryStreams) {
43+
response.stringDecoder = createStringDecoder();
44+
}
45+
return response;
46+
}
47+
48+
function processFullRow(response: Response, row: string): void {
49+
if (row === '') {
50+
return;
51+
}
52+
let tag = row[0];
53+
switch (tag) {
54+
case 'J': {
55+
let colon = row.indexOf(':', 1);
56+
let id = parseInt(row.substring(1, colon), 16);
57+
let json = row.substring(colon + 1);
58+
let model = JSON.parse(json, response.fromJSON);
59+
resolveModelChunk(response, id, model);
60+
return;
61+
}
62+
case 'E': {
63+
let colon = row.indexOf(':', 1);
64+
let id = parseInt(row.substring(1, colon), 16);
65+
let json = row.substring(colon + 1);
66+
let errorInfo = JSON.parse(json);
67+
resolveErrorChunk(response, id, errorInfo.message, errorInfo.stack);
68+
return;
69+
}
70+
default: {
71+
// Assume this is the root model.
72+
let model = JSON.parse(row, response.fromJSON);
73+
resolveModelChunk(response, 0, model);
74+
return;
75+
}
76+
}
77+
}
78+
79+
export function processStringChunk(
80+
response: Response,
81+
chunk: string,
82+
offset: number,
83+
): void {
84+
let linebreak = chunk.indexOf('\n', offset);
85+
while (linebreak > -1) {
86+
let fullrow = response.partialRow + chunk.substring(offset, linebreak);
87+
processFullRow(response, fullrow);
88+
response.partialRow = '';
89+
offset = linebreak + 1;
90+
linebreak = chunk.indexOf('\n', offset);
91+
}
92+
response.partialRow += chunk.substring(offset);
93+
}
94+
95+
export function processBinaryChunk(
96+
response: Response,
97+
chunk: Uint8Array,
98+
): void {
99+
if (!supportsBinaryStreams) {
100+
throw new Error("This environment don't support binary chunks.");
101+
}
102+
let stringDecoder = response.stringDecoder;
103+
let linebreak = chunk.indexOf(10); // newline
104+
while (linebreak > -1) {
105+
let fullrow =
106+
response.partialRow +
107+
readFinalStringChunk(stringDecoder, chunk.subarray(0, linebreak));
108+
processFullRow(response, fullrow);
109+
response.partialRow = '';
110+
chunk = chunk.subarray(linebreak + 1);
111+
linebreak = chunk.indexOf(10); // newline
112+
}
113+
response.partialRow += readPartialStringChunk(stringDecoder, chunk);
114+
}
115+
116+
export {reportGlobalError, close, getModelRoot} from './ReactFlightClient';

0 commit comments

Comments
 (0)