feat(kv) queue implementation (#19459)

Extend the unstable `Deno.Kv` API to support queues.
This commit is contained in:
Igor Zinkovsky 2023-06-13 17:49:57 -07:00 committed by GitHub
parent d451abfc91
commit fd9d6baea3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 1203 additions and 51 deletions

View file

@ -0,0 +1,56 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import { assertEquals } from "./test_util.ts";
const sleep = (time: number) => new Promise((r) => setTimeout(r, time));
let isCI: boolean;
try {
isCI = Deno.env.get("CI") !== undefined;
} catch {
isCI = true;
}
function queueTest(name: string, fn: (db: Deno.Kv) => Promise<void>) {
Deno.test({
name,
// https://github.com/denoland/deno/issues/18363
ignore: Deno.build.os === "darwin" && isCI,
async fn() {
const db: Deno.Kv = await Deno.openKv(
":memory:",
);
await fn(db);
},
});
}
async function collect<T>(
iter: Deno.KvListIterator<T>,
): Promise<Deno.KvEntry<T>[]> {
const entries: Deno.KvEntry<T>[] = [];
for await (const entry of iter) {
entries.push(entry);
}
return entries;
}
queueTest("queue with undelivered", async (db) => {
const listener = db.listenQueue((_msg) => {
throw new TypeError("dequeue error");
});
try {
await db.enqueue("test", {
keysIfUndelivered: [["queue_failed", "a"], ["queue_failed", "b"]],
});
await sleep(100000);
const undelivered = await collect(db.list({ prefix: ["queue_failed"] }));
assertEquals(undelivered.length, 2);
assertEquals(undelivered[0].key, ["queue_failed", "a"]);
assertEquals(undelivered[0].value, "test");
assertEquals(undelivered[1].key, ["queue_failed", "b"]);
assertEquals(undelivered[1].value, "test");
} finally {
db.close();
await listener;
}
});

View file

@ -3,11 +3,16 @@ import {
assert,
assertEquals,
AssertionError,
assertNotEquals,
assertRejects,
assertThrows,
Deferred,
deferred,
} from "./test_util.ts";
import { assertType, IsExact } from "../../../test_util/std/testing/types.ts";
const sleep = (time: number) => new Promise((r) => setTimeout(r, time));
let isCI: boolean;
try {
isCI = Deno.env.get("CI") !== undefined;
@ -59,6 +64,20 @@ function dbTest(name: string, fn: (db: Deno.Kv) => Promise<void>) {
});
}
function queueTest(name: string, fn: (db: Deno.Kv) => Promise<void>) {
Deno.test({
name,
// https://github.com/denoland/deno/issues/18363
ignore: Deno.build.os === "darwin" && isCI,
async fn() {
const db: Deno.Kv = await Deno.openKv(
":memory:",
);
await fn(db);
},
});
}
dbTest("basic read-write-delete and versionstamps", async (db) => {
const result1 = await db.get(["a"]);
assertEquals(result1.key, ["a"]);
@ -1304,3 +1323,429 @@ async function _typeCheckingTests() {
assert(!j.done);
assertType<IsExact<typeof j.value, Deno.KvEntry<string>>>(true);
}
queueTest("basic listenQueue and enqueue", async (db) => {
const promise = deferred();
let dequeuedMessage: unknown = null;
const listener = db.listenQueue((msg) => {
dequeuedMessage = msg;
promise.resolve();
});
try {
const res = await db.enqueue("test");
assert(res.ok);
assertNotEquals(res.versionstamp, null);
await promise;
assertEquals(dequeuedMessage, "test");
} finally {
db.close();
await listener;
}
});
for (const { name, value } of VALUE_CASES) {
queueTest(`listenQueue and enqueue ${name}`, async (db) => {
const numEnqueues = 10;
let count = 0;
const promises: Deferred<void>[] = [];
const dequeuedMessages: unknown[] = [];
const listeners: Promise<void>[] = [];
listeners.push(db.listenQueue((msg) => {
dequeuedMessages.push(msg);
promises[count++].resolve();
}));
try {
for (let i = 0; i < numEnqueues; i++) {
promises.push(deferred());
await db.enqueue(value);
}
for (let i = 0; i < numEnqueues; i++) {
await promises[i];
}
for (let i = 0; i < numEnqueues; i++) {
assertEquals(dequeuedMessages[i], value);
}
} finally {
db.close();
for (const listener of listeners) {
await listener;
}
}
});
}
queueTest("queue mixed types", async (db) => {
let promise: Deferred<void>;
let dequeuedMessage: unknown = null;
const listener = db.listenQueue((msg) => {
dequeuedMessage = msg;
promise.resolve();
});
try {
for (const item of VALUE_CASES) {
promise = deferred();
await db.enqueue(item.value);
await promise;
assertEquals(dequeuedMessage, item.value);
}
} finally {
db.close();
await listener;
}
});
queueTest("queue delay", async (db) => {
let dequeueTime: number | undefined;
const promise = deferred();
let dequeuedMessage: unknown = null;
const listener = db.listenQueue((msg) => {
dequeueTime = Date.now();
dequeuedMessage = msg;
promise.resolve();
});
try {
const enqueueTime = Date.now();
await db.enqueue("test", { delay: 1000 });
await promise;
assertEquals(dequeuedMessage, "test");
assert(dequeueTime !== undefined);
assert(dequeueTime - enqueueTime >= 1000);
} finally {
db.close();
await listener;
}
});
queueTest("queue delay with atomic", async (db) => {
let dequeueTime: number | undefined;
const promise = deferred();
let dequeuedMessage: unknown = null;
const listener = db.listenQueue((msg) => {
dequeueTime = Date.now();
dequeuedMessage = msg;
promise.resolve();
});
try {
const enqueueTime = Date.now();
const res = await db.atomic()
.enqueue("test", { delay: 1000 })
.commit();
assert(res.ok);
await promise;
assertEquals(dequeuedMessage, "test");
assert(dequeueTime !== undefined);
assert(dequeueTime - enqueueTime >= 1000);
} finally {
db.close();
await listener;
}
});
queueTest("queue delay and now", async (db) => {
let count = 0;
let dequeueTime: number | undefined;
const promise = deferred();
let dequeuedMessage: unknown = null;
const listener = db.listenQueue((msg) => {
count += 1;
if (count == 2) {
dequeueTime = Date.now();
dequeuedMessage = msg;
promise.resolve();
}
});
try {
const enqueueTime = Date.now();
await db.enqueue("test-1000", { delay: 1000 });
await db.enqueue("test");
await promise;
assertEquals(dequeuedMessage, "test-1000");
assert(dequeueTime !== undefined);
assert(dequeueTime - enqueueTime >= 1000);
} finally {
db.close();
await listener;
}
});
dbTest("queue negative delay", async (db) => {
await assertRejects(async () => {
await db.enqueue("test", { delay: -100 });
}, TypeError);
});
dbTest("queue nan delay", async (db) => {
await assertRejects(async () => {
await db.enqueue("test", { delay: Number.NaN });
}, TypeError);
});
dbTest("queue large delay", async (db) => {
await db.enqueue("test", { delay: 7 * 24 * 60 * 60 * 1000 });
await assertRejects(async () => {
await db.enqueue("test", { delay: 7 * 24 * 60 * 60 * 1000 + 1 });
}, TypeError);
});
queueTest("listenQueue with async callback", async (db) => {
const promise = deferred();
let dequeuedMessage: unknown = null;
const listener = db.listenQueue(async (msg) => {
dequeuedMessage = msg;
await sleep(100);
promise.resolve();
});
try {
await db.enqueue("test");
await promise;
assertEquals(dequeuedMessage, "test");
} finally {
db.close();
await listener;
}
});
queueTest("queue retries", async (db) => {
let count = 0;
const listener = db.listenQueue(async (_msg) => {
count += 1;
await sleep(10);
throw new TypeError("dequeue error");
});
try {
await db.enqueue("test");
await sleep(10000);
} finally {
db.close();
await listener;
}
// There should have been 1 attempt + 3 retries in the 10 seconds
assertEquals(4, count);
});
queueTest("multiple listenQueues", async (db) => {
const numListens = 10;
let count = 0;
const promises: Deferred<void>[] = [];
const dequeuedMessages: unknown[] = [];
const listeners: Promise<void>[] = [];
for (let i = 0; i < numListens; i++) {
listeners.push(db.listenQueue((msg) => {
dequeuedMessages.push(msg);
promises[count++].resolve();
}));
}
try {
for (let i = 0; i < numListens; i++) {
promises.push(deferred());
await db.enqueue("msg_" + i);
await promises[i];
const msg = dequeuedMessages[i];
assertEquals("msg_" + i, msg);
}
} finally {
db.close();
for (let i = 0; i < numListens; i++) {
await listeners[i];
}
}
});
queueTest("enqueue with atomic", async (db) => {
const promise = deferred();
let dequeuedMessage: unknown = null;
const listener = db.listenQueue((msg) => {
dequeuedMessage = msg;
promise.resolve();
});
try {
await db.set(["t"], "1");
let currentValue = await db.get(["t"]);
assertEquals("1", currentValue.value);
const res = await db.atomic()
.check(currentValue)
.set(currentValue.key, "2")
.enqueue("test")
.commit();
assert(res.ok);
await promise;
assertEquals("test", dequeuedMessage);
currentValue = await db.get(["t"]);
assertEquals("2", currentValue.value);
} finally {
db.close();
await listener;
}
});
queueTest("enqueue with atomic nonce", async (db) => {
const promise = deferred();
let dequeuedMessage: unknown = null;
const nonce = crypto.randomUUID();
const listener = db.listenQueue(async (val) => {
const message = val as { msg: string; nonce: string };
const nonce = message.nonce;
const nonceValue = await db.get(["nonces", nonce]);
if (nonceValue.versionstamp === null) {
dequeuedMessage = message.msg;
promise.resolve();
return;
}
assertNotEquals(nonceValue.versionstamp, null);
const res = await db.atomic()
.check(nonceValue)
.delete(["nonces", nonce])
.set(["a", "b"], message.msg)
.commit();
if (res.ok) {
// Simulate an error so that the message has to be redelivered
throw new Error("injected error");
}
});
try {
const res = await db.atomic()
.check({ key: ["nonces", nonce], versionstamp: null })
.set(["nonces", nonce], true)
.enqueue({ msg: "test", nonce })
.commit();
assert(res.ok);
await promise;
assertEquals("test", dequeuedMessage);
const currentValue = await db.get(["a", "b"]);
assertEquals("test", currentValue.value);
const nonceValue = await db.get(["nonces", nonce]);
assertEquals(nonceValue.versionstamp, null);
} finally {
db.close();
await listener;
}
});
Deno.test({
name: "queue persistence with inflight messages",
sanitizeOps: false,
sanitizeResources: false,
async fn() {
const filename = "cli/tests/testdata/queue.db";
try {
await Deno.remove(filename);
} catch {
// pass
}
try {
let db: Deno.Kv = await Deno.openKv(filename);
let count = 0;
let promise = deferred();
// Register long-running handler.
let listener = db.listenQueue(async (_msg) => {
count += 1;
if (count == 3) {
promise.resolve();
}
await sleep(60000);
});
// Enqueue 3 messages.
await db.enqueue("msg0");
await db.enqueue("msg1");
await db.enqueue("msg2");
await promise;
// Close the database and wait for the listerner to finish.
db.close();
await listener;
// Now reopen the database.
db = await Deno.openKv(filename);
count = 0;
promise = deferred();
// Register a handler that will complete quickly.
listener = db.listenQueue((_msg) => {
count += 1;
if (count == 3) {
promise.resolve();
}
});
// Wait for the handlers to finish.
await promise;
assertEquals(3, count);
db.close();
await listener;
} finally {
await Deno.remove(filename);
}
},
});
Deno.test({
name: "queue persistence with delay messages",
sanitizeOps: false,
sanitizeResources: false,
async fn() {
const filename = "cli/tests/testdata/queue.db";
try {
await Deno.remove(filename);
} catch {
// pass
}
try {
let db: Deno.Kv = await Deno.openKv(filename);
let count = 0;
let promise = deferred();
// Register long-running handler.
let listener = db.listenQueue((_msg) => {});
// Enqueue 3 messages into the future.
await db.enqueue("msg0", { delay: 10000 });
await db.enqueue("msg1", { delay: 10000 });
await db.enqueue("msg2", { delay: 10000 });
// Close the database and wait for the listerner to finish.
db.close();
await listener;
// Now reopen the database.
db = await Deno.openKv(filename);
count = 0;
promise = deferred();
// Register a handler that will complete quickly.
listener = db.listenQueue((_msg) => {
count += 1;
if (count == 3) {
promise.resolve();
}
});
// Wait for the handlers to finish.
await promise;
assertEquals(3, count);
db.close();
await listener;
} finally {
await Deno.remove(filename);
}
},
});