JavaScript Client Guide

Complete guide for the Queen MQ JavaScript/Node.js client library.

Installation

bash
npm install queen-mq

Requirements: Node.js 22+ (required for native fetch and modern JS features)

Getting Started

Import and Connect

javascript
import { Queen } from 'queen-mq'

// Pick ONE of the following forms.

// Single server
const queen = new Queen('http://localhost:6632')

// Multiple servers (high availability)
const haQueen = new Queen([
  'http://server1:6632',
  'http://server2:6632'
])

// Full configuration
const configuredQueen = new Queen({
  urls: ['http://server1:6632', 'http://server2:6632', 'http://server3:6632'],
  timeoutMillis: 30000,
  retryAttempts: 3,
  loadBalancingStrategy: 'affinity',  // 'affinity', 'round-robin', or 'session'
  affinityHashRing: 128,              // Virtual nodes per server (for affinity)
  enableFailover: true
})

Load Balancing & Affinity Routing

When connecting to multiple Queen servers, you can choose how requests are distributed. The client supports three load balancing strategies.

Affinity Mode

Best for: Production deployments with 3+ servers and multiple consumer groups.

Affinity mode uses consistent hashing with virtual nodes to route consumer groups to the same backend server. This optimizes database queries by consolidating poll intentions on a single server.

javascript
const queen = new Queen({
  urls: [
    'http://queen-server-1:6632',
    'http://queen-server-2:6632',
    'http://queen-server-3:6632'
  ],
  loadBalancingStrategy: 'affinity',
  affinityHashRing: 150  // Virtual nodes per server (default: 150)
})

How it works:

  1. Each consumer generates an affinity key from its parameters:
    • Queue-based: queue:partition:consumerGroup
    • Namespace-based: namespace:task:consumerGroup
  2. The key is hashed with the FNV-1a algorithm (fast, deterministic)
  3. The hash is mapped to a virtual node on the consistent hash ring
  4. The virtual node maps to a real backend server
  5. The same key always routes to the same server (unless that server is unhealthy)
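
The routing steps above can be sketched in a few lines. This is a minimal illustration, not the client's actual implementation: the function names (fnv1a, buildRing, pickServer) and the `url#index` naming of virtual nodes are assumptions made for the example.

```javascript
// FNV-1a, 32-bit variant: XOR each character code into the hash,
// then multiply by the FNV prime. Character codes are treated as bytes,
// which is accurate for ASCII keys such as "orders:priority:email-processor".
function fnv1a(str) {
  let hash = 0x811c9dc5 // FNV offset basis
  for (let i = 0; i < str.length; i++) {
    hash ^= str.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193) // FNV prime, 32-bit multiply
  }
  return hash >>> 0 // force an unsigned 32-bit result
}

// Build a ring of { hash, url } entries sorted by hash,
// with `virtualNodes` entries per server.
function buildRing(urls, virtualNodes = 150) {
  const ring = []
  for (const url of urls) {
    for (let i = 0; i < virtualNodes; i++) {
      ring.push({ hash: fnv1a(`${url}#${i}`), url }) // hypothetical vnode naming
    }
  }
  return ring.sort((a, b) => a.hash - b.hash)
}

// Binary-search the first virtual node at or after the key's hash,
// wrapping around to the start of the ring if none is found.
function pickServer(ring, affinityKey) {
  const target = fnv1a(affinityKey)
  let lo = 0
  let hi = ring.length
  while (lo < hi) {
    const mid = (lo + hi) >>> 1
    if (ring[mid].hash < target) lo = mid + 1
    else hi = mid
  }
  return ring[lo % ring.length].url
}
```

Because a key only depends on the nearest virtual node clockwise from its hash, removing one server from the ring remaps only the keys that were assigned to that server; all other keys keep their backend.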

Example:

javascript
// Consumer 1
await queen.queue('orders')
  .partition('priority')
  .group('email-processor')
  .consume(async (msg) => {
    // Affinity key: "orders:priority:email-processor"
    // Routes to: server-2 (based on hash)
  })

// Consumer 2 (same group, different worker)
await queen.queue('orders')
  .partition('priority')
  .group('email-processor')
  .consume(async (msg) => {
    // Same affinity key: "orders:priority:email-processor"  
    // Routes to: server-2 (same server!)
    // → Poll intentions consolidated
    // → Single DB query serves both workers
  })

// Consumer 3 (different group)
await queen.queue('orders')
  .partition('priority')
  .group('analytics')
  .consume(async (msg) => {
    // Different affinity key: "orders:priority:analytics"
    // Routes to: server-1 (different server)
    // → Load is distributed across servers
  })

Benefits:

  • Optimized Database Queries - Poll intentions for same consumer group are consolidated on one server, reducing duplicate SELECT queries
  • Better Cache Locality - In-memory poll registry stays warm on the same server
  • Graceful Failover - If a server fails, only the consumer groups assigned to it move to other servers (about one third of them in a 3-server setup)
  • Works with HA - Perfect for 3-server high-availability setups
  • Automatic - No manual configuration required

Virtual Nodes:

The affinityHashRing parameter controls how many virtual nodes each server gets:

javascript
affinityHashRing: 150   // Default: good for 3-5 servers
affinityHashRing: 300   // Better distribution, more memory (~14KB total)
affinityHashRing: 50    // Less memory, worse distribution

Performance Impact

With affinity routing and 3 backend servers, multiple workers in the same consumer group will hit the same backend, allowing the server to consolidate poll intentions and serve them with a single database query instead of multiple queries.

Round-Robin Mode

Best for: Simple setups where poll optimization is not critical.

Cycles through servers in order. Simple but doesn't optimize for poll intention consolidation.

javascript
const queen = new Queen({
  urls: ['http://server1:6632', 'http://server2:6632'],
  loadBalancingStrategy: 'round-robin'
})

Each request goes to the next server in the list. Load is evenly distributed but consumer groups may hit different servers on each poll.
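
As a rough sketch of the idea (not the client's internal code), a round-robin picker only needs a counter that wraps around the server list. The name createRoundRobin is hypothetical.

```javascript
// Returns a function that yields the next URL on every call,
// wrapping back to the first URL after the last one.
function createRoundRobin(urls) {
  let next = 0
  return function pick() {
    const url = urls[next]
    next = (next + 1) % urls.length
    return url
  }
}

const pick = createRoundRobin(['http://server1:6632', 'http://server2:6632'])
const firstThree = [pick(), pick(), pick()]
// firstThree alternates: server1, server2, server1
```

Since the picker ignores which consumer group is polling, two consecutive polls from the same group land on different servers, which is why this mode cannot consolidate poll intentions.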

Session Mode

Best for: Single client instance that should stick to one server.

Each client instance picks a server and sticks to it for all requests.

javascript
const queen = new Queen({
  urls: ['http://server1:6632', 'http://server2:6632'],
  loadBalancingStrategy: 'session'
})

Comparison

| Feature | Affinity | Round-Robin | Session |
| --- | --- | --- | --- |
| Poll Consolidation | ✅ Yes | ❌ No | ⚠️ Partial |
| Load Distribution | ✅ Good | ✅ Perfect | ❌ Poor |
| Failover | ✅ Graceful | ✅ Automatic | ✅ Automatic |
| Memory | ~10KB | Minimal | Minimal |
| Best For | Production | Testing | Simple apps |

Push vs Consume Routing

Important: Affinity routing is only applied to consumer operations (pop/consume), not push operations.

  • Consumers (pop/consume): Use affinity key for consistent routing
  • Producers (push): Use default strategy (round-robin) for even write distribution

This gives you the best of both worlds: optimized reads with affinity, balanced writes without hotspots.

javascript
// Push - uses round-robin (even distribution)
await queen.queue('orders').push([{ data: { order: 123 } }])

// Consume - uses affinity (consolidated polling)  
await queen.queue('orders').group('processors').consume(async (msg) => {
  // Same group always routes to same server
})

Monitoring Affinity Routing

You can check the load balancer status:

javascript
const httpClient = queen._httpClient
const loadBalancer = httpClient.getLoadBalancer()

if (loadBalancer) {
  console.log('Strategy:', loadBalancer.getStrategy())
  console.log('Virtual nodes:', loadBalancer.getVirtualNodeCount())
  console.log('Servers:', loadBalancer.getAllUrls())
  console.log('Health:', loadBalancer.getHealthStatus())
}

Queue Management

Create Queue

javascript
// Simple creation
await queen.queue('my-tasks').create()

// With configuration
await queen.queue('my-tasks')
  .config({
    leaseTime: 60,           // 60 seconds to process
    retryLimit: 3,           // Max 3 retries
    priority: 5,             // Priority level (0-10)
    dlqAfterMaxRetries: true, // Auto move to DLQ after max retries
    retentionSeconds: 86400, // Keep messages 24 hours
    encryptionEnabled: false
  })
  .create()

Delete Queue

javascript
await queen.queue('my-tasks').delete()

Warning

This deletes all messages, partitions, and consumer group state. It cannot be undone!

Pushing Messages

Basic Push

javascript
await queen.queue('tasks').push([
  { data: { task: 'send-email', to: 'user@example.com' } }
])

Multiple Messages

javascript
await queen.queue('tasks').push([
  { data: { task: 'send-email', to: 'alice@example.com' } },
  { data: { task: 'send-email', to: 'bob@example.com' } },
  { data: { task: 'resize-image', id: 123 } }
])

With Custom Transaction ID

javascript
await queen.queue('tasks').push([
  {
    transactionId: 'unique-id-123',  // Auto-generated if not provided
    data: { task: 'process-order', orderId: 456 }
  }
])

With Callbacks

javascript
await queen.queue('tasks').push([...])
  .onSuccess(async (messages) => {
    console.log('Pushed successfully:', messages)
  })
  .onDuplicate(async (messages) => {
    console.warn('Duplicate transaction IDs detected')
  })
  .onError(async (messages, error) => {
    console.error('Push failed:', error)
  })

Consuming Messages

Basic Consume

The easiest way to process messages continuously:

javascript
await queen.queue('my-tasks').consume(async (message) => {
  console.log('Processing:', message.data)
  
  // Do your work
  await processTask(message.data)
  
  // Auto-ack on success, auto-nack on error
})

What happens:

  1. Consumer pulls messages from the queue
  2. Your function processes each message
  3. If successful → message marked as complete ✅
  4. If error → message goes back for retry 🔄
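
The success and retry paths above can be modeled without a server. The sketch below is an assumption-laden stand-in: drainWithAutoAck is a hypothetical helper, it works on a plain array instead of a real queue, and it uses a synchronous handler to stay short (the real consume() awaits an async handler).

```javascript
// Runs `handler` on every pending message and sorts the outcomes:
// a normal return counts as an ack, a thrown error counts as a nack.
function drainWithAutoAck(pending, handler) {
  const completed = []
  const retry = []
  for (const message of pending) {
    try {
      handler(message)
      completed.push(message) // success: message is marked complete
    } catch (error) {
      retry.push({ message, error: error.message }) // failure: message goes back for retry
    }
  }
  return { completed, retry }
}
```

The point of the sketch is that the handler never calls ack itself: throwing is the only signal needed to send a message back for retry.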

With Configuration

javascript
await queen.queue('tasks')
  .concurrency(10)          // 10 parallel workers
  .batch(20)                // Fetch 20 at a time
  .autoAck(false)           // Manual ack
  .renewLease(true, 5000)   // Auto-renew every 5s
  .each()                   // Process individually
  .consume(async (message) => {
    await process(message.data)
  })
  .onSuccess(async (message) => {
    await queen.ack(message, true)
  })
  .onError(async (message, error) => {
    console.error('Failed:', error)
    await queen.ack(message, false)
  })

Process Limited Messages

javascript
// Process exactly 100 messages then stop
await queen.queue('tasks')
  .limit(100)
  .consume(async (message) => {
    await processMessage(message.data)
  })

Batch Processing

javascript
// Process messages in batches
await queen.queue('events')
  .batch(10)
  .consume(async (messages) => {
    // messages is an array of up to 10 messages
    for (const msg of messages) {
      await process(msg.data)
    }
  })

Pop vs Consume

The Consume Way

Use when: Long-running worker that continuously processes messages.

javascript
await queen.queue('tasks').consume(async (message) => {
  // Auto-loops, auto-ack, long-polling built-in
})

The Pop Way

Use when: Manual control over message fetching.

javascript
const messages = await queen.queue('tasks').pop()

if (messages.length > 0) {
  const message = messages[0]
  
  try {
    await processMessage(message.data)
    await queen.ack(message, true)
  } catch (error) {
    await queen.ack(message, false, { error: error.message })
  }
}

Pop with Long Polling

javascript
// Wait up to 30 seconds for messages
const messages = await queen.queue('tasks')
  .batch(10)
  .wait(true)
  .timeout(30000)
  .pop()

Partitions

Use partitions for ordering guarantees. Messages in the same partition are processed in order.

Push to Partition

javascript
await queen.queue('user-events')
  .partition('user-123')
  .push([
    { data: { event: 'login', timestamp: Date.now() } },
    { data: { event: 'purchase', orderId: 456 } },
    { data: { event: 'logout', timestamp: Date.now() } }
  ])

Consume from Partition

javascript
await queen.queue('user-events')
  .partition('user-123')
  .consume(async (message) => {
    // Messages processed in exact order
    console.log('User 123:', message.data.event)
  })

Important: Messages in different partitions are independent.

Consumer Groups

Consumer groups enable:

  • Multiple workers sharing the same queue
  • Fan-out patterns (same message to multiple groups)
  • Message replay from any point

Basic Consumer Group

javascript
// Worker 1 in group "processors"
await queen.queue('emails')
  .group('processors')
  .consume(async (message) => {
    console.log('Worker 1 processing:', message.data)
  })

// Worker 2 in SAME group (shares the load)
await queen.queue('emails')
  .group('processors')
  .consume(async (message) => {
    console.log('Worker 2 processing:', message.data)
  })

Messages are distributed between workers. Each message goes to only ONE worker.

Multiple Consumer Groups (Fan-Out)

javascript
// Group 1: Send emails
await queen.queue('notifications')
  .group('email-sender')
  .consume(async (message) => {
    await sendEmail(message.data)
  })

// Group 2: Log to analytics (processes THE SAME messages)
await queen.queue('notifications')
  .group('analytics')
  .consume(async (message) => {
    await trackEvent(message.data)
  })

Every message is processed by BOTH groups independently! 🎉
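
One way to picture the difference between sharing load and fan-out is a cursor per consumer group. The class below is a conceptual model only; InMemoryQueue and its methods are hypothetical and say nothing about how Queen stores group state.

```javascript
// Each group owns one cursor into the same message list.
// Workers in the same group advance the same cursor (load sharing);
// different groups advance independent cursors (fan-out).
class InMemoryQueue {
  constructor() {
    this.messages = []
    this.cursors = new Map() // group name -> index of the next message
  }

  push(data) {
    this.messages.push(data)
  }

  // Returns the next message for `group`, or null when the group is caught up.
  pop(group) {
    const cursor = this.cursors.get(group) ?? 0
    if (cursor >= this.messages.length) return null
    this.cursors.set(group, cursor + 1)
    return this.messages[cursor]
  }
}
```

With two messages pushed, two workers calling pop('email-sender') each receive a different message, while a later pop('analytics') starts again from the first message.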

Subscription Modes

Control whether consumer groups process historical messages or only new ones.

Default Behavior (Process All Messages):

javascript
await queen.queue('events')
  .group('new-analytics')
  .consume(async (message) => {
    // Processes ALL messages, including historical
  })

Skip Historical Messages:

javascript
await queen.queue('events')
  .group('realtime-monitor')
  .subscriptionMode('new')  // Skip history
  .consume(async (message) => {
    // Only processes messages arriving after subscription
  })

Subscribe from Specific Timestamp:

javascript
const startTime = '2025-10-28T10:00:00.000Z'

await queen.queue('events')
  .group('replay-from-10am')
  .subscriptionFrom(startTime)
  .consume(async (message) => {
    // Process messages from 10am onwards
  })

Server Default:

The server can be configured to change default subscription behavior:

bash
export DEFAULT_SUBSCRIPTION_MODE="new"
./bin/queen-server

When set, new consumer groups automatically skip historical messages unless you explicitly override with .subscriptionMode('all').

Transactions

Transactions ensure atomic operations. Either everything succeeds or nothing does.

Basic Transaction: Ack + Push

javascript
// Pop a message
const messages = await queen.queue('raw-data').batch(1).pop()

if (messages.length > 0) {
  const message = messages[0]
  
  // Process it
  const processed = await transformData(message.data)
  
  // Atomically: ack input AND push output
  await queen.transaction()
    .ack(message)                    // Complete input
    .queue('processed-data')
    .push([{ data: processed }])     // Push to next queue
    .commit()
}

// If commit fails, NOTHING happens. Message stays in raw-data!
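
The all-or-nothing behavior can be illustrated with a copy-then-apply model. This is only a conceptual sketch under stated assumptions: the ack, push, and commit helpers below are hypothetical, operate on a plain object of arrays, and are unrelated to how the server implements transactions.

```javascript
// Each operation is a function that mutates a draft copy of the state.
const ack = (queue, id) => (draft) => {
  const index = draft[queue].findIndex((m) => m.id === id)
  if (index === -1) throw new Error(`message ${id} not found in ${queue}`)
  draft[queue].splice(index, 1)
}

const push = (queue, message) => (draft) => {
  if (!(queue in draft)) throw new Error(`unknown queue ${queue}`)
  draft[queue].push(message)
}

// Apply every operation to a deep copy. If any operation throws,
// the copy is discarded and the original state is left untouched.
function commit(state, operations) {
  const draft = structuredClone(state)
  for (const apply of operations) apply(draft)
  return draft
}
```

If the push step fails, commit throws before returning, so the caller never sees a state in which the input message was acked but the output was not written.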

Multi-Queue Pipeline

javascript
const messages = await queen.queue('queue-a').batch(1).pop()

if (messages.length > 0) {
  const [message] = messages
  const next = { step: 2, value: message.data.value * 2 }

  // Atomic: ack from A, push to B and C
  await queen.transaction()
    .ack(message)
    .queue('queue-b')
    .push([{ data: next }])
    .queue('queue-c')
    .push([{ data: next }])
    .commit()
}

Transaction with Consumer

javascript
await queen.queue('source')
  .autoAck(false)  // Must disable auto-ack
  .consume(async (message) => {
    // Do work
    const result = await processMessage(message.data)
    
    // Transactionally ack and push result
    await queen.transaction()
      .ack(message)
      .queue('destination')
      .push([{ data: result }])
      .commit()
  })

Client-Side Buffering

Buffering collects messages on the client and sends them in a single HTTP request, which can make high-volume pushes 10x-100x faster.

How It Works

  1. Messages collect in a local buffer
  2. Buffer flushes when it reaches count OR time threshold
  3. All buffered messages are sent in one HTTP request

javascript
// Buffer up to 100 messages OR 1 second
await queen.queue('logs')
  .buffer({ messageCount: 100, timeMillis: 1000 })
  .push([
    { data: { level: 'info', message: 'User logged in' } }
  ])
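
To make the count-or-time rule concrete, here is a minimal sketch of such a buffer. PushBuffer and its send callback are hypothetical names chosen for the example; the client's real buffering code may differ.

```javascript
// Collects messages and hands them to `send` as one array when either
// `messageCount` messages are buffered or `timeMillis` has elapsed
// since the first message entered an empty buffer.
class PushBuffer {
  constructor({ messageCount, timeMillis }, send) {
    this.messageCount = messageCount
    this.timeMillis = timeMillis
    this.send = send
    this.items = []
    this.timer = null
  }

  add(message) {
    this.items.push(message)
    if (this.items.length >= this.messageCount) {
      this.flush() // count threshold reached
    } else if (this.timer === null) {
      // Start the time threshold with the first buffered message.
      this.timer = setTimeout(() => this.flush(), this.timeMillis)
    }
  }

  flush() {
    if (this.timer !== null) {
      clearTimeout(this.timer)
      this.timer = null
    }
    if (this.items.length === 0) return
    const batch = this.items
    this.items = []
    this.send(batch) // one call (one HTTP request in a real client) per batch
  }
}
```

A manual flush() before shutdown matters for the same reason as in the real client: messages that have not hit either threshold are still sitting in memory.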

High-Throughput Example

javascript
// Send 10,000 messages super fast
for (let i = 0; i < 10000; i++) {
  await queen.queue('events')
    .buffer({ messageCount: 500, timeMillis: 100 })
    .push([{ data: { id: i, timestamp: Date.now() } }])
}

// Flush remaining buffered messages
await queen.flushAllBuffers()

Manual Flush

javascript
// Flush all buffers
await queen.flushAllBuffers()

// Flush specific queue
await queen.queue('my-queue').flushBuffer()

// Get buffer statistics
const stats = queen.getBufferStats()
console.log('Buffers:', stats)

Acknowledgment

ACK (Success)

javascript
await queen.ack(message, true)

NACK (Failure)

javascript
await queen.ack(message, false)

With Error Context

javascript
await queen.ack(message, false, {
  error: 'Invalid data format',
  details: { field: 'email', reason: 'not a valid email' }
})

Batch ACK

javascript
// Ack multiple messages at once
await queen.ack([msg1, msg2, msg3], true)

Dead Letter Queue

Enable DLQ

javascript
await queen.queue('risky-business')
  .config({
    retryLimit: 3,
    dlqAfterMaxRetries: true  // Auto-move to DLQ after 3 failures
  })
  .create()
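
The retry-then-DLQ flow can be sketched as a small state transition. This is an assumption, not the server's logic: recordFailure, the failures counter, and the state field are hypothetical, and the sketch reads "after 3 failures" literally (the third failure moves the message when retryLimit is 3).

```javascript
// Returns an updated copy of `message` after one failed processing attempt.
// Assumes dlqAfterMaxRetries is enabled on the queue.
function recordFailure(message, retryLimit, errorMessage) {
  const failures = (message.failures ?? 0) + 1
  if (failures >= retryLimit) {
    // Retry budget exhausted: park the message in the dead letter queue.
    return { ...message, failures, state: 'dlq', errorMessage }
  }
  // Still within budget: make the message available for another attempt.
  return { ...message, failures, state: 'pending' }
}
```

Keeping the last error alongside the message is what makes the DLQ useful later, since the failure reason can be inspected without reproducing the problem.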

Query DLQ

javascript
const dlq = await queen.queue('risky-business')
  .dlq()
  .limit(10)
  .get()

console.log(`Found ${dlq.total} failed messages`)

for (const message of dlq.messages) {
  console.log('Failed:', message.data)
  console.log('Error:', message.errorMessage)
  console.log('Failed at:', message.dlqTimestamp)
}

DLQ with Time Range

javascript
const dlq = await queen.queue('risky-business')
  .dlq()
  .from('2025-01-01')
  .to('2025-01-31')
  .limit(100)
  .get()

Lease Renewal

Keep locks active during long-running tasks.

Automatic Lease Renewal

javascript
await queen.queue('long-tasks')
  .renewLease(true, 60000)  // Renew every 60 seconds
  .consume(async (message) => {
    // Even if this takes 30 minutes, lease keeps renewing!
    await processVeryLongTask(message.data)
  })

Manual Lease Renewal

javascript
const messages = await queen.queue('long-tasks').pop()
const message = messages[0]

// Start renewal
const timer = setInterval(async () => {
  await queen.renew(message)
  console.log('Lease renewed')
}, 30000)

try {
  await processVeryLongTask(message.data)
  await queen.ack(message, true)
} finally {
  clearInterval(timer)
}

Message Tracing

Record breadcrumbs as messages flow through your system.

Basic Tracing

javascript
await queen.queue('orders').consume(async (msg) => {
  // Record trace event
  await msg.trace({
    data: { text: 'Order processing started' }
  })
  
  const order = await processOrder(msg.data)
  
  await msg.trace({
    data: { 
      text: 'Order processed successfully',
      orderId: order.id,
      total: order.total
    }
  })
})

Trace Names (Connect the Dots)

Link traces across multiple messages:

javascript
// Service 1: Order Service
await queen.queue('orders').consume(async (msg) => {
  const orderId = msg.data.orderId
  
  await msg.trace({
    traceName: `order-${orderId}`,  // Link traces
    data: { text: 'Order created', service: 'orders' }
  })
  
  await queen.queue('inventory').push([{
    data: { orderId, items: msg.data.items }
  }])
})

// Service 2: Inventory Service
await queen.queue('inventory').consume(async (msg) => {
  const orderId = msg.data.orderId
  
  await msg.trace({
    traceName: `order-${orderId}`,  // Same name = connected!
    data: { text: 'Stock checked', service: 'inventory' }
  })
})

In the dashboard:

  • Search for order-12345
  • See the ENTIRE workflow across all services! 🎉

Event Types

Organize traces with event types:

javascript
await msg.trace({
  eventType: 'info',  // Blue in UI
  data: { text: 'Started processing' }
})

await msg.trace({
  eventType: 'error',  // Red in UI
  data: { text: 'Validation failed', reason: 'Invalid email' }
})

await msg.trace({
  eventType: 'processing',  // Green in UI
  data: { text: 'Sending email' }
})

Namespaces & Tasks

Logical grouping with wildcard filtering.

Namespaces

javascript
// Create queues with namespaces
await queen.queue('billing-invoices').namespace('accounting').create()
await queen.queue('billing-receipts').namespace('accounting').create()

// Consume from ALL queues in the namespace
await queen.queue()
  .namespace('accounting')
  .consume(async (message) => {
    // Receives from BOTH billing-invoices AND billing-receipts
  })

Tasks

javascript
// Create queues with tasks
await queen.queue('video-uploads').task('video-processing').create()
await queen.queue('image-uploads').task('image-processing').create()

// Consume by task type
await queen.queue()
  .task('video-processing')
  .consume(async (message) => {
    // Only video processing messages
  })

Combining Namespace + Task

javascript
await queen.queue()
  .namespace('media')
  .task('urgent-processing')
  .consume(async (message) => {
    // Only urgent media processing from media namespace
  })

Stream Processing

Queen provides powerful windowed stream processing for aggregating messages from multiple queues into time-based windows. Perfect for real-time analytics, event correlation, and temporal aggregations.

What is a Stream?

Instead of consuming individual messages, streams group messages into time windows and deliver entire windows for processing. This enables temporal correlation and multi-queue aggregation.

Basic Stream Example

javascript
// Create source queues
await queen.queue('events').namespace('analytics').task('stream').create()
await queen.queue('clicks').namespace('analytics').task('stream').create()

// Define stream with 5-second tumbling windows
await queen.stream('event-stream', 'analytics')
  .sources(['events', 'clicks'])
  .tumblingTime(5)         // 5-second windows
  .gracePeriod(1)          // 1-second grace period for late messages
  .define()

// Consume windows
const consumer = queen.consumer('event-stream', 'analytics-consumer')

await consumer.process(async (window) => {
  console.log(`Window: ${window.start} - ${window.end}`)
  console.log(`Total messages: ${window.allMessages.length}`)
  
  // Process all messages in the window
  for (const msg of window.allMessages) {
    console.log(msg.data)
  }
})

Partitioned Streams

Partitioned streams ensure messages with the same partition key are grouped together. Essential for per-user analytics, session tracking, or conversation processing.

javascript
// Define partitioned stream
await queen.stream('chat-stream', 'prod')
  .sources(['chat-translations', 'chat-agent'])
  .partitioned()           // Enable partition-aware processing
  .tumblingTime(5)
  .gracePeriod(1)
  .define()

// Producer: Push with partition key
await queen
  .queue('chat-translations')
  .partition(chatId.toString())  // Set partition key
  .push([{
    data: {
      kind: 'translation',
      chatId: chatId,
      text: 'Hello world'
    }
  }])

// Consumer: Process windows with groupBy
const consumer = queen.consumer('chat-stream', 'chat-analytics')

await consumer.process(async (window) => {
  // Group messages by chat ID
  const byChatId = window.groupBy('data.chatId')
  
  // Process each chat's messages together
  for (const [chatId, messages] of Object.entries(byChatId)) {
    console.log(`Chat ${chatId}: ${messages.length} messages`)
    
    const translations = messages.filter(m => m.data.kind === 'translation')
    const agentReplies = messages.filter(m => m.data.kind === 'agent')
    
    // Calculate metrics per chat
    console.log(`Translations: ${translations.length}`)
    console.log(`Agent replies: ${agentReplies.length}`)
  }
})

Stream Configuration Options

javascript
await queen.stream('stream-name', 'namespace')
  .sources(['queue1', 'queue2', 'queue3'])  // 1+ source queues
  .partitioned()                             // Optional: enable partitioning
  .tumblingTime(5)                           // Window size in seconds
  .gracePeriod(2)                            // Optional: grace period in seconds
  .define()

Key Concepts:

  • Tumbling windows: Fixed-size, non-overlapping time windows
  • Grace period: Additional time to wait for late-arriving messages
  • Partitioned: Messages with same partition key processed together
  • Multi-source: Aggregate messages from multiple related queues

Window Operations

The window object provides powerful aggregation methods:

javascript
await consumer.process(async (window) => {
  // Window metadata
  console.log(window.id)          // Unique window identifier
  console.log(window.start)       // Window start time (ISO 8601)
  console.log(window.end)         // Window end time (ISO 8601)
  
  // All messages in the window
  console.log(window.allMessages.length)
  
  // Group by any field path
  const byUserId = window.groupBy('data.userId')
  const byEventType = window.groupBy('data.event.type')
  const byQueue = window.groupBy('queue_name')
  
  // Process grouped messages
  for (const [key, messages] of Object.entries(byUserId)) {
    console.log(`User ${key}: ${messages.length} events`)
  }
})
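
Grouping by a dotted field path is easy to reproduce on a plain array, which can help when unit-testing window handlers. The standalone groupBy below is a hypothetical re-implementation for illustration; how the real window.groupBy treats missing fields is not documented here, so the "undefined" bucket is an assumption.

```javascript
// Groups messages by the value found at a dotted path such as 'data.userId'.
// Messages where the path is missing end up under the key "undefined".
function groupBy(messages, path) {
  const keys = path.split('.')
  const groups = Object.create(null) // no inherited keys like "constructor"
  for (const message of messages) {
    const value = keys.reduce(
      (obj, key) => (obj == null ? undefined : obj[key]),
      message
    )
    const groupKey = String(value)
    if (!groups[groupKey]) groups[groupKey] = []
    groups[groupKey].push(message)
  }
  return groups
}
```

The result works with Object.entries() in the same way as the grouped output used in the examples above.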

Use Cases

Real-time Analytics:

javascript
// Aggregate metrics every 10 seconds
await queen.stream('metrics-10s', 'monitoring')
  .sources(['app-metrics'])
  .tumblingTime(10)
  .define()

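// Consumer group name is illustrative
const consumer = queen.consumer('metrics-10s', 'metrics-aggregator')

await consumer.process(async (window) => {
  const values = window.allMessages.map(m => m.data.value)
  if (values.length === 0) return  // Avoid dividing by zero on an empty window

  const stats = {
    count: values.length,
    avg: values.reduce((a, b) => a + b, 0) / values.length,
    min: Math.min(...values),
    max: Math.max(...values)
  }
  await saveMetrics(stats)
})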

Event Correlation:

javascript
// Correlate related events across multiple queues
await queen.stream('user-journey', 'analytics')
  .sources(['page-views', 'clicks', 'conversions'])
  .partitioned()            // Group by user
  .tumblingTime(60)         // 60-second windows
  .define()

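// Consumer group name is illustrative
const consumer = queen.consumer('user-journey', 'journey-analytics')

await consumer.process(async (window) => {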
  const byUser = window.groupBy('data.userId')
  
  for (const [userId, events] of Object.entries(byUser)) {
    const journey = {
      views: events.filter(e => e.queue_name === 'page-views').length,
      clicks: events.filter(e => e.queue_name === 'clicks').length,
      conversions: events.filter(e => e.queue_name === 'conversions').length
    }
    console.log(`User ${userId}:`, journey)
  }
})

Multi-stage Pipeline:

javascript
// Stage 1: Raw events stream
await queen.stream('raw-events', 'pipeline')
  .sources(['clicks', 'views'])
  .tumblingTime(10)
  .define()

// Stage 2: Process and enrich
const enrichConsumer = queen.consumer('raw-events', 'enricher')
await enrichConsumer.process(async (window) => {
  for (const msg of window.allMessages) {
    const enriched = await enrichEvent(msg.data)
    await queen.queue('enriched-events').push([{ data: enriched }])
  }
})

// Stage 3: Final aggregation (longer window)
await queen.stream('analytics', 'pipeline')
  .sources(['enriched-events'])
  .tumblingTime(60)
  .define()

Performance Tuning

Control stream processing performance via server environment variables:

bash
# Worker configuration
STREAM_POLL_WORKER_COUNT=2        # Number of stream poll workers
STREAM_CONCURRENT_CHECKS=10       # Concurrent window checks per worker

# Polling intervals
STREAM_POLL_INTERVAL=1000         # DB query interval (ms)
STREAM_BACKOFF_THRESHOLD=5        # Empty checks before backoff
STREAM_MAX_POLL_INTERVAL=5000     # Max backoff interval (ms)

Tuning recommendations:

  • Low latency: Lower STREAM_POLL_INTERVAL (500ms), increase workers
  • High throughput: Increase STREAM_CONCURRENT_CHECKS (20), more workers
  • Low load: Reduce workers (1), increase intervals (2000ms)
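
To reason about how the three polling variables interact, it can help to model the backoff. The exact server algorithm is not described in this guide, so the sketch below is purely an assumption: it doubles the interval for each empty check at or beyond the threshold and caps it at the maximum. The function name nextPollInterval is hypothetical.

```javascript
// base      ~ STREAM_POLL_INTERVAL (ms)
// threshold ~ STREAM_BACKOFF_THRESHOLD (consecutive empty checks)
// max       ~ STREAM_MAX_POLL_INTERVAL (ms)
function nextPollInterval(emptyChecks, { base = 1000, threshold = 5, max = 5000 } = {}) {
  if (emptyChecks < threshold) return base
  const doublings = emptyChecks - threshold + 1
  return Math.min(base * 2 ** doublings, max) // assumed doubling, capped at max
}
```

Under this model, an idle stream with the default values settles at one query every 5 seconds, and a single non-empty check (which resets the empty counter to zero) returns it to one query per second.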

Best Practices

  1. Choose appropriate window sizes - 5-30 seconds for most use cases
  2. Set grace periods - Typically 10-20% of window size
  3. Use partitioning for related events - Ensures consistency
  4. Preserve partition keys in pipelines - Critical for partitioned streams
  5. Process windows efficiently - Minimize time in consumer
  6. Use consumer groups for scaling - Distribute windows across workers

Advanced Configuration

Queue Configuration Options

javascript
await queen.queue('super-queue').config({
  // Lease & Retry
  leaseTime: 300,                // 5 minutes to process (seconds)
  retryLimit: 3,                 // Retry 3 times
  retryDelay: 5000,              // Wait 5 seconds between retries (ms)
  
  // Dead Letter Queue
  dlqAfterMaxRetries: true,      // Move to DLQ after max retries
  
  // Priority
  priority: 5,                   // Higher = higher priority (0-10)
  
  // Delays & Buffers
  delayedProcessing: 60,         // Available after 60 seconds
  windowBuffer: 30,              // Hold messages for 30 seconds to batch
  
  // Retention
  retentionSeconds: 86400,       // Keep pending messages 24 hours
  completedRetentionSeconds: 3600, // Keep completed 1 hour
  ttl: 86400,                    // Message expires after 24 hours
  
  // Security
  encryptionEnabled: true        // Encrypt payloads at rest
}).create()

Consumer Configuration

javascript
await queen.queue('tasks')
  .group('workers')
  .concurrency(10)        // 10 parallel workers
  .batch(20)              // Fetch 20 at a time
  .autoAck(true)          // Auto-ack on success
  .renewLease(true, 5000) // Auto-renew every 5s
  .limit(1000)            // Process 1000 messages then stop
  .each()                 // Process individually (vs batch)
  .consume(async (message) => {
    await process(message.data)
  })

Graceful Shutdown

Always clean up properly!

Automatic Shutdown

Queen automatically handles SIGINT and SIGTERM:

javascript
const queen = new Queen('http://localhost:6632')

// Your app runs...

// User presses Ctrl+C:
// Queen automatically flushes buffers and closes cleanly!

Manual Shutdown

javascript
await queen.close()
console.log('Queen shut down cleanly')

With AbortController

javascript
const controller = new AbortController()

const consumerPromise = queen.queue('tasks')
  .consume(async (message) => {
    await processMessage(message.data)
  }, { signal: controller.signal })

// Later... stop consumer
controller.abort()

// Wait for consumer to finish current message
await consumerPromise

// Close Queen
await queen.close()

Error Handling

javascript
try {
  await queen.queue('tasks').push([...])
} catch (error) {
  if (error.code === 'DUPLICATE') {
    console.log('Message already exists')
  } else if (error.code === 'TIMEOUT') {
    console.log('Operation timed out')
  } else {
    console.error('Error:', error.message)
  }
}

Configuration Defaults

Client Defaults

javascript
{
  timeoutMillis: 30000,
  retryAttempts: 3,
  retryDelayMillis: 1000,
  loadBalancingStrategy: 'affinity',    // 'affinity', 'round-robin', or 'session'
  affinityHashRing: 150,                // Virtual nodes per server
  enableFailover: true
}

Queue Defaults

javascript
{
  leaseTime: 300,          // 5 minutes
  retryLimit: 3,
  priority: 0,
  delayedProcessing: 0,
  windowBuffer: 0,
  maxSize: 0,              // Unlimited
  retentionSeconds: 0,     // Keep forever
  encryptionEnabled: false
}

Consume Defaults

javascript
{
  concurrency: 1,
  batch: 1,
  autoAck: true,
  wait: true,              // Long polling
  timeoutMillis: 30000,
  limit: null,             // Run forever
  renewLease: false
}

Logging

Enable detailed logging for debugging:

bash
export QUEEN_CLIENT_LOG=true
node your-app.js

Example output:

[2025-10-28T10:30:45.123Z] [INFO] [Queen.constructor] {"status":"initialized","urls":1}
[2025-10-28T10:30:45.234Z] [INFO] [QueueBuilder.push] {"queue":"tasks","partition":"Default","count":5}

Best Practices

  1. Use affinity routing - Enable loadBalancingStrategy: 'affinity' for production to optimize poll intentions
  2. Use consume() for workers - Simpler, handles retries automatically
  3. Use pop() for control - When you need precise control over acking
  4. Buffer for speed - Always use buffering when pushing many messages
  5. Partitions for order - Use partitions when message order matters
  6. Consumer groups for scale - Run multiple workers in the same group
  7. Transactions for consistency - Use transactions when operations must be atomic
  8. Enable DLQ - Always enable DLQ in production to catch failures
  9. Renew long leases - Use auto-renewal for long-running tasks
  10. Graceful shutdown - Always call queen.close() before exiting
  11. Monitor DLQ - Regularly check your DLQ for failed messages

Complete Example

javascript
import { Queen } from 'queen-mq'

const queen = new Queen('http://localhost:6632')

// Setup queues
await queen.queue('raw-events').config({ priority: 5 }).create()
await queen.queue('processed-events').config({ priority: 10 }).create()

// Stage 1: Ingest with buffering
async function ingestEvents() {
  for (let i = 0; i < 10000; i++) {
    await queen.queue('raw-events')
      .partition(`user-${i % 100}`)
      .buffer({ messageCount: 500, timeMillis: 1000 })
      .push([{
        data: {
          userId: i % 100,
          event: 'page_view',
          timestamp: Date.now()
        }
      }])
  }
  await queen.flushAllBuffers()
}

// Stage 2: Process with transactions
async function processEvents() {
  await queen.queue('raw-events')
    .group('processors')
    .concurrency(5)
    .batch(10)
    .autoAck(false)
    .consume(async (messages) => {
      const processed = messages.map(m => ({
        userId: m.data.userId,
        processed: true,
        timestamp: Date.now()
      }))
      
      const txn = queen.transaction()
      for (const msg of messages) {
        txn.ack(msg)
      }
      txn.queue('processed-events').push(
        processed.map(p => ({ data: p }))
      )
      await txn.commit()
    })
    .onError(async (messages, error) => {
      console.error('Processing failed:', error)
      await queen.ack(messages, false)
    })
}

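// Graceful shutdown: register before starting the consumer,
// because processEvents() runs until the process is stopped
process.on('SIGINT', async () => {
  await queen.close()
  process.exit(0)
})

// Run pipeline
await ingestEvents()
await processEvents()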

Built with ❤️ by Smartness