JavaScript Client Guide
Complete guide for the Queen MQ JavaScript/Node.js client library.
Installation
npm install queen-mq
Requirements: Node.js 22+ (required for native fetch and modern JS features)
Getting Started
Import and Connect
import { Queen } from 'queen-mq'
// Single server
const queen = new Queen('http://localhost:6632')
// Multiple servers (high availability)
const queen = new Queen([
'http://server1:6632',
'http://server2:6632'
])
// Full configuration
const queen = new Queen({
urls: ['http://server1:6632', 'http://server2:6632', 'http://server3:6632'],
timeoutMillis: 30000,
retryAttempts: 3,
loadBalancingStrategy: 'affinity', // 'affinity', 'round-robin', or 'session'
affinityHashRing: 128, // Virtual nodes per server (for affinity)
enableFailover: true
})
Load Balancing & Affinity Routing
When connecting to multiple Queen servers, you can choose how requests are distributed. The client supports three load balancing strategies.
Affinity Mode (Recommended for Production)
Best for: Production deployments with 3+ servers and multiple consumer groups.
Affinity mode uses consistent hashing with virtual nodes to route consumer groups to the same backend server. This optimizes database queries by consolidating poll intentions on a single server.
const queen = new Queen({
urls: [
'http://queen-server-1:6632',
'http://queen-server-2:6632',
'http://queen-server-3:6632'
],
loadBalancingStrategy: 'affinity',
affinityHashRing: 150 // Virtual nodes per server (default: 150)
})
How it works:
- Each consumer generates an affinity key from its parameters:
  - Queue-based: queue:partition:consumerGroup
  - Namespace-based: namespace:task:consumerGroup
- The key is hashed using the FNV-1a algorithm (fast, deterministic)
- The hash is mapped to a virtual node on the consistent hash ring
- The virtual node maps to a real backend server
- The same key always routes to the same server (unless that server is unhealthy)
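The steps above can be sketched in a few lines of plain JavaScript. This is an illustration of FNV-1a plus a consistent hash ring, not the client's actual implementation: fnv1a, buildRing, pickServer, and the `url#index` virtual node naming are all hypothetical.

```javascript
// Illustrative sketch only: these helpers are hypothetical, not queen-mq internals.

// 32-bit FNV-1a. Real FNV-1a hashes bytes; using UTF-16 code units is a
// simplification that gives the same result for ASCII keys.
function fnv1a(str) {
  let hash = 0x811c9dc5 // FNV offset basis
  for (let i = 0; i < str.length; i++) {
    hash ^= str.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193) >>> 0 // multiply by the FNV prime, keep 32 bits
  }
  return hash >>> 0
}

// Place `virtualNodes` points per server on the ring, sorted by hash value.
function buildRing(urls, virtualNodes = 150) {
  const ring = []
  for (const url of urls) {
    for (let i = 0; i < virtualNodes; i++) {
      ring.push({ point: fnv1a(`${url}#${i}`), url })
    }
  }
  ring.sort((a, b) => a.point - b.point)
  return ring
}

// Find the first virtual node at or after the key's hash, wrapping around.
// Assumes the ring is not empty.
function pickServer(ring, affinityKey) {
  const hash = fnv1a(affinityKey)
  let lo = 0
  let hi = ring.length
  while (lo < hi) {
    const mid = (lo + hi) >>> 1
    if (ring[mid].point < hash) lo = mid + 1
    else hi = mid
  }
  return ring[lo % ring.length].url
}

const ring = buildRing([
  'http://queen-server-1:6632',
  'http://queen-server-2:6632',
  'http://queen-server-3:6632'
])
console.log(pickServer(ring, 'orders:priority:email-processor'))
```

Because the lookup depends only on the key and the ring, two workers that build the same key land on the same server. When one server is removed, only keys that pointed at its virtual nodes move.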
Example:
// Consumer 1
await queen.queue('orders')
.partition('priority')
.group('email-processor')
.consume(async (msg) => {
// Affinity key: "orders:priority:email-processor"
// Routes to: server-2 (based on hash)
})
// Consumer 2 (same group, different worker)
await queen.queue('orders')
.partition('priority')
.group('email-processor')
.consume(async (msg) => {
// Same affinity key: "orders:priority:email-processor"
// Routes to: server-2 (same server!)
// → Poll intentions consolidated
// → Single DB query serves both workers
})
// Consumer 3 (different group)
await queen.queue('orders')
.partition('priority')
.group('analytics')
.consume(async (msg) => {
// Different affinity key: "orders:priority:analytics"
// Routes to: server-1 (different server)
// → Load is distributed across servers
})
Benefits:
- ✅ Optimized Database Queries - Poll intentions for same consumer group are consolidated on one server, reducing duplicate SELECT queries
- ✅ Better Cache Locality - In-memory poll registry stays warm on the same server
- ✅ Graceful Failover - If one of 3 servers fails, only the consumer groups routed to it (~33%) move to the remaining servers
- ✅ Works with HA - Perfect for 3-server high-availability setups
- ✅ Automatic - No manual configuration required
Virtual Nodes:
The affinityHashRing parameter controls how many virtual nodes each server gets:
affinityHashRing: 150 // Default: good for 3-5 servers
affinityHashRing: 300 // Better distribution, more memory (~14KB total)
affinityHashRing: 50 // Less memory, worse distribution
Performance Impact
With affinity routing and 3 backend servers, multiple workers in the same consumer group will hit the same backend, allowing the server to consolidate poll intentions and serve them with a single database query instead of multiple queries.
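To make the consolidation idea concrete, here is a conceptual model. It is not the Queen server's poll registry: consolidatePolls and the shape of the poll objects are assumptions for illustration. Pending polls that share an affinity key are grouped, so one lookup per key can serve every waiting worker.

```javascript
// Conceptual model only: not the server's actual poll registry.
// Groups pending polls by affinity key so each key needs a single lookup.
function consolidatePolls(polls) {
  const byKey = new Map()
  for (const { worker, queue, partition, group } of polls) {
    const key = `${queue}:${partition}:${group}`
    if (!byKey.has(key)) byKey.set(key, [])
    byKey.get(key).push(worker)
  }
  return byKey
}

const pending = consolidatePolls([
  { worker: 'w1', queue: 'orders', partition: 'priority', group: 'email-processor' },
  { worker: 'w2', queue: 'orders', partition: 'priority', group: 'email-processor' },
  { worker: 'w3', queue: 'orders', partition: 'priority', group: 'analytics' }
])
// Three waiting workers, but only one lookup per distinct key is needed
console.log('Lookups needed:', pending.size)
```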
Round-Robin Mode
Best for: Simple setups where poll optimization is not critical.
Cycles through servers in order. Simple but doesn't optimize for poll intention consolidation.
const queen = new Queen({
urls: ['http://server1:6632', 'http://server2:6632'],
loadBalancingStrategy: 'round-robin'
})
Each request goes to the next server in the list. Load is evenly distributed but consumer groups may hit different servers on each poll.
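A minimal sketch of round-robin selection, assuming a fixed, non-empty list of URLs. createRoundRobin is a hypothetical helper, not part of the client API.

```javascript
// Hypothetical helper: returns a function that cycles through the URLs in order.
function createRoundRobin(urls) {
  let index = 0
  return function next() {
    const url = urls[index]
    index = (index + 1) % urls.length // wrap back to the first server
    return url
  }
}

const nextServer = createRoundRobin(['http://server1:6632', 'http://server2:6632'])
console.log(nextServer()) // http://server1:6632
console.log(nextServer()) // http://server2:6632
console.log(nextServer()) // http://server1:6632
```

Two consecutive polls from the same consumer group go to different servers here, which is why this strategy cannot consolidate poll intentions.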
Session Mode
Best for: Single client instance that should stick to one server.
Each client instance picks a server and sticks to it for all requests.
const queen = new Queen({
urls: ['http://server1:6632', 'http://server2:6632'],
loadBalancingStrategy: 'session'
})
Comparison
| Feature | Affinity | Round-Robin | Session |
|---|---|---|---|
| Poll Consolidation | ✅ Yes | ❌ No | ⚠️ Partial |
| Load Distribution | ✅ Good | ✅ Perfect | ❌ Poor |
| Failover | ✅ Graceful | ✅ Automatic | ✅ Automatic |
| Memory | ~10KB | Minimal | Minimal |
| Best For | Production | Testing | Simple apps |
Push vs Consume Routing
Important: Affinity routing is only applied to consumer operations (pop/consume), not push operations.
- Consumers (pop/consume): Use affinity key for consistent routing
- Producers (push): Use default strategy (round-robin) for even write distribution
This gives you the best of both worlds: optimized reads with affinity, balanced writes without hotspots.
// Push - uses round-robin (even distribution)
await queen.queue('orders').push([{ data: { order: 123 } }])
// Consume - uses affinity (consolidated polling)
await queen.queue('orders').group('processors').consume(async (msg) => {
// Same group always routes to same server
})
Monitoring Affinity Routing
You can check the load balancer status:
const httpClient = queen._httpClient
const loadBalancer = httpClient.getLoadBalancer()
if (loadBalancer) {
console.log('Strategy:', loadBalancer.getStrategy())
console.log('Virtual nodes:', loadBalancer.getVirtualNodeCount())
console.log('Servers:', loadBalancer.getAllUrls())
console.log('Health:', loadBalancer.getHealthStatus())
}
Queue Management
Create Queue
// Simple creation
await queen.queue('my-tasks').create()
// With configuration
await queen.queue('my-tasks')
.config({
leaseTime: 60, // 60 seconds to process
retryLimit: 3, // Max 3 retries
priority: 5, // Priority level (0-10)
dlqAfterMaxRetries: true, // Auto move to DLQ after max retries
retentionSeconds: 86400, // Keep messages 24 hours
encryptionEnabled: false
})
.create()
Delete Queue
await queen.queue('my-tasks').delete()
Warning
This deletes all messages, partitions, and consumer group state. Cannot be undone!
Pushing Messages
Basic Push
await queen.queue('tasks').push([
{ data: { task: 'send-email', to: 'user@example.com' } }
])
Multiple Messages
await queen.queue('tasks').push([
{ data: { task: 'send-email', to: 'alice@example.com' } },
{ data: { task: 'send-email', to: 'bob@example.com' } },
{ data: { task: 'resize-image', id: 123 } }
])
With Custom Transaction ID
await queen.queue('tasks').push([
{
transactionId: 'unique-id-123', // Auto-generated if not provided
data: { task: 'process-order', orderId: 456 }
}
])
With Callbacks
await queen.queue('tasks').push([...])
.onSuccess(async (messages) => {
console.log('Pushed successfully:', messages)
})
.onDuplicate(async (messages) => {
console.warn('Duplicate transaction IDs detected')
})
.onError(async (messages, error) => {
console.error('Push failed:', error)
})
Consuming Messages
Basic Consume
The easiest way to process messages continuously:
await queen.queue('my-tasks').consume(async (message) => {
console.log('Processing:', message.data)
// Do your work
await processTask(message.data)
// Auto-ack on success, auto-nack on error
})
What happens:
- Consumer pulls messages from the queue
- Your function processes each message
- If successful → message marked as complete ✅
- If error → message goes back for retry 🔄
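The auto-ack behavior described above can be modeled as follows. This is a sketch, not the client's internal loop: handleMessage is hypothetical, and its ack callback simply mirrors the queen.ack(message, success, context) call shape used elsewhere in this guide.

```javascript
// Sketch of auto-ack semantics; handleMessage is a hypothetical helper.
// `ack` stands in for a function shaped like queen.ack(message, success, context).
async function handleMessage(message, handler, ack) {
  try {
    await handler(message)
  } catch (error) {
    // Handler threw: negative ack, so the message goes back for retry
    await ack(message, false, { error: error.message })
    return 'retry'
  }
  // Handler finished: mark the message as complete
  await ack(message, true)
  return 'completed'
}

// Usage with an in-memory ack recorder instead of a real server
const recorded = []
const fakeAck = async (message, success) => { recorded.push([message.id, success]) }
const demo = handleMessage({ id: 1, data: {} }, async () => {}, fakeAck)
  .then(() => handleMessage({ id: 2, data: {} }, async () => { throw new Error('boom') }, fakeAck))
  .then(() => console.log(recorded))
```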
With Configuration
await queen.queue('tasks')
.concurrency(10) // 10 parallel workers
.batch(20) // Fetch 20 at a time
.autoAck(false) // Manual ack
.renewLease(true, 5000) // Auto-renew every 5s
.each() // Process individually
.consume(async (message) => {
await process(message.data)
})
.onSuccess(async (message) => {
await queen.ack(message, true)
})
.onError(async (message, error) => {
console.error('Failed:', error)
await queen.ack(message, false)
})
Process Limited Messages
// Process exactly 100 messages then stop
await queen.queue('tasks')
.limit(100)
.consume(async (message) => {
await processMessage(message.data)
})
Batch Processing
// Process messages in batches
await queen.queue('events')
.batch(10)
.consume(async (messages) => {
// messages is an array of up to 10 messages
for (const msg of messages) {
await process(msg)
}
})
Pop vs Consume
The Consume Way (Recommended)
Use when: Long-running worker that continuously processes messages.
await queen.queue('tasks').consume(async (message) => {
// Auto-loops, auto-ack, long-polling built-in
})
The Pop Way
Use when: Manual control over message fetching.
const messages = await queen.queue('tasks').pop()
if (messages.length > 0) {
const message = messages[0]
try {
await processMessage(message.data)
await queen.ack(message, true)
} catch (error) {
await queen.ack(message, false, { error: error.message })
}
}
Pop with Long Polling
// Wait up to 30 seconds for messages
const messages = await queen.queue('tasks')
.batch(10)
.wait(true)
.timeout(30000)
.pop()
Partitions
Use partitions for ordering guarantees. Messages in the same partition are processed in order.
Push to Partition
await queen.queue('user-events')
.partition('user-123')
.push([
{ data: { event: 'login', timestamp: Date.now() } },
{ data: { event: 'purchase', orderId: 456 } },
{ data: { event: 'logout', timestamp: Date.now() } }
])
Consume from Partition
await queen.queue('user-events')
.partition('user-123')
.consume(async (message) => {
// Messages processed in exact order
console.log('User 123:', message.data.event)
})
Important: Messages in different partitions are independent.
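As a mental model, each partition behaves like its own FIFO list. The in-memory class below is a hypothetical illustration of that guarantee, not how the server stores messages.

```javascript
// Hypothetical in-memory model: one FIFO list per partition.
class PartitionedQueue {
  constructor() {
    this.partitions = new Map()
  }

  push(partition, data) {
    if (!this.partitions.has(partition)) this.partitions.set(partition, [])
    this.partitions.get(partition).push(data)
  }

  // Returns the oldest message of the given partition, or undefined if empty.
  pop(partition) {
    const items = this.partitions.get(partition)
    return items && items.length > 0 ? items.shift() : undefined
  }
}

const events = new PartitionedQueue()
events.push('user-123', { event: 'login' })
events.push('user-456', { event: 'login' })
events.push('user-123', { event: 'purchase' })
console.log(events.pop('user-123')) // { event: 'login' }
console.log(events.pop('user-123')) // { event: 'purchase' }
console.log(events.pop('user-456')) // { event: 'login' }
```

Order is preserved within user-123, while user-456 can be consumed at any time without affecting it.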
Consumer Groups
Consumer groups enable:
- Multiple workers sharing the same queue
- Fan-out patterns (same message to multiple groups)
- Message replay from any point
Basic Consumer Group
// Worker 1 in group "processors"
await queen.queue('emails')
.group('processors')
.consume(async (message) => {
console.log('Worker 1 processing:', message.data)
})
// Worker 2 in SAME group (shares the load)
await queen.queue('emails')
.group('processors')
.consume(async (message) => {
console.log('Worker 2 processing:', message.data)
})
Messages are distributed between workers. Each message goes to only ONE worker.
Multiple Consumer Groups (Fan-Out)
// Group 1: Send emails
await queen.queue('notifications')
.group('email-sender')
.consume(async (message) => {
await sendEmail(message.data)
})
// Group 2: Log to analytics (processes THE SAME messages)
await queen.queue('notifications')
.group('analytics')
.consume(async (message) => {
await trackEvent(message.data)
})
Every message is processed by BOTH groups independently! 🎉
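One way to picture both behaviors is a shared message log with one read position per group. The GroupedLog class is a hypothetical model for illustration only; the real server tracks consumer group state in its database.

```javascript
// Hypothetical model: a shared log with one cursor per consumer group.
class GroupedLog {
  constructor() {
    this.messages = []
    this.cursors = new Map()
  }

  push(data) {
    this.messages.push(data)
  }

  // Next message for a group. Workers in the same group share one cursor,
  // so each message is handed out once per group.
  next(group) {
    const position = this.cursors.get(group) ?? 0
    if (position >= this.messages.length) return undefined
    this.cursors.set(group, position + 1)
    return this.messages[position]
  }
}

const notifications = new GroupedLog()
notifications.push('msg-1')
notifications.push('msg-2')
console.log(notifications.next('email-sender')) // worker 1 gets msg-1
console.log(notifications.next('email-sender')) // worker 2 gets msg-2
console.log(notifications.next('analytics'))    // other group starts at msg-1
```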
Subscription Modes
Control whether consumer groups process historical messages or only new ones.
Default Behavior (Process All Messages):
await queen.queue('events')
.group('new-analytics')
.consume(async (message) => {
// Processes ALL messages, including historical
})
Skip Historical Messages:
await queen.queue('events')
.group('realtime-monitor')
.subscriptionMode('new') // Skip history
.consume(async (message) => {
// Only processes messages arriving after subscription
})
Subscribe from Specific Timestamp:
const startTime = '2025-10-28T10:00:00.000Z'
await queen.queue('events')
.group('replay-from-10am')
.subscriptionFrom(startTime)
.consume(async (message) => {
// Process messages from 10am onwards
})
Server Default:
The server can be configured to change default subscription behavior:
export DEFAULT_SUBSCRIPTION_MODE="new"
./bin/queen-server
When set, new consumer groups automatically skip historical messages unless you explicitly override with .subscriptionMode('all').
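The three subscription options differ only in where a new consumer group starts reading. The sketch below models that choice. It assumes messages sorted by a createdAt timestamp in milliseconds; firstVisibleIndex and the field name are hypothetical, and whether the boundary is inclusive on the real server is an assumption.

```javascript
// Hypothetical model of where a new consumer group starts reading.
// Assumes `messages` is sorted by createdAt (epoch milliseconds).
function firstVisibleIndex(messages, subscription, subscribedAt) {
  const { mode = 'all', from } = subscription
  let cutoff
  if (from !== undefined) {
    cutoff = Date.parse(from)   // like .subscriptionFrom(timestamp)
  } else if (mode === 'new') {
    cutoff = subscribedAt       // like .subscriptionMode('new')
  } else {
    return 0                    // default: process all history
  }
  const index = messages.findIndex((m) => m.createdAt >= cutoff)
  return index === -1 ? messages.length : index
}

const history = [
  { id: 1, createdAt: Date.parse('2025-10-28T09:00:00.000Z') },
  { id: 2, createdAt: Date.parse('2025-10-28T10:00:00.000Z') },
  { id: 3, createdAt: Date.parse('2025-10-28T11:00:00.000Z') }
]
const subscribedAt = Date.parse('2025-10-28T10:30:00.000Z')
console.log(firstVisibleIndex(history, {}, subscribedAt))                                   // 0
console.log(firstVisibleIndex(history, { mode: 'new' }, subscribedAt))                      // 2
console.log(firstVisibleIndex(history, { from: '2025-10-28T10:00:00.000Z' }, subscribedAt)) // 1
```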
Transactions
Transactions ensure atomic operations. Either everything succeeds or nothing does.
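The all-or-nothing idea can be sketched with an in-memory model: apply every operation to a copy of the state and only hand the copy back if all of them succeed. This is not how Queen implements transactions. commitTransaction, the state shape, and the rule that pushing to an unknown queue fails are assumptions chosen to show a failing case.

```javascript
// Hypothetical in-memory model of an atomic commit.
// state = { inFlight: Set of message ids, queues: Map of queue name -> items }
function commitTransaction(state, operations) {
  // Work on a copy so a failure leaves the original state untouched
  const draft = {
    inFlight: new Set(state.inFlight),
    queues: new Map([...state.queues].map(([name, items]) => [name, [...items]]))
  }
  for (const op of operations) {
    if (op.type === 'ack') {
      if (!draft.inFlight.delete(op.messageId)) {
        throw new Error(`unknown message ${op.messageId}`)
      }
    } else if (op.type === 'push') {
      if (!draft.queues.has(op.queue)) throw new Error(`unknown queue ${op.queue}`)
      draft.queues.get(op.queue).push(...op.items)
    } else {
      throw new Error(`unknown operation ${op.type}`)
    }
  }
  return draft // every operation succeeded: this becomes the new state
}

const before = {
  inFlight: new Set(['m1']),
  queues: new Map([['processed-data', []]])
}
const after = commitTransaction(before, [
  { type: 'ack', messageId: 'm1' },
  { type: 'push', queue: 'processed-data', items: [{ data: 'result' }] }
])
console.log(after.inFlight.size, after.queues.get('processed-data').length)
```

If any operation throws, the caller keeps using the old state, so the acked message is still in flight and nothing was pushed.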
Basic Transaction: Ack + Push
// Pop a message
const messages = await queen.queue('raw-data').batch(1).pop()
if (messages.length > 0) {
const message = messages[0]
// Process it
const processed = await transformData(message.data)
// Atomically: ack input AND push output
await queen.transaction()
.ack(message) // Complete input
.queue('processed-data')
.push([{ data: processed }]) // Push to next queue
.commit()
}
// If commit fails, NOTHING happens. Message stays in raw-data!
Multi-Queue Pipeline
const messages = await queen.queue('queue-a').batch(1).pop()
// Atomic: ack from A, push to B and C
await queen.transaction()
.ack(messages[0])
.queue('queue-b')
.push([{ data: { step: 2, value: messages[0].data.value * 2 } }])
.queue('queue-c')
.push([{ data: { step: 2, value: messages[0].data.value * 2 } }])
.commit()
Transaction with Consumer
await queen.queue('source')
.autoAck(false) // Must disable auto-ack
.consume(async (message) => {
// Do work
const result = await processMessage(message.data)
// Transactionally ack and push result
await queen.transaction()
.ack(message)
.queue('destination')
.push([{ data: result }])
.commit()
})
Client-Side Buffering
Buffering collects messages client-side and sends them in batches, which can make pushes 10x-100x faster!
How It Works
- Messages collect in a local buffer
- Buffer flushes when it reaches count OR time threshold
- All buffered messages sent in one HTTP request
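The flush rule above can be sketched with a small class. It is a simplified model, not the client's buffer: PushBuffer and its send callback are hypothetical, and time is passed in explicitly (a real buffer would use a timer) so the behavior is easy to follow.

```javascript
// Simplified, hypothetical model of a count-or-time buffer.
class PushBuffer {
  constructor({ messageCount, timeMillis }, send) {
    this.messageCount = messageCount
    this.timeMillis = timeMillis
    this.send = send      // called with the whole batch on flush
    this.items = []
    this.firstAt = null   // time the oldest buffered message arrived
  }

  add(item, now = Date.now()) {
    if (this.items.length === 0) this.firstAt = now
    this.items.push(item)
    this.flushIfDue(now)
  }

  // Flush when the count threshold OR the time threshold is reached.
  flushIfDue(now = Date.now()) {
    const full = this.items.length >= this.messageCount
    const stale = this.firstAt !== null && now - this.firstAt >= this.timeMillis
    if (full || stale) this.flush()
  }

  flush() {
    if (this.items.length === 0) return
    const batch = this.items
    this.items = []
    this.firstAt = null
    this.send(batch) // one request for the whole batch
  }
}

const sent = []
const buffer = new PushBuffer({ messageCount: 3, timeMillis: 1000 }, (batch) => sent.push(batch))
buffer.add({ n: 1 }, 0)
buffer.add({ n: 2 }, 10)
buffer.add({ n: 3 }, 20)   // count threshold reached: flushes 3 messages
buffer.add({ n: 4 }, 30)
buffer.flushIfDue(1030)    // time threshold reached: flushes 1 message
console.log(sent.map((batch) => batch.length)) // [ 3, 1 ]
```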
// Buffer up to 100 messages OR 1 second
await queen.queue('logs')
.buffer({ messageCount: 100, timeMillis: 1000 })
.push([
{ data: { level: 'info', message: 'User logged in' } }
])
High-Throughput Example
// Send 10,000 messages super fast
for (let i = 0; i < 10000; i++) {
await queen.queue('events')
.buffer({ messageCount: 500, timeMillis: 100 })
.push([{ data: { id: i, timestamp: Date.now() } }])
}
// Flush remaining buffered messages
await queen.flushAllBuffers()
Manual Flush
// Flush all buffers
await queen.flushAllBuffers()
// Flush specific queue
await queen.queue('my-queue').flushBuffer()
// Get buffer statistics
const stats = queen.getBufferStats()
console.log('Buffers:', stats)
Acknowledgment
ACK (Success)
await queen.ack(message, true)
NACK (Failure)
await queen.ack(message, false)
With Error Context
await queen.ack(message, false, {
error: 'Invalid data format',
details: { field: 'email', reason: 'not a valid email' }
})
Batch ACK
// Ack multiple messages at once
await queen.ack([msg1, msg2, msg3], true)
Dead Letter Queue
Enable DLQ
await queen.queue('risky-business')
.config({
retryLimit: 3,
dlqAfterMaxRetries: true // Auto-move to DLQ after 3 failures
})
.create()
Query DLQ
const dlq = await queen.queue('risky-business')
.dlq()
.limit(10)
.get()
console.log(`Found ${dlq.total} failed messages`)
for (const message of dlq.messages) {
console.log('Failed:', message.data)
console.log('Error:', message.errorMessage)
console.log('Failed at:', message.dlqTimestamp)
}
DLQ with Time Range
const dlq = await queen.queue('risky-business')
.dlq()
.from('2025-01-01')
.to('2025-01-31')
.limit(100)
.get()
Lease Renewal
Keep locks active during long-running tasks.
Automatic Lease Renewal
await queen.queue('long-tasks')
.renewLease(true, 60000) // Renew every 60 seconds
.consume(async (message) => {
// Even if this takes 30 minutes, lease keeps renewing!
await processVeryLongTask(message.data)
})
Manual Lease Renewal
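const messages = await queen.queue('long-tasks').pop()
if (messages.length > 0) {
const message = messages[0]
// Renew the lease every 30 seconds while the task runs
const timer = setInterval(() => {
queen.renew(message)
.then(() => console.log('Lease renewed'))
.catch((error) => console.error('Lease renewal failed:', error))
}, 30000)
try {
await processVeryLongTask(message.data)
await queen.ack(message, true)
} finally {
// Always stop renewing, even if processing throws
clearInterval(timer)
}
}
Message Tracing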
Record breadcrumbs as messages flow through your system.
Basic Tracing
await queen.queue('orders').consume(async (msg) => {
// Record trace event
await msg.trace({
data: { text: 'Order processing started' }
})
const order = await processOrder(msg.data)
await msg.trace({
data: {
text: 'Order processed successfully',
orderId: order.id,
total: order.total
}
})
})
Trace Names (Connect the Dots)
Link traces across multiple messages:
// Service 1: Order Service
await queen.queue('orders').consume(async (msg) => {
const orderId = msg.data.orderId
await msg.trace({
traceName: `order-${orderId}`, // Link traces
data: { text: 'Order created', service: 'orders' }
})
await queen.queue('inventory').push([{
data: { orderId, items: msg.data.items }
}])
})
// Service 2: Inventory Service
await queen.queue('inventory').consume(async (msg) => {
const orderId = msg.data.orderId
await msg.trace({
traceName: `order-${orderId}`, // Same name = connected!
data: { text: 'Stock checked', service: 'inventory' }
})
})
In the dashboard:
- Search for order-12345
- See the ENTIRE workflow across all services! 🎉
Event Types
Organize traces with event types:
await msg.trace({
eventType: 'info', // Blue in UI
data: { text: 'Started processing' }
})
await msg.trace({
eventType: 'error', // Red in UI
data: { text: 'Validation failed', reason: 'Invalid email' }
})
await msg.trace({
eventType: 'processing', // Green in UI
data: { text: 'Sending email' }
})
Namespaces & Tasks
Logical grouping with wildcard filtering.
Namespaces
// Create queues with namespaces
await queen.queue('billing-invoices').namespace('accounting').create()
await queen.queue('billing-receipts').namespace('accounting').create()
// Consume from ALL queues in the namespace
await queen.queue()
.namespace('accounting')
.consume(async (message) => {
// Receives from BOTH billing-invoices AND billing-receipts
})
Tasks
// Create queues with tasks
await queen.queue('video-uploads').task('video-processing').create()
await queen.queue('image-uploads').task('image-processing').create()
// Consume by task type
await queen.queue()
.task('video-processing')
.consume(async (message) => {
// Only video processing messages
})
Combining Namespace + Task
await queen.queue()
.namespace('media')
.task('urgent-processing')
.consume(async (message) => {
// Only urgent media processing from media namespace
})
Stream Processing
Queen provides powerful windowed stream processing for aggregating messages from multiple queues into time-based windows. Perfect for real-time analytics, event correlation, and temporal aggregations.
What is a Stream?
Instead of consuming individual messages, streams group messages into time windows and deliver entire windows for processing. This enables temporal correlation and multi-queue aggregation.
Basic Stream Example
// Create source queues
await queen.queue('events').namespace('analytics').task('stream').create()
await queen.queue('clicks').namespace('analytics').task('stream').create()
// Define stream with 5-second tumbling windows
await queen.stream('event-stream', 'analytics')
.sources(['events', 'clicks'])
.tumblingTime(5) // 5-second windows
.gracePeriod(1) // 1-second grace period for late messages
.define()
// Consume windows
const consumer = queen.consumer('event-stream', 'analytics-consumer')
await consumer.process(async (window) => {
console.log(`Window: ${window.start} - ${window.end}`)
console.log(`Total messages: ${window.allMessages.length}`)
// Process all messages in the window
for (const msg of window.allMessages) {
console.log(msg.data)
}
})
Partitioned Streams
Partitioned streams ensure messages with the same partition key are grouped together. Essential for per-user analytics, session tracking, or conversation processing.
// Define partitioned stream
await queen.stream('chat-stream', 'prod')
.sources(['chat-translations', 'chat-agent'])
.partitioned() // Enable partition-aware processing
.tumblingTime(5)
.gracePeriod(1)
.define()
// Producer: Push with partition key
await queen
.queue('chat-translations')
.partition(chatId.toString()) // Set partition key
.push([{
data: {
kind: 'translation',
chatId: chatId,
text: 'Hello world'
}
}])
// Consumer: Process windows with groupBy
const consumer = queen.consumer('chat-stream', 'chat-analytics')
await consumer.process(async (window) => {
// Group messages by chat ID
const byChatId = window.groupBy('data.chatId')
// Process each chat's messages together
for (const [chatId, messages] of Object.entries(byChatId)) {
console.log(`Chat ${chatId}: ${messages.length} messages`)
const translations = messages.filter(m => m.data.kind === 'translation')
const agentReplies = messages.filter(m => m.data.kind === 'agent')
// Calculate metrics per chat
console.log(`Translations: ${translations.length}`)
console.log(`Agent replies: ${agentReplies.length}`)
}
})
Stream Configuration Options
await queen.stream('stream-name', 'namespace')
.sources(['queue1', 'queue2', 'queue3']) // 1+ source queues
.partitioned() // Optional: enable partitioning
.tumblingTime(5) // Window size in seconds
.gracePeriod(2) // Optional: grace period in seconds
.define()
Key Concepts:
- Tumbling windows: Fixed-size, non-overlapping time windows
- Grace period: Additional time to wait for late-arriving messages
- Partitioned: Messages with same partition key processed together
- Multi-source: Aggregate messages from multiple related queues
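Tumbling windows and grace periods come down to simple arithmetic on timestamps. The sketch below assumes windows are aligned to the Unix epoch and cover [start, end). Both helper names are hypothetical, and the server's actual alignment may differ.

```javascript
// Hypothetical helpers; assumes epoch-aligned windows covering [start, end).

// Which tumbling window does a timestamp (ms) fall into?
function windowFor(timestampMs, sizeSeconds) {
  const sizeMs = sizeSeconds * 1000
  const start = Math.floor(timestampMs / sizeMs) * sizeMs
  return { start, end: start + sizeMs }
}

// A window is delivered only after its end plus the grace period has passed,
// which gives late-arriving messages a chance to be included.
function isWindowReady(window, nowMs, graceSeconds = 0) {
  return nowMs >= window.end + graceSeconds * 1000
}

const w = windowFor(12345, 5)
console.log(w)                          // { start: 10000, end: 15000 }
console.log(isWindowReady(w, 15500, 1)) // false: still inside the grace period
console.log(isWindowReady(w, 16000, 1)) // true
```

Because windows do not overlap, every timestamp belongs to exactly one window.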
Window Operations
The window object provides powerful aggregation methods:
await consumer.process(async (window) => {
// Window metadata
console.log(window.id) // Unique window identifier
console.log(window.start) // Window start time (ISO 8601)
console.log(window.end) // Window end time (ISO 8601)
// All messages in the window
console.log(window.allMessages.length)
// Group by any field path
const byUserId = window.groupBy('data.userId')
const byEventType = window.groupBy('data.event.type')
const byQueue = window.groupBy('queue_name')
// Process grouped messages
for (const [key, messages] of Object.entries(byUserId)) {
console.log(`User ${key}: ${messages.length} events`)
}
})
Use Cases
Real-time Analytics:
// Aggregate metrics every 10 seconds
await queen.stream('metrics-10s', 'monitoring')
.sources(['app-metrics'])
.tumblingTime(10)
.define()
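// 'metrics-consumer' is an example consumer group name
const consumer = queen.consumer('metrics-10s', 'metrics-consumer')
await consumer.process(async (window) => {
const values = window.allMessages.map(m => m.data.value)
if (values.length === 0) return // Skip empty windows (avoids NaN/Infinity)
const stats = {
count: values.length,
avg: values.reduce((a, b) => a + b, 0) / values.length,
min: Math.min(...values),
max: Math.max(...values)
}
await saveMetrics(stats)
})
Event Correlation: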
// Correlate related events across multiple queues
await queen.stream('user-journey', 'analytics')
.sources(['page-views', 'clicks', 'conversions'])
.partitioned() // Group by user
.tumblingTime(60) // 60-second windows
.define()
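// 'journey-consumer' is an example consumer group name
const consumer = queen.consumer('user-journey', 'journey-consumer')
await consumer.process(async (window) => {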
const byUser = window.groupBy('data.userId')
for (const [userId, events] of Object.entries(byUser)) {
const journey = {
views: events.filter(e => e.queue_name === 'page-views').length,
clicks: events.filter(e => e.queue_name === 'clicks').length,
conversions: events.filter(e => e.queue_name === 'conversions').length
}
console.log(`User ${userId}:`, journey)
}
})
Multi-stage Pipeline:
// Stage 1: Raw events stream
await queen.stream('raw-events', 'pipeline')
.sources(['clicks', 'views'])
.tumblingTime(10)
.define()
// Stage 2: Process and enrich
const enrichConsumer = queen.consumer('raw-events', 'enricher')
await enrichConsumer.process(async (window) => {
for (const msg of window.allMessages) {
const enriched = await enrichEvent(msg.data)
await queen.queue('enriched-events').push([{ data: enriched }])
}
})
// Stage 3: Final aggregation (longer window)
await queen.stream('analytics', 'pipeline')
.sources(['enriched-events'])
.tumblingTime(60)
.define()
Performance Tuning
Control stream processing performance via server environment variables:
# Worker configuration
STREAM_POLL_WORKER_COUNT=2 # Number of stream poll workers
STREAM_CONCURRENT_CHECKS=10 # Concurrent window checks per worker
# Polling intervals
STREAM_POLL_INTERVAL=1000 # DB query interval (ms)
STREAM_BACKOFF_THRESHOLD=5 # Empty checks before backoff
STREAM_MAX_POLL_INTERVAL=5000 # Max backoff interval (ms)
Tuning recommendations:
- Low latency: Lower STREAM_POLL_INTERVAL (500ms), increase workers
- High throughput: Increase STREAM_CONCURRENT_CHECKS (20), more workers
- Low load: Reduce workers (1), increase intervals (2000ms)
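To see how the three polling variables interact, here is one plausible reading. The doubling curve is an assumption, since this guide does not specify how the server backs off, and nextPollInterval is a hypothetical helper: the interval stays at STREAM_POLL_INTERVAL until STREAM_BACKOFF_THRESHOLD consecutive empty checks, then grows up to STREAM_MAX_POLL_INTERVAL.

```javascript
// Hypothetical sketch; the server's real backoff curve may differ.
// Defaults mirror the environment variable values shown above.
function nextPollInterval(emptyChecks, {
  pollInterval = 1000,     // STREAM_POLL_INTERVAL
  backoffThreshold = 5,    // STREAM_BACKOFF_THRESHOLD
  maxPollInterval = 5000   // STREAM_MAX_POLL_INTERVAL
} = {}) {
  if (emptyChecks < backoffThreshold) return pollInterval
  // Assumption: double the interval for each empty check past the threshold
  const doublings = emptyChecks - backoffThreshold + 1
  return Math.min(pollInterval * 2 ** doublings, maxPollInterval)
}

for (const emptyChecks of [0, 4, 5, 6, 7]) {
  console.log(emptyChecks, nextPollInterval(emptyChecks))
}
// 0 -> 1000, 4 -> 1000, 5 -> 2000, 6 -> 4000, 7 -> 5000
```

Under this model, lowering the base interval reduces latency for busy streams, while the cap bounds how stale an idle stream can become.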
Best Practices
- ✅ Choose appropriate window sizes - 5-30 seconds for most use cases
- ✅ Set grace periods - Typically 10-20% of window size
- ✅ Use partitioning for related events - Ensures consistency
- ✅ Preserve partition keys in pipelines - Critical for partitioned streams
- ✅ Process windows efficiently - Minimize time in consumer
- ✅ Use consumer groups for scaling - Distribute windows across workers
Learn More
For comprehensive documentation, complete examples, and tuning guides, see:
- Stream Processing Examples - Detailed guide with examples
- Example 18: Basic Streaming
- Example 19: Partitioned Streaming
Advanced Configuration
Queue Configuration Options
await queen.queue('super-queue').config({
// Lease & Retry
leaseTime: 300, // 5 minutes to process (seconds)
retryLimit: 3, // Retry 3 times
retryDelay: 5000, // Wait 5 seconds between retries (ms)
// Dead Letter Queue
dlqAfterMaxRetries: true, // Move to DLQ after max retries
// Priority
priority: 5, // Higher = higher priority (0-10)
// Delays & Buffers
delayedProcessing: 60, // Available after 60 seconds
windowBuffer: 30, // Hold messages for 30 seconds to batch
// Retention
retentionSeconds: 86400, // Keep pending messages 24 hours
completedRetentionSeconds: 3600, // Keep completed 1 hour
ttl: 86400, // Message expires after 24 hours
// Security
encryptionEnabled: true // Encrypt payloads at rest
}).create()
Consumer Configuration
await queen.queue('tasks')
.group('workers')
.concurrency(10) // 10 parallel workers
.batch(20) // Fetch 20 at a time
.autoAck(true) // Auto-ack on success
.renewLease(true, 5000) // Auto-renew every 5s
.limit(1000) // Process 1000 messages then stop
.each() // Process individually (vs batch)
.consume(async (message) => {
await process(message.data)
})
Graceful Shutdown
Always clean up properly!
Automatic Shutdown
Queen automatically handles SIGINT and SIGTERM:
const queen = new Queen('http://localhost:6632')
// Your app runs...
// User presses Ctrl+C:
// Queen automatically flushes buffers and closes cleanly!
Manual Shutdown
await queen.close()
console.log('Queen shut down cleanly')
With AbortController
const controller = new AbortController()
const consumerPromise = queen.queue('tasks')
.consume(async (message) => {
await processMessage(message.data)
}, { signal: controller.signal })
// Later... stop consumer
controller.abort()
// Wait for consumer to finish current message
await consumerPromise
// Close Queen
await queen.close()
Error Handling
try {
await queen.queue('tasks').push([...])
} catch (error) {
if (error.code === 'DUPLICATE') {
console.log('Message already exists')
} else if (error.code === 'TIMEOUT') {
console.log('Operation timed out')
} else {
console.error('Error:', error.message)
}
}
Configuration Defaults
Client Defaults
{
timeoutMillis: 30000,
retryAttempts: 3,
retryDelayMillis: 1000,
loadBalancingStrategy: 'affinity', // 'affinity', 'round-robin', or 'session'
affinityHashRing: 150, // Virtual nodes per server
enableFailover: true
}
Queue Defaults
{
leaseTime: 300, // 5 minutes
retryLimit: 3,
priority: 0,
delayedProcessing: 0,
windowBuffer: 0,
maxSize: 0, // Unlimited
retentionSeconds: 0, // Keep forever
encryptionEnabled: false
}
Consume Defaults
{
concurrency: 1,
batch: 1,
autoAck: true,
wait: true, // Long polling
timeoutMillis: 30000,
limit: null, // Run forever
renewLease: false
}
Logging
Enable detailed logging for debugging:
export QUEEN_CLIENT_LOG=true
node your-app.js
Example output:
[2025-10-28T10:30:45.123Z] [INFO] [Queen.constructor] {"status":"initialized","urls":1}
[2025-10-28T10:30:45.234Z] [INFO] [QueueBuilder.push] {"queue":"tasks","partition":"Default","count":5}
Best Practices
- ✅ Use affinity routing - Enable loadBalancingStrategy: 'affinity' for production to optimize poll intentions
- ✅ Use consume() for workers - Simpler, handles retries automatically
- ✅ Use pop() for control - When you need precise control over acking
- ✅ Buffer for speed - Always use buffering when pushing many messages
- ✅ Partitions for order - Use partitions when message order matters
- ✅ Consumer groups for scale - Run multiple workers in the same group
- ✅ Transactions for consistency - Use transactions when operations must be atomic
- ✅ Enable DLQ - Always enable DLQ in production to catch failures
- ✅ Renew long leases - Use auto-renewal for long-running tasks
- ✅ Graceful shutdown - Always call queen.close() before exiting
- ✅ Monitor DLQ - Regularly check your DLQ for failed messages
Complete Example
import { Queen } from 'queen-mq'
const queen = new Queen('http://localhost:6632')
// Setup queues
await queen.queue('raw-events').config({ priority: 5 }).create()
await queen.queue('processed-events').config({ priority: 10 }).create()
// Stage 1: Ingest with buffering
async function ingestEvents() {
for (let i = 0; i < 10000; i++) {
await queen.queue('raw-events')
.partition(`user-${i % 100}`)
.buffer({ messageCount: 500, timeMillis: 1000 })
.push([{
data: {
userId: i % 100,
event: 'page_view',
timestamp: Date.now()
}
}])
}
await queen.flushAllBuffers()
}
// Stage 2: Process with transactions
async function processEvents() {
await queen.queue('raw-events')
.group('processors')
.concurrency(5)
.batch(10)
.autoAck(false)
.consume(async (messages) => {
const processed = messages.map(m => ({
userId: m.data.userId,
processed: true,
timestamp: Date.now()
}))
const txn = queen.transaction()
for (const msg of messages) {
txn.ack(msg)
}
txn.queue('processed-events').push(
processed.map(p => ({ data: p }))
)
await txn.commit()
})
.onError(async (messages, error) => {
console.error('Processing failed:', error)
await queen.ack(messages, false)
})
}
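// Graceful shutdown: register before starting the long-running consumer,
// because processEvents() does not return while the consumer is running
process.on('SIGINT', async () => {
await queen.close()
process.exit(0)
})
// Run pipeline
await ingestEvents()
await processEvents()
See Also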
- Quick Start Guide - Get started quickly
- Examples - More code examples
- API Reference - Complete HTTP API
- GitHub README - Extended tutorial (1940 lines!)
