Documentation

Multi-Agent Tracing

Observe complex agent workflows and inter-agent communications with comprehensive tracing

Multi-agent systems represent some of the most complex AI applications, involving multiple agents that coordinate, communicate, and collaborate to achieve shared goals. Noveum.ai provides specialized tracing capabilities to help you understand and optimize these intricate workflows.

🎯 Why Multi-Agent Tracing Matters

Multi-agent systems introduce unique observability challenges:

  • Complex Dependencies: Agents depend on each other's outputs and decisions
  • Asynchronous Operations: Agents may operate concurrently or in parallel
  • Communication Patterns: Understanding how agents share information
  • Resource Coordination: Managing shared resources and preventing conflicts
  • Error Propagation: How failures in one agent affect the entire system

🏗️ Agent System Architecture

Agent Types and Roles

Noveum.ai can trace various agent patterns:

import noveum_trace
 
# Coordinator Agent - Orchestrates workflow
@noveum_trace.trace_agent(agent_type="coordinator")
def coordinator_agent(task: str) -> Dict[str, Any]:
    plan = create_execution_plan(task)
    return {
        "plan": plan,
        "next_agents": ["researcher", "analyzer"],
        "coordination_id": generate_coordination_id()
    }
 
# Worker Agent - Executes specific tasks
@noveum_trace.trace_agent(agent_type="worker", role="researcher")
def research_agent(query: str, context: Dict) -> Dict[str, Any]:
    results = perform_research(query)
    return {
        "findings": results,
        "confidence": calculate_confidence(results),
        "requires_validation": True
    }
 
# Validator Agent - Reviews and validates outputs
@noveum_trace.trace_agent(agent_type="validator")
def validation_agent(data: Dict, criteria: List[str]) -> Dict[str, Any]:
    validation_results = validate_against_criteria(data, criteria)
    return {
        "is_valid": validation_results["passed"],
        "feedback": validation_results["feedback"],
        "suggestions": validation_results["improvements"]
    }

TypeScript Multi-Agent Example

import { trace, addAttribute } from '@noveum/trace';
 
class AgentOrchestrator {
  async executeWorkflow(task: string): Promise<WorkflowResult> {
    return await trace('agent-workflow', async () => {
      addAttribute('workflow.task', task);
      addAttribute('workflow.agent_count', 3);
 
      // Coordinate multiple agents
      const researchTask = this.delegateToAgent('researcher', task);
      const analysisTask = this.delegateToAgent('analyzer', task);
 
      const results = await Promise.all([researchTask, analysisTask]);
 
      // Final coordination
      return await this.synthesizeResults(results);
    });
  }
 
  private async delegateToAgent(agentType: string, task: string): Promise<AgentResult> {
    return await trace(`agent-${agentType}`, async () => {
      addAttribute('agent.type', agentType);
      addAttribute('agent.task', task);
      addAttribute('agent.parent_workflow', 'main');
 
      // Agent-specific processing
      return await this.processWithAgent(agentType, task);
    });
  }
}

📊 Tracing Multi-Agent Workflows

Coordination Patterns

Sequential Agent Execution

@noveum_trace.trace_workflow(name="sequential_agents")
def sequential_agent_workflow(input_data: Dict) -> Dict:
    """Execute agents in sequence, each building on the previous output."""
 
    # Agent 1: Data Collection
    raw_data = trace_agent_execution(
        agent_name="data_collector",
        agent_function=data_collection_agent,
        input_data=input_data
    )
 
    # Agent 2: Data Processing (depends on Agent 1)
    processed_data = trace_agent_execution(
        agent_name="data_processor",
        agent_function=data_processing_agent,
        input_data=raw_data,
        dependencies=["data_collector"]
    )
 
    # Agent 3: Decision Making (depends on Agent 2)
    decision = trace_agent_execution(
        agent_name="decision_maker",
        agent_function=decision_making_agent,
        input_data=processed_data,
        dependencies=["data_processor"]
    )
 
    return decision
 
def trace_agent_execution(agent_name: str, agent_function, input_data: Dict, dependencies: List[str] = None):
    """Helper to trace individual agent execution with dependency tracking."""
    with noveum_trace.trace(f"agent.{agent_name}") as span:
        span.set_attribute("agent.name", agent_name)
        span.set_attribute("agent.type", "sequential")
        if dependencies:
            span.set_attribute("agent.dependencies", dependencies)
 
        result = agent_function(input_data)
 
        span.set_attribute("agent.output_size", len(str(result)))
        span.set_attribute("agent.success", True)
 
        return result

Parallel Agent Execution

@noveum_trace.trace_workflow(name="parallel_agents")
async def parallel_agent_workflow(task: str) -> Dict:
    """Execute multiple agents in parallel for faster processing."""
 
    with noveum_trace.trace("parallel_coordination") as coordination_span:
        coordination_span.set_attribute("coordination.pattern", "parallel")
        coordination_span.set_attribute("coordination.agent_count", 3)
 
        # Launch agents in parallel
        tasks = [
            trace_async_agent("research_agent", research_task, task),
            trace_async_agent("analysis_agent", analysis_task, task),
            trace_async_agent("synthesis_agent", synthesis_task, task)
        ]
 
        results = await asyncio.gather(*tasks)
 
        # Merge results from all agents
        merged_result = merge_agent_outputs(results)
 
        coordination_span.set_attribute("coordination.success", True)
        coordination_span.set_attribute("coordination.output_keys", list(merged_result.keys()))
 
        return merged_result
 
async def trace_async_agent(agent_name: str, agent_function, input_data):
    """Trace asynchronous agent execution."""
    async with noveum_trace.trace(f"agent.{agent_name}") as span:
        span.set_attribute("agent.name", agent_name)
        span.set_attribute("agent.execution_mode", "parallel")
        span.set_attribute("agent.start_time", time.time())
 
        result = await agent_function(input_data)
 
        span.set_attribute("agent.end_time", time.time())
        span.set_attribute("agent.result_type", type(result).__name__)
 
        return result

Hierarchical Agent Systems

@noveum_trace.trace_workflow(name="hierarchical_agents")
def hierarchical_agent_workflow(complex_task: str) -> Dict:
    """Execute agents in a hierarchical structure with supervision."""
 
    # Master Agent - High-level coordination
    with noveum_trace.trace("master_agent") as master_span:
        master_span.set_attribute("agent.level", "master")
        master_span.set_attribute("agent.role", "coordinator")
 
        # Break down complex task
        subtasks = decompose_task(complex_task)
        master_span.set_attribute("master.subtask_count", len(subtasks))
 
        # Supervisor Agents - Mid-level coordination
        supervisor_results = []
        for i, subtask_group in enumerate(subtasks):
            with noveum_trace.trace(f"supervisor_agent_{i}") as supervisor_span:
                supervisor_span.set_attribute("agent.level", "supervisor")
                supervisor_span.set_attribute("agent.subtask_group", i)
                supervisor_span.set_attribute("agent.parent", "master_agent")
 
                # Worker Agents - Task execution
                worker_results = []
                for j, subtask in enumerate(subtask_group):
                    with noveum_trace.trace(f"worker_agent_{i}_{j}") as worker_span:
                        worker_span.set_attribute("agent.level", "worker")
                        worker_span.set_attribute("agent.task_id", f"{i}_{j}")
                        worker_span.set_attribute("agent.supervisor", f"supervisor_agent_{i}")
 
                        result = execute_subtask(subtask)
                        worker_results.append(result)
 
                        worker_span.set_attribute("worker.success", True)
 
                # Supervisor consolidates worker results
                consolidated = consolidate_worker_results(worker_results)
                supervisor_results.append(consolidated)
 
                supervisor_span.set_attribute("supervisor.worker_count", len(worker_results))
 
        # Master agent synthesizes final result
        final_result = synthesize_supervisor_results(supervisor_results)
        master_span.set_attribute("master.final_result_size", len(str(final_result)))
 
        return final_result

🔗 Inter-Agent Communication Tracing

Message Passing

class AgentCommunicationTracer:
    """Trace communication between agents."""
 
    @staticmethod
    def trace_message_send(sender_agent: str, receiver_agent: str, message_type: str, payload: Dict):
        """Trace message sending between agents."""
        with noveum_trace.trace("agent_communication.send") as span:
            span.set_attribute("communication.sender", sender_agent)
            span.set_attribute("communication.receiver", receiver_agent)
            span.set_attribute("communication.message_type", message_type)
            span.set_attribute("communication.payload_size", len(str(payload)))
            span.set_attribute("communication.timestamp", time.time())
 
            # Simulate message sending
            message_id = send_message(receiver_agent, message_type, payload)
            span.set_attribute("communication.message_id", message_id)
 
            return message_id
 
    @staticmethod
    def trace_message_receive(receiver_agent: str, message_id: str):
        """Trace message reception by an agent."""
        with noveum_trace.trace("agent_communication.receive") as span:
            span.set_attribute("communication.receiver", receiver_agent)
            span.set_attribute("communication.message_id", message_id)
            span.set_attribute("communication.receive_timestamp", time.time())
 
            # Process received message
            message = receive_message(message_id)
            span.set_attribute("communication.message_type", message.get("type"))
            span.set_attribute("communication.processing_required", True)
 
            return message
 
# Usage in agent workflow
@noveum_trace.trace_agent(agent_type="communicator")
def communicating_agent(task: str) -> Dict:
    """Agent that communicates with other agents."""
 
    # Send request to research agent
    message_id = AgentCommunicationTracer.trace_message_send(
        sender_agent="coordinator",
        receiver_agent="research_agent",
        message_type="research_request",
        payload={"query": task, "priority": "high"}
    )
 
    # Wait for response
    response = AgentCommunicationTracer.trace_message_receive(
        receiver_agent="coordinator",
        message_id=f"response_{message_id}"
    )
 
    return {"research_data": response, "communication_success": True}

Shared State Management

class SharedStateTracer:
    """Trace access to shared state between agents."""
 
    @staticmethod
    def trace_state_read(agent_name: str, state_key: str):
        """Trace reading from shared state."""
        with noveum_trace.trace("shared_state.read") as span:
            span.set_attribute("state.operation", "read")
            span.set_attribute("state.agent", agent_name)
            span.set_attribute("state.key", state_key)
            span.set_attribute("state.timestamp", time.time())
 
            value = read_shared_state(state_key)
            span.set_attribute("state.value_type", type(value).__name__)
            span.set_attribute("state.success", True)
 
            return value
 
    @staticmethod
    def trace_state_write(agent_name: str, state_key: str, value: Any):
        """Trace writing to shared state."""
        with noveum_trace.trace("shared_state.write") as span:
            span.set_attribute("state.operation", "write")
            span.set_attribute("state.agent", agent_name)
            span.set_attribute("state.key", state_key)
            span.set_attribute("state.value_type", type(value).__name__)
            span.set_attribute("state.timestamp", time.time())
 
            success = write_shared_state(state_key, value)
            span.set_attribute("state.success", success)
 
            return success
 
# Usage in agents
@noveum_trace.trace_agent(agent_type="state_manager")
def state_managing_agent(operation: str) -> Dict:
    """Agent that manages shared state."""
 
    if operation == "update_progress":
        # Read current progress
        current_progress = SharedStateTracer.trace_state_read(
            agent_name="progress_manager",
            state_key="workflow_progress"
        )
 
        # Update progress
        new_progress = current_progress + 0.1
        SharedStateTracer.trace_state_write(
            agent_name="progress_manager",
            state_key="workflow_progress",
            value=new_progress
        )
 
        return {"progress_updated": True, "new_progress": new_progress}

📈 Multi-Agent Performance Analysis

Agent Performance Metrics

Track key metrics for each agent:

@noveum_trace.trace_agent(agent_type="performance_monitored")
def performance_tracked_agent(task: str) -> Dict:
    """Agent with comprehensive performance tracking."""
 
    start_time = time.time()
    memory_before = get_memory_usage()
 
    with noveum_trace.trace("agent_execution") as span:
        span.set_attribute("performance.start_time", start_time)
        span.set_attribute("performance.memory_before", memory_before)
 
        # Execute agent logic
        result = execute_agent_logic(task)
 
        # Track performance metrics
        end_time = time.time()
        memory_after = get_memory_usage()
        execution_time = end_time - start_time
 
        span.set_attribute("performance.end_time", end_time)
        span.set_attribute("performance.execution_time", execution_time)
        span.set_attribute("performance.memory_after", memory_after)
        span.set_attribute("performance.memory_delta", memory_after - memory_before)
        span.set_attribute("performance.output_size", len(str(result)))
 
        # Performance classification
        if execution_time > 10.0:
            span.set_attribute("performance.classification", "slow")
        elif execution_time > 5.0:
            span.set_attribute("performance.classification", "moderate")
        else:
            span.set_attribute("performance.classification", "fast")
 
        return result

System-Wide Coordination Metrics

@noveum_trace.trace_workflow(name="system_coordination_metrics")
def track_system_coordination(agents: List[str]) -> Dict:
    """Track coordination metrics across the entire agent system."""
 
    with noveum_trace.trace("system_coordination") as span:
        span.set_attribute("system.agent_count", len(agents))
        span.set_attribute("system.coordination_start", time.time())
 
        # Track agent startup times
        startup_times = {}
        for agent in agents:
            start = time.time()
            initialize_agent(agent)
            startup_times[agent] = time.time() - start
 
        span.set_attribute("system.avg_startup_time", np.mean(list(startup_times.values())))
        span.set_attribute("system.max_startup_time", max(startup_times.values()))
 
        # Track message passing efficiency
        message_count = 0
        total_message_latency = 0
 
        for i in range(len(agents)):
            for j in range(i + 1, len(agents)):
                start = time.time()
                send_test_message(agents[i], agents[j])
                latency = time.time() - start
                total_message_latency += latency
                message_count += 1
 
        avg_message_latency = total_message_latency / message_count if message_count > 0 else 0
 
        span.set_attribute("system.message_count", message_count)
        span.set_attribute("system.avg_message_latency", avg_message_latency)
        span.set_attribute("system.coordination_efficiency", calculate_efficiency_score(startup_times, avg_message_latency))
 
        return {
            "coordination_successful": True,
            "performance_metrics": {
                "startup_times": startup_times,
                "message_latency": avg_message_latency,
                "efficiency_score": span.get_attribute("system.coordination_efficiency")
            }
        }

🔧 Best Practices for Multi-Agent Tracing

1. Agent Identification

Always clearly identify agents in your traces:

# Clear agent identification
@noveum_trace.trace_agent(
    agent_type="coordinator",
    agent_id="main_coordinator_v1",
    agent_version="1.2.0"
)
def main_coordinator(task: str) -> Dict:
    pass

2. Communication Tracing

Trace all inter-agent communications:

# Comprehensive communication tracing
def trace_agent_communication(sender: str, receiver: str, message: Dict):
    with noveum_trace.trace("agent_communication") as span:
        span.set_attribute("sender_agent", sender)
        span.set_attribute("receiver_agent", receiver)
        span.set_attribute("message_type", message.get("type"))
        span.set_attribute("message_priority", message.get("priority", "normal"))
        span.set_attribute("requires_response", message.get("requires_response", False))

3. Error Propagation Tracking

Monitor how errors propagate through agent systems:

@noveum_trace.trace_agent(agent_type="error_resilient")
def resilient_agent(task: str) -> Dict:
    """Agent with error propagation tracking."""
 
    try:
        result = execute_task(task)
        return result
    except Exception as e:
        with noveum_trace.trace("error_handling") as span:
            span.set_attribute("error.type", type(e).__name__)
            span.set_attribute("error.message", str(e))
            span.set_attribute("error.propagation_level", "contained")
            span.set_attribute("error.recovery_attempted", True)
 
            # Attempt recovery
            try:
                recovery_result = attempt_recovery(task, e)
                span.set_attribute("error.recovery_successful", True)
                return recovery_result
            except Exception as recovery_error:
                span.set_attribute("error.recovery_successful", False)
                span.set_attribute("error.propagation_level", "escalated")
                raise

4. Resource Coordination

Track shared resource usage:

class ResourceCoordinationTracer:
    """Track how agents coordinate shared resources."""
 
    @staticmethod
    def trace_resource_acquisition(agent_name: str, resource_type: str, resource_id: str):
        with noveum_trace.trace("resource.acquire") as span:
            span.set_attribute("resource.agent", agent_name)
            span.set_attribute("resource.type", resource_type)
            span.set_attribute("resource.id", resource_id)
            span.set_attribute("resource.timestamp", time.time())
 
            success = acquire_resource(resource_type, resource_id)
            span.set_attribute("resource.acquired", success)
 
            if not success:
                span.set_attribute("resource.conflict", True)
                span.set_attribute("resource.wait_required", True)
 
            return success
 
    @staticmethod
    def trace_resource_release(agent_name: str, resource_type: str, resource_id: str):
        with noveum_trace.trace("resource.release") as span:
            span.set_attribute("resource.agent", agent_name)
            span.set_attribute("resource.type", resource_type)
            span.set_attribute("resource.id", resource_id)
            span.set_attribute("resource.release_timestamp", time.time())
 
            release_resource(resource_type, resource_id)
            span.set_attribute("resource.released", True)

🎯 Advanced Multi-Agent Patterns

Self-Organizing Agent Systems

@noveum_trace.trace_workflow(name="self_organizing_agents")
def self_organizing_system(initial_task: str) -> Dict:
    """Trace a self-organizing agent system."""
 
    with noveum_trace.trace("system_initialization") as span:
        span.set_attribute("system.type", "self_organizing")
        span.set_attribute("system.initial_task", initial_task)
 
        # Agents organize themselves based on task requirements
        task_analysis = analyze_task_requirements(initial_task)
        required_capabilities = task_analysis["capabilities"]
 
        span.set_attribute("system.required_capabilities", required_capabilities)
 
        # Dynamic agent allocation
        agent_allocation = allocate_agents_to_capabilities(required_capabilities)
        span.set_attribute("system.allocated_agents", len(agent_allocation))
 
        # Trace the self-organization process
        organization_result = trace_agent_self_organization(agent_allocation, initial_task)
 
        return organization_result
 
def trace_agent_self_organization(agent_allocation: Dict, task: str) -> Dict:
    """Trace how agents organize themselves."""
 
    with noveum_trace.trace("agent_self_organization") as span:
        span.set_attribute("organization.agent_count", len(agent_allocation))
        span.set_attribute("organization.task", task)
 
        # Agents negotiate roles and responsibilities
        role_negotiation = {}
        for agent_id, capabilities in agent_allocation.items():
            with noveum_trace.trace(f"role_negotiation.{agent_id}") as negotiation_span:
                negotiation_span.set_attribute("agent.id", agent_id)
                negotiation_span.set_attribute("agent.capabilities", capabilities)
 
                assigned_role = negotiate_role(agent_id, capabilities, task)
                role_negotiation[agent_id] = assigned_role
 
                negotiation_span.set_attribute("agent.assigned_role", assigned_role)
 
        span.set_attribute("organization.role_assignments", list(role_negotiation.values()))
 
        # Execute with self-organized structure
        execution_result = execute_with_organization(role_negotiation, task)
 
        return {
            "organization_successful": True,
            "role_assignments": role_negotiation,
            "execution_result": execution_result
        }

Adaptive Agent Workflows

@noveum_trace.trace_workflow(name="adaptive_agents")
def adaptive_agent_workflow(dynamic_task: str) -> Dict:
    """Agents that adapt their behavior based on execution context."""
 
    with noveum_trace.trace("adaptive_coordination") as span:
        span.set_attribute("workflow.type", "adaptive")
        span.set_attribute("workflow.initial_task", dynamic_task)
 
        # Start with initial strategy
        current_strategy = "default"
        execution_context = initialize_execution_context()
 
        span.set_attribute("workflow.initial_strategy", current_strategy)
 
        for iteration in range(max_iterations := 5):
            with noveum_trace.trace(f"adaptation_iteration_{iteration}") as iteration_span:
                iteration_span.set_attribute("iteration.number", iteration)
                iteration_span.set_attribute("iteration.strategy", current_strategy)
 
                # Execute with current strategy
                result = execute_with_strategy(current_strategy, dynamic_task, execution_context)
 
                # Evaluate results and adapt if necessary
                performance_metrics = evaluate_performance(result)
                iteration_span.set_attribute("iteration.performance", performance_metrics["score"])
 
                if performance_metrics["requires_adaptation"]:
                    new_strategy = adapt_strategy(current_strategy, performance_metrics, execution_context)
                    iteration_span.set_attribute("iteration.adapted", True)
                    iteration_span.set_attribute("iteration.new_strategy", new_strategy)
                    current_strategy = new_strategy
 
                    # Update execution context based on learning
                    execution_context = update_execution_context(execution_context, performance_metrics)
                else:
                    iteration_span.set_attribute("iteration.adapted", False)
                    span.set_attribute("workflow.converged_at_iteration", iteration)
                    break
 
        return {
            "adaptation_successful": True,
            "final_strategy": current_strategy,
            "iterations_required": iteration + 1,
            "final_result": result
        }

📊 Monitoring and Alerts

Set up monitoring for multi-agent systems:

# Configure alerts for multi-agent systems
noveum_trace.configure_alerts({
    "agent_failure_rate": {
        "threshold": 0.1,  # 10% failure rate
        "window": "5m",
        "action": "alert"
    },
    "communication_latency": {
        "threshold": 1000,  # 1 second
        "window": "1m",
        "action": "alert"
    },
    "coordination_efficiency": {
        "threshold": 0.7,  # 70% efficiency
        "window": "10m",
        "action": "alert"
    }
})

Multi-agent tracing with Noveum.ai provides the visibility needed to understand, optimize, and scale complex agent systems. By implementing comprehensive tracing across all agent interactions, communications, and coordination patterns, you can build more reliable and efficient multi-agent AI applications.

🔗 Next Steps

Exclusive Early Access

Get Early Access to Noveum.ai Platform

Be the first one to get notified when we open Noveum Platform to more users. All users get access to Observability suite for free, early users get free eval jobs and premium support for the first year.

Sign up now. We send access to new batch every week.

Early access members receive premium onboarding support and influence our product roadmap. Limited spots available.