Features

Request Queue

Rezo provides two queue classes for controlling concurrency, rate limiting, and task prioritization:

  • RezoQueue — a general-purpose priority queue with concurrency control.
  • HttpQueue — adds HTTP-aware features like per-domain concurrency, rate limit header handling, and automatic retry.

Enabling the Queue

The simplest way to use queuing is through Rezo’s instance configuration:

import rezo from 'rezo';

// All requests through this client are automatically queued
const client = rezo.create({
  queue: {
    concurrency: 5,
    interval: 1000,
    intervalCap: 2
  }
});

// These run with at most 5 concurrent, max 2 starting per second
const [users, posts, comments] = await Promise.all([
  client.get('https://api.example.com/users'),
  client.get('https://api.example.com/posts'),
  client.get('https://api.example.com/comments'),
]);

For HTTP-aware queuing with per-domain limits, add domainConcurrency:

const client = rezo.create({
  queue: {
    concurrency: 20,
    domainConcurrency: 3,
    retryOnRateLimit: true,
    maxRetries: 3
  }
});

RezoQueue

For standalone use or advanced control, you can create a queue directly:

import { RezoQueue } from 'rezo';

const queue = new RezoQueue({ concurrency: 5 });

// Add tasks using rezo
const result = await queue.add(() => rezo.get('https://api.example.com/data'));

// Add with priority
const urgent = await queue.add(() => rezo.get('https://api.example.com/critical'), {
  priority: 100
});

// Wait for all tasks to complete
await queue.onIdle();

Configuration

const queue = new RezoQueue({
  // Name for debugging and logging
  name: 'my-queue',

  // Maximum concurrent tasks (default: Infinity)
  concurrency: 10,

  // Auto-start processing when tasks are added (default: true)
  autoStart: true,

  // Timeout per task in milliseconds (default: none)
  timeout: 30_000,

  // Throw on timeout vs silently fail (default: true)
  throwOnTimeout: true,

  // Interval between task starts in ms (rate limiting)
  interval: 1000,

  // Max tasks to start per interval (default: Infinity)
  intervalCap: 5,

  // Carry over unused interval capacity to next interval
  carryoverConcurrencyCount: false,

  // Reject the task promise on error (default: true)
  rejectOnError: true
});

Adding Tasks

add()

Add a single async function. Returns a promise that resolves with the task result:

const result = await queue.add(async () => {
  const { data } = await rezo.get('https://api.example.com/users');
  return data;
});

console.log(result); // The parsed JSON

With options:

const result = await queue.add(
  () => fetchData(),
  {
    priority: 75,        // Higher runs first
    timeout: 10_000,     // 10s timeout for this task
    id: 'fetch-users',   // ID for tracking/cancellation
    signal: controller.signal  // AbortSignal for external cancellation
  }
);

addAll()

Add multiple tasks at once. Returns a promise resolving to an array of results:

const urls = ['/users', '/posts', '/comments'];

const results = await queue.addAll(
  urls.map(url => () => rezo.get(`https://api.example.com${url}`).then(r => r.data)),
  { priority: 50 }
);

console.log(results); // [usersData, postsData, commentsData]

Priority Levels

Higher priority tasks run first. Rezo exports preset constants:

import { Priority } from 'rezo';

Priority.LOWEST;   // 0
Priority.LOW;      // 25
Priority.NORMAL;   // 50
Priority.HIGH;     // 75
Priority.HIGHEST;  // 100
Priority.CRITICAL; // 1000
// Critical tasks jump to the front of the queue
await queue.add(() => handleAlert(), { priority: Priority.CRITICAL });

// Low-priority background work
await queue.add(() => syncCache(), { priority: Priority.LOWEST });

The default priority is 0.

Flow Control

pause() / start()

Pause stops new tasks from starting. Already-running tasks continue to completion:

queue.pause();

// Queue is paused -- tasks accumulate but don't execute
queue.add(() => doWork()); // Queued, not running

queue.start(); // Resume -- queued tasks begin executing

clear()

Remove all pending tasks from the queue. Running tasks are not affected:

queue.clear(); // Rejects all pending tasks with "Queue was cleared"

cancel()

Cancel a specific task by ID:

queue.add(() => slowWork(), { id: 'slow-task' });

// Later...
const wasCancelled = queue.cancel('slow-task'); // true if found

cancelBy()

Cancel tasks matching a predicate:

const count = queue.cancelBy((task) => task.priority < 50);
console.log(`Cancelled ${count} low-priority tasks`);

AbortSignal

Cancel tasks from outside using an AbortController:

const controller = new AbortController();

queue.add(() => longRunningTask(), {
  signal: controller.signal
});

// Cancel from outside
controller.abort();

Waiting for Completion

// Wait for all tasks (running + queued) to finish
await queue.onIdle();

// Wait for queue to be empty (no pending tasks, running may continue)
await queue.onEmpty();

// Wait for queue size to drop below a threshold
await queue.onSizeLessThan(10);

// With timeout (resolves if threshold not met in time)
await queue.onSizeLessThan(5, 30_000);

Rate Limiting

Limit how many tasks start per time interval:

// 2 requests per second
const queue = new RezoQueue({
  concurrency: 2,
  interval: 1000,
  intervalCap: 2
});

// 10 requests per minute
const queue2 = new RezoQueue({
  interval: 60_000,
  intervalCap: 10
});

The interval timer is automatically cleaned up when the queue empties, allowing Node.js to exit naturally.

Queue State and Statistics

const state = queue.state;
console.log(state);
// {
//   pending: 3,     // Running tasks
//   size: 12,       // Waiting in queue
//   total: 15,      // pending + size
//   isPaused: false,
//   isIdle: false
// }

const stats = queue.stats;
console.log(stats);
// {
//   added: 50,
//   processed: 38,
//   completed: 35,
//   failed: 3,
//   timedOut: 0,
//   cancelled: 2,
//   averageDuration: 245,    // ms
//   throughput: 4.2          // tasks/sec
// }

Shorthand properties:

queue.pending;     // Number of running tasks
queue.size;        // Number of queued tasks
queue.isPaused;    // Whether queue is paused
queue.concurrency; // Get/set concurrency limit

RezoQueue Events

queue.on('add', ({ id, priority }) => {
  console.log(`Task ${id} added with priority ${priority}`);
});

queue.on('start', ({ id }) => {
  console.log(`Task ${id} started`);
});

queue.on('completed', ({ id, result, duration }) => {
  console.log(`Task ${id} completed in ${duration}ms`);
});

queue.on('error', ({ id, error }) => {
  console.error(`Task ${id} failed:`, error.message);
});

queue.on('timeout', ({ id }) => {
  console.warn(`Task ${id} timed out`);
});

queue.on('cancelled', ({ id }) => {
  console.log(`Task ${id} cancelled`);
});

queue.on('active', () => {
  console.log('Queue became active');
});

queue.on('idle', () => {
  console.log('Queue is idle -- all tasks done');
});

queue.on('empty', () => {
  console.log('Queue emptied -- no pending tasks');
});

queue.on('paused', () => {
  console.log('Queue paused');
});

queue.on('resumed', () => {
  console.log('Queue resumed');
});

queue.on('next', () => {
  // Fired after each task completes, before the next starts
});

Remove event handlers:

const handler = (data) => console.log(data);
queue.on('completed', handler);

// Later...
queue.off('completed', handler);

Cleanup

queue.destroy(); // Clear all tasks, timers, and event handlers

HttpQueue

HttpQueue provides HTTP-specific features: per-domain concurrency, rate limit handling, automatic retry, and HTTP method-based priority.

Basic Usage

import { HttpQueue } from 'rezo';

const queue = new HttpQueue({
  concurrency: 10,
  domainConcurrency: 2,
  autoRetry: true,
  maxRetries: 3
});

// HTTP-aware task
const result = await queue.addHttp(
  () => rezo.get('https://api.example.com/data'),
  { domain: 'api.example.com', method: 'GET' }
);

Configuration

const queue = new HttpQueue({
  // --- Base queue options ---
  concurrency: 20,
  timeout: 30_000,

  // --- HTTP-specific options ---

  // Per-domain concurrency limit
  domainConcurrency: 3,

  // Or per-domain overrides
  domainConcurrency: {
    'api.example.com': 5,
    'cdn.example.com': 10,
    'slow-api.example.com': 1
  },

  // Global requests per second
  requestsPerSecond: 10,

  // Respect Retry-After headers (default: true)
  respectRetryAfter: true,

  // Respect X-RateLimit-* headers (default: true)
  respectRateLimitHeaders: true,

  // Automatic retry on failure (default: false)
  autoRetry: true,

  // Alias for autoRetry
  retryOnRateLimit: true,

  // Max retry attempts (default: 3)
  maxRetries: 3,

  // Delay between retries in ms (supports backoff function)
  retryDelay: 1000,

  // Or exponential backoff
  retryDelay: (attempt) => Math.pow(2, attempt) * 1000,

  // Status codes that trigger retry (default: [429, 500, 502, 503, 504])
  retryStatusCodes: [429, 500, 502, 503, 504]
});

addHttp()

Add an HTTP-aware task:

const result = await queue.addHttp(
  () => rezo.get('https://api.example.com/users'),
  {
    domain: 'api.example.com',  // For per-domain limiting
    method: 'GET',              // For method-based priority
    retry: 3,                   // Override max retries for this task
    retryDelay: 2000,           // Override retry delay for this task
    priority: 75,               // Override priority
    timeout: 10_000,            // Task-specific timeout
    id: 'fetch-users'           // For tracking/cancellation
  }
);

If domain is omitted, it defaults to 'default'. If method is omitted, it defaults to 'GET'.

HTTP Method Priority

When no explicit priority is set, HttpQueue assigns priority based on HTTP method:

MethodPriority
HEAD100
GET75
OPTIONS50
POST50
PUT50
PATCH50
DELETE25

This ensures read operations are prioritized over writes by default. Override with the priority option on any task.

Per-Domain Concurrency

Limit concurrent requests to each domain independently:

const queue = new HttpQueue({
  concurrency: 20,        // Global max
  domainConcurrency: 3    // Max 3 concurrent per domain
});

// These run with at most 3 concurrent per domain
queue.addHttp(() => rezo.get('https://api.example.com/a'), { domain: 'api.example.com' });
queue.addHttp(() => rezo.get('https://api.example.com/b'), { domain: 'api.example.com' });
queue.addHttp(() => rezo.get('https://api.example.com/c'), { domain: 'api.example.com' });
queue.addHttp(() => rezo.get('https://api.example.com/d'), { domain: 'api.example.com' }); // Waits

// Different domain -- runs immediately
queue.addHttp(() => rezo.get('https://cdn.example.com/asset'), { domain: 'cdn.example.com' });

Set per-domain limits dynamically:

queue.setDomainConcurrency('slow-api.example.com', 1);

Rate Limit Handling

When a server returns 429 Too Many Requests with a Retry-After header, tell the queue:

queue.handleRateLimit('api.example.com', 30); // Pause domain for 30 seconds

The domain is paused automatically and resumes after the specified seconds.

Domain Pause / Resume

Manually pause and resume requests to specific domains:

// Pause all requests to a domain
queue.pauseDomain('api.example.com');

// Check domain state
const state = queue.getDomainState('api.example.com');
console.log(state);
// {
//   pending: 2,
//   size: 5,
//   isPaused: true,
//   rateLimitedUntil: undefined
// }

// Resume
queue.resumeDomain('api.example.com');

HttpQueue Events

HttpQueue supports all base queue events plus HTTP-specific ones:

// Rate limit hit
queue.on('rateLimited', ({ domain, retryAfter }) => {
  console.warn(`${domain} rate limited for ${retryAfter}s`);
});

// Domain became available (after rate limit or pause)
queue.on('domainAvailable', ({ domain }) => {
  console.log(`${domain} is available again`);
});

// Task being retried
queue.on('retry', ({ id, attempt, error }) => {
  console.log(`Retrying task ${id} (attempt ${attempt}): ${error.message}`);
});

// All base events also work
queue.on('completed', ({ id, result, duration }) => {
  console.log(`HTTP task ${id} completed in ${duration}ms`);
});

queue.on('error', ({ id, error }) => {
  console.error(`HTTP task ${id} failed:`, error.message);
});

HTTP Queue Statistics

const stats = queue.httpStats;
console.log(stats);
// {
//   added: 100,
//   processed: 85,
//   completed: 80,
//   failed: 5,
//   timedOut: 0,
//   cancelled: 0,
//   averageDuration: 320,
//   throughput: 2.5,
//   retries: 12,
//   rateLimitHits: 3,
//   byDomain: {
//     'api.example.com': { pending: 2, completed: 50, failed: 3, rateLimited: 2 },
//     'cdn.example.com': { pending: 0, completed: 30, failed: 2, rateLimited: 1 }
//   }
// }

Cancel HTTP Tasks

queue.cancelHttp('task-id'); // Cancel a specific HTTP task
queue.clearHttp();           // Clear all HTTP tasks

Cleanup

queue.destroy(); // Clears both base and HTTP tasks, removes all handlers

Passing a Queue Instance

Instead of a config object, you can pass a queue instance directly for full control:

import rezo, { RezoQueue, HttpQueue } from 'rezo';

const queue = new RezoQueue({ concurrency: 5, interval: 1000, intervalCap: 2 });
const client = rezo.create({ queue });

// Or with HttpQueue
const httpQueue = new HttpQueue({ concurrency: 10, domainConcurrency: 3 });
const client2 = rezo.create({ queue: httpQueue });

This is useful when you want to share a queue across multiple Rezo instances or access queue methods directly (events, pause/resume, stats).