diff --git a/.gitignore b/.gitignore index 7016f18e53..5e03a51bf3 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,6 @@ Untitled*.ipynb /.ms-playwright **/.claude/settings.local.json + +# pyenv +/.python-version diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 83a6ac13d0..7dd0c1fb8a 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -60,6 +60,7 @@ const { PromisePrototypeThen, PromiseReject, PromiseResolve, + PromiseWithResolvers, RangeError, ReflectHas, SafeFinalizationRegistry, @@ -5157,6 +5158,8 @@ class ReadableStream { /** @type {Deferred} */ [_isClosedPromise]; + [core.hostObjectBrand] = "ReadableStream"; + /** * @param {UnderlyingSource=} underlyingSource * @param {QueuingStrategy=} strategy @@ -6164,6 +6167,8 @@ class TransformStream { /** @type {WritableStream} */ [_writable]; + [core.hostObjectBrand] = "TransformStream"; + /** * @param {Transformer} transformer * @param {QueuingStrategy} writableStrategy @@ -6174,6 +6179,10 @@ class TransformStream { writableStrategy = { __proto__: null }, readableStrategy = { __proto__: null }, ) { + if (transformer === _brand) { + this[_brand] = _brand; + return; + } const prefix = "Failed to construct 'TransformStream'"; if (transformer !== undefined) { transformer = webidl.converters.object(transformer, prefix, "Argument 1"); @@ -6374,6 +6383,8 @@ class WritableStream { /** @type {Deferred[]} */ [_writeRequests]; + [core.hostObjectBrand] = "WritableStream"; + /** * @param {UnderlyingSink=} underlyingSink * @param {QueuingStrategy=} strategy @@ -6740,6 +6751,283 @@ function createProxy(stream) { return stream.pipeThrough(new TransformStream()); } +function packAndPostMessage(port, type, value) { + port.postMessage({ type, value, __proto__: null }); +} + +function crossRealmTransformSendError(port, error) { + packAndPostMessage(port, "error", error); +} + +function packAndPostMessageHandlingError(port, type, value) { + try { + packAndPostMessage(port, type, value); + } catch (e) { + crossRealmTransformSendError(port, e); + throw e; + } +} + +/** + * @param stream {ReadableStream} + * @param port {MessagePort} + */ +function setUpCrossRealmTransformReadable(stream, port) { + initializeReadableStream(stream); + const controller = new ReadableStreamDefaultController(_brand); + port.addEventListener("message", (event) => { + if (event.data.type === "chunk") { + readableStreamDefaultControllerEnqueue(controller, event.data.value); + } else if (event.data.type === "close") { + readableStreamDefaultControllerClose(controller); + port.close(); + } else if (event.data.type === "error") { + readableStreamDefaultControllerError(controller, event.data.value); + port.close(); + } + }); + port.addEventListener("messageerror", (event) => { + crossRealmTransformSendError(port, event.error); + readableStreamDefaultControllerError(controller, event.error); + port.close(); + }); + port.start(); + const startAlgorithm = () => undefined; + const pullAlgorithm = () => { + packAndPostMessage(port, "pull", undefined); + return PromiseResolve(undefined); + }; + const cancelAlgorithm = (reason) => { + try { + packAndPostMessageHandlingError(port, "error", reason); + } catch (e) { + return PromiseReject(e); + } finally { + port.close(); + } + return PromiseResolve(undefined); + }; + const sizeAlgorithm = () => 1; + setUpReadableStreamDefaultController( + stream, + controller, + startAlgorithm, + pullAlgorithm, + cancelAlgorithm, + 0, + sizeAlgorithm, + ); +} + +/** + * @param stream {WritableStream} + * @param port {MessagePort} + */ +function setUpCrossRealmTransformWritable(stream, port) { + initializeWritableStream(stream); + const controller = new WritableStreamDefaultController(_brand); + let backpressurePromise = PromiseWithResolvers(); + port.addEventListener("message", (event) => { + if (event.data.type === "pull") { + if (backpressurePromise) { + backpressurePromise.resolve(); + backpressurePromise = undefined; + } + } else if (event.data.type === "error") { + writableStreamDefaultControllerErrorIfNeeded( + controller, + event.data.value, + ); + if (backpressurePromise) { + backpressurePromise.resolve(); + backpressurePromise = undefined; + } + } + }); + port.addEventListener("messageerror", (event) => { + crossRealmTransformSendError(port, event.error); + writableStreamDefaultControllerErrorIfNeeded(controller, event.error); + port.close(); + }); + port.start(); + const startAlgorithm = () => undefined; + const writeAlgorithm = (chunk) => { + if (!backpressurePromise) { + backpressurePromise = PromiseWithResolvers(); + backpressurePromise.resolve(); + } + return PromisePrototypeThen(backpressurePromise.promise, () => { + backpressurePromise = PromiseWithResolvers(); + try { + packAndPostMessageHandlingError(port, "chunk", chunk); + } catch (e) { + port.close(); + throw e; + } + }); + }; + const closeAlgorithm = () => { + packAndPostMessage(port, "close", undefined); + port.close(); + return PromiseResolve(undefined); + }; + const abortAlgorithm = (reason) => { + try { + packAndPostMessageHandlingError(port, "error", reason); + return PromiseResolve(undefined); + } catch (error) { + return PromiseReject(error); + } finally { + port.close(); + } + }; + const sizeAlgorithm = () => 1; + setUpWritableStreamDefaultController( + stream, + controller, + startAlgorithm, + writeAlgorithm, + closeAlgorithm, + abortAlgorithm, + 1, + sizeAlgorithm, + ); +} + +/** + * @param value {ReadableStream} + * @param port {MessagePort} + */ +function readableStreamTransferSteps(value, port) { + if (isReadableStreamLocked(value)) { + throw new DOMException( + "Cannot transfer a locked ReadableStream", + "DataCloneError", + ); + } + const writable = new WritableStream(_brand); + setUpCrossRealmTransformWritable(writable, port); + const promise = readableStreamPipeTo(value, writable, false, false, false); + setPromiseIsHandledToTrue(promise); +} + +/** + * @param port {MessagePort} + * @returns {ReadableStream} + */ +function readableStreamTransferReceivingSteps(port) { + const stream = new ReadableStream(_brand); + setUpCrossRealmTransformReadable(stream, port); + return stream; +} + +/** + * @param value {WritableStream} + * @param port {MessagePort} + */ +function writableStreamTransferSteps(value, port) { + if (isWritableStreamLocked(value)) { + throw new DOMException( + "Cannot transfer a locked WritableStream", + "DataCloneError", + ); + } + const readable = new ReadableStream(_brand); + setUpCrossRealmTransformReadable(readable, port); + const promise = readableStreamPipeTo(readable, value, false, false, false); + setPromiseIsHandledToTrue(promise); +} + +/** + * @param port {MessagePort} + * @returns {WritableStream} + */ +function writableStreamTransferReceivingSteps(port) { + const stream = new WritableStream(_brand); + setUpCrossRealmTransformWritable(stream, port); + return stream; +} + +/** + * @param value {TransformStream} + * @param portR {MessagePort} + * @param portW {MessagePort} + */ +function transformStreamTransferSteps(value, portR, portW) { + if (isReadableStreamLocked(value.readable)) { + throw new DOMException( + "Cannot transfer a locked ReadableStream", + "DataCloneError", + ); + } + if (isWritableStreamLocked(value.writable)) { + throw new DOMException( + "Cannot transfer a locked WritableStream", + "DataCloneError", + ); + } + readableStreamTransferSteps(value.readable, portR); + writableStreamTransferSteps(value.writable, portW); +} + +/** + * @param portR {MessagePort} + * @param portW {MessagePort} + * @returns {TransformStream} + */ +function transformStreamTransferReceivingSteps(portR, portW) { + const stream = new TransformStream(_brand); + stream[_readable] = new ReadableStream(_brand); + setUpCrossRealmTransformReadable(stream[_readable], portR); + stream[_writable] = new WritableStream(_brand); + setUpCrossRealmTransformWritable(stream[_writable], portW); + return stream; +} + +core.registerTransferableResource( + "ReadableStream", + (value) => { + const { port1, port2 } = new MessageChannel(); + readableStreamTransferSteps(value, port1); + return core.getTransferableResource("MessagePort").send(port2); + }, + (rid) => { + const port = core.getTransferableResource("MessagePort").receive(rid); + return readableStreamTransferReceivingSteps(port); + }, +); + +core.registerTransferableResource( + "WritableStream", + (value) => { + const { port1, port2 } = new MessageChannel(); + writableStreamTransferSteps(value, port1); + return core.getTransferableResource("MessagePort").send(port2); + }, + (rid) => { + const port = core.getTransferableResource("MessagePort").receive(rid); + return writableStreamTransferReceivingSteps(port); + }, +); + +core.registerTransferableResource( + "TransformStream", + (value) => { + const { port1: portR1, port2: portR2 } = new MessageChannel(); + const { port1: portW1, port2: portW2 } = new MessageChannel(); + transformStreamTransferSteps(value, portR1, portW1); + return [ + core.getTransferableResource("MessagePort").send(portR2), + core.getTransferableResource("MessagePort").send(portW2), + ]; + }, + (rids) => { + const portR = core.getTransferableResource("MessagePort").receive(rids[0]); + const portW = core.getTransferableResource("MessagePort").receive(rids[1]); + return transformStreamTransferReceivingSteps(portR, portW); + }, +); + webidl.converters.ReadableStream = webidl .createInterfaceConverter("ReadableStream", ReadableStream.prototype); webidl.converters.WritableStream = webidl diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js index 18b64a52fe..ce0dc74318 100644 --- a/ext/web/13_message_port.js +++ b/ext/web/13_message_port.js @@ -385,6 +385,13 @@ function deserializeJsMessageData(messageData) { ArrayPrototypePush(hostObjects, hostObj); break; } + case "multiResource": { + const { 0: type, 1: rids } = transferable.data; + const hostObj = core.getTransferableResource(type).receive(rids); + ArrayPrototypePush(transferables, hostObj); + ArrayPrototypePush(hostObjects, hostObj); + break; + } case "arrayBuffer": { ArrayPrototypePush(transferredArrayBuffers, transferable.data); const index = ArrayPrototypePush(transferables, null); @@ -460,10 +467,17 @@ function serializeJsMessageData(data, transferables) { if (transferable[core.hostObjectBrand]) { const type = transferable[core.hostObjectBrand]; const rid = core.getTransferableResource(type).send(transferable); - ArrayPrototypePush(serializedTransferables, { - kind: "resource", - data: [type, rid], - }); + if (typeof rid === "number") { + ArrayPrototypePush(serializedTransferables, { + kind: "resource", + data: [type, rid], + }); + } else { + ArrayPrototypePush(serializedTransferables, { + kind: "multiResource", + data: [type, rid], + }); + } } else if (isArrayBuffer(transferable)) { ArrayPrototypePush(serializedTransferables, { kind: "arrayBuffer", diff --git a/ext/web/message_port.rs b/ext/web/message_port.rs index 5c0ad22e23..3b20a15a48 100644 --- a/ext/web/message_port.rs +++ b/ext/web/message_port.rs @@ -46,6 +46,7 @@ pub enum MessagePortError { pub enum Transferable { Resource(String, Box), + MultiResource(String, Vec>), ArrayBuffer(u32), } @@ -180,6 +181,7 @@ pub fn op_message_port_create_entangled( pub enum JsTransferable { ArrayBuffer(u32), Resource(String, ResourceId), + MultiResource(String, Vec), } pub fn deserialize_js_transferables( @@ -197,6 +199,18 @@ pub fn deserialize_js_transferables( let tx = resource.transfer().map_err(MessagePortError::Generic)?; transferables.push(Transferable::Resource(name, tx)); } + JsTransferable::MultiResource(name, rids) => { + let mut txs = Vec::with_capacity(rids.len()); + for rid in rids { + let resource = state + .resource_table + .take_any(rid) + .map_err(|_| MessagePortError::InvalidTransfer)?; + let tx = resource.transfer().map_err(MessagePortError::Generic)?; + txs.push(tx); + } + transferables.push(Transferable::MultiResource(name, txs)); + } JsTransferable::ArrayBuffer(id) => { transferables.push(Transferable::ArrayBuffer(id)); } @@ -217,6 +231,13 @@ pub fn serialize_transferables( let rid = state.resource_table.add_rc_dyn(rx); js_transferables.push(JsTransferable::Resource(name, rid)); } + Transferable::MultiResource(name, txs) => { + let rids = txs + .into_iter() + .map(|tx| state.resource_table.add_rc_dyn(tx.receive())) + .collect(); + js_transferables.push(JsTransferable::MultiResource(name, rids)); + } Transferable::ArrayBuffer(id) => { js_transferables.push(JsTransferable::ArrayBuffer(id)); } diff --git a/tests/wpt/runner/expectation.json b/tests/wpt/runner/expectation.json index 932c788dd7..91a24d7d67 100644 --- a/tests/wpt/runner/expectation.json +++ b/tests/wpt/runner/expectation.json @@ -5371,17 +5371,27 @@ "queuing-strategies-size-function-per-global.window.html": false, "transferable": { "deserialize-error.window.html": false, - "transfer-with-messageport.window.html": false, - "readable-stream.html": false, - "reason.html": false, + "transfer-with-messageport.window.html": true, + "readable-stream.html": [ + "cancel should be propagated to the original", + "cancel should abort a pending read()", + "transferring a non-serializable chunk should error both sides" + ], + "reason.html": [ + "DOMException errors should be preserved" + ], "service-worker.https.html": false, "shared-worker.html": false, "transform-stream-members.any.html": true, "transform-stream-members.any.worker.html": true, - "transform-stream.html": false, - "window.html": false, - "worker.html": false, - "writable-stream.html": false + "transform-stream.html": true, + "window.html": [ + "transfer to and from an iframe should work" + ], + "worker.html": true, + "writable-stream.html": [ + "writing a unclonable object should error the stream" + ] } }, "user-timing": { @@ -13093,7 +13103,6 @@ "An object whose interface is deleted from the global must still deserialize", "A subclass instance will deserialize as its closest serializable superclass", "Growable SharedArrayBuffer", - "A subclass instance will be received as its closest transferable superclass", "Transferring OOB TypedArray throws" ], "structured-clone.any.worker.html": [ @@ -13121,7 +13130,6 @@ "An object whose interface is deleted from the global must still deserialize", "A subclass instance will deserialize as its closest serializable superclass", "Growable SharedArrayBuffer", - "A subclass instance will be received as its closest transferable superclass", "Transferring OOB TypedArray throws" ], "structured-clone-cross-realm-method.html": false diff --git a/tests/wpt/runner/runner.ts b/tests/wpt/runner/runner.ts index b7e56f7205..dc84ffca2e 100644 --- a/tests/wpt/runner/runner.ts +++ b/tests/wpt/runner/runner.ts @@ -189,6 +189,35 @@ export async function runSingleTest( } } +function getShim(test: string): string { + const shim = []; + + shim.push("globalThis.window = globalThis;"); + + if (test.includes("streams/transferable")) { + shim.push(` + { + const { port1, port2 } = new MessageChannel(); + port2.addEventListener('message', (e) => { + queueMicrotask(() => { + globalThis.dispatchEvent(e); + }); + }); + port2.start(); + globalThis.postMessage = (message, targetOriginOrOptions, transfer) => { + let options = targetOriginOrOptions; + if (transfer || typeof targetOriginOrOptions === 'string') { + options = { transfer }; + } + return port1.postMessage(message, options); + }; + } + `); + } + + return shim.join("\n"); +} + async function generateBundle(location: URL): Promise { const res = await fetch(location); const body = await res.text(); @@ -206,6 +235,7 @@ async function generateBundle(location: URL): Promise { `globalThis.META_TITLE=${JSON.stringify(title)}`, ]); } + const shim = getShim(location.pathname); for (const script of scripts) { const src = script.getAttribute("src"); if (src === "/resources/testharnessreport.js") { @@ -213,20 +243,20 @@ async function generateBundle(location: URL): Promise { join(ROOT_PATH, "./tests/wpt/runner/testharnessreport.js"), ); const contents = await Deno.readTextFile(url); - scriptContents.push([url.href, "globalThis.window = globalThis;"]); + scriptContents.push([url.href, shim]); scriptContents.push([url.href, contents]); } else if (src) { const url = new URL(src, location); const res = await fetch(url); if (res.ok) { const contents = await res.text(); - scriptContents.push([url.href, "globalThis.window = globalThis;"]); + scriptContents.push([url.href, shim]); scriptContents.push([url.href, contents]); } } else { const url = new URL(`#${inlineScriptCount}`, location); inlineScriptCount++; - scriptContents.push([url.href, "globalThis.window = globalThis;"]); + scriptContents.push([url.href, shim]); scriptContents.push([url.href, script.textContent]); } }