feat(runtime): web streams in fs & net APIs (#13615)

This commit adds `readable` and `writable` properties to `Deno.File` and
`Deno.Conn`. This makes it very simple to use files and network sockets
with fetch or the native HTTP server.
This commit is contained in:
Luca Casonato 2022-02-15 13:35:22 +01:00 committed by GitHub
parent 7b893bd57f
commit bdc8006a36
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 337 additions and 84 deletions

View file

@ -4,6 +4,7 @@
((window) => {
const core = window.Deno.core;
const { BadResourcePrototype, InterruptedPrototype } = core;
const { ReadableStream, WritableStream } = window.__bootstrap.streams;
const {
ObjectPrototypeIsPrototypeOf,
PromiseResolve,
@ -59,10 +60,75 @@
return core.opAsync("op_dns_resolve", { query, recordType, options });
}
const DEFAULT_CHUNK_SIZE = 16_640;
function tryClose(rid) {
try {
core.close(rid);
} catch {
// Ignore errors
}
}
function readableStreamForRid(rid) {
return new ReadableStream({
type: "bytes",
async pull(controller) {
const v = controller.byobRequest.view;
try {
const bytesRead = await read(rid, v);
if (bytesRead === null) {
tryClose(rid);
controller.close();
controller.byobRequest.respond(0);
} else {
controller.byobRequest.respond(bytesRead);
}
} catch (e) {
controller.error(e);
tryClose(rid);
}
},
cancel() {
tryClose(rid);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
function writableStreamForRid(rid) {
return new WritableStream({
async write(chunk, controller) {
try {
let nwritten = 0;
while (nwritten < chunk.length) {
nwritten += await write(
rid,
TypedArrayPrototypeSubarray(chunk, nwritten),
);
}
} catch (e) {
controller.error(e);
tryClose(rid);
}
},
close() {
tryClose(rid);
},
abort() {
tryClose(rid);
},
});
}
class Conn {
#rid = 0;
#remoteAddr = null;
#localAddr = null;
#readable;
#writable;
constructor(rid, remoteAddr, localAddr) {
this.#rid = rid;
this.#remoteAddr = remoteAddr;
@ -104,6 +170,20 @@
setKeepAlive(keepalive = true) {
return core.opSync("op_set_keepalive", this.rid, keepalive);
}
get readable() {
if (this.#readable === undefined) {
this.#readable = readableStreamForRid(this.rid);
}
return this.#readable;
}
get writable() {
if (this.#writable === undefined) {
this.#writable = writableStreamForRid(this.rid);
}
return this.#writable;
}
}
class Listener {
@ -252,4 +332,8 @@
Datagram,
resolveDns,
};
window.__bootstrap.streamUtils = {
readableStreamForRid,
writableStreamForRid,
};
})(this);