@philiprehberger/promise-pool

Concurrent promise execution with configurable pool size

Installation

npm install @philiprehberger/promise-pool

Usage

Batch Execution

import { promisePool } from '@philiprehberger/promise-pool';

const tasks = urls.map(url => () => fetch(url).then(r => r.json()));

const { results, errors } = await promisePool(tasks, {
  concurrency: 5,
  onProgress: ({ completed, total, percent }) => {
    console.log(`${percent}% done (${completed}/${total})`);
  },
});
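To make the concurrency limit concrete, here is a minimal sketch of how a pool like this could be built. It is not the package's implementation; `runWithLimit`, `Task`, and `SketchResult` are hypothetical names used only for illustration. A fixed number of workers each pull the next unstarted task, so at most `concurrency` tasks are in flight at once, and results keep their original positions.

```typescript
// Hypothetical sketch; these names are illustrative and not part of the package.
type Task<T> = () => Promise<T>;

interface SketchResult<T> {
  results: (T | undefined)[];
  errors: { index: number; error: unknown }[];
}

async function runWithLimit<T>(
  tasks: Task<T>[],
  concurrency: number,
): Promise<SketchResult<T>> {
  const results: (T | undefined)[] = new Array(tasks.length).fill(undefined);
  const errors: { index: number; error: unknown }[] = [];
  let next = 0; // index of the next task that has not been started

  // Each worker repeatedly claims the next unstarted task until none remain.
  async function worker(): Promise<void> {
    while (next < tasks.length) {
      const index = next++;
      try {
        results[index] = await tasks[index]();
      } catch (error) {
        errors.push({ index, error }); // failed slots stay undefined in results
      }
    }
  }

  // Start at most `concurrency` workers, and never more than there are tasks.
  const workerCount = Math.max(1, Math.min(concurrency, tasks.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return { results, errors };
}
```

Because each worker claims an index synchronously before awaiting, no task is started twice, even though all workers share the same counter.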

Stop on Error

const { results, errors } = await promisePool(tasks, {
  concurrency: 3,
  stopOnError: true, // Stop scheduling new tasks after first failure
});

AbortSignal Support

Cancel remaining tasks using an AbortSignal:

const controller = new AbortController();

// Start the pool without awaiting it yet, so it can be cancelled while running
const pending = promisePool(tasks, {
  concurrency: 5,
  signal: controller.signal,
});

// Cancel from elsewhere while tasks are still in flight:
controller.abort();

const { results, errors, aborted } = await pending;

if (aborted) {
  console.log('Pool was cancelled before all tasks completed');
}
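As a rough illustration of what "cancel remaining tasks" can mean, the sketch below checks the signal before starting each task and skips everything that has not started yet. It runs tasks one at a time for brevity and lets rejections propagate. `runUntilAborted` is a hypothetical helper, and whether the package treats in-flight tasks or the `aborted` flag exactly this way is an assumption.

```typescript
// Hypothetical sketch; not the package's implementation.
async function runUntilAborted<T>(
  tasks: Array<() => Promise<T>>,
  signal: AbortSignal,
): Promise<{ results: (T | undefined)[]; aborted: boolean }> {
  const results: (T | undefined)[] = new Array(tasks.length).fill(undefined);
  let aborted = false;

  for (let index = 0; index < tasks.length; index++) {
    if (signal.aborted) {
      aborted = true; // at least one task was skipped because of the signal
      break;
    }
    // A task that has already started is allowed to finish.
    results[index] = await tasks[index]();
  }

  return { results, aborted };
}
```

Skipped tasks leave `undefined` in their result slots, which matches how failed tasks are described in the PoolResult table.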

Per-Task Timeout

Set a timeout (in milliseconds) for individual tasks. A task that exceeds the timeout fails with a TimeoutError, which is reported in the errors array:

import { promisePool, TimeoutError } from '@philiprehberger/promise-pool';

const { results, errors } = await promisePool(tasks, {
  concurrency: 5,
  taskTimeout: 3000, // 3 seconds per task
});

for (const { index, error } of errors) {
  if (error instanceof TimeoutError) {
    console.log(`Task ${index} timed out after ${error.timeout}ms`);
  }
}
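A per-task timeout is commonly built by racing the task against a timer. The sketch below shows that pattern under stated assumptions: `withTimeout` and `SketchTimeoutError` are hypothetical stand-ins, not the package's code, and the `index` and `timeout` fields simply mirror the TimeoutError properties documented in this README.

```typescript
// Hypothetical stand-in for the package's TimeoutError, for illustration only.
class SketchTimeoutError extends Error {
  readonly index: number;
  readonly timeout: number;

  constructor(index: number, timeout: number) {
    super(`Task ${index} timed out after ${timeout}ms`);
    this.name = 'SketchTimeoutError';
    this.index = index;
    this.timeout = timeout;
  }
}

async function withTimeout<T>(
  task: () => Promise<T>,
  index: number,
  timeout: number,
): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;

  // Rejects once the deadline passes.
  const deadline = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => reject(new SketchTimeoutError(index, timeout)), timeout);
  });

  try {
    // Whichever settles first decides the outcome.
    return await Promise.race([task(), deadline]);
  } finally {
    clearTimeout(timer); // avoid leaving a stray timer behind
  }
}
```

Note that racing only stops waiting for the task; the underlying work keeps running unless the task itself supports cancellation.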

Streaming Results

Process results as each task completes using the onResult callback, instead of waiting for all tasks to finish:

await promisePool(tasks, {
  concurrency: 5,
  onResult: ({ index, value, error, status }) => {
    if (status === 'fulfilled') {
      console.log(`Task ${index} completed:`, value);
    } else {
      console.log(`Task ${index} failed:`, error);
    }
  },
});

Streaming Errors

Receive each error the moment a task rejects, in addition to the aggregated errors array. This is useful for logging, metrics, or surfacing failures early while the pool keeps running:

import { promisePool } from '@philiprehberger/promise-pool';

const { results, errors } = await promisePool(tasks, {
  concurrency: 5,
  onError: ({ index, error }) => {
    console.error(`Task ${index} failed:`, error);
  },
});

onError is called synchronously the moment a task rejects, before any stopOnError handling. A throwing onError callback will not crash the pool.
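One plausible way to get the "throwing callback will not crash the pool" guarantee is to wrap every callback invocation in a try/catch. The helper below is a hypothetical sketch of that idea (`safeInvoke` is not a package export); how the package actually isolates callbacks is not documented here.

```typescript
// Hypothetical sketch; not part of the package.
// Calls an optional callback and reports whether it completed without throwing.
function safeInvoke<A>(callback: ((arg: A) => void) | undefined, arg: A): boolean {
  if (!callback) return true; // nothing to call
  try {
    callback(arg);
    return true;
  } catch {
    // Swallow the callback's own failure so scheduling can continue.
    return false;
  }
}
```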

Task Prioritization

Assign numeric priorities to tasks. Higher-priority tasks are started first:

import { promisePool } from '@philiprehberger/promise-pool';
import type { PrioritizedTask } from '@philiprehberger/promise-pool';

const tasks: PrioritizedTask<string>[] = [
  { task: () => fetch('/low').then(r => r.text()), priority: 1 },
  { task: () => fetch('/critical').then(r => r.text()), priority: 10 },
  { task: () => fetch('/medium').then(r => r.text()), priority: 5 },
];

const { results } = await promisePool(tasks, { concurrency: 2 });
// With concurrency 2, '/critical' and '/medium' start first; '/low' starts once a slot frees up.
// results stay in the original task order.
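The ordering rule can be sketched as a sort over the task list that remembers each task's original index, so results can still be written back in input order. `schedulingOrder` and `SketchPrioritizedTask` are hypothetical names, and breaking ties by input order is an assumption rather than documented behavior.

```typescript
// Hypothetical sketch; mirrors the documented PrioritizedTask shape.
interface SketchPrioritizedTask<T> {
  task: () => Promise<T>;
  priority?: number; // treated as 0 when omitted
}

// Returns the original indices in the order the tasks would be started.
function schedulingOrder<T>(tasks: SketchPrioritizedTask<T>[]): number[] {
  return tasks
    .map((t, index) => ({ index, priority: t.priority ?? 0 }))
    // Array.prototype.sort is stable (ES2019+), so equal priorities keep input order.
    .sort((a, b) => b.priority - a.priority)
    .map(entry => entry.index);
}
```

For the three tasks above (priorities 1, 10, 5), this yields the start order `[1, 2, 0]`.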

Reusable Pool

import { createPool } from '@philiprehberger/promise-pool';

const pool = createPool({ concurrency: 3 });

// Tasks are queued; at most 3 run at the same time. Each run() call returns a promise.
const result1 = pool.run(() => fetch('/api/1'));
const result2 = pool.run(() => fetch('/api/2'));
const result3 = pool.run(() => fetch('/api/3'));
const result4 = pool.run(() => fetch('/api/4')); // waits for a slot
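A reusable pool behaves like a counting semaphore: `run()` starts the task immediately if a slot is free and otherwise waits in a queue. The following is a minimal sketch of that mechanism, not the package's implementation; `createSketchPool` is a hypothetical name.

```typescript
// Hypothetical sketch; not the package's createPool.
function createSketchPool(concurrency: number) {
  let active = 0; // number of slots currently in use
  const waiting: Array<() => void> = []; // resolvers for queued run() calls

  function release(): void {
    const next = waiting.shift();
    if (next) {
      next(); // hand the slot straight to the next waiter; `active` is unchanged
    } else {
      active--; // nobody is waiting, so the slot becomes free
    }
  }

  async function run<T>(task: () => Promise<T>): Promise<T> {
    if (active >= concurrency) {
      // Wait until a finishing task hands its slot over.
      await new Promise<void>(resolve => waiting.push(resolve));
    } else {
      active++;
    }
    try {
      return await task();
    } finally {
      release();
    }
  }

  return { run };
}
```

Handing the slot directly to the next waiter, rather than decrementing and re-incrementing the counter, prevents a new `run()` call from slipping in and briefly exceeding the limit.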

API

| Export | Description |
| --- | --- |
| promisePool(tasks, options?) | Execute tasks with a concurrency limit; returns PoolResult |
| createPool(options?) | Create a reusable pool with a run() method |
| TimeoutError | Error class thrown when a task exceeds its timeout |

PoolOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| concurrency | number | 5 | Max concurrent tasks |
| stopOnError | boolean | false | Stop scheduling after first error |
| onProgress | (progress) => void | - | Progress callback |
| signal | AbortSignal | - | Signal to cancel remaining tasks |
| taskTimeout | number | - | Per-task timeout in milliseconds |
| onResult | (result: TaskResult) => void | - | Callback invoked as each task completes |
| onError | (error: PoolError) => void | - | Callback invoked synchronously when a task rejects, before stopOnError handling |

PoolResult<T>

| Property | Type | Description |
| --- | --- | --- |
| results | `(T \| undefined)[]` | Results in original order (undefined for failed tasks) |
| errors | PoolError[] | Array of { index, error } for failed tasks |
| aborted | boolean | Whether the pool was cancelled via AbortSignal |

PrioritizedTask<T>

| Property | Type | Description |
| --- | --- | --- |
| task | () => Promise<T> | The async task function |
| priority | number | Priority level (higher runs first, default 0) |

TaskResult<T>

| Property | Type | Description |
| --- | --- | --- |
| index | number | Original index of the task |
| value | `T \| undefined` | Resolved value (when fulfilled) |
| error | unknown | Error (when rejected) |
| status | `'fulfilled' \| 'rejected'` | Whether the task succeeded or failed |

TimeoutError

| Property | Type | Description |
| --- | --- | --- |
| index | number | Index of the timed-out task |
| timeout | number | Timeout duration in milliseconds |

Development

npm install
npm run build
npm test

Support

If you find this project useful:

Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT