
Multi-Agent Communication Protocols

Why Communication Matters

When agents need to share information, the communication protocol determines whether the system works or fails. Poor protocols lead to context loss, cascading failures, and agents working at cross-purposes. Good protocols enable clean handoffs, error propagation, and distributed coordination.


The Message Protocol

from dataclasses import dataclass

@dataclass
class AgentMessage:
    sender: str           # "code_analyzer"
    receiver: str         # "test_generator" or "broadcast"
    message_type: str     # "analysis_complete" | "tests_ready" | "error"
    payload: dict         # Structured data
    priority: int         # 0=low, 1=normal, 2=high, 3=critical
    timestamp: float
    correlation_id: str   # Traces related messages across agents
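
The `priority` field implies that a receiver may want to handle urgent messages first. A minimal sketch of one way to do that, assuming an in-memory inbox: the `Inbox` class is hypothetical, and the default values on `AgentMessage` are added here only to keep the example short.

```python
import heapq
import itertools
import time
from dataclasses import dataclass, field


@dataclass
class AgentMessage:
    sender: str
    receiver: str
    message_type: str
    payload: dict
    priority: int = 1  # default is an assumption for brevity
    timestamp: float = field(default_factory=time.time)
    correlation_id: str = ""


class Inbox:
    """Hypothetical inbox: highest priority first, FIFO within one priority."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker preserves arrival order

    def put(self, message: AgentMessage) -> None:
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-message.priority, next(self._counter), message))

    def get(self) -> AgentMessage:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


inbox = Inbox()
inbox.put(AgentMessage("code_analyzer", "orchestrator", "analysis_complete", {}, priority=1))
inbox.put(AgentMessage("test_runner", "orchestrator", "error", {"reason": "timeout"}, priority=3))
inbox.put(AgentMessage("test_fixer", "orchestrator", "budget_warning", {}, priority=2))

order = [inbox.get().message_type for _ in range(len(inbox))]
print(order)  # ['error', 'budget_warning', 'analysis_complete']
```

The counter matters: without it, two messages with equal priority would be compared directly, and dataclass instances do not define an ordering by default.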

Message Types

| Type | Sender | Receiver | Purpose |
|---|---|---|---|
| analysis_complete | Code Analyzer | Test Generator | "I have analyzed the module, here are the testable functions" |
| tests_ready | Test Generator | Test Runner | "I have generated tests, they are ready to execute" |
| results_available | Test Runner | Test Fixer | "Tests ran, here are the failures" |
| fix_applied | Test Fixer | Test Runner | "I fixed the failing tests, please re-run" |
| error | Any agent | Orchestrator | "I failed, here is what happened" |
| budget_warning | Any agent | Orchestrator | "I am at 80% of my token budget" |
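
Because `message_type` is a plain string, a typo would pass unnoticed until a message fails to arrive. One possible guard is to encode the table as an enum plus a routing map. This is a sketch: `MessageType`, `DEFAULT_RECEIVER`, and `route` are illustrative names, and the agent IDs `test_fixer` and `orchestrator` are assumed snake_case forms of the names in the table.

```python
from enum import Enum


class MessageType(str, Enum):
    ANALYSIS_COMPLETE = "analysis_complete"
    TESTS_READY = "tests_ready"
    RESULTS_AVAILABLE = "results_available"
    FIX_APPLIED = "fix_applied"
    ERROR = "error"
    BUDGET_WARNING = "budget_warning"


# Default receiver per message type, taken from the table above.
DEFAULT_RECEIVER = {
    MessageType.ANALYSIS_COMPLETE: "test_generator",
    MessageType.TESTS_READY: "test_runner",
    MessageType.RESULTS_AVAILABLE: "test_fixer",
    MessageType.FIX_APPLIED: "test_runner",
    MessageType.ERROR: "orchestrator",
    MessageType.BUDGET_WARNING: "orchestrator",
}


def route(message_type: str) -> str:
    """Return the default receiver; unknown types raise ValueError."""
    return DEFAULT_RECEIVER[MessageType(message_type)]


print(route("results_available"))
try:
    route("tests_redy")  # deliberate typo
except ValueError as exc:
    print(f"rejected: {exc}")
```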

Example Message Flow

import time

messages = [
    AgentMessage(
        sender="code_analyzer",
        receiver="test_generator",
        message_type="analysis_complete",
        payload={
            "module": "auth/login.py",
            "functions_to_test": ["authenticate", "validate_token"],
            "complexity_hints": {"authenticate": "high", "validate_token": "medium"},
            "existing_test_count": 3,
            "suggested_test_count": 8
        },
        priority=1,
        timestamp=time.time(),
        correlation_id="sprint-42-auth-refactor"
    ),
    AgentMessage(
        sender="test_generator",
        receiver="test_runner",
        message_type="tests_ready",
        payload={
            "test_file": "tests/test_auth_login.py",
            "test_count": 8,
            "dependencies": ["pytest", "pytest-asyncio", "httpx"]
        },
        priority=1,
        timestamp=time.time(),
        correlation_id="sprint-42-auth-refactor"
    )
]

Correlation IDs for Tracing

The correlation_id is essential for debugging multi-agent systems. It ties together all messages related to a single task:

# All messages for the auth-refactor task share the same correlation ID
# This allows you to:
# 1. Filter logs by correlation_id to see the full pipeline
# 2. Measure total pipeline time (first message to last)
# 3. Identify where bottlenecks occur
# 4. Replay a specific task for debugging

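# Assumption: an in-memory store; a real system would query a log or database
message_store: list[AgentMessage] = []

def get_pipeline_trace(correlation_id: str) -> list[AgentMessage]: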
    """Get all messages for a specific task, ordered by timestamp."""
    return sorted(
        [m for m in message_store if m.correlation_id == correlation_id],
        key=lambda m: m.timestamp
    )
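
Once a trace is sorted by timestamp, points 2 and 3 in the list above reduce to simple arithmetic on adjacent messages. A minimal sketch, assuming each timestamp records when the message was sent; `summarize_trace` is a hypothetical helper and the timestamps are made up for illustration.

```python
from dataclasses import dataclass


@dataclass
class AgentMessage:
    sender: str
    receiver: str
    message_type: str
    payload: dict
    priority: int
    timestamp: float
    correlation_id: str


def summarize_trace(trace: list[AgentMessage]) -> dict:
    """Total pipeline time and the slowest hop for a timestamp-sorted trace."""
    if len(trace) < 2:
        return {"total_seconds": 0.0, "slowest_hop": None, "slowest_seconds": 0.0}
    # Gap between each message and the one that follows it.
    gaps = [
        (later.timestamp - earlier.timestamp,
         f"{earlier.message_type}->{later.message_type}")
        for earlier, later in zip(trace, trace[1:])
    ]
    slowest_seconds, slowest_hop = max(gaps, key=lambda gap: gap[0])
    return {
        "total_seconds": trace[-1].timestamp - trace[0].timestamp,
        "slowest_hop": slowest_hop,
        "slowest_seconds": slowest_seconds,
    }


cid = "sprint-42-auth-refactor"
trace = [
    AgentMessage("code_analyzer", "test_generator", "analysis_complete", {}, 1, 100.0, cid),
    AgentMessage("test_generator", "test_runner", "tests_ready", {}, 1, 102.5, cid),
    AgentMessage("test_runner", "test_fixer", "results_available", {}, 1, 110.0, cid),
]
summary = summarize_trace(trace)
print(summary)
```

Here the total is 10.0 seconds and the slowest hop is `tests_ready->results_available` at 7.5 seconds, which points at the test run as the bottleneck.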

Error Propagation

When one agent fails, the system must handle it gracefully. There are three strategies:

Strategy 1: Skip and Continue

async def run_pipeline(self, modules: list[str]):
    for module in modules:
        try:
            analysis = await self.code_analyzer.analyze(module)
            tests = await self.test_generator.generate(analysis)
            results = await self.test_runner.run(tests)

            if results.has_failures:
                fixed = await self.test_fixer.fix(tests, results)
                results = await self.test_runner.run(fixed)

        except AgentTimeoutError as e:
            self.log.warning(f"Agent timeout on {module}: {e}")
            self.dead_letter_queue.append(module)  # Retry later
            continue

        except Exception as e:
            self.log.error(f"Unexpected failure on {module}: {e}")
            self.alert_human(module, e)  # Escalate
            continue

Use when: Modules are independent. Failure on one should not block others.
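
The pipeline above parks timed-out modules in a dead letter queue to "retry later" but does not show the retry itself. One possible shape, as a sketch: `retry_dead_letters` and `flaky_process` are hypothetical, `AgentTimeoutError` is a stand-in for the exception used above, and the `sleep(0)` marks where a real backoff delay would go.

```python
import asyncio


class AgentTimeoutError(Exception):
    """Stand-in for the timeout error raised by agents above."""


async def retry_dead_letters(dead_letter_queue, process, max_attempts=3):
    """Retry each queued module up to max_attempts; return those still failing."""
    still_failing = []
    for module in dead_letter_queue:
        for _attempt in range(max_attempts):
            try:
                await process(module)
                break  # success: stop retrying this module
            except AgentTimeoutError:
                await asyncio.sleep(0)  # placeholder for a real backoff delay
        else:
            # The loop finished without a break: every attempt timed out.
            still_failing.append(module)
    return still_failing


calls = {}


async def flaky_process(module):
    """Stub: one module succeeds on its third call, the other never does."""
    calls[module] = calls.get(module, 0) + 1
    if module == "billing/invoice.py" or calls[module] < 3:
        raise AgentTimeoutError(module)


remaining = asyncio.run(
    retry_dead_letters(["auth/login.py", "billing/invoice.py"], flaky_process)
)
print(remaining)  # ['billing/invoice.py']
```

Bounding the attempts keeps a permanently broken module from being retried forever; whatever remains is a candidate for human escalation.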

Strategy 2: Fail Fast

async def run_pipeline(self, modules: list[str]):
    for module in modules:
        try:
            analysis = await self.code_analyzer.analyze(module)
            tests = await self.test_generator.generate(analysis)
            results = await self.test_runner.run(tests)
        except AgentBudgetExceeded as e:
            self.log.error(f"Budget exceeded on {module}: {e}")
            break  # Stop processing ALL modules

Use when: A budget or rate limit error means continuing will fail anyway.

Strategy 3: Fallback Agents

async def analyze_with_fallback(self, module: str):
    """Try the primary analyzer, fall back to a simpler one."""
    try:
        return await self.primary_analyzer.analyze(module)
    except AgentTimeoutError:
        self.log.warning(f"Primary analyzer timed out on {module}, using fallback")
        return await self.fallback_analyzer.analyze(module)

Use when: You have a simpler, faster agent that produces lower-quality but acceptable results.
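
The fallback example assumes the primary agent raises a timeout error by itself. If it does not, the caller can enforce the deadline. A sketch using `asyncio.wait_for`, with stub agents (`slow_primary`, `fast_fallback`) and a helper name (`with_fallback`) invented for illustration:

```python
import asyncio


async def with_fallback(primary, fallback, module: str, timeout: float):
    """Run primary under a deadline; on timeout, use the fallback instead."""
    try:
        # wait_for cancels the primary coroutine if the deadline passes.
        return await asyncio.wait_for(primary(module), timeout=timeout)
    except asyncio.TimeoutError:
        return await fallback(module)


async def slow_primary(module: str) -> dict:
    await asyncio.sleep(1.0)  # simulates a slow, thorough analysis
    return {"module": module, "quality": "high"}


async def fast_fallback(module: str) -> dict:
    return {"module": module, "quality": "basic"}


result = asyncio.run(
    with_fallback(slow_primary, fast_fallback, "auth/login.py", timeout=0.05)
)
print(result)  # {'module': 'auth/login.py', 'quality': 'basic'}
```

Tagging the result with its quality level, as the stubs do here, lets downstream agents know they are working from a degraded analysis.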


Communication Patterns

Pattern 1: Pipeline (Sequential)

Analyzer -> Generator -> Runner -> Fixer -> Runner

Each agent passes its output to the next. This is simple and predictable, but a slow or failed stage blocks every stage after it.
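
As a rough sketch, the sequential pattern is a loop that feeds each stage's output into the next. The stages here are stubs that only record their names (`make_stage` and `run_pipeline` are illustrative helpers); real agents would do their work at that point.

```python
from typing import Callable

Stage = Callable[[dict], dict]


def make_stage(name: str) -> Stage:
    """Build a stub stage that appends its name to the state's history."""
    def stage(state: dict) -> dict:
        return {**state, "history": state["history"] + [name]}
    return stage


def run_pipeline(stages: list[Stage], state: dict) -> dict:
    for stage in stages:
        state = stage(state)  # each stage's output is the next stage's input
    return state


runner = make_stage("runner")
pipeline = [make_stage("analyzer"), make_stage("generator"), runner,
            make_stage("fixer"), runner]
final = run_pipeline(pipeline, {"module": "auth/login.py", "history": []})
print(final["history"])  # ['analyzer', 'generator', 'runner', 'fixer', 'runner']
```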

Pattern 2: Publish-Subscribe

from collections import defaultdict
from typing import Callable

class MessageBus:
    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, message_type: str, handler: Callable):
        self.subscribers[message_type].append(handler)

    def publish(self, message: AgentMessage):
        for handler in self.subscribers[message.message_type]:
            handler(message)

# Usage
bus = MessageBus()
bus.subscribe("analysis_complete", test_generator.on_analysis_complete)
bus.subscribe("analysis_complete", coverage_tracker.on_analysis_complete)
bus.subscribe("tests_ready", test_runner.on_tests_ready)
bus.subscribe("error", alerting_service.on_error)

Multiple agents can react to the same event. The Coverage Tracker and Test Generator both receive the analysis result.
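
One caveat with the `publish` loop above: an exception raised by one handler propagates out of the loop, so subscribers registered after it never see the message. A sketch of a variant that isolates handler failures, assuming it is acceptable to record them and continue. `SafeMessageBus` is a hypothetical name, and it takes a bare message type and payload instead of an `AgentMessage` to keep the example short.

```python
from collections import defaultdict
from typing import Callable


class SafeMessageBus:
    """Hypothetical bus: a failing handler does not block the others."""

    def __init__(self):
        self.subscribers = defaultdict(list)
        self.failures = []  # (handler name, error) pairs for later inspection

    def subscribe(self, message_type: str, handler: Callable[[dict], None]) -> None:
        self.subscribers[message_type].append(handler)

    def publish(self, message_type: str, payload: dict) -> int:
        """Deliver to every subscriber; return how many handlers succeeded."""
        delivered = 0
        for handler in list(self.subscribers.get(message_type, [])):
            try:
                handler(payload)
                delivered += 1
            except Exception as exc:
                self.failures.append((handler.__name__, repr(exc)))
        return delivered


received = []


def broken_handler(payload: dict) -> None:
    raise RuntimeError("tracker down")


def generator_handler(payload: dict) -> None:
    received.append(("test_generator", payload["module"]))


def coverage_handler(payload: dict) -> None:
    received.append(("coverage_tracker", payload["module"]))


bus = SafeMessageBus()
bus.subscribe("analysis_complete", broken_handler)
bus.subscribe("analysis_complete", generator_handler)
bus.subscribe("analysis_complete", coverage_handler)

delivered = bus.publish("analysis_complete", {"module": "auth/login.py"})
print(delivered, bus.failures)
```

Both healthy handlers still receive the message even though the first subscriber fails.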

Pattern 3: Request-Response

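import time
from uuid import uuid4

class AgentProxy:
    """Request-response to another agent: send a request, then await the matching reply."""

    async def request(self, target_agent: str, action: str, data: dict) -> dict:
        request_id = str(uuid4())
        self.send(AgentMessage(
            sender=self.agent_id,
            receiver=target_agent,
            message_type=f"request:{action}",
            payload={"request_id": request_id, **data},
            priority=2,
            timestamp=time.time(),
            # Assumption: reuse the request ID; pass the task's correlation ID if one exists
            correlation_id=request_id,
        ))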

        # Wait for response with timeout
        response = await self.wait_for_response(request_id, timeout=30)
        return response.payload
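
The `wait_for_response` method is not shown above. One common way to build it is a table of pending futures keyed by `request_id`. The following is a sketch under that assumption, not the original implementation; `ResponseRouter`, `expect`, and `deliver` are hypothetical names.

```python
import asyncio
from uuid import uuid4


class ResponseRouter:
    """Hypothetical helper that matches replies to pending requests."""

    def __init__(self):
        self._pending = {}  # request_id -> asyncio.Future

    def expect(self, request_id: str) -> asyncio.Future:
        """Register a pending request before it is sent."""
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        return future

    def deliver(self, request_id: str, payload: dict) -> bool:
        """Called when a reply arrives; False if nobody is waiting for it."""
        future = self._pending.pop(request_id, None)
        if future is None or future.done():
            return False
        future.set_result(payload)
        return True

    async def wait_for_response(self, request_id, future, timeout: float) -> dict:
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self._pending.pop(request_id, None)  # never leak timed-out entries


async def demo():
    router = ResponseRouter()

    # Case 1: the target agent replies shortly after the request is sent.
    request_id = str(uuid4())
    future = router.expect(request_id)
    asyncio.get_running_loop().call_later(
        0.01, router.deliver, request_id, {"status": "ok"}
    )
    reply = await router.wait_for_response(request_id, future, timeout=1.0)

    # Case 2: no reply arrives in time, and a late reply is then dropped.
    late_id = str(uuid4())
    late_future = router.expect(late_id)
    try:
        await router.wait_for_response(late_id, late_future, timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return reply, timed_out, router.deliver(late_id, {"status": "late"})


reply, timed_out, late_delivered = asyncio.run(demo())
print(reply, timed_out, late_delivered)
```

Registering the future before sending avoids a race in which the reply arrives before the caller starts waiting.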

Monitoring Multi-Agent Communication

from collections import defaultdict

class CommunicationMonitor:
    """Track message flow health across agents."""

    def __init__(self):
        self.message_count = defaultdict(int)
        self.error_count = defaultdict(int)
        self.latencies = defaultdict(list)  # per-channel latency samples; not populated in this example

    def on_message(self, message: AgentMessage):
        key = f"{message.sender}->{message.receiver}"
        self.message_count[key] += 1

        if message.message_type == "error":
            self.error_count[message.sender] += 1

    def report(self) -> dict:
        return {
            "total_messages": sum(self.message_count.values()),
            "messages_by_channel": dict(self.message_count),
            "errors_by_agent": dict(self.error_count),
            "error_rate": sum(self.error_count.values()) / max(sum(self.message_count.values()), 1),
        }
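
The monitor declares `latencies` but never fills it. A sketch of how per-channel latency could be tracked, assuming the receiver records its own receive time alongside the message's send timestamp; `LatencyTracker` and its methods are hypothetical.

```python
from collections import defaultdict
from statistics import mean


class LatencyTracker:
    """Hypothetical companion to the monitor: delivery latency per channel."""

    def __init__(self):
        self.latencies = defaultdict(list)

    def record(self, sender: str, receiver: str,
               sent_at: float, received_at: float) -> None:
        self.latencies[f"{sender}->{receiver}"].append(received_at - sent_at)

    def report(self) -> dict:
        return {
            channel: {
                "count": len(samples),
                "mean_s": mean(samples),
                "max_s": max(samples),
            }
            for channel, samples in self.latencies.items()
        }


tracker = LatencyTracker()
tracker.record("code_analyzer", "test_generator", sent_at=10.0, received_at=10.5)
tracker.record("code_analyzer", "test_generator", sent_at=20.0, received_at=21.5)
latency_report = tracker.report()
print(latency_report)
```

A channel whose maximum latency keeps growing while its mean stays flat suggests occasional stalls rather than a uniformly slow agent.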

Key Takeaway

Multi-agent communication protocols determine system reliability. Use structured messages with correlation IDs for tracing, implement error propagation strategies (skip, fail-fast, fallback) based on the dependency model, and monitor message flow health to detect bottlenecks and failures early.