mirror of
https://github.com/denoland/deno.git
synced 2025-10-03 07:34:36 +00:00
Add TransformStream and TransformStreamController (#5042)
This commit is contained in:
parent
1560af2b6e
commit
7e32269f3f
8 changed files with 1156 additions and 9 deletions
|
@ -23,6 +23,7 @@ import * as workers from "./web/workers.ts";
|
||||||
import * as performanceUtil from "./web/performance.ts";
|
import * as performanceUtil from "./web/performance.ts";
|
||||||
import * as request from "./web/request.ts";
|
import * as request from "./web/request.ts";
|
||||||
import * as readableStream from "./web/streams/readable_stream.ts";
|
import * as readableStream from "./web/streams/readable_stream.ts";
|
||||||
|
import * as transformStream from "./web/streams/transform_stream.ts";
|
||||||
import * as queuingStrategy from "./web/streams/queuing_strategy.ts";
|
import * as queuingStrategy from "./web/streams/queuing_strategy.ts";
|
||||||
import * as writableStream from "./web/streams/writable_stream.ts";
|
import * as writableStream from "./web/streams/writable_stream.ts";
|
||||||
|
|
||||||
|
@ -234,6 +235,7 @@ export const windowOrWorkerGlobalScopeProperties = {
|
||||||
TextEncoder: nonEnumerable(textEncoding.TextEncoder),
|
TextEncoder: nonEnumerable(textEncoding.TextEncoder),
|
||||||
TextDecoder: nonEnumerable(textEncoding.TextDecoder),
|
TextDecoder: nonEnumerable(textEncoding.TextDecoder),
|
||||||
ReadableStream: nonEnumerable(readableStream.ReadableStreamImpl),
|
ReadableStream: nonEnumerable(readableStream.ReadableStreamImpl),
|
||||||
|
TransformStream: nonEnumerable(transformStream.TransformStreamImpl),
|
||||||
Request: nonEnumerable(request.Request),
|
Request: nonEnumerable(request.Request),
|
||||||
Response: nonEnumerable(fetchTypes.Response),
|
Response: nonEnumerable(fetchTypes.Response),
|
||||||
performance: writable(new performanceUtil.Performance()),
|
performance: writable(new performanceUtil.Performance()),
|
||||||
|
|
36
cli/js/lib.deno.shared_globals.d.ts
vendored
36
cli/js/lib.deno.shared_globals.d.ts
vendored
|
@ -423,6 +423,42 @@ interface WritableStreamDefaultWriter<W = any> {
|
||||||
write(chunk: W): Promise<void>;
|
write(chunk: W): Promise<void>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
declare class TransformStream<I = any, O = any> {
|
||||||
|
constructor(
|
||||||
|
transformer?: Transformer<I, O>,
|
||||||
|
writableStrategy?: QueuingStrategy<I>,
|
||||||
|
readableStrategy?: QueuingStrategy<O>
|
||||||
|
);
|
||||||
|
readonly readable: ReadableStream<O>;
|
||||||
|
readonly writable: WritableStream<I>;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface TransformStreamDefaultController<O = any> {
|
||||||
|
readonly desiredSize: number | null;
|
||||||
|
enqueue(chunk: O): void;
|
||||||
|
error(reason?: any): void;
|
||||||
|
terminate(): void;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface Transformer<I = any, O = any> {
|
||||||
|
flush?: TransformStreamDefaultControllerCallback<O>;
|
||||||
|
readableType?: undefined;
|
||||||
|
start?: TransformStreamDefaultControllerCallback<O>;
|
||||||
|
transform?: TransformStreamDefaultControllerTransformCallback<I, O>;
|
||||||
|
writableType?: undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface TransformStreamDefaultControllerCallback<O> {
|
||||||
|
(controller: TransformStreamDefaultController<O>): void | PromiseLike<void>;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface TransformStreamDefaultControllerTransformCallback<I, O> {
|
||||||
|
(
|
||||||
|
chunk: I,
|
||||||
|
controller: TransformStreamDefaultController<O>
|
||||||
|
): void | PromiseLike<void>;
|
||||||
|
}
|
||||||
|
|
||||||
interface DOMStringList {
|
interface DOMStringList {
|
||||||
/** Returns the number of strings in strings. */
|
/** Returns the number of strings in strings. */
|
||||||
readonly length: number;
|
readonly length: number;
|
||||||
|
|
562
cli/js/tests/streams_transform_test.ts
Normal file
562
cli/js/tests/streams_transform_test.ts
Normal file
|
@ -0,0 +1,562 @@
|
||||||
|
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||||
|
import {
|
||||||
|
unitTest,
|
||||||
|
assert,
|
||||||
|
assertEquals,
|
||||||
|
assertNotEquals,
|
||||||
|
assertThrows,
|
||||||
|
} from "./test_util.ts";
|
||||||
|
|
||||||
|
function delay(seconds: number): Promise<void> {
|
||||||
|
return new Promise<void>((resolve) => {
|
||||||
|
setTimeout(() => {
|
||||||
|
resolve();
|
||||||
|
}, seconds);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function readableStreamToArray<R>(
|
||||||
|
readable: { getReader(): ReadableStreamDefaultReader<R> },
|
||||||
|
reader?: ReadableStreamDefaultReader<R>
|
||||||
|
): Promise<R[]> {
|
||||||
|
if (reader === undefined) {
|
||||||
|
reader = readable.getReader();
|
||||||
|
}
|
||||||
|
|
||||||
|
const chunks: R[] = [];
|
||||||
|
|
||||||
|
return pump();
|
||||||
|
|
||||||
|
function pump(): Promise<R[]> {
|
||||||
|
return reader!.read().then((result) => {
|
||||||
|
if (result.done) {
|
||||||
|
return chunks;
|
||||||
|
}
|
||||||
|
|
||||||
|
chunks.push(result.value);
|
||||||
|
return pump();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unitTest(function transformStreamConstructedWithTransformFunction() {
|
||||||
|
new TransformStream({ transform(): void {} });
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(function transformStreamConstructedNoTransform() {
|
||||||
|
new TransformStream();
|
||||||
|
new TransformStream({});
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(function transformStreamIntstancesHaveProperProperties() {
|
||||||
|
const ts = new TransformStream({ transform(): void {} });
|
||||||
|
const proto = Object.getPrototypeOf(ts);
|
||||||
|
|
||||||
|
const writableStream = Object.getOwnPropertyDescriptor(proto, "writable");
|
||||||
|
assert(writableStream !== undefined, "it has a writable property");
|
||||||
|
assert(!writableStream.enumerable, "writable should be non-enumerable");
|
||||||
|
assertEquals(
|
||||||
|
typeof writableStream.get,
|
||||||
|
"function",
|
||||||
|
"writable should have a getter"
|
||||||
|
);
|
||||||
|
assertEquals(
|
||||||
|
writableStream.set,
|
||||||
|
undefined,
|
||||||
|
"writable should not have a setter"
|
||||||
|
);
|
||||||
|
assert(writableStream.configurable, "writable should be configurable");
|
||||||
|
assert(
|
||||||
|
ts.writable instanceof WritableStream,
|
||||||
|
"writable is an instance of WritableStream"
|
||||||
|
);
|
||||||
|
assert(
|
||||||
|
WritableStream.prototype.getWriter.call(ts.writable),
|
||||||
|
"writable should pass WritableStream brand check"
|
||||||
|
);
|
||||||
|
|
||||||
|
const readableStream = Object.getOwnPropertyDescriptor(proto, "readable");
|
||||||
|
assert(readableStream !== undefined, "it has a readable property");
|
||||||
|
assert(!readableStream.enumerable, "readable should be non-enumerable");
|
||||||
|
assertEquals(
|
||||||
|
typeof readableStream.get,
|
||||||
|
"function",
|
||||||
|
"readable should have a getter"
|
||||||
|
);
|
||||||
|
assertEquals(
|
||||||
|
readableStream.set,
|
||||||
|
undefined,
|
||||||
|
"readable should not have a setter"
|
||||||
|
);
|
||||||
|
assert(readableStream.configurable, "readable should be configurable");
|
||||||
|
assert(
|
||||||
|
ts.readable instanceof ReadableStream,
|
||||||
|
"readable is an instance of ReadableStream"
|
||||||
|
);
|
||||||
|
assertNotEquals(
|
||||||
|
ReadableStream.prototype.getReader.call(ts.readable),
|
||||||
|
undefined,
|
||||||
|
"readable should pass ReadableStream brand check"
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(function transformStreamWritableStartsAsWritable() {
|
||||||
|
const ts = new TransformStream({ transform(): void {} });
|
||||||
|
|
||||||
|
const writer = ts.writable.getWriter();
|
||||||
|
assertEquals(writer.desiredSize, 1, "writer.desiredSize should be 1");
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(async function transformStreamReadableCanReadOutOfWritable() {
|
||||||
|
const ts = new TransformStream();
|
||||||
|
|
||||||
|
const writer = ts.writable.getWriter();
|
||||||
|
writer.write("a");
|
||||||
|
assertEquals(
|
||||||
|
writer.desiredSize,
|
||||||
|
0,
|
||||||
|
"writer.desiredSize should be 0 after write()"
|
||||||
|
);
|
||||||
|
|
||||||
|
const result = await ts.readable.getReader().read();
|
||||||
|
assertEquals(
|
||||||
|
result.value,
|
||||||
|
"a",
|
||||||
|
"result from reading the readable is the same as was written to writable"
|
||||||
|
);
|
||||||
|
assert(!result.done, "stream should not be done");
|
||||||
|
|
||||||
|
await delay(0);
|
||||||
|
assert(writer.desiredSize === 1, "desiredSize should be 1 again");
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(async function transformStreamCanReadWhatIsWritten() {
|
||||||
|
let c: TransformStreamDefaultController;
|
||||||
|
const ts = new TransformStream({
|
||||||
|
start(controller: TransformStreamDefaultController): void {
|
||||||
|
c = controller;
|
||||||
|
},
|
||||||
|
transform(chunk: string): void {
|
||||||
|
c.enqueue(chunk.toUpperCase());
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const writer = ts.writable.getWriter();
|
||||||
|
writer.write("a");
|
||||||
|
|
||||||
|
const result = await ts.readable.getReader().read();
|
||||||
|
assertEquals(
|
||||||
|
result.value,
|
||||||
|
"A",
|
||||||
|
"result from reading the readable is the transformation of what was written to writable"
|
||||||
|
);
|
||||||
|
assert(!result.done, "stream should not be done");
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(async function transformStreamCanReadBothChunks() {
|
||||||
|
let c: TransformStreamDefaultController;
|
||||||
|
const ts = new TransformStream({
|
||||||
|
start(controller: TransformStreamDefaultController): void {
|
||||||
|
c = controller;
|
||||||
|
},
|
||||||
|
transform(chunk: string): void {
|
||||||
|
c.enqueue(chunk.toUpperCase());
|
||||||
|
c.enqueue(chunk.toUpperCase());
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const writer = ts.writable.getWriter();
|
||||||
|
writer.write("a");
|
||||||
|
|
||||||
|
const reader = ts.readable.getReader();
|
||||||
|
|
||||||
|
const result1 = await reader.read();
|
||||||
|
assertEquals(
|
||||||
|
result1.value,
|
||||||
|
"A",
|
||||||
|
"the first chunk read is the transformation of the single chunk written"
|
||||||
|
);
|
||||||
|
assert(!result1.done, "stream should not be done");
|
||||||
|
|
||||||
|
const result2 = await reader.read();
|
||||||
|
assertEquals(
|
||||||
|
result2.value,
|
||||||
|
"A",
|
||||||
|
"the second chunk read is also the transformation of the single chunk written"
|
||||||
|
);
|
||||||
|
assert(!result2.done, "stream should not be done");
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(async function transformStreamCanReadWhatIsWritten() {
|
||||||
|
let c: TransformStreamDefaultController;
|
||||||
|
const ts = new TransformStream({
|
||||||
|
start(controller: TransformStreamDefaultController): void {
|
||||||
|
c = controller;
|
||||||
|
},
|
||||||
|
transform(chunk: string): Promise<void> {
|
||||||
|
return delay(0).then(() => c.enqueue(chunk.toUpperCase()));
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const writer = ts.writable.getWriter();
|
||||||
|
writer.write("a");
|
||||||
|
|
||||||
|
const result = await ts.readable.getReader().read();
|
||||||
|
assertEquals(
|
||||||
|
result.value,
|
||||||
|
"A",
|
||||||
|
"result from reading the readable is the transformation of what was written to writable"
|
||||||
|
);
|
||||||
|
assert(!result.done, "stream should not be done");
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(async function transformStreamAsyncReadMultipleChunks() {
|
||||||
|
let doSecondEnqueue: () => void;
|
||||||
|
let returnFromTransform: () => void;
|
||||||
|
const ts = new TransformStream({
|
||||||
|
transform(
|
||||||
|
chunk: string,
|
||||||
|
controller: TransformStreamDefaultController
|
||||||
|
): Promise<void> {
|
||||||
|
delay(0).then(() => controller.enqueue(chunk.toUpperCase()));
|
||||||
|
doSecondEnqueue = (): void => controller.enqueue(chunk.toUpperCase());
|
||||||
|
return new Promise((resolve) => {
|
||||||
|
returnFromTransform = resolve;
|
||||||
|
});
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const reader = ts.readable.getReader();
|
||||||
|
|
||||||
|
const writer = ts.writable.getWriter();
|
||||||
|
writer.write("a");
|
||||||
|
|
||||||
|
const result1 = await reader.read();
|
||||||
|
assertEquals(
|
||||||
|
result1.value,
|
||||||
|
"A",
|
||||||
|
"the first chunk read is the transformation of the single chunk written"
|
||||||
|
);
|
||||||
|
assert(!result1.done, "stream should not be done");
|
||||||
|
doSecondEnqueue!();
|
||||||
|
|
||||||
|
const result2 = await reader.read();
|
||||||
|
assertEquals(
|
||||||
|
result2.value,
|
||||||
|
"A",
|
||||||
|
"the second chunk read is also the transformation of the single chunk written"
|
||||||
|
);
|
||||||
|
assert(!result2.done, "stream should not be done");
|
||||||
|
returnFromTransform!();
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(function transformStreamClosingWriteClosesRead() {
|
||||||
|
const ts = new TransformStream({ transform(): void {} });
|
||||||
|
|
||||||
|
const writer = ts.writable.getWriter();
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
return Promise.all([writer.closed, ts.readable.getReader().closed]).then(
|
||||||
|
undefined
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(async function transformStreamCloseWaitAwaitsTransforms() {
|
||||||
|
let transformResolve: () => void;
|
||||||
|
const transformPromise = new Promise<void>((resolve) => {
|
||||||
|
transformResolve = resolve;
|
||||||
|
});
|
||||||
|
const ts = new TransformStream(
|
||||||
|
{
|
||||||
|
transform(): Promise<void> {
|
||||||
|
return transformPromise;
|
||||||
|
},
|
||||||
|
},
|
||||||
|
undefined,
|
||||||
|
{ highWaterMark: 1 }
|
||||||
|
);
|
||||||
|
|
||||||
|
const writer = ts.writable.getWriter();
|
||||||
|
writer.write("a");
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
let rsClosed = false;
|
||||||
|
ts.readable.getReader().closed.then(() => {
|
||||||
|
rsClosed = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
await delay(0);
|
||||||
|
assertEquals(rsClosed, false, "readable is not closed after a tick");
|
||||||
|
transformResolve!();
|
||||||
|
|
||||||
|
await writer.closed;
|
||||||
|
// TODO: Is this expectation correct?
|
||||||
|
assertEquals(rsClosed, true, "readable is closed at that point");
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(async function transformStreamCloseWriteAfterSyncEnqueues() {
|
||||||
|
let c: TransformStreamDefaultController<string>;
|
||||||
|
const ts = new TransformStream<string, string>({
|
||||||
|
start(controller: TransformStreamDefaultController): void {
|
||||||
|
c = controller;
|
||||||
|
},
|
||||||
|
transform(): Promise<void> {
|
||||||
|
c.enqueue("x");
|
||||||
|
c.enqueue("y");
|
||||||
|
return delay(0);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const writer = ts.writable.getWriter();
|
||||||
|
writer.write("a");
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
const readableChunks = readableStreamToArray(ts.readable);
|
||||||
|
|
||||||
|
await writer.closed;
|
||||||
|
const chunks = await readableChunks;
|
||||||
|
assertEquals(
|
||||||
|
chunks,
|
||||||
|
["x", "y"],
|
||||||
|
"both enqueued chunks can be read from the readable"
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(async function transformStreamWritableCloseAsyncAfterAsyncEnqueues() {
|
||||||
|
let c: TransformStreamDefaultController<string>;
|
||||||
|
const ts = new TransformStream<string, string>({
|
||||||
|
start(controller: TransformStreamDefaultController<string>): void {
|
||||||
|
c = controller;
|
||||||
|
},
|
||||||
|
transform(): Promise<void> {
|
||||||
|
return delay(0)
|
||||||
|
.then(() => c.enqueue("x"))
|
||||||
|
.then(() => c.enqueue("y"))
|
||||||
|
.then(() => delay(0));
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const writer = ts.writable.getWriter();
|
||||||
|
writer.write("a");
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
const readableChunks = readableStreamToArray(ts.readable);
|
||||||
|
|
||||||
|
await writer.closed;
|
||||||
|
const chunks = await readableChunks;
|
||||||
|
assertEquals(
|
||||||
|
chunks,
|
||||||
|
["x", "y"],
|
||||||
|
"both enqueued chunks can be read from the readable"
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(async function transformStreamTransformerMethodsCalledAsMethods() {
|
||||||
|
let c: TransformStreamDefaultController<string>;
|
||||||
|
const transformer = {
|
||||||
|
suffix: "-suffix",
|
||||||
|
|
||||||
|
start(controller: TransformStreamDefaultController<string>): void {
|
||||||
|
c = controller;
|
||||||
|
c.enqueue("start" + this.suffix);
|
||||||
|
},
|
||||||
|
|
||||||
|
transform(chunk: string): void {
|
||||||
|
c.enqueue(chunk + this.suffix);
|
||||||
|
},
|
||||||
|
|
||||||
|
flush(): void {
|
||||||
|
c.enqueue("flushed" + this.suffix);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
const ts = new TransformStream(transformer);
|
||||||
|
|
||||||
|
const writer = ts.writable.getWriter();
|
||||||
|
writer.write("a");
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
const readableChunks = readableStreamToArray(ts.readable);
|
||||||
|
|
||||||
|
await writer.closed;
|
||||||
|
const chunks = await readableChunks;
|
||||||
|
assertEquals(
|
||||||
|
chunks,
|
||||||
|
["start-suffix", "a-suffix", "flushed-suffix"],
|
||||||
|
"all enqueued chunks have suffixes"
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(async function transformStreamMethodsShouldNotBeAppliedOrCalled() {
|
||||||
|
function functionWithOverloads(): void {}
|
||||||
|
functionWithOverloads.apply = (): void => {
|
||||||
|
throw new Error("apply() should not be called");
|
||||||
|
};
|
||||||
|
functionWithOverloads.call = (): void => {
|
||||||
|
throw new Error("call() should not be called");
|
||||||
|
};
|
||||||
|
const ts = new TransformStream({
|
||||||
|
start: functionWithOverloads,
|
||||||
|
transform: functionWithOverloads,
|
||||||
|
flush: functionWithOverloads,
|
||||||
|
});
|
||||||
|
const writer = ts.writable.getWriter();
|
||||||
|
writer.write("a");
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
await readableStreamToArray(ts.readable);
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(async function transformStreamCallTransformSync() {
|
||||||
|
let transformCalled = false;
|
||||||
|
const ts = new TransformStream(
|
||||||
|
{
|
||||||
|
transform(): void {
|
||||||
|
transformCalled = true;
|
||||||
|
},
|
||||||
|
},
|
||||||
|
undefined,
|
||||||
|
{ highWaterMark: Infinity }
|
||||||
|
);
|
||||||
|
// transform() is only called synchronously when there is no backpressure and
|
||||||
|
// all microtasks have run.
|
||||||
|
await delay(0);
|
||||||
|
const writePromise = ts.writable.getWriter().write(undefined);
|
||||||
|
assert(transformCalled, "transform() should have been called");
|
||||||
|
await writePromise;
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(function transformStreamCloseWriteCloesesReadWithNoChunks() {
|
||||||
|
const ts = new TransformStream({}, undefined, { highWaterMark: 0 });
|
||||||
|
|
||||||
|
const writer = ts.writable.getWriter();
|
||||||
|
writer.close();
|
||||||
|
|
||||||
|
return Promise.all([writer.closed, ts.readable.getReader().closed]).then(
|
||||||
|
undefined
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(function transformStreamEnqueueThrowsAfterTerminate() {
|
||||||
|
new TransformStream({
|
||||||
|
start(controller: TransformStreamDefaultController): void {
|
||||||
|
controller.terminate();
|
||||||
|
assertThrows(() => {
|
||||||
|
controller.enqueue(undefined);
|
||||||
|
}, TypeError);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(function transformStreamEnqueueThrowsAfterReadableCancel() {
|
||||||
|
let controller: TransformStreamDefaultController;
|
||||||
|
const ts = new TransformStream({
|
||||||
|
start(c: TransformStreamDefaultController): void {
|
||||||
|
controller = c;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const cancelPromise = ts.readable.cancel();
|
||||||
|
assertThrows(
|
||||||
|
() => controller.enqueue(undefined),
|
||||||
|
TypeError,
|
||||||
|
undefined,
|
||||||
|
"enqueue should throw"
|
||||||
|
);
|
||||||
|
return cancelPromise;
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(function transformStreamSecondTerminateNoOp() {
|
||||||
|
new TransformStream({
|
||||||
|
start(controller: TransformStreamDefaultController): void {
|
||||||
|
controller.terminate();
|
||||||
|
controller.terminate();
|
||||||
|
},
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(async function transformStreamTerminateAfterReadableCancelIsNoop() {
|
||||||
|
let controller: TransformStreamDefaultController;
|
||||||
|
const ts = new TransformStream({
|
||||||
|
start(c: TransformStreamDefaultController): void {
|
||||||
|
controller = c;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const cancelReason = { name: "cancelReason" };
|
||||||
|
const cancelPromise = ts.readable.cancel(cancelReason);
|
||||||
|
controller!.terminate();
|
||||||
|
await cancelPromise;
|
||||||
|
try {
|
||||||
|
await ts.writable.getWriter().closed;
|
||||||
|
} catch (e) {
|
||||||
|
assert(e === cancelReason);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
throw new Error("closed should have rejected");
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(async function transformStreamStartCalledOnce() {
|
||||||
|
let calls = 0;
|
||||||
|
new TransformStream({
|
||||||
|
start(): void {
|
||||||
|
++calls;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
await delay(0);
|
||||||
|
assertEquals(calls, 1, "start() should have been called exactly once");
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(function transformStreamReadableTypeThrows() {
|
||||||
|
assertThrows(
|
||||||
|
// eslint-disable-next-line
|
||||||
|
() => new TransformStream({ readableType: "bytes" as any }),
|
||||||
|
RangeError,
|
||||||
|
undefined,
|
||||||
|
"constructor should throw"
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(function transformStreamWirtableTypeThrows() {
|
||||||
|
assertThrows(
|
||||||
|
// eslint-disable-next-line
|
||||||
|
() => new TransformStream({ writableType: "bytes" as any }),
|
||||||
|
RangeError,
|
||||||
|
undefined,
|
||||||
|
"constructor should throw"
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
unitTest(function transformStreamSubclassable() {
|
||||||
|
class Subclass extends TransformStream {
|
||||||
|
extraFunction(): boolean {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert(
|
||||||
|
Object.getPrototypeOf(Subclass.prototype) === TransformStream.prototype,
|
||||||
|
"Subclass.prototype's prototype should be TransformStream.prototype"
|
||||||
|
);
|
||||||
|
assert(
|
||||||
|
Object.getPrototypeOf(Subclass) === TransformStream,
|
||||||
|
"Subclass's prototype should be TransformStream"
|
||||||
|
);
|
||||||
|
const sub = new Subclass();
|
||||||
|
assert(
|
||||||
|
sub instanceof TransformStream,
|
||||||
|
"Subclass object should be an instance of TransformStream"
|
||||||
|
);
|
||||||
|
assert(
|
||||||
|
sub instanceof Subclass,
|
||||||
|
"Subclass object should be an instance of Subclass"
|
||||||
|
);
|
||||||
|
const readableGetter = Object.getOwnPropertyDescriptor(
|
||||||
|
TransformStream.prototype,
|
||||||
|
"readable"
|
||||||
|
)!.get;
|
||||||
|
assert(
|
||||||
|
readableGetter!.call(sub) === sub.readable,
|
||||||
|
"Subclass object should pass brand check"
|
||||||
|
);
|
||||||
|
assert(
|
||||||
|
sub.extraFunction(),
|
||||||
|
"extraFunction() should be present on Subclass object"
|
||||||
|
);
|
||||||
|
});
|
|
@ -53,6 +53,7 @@ import "./resources_test.ts";
|
||||||
import "./signal_test.ts";
|
import "./signal_test.ts";
|
||||||
import "./stat_test.ts";
|
import "./stat_test.ts";
|
||||||
import "./streams_piping_test.ts";
|
import "./streams_piping_test.ts";
|
||||||
|
import "./streams_transform_test.ts";
|
||||||
import "./streams_writable_test.ts";
|
import "./streams_writable_test.ts";
|
||||||
import "./symlink_test.ts";
|
import "./symlink_test.ts";
|
||||||
import "./text_encoding_test.ts";
|
import "./text_encoding_test.ts";
|
||||||
|
|
|
@ -13,6 +13,8 @@ import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_c
|
||||||
import { ReadableStreamDefaultReaderImpl } from "./readable_stream_default_reader.ts";
|
import { ReadableStreamDefaultReaderImpl } from "./readable_stream_default_reader.ts";
|
||||||
import { ReadableStreamImpl } from "./readable_stream.ts";
|
import { ReadableStreamImpl } from "./readable_stream.ts";
|
||||||
import * as sym from "./symbols.ts";
|
import * as sym from "./symbols.ts";
|
||||||
|
import { TransformStreamImpl } from "./transform_stream.ts";
|
||||||
|
import { TransformStreamDefaultControllerImpl } from "./transform_stream_default_controller.ts";
|
||||||
import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts";
|
import { WritableStreamDefaultControllerImpl } from "./writable_stream_default_controller.ts";
|
||||||
import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts";
|
import { WritableStreamDefaultWriterImpl } from "./writable_stream_default_writer.ts";
|
||||||
import { WritableStreamImpl } from "./writable_stream.ts";
|
import { WritableStreamImpl } from "./writable_stream.ts";
|
||||||
|
@ -36,10 +38,12 @@ type Container<R = any> = {
|
||||||
[sym.queue]: Array<Pair<R> | BufferQueueItem>;
|
[sym.queue]: Array<Pair<R> | BufferQueueItem>;
|
||||||
[sym.queueTotalSize]: number;
|
[sym.queueTotalSize]: number;
|
||||||
};
|
};
|
||||||
|
export type FlushAlgorithm = () => Promise<void>;
|
||||||
export type Pair<R> = { value: R; size: number };
|
export type Pair<R> = { value: R; size: number };
|
||||||
export type PullAlgorithm = () => PromiseLike<void>;
|
export type PullAlgorithm = () => PromiseLike<void>;
|
||||||
export type SizeAlgorithm<T> = (chunk: T) => number;
|
export type SizeAlgorithm<T> = (chunk: T) => number;
|
||||||
export type StartAlgorithm = () => void | PromiseLike<void>;
|
export type StartAlgorithm = () => void | PromiseLike<void>;
|
||||||
|
export type TransformAlgorithm<I> = (chunk: I) => Promise<void>;
|
||||||
export type WriteAlgorithm<W> = (chunk: W) => Promise<void>;
|
export type WriteAlgorithm<W> = (chunk: W) => Promise<void>;
|
||||||
export interface Deferred<T> {
|
export interface Deferred<T> {
|
||||||
promise: Promise<T>;
|
promise: Promise<T>;
|
||||||
|
@ -76,8 +80,16 @@ export function acquireWritableStreamDefaultWriter<W>(
|
||||||
return new WritableStreamDefaultWriterImpl(stream);
|
return new WritableStreamDefaultWriterImpl(stream);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function call<F extends (...args: any[]) => any>(
|
||||||
|
fn: F,
|
||||||
|
v: ThisType<F>,
|
||||||
|
args: Parameters<F>
|
||||||
|
): ReturnType<F> {
|
||||||
|
return Function.prototype.apply.call(fn, v, args);
|
||||||
|
}
|
||||||
|
|
||||||
function createAlgorithmFromUnderlyingMethod<
|
function createAlgorithmFromUnderlyingMethod<
|
||||||
O extends UnderlyingByteSource | UnderlyingSource,
|
O extends UnderlyingByteSource | UnderlyingSource | Transformer,
|
||||||
P extends keyof O
|
P extends keyof O
|
||||||
>(
|
>(
|
||||||
underlyingObject: O,
|
underlyingObject: O,
|
||||||
|
@ -86,7 +98,7 @@ function createAlgorithmFromUnderlyingMethod<
|
||||||
...extraArgs: any[]
|
...extraArgs: any[]
|
||||||
): () => Promise<void>;
|
): () => Promise<void>;
|
||||||
function createAlgorithmFromUnderlyingMethod<
|
function createAlgorithmFromUnderlyingMethod<
|
||||||
O extends UnderlyingByteSource | UnderlyingSource,
|
O extends UnderlyingByteSource | UnderlyingSource | Transformer,
|
||||||
P extends keyof O
|
P extends keyof O
|
||||||
>(
|
>(
|
||||||
underlyingObject: O,
|
underlyingObject: O,
|
||||||
|
@ -95,7 +107,7 @@ function createAlgorithmFromUnderlyingMethod<
|
||||||
...extraArgs: any[]
|
...extraArgs: any[]
|
||||||
): (arg: any) => Promise<void>;
|
): (arg: any) => Promise<void>;
|
||||||
function createAlgorithmFromUnderlyingMethod<
|
function createAlgorithmFromUnderlyingMethod<
|
||||||
O extends UnderlyingByteSource | UnderlyingSource,
|
O extends UnderlyingByteSource | UnderlyingSource | Transformer,
|
||||||
P extends keyof O
|
P extends keyof O
|
||||||
>(
|
>(
|
||||||
underlyingObject: O,
|
underlyingObject: O,
|
||||||
|
@ -110,11 +122,11 @@ function createAlgorithmFromUnderlyingMethod<
|
||||||
}
|
}
|
||||||
if (algoArgCount === 0) {
|
if (algoArgCount === 0) {
|
||||||
return async (): Promise<void> =>
|
return async (): Promise<void> =>
|
||||||
method.call(underlyingObject, ...extraArgs);
|
call(method, underlyingObject, extraArgs as any);
|
||||||
} else {
|
} else {
|
||||||
return async (arg: any): Promise<void> => {
|
return async (arg: any): Promise<void> => {
|
||||||
const fullArgs = [arg, ...extraArgs];
|
const fullArgs = [arg, ...extraArgs];
|
||||||
return method.call(underlyingObject, ...fullArgs);
|
return call(method, underlyingObject, fullArgs as any);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -148,6 +160,33 @@ function createReadableStream<T>(
|
||||||
return stream;
|
return stream;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function createWritableStream<W>(
|
||||||
|
startAlgorithm: StartAlgorithm,
|
||||||
|
writeAlgorithm: WriteAlgorithm<W>,
|
||||||
|
closeAlgorithm: CloseAlgorithm,
|
||||||
|
abortAlgorithm: AbortAlgorithm,
|
||||||
|
highWaterMark = 1,
|
||||||
|
sizeAlgorithm: SizeAlgorithm<W> = (): number => 1
|
||||||
|
): WritableStreamImpl<W> {
|
||||||
|
assert(isNonNegativeNumber(highWaterMark));
|
||||||
|
const stream = Object.create(WritableStreamImpl.prototype);
|
||||||
|
initializeWritableStream(stream);
|
||||||
|
const controller = Object.create(
|
||||||
|
WritableStreamDefaultControllerImpl.prototype
|
||||||
|
);
|
||||||
|
setUpWritableStreamDefaultController(
|
||||||
|
stream,
|
||||||
|
controller,
|
||||||
|
startAlgorithm,
|
||||||
|
writeAlgorithm,
|
||||||
|
closeAlgorithm,
|
||||||
|
abortAlgorithm,
|
||||||
|
highWaterMark,
|
||||||
|
sizeAlgorithm
|
||||||
|
);
|
||||||
|
return stream;
|
||||||
|
}
|
||||||
|
|
||||||
export function dequeueValue<R>(container: Container<R>): R {
|
export function dequeueValue<R>(container: Container<R>): R {
|
||||||
assert(sym.queue in container && sym.queueTotalSize in container);
|
assert(sym.queue in container && sym.queueTotalSize in container);
|
||||||
assert(container[sym.queue].length);
|
assert(container[sym.queue].length);
|
||||||
|
@ -185,13 +224,61 @@ export function getDeferred<T>(): Required<Deferred<T>> {
|
||||||
return { promise, resolve: resolve!, reject: reject! };
|
return { promise, resolve: resolve!, reject: reject! };
|
||||||
}
|
}
|
||||||
|
|
||||||
export function initializeReadableStream(stream: ReadableStreamImpl): void {
|
export function initializeReadableStream<R>(
|
||||||
|
stream: ReadableStreamImpl<R>
|
||||||
|
): void {
|
||||||
stream[sym.state] = "readable";
|
stream[sym.state] = "readable";
|
||||||
stream[sym.reader] = stream[sym.storedError] = undefined;
|
stream[sym.reader] = stream[sym.storedError] = undefined;
|
||||||
stream[sym.disturbed] = false;
|
stream[sym.disturbed] = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function initializeWritableStream(stream: WritableStreamImpl): void {
|
export function initializeTransformStream<I, O>(
|
||||||
|
stream: TransformStreamImpl<I, O>,
|
||||||
|
startPromise: Promise<void>,
|
||||||
|
writableHighWaterMark: number,
|
||||||
|
writableSizeAlgorithm: SizeAlgorithm<I>,
|
||||||
|
readableHighWaterMark: number,
|
||||||
|
readableSizeAlgorithm: SizeAlgorithm<O>
|
||||||
|
): void {
|
||||||
|
const startAlgorithm = (): Promise<void> => startPromise;
|
||||||
|
const writeAlgorithm = (chunk: any): Promise<void> =>
|
||||||
|
transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
|
||||||
|
const abortAlgorithm = (reason: any): Promise<void> =>
|
||||||
|
transformStreamDefaultSinkAbortAlgorithm(stream, reason);
|
||||||
|
const closeAlgorithm = (): Promise<void> =>
|
||||||
|
transformStreamDefaultSinkCloseAlgorithm(stream);
|
||||||
|
stream[sym.writable] = createWritableStream(
|
||||||
|
startAlgorithm,
|
||||||
|
writeAlgorithm,
|
||||||
|
closeAlgorithm,
|
||||||
|
abortAlgorithm,
|
||||||
|
writableHighWaterMark,
|
||||||
|
writableSizeAlgorithm
|
||||||
|
);
|
||||||
|
const pullAlgorithm = (): PromiseLike<void> =>
|
||||||
|
transformStreamDefaultSourcePullAlgorithm(stream);
|
||||||
|
const cancelAlgorithm = (reason: any): Promise<void> => {
|
||||||
|
transformStreamErrorWritableAndUnblockWrite(stream, reason);
|
||||||
|
return Promise.resolve(undefined);
|
||||||
|
};
|
||||||
|
stream[sym.readable] = createReadableStream(
|
||||||
|
startAlgorithm,
|
||||||
|
pullAlgorithm,
|
||||||
|
cancelAlgorithm,
|
||||||
|
readableHighWaterMark,
|
||||||
|
readableSizeAlgorithm
|
||||||
|
);
|
||||||
|
stream[sym.backpressure] = stream[sym.backpressureChangePromise] = undefined;
|
||||||
|
transformStreamSetBackpressure(stream, true);
|
||||||
|
Object.defineProperty(stream, sym.transformStreamController, {
|
||||||
|
value: undefined,
|
||||||
|
configurable: true,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export function initializeWritableStream<W>(
|
||||||
|
stream: WritableStreamImpl<W>
|
||||||
|
): void {
|
||||||
stream[sym.state] = "writable";
|
stream[sym.state] = "writable";
|
||||||
stream[sym.storedError] = stream[sym.writer] = stream[
|
stream[sym.storedError] = stream[sym.writer] = stream[
|
||||||
sym.writableStreamController
|
sym.writableStreamController
|
||||||
|
@ -202,7 +289,7 @@ export function initializeWritableStream(stream: WritableStreamImpl): void {
|
||||||
stream[sym.backpressure] = false;
|
stream[sym.backpressure] = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
function invokeOrNoop<O extends any, P extends keyof O>(
|
export function invokeOrNoop<O extends Record<string, any>, P extends keyof O>(
|
||||||
o: O,
|
o: O,
|
||||||
p: P,
|
p: P,
|
||||||
...args: Parameters<O[P]>
|
...args: Parameters<O[P]>
|
||||||
|
@ -212,7 +299,7 @@ function invokeOrNoop<O extends any, P extends keyof O>(
|
||||||
if (!method) {
|
if (!method) {
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
return method.call(o, ...args);
|
return call(method, o, args);
|
||||||
}
|
}
|
||||||
|
|
||||||
function isCallable(value: unknown): value is (...args: any) => any {
|
function isCallable(value: unknown): value is (...args: any) => any {
|
||||||
|
@ -299,6 +386,26 @@ export function isReadableStreamLocked(stream: ReadableStreamImpl): boolean {
|
||||||
return stream[sym.reader] ? true : false;
|
return stream[sym.reader] ? true : false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function isTransformStream(
|
||||||
|
x: unknown
|
||||||
|
): x is TransformStreamImpl<any, any> {
|
||||||
|
return typeof x !== "object" ||
|
||||||
|
x === null ||
|
||||||
|
!(sym.transformStreamController in x)
|
||||||
|
? false
|
||||||
|
: true;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function isTransformStreamDefaultController(
|
||||||
|
x: unknown
|
||||||
|
): x is TransformStreamDefaultControllerImpl<any, any> {
|
||||||
|
return typeof x !== "object" ||
|
||||||
|
x === null ||
|
||||||
|
!(sym.controlledTransformStream in x)
|
||||||
|
? false
|
||||||
|
: true;
|
||||||
|
}
|
||||||
|
|
||||||
export function isUnderlyingByteSource(
|
export function isUnderlyingByteSource(
|
||||||
underlyingSource: UnderlyingByteSource | UnderlyingSource
|
underlyingSource: UnderlyingByteSource | UnderlyingSource
|
||||||
): underlyingSource is UnderlyingByteSource {
|
): underlyingSource is UnderlyingByteSource {
|
||||||
|
@ -717,6 +824,14 @@ export function readableStreamDefaultControllerError<T>(
|
||||||
readableStreamError(stream, e);
|
readableStreamError(stream, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function readableStreamDefaultControllerHasBackpressure<T>(
|
||||||
|
controller: ReadableStreamDefaultControllerImpl<T>
|
||||||
|
): boolean {
|
||||||
|
return readableStreamDefaultControllerShouldCallPull(controller)
|
||||||
|
? true
|
||||||
|
: false;
|
||||||
|
}
|
||||||
|
|
||||||
function readableStreamDefaultControllerShouldCallPull<T>(
|
function readableStreamDefaultControllerShouldCallPull<T>(
|
||||||
controller: ReadableStreamDefaultControllerImpl<T>
|
controller: ReadableStreamDefaultControllerImpl<T>
|
||||||
): boolean {
|
): boolean {
|
||||||
|
@ -1416,6 +1531,62 @@ export function setUpReadableStreamDefaultControllerFromUnderlyingSource<T>(
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function setUpTransformStreamDefaultController<I, O>(
|
||||||
|
stream: TransformStreamImpl<I, O>,
|
||||||
|
controller: TransformStreamDefaultControllerImpl<I, O>,
|
||||||
|
transformAlgorithm: TransformAlgorithm<I>,
|
||||||
|
flushAlgorithm: FlushAlgorithm
|
||||||
|
): void {
|
||||||
|
assert(isTransformStream(stream));
|
||||||
|
assert(stream[sym.transformStreamController] === undefined);
|
||||||
|
controller[sym.controlledTransformStream] = stream;
|
||||||
|
stream[sym.transformStreamController] = controller;
|
||||||
|
controller[sym.transformAlgorithm] = transformAlgorithm;
|
||||||
|
controller[sym.flushAlgorithm] = flushAlgorithm;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function setUpTransformStreamDefaultControllerFromTransformer<I, O>(
|
||||||
|
stream: TransformStreamImpl<I, O>,
|
||||||
|
transformer: Transformer<I, O>
|
||||||
|
): void {
|
||||||
|
assert(transformer);
|
||||||
|
const controller = Object.create(
|
||||||
|
TransformStreamDefaultControllerImpl.prototype
|
||||||
|
) as TransformStreamDefaultControllerImpl<I, O>;
|
||||||
|
let transformAlgorithm: TransformAlgorithm<I> = (chunk) => {
|
||||||
|
try {
|
||||||
|
transformStreamDefaultControllerEnqueue(
|
||||||
|
controller,
|
||||||
|
// it defaults to no tranformation, so I is assumed to be O
|
||||||
|
(chunk as unknown) as O
|
||||||
|
);
|
||||||
|
} catch (e) {
|
||||||
|
return Promise.reject(e);
|
||||||
|
}
|
||||||
|
return Promise.resolve();
|
||||||
|
};
|
||||||
|
const transformMethod = transformer.transform;
|
||||||
|
if (transformMethod) {
|
||||||
|
if (typeof transformMethod !== "function") {
|
||||||
|
throw new TypeError("tranformer.transform must be callable.");
|
||||||
|
}
|
||||||
|
transformAlgorithm = async (chunk): Promise<void> =>
|
||||||
|
call(transformMethod, transformer, [chunk, controller]);
|
||||||
|
}
|
||||||
|
const flushAlgorithm = createAlgorithmFromUnderlyingMethod(
|
||||||
|
transformer,
|
||||||
|
"flush",
|
||||||
|
0,
|
||||||
|
controller
|
||||||
|
);
|
||||||
|
setUpTransformStreamDefaultController(
|
||||||
|
stream,
|
||||||
|
controller,
|
||||||
|
transformAlgorithm,
|
||||||
|
flushAlgorithm
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
function setUpWritableStreamDefaultController<W>(
|
function setUpWritableStreamDefaultController<W>(
|
||||||
stream: WritableStreamImpl<W>,
|
stream: WritableStreamImpl<W>,
|
||||||
controller: WritableStreamDefaultControllerImpl<W>,
|
controller: WritableStreamDefaultControllerImpl<W>,
|
||||||
|
@ -1508,6 +1679,181 @@ export function setUpWritableStreamDefaultControllerFromUnderlyingSink<W>(
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function transformStreamDefaultControllerClearAlgorithms<I, O>(
|
||||||
|
controller: TransformStreamDefaultControllerImpl<I, O>
|
||||||
|
): void {
|
||||||
|
(controller as any)[sym.transformAlgorithm] = undefined;
|
||||||
|
(controller as any)[sym.flushAlgorithm] = undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function transformStreamDefaultControllerEnqueue<I, O>(
|
||||||
|
controller: TransformStreamDefaultControllerImpl<I, O>,
|
||||||
|
chunk: O
|
||||||
|
): void {
|
||||||
|
const stream = controller[sym.controlledTransformStream];
|
||||||
|
const readableController = stream[sym.readable][
|
||||||
|
sym.readableStreamController
|
||||||
|
] as ReadableStreamDefaultControllerImpl<O>;
|
||||||
|
if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) {
|
||||||
|
throw new TypeError(
|
||||||
|
"TransformStream's readable controller cannot be closed or enqueued."
|
||||||
|
);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
readableStreamDefaultControllerEnqueue(readableController, chunk);
|
||||||
|
} catch (e) {
|
||||||
|
transformStreamErrorWritableAndUnblockWrite(stream, e);
|
||||||
|
throw stream[sym.readable][sym.storedError];
|
||||||
|
}
|
||||||
|
const backpressure = readableStreamDefaultControllerHasBackpressure(
|
||||||
|
readableController
|
||||||
|
);
|
||||||
|
if (backpressure) {
|
||||||
|
transformStreamSetBackpressure(stream, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function transformStreamDefaultControllerError<I, O>(
|
||||||
|
controller: TransformStreamDefaultControllerImpl<I, O>,
|
||||||
|
e: any
|
||||||
|
): void {
|
||||||
|
transformStreamError(controller[sym.controlledTransformStream], e);
|
||||||
|
}
|
||||||
|
|
||||||
|
function transformStreamDefaultControllerPerformTransform<I, O>(
|
||||||
|
controller: TransformStreamDefaultControllerImpl<I, O>,
|
||||||
|
chunk: I
|
||||||
|
): Promise<void> {
|
||||||
|
const transformPromise = controller[sym.transformAlgorithm](chunk);
|
||||||
|
return transformPromise.then(undefined, (r) => {
|
||||||
|
transformStreamError(controller[sym.controlledTransformStream], r);
|
||||||
|
throw r;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function transformStreamDefaultSinkAbortAlgorithm<I, O>(
|
||||||
|
stream: TransformStreamImpl<I, O>,
|
||||||
|
reason: any
|
||||||
|
): Promise<void> {
|
||||||
|
transformStreamError(stream, reason);
|
||||||
|
return Promise.resolve(undefined);
|
||||||
|
}
|
||||||
|
|
||||||
|
function transformStreamDefaultSinkCloseAlgorithm<I, O>(
|
||||||
|
stream: TransformStreamImpl<I, O>
|
||||||
|
): Promise<void> {
|
||||||
|
const readable = stream[sym.readable];
|
||||||
|
const controller = stream[sym.transformStreamController];
|
||||||
|
const flushPromise = controller[sym.flushAlgorithm]();
|
||||||
|
transformStreamDefaultControllerClearAlgorithms(controller);
|
||||||
|
return flushPromise.then(
|
||||||
|
() => {
|
||||||
|
if (readable[sym.state] === "errored") {
|
||||||
|
throw readable[sym.storedError];
|
||||||
|
}
|
||||||
|
const readableController = readable[
|
||||||
|
sym.readableStreamController
|
||||||
|
] as ReadableStreamDefaultControllerImpl<O>;
|
||||||
|
if (
|
||||||
|
readableStreamDefaultControllerCanCloseOrEnqueue(readableController)
|
||||||
|
) {
|
||||||
|
readableStreamDefaultControllerClose(readableController);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
(r) => {
|
||||||
|
transformStreamError(stream, r);
|
||||||
|
throw readable[sym.storedError];
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
function transformStreamDefaultSinkWriteAlgorithm<I, O>(
|
||||||
|
stream: TransformStreamImpl<I, O>,
|
||||||
|
chunk: I
|
||||||
|
): Promise<void> {
|
||||||
|
assert(stream[sym.writable][sym.state] === "writable");
|
||||||
|
const controller = stream[sym.transformStreamController];
|
||||||
|
if (stream[sym.backpressure]) {
|
||||||
|
const backpressureChangePromise = stream[sym.backpressureChangePromise];
|
||||||
|
assert(backpressureChangePromise);
|
||||||
|
return backpressureChangePromise.promise.then(() => {
|
||||||
|
const writable = stream[sym.writable];
|
||||||
|
const state = writable[sym.state];
|
||||||
|
if (state === "erroring") {
|
||||||
|
throw writable[sym.storedError];
|
||||||
|
}
|
||||||
|
assert(state === "writable");
|
||||||
|
return transformStreamDefaultControllerPerformTransform(
|
||||||
|
controller,
|
||||||
|
chunk
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return transformStreamDefaultControllerPerformTransform(controller, chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
function transformStreamDefaultSourcePullAlgorithm<I, O>(
|
||||||
|
stream: TransformStreamImpl<I, O>
|
||||||
|
): Promise<void> {
|
||||||
|
assert(stream[sym.backpressure] === true);
|
||||||
|
assert(stream[sym.backpressureChangePromise] !== undefined);
|
||||||
|
transformStreamSetBackpressure(stream, false);
|
||||||
|
return stream[sym.backpressureChangePromise]!.promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
function transformStreamError<I, O>(
|
||||||
|
stream: TransformStreamImpl<I, O>,
|
||||||
|
e: any
|
||||||
|
): void {
|
||||||
|
readableStreamDefaultControllerError(
|
||||||
|
stream[sym.readable][
|
||||||
|
sym.readableStreamController
|
||||||
|
] as ReadableStreamDefaultControllerImpl<O>,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
transformStreamErrorWritableAndUnblockWrite(stream, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function transformStreamDefaultControllerTerminate<I, O>(
|
||||||
|
controller: TransformStreamDefaultControllerImpl<I, O>
|
||||||
|
): void {
|
||||||
|
const stream = controller[sym.controlledTransformStream];
|
||||||
|
const readableController = stream[sym.readable][
|
||||||
|
sym.readableStreamController
|
||||||
|
] as ReadableStreamDefaultControllerImpl<O>;
|
||||||
|
readableStreamDefaultControllerClose(readableController);
|
||||||
|
const error = new TypeError("TransformStream is closed.");
|
||||||
|
transformStreamErrorWritableAndUnblockWrite(stream, error);
|
||||||
|
}
|
||||||
|
|
||||||
|
function transformStreamErrorWritableAndUnblockWrite<I, O>(
|
||||||
|
stream: TransformStreamImpl<I, O>,
|
||||||
|
e: any
|
||||||
|
): void {
|
||||||
|
transformStreamDefaultControllerClearAlgorithms(
|
||||||
|
stream[sym.transformStreamController]
|
||||||
|
);
|
||||||
|
writableStreamDefaultControllerErrorIfNeeded(
|
||||||
|
stream[sym.writable][sym.writableStreamController]!,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
if (stream[sym.backpressure]) {
|
||||||
|
transformStreamSetBackpressure(stream, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function transformStreamSetBackpressure<I, O>(
|
||||||
|
stream: TransformStreamImpl<I, O>,
|
||||||
|
backpressure: boolean
|
||||||
|
): void {
|
||||||
|
assert(stream[sym.backpressure] !== backpressure);
|
||||||
|
if (stream[sym.backpressureChangePromise] !== undefined) {
|
||||||
|
stream[sym.backpressureChangePromise]!.resolve!(undefined);
|
||||||
|
}
|
||||||
|
stream[sym.backpressureChangePromise] = getDeferred<void>();
|
||||||
|
stream[sym.backpressure] = backpressure;
|
||||||
|
}
|
||||||
|
|
||||||
function transferArrayBuffer(buffer: ArrayBuffer): ArrayBuffer {
|
function transferArrayBuffer(buffer: ArrayBuffer): ArrayBuffer {
|
||||||
assert(!isDetachedBuffer(buffer));
|
assert(!isDetachedBuffer(buffer));
|
||||||
const transferredIshVersion = buffer.slice(0);
|
const transferredIshVersion = buffer.slice(0);
|
||||||
|
|
|
@ -11,6 +11,7 @@ export const abortSteps = Symbol("abortSteps");
|
||||||
export const asyncIteratorReader = Symbol("asyncIteratorReader");
|
export const asyncIteratorReader = Symbol("asyncIteratorReader");
|
||||||
export const autoAllocateChunkSize = Symbol("autoAllocateChunkSize");
|
export const autoAllocateChunkSize = Symbol("autoAllocateChunkSize");
|
||||||
export const backpressure = Symbol("backpressure");
|
export const backpressure = Symbol("backpressure");
|
||||||
|
export const backpressureChangePromise = Symbol("backpressureChangePromise");
|
||||||
export const byobRequest = Symbol("byobRequest");
|
export const byobRequest = Symbol("byobRequest");
|
||||||
export const cancelAlgorithm = Symbol("cancelAlgorithm");
|
export const cancelAlgorithm = Symbol("cancelAlgorithm");
|
||||||
export const cancelSteps = Symbol("cancelSteps");
|
export const cancelSteps = Symbol("cancelSteps");
|
||||||
|
@ -22,9 +23,11 @@ export const controlledReadableByteStream = Symbol(
|
||||||
"controlledReadableByteStream"
|
"controlledReadableByteStream"
|
||||||
);
|
);
|
||||||
export const controlledReadableStream = Symbol("controlledReadableStream");
|
export const controlledReadableStream = Symbol("controlledReadableStream");
|
||||||
|
export const controlledTransformStream = Symbol("controlledTransformStream");
|
||||||
export const controlledWritableStream = Symbol("controlledWritableStream");
|
export const controlledWritableStream = Symbol("controlledWritableStream");
|
||||||
export const disturbed = Symbol("disturbed");
|
export const disturbed = Symbol("disturbed");
|
||||||
export const errorSteps = Symbol("errorSteps");
|
export const errorSteps = Symbol("errorSteps");
|
||||||
|
export const flushAlgorithm = Symbol("flushAlgorithm");
|
||||||
export const forAuthorCode = Symbol("forAuthorCode");
|
export const forAuthorCode = Symbol("forAuthorCode");
|
||||||
export const inFlightWriteRequest = Symbol("inFlightWriteRequest");
|
export const inFlightWriteRequest = Symbol("inFlightWriteRequest");
|
||||||
export const inFlightCloseRequest = Symbol("inFlightCloseRequest");
|
export const inFlightCloseRequest = Symbol("inFlightCloseRequest");
|
||||||
|
@ -39,6 +42,7 @@ export const pulling = Symbol("pulling");
|
||||||
export const pullSteps = Symbol("pullSteps");
|
export const pullSteps = Symbol("pullSteps");
|
||||||
export const queue = Symbol("queue");
|
export const queue = Symbol("queue");
|
||||||
export const queueTotalSize = Symbol("queueTotalSize");
|
export const queueTotalSize = Symbol("queueTotalSize");
|
||||||
|
export const readable = Symbol("readable");
|
||||||
export const readableStreamController = Symbol("readableStreamController");
|
export const readableStreamController = Symbol("readableStreamController");
|
||||||
export const reader = Symbol("reader");
|
export const reader = Symbol("reader");
|
||||||
export const readRequests = Symbol("readRequests");
|
export const readRequests = Symbol("readRequests");
|
||||||
|
@ -48,7 +52,10 @@ export const state = Symbol("state");
|
||||||
export const storedError = Symbol("storedError");
|
export const storedError = Symbol("storedError");
|
||||||
export const strategyHWM = Symbol("strategyHWM");
|
export const strategyHWM = Symbol("strategyHWM");
|
||||||
export const strategySizeAlgorithm = Symbol("strategySizeAlgorithm");
|
export const strategySizeAlgorithm = Symbol("strategySizeAlgorithm");
|
||||||
|
export const transformAlgorithm = Symbol("transformAlgorithm");
|
||||||
|
export const transformStreamController = Symbol("transformStreamController");
|
||||||
export const writableStreamController = Symbol("writableStreamController");
|
export const writableStreamController = Symbol("writableStreamController");
|
||||||
export const writeAlgorithm = Symbol("writeAlgorithm");
|
export const writeAlgorithm = Symbol("writeAlgorithm");
|
||||||
|
export const writable = Symbol("writable");
|
||||||
export const writer = Symbol("writer");
|
export const writer = Symbol("writer");
|
||||||
export const writeRequests = Symbol("writeRequests");
|
export const writeRequests = Symbol("writeRequests");
|
||||||
|
|
118
cli/js/web/streams/transform_stream.ts
Normal file
118
cli/js/web/streams/transform_stream.ts
Normal file
|
@ -0,0 +1,118 @@
|
||||||
|
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
import {
|
||||||
|
Deferred,
|
||||||
|
getDeferred,
|
||||||
|
initializeTransformStream,
|
||||||
|
invokeOrNoop,
|
||||||
|
isTransformStream,
|
||||||
|
makeSizeAlgorithmFromSizeFunction,
|
||||||
|
setFunctionName,
|
||||||
|
setUpTransformStreamDefaultControllerFromTransformer,
|
||||||
|
validateAndNormalizeHighWaterMark,
|
||||||
|
} from "./internals.ts";
|
||||||
|
import { ReadableStreamImpl } from "./readable_stream.ts";
|
||||||
|
import * as sym from "./symbols.ts";
|
||||||
|
import { TransformStreamDefaultControllerImpl } from "./transform_stream_default_controller.ts";
|
||||||
|
import { WritableStreamImpl } from "./writable_stream.ts";
|
||||||
|
import { customInspect, inspect } from "../console.ts";
|
||||||
|
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
|
export class TransformStreamImpl<I = any, O = any>
|
||||||
|
implements TransformStream<I, O> {
|
||||||
|
[sym.backpressure]?: boolean;
|
||||||
|
[sym.backpressureChangePromise]?: Deferred<void>;
|
||||||
|
[sym.readable]: ReadableStreamImpl<O>;
|
||||||
|
[sym.transformStreamController]: TransformStreamDefaultControllerImpl<I, O>;
|
||||||
|
[sym.writable]: WritableStreamImpl<I>;
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
transformer: Transformer<I, O> = {},
|
||||||
|
writableStrategy: QueuingStrategy<I> = {},
|
||||||
|
readableStrategy: QueuingStrategy<O> = {}
|
||||||
|
) {
|
||||||
|
const writableSizeFunction = writableStrategy.size;
|
||||||
|
let writableHighWaterMark = writableStrategy.highWaterMark;
|
||||||
|
const readableSizeFunction = readableStrategy.size;
|
||||||
|
let readableHighWaterMark = readableStrategy.highWaterMark;
|
||||||
|
const writableType = transformer.writableType;
|
||||||
|
if (writableType !== undefined) {
|
||||||
|
throw new RangeError(
|
||||||
|
`Expected transformer writableType to be undefined, received "${String(
|
||||||
|
writableType
|
||||||
|
)}"`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
const writableSizeAlgorithm = makeSizeAlgorithmFromSizeFunction(
|
||||||
|
writableSizeFunction
|
||||||
|
);
|
||||||
|
if (writableHighWaterMark === undefined) {
|
||||||
|
writableHighWaterMark = 1;
|
||||||
|
}
|
||||||
|
writableHighWaterMark = validateAndNormalizeHighWaterMark(
|
||||||
|
writableHighWaterMark
|
||||||
|
);
|
||||||
|
const readableType = transformer.readableType;
|
||||||
|
if (readableType !== undefined) {
|
||||||
|
throw new RangeError(
|
||||||
|
`Expected transformer readableType to be undefined, received "${String(
|
||||||
|
readableType
|
||||||
|
)}"`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
const readableSizeAlgorithm = makeSizeAlgorithmFromSizeFunction(
|
||||||
|
readableSizeFunction
|
||||||
|
);
|
||||||
|
if (readableHighWaterMark === undefined) {
|
||||||
|
readableHighWaterMark = 1;
|
||||||
|
}
|
||||||
|
readableHighWaterMark = validateAndNormalizeHighWaterMark(
|
||||||
|
readableHighWaterMark
|
||||||
|
);
|
||||||
|
const startPromise = getDeferred<void>();
|
||||||
|
initializeTransformStream(
|
||||||
|
this,
|
||||||
|
startPromise.promise,
|
||||||
|
writableHighWaterMark,
|
||||||
|
writableSizeAlgorithm,
|
||||||
|
readableHighWaterMark,
|
||||||
|
readableSizeAlgorithm
|
||||||
|
);
|
||||||
|
// the brand check expects this, and the brand check occurs in the following
|
||||||
|
// but the property hasn't been defined.
|
||||||
|
Object.defineProperty(this, sym.transformStreamController, {
|
||||||
|
value: undefined,
|
||||||
|
writable: true,
|
||||||
|
configurable: true,
|
||||||
|
});
|
||||||
|
setUpTransformStreamDefaultControllerFromTransformer(this, transformer);
|
||||||
|
const startResult: void | PromiseLike<void> = invokeOrNoop(
|
||||||
|
transformer,
|
||||||
|
"start",
|
||||||
|
this[sym.transformStreamController]
|
||||||
|
);
|
||||||
|
startPromise.resolve(startResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
get readable(): ReadableStream<O> {
|
||||||
|
if (!isTransformStream(this)) {
|
||||||
|
throw new TypeError("Invalid TransformStream.");
|
||||||
|
}
|
||||||
|
return this[sym.readable];
|
||||||
|
}
|
||||||
|
|
||||||
|
get writable(): WritableStream<I> {
|
||||||
|
if (!isTransformStream(this)) {
|
||||||
|
throw new TypeError("Invalid TransformStream.");
|
||||||
|
}
|
||||||
|
return this[sym.writable];
|
||||||
|
}
|
||||||
|
|
||||||
|
[customInspect](): string {
|
||||||
|
return `${this.constructor.name} {\n readable: ${inspect(
|
||||||
|
this.readable
|
||||||
|
)}\n writable: ${inspect(this.writable)}\n}`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
setFunctionName(TransformStreamImpl, "TransformStream");
|
75
cli/js/web/streams/transform_stream_default_controller.ts
Normal file
75
cli/js/web/streams/transform_stream_default_controller.ts
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
import {
|
||||||
|
FlushAlgorithm,
|
||||||
|
isTransformStreamDefaultController,
|
||||||
|
readableStreamDefaultControllerGetDesiredSize,
|
||||||
|
setFunctionName,
|
||||||
|
TransformAlgorithm,
|
||||||
|
transformStreamDefaultControllerEnqueue,
|
||||||
|
transformStreamDefaultControllerError,
|
||||||
|
transformStreamDefaultControllerTerminate,
|
||||||
|
} from "./internals.ts";
|
||||||
|
import { ReadableStreamDefaultControllerImpl } from "./readable_stream_default_controller.ts";
|
||||||
|
import * as sym from "./symbols.ts";
|
||||||
|
import { TransformStreamImpl } from "./transform_stream.ts";
|
||||||
|
import { customInspect } from "../console.ts";
|
||||||
|
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
|
export class TransformStreamDefaultControllerImpl<I = any, O = any>
|
||||||
|
implements TransformStreamDefaultController<O> {
|
||||||
|
[sym.controlledTransformStream]: TransformStreamImpl<I, O>;
|
||||||
|
[sym.flushAlgorithm]: FlushAlgorithm;
|
||||||
|
[sym.transformAlgorithm]: TransformAlgorithm<I>;
|
||||||
|
|
||||||
|
private constructor() {
|
||||||
|
throw new TypeError(
|
||||||
|
"TransformStreamDefaultController's constructor cannot be called."
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
get desiredSize(): number | null {
|
||||||
|
if (!isTransformStreamDefaultController(this)) {
|
||||||
|
throw new TypeError("Invalid TransformStreamDefaultController.");
|
||||||
|
}
|
||||||
|
const readableController = this[sym.controlledTransformStream][
|
||||||
|
sym.readable
|
||||||
|
][sym.readableStreamController];
|
||||||
|
return readableStreamDefaultControllerGetDesiredSize(
|
||||||
|
readableController as ReadableStreamDefaultControllerImpl<O>
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
enqueue(chunk: O): void {
|
||||||
|
if (!isTransformStreamDefaultController(this)) {
|
||||||
|
throw new TypeError("Invalid TransformStreamDefaultController.");
|
||||||
|
}
|
||||||
|
transformStreamDefaultControllerEnqueue(this, chunk);
|
||||||
|
}
|
||||||
|
|
||||||
|
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||||
|
error(reason?: any): void {
|
||||||
|
if (!isTransformStreamDefaultController(this)) {
|
||||||
|
throw new TypeError("Invalid TransformStreamDefaultController.");
|
||||||
|
}
|
||||||
|
transformStreamDefaultControllerError(this, reason);
|
||||||
|
}
|
||||||
|
|
||||||
|
terminate(): void {
|
||||||
|
if (!isTransformStreamDefaultController(this)) {
|
||||||
|
throw new TypeError("Invalid TransformStreamDefaultController.");
|
||||||
|
}
|
||||||
|
transformStreamDefaultControllerTerminate(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
[customInspect](): string {
|
||||||
|
return `${this.constructor.name} { desiredSize: ${String(
|
||||||
|
this.desiredSize
|
||||||
|
)} }`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
setFunctionName(
|
||||||
|
TransformStreamDefaultControllerImpl,
|
||||||
|
"TransformStreamDefaultController"
|
||||||
|
);
|
Loading…
Add table
Add a link
Reference in a new issue