Introduction to my swarm fork
OpenAI’s Swarm is a powerful system for orchestrating multiple AI agents. While the base implementation works great for direct communication, we can enhance its capabilities by introducing message queuing via RabbitMQ. This modification enables asynchronous communication, better scalability, and improved reliability in agent-to-agent interactions.
I created my own fork at https://github.com/sq5rix/swarm, where I plan to integrate RabbitMQ into Swarm.
The next stage will be containerisation of Swarm agents and handoffs.
A companion repository for testing is at https://github.com/sq5rix/swarm_test.
Collaborators are welcome!
Why RabbitMQ?
- Asynchronous Communication: Agents can work independently without blocking each other
- Message Persistence: Durable queues and persistent messages survive broker restarts
- Scalability: Easy to scale horizontally as your agent swarm grows
- Reliability: Built-in message acknowledgment and retry mechanisms
- Monitoring: RabbitMQ’s management interface provides excellent visibility into system operations
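The acknowledgment and retry behaviour listed above can be sketched as a consumer callback. This is a minimal illustration, not code from the fork: `make_consumer_callback` and `handle` are hypothetical names and the JSON body is an assumption, while `basic_ack` and `basic_nack` are standard pika channel methods.

```python
import json
from typing import Any, Callable, Dict


def make_consumer_callback(handle: Callable[[Dict[str, Any]], None]):
    """Wrap a message handler so each delivery is explicitly acked or nacked.

    `handle` is a hypothetical function that processes one decoded message.
    """

    def on_message(channel, method, properties, body: bytes) -> None:
        try:
            handle(json.loads(body))
        except Exception:
            # Processing failed: reject the delivery. requeue=False avoids an
            # endless redelivery loop for a message that always fails; if the
            # queue has a dead letter exchange, the broker routes it there.
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            # Acknowledge only after the handler finished without error.
            channel.basic_ack(delivery_tag=method.delivery_tag)

    return on_message
```

With pika, such a callback would be registered through `channel.basic_consume(queue=..., on_message_callback=on_message, auto_ack=False)`. Manual acknowledgment is what lets the broker redeliver a message when an agent dies mid-task.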
Implementation Overview
The implementation adds a new layer of message handling while preserving Swarm’s core functionality. Here’s what’s included:
RabbitMQ Handler
import pika


class RabbitMQHandler:
    def __init__(self, host='localhost', port=5672,
                 username='guest', password='guest'):
        # Credentials and connection parameters are only stored here;
        # no connection is opened in the constructor.
        self.credentials = pika.PlainCredentials(username, password)
        self.parameters = pika.ConnectionParameters(
            host=host,
            port=port,
            credentials=self.credentials
        )
This class manages all RabbitMQ operations, including:
- Connection management
- Queue creation
- Message publishing
- Error handling
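The operations in this list are not shown in the excerpt above. A minimal sketch of how connection setup, queue creation, and publishing might look with pika follows. The helper names `encode_message`, `open_channel`, and `publish`, and the JSON envelope, are assumptions for illustration, not the fork's actual API.

```python
import json
from typing import Any, Dict


def encode_message(sender: str, recipient: str, content: str) -> bytes:
    """Serialize one agent-to-agent message as UTF-8 JSON (hypothetical envelope)."""
    envelope: Dict[str, Any] = {
        "sender": sender,
        "recipient": recipient,
        "content": content,
    }
    return json.dumps(envelope).encode("utf-8")


def open_channel(parameters):
    """Open a blocking connection and return (connection, channel)."""
    import pika  # imported lazily so encode_message works without pika installed

    connection = pika.BlockingConnection(parameters)
    return connection, connection.channel()


def publish(channel, queue: str, body: bytes) -> None:
    """Declare a durable queue and publish a persistent message to it."""
    import pika

    # durable=True keeps the queue definition across broker restarts.
    channel.queue_declare(queue=queue, durable=True)
    channel.basic_publish(
        exchange="",           # the default exchange routes by queue name
        routing_key=queue,
        body=body,
        properties=pika.BasicProperties(
            delivery_mode=2,   # 2 marks the message as persistent
            content_type="application/json",
        ),
    )
```

Both the durable queue and `delivery_mode=2` are needed for a message to survive a broker restart. `pika.BlockingConnection` raises `pika.exceptions.AMQPConnectionError` when the broker is unreachable, so a caller would typically wrap `open_channel` in its own error handling.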
Enhanced Swarm Class
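from typing import Any, Dict, Optional
from openai import OpenAI

class SwarmRabbitMQ:
    def __init__(self, client=None,
                 rabbitmq_config: Optional[Dict[str, Any]] = None):
        # Fall back to a default OpenAI client when none is supplied.
        if not client:
            client = OpenAI()
        self.client = client
        # A missing config falls back to RabbitMQHandler's defaults.
        self.rabbitmq = RabbitMQHandler(**(rabbitmq_config or {}))
        self.agent_queues = {}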
Key features:
- Automatic agent queue creation
- Message routing between agents
- Enhanced handoff mechanism
- Persistent message storage
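One way the automatic queue creation and routing features above could work is a small registry that maps each agent name to a queue name. This is a sketch under assumptions: `queue_name_for`, `AgentQueueRegistry`, and the `swarm.agent` prefix are hypothetical and do not come from the fork.

```python
import re
from typing import Dict


def queue_name_for(agent_name: str, prefix: str = "swarm.agent") -> str:
    """Derive a queue name from an agent's display name."""
    # Lowercase and collapse every run of non-alphanumerics into one underscore.
    slug = re.sub(r"[^a-z0-9]+", "_", agent_name.lower()).strip("_")
    return f"{prefix}.{slug}"


class AgentQueueRegistry:
    """Tracks which queue each registered agent listens on."""

    def __init__(self) -> None:
        self.agent_queues: Dict[str, str] = {}

    def register(self, agent_name: str) -> str:
        # Registering the same agent twice returns the same queue name.
        return self.agent_queues.setdefault(agent_name, queue_name_for(agent_name))

    def route(self, recipient: str) -> str:
        """Return the queue a handoff to `recipient` should be published to."""
        try:
            return self.agent_queues[recipient]
        except KeyError:
            raise LookupError(f"agent {recipient!r} is not registered") from None
```

In this design a handoff from one agent to another reduces to looking up the recipient's queue and publishing the conversation state there, so the sender never needs a direct reference to the receiving agent.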
Deployment
The system comes with Docker support for easy deployment:
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
  swarm_app:
    build: .
    environment:
      - RABBITMQ_HOST=rabbitmq
      - OPENAI_API_KEY=${OPENAI_API_KEY}
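Inside the container, the application has to turn `RABBITMQ_HOST` into the config dict that `SwarmRabbitMQ` expects. A minimal sketch of that step is below. Only `RABBITMQ_HOST` appears in the compose file; `RABBITMQ_PORT`, `RABBITMQ_USER`, `RABBITMQ_PASSWORD`, and the function name are assumptions added for illustration.

```python
import os
from typing import Any, Dict, Mapping


def rabbitmq_config_from_env(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Build RabbitMQHandler keyword arguments from environment variables."""
    return {
        "host": env.get("RABBITMQ_HOST", "localhost"),
        # The variables below are hypothetical; the compose file does not set them.
        "port": int(env.get("RABBITMQ_PORT", "5672")),
        "username": env.get("RABBITMQ_USER", "guest"),
        "password": env.get("RABBITMQ_PASSWORD", "guest"),
    }
```

The result can be passed as `SwarmRabbitMQ(rabbitmq_config=rabbitmq_config_from_env())`. Note that RabbitMQ by default only accepts the `guest` user on loopback connections, so a containerised setup would likely need a dedicated user rather than the defaults.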
Usage Example
from swarm_rabbitmq import SwarmRabbitMQ
from swarm import Agent

# Configure RabbitMQ
rabbitmq_config = {
    'host': 'localhost',
    'port': 5672,
    'username': 'guest',
    'password': 'guest'
}

# Initialize enhanced swarm
client = SwarmRabbitMQ(rabbitmq_config=rabbitmq_config)

# Create and register agents
agent_a = Agent(
    name="Agent A",
    instructions="You are a helpful agent."
)
client.register_agent(agent_a)

# Run as usual
response = client.run(
    agent=agent_a,
    messages=[{"role": "user", "content": "Hello!"}]
)
Benefits
- Resilience: Messages persist even if agents go offline
- Load Distribution: Better handling of high message volumes
- Monitoring: Built-in tools for system observation
- Flexibility: Easy to add new agents without system modifications
- Error Recovery: Automatic reconnection and message retry capabilities
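The reconnection behaviour mentioned in the last point could be implemented as a retry loop with exponential backoff. The sketch below is an illustration rather than the fork's code: `backoff_delays`, `connect_with_retry`, and their parameters are hypothetical, and `connect` stands for any callable that opens a connection.

```python
import time
from typing import Callable, Iterator, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delays(base: float = 0.5, factor: float = 2.0,
                   cap: float = 30.0) -> Iterator[float]:
    """Yield exponentially growing delays in seconds, capped at `cap`."""
    delay = base
    while True:
        yield min(delay, cap)
        delay *= factor


def connect_with_retry(
    connect: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    max_attempts: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `connect` until it succeeds or `max_attempts` is exhausted."""
    delays = backoff_delays()
    for attempt in range(1, max_attempts + 1):
        try:
            return connect()
        except retry_on:
            if attempt == max_attempts:
                raise  # give up and surface the last error to the caller
            sleep(next(delays))
    raise ValueError("max_attempts must be at least 1")
```

With pika, `connect` could be `lambda: pika.BlockingConnection(parameters)` and `retry_on` would be `(pika.exceptions.AMQPConnectionError,)`, since that exception does not derive from the built-in `ConnectionError`.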
Requirements
openai>=1.0.0
pika>=1.3.1
python-dotenv>=1.0.0
typing-extensions>=4.7.1
Future Enhancements
- Message priority handling
- Dead letter queues for failed messages
- Enhanced monitoring and logging
- Agent load balancing
- Message routing patterns
- Authentication and authorisation of agents
- Containers with RabbitMQ for network separation, scaling workers and security
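Two of the planned items, priority handling and dead letter queues, map onto optional queue arguments that RabbitMQ already supports. A rough sketch of how they might be assembled is below. `queue_arguments` is a hypothetical helper, while `x-dead-letter-exchange` and `x-max-priority` are the argument names RabbitMQ defines.

```python
from typing import Any, Dict, Optional


def queue_arguments(dead_letter_exchange: Optional[str] = None,
                    max_priority: Optional[int] = None) -> Dict[str, Any]:
    """Build the optional `arguments` dict for a queue declaration."""
    args: Dict[str, Any] = {}
    if dead_letter_exchange is not None:
        # Rejected or expired messages are republished to this exchange.
        args["x-dead-letter-exchange"] = dead_letter_exchange
    if max_priority is not None:
        # RabbitMQ accepts 1..255; small values are generally recommended.
        if not 1 <= max_priority <= 255:
            raise ValueError("max_priority must be between 1 and 255")
        args["x-max-priority"] = max_priority
    return args
```

The dict would be passed as `channel.queue_declare(queue=..., durable=True, arguments=queue_arguments(...))`, and individual messages would carry their priority through `pika.BasicProperties(priority=...)`. These arguments have to be chosen when the queue is first declared, because redeclaring an existing queue with different arguments fails.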
Conclusion
This enhancement to OpenAI’s Swarm system provides a robust foundation for building complex, distributed AI agent systems. The addition of RabbitMQ brings enterprise-grade message handling capabilities while maintaining the simplicity and flexibility of the original Swarm implementation.
Feel free to contribute or raise issues on our GitHub repository!
Note: This is an unofficial extension of OpenAI’s Swarm system. Please ensure you comply with OpenAI’s terms of service and usage guidelines when implementing this solution.
#OpenAI #Swarm #RabbitMQ #AIAgents #Python #Microservices