Consumer Groups
Consumer groups enable Kafka-style message distribution, where multiple independent groups can each process the same messages. This lets you consume a single stream for several different purposes simultaneously.
What are Consumer Groups?
A consumer group is an independent cursor that tracks its position in a queue. Multiple consumer groups can read from the same queue, each maintaining its own position.
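Conceptually, each group is just a separate offset into the same message log: advancing one group's cursor never affects another's. A toy model in plain JavaScript (an illustration only, not how Queen stores cursors internally) makes the idea concrete:

```javascript
// Toy model: one shared log, two groups, independent cursors.
const log = ['m1', 'm2', 'm3']
const cursors = { analytics: 0, notifications: 0 }

function read(group) {
  const batch = log.slice(cursors[group]) // each group reads from its own position
  cursors[group] = log.length             // advancing one cursor leaves the other untouched
  return batch
}

console.log(read('analytics'))     // ['m1', 'm2', 'm3']
log.push('m4')
console.log(read('analytics'))     // ['m4']
console.log(read('notifications')) // ['m1', 'm2', 'm3', 'm4'] - still sees everything
```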
// Group 1: Analytics
await queen.queue('events')
.group('analytics-team')
.consume(async (message) => {
await updateAnalytics(message.data)
})
// Group 2: Notifications
await queen.queue('events')
.group('notifications-team')
.consume(async (message) => {
await sendNotification(message.data)
})
// Both groups process ALL messages independently!
Queue Mode vs Consumer Groups
Queue Mode (Default)
Without a consumer group, messages are distributed across consumers (traditional queue behavior):
// Queue mode - messages distributed
await queen.queue('tasks').consume(async (message) => {
// Each message processed by ONE consumer only
})
Queue: "tasks"
├── Message 1 → Consumer A
├── Message 2 → Consumer B
├── Message 3 → Consumer A
└── Message 4 → Consumer B
Result: Work distributed, each message processed once
Consumer Group Mode
With consumer groups, ALL messages are delivered to each group:
// Consumer group mode
await queen.queue('events')
.group('group-1')
.consume(async (message) => {
// Process for group 1
})
await queen.queue('events')
.group('group-2')
.consume(async (message) => {
// Process for group 2
})
Queue: "events"
├── Message 1 → Group 1 Consumer A
│ → Group 2 Consumer X
├── Message 2 → Group 1 Consumer B
│ → Group 2 Consumer Y
├── Message 3 → Group 1 Consumer A
│ → Group 2 Consumer X
Result: Each group processes ALL messages
Creating Consumer Groups
Consumer groups are created automatically when first used:
// First consumer in the group creates it
await queen.queue('orders')
.group('order-processor')
.subscriptionMode('new') // Start from new messages only
.consume(async (message) => {
await processOrder(message.data)
})
Subscription Modes
Control where a consumer group starts reading when first created. Subscription modes only apply to new consumer groups - existing groups continue from their saved position.
Default Behavior
Server Default: Process all messages including historical ones
You can change the server default:
# Make all new consumer groups skip historical messages by default
export DEFAULT_SUBSCRIPTION_MODE="new"
./bin/queen-server
This is useful for real-time systems where only new messages matter, or to prevent accidental processing of large backlogs.
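The precedence is straightforward: an explicit `.subscriptionMode(...)` on the client always wins; otherwise the server's `DEFAULT_SUBSCRIPTION_MODE` applies; otherwise all messages are processed. A small sketch of that resolution logic (a hypothetical helper, not part of the Queen API):

```javascript
// Hypothetical sketch of how the effective subscription mode is resolved.
// 'all' is the built-in fallback when neither side specifies anything.
function effectiveMode(clientMode, serverDefault) {
  return clientMode ?? serverDefault ?? 'all'
}

console.log(effectiveMode('all', 'new'))         // 'all' - client override wins
console.log(effectiveMode(undefined, 'new'))     // 'new' - server default applies
console.log(effectiveMode(undefined, undefined)) // 'all' - built-in default
```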
1. New Messages Only
await queen.queue('events')
.group('realtime-alerts')
.subscriptionMode('new') // Skip historical messages
.consume(async (message) => {
// Processes messages from subscription time (with lookback)
})
Use when: Real-time monitoring, alerts, or notifications where historical data isn't relevant.
Aliases: .subscriptionMode('new-only') or .subscriptionFrom('now')
How NEW Mode Works
NEW mode tracks when the consumer group first subscribes and uses that timestamp consistently:
// Timeline example:
10:00:00 - Client makes first pop() call
→ Server records: subscription_timestamp = 10:00:00
→ Consumer processes partition P1
10:10:00 - New partition P2 is created, messages arrive
10:15:00 - Consumer discovers P2 via pop()
→ Server looks up: subscription_timestamp = 10:00:00 (original time)
→ Cursor set to: 10:00:00
→ Messages from 10:10:00 are captured ✓
Why it works:
- Queen stores subscription metadata in a dedicated table (consumer_groups_metadata)
- The subscription timestamp is set on the first pop request (NOW())
- All subsequent partitions/queues discovered use the same original timestamp
- Ensures consistent "NEW" behavior across the entire consumer group
Benefits:
- ✅ Consistent: All partitions use the same subscription time
- ✅ No skipping: New partitions discovered later work correctly
- ✅ True NEW semantics: Only messages arriving after first pop
- ✅ Works with wildcards: Namespace/task filters maintain subscription time
- ✅ No artificial lookback: Clean, precise behavior
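The timeline above can be modeled directly: the group's first pop pins a timestamp, and every partition discovered afterwards filters against that same original timestamp. A simplified model in plain JavaScript (not Queen's actual implementation):

```javascript
// Simplified model of NEW-mode semantics: one subscription timestamp per group,
// recorded on the first pop and reused for every partition discovered later.
const subscriptionTimestamps = new Map() // stand-in for consumer_groups_metadata

function pop(group, partitionMessages, now) {
  if (!subscriptionTimestamps.has(group)) {
    subscriptionTimestamps.set(group, now) // first pop pins the timestamp
  }
  const since = subscriptionTimestamps.get(group)
  // Only messages that arrived after the ORIGINAL subscription time are visible
  return partitionMessages.filter((m) => m.arrivedAt >= since)
}

// First pop at t=100 pins the subscription time for the group
pop('realtime-alerts', [], 100)
// A partition discovered later, at t=115, still uses t=100 as the cutoff,
// so the message that arrived at t=110 is not skipped
const p2 = [{ id: 'old', arrivedAt: 50 }, { id: 'fresh', arrivedAt: 110 }]
console.log(pop('realtime-alerts', p2, 115)) // [{ id: 'fresh', arrivedAt: 110 }]
```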
2. All Messages (Default)
await queen.queue('events')
.group('analytics')
// No subscriptionMode = process ALL messages
.consume(async (message) => {
// Processes ALL messages from the beginning
})
Use when: Analytics, backfilling data, or replaying historical events.
Note: If server has DEFAULT_SUBSCRIPTION_MODE="new" set, you can explicitly request all messages:
.subscriptionMode('all') // Force process all messages even if server default is "new"
3. From Timestamp
await queen.queue('events')
.group('recovery-processor')
.subscriptionFrom('2025-01-01T00:00:00.000Z')
.consume(async (message) => {
// Processes messages from Jan 1, 2025 onwards
})
Use when: You need to start from a specific point in time for debugging or recovery.
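Since `.subscriptionFrom()` takes an ISO-8601 timestamp, relative starting points ("replay the last 24 hours") are easy to compute with plain `Date` arithmetic; `hoursAgo` below is a small helper of our own, not part of the Queen API:

```javascript
// Compute an ISO timestamp N hours in the past, suitable for .subscriptionFrom()
function hoursAgo(hours, from = Date.now()) {
  return new Date(from - hours * 60 * 60 * 1000).toISOString()
}

console.log(hoursAgo(24, Date.parse('2025-01-02T00:00:00.000Z')))
// '2025-01-01T00:00:00.000Z'
```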
Consumer Group Patterns
Pattern 1: Multi-Purpose Processing
Process the same events for different purposes:
// Purpose 1: Real-time analytics (process all historical data)
await queen.queue('user-events')
.group('analytics')
// No subscriptionMode = process from beginning
.consume(async (event) => {
await metrics.track(event.data.userId, event.data.action)
})
// Purpose 2: Notification system (only new events)
await queen.queue('user-events')
.group('notifications')
.subscriptionMode('new') // Only new events
.consume(async (event) => {
if (event.data.action === 'purchase') {
await sendPurchaseEmail(event.data.userId)
}
})
// Purpose 3: Audit log (process all historical data)
await queen.queue('user-events')
.group('audit')
// No subscriptionMode = process from beginning
.consume(async (event) => {
await auditLog.record(event.data)
})
Pattern 2: A/B Testing
Run old and new implementations in parallel:
// Current production implementation
await queen.queue('orders')
.group('order-processor-v1')
.consume(async (order) => {
await processOrderV1(order.data)
})
// New implementation being tested
await queen.queue('orders')
.group('order-processor-v2')
.consume(async (order) => {
await processOrderV2(order.data)
// Compare results, measure performance
})
// Both process the same orders!
Pattern 3: Development/Testing
Create separate consumer groups for different environments:
// Production consumer group
if (process.env.NODE_ENV === 'production') {
await queen.queue('tasks')
.group('prod-workers')
.consume(async (task) => {
await processTask(task.data)
})
}
// Development consumer group (doesn't affect production)
if (process.env.NODE_ENV === 'development') {
await queen.queue('tasks')
.group('dev-workers')
// No subscriptionMode = process all messages for testing
.consume(async (task) => {
await testProcessTask(task.data)
})
}
Pattern 4: Fan-Out Processing
One source, multiple destinations:
// Single producer
await queen.queue('events').push([
{ data: { type: 'user_signup', userId: 123 } }
])
// Multiple consumers, each doing different work
const consumers = [
{
group: 'email-service',
handler: async (event) => {
await sendWelcomeEmail(event.data.userId)
}
},
{
group: 'crm-sync',
handler: async (event) => {
await syncToCRM(event.data)
}
},
{
group: 'analytics',
handler: async (event) => {
await trackSignup(event.data.userId)
}
},
{
group: 'webhook-service',
handler: async (event) => {
await triggerWebhooks(event.data)
}
}
]
// Start all consumers
for (const consumer of consumers) {
queen.queue('events')
.group(consumer.group)
.consume(consumer.handler)
}
Scaling Consumer Groups
Single Consumer per Group
// One consumer in the group
await queen.queue('events')
.group('processor')
.concurrency(1)
.consume(async (message) => {
// Process messages sequentially
})
Multiple Consumers per Group
// Multiple consumers in the same group
// They cooperate to process partitions
// Consumer 1
await queen.queue('events')
.group('processor')
.concurrency(5)
.consume(async (message) => {
// Handles some partitions
})
// Consumer 2 (different process/machine)
await queen.queue('events')
.group('processor')
.concurrency(5)
.consume(async (message) => {
// Handles other partitions
})
Partition Distribution:
Queue: "events"
├── Partition A → Group "processor" → Consumer 1
├── Partition B → Group "processor" → Consumer 2
├── Partition C → Group "processor" → Consumer 1
└── Partition D → Group "processor" → Consumer 2
Managing Consumer Groups
List Consumer Groups
const groups = await queen.listConsumerGroups('events')
console.log(groups)
// [
// { name: 'analytics', position: 1250 },
// { name: 'notifications', position: 1248 },
// { name: 'audit', position: 1250 }
// ]
Get Consumer Group Info
const info = await queen.getConsumerGroupInfo('events', 'analytics')
console.log(info)
// {
// group: 'analytics',
// queue: 'events',
// position: 1250,
// lag: 5, // Messages behind
// partitions: [
// { partition: 'A', position: 100, lag: 2 },
// { partition: 'B', position: 200, lag: 3 }
// ]
// }
Reset Consumer Group Position
// Reset to beginning
await queen.resetConsumerGroup('events', 'analytics', 'beginning')
// Reset to specific timestamp
await queen.resetConsumerGroup(
'events',
'analytics',
'timestamp',
new Date('2025-01-01')
)
// Reset to end (skip all pending)
await queen.resetConsumerGroup('events', 'analytics', 'end')
Delete Consumer Group
// Remove consumer group entirely
await queen.deleteConsumerGroup('events', 'old-processor')
Consumer Group Lag
Lag indicates how far behind a consumer group is:
const lag = await queen.getConsumerGroupLag('events', 'analytics')
console.log(lag)
// {
// group: 'analytics',
// totalLag: 1250, // Total messages behind
// partitions: [
// { partition: 'A', lag: 500 },
// { partition: 'B', lag: 750 }
// ]
// }
// Alert if lag is too high
if (lag.totalLag > 10000) {
console.warn('Consumer group falling behind!')
// Scale up consumers or investigate slow processing
}
Advanced Patterns
Pattern: Data Migration
Use consumer groups to migrate data:
// Create migration consumer group
await queen.queue('user-data')
.group('migration-to-new-system')
// No subscriptionMode = process all historical data
.batch(100) // Process in batches
.consume(async (messages) => {
// Migrate data to new system
for (const msg of messages) {
await newSystem.import(msg.data)
}
// Track progress
const lag = await queen.getConsumerGroupLag('user-data', 'migration-to-new-system')
console.log(`Migration progress: ${lag.totalLag} remaining`)
})
Pattern: Time-Travel Debugging
Replay messages for debugging:
// Create debug consumer group
const problemStartTime = '2025-10-28T14:30:00.000Z'
await queen.queue('transactions')
.group('debug-session-' + Date.now())
.subscriptionFrom(problemStartTime) // Start from specific timestamp
.consume(async (message) => {
// Replay and debug problematic messages
console.log('Replaying:', message.data)
try {
await processTransaction(message.data)
} catch (error) {
console.error('Found the bug:', error)
process.exit(0) // Stop when found
}
})
Pattern: Aggregate Views
Build multiple aggregate views from the same source:
// Aggregate by customer
await queen.queue('orders')
.group('customer-aggregates')
// No subscriptionMode = process all historical data
.consume(async (order) => {
await updateCustomerStats(order.data.customerId, order.data)
})
// Aggregate by product
await queen.queue('orders')
.group('product-aggregates')
// No subscriptionMode = process all historical data
.consume(async (order) => {
for (const item of order.data.items) {
await updateProductStats(item.productId, item)
}
})
// Aggregate by region
await queen.queue('orders')
.group('region-aggregates')
// No subscriptionMode = process all historical data
.consume(async (order) => {
await updateRegionStats(order.data.region, order.data)
})
Best Practices
1. Name Consumer Groups Descriptively
// ✅ Good: Describes purpose
.group('analytics-daily-reports')
.group('email-notification-service')
.group('crm-sync-v2')
// ❌ Bad: Unclear purpose
.group('consumer1')
.group('test')
.group('new')
2. Choose Appropriate Subscription Mode
// ✅ Good: Real-time features only need new messages
.group('realtime-notifications').subscriptionMode('new')
// ✅ Good: Analytics needs all historical data
.group('analytics-backfill')
// No subscriptionMode = process all messages
// ✅ Good: Recover from specific incident
.group('recovery').subscriptionFrom(incidentTime)
// ✅ Good: Override server default if needed
.group('explicit-all').subscriptionMode('all') // Even if server default is "new"
Server Default Configuration:
Consider setting a server-wide default to match your use case:
# Real-time system: Only new messages by default
export DEFAULT_SUBSCRIPTION_MODE="new"
./bin/queen-server
# Analytics system: All messages by default (this is already the default)
./bin/queen-server
3. Monitor Consumer Group Lag
// Set up monitoring
setInterval(async () => {
const groups = await queen.listConsumerGroups('orders')
for (const group of groups) {
const lag = await queen.getConsumerGroupLag('orders', group.name)
if (lag.totalLag > THRESHOLD) {
await alertTeam(`Group ${group.name} lag: ${lag.totalLag}`)
}
}
}, 60000) // Check every minute
4. Clean Up Old Consumer Groups
// Remove unused consumer groups
const groups = await queen.listConsumerGroups('events')
for (const group of groups) {
if (group.name.includes('test-') || group.name.includes('debug-')) {
const lastUsed = await getLastUsedTime(group.name)
if (Date.now() - lastUsed > 7 * 24 * 60 * 60 * 1000) { // 7 days
await queen.deleteConsumerGroup('events', group.name)
console.log(`Deleted old group: ${group.name}`)
}
}
}
Comparison with Other Systems
| Feature | Queen | Kafka | RabbitMQ | NATS |
|---|---|---|---|---|
| Consumer Groups | ✅ Native | ✅ Native | ⚠️ Pattern | ✅ Queue Groups |
| Replay from Beginning | ✅ Yes | ✅ Yes | ❌ No | ⚠️ Limited |
| Replay from Timestamp | ✅ Yes | ❌ Offset only | ❌ No | ❌ No |
| Independent Cursors | ✅ Yes | ✅ Yes | ❌ No | ✅ Yes |
| Dynamic Groups | ✅ Yes | ✅ Yes | ⚠️ Manual | ✅ Yes |
Related Topics
- Queues & Partitions - Understanding FIFO ordering
- Transactions - Atomic operations
- Long Polling - Efficient message waiting
- JavaScript Client - Complete API reference
Summary
Consumer groups in Queen MQ provide:
- Multiple Purposes: Process same messages for different reasons
- Independent Cursors: Each group tracks its own position
- Replay Capability: Start from beginning, timestamp, or end
- Scalability: Add consumers to a group for parallelism
- Flexibility: Perfect for analytics, testing, and fan-out patterns
Master consumer groups to unlock the full power of Queen MQ! 🚀
