Multi-Agent Communication Protocols
Why Communication Matters
When agents need to share information, the communication protocol determines whether the system works or fails. Poor protocols lead to context loss, cascading failures, and agents working at cross-purposes. Good protocols enable clean handoffs, error propagation, and distributed coordination.
The Message Protocol
from dataclasses import dataclass


@dataclass
class AgentMessage:
    sender: str          # "code_analyzer"
    receiver: str        # "test_generator" or "broadcast"
    message_type: str    # "analysis_complete" | "tests_ready" | "error"
    payload: dict        # Structured data
    priority: int        # 0=low, 1=normal, 2=high, 3=critical
    timestamp: float
    correlation_id: str  # Traces related messages across agents
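Messages usually have to cross a process or network boundary, so it can help to see how this structure might be serialized. The following is a minimal sketch, assuming JSON as the wire format and a JSON-serializable payload; `to_json` and `from_json` are hypothetical helper names, not part of the protocol above.

```python
import json
import time
from dataclasses import asdict, dataclass


@dataclass
class AgentMessage:
    sender: str
    receiver: str
    message_type: str
    payload: dict
    priority: int
    timestamp: float
    correlation_id: str


def to_json(message: AgentMessage) -> str:
    """Serialize a message; the payload must itself be JSON-serializable."""
    return json.dumps(asdict(message))


def from_json(raw: str) -> AgentMessage:
    """Rebuild a message, rejecting priorities outside the documented 0-3 range."""
    message = AgentMessage(**json.loads(raw))
    if message.priority not in range(4):
        raise ValueError(f"priority must be 0-3, got {message.priority}")
    return message


original = AgentMessage(
    sender="code_analyzer",
    receiver="test_generator",
    message_type="analysis_complete",
    payload={"module": "auth/login.py"},
    priority=1,
    timestamp=time.time(),
    correlation_id="sprint-42-auth-refactor",
)
restored = from_json(to_json(original))
```

Validating on the receiving side, as `from_json` does for `priority`, keeps a malformed message from travelling further down the pipeline.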
Message Types
| Type | Sender | Receiver | Purpose |
|---|---|---|---|
| analysis_complete | Code Analyzer | Test Generator | "I have analyzed the module, here are the testable functions" |
| tests_ready | Test Generator | Test Runner | "I have generated tests, they are ready to execute" |
| results_available | Test Runner | Test Fixer | "Tests ran, here are the failures" |
| fix_applied | Test Fixer | Test Runner | "I fixed the failing tests, please re-run" |
| error | Any agent | Orchestrator | "I failed, here is what happened" |
| budget_warning | Any agent | Orchestrator | "I am at 80% of my token budget" |
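One way to keep agents and this table in sync is to encode it as a lookup that rejects unknown types. The sketch below assumes the agent identifiers `test_fixer` and `orchestrator` (the source only shows `code_analyzer`, `test_generator`, and `test_runner` as message senders and receivers), and `MessageType`, `DEFAULT_RECEIVER`, and `route` are illustrative names.

```python
from enum import Enum


class MessageType(str, Enum):
    ANALYSIS_COMPLETE = "analysis_complete"
    TESTS_READY = "tests_ready"
    RESULTS_AVAILABLE = "results_available"
    FIX_APPLIED = "fix_applied"
    ERROR = "error"
    BUDGET_WARNING = "budget_warning"


# Default receiver per message type, copied from the table.
# The agent IDs on the right-hand side are assumed spellings.
DEFAULT_RECEIVER = {
    MessageType.ANALYSIS_COMPLETE: "test_generator",
    MessageType.TESTS_READY: "test_runner",
    MessageType.RESULTS_AVAILABLE: "test_fixer",
    MessageType.FIX_APPLIED: "test_runner",
    MessageType.ERROR: "orchestrator",
    MessageType.BUDGET_WARNING: "orchestrator",
}


def route(message_type: str) -> str:
    """Return the default receiver; MessageType() raises ValueError for unknown types."""
    return DEFAULT_RECEIVER[MessageType(message_type)]
```

With this in place, a typo such as `"test_ready"` fails at the sender instead of producing a message that no agent handles.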
Example Message Flow
import time

messages = [
    AgentMessage(
        sender="code_analyzer",
        receiver="test_generator",
        message_type="analysis_complete",
        payload={
            "module": "auth/login.py",
            "functions_to_test": ["authenticate", "validate_token"],
            "complexity_hints": {"authenticate": "high", "validate_token": "medium"},
            "existing_test_count": 3,
            "suggested_test_count": 8,
        },
        priority=1,
        timestamp=time.time(),
        correlation_id="sprint-42-auth-refactor",
    ),
    AgentMessage(
        sender="test_generator",
        receiver="test_runner",
        message_type="tests_ready",
        payload={
            "test_file": "tests/test_auth_login.py",
            "test_count": 8,
            "dependencies": ["pytest", "pytest-asyncio", "httpx"],
        },
        priority=1,
        timestamp=time.time(),
        correlation_id="sprint-42-auth-refactor",
    ),
]
Correlation IDs for Tracing
The correlation_id is essential for debugging multi-agent systems. It ties together all messages related to a single task:
# All messages for the auth-refactor task share the same correlation ID.
# This allows you to:
# 1. Filter logs by correlation_id to see the full pipeline
# 2. Measure total pipeline time (first message to last)
# 3. Identify where bottlenecks occur
# 4. Replay a specific task for debugging

# Assumed for this example: an in-memory list holding every message sent so far.
message_store: list[AgentMessage] = []


def get_pipeline_trace(correlation_id: str) -> list[AgentMessage]:
    """Get all messages for a specific task, ordered by timestamp."""
    return sorted(
        [m for m in message_store if m.correlation_id == correlation_id],
        key=lambda m: m.timestamp,
    )
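Points 2 and 3 in the list above can be computed directly from an ordered trace. The following is a rough sketch with hypothetical helpers `pipeline_duration` and `slowest_hop`; it treats the gap before each message as time spent by that message's sender, which only holds if all agents stamp messages from comparable clocks.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AgentMessage:
    sender: str
    receiver: str
    message_type: str
    payload: dict
    priority: int
    timestamp: float
    correlation_id: str


def pipeline_duration(trace: list[AgentMessage]) -> float:
    """Seconds between the first and last message of a timestamp-ordered trace."""
    if not trace:
        return 0.0
    return trace[-1].timestamp - trace[0].timestamp


def slowest_hop(trace: list[AgentMessage]) -> Optional[tuple[str, float]]:
    """Return (sender, gap) for the largest gap between consecutive messages."""
    gaps = [
        (later.sender, later.timestamp - earlier.timestamp)
        for earlier, later in zip(trace, trace[1:])
    ]
    return max(gaps, key=lambda gap: gap[1], default=None)


def _msg(sender: str, receiver: str, kind: str, ts: float) -> AgentMessage:
    # Test fixture with fixed timestamps so the result is deterministic.
    return AgentMessage(sender, receiver, kind, {}, 1, ts, "sprint-42-auth-refactor")


trace = [
    _msg("code_analyzer", "test_generator", "analysis_complete", 100.0),
    _msg("test_generator", "test_runner", "tests_ready", 102.5),
    _msg("test_runner", "test_fixer", "results_available", 110.0),
]
total_seconds = pipeline_duration(trace)
bottleneck = slowest_hop(trace)
```

In this fixture the test runner accounts for 7.5 of the 10 seconds, so it is the first place to look when the pipeline is slow.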
Error Propagation
When one agent fails, the system must handle it gracefully. There are three strategies:
Strategy 1: Skip and Continue
async def run_pipeline(self, modules: list[str]):
    for module in modules:
        try:
            analysis = await self.code_analyzer.analyze(module)
            tests = await self.test_generator.generate(analysis)
            results = await self.test_runner.run(tests)
            if results.has_failures:
                fixed = await self.test_fixer.fix(tests, results)
                results = await self.test_runner.run(fixed)
        except AgentTimeoutError as e:
            self.log.warning(f"Agent timeout on {module}: {e}")
            self.dead_letter_queue.append(module)  # Retry later
            continue
        except Exception as e:
            self.log.error(f"Unexpected failure on {module}: {e}")
            self.alert_human(module, e)  # Escalate
            continue
Use when: Modules are independent. Failure on one should not block others.
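The "retry later" step for timed-out modules is left open above. One possible shape is a bounded retry pass over the dead-letter queue, sketched below. `retry_dead_letters`, `process`, and `max_attempts` are hypothetical names, and `AgentTimeoutError` is redefined here only so the sketch is self-contained.

```python
import asyncio


class AgentTimeoutError(Exception):
    """Stand-in for the timeout error raised by an agent."""


async def retry_dead_letters(dead_letters, process, max_attempts=2):
    """Re-run process() on each queued module; return the modules that still fail."""
    still_failing = []
    for module in dead_letters:
        for _attempt in range(max_attempts):
            try:
                await process(module)
                break  # Success: stop retrying this module
            except AgentTimeoutError:
                await asyncio.sleep(0)  # Placeholder for a real backoff delay
        else:
            # The inner loop never hit `break`, so every attempt timed out.
            still_failing.append(module)
    return still_failing


calls = {}


async def flaky_process(module):
    # Demo agent: "a.py" succeeds on its second call, "b.py" always times out.
    calls[module] = calls.get(module, 0) + 1
    if module == "b.py" or calls[module] == 1:
        raise AgentTimeoutError(module)


remaining = asyncio.run(retry_dead_letters(["a.py", "b.py"], flaky_process))
```

Capping the attempts matters: a module that always times out should end up with a human, not in an endless retry loop.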
Strategy 2: Fail Fast
async def run_pipeline(self, modules: list[str]):
    for module in modules:
        try:
            analysis = await self.code_analyzer.analyze(module)
            tests = await self.test_generator.generate(analysis)
            results = await self.test_runner.run(tests)
        except AgentBudgetExceeded as e:
            self.log.error(f"Budget exceeded on {module}: {e}")
            break  # Stop processing ALL modules
Use when: A budget or rate limit error means continuing will fail anyway.
Strategy 3: Fallback Agents
async def analyze_with_fallback(self, module: str):
    """Try the primary analyzer, fall back to a simpler one."""
    try:
        return await self.primary_analyzer.analyze(module)
    except AgentTimeoutError:
        self.log.warning(f"Primary analyzer timed out on {module}, using fallback")
        return await self.fallback_analyzer.analyze(module)
Use when: You have a simpler, faster agent that produces lower-quality but acceptable results.
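The source does not show where the timeout itself comes from. One common option is to enforce it at the call site with `asyncio.wait_for`, as in the sketch below. `with_fallback`, `slow_analyzer`, and `quick_analyzer` are hypothetical names, and the timeout surfaces as `asyncio.TimeoutError` instead of a custom `AgentTimeoutError`.

```python
import asyncio


async def with_fallback(primary, fallback, timeout_s: float):
    """Await primary(); if it exceeds timeout_s, await fallback() instead."""
    try:
        return await asyncio.wait_for(primary(), timeout=timeout_s)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the primary call at this point.
        return await fallback()


async def slow_analyzer() -> dict:
    await asyncio.sleep(1.0)  # Simulates an agent that is too slow
    return {"quality": "detailed"}


async def quick_analyzer() -> dict:
    return {"quality": "basic"}


analysis = asyncio.run(with_fallback(slow_analyzer, quick_analyzer, timeout_s=0.05))
```

Tagging the result with its quality level, as the demo agents do, lets downstream agents tell a fallback result from a primary one.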
Communication Patterns
Pattern 1: Pipeline (Sequential)
Analyzer -> Generator -> Runner -> Fixer -> Runner
Each agent passes output to the next. Simple and predictable.
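As a rough illustration, a pipeline can be reduced to a list of stages applied in order. The stage functions `analyze` and `generate` below are toy stand-ins for real agents, and `run_stages` is a hypothetical helper.

```python
from typing import Callable

# Each stage takes the previous stage's output and returns its own.
Stage = Callable[[dict], dict]


def analyze(task: dict) -> dict:
    # Toy analyzer: attaches a fixed list of functions to test.
    return {**task, "functions_to_test": ["authenticate", "validate_token"]}


def generate(analysis: dict) -> dict:
    # Toy generator: plans one test per function found by the analyzer.
    return {**analysis, "test_count": len(analysis["functions_to_test"])}


def run_stages(stages: list[Stage], task: dict) -> dict:
    """Feed the task through each stage in order and return the final output."""
    for stage in stages:
        task = stage(task)
    return task


result = run_stages([analyze, generate], {"module": "auth/login.py"})
```

The predictability comes from the fixed order; the cost is that a slow stage stalls every stage after it.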
Pattern 2: Publish-Subscribe
from collections import defaultdict
from typing import Callable


class MessageBus:
    def __init__(self):
        # Maps each message type to the handlers subscribed to it
        self.subscribers = defaultdict(list)

    def subscribe(self, message_type: str, handler: Callable):
        self.subscribers[message_type].append(handler)

    def publish(self, message: AgentMessage):
        for handler in self.subscribers[message.message_type]:
            handler(message)


# Usage
bus = MessageBus()
bus.subscribe("analysis_complete", test_generator.on_analysis_complete)
bus.subscribe("analysis_complete", coverage_tracker.on_analysis_complete)
bus.subscribe("tests_ready", test_runner.on_tests_ready)
bus.subscribe("error", alerting_service.on_error)
Multiple agents can react to the same event. The Coverage Tracker and Test Generator both receive the analysis result.
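One caveat with a synchronous bus like this is that an exception in one handler stops delivery to the handlers after it. The sketch below shows one possible mitigation, isolating each handler call. `IsolatingBus` is a hypothetical variant, not the bus above, and it takes a plain type string and payload to stay self-contained.

```python
from collections import defaultdict
from typing import Callable


class IsolatingBus:
    """Pub-sub bus where one failing handler does not block the others."""

    def __init__(self):
        self.subscribers = defaultdict(list)
        self.failures = []  # (message_type, exception) pairs for later inspection

    def subscribe(self, message_type: str, handler: Callable[[dict], None]):
        self.subscribers[message_type].append(handler)

    def publish(self, message_type: str, payload: dict) -> int:
        """Deliver to every subscriber; return how many handlers succeeded."""
        delivered = 0
        for handler in self.subscribers.get(message_type, []):
            try:
                handler(payload)
                delivered += 1
            except Exception as exc:
                self.failures.append((message_type, exc))
        return delivered


received = []


def broken_handler(payload: dict) -> None:
    raise RuntimeError("coverage tracker is down")


bus = IsolatingBus()
bus.subscribe("analysis_complete", broken_handler)
bus.subscribe("analysis_complete", received.append)
delivered = bus.publish("analysis_complete", {"module": "auth/login.py"})
```

Recording failures instead of swallowing them silently keeps the problem visible to whatever monitors the bus.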
Pattern 3: Request-Response
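import time
from uuid import uuid4


class AgentProxy:
    """Request-response: send a request to another agent and await its reply."""

    async def request(self, target_agent: str, action: str, data: dict) -> dict:
        request_id = str(uuid4())
        self.send(AgentMessage(
            sender=self.agent_id,
            receiver=target_agent,
            message_type=f"request:{action}",
            # request_id goes last so a key in `data` cannot overwrite it
            payload={**data, "request_id": request_id},
            priority=2,
            timestamp=time.time(),
            # Required field; reusing the request ID here is an assumption
            correlation_id=request_id,
        ))
        # Wait for response with timeout
        response = await self.wait_for_response(request_id, timeout=30)
        return response.payload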
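The `wait_for_response` call is not defined in the source. One way it could work is to keep an `asyncio.Future` per outstanding request and resolve it when the reply arrives. The sketch below assumes that design; `ResponseRouter`, `new_request_id`, and `deliver` are hypothetical names, and it returns the reply payload directly.

```python
import asyncio
from uuid import uuid4


class ResponseRouter:
    """Match replies to pending requests by request_id."""

    def __init__(self):
        self._pending: dict[str, asyncio.Future] = {}

    def new_request_id(self) -> str:
        # Must be called from inside a running event loop.
        request_id = str(uuid4())
        self._pending[request_id] = asyncio.get_running_loop().create_future()
        return request_id

    def deliver(self, request_id: str, payload: dict) -> bool:
        """Resolve the matching request; return False for unknown or expired IDs."""
        future = self._pending.get(request_id)
        if future is None or future.done():
            return False
        future.set_result(payload)
        return True

    async def wait_for_response(self, request_id: str, timeout: float) -> dict:
        try:
            return await asyncio.wait_for(self._pending[request_id], timeout)
        finally:
            self._pending.pop(request_id, None)  # Clean up on success or timeout


async def demo():
    router = ResponseRouter()
    request_id = router.new_request_id()
    # Simulate the target agent replying shortly after the request is sent.
    asyncio.get_running_loop().call_later(
        0.01, router.deliver, request_id, {"status": "ok"}
    )
    reply = await router.wait_for_response(request_id, timeout=1.0)

    timed_out = False
    late_id = router.new_request_id()
    try:
        await router.wait_for_response(late_id, timeout=0.01)
    except asyncio.TimeoutError:
        timed_out = True
    # A reply that arrives after the timeout is rejected.
    return reply, timed_out, router.deliver(late_id, {})


reply, timed_out, late_delivery = asyncio.run(demo())
```

Removing the entry in `finally` means a late reply is dropped and does not resolve a request nobody is waiting on any more.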
Monitoring Multi-Agent Communication
from collections import defaultdict


class CommunicationMonitor:
    """Track message flow health across agents."""

    def __init__(self):
        self.message_count = defaultdict(int)  # Keyed by "sender->receiver"
        self.error_count = defaultdict(int)    # Keyed by sender
        self.latencies = defaultdict(list)     # Declared but not populated in this example

    def on_message(self, message: AgentMessage):
        key = f"{message.sender}->{message.receiver}"
        self.message_count[key] += 1
        if message.message_type == "error":
            self.error_count[message.sender] += 1

    def report(self) -> dict:
        total = sum(self.message_count.values())
        return {
            "total_messages": total,
            "messages_by_channel": dict(self.message_count),
            "errors_by_agent": dict(self.error_count),
            # max(..., 1) avoids division by zero before any message is seen
            "error_rate": sum(self.error_count.values()) / max(total, 1),
        }
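The monitor above declares `latencies` but never fills it. One possible way to record latency is to subtract each message's `timestamp` from the time it is observed, as sketched below. `LatencyTracker` and its injectable `clock` are hypothetical, and the numbers are only meaningful if sender and monitor clocks are synchronized.

```python
import time
from collections import defaultdict
from statistics import mean
from typing import Callable, Optional


class LatencyTracker:
    """Record per-channel delivery latency from message timestamps."""

    def __init__(self, clock: Callable[[], float] = time.time):
        self.clock = clock  # Injectable so tests can use a fixed time
        self.latencies = defaultdict(list)

    def on_message(self, sender: str, receiver: str, sent_at: float) -> None:
        # Latency = time the message was observed minus time it was stamped.
        self.latencies[f"{sender}->{receiver}"].append(self.clock() - sent_at)

    def slowest_channel(self) -> Optional[str]:
        """Return the channel with the highest mean latency, or None if empty."""
        if not self.latencies:
            return None
        return max(self.latencies, key=lambda channel: mean(self.latencies[channel]))


tracker = LatencyTracker(clock=lambda: 100.0)  # Fixed clock for a repeatable demo
tracker.on_message("code_analyzer", "test_generator", sent_at=99.5)
tracker.on_message("test_runner", "test_fixer", sent_at=98.0)
slowest = tracker.slowest_channel()
```

A channel whose mean latency keeps growing is an early sign that its receiver cannot keep up.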
Key Takeaway
Multi-agent communication protocols determine system reliability. Use structured messages with correlation IDs for tracing, implement error propagation strategies (skip, fail-fast, fallback) based on the dependency model, and monitor message flow health to detect bottlenecks and failures early.