diff --git a/ext/node/polyfills/net.ts b/ext/node/polyfills/net.ts index ea3f74bf32..69823811fb 100644 --- a/ext/node/polyfills/net.ts +++ b/ext/node/polyfills/net.ts @@ -59,7 +59,6 @@ import { uvExceptionWithHostPort, } from "ext:deno_node/internal/errors.ts"; import type { ErrnoException } from "ext:deno_node/internal/errors.ts"; -import { Encodings } from "ext:deno_node/_utils.ts"; import { isUint8Array } from "ext:deno_node/internal/util/types.ts"; import { kAfterAsyncWrite, @@ -1188,439 +1187,287 @@ const pkgsNeedsSockInitWorkaround = [ * is received. For example, it is passed to the listeners of a `"connection"` event emitted on a `Server`, so the user can use * it to interact with the client. */ -export class Socket extends Duplex { - // Problem with this is that users can supply their own handle, that may not - // have `handle.getAsyncId()`. In this case an `[asyncIdSymbol]` should - // probably be supplied by `async_hooks`. - [asyncIdSymbol] = -1; +export function Socket(options) { + if (!(this instanceof Socket)) { + return new Socket(options); + } - [kHandle]: Handle | null = null; - [kSetNoDelay] = false; - [kLastWriteQueueSize] = 0; - // deno-lint-ignore no-explicit-any - [kTimeout]: any = null; - [kBuffer]: Uint8Array | boolean | null = null; - [kBufferCb]: OnReadOptions["callback"] | null = null; - [kBufferGen]: (() => Uint8Array) | null = null; + if (typeof options === "number") { + // Legacy interface. + options = { fd: options }; + } else { + options = { ...options }; + } - // Used after `.destroy()` - [kBytesRead] = 0; - [kBytesWritten] = 0; + // Default to *not* allowing half open sockets. + options.allowHalfOpen = Boolean(options.allowHalfOpen); + // For backwards compat do not emit close on destroy. + options.emitClose = false; + options.autoDestroy = true; + // Handle strings directly. + options.decodeStrings = false; - // Reserved properties - server = null; - // deno-lint-ignore no-explicit-any - _server: any = null; + Duplex.call(this, options); - _peername?: AddressInfo | Record; - _sockname?: AddressInfo | Record; - _pendingData: Uint8Array | string | null = null; - _pendingEncoding = ""; - _host: string | null = null; - // deno-lint-ignore no-explicit-any - _parent: any = null; - // Skip some initialization (initial read and tls handshake if it's tls socket). - // If this flag is true, it's used as connection for http(s) request, and - // the reading and TLS handshake is done by the http client. - // See discussions in https://github.com/denoland/deno/pull/25470 for more details. - _needsSockInitWorkaround = false; - autoSelectFamilyAttemptedAddresses: AddressInfo[] | undefined = undefined; + this[asyncIdSymbol] = -1; + this[kHandle] = null; + this[kSetNoDelay] = false; + this[kLastWriteQueueSize] = 0; + this[kTimeout] = null; + this[kBuffer] = null; + this[kBufferCb] = null; + this[kBufferGen] = null; + this[kBytesRead] = 0; + this[kBytesWritten] = 0; + this.server = null; + this._server = null; + this._peername = undefined; + this._sockname = undefined; + this._pendingData = null; + this._pendingEncoding = ""; + this._host = null; + this._parent = null; + this._needsSockInitWorkaround = false; + this.autoSelectFamilyAttemptedAddresses = undefined; + this.connecting = false; - constructor(options: SocketOptions | number) { - if (typeof options === "number") { - // Legacy interface. - options = { fd: options }; + const errorStack = new Error().stack; + this._needsSockInitWorkaround = options.handle?.ipc !== true && + pkgsNeedsSockInitWorkaround.some((pkg) => errorStack?.includes(pkg)); + if (this._needsSockInitWorkaround) { + this.pause(); + } + + if (options.handle) { + this._handle = options.handle; + this[asyncIdSymbol] = _getNewAsyncId(this._handle); + } else if (options.fd !== undefined) { + notImplemented("net.Socket.prototype.constructor with fd option"); + } + + const onread = options.onread; + + if ( + onread !== null && + typeof onread === "object" && + (isUint8Array(onread.buffer) || typeof onread.buffer === "function") && + typeof onread.callback === "function" + ) { + if (typeof onread.buffer === "function") { + this[kBuffer] = true; + this[kBufferGen] = onread.buffer; } else { - options = { ...options }; + this[kBuffer] = onread.buffer; } + this[kBufferCb] = onread.callback; + } - // Default to *not* allowing half open sockets. - options.allowHalfOpen = Boolean(options.allowHalfOpen); - // For backwards compat do not emit close on destroy. - options.emitClose = false; - options.autoDestroy = true; - // Handle strings directly. - options.decodeStrings = false; + this.on("end", _onReadableStreamEnd); - super(options); + _initSocketHandle(this); - // Note: If the TCP/TLS socket is created from one of `pkgNeedsSockInitWorkaround`, - // the 'socket' event on ClientRequest object happens after 'connect' event on Socket object. - // That swaps the sequence of op_node_http_request_with_conn() call and - // initial socket read. That causes op_node_http_request_with_conn() not - // working. - // To avoid the above situation, we detect the socket created from - // one of those packages using stack trace and pause the socket - // (and also skips the startTls call if it's TLSSocket) - // TODO(kt3k): Remove this workaround - const errorStack = new Error().stack; - this._needsSockInitWorkaround = options.handle?.ipc !== true && - pkgsNeedsSockInitWorkaround.some((pkg) => errorStack?.includes(pkg)); - if (this._needsSockInitWorkaround) { - this.pause(); + if (this._handle && options.readable !== false) { + if (options.pauseOnCreate) { + this._handle.reading = false; + this._handle.readStop(); + this.readableFlowing = false; + } else if (!options.manualStart) { + this.read(0); } + } +} +Object.setPrototypeOf(Socket.prototype, Duplex.prototype); +Object.setPrototypeOf(Socket, Duplex); - if (options.handle) { - this._handle = options.handle; - this[asyncIdSymbol] = _getNewAsyncId(this._handle); - } else if (options.fd !== undefined) { - // REF: https://github.com/denoland/deno/issues/6529 - notImplemented("net.Socket.prototype.constructor with fd option"); - } +Socket.prototype.connect = function (...args) { + let normalized; - const onread = options.onread; + if ( + Array.isArray(args[0]) && + args[0][normalizedArgsSymbol] + ) { + normalized = args[0]; + } else { + normalized = _normalizeArgs(args); + } - if ( - onread !== null && - typeof onread === "object" && - (isUint8Array(onread.buffer) || typeof onread.buffer === "function") && - typeof onread.callback === "function" - ) { - if (typeof onread.buffer === "function") { - this[kBuffer] = true; - this[kBufferGen] = onread.buffer; - } else { - this[kBuffer] = onread.buffer; - } + const options = normalized[0]; + const cb = normalized[1]; - this[kBufferCb] = onread.callback; - } + if ( + options.port === undefined && + options.path == null + ) { + throw new ERR_MISSING_ARGS(["options", "port", "path"]); + } - this.on("end", _onReadableStreamEnd); + if (this.write !== Socket.prototype.write) { + this.write = Socket.prototype.write; + } + + if (this.destroyed) { + this._handle = null; + this._peername = undefined; + this._sockname = undefined; + } + + const { path } = options; + const pipe = _isPipe(options); + debug("pipe", pipe, path); + + if (!this._handle) { + this._handle = pipe + ? new Pipe(PipeConstants.SOCKET) + : new TCP(TCPConstants.SOCKET); _initSocketHandle(this); + } - // If we have a handle, then start the flow of data into the - // buffer. If not, then this will happen when we connect. - if (this._handle && options.readable !== false) { - if (options.pauseOnCreate) { - // Stop the handle from reading and pause the stream - this._handle.reading = false; - this._handle.readStop(); - // @ts-expect-error This property shouldn't be modified - this.readableFlowing = false; - } else if (!options.manualStart) { - this.read(0); + if (cb !== null) { + this.once("connect", cb); + } + + this._unrefTimer(); + + this.connecting = true; + + if (pipe) { + validateString(path, "options.path"); + defaultTriggerAsyncIdScope( + this[asyncIdSymbol], + _internalConnect, + this, + path, + ); + } else { + _lookupAndConnect(this, options); + } + + return this; +}; + +Socket.prototype.pause = function () { + if ( + !this.connecting && + this._handle && + this._handle.reading + ) { + this._handle.reading = false; + + if (!this.destroyed) { + const err = this._handle.readStop(); + + if (err) { + this.destroy(errnoException(err, "read")); } } } - /** - * Initiate a connection on a given socket. - * - * Possible signatures: - * - * - `socket.connect(options[, connectListener])` - * - `socket.connect(path[, connectListener])` for `IPC` connections. - * - `socket.connect(port[, host][, connectListener])` for TCP connections. - * - Returns: `net.Socket` The socket itself. - * - * This function is asynchronous. When the connection is established, the `"connect"` event will be emitted. If there is a problem connecting, - * instead of a `"connect"` event, an `"error"` event will be emitted with - * the error passed to the `"error"` listener. - * The last parameter `connectListener`, if supplied, will be added as a listener - * for the `"connect"` event **once**. - * - * This function should only be used for reconnecting a socket after `"close"` has been emitted or otherwise it may lead to undefined - * behavior. - */ - connect( - options: SocketConnectOptions | NormalizedArgs, - connectionListener?: ConnectionListener, - ): this; - connect( - port: number, - host: string, - connectionListener?: ConnectionListener, - ): this; - connect(port: number, connectionListener?: ConnectionListener): this; - connect(path: string, connectionListener?: ConnectionListener): this; - connect(...args: unknown[]): this { - let normalized: NormalizedArgs; + return Duplex.prototype.pause.call(this); +}; - // If passed an array, it's treated as an array of arguments that have - // already been normalized (so we don't normalize more than once). This has - // been solved before in https://github.com/nodejs/node/pull/12342, but was - // reverted as it had unintended side effects. - if ( - Array.isArray(args[0]) && - (args[0] as unknown as NormalizedArgs)[normalizedArgsSymbol] - ) { - normalized = args[0] as unknown as NormalizedArgs; - } else { - normalized = _normalizeArgs(args); - } +Socket.prototype.resume = function () { + if ( + !this.connecting && + this._handle && + !this._handle.reading + ) { + _tryReadStart(this); + } - const options = normalized[0]; - const cb = normalized[1]; + return Duplex.prototype.resume.call(this); +}; - // `options.port === null` will be checked later. - if ( - (options as TcpSocketConnectOptions).port === undefined && - (options as IpcSocketConnectOptions).path == null - ) { - throw new ERR_MISSING_ARGS(["options", "port", "path"]); - } +Socket.prototype.setTimeout = setStreamTimeout; - if (this.write !== Socket.prototype.write) { - this.write = Socket.prototype.write; - } - - if (this.destroyed) { - this._handle = null; - this._peername = undefined; - this._sockname = undefined; - } - - const { path } = options as IpcNetConnectOptions; - const pipe = _isPipe(options); - debug("pipe", pipe, path); - - if (!this._handle) { - this._handle = pipe - ? new Pipe(PipeConstants.SOCKET) - : new TCP(TCPConstants.SOCKET); - - _initSocketHandle(this); - } - - if (cb !== null) { - this.once("connect", cb); - } - - this._unrefTimer(); - - this.connecting = true; - - if (pipe) { - validateString(path, "options.path"); - defaultTriggerAsyncIdScope( - this[asyncIdSymbol], - _internalConnect, - this, - path, - ); - } else { - _lookupAndConnect(this, options as TcpSocketConnectOptions); - } +Socket.prototype.setNoDelay = function (noDelay) { + if (!this._handle) { + this.once( + "connect", + noDelay ? this.setNoDelay : () => this.setNoDelay(noDelay), + ); return this; } - /** - * Pauses the reading of data. That is, `"data"` events will not be emitted. - * Useful to throttle back an upload. - * - * @return The socket itself. - */ - override pause(): this { - if ( - !this.connecting && - this._handle && - this._handle.reading - ) { - this._handle.reading = false; + const newValue = noDelay === undefined ? true : !!noDelay; - if (!this.destroyed) { - const err = this._handle.readStop(); - - if (err) { - this.destroy(errnoException(err, "read")); - } - } - } - - return Duplex.prototype.pause.call(this) as unknown as this; + if ( + "setNoDelay" in this._handle && + this._handle.setNoDelay && + newValue !== this[kSetNoDelay] + ) { + this[kSetNoDelay] = newValue; + this._handle.setNoDelay(newValue); } - /** - * Resumes reading after a call to `socket.pause()`. - * - * @return The socket itself. - */ - override resume(): this { - if ( - !this.connecting && - this._handle && - !this._handle.reading - ) { - _tryReadStart(this); - } + return this; +}; - return Duplex.prototype.resume.call(this) as this; - } - - /** - * Sets the socket to timeout after `timeout` milliseconds of inactivity on - * the socket. By default `net.Socket` do not have a timeout. - * - * When an idle timeout is triggered the socket will receive a `"timeout"` event but the connection will not be severed. The user must manually call `socket.end()` or `socket.destroy()` to - * end the connection. - * - * If `timeout` is `0`, then the existing idle timeout is disabled. - * - * The optional `callback` parameter will be added as a one-time listener for the `"timeout"` event. - * @return The socket itself. - */ - setTimeout = setStreamTimeout; - - /** - * Enable/disable the use of Nagle's algorithm. - * - * When a TCP connection is created, it will have Nagle's algorithm enabled. - * - * Nagle's algorithm delays data before it is sent via the network. It attempts - * to optimize throughput at the expense of latency. - * - * Passing `true` for `noDelay` or not passing an argument will disable Nagle's - * algorithm for the socket. Passing `false` for `noDelay` will enable Nagle's - * algorithm. - * - * @param noDelay - * @return The socket itself. - */ - setNoDelay(noDelay?: boolean): this { - if (!this._handle) { - this.once( - "connect", - noDelay ? this.setNoDelay : () => this.setNoDelay(noDelay), - ); - - return this; - } - - // Backwards compatibility: assume true when `noDelay` is omitted - const newValue = noDelay === undefined ? true : !!noDelay; - - if ( - "setNoDelay" in this._handle && - this._handle.setNoDelay && - newValue !== this[kSetNoDelay] - ) { - this[kSetNoDelay] = newValue; - this._handle.setNoDelay(newValue); - } +Socket.prototype.setKeepAlive = function (enable, initialDelay) { + if (!this._handle) { + this.once("connect", () => this.setKeepAlive(enable, initialDelay)); return this; } - /** - * Enable/disable keep-alive functionality, and optionally set the initial - * delay before the first keepalive probe is sent on an idle socket. - * - * Set `initialDelay` (in milliseconds) to set the delay between the last - * data packet received and the first keepalive probe. Setting `0` for`initialDelay` will leave the value unchanged from the default - * (or previous) setting. - * - * Enabling the keep-alive functionality will set the following socket options: - * - * - `SO_KEEPALIVE=1` - * - `TCP_KEEPIDLE=initialDelay` - * - `TCP_KEEPCNT=10` - * - `TCP_KEEPINTVL=1` - * - * @param enable - * @param initialDelay - * @return The socket itself. - */ - setKeepAlive(enable: boolean, initialDelay?: number): this { - if (!this._handle) { - this.once("connect", () => this.setKeepAlive(enable, initialDelay)); + if ("setKeepAlive" in this._handle) { + this._handle.setKeepAlive(enable, ~~(initialDelay / 1000)); + } - return this; - } + return this; +}; - if ("setKeepAlive" in this._handle) { - this._handle.setKeepAlive(enable, ~~(initialDelay! / 1000)); - } +Socket.prototype.address = function () { + return this._getsockname(); +}; + +Socket.prototype.unref = function () { + if (!this._handle) { + this.once("connect", this.unref); return this; } - /** - * Returns the bound `address`, the address `family` name and `port` of the - * socket as reported by the operating system:`{ port: 12346, family: "IPv4", address: "127.0.0.1" }` - */ - address(): AddressInfo | Record { - return this._getsockname(); + if (typeof this._handle.unref === "function") { + this._handle.unref(); } - /** - * Calling `unref()` on a socket will allow the program to exit if this is the only - * active socket in the event system. If the socket is already `unref`ed calling`unref()` again will have no effect. - * - * @return The socket itself. - */ - unref(): this { - if (!this._handle) { - this.once("connect", this.unref); + return this; +}; - return this; - } - - if (typeof this._handle.unref === "function") { - this._handle.unref(); - } +Socket.prototype.ref = function () { + if (!this._handle) { + this.once("connect", this.ref); return this; } - /** - * Opposite of `unref()`, calling `ref()` on a previously `unref`ed socket will_not_ let the program exit if it's the only socket left (the default behavior). - * If the socket is `ref`ed calling `ref` again will have no effect. - * - * @return The socket itself. - */ - ref(): this { - if (!this._handle) { - this.once("connect", this.ref); - - return this; - } - - if (typeof this._handle.ref === "function") { - this._handle.ref(); - } - - return this; + if (typeof this._handle.ref === "function") { + this._handle.ref(); } - /** - * This property shows the number of characters buffered for writing. The buffer - * may contain strings whose length after encoding is not yet known. So this number - * is only an approximation of the number of bytes in the buffer. - * - * `net.Socket` has the property that `socket.write()` always works. This is to - * help users get up and running quickly. The computer cannot always keep up - * with the amount of data that is written to a socket. The network connection - * simply might be too slow. Node.js will internally queue up the data written to a - * socket and send it out over the wire when it is possible. - * - * The consequence of this internal buffering is that memory may grow. - * Users who experience large or growing `bufferSize` should attempt to - * "throttle" the data flows in their program with `socket.pause()` and `socket.resume()`. - * - * @deprecated Use `writableLength` instead. - */ - get bufferSize(): number { + return this; +}; + +Object.defineProperty(Socket.prototype, "bufferSize", { + get: function () { if (this._handle) { return this.writableLength; } return 0; - } + }, +}); - /** - * The amount of received bytes. - */ - get bytesRead(): number { +Object.defineProperty(Socket.prototype, "bytesRead", { + get: function () { return this._handle ? this._handle.bytesRead : this[kBytesRead]; - } + }, +}); - /** - * The amount of bytes sent. - */ - get bytesWritten(): number | undefined { +Object.defineProperty(Socket.prototype, "bytesWritten", { + get: function () { let bytes = this._bytesDispatched; const data = this._pendingData; const encoding = this._pendingEncoding; @@ -1631,95 +1478,79 @@ export class Socket extends Duplex { } for (const el of writableBuffer) { - bytes += el!.chunk instanceof Buffer - ? el!.chunk.length - : Buffer.byteLength(el!.chunk, el!.encoding); + bytes += el.chunk instanceof Buffer + ? el.chunk.length + : Buffer.byteLength(el.chunk, el.encoding); } if (Array.isArray(data)) { - // Was a writev, iterate over chunks to get total length for (let i = 0; i < data.length; i++) { const chunk = data[i]; - // deno-lint-ignore no-explicit-any - if ((data as any).allBuffers || chunk instanceof Buffer) { + if (data.allBuffers || chunk instanceof Buffer) { bytes += chunk.length; } else { bytes += Buffer.byteLength(chunk.chunk, chunk.encoding); } } } else if (data) { - // Writes are either a string or a Buffer. if (typeof data !== "string") { - bytes += (data as Buffer).length; + bytes += data.length; } else { bytes += Buffer.byteLength(data, encoding); } } return bytes; - } + }, +}); - /** - * If `true`,`socket.connect(options[, connectListener])` was - * called and has not yet finished. It will stay `true` until the socket becomes - * connected, then it is set to `false` and the `"connect"` event is emitted. Note - * that the `socket.connect(options[, connectListener])` callback is a listener for the `"connect"` event. - */ - connecting = false; - - /** - * The string representation of the local IP address the remote client is - * connecting on. For example, in a server listening on `"0.0.0.0"`, if a client - * connects on `"192.168.1.1"`, the value of `socket.localAddress` would be`"192.168.1.1"`. - */ - get localAddress(): string { +Object.defineProperty(Socket.prototype, "localAddress", { + get: function () { return this._getsockname().address; - } + }, +}); - /** - * The numeric representation of the local port. For example, `80` or `21`. - */ - get localPort(): number { +Object.defineProperty(Socket.prototype, "localPort", { + get: function () { return this._getsockname().port; - } + }, +}); - /** - * The string representation of the local IP family. `"IPv4"` or `"IPv6"`. - */ - get localFamily(): string | undefined { +Object.defineProperty(Socket.prototype, "localFamily", { + get: function () { return this._getsockname().family; - } + }, +}); - /** - * The string representation of the remote IP address. For example,`"74.125.127.100"` or `"2001:4860:a005::68"`. Value may be `undefined` if - * the socket is destroyed (for example, if the client disconnected). - */ - get remoteAddress(): string | undefined { +Object.defineProperty(Socket.prototype, "remoteAddress", { + get: function () { return this._getpeername().address; - } + }, +}); - /** - * The string representation of the remote IP family. `"IPv4"` or `"IPv6"`. - */ - get remoteFamily(): string | undefined { +Object.defineProperty(Socket.prototype, "remoteFamily", { + get: function () { const { family } = this._getpeername(); return family ? `IPv${family}` : family; - } + }, +}); - /** - * The numeric representation of the remote port. For example, `80` or `21`. - */ - get remotePort(): number | undefined { +Object.defineProperty(Socket.prototype, "remotePort", { + get: function () { return this._getpeername().port; - } + }, +}); - get pending(): boolean { +Object.defineProperty(Socket.prototype, "pending", { + get: function () { return !this._handle || this.connecting; - } + }, +}); - get readyState(): string { +Object.defineProperty(Socket.prototype, "readyState", { + get: function () { if (this.connecting) { return "opening"; } else if (this.readable && this.writable) { @@ -1730,296 +1561,259 @@ export class Socket extends Duplex { return "writeOnly"; } return "closed"; - } + }, +}); - /** - * Half-closes the socket. i.e., it sends a FIN packet. It is possible the - * server will still send some data. - * - * See `writable.end()` for further details. - * - * @param encoding Only used when data is `string`. - * @param cb Optional callback for when the socket is finished. - * @return The socket itself. - */ - override end(cb?: () => void): this; - override end(buffer: Uint8Array | string, cb?: () => void): this; - override end( - data: Uint8Array | string, - encoding?: Encodings, - cb?: () => void, - ): this; - override end( - data?: Uint8Array | string | (() => void), - encoding?: Encodings | (() => void), - cb?: () => void, - ): this { - Duplex.prototype.end.call(this, data, encoding as Encodings, cb); - DTRACE_NET_STREAM_END(this); +Socket.prototype.end = function (data, encoding, cb) { + Duplex.prototype.end.call(this, data, encoding, cb); + DTRACE_NET_STREAM_END(this); - return this; - } + return this; +}; - /** - * @param size Optional argument to specify how much data to read. - */ - override read( - size?: number, - ): string | Uint8Array | Buffer | null | undefined { - if ( - this[kBuffer] && - !this.connecting && - this._handle && - !this._handle.reading - ) { - _tryReadStart(this); - } - - return Duplex.prototype.read.call(this, size); - } - - destroySoon() { - if (this.writable) { - this.end(); - } - - if (this.writableFinished) { - this.destroy(); - } else { - this.once("finish", this.destroy); - } - } - - _unrefTimer() { - // deno-lint-ignore no-this-alias - for (let s = this; s !== null; s = s._parent) { - if (s[kTimeout]) { - s[kTimeout].refresh(); - } - } - } - - // The user has called .end(), and all the bytes have been - // sent out to the other side. - // deno-lint-ignore no-explicit-any - override _final(cb: any): any { - // If still connecting - defer handling `_final` until 'connect' will happen - if (this.pending) { - debug("_final: not yet connected"); - return this.once("connect", () => this._final(cb)); - } - - if (!this._handle) { - return cb(); - } - - debug("_final: not ended, call shutdown()"); - - const req = new ShutdownWrap(); - req.oncomplete = _afterShutdown; - req.handle = this._handle; - req.callback = cb; - const err = this._handle.shutdown(req); - - if (err === 1 || err === codeMap.get("ENOTCONN")) { - // synchronous finish - return cb(); - } else if (err !== 0) { - return cb(errnoException(err, "shutdown")); - } - } - - _onTimeout() { - const handle = this._handle; - const lastWriteQueueSize = this[kLastWriteQueueSize]; - - if (lastWriteQueueSize > 0 && handle) { - // `lastWriteQueueSize !== writeQueueSize` means there is - // an active write in progress, so we suppress the timeout. - const { writeQueueSize } = handle; - - if (lastWriteQueueSize !== writeQueueSize) { - this[kLastWriteQueueSize] = writeQueueSize; - this._unrefTimer(); - - return; - } - } - - debug("_onTimeout"); - this.emit("timeout"); - } - - override _read(size?: number) { - debug("_read"); - if (this.connecting || !this._handle) { - debug("_read wait for connection"); - this.once("connect", () => this._read(size)); - } else if (!this._handle.reading) { - _tryReadStart(this); - } - } - - override _destroy(exception: Error | null, cb: (err: Error | null) => void) { - debug("destroy"); - this.connecting = false; - - // deno-lint-ignore no-this-alias - for (let s = this; s !== null; s = s._parent) { - clearTimeout(s[kTimeout]); - } - - debug("close"); - if (this._handle) { - debug("close handle"); - const isException = exception ? true : false; - // `bytesRead` and `kBytesWritten` should be accessible after `.destroy()` - this[kBytesRead] = this._handle.bytesRead; - this[kBytesWritten] = this._handle.bytesWritten; - - this._handle.close(() => { - this._handle!.onread = _noop; - this._handle = null; - this._sockname = undefined; - - debug("emit close"); - this.emit("close", isException); - }); - cb(exception); - } else { - cb(exception); - nextTick(_emitCloseNT, this); - } - - if (this._server) { - debug("has server"); - this._server._connections--; - - if (this._server._emitCloseIfDrained) { - this._server._emitCloseIfDrained(); - } - } - } - - _getpeername(): AddressInfo | Record { - if (!this._handle || !("getpeername" in this._handle) || this.connecting) { - return this._peername || {}; - } else if (!this._peername) { - this._peername = {}; - this._handle.getpeername(this._peername); - } - - return this._peername; - } - - _getsockname(): AddressInfo | Record { - if (!this._handle || !("getsockname" in this._handle)) { - return {}; - } else if (!this._sockname) { - this._sockname = {}; - this._handle.getsockname(this._sockname); - } - - return this._sockname; - } - - _writeGeneric( - writev: boolean, - // deno-lint-ignore no-explicit-any - data: any, - encoding: string, - cb: (error?: Error | null) => void, +Socket.prototype.read = function (size) { + if ( + this[kBuffer] && + !this.connecting && + this._handle && + !this._handle.reading ) { - // If we are still connecting, then buffer this for later. - // The Writable logic will buffer up any more writes while - // waiting for this one to be done. - if (this.connecting) { - this._pendingData = data; - this._pendingEncoding = encoding; - this.once("connect", function connect(this: Socket) { - this._writeGeneric(writev, data, encoding, cb); - }); + _tryReadStart(this); + } + + return Duplex.prototype.read.call(this, size); +}; + +Socket.prototype.destroySoon = function () { + if (this.writable) { + this.end(); + } + + if (this.writableFinished) { + this.destroy(); + } else { + this.once("finish", this.destroy); + } +}; + +Socket.prototype._unrefTimer = function () { + // deno-lint-ignore no-this-alias + for (let s = this; s !== null; s = s._parent) { + if (s[kTimeout]) { + s[kTimeout].refresh(); + } + } +}; + +Socket.prototype._final = function (cb) { + if (this.pending) { + debug("_final: not yet connected"); + return this.once("connect", () => this._final(cb)); + } + + if (!this._handle) { + return cb(); + } + + debug("_final: not ended, call shutdown()"); + + const req = new ShutdownWrap(); + req.oncomplete = _afterShutdown; + req.handle = this._handle; + req.callback = cb; + const err = this._handle.shutdown(req); + + if (err === 1 || err === codeMap.get("ENOTCONN")) { + return cb(); + } else if (err !== 0) { + return cb(errnoException(err, "shutdown")); + } +}; + +Socket.prototype._onTimeout = function () { + const handle = this._handle; + const lastWriteQueueSize = this[kLastWriteQueueSize]; + + if (lastWriteQueueSize > 0 && handle) { + const { writeQueueSize } = handle; + + if (lastWriteQueueSize !== writeQueueSize) { + this[kLastWriteQueueSize] = writeQueueSize; + this._unrefTimer(); return; } + } - this._pendingData = null; - this._pendingEncoding = ""; + debug("_onTimeout"); + this.emit("timeout"); +}; - if (!this._handle) { - cb(new ERR_SOCKET_CLOSED()); +Socket.prototype._read = function (size) { + debug("_read"); + if (this.connecting || !this._handle) { + debug("_read wait for connection"); + this.once("connect", () => this._read(size)); + } else if (!this._handle.reading) { + _tryReadStart(this); + } +}; - return false; - } +Socket.prototype._destroy = function (exception, cb) { + debug("destroy"); + this.connecting = false; - this._unrefTimer(); + // deno-lint-ignore no-this-alias + for (let s = this; s !== null; s = s._parent) { + clearTimeout(s[kTimeout]); + } - let req; + debug("close"); + if (this._handle) { + debug("close handle"); + const isException = exception ? true : false; + this[kBytesRead] = this._handle.bytesRead; + this[kBytesWritten] = this._handle.bytesWritten; - if (writev) { - req = writevGeneric(this, data, cb); - } else { - req = writeGeneric(this, data, encoding, cb); - } - if (req.async) { - this[kLastWriteQueueSize] = req.bytes; + this._handle.close(() => { + this._handle.onread = _noop; + this._handle = null; + this._sockname = undefined; + + debug("emit close"); + this.emit("close", isException); + }); + cb(exception); + } else { + cb(exception); + nextTick(_emitCloseNT, this); + } + + if (this._server) { + debug("has server"); + this._server._connections--; + + if (this._server._emitCloseIfDrained) { + this._server._emitCloseIfDrained(); } } +}; - // @ts-ignore Duplex defining as a property when want a method. - _writev( - // deno-lint-ignore no-explicit-any - chunks: Array<{ chunk: any; encoding: string }>, - cb: (error?: Error | null) => void, - ) { - this._writeGeneric(true, chunks, "", cb); +Socket.prototype._getpeername = function () { + if (!this._handle || !("getpeername" in this._handle) || this.connecting) { + return this._peername || {}; + } else if (!this._peername) { + this._peername = {}; + this._handle.getpeername(this._peername); } - override _write( - // deno-lint-ignore no-explicit-any - data: any, - encoding: string, - cb: (error?: Error | null) => void, - ) { - this._writeGeneric(false, data, encoding, cb); + return this._peername; +}; + +Socket.prototype._getsockname = function () { + if (!this._handle || !("getsockname" in this._handle)) { + return {}; + } else if (!this._sockname) { + this._sockname = {}; + this._handle.getsockname(this._sockname); } - [kAfterAsyncWrite]() { - this[kLastWriteQueueSize] = 0; + return this._sockname; +}; + +Socket.prototype._writeGeneric = function ( + writev, + data, + encoding, + cb, +) { + if (this.connecting) { + this._pendingData = data; + this._pendingEncoding = encoding; + this.once("connect", function connect() { + this._writeGeneric(writev, data, encoding, cb); + }); + + return; } - get [kUpdateTimer]() { + this._pendingData = null; + this._pendingEncoding = ""; + + if (!this._handle) { + cb(new ERR_SOCKET_CLOSED()); + + return false; + } + + this._unrefTimer(); + + let req; + + if (writev) { + req = writevGeneric(this, data, cb); + } else { + req = writeGeneric(this, data, encoding, cb); + } + if (req.async) { + this[kLastWriteQueueSize] = req.bytes; + } +}; + +Socket.prototype._writev = function ( + chunks, + cb, +) { + this._writeGeneric(true, chunks, "", cb); +}; + +Socket.prototype._write = function ( + data, + encoding, + cb, +) { + this._writeGeneric(false, data, encoding, cb); +}; + +Socket.prototype[kAfterAsyncWrite] = function () { + this[kLastWriteQueueSize] = 0; +}; + +Object.defineProperty(Socket.prototype, kUpdateTimer, { + get: function () { return this._unrefTimer; - } + }, +}); - get _connecting(): boolean { +Object.defineProperty(Socket.prototype, "_connecting", { + get: function () { return this.connecting; - } + }, +}); - // Legacy alias. Having this is probably being overly cautious, but it doesn't - // really hurt anyone either. This can probably be removed safely if desired. - get _bytesDispatched(): number { +Object.defineProperty(Socket.prototype, "_bytesDispatched", { + get: function () { return this._handle ? this._handle.bytesWritten : this[kBytesWritten]; - } + }, +}); - get _handle(): Handle | null { +Object.defineProperty(Socket.prototype, "_handle", { + get: function () { return this[kHandle]; - } - - set _handle(v: Handle | null) { + }, + set: function (v) { this[kHandle] = v; - } + }, +}); - // deno-lint-ignore no-explicit-any - [kReinitializeHandle](handle: any) { - this._handle?.close(); +Socket.prototype[kReinitializeHandle] = function (handle) { + this._handle?.close(); - this._handle = handle; - this._handle[ownerSymbol] = this; + this._handle = handle; + this._handle[ownerSymbol] = this; - _initSocketHandle(this); - } -} + _initSocketHandle(this); +}; export const Stream = Socket; @@ -2445,490 +2239,475 @@ function _setupListenHandle( ); } -/** This class is used to create a TCP or IPC server. */ -export class Server extends EventEmitter { - [asyncIdSymbol] = -1; - - allowHalfOpen = false; - pauseOnConnect = false; - - // deno-lint-ignore no-explicit-any - _handle: any = null; - _connections = 0; - _usingWorkers = false; - // deno-lint-ignore no-explicit-any - _workers: any[] = []; - _unref = false; - _pipeName?: string; - _connectionKey?: string; - - /** - * `net.Server` is an `EventEmitter` with the following events: - * - * - `"close"` - Emitted when the server closes. If connections exist, this - * event is not emitted until all connections are ended. - * - `"connection"` - Emitted when a new connection is made. `socket` is an - * instance of `net.Socket`. - * - `"error"` - Emitted when an error occurs. Unlike `net.Socket`, the - * `"close"` event will not be emitted directly following this event unless - * `server.close()` is manually called. See the example in discussion of - * `server.listen()`. - * - `"listening"` - Emitted when the server has been bound after calling - * `server.listen()`. - */ - constructor(connectionListener?: ConnectionListener); - constructor(options?: ServerOptions, connectionListener?: ConnectionListener); - constructor( - options?: ServerOptions | ConnectionListener, - connectionListener?: ConnectionListener, - ) { - super(); - - if (_isConnectionListener(options)) { - this.on("connection", options); - } else if (_isServerSocketOptions(options)) { - this.allowHalfOpen = options?.allowHalfOpen || false; - this.pauseOnConnect = !!options?.pauseOnConnect; - - if (_isConnectionListener(connectionListener)) { - this.on("connection", connectionListener); - } - } else { - throw new ERR_INVALID_ARG_TYPE("options", "Object", options); - } +/** + * This class is used to create a TCP or IPC server. + * + * `net.Server` is an `EventEmitter` with the following events: + * + * - `"close"` - Emitted when the server closes. If connections exist, this + * event is not emitted until all connections are ended. + * - `"connection"` - Emitted when a new connection is made. `socket` is an + * instance of `net.Socket`. + * - `"error"` - Emitted when an error occurs. Unlike `net.Socket`, the + * `"close"` event will not be emitted directly following this event unless + * `server.close()` is manually called. See the example in discussion of + * `server.listen()`. + * - `"listening"` - Emitted when the server has been bound after calling + * `server.listen()`. + */ +export function Server(connectionListener?: ConnectionListener); +export function Server( + options?: ServerOptions, + connectionListener?: ConnectionListener, +); +export function Server( + options?: ServerOptions | ConnectionListener, + connectionListener?: ConnectionListener, +) { + if (!(this instanceof Server)) { + return new Server(options, connectionListener); } - /** - * Start a server listening for connections. A `net.Server` can be a TCP or - * an `IPC` server depending on what it listens to. - * - * Possible signatures: - * - * - `server.listen(handle[, backlog][, callback])` - * - `server.listen(options[, callback])` - * - `server.listen(path[, backlog][, callback])` for `IPC` servers - * - `server.listen([port[, host[, backlog]]][, callback])` for TCP servers - * - * This function is asynchronous. When the server starts listening, the `'listening'` event will be emitted. The last parameter `callback`will be added as a listener for the `'listening'` - * event. - * - * All `listen()` methods can take a `backlog` parameter to specify the maximum - * length of the queue of pending connections. The actual length will be determined - * by the OS through sysctl settings such as `tcp_max_syn_backlog` and `somaxconn` on Linux. The default value of this parameter is 511 (not 512). - * - * All `Socket` are set to `SO_REUSEADDR` (see [`socket(7)`](https://man7.org/linux/man-pages/man7/socket.7.html) for - * details). - * - * The `server.listen()` method can be called again if and only if there was an - * error during the first `server.listen()` call or `server.close()` has been - * called. Otherwise, an `ERR_SERVER_ALREADY_LISTEN` error will be thrown. - * - * One of the most common errors raised when listening is `EADDRINUSE`. - * This happens when another server is already listening on the requested`port`/`path`/`handle`. One way to handle this would be to retry - * after a certain amount of time: - */ - listen( - port?: number, - hostname?: string, - backlog?: number, - listeningListener?: () => void, - ): this; - listen( - port?: number, - hostname?: string, - listeningListener?: () => void, - ): this; - listen(port?: number, backlog?: number, listeningListener?: () => void): this; - listen(port?: number, listeningListener?: () => void): this; - listen(path: string, backlog?: number, listeningListener?: () => void): this; - listen(path: string, listeningListener?: () => void): this; - listen(options: ListenOptions, listeningListener?: () => void): this; + EventEmitter.call(this); + + this[asyncIdSymbol] = -1; + this.allowHalfOpen = false; + this.pauseOnConnect = false; + this._handle = null; + this._connections = 0; + this._usingWorkers = false; + this._workers = []; + this._unref = false; + this._pipeName = undefined; + this._connectionKey = undefined; + + if (_isConnectionListener(options)) { + this.on("connection", options); + } else if (_isServerSocketOptions(options)) { + this.allowHalfOpen = options?.allowHalfOpen || false; + this.pauseOnConnect = !!options?.pauseOnConnect; + + if (_isConnectionListener(connectionListener)) { + this.on("connection", connectionListener); + } + } else { + throw new ERR_INVALID_ARG_TYPE("options", "Object", options); + } +} +Object.setPrototypeOf(Server.prototype, EventEmitter.prototype); +Object.setPrototypeOf(Server, EventEmitter); + +/** + * Start a server listening for connections. A `net.Server` can be a TCP or + * an `IPC` server depending on what it listens to. + * + * Possible signatures: + * + * - `server.listen(handle[, backlog][, callback])` + * - `server.listen(options[, callback])` + * - `server.listen(path[, backlog][, callback])` for `IPC` servers + * - `server.listen([port[, host[, backlog]]][, callback])` for TCP servers + * + * This function is asynchronous. When the server starts listening, the `'listening'` event will be emitted. The last parameter `callback`will be added as a listener for the `'listening'` + * event. + * + * All `listen()` methods can take a `backlog` parameter to specify the maximum + * length of the queue of pending connections. The actual length will be determined + * by the OS through sysctl settings such as `tcp_max_syn_backlog` and `somaxconn` on Linux. The default value of this parameter is 511 (not 512). + * + * All `Socket` are set to `SO_REUSEADDR` (see [`socket(7)`](https://man7.org/linux/man-pages/man7/socket.7.html) for + * details). + * + * The `server.listen()` method can be called again if and only if there was an + * error during the first `server.listen()` call or `server.close()` has been + * called. Otherwise, an `ERR_SERVER_ALREADY_LISTEN` error will be thrown. + * + * One of the most common errors raised when listening is `EADDRINUSE`. + * This happens when another server is already listening on the requested`port`/`path`/`handle`. One way to handle this would be to retry + * after a certain amount of time: + */ +Server.prototype.listen = function (...args: unknown[]) { + const normalized = _normalizeArgs(args); + let options = normalized[0] as Partial; + const cb = normalized[1]; + + if (this._handle) { + throw new ERR_SERVER_ALREADY_LISTEN(); + } + + if (cb !== null) { + this.once("listening", cb); + } + + const backlogFromArgs: number = + // (handle, backlog) or (path, backlog) or (port, backlog) + _toNumber(args.length > 1 && args[1]) || + (_toNumber(args.length > 2 && args[2]) as number); // (port, host, backlog) + // deno-lint-ignore no-explicit-any - listen(handle: any, backlog?: number, listeningListener?: () => void): this; - // deno-lint-ignore no-explicit-any - listen(handle: any, listeningListener?: () => void): this; - listen(...args: unknown[]): this { - const normalized = _normalizeArgs(args); - let options = normalized[0] as Partial; - const cb = normalized[1]; + options = (options as any)._handle || (options as any).handle || options; + const flags = _getFlags(options.ipv6Only); - if (this._handle) { - throw new ERR_SERVER_ALREADY_LISTEN(); - } + // (handle[, backlog][, cb]) where handle is an object with a handle + if (options instanceof TCP) { + this._handle = options; + this[asyncIdSymbol] = this._handle.getAsyncId(); - if (cb !== null) { - this.once("listening", cb); - } + _listenInCluster(this, null, -1, -1, backlogFromArgs); - const backlogFromArgs: number = - // (handle, backlog) or (path, backlog) or (port, backlog) - _toNumber(args.length > 1 && args[1]) || - (_toNumber(args.length > 2 && args[2]) as number); // (port, host, backlog) + return this; + } - // deno-lint-ignore no-explicit-any - options = (options as any)._handle || (options as any).handle || options; - const flags = _getFlags(options.ipv6Only); + _addAbortSignalOption(this, options); - // (handle[, backlog][, cb]) where handle is an object with a handle - if (options instanceof TCP) { - this._handle = options; - this[asyncIdSymbol] = this._handle.getAsyncId(); + // (handle[, backlog][, cb]) where handle is an object with a fd + if (typeof options.fd === "number" && options.fd >= 0) { + _listenInCluster(this, null, null, null, backlogFromArgs, options.fd); - _listenInCluster(this, null, -1, -1, backlogFromArgs); + return this; + } - return this; - } + // ([port][, host][, backlog][, cb]) where port is omitted, + // that is, listen(), listen(null), listen(cb), or listen(null, cb) + // or (options[, cb]) where options.port is explicitly set as undefined or + // null, bind to an arbitrary unused port + if ( + args.length === 0 || + typeof args[0] === "function" || + (typeof options.port === "undefined" && "port" in options) || + options.port === null + ) { + options.port = 0; + } - _addAbortSignalOption(this, options); + // ([port][, host][, backlog][, cb]) where port is specified + // or (options[, cb]) where options.port is specified + // or if options.port is normalized as 0 before + let backlog; - // (handle[, backlog][, cb]) where handle is an object with a fd - if (typeof options.fd === "number" && options.fd >= 0) { - _listenInCluster(this, null, null, null, backlogFromArgs, options.fd); - - return this; - } - - // ([port][, host][, backlog][, cb]) where port is omitted, - // that is, listen(), listen(null), listen(cb), or listen(null, cb) - // or (options[, cb]) where options.port is explicitly set as undefined or - // null, bind to an arbitrary unused port - if ( - args.length === 0 || - typeof args[0] === "function" || - (typeof options.port === "undefined" && "port" in options) || - options.port === null - ) { - options.port = 0; - } - - // ([port][, host][, backlog][, cb]) where port is specified - // or (options[, cb]) where options.port is specified - // or if options.port is normalized as 0 before - let backlog; - - if (typeof options.port === "number" || typeof options.port === "string") { - validatePort(options.port, "options.port"); - backlog = options.backlog || backlogFromArgs; - - // start TCP server listening on host:port - if (options.host) { - _lookupAndListen( - this, - options.port | 0, - options.host, - backlog, - !!options.exclusive, - flags, - ); - } else { - // Undefined host, listens on unspecified address - // Default addressType 4 will be used to search for primary server - _listenInCluster( - this, - null, - options.port | 0, - 4, - backlog, - undefined, - options.exclusive, - ); - } - - return this; - } - - // (path[, backlog][, cb]) or (options[, cb]) - // where path or options.path is a UNIX domain socket or Windows pipe - if (options.path && _isPipeName(options.path)) { - const pipeName = (this._pipeName = options.path); - backlog = options.backlog || backlogFromArgs; + if (typeof options.port === "number" || typeof options.port === "string") { + validatePort(options.port, "options.port"); + backlog = options.backlog || backlogFromArgs; + // start TCP server listening on host:port + if (options.host) { + _lookupAndListen( + this, + options.port | 0, + options.host, + backlog, + !!options.exclusive, + flags, + ); + } else { + // Undefined host, listens on unspecified address + // Default addressType 4 will be used to search for primary server _listenInCluster( this, - pipeName, - -1, - -1, + null, + options.port | 0, + 4, backlog, undefined, options.exclusive, ); - - if (!this._handle) { - // Failed and an error shall be emitted in the next tick. - // Therefore, we directly return. - return this; - } - - let mode = 0; - - if (options.readableAll === true) { - mode |= PipeConstants.UV_READABLE; - } - - if (options.writableAll === true) { - mode |= PipeConstants.UV_WRITABLE; - } - - if (mode !== 0) { - const err = this._handle.fchmod(mode); - - if (err) { - this._handle.close(); - this._handle = null; - - throw errnoException(err, "uv_pipe_chmod"); - } - } - - return this; - } - - if (!("port" in options || "path" in options)) { - throw new ERR_INVALID_ARG_VALUE( - "options", - options, - 'must have the property "port" or "path"', - ); - } - - throw new ERR_INVALID_ARG_VALUE("options", options); - } - - /** - * Stops the server from accepting new connections and keeps existing - * connections. This function is asynchronous, the server is finally closed - * when all connections are ended and the server emits a `"close"` event. - * The optional `callback` will be called once the `"close"` event occurs. Unlike - * that event, it will be called with an `Error` as its only argument if the server - * was not open when it was closed. - * - * @param cb Called when the server is closed. - */ - close(cb?: (err?: Error) => void): this { - if (typeof cb === "function") { - if (!this._handle) { - this.once("close", function close() { - cb(new ERR_SERVER_NOT_RUNNING()); - }); - } else { - this.once("close", cb); - } - } - - if (this._handle) { - (this._handle as TCP).close(); - this._handle = null; - } - - if (this._usingWorkers) { - let left = this._workers.length; - const onWorkerClose = () => { - if (--left !== 0) { - return; - } - - this._connections = 0; - this._emitCloseIfDrained(); - }; - - // Increment connections to be sure that, even if all sockets will be closed - // during polling of workers, `close` event will be emitted only once. - this._connections++; - - // Poll workers - for (let n = 0; n < this._workers.length; n++) { - this._workers[n].close(onWorkerClose); - } - } else { - this._emitCloseIfDrained(); } return this; } - /** - * Returns the bound `address`, the address `family` name, and `port` of the server - * as reported by the operating system if listening on an IP socket - * (useful to find which port was assigned when getting an OS-assigned address):`{ port: 12346, family: "IPv4", address: "127.0.0.1" }`. - * - * For a server listening on a pipe or Unix domain socket, the name is returned - * as a string. - * - * `server.address()` returns `null` before the `"listening"` event has been - * emitted or after calling `server.close()`. - */ - address(): AddressInfo | string | null { - if (this._handle && this._handle.getsockname) { - const out = {}; - const err = this._handle.getsockname(out); + // (path[, backlog][, cb]) or (options[, cb]) + // where path or options.path is a UNIX domain socket or Windows pipe + if (options.path && _isPipeName(options.path)) { + const pipeName = (this._pipeName = options.path); + backlog = options.backlog || backlogFromArgs; - if (err) { - throw errnoException(err, "address"); - } - - return out as AddressInfo; - } else if (this._pipeName) { - return this._pipeName; - } - - return null; - } - - /** - * Asynchronously get the number of concurrent connections on the server. Works - * when sockets were sent to forks. - * - * Callback should take two arguments `err` and `count`. - */ - getConnections(cb: (err: Error | null, count: number) => void): this { - // deno-lint-ignore no-this-alias - const server = this; - - function end(err: Error | null, connections?: number) { - defaultTriggerAsyncIdScope( - server[asyncIdSymbol], - nextTick, - cb, - err, - connections, - ); - } - - if (!this._usingWorkers) { - end(null, this._connections); - - return this; - } - - // Poll workers - let left = this._workers.length; - let total = this._connections; - - function oncount(err: Error, count: number) { - if (err) { - left = -1; - - return end(err); - } - - total += count; - - if (--left === 0) { - return end(null, total); - } - } - - for (let n = 0; n < this._workers.length; n++) { - this._workers[n].getConnections(oncount); - } - - return this; - } - - /** - * Calling `unref()` on a server will allow the program to exit if this is the only - * active server in the event system. If the server is already `unref`ed calling `unref()` again will have no effect. - */ - unref(): this { - this._unref = true; - - if (this._handle) { - this._handle.unref(); - } - - return this; - } - - /** - * Opposite of `unref()`, calling `ref()` on a previously `unref`ed server will _not_ let the program exit if it's the only server left (the default behavior). - * If the server is `ref`ed calling `ref()` again will have no effect. - */ - ref(): this { - this._unref = false; - - if (this._handle) { - this._handle.ref(); - } - - return this; - } - - /** - * Indicates whether or not the server is listening for connections. - */ - get listening(): boolean { - return !!this._handle; - } - - _createSocket(clientHandle) { - const socket = new Socket({ - handle: clientHandle, - allowHalfOpen: this.allowHalfOpen, - pauseOnCreate: this.pauseOnConnect, - readable: true, - writable: true, - }); - - // TODO(@bartlomieju): implement noDelay and setKeepAlive - - socket.server = this; - socket._server = this; - - DTRACE_NET_SERVER_CONNECTION(socket); - - return socket; - } - - _listen2 = _setupListenHandle; - - _emitCloseIfDrained() { - debug("SERVER _emitCloseIfDrained"); - if (this._handle || this._connections) { - debug( - `SERVER handle? ${!!this._handle} connections? ${this._connections}`, - ); - return; - } - - // We use setTimeout instead of nextTick here to avoid EADDRINUSE error - // when the same port listened immediately after the 'close' event. - // ref: https://github.com/denoland/deno_std/issues/2788 - defaultTriggerAsyncIdScope( - this[asyncIdSymbol], - setTimeout, - _emitCloseNT, - 0, + _listenInCluster( this, + pipeName, + -1, + -1, + backlog, + undefined, + options.exclusive, + ); + + if (!this._handle) { + // Failed and an error shall be emitted in the next tick. + // Therefore, we directly return. + return this; + } + + let mode = 0; + + if (options.readableAll === true) { + mode |= PipeConstants.UV_READABLE; + } + + if (options.writableAll === true) { + mode |= PipeConstants.UV_WRITABLE; + } + + if (mode !== 0) { + const err = this._handle.fchmod(mode); + + if (err) { + this._handle.close(); + this._handle = null; + + throw errnoException(err, "uv_pipe_chmod"); + } + } + + return this; + } + + if (!("port" in options || "path" in options)) { + throw new ERR_INVALID_ARG_VALUE( + "options", + options, + 'must have the property "port" or "path"', ); } - _setupWorker(socketList: EventEmitter) { - this._usingWorkers = true; - this._workers.push(socketList); + throw new ERR_INVALID_ARG_VALUE("options", options); +}; - // deno-lint-ignore no-explicit-any - socketList.once("exit", (socketList: any) => { - const index = this._workers.indexOf(socketList); - this._workers.splice(index, 1); - }); - } - - [EventEmitter.captureRejectionSymbol]( - err: Error, - event: string, - sock: Socket, - ) { - switch (event) { - case "connection": { - sock.destroy(err); - break; - } - default: { - this.emit("error", err); - } +/** + * Stops the server from accepting new connections and keeps existing + * connections. This function is asynchronous, the server is finally closed + * when all connections are ended and the server emits a `"close"` event. + * The optional `callback` will be called once the `"close"` event occurs. Unlike + * that event, it will be called with an `Error` as its only argument if the server + * was not open when it was closed. + * + * @param cb Called when the server is closed. + */ +Server.prototype.close = function (cb?: (err?: Error) => void) { + if (typeof cb === "function") { + if (!this._handle) { + this.once("close", function close() { + cb(new ERR_SERVER_NOT_RUNNING()); + }); + } else { + this.once("close", cb); } } -} + + if (this._handle) { + (this._handle as TCP).close(); + this._handle = null; + } + + if (this._usingWorkers) { + let left = this._workers.length; + const onWorkerClose = () => { + if (--left !== 0) { + return; + } + + this._connections = 0; + this._emitCloseIfDrained(); + }; + + // Increment connections to be sure that, even if all sockets will be closed + // during polling of workers, `close` event will be emitted only once. + this._connections++; + + // Poll workers + for (let n = 0; n < this._workers.length; n++) { + this._workers[n].close(onWorkerClose); + } + } else { + this._emitCloseIfDrained(); + } + + return this; +}; + +/** + * Returns the bound `address`, the address `family` name, and `port` of the server + * as reported by the operating system if listening on an IP socket + * (useful to find which port was assigned when getting an OS-assigned address):`{ port: 12346, family: "IPv4", address: "127.0.0.1" }`. + * + * For a server listening on a pipe or Unix domain socket, the name is returned + * as a string. + * + * `server.address()` returns `null` before the `"listening"` event has been + * emitted or after calling `server.close()`. + */ +Server.prototype.address = function (): AddressInfo | string | null { + if (this._handle && this._handle.getsockname) { + const out = {}; + const err = this._handle.getsockname(out); + + if (err) { + throw errnoException(err, "address"); + } + + return out as AddressInfo; + } else if (this._pipeName) { + return this._pipeName; + } + + return null; +}; + +/** + * Asynchronously get the number of concurrent connections on the server. Works + * when sockets were sent to forks. + * + * Callback should take two arguments `err` and `count`. + */ +Server.prototype.getConnections = function ( + cb: (err: Error | null, count: number) => void, +) { + // deno-lint-ignore no-this-alias + const server = this; + + function end(err: Error | null, connections?: number) { + defaultTriggerAsyncIdScope( + server[asyncIdSymbol], + nextTick, + cb, + err, + connections, + ); + } + + if (!this._usingWorkers) { + end(null, this._connections); + + return this; + } + + // Poll workers + let left = this._workers.length; + let total = this._connections; + + function oncount(err: Error, count: number) { + if (err) { + left = -1; + + return end(err); + } + + total += count; + + if (--left === 0) { + return end(null, total); + } + } + + for (let n = 0; n < this._workers.length; n++) { + this._workers[n].getConnections(oncount); + } + + return this; +}; + +/** + * Calling `unref()` on a server will allow the program to exit if this is the only + * active server in the event system. If the server is already `unref`ed calling `unref()` again will have no effect. + */ +Server.prototype.unref = function () { + this._unref = true; + + if (this._handle) { + this._handle.unref(); + } + + return this; +}; + +/** + * Opposite of `unref()`, calling `ref()` on a previously `unref`ed server will _not_ let the program exit if it's the only server left (the default behavior). + * If the server is `ref`ed calling `ref()` again will have no effect. + */ +Server.prototype.ref = function () { + this._unref = false; + + if (this._handle) { + this._handle.ref(); + } + + return this; +}; + +Object.defineProperty(Server.prototype, "listening", { + get: function () { + return !!this._handle; + }, +}); + +Server.prototype._createSocket = function (clientHandle) { + const socket = new Socket({ + handle: clientHandle, + allowHalfOpen: this.allowHalfOpen, + pauseOnCreate: this.pauseOnConnect, + readable: true, + writable: true, + }); + + // TODO(@bartlomieju): implement noDelay and setKeepAlive + + socket.server = this; + socket._server = this; + + DTRACE_NET_SERVER_CONNECTION(socket); + + return socket; +}; + +Server.prototype._listen2 = _setupListenHandle; + +Server.prototype._emitCloseIfDrained = function () { + debug("SERVER _emitCloseIfDrained"); + if (this._handle || this._connections) { + debug( + `SERVER handle? ${!!this._handle} connections? ${this._connections}`, + ); + return; + } + + // We use setTimeout instead of nextTick here to avoid EADDRINUSE error + // when the same port listened immediately after the 'close' event. + // ref: https://github.com/denoland/deno_std/issues/2788 + defaultTriggerAsyncIdScope( + this[asyncIdSymbol], + setTimeout, + _emitCloseNT, + 0, + this, + ); +}; + +Server.prototype._setupWorker = function (socketList: EventEmitter) { + this._usingWorkers = true; + this._workers.push(socketList); + + // deno-lint-ignore no-explicit-any + socketList.once("exit", (socketList: any) => { + const index = this._workers.indexOf(socketList); + this._workers.splice(index, 1); + }); +}; + +Server.prototype[EventEmitter.captureRejectionSymbol] = function ( + err: Error, + event: string, + sock: Socket, +) { + switch (event) { + case "connection": { + sock.destroy(err); + break; + } + default: { + this.emit("error", err); + } + } +}; /** * Creates a new TCP or IPC server.