Stream Processing
Queen provides powerful stream processing capabilities for real-time windowed aggregations across multiple queues. Streams collect messages from one or more source queues into time-based windows, allowing you to process related events together.
Core Concepts
What is a Stream?
A stream is a continuous flow of messages from one or more queues, grouped into time-based windows. Instead of consuming individual messages, you process entire windows of messages that arrived within a specific time period.
Key Benefits:
- Temporal correlation: Process all messages that arrived within the same time window
- Multi-queue aggregation: Combine messages from multiple related queues
- Partitioned processing: Group messages by key for isolated processing
- Automatic windowing: Server handles window management and delivery
Window Types
Tumbling Windows
Fixed-size, non-overlapping windows. When a window ends, a new one begins immediately.
await queen.stream('my-stream', 'my-namespace')
  .sources(['queue1', 'queue2'])
  .tumblingTime(5) // 5-second windows
  .define();
Example timeline:
Window 1: [0s - 5s) -> Process at 5s
Window 2: [5s - 10s) -> Process at 10s
Window 3: [10s - 15s) -> Process at 15s
Grace Period
A grace period delays delivery of a window after it closes so that late-arriving messages can still be included:
await queen.stream('my-stream', 'my-namespace')
  .sources(['queue1'])
  .tumblingTime(5)
  .gracePeriod(1) // Wait 1 extra second for late messages
  .define();
Timeline with grace period:
Window: [0s - 5s)
Closes at: 5s
Grace period: 5s - 6s (accepts late messages)
Delivered at: 6s
Basic Stream Example
Simple stream combining messages from multiple queues:
import { Queen } from 'queen-mq';
const queen = new Queen('http://localhost:6632');
// Create source queues
await queen.queue('events').namespace('analytics').task('stream').create();
await queen.queue('clicks').namespace('analytics').task('stream').create();
// Define stream with 5-second tumbling windows
await queen.stream('event-stream', 'analytics')
  .sources(['events', 'clicks'])
  .tumblingTime(5)
  .gracePeriod(1)
  .define();
// Consume the stream
const consumer = queen.consumer('event-stream', 'analytics-consumer');
await consumer.process(async (window) => {
  console.log(`Window: ${window.start} - ${window.end}`);
  console.log(`Total messages: ${window.allMessages.length}`);
  // Process all messages in the window
  for (const msg of window.allMessages) {
    console.log(msg.data);
  }
});
Partitioned Streaming
Partitioned streams ensure that messages with the same partition key are processed together in the same window. This is crucial for maintaining consistency when processing related events.
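As a mental model only (this is not Queen's server implementation), partition-aware windowing can be pictured as bucketing each message by its window start and its partition key. The sketch below assumes each message carries a numeric `arrivedAt` time in seconds and a `partition` string, and that windows are aligned to multiples of the window size; those field names, the alignment, and the `bucketMessages` helper are all hypothetical.

```javascript
// Conceptual model: bucket messages by (window start, partition key).
// `arrivedAt` (seconds) and `partition` are hypothetical field names.
function bucketMessages(messages, windowSeconds) {
  const buckets = new Map();
  for (const msg of messages) {
    // Start of the tumbling window [start, start + windowSeconds) containing the message
    const start = Math.floor(msg.arrivedAt / windowSeconds) * windowSeconds;
    const key = `${start}:${msg.partition}`;
    if (!buckets.has(key)) buckets.set(key, []);
    buckets.get(key).push(msg);
  }
  return buckets;
}

const buckets = bucketMessages(
  [
    { arrivedAt: 1.2, partition: 'chat-1', kind: 'translation' },
    { arrivedAt: 3.9, partition: 'chat-1', kind: 'agent' },
    { arrivedAt: 4.1, partition: 'chat-2', kind: 'translation' },
    { arrivedAt: 6.0, partition: 'chat-1', kind: 'translation' },
  ],
  5
);
for (const [key, msgs] of buckets) {
  console.log(key, msgs.map((m) => m.kind));
}
```

In this model, the two `chat-1` messages from the first five seconds land in the same bucket, while the later `chat-1` message falls into the next window.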
Use Cases for Partitioned Streams
- Per-user analytics: Group all events for a specific user
- Session tracking: Process all events in a user session together
- Chat/conversation processing: Handle all messages in a conversation
- Transaction correlation: Group related transaction events
Complete Example: Chat Message Processing
This example demonstrates a pipeline where translation requests are processed and forwarded to an agent queue, with both being aggregated by chat ID:
import { Queen } from 'queen-mq';
const queen = new Queen({ url: 'http://localhost:6632' });
// Setup queues
await queen.queue('chat-translations').delete();
await queen.queue('chat-agent').delete();
await queen.queue('chat-translations').namespace('prod').task('stream').create();
await queen.queue('chat-agent').namespace('prod').task('stream').create();
// Define partitioned stream
await queen.stream('chat-stream', 'prod')
  .sources(['chat-translations', 'chat-agent'])
  .partitioned() // Enable partition-aware processing
  .tumblingTime(5) // 5-second windows
  .gracePeriod(1) // 1-second grace period
  .define();
// Producer: Generate translation requests
let chatId = 0;
const producer = async () => {
  while (true) {
    chatId++;
    await queen
      .queue('chat-translations')
      .partition(chatId.toString()) // Set partition key
      .push({
        data: {
          kind: 'translation',
          text: 'Hello world',
          timestamp: new Date().toISOString(),
          chatId: chatId
        }
      });
    await new Promise(resolve => setTimeout(resolve, 100));
  }
};
// Run in the background; log failures instead of leaving the rejection unhandled
producer().catch(console.error);
// Pipeline: Transform translations to agent messages
const transformer = async () => {
  await queen
    .queue('chat-translations')
    .autoAck(false)
    .batch(1)
    .each()
    .consume(async (message) => {
      const agentMessage = {
        ...message.data,
        kind: 'agent',
        processedAt: new Date().toISOString()
      };
      // Transactionally ack and push to next queue
      await queen
        .transaction()
        .ack(message)
        .queue('chat-agent')
        .partition(message.data.chatId.toString()) // Preserve partition key!
        .push([{ data: agentMessage }])
        .commit();
    });
};
// Run in the background; log failures instead of leaving the rejection unhandled
transformer().catch(console.error);
// Stream consumer: Process windowed messages
const consumer = queen.consumer('chat-stream', 'chat-analytics');
await consumer.process(async (window) => {
  console.log(`\n=== Window ${window.id} ===`);
  console.log(`Time: ${window.start} - ${window.end}`);
  console.log(`Total messages: ${window.allMessages.length}`);
  // Group messages by chat ID
  const byChatId = window.groupBy('data.chatId');
  // Process each chat's messages together
  for (const [chatId, messages] of Object.entries(byChatId)) {
    console.log(`\nChat ${chatId}:`);
    const translations = messages.filter(m => m.data.kind === 'translation');
    const agentReplies = messages.filter(m => m.data.kind === 'agent');
    console.log(` - ${translations.length} translations`);
    console.log(` - ${agentReplies.length} agent replies`);
    // Example: Calculate processing latency
    if (translations.length > 0 && agentReplies.length > 0) {
      const firstTranslation = new Date(translations[0].data.timestamp);
      const lastAgent = new Date(agentReplies[agentReplies.length - 1].data.processedAt);
      const latency = lastAgent - firstTranslation;
      console.log(` - Processing latency: ${latency}ms`);
    }
  }
});
Why Preserve Partition Keys?
When using partitioned streams, always preserve the partition key through your pipeline:
// ✅ CORRECT: Same partition key throughout
await queen.queue('source').partition('user-123').push(msg);
await queen.queue('destination').partition('user-123').push(transformed);
// ❌ WRONG: Different partition keys break correlation
await queen.queue('source').partition('user-123').push(msg);
await queen.queue('destination').partition('other-key').push(transformed);
Window Operations
The window object passed to your handler exposes the window's metadata, its messages, and a groupBy helper for grouping them by field path:
await consumer.process(async (window) => {
  // Window metadata
  console.log(window.id); // Unique window identifier
  console.log(window.start); // Window start time (ISO 8601)
  console.log(window.end); // Window end time (ISO 8601)
  // All messages in the window
  console.log(window.allMessages.length);
  // Group by any field path
  const byUserId = window.groupBy('data.userId');
  const byEventType = window.groupBy('data.event.type');
  const byQueue = window.groupBy('queue_name');
  // Process grouped messages
  for (const [key, messages] of Object.entries(byUserId)) {
    console.log(`User ${key}: ${messages.length} events`);
  }
});
groupBy Examples
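When unit-testing a window handler without a running server, it can help to have a local stand-in for grouping by a dotted field path. The helper below is a hypothetical sketch, not Queen's `groupBy` implementation; in particular, how the real method treats messages where the path is missing is not documented here, so this version simply files them under the key `'undefined'`.

```javascript
// Hypothetical stand-in for window.groupBy, for testing handlers offline.
function groupByPath(messages, path) {
  const parts = path.split('.');
  const groups = Object.create(null); // no inherited keys such as "constructor"
  for (const msg of messages) {
    // Walk the dotted path; yields undefined if any segment is missing
    let value = msg;
    for (const part of parts) {
      value = value == null ? undefined : value[part];
    }
    const key = String(value);
    if (!(key in groups)) groups[key] = [];
    groups[key].push(msg);
  }
  return groups;
}

const sample = [
  { queue_name: 'events', data: { user: { country: 'US' } } },
  { queue_name: 'clicks', data: { user: { country: 'DE' } } },
  { queue_name: 'events', data: { user: { country: 'US' } } },
];
console.log(Object.keys(groupByPath(sample, 'data.user.country'))); // → [ 'US', 'DE' ]
```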
// Group by nested field
const grouped = window.groupBy('data.user.country');
// Result: { 'US': [...], 'UK': [...], 'DE': [...] }
// Group by source queue
const byQueue = window.groupBy('queue_name');
// Result: { 'events': [...], 'clicks': [...] }
// Group by message priority
const byPriority = window.groupBy('priority');
// Result: { '0': [...], '5': [...], '10': [...] }
Stream Configuration
Define a Stream
await queen.stream('stream-name', 'namespace')
  .sources(['queue1', 'queue2', 'queue3']) // 1 or more source queues
  .partitioned() // Optional: enable partitioning
  .tumblingTime(5) // Window size in seconds
  .gracePeriod(2) // Optional: grace period in seconds
  .define();
Stream Lifecycle
// Define a stream
await queen.stream('my-stream', 'analytics').sources(['events']).tumblingTime(5).define();
// Get stream info
const info = await queen.stream('my-stream', 'analytics').info();
console.log(info);
// Delete a stream (also deletes the underlying queue)
await queen.stream('my-stream', 'analytics').delete();
Consumer Groups for Streams
Streams support consumer groups for horizontal scaling:
// Multiple consumers in the same group
const consumer1 = queen.consumer('my-stream', 'analytics-group');
const consumer2 = queen.consumer('my-stream', 'analytics-group');
const consumer3 = queen.consumer('my-stream', 'analytics-group');
// Each window is delivered to only ONE consumer in the group
await Promise.all([
  consumer1.process(processWindow),
  consumer2.process(processWindow),
  consumer3.process(processWindow)
]);
Benefits:
- Horizontal scaling: Add more consumers to process windows faster
- Load distribution: Windows are automatically distributed across consumers
- Fault tolerance: If one consumer fails, others continue processing
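The snippet above passes a `processWindow` handler that this guide does not define. One possible shape is sketched below: a handler that counts messages per source queue. It assumes only the window fields described under Window Operations (`id`, `allMessages`, and a `queue_name` field on each message); the `summarizeWindow` helper is hypothetical.

```javascript
// Pure helper: count messages per source queue in a window-shaped object.
function summarizeWindow(window) {
  const perQueue = {};
  for (const msg of window.allMessages) {
    const name = msg.queue_name ?? 'unknown';
    perQueue[name] = (perQueue[name] || 0) + 1;
  }
  return { id: window.id, total: window.allMessages.length, perQueue };
}

// Handler with the shape passed to consumer.process in the snippet above.
async function processWindow(window) {
  const summary = summarizeWindow(window);
  console.log(`Window ${summary.id}: ${summary.total} messages`, summary.perQueue);
  return summary;
}
```

Keeping the summarizing logic in a pure function makes it testable with plain objects. Since each window goes to only one consumer in the group, a handler like this should probably avoid relying on in-process state carried over from earlier windows.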
Performance Tuning
Server-Side Configuration
Stream processing performance can be tuned via environment variables on the Queen server.
Worker Configuration
Control how many stream poll workers run and how they check for windows:
# Number of dedicated stream poll worker threads
# Scale based on number of active stream consumers
STREAM_POLL_WORKER_COUNT=2 # Default: 2
# Low load (<10 consumers): 2 workers
# Medium load (10-50): 4 workers
# High load (50+): 8 workers
# How often workers check for new consumer requests (in-memory check, very cheap)
STREAM_POLL_WORKER_INTERVAL=100 # Default: 100ms
# Maximum concurrent window checks per stream worker
STREAM_CONCURRENT_CHECKS=10 # Default: 10
# Low load: 10 concurrent checks
# Medium load: 15 concurrent checks
# High load: 20 concurrent checks
Thread Pool Sizing: The database thread pool needs to accommodate stream workers:
DB ThreadPool Size = P + S + (S × C) + T
Where:
P = POLL_WORKER_COUNT (regular queue poll workers)
S = STREAM_POLL_WORKER_COUNT
C = STREAM_CONCURRENT_CHECKS
T = DB_THREAD_POOL_SERVICE_THREADS (background operations)
Example: 2 + 2 + (2 × 10) + 5 = 29 threads
Backoff Configuration
Control how aggressively the server checks for completed windows:
# Minimum time between stream checks per consumer group
STREAM_POLL_INTERVAL=1000 # Default: 1000ms (1 second)
# Lower = more responsive, higher DB load
# Higher = less DB load, higher latency
# Number of consecutive empty checks before backoff starts
STREAM_BACKOFF_THRESHOLD=5 # Default: 5
# Lower = backoff sooner (reduce load)
# Higher = stay aggressive longer
# Exponential backoff multiplier
STREAM_BACKOFF_MULTIPLIER=2.0 # Default: 2.0
# Interval *= multiplier after each empty check past the threshold
# Maximum poll interval after backoff
STREAM_MAX_POLL_INTERVAL=5000 # Default: 5000ms (5 seconds)
# Caps the exponential backoff growth
Backoff Example: With defaults (1000ms initial, threshold 5, multiplier 2.0):
Check 1-5: 1000ms (aggressive initial checks)
Check 6: 2000ms (backoff threshold reached)
Check 7: 4000ms
Check 8+: 5000ms (capped at max)
Messages arrive: Reset to 1000ms immediately
Tuning Recommendations
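To compare candidate settings before changing the server, the backoff schedule from the example above can be reproduced with a small simulation. This is a sketch of the documented behavior (base interval up to the threshold, then multiplied after each further empty check, capped at the maximum), not the server's code; `backoffSchedule` and its option names are hypothetical.

```javascript
// Simulate the poll interval used for each consecutive empty check.
// Default values mirror the documented server defaults.
function backoffSchedule(checks, {
  initialMs = 1000, // STREAM_POLL_INTERVAL
  threshold = 5,    // STREAM_BACKOFF_THRESHOLD
  multiplier = 2.0, // STREAM_BACKOFF_MULTIPLIER
  maxMs = 5000,     // STREAM_MAX_POLL_INTERVAL
} = {}) {
  const intervals = [];
  for (let check = 1; check <= checks; check++) {
    // Checks up to the threshold use the base interval; later ones grow geometrically
    const steps = Math.max(0, check - threshold);
    intervals.push(Math.min(maxMs, initialMs * Math.pow(multiplier, steps)));
  }
  return intervals;
}

console.log(backoffSchedule(8));
// → [1000, 1000, 1000, 1000, 1000, 2000, 4000, 5000]

// Same model with a 500ms base interval, threshold 10, and a 2000ms cap
console.log(backoffSchedule(13, { initialMs: 500, threshold: 10, maxMs: 2000 }));
```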
Low-Latency Streams (Real-time Analytics)
For sub-second response times:
STREAM_POLL_WORKER_COUNT=4
STREAM_POLL_WORKER_INTERVAL=50 # Check more frequently
STREAM_POLL_INTERVAL=500 # Aggressive window checks
STREAM_BACKOFF_THRESHOLD=10 # Stay aggressive longer
STREAM_MAX_POLL_INTERVAL=2000 # Lower backoff cap
STREAM_CONCURRENT_CHECKS=15
Best for: Real-time dashboards, live monitoring, immediate alerts
High-Throughput Streams (Heavy Load)
For processing many windows with many consumers:
STREAM_POLL_WORKER_COUNT=8 # More workers
STREAM_POLL_WORKER_INTERVAL=100
STREAM_POLL_INTERVAL=1000
STREAM_BACKOFF_THRESHOLD=5
STREAM_MAX_POLL_INTERVAL=5000
STREAM_CONCURRENT_CHECKS=20 # More concurrent checks
Best for: Batch analytics, high-volume event processing, many concurrent streams
Low-Load / Development
For minimal resource usage during development or low traffic:
STREAM_POLL_WORKER_COUNT=1 # Single worker
STREAM_POLL_WORKER_INTERVAL=200 # Less frequent checks
STREAM_POLL_INTERVAL=2000 # Relaxed polling
STREAM_BACKOFF_THRESHOLD=3 # Quick backoff
STREAM_MAX_POLL_INTERVAL=10000 # Higher backoff cap
STREAM_CONCURRENT_CHECKS=5 # Fewer concurrent checks
Best for: Development, testing, low-traffic production
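Each preset changes S and C in the thread pool formula from the Worker Configuration section, so the required database thread pool size changes with it. The sketch below applies that formula to the three presets. The values P = 2 and T = 5 are taken from the worked example in this guide and are assumptions about your deployment; `dbThreadPoolSize` is a hypothetical helper.

```javascript
// DB thread pool size = P + S + (S × C) + T, as given under Worker Configuration.
function dbThreadPoolSize({ pollWorkers, streamWorkers, concurrentChecks, serviceThreads }) {
  return pollWorkers + streamWorkers + streamWorkers * concurrentChecks + serviceThreads;
}

// Assumed: P = 2 and T = 5, copied from the guide's worked example.
const base = { pollWorkers: 2, serviceThreads: 5 };
const presets = {
  defaults: { streamWorkers: 2, concurrentChecks: 10 },
  lowLatency: { streamWorkers: 4, concurrentChecks: 15 },
  highThroughput: { streamWorkers: 8, concurrentChecks: 20 },
  development: { streamWorkers: 1, concurrentChecks: 5 },
};

const sizes = {};
for (const [name, preset] of Object.entries(presets)) {
  sizes[name] = dbThreadPoolSize({ ...base, ...preset });
}
console.log(sizes);
// → { defaults: 29, lowLatency: 71, highThroughput: 175, development: 13 }
```

Under these assumptions the high-throughput preset needs roughly six times the threads of the defaults, so it is worth checking the database's connection limit before applying it.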
Client-Side Best Practices
Use appropriate window sizes
- Too small: High overhead, many small windows
- Too large: Higher latency, larger memory footprint
- Sweet spot: 5-30 seconds for most use cases
Set reasonable grace periods
- Typically 10-20% of window size
- Balance between completeness and latency
- Example: 5s window + 1s grace = 6s total latency
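The latency arithmetic above can be sketched as a small helper that returns the window containing a timestamp and the earliest time that window would be delivered. It assumes windows are aligned to multiples of the window size, as in the timelines earlier in this guide; actual delivery also depends on server polling settings. `windowTiming` is a hypothetical helper.

```javascript
// Compute the tumbling window containing a timestamp and its earliest delivery time.
// All values are in seconds, matching tumblingTime() and gracePeriod().
function windowTiming(timestamp, windowSeconds, graceSeconds = 0) {
  const start = Math.floor(timestamp / windowSeconds) * windowSeconds;
  const end = start + windowSeconds; // exclusive upper bound: [start, end)
  return { start, end, deliveredAt: end + graceSeconds };
}

console.log(windowTiming(3.2, 5, 1)); // → { start: 0, end: 5, deliveredAt: 6 }
console.log(windowTiming(5, 5, 1));   // → { start: 5, end: 10, deliveredAt: 11 }
```

A message arriving exactly at a window boundary belongs to the next window, so in the worst case it waits the full window size plus the grace period.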
Process windows efficiently
- Minimize processing time in your consumer
- Use async operations where possible
- Consider batching database writes
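One way to batch database writes is to split a window's messages into fixed-size chunks and issue one write per chunk. In the sketch below, `chunk` and `writeInBatches` are hypothetical helpers, and `writeBatch` stands for an application-supplied async function (for example, a bulk insert); the batch size of 500 is an arbitrary starting point.

```javascript
// Split an array into consecutive chunks of at most `size` items.
function chunk(items, size) {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Write a window's message payloads in batches, one awaited call per batch.
// `writeBatch` is a hypothetical application-supplied async function.
async function writeInBatches(messages, writeBatch, batchSize = 500) {
  for (const batch of chunk(messages, batchSize)) {
    await writeBatch(batch.map((m) => m.data));
  }
}
```

Inside a window handler this could be called as `await writeInBatches(window.allMessages, insertRows)`, where `insertRows` is your own function.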
Monitor window sizes
await consumer.process(async (window) => {
  if (window.allMessages.length > 10000) {
    console.warn('Large window detected, consider smaller window size');
  }
});
Advanced Patterns
Multi-Stage Streaming Pipeline
Create complex event processing pipelines:
// Stage 1: Raw events
await queen.stream('raw-events', 'pipeline')
  .sources(['clicks', 'views', 'conversions'])
  .tumblingTime(10)
  .define();
// Stage 2: Enriched events (consume from Stage 1, push to enriched queue)
const enrichConsumer = queen.consumer('raw-events', 'enricher');
await enrichConsumer.process(async (window) => {
  for (const msg of window.allMessages) {
    const enriched = await enrichEvent(msg.data);
    await queen.queue('enriched-events').push({ data: enriched });
  }
});
// Stage 3: Final analytics
await queen.stream('analytics', 'pipeline')
  .sources(['enriched-events'])
  .tumblingTime(60) // Longer window for aggregation
  .define();
const analyticsConsumer = queen.consumer('analytics', 'aggregator');
await analyticsConsumer.process(async (window) => {
  const stats = calculateStats(window.allMessages);
  await saveToDatabase(stats);
});
Time-Series Aggregation
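The aggregation example in this section calls a `percentile` helper that this guide does not define. One possible implementation uses the nearest-rank method, which is only one of several common percentile definitions; treat it as a sketch rather than the helper the original example had in mind.

```javascript
// Nearest-rank percentile: the smallest value such that at least p% of the
// data is less than or equal to it. Returns NaN for an empty input.
function percentile(values, p) {
  if (values.length === 0) return NaN;
  const sorted = [...values].sort((a, b) => a - b); // numeric sort on a copy
  const rank = Math.ceil((p / 100) * sorted.length);
  const index = Math.min(sorted.length - 1, Math.max(0, rank - 1));
  return sorted[index];
}

console.log(percentile([5, 1, 3, 2, 4], 50)); // → 3
console.log(percentile([5, 1, 3, 2, 4], 95)); // → 5
```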
Rolling metrics calculation:
await queen.stream('metrics-5s', 'monitoring')
  .sources(['app-metrics'])
  .tumblingTime(5)
  .define();
const consumer = queen.consumer('metrics-5s', 'aggregator');
// percentile() and saveMetrics() are application-supplied helpers
await consumer.process(async (window) => {
  const metrics = window.allMessages.map(m => m.data.value);
  // Guard against empty windows: avg would be NaN and min/max would be ±Infinity
  if (metrics.length === 0) return;
  const sum = metrics.reduce((a, b) => a + b, 0);
  const stats = {
    timestamp: window.start,
    count: metrics.length,
    sum,
    avg: sum / metrics.length,
    min: Math.min(...metrics),
    max: Math.max(...metrics),
    p50: percentile(metrics, 50),
    p95: percentile(metrics, 95),
    p99: percentile(metrics, 99)
  };
  await saveMetrics(stats);
});
Session Windows (Custom Implementation)
While Queen provides tumbling windows, you can implement session-like windows using partitioning:
await queen.stream('user-sessions', 'analytics')
  .sources(['user-events'])
  .partitioned() // Group by user
  .tumblingTime(30) // 30-second base window
  .gracePeriod(5)
  .define();
const consumer = queen.consumer('user-sessions', 'session-analyzer');
// calculateDuration() and saveSession() are application-supplied helpers
await consumer.process(async (window) => {
  const byUser = window.groupBy('data.userId');
  for (const [userId, events] of Object.entries(byUser)) {
    // Analyze user's session within this window
    const session = {
      userId,
      events: events.length,
      duration: calculateDuration(events),
      pages: new Set(events.map(e => e.data.page)).size
    };
    await saveSession(session);
  }
});
Troubleshooting
Windows Not Arriving
Symptoms: Consumer never receives windows
Checks:
- Verify source queues exist and have messages
- Check window size - may need to wait for window to complete
- Verify consumer group name matches
- Check server logs for errors
- Ensure STREAM_POLL_WORKER_COUNT > 0
High Latency
Symptoms: Windows arrive much later than expected
Solutions:
- Reduce STREAM_POLL_INTERVAL on server
- Increase STREAM_POLL_WORKER_COUNT
- Reduce STREAM_CONCURRENT_CHECKS if DB is overloaded
- Check if grace period is too long
- Monitor database performance
High Database Load
Symptoms: Database CPU/IO is very high
Solutions:
- Increase STREAM_POLL_INTERVAL (check less frequently)
- Reduce STREAM_BACKOFF_THRESHOLD (back off sooner)
- Reduce STREAM_CONCURRENT_CHECKS
- Reduce STREAM_POLL_WORKER_COUNT
- Use longer tumbling windows
Memory Issues
Symptoms: Server memory usage grows over time
Solutions:
- Reduce window size (smaller tumblingTime)
- Process windows faster in consumer
- Reduce number of source queues per stream
- Check for unprocessed windows piling up
More Examples
See the examples directory for complete working examples:
- 18-streaming.js - Basic non-partitioned streaming
- 19-streaming-partitioned.js - Partitioned streaming with pipeline
- 16-streaming.js - Additional streaming patterns
