mirror of
https://github.com/denoland/deno.git
synced 2025-09-22 10:22:34 +00:00

Some checks are pending
ci / pre-build (push) Waiting to run
ci / test debug linux-aarch64 (push) Blocked by required conditions
ci / test release linux-aarch64 (push) Blocked by required conditions
ci / test debug macos-aarch64 (push) Blocked by required conditions
ci / test release macos-aarch64 (push) Blocked by required conditions
ci / bench release linux-x86_64 (push) Blocked by required conditions
ci / lint debug linux-x86_64 (push) Blocked by required conditions
ci / lint debug macos-x86_64 (push) Blocked by required conditions
ci / lint debug windows-x86_64 (push) Blocked by required conditions
ci / test debug linux-x86_64 (push) Blocked by required conditions
ci / test release linux-x86_64 (push) Blocked by required conditions
ci / test debug macos-x86_64 (push) Blocked by required conditions
ci / test release macos-x86_64 (push) Blocked by required conditions
ci / test debug windows-x86_64 (push) Blocked by required conditions
ci / test release windows-x86_64 (push) Blocked by required conditions
ci / build libs (push) Blocked by required conditions
ci / publish canary (push) Blocked by required conditions
Fixes https://github.com/denoland/deno/issues/20594

This implements `JSStreamSocket`, which drives the underlying TLS stream in `rustls_tokio_stream` using two sets of channels: one for piping the encrypted protocol transport and the other for plaintext application data.

This fixes connecting to `npm:mssql`:

```js
import sql from "npm:mssql";

const sqlConfig = {
  server: "localhost",
  user: "divy",
  password: "123",
  database: "master",
  options: {
    trustServerCertificate: true,
  },
};

const pool = await sql.connect(sqlConfig);
const result = await pool.request().query(`SELECT * FROM sys.databases`);
```
494 lines
13 KiB
TypeScript
494 lines
13 KiB
TypeScript
// Copyright 2018-2025 the Deno authors. MIT license.
|
|
// Copyright Joyent, Inc. and other Node contributors.
|
|
//
|
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
|
// copy of this software and associated documentation files (the
|
|
// "Software"), to deal in the Software without restriction, including
|
|
// without limitation the rights to use, copy, modify, merge, publish,
|
|
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
|
// persons to whom the Software is furnished to do so, subject to the
|
|
// following conditions:
|
|
//
|
|
// The above copyright notice and this permission notice shall be included
|
|
// in all copies or substantial portions of the Software.
|
|
//
|
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
|
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
|
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
|
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
|
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
|
|
// This module ports:
|
|
// - https://github.com/nodejs/node/blob/master/src/stream_base-inl.h
|
|
// - https://github.com/nodejs/node/blob/master/src/stream_base.h
|
|
// - https://github.com/nodejs/node/blob/master/src/stream_base.cc
|
|
// - https://github.com/nodejs/node/blob/master/src/stream_wrap.h
|
|
// - https://github.com/nodejs/node/blob/master/src/stream_wrap.cc
|
|
|
|
import { core, primordials } from "ext:core/mod.js";
const { internalRidSymbol } = core;
const {
  Array,
  Int32Array,
  MapPrototypeGet,
  ObjectPrototypeIsPrototypeOf,
  PromiseResolve,
  PromisePrototypeThen,
  Symbol,
  TypedArrayPrototypeSlice,
  Uint8Array,
  Uint8ArrayPrototype,
} = primordials;
import { op_can_write_vectored, op_raw_write_vectored } from "ext:core/ops";

import { TextEncoder } from "ext:deno_web/08_text_encoding.js";
import { Buffer } from "node:buffer";
import { notImplemented } from "ext:deno_node/_utils.ts";
import { HandleWrap } from "ext:deno_node/internal_binding/handle_wrap.ts";
import { ownerSymbol } from "ext:deno_node/internal/async_hooks.ts";
import {
  AsyncWrap,
  providerType,
} from "ext:deno_node/internal_binding/async_wrap.ts";
import { codeMap } from "ext:deno_node/internal_binding/uv.ts";
import { _readWithCancelHandle } from "ext:deno_io/12_io.js";
import { NodeTypeError } from "ext:deno_node/internal/errors.ts";
|
|
|
|
/** Source of bytes that can be read asynchronously into a caller buffer. */
export interface Reader {
  /**
   * Reads into `p`.
   *
   * @returns A promise for the number of bytes read, or `null`.
   * `LibuvStreamWrap` in this module treats a `null` result as end-of-file.
   */
  read(p: Uint8Array): Promise<number | null>;
}
|
|
|
|
/** Sink that accepts bytes asynchronously. */
export interface Writer {
  /**
   * Writes bytes from `p`.
   *
   * @returns A promise for the number of bytes accepted. `LibuvStreamWrap`
   * in this module calls `write` repeatedly until the sum of the returned
   * counts reaches the buffer length, so a short write is allowed.
   */
  write(p: Uint8Array): Promise<number>;
}
|
|
|
|
/** Resource that can be closed synchronously. */
export interface Closer {
  /** Closes the resource. */
  close(): void;
}
|
|
|
|
/** Resource exposing a `ref()` / `unref()` pair. */
export interface Ref {
  /** Counterpart of {@link Ref.unref}. */
  ref(): void;
  /** Counterpart of {@link Ref.ref}. */
  unref(): void;
}
|
|
|
|
/** A stream that is readable, writable, closable and ref-countable. */
export interface StreamBase
  extends Reader, Writer, Closer, Ref {}
|
|
|
|
/** Slot indices into the shared {@link streamBaseState} array. */
const enum StreamBaseStateFields {
  kReadBytesOrError,
  kArrayBufferOffset,
  kBytesWritten,
  kLastWriteWasAsync,
  kNumStreamBaseStateFields,
}

export const kReadBytesOrError = StreamBaseStateFields.kReadBytesOrError;
export const kArrayBufferOffset = StreamBaseStateFields.kArrayBufferOffset;
export const kBytesWritten = StreamBaseStateFields.kBytesWritten;
export const kLastWriteWasAsync = StreamBaseStateFields.kLastWriteWasAsync;
export const kNumStreamBaseStateFields =
  StreamBaseStateFields.kNumStreamBaseStateFields;

/**
 * Shared scratch state written by stream handles after reads and writes.
 *
 * The slots hold byte counts (reads of up to 64 KiB, writes of arbitrary
 * size) and negative error codes, so the storage must be a signed 32-bit
 * array. An 8-bit unsigned array would silently wrap every value modulo 256.
 */
export const streamBaseState = new Int32Array(5);

// This is Deno, it always will be async.
streamBaseState[kLastWriteWasAsync] = 1;
|
|
|
|
/**
 * Request object describing a single write on a handle of type `H`.
 *
 * The fields are only declared here; this class assigns none of them.
 * Within this module only `oncomplete` is used: it is invoked with a status
 * code once a write has finished or failed.
 */
export class WriteWrap<H extends HandleWrap> extends AsyncWrap {
  handle!: H;
  /** Completion callback; receives `0` on success or an error status. */
  oncomplete!: (status: number) => void;
  async!: boolean;
  bytes!: number;
  buffer!: unknown;
  callback!: unknown;
  _chunks!: unknown[];

  constructor() {
    super(providerType.WRITEWRAP);
  }
}
|
|
|
|
/**
 * Request object describing a shutdown of a handle of type `H`.
 *
 * The fields are only declared here; this class assigns none of them.
 * Within this module only `oncomplete` is used: it is invoked with the
 * status produced by closing the stream.
 */
export class ShutdownWrap<H extends HandleWrap> extends AsyncWrap {
  handle!: H;
  /** Completion callback; receives the close status code. */
  oncomplete!: (status: number) => void;
  callback!: () => void;

  constructor() {
    super(providerType.SHUTDOWNWRAP);
  }
}
|
|
|
|
/** Property key under which a stream wrap stores its underlying stream. */
export const kStreamBaseField = Symbol("kStreamBaseField");

/** Size in bytes (64 KiB) of the buffer a stream wrap reads into. */
const SUGGESTED_SIZE = 65536;
|
|
|
|
/**
 * Handle that exposes a libuv-style stream API (`readStart`, `readStop`,
 * `writeBuffer`, `writev`, `shutdown`, ...) on top of an underlying stream
 * object implementing `Reader & Writer & Closer & Ref`.
 *
 * Every public operation returns a numeric status synchronously (always `0`
 * here); the actual outcome of asynchronous work is reported through
 * `onread` for reads and through `req.oncomplete` for writes and shutdowns.
 */
export class LibuvStreamWrap extends HandleWrap {
  /** The underlying stream; `undefined` when none was attached. */
  [kStreamBaseField]?: Reader & Writer & Closer & Ref;

  // NOTE(review): `reading` is declared but never assigned in this class;
  // presumably it is maintained by the owner of the handle.
  reading!: boolean;
  /** Whether the internal read loop should keep going. */
  #reading = false;
  destroyed = false;
  writeQueueSize = 0;
  /** Running total of bytes delivered to `onread`. */
  bytesRead = 0;
  /** Running total of bytes successfully written. */
  bytesWritten = 0;
  /** Scratch buffer every read is performed into. */
  #buf = new Uint8Array(SUGGESTED_SIZE);

  /**
   * Read callback. Called with a copy of the bytes read and the number of
   * bytes read; on EOF or error it receives a negative status code instead.
   */
  onread!: (_arrayBuffer: Uint8Array, _nread: number) => Uint8Array | undefined;

  /**
   * @param provider Async-wrap provider type for this handle.
   * @param stream Optional underlying stream to attach.
   */
  constructor(
    provider: providerType,
    stream?: Reader & Writer & Closer & Ref,
  ) {
    super(provider, stream?.[internalRidSymbol]);
    this.#attachToObject(stream);
  }

  /**
   * Start the reading of the stream.
   * A no-op when the read loop is already running.
   * @return An error status code.
   */
  readStart(): number {
    if (!this.#reading) {
      this.#reading = true;
      this.#read();
    }

    return 0;
  }

  /**
   * Stop the reading of the stream.
   * Also closes the cancel handle of an in-flight read, if one is recorded.
   * @return An error status code.
   */
  readStop(): number {
    this.#reading = false;
    if (this.cancelHandle) {
      core.close(this.cancelHandle);
      this.cancelHandle = undefined;
    }

    return 0;
  }

  /**
   * Shutdown the stream.
   * Closes the underlying stream and reports the close status through
   * `req.oncomplete`.
   * @param req A shutdown request wrapper.
   * @return An error status code.
   */
  shutdown(req: ShutdownWrap<LibuvStreamWrap>): number {
    const status = this._onClose();

    try {
      req.oncomplete(status);
    } catch {
      // swallow callback error.
    }

    return 0;
  }

  /**
   * @param userBuf
   * @return An error status code.
   */
  useUserBuffer(_userBuf: unknown): number {
    // TODO(cmorten)
    notImplemented("LibuvStreamWrap.prototype.useUserBuffer");
  }

  /**
   * Write a buffer to the stream.
   * The write itself is asynchronous; its outcome is reported through
   * `req.oncomplete`.
   * @param req A write request wrapper.
   * @param data The Uint8Array buffer to write to the stream.
   * @return An error status code.
   * @throws NodeTypeError (`ERR_INVALID_ARG_TYPE`) if `data` is not a
   * `Uint8Array`.
   */
  writeBuffer(req: WriteWrap<LibuvStreamWrap>, data: Uint8Array): number {
    if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, data)) {
      throw new NodeTypeError(
        "ERR_INVALID_ARG_TYPE",
        "Second argument must be a buffer",
      );
    }

    this.#write(req, data);

    return 0;
  }

  /**
   * Write multiple chunks at once.
   * @param req A write request wrapper.
   * @param chunks Either a list of buffers (`allBuffers === true`) or a flat
   * list of `chunk, encoding` pairs.
   * @param allBuffers Whether every entry of `chunks` is a buffer.
   * @return An error status code.
   */
  writev(
    req: WriteWrap<LibuvStreamWrap>,
    chunks: Buffer[] | (string | Buffer)[],
    allBuffers: boolean,
  ): number {
    const supportsWritev = this.provider === providerType.TCPSERVERWRAP;
    const rid = this[kStreamBaseField]![internalRidSymbol];
    // Fast case optimization: two chunks, and all buffers.
    if (
      chunks.length === 2 && allBuffers && supportsWritev &&
      op_can_write_vectored(rid)
    ) {
      // String chunks.
      if (typeof chunks[0] === "string") chunks[0] = Buffer.from(chunks[0]);
      if (typeof chunks[1] === "string") chunks[1] = Buffer.from(chunks[1]);

      PromisePrototypeThen(
        op_raw_write_vectored(
          rid,
          chunks[0],
          chunks[1],
        ),
        (nwritten: number) => {
          // Record the byte counts before notifying, as `#write` does.
          streamBaseState[kBytesWritten] = nwritten;
          this.bytesWritten += nwritten;

          try {
            req.oncomplete(0);
          } catch {
            // swallow callback errors.
          }
        },
        (e: unknown) => {
          // Without this handler a failed vectored write would surface as
          // an unhandled rejection and the request would never complete.
          try {
            req.oncomplete(this.#writeErrorStatus(e));
          } catch {
            // swallow callback errors.
          }
        },
      );

      return 0;
    }

    const count = allBuffers ? chunks.length : chunks.length >> 1;
    const buffers: Buffer[] = new Array(count);

    if (!allBuffers) {
      for (let i = 0; i < count; i++) {
        const chunk = chunks[i * 2];

        if (Buffer.isBuffer(chunk)) {
          // Already a buffer: use it as-is rather than re-copying it.
          buffers[i] = chunk;
          continue;
        }

        // String chunk
        const encoding: string = chunks[i * 2 + 1] as string;
        buffers[i] = Buffer.from(chunk as string, encoding);
      }
    } else {
      for (let i = 0; i < count; i++) {
        buffers[i] = chunks[i] as Buffer;
      }
    }

    // Ignoring primordial lint here since the static method `concat` is invoked
    // via the Node.js `Buffer` class instead of a JS builtin.
    // deno-lint-ignore prefer-primordials
    return this.writeBuffer(req, Buffer.concat(buffers));
  }

  /**
   * Write an ASCII string to the stream.
   * The string is encoded with `TextEncoder` before being written.
   * @return An error status code.
   */
  writeAsciiString(req: WriteWrap<LibuvStreamWrap>, data: string): number {
    const buffer = new TextEncoder().encode(data);

    return this.writeBuffer(req, buffer);
  }

  /**
   * Write an UTF8 string to the stream.
   * @return An error status code.
   */
  writeUtf8String(req: WriteWrap<LibuvStreamWrap>, data: string): number {
    const buffer = new TextEncoder().encode(data);

    return this.writeBuffer(req, buffer);
  }

  /**
   * Write an UCS2 string to the stream.
   * @return An error status code.
   */
  writeUcs2String(_req: WriteWrap<LibuvStreamWrap>, _data: string): number {
    notImplemented("LibuvStreamWrap.prototype.writeUcs2String");
  }

  /**
   * Write an LATIN1 string to the stream.
   * @return An error status code.
   */
  writeLatin1String(req: WriteWrap<LibuvStreamWrap>, data: string): number {
    const buffer = Buffer.from(data, "latin1");
    return this.writeBuffer(req, buffer);
  }

  /**
   * Stops the read loop and closes the underlying stream.
   * @return `0` on success, or the `ENOTCONN` status code if closing threw.
   */
  override _onClose(): number {
    let status = 0;
    this.#reading = false;

    try {
      this[kStreamBaseField]?.close();
    } catch {
      status = MapPrototypeGet(codeMap, "ENOTCONN")!;
    }

    return status;
  }

  /**
   * Attaches the class to the underlying stream.
   * @param stream The stream to attach to.
   */
  #attachToObject(stream?: Reader & Writer & Closer & Ref) {
    this[kStreamBaseField] = stream;
  }

  /**
   * Maps an error thrown by a write to a status code.
   * @param e The error raised by the underlying stream.
   * @return `EBADF` for `BadResource` / `BrokenPipe`, otherwise `UNKNOWN`.
   */
  #writeErrorStatus(e: unknown): number {
    // TODO(cmorten): map err to status codes
    if (
      ObjectPrototypeIsPrototypeOf(Deno.errors.BadResource.prototype, e) ||
      ObjectPrototypeIsPrototypeOf(Deno.errors.BrokenPipe.prototype, e)
    ) {
      return MapPrototypeGet(codeMap, "EBADF")!;
    }

    return MapPrototypeGet(codeMap, "UNKNOWN")!;
  }

  /** Internal method for reading from the attached stream. */
  async #read() {
    // Queue the read operation and allow TLS upgrades to complete.
    //
    // This is done to ensure that the resource is not locked up by
    // op_read.
    await PromiseResolve();

    let buf = this.#buf;

    let nread: number | null;

    if (this.upgrading) {
      // Starting an upgrade, stop reading. Upgrading will resume reading.
      this.readStop();
      return;
    }

    const ridBefore = this[kStreamBaseField]![internalRidSymbol];
    try {
      if (this[kStreamBaseField]![_readWithCancelHandle]) {
        const { cancelHandle, nread: p } = this[kStreamBaseField]!
          [_readWithCancelHandle](buf);
        if (cancelHandle) {
          // Remembered so that `readStop()` can cancel the pending read.
          this.cancelHandle = cancelHandle;
        }

        nread = await p;
      } else {
        nread = await this[kStreamBaseField]!.read(buf);
      }
    } catch (e) {
      // Try to read again if the underlying stream resource
      // changed. This can happen during TLS upgrades (eg. STARTTLS)
      if (
        ridBefore != this[kStreamBaseField]![internalRidSymbol]
      ) {
        return this.#read();
      }

      // A cancelled read ends the loop without notifying `onread`.
      if (e.message === "cancelled") return null;

      if (
        ObjectPrototypeIsPrototypeOf(Deno.errors.Interrupted.prototype, e) ||
        ObjectPrototypeIsPrototypeOf(Deno.errors.BadResource.prototype, e)
      ) {
        nread = MapPrototypeGet(codeMap, "EOF")!;
      } else if (
        ObjectPrototypeIsPrototypeOf(
          Deno.errors.ConnectionReset.prototype,
          e,
        ) ||
        ObjectPrototypeIsPrototypeOf(Deno.errors.ConnectionAborted.prototype, e)
      ) {
        nread = MapPrototypeGet(codeMap, "ECONNRESET")!;
      } else {
        this[ownerSymbol].destroy(e);
        return;
      }
    }

    // A `null` result from the underlying stream means end-of-file.
    nread ??= MapPrototypeGet(codeMap, "EOF")!;

    streamBaseState[kReadBytesOrError] = nread;

    if (nread > 0) {
      this.bytesRead += nread;
    }

    // Copy out only the bytes that were actually read. `nread` is a negative
    // status code on EOF/error; clamp it so that case yields an empty buffer
    // instead of a large slice of stale scratch data.
    buf = TypedArrayPrototypeSlice(buf, 0, nread > 0 ? nread : 0);

    streamBaseState[kArrayBufferOffset] = 0;

    try {
      this.onread!(buf, nread);
    } catch {
      // swallow callback errors.
    }

    // Keep reading unless an error/EOF occurred or reading was stopped.
    if (nread >= 0 && this.#reading) {
      this.#read();
    }
  }

  /**
   * Internal method for writing to the attached stream.
   * @param req A write request wrapper.
   * @param data The Uint8Array buffer to write to the stream.
   */
  async #write(req: WriteWrap<LibuvStreamWrap>, data: Uint8Array) {
    const { byteLength } = data;

    const ridBefore = this[kStreamBaseField]![internalRidSymbol];

    if (this.upgrading) {
      // There is an upgrade in progress, queue the write request.
      await this.upgrading;
    }

    let nwritten = 0;
    try {
      // TODO(crowlKats): duplicate from runtime/js/13_buffer.js
      while (nwritten < data.length) {
        nwritten += await this[kStreamBaseField]!.write(
          data.subarray(nwritten),
        );
      }
    } catch (e) {
      // Try to write again if the underlying stream resource
      // changed. This can happen during TLS upgrades (eg. STARTTLS)
      if (
        ridBefore != this[kStreamBaseField]![internalRidSymbol]
      ) {
        return this.#write(req, data.subarray(nwritten));
      }

      try {
        req.oncomplete(this.#writeErrorStatus(e));
      } catch {
        // swallow callback errors.
      }

      return;
    }

    streamBaseState[kBytesWritten] = byteLength;
    this.bytesWritten += byteLength;

    try {
      req.oncomplete(0);
    } catch {
      // swallow callback errors.
    }

    return;
  }
}
|