From 8e7169ba63d2c5417c38b04316fe4e7daa4dc781 Mon Sep 17 00:00:00 2001 From: Tomas Zijdemans Date: Thu, 19 Feb 2026 08:43:22 +0100 Subject: [PATCH 1/5] feat(async/unstable): add support for AbortSignal in --- async/deno.json | 1 + async/unstable_pool.ts | 178 ++++++++++++++++++++++++++++++++++++ async/unstable_pool_test.ts | 166 +++++++++++++++++++++++++++++++++ 3 files changed, 345 insertions(+) create mode 100644 async/unstable_pool.ts create mode 100644 async/unstable_pool_test.ts diff --git a/async/deno.json b/async/deno.json index 4f6b68d49f1e..396e10ff523d 100644 --- a/async/deno.json +++ b/async/deno.json @@ -10,6 +10,7 @@ "./mux-async-iterator": "./mux_async_iterator.ts", "./unstable-mux-async-iterator": "./unstable_mux_async_iterator.ts", "./pool": "./pool.ts", + "./unstable-pool": "./unstable_pool.ts", "./retry": "./retry.ts", "./unstable-retry": "./unstable_retry.ts", "./tee": "./tee.ts", diff --git a/async/unstable_pool.ts b/async/unstable_pool.ts new file mode 100644 index 000000000000..873ee688ab3d --- /dev/null +++ b/async/unstable_pool.ts @@ -0,0 +1,178 @@ +// Copyright 2018-2026 the Deno authors. MIT license. +// This module is browser compatible. + +/** Error message emitted from the thrown error while mapping. */ +const ERROR_WHILE_MAPPING_MESSAGE = + "Cannot complete the mapping as an error was thrown from an item"; + +/** Options for {@linkcode pooledMap}. */ +export interface PooledMapOptions { + /** + * The maximum count of items being processed concurrently. + */ + poolLimit: number; + /** + * An AbortSignal to cancel the pooled mapping operation. + * + * If the signal is aborted, no new items will begin processing. All currently + * executing items are allowed to finish. The iterator then rejects with the + * signal's reason. + * + * @default {undefined} + */ + signal?: AbortSignal; +} + +/** + * Transforms values from an (async) iterable into another async iterable. + * The transforms are done concurrently, with a max concurrency defined by + * {@linkcode PooledMapOptions.poolLimit}. + * + * @experimental **UNSTABLE**: New API, yet to be vetted. + * + * If an error is thrown from `iteratorFn`, no new transformations will begin. + * All currently executing transformations are allowed to finish and still + * yielded on success. After that, the rejections among them are gathered and + * thrown by the iterator in an `AggregateError`. + * + * @example Usage + * ```ts + * import { pooledMap } from "@std/async/unstable-pool"; + * import { assertEquals } from "@std/assert"; + * + * const results = pooledMap( + * [1, 2, 3], + * (i) => new Promise((r) => setTimeout(() => r(i), 1000)), + * { poolLimit: 2 }, + * ); + * + * assertEquals(await Array.fromAsync(results), [1, 2, 3]); + * ``` + * + * @example Cancellation with AbortSignal + * ```ts + * import { pooledMap } from "@std/async/unstable-pool"; + * import { assertRejects } from "@std/assert"; + * + * const controller = new AbortController(); + * const results = pooledMap( + * [1, 2, 3, 4, 5], + * (i) => new Promise((r) => setTimeout(() => r(i), 1000)), + * { poolLimit: 2, signal: controller.signal }, + * ); + * + * controller.abort(new Error("cancelled")); + * + * await assertRejects( + * () => Array.fromAsync(results), + * Error, + * "cancelled", + * ); + * ``` + * + * @typeParam T the input type. + * @typeParam R the output type. + * @param array The input array for mapping. + * @param iteratorFn The function to call for every item of the array. + * @param options Options including pool limit and abort signal. + * @returns The async iterator with the transformed values. + */ +export function pooledMap( + array: Iterable | AsyncIterable, + iteratorFn: (data: T) => Promise, + options: PooledMapOptions, +): AsyncIterableIterator { + const { poolLimit, signal } = options; + + const res = new TransformStream, R>({ + async transform( + p: Promise, + controller: TransformStreamDefaultController, + ) { + try { + const s = await p; + controller.enqueue(s); + } catch (e) { + if (signal?.aborted) { + controller.error(signal.reason); + } else if ( + e instanceof AggregateError && + e.message === ERROR_WHILE_MAPPING_MESSAGE + ) { + controller.error(e as unknown); + } + } + }, + }); + + (async () => { + const writer = res.writable.getWriter(); + const executing: Array> = []; + + function raceWithSignal( + promises: Array>, + ): Promise { + if (!signal) return Promise.race(promises); + const { promise, resolve, reject } = Promise.withResolvers(); + const onAbort = () => reject(signal.reason); + signal.addEventListener("abort", onAbort, { once: true }); + return Promise.race([...promises, promise]).finally(() => { + signal.removeEventListener("abort", onAbort); + resolve(undefined as never); + }); + } + + try { + signal?.throwIfAborted(); + + for await (const item of array) { + signal?.throwIfAborted(); + + const p = Promise.resolve().then(() => iteratorFn(item)); + // Only write on success. If we `writer.write()` a rejected promise, + // that will end the iteration. We don't want that yet. Instead let it + // fail the race, taking us to the catch block where all currently + // executing jobs are allowed to finish and all rejections among them + // can be reported together. + writer.write(p); + const e: Promise = p.then(() => + executing.splice(executing.indexOf(e), 1) + ); + executing.push(e); + if (executing.length >= poolLimit) { + await raceWithSignal(executing); + } + } + await Promise.all(executing); + writer.close(); + } catch { + const errors = []; + for (const result of await Promise.allSettled(executing)) { + if (result.status === "rejected") { + errors.push(result.reason); + } + } + if (signal?.aborted) { + writer.write(Promise.reject(signal.reason)).catch(() => {}); + } else { + writer.write(Promise.reject( + new AggregateError(errors, ERROR_WHILE_MAPPING_MESSAGE), + )).catch(() => {}); + } + } + })(); + + // Feature test until browser coverage is adequate + return Symbol.asyncIterator in res.readable && + typeof res.readable[Symbol.asyncIterator] === "function" + ? (res.readable[Symbol.asyncIterator] as () => AsyncIterableIterator)() + : (async function* () { + const reader = res.readable.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + yield value; + } + reader.releaseLock(); + })(); +} diff --git a/async/unstable_pool_test.ts b/async/unstable_pool_test.ts new file mode 100644 index 000000000000..48c8b3865f8a --- /dev/null +++ b/async/unstable_pool_test.ts @@ -0,0 +1,166 @@ +// Copyright 2018-2026 the Deno authors. MIT license. +import { pooledMap } from "./unstable_pool.ts"; +import { + assertEquals, + assertGreaterOrEqual, + assertLess, + assertRejects, + assertStringIncludes, +} from "@std/assert"; +import { delay } from "./delay.ts"; +import { FakeTime } from "@std/testing/time"; + +Deno.test("pooledMap()", async () => { + using time = new FakeTime(); + + const start = Date.now(); + const results = pooledMap( + [1, 2, 3], + (i) => new Promise((r) => setTimeout(() => r(i), 300)), + { poolLimit: 2 }, + ); + for (const _ of Array(7)) { + time.tick(100); + await time.runMicrotasks(); + } + const array = await Array.fromAsync(results); + assertEquals(array, [1, 2, 3]); + const diff = Date.now() - start; + + assertGreaterOrEqual(diff, 600); + assertLess(diff, 900); +}); + +Deno.test("pooledMap() handles errors", async () => { + async function mapNumber(n: number): Promise { + if (n <= 2) { + throw new Error(`Bad number: ${n}`); + } + await delay(100); + return n; + } + const mappedNumbers: number[] = []; + const error = await assertRejects( + async () => { + for await (const m of pooledMap([1, 2, 3, 4], mapNumber, { poolLimit: 3 })) { + mappedNumbers.push(m); + } + }, + AggregateError, + "Cannot complete the mapping as an error was thrown from an item", + ); + assertEquals(error.errors.length, 2); + assertStringIncludes(error.errors[0].stack, "Error: Bad number: 1"); + assertStringIncludes(error.errors[1].stack, "Error: Bad number: 2"); + assertEquals(mappedNumbers, [3]); +}); + +Deno.test("pooledMap() returns ordered items", async () => { + const results = pooledMap( + [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + (i) => new Promise((r) => setTimeout(() => r(i), 100 / i)), + { poolLimit: 2 }, + ); + + const returned = await Array.fromAsync(results); + assertEquals(returned, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); +}); + +Deno.test("pooledMap() checks browser compat", async () => { + const asyncIterFunc = ReadableStream.prototype[Symbol.asyncIterator]; + // deno-lint-ignore no-explicit-any + delete (ReadableStream.prototype as any)[Symbol.asyncIterator]; + try { + const results = pooledMap( + [1, 2, 3], + (i) => new Promise((r) => setTimeout(() => r(i), 100)), + { poolLimit: 2 }, + ); + const array = await Array.fromAsync(results); + assertEquals(array, [1, 2, 3]); + } finally { + ReadableStream.prototype[Symbol.asyncIterator] = asyncIterFunc; + } +}); + +Deno.test("pooledMap() rejects immediately with already-aborted signal", async () => { + const controller = new AbortController(); + controller.abort(new Error("already aborted")); + + const results = pooledMap( + [1, 2, 3], + (i) => Promise.resolve(i), + { poolLimit: 2, signal: controller.signal }, + ); + + await assertRejects( + () => Array.fromAsync(results), + Error, + "already aborted", + ); +}); + +Deno.test("pooledMap() stops processing when signal is aborted", async () => { + const controller = new AbortController(); + const started: number[] = []; + + const results = pooledMap( + [1, 2, 3, 4, 5], + async (i) => { + started.push(i); + await delay(50); + if (i === 2) controller.abort(new Error("stop at 2")); + return i; + }, + { poolLimit: 1, signal: controller.signal }, + ); + + const collected: number[] = []; + await assertRejects( + async () => { + for await (const value of results) { + collected.push(value); + } + }, + Error, + "stop at 2", + ); + + assertGreaterOrEqual(started.length, 2); + assertLess(started.length, 5); +}); + +Deno.test("pooledMap() aborts during pool wait", async () => { + const controller = new AbortController(); + + const results = pooledMap( + [1, 2, 3, 4, 5, 6, 7, 8], + async (i) => { + await delay(200); + return i; + }, + { poolLimit: 2, signal: controller.signal }, + ); + + setTimeout(() => controller.abort(new Error("timed out")), 50); + + await assertRejects( + () => Array.fromAsync(results), + Error, + "timed out", + ); +}); + +Deno.test("pooledMap() works normally without signal", async () => { + const results = pooledMap( + [10, 20, 30], + async (i) => { + await delay(10); + return i * 2; + }, + { poolLimit: 2 }, + ); + + const array = await Array.fromAsync(results); + assertEquals(array, [20, 40, 60]); +}); From bd465c13933de55084741bab3505a17de4950463 Mon Sep 17 00:00:00 2001 From: Tomas Zijdemans Date: Thu, 19 Feb 2026 08:48:18 +0100 Subject: [PATCH 2/5] fmt --- async/unstable_pool_test.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/async/unstable_pool_test.ts b/async/unstable_pool_test.ts index 48c8b3865f8a..01bffe637d88 100644 --- a/async/unstable_pool_test.ts +++ b/async/unstable_pool_test.ts @@ -42,7 +42,9 @@ Deno.test("pooledMap() handles errors", async () => { const mappedNumbers: number[] = []; const error = await assertRejects( async () => { - for await (const m of pooledMap([1, 2, 3, 4], mapNumber, { poolLimit: 3 })) { + for await ( + const m of pooledMap([1, 2, 3, 4], mapNumber, { poolLimit: 3 }) + ) { mappedNumbers.push(m); } }, From 8e7ed40a2c19e5c3fca9bf8cff4a6df31f0406c2 Mon Sep 17 00:00:00 2001 From: Tomas Zijdemans Date: Thu, 19 Feb 2026 09:38:36 +0100 Subject: [PATCH 3/5] add input validation --- async/unstable_pool.ts | 4 ++++ async/unstable_pool_test.ts | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/async/unstable_pool.ts b/async/unstable_pool.ts index 873ee688ab3d..6ed3c74b92b0 100644 --- a/async/unstable_pool.ts +++ b/async/unstable_pool.ts @@ -84,6 +84,10 @@ export function pooledMap( ): AsyncIterableIterator { const { poolLimit, signal } = options; + if (!Number.isInteger(poolLimit) || poolLimit < 1) { + throw new RangeError("'poolLimit' must be a positive integer"); + } + const res = new TransformStream, R>({ async transform( p: Promise, diff --git a/async/unstable_pool_test.ts b/async/unstable_pool_test.ts index 01bffe637d88..c1e1a8b473da 100644 --- a/async/unstable_pool_test.ts +++ b/async/unstable_pool_test.ts @@ -6,10 +6,44 @@ import { assertLess, assertRejects, assertStringIncludes, + assertThrows, } from "@std/assert"; import { delay } from "./delay.ts"; import { FakeTime } from "@std/testing/time"; +Deno.test("pooledMap() throws for non-positive poolLimit", () => { + const noop = (i: number) => Promise.resolve(i); + assertThrows( + () => pooledMap([1], noop, { poolLimit: 0 }), + RangeError, + "'poolLimit' must be a positive integer", + ); + assertThrows( + () => pooledMap([1], noop, { poolLimit: -1 }), + RangeError, + "'poolLimit' must be a positive integer", + ); +}); + +Deno.test("pooledMap() throws for non-integer poolLimit", () => { + const noop = (i: number) => Promise.resolve(i); + assertThrows( + () => pooledMap([1], noop, { poolLimit: 1.5 }), + RangeError, + "'poolLimit' must be a positive integer", + ); + assertThrows( + () => pooledMap([1], noop, { poolLimit: NaN }), + RangeError, + "'poolLimit' must be a positive integer", + ); + assertThrows( + () => pooledMap([1], noop, { poolLimit: Infinity }), + RangeError, + "'poolLimit' must be a positive integer", + ); +}); + Deno.test("pooledMap()", async () => { using time = new FakeTime(); From b25ff6ac8fc3238a1e35410ebb4e012d1a0a7c9b Mon Sep 17 00:00:00 2001 From: Tomas Zijdemans Date: Sat, 28 Feb 2026 11:51:58 +0100 Subject: [PATCH 4/5] merge --- async/unstable_pool.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/async/unstable_pool.ts b/async/unstable_pool.ts index 6ed3c74b92b0..c6e7187367de 100644 --- a/async/unstable_pool.ts +++ b/async/unstable_pool.ts @@ -9,6 +9,7 @@ const ERROR_WHILE_MAPPING_MESSAGE = export interface PooledMapOptions { /** * The maximum count of items being processed concurrently. + * Must be a positive integer. */ poolLimit: number; /** @@ -76,6 +77,7 @@ export interface PooledMapOptions { * @param iteratorFn The function to call for every item of the array. * @param options Options including pool limit and abort signal. * @returns The async iterator with the transformed values. + * @throws {RangeError} If `poolLimit` is not a positive integer. */ export function pooledMap( array: Iterable | AsyncIterable, From ac26cdd7a40355ea200428d536405b4b1f8d73b9 Mon Sep 17 00:00:00 2001 From: Tomas Zijdemans Date: Fri, 6 Mar 2026 11:00:29 +0100 Subject: [PATCH 5/5] use overload --- async/unstable_pool.ts | 68 +++++++++++++++++++++++++++++++++---- async/unstable_pool_test.ts | 61 +++++++++++++++++++++++++-------- 2 files changed, 108 insertions(+), 21 deletions(-) diff --git a/async/unstable_pool.ts b/async/unstable_pool.ts index c6e7187367de..0e3886108c06 100644 --- a/async/unstable_pool.ts +++ b/async/unstable_pool.ts @@ -27,7 +27,7 @@ export interface PooledMapOptions { /** * Transforms values from an (async) iterable into another async iterable. * The transforms are done concurrently, with a max concurrency defined by - * {@linkcode PooledMapOptions.poolLimit}. + * `poolLimit`. * * @experimental **UNSTABLE**: New API, yet to be vetted. * @@ -42,9 +42,49 @@ export interface PooledMapOptions { * import { assertEquals } from "@std/assert"; * * const results = pooledMap( + * 2, * [1, 2, 3], * (i) => new Promise((r) => setTimeout(() => r(i), 1000)), + * ); + * + * assertEquals(await Array.fromAsync(results), [1, 2, 3]); + * ``` + * + * @typeParam T the input type. + * @typeParam R the output type. + * @param poolLimit The maximum count of items being processed concurrently. + * @param array The input array for mapping. + * @param iteratorFn The function to call for every item of the array. + * @returns The async iterator with the transformed values. + * @throws {RangeError} If `poolLimit` is not a positive integer. + */ +export function pooledMap( + poolLimit: number, + array: Iterable | AsyncIterable, + iteratorFn: (data: T) => Promise, +): AsyncIterableIterator; + +/** + * Transforms values from an (async) iterable into another async iterable. + * The transforms are done concurrently, with a max concurrency defined by + * {@linkcode PooledMapOptions.poolLimit}. + * + * @experimental **UNSTABLE**: New API, yet to be vetted. + * + * If an error is thrown from `iteratorFn`, no new transformations will begin. + * All currently executing transformations are allowed to finish and still + * yielded on success. After that, the rejections among them are gathered and + * thrown by the iterator in an `AggregateError`. + * + * @example Usage with options + * ```ts + * import { pooledMap } from "@std/async/unstable-pool"; + * import { assertEquals } from "@std/assert"; + * + * const results = pooledMap( * { poolLimit: 2 }, + * [1, 2, 3], + * (i) => new Promise((r) => setTimeout(() => r(i), 1000)), * ); * * assertEquals(await Array.fromAsync(results), [1, 2, 3]); @@ -57,9 +97,9 @@ export interface PooledMapOptions { * * const controller = new AbortController(); * const results = pooledMap( + * { poolLimit: 2, signal: controller.signal }, * [1, 2, 3, 4, 5], * (i) => new Promise((r) => setTimeout(() => r(i), 1000)), - * { poolLimit: 2, signal: controller.signal }, * ); * * controller.abort(new Error("cancelled")); @@ -73,18 +113,32 @@ export interface PooledMapOptions { * * @typeParam T the input type. * @typeParam R the output type. + * @param options Options including pool limit and abort signal. * @param array The input array for mapping. * @param iteratorFn The function to call for every item of the array. - * @param options Options including pool limit and abort signal. * @returns The async iterator with the transformed values. * @throws {RangeError} If `poolLimit` is not a positive integer. */ export function pooledMap( + options: PooledMapOptions, + array: Iterable | AsyncIterable, + iteratorFn: (data: T) => Promise, +): AsyncIterableIterator; + +export function pooledMap( + poolLimitOrOptions: number | PooledMapOptions, array: Iterable | AsyncIterable, iteratorFn: (data: T) => Promise, - options: PooledMapOptions, ): AsyncIterableIterator { - const { poolLimit, signal } = options; + let poolLimit: number; + let signal: AbortSignal | undefined; + + if (typeof poolLimitOrOptions === "number") { + poolLimit = poolLimitOrOptions; + } else { + poolLimit = poolLimitOrOptions.poolLimit; + signal = poolLimitOrOptions.signal; + } if (!Number.isInteger(poolLimit) || poolLimit < 1) { throw new RangeError("'poolLimit' must be a positive integer"); @@ -120,10 +174,10 @@ export function pooledMap( ): Promise { if (!signal) return Promise.race(promises); const { promise, resolve, reject } = Promise.withResolvers(); - const onAbort = () => reject(signal.reason); + const onAbort = () => reject(signal!.reason); signal.addEventListener("abort", onAbort, { once: true }); return Promise.race([...promises, promise]).finally(() => { - signal.removeEventListener("abort", onAbort); + signal!.removeEventListener("abort", onAbort); resolve(undefined as never); }); } diff --git a/async/unstable_pool_test.ts b/async/unstable_pool_test.ts index c1e1a8b473da..4ef08514e421 100644 --- a/async/unstable_pool_test.ts +++ b/async/unstable_pool_test.ts @@ -14,12 +14,12 @@ import { FakeTime } from "@std/testing/time"; Deno.test("pooledMap() throws for non-positive poolLimit", () => { const noop = (i: number) => Promise.resolve(i); assertThrows( - () => pooledMap([1], noop, { poolLimit: 0 }), + () => pooledMap(0, [1], noop), RangeError, "'poolLimit' must be a positive integer", ); assertThrows( - () => pooledMap([1], noop, { poolLimit: -1 }), + () => pooledMap(-1, [1], noop), RangeError, "'poolLimit' must be a positive integer", ); @@ -28,17 +28,31 @@ Deno.test("pooledMap() throws for non-positive poolLimit", () => { Deno.test("pooledMap() throws for non-integer poolLimit", () => { const noop = (i: number) => Promise.resolve(i); assertThrows( - () => pooledMap([1], noop, { poolLimit: 1.5 }), + () => pooledMap(1.5, [1], noop), RangeError, "'poolLimit' must be a positive integer", ); assertThrows( - () => pooledMap([1], noop, { poolLimit: NaN }), + () => pooledMap(NaN, [1], noop), RangeError, "'poolLimit' must be a positive integer", ); assertThrows( - () => pooledMap([1], noop, { poolLimit: Infinity }), + () => pooledMap(Infinity, [1], noop), + RangeError, + "'poolLimit' must be a positive integer", + ); +}); + +Deno.test("pooledMap() throws for non-positive poolLimit in options form", () => { + const noop = (i: number) => Promise.resolve(i); + assertThrows( + () => pooledMap({ poolLimit: 0 }, [1], noop), + RangeError, + "'poolLimit' must be a positive integer", + ); + assertThrows( + () => pooledMap({ poolLimit: -1 }, [1], noop), RangeError, "'poolLimit' must be a positive integer", ); @@ -49,9 +63,30 @@ Deno.test("pooledMap()", async () => { const start = Date.now(); const results = pooledMap( + 2, [1, 2, 3], (i) => new Promise((r) => setTimeout(() => r(i), 300)), + ); + for (const _ of Array(7)) { + time.tick(100); + await time.runMicrotasks(); + } + const array = await Array.fromAsync(results); + assertEquals(array, [1, 2, 3]); + const diff = Date.now() - start; + + assertGreaterOrEqual(diff, 600); + assertLess(diff, 900); +}); + +Deno.test("pooledMap() with options form", async () => { + using time = new FakeTime(); + + const start = Date.now(); + const results = pooledMap( { poolLimit: 2 }, + [1, 2, 3], + (i) => new Promise((r) => setTimeout(() => r(i), 300)), ); for (const _ of Array(7)) { time.tick(100); @@ -76,9 +111,7 @@ Deno.test("pooledMap() handles errors", async () => { const mappedNumbers: number[] = []; const error = await assertRejects( async () => { - for await ( - const m of pooledMap([1, 2, 3, 4], mapNumber, { poolLimit: 3 }) - ) { + for await (const m of pooledMap(3, [1, 2, 3, 4], mapNumber)) { mappedNumbers.push(m); } }, @@ -93,9 +126,9 @@ Deno.test("pooledMap() handles errors", async () => { Deno.test("pooledMap() returns ordered items", async () => { const results = pooledMap( + 2, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], (i) => new Promise((r) => setTimeout(() => r(i), 100 / i)), - { poolLimit: 2 }, ); const returned = await Array.fromAsync(results); @@ -108,9 +141,9 @@ Deno.test("pooledMap() checks browser compat", async () => { delete (ReadableStream.prototype as any)[Symbol.asyncIterator]; try { const results = pooledMap( + 2, [1, 2, 3], (i) => new Promise((r) => setTimeout(() => r(i), 100)), - { poolLimit: 2 }, ); const array = await Array.fromAsync(results); assertEquals(array, [1, 2, 3]); @@ -124,9 +157,9 @@ Deno.test("pooledMap() rejects immediately with already-aborted signal", async ( controller.abort(new Error("already aborted")); const results = pooledMap( + { poolLimit: 2, signal: controller.signal }, [1, 2, 3], (i) => Promise.resolve(i), - { poolLimit: 2, signal: controller.signal }, ); await assertRejects( @@ -141,6 +174,7 @@ Deno.test("pooledMap() stops processing when signal is aborted", async () => { const started: number[] = []; const results = pooledMap( + { poolLimit: 1, signal: controller.signal }, [1, 2, 3, 4, 5], async (i) => { started.push(i); @@ -148,7 +182,6 @@ Deno.test("pooledMap() stops processing when signal is aborted", async () => { if (i === 2) controller.abort(new Error("stop at 2")); return i; }, - { poolLimit: 1, signal: controller.signal }, ); const collected: number[] = []; @@ -170,12 +203,12 @@ Deno.test("pooledMap() aborts during pool wait", async () => { const controller = new AbortController(); const results = pooledMap( + { poolLimit: 2, signal: controller.signal }, [1, 2, 3, 4, 5, 6, 7, 8], async (i) => { await delay(200); return i; }, - { poolLimit: 2, signal: controller.signal }, ); setTimeout(() => controller.abort(new Error("timed out")), 50); @@ -189,12 +222,12 @@ Deno.test("pooledMap() aborts during pool wait", async () => { Deno.test("pooledMap() works normally without signal", async () => { const results = pooledMap( + { poolLimit: 2 }, [10, 20, 30], async (i) => { await delay(10); return i * 2; }, - { poolLimit: 2 }, ); const array = await Array.fromAsync(results);