RezoQueue
A high-performance task queue with priority-based ordering, configurable concurrency limits, interval-based rate limiting, task timeout and cancellation, and a comprehensive event system.
Constructor
new RezoQueue<T = any>(config?: QueueConfig) QueueConfig
| Property | Type | Default | Description |
|---|---|---|---|
name | string | auto-generated | Queue name for debugging/logging. |
concurrency | number | Infinity | Maximum concurrent tasks. |
autoStart | boolean | true | Auto-start processing when tasks are added. |
timeout | number | 0 | Per-task timeout in milliseconds. 0 = no timeout. |
throwOnTimeout | boolean | true | Throw on timeout vs. silently fail. |
interval | number | 0 | Interval between task starts in ms for rate limiting. |
intervalCap | number | Infinity | Max tasks to start per interval. |
carryoverConcurrencyCount | boolean | false | Carry over unused interval capacity to next interval. |
rejectOnError | boolean | true | Reject task promises on error (true) or resolve with undefined (false). |
Example
import { RezoQueue } from 'rezo';
// Basic queue with concurrency
const queue = new RezoQueue({ concurrency: 5 });
// Rate-limited queue: max 2 tasks per second
const rateLimited = new RezoQueue({
concurrency: 10,
interval: 1000,
intervalCap: 2
});
// Queue that starts paused
const manual = new RezoQueue({
concurrency: 3,
autoStart: false
}); Adding Tasks
add(fn, options?)
Adds a task to the queue. Returns a promise that resolves with the task result.
add<R = T>(fn: TaskFunction<R>, options?: TaskOptions): Promise<R> TaskOptions
| Property | Type | Default | Description |
|---|---|---|---|
priority | number | 0 | Task priority (higher runs first). |
timeout | number | Queue default | Task-specific timeout override. |
id | string | auto-generated | Unique ID for tracking and cancellation. |
signal | AbortSignal | undefined | External cancellation signal. |
Example
// Basic task
const result = await queue.add(() => fetch('/api/data').then(r => r.json()));
// With priority (higher = runs first)
const urgent = await queue.add(() => fetchImportant(), { priority: 100 });
// With ID for cancellation
const taskId = 'download-123';
const promise = queue.add(() => downloadFile(), { id: taskId });
// With AbortSignal
const controller = new AbortController();
queue.add(() => longTask(), { signal: controller.signal });
controller.abort(); // Cancels the task addAll(fns, options?)
Adds multiple tasks at once. Returns a promise that resolves to an array of results.
addAll<R = T>(fns: TaskFunction<R>[], options?: TaskOptions): Promise<R[]> Example
const urls = ['/api/users', '/api/posts', '/api/comments'];
const results = await queue.addAll(
urls.map(url => () => fetch(url).then(r => r.json()))
); Queue Control
pause()
Pauses queue processing. Running tasks continue to completion; no new tasks are started.
pause(): void start()
Resumes queue processing after a pause.
start(): void clear()
Removes all pending tasks from the queue. Tasks are rejected (if rejectOnError: true) or resolved with undefined.
clear(): void Example
queue.pause();
// Add tasks while paused...
queue.add(() => task1());
queue.add(() => task2());
// Resume when ready
queue.start();
// Clear all pending
queue.clear(); Cancellation
cancel(id)
Cancels a specific task by ID. Returns true if the task was found and cancelled.
cancel(id: string): boolean cancelBy(predicate)
Cancels all tasks matching a predicate. Returns the number of tasks cancelled.
cancelBy(predicate: (task: { id: string; priority: number }) => boolean): number Example
// Cancel specific task
queue.cancel('download-123');
// Cancel all low-priority tasks
const cancelled = queue.cancelBy(task => task.priority < 10);
console.log(`Cancelled ${cancelled} tasks`); Waiting / Synchronization
onIdle()
Returns a promise that resolves when the queue becomes idle (no running or pending tasks). If called before any tasks are added, waits for the first task to be added and completed.
onIdle(): Promise<void> onEmpty()
Returns a promise that resolves when the pending queue is empty (but tasks may still be running).
onEmpty(): Promise<void> onSizeLessThan(limit, timeoutMs?)
Returns a promise that resolves when the queue size drops below limit. Optional timeout (resolves, not rejects, on timeout).
onSizeLessThan(limit: number, timeoutMs?: number): Promise<void> Example
// Wait for all work to finish
await queue.onIdle();
console.log('All tasks complete');
// Wait for queue to drain
await queue.onEmpty();
console.log('No more waiting tasks');
// Backpressure: wait until queue has room
await queue.onSizeLessThan(100);
queue.add(() => newTask());
// With timeout to prevent blocking forever
await queue.onSizeLessThan(10, 5000); Events
on(event, handler)
Registers an event handler.
on<E extends keyof QueueEvents>(event: E, handler: EventHandler<QueueEvents[E]>): void off(event, handler)
Removes an event handler.
off<E extends keyof QueueEvents>(event: E, handler: EventHandler<QueueEvents[E]>): void QueueEvents
| Event | Payload | Description |
|---|---|---|
add | { id: string; priority: number } | Task added to queue. |
start | { id: string } | Task started executing. |
completed | { id: string; result: any; duration: number } | Task completed successfully. |
error | { id: string; error: Error } | Task failed with error. |
timeout | { id: string } | Task timed out. |
cancelled | { id: string } | Task cancelled. |
active | undefined | Queue became active (was idle). |
idle | undefined | Queue became idle (all done). |
paused | undefined | Queue was paused. |
resumed | undefined | Queue was resumed. |
next | undefined | Next task about to run. |
empty | undefined | Queue was emptied (no pending). |
Example
queue.on('completed', ({ id, result, duration }) => {
console.log(`Task ${id} completed in ${duration}ms`);
});
queue.on('error', ({ id, error }) => {
console.error(`Task ${id} failed:`, error.message);
});
queue.on('idle', () => {
console.log('Queue is idle');
});
// Remove handler
const handler = (data) => console.log(data);
queue.on('start', handler);
queue.off('start', handler); Lifecycle
destroy()
Destroys the queue: clears all pending tasks, stops interval timers, and removes event handlers.
destroy(): void Getters
state
Returns the current queue state.
get state: QueueState interface QueueState {
pending: number; // Tasks currently running
size: number; // Tasks waiting in queue
total: number; // pending + size
isPaused: boolean;
isIdle: boolean; // No running or pending tasks
} stats
Returns queue statistics.
get stats: QueueStats interface QueueStats {
added: number; // Total tasks added
processed: number; // Total tasks started
completed: number; // Total successes
failed: number; // Total failures
timedOut: number; // Total timeouts
cancelled: number; // Total cancellations
averageDuration: number; // Average task duration (ms)
throughput: number; // Tasks per second (rolling average)
} concurrency
Get or set the concurrency limit at runtime.
get concurrency: number
set concurrency(value: number) pending
Number of tasks currently running.
get pending: number size
Number of tasks waiting in the queue.
get size: number isPaused
Whether the queue is paused.
get isPaused: boolean Example
console.log(queue.state);
// { pending: 3, size: 12, total: 15, isPaused: false, isIdle: false }
console.log(queue.stats);
// { added: 50, processed: 38, completed: 35, failed: 2, timedOut: 1, ... }
// Dynamic concurrency adjustment
queue.concurrency = 10; Priority Constants
import { Priority } from 'rezo';
const Priority = {
LOWEST: 0,
LOW: 25,
NORMAL: 50,
HIGH: 75,
HIGHEST: 100,
CRITICAL: 1000
} as const; Example
queue.add(() => normalTask(), { priority: Priority.NORMAL });
queue.add(() => criticalTask(), { priority: Priority.CRITICAL });
queue.add(() => backgroundTask(), { priority: Priority.LOWEST });