mirror of
https://github.com/denoland/deno.git
synced 2025-08-30 15:27:53 +00:00
feat: async context (#24402)
We are switching to ContinuationPreservedEmbedderData. This allows adding async context tracking to the various async operations that deno provides. Fixes: https://github.com/denoland/deno/issues/7010 Fixes: https://github.com/denoland/deno/issues/22886 Fixes: https://github.com/denoland/deno/issues/24368
This commit is contained in:
parent
b82a2f114c
commit
3a1a1cc030
8 changed files with 208 additions and 294 deletions
|
@ -1,191 +1,34 @@
|
|||
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
|
||||
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
|
||||
|
||||
// This implementation is inspired by "workerd" AsyncLocalStorage implementation:
|
||||
// https://github.com/cloudflare/workerd/blob/77fd0ed6ddba184414f0216508fc62b06e716cab/src/workerd/api/node/async-hooks.c++#L9
|
||||
|
||||
// TODO(petamoriken): enable prefer-primordials for node polyfills
|
||||
// deno-lint-ignore-file prefer-primordials
|
||||
|
||||
import { core } from "ext:core/mod.js";
|
||||
import { op_node_is_promise_rejected } from "ext:core/ops";
|
||||
import { primordials } from "ext:core/mod.js";
|
||||
import {
|
||||
AsyncVariable,
|
||||
getAsyncContext,
|
||||
setAsyncContext,
|
||||
} from "ext:runtime/01_async_context.js";
|
||||
import { validateFunction } from "ext:deno_node/internal/validators.mjs";
|
||||
import { newAsyncId } from "ext:deno_node/internal/async_hooks.ts";
|
||||
|
||||
function assert(cond: boolean) {
|
||||
if (!cond) throw new Error("Assertion failed");
|
||||
}
|
||||
const asyncContextStack: AsyncContextFrame[] = [];
|
||||
|
||||
function pushAsyncFrame(frame: AsyncContextFrame) {
|
||||
asyncContextStack.push(frame);
|
||||
}
|
||||
|
||||
function popAsyncFrame() {
|
||||
if (asyncContextStack.length > 0) {
|
||||
asyncContextStack.pop();
|
||||
}
|
||||
}
|
||||
|
||||
let rootAsyncFrame: AsyncContextFrame | undefined = undefined;
|
||||
let promiseHooksSet = false;
|
||||
|
||||
const asyncContext = Symbol("asyncContext");
|
||||
|
||||
function setPromiseHooks() {
|
||||
if (promiseHooksSet) {
|
||||
return;
|
||||
}
|
||||
promiseHooksSet = true;
|
||||
|
||||
const init = (promise: Promise<unknown>) => {
|
||||
const currentFrame = AsyncContextFrame.current();
|
||||
if (!currentFrame.isRoot()) {
|
||||
if (typeof promise[asyncContext] !== "undefined") {
|
||||
throw new Error("Promise already has async context");
|
||||
}
|
||||
AsyncContextFrame.attachContext(promise);
|
||||
}
|
||||
};
|
||||
const before = (promise: Promise<unknown>) => {
|
||||
const maybeFrame = promise[asyncContext];
|
||||
if (maybeFrame) {
|
||||
pushAsyncFrame(maybeFrame);
|
||||
} else {
|
||||
pushAsyncFrame(AsyncContextFrame.getRootAsyncContext());
|
||||
}
|
||||
};
|
||||
const after = (promise: Promise<unknown>) => {
|
||||
popAsyncFrame();
|
||||
if (!op_node_is_promise_rejected(promise)) {
|
||||
// @ts-ignore promise async context
|
||||
promise[asyncContext] = undefined;
|
||||
}
|
||||
};
|
||||
const resolve = (promise: Promise<unknown>) => {
|
||||
const currentFrame = AsyncContextFrame.current();
|
||||
if (
|
||||
!currentFrame.isRoot() && op_node_is_promise_rejected(promise) &&
|
||||
typeof promise[asyncContext] === "undefined"
|
||||
) {
|
||||
AsyncContextFrame.attachContext(promise);
|
||||
}
|
||||
};
|
||||
|
||||
core.setPromiseHooks(init, before, after, resolve);
|
||||
}
|
||||
|
||||
class AsyncContextFrame {
|
||||
storage: StorageEntry[];
|
||||
constructor(
|
||||
maybeParent?: AsyncContextFrame | null,
|
||||
maybeStorageEntry?: StorageEntry | null,
|
||||
isRoot = false,
|
||||
) {
|
||||
this.storage = [];
|
||||
|
||||
setPromiseHooks();
|
||||
|
||||
const propagate = (parent: AsyncContextFrame) => {
|
||||
parent.storage = parent.storage.filter((entry) => !entry.key.isDead());
|
||||
parent.storage.forEach((entry) => this.storage.push(entry.clone()));
|
||||
|
||||
if (maybeStorageEntry) {
|
||||
const existingEntry = this.storage.find((entry) =>
|
||||
entry.key === maybeStorageEntry.key
|
||||
);
|
||||
if (existingEntry) {
|
||||
existingEntry.value = maybeStorageEntry.value;
|
||||
} else {
|
||||
this.storage.push(maybeStorageEntry);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if (!isRoot) {
|
||||
if (maybeParent) {
|
||||
propagate(maybeParent);
|
||||
} else {
|
||||
propagate(AsyncContextFrame.current());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static tryGetContext(promise: Promise<unknown>) {
|
||||
// @ts-ignore promise async context
|
||||
return promise[asyncContext];
|
||||
}
|
||||
|
||||
static attachContext(promise: Promise<unknown>) {
|
||||
// @ts-ignore promise async context
|
||||
promise[asyncContext] = AsyncContextFrame.current();
|
||||
}
|
||||
|
||||
static getRootAsyncContext() {
|
||||
if (typeof rootAsyncFrame !== "undefined") {
|
||||
return rootAsyncFrame;
|
||||
}
|
||||
|
||||
rootAsyncFrame = new AsyncContextFrame(null, null, true);
|
||||
return rootAsyncFrame;
|
||||
}
|
||||
|
||||
static current() {
|
||||
if (asyncContextStack.length === 0) {
|
||||
return AsyncContextFrame.getRootAsyncContext();
|
||||
}
|
||||
|
||||
return asyncContextStack[asyncContextStack.length - 1];
|
||||
}
|
||||
|
||||
static create(
|
||||
maybeParent?: AsyncContextFrame | null,
|
||||
maybeStorageEntry?: StorageEntry | null,
|
||||
) {
|
||||
return new AsyncContextFrame(maybeParent, maybeStorageEntry);
|
||||
}
|
||||
|
||||
static wrap(
|
||||
fn: () => unknown,
|
||||
maybeFrame: AsyncContextFrame | undefined,
|
||||
// deno-lint-ignore no-explicit-any
|
||||
thisArg: any,
|
||||
) {
|
||||
// deno-lint-ignore no-explicit-any
|
||||
return (...args: any) => {
|
||||
const frame = maybeFrame || AsyncContextFrame.current();
|
||||
Scope.enter(frame);
|
||||
try {
|
||||
return fn.apply(thisArg, args);
|
||||
} finally {
|
||||
Scope.exit();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
get(key: StorageKey) {
|
||||
assert(!key.isDead());
|
||||
this.storage = this.storage.filter((entry) => !entry.key.isDead());
|
||||
const entry = this.storage.find((entry) => entry.key === key);
|
||||
if (entry) {
|
||||
return entry.value;
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
isRoot() {
|
||||
return AsyncContextFrame.getRootAsyncContext() == this;
|
||||
}
|
||||
}
|
||||
const {
|
||||
ObjectDefineProperties,
|
||||
ReflectApply,
|
||||
FunctionPrototypeBind,
|
||||
ArrayPrototypeUnshift,
|
||||
ObjectFreeze,
|
||||
} = primordials;
|
||||
|
||||
export class AsyncResource {
|
||||
frame: AsyncContextFrame;
|
||||
type: string;
|
||||
#snapshot: unknown;
|
||||
#asyncId: number;
|
||||
|
||||
constructor(type: string) {
|
||||
this.type = type;
|
||||
this.frame = AsyncContextFrame.current();
|
||||
this.#snapshot = getAsyncContext();
|
||||
this.#asyncId = newAsyncId();
|
||||
}
|
||||
|
||||
|
@ -198,35 +41,38 @@ export class AsyncResource {
|
|||
thisArg: unknown,
|
||||
...args: unknown[]
|
||||
) {
|
||||
Scope.enter(this.frame);
|
||||
|
||||
const previousContext = getAsyncContext();
|
||||
try {
|
||||
return fn.apply(thisArg, args);
|
||||
setAsyncContext(this.#snapshot);
|
||||
return ReflectApply(fn, thisArg, args);
|
||||
} finally {
|
||||
Scope.exit();
|
||||
setAsyncContext(previousContext);
|
||||
}
|
||||
}
|
||||
|
||||
emitDestroy() {}
|
||||
|
||||
bind(fn: (...args: unknown[]) => unknown, thisArg = this) {
|
||||
bind(fn: (...args: unknown[]) => unknown, thisArg) {
|
||||
validateFunction(fn, "fn");
|
||||
const frame = AsyncContextFrame.current();
|
||||
const bound = AsyncContextFrame.wrap(fn, frame, thisArg);
|
||||
|
||||
Object.defineProperties(bound, {
|
||||
let bound;
|
||||
if (thisArg === undefined) {
|
||||
// deno-lint-ignore no-this-alias
|
||||
const resource = this;
|
||||
bound = function (...args) {
|
||||
ArrayPrototypeUnshift(args, fn, this);
|
||||
return ReflectApply(resource.runInAsyncScope, resource, args);
|
||||
};
|
||||
} else {
|
||||
bound = FunctionPrototypeBind(this.runInAsyncScope, this, fn, thisArg);
|
||||
}
|
||||
ObjectDefineProperties(bound, {
|
||||
"length": {
|
||||
__proto__: null,
|
||||
configurable: true,
|
||||
enumerable: false,
|
||||
value: fn.length,
|
||||
writable: false,
|
||||
},
|
||||
"asyncResource": {
|
||||
configurable: true,
|
||||
enumerable: true,
|
||||
value: this,
|
||||
writable: true,
|
||||
},
|
||||
});
|
||||
return bound;
|
||||
}
|
||||
|
@ -236,95 +82,54 @@ export class AsyncResource {
|
|||
type?: string,
|
||||
thisArg?: AsyncResource,
|
||||
) {
|
||||
type = type || fn.name;
|
||||
return (new AsyncResource(type || "AsyncResource")).bind(fn, thisArg);
|
||||
type = type || fn.name || "bound-anonymous-fn";
|
||||
return (new AsyncResource(type)).bind(fn, thisArg);
|
||||
}
|
||||
}
|
||||
|
||||
class Scope {
|
||||
static enter(maybeFrame?: AsyncContextFrame) {
|
||||
if (maybeFrame) {
|
||||
pushAsyncFrame(maybeFrame);
|
||||
} else {
|
||||
pushAsyncFrame(AsyncContextFrame.getRootAsyncContext());
|
||||
}
|
||||
}
|
||||
|
||||
static exit() {
|
||||
popAsyncFrame();
|
||||
}
|
||||
}
|
||||
|
||||
class StorageEntry {
|
||||
key: StorageKey;
|
||||
value: unknown;
|
||||
constructor(key: StorageKey, value: unknown) {
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
clone() {
|
||||
return new StorageEntry(this.key, this.value);
|
||||
}
|
||||
}
|
||||
|
||||
class StorageKey {
|
||||
#dead = false;
|
||||
|
||||
reset() {
|
||||
this.#dead = true;
|
||||
}
|
||||
|
||||
isDead() {
|
||||
return this.#dead;
|
||||
}
|
||||
}
|
||||
|
||||
const fnReg = new FinalizationRegistry((key: StorageKey) => {
|
||||
key.reset();
|
||||
});
|
||||
|
||||
export class AsyncLocalStorage {
|
||||
#key;
|
||||
|
||||
constructor() {
|
||||
this.#key = new StorageKey();
|
||||
fnReg.register(this, this.#key);
|
||||
}
|
||||
#variable = new AsyncVariable();
|
||||
enabled = false;
|
||||
|
||||
// deno-lint-ignore no-explicit-any
|
||||
run(store: any, callback: any, ...args: any[]): any {
|
||||
const frame = AsyncContextFrame.create(
|
||||
null,
|
||||
new StorageEntry(this.#key, store),
|
||||
);
|
||||
Scope.enter(frame);
|
||||
let res;
|
||||
this.enabled = true;
|
||||
const previous = this.#variable.enter(store);
|
||||
try {
|
||||
res = callback(...args);
|
||||
return ReflectApply(callback, null, args);
|
||||
} finally {
|
||||
Scope.exit();
|
||||
setAsyncContext(previous);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
// deno-lint-ignore no-explicit-any
|
||||
exit(callback: (...args: unknown[]) => any, ...args: any[]): any {
|
||||
return this.run(undefined, callback, args);
|
||||
if (!this.enabled) {
|
||||
return ReflectApply(callback, null, args);
|
||||
}
|
||||
this.enabled = false;
|
||||
try {
|
||||
return ReflectApply(callback, null, args);
|
||||
} finally {
|
||||
this.enabled = true;
|
||||
}
|
||||
}
|
||||
|
||||
// deno-lint-ignore no-explicit-any
|
||||
getStore(): any {
|
||||
const currentFrame = AsyncContextFrame.current();
|
||||
return currentFrame.get(this.#key);
|
||||
if (!this.enabled) {
|
||||
return undefined;
|
||||
}
|
||||
return this.#variable.get();
|
||||
}
|
||||
|
||||
enterWith(store: unknown) {
|
||||
const frame = AsyncContextFrame.create(
|
||||
null,
|
||||
new StorageEntry(this.#key, store),
|
||||
);
|
||||
Scope.enter(frame);
|
||||
this.enabled = true;
|
||||
this.#variable.enter(store);
|
||||
}
|
||||
|
||||
disable() {
|
||||
this.enabled = false;
|
||||
}
|
||||
|
||||
static bind(fn: (...args: unknown[]) => unknown) {
|
||||
|
@ -335,14 +140,24 @@ export class AsyncLocalStorage {
|
|||
return AsyncLocalStorage.bind((
|
||||
cb: (...args: unknown[]) => unknown,
|
||||
...args: unknown[]
|
||||
) => cb(...args));
|
||||
) => ReflectApply(cb, null, args));
|
||||
}
|
||||
}
|
||||
|
||||
export function executionAsyncId() {
|
||||
return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
export function triggerAsyncId() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
export function executionAsyncResource() {
|
||||
return {};
|
||||
}
|
||||
|
||||
export const asyncWrapProviders = ObjectFreeze({ __proto__: null });
|
||||
|
||||
class AsyncHook {
|
||||
enable() {
|
||||
}
|
||||
|
@ -355,12 +170,12 @@ export function createHook() {
|
|||
return new AsyncHook();
|
||||
}
|
||||
|
||||
// Placing all exports down here because the exported classes won't export
|
||||
// otherwise.
|
||||
export default {
|
||||
// Embedder API
|
||||
AsyncResource,
|
||||
executionAsyncId,
|
||||
createHook,
|
||||
AsyncLocalStorage,
|
||||
createHook,
|
||||
executionAsyncId,
|
||||
triggerAsyncId,
|
||||
executionAsyncResource,
|
||||
asyncWrapProviders,
|
||||
AsyncResource,
|
||||
};
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue