API Reference

RezoQueue

A high-performance task queue with priority-based ordering, configurable concurrency limits, interval-based rate limiting, task timeout and cancellation, and a comprehensive event system.

Constructor

new RezoQueue<T = any>(config?: QueueConfig)

QueueConfig

PropertyTypeDefaultDescription
namestringauto-generatedQueue name for debugging/logging.
concurrencynumberInfinityMaximum concurrent tasks.
autoStartbooleantrueAuto-start processing when tasks are added.
timeoutnumber0Per-task timeout in milliseconds. 0 = no timeout.
throwOnTimeoutbooleantrueThrow on timeout vs. silently fail.
intervalnumber0Interval between task starts in ms for rate limiting.
intervalCapnumberInfinityMax tasks to start per interval.
carryoverConcurrencyCountbooleanfalseCarry over unused interval capacity to next interval.
rejectOnErrorbooleantrueReject task promises on error (true) or resolve with undefined (false).

Example

import { RezoQueue } from 'rezo';

// Basic queue with concurrency
const queue = new RezoQueue({ concurrency: 5 });

// Rate-limited queue: max 2 tasks per second
const rateLimited = new RezoQueue({
  concurrency: 10,
  interval: 1000,
  intervalCap: 2
});

// Queue that starts paused
const manual = new RezoQueue({
  concurrency: 3,
  autoStart: false
});

Adding Tasks

add(fn, options?)

Adds a task to the queue. Returns a promise that resolves with the task result.

add<R = T>(fn: TaskFunction<R>, options?: TaskOptions): Promise<R>

TaskOptions

PropertyTypeDefaultDescription
prioritynumber0Task priority (higher runs first).
timeoutnumberQueue defaultTask-specific timeout override.
idstringauto-generatedUnique ID for tracking and cancellation.
signalAbortSignalundefinedExternal cancellation signal.

Example

// Basic task
const result = await queue.add(() => fetch('/api/data').then(r => r.json()));

// With priority (higher = runs first)
const urgent = await queue.add(() => fetchImportant(), { priority: 100 });

// With ID for cancellation
const taskId = 'download-123';
const promise = queue.add(() => downloadFile(), { id: taskId });

// With AbortSignal
const controller = new AbortController();
queue.add(() => longTask(), { signal: controller.signal });
controller.abort(); // Cancels the task

addAll(fns, options?)

Adds multiple tasks at once. Returns a promise that resolves to an array of results.

addAll<R = T>(fns: TaskFunction<R>[], options?: TaskOptions): Promise<R[]>

Example

const urls = ['/api/users', '/api/posts', '/api/comments'];
const results = await queue.addAll(
  urls.map(url => () => fetch(url).then(r => r.json()))
);

Queue Control

pause()

Pauses queue processing. Running tasks continue to completion; no new tasks are started.

pause(): void

start()

Resumes queue processing after a pause.

start(): void

clear()

Removes all pending tasks from the queue. Tasks are rejected (if rejectOnError: true) or resolved with undefined.

clear(): void

Example

queue.pause();
// Add tasks while paused...
queue.add(() => task1());
queue.add(() => task2());
// Resume when ready
queue.start();

// Clear all pending
queue.clear();

Cancellation

cancel(id)

Cancels a specific task by ID. Returns true if the task was found and cancelled.

cancel(id: string): boolean

cancelBy(predicate)

Cancels all tasks matching a predicate. Returns the number of tasks cancelled.

cancelBy(predicate: (task: { id: string; priority: number }) => boolean): number

Example

// Cancel specific task
queue.cancel('download-123');

// Cancel all low-priority tasks
const cancelled = queue.cancelBy(task => task.priority < 10);
console.log(`Cancelled ${cancelled} tasks`);

Waiting / Synchronization

onIdle()

Returns a promise that resolves when the queue becomes idle (no running or pending tasks). If called before any tasks are added, waits for the first task to be added and completed.

onIdle(): Promise<void>

onEmpty()

Returns a promise that resolves when the pending queue is empty (but tasks may still be running).

onEmpty(): Promise<void>

onSizeLessThan(limit, timeoutMs?)

Returns a promise that resolves when the queue size drops below limit. Optional timeout (resolves, not rejects, on timeout).

onSizeLessThan(limit: number, timeoutMs?: number): Promise<void>

Example

// Wait for all work to finish
await queue.onIdle();
console.log('All tasks complete');

// Wait for queue to drain
await queue.onEmpty();
console.log('No more waiting tasks');

// Backpressure: wait until queue has room
await queue.onSizeLessThan(100);
queue.add(() => newTask());

// With timeout to prevent blocking forever
await queue.onSizeLessThan(10, 5000);

Events

on(event, handler)

Registers an event handler.

on<E extends keyof QueueEvents>(event: E, handler: EventHandler<QueueEvents[E]>): void

off(event, handler)

Removes an event handler.

off<E extends keyof QueueEvents>(event: E, handler: EventHandler<QueueEvents[E]>): void

QueueEvents

EventPayloadDescription
add{ id: string; priority: number }Task added to queue.
start{ id: string }Task started executing.
completed{ id: string; result: any; duration: number }Task completed successfully.
error{ id: string; error: Error }Task failed with error.
timeout{ id: string }Task timed out.
cancelled{ id: string }Task cancelled.
activeundefinedQueue became active (was idle).
idleundefinedQueue became idle (all done).
pausedundefinedQueue was paused.
resumedundefinedQueue was resumed.
nextundefinedNext task about to run.
emptyundefinedQueue was emptied (no pending).

Example

queue.on('completed', ({ id, result, duration }) => {
  console.log(`Task ${id} completed in ${duration}ms`);
});

queue.on('error', ({ id, error }) => {
  console.error(`Task ${id} failed:`, error.message);
});

queue.on('idle', () => {
  console.log('Queue is idle');
});

// Remove handler
const handler = (data) => console.log(data);
queue.on('start', handler);
queue.off('start', handler);

Lifecycle

destroy()

Destroys the queue: clears all pending tasks, stops interval timers, and removes event handlers.

destroy(): void

Getters

state

Returns the current queue state.

get state: QueueState
interface QueueState {
  pending: number;   // Tasks currently running
  size: number;      // Tasks waiting in queue
  total: number;     // pending + size
  isPaused: boolean;
  isIdle: boolean;   // No running or pending tasks
}

stats

Returns queue statistics.

get stats: QueueStats
interface QueueStats {
  added: number;           // Total tasks added
  processed: number;       // Total tasks started
  completed: number;       // Total successes
  failed: number;          // Total failures
  timedOut: number;        // Total timeouts
  cancelled: number;       // Total cancellations
  averageDuration: number; // Average task duration (ms)
  throughput: number;      // Tasks per second (rolling average)
}

concurrency

Get or set the concurrency limit at runtime.

get concurrency: number
set concurrency(value: number)

pending

Number of tasks currently running.

get pending: number

size

Number of tasks waiting in the queue.

get size: number

isPaused

Whether the queue is paused.

get isPaused: boolean

Example

console.log(queue.state);
// { pending: 3, size: 12, total: 15, isPaused: false, isIdle: false }

console.log(queue.stats);
// { added: 50, processed: 38, completed: 35, failed: 2, timedOut: 1, ... }

// Dynamic concurrency adjustment
queue.concurrency = 10;

Priority Constants

import { Priority } from 'rezo';

const Priority = {
  LOWEST: 0,
  LOW: 25,
  NORMAL: 50,
  HIGH: 75,
  HIGHEST: 100,
  CRITICAL: 1000
} as const;

Example

queue.add(() => normalTask(), { priority: Priority.NORMAL });
queue.add(() => criticalTask(), { priority: Priority.CRITICAL });
queue.add(() => backgroundTask(), { priority: Priority.LOWEST });