feat: Add "deno_net" extension (#11150)

This commits moves implementation of net related APIs available on "Deno"
namespace to "deno_net" extension.

Following APIs were moved:
- Deno.listen()
- Deno.connect()
- Deno.listenTls()
- Deno.serveHttp()
- Deno.shutdown()
- Deno.resolveDns()
- Deno.listenDatagram()
- Deno.startTls()
- Deno.Conn
- Deno.Listener
- Deno.DatagramConn
This commit is contained in:
Bartek Iwańczuk 2021-06-29 01:43:03 +02:00 committed by GitHub
parent 30cba24848
commit 38a7128cdd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
34 changed files with 1049 additions and 679 deletions

View file

@ -2,6 +2,9 @@
"use strict";
((window) => {
const core = window.Deno.core;
const { BadResource, Interrupted } = core;
class NotFound extends Error {
constructor(msg) {
super(msg);
@ -86,13 +89,6 @@
}
}
class Interrupted extends Error {
constructor(msg) {
super(msg);
this.name = "Interrupted";
}
}
class WriteZero extends Error {
constructor(msg) {
super(msg);
@ -107,13 +103,6 @@
}
}
class BadResource extends Error {
constructor(msg) {
super(msg);
this.name = "BadResource";
}
}
class Http extends Error {
constructor(msg) {
super(msg);

View file

@ -1,220 +0,0 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
"use strict";
((window) => {
const core = window.Deno.core;
const { errors } = window.__bootstrap.errors;
const { read, write } = window.__bootstrap.io;
function shutdown(rid) {
return core.opAsync("op_shutdown", rid);
}
function opAccept(rid, transport) {
return core.opAsync("op_accept", { rid, transport });
}
function opListen(args) {
return core.opSync("op_listen", args);
}
function opConnect(args) {
return core.opAsync("op_connect", args);
}
function opReceive(rid, transport, zeroCopy) {
return core.opAsync(
"op_datagram_receive",
{ rid, transport },
zeroCopy,
);
}
function opSend(args, zeroCopy) {
return core.opAsync("op_datagram_send", args, zeroCopy);
}
function resolveDns(query, recordType, options) {
return core.opAsync("op_dns_resolve", { query, recordType, options });
}
class Conn {
#rid = 0;
#remoteAddr = null;
#localAddr = null;
constructor(rid, remoteAddr, localAddr) {
this.#rid = rid;
this.#remoteAddr = remoteAddr;
this.#localAddr = localAddr;
}
get rid() {
return this.#rid;
}
get remoteAddr() {
return this.#remoteAddr;
}
get localAddr() {
return this.#localAddr;
}
write(p) {
return write(this.rid, p);
}
read(p) {
return read(this.rid, p);
}
close() {
core.close(this.rid);
}
closeWrite() {
return shutdown(this.rid);
}
}
class Listener {
#rid = 0;
#addr = null;
constructor(rid, addr) {
this.#rid = rid;
this.#addr = addr;
}
get rid() {
return this.#rid;
}
get addr() {
return this.#addr;
}
async accept() {
const res = await opAccept(this.rid, this.addr.transport);
return new Conn(res.rid, res.remoteAddr, res.localAddr);
}
async next() {
let conn;
try {
conn = await this.accept();
} catch (error) {
if (error instanceof errors.BadResource) {
return { value: undefined, done: true };
}
throw error;
}
return { value: conn, done: false };
}
return(value) {
this.close();
return Promise.resolve({ value, done: true });
}
close() {
core.close(this.rid);
}
[Symbol.asyncIterator]() {
return this;
}
}
class Datagram {
#rid = 0;
#addr = null;
constructor(rid, addr, bufSize = 1024) {
this.#rid = rid;
this.#addr = addr;
this.bufSize = bufSize;
}
get rid() {
return this.#rid;
}
get addr() {
return this.#addr;
}
async receive(p) {
const buf = p || new Uint8Array(this.bufSize);
const { size, remoteAddr } = await opReceive(
this.rid,
this.addr.transport,
buf,
);
const sub = buf.subarray(0, size);
return [sub, remoteAddr];
}
send(p, addr) {
const remote = { hostname: "127.0.0.1", ...addr };
const args = { ...remote, rid: this.rid };
return opSend(args, p);
}
close() {
core.close(this.rid);
}
async *[Symbol.asyncIterator]() {
while (true) {
try {
yield await this.receive();
} catch (err) {
if (err instanceof errors.BadResource) {
break;
}
throw err;
}
}
}
}
function listen({ hostname, ...options }) {
const res = opListen({
transport: "tcp",
hostname: typeof hostname === "undefined" ? "0.0.0.0" : hostname,
...options,
});
return new Listener(res.rid, res.localAddr);
}
async function connect(options) {
let res;
if (options.transport === "unix") {
res = await opConnect(options);
} else {
res = await opConnect({
transport: "tcp",
hostname: "127.0.0.1",
...options,
});
}
return new Conn(res.rid, res.remoteAddr, res.localAddr);
}
window.__bootstrap.net = {
connect,
Conn,
opConnect,
listen,
opListen,
Listener,
shutdown,
Datagram,
resolveDns,
};
})(this);

View file

@ -1,251 +0,0 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
"use strict";
((window) => {
const { InnerBody } = window.__bootstrap.fetchBody;
const { Response, fromInnerRequest, toInnerResponse, newInnerRequest } =
window.__bootstrap.fetch;
const errors = window.__bootstrap.errors.errors;
const core = window.Deno.core;
const { ReadableStream } = window.__bootstrap.streams;
const abortSignal = window.__bootstrap.abortSignal;
function serveHttp(conn) {
const rid = Deno.core.opSync("op_http_start", conn.rid);
return new HttpConn(rid);
}
const connErrorSymbol = Symbol("connError");
class HttpConn {
#rid = 0;
constructor(rid) {
this.#rid = rid;
}
/** @returns {number} */
get rid() {
return this.#rid;
}
/** @returns {Promise<ResponseEvent | null>} */
async nextRequest() {
let nextRequest;
try {
nextRequest = await Deno.core.opAsync(
"op_http_request_next",
this.#rid,
);
} catch (error) {
// A connection error seen here would cause disrupted responses to throw
// a generic `BadResource` error. Instead store this error and replace
// those with it.
this[connErrorSymbol] = error;
if (error instanceof errors.BadResource) {
return null;
} else if (error instanceof errors.Interrupted) {
return null;
} else if (error.message.includes("connection closed")) {
return null;
}
throw error;
}
if (nextRequest === null) return null;
const [
requestBodyRid,
responseSenderRid,
method,
headersList,
url,
] = nextRequest;
/** @type {ReadableStream<Uint8Array> | undefined} */
let body = null;
if (typeof requestBodyRid === "number") {
body = createRequestBodyStream(requestBodyRid);
}
const innerRequest = newInnerRequest(
method,
url,
headersList,
body !== null ? new InnerBody(body) : null,
);
const signal = abortSignal.newSignal();
const request = fromInnerRequest(innerRequest, signal, "immutable");
const respondWith = createRespondWith(this, responseSenderRid);
return { request, respondWith };
}
/** @returns {void} */
close() {
core.close(this.#rid);
}
[Symbol.asyncIterator]() {
// deno-lint-ignore no-this-alias
const httpConn = this;
return {
async next() {
const reqEvt = await httpConn.nextRequest();
// Change with caution, current form avoids a v8 deopt
return { value: reqEvt, done: reqEvt === null };
},
};
}
}
function readRequest(requestRid, zeroCopyBuf) {
return Deno.core.opAsync(
"op_http_request_read",
requestRid,
zeroCopyBuf,
);
}
function createRespondWith(httpConn, responseSenderRid) {
return async function respondWith(resp) {
if (resp instanceof Promise) {
resp = await resp;
}
if (!(resp instanceof Response)) {
throw new TypeError(
"First argument to respondWith must be a Response or a promise resolving to a Response.",
);
}
const innerResp = toInnerResponse(resp);
// If response body length is known, it will be sent synchronously in a
// single op, in other case a "response body" resource will be created and
// we'll be streaming it.
/** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
let respBody = null;
if (innerResp.body !== null) {
if (innerResp.body.unusable()) throw new TypeError("Body is unusable.");
if (innerResp.body.streamOrStatic instanceof ReadableStream) {
if (innerResp.body.length === null) {
respBody = innerResp.body.stream;
} else {
const reader = innerResp.body.stream.getReader();
const r1 = await reader.read();
if (r1.done) {
respBody = new Uint8Array(0);
} else {
respBody = r1.value;
const r2 = await reader.read();
if (!r2.done) throw new TypeError("Unreachable");
}
}
} else {
innerResp.body.streamOrStatic.consumed = true;
respBody = innerResp.body.streamOrStatic.body;
}
} else {
respBody = new Uint8Array(0);
}
let responseBodyRid;
try {
responseBodyRid = await Deno.core.opAsync("op_http_response", [
responseSenderRid,
innerResp.status ?? 200,
innerResp.headerList,
], respBody instanceof Uint8Array ? respBody : null);
} catch (error) {
const connError = httpConn[connErrorSymbol];
if (error instanceof errors.BadResource && connError != null) {
// deno-lint-ignore no-ex-assign
error = new connError.constructor(connError.message);
}
if (respBody !== null && respBody instanceof ReadableStream) {
await respBody.cancel(error);
}
throw error;
}
// If `respond` returns a responseBodyRid, we should stream the body
// to that resource.
if (responseBodyRid !== null) {
try {
if (respBody === null || !(respBody instanceof ReadableStream)) {
throw new TypeError("Unreachable");
}
const reader = respBody.getReader();
while (true) {
const { value, done } = await reader.read();
if (done) break;
if (!(value instanceof Uint8Array)) {
await reader.cancel(new TypeError("Value not a Uint8Array"));
break;
}
try {
await Deno.core.opAsync(
"op_http_response_write",
responseBodyRid,
value,
);
} catch (error) {
const connError = httpConn[connErrorSymbol];
if (error instanceof errors.BadResource && connError != null) {
// deno-lint-ignore no-ex-assign
error = new connError.constructor(connError.message);
}
await reader.cancel(error);
throw error;
}
}
} finally {
// Once all chunks are sent, and the request body is closed, we can
// close the response body.
try {
await Deno.core.opAsync("op_http_response_close", responseBodyRid);
} catch { /* pass */ }
}
}
};
}
function createRequestBodyStream(requestBodyRid) {
return new ReadableStream({
type: "bytes",
async pull(controller) {
try {
// This is the largest possible size for a single packet on a TLS
// stream.
const chunk = new Uint8Array(16 * 1024 + 256);
const read = await readRequest(
requestBodyRid,
chunk,
);
if (read > 0) {
// We read some data. Enqueue it onto the stream.
controller.enqueue(chunk.subarray(0, read));
} else {
// We have reached the end of the body, so we close the stream.
controller.close();
core.close(requestBodyRid);
}
} catch (err) {
// There was an error while reading a chunk of the body, so we
// error.
controller.error(err);
controller.close();
core.close(requestBodyRid);
}
},
cancel() {
core.close(requestBodyRid);
},
});
}
window.__bootstrap.http = {
serveHttp,
};
})(this);

View file

@ -1,49 +0,0 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
"use strict";
((window) => {
const net = window.__bootstrap.net;
function listen(options) {
if (options.transport === "unix") {
const res = net.opListen(options);
return new net.Listener(res.rid, res.localAddr);
} else {
return net.listen(options);
}
}
function listenDatagram(
options,
) {
let res;
if (options.transport === "unixpacket") {
res = net.opListen(options);
} else {
res = net.opListen({
transport: "udp",
hostname: "127.0.0.1",
...options,
});
}
return new net.Datagram(res.rid, res.localAddr);
}
async function connect(
options,
) {
if (options.transport === "unix") {
const res = await net.opConnect(options);
return new net.Conn(res.rid, res.remoteAddr, res.localAddr);
} else {
return net.connect(options);
}
}
window.__bootstrap.netUnstable = {
connect,
listenDatagram,
listen,
};
})(this);

View file

@ -1,85 +0,0 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
"use strict";
((window) => {
const core = window.Deno.core;
const { Listener, Conn } = window.__bootstrap.net;
function opConnectTls(
args,
) {
return core.opAsync("op_connect_tls", args);
}
function opAcceptTLS(rid) {
return core.opAsync("op_accept_tls", rid);
}
function opListenTls(args) {
return core.opSync("op_listen_tls", args);
}
function opStartTls(args) {
return core.opAsync("op_start_tls", args);
}
async function connectTls({
port,
hostname = "127.0.0.1",
transport = "tcp",
certFile = undefined,
}) {
const res = await opConnectTls({
port,
hostname,
transport,
certFile,
});
return new Conn(res.rid, res.remoteAddr, res.localAddr);
}
class TLSListener extends Listener {
async accept() {
const res = await opAcceptTLS(this.rid);
return new Conn(res.rid, res.remoteAddr, res.localAddr);
}
}
function listenTls({
port,
certFile,
keyFile,
hostname = "0.0.0.0",
transport = "tcp",
alpnProtocols,
}) {
const res = opListenTls({
port,
certFile,
keyFile,
hostname,
transport,
alpnProtocols,
});
return new TLSListener(res.rid, res.localAddr);
}
async function startTls(
conn,
{ hostname = "127.0.0.1", certFile } = {},
) {
const res = await opStartTls({
rid: conn.rid,
hostname,
certFile,
});
return new Conn(res.rid, res.remoteAddr, res.localAddr);
}
window.__bootstrap.tls = {
startTls,
listenTls,
connectTls,
TLSListener,
};
})(this);