Introduction
This is a follow-up to my earlier blog post, https://moonytunes.com/2024/12/07/extending-openai-swarm-with-rabbitmq-building-a-scalable-agent-communication-system/, about using RabbitMQ with the agent Swarm from OpenAI.
The code is in my GitHub repo: https://github.com/sq5rix/swarm
In this post, I’ll walk through how I built a reliable message queue system with RabbitMQ and Python, working together with Claude Sonnet in Neovim through the avante plugin. We’ll explore how we solved common issues such as connection handling, message persistence, and consumer reliability.
The Problem
We needed to create a system where multiple agents could communicate asynchronously through message queues. The main requirements were:
- Reliable message delivery
- Multiple concurrent consumers
- Message persistence
- Error recovery
- Queue monitoring
The Solution
We created a robust system using RabbitMQ as our message broker, with Python handling the client-side logic. Here’s how it works:
1. System Architecture
SwarmRabbitMQ (Main Client)
├── Agent Management
│   ├── Register Agents
│   └── Queue Setup
├── Message Handling
│   ├── Publishing
│   └── Consuming
└── Connection Management
    ├── Auto-reconnection
    └── Channel recovery
2. Key Components
Agent Registration
# Create and register agents
agent_a = Agent(name="Agent A", role="Sender")
agent_b = Agent(name="Agent B", role="Receiver")
client.register_agent(agent_a)
client.register_agent(agent_b)
Each agent gets its own dedicated queue, named in the format agent_{agent_name}_queue (for example, agent_agent_a_queue for "Agent A").
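To make the naming scheme concrete, here is a minimal sketch of how a queue name and routing key could be derived from an agent's display name. The helper names (agent_slug, queue_name_for, routing_key_for) are hypothetical, and the normalisation rule is an assumption inferred from the sample queue status shown later in this post; the code in the repo may do this differently.

```python
import re


def agent_slug(agent_name: str) -> str:
    """Lowercase the name and collapse non-alphanumeric runs into underscores.

    Assumed rule: "Agent A" becomes "agent_a".
    """
    return re.sub(r"[^a-z0-9]+", "_", agent_name.lower()).strip("_")


def queue_name_for(agent_name: str) -> str:
    """Build the dedicated queue name: agent_{agent_name}_queue."""
    return f"agent_{agent_slug(agent_name)}_queue"


def routing_key_for(agent_name: str) -> str:
    """Build the routing key in the agent.{agent_name} form."""
    return f"agent.{agent_slug(agent_name)}"


print(queue_name_for("Agent A"))   # agent_agent_a_queue
print(routing_key_for("Agent A"))  # agent.agent_a
```

Keeping the naming in one place means the publisher and the consumer cannot drift apart on what a queue is called.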
Message Consumer
import json

def message_handler(message):
    """Handle received messages"""
    print("\n=== Message Received ===")
    print(f"Content: {json.dumps(message, indent=2)}")
The consumer runs in a separate thread and processes messages as they arrive.
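The threading pattern itself can be tried without a broker. The sketch below is not the repo's code: it uses Python's in-process queue.Queue as a stand-in for a RabbitMQ queue, and the consume function and stop_event are hypothetical names chosen for illustration.

```python
import queue
import threading


def consume(inbox, handler, stop_event):
    """Pass each message from inbox to handler.

    Keeps running until stop_event is set and the inbox has been drained.
    """
    while not stop_event.is_set() or not inbox.empty():
        try:
            message = inbox.get(timeout=0.1)
        except queue.Empty:
            continue  # nothing arrived yet, check the stop flag again
        try:
            handler(message)
        finally:
            inbox.task_done()  # mark the message as processed


inbox = queue.Queue()          # stand-in for the agent's RabbitMQ queue
received = []                  # the handler here simply collects messages
stop_event = threading.Event()

worker = threading.Thread(
    target=consume, args=(inbox, received.append, stop_event), daemon=True
)
worker.start()

inbox.put({"role": "user", "content": "I want to talk to agent B."})
inbox.join()        # block until the consumer has handled the message
stop_event.set()    # ask the consumer thread to exit
worker.join(timeout=2)
print(received)
```

The main thread stays free to publish while the worker blocks on the queue, which is the same division of labour the real consumer relies on.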
Reliable Consumer Threading
def start_consumer_for_agent(client, agent):
    max_retries = 3
    retry_count = 0
    while retry_count < max_retries:
        try:
            client.start_consuming(agent, callback=message_handler)
            break
        except Exception as e:
            retry_count += 1
            # Exponential backoff retry logic
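The backoff step is only hinted at in the snippet above, so here is one way it could look. This is a sketch under assumptions, not the repo's implementation: run_with_backoff, base_delay, and the injectable sleep parameter are hypothetical, and the delays (1s, 2s, 4s, ...) are illustrative values.

```python
import time


def run_with_backoff(operation, max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Call operation() and retry on failure with exponentially growing delays.

    Waits base_delay * 2**attempt between attempts and re-raises the last
    exception once max_retries attempts have failed.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return operation()
        except Exception as exc:
            if attempt == max_retries - 1:
                raise  # out of attempts, let the caller see the error
            delay = base_delay * (2 ** attempt)
            print(f"[WARN] attempt {attempt + 1} failed: {exc}; retrying in {delay}s")
            sleep(delay)
```

With a helper like this, the consumer start-up could be wrapped as run_with_backoff(lambda: client.start_consuming(agent, callback=message_handler)). Passing sleep as a parameter keeps the helper easy to test without real waiting.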
3. Message Flow
- Message Publication
  - Messages are published to specific agent queues
  - Each message includes:
    - Content payload
    - Context variables
    - Routing information
- Queue Processing
  Publisher → Exchange → Queue → Consumer
                           ↓
                    Message Storage
- Message Consumption
  - Dedicated consumers per agent
  - Automatic acknowledgment
  - Error handling and retries
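The three parts of a message listed above can be pictured as a small envelope that is serialized to JSON before it is published. The Envelope class and its field names are hypothetical, chosen for illustration only; the actual message layout in the repo may differ.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class Envelope:
    """Illustrative message wrapper: payload, context, and routing info."""

    content: dict                                           # content payload
    context_variables: dict = field(default_factory=dict)   # context variables
    routing_key: str = ""                                   # routing information

    def to_bytes(self) -> bytes:
        """Serialize to UTF-8 JSON, ready to be used as a message body."""
        return json.dumps(asdict(self)).encode("utf-8")

    @classmethod
    def from_bytes(cls, body: bytes) -> "Envelope":
        """Rebuild the envelope from a received message body."""
        return cls(**json.loads(body.decode("utf-8")))


envelope = Envelope(
    content={"role": "user", "content": "I want to talk to agent B."},
    context_variables={"user": "demo"},
    routing_key="agent.agent_b",
)
body = envelope.to_bytes()
print(Envelope.from_bytes(body) == envelope)  # True
```

Keeping serialization on the envelope itself means the publisher and consumer share a single definition of the wire format.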
4. Reliability Features
Connection Management
- Automatic reconnection
- Channel recovery
- Exponential backoff for retries
Error Handling
try:
    response = client.run(agent_a, test_messages, context_variables=context)
except Exception as e:
    print(f"[ERROR] Failed to process response: {str(e)}")
Queue Monitoring
queue_status = client.debug_queues()
total_messages = sum(q.get("message_count", 0) for q in queue_status.values())
Sample Message Flow
- Initial Message
{
  "role": "user",
  "content": "I want to talk to agent B."
}
- Queue Status
{
  "Agent A": {
    "queue_name": "agent_agent_a_queue",
    "message_count": 1,
    "consumer_count": 1,
    "routing_key": "agent.agent_a",
    "status": "active"
  }
}
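A status dictionary shaped like the Queue Status sample above lends itself to a simple health check. The sketch below assumes that client.debug_queues() returns exactly this shape; summarize_queues and the keys of its report are hypothetical names.

```python
def summarize_queues(queue_status):
    """Summarize a queue status dict.

    Returns the total number of waiting messages and the names of agents
    whose queue currently has no consumer attached.
    """
    total = sum(q.get("message_count", 0) for q in queue_status.values())
    unattended = [
        name for name, q in queue_status.items()
        if q.get("consumer_count", 0) == 0
    ]
    return {"total_messages": total, "queues_without_consumers": unattended}


# Same data as the Queue Status sample in this post
sample_status = {
    "Agent A": {
        "queue_name": "agent_agent_a_queue",
        "message_count": 1,
        "consumer_count": 1,
        "routing_key": "agent.agent_a",
        "status": "active",
    }
}

print(summarize_queues(sample_status))
# {'total_messages': 1, 'queues_without_consumers': []}
```

A queue with a growing message count and no consumers is usually the first sign that a consumer thread has died, so that list is worth checking on every poll.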
Best Practices Implemented
- Connection Management
  - Proper connection cleanup
  - Automatic reconnection
  - Channel management
- Error Handling
  - Retry mechanisms
  - Exception catching
  - Logging
- Resource Management
  - Thread safety
  - Resource cleanup
  - Memory management
- Monitoring
  - Queue status tracking
  - Message counting
  - Consumer health checks
Conclusion
This implementation provides a robust foundation for building distributed systems with RabbitMQ. The code handles common edge cases and provides reliable message delivery with proper error recovery.
Key takeaways:
- Always implement proper connection management
- Use threading for consumers
- Implement retry mechanisms
- Monitor queue health
- Clean up resources properly
The full implementation is available in the GitHub repo linked in the introduction, ready to be used as a starting point for your own message queue system.
This is a work in progress, and I have many plans for future enhancements.