Skip to content

Python Client Guide

Complete guide for the Queen MQ Python client library.

Python client is available on PyPi, here: https://pypi.org/project/queen-mq/

Installation

bash
pip install queen-mq

Requirements: Python 3.8+

Table of Contents

Getting Started

Import and Connect

python
import asyncio
from queen import Queen

async def main():
    # Single server
    queen = Queen('http://localhost:6632')
    
    # Multiple servers (high availability)
    queen = Queen([
        'http://server1:6632',
        'http://server2:6632'
    ])
    
    # Full configuration
    queen = Queen({
        'urls': ['http://server1:6632', 'http://server2:6632', 'http://server3:6632'],
        'timeout_millis': 30000,
        'retry_attempts': 3,
        'load_balancing_strategy': 'affinity',  # 'affinity', 'round-robin', or 'session'
        'affinity_hash_ring': 128,              # Virtual nodes per server (for affinity)
        'enable_failover': True
    })
    
    # With proxy authentication (bearer token)
    queen = Queen(
        url='https://proxy.example.com:3000',
        bearer_token=os.environ.get('QUEEN_TOKEN')  # Token from create-user script
    )
    
    # Recommended: Use async context manager for automatic cleanup
    async with Queen('http://localhost:6632') as queen:
        # Your code here
        pass

asyncio.run(main())

Proxy Authentication

When connecting through the Queen proxy (which provides authentication, SSL termination, etc.), you need to provide a bearer token:

python
import os
from queen import Queen

queen = Queen(
    url='https://queen-proxy.example.com',
    bearer_token=os.environ.get('QUEEN_TOKEN')
)

Getting a token: Use the proxy's create-user.js script to generate tokens for microservices. See Proxy Setup for details.

Environment Variables

Store tokens in environment variables, never hardcode them:

bash
export QUEEN_TOKEN="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
python
import os
from queen import Queen

queen = Queen(
    url=os.environ.get('QUEEN_PROXY_URL'),
    bearer_token=os.environ.get('QUEEN_TOKEN')
)

Direct Connection

When connecting directly to the Queen server (without the proxy), no bearer_token is needed:

python
# Direct connection - no auth required
queen = Queen('http://queen-server:6632')

Load Balancing & Affinity Routing

When connecting to multiple Queen servers, you can choose how requests are distributed. The client supports three load balancing strategies.

Best for: Production deployments with 3+ servers and multiple consumer groups.

Uses consistent hashing with virtual nodes to route consumer groups to the same backend server. This optimizes database queries by consolidating poll intentions.

python
queen = Queen({
    'urls': ['http://server1:6632', 'http://server2:6632', 'http://server3:6632'],
    'load_balancing_strategy': 'affinity',
    'affinity_hash_ring': 128  # Virtual nodes per server (default: 128)
})

Benefits:

  • ✅ Same consumer group always routes to same server
  • ✅ Poll intentions consolidated → optimized DB queries
  • ✅ Graceful failover (only ~33% of keys move if server fails)
  • ✅ Works great with 3-server HA setup

How it works:

  1. Each consumer group generates an affinity key: queue:partition:consumerGroup
  2. Key is hashed (FNV-1a) and mapped to a virtual node on the ring
  3. Virtual node maps to a real backend server
  4. Same key always routes to same server

Round-Robin Mode

Cycles through servers in order. Simple but doesn't optimize for poll intention consolidation.

python
queen = Queen({
    'urls': ['http://server1:6632', 'http://server2:6632'],
    'load_balancing_strategy': 'round-robin'
})

Session Mode

Sticky sessions - each client instance sticks to one server.

python
queen = Queen({
    'urls': ['http://server1:6632', 'http://server2:6632'],
    'load_balancing_strategy': 'session'
})

Quick Start Examples

Basic Push and Consume

python
import asyncio
from queen import Queen

async def main():
    async with Queen('http://localhost:6632') as queen:
        # Create a queue
        await queen.queue('tasks').create()
        
        # Push messages
        await queen.queue('tasks').push([
            {'data': {'task': 'send-email', 'to': 'alice@example.com'}}
        ])
        
        # Consume messages
        async def handler(message):
            print('Processing:', message['data'])
            # Auto-ack on success, auto-retry on error
        
        await queen.queue('tasks').consume(handler)

asyncio.run(main())

With Partitions

python
# Push to specific partition
await queen.queue('user-events').partition('user-123').push([
    {'data': {'event': 'login', 'timestamp': '2025-11-22T10:00:00Z'}}
])

# Consume from specific partition
async def handler(message):
    print('User 123 event:', message['data'])

await queen.queue('user-events').partition('user-123').consume(handler)

Consumer Groups

python
# Worker 1 in group "processors"
async def worker1():
    await queen.queue('emails').group('processors').consume(async def handler(msg):
        print('Worker 1 processing:', msg['data'])
    )

# Worker 2 in the SAME group (shares the load)
async def worker2():
    await queen.queue('emails').group('processors').consume(async def handler(msg):
        print('Worker 2 processing:', msg['data'])
    )

# Run both workers concurrently
await asyncio.gather(worker1(), worker2())

Queue Operations

Creating Queues

python
# Simple queue
await queen.queue('my-tasks').create()

# With configuration
await queen.queue('orders').config({
    'leaseTime': 300,              # 5 minutes
    'retryLimit': 3,
    'dlqAfterMaxRetries': True,
    'encryptionEnabled': False
}).create()

Deleting Queues

python
await queen.queue('my-tasks').delete()

Queue Configuration Options

OptionTypeDefaultDescription
leaseTimeint300Lease duration in seconds
retryLimitint3Max retry attempts
delayedProcessingint0Delay before messages become available (seconds)
windowBufferint0Server-side batching window (seconds)
maxSizeint0Max queue size (0 = unlimited)
retentionSecondsint0Message retention time (0 = forever)
completedRetentionSecondsint0Completed message retention
encryptionEnabledboolFalseEnable payload encryption
dlqAfterMaxRetriesboolFalseMove to DLQ after max retries

Pushing Messages

Basic Push

python
# Single message
await queen.queue('tasks').push([
    {'data': {'job': 'resize-image', 'imageId': 123}}
])

# Multiple messages
await queen.queue('tasks').push([
    {'data': {'job': 'send-email', 'to': 'alice@example.com'}},
    {'data': {'job': 'send-email', 'to': 'bob@example.com'}},
    {'data': {'job': 'resize-image', 'id': 123}}
])

With Partitions

python
await queen.queue('user-events').partition('user-123').push([
    {'data': {'event': 'login'}},
    {'data': {'event': 'view-page'}},
    {'data': {'event': 'logout'}}
])

With Custom Transaction IDs (Exactly-Once)

python
await queen.queue('orders').push([
    {'transactionId': 'order-12345', 'data': {'orderId': 12345, 'amount': 99.99}}
])

Client-Side Buffering (High Performance)

For high-throughput scenarios, buffer messages client-side before sending:

python
# Buffer up to 500 messages OR 1 second (whichever comes first)
for i in range(10000):
    await queen.queue('events').buffer({
        'message_count': 500,
        'time_millis': 1000
    }).push([
        {'data': {'id': i, 'timestamp': time.time()}}
    ])

# Flush remaining buffered messages
await queen.flush_all_buffers()

# Result: 10x-100x faster than individual pushes

Push with Callbacks

python
await (queen.queue('tasks')
    .push([{'data': {'value': 1}}])
    .on_success(async def on_success(messages):
        print('Push successful!')
    )
    .on_error(async def on_error(messages, error):
        print(f'Push failed: {error}')
    )
    .on_duplicate(async def on_duplicate(messages, error):
        print('Duplicate transaction IDs detected')
    ))

Consuming Messages

Pop vs Consume

Pop - Manual control, one-shot retrieval:

python
messages = await queen.queue('tasks').batch(10).pop()
for message in messages:
    try:
        await process_message(message['data'])
        await queen.ack(message, True)
    except Exception as e:
        await queen.ack(message, False)

Consume - Long-running workers, automatic retry:

python
# Runs forever, processing messages as they arrive
async def handler(message):
    await process_task(message['data'])
    # Auto-ack on success, auto-retry on error

await queen.queue('tasks').consume(handler)

Handler Signatures

Important: The handler signature depends on batch size:

python
# batch=1 (default): handler receives single message
async def handler(message):
    print(message['data'])

await queen.queue('tasks').consume(handler)

# batch>1: handler receives array of messages
async def batch_handler(messages):
    for message in messages:
        print(message['data'])

await queen.queue('tasks').batch(10).consume(batch_handler)

# each=True: always receives single messages
async def each_handler(message):
    print(message['data'])

await queen.queue('tasks').batch(10).each().consume(each_handler)

Consume Options

python
await (queen.queue('tasks')
    .concurrency(5)          # 5 parallel workers
    .batch(20)               # Fetch 20 at a time
    .limit(100)              # Stop after 100 messages
    .idle_millis(5000)       # Stop after 5s of no messages
    .auto_ack(True)          # Auto-ack (default)
    .wait(True)              # Long polling (default)
    .consume(handler))

Pop Options

python
messages = await (queen.queue('tasks')
    .batch(10)               # Fetch 10 at a time
    .wait(True)              # Long polling (wait for messages)
    .pop())

Partitions

Partitions provide ordered message processing within a queue.

Creating Partitioned Messages

python
# Messages in same partition are processed in order
await queen.queue('user-events').partition('user-123').push([
    {'data': {'event': 'login'}},
    {'data': {'event': 'view-page'}},
    {'data': {'event': 'logout'}}
])

Consuming from Partitions

python
# Process only messages from user-123's partition
async def handler(message):
    print('User 123 did:', message['data']['event'])

await queen.queue('user-events').partition('user-123').consume(handler)

Consumer Groups

Consumer groups enable multiple workers to share message processing while ensuring each message is processed exactly once per group.

Basic Consumer Groups

python
# Worker 1 in group "processors"
async def worker1():
    async def handler(message):
        print('Worker 1:', message['data'])
    
    await queen.queue('emails').group('processors').consume(handler)

# Worker 2 in SAME group (shares the load)
async def worker2():
    async def handler(message):
        print('Worker 2:', message['data'])
    
    await queen.queue('emails').group('processors').consume(handler)

# Run both workers
await asyncio.gather(worker1(), worker2())

Multiple Consumer Groups (Fan-Out)

python
# Group 1: Send emails
async def email_worker():
    async def handler(message):
        await send_email(message['data'])
    
    await queen.queue('notifications').group('email-sender').consume(handler)

# Group 2: Log analytics (processes THE SAME messages)
async def analytics_worker():
    async def handler(message):
        await track_event(message['data'])
    
    await queen.queue('notifications').group('analytics').consume(handler)

# Both groups process every message independently
await asyncio.gather(email_worker(), analytics_worker())

Subscription Modes

Control whether consumer groups process historical messages or only new messages.

Default Behavior (All Messages)

By default, consumer groups start from the beginning and process all messages:

python
# Processes ALL messages, including historical ones
async def handler(message):
    print('Processing:', message['data'])

await queen.queue('events').group('new-analytics').consume(handler)

Subscription Mode: 'new'

Skip historical messages and process only messages that arrive after subscription:

python
# Skip history, only process new messages
async def handler(message):
    print('New event:', message['data'])

await (queen.queue('events')
    .group('realtime-monitor')
    .subscription_mode('new')
    .consume(handler))

Subscription From Timestamp

Start consuming from a specific point in time:

python
# Start from specific timestamp
start_time = '2025-10-28T10:00:00.000Z'

async def handler(message):
    print('Processing:', message['data'])

await (queen.queue('events')
    .group('replay-from-10am')
    .subscription_from(start_time)
    .consume(handler))

# Start from now
await (queen.queue('events')
    .group('from-now')
    .subscription_from('now')
    .consume(handler))

# Start from 1 hour ago
from datetime import datetime, timedelta
one_hour_ago = (datetime.utcnow() - timedelta(hours=1)).isoformat() + 'Z'

await (queen.queue('events')
    .group('last-hour')
    .subscription_from(one_hour_ago)
    .consume(handler))

Real-World Example

python
# Group 1: Process ALL messages (batch processing)
async def batch_analytics():
    async def handler(message):
        await generate_full_report(message['data'])
    
    await queen.queue('user-actions').group('batch-analytics').consume(handler)

# Group 2: Only NEW messages (real-time monitoring)
async def realtime_alerts():
    async def handler(message):
        await send_realtime_alert(message['data'])
    
    await (queen.queue('user-actions')
        .group('realtime-alerts')
        .subscription_mode('new')
        .consume(handler))

# Group 3: Replay from specific time
async def debug_replay():
    async def handler(message):
        await debug_specific_timeframe(message['data'])
    
    await (queen.queue('user-actions')
        .group('debug-replay')
        .subscription_from('2025-10-28T15:30:00.000Z')
        .consume(handler))

# Run all three groups concurrently
await asyncio.gather(batch_analytics(), realtime_alerts(), debug_replay())

Acknowledgment

Manual Acknowledgment

python
# Success
await queen.ack(message, True)

# Failure (will retry)
await queen.ack(message, False)

# With error context
await queen.ack(message, False, {'error': 'Invalid data format'})

# Batch acknowledgment
await queen.ack([msg1, msg2, msg3], True)

Auto-Acknowledgment

python
# Auto-ack enabled (default for consume)
async def handler(message):
    await process(message['data'])
    # Automatically acked on success
    # Automatically nacked on exception

await queen.queue('tasks').consume(handler)

# Disable auto-ack for manual control
async def handler(message):
    result = await process(message['data'])
    if result.success:
        await queen.ack(message, True)
    else:
        await queen.ack(message, False)

await queen.queue('tasks').auto_ack(False).consume(handler)

Batch Ack with Mixed Results

python
messages = await queen.queue('tasks').batch(10).pop()

# Process and mark each message individually
for message in messages:
    try:
        await process_message(message['data'])
        message['_status'] = True  # Mark as success
    except Exception as error:
        message['_status'] = False  # Mark as failure
        message['_error'] = str(error)

# Batch ack with individual statuses
await queen.ack(messages)
# Queen will ack some and nack others based on _status

Transactions

Transactions provide atomic operations across acknowledgments and pushes.

Basic Transaction

python
# Pop from input queue
messages = await queen.queue('raw-data').pop()

if messages:
    message = messages[0]
    
    # Process it
    processed = await transform_data(message['data'])
    
    # Atomically: ack the input AND push the output
    await (queen.transaction()
        .ack(message)
        .queue('processed-data')
        .push([{'data': processed}])
        .commit())

Multi-Queue Pipeline

python
# Pop from queue A
messages = await queen.queue('queue-a').pop()

# Transaction: ack from A, push to B and C
await (queen.transaction()
    .ack(messages[0])
    .queue('queue-b')
    .push([{'data': {'step': 2}}])
    .queue('queue-c')
    .push([{'data': {'step': 2}}])
    .commit())

Batch Transaction

python
# Pop multiple messages
messages = await queen.queue('inputs').batch(10).pop()

# Process them
results = [await process(m['data']) for m in messages]

# Atomically ack all inputs and push all outputs
txn = queen.transaction()

# Ack all inputs
for message in messages:
    txn.ack(message)

# Push all outputs
txn.queue('outputs').push([{'data': r} for r in results])

await txn.commit()

Transaction with Consumer

python
async def handler(message):
    # Process
    result = await process_message(message['data'])
    
    # Transactionally ack and push result
    await (queen.transaction()
        .ack(message)
        .queue('destination')
        .push([{'data': result}])
        .commit())

# Must disable auto-ack for manual transaction
await queen.queue('source').auto_ack(False).consume(handler)

Client-Side Buffering

Boost throughput by 10x-100x with client-side buffering.

How It Works

Instead of sending messages immediately:

  1. Messages collect in a local buffer
  2. Buffer flushes when it reaches count or time threshold
  3. All buffered messages sent in one HTTP request

Basic Buffering

python
# Buffer up to 100 messages OR 1 second
await queen.queue('logs').buffer({
    'message_count': 100,
    'time_millis': 1000
}).push([
    {'data': {'level': 'info', 'message': 'User logged in'}}
])

High-Throughput Example

python
import time

# Send 10,000 messages super fast
for i in range(10000):
    await queen.queue('events').buffer({
        'message_count': 500,
        'time_millis': 100
    }).push([
        {'data': {'id': i, 'timestamp': time.time()}}
    ])

# Flush any remaining buffered messages
await queen.flush_all_buffers()

# Performance: Seconds instead of minutes!

Manual Flush

python
# Flush all buffers for all queues
await queen.flush_all_buffers()

# Flush a specific queue's buffer
await queen.queue('my-queue').flush_buffer()

# Get buffer statistics
stats = queen.get_buffer_stats()
print('Buffers:', stats)
# Example: {'activeBuffers': 2, 'totalBufferedMessages': 145, ...}

Dead Letter Queue (DLQ)

Handle failed messages gracefully.

Enable DLQ

python
await queen.queue('risky-business').config({
    'retryLimit': 3,
    'dlqAfterMaxRetries': True
}).create()

Query DLQ

python
# Get failed messages
dlq = await queen.queue('risky-business').dlq().limit(10).get()

print(f"Found {dlq['total']} failed messages")

for message in dlq['messages']:
    print('Failed message:', message['data'])
    print('Error:', message.get('errorMessage'))
    print('Failed at:', message.get('dlqTimestamp'))

DLQ with Consumer Groups

python
# Check DLQ for specific consumer group
dlq = await (queen.queue('risky-business')
    .dlq('my-consumer-group')
    .limit(100)
    .get())

Advanced DLQ Queries

python
# Query with time range and pagination
dlq = await (queen.queue('risky-business')
    .dlq()
    .from_('2025-01-01')  # Note: from_ (underscore to avoid Python keyword)
    .to('2025-01-31')
    .limit(100)
    .offset(0)
    .get())

Lease Renewal

Why Lease Renewal?

When processing takes longer than the lease time, you must renew the lease to prevent the message from being redelivered.

python
# Auto-renew every 60 seconds
async def handler(message):
    # Can take hours - lease keeps renewing!
    await process_very_long_task(message['data'])

await (queen.queue('long-tasks')
    .renew_lease(True, 60000)
    .consume(handler))

Manual Renewal

python
# Pop a message
messages = await queen.queue('long-tasks').pop()
message = messages[0]

# Create renewal task
async def renew_task():
    while True:
        await asyncio.sleep(30)  # Every 30 seconds
        await queen.renew(message)
        print('Lease renewed!')

renewal = asyncio.create_task(renew_task())

try:
    await process_very_long_task(message['data'])
    await queen.ack(message, True)
finally:
    renewal.cancel()

Batch Renewal

python
messages = await queen.queue('tasks').batch(10).pop()

# Renew all at once
await queen.renew(messages)

# Or renew by lease IDs
await queen.renew([msg['leaseId'] for msg in messages])

Message Tracing

Debug distributed workflows by recording trace events as messages are processed.

Basic Tracing

python
async def handler(msg):
    # Record a trace event
    await msg['trace']({
        'data': {'text': 'Order processing started'}
    })
    
    order = await process_order(msg['data'])
    
    await msg['trace']({
        'data': {
            'text': 'Order processed successfully',
            'orderId': order['id'],
            'total': order['total']
        }
    })

await queen.queue('orders').consume(handler)

Trace Names - Cross-Service Correlation

Link traces across multiple services:

python
# Service 1: Order Service
async def order_handler(msg):
    order_id = msg['data']['orderId']
    
    await msg['trace']({
        'traceName': f"order-{order_id}",
        'data': {'text': 'Order created', 'service': 'orders'}
    })
    
    # Push to inventory queue
    await queen.queue('inventory').push([{
        'data': {'orderId': order_id, 'items': msg['data']['items']}
    }])

await queen.queue('orders').consume(order_handler)

# Service 2: Inventory Service
async def inventory_handler(msg):
    order_id = msg['data']['orderId']
    
    await msg['trace']({
        'traceName': f"order-{order_id}",  # Same name = connected!
        'data': {'text': 'Stock checked', 'service': 'inventory'}
    })

await queen.queue('inventory').consume(inventory_handler)

# View in webapp: Traces → Search "order-12345" → See entire workflow!

Multi-Dimensional Tracing

python
async def handler(msg):
    tenant_id = msg['data']['tenantId']
    room_id = msg['data']['roomId']
    user_id = msg['data']['userId']
    
    await msg['trace']({
        'traceName': [
            f"tenant-{tenant_id}",
            f"room-{room_id}",
            f"user-{user_id}"
        ],
        'data': {'text': 'Message sent'}
    })

await queen.queue('chat-messages').consume(handler)

# Now you can search by:
# - tenant-acme (all tenant activity)
# - room-123 (all room activity)
# - user-456 (all user activity)

Event Types

python
async def handler(msg):
    await msg['trace']({
        'eventType': 'info',
        'data': {'text': 'Started processing'}
    })
    
    await msg['trace']({
        'eventType': 'step',
        'data': {'text': 'Validated data'}
    })
    
    await msg['trace']({
        'eventType': 'processing',
        'data': {'text': 'Sending email'}
    })
    
    # Available types: info, error, step, processing, warning

await queen.queue('analytics').consume(handler)

Error-Safe Tracing

Traces never crash your consumer - errors are logged but don't throw:

python
async def handler(msg):
    try:
        await msg['trace']({'data': {'text': 'Job started'}})
        result = await compute_analytics(msg['data'])
        await msg['trace']({
            'data': {
                'text': 'Job completed',
                'recordsProcessed': result['count']
            }
        })
    except Exception as error:
        # Record error (won't crash!)
        await msg['trace']({
            'eventType': 'error',
            'data': {
                'text': 'Job failed',
                'error': str(error)
            }
        })
        raise  # Still fail the message for retry

await queen.queue('jobs').consume(handler)

Namespaces & Tasks

Process messages across multiple queues using wildcards.

Namespaces

python
# Create queues with namespaces
await queen.queue('billing-invoices').namespace('accounting').create()
await queen.queue('billing-receipts').namespace('accounting').create()

# Consume from ALL queues in 'accounting' namespace
async def handler(message):
    print('Accounting message:', message['data'])

await queen.queue().namespace('accounting').consume(handler)

Tasks

python
# Create queues with tasks
await queen.queue('video-uploads').task('video-processing').create()
await queen.queue('image-uploads').task('image-processing').create()

# Consume by task type
async def handler(message):
    await process_video(message['data'])

await queen.queue().task('video-processing').consume(handler)

Combining Namespace + Task

python
# Super specific filtering
async def handler(message):
    await process_urgent_media(message['data'])

await (queen.queue()
    .namespace('media')
    .task('urgent-processing')
    .consume(handler))

Delayed Processing

Messages don't become available until the delay passes.

python
# Messages invisible for 60 seconds
await queen.queue('scheduled-tasks').config({
    'delayedProcessing': 60
}).create()

# Push a message
await queen.queue('scheduled-tasks').push([
    {'data': {'task': 'send-reminder'}}
])

# Pop immediately: gets nothing
now = await queen.queue('scheduled-tasks').pop()
print(now)  # []

# Wait 60 seconds...
await asyncio.sleep(61)

# Pop again: now we get the message!
later = await queen.queue('scheduled-tasks').pop()
print(later)  # [{'data': {'task': 'send-reminder'}}]

Callbacks & Error Handling

Success and Error Callbacks

python
async def handler(message):
    return await process_message(message['data'])

async def on_success(message, result):
    print('Success! Result:', result)

async def on_error(message, error):
    print('Failed:', str(error))
    # Custom retry logic
    if 'temporary' in str(error):
        await queen.ack(message, False)  # Retry
    else:
        await queen.ack(message, 'failed', {'error': str(error)})  # DLQ

await (queen.queue('tasks')
    .auto_ack(False)
    .consume(handler)
    .on_success(on_success)
    .on_error(on_error))

Consumer Group Management

Delete Consumer Group

python
# Delete consumer group (including metadata)
await queen.delete_consumer_group('my-group')

# Delete but keep subscription metadata
await queen.delete_consumer_group('my-group', delete_metadata=False)

Update Subscription Timestamp

python
# Reset consumer group to start from specific time
await queen.update_consumer_group_timestamp(
    'my-group',
    '2025-11-10T10:00:00Z'
)

Graceful Shutdown

python
async with Queen('http://localhost:6632') as queen:
    # Your code here
    pass
# Automatically flushes buffers and closes on exit

Manual Shutdown

python
queen = Queen('http://localhost:6632')

try:
    # Your code here
    pass
finally:
    await queen.close()

With AbortController

Stop consumers gracefully:

python
import asyncio

# Create abort signal
signal = asyncio.Event()

# Start consumer with signal
consumer_task = asyncio.create_task(
    queen.queue('tasks').consume(handler, signal=signal)
)

# Later... stop the consumer
signal.set()

# Wait for consumer to finish current message
await consumer_task

# Close Queen
await queen.close()

Concurrency Patterns

Parallel Workers

python
async def worker(worker_id):
    async def handler(message):
        print(f'Worker {worker_id}:', message['data'])
    
    await queen.queue('tasks').group('workers').consume(handler)

# Launch 5 workers
workers = [worker(i) for i in range(5)]
await asyncio.gather(*workers)

Built-In Concurrency

python
# Launch 5 workers with a single call
async def handler(message):
    await process_task(message['data'])

await queen.queue('tasks').concurrency(5).consume(handler)

Advanced Patterns

Request-Reply Pattern

python
import uuid

# Requester
async def make_request():
    correlation_id = str(uuid.uuid4())
    
    # Push request with correlation ID
    await queen.queue('requests').push([{
        'data': {'action': 'process', 'correlationId': correlation_id}
    }])
    
    # Wait for reply
    async def reply_handler(message):
        if message['data'].get('correlationId') == correlation_id:
            print('Got reply:', message['data'])
            return message['data']
    
    await queen.queue('replies').limit(1).consume(reply_handler)

# Worker
async def worker():
    async def handler(message):
        correlation_id = message['data']['correlationId']
        
        # Process
        result = await process(message['data'])
        
        # Send reply
        await queen.queue('replies').push([{
            'data': {'correlationId': correlation_id, 'result': result}
        }])
    
    await queen.queue('requests').consume(handler)

Pipeline Pattern

python
# Stage 1: Raw events → Processed events
async def stage1():
    async def handler(messages):
        results = [await process(m['data']) for m in messages]
        
        txn = queen.transaction()
        for msg in messages:
            txn.ack(msg)
        txn.queue('processed-events').push([{'data': r} for r in results])
        await txn.commit()
    
    await (queen.queue('raw-events')
        .group('processors')
        .batch(10)
        .auto_ack(False)
        .consume(handler))

# Stage 2: Processed events → Notifications
async def stage2():
    async def handler(message):
        await queen.queue('notifications').push([{
            'data': {'userId': message['data']['userId'], 'message': 'Done!'}
        }])
    
    await queen.queue('processed-events').group('notifiers').consume(handler)

# Run pipeline
await asyncio.gather(stage1(), stage2())

Configuration Reference

Client Configuration

python
{
    'timeout_millis': 30000,               # 30 seconds
    'retry_attempts': 3,
    'retry_delay_millis': 1000,
    'load_balancing_strategy': 'affinity', # or 'round-robin', 'session'
    'affinity_hash_ring': 128,
    'enable_failover': True,
    'health_retry_after_millis': 5000,
    'bearer_token': None                   # For proxy authentication
}

Queue Configuration

python
{
    'leaseTime': 300,                      # 5 minutes
    'retryLimit': 3,
    'delayedProcessing': 0,
    'windowBuffer': 0,
    'maxSize': 0,                          # Unlimited
    'retentionSeconds': 0,                 # Keep forever
    'completedRetentionSeconds': 0,
    'encryptionEnabled': False
}

Consume Configuration

python
{
    'concurrency': 1,
    'batch': 1,
    'auto_ack': True,
    'wait': True,                          # Long polling
    'timeout_millis': 30000,
    'limit': None,                         # Run forever
    'idle_millis': None,
    'renew_lease': False
}

Pop Configuration

python
{
    'batch': 1,
    'wait': False,                         # No long polling
    'timeout_millis': 30000,
    'auto_ack': False                      # Manual ack required
}

Logging

Enable detailed logging for debugging:

bash
export QUEEN_CLIENT_LOG=true
python your_app.py

Example output:

[2025-10-28T10:30:45.123Z] [INFO] [Queen.constructor] {"status":"initialized","urls":1}
[2025-10-28T10:30:45.234Z] [INFO] [QueueBuilder.push] {"queue":"tasks","partition":"Default","count":5}
[2025-10-28T10:30:46.789Z] [INFO] [HttpClient.request] {"method":"POST","url":"http://localhost:6632/api/v1/push"}

Type Hints

Full type hints included for IDE support:

python
from queen import Queen, Message
from typing import Dict, Any

queen: Queen = Queen('http://localhost:6632')

async def handler(message: Message) -> None:
    data: Dict[str, Any] = message['data']
    print(data)

await queen.queue('orders').consume(handler)

Error Handling

Network Errors

The client automatically handles network errors with retry and failover:

python
# Client retries 3 times with exponential backoff
# Then fails over to other servers if available
queen = Queen({
    'urls': ['http://server1:6632', 'http://server2:6632'],
    'retry_attempts': 3,
    'enable_failover': True
})

Processing Errors

python
async def handler(message):
    try:
        await risky_operation(message['data'])
    except TemporaryError as e:
        # Will retry automatically (auto_ack=True)
        raise
    except PermanentError as e:
        # Handle explicitly
        await queen.ack(message, 'failed', {'error': str(e)})

await queen.queue('tasks').consume(handler)

Complete Example

python
import asyncio
from queen import Queen

async def main():
    async with Queen('http://localhost:6632') as queen:
        # Create queues
        await queen.queue('raw-events').create()
        await queen.queue('processed-events').create()
        
        # Stage 1: Ingest with buffering
        async def ingest():
            for i in range(10000):
                await (queen.queue('raw-events')
                    .partition(f"user-{i % 100}")
                    .buffer({'message_count': 500, 'time_millis': 1000})
                    .push([{
                        'data': {
                            'userId': i % 100,
                            'event': 'page_view',
                            'timestamp': i
                        }
                    }]))
            
            await queen.flush_all_buffers()
            print('Ingestion complete!')
        
        # Stage 2: Process with transactions
        async def process():
            async def handler(messages):
                processed = [{
                    'userId': m['data']['userId'],
                    'processed': True
                } for m in messages]
                
                txn = queen.transaction()
                for msg in messages:
                    txn.ack(msg)
                txn.queue('processed-events').push([{'data': p} for p in processed])
                await txn.commit()
            
            await (queen.queue('raw-events')
                .group('processors')
                .concurrency(5)
                .batch(10)
                .auto_ack(False)
                .consume(handler))
        
        # Stage 3: Monitor with tracing
        async def monitor():
            async def handler(message):
                await message['trace']({
                    'traceName': f"user-{message['data']['userId']}",
                    'data': {'text': 'Event processed'}
                })
            
            await (queen.queue('processed-events')
                .group('monitors')
                .subscription_mode('new')
                .consume(handler))
        
        # Run pipeline
        await ingest()
        await asyncio.gather(process(), monitor())

if __name__ == '__main__':
    asyncio.run(main())

API Reference

Queen Class

python
# Initialize
queen = Queen(config)

# Queue operations
queen.queue(name: str) -> QueueBuilder

# Transactions
queen.transaction() -> TransactionBuilder

# Acknowledgment
await queen.ack(message, status, context)

# Lease renewal
await queen.renew(message_or_lease_id)

# Buffering
await queen.flush_all_buffers()
queen.get_buffer_stats()

# Consumer groups
await queen.delete_consumer_group(group, delete_metadata)
await queen.update_consumer_group_timestamp(group, timestamp)

# Shutdown
await queen.close()

QueueBuilder Class

python
# Configuration
.namespace(name: str)
.task(name: str)
.config(options: dict)
.create()
.delete()

# Push
.partition(name: str)
.buffer(options: dict)
.push(payload) -> PushBuilder

# Consume configuration
.group(name: str)
.concurrency(count: int)
.batch(size: int)
.limit(count: int)
.idle_millis(millis: int)
.auto_ack(enabled: bool)
.renew_lease(enabled: bool, interval_millis: int)
.subscription_mode(mode: str)
.subscription_from(from_: str)
.each()

# Consume
.consume(handler, signal) -> ConsumeBuilder

# Pop
.wait(enabled: bool)
.pop() -> List[Message]

# Buffer management
.flush_buffer()

# DLQ
.dlq(consumer_group: str) -> DLQBuilder

Best Practices

  1. Use async with - Ensures proper cleanup and buffer flushing
  2. Use consume() for workers - Simpler API, handles retries automatically
  3. Use pop() for control - When you need precise control over acking
  4. Buffer for speed - Always use buffering when pushing many messages
  5. Partitions for order - Use partitions when message order matters
  6. Consumer groups for scale - Run multiple workers in the same group
  7. Transactions for consistency - Use transactions for atomic operations
  8. Enable DLQ - Always enable DLQ in production
  9. Renew long leases - Use auto-renewal for long-running tasks
  10. Monitor DLQ - Regularly check for failed messages
  11. Type hints - Use type hints for better IDE support and fewer bugs

Common Patterns

High-Throughput Ingestion

python
async def ingest(items):
    for item in items:
        await (queen.queue('events')
            .buffer({'message_count': 500, 'time_millis': 100})
            .push([{'data': item}]))
    
    await queen.flush_all_buffers()

Scalable Processing

python
async def process():
    await (queen.queue('tasks')
        .group('workers')
        .concurrency(10)
        .batch(20)
        .consume(handler))

Ordered Processing

python
async def process_user_events(user_id):
    async def handler(message):
        await process_in_order(message['data'])
    
    await queen.queue('events').partition(f"user-{user_id}").consume(handler)

Fan-Out Pattern

python
# Multiple groups process same messages
async def email_worker():
    await queen.queue('events').group('emailer').consume(send_email_handler)

async def analytics_worker():
    await queen.queue('events').group('analytics').consume(analytics_handler)

await asyncio.gather(email_worker(), analytics_worker())

Migration from Node.js

The Python client API is nearly identical to the Node.js client:

Syntax Differences

Node.jsPython
const queen = new Queen(...)queen = Queen(...)
await queen.queue('q').create()await queen.queue('q').create()
async (message) => { ... }async def handler(message): ...
camelCasesnake_case (for kwargs)
.then()await

Key Differences

  1. Async context managers: Use async with Queen(...) as queen:
  2. Parameter naming: Use snake_case for keyword arguments
  3. Abort signals: Use asyncio.Event instead of AbortController
  4. DLQ from: Use .from_() instead of .from() (Python keyword)

Example Migration

Node.js:

javascript
const queen = new Queen('http://localhost:6632')

await queen.queue('tasks')
  .concurrency(5)
  .batch(10)
  .consume(async (message) => {
    await processTask(message.data)
  })

Python:

python
async with Queen('http://localhost:6632') as queen:
    async def handler(message):
        await process_task(message['data'])
    
    await (queen.queue('tasks')
        .concurrency(5)
        .batch(10)
        .consume(handler))

Troubleshooting

Import Errors

bash
# Make sure httpx is installed
pip install httpx

# Or reinstall the package
pip install -e .

Type Checking

bash
# Run mypy for type checking
pip install mypy
mypy your_app.py

Connection Issues

python
# Enable logging
import os
os.environ['QUEEN_CLIENT_LOG'] = 'true'

# Then run your app

Performance Issues

python
# Use buffering for high throughput
await queen.queue('q').buffer({'message_count': 500}).push([...])

# Use batching for consumption
await queen.queue('q').batch(50).concurrency(10).consume(handler)

# Monitor buffer stats
stats = queen.get_buffer_stats()
print(stats)

Admin API

The Admin API provides administrative and observability operations typically used by dashboards, monitoring tools, and admin scripts. These endpoints are for inspecting the system, not for regular application message processing.

Accessing the Admin API

python
from queen import Queen

async with Queen('http://localhost:6632') as queen:
    # Access via .admin property
    overview = await queen.admin.get_overview()

Resources

python
# System overview (queue counts, message stats, etc.)
overview = await queen.admin.get_overview()

# List all namespaces
namespaces = await queen.admin.get_namespaces()

# List all tasks
tasks = await queen.admin.get_tasks()

Queue Management

python
# List all queues
queues = await queen.admin.list_queues(limit=100, offset=0)

# Get specific queue details
queue = await queen.admin.get_queue('my-queue')

# List partitions
partitions = await queen.admin.get_partitions(queue='my-queue')

Message Inspection

python
# List messages with filters
messages = await queen.admin.list_messages(
    queue='my-queue',
    status='pending',  # pending, processing, completed, failed
    limit=50,
    offset=0
)

# Get specific message
message = await queen.admin.get_message(partition_id, transaction_id)

# Delete a message (DESTRUCTIVE!)
await queen.admin.delete_message(partition_id, transaction_id)

# Retry a failed message
await queen.admin.retry_message(partition_id, transaction_id)

# Move message to Dead Letter Queue
await queen.admin.move_message_to_dlq(partition_id, transaction_id)

Traces

python
# Get available trace names
trace_names = await queen.admin.get_trace_names(limit=100)

# Get traces by name (cross-service correlation)
traces = await queen.admin.get_traces_by_name(
    'order-12345',
    limit=100,
    from_='2025-01-01T00:00:00Z',
    to='2025-01-31T23:59:59Z'
)

# Get traces for a specific message
message_traces = await queen.admin.get_traces_for_message(partition_id, transaction_id)

Analytics & Status

python
# System status
status = await queen.admin.get_status()

# Queue statistics
queue_stats = await queen.admin.get_queue_stats(limit=100)

# Detailed stats for specific queue
detail = await queen.admin.get_queue_detail('my-queue')

# Analytics data (throughput, latency, etc.)
analytics = await queen.admin.get_analytics(
    from_='2025-01-01T00:00:00Z',
    to='2025-01-31T23:59:59Z'
)

Consumer Groups

python
# List all consumer groups
groups = await queen.admin.list_consumer_groups()

# Get specific consumer group details
group = await queen.admin.get_consumer_group('my-consumer-group')

# Find lagging consumers (behind by > 60 seconds)
lagging = await queen.admin.get_lagging_consumers(min_lag_seconds=60)

# Refresh consumer statistics
await queen.admin.refresh_consumer_stats()

# Delete consumer group for a specific queue
await queen.admin.delete_consumer_group_for_queue(
    'my-group', 
    'my-queue', 
    delete_metadata=True
)

# Seek consumer group offset
await queen.admin.seek_consumer_group(
    'my-group',
    'my-queue',
    {'timestamp': '2025-01-15T10:00:00Z'}
)

System Operations

python
# Health check
health = await queen.admin.health()

# Prometheus metrics (raw text)
metrics = await queen.admin.metrics()

# Maintenance mode (push operations)
maintenance_status = await queen.admin.get_maintenance_mode()
await queen.admin.set_maintenance_mode(True)   # Enable
await queen.admin.set_maintenance_mode(False)  # Disable

# Pop maintenance mode
pop_maintenance = await queen.admin.get_pop_maintenance_mode()
await queen.admin.set_pop_maintenance_mode(True)

# System metrics (CPU, memory, connections)
system_metrics = await queen.admin.get_system_metrics()

# Worker metrics
worker_metrics = await queen.admin.get_worker_metrics()

# PostgreSQL statistics
pg_stats = await queen.admin.get_postgres_stats()

Admin API Reference

MethodDescription
Resources
get_overview()System overview with counts
get_namespaces()List all namespaces
get_tasks()List all tasks
Queues
list_queues(**params)List queues with pagination
get_queue(name)Get queue details
clear_queue(name, partition?)Clear queue messages
get_partitions(**params)List partitions
Messages
list_messages(**params)List messages with filters
get_message(partition_id, tx_id)Get specific message
delete_message(partition_id, tx_id)Delete message
retry_message(partition_id, tx_id)Retry failed message
move_message_to_dlq(partition_id, tx_id)Move to DLQ
Traces
get_trace_names(**params)List available trace names
get_traces_by_name(name, **params)Get traces by name
get_traces_for_message(partition_id, tx_id)Get message traces
Analytics
get_status(**params)System status
get_queue_stats(**params)Queue statistics
get_queue_detail(name, **params)Detailed queue stats
get_analytics(**params)Analytics data
Consumer Groups
list_consumer_groups()List all consumer groups
get_consumer_group(name)Get consumer group details
get_lagging_consumers(min_lag_seconds)Find lagging consumers
refresh_consumer_stats()Refresh statistics
delete_consumer_group_for_queue(cg, queue, delete_meta)Delete CG for queue
seek_consumer_group(cg, queue, options)Seek offset
System
health()Health check
metrics()Prometheus metrics
get_maintenance_mode()Get push maintenance status
set_maintenance_mode(enabled)Set push maintenance
get_pop_maintenance_mode()Get pop maintenance status
set_pop_maintenance_mode(enabled)Set pop maintenance
get_system_metrics(**params)System metrics
get_worker_metrics(**params)Worker metrics
get_postgres_stats()PostgreSQL stats

When to Use Admin API

Use the Admin API for:

  • Dashboards - Building monitoring UIs
  • Scripts - Maintenance and debugging scripts
  • Monitoring - Integration with alerting systems
  • Operations - Managing consumer groups and queues

For normal message processing, use queue().push(), queue().consume(), and ack() instead.

Built with ❤️ by Smartness