Skip to content

Commit 9046cc6

Browse files
feat(runtime): WHATWG Web Streams cluster in the JSC realm
Adds the web-streams surface to Home's own JSC realm (pure JS), authored + adversarially verified via a multi-agent workflow, integrated and shipped in one build: - web_globals: ReadableStream (getReader/read/releaseLock/cancel/closed, [Symbol.asyncIterator], pipeTo, pipeThrough, tee, static from) + default reader/controller, WritableStream (getWriter/write/close/abort/ready/ desiredSize), TransformStream (readable/writable, transform/flush/start), CountQueuingStrategy, ByteLengthQueuingStrategy. - node_modules: node:"stream/web" (re-exports the realm globals) and node:"stream/consumers" (text/json/arrayBuffer/buffer/bytes/blob — drains a web ReadableStream or node Readable). - bun_global: Bun.readableStreamTo{Text,JSON,ArrayBuffer,Bytes,Blob,Array}. 18 focused tests (enqueue/read, for-await, from, tee, pipeThrough, consumers text/json/buffer, Bun drains). Full home_rt gate: 1549/1549. (TextEncoderStream/TextDecoderStream are a follow-up.) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 10b178a commit 9046cc6

3 files changed

Lines changed: 1183 additions & 0 deletions

File tree

packages/runtime/src/jsc/bun_global.zig

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,48 @@ const install_glue =
308308
\\ Object.defineProperty(B, "main", { get: function() { var a = (typeof globalThis.process !== "undefined" && globalThis.process.argv) || []; return a[1] || ""; }, configurable: true });
309309
\\ })();
310310
\\ delete globalThis.__home_bun_hash;
311+
\\ (function() {
312+
\\ var B = globalThis.Bun;
313+
\\ function drainStream(stream) {
314+
\\ if (!stream || typeof stream.getReader !== "function") return Promise.reject(new TypeError("Expected a ReadableStream"));
315+
\\ var reader = stream.getReader();
316+
\\ var chunks = [];
317+
\\ function step() {
318+
\\ return reader.read().then(function(r) {
319+
\\ if (r.done) return chunks;
320+
\\ if (r.value !== undefined) chunks.push(r.value);
321+
\\ return step();
322+
\\ });
323+
\\ }
324+
\\ return step();
325+
\\ }
326+
\\ function chunkToBytes(chunk) {
327+
\\ if (typeof chunk === "string") return new TextEncoder().encode(chunk);
328+
\\ if (chunk instanceof Uint8Array) return chunk;
329+
\\ if (chunk instanceof ArrayBuffer) return new Uint8Array(chunk.slice(0));
330+
\\ if (ArrayBuffer.isView(chunk)) return new Uint8Array(chunk.buffer.slice(chunk.byteOffset, chunk.byteOffset + chunk.byteLength));
331+
\\ return new TextEncoder().encode(String(chunk));
332+
\\ }
333+
\\ function concatChunkBytes(chunks) {
334+
\\ var parts = [], total = 0, i;
335+
\\ for (i = 0; i < chunks.length; i++) { var b = chunkToBytes(chunks[i]); parts.push(b); total += b.length; }
336+
\\ var out = new Uint8Array(total), off = 0;
337+
\\ for (i = 0; i < parts.length; i++) { out.set(parts[i], off); off += parts[i].length; }
338+
\\ return out;
339+
\\ }
340+
\\ function chunksToText(chunks) {
341+
\\ var allStrings = true;
342+
\\ for (var i = 0; i < chunks.length; i++) { if (typeof chunks[i] !== "string") { allStrings = false; break; } }
343+
\\ if (allStrings) return chunks.join("");
344+
\\ return new TextDecoder().decode(concatChunkBytes(chunks));
345+
\\ }
346+
\\ B.readableStreamToArray = function(stream) { return drainStream(stream); };
347+
\\ B.readableStreamToText = function(stream) { return drainStream(stream).then(chunksToText); };
348+
\\ B.readableStreamToJSON = function(stream) { return drainStream(stream).then(function(chunks) { return JSON.parse(chunksToText(chunks)); }); };
349+
\\ B.readableStreamToBytes = function(stream) { return drainStream(stream).then(concatChunkBytes); };
350+
\\ B.readableStreamToArrayBuffer = function(stream) { return drainStream(stream).then(function(chunks) { var b = concatChunkBytes(chunks); return b.buffer.slice(b.byteOffset, b.byteOffset + b.byteLength); }); };
351+
\\ B.readableStreamToBlob = function(stream) { return drainStream(stream).then(function(chunks) { var Blob = globalThis.Blob; if (typeof Blob !== "function") throw new TypeError("Blob is not defined in this realm"); return new Blob(chunks); }); };
352+
\\ })();
311353
\\ delete globalThis.__home_bun_write_file;
312354
\\})();
313355
;
@@ -492,3 +534,29 @@ test "Bun.file.exists reflects presence; missing file text() rejects" {
492534
"home:bun-exists-setup", 1, null);
493535
try std.testing.expect(try evalBool(std.testing.allocator, ctx, "globalThis.__e === 'false:rejected'"));
494536
}
537+
538+
test "Bun.readableStreamToText/Array drain a web ReadableStream" {
539+
if (!build_options.enable_jsc) return error.SkipZigTest;
540+
const Engine = @import("engine.zig").Engine;
541+
var engine = try Engine.init(std.testing.allocator);
542+
defer engine.deinit();
543+
const ctx = engine.currentContext();
544+
installRealm(std.testing.allocator, ctx, engine.currentGlobalObject());
545+
546+
// A minimal pull-based ReadableStream over ["foo", "bar"]; the consumers
547+
// only touch getReader()/read(), and reads resolve via already-settled
548+
// promises so the whole chain drains within this single evaluateUtf8.
549+
_ = try evaluate.evaluateUtf8(std.testing.allocator, ctx,
550+
"globalThis.__rsText = null; globalThis.__rsArr = null;" ++
551+
"function makeStream(items) {" ++
552+
" var i = 0;" ++
553+
" return { getReader: function() { return {" ++
554+
" read: function() { return Promise.resolve(i < items.length ? { value: items[i++], done: false } : { value: undefined, done: true }); }" ++
555+
" }; } };" ++
556+
"}" ++
557+
"Bun.readableStreamToText(makeStream(['foo', 'bar'])).then(function(t) { globalThis.__rsText = t; });" ++
558+
"Bun.readableStreamToArray(makeStream(['foo', 'bar'])).then(function(a) { globalThis.__rsArr = JSON.stringify(a); });",
559+
"home:bun-rs-consumers-setup", 1, null);
560+
try std.testing.expect(try evalBool(std.testing.allocator, ctx,
561+
"globalThis.__rsText === 'foobar' && globalThis.__rsArr === '[\"foo\",\"bar\"]'"));
562+
}

0 commit comments

Comments
 (0)