Skip to content

Building a Reliable Message Queue System with RabbitMQ and Python

Introduction

This is follow up of https://moonytunes.com/2024/12/07/extending-openai-swarm-with-rabbitmq-building-a-scalable-agent-communication-system/ blog post about the RabbitMQ application for agent swarm from OpenAI

The code is in my GitHub repo

https://github.com/sq5rix/swarm

In this post, I’ll walk through how we built together with Claude Sonnet in neovim with avante plugin, a reliable message queue system using RabbitMQ and Python. We’ll explore how we solved common issues like connection handling, message persistence, and consumer reliability.

The Problem

We needed to create a system where multiple agents could communicate asynchronously through message queues. The main requirements were:

  • Reliable message delivery
  • Multiple concurrent consumers
  • Message persistence
  • Error recovery
  • Queue monitoring

The Solution

We created a robust system using RabbitMQ as our message broker, with Python handling the client-side logic. Here’s how it works:

1. System Architecture

SwarmRabbitMQ (Main Client)
├── Agent Management
│   ├── Register Agents
│   └── Queue Setup
├── Message Handling
│   ├── Publishing
│   └── Consuming
└── Connection Management
    ├── Auto-reconnection
    └── Channel recovery

2. Key Components

Agent Registration

# Create and register agents
agent_a = Agent(name="Agent A", role="Sender")
agent_b = Agent(name="Agent B", role="Receiver")
client.register_agent(agent_a)
client.register_agent(agent_b)

Each agent gets its own dedicated queue with format agent_{agent_name}_queue.

Message Consumer

def message_handler(message):
    """Handle received messages"""
    print("\n=== Message Received ===")
    print(f"Content: {json.dumps(message, indent=2)}")

The consumer runs in a separate thread and processes messages as they arrive.

Reliable Consumer Threading

def start_consumer_for_agent(client, agent):
    max_retries = 3
    retry_count = 0
    while retry_count < max_retries:
        try:
            client.start_consuming(agent, callback=message_handler)
            break
        except Exception as e:
            retry_count += 1
            # Exponential backoff retry logic

3. Message Flow

  1. Message Publication
  • Messages are published to specific agent queues
  • Each message includes:
    • Content payload
    • Context variables
    • Routing information
  1. Queue Processing
   Publisher → Exchange → Queue → Consumer
                      ↓
               Message Storage
  1. Message Consumption
  • Dedicated consumers per agent
  • Automatic acknowledgment
  • Error handling and retries

4. Reliability Features

Connection Management

  • Automatic reconnection
  • Channel recovery
  • Exponential backoff for retries

Error Handling

try:
    response = client.run(agent_a, test_messages, context_variables=context)
except Exception as e:
    print(f"[ERROR] Failed to process response: {str(e)}")

Queue Monitoring

queue_status = client.debug_queues()
total_messages = sum(q.get("message_count", 0) for q in queue_status.values())

Sample Message Flow

  1. Initial Message
{
  "role": "user",
  "content": "I want to talk to agent B."
}
  1. Queue Status
{
  "Agent A": {
    "queue_name": "agent_agent_a_queue",
    "message_count": 1,
    "consumer_count": 1,
    "routing_key": "agent.agent_a",
    "status": "active"
  }
}

Best Practices Implemented

  1. Connection Management
  • Proper connection cleanup
  • Automatic reconnection
  • Channel management
  1. Error Handling
  • Retry mechanisms
  • Exception catching
  • Logging
  1. Resource Management
  • Thread safety
  • Resource cleanup
  • Memory management
  1. Monitoring
  • Queue status tracking
  • Message counting
  • Consumer health checks

Conclusion

This implementation provides a robust foundation for building distributed systems with RabbitMQ. The code handles common edge cases and provides reliable message delivery with proper error recovery.

Key takeaways:

  • Always implement proper connection management
  • Use threading for consumers
  • Implement retry mechanisms
  • Monitor queue health
  • Clean up resources properly

The full implementation is available in the code above, ready to be used as a starting point for your own message queue system.

This is work in progress. I have many plans for future enhancements.

Cool cat rooting for agents swarm with RabbitMQ

2 thoughts on “Building a Reliable Message Queue System with RabbitMQ and Python”

  1. Does your website have a contact page? I’m having a tough time locating it but, I’d like to shoot you an email. I’ve got some ideas for your blog you might be interested in hearing. Either way, great blog and I look forward to seeing it develop over time.

Leave a Reply

Your email address will not be published. Required fields are marked *