Request Queue
Rezo provides two queue classes for controlling concurrency, rate limiting, and task prioritization:
- RezoQueue — a general-purpose priority queue with concurrency control.
- HttpQueue — adds HTTP-aware features like per-domain concurrency, rate limit header handling, and automatic retry.
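To make "concurrency control" concrete, here is a minimal, self-contained sketch of a limiter that lets at most `limit` tasks run at once. It is an illustration only, not Rezo's implementation; `createLimiter` and its internals are hypothetical names.

```javascript
// Minimal concurrency limiter: at most `limit` tasks run at the same time.
// Illustrative sketch only; this is NOT how Rezo implements its queues.
function createLimiter(limit) {
  let running = 0;
  const waiting = [];

  const next = () => {
    if (running >= limit || waiting.length === 0) return;
    running++;
    const { task, resolve, reject } = waiting.shift();
    Promise.resolve()
      .then(task)               // start the task
      .then(resolve, reject)    // settle the caller's promise
      .finally(() => {
        running--;              // free the slot
        next();                 // and start the next waiting task, if any
      });
  };

  // Returns a promise that settles with the task's result.
  return function add(task) {
    return new Promise((resolve, reject) => {
      waiting.push({ task, resolve, reject });
      next();
    });
  };
}
```

Tasks beyond the limit wait in FIFO order in this sketch; priorities and rate limiting, which the real queue classes provide, are left out.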
Enabling the Queue
The simplest way to use queuing is through Rezo’s instance configuration:
import rezo from 'rezo';
// All requests through this client are automatically queued
const client = rezo.create({
queue: {
concurrency: 5,
interval: 1000,
intervalCap: 2
}
});
// These run with at most 5 concurrent, max 2 starting per second
const [users, posts, comments] = await Promise.all([
client.get('https://api.example.com/users'),
client.get('https://api.example.com/posts'),
client.get('https://api.example.com/comments'),
]);
For HTTP-aware queuing with per-domain limits, add domainConcurrency:
const client = rezo.create({
queue: {
concurrency: 20,
domainConcurrency: 3,
retryOnRateLimit: true,
maxRetries: 3
}
});
RezoQueue
For standalone use or advanced control, you can create a queue directly:
import rezo, { RezoQueue } from 'rezo';
const queue = new RezoQueue({ concurrency: 5 });
// Add tasks using rezo
const result = await queue.add(() => rezo.get('https://api.example.com/data'));
// Add with priority
const urgent = await queue.add(() => rezo.get('https://api.example.com/critical'), {
priority: 100
});
// Wait for all tasks to complete
await queue.onIdle();
Configuration
const queue = new RezoQueue({
// Name for debugging and logging
name: 'my-queue',
// Maximum concurrent tasks (default: Infinity)
concurrency: 10,
// Auto-start processing when tasks are added (default: true)
autoStart: true,
// Timeout per task in milliseconds (default: none)
timeout: 30_000,
// Throw on timeout vs silently fail (default: true)
throwOnTimeout: true,
// Length of the rate-limiting window in ms (used together with intervalCap)
interval: 1000,
// Max tasks to start per interval (default: Infinity)
intervalCap: 5,
// Carry over unused interval capacity to next interval
carryoverConcurrencyCount: false,
// Reject the task promise on error (default: true)
rejectOnError: true
});
Adding Tasks
add()
Add a single async function. Returns a promise that resolves with the task result:
const result = await queue.add(async () => {
const { data } = await rezo.get('https://api.example.com/users');
return data;
});
console.log(result); // The parsed JSON
With options:
const result = await queue.add(
() => fetchData(),
{
priority: 75, // Higher runs first
timeout: 10_000, // 10s timeout for this task
id: 'fetch-users', // ID for tracking/cancellation
signal: controller.signal // AbortSignal for external cancellation
}
);
addAll()
Add multiple tasks at once. Returns a promise resolving to an array of results:
const urls = ['/users', '/posts', '/comments'];
const results = await queue.addAll(
urls.map(url => () => rezo.get(`https://api.example.com${url}`).then(r => r.data)),
{ priority: 50 }
);
console.log(results); // [usersData, postsData, commentsData]
Priority Levels
Higher priority tasks run first. Rezo exports preset constants:
import { Priority } from 'rezo';
Priority.LOWEST; // 0
Priority.LOW; // 25
Priority.NORMAL; // 50
Priority.HIGH; // 75
Priority.HIGHEST; // 100
Priority.CRITICAL; // 1000
// Critical tasks jump to the front of the queue
await queue.add(() => handleAlert(), { priority: Priority.CRITICAL });
// Low-priority background work
await queue.add(() => syncCache(), { priority: Priority.LOWEST });
The default priority is 0 (the same value as Priority.LOWEST).
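The ordering rule can be sketched without the library. The snippet below sorts a few hypothetical tasks the way a priority queue would start them, using the preset values listed above. It assumes that tasks with equal priority keep their insertion order, which the text above does not state; `startOrder` is an illustrative helper, not a Rezo API.

```javascript
// Preset values copied from the list above.
const Priority = { LOWEST: 0, LOW: 25, NORMAL: 50, HIGH: 75, HIGHEST: 100, CRITICAL: 1000 };

// Return task ids in the order a priority queue would start them:
// higher priority first; equal priorities keep insertion order (assumption).
function startOrder(tasks) {
  return tasks
    .map((task, index) => ({ ...task, index }))
    .sort((a, b) => b.priority - a.priority || a.index - b.index)
    .map((task) => task.id);
}

const order = startOrder([
  { id: 'sync-cache', priority: Priority.LOWEST },
  { id: 'fetch-users', priority: Priority.NORMAL },
  { id: 'alert', priority: Priority.CRITICAL },
  { id: 'fetch-posts', priority: Priority.NORMAL },
]);
console.log(order); // ['alert', 'fetch-users', 'fetch-posts', 'sync-cache']
```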
Flow Control
pause() / start()
Pause stops new tasks from starting. Already-running tasks continue to completion:
queue.pause();
// Queue is paused -- tasks accumulate but don't execute
queue.add(() => doWork()); // Queued, not running
queue.start(); // Resume -- queued tasks begin executing
clear()
Remove all queued tasks that have not started yet. Running tasks are not affected:
queue.clear(); // Rejects all queued tasks with "Queue was cleared"
cancel()
Cancel a specific task by ID:
queue.add(() => slowWork(), { id: 'slow-task' });
// Later...
const wasCancelled = queue.cancel('slow-task'); // true if found
cancelBy()
Cancel tasks matching a predicate:
const count = queue.cancelBy((task) => task.priority < 50);
console.log(`Cancelled ${count} low-priority tasks`);
AbortSignal
Cancel tasks from outside using an AbortController:
const controller = new AbortController();
queue.add(() => longRunningTask(), {
signal: controller.signal
});
// Cancel from outside
controller.abort();
Waiting for Completion
// Wait for all tasks (running + queued) to finish
await queue.onIdle();
// Wait for the queue to be empty (nothing waiting; running tasks may still be in progress)
await queue.onEmpty();
// Wait for queue size to drop below a threshold
await queue.onSizeLessThan(10);
// With a 30-second timeout (resolves when the timeout elapses, even if the threshold has not been reached)
await queue.onSizeLessThan(5, 30_000);
Rate Limiting
Limit how many tasks start per time interval:
// 2 requests per second
const queue = new RezoQueue({
concurrency: 2,
interval: 1000,
intervalCap: 2
});
// 10 requests per minute
const queue2 = new RezoQueue({
interval: 60_000,
intervalCap: 10
});
The interval timer is automatically cleaned up when the queue empties, allowing Node.js to exit naturally.
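As a rough way to reason about `interval` and `intervalCap`, the sketch below computes the earliest moment each task could start. It assumes fixed windows that begin when the first task starts, and it ignores `concurrency` and task duration, so the numbers are a best case. `earliestStarts` and `lastStart` are hypothetical helpers, not part of Rezo.

```javascript
// Earliest start offset (ms) for each of `count` tasks when at most
// `intervalCap` tasks may start per `interval`. Best case: ignores
// `concurrency` and how long each task runs.
function earliestStarts(count, interval, intervalCap) {
  return Array.from({ length: count }, (_, i) => Math.floor(i / intervalCap) * interval);
}

// Offset (ms) at which the last of `count` tasks may start.
function lastStart(count, interval, intervalCap) {
  return Math.floor((count - 1) / intervalCap) * interval;
}

// 2 per second: tasks start in pairs, one pair per second.
console.log(earliestStarts(5, 1000, 2)); // [0, 0, 1000, 1000, 2000]

// 10 per minute: the 25th task cannot start before the third window.
console.log(lastStart(25, 60_000, 10)); // 120000
```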
Queue State and Statistics
const state = queue.state;
console.log(state);
// {
// pending: 3, // Running tasks
// size: 12, // Waiting in queue
// total: 15, // pending + size
// isPaused: false,
// isIdle: false
// }
const stats = queue.stats;
console.log(stats);
// {
// added: 50,
// processed: 38,
// completed: 35,
// failed: 3,
// timedOut: 0,
// cancelled: 2,
// averageDuration: 245, // ms
// throughput: 4.2 // tasks/sec
// }
Shorthand properties:
queue.pending; // Number of running tasks
queue.size; // Number of queued tasks
queue.isPaused; // Whether queue is paused
queue.concurrency; // Get/set concurrency limit
RezoQueue Events
queue.on('add', ({ id, priority }) => {
console.log(`Task ${id} added with priority ${priority}`);
});
queue.on('start', ({ id }) => {
console.log(`Task ${id} started`);
});
queue.on('completed', ({ id, result, duration }) => {
console.log(`Task ${id} completed in ${duration}ms`);
});
queue.on('error', ({ id, error }) => {
console.error(`Task ${id} failed:`, error.message);
});
queue.on('timeout', ({ id }) => {
console.warn(`Task ${id} timed out`);
});
queue.on('cancelled', ({ id }) => {
console.log(`Task ${id} cancelled`);
});
queue.on('active', () => {
console.log('Queue became active');
});
queue.on('idle', () => {
console.log('Queue is idle -- all tasks done');
});
queue.on('empty', () => {
console.log('Queue emptied -- no queued tasks');
});
queue.on('paused', () => {
console.log('Queue paused');
});
queue.on('resumed', () => {
console.log('Queue resumed');
});
queue.on('next', () => {
// Fired after each task completes, before the next starts
});
Remove event handlers:
const handler = (data) => console.log(data);
queue.on('completed', handler);
// Later...
queue.off('completed', handler);
Cleanup
queue.destroy(); // Clear all tasks, timers, and event handlers
HttpQueue
HttpQueue provides HTTP-specific features: per-domain concurrency, rate limit handling, automatic retry, and HTTP method-based priority.
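One way to picture how a global limit and a per-domain limit interact is a small admission check, sketched below. The function name `canStart`, the `running` bookkeeping map, and the rule that unlisted domains are unlimited when an override map is used are all assumptions made for illustration; the actual scheduling logic inside HttpQueue may differ.

```javascript
// Decide whether a task for `domain` may start right now.
// `running` maps domain -> number of in-flight tasks (hypothetical bookkeeping).
function canStart(domain, running, { concurrency, domainConcurrency }) {
  const total = Object.values(running).reduce((sum, n) => sum + n, 0);
  if (total >= concurrency) return false; // global limit reached

  // A number applies to every domain; an object holds per-domain overrides.
  // Domains missing from the object are treated as unlimited here (assumption).
  const perDomain = typeof domainConcurrency === 'number'
    ? domainConcurrency
    : domainConcurrency[domain] ?? Infinity;

  return (running[domain] ?? 0) < perDomain;
}

const running = { 'api.example.com': 3, 'cdn.example.com': 1 };
const limits = { concurrency: 20, domainConcurrency: 3 };

console.log(canStart('api.example.com', running, limits)); // false
console.log(canStart('cdn.example.com', running, limits)); // true
```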
Basic Usage
import rezo, { HttpQueue } from 'rezo';
const queue = new HttpQueue({
concurrency: 10,
domainConcurrency: 2,
autoRetry: true,
maxRetries: 3
});
// HTTP-aware task
const result = await queue.addHttp(
() => rezo.get('https://api.example.com/data'),
{ domain: 'api.example.com', method: 'GET' }
);
Configuration
const queue = new HttpQueue({
// --- Base queue options ---
concurrency: 20,
timeout: 30_000,
// --- HTTP-specific options ---
// Per-domain concurrency limit: a single number applied to every domain
domainConcurrency: 3,
// Or an object of per-domain overrides (use one form, not both):
// domainConcurrency: {
//   'api.example.com': 5,
//   'cdn.example.com': 10,
//   'slow-api.example.com': 1
// },
// Global requests per second
requestsPerSecond: 10,
// Respect Retry-After headers (default: true)
respectRetryAfter: true,
// Respect X-RateLimit-* headers (default: true)
respectRateLimitHeaders: true,
// Automatic retry on failure (default: false)
autoRetry: true,
// Alias for autoRetry
retryOnRateLimit: true,
// Max retry attempts (default: 3)
maxRetries: 3,
// Delay between retries in ms (a number, or a function for backoff)
retryDelay: 1000,
// Or exponential backoff (use one form, not both):
// retryDelay: (attempt) => Math.pow(2, attempt) * 1000,
// Status codes that trigger retry (default: [429, 500, 502, 503, 504])
retryStatusCodes: [429, 500, 502, 503, 504]
});
addHttp()
Add an HTTP-aware task:
const result = await queue.addHttp(
() => rezo.get('https://api.example.com/users'),
{
domain: 'api.example.com', // For per-domain limiting
method: 'GET', // For method-based priority
retry: 3, // Override max retries for this task
retryDelay: 2000, // Override retry delay for this task
priority: 75, // Override priority
timeout: 10_000, // Task-specific timeout
id: 'fetch-users' // For tracking/cancellation
}
);
If domain is omitted, it defaults to 'default'. If method is omitted, it defaults to 'GET'.
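Because an omitted `domain` falls back to 'default', every task would share one per-domain bucket unless the domain is passed explicitly. A small helper can derive it from the request URL using the standard `URL` class. This is a sketch: `httpTaskOptions` is a hypothetical name, and using the hostname (without the port) as the domain key is an assumption based on the examples above.

```javascript
// Build the `domain` and `method` options for addHttp() from a request URL.
// Uses the hostname (no port) as the domain key, matching the examples above.
function httpTaskOptions(url, method = 'GET') {
  return {
    domain: new URL(url).hostname,
    method: method.toUpperCase(),
  };
}

console.log(httpTaskOptions('https://api.example.com/users?page=2'));
// { domain: 'api.example.com', method: 'GET' }

console.log(httpTaskOptions('https://cdn.example.com:8443/asset.js', 'head'));
// { domain: 'cdn.example.com', method: 'HEAD' }
```

The result can be passed as the second argument, for example `queue.addHttp(() => rezo.get(url), httpTaskOptions(url))`.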
HTTP Method Priority
When no explicit priority is set, HttpQueue assigns priority based on HTTP method:
| Method | Priority |
|---|---|
| HEAD | 100 |
| GET | 75 |
| OPTIONS | 50 |
| POST | 50 |
| PUT | 50 |
| PATCH | 50 |
| DELETE | 25 |
This ensures read operations are prioritized over writes by default. Override with the priority option on any task.
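The lookup described by the table can be sketched as a plain function: an explicit `priority` wins, otherwise the method decides. `effectivePriority` is a hypothetical helper, and the fallback of 50 for methods not in the table is an assumption, since the table does not cover them.

```javascript
// Values copied from the table above.
const METHOD_PRIORITY = { HEAD: 100, GET: 75, OPTIONS: 50, POST: 50, PUT: 50, PATCH: 50, DELETE: 25 };

// Explicit priority wins; otherwise fall back to the method table.
// Methods missing from the table get 50 here, which is an assumption.
function effectivePriority({ method = 'GET', priority } = {}) {
  if (priority !== undefined) return priority;
  return METHOD_PRIORITY[method.toUpperCase()] ?? 50;
}

console.log(effectivePriority({ method: 'DELETE' })); // 25
console.log(effectivePriority({ method: 'DELETE', priority: 90 })); // 90
console.log(effectivePriority({})); // 75
```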
Per-Domain Concurrency
Limit concurrent requests to each domain independently:
const queue = new HttpQueue({
concurrency: 20, // Global max
domainConcurrency: 3 // Max 3 concurrent per domain
});
// These run with at most 3 concurrent per domain
queue.addHttp(() => rezo.get('https://api.example.com/a'), { domain: 'api.example.com' });
queue.addHttp(() => rezo.get('https://api.example.com/b'), { domain: 'api.example.com' });
queue.addHttp(() => rezo.get('https://api.example.com/c'), { domain: 'api.example.com' });
queue.addHttp(() => rezo.get('https://api.example.com/d'), { domain: 'api.example.com' }); // Waits
// Different domain -- runs immediately
queue.addHttp(() => rezo.get('https://cdn.example.com/asset'), { domain: 'cdn.example.com' });
Set per-domain limits dynamically:
queue.setDomainConcurrency('slow-api.example.com', 1);
Rate Limit Handling
When a server returns 429 Too Many Requests with a Retry-After header, tell the queue:
queue.handleRateLimit('api.example.com', 30); // Pause domain for 30 seconds
The domain is paused automatically and resumes after the specified number of seconds.
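Since `handleRateLimit` takes seconds, the raw Retry-After value needs converting first. Per the HTTP specification the header carries either a number of seconds or an HTTP-date. The helper below is a sketch of that conversion; `retryAfterSeconds` is a hypothetical name, and how you read the header from a Rezo response is left out because it is not covered here.

```javascript
// Convert a Retry-After header value into whole seconds.
// The header is either delay-seconds ("120") or an HTTP-date.
// Returns null when the value is missing or cannot be parsed.
function retryAfterSeconds(headerValue, now = Date.now()) {
  if (headerValue == null) return null;
  const value = String(headerValue).trim();
  if (/^\d+$/.test(value)) return Number(value);  // delay-seconds form
  const date = Date.parse(value);                 // HTTP-date form
  if (Number.isNaN(date)) return null;
  return Math.max(0, Math.ceil((date - now) / 1000));
}

console.log(retryAfterSeconds('30')); // 30

const now = Date.parse('Wed, 21 Oct 2015 07:28:00 GMT');
console.log(retryAfterSeconds('Wed, 21 Oct 2015 07:28:30 GMT', now)); // 30

console.log(retryAfterSeconds('soon')); // null
```

A non-null result can be passed directly as the second argument to `queue.handleRateLimit(domain, seconds)`.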
Domain Pause / Resume
Manually pause and resume requests to specific domains:
// Pause all requests to a domain
queue.pauseDomain('api.example.com');
// Check domain state
const state = queue.getDomainState('api.example.com');
console.log(state);
// {
// pending: 2,
// size: 5,
// isPaused: true,
// rateLimitedUntil: undefined
// }
// Resume
queue.resumeDomain('api.example.com');
HttpQueue Events
HttpQueue supports all base queue events plus HTTP-specific ones:
// Rate limit hit
queue.on('rateLimited', ({ domain, retryAfter }) => {
console.warn(`${domain} rate limited for ${retryAfter}s`);
});
// Domain became available (after rate limit or pause)
queue.on('domainAvailable', ({ domain }) => {
console.log(`${domain} is available again`);
});
// Task being retried
queue.on('retry', ({ id, attempt, error }) => {
console.log(`Retrying task ${id} (attempt ${attempt}): ${error.message}`);
});
// All base events also work
queue.on('completed', ({ id, result, duration }) => {
console.log(`HTTP task ${id} completed in ${duration}ms`);
});
queue.on('error', ({ id, error }) => {
console.error(`HTTP task ${id} failed:`, error.message);
});
HTTP Queue Statistics
const stats = queue.httpStats;
console.log(stats);
// {
// added: 100,
// processed: 85,
// completed: 80,
// failed: 5,
// timedOut: 0,
// cancelled: 0,
// averageDuration: 320,
// throughput: 2.5,
// retries: 12,
// rateLimitHits: 3,
// byDomain: {
// 'api.example.com': { pending: 2, completed: 50, failed: 3, rateLimited: 2 },
// 'cdn.example.com': { pending: 0, completed: 30, failed: 2, rateLimited: 1 }
// }
// }
Cancel HTTP Tasks
queue.cancelHttp('task-id'); // Cancel a specific HTTP task
queue.clearHttp(); // Clear all HTTP tasks
Cleanup
queue.destroy(); // Clears both base and HTTP tasks, removes all handlers
Passing a Queue Instance
Instead of a config object, you can pass a queue instance directly for full control:
import rezo, { RezoQueue, HttpQueue } from 'rezo';
const queue = new RezoQueue({ concurrency: 5, interval: 1000, intervalCap: 2 });
const client = rezo.create({ queue });
// Or with HttpQueue
const httpQueue = new HttpQueue({ concurrency: 10, domainConcurrency: 3 });
const client2 = rezo.create({ queue: httpQueue });
This is useful when you want to share a queue across multiple Rezo instances or access queue methods directly (events, pause/resume, stats).