Skip to content

Python Client Examples

Comprehensive examples for the Queen MQ Python client.

Basic Push and Pop

python
import asyncio
from queen import Queen

async def main():
    async with Queen('http://localhost:6632') as queen:
        # Create queue
        await queen.queue('tasks').create()
        
        # Push messages
        await queen.queue('tasks').push([
            {'data': {'task': 'send-email', 'to': 'user@example.com'}},
            {'data': {'task': 'generate-report', 'id': 123}}
        ])
        
        # Pop messages
        messages = await queen.queue('tasks').pop()
        
        for message in messages:
            print('Processing:', message['data'])
            await queen.ack(message, True)

asyncio.run(main())

Simple Consumer

python
async def handler(message):
    print('Got task:', message['data'])
    await process_task(message['data'])
    # Automatically ACK'd on success

await queen.queue('tasks').consume(handler)

Consumer with Concurrency

python
import asyncio
from queen import Queen

async def main():
    async with Queen('http://localhost:6632') as queen:
        # Process 10 messages in parallel
        async def handler(message):
            print(f'Processing: {message["data"]}')
            await asyncio.sleep(1)  # Simulate work
            print(f'Done: {message["data"]}')
        
        await queen.queue('tasks').concurrency(10).consume(handler)

asyncio.run(main())

Batch Processing

python
async def batch_handler(messages):
    """Handler receives array when batch > 1"""
    print(f'Processing batch of {len(messages)} messages')
    
    for message in messages:
        await process(message['data'])

# Fetch and process 20 messages at a time
await queen.queue('tasks').batch(20).consume(batch_handler)

Consumer Groups

python
import asyncio
from queen import Queen

async def worker1():
    """Worker 1 in consumer group"""
    async with Queen('http://localhost:6632') as queen:
        async def handler(message):
            print('Worker 1:', message['data'])
        
        await queen.queue('emails').group('processors').consume(handler)

async def worker2():
    """Worker 2 in same consumer group"""
    async with Queen('http://localhost:6632') as queen:
        async def handler(message):
            print('Worker 2:', message['data'])
        
        await queen.queue('emails').group('processors').consume(handler)

# Run both workers - they share the load
asyncio.run(asyncio.gather(worker1(), worker2()))

Partitions for Ordering

python
import asyncio
from queen import Queen

async def main():
    async with Queen('http://localhost:6632') as queen:
        # Create queue
        await queen.queue('user-events').create()
        
        # Push events for user-123 (will be processed in order)
        await queen.queue('user-events').partition('user-123').push([
            {'data': {'event': 'login', 'timestamp': '10:00:00'}},
            {'data': {'event': 'view-page', 'timestamp': '10:00:05'}},
            {'data': {'event': 'logout', 'timestamp': '10:00:30'}}
        ])
        
        # Consume events for this user in order
        async def handler(message):
            print(f"User 123: {message['data']['event']} at {message['data']['timestamp']}")
        
        await queen.queue('user-events').partition('user-123').limit(3).consume(handler)

asyncio.run(main())

Client-Side Buffering

python
import asyncio
from queen import Queen

async def main():
    async with Queen('http://localhost:6632') as queen:
        # Create queue
        await queen.queue('events').create()
        
        # Push 10,000 messages with buffering (super fast!)
        for i in range(10000):
            await queen.queue('events').buffer({
                'message_count': 500,
                'time_millis': 1000
            }).push([
                {'data': {'id': i, 'value': i * 2}}
            ])
        
        # Flush remaining buffered messages
        await queen.flush_all_buffers()
        
        print('✅ Pushed 10,000 messages with buffering!')

asyncio.run(main())

Transactions

python
import asyncio
from queen import Queen

async def main():
    async with Queen('http://localhost:6632') as queen:
        # Create queues
        await queen.queue('input').create()
        await queen.queue('output').create()
        
        # Push to input
        await queen.queue('input').push([
            {'data': {'value': 42}}
        ])
        
        # Pop from input
        messages = await queen.queue('input').pop()
        
        if messages:
            message = messages[0]
            
            # Process
            processed = {'value': message['data']['value'] * 2}
            
            # Atomically: ack input AND push output
            await (queen.transaction()
                .ack(message)
                .queue('output')
                .push([{'data': processed}])
                .commit())
            
            print('✅ Transaction committed!')

asyncio.run(main())

Subscription Modes

python
import asyncio
from queen import Queen

async def batch_analytics():
    """Process ALL messages including historical"""
    async with Queen('http://localhost:6632') as queen:
        async def handler(message):
            await generate_report(message['data'])
        
        await queen.queue('events').group('batch-analytics').consume(handler)

async def realtime_monitoring():
    """Process only NEW messages"""
    async with Queen('http://localhost:6632') as queen:
        async def handler(message):
            await send_alert(message['data'])
        
        await (queen.queue('events')
            .group('realtime-monitor')
            .subscription_mode('new')
            .consume(handler))

async def replay_debug():
    """Process from specific timestamp"""
    async with Queen('http://localhost:6632') as queen:
        async def handler(message):
            await debug_message(message['data'])
        
        await (queen.queue('events')
            .group('debug-replay')
            .subscription_from('2025-10-28T15:30:00.000Z')
            .consume(handler))

# Run all three groups
asyncio.run(asyncio.gather(
    batch_analytics(),
    realtime_monitoring(),
    replay_debug()
))

Dead Letter Queue

python
import asyncio
from queen import Queen

async def main():
    async with Queen('http://localhost:6632') as queen:
        # Create queue with DLQ enabled
        await queen.queue('risky').config({
            'retryLimit': 3,
            'dlqAfterMaxRetries': True
        }).create()
        
        # Push some messages
        for i in range(10):
            await queen.queue('risky').push([
                {'data': {'value': i}}
            ])
        
        # Consume (some will fail)
        processed = 0
        async def handler(message):
            nonlocal processed
            if message['data']['value'] % 3 == 0:
                raise Exception('Simulated failure')
            processed += 1
        
        await queen.queue('risky').limit(10).consume(handler)
        
        # Query DLQ
        dlq = await queen.queue('risky').dlq().limit(10).get()
        
        print(f'Processed: {processed}')
        print(f'Failed (in DLQ): {dlq["total"]}')
        
        for msg in dlq['messages']:
            print(f'  - Value: {msg["data"]["value"]}, Error: {msg.get("errorMessage")}')

asyncio.run(main())

Message Tracing

python
import asyncio
from queen import Queen

async def main():
    async with Queen('http://localhost:6632') as queen:
        await queen.queue('orders').create()
        
        # Push an order
        await queen.queue('orders').push([
            {'data': {'orderId': 12345, 'amount': 99.99}}
        ])
        
        # Process with tracing
        async def handler(msg):
            order_id = msg['data']['orderId']
            
            # Record trace
            await msg['trace']({
                'traceName': f"order-{order_id}",
                'eventType': 'info',
                'data': {'text': 'Order processing started'}
            })
            
            # Do work
            await process_order(msg['data'])
            
            # Record completion
            await msg['trace']({
                'traceName': f"order-{order_id}",
                'eventType': 'processing',
                'data': {
                    'text': 'Order completed',
                    'amount': msg['data']['amount']
                }
            })
        
        await queen.queue('orders').limit(1).consume(handler)
        print('✅ Order processed with tracing!')

async def process_order(data):
    await asyncio.sleep(0.5)

asyncio.run(main())

Lease Renewal

python
import asyncio
from queen import Queen

async def main():
    async with Queen('http://localhost:6632') as queen:
        await queen.queue('long-tasks').create()
        
        # Push a task
        await queen.queue('long-tasks').push([
            {'data': {'task': 'process-video', 'videoId': 123}}
        ])
        
        # Consume with auto-renewal (every 60 seconds)
        async def handler(message):
            print('Starting long task...')
            
            # This can take minutes - lease keeps renewing!
            await asyncio.sleep(180)  # 3 minutes
            
            print('Long task completed!')
        
        await (queen.queue('long-tasks')
            .renew_lease(True, 60000)  # Renew every 60 seconds
            .limit(1)
            .consume(handler))

asyncio.run(main())

Error Handling with Callbacks

python
import asyncio
from queen import Queen

async def main():
    async with Queen('http://localhost:6632') as queen:
        await queen.queue('tasks').create()
        
        # Push messages
        for i in range(5):
            await queen.queue('tasks').push([
                {'data': {'id': i, 'risky': i % 2 == 0}}
            ])
        
        # Consume with error handling
        async def handler(message):
            if message['data']['risky']:
                raise Exception('Risky operation failed!')
            return message['data']
        
        async def on_success(message, result):
            print('✅ Success:', result)
        
        async def on_error(message, error):
            print('❌ Error:', error)
            # Custom retry logic
            if 'temporary' in str(error):
                await queen.ack(message, False)  # Retry
            else:
                await queen.ack(message, 'failed', {'error': str(error)})
        
        await (queen.queue('tasks')
            .auto_ack(False)
            .limit(5)
            .consume(handler)
            .on_success(on_success)
            .on_error(on_error))

asyncio.run(main())

Multiple Queues with Namespace

python
import asyncio
from queen import Queen

async def main():
    async with Queen('http://localhost:6632') as queen:
        # Create multiple queues in 'accounting' namespace
        await queen.queue('invoices').namespace('accounting').create()
        await queen.queue('receipts').namespace('accounting').create()
        await queen.queue('payments').namespace('accounting').create()
        
        # Push to different queues
        await queen.queue('invoices').push([{'data': {'invoice': 'INV-001'}}])
        await queen.queue('receipts').push([{'data': {'receipt': 'RCP-001'}}])
        await queen.queue('payments').push([{'data': {'payment': 'PAY-001'}}])
        
        # Consume from ALL queues in namespace
        async def handler(message):
            print(f'Accounting message from {message.get("queue")}: {message["data"]}')
        
        await queen.queue().namespace('accounting').limit(3).consume(handler)
        
        print('✅ Processed messages from all accounting queues!')

asyncio.run(main())

Streaming with Windows

python
import asyncio
from queen import Queen

async def main():
    async with Queen('http://localhost:6632') as queen:
        # Create source queue
        await queen.queue('sensor-data').create()
        
        # Define stream
        await (queen.stream('sensor-analytics', 'iot')
            .sources(['sensor-data'])
            .tumbling_time(10)     # 10 second windows
            .grace_period(5)       # 5 second late arrival grace
            .define())
        
        # Push sensor data
        for i in range(100):
            await queen.queue('sensor-data').push([
                {'data': {'sensorId': i % 10, 'temperature': 20 + i % 10, 'humidity': 50 + i % 20}}
            ])
            await asyncio.sleep(0.1)
        
        # Consume windows
        consumer = queen.consumer('sensor-analytics', 'analytics-group')
        
        async def process_window(window):
            print(f'Window: {window.size()} messages')
            
            # Aggregate
            stats = window.aggregate({
                'count': True,
                'avg': ['data.temperature', 'data.humidity'],
                'min': ['data.temperature'],
                'max': ['data.temperature']
            })
            
            print('Temperature:', stats)
            
            # Group by sensor
            groups = window.group_by('data.sensorId')
            print(f'Sensors: {len(groups)}')
        
        # Process 3 windows then stop
        windows_processed = 0
        async def wrapped_process(window):
            nonlocal windows_processed
            await process_window(window)
            windows_processed += 1
            if windows_processed >= 3:
                consumer.stop()
        
        await consumer.process(wrapped_process)

asyncio.run(main())

High-Throughput Pipeline

python
import asyncio
from queen import Queen

async def main():
    async with Queen('http://localhost:6632') as queen:
        # Create queues
        await queen.queue('raw-events').config({'priority': 5}).create()
        await queen.queue('processed-events').config({'priority': 10}).create()
        
        # Stage 1: Ingest with buffering
        async def ingest():
            for i in range(10000):
                await (queen.queue('raw-events')
                    .partition(f"user-{i % 100}")
                    .buffer({'message_count': 500, 'time_millis': 1000})
                    .push([{
                        'data': {
                            'userId': i % 100,
                            'event': 'page_view',
                            'timestamp': i
                        }
                    }]))
            
            await queen.flush_all_buffers()
            print('✅ Ingestion complete!')
        
        # Stage 2: Process with transactions
        async def process():
            async def handler(messages):
                processed = [{
                    'userId': m['data']['userId'],
                    'processed': True,
                    'timestamp': m['data']['timestamp']
                } for m in messages]
                
                # Atomic: ack inputs and push outputs
                txn = queen.transaction()
                for msg in messages:
                    txn.ack(msg)
                txn.queue('processed-events').push([{'data': p} for p in processed])
                await txn.commit()
            
            await (queen.queue('raw-events')
                .group('processors')
                .concurrency(5)
                .batch(10)
                .auto_ack(False)
                .consume(handler))
        
        # Stage 3: Monitor (only new messages)
        async def monitor():
            async def handler(message):
                await message['trace']({
                    'traceName': f"user-{message['data']['userId']}",
                    'data': {'text': 'Event processed'}
                })
            
            await (queen.queue('processed-events')
                .group('monitors')
                .subscription_mode('new')
                .consume(handler))
        
        # Run pipeline
        await ingest()
        
        # Process and monitor concurrently
        await asyncio.gather(
            asyncio.create_task(process()),
            asyncio.create_task(monitor())
        )

asyncio.run(main())

Request-Reply Pattern

python
import asyncio
import uuid
from queen import Queen

async def requester():
    """Make a request and wait for reply"""
    async with Queen('http://localhost:6632') as queen:
        correlation_id = str(uuid.uuid4())
        
        # Send request
        await queen.queue('requests').push([{
            'data': {
                'action': 'calculate',
                'value': 42,
                'correlationId': correlation_id
            }
        }])
        
        print(f'📤 Request sent: {correlation_id}')
        
        # Wait for reply
        reply_received = False
        async def reply_handler(message):
            nonlocal reply_received
            if message['data'].get('correlationId') == correlation_id:
                print(f'📥 Reply received: {message["data"]["result"]}')
                reply_received = True
        
        await queen.queue('replies').consume(reply_handler)

async def worker():
    """Process requests and send replies"""
    async with Queen('http://localhost:6632') as queen:
        async def handler(message):
            correlation_id = message['data']['correlationId']
            value = message['data']['value']
            
            # Process
            result = value * 2
            
            # Send reply
            await queen.queue('replies').push([{
                'data': {
                    'correlationId': correlation_id,
                    'result': result
                }
            }])
            
            print(f'✅ Processed request {correlation_id}: {value} -> {result}')
        
        await queen.queue('requests').consume(handler)

# Run worker in background, then make request
asyncio.run(asyncio.gather(
    worker(),
    requester()
))

Manual Ack with Pop

python
import asyncio
from queen import Queen

async def main():
    async with Queen('http://localhost:6632') as queen:
        # Create and populate queue
        await queen.queue('manual-tasks').create()
        
        for i in range(5):
            await queen.queue('manual-tasks').push([
                {'data': {'id': i, 'risky': i % 2 == 0}}
            ])
        
        # Pop with long polling
        while True:
            messages = await queen.queue('manual-tasks').batch(5).wait(True).pop()
            
            if not messages:
                break
            
            for message in messages:
                try:
                    # Process
                    if message['data']['risky']:
                        raise Exception('Risky operation failed')
                    
                    print(f'✅ Processed: {message["data"]["id"]}')
                    await queen.ack(message, True)
                    
                except Exception as error:
                    print(f'❌ Failed: {message["data"]["id"]} - {error}')
                    await queen.ack(message, False, {'error': str(error)})

asyncio.run(main())

Graceful Shutdown

python
import asyncio
import signal
from queen import Queen

# Using context manager (automatic cleanup)
async def main():
    async with Queen('http://localhost:6632') as queen:
        async def handler(message):
            await process(message['data'])
        
        await queen.queue('tasks').consume(handler)
    # Queen automatically closes and flushes buffers on exit

# Using abort signal
async def main_with_signal():
    queen = Queen('http://localhost:6632')
    
    # Create abort signal
    stop_signal = asyncio.Event()
    
    # Setup signal handler
    def signal_handler(signum, frame):
        print('\nReceived interrupt, stopping...')
        stop_signal.set()
    
    signal.signal(signal.SIGINT, signal_handler)
    
    try:
        # Start consumer with signal
        async def handler(message):
            await process(message['data'])
        
        await queen.queue('tasks').consume(handler, signal=stop_signal)
    finally:
        await queen.close()

asyncio.run(main())

Type Hints

python
import asyncio
from queen import Queen, Message
from typing import Dict, Any

async def main():
    queen: Queen = Queen('http://localhost:6632')
    
    async def handler(message: Message) -> None:
        data: Dict[str, Any] = message['data']
        order_id: int = data['orderId']
        amount: float = data['amount']
        
        print(f'Processing order {order_id}: ${amount}')
    
    await queen.queue('orders').consume(handler)

asyncio.run(main())

Production Example

python
import asyncio
import logging
from queen import Queen

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def main():
    # Production configuration
    queen = Queen({
        'urls': [
            'http://queen1.prod.internal:6632',
            'http://queen2.prod.internal:6632',
            'http://queen3.prod.internal:6632'
        ],
        'load_balancing_strategy': 'affinity',
        'affinity_hash_ring': 128,
        'enable_failover': True,
        'timeout_millis': 30000,
        'retry_attempts': 3
    })
    
    try:
        # Setup queues
        await queen.queue('orders').config({
            'leaseTime': 300,
            'retryLimit': 3,
            'priority': 10,
            'dlqAfterMaxRetries': True,
            'encryptionEnabled': True
        }).create()
        
        # Process orders
        async def handler(message):
            order_id = message['data']['orderId']
            logger.info(f'Processing order {order_id}')
            
            try:
                # Trace start
                await message['trace']({
                    'traceName': f"order-{order_id}",
                    'eventType': 'info',
                    'data': {'text': 'Started processing'}
                })
                
                # Process
                result = await process_order(message['data'])
                
                # Trace completion
                await message['trace']({
                    'traceName': f"order-{order_id}",
                    'eventType': 'processing',
                    'data': {'text': 'Completed', 'result': result}
                })
                
                logger.info(f'Completed order {order_id}')
                
            except Exception as e:
                logger.error(f'Failed order {order_id}: {e}')
                await message['trace']({
                    'traceName': f"order-{order_id}",
                    'eventType': 'error',
                    'data': {'text': 'Failed', 'error': str(e)}
                })
                raise
        
        # Run consumer with monitoring
        await (queen.queue('orders')
            .group('order-processors')
            .concurrency(10)
            .batch(20)
            .renew_lease(True, 60000)
            .consume(handler))
            
    finally:
        await queen.close()

async def process_order(data):
    # Your business logic here
    await asyncio.sleep(1)
    return {'status': 'completed'}

if __name__ == '__main__':
    asyncio.run(main())

More Examples

For more examples and advanced patterns:

Built with ❤️ by Smartness