Python Client Guide
Complete guide for the Queen MQ Python client library.
The Python client is available on PyPI: https://pypi.org/project/queen-mq/
Installation
pip install queen-mq
Requirements: Python 3.8+
Getting Started
Import and Connect
import asyncio
import os
from queen import Queen
async def main():
# Single server
queen = Queen('http://localhost:6632')
# Multiple servers (high availability)
queen = Queen([
'http://server1:6632',
'http://server2:6632'
])
# Full configuration
queen = Queen({
'urls': ['http://server1:6632', 'http://server2:6632', 'http://server3:6632'],
'timeout_millis': 30000,
'retry_attempts': 3,
'load_balancing_strategy': 'affinity', # 'affinity', 'round-robin', or 'session'
'affinity_hash_ring': 128, # Virtual nodes per server (for affinity)
'enable_failover': True
})
# With proxy authentication (bearer token)
queen = Queen(
url='https://proxy.example.com:3000',
bearer_token=os.environ.get('QUEEN_TOKEN') # Token from create-user script
)
# Recommended: Use async context manager for automatic cleanup
async with Queen('http://localhost:6632') as queen:
# Your code here
pass
asyncio.run(main())
Proxy Authentication
When connecting through the Queen proxy (which provides authentication, SSL termination, etc.), you need to provide a bearer token:
import os
from queen import Queen
queen = Queen(
url='https://queen-proxy.example.com',
bearer_token=os.environ.get('QUEEN_TOKEN')
)
Getting a token: Use the proxy's create-user.js script to generate tokens for microservices. See Proxy Setup for details.
Environment Variables
Store tokens in environment variables, never hardcode them:
export QUEEN_TOKEN="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
import os
from queen import Queen
queen = Queen(
url=os.environ.get('QUEEN_PROXY_URL'),
bearer_token=os.environ.get('QUEEN_TOKEN')
)
Direct Connection
When connecting directly to the Queen server (without the proxy), no bearer_token is needed:
# Direct connection - no auth required
queen = Queen('http://queen-server:6632')
Load Balancing & Affinity Routing
When connecting to multiple Queen servers, you can choose how requests are distributed. The client supports three load balancing strategies.
Affinity Mode (Recommended for Production)
Best for: Production deployments with 3+ servers and multiple consumer groups.
Uses consistent hashing with virtual nodes to route consumer groups to the same backend server. This optimizes database queries by consolidating poll intentions.
queen = Queen({
'urls': ['http://server1:6632', 'http://server2:6632', 'http://server3:6632'],
'load_balancing_strategy': 'affinity',
'affinity_hash_ring': 128 # Virtual nodes per server (default: 128)
})
Benefits:
- ✅ Same consumer group always routes to same server
- ✅ Poll intentions consolidated → optimized DB queries
- ✅ Graceful failover (only ~33% of keys move if server fails)
- ✅ Works great with 3-server HA setup
How it works (a minimal sketch follows this list):
- Each consumer group generates an affinity key: queue:partition:consumerGroup
- The key is hashed (FNV-1a) and mapped to a virtual node on the ring
- The virtual node maps to a real backend server
- The same key always routes to the same server
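The client does this routing for you; the sketch below only illustrates consistent hashing with FNV-1a and virtual nodes and is not the library's actual implementation (the server URLs and affinity key are examples).
# Illustrative only: map an affinity key to a server with FNV-1a + virtual nodes
def fnv1a_32(text: str) -> int:
    """32-bit FNV-1a hash."""
    h = 0x811C9DC5
    for byte in text.encode('utf-8'):
        h ^= byte
        h = (h * 0x01000193) & 0xFFFFFFFF
    return h

def build_ring(servers, virtual_nodes=128):
    """Place virtual_nodes points per server on the hash ring."""
    ring = []
    for server in servers:
        for i in range(virtual_nodes):
            ring.append((fnv1a_32(f"{server}#{i}"), server))
    ring.sort()
    return ring

def route(ring, affinity_key: str) -> str:
    """Route a key to the first virtual node at or after its hash."""
    key_hash = fnv1a_32(affinity_key)
    for point, server in ring:
        if point >= key_hash:
            return server
    return ring[0][1]  # wrap around the ring

ring = build_ring(['http://server1:6632', 'http://server2:6632', 'http://server3:6632'])
print(route(ring, 'tasks:Default:processors'))  # the same key always lands on the same server
Because each server owns many virtual nodes, removing one server only remaps the keys that hashed to its nodes, which is why failover moves only a fraction of consumer groups.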
Round-Robin Mode
Cycles through servers in order. Simple but doesn't optimize for poll intention consolidation.
queen = Queen({
'urls': ['http://server1:6632', 'http://server2:6632'],
'load_balancing_strategy': 'round-robin'
})
Session Mode
Sticky sessions - each client instance sticks to one server.
queen = Queen({
'urls': ['http://server1:6632', 'http://server2:6632'],
'load_balancing_strategy': 'session'
})
Quick Start Examples
Basic Push and Consume
import asyncio
from queen import Queen
async def main():
async with Queen('http://localhost:6632') as queen:
# Create a queue
await queen.queue('tasks').create()
# Push messages
await queen.queue('tasks').push([
{'data': {'task': 'send-email', 'to': 'alice@example.com'}}
])
# Consume messages
async def handler(message):
print('Processing:', message['data'])
# Auto-ack on success, auto-retry on error
await queen.queue('tasks').consume(handler)
asyncio.run(main())
With Partitions
# Push to specific partition
await queen.queue('user-events').partition('user-123').push([
{'data': {'event': 'login', 'timestamp': '2025-11-22T10:00:00Z'}}
])
# Consume from specific partition
async def handler(message):
print('User 123 event:', message['data'])
await queen.queue('user-events').partition('user-123').consume(handler)
Consumer Groups
# Worker 1 in group "processors"
async def worker1():
    async def handler(msg):
        print('Worker 1 processing:', msg['data'])
    await queen.queue('emails').group('processors').consume(handler)
# Worker 2 in the SAME group (shares the load)
async def worker2():
    async def handler(msg):
        print('Worker 2 processing:', msg['data'])
    await queen.queue('emails').group('processors').consume(handler)
# Run both workers concurrently
await asyncio.gather(worker1(), worker2())
Queue Operations
Creating Queues
# Simple queue
await queen.queue('my-tasks').create()
# With configuration
await queen.queue('orders').config({
'leaseTime': 300, # 5 minutes
'retryLimit': 3,
'dlqAfterMaxRetries': True,
'encryptionEnabled': False
}).create()
Deleting Queues
await queen.queue('my-tasks').delete()
Queue Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
| leaseTime | int | 300 | Lease duration in seconds |
| retryLimit | int | 3 | Max retry attempts |
| delayedProcessing | int | 0 | Delay before messages become available (seconds) |
| windowBuffer | int | 0 | Server-side batching window (seconds) |
| maxSize | int | 0 | Max queue size (0 = unlimited) |
| retentionSeconds | int | 0 | Message retention time in seconds (0 = forever) |
| completedRetentionSeconds | int | 0 | Completed message retention in seconds |
| encryptionEnabled | bool | False | Enable payload encryption |
| dlqAfterMaxRetries | bool | False | Move to DLQ after max retries |
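For example, these options map directly onto the dict passed to .config() before .create(); the queue name and values below are illustrative, not recommendations:
# Illustrative values only - tune leases and retention for your workload
await queen.queue('audit-log').config({
    'leaseTime': 600,                   # 10-minute lease for slower consumers
    'retryLimit': 5,
    'delayedProcessing': 30,            # messages become visible after 30 seconds
    'maxSize': 100000,                  # cap the queue at 100k messages
    'retentionSeconds': 7 * 24 * 3600,  # keep messages for one week
    'completedRetentionSeconds': 3600,  # keep completed messages for one hour
    'dlqAfterMaxRetries': True
}).create()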
Pushing Messages
Basic Push
# Single message
await queen.queue('tasks').push([
{'data': {'job': 'resize-image', 'imageId': 123}}
])
# Multiple messages
await queen.queue('tasks').push([
{'data': {'job': 'send-email', 'to': 'alice@example.com'}},
{'data': {'job': 'send-email', 'to': 'bob@example.com'}},
{'data': {'job': 'resize-image', 'id': 123}}
])
With Partitions
await queen.queue('user-events').partition('user-123').push([
{'data': {'event': 'login'}},
{'data': {'event': 'view-page'}},
{'data': {'event': 'logout'}}
])
With Custom Transaction IDs (Exactly-Once)
await queen.queue('orders').push([
{'transactionId': 'order-12345', 'data': {'orderId': 12345, 'amount': 99.99}}
])
Client-Side Buffering (High Performance)
For high-throughput scenarios, buffer messages client-side before sending:
import time

# Buffer up to 500 messages OR 1 second (whichever comes first)
for i in range(10000):
await queen.queue('events').buffer({
'message_count': 500,
'time_millis': 1000
}).push([
{'data': {'id': i, 'timestamp': time.time()}}
])
# Flush remaining buffered messages
await queen.flush_all_buffers()
# Result: 10x-100x faster than individual pushes
Push with Callbacks
async def on_success(messages):
    print('Push successful!')
async def on_error(messages, error):
    print(f'Push failed: {error}')
async def on_duplicate(messages, error):
    print('Duplicate transaction IDs detected')
await (queen.queue('tasks')
    .push([{'data': {'value': 1}}])
    .on_success(on_success)
    .on_error(on_error)
    .on_duplicate(on_duplicate))
Consuming Messages
Pop vs Consume
Pop - Manual control, one-shot retrieval:
messages = await queen.queue('tasks').batch(10).pop()
for message in messages:
try:
await process_message(message['data'])
await queen.ack(message, True)
except Exception as e:
await queen.ack(message, False)
Consume - Long-running workers, automatic retry:
# Runs forever, processing messages as they arrive
async def handler(message):
await process_task(message['data'])
# Auto-ack on success, auto-retry on error
await queen.queue('tasks').consume(handler)
Handler Signatures
Important: The handler signature depends on batch size:
# batch=1 (default): handler receives single message
async def handler(message):
print(message['data'])
await queen.queue('tasks').consume(handler)
# batch>1: handler receives array of messages
async def batch_handler(messages):
for message in messages:
print(message['data'])
await queen.queue('tasks').batch(10).consume(batch_handler)
# each=True: always receives single messages
async def each_handler(message):
print(message['data'])
await queen.queue('tasks').batch(10).each().consume(each_handler)
Consume Options
await (queen.queue('tasks')
.concurrency(5) # 5 parallel workers
.batch(20) # Fetch 20 at a time
.limit(100) # Stop after 100 messages
.idle_millis(5000) # Stop after 5s of no messages
.auto_ack(True) # Auto-ack (default)
.wait(True) # Long polling (default)
.consume(handler))
Pop Options
messages = await (queen.queue('tasks')
.batch(10) # Fetch 10 at a time
.wait(True) # Long polling (wait for messages)
.pop())
Partitions
Partitions provide ordered message processing within a queue.
Creating Partitioned Messages
# Messages in same partition are processed in order
await queen.queue('user-events').partition('user-123').push([
{'data': {'event': 'login'}},
{'data': {'event': 'view-page'}},
{'data': {'event': 'logout'}}
])
Consuming from Partitions
# Process only messages from user-123's partition
async def handler(message):
print('User 123 did:', message['data']['event'])
await queen.queue('user-events').partition('user-123').consume(handler)
Consumer Groups
Consumer groups enable multiple workers to share message processing while ensuring each message is processed exactly once per group.
Basic Consumer Groups
# Worker 1 in group "processors"
async def worker1():
async def handler(message):
print('Worker 1:', message['data'])
await queen.queue('emails').group('processors').consume(handler)
# Worker 2 in SAME group (shares the load)
async def worker2():
async def handler(message):
print('Worker 2:', message['data'])
await queen.queue('emails').group('processors').consume(handler)
# Run both workers
await asyncio.gather(worker1(), worker2())
Multiple Consumer Groups (Fan-Out)
# Group 1: Send emails
async def email_worker():
async def handler(message):
await send_email(message['data'])
await queen.queue('notifications').group('email-sender').consume(handler)
# Group 2: Log analytics (processes THE SAME messages)
async def analytics_worker():
async def handler(message):
await track_event(message['data'])
await queen.queue('notifications').group('analytics').consume(handler)
# Both groups process every message independently
await asyncio.gather(email_worker(), analytics_worker())
Subscription Modes
Control whether consumer groups process historical messages or only new messages.
Default Behavior (All Messages)
By default, consumer groups start from the beginning and process all messages:
# Processes ALL messages, including historical ones
async def handler(message):
print('Processing:', message['data'])
await queen.queue('events').group('new-analytics').consume(handler)
Subscription Mode: 'new'
Skip historical messages and process only messages that arrive after subscription:
# Skip history, only process new messages
async def handler(message):
print('New event:', message['data'])
await (queen.queue('events')
.group('realtime-monitor')
.subscription_mode('new')
.consume(handler))
Subscription From Timestamp
Start consuming from a specific point in time:
# Start from specific timestamp
start_time = '2025-10-28T10:00:00.000Z'
async def handler(message):
print('Processing:', message['data'])
await (queen.queue('events')
.group('replay-from-10am')
.subscription_from(start_time)
.consume(handler))
# Start from now
await (queen.queue('events')
.group('from-now')
.subscription_from('now')
.consume(handler))
# Start from 1 hour ago
from datetime import datetime, timedelta
one_hour_ago = (datetime.utcnow() - timedelta(hours=1)).isoformat() + 'Z'
await (queen.queue('events')
.group('last-hour')
.subscription_from(one_hour_ago)
.consume(handler))
Real-World Example
# Group 1: Process ALL messages (batch processing)
async def batch_analytics():
async def handler(message):
await generate_full_report(message['data'])
await queen.queue('user-actions').group('batch-analytics').consume(handler)
# Group 2: Only NEW messages (real-time monitoring)
async def realtime_alerts():
async def handler(message):
await send_realtime_alert(message['data'])
await (queen.queue('user-actions')
.group('realtime-alerts')
.subscription_mode('new')
.consume(handler))
# Group 3: Replay from specific time
async def debug_replay():
async def handler(message):
await debug_specific_timeframe(message['data'])
await (queen.queue('user-actions')
.group('debug-replay')
.subscription_from('2025-10-28T15:30:00.000Z')
.consume(handler))
# Run all three groups concurrently
await asyncio.gather(batch_analytics(), realtime_alerts(), debug_replay())
Acknowledgment
Manual Acknowledgment
# Success
await queen.ack(message, True)
# Failure (will retry)
await queen.ack(message, False)
# With error context
await queen.ack(message, False, {'error': 'Invalid data format'})
# Batch acknowledgment
await queen.ack([msg1, msg2, msg3], True)
Auto-Acknowledgment
# Auto-ack enabled (default for consume)
async def handler(message):
await process(message['data'])
# Automatically acked on success
# Automatically nacked on exception
await queen.queue('tasks').consume(handler)
# Disable auto-ack for manual control
async def handler(message):
result = await process(message['data'])
if result.success:
await queen.ack(message, True)
else:
await queen.ack(message, False)
await queen.queue('tasks').auto_ack(False).consume(handler)
Batch Ack with Mixed Results
messages = await queen.queue('tasks').batch(10).pop()
# Process and mark each message individually
for message in messages:
try:
await process_message(message['data'])
message['_status'] = True # Mark as success
except Exception as error:
message['_status'] = False # Mark as failure
message['_error'] = str(error)
# Batch ack with individual statuses
await queen.ack(messages)
# Queen will ack some and nack others based on _status
Transactions
Transactions provide atomic operations across acknowledgments and pushes.
Basic Transaction
# Pop from input queue
messages = await queen.queue('raw-data').pop()
if messages:
message = messages[0]
# Process it
processed = await transform_data(message['data'])
# Atomically: ack the input AND push the output
await (queen.transaction()
.ack(message)
.queue('processed-data')
.push([{'data': processed}])
.commit())
Multi-Queue Pipeline
# Pop from queue A
messages = await queen.queue('queue-a').pop()
# Transaction: ack from A, push to B and C
await (queen.transaction()
.ack(messages[0])
.queue('queue-b')
.push([{'data': {'step': 2}}])
.queue('queue-c')
.push([{'data': {'step': 2}}])
.commit())
Batch Transaction
# Pop multiple messages
messages = await queen.queue('inputs').batch(10).pop()
# Process them
results = [await process(m['data']) for m in messages]
# Atomically ack all inputs and push all outputs
txn = queen.transaction()
# Ack all inputs
for message in messages:
txn.ack(message)
# Push all outputs
txn.queue('outputs').push([{'data': r} for r in results])
await txn.commit()
Transaction with Consumer
async def handler(message):
# Process
result = await process_message(message['data'])
# Transactionally ack and push result
await (queen.transaction()
.ack(message)
.queue('destination')
.push([{'data': result}])
.commit())
# Must disable auto-ack for manual transaction
await queen.queue('source').auto_ack(False).consume(handler)
Client-Side Buffering
Boost throughput by 10x-100x with client-side buffering.
How It Works
Instead of sending messages immediately:
- Messages collect in a local buffer
- Buffer flushes when it reaches count or time threshold
- All buffered messages are sent in one HTTP request (a conceptual sketch follows)
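Conceptually, the count-or-time flush behaves like the sketch below. This only illustrates the idea and is not the client's internals: send_batch stands in for the single batched HTTP request, and the real client also flushes on a timer rather than only when a new message arrives.
import time

# Illustration of count-or-time buffering, not the client's actual internals
class SketchBuffer:
    def __init__(self, send_batch, message_count=100, time_millis=1000):
        self.send_batch = send_batch       # stands in for one batched HTTP request
        self.message_count = message_count
        self.time_millis = time_millis
        self.pending = []
        self.first_added_at = None

    def add(self, message):
        if not self.pending:
            self.first_added_at = time.monotonic()
        self.pending.append(message)
        elapsed_ms = (time.monotonic() - self.first_added_at) * 1000
        # Flush when either threshold is reached, whichever comes first
        if len(self.pending) >= self.message_count or elapsed_ms >= self.time_millis:
            self.flush()

    def flush(self):
        if self.pending:
            self.send_batch(self.pending)  # one request instead of many
            self.pending = []

buf = SketchBuffer(send_batch=lambda batch: print(f'sending {len(batch)} messages'))
for i in range(250):
    buf.add({'data': {'id': i}})
buf.flush()  # flush the remainder, like queen.flush_all_buffers()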
Basic Buffering
# Buffer up to 100 messages OR 1 second
await queen.queue('logs').buffer({
'message_count': 100,
'time_millis': 1000
}).push([
{'data': {'level': 'info', 'message': 'User logged in'}}
])
High-Throughput Example
import time
# Send 10,000 messages super fast
for i in range(10000):
await queen.queue('events').buffer({
'message_count': 500,
'time_millis': 100
}).push([
{'data': {'id': i, 'timestamp': time.time()}}
])
# Flush any remaining buffered messages
await queen.flush_all_buffers()
# Performance: Seconds instead of minutes!
Manual Flush
# Flush all buffers for all queues
await queen.flush_all_buffers()
# Flush a specific queue's buffer
await queen.queue('my-queue').flush_buffer()
# Get buffer statistics
stats = queen.get_buffer_stats()
print('Buffers:', stats)
# Example: {'activeBuffers': 2, 'totalBufferedMessages': 145, ...}
Dead Letter Queue (DLQ)
Handle failed messages gracefully.
Enable DLQ
await queen.queue('risky-business').config({
'retryLimit': 3,
'dlqAfterMaxRetries': True
}).create()
Query DLQ
# Get failed messages
dlq = await queen.queue('risky-business').dlq().limit(10).get()
print(f"Found {dlq['total']} failed messages")
for message in dlq['messages']:
print('Failed message:', message['data'])
print('Error:', message.get('errorMessage'))
print('Failed at:', message.get('dlqTimestamp'))
DLQ with Consumer Groups
# Check DLQ for specific consumer group
dlq = await (queen.queue('risky-business')
.dlq('my-consumer-group')
.limit(100)
.get())
Advanced DLQ Queries
# Query with time range and pagination
dlq = await (queen.queue('risky-business')
.dlq()
.from_('2025-01-01') # Note: from_ (underscore to avoid Python keyword)
.to('2025-01-31')
.limit(100)
.offset(0)
.get())
Lease Renewal
Why Lease Renewal?
When processing takes longer than the lease time, you must renew the lease to prevent the message from being redelivered.
Automatic Renewal (Recommended)
# Auto-renew every 60 seconds
async def handler(message):
# Can take hours - lease keeps renewing!
await process_very_long_task(message['data'])
await (queen.queue('long-tasks')
.renew_lease(True, 60000)
.consume(handler))
Manual Renewal
# Pop a message
messages = await queen.queue('long-tasks').pop()
message = messages[0]
# Create renewal task
async def renew_task():
while True:
await asyncio.sleep(30) # Every 30 seconds
await queen.renew(message)
print('Lease renewed!')
renewal = asyncio.create_task(renew_task())
try:
await process_very_long_task(message['data'])
await queen.ack(message, True)
finally:
renewal.cancel()
Batch Renewal
messages = await queen.queue('tasks').batch(10).pop()
# Renew all at once
await queen.renew(messages)
# Or renew by lease IDs
await queen.renew([msg['leaseId'] for msg in messages])
Message Tracing
Debug distributed workflows by recording trace events as messages are processed.
Basic Tracing
async def handler(msg):
# Record a trace event
await msg['trace']({
'data': {'text': 'Order processing started'}
})
order = await process_order(msg['data'])
await msg['trace']({
'data': {
'text': 'Order processed successfully',
'orderId': order['id'],
'total': order['total']
}
})
await queen.queue('orders').consume(handler)
Trace Names - Cross-Service Correlation
Link traces across multiple services:
# Service 1: Order Service
async def order_handler(msg):
order_id = msg['data']['orderId']
await msg['trace']({
'traceName': f"order-{order_id}",
'data': {'text': 'Order created', 'service': 'orders'}
})
# Push to inventory queue
await queen.queue('inventory').push([{
'data': {'orderId': order_id, 'items': msg['data']['items']}
}])
await queen.queue('orders').consume(order_handler)
# Service 2: Inventory Service
async def inventory_handler(msg):
order_id = msg['data']['orderId']
await msg['trace']({
'traceName': f"order-{order_id}", # Same name = connected!
'data': {'text': 'Stock checked', 'service': 'inventory'}
})
await queen.queue('inventory').consume(inventory_handler)
# View in webapp: Traces → Search "order-12345" → See entire workflow!
Multi-Dimensional Tracing
async def handler(msg):
tenant_id = msg['data']['tenantId']
room_id = msg['data']['roomId']
user_id = msg['data']['userId']
await msg['trace']({
'traceName': [
f"tenant-{tenant_id}",
f"room-{room_id}",
f"user-{user_id}"
],
'data': {'text': 'Message sent'}
})
await queen.queue('chat-messages').consume(handler)
# Now you can search by:
# - tenant-acme (all tenant activity)
# - room-123 (all room activity)
# - user-456 (all user activity)
Event Types
async def handler(msg):
await msg['trace']({
'eventType': 'info',
'data': {'text': 'Started processing'}
})
await msg['trace']({
'eventType': 'step',
'data': {'text': 'Validated data'}
})
await msg['trace']({
'eventType': 'processing',
'data': {'text': 'Sending email'}
})
# Available types: info, error, step, processing, warning
await queen.queue('analytics').consume(handler)
Error-Safe Tracing
Traces never crash your consumer - errors are logged but don't throw:
async def handler(msg):
try:
await msg['trace']({'data': {'text': 'Job started'}})
result = await compute_analytics(msg['data'])
await msg['trace']({
'data': {
'text': 'Job completed',
'recordsProcessed': result['count']
}
})
except Exception as error:
# Record error (won't crash!)
await msg['trace']({
'eventType': 'error',
'data': {
'text': 'Job failed',
'error': str(error)
}
})
raise # Still fail the message for retry
await queen.queue('jobs').consume(handler)
Namespaces & Tasks
Process messages across multiple queues using wildcards.
Namespaces
# Create queues with namespaces
await queen.queue('billing-invoices').namespace('accounting').create()
await queen.queue('billing-receipts').namespace('accounting').create()
# Consume from ALL queues in 'accounting' namespace
async def handler(message):
print('Accounting message:', message['data'])
await queen.queue().namespace('accounting').consume(handler)
Tasks
# Create queues with tasks
await queen.queue('video-uploads').task('video-processing').create()
await queen.queue('image-uploads').task('image-processing').create()
# Consume by task type
async def handler(message):
await process_video(message['data'])
await queen.queue().task('video-processing').consume(handler)
Combining Namespace + Task
# Super specific filtering
async def handler(message):
await process_urgent_media(message['data'])
await (queen.queue()
.namespace('media')
.task('urgent-processing')
.consume(handler))
Delayed Processing
Messages don't become available until the delay passes.
# Messages invisible for 60 seconds
await queen.queue('scheduled-tasks').config({
'delayedProcessing': 60
}).create()
# Push a message
await queen.queue('scheduled-tasks').push([
{'data': {'task': 'send-reminder'}}
])
# Pop immediately: gets nothing
now = await queen.queue('scheduled-tasks').pop()
print(now) # []
# Wait 60 seconds...
await asyncio.sleep(61)
# Pop again: now we get the message!
later = await queen.queue('scheduled-tasks').pop()
print(later) # [{'data': {'task': 'send-reminder'}}]
Callbacks & Error Handling
Success and Error Callbacks
async def handler(message):
return await process_message(message['data'])
async def on_success(message, result):
print('Success! Result:', result)
async def on_error(message, error):
print('Failed:', str(error))
# Custom retry logic
if 'temporary' in str(error):
await queen.ack(message, False) # Retry
else:
await queen.ack(message, 'failed', {'error': str(error)}) # DLQ
await (queen.queue('tasks')
.auto_ack(False)
.consume(handler)
.on_success(on_success)
.on_error(on_error))
Consumer Group Management
Delete Consumer Group
# Delete consumer group (including metadata)
await queen.delete_consumer_group('my-group')
# Delete but keep subscription metadata
await queen.delete_consumer_group('my-group', delete_metadata=False)
Update Subscription Timestamp
# Reset consumer group to start from specific time
await queen.update_consumer_group_timestamp(
'my-group',
'2025-11-10T10:00:00Z'
)
Graceful Shutdown
Async Context Manager (Recommended)
async with Queen('http://localhost:6632') as queen:
# Your code here
pass
# Automatically flushes buffers and closes on exit
Manual Shutdown
queen = Queen('http://localhost:6632')
try:
# Your code here
pass
finally:
await queen.close()
With an Abort Signal
Stop consumers gracefully:
import asyncio
# Create abort signal
signal = asyncio.Event()
# Start consumer with signal
consumer_task = asyncio.create_task(
queen.queue('tasks').consume(handler, signal=signal)
)
# Later... stop the consumer
signal.set()
# Wait for consumer to finish current message
await consumer_task
# Close Queen
await queen.close()
Concurrency Patterns
Parallel Workers
async def worker(worker_id):
async def handler(message):
print(f'Worker {worker_id}:', message['data'])
await queen.queue('tasks').group('workers').consume(handler)
# Launch 5 workers
workers = [worker(i) for i in range(5)]
await asyncio.gather(*workers)
Built-In Concurrency
# Launch 5 workers with a single call
async def handler(message):
await process_task(message['data'])
await queen.queue('tasks').concurrency(5).consume(handler)
Advanced Patterns
Request-Reply Pattern
import uuid
# Requester
async def make_request():
correlation_id = str(uuid.uuid4())
# Push request with correlation ID
await queen.queue('requests').push([{
'data': {'action': 'process', 'correlationId': correlation_id}
}])
# Wait for reply
async def reply_handler(message):
if message['data'].get('correlationId') == correlation_id:
print('Got reply:', message['data'])
return message['data']
await queen.queue('replies').limit(1).consume(reply_handler)
# Worker
async def worker():
async def handler(message):
correlation_id = message['data']['correlationId']
# Process
result = await process(message['data'])
# Send reply
await queen.queue('replies').push([{
'data': {'correlationId': correlation_id, 'result': result}
}])
await queen.queue('requests').consume(handler)
Pipeline Pattern
# Stage 1: Raw events → Processed events
async def stage1():
async def handler(messages):
results = [await process(m['data']) for m in messages]
txn = queen.transaction()
for msg in messages:
txn.ack(msg)
txn.queue('processed-events').push([{'data': r} for r in results])
await txn.commit()
await (queen.queue('raw-events')
.group('processors')
.batch(10)
.auto_ack(False)
.consume(handler))
# Stage 2: Processed events → Notifications
async def stage2():
async def handler(message):
await queen.queue('notifications').push([{
'data': {'userId': message['data']['userId'], 'message': 'Done!'}
}])
await queen.queue('processed-events').group('notifiers').consume(handler)
# Run pipeline
await asyncio.gather(stage1(), stage2())
Configuration Reference
Client Configuration
{
'timeout_millis': 30000, # 30 seconds
'retry_attempts': 3,
'retry_delay_millis': 1000,
'load_balancing_strategy': 'affinity', # or 'round-robin', 'session'
'affinity_hash_ring': 128,
'enable_failover': True,
'health_retry_after_millis': 5000,
'bearer_token': None # For proxy authentication
}
Queue Configuration
{
'leaseTime': 300, # 5 minutes
'retryLimit': 3,
'delayedProcessing': 0,
'windowBuffer': 0,
'maxSize': 0, # Unlimited
'retentionSeconds': 0, # Keep forever
'completedRetentionSeconds': 0,
'encryptionEnabled': False
}
Consume Configuration
{
'concurrency': 1,
'batch': 1,
'auto_ack': True,
'wait': True, # Long polling
'timeout_millis': 30000,
'limit': None, # Run forever
'idle_millis': None,
'renew_lease': False
}
Pop Configuration
{
'batch': 1,
'wait': False, # No long polling
'timeout_millis': 30000,
'auto_ack': False # Manual ack required
}
Logging
Enable detailed logging for debugging:
export QUEEN_CLIENT_LOG=true
python your_app.py
Example output:
[2025-10-28T10:30:45.123Z] [INFO] [Queen.constructor] {"status":"initialized","urls":1}
[2025-10-28T10:30:45.234Z] [INFO] [QueueBuilder.push] {"queue":"tasks","partition":"Default","count":5}
[2025-10-28T10:30:46.789Z] [INFO] [HttpClient.request] {"method":"POST","url":"http://localhost:6632/api/v1/push"}
Type Hints
Full type hints included for IDE support:
from queen import Queen, Message
from typing import Dict, Any
queen: Queen = Queen('http://localhost:6632')
async def handler(message: Message) -> None:
data: Dict[str, Any] = message['data']
print(data)
await queen.queue('orders').consume(handler)
Error Handling
Network Errors
The client automatically handles network errors with retry and failover:
# Client retries 3 times with exponential backoff
# Then fails over to other servers if available
queen = Queen({
'urls': ['http://server1:6632', 'http://server2:6632'],
'retry_attempts': 3,
'enable_failover': True
})
Processing Errors
async def handler(message):
try:
await risky_operation(message['data'])
except TemporaryError as e:
# Will retry automatically (auto_ack=True)
raise
except PermanentError as e:
# Handle explicitly
await queen.ack(message, 'failed', {'error': str(e)})
await queen.queue('tasks').consume(handler)
Complete Example
import asyncio
from queen import Queen
async def main():
async with Queen('http://localhost:6632') as queen:
# Create queues
await queen.queue('raw-events').create()
await queen.queue('processed-events').create()
# Stage 1: Ingest with buffering
async def ingest():
for i in range(10000):
await (queen.queue('raw-events')
.partition(f"user-{i % 100}")
.buffer({'message_count': 500, 'time_millis': 1000})
.push([{
'data': {
'userId': i % 100,
'event': 'page_view',
'timestamp': i
}
}]))
await queen.flush_all_buffers()
print('Ingestion complete!')
# Stage 2: Process with transactions
async def process():
async def handler(messages):
processed = [{
'userId': m['data']['userId'],
'processed': True
} for m in messages]
txn = queen.transaction()
for msg in messages:
txn.ack(msg)
txn.queue('processed-events').push([{'data': p} for p in processed])
await txn.commit()
await (queen.queue('raw-events')
.group('processors')
.concurrency(5)
.batch(10)
.auto_ack(False)
.consume(handler))
# Stage 3: Monitor with tracing
async def monitor():
async def handler(message):
await message['trace']({
'traceName': f"user-{message['data']['userId']}",
'data': {'text': 'Event processed'}
})
await (queen.queue('processed-events')
.group('monitors')
.subscription_mode('new')
.consume(handler))
# Run pipeline
await ingest()
await asyncio.gather(process(), monitor())
if __name__ == '__main__':
asyncio.run(main())
API Reference
Queen Class
# Initialize
queen = Queen(config)
# Queue operations
queen.queue(name: str) -> QueueBuilder
# Transactions
queen.transaction() -> TransactionBuilder
# Acknowledgment
await queen.ack(message, status, context)
# Lease renewal
await queen.renew(message_or_lease_id)
# Buffering
await queen.flush_all_buffers()
queen.get_buffer_stats()
# Consumer groups
await queen.delete_consumer_group(group, delete_metadata)
await queen.update_consumer_group_timestamp(group, timestamp)
# Shutdown
await queen.close()
QueueBuilder Class
# Configuration
.namespace(name: str)
.task(name: str)
.config(options: dict)
.create()
.delete()
# Push
.partition(name: str)
.buffer(options: dict)
.push(payload) -> PushBuilder
# Consume configuration
.group(name: str)
.concurrency(count: int)
.batch(size: int)
.limit(count: int)
.idle_millis(millis: int)
.auto_ack(enabled: bool)
.renew_lease(enabled: bool, interval_millis: int)
.subscription_mode(mode: str)
.subscription_from(from_: str)
.each()
# Consume
.consume(handler, signal) -> ConsumeBuilder
# Pop
.wait(enabled: bool)
.pop() -> List[Message]
# Buffer management
.flush_buffer()
# DLQ
.dlq(consumer_group: str) -> DLQBuilder
Best Practices
- ✅ Use async with - Ensures proper cleanup and buffer flushing
- ✅ Use consume() for workers - Simpler API, handles retries automatically
- ✅ Use pop() for control - When you need precise control over acking
- ✅ Buffer for speed - Always use buffering when pushing many messages
- ✅ Partitions for order - Use partitions when message order matters
- ✅ Consumer groups for scale - Run multiple workers in the same group
- ✅ Transactions for consistency - Use transactions for atomic operations
- ✅ Enable DLQ - Always enable DLQ in production
- ✅ Renew long leases - Use auto-renewal for long-running tasks
- ✅ Monitor DLQ - Regularly check for failed messages
- ✅ Type hints - Use type hints for better IDE support and fewer bugs
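A minimal worker that combines several of these practices is sketched below; the queue name, group name, and process_order() are hypothetical placeholders.
import asyncio
from queen import Queen

async def main():
    async with Queen('http://localhost:6632') as queen:        # cleanup + buffer flush on exit
        await queen.queue('orders').config({
            'retryLimit': 3,
            'dlqAfterMaxRetries': True                          # DLQ enabled in production
        }).create()

        async def handler(message):
            await process_order(message['data'])                # hypothetical business logic

        await (queen.queue('orders')
            .group('order-workers')                             # scale out with a consumer group
            .concurrency(5)
            .batch(10)
            .each()                                             # deliver messages one at a time
            .renew_lease(True, 60000)                           # auto-renew long leases
            .consume(handler))

asyncio.run(main())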
Common Patterns
High-Throughput Ingestion
async def ingest(items):
for item in items:
await (queen.queue('events')
.buffer({'message_count': 500, 'time_millis': 100})
.push([{'data': item}]))
await queen.flush_all_buffers()
Scalable Processing
async def process():
await (queen.queue('tasks')
.group('workers')
.concurrency(10)
.batch(20)
.consume(handler))
Ordered Processing
async def process_user_events(user_id):
async def handler(message):
await process_in_order(message['data'])
await queen.queue('events').partition(f"user-{user_id}").consume(handler)Fan-Out Pattern
# Multiple groups process same messages
async def email_worker():
await queen.queue('events').group('emailer').consume(send_email_handler)
async def analytics_worker():
await queen.queue('events').group('analytics').consume(analytics_handler)
await asyncio.gather(email_worker(), analytics_worker())
Migration from Node.js
The Python client API is nearly identical to the Node.js client:
Syntax Differences
| Node.js | Python |
|---|---|
| const queen = new Queen(...) | queen = Queen(...) |
| await queen.queue('q').create() | await queen.queue('q').create() |
| async (message) => { ... } | async def handler(message): ... |
| camelCase | snake_case (for kwargs) |
| .then() | await |
Key Differences
- Async context managers: Use async with Queen(...) as queen:
- Parameter naming: Use snake_case for keyword arguments
- Abort signals: Use asyncio.Event instead of AbortController
- DLQ from: Use .from_() instead of .from() (Python keyword)
Example Migration
Node.js:
const queen = new Queen('http://localhost:6632')
await queen.queue('tasks')
.concurrency(5)
.batch(10)
.consume(async (message) => {
await processTask(message.data)
})
Python:
async with Queen('http://localhost:6632') as queen:
async def handler(message):
await process_task(message['data'])
await (queen.queue('tasks')
.concurrency(5)
.batch(10)
.consume(handler))
Troubleshooting
Import Errors
# Make sure httpx is installed
pip install httpx
# Or reinstall the package
pip install -e .
Type Checking
# Run mypy for type checking
pip install mypy
mypy your_app.py
Connection Issues
# Enable logging
import os
os.environ['QUEEN_CLIENT_LOG'] = 'true'
# Then run your app
Performance Issues
# Use buffering for high throughput
await queen.queue('q').buffer({'message_count': 500}).push([...])
# Use batching for consumption
await queen.queue('q').batch(50).concurrency(10).consume(handler)
# Monitor buffer stats
stats = queen.get_buffer_stats()
print(stats)
Admin API
The Admin API provides administrative and observability operations typically used by dashboards, monitoring tools, and admin scripts. These endpoints are for inspecting the system, not for regular application message processing.
Accessing the Admin API
from queen import Queen
async with Queen('http://localhost:6632') as queen:
# Access via .admin property
overview = await queen.admin.get_overview()
Resources
# System overview (queue counts, message stats, etc.)
overview = await queen.admin.get_overview()
# List all namespaces
namespaces = await queen.admin.get_namespaces()
# List all tasks
tasks = await queen.admin.get_tasks()
Queue Management
# List all queues
queues = await queen.admin.list_queues(limit=100, offset=0)
# Get specific queue details
queue = await queen.admin.get_queue('my-queue')
# List partitions
partitions = await queen.admin.get_partitions(queue='my-queue')
Message Inspection
# List messages with filters
messages = await queen.admin.list_messages(
queue='my-queue',
status='pending', # pending, processing, completed, failed
limit=50,
offset=0
)
# Get specific message
message = await queen.admin.get_message(partition_id, transaction_id)
# Delete a message (DESTRUCTIVE!)
await queen.admin.delete_message(partition_id, transaction_id)
# Retry a failed message
await queen.admin.retry_message(partition_id, transaction_id)
# Move message to Dead Letter Queue
await queen.admin.move_message_to_dlq(partition_id, transaction_id)
Traces
# Get available trace names
trace_names = await queen.admin.get_trace_names(limit=100)
# Get traces by name (cross-service correlation)
traces = await queen.admin.get_traces_by_name(
'order-12345',
limit=100,
from_='2025-01-01T00:00:00Z',
to='2025-01-31T23:59:59Z'
)
# Get traces for a specific message
message_traces = await queen.admin.get_traces_for_message(partition_id, transaction_id)
Analytics & Status
# System status
status = await queen.admin.get_status()
# Queue statistics
queue_stats = await queen.admin.get_queue_stats(limit=100)
# Detailed stats for specific queue
detail = await queen.admin.get_queue_detail('my-queue')
# Analytics data (throughput, latency, etc.)
analytics = await queen.admin.get_analytics(
from_='2025-01-01T00:00:00Z',
to='2025-01-31T23:59:59Z'
)
Consumer Groups
# List all consumer groups
groups = await queen.admin.list_consumer_groups()
# Get specific consumer group details
group = await queen.admin.get_consumer_group('my-consumer-group')
# Find lagging consumers (behind by > 60 seconds)
lagging = await queen.admin.get_lagging_consumers(min_lag_seconds=60)
# Refresh consumer statistics
await queen.admin.refresh_consumer_stats()
# Delete consumer group for a specific queue
await queen.admin.delete_consumer_group_for_queue(
'my-group',
'my-queue',
delete_metadata=True
)
# Seek consumer group offset
await queen.admin.seek_consumer_group(
'my-group',
'my-queue',
{'timestamp': '2025-01-15T10:00:00Z'}
)
System Operations
# Health check
health = await queen.admin.health()
# Prometheus metrics (raw text)
metrics = await queen.admin.metrics()
# Maintenance mode (push operations)
maintenance_status = await queen.admin.get_maintenance_mode()
await queen.admin.set_maintenance_mode(True) # Enable
await queen.admin.set_maintenance_mode(False) # Disable
# Pop maintenance mode
pop_maintenance = await queen.admin.get_pop_maintenance_mode()
await queen.admin.set_pop_maintenance_mode(True)
# System metrics (CPU, memory, connections)
system_metrics = await queen.admin.get_system_metrics()
# Worker metrics
worker_metrics = await queen.admin.get_worker_metrics()
# PostgreSQL statistics
pg_stats = await queen.admin.get_postgres_stats()
Admin API Reference
| Method | Description |
|---|---|
| Resources | |
| get_overview() | System overview with counts |
| get_namespaces() | List all namespaces |
| get_tasks() | List all tasks |
| Queues | |
| list_queues(**params) | List queues with pagination |
| get_queue(name) | Get queue details |
| clear_queue(name, partition?) | Clear queue messages |
| get_partitions(**params) | List partitions |
| Messages | |
| list_messages(**params) | List messages with filters |
| get_message(partition_id, tx_id) | Get specific message |
| delete_message(partition_id, tx_id) | Delete message |
| retry_message(partition_id, tx_id) | Retry failed message |
| move_message_to_dlq(partition_id, tx_id) | Move to DLQ |
| Traces | |
| get_trace_names(**params) | List available trace names |
| get_traces_by_name(name, **params) | Get traces by name |
| get_traces_for_message(partition_id, tx_id) | Get message traces |
| Analytics | |
| get_status(**params) | System status |
| get_queue_stats(**params) | Queue statistics |
| get_queue_detail(name, **params) | Detailed queue stats |
| get_analytics(**params) | Analytics data |
| Consumer Groups | |
| list_consumer_groups() | List all consumer groups |
| get_consumer_group(name) | Get consumer group details |
| get_lagging_consumers(min_lag_seconds) | Find lagging consumers |
| refresh_consumer_stats() | Refresh statistics |
| delete_consumer_group_for_queue(cg, queue, delete_meta) | Delete CG for queue |
| seek_consumer_group(cg, queue, options) | Seek offset |
| System | |
| health() | Health check |
| metrics() | Prometheus metrics |
| get_maintenance_mode() | Get push maintenance status |
| set_maintenance_mode(enabled) | Set push maintenance |
| get_pop_maintenance_mode() | Get pop maintenance status |
| set_pop_maintenance_mode(enabled) | Set pop maintenance |
| get_system_metrics(**params) | System metrics |
| get_worker_metrics(**params) | Worker metrics |
| get_postgres_stats() | PostgreSQL stats |
When to Use Admin API
Use the Admin API for:
- Dashboards - Building monitoring UIs
- Scripts - Maintenance and debugging scripts
- Monitoring - Integration with alerting systems
- Operations - Managing consumer groups and queues
For normal message processing, use queue().push(), queue().consume(), and ack() instead.
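For example, a small monitoring script might combine a few of these calls; the lag threshold and what you do with the results are up to you, and this is only a sketch:
import asyncio
from queen import Queen

async def check_health():
    async with Queen('http://localhost:6632') as queen:
        health = await queen.admin.health()
        lagging = await queen.admin.get_lagging_consumers(min_lag_seconds=300)
        stats = await queen.admin.get_queue_stats(limit=100)
        print('Health:', health)
        print('Lagging consumer groups:', lagging)   # wire this into your alerting
        print('Queue stats:', stats)

asyncio.run(check_health())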
