Multi-Agent Tracing
Observe complex agent workflows and inter-agent communications with comprehensive tracing
Multi-agent systems represent some of the most complex AI applications, involving multiple agents that coordinate, communicate, and collaborate to achieve shared goals. Noveum.ai provides specialized tracing capabilities to help you understand and optimize these intricate workflows.
🎯 Why Multi-Agent Tracing Matters
Multi-agent systems introduce unique observability challenges:
- Complex Dependencies: Agents depend on each other's outputs and decisions
- Asynchronous Operations: Agents may operate concurrently or in parallel
- Communication Patterns: Understanding how agents share information
- Resource Coordination: Managing shared resources and preventing conflicts
- Error Propagation: How failures in one agent affect the entire system
🏗️ Agent System Architecture
Agent Types and Roles
Noveum.ai can trace various agent patterns:
import asyncio
import itertools
import time
from typing import Any, Dict, List, Optional

import noveum_trace
# Coordinator Agent - Orchestrates workflow
@noveum_trace.trace_agent(agent_type="coordinator")
def coordinator_agent(task: str) -> Dict[str, Any]:
    """Build an execution plan for ``task`` and name the agents to run next.

    Returns a dict with the plan from ``create_execution_plan``, the fixed
    list of follow-up agents, and a freshly generated coordination id.
    """
    execution_plan = create_execution_plan(task)
    downstream_agents = ["researcher", "analyzer"]
    return {
        "plan": execution_plan,
        "next_agents": downstream_agents,
        "coordination_id": generate_coordination_id(),
    }
# Worker Agent - Executes specific tasks
@noveum_trace.trace_agent(agent_type="worker", role="researcher")
def research_agent(query: str, context: Dict) -> Dict[str, Any]:
    """Run ``perform_research`` on ``query`` and report findings with a confidence.

    ``context`` is accepted but not read by this function. The result is
    always flagged as requiring validation.
    """
    findings = perform_research(query)
    confidence = calculate_confidence(findings)
    return {
        "findings": findings,
        "confidence": confidence,
        "requires_validation": True,
    }
# Validator Agent - Reviews and validates outputs
@noveum_trace.trace_agent(agent_type="validator")
def validation_agent(data: Dict, criteria: List[str]) -> Dict[str, Any]:
    """Validate ``data`` against ``criteria`` and repackage the outcome.

    The report returned by ``validate_against_criteria`` is expected to carry
    the keys ``passed``, ``feedback`` and ``improvements``; they are exposed
    as ``is_valid``, ``feedback`` and ``suggestions`` respectively.
    """
    report = validate_against_criteria(data, criteria)
    # Output key -> key in the validation report (order is preserved).
    field_map = {
        "is_valid": "passed",
        "feedback": "feedback",
        "suggestions": "improvements",
    }
    return {out_key: report[src_key] for out_key, src_key in field_map.items()}
TypeScript Multi-Agent Example
import { trace, addAttribute } from '@noveum/trace';
/**
 * Runs a task through several agents inside a single traced workflow.
 */
class AgentOrchestrator {
  /**
   * Delegates `task` to every agent concurrently, then synthesizes the results.
   *
   * @param task - The task description passed to each agent.
   * @returns The value produced by `synthesizeResults`.
   */
  async executeWorkflow(task: string): Promise<WorkflowResult> {
    return await trace('agent-workflow', async () => {
      // Single source of truth for the delegated agents, so the reported
      // agent count can never drift from what is actually launched.
      const agentTypes = ['researcher', 'analyzer'];

      addAttribute('workflow.task', task);
      addAttribute('workflow.agent_count', agentTypes.length);

      // Coordinate multiple agents; results keep the order of `agentTypes`.
      const results = await Promise.all(
        agentTypes.map((agentType) => this.delegateToAgent(agentType, task)),
      );

      // Final coordination
      return await this.synthesizeResults(results);
    });
  }

  /**
   * Runs one agent inside its own `agent-<type>` trace.
   *
   * @param agentType - Agent identifier, used in the trace name and attributes.
   * @param task - The task description forwarded to the agent.
   * @returns The value produced by `processWithAgent`.
   */
  private async delegateToAgent(agentType: string, task: string): Promise<AgentResult> {
    return await trace(`agent-${agentType}`, async () => {
      addAttribute('agent.type', agentType);
      addAttribute('agent.task', task);
      addAttribute('agent.parent_workflow', 'main');

      // Agent-specific processing
      return await this.processWithAgent(agentType, task);
    });
  }
}
📊 Tracing Multi-Agent Workflows
Coordination Patterns
Sequential Agent Execution
@noveum_trace.trace_workflow(name="sequential_agents")
def sequential_agent_workflow(input_data: Dict) -> Dict:
    """Run collector, processor and decision-maker agents one after another.

    Each stage receives the previous stage's output; every stage after the
    first declares the preceding stage as its only dependency.
    """
    pipeline = (
        ("data_collector", data_collection_agent),
        ("data_processor", data_processing_agent),
        ("decision_maker", decision_making_agent),
    )

    payload = input_data
    previous_stage = None
    for stage_name, stage_function in pipeline:
        if previous_stage is None:
            # First stage: no upstream agent, so no dependencies are passed.
            payload = trace_agent_execution(
                agent_name=stage_name,
                agent_function=stage_function,
                input_data=payload,
            )
        else:
            payload = trace_agent_execution(
                agent_name=stage_name,
                agent_function=stage_function,
                input_data=payload,
                dependencies=[previous_stage],
            )
        previous_stage = stage_name

    return payload
def trace_agent_execution(
    agent_name: str,
    agent_function,
    input_data: Dict,
    dependencies: Optional[List[str]] = None,
):
    """Run one agent inside an ``agent.<name>`` span with dependency tracking.

    Args:
        agent_name: Name recorded on the span and used in the span name.
        agent_function: Callable invoked with ``input_data``.
        input_data: Payload handed to the agent.
        dependencies: Names of upstream agents; recorded only when non-empty.

    Returns:
        Whatever ``agent_function`` returns.

    Raises:
        Exception: Any exception from ``agent_function`` is re-raised after
            ``agent.success`` has been recorded as ``False``.
    """
    with noveum_trace.trace(f"agent.{agent_name}") as span:
        span.set_attribute("agent.name", agent_name)
        span.set_attribute("agent.type", "sequential")
        if dependencies:
            span.set_attribute("agent.dependencies", dependencies)

        try:
            result = agent_function(input_data)
        except Exception:
            # Record the failure so the span does not look like an
            # unfinished success, then let the error propagate unchanged.
            span.set_attribute("agent.success", False)
            raise

        span.set_attribute("agent.output_size", len(str(result)))
        span.set_attribute("agent.success", True)
        return result
Parallel Agent Execution
@noveum_trace.trace_workflow(name="parallel_agents")
async def parallel_agent_workflow(task: str) -> Dict:
    """Run the research, analysis and synthesis agents concurrently.

    All agents receive the same ``task``; their results are awaited together
    with ``asyncio.gather`` and combined by ``merge_agent_outputs``.

    Args:
        task: The task passed to every agent.

    Returns:
        The merged result; its keys are also recorded on the span.
    """
    with noveum_trace.trace("parallel_coordination") as coordination_span:
        # Declared once so the recorded agent count always matches the
        # number of agents actually launched.
        agent_specs = [
            ("research_agent", research_task),
            ("analysis_agent", analysis_task),
            ("synthesis_agent", synthesis_task),
        ]

        coordination_span.set_attribute("coordination.pattern", "parallel")
        coordination_span.set_attribute("coordination.agent_count", len(agent_specs))

        # Launch agents in parallel
        tasks = [
            trace_async_agent(agent_name, agent_function, task)
            for agent_name, agent_function in agent_specs
        ]
        results = await asyncio.gather(*tasks)

        # Merge results from all agents
        merged_result = merge_agent_outputs(results)

        coordination_span.set_attribute("coordination.success", True)
        coordination_span.set_attribute("coordination.output_keys", list(merged_result.keys()))
        return merged_result
async def trace_async_agent(agent_name: str, agent_function, input_data):
    """Await one agent coroutine inside an ``agent.<name>`` span.

    Args:
        agent_name: Name recorded on the span and used in the span name.
        agent_function: Async callable awaited with ``input_data``.
        input_data: Payload handed to the agent.

    Returns:
        Whatever ``agent_function`` resolves to.
    """
    # A plain ``with`` matches every other use of ``noveum_trace.trace`` in
    # this file, including inside the async ``parallel_agent_workflow``.
    # ``async with`` would require the span to implement
    # ``__aenter__``/``__aexit__``.
    # NOTE(review): confirm against the SDK whether an async form exists.
    with noveum_trace.trace(f"agent.{agent_name}") as span:
        span.set_attribute("agent.name", agent_name)
        span.set_attribute("agent.execution_mode", "parallel")
        span.set_attribute("agent.start_time", time.time())

        result = await agent_function(input_data)

        span.set_attribute("agent.end_time", time.time())
        span.set_attribute("agent.result_type", type(result).__name__)
        return result
Hierarchical Agent Systems
@noveum_trace.trace_workflow(name="hierarchical_agents")
def hierarchical_agent_workflow(complex_task: str) -> Dict:
    """Run a master -> supervisor -> worker hierarchy over a decomposed task.

    ``decompose_task`` yields groups of subtasks. Each group gets its own
    supervisor span and each subtask its own worker span. Worker results are
    consolidated per supervisor, and the master synthesizes the final result.
    """
    with noveum_trace.trace("master_agent") as master_span:
        # Master level: tag the span and split the work.
        master_span.set_attribute("agent.level", "master")
        master_span.set_attribute("agent.role", "coordinator")

        subtask_groups = decompose_task(complex_task)
        master_span.set_attribute("master.subtask_count", len(subtask_groups))

        group_summaries = []
        for group_idx, group in enumerate(subtask_groups):
            supervisor_name = f"supervisor_agent_{group_idx}"

            # Supervisor level: one span per subtask group.
            with noveum_trace.trace(supervisor_name) as supervisor_span:
                supervisor_span.set_attribute("agent.level", "supervisor")
                supervisor_span.set_attribute("agent.subtask_group", group_idx)
                supervisor_span.set_attribute("agent.parent", "master_agent")

                outputs = []
                for task_idx, subtask in enumerate(group):
                    task_id = f"{group_idx}_{task_idx}"

                    # Worker level: one span per subtask.
                    with noveum_trace.trace(f"worker_agent_{task_id}") as worker_span:
                        worker_span.set_attribute("agent.level", "worker")
                        worker_span.set_attribute("agent.task_id", task_id)
                        worker_span.set_attribute("agent.supervisor", supervisor_name)

                        outputs.append(execute_subtask(subtask))
                        worker_span.set_attribute("worker.success", True)

                # Supervisor consolidates what its workers produced.
                group_summaries.append(consolidate_worker_results(outputs))
                supervisor_span.set_attribute("supervisor.worker_count", len(outputs))

        # Master synthesizes the final result from all supervisors.
        synthesized = synthesize_supervisor_results(group_summaries)
        master_span.set_attribute("master.final_result_size", len(str(synthesized)))
        return synthesized
🔗 Inter-Agent Communication Tracing
Message Passing
class AgentCommunicationTracer:
    """Static helpers that wrap inter-agent message send/receive in spans."""

    @staticmethod
    def trace_message_send(sender_agent: str, receiver_agent: str, message_type: str, payload: Dict):
        """Send a message via ``send_message`` inside an ``agent_communication.send`` span.

        Records sender, receiver, message type, the length of ``str(payload)``
        and a timestamp before sending, then the resulting message id.

        Returns:
            The id returned by ``send_message``.
        """
        with noveum_trace.trace("agent_communication.send") as send_span:
            tag = send_span.set_attribute
            tag("communication.sender", sender_agent)
            tag("communication.receiver", receiver_agent)
            tag("communication.message_type", message_type)
            tag("communication.payload_size", len(str(payload)))
            tag("communication.timestamp", time.time())

            sent_id = send_message(receiver_agent, message_type, payload)
            tag("communication.message_id", sent_id)
            return sent_id

    @staticmethod
    def trace_message_receive(receiver_agent: str, message_id: str):
        """Fetch a message via ``receive_message`` inside an ``agent_communication.receive`` span.

        Records the receiver, message id and receive timestamp, then the
        message's ``type`` entry and a processing-required flag.

        Returns:
            The message returned by ``receive_message``.
        """
        with noveum_trace.trace("agent_communication.receive") as receive_span:
            tag = receive_span.set_attribute
            tag("communication.receiver", receiver_agent)
            tag("communication.message_id", message_id)
            tag("communication.receive_timestamp", time.time())

            incoming = receive_message(message_id)
            tag("communication.message_type", incoming.get("type"))
            tag("communication.processing_required", True)
            return incoming
# Usage in agent workflow
@noveum_trace.trace_agent(agent_type="communicator")
def communicating_agent(task: str) -> Dict:
    """Send a research request for ``task`` and return the traced response.

    Acts as ``coordinator``: sends a high-priority ``research_request`` to
    ``research_agent``, then receives the message whose id is the sent id
    prefixed with ``response_``.
    """
    request_payload = {"query": task, "priority": "high"}

    # Send request to research agent
    outbound_id = AgentCommunicationTracer.trace_message_send(
        sender_agent="coordinator",
        receiver_agent="research_agent",
        message_type="research_request",
        payload=request_payload,
    )

    # Wait for response
    reply = AgentCommunicationTracer.trace_message_receive(
        receiver_agent="coordinator",
        message_id=f"response_{outbound_id}",
    )

    return {"research_data": reply, "communication_success": True}
Shared State Management
class SharedStateTracer:
    """Static helpers that wrap shared-state reads and writes in spans."""

    @staticmethod
    def trace_state_read(agent_name: str, state_key: str):
        """Read ``state_key`` via ``read_shared_state`` inside a ``shared_state.read`` span.

        Records the operation, agent, key and timestamp before the read, then
        the value's type name and a success flag.

        Returns:
            The value returned by ``read_shared_state``.
        """
        with noveum_trace.trace("shared_state.read") as read_span:
            tag = read_span.set_attribute
            tag("state.operation", "read")
            tag("state.agent", agent_name)
            tag("state.key", state_key)
            tag("state.timestamp", time.time())

            stored = read_shared_state(state_key)
            tag("state.value_type", type(stored).__name__)
            tag("state.success", True)
            return stored

    @staticmethod
    def trace_state_write(agent_name: str, state_key: str, value: Any):
        """Write ``value`` via ``write_shared_state`` inside a ``shared_state.write`` span.

        Records the operation, agent, key, value type name and timestamp
        before the write, then whatever ``write_shared_state`` returned as
        ``state.success``.

        Returns:
            The result of ``write_shared_state``.
        """
        with noveum_trace.trace("shared_state.write") as write_span:
            tag = write_span.set_attribute
            tag("state.operation", "write")
            tag("state.agent", agent_name)
            tag("state.key", state_key)
            tag("state.value_type", type(value).__name__)
            tag("state.timestamp", time.time())

            outcome = write_shared_state(state_key, value)
            tag("state.success", outcome)
            return outcome
# Usage in agents
@noveum_trace.trace_agent(agent_type="state_manager")
def state_managing_agent(operation: str) -> Dict:
    """Apply a named operation to the shared workflow state.

    Only ``"update_progress"`` is supported: it reads ``workflow_progress``,
    adds ``0.1`` and writes the new value back, tracing both accesses.

    Args:
        operation: Name of the state operation to perform.

    Returns:
        ``{"progress_updated": True, "new_progress": <new value>}``.

    Raises:
        ValueError: If ``operation`` is not a supported operation. Previously
            this path fell through without producing the promised dict.
    """
    if operation != "update_progress":
        raise ValueError(f"Unsupported state operation: {operation!r}")

    # Read current progress
    current_progress = SharedStateTracer.trace_state_read(
        agent_name="progress_manager",
        state_key="workflow_progress",
    )
    if current_progress is None:
        # Presumably the store yields None for a key that was never written;
        # start from zero instead of failing on ``None + 0.1``. TODO confirm.
        current_progress = 0.0

    # Update progress
    new_progress = current_progress + 0.1
    SharedStateTracer.trace_state_write(
        agent_name="progress_manager",
        state_key="workflow_progress",
        value=new_progress,
    )

    return {"progress_updated": True, "new_progress": new_progress}
📈 Multi-Agent Performance Analysis
Agent Performance Metrics
Track key metrics for each agent:
@noveum_trace.trace_agent(agent_type="performance_monitored")
def performance_tracked_agent(task: str) -> Dict:
    """Run ``execute_agent_logic`` and record timing and memory metrics on a span.

    Wall-clock start/end timestamps are recorded for correlation, while the
    elapsed time is measured with the monotonic ``time.perf_counter`` so that
    system clock adjustments cannot distort (or make negative) the duration.

    Args:
        task: The task passed to ``execute_agent_logic``.

    Returns:
        Whatever ``execute_agent_logic`` returns.
    """
    start_time = time.time()
    timer_start = time.perf_counter()
    memory_before = get_memory_usage()

    with noveum_trace.trace("agent_execution") as span:
        span.set_attribute("performance.start_time", start_time)
        span.set_attribute("performance.memory_before", memory_before)

        # Execute agent logic
        result = execute_agent_logic(task)

        # Track performance metrics
        execution_time = time.perf_counter() - timer_start
        end_time = time.time()
        memory_after = get_memory_usage()

        span.set_attribute("performance.end_time", end_time)
        span.set_attribute("performance.execution_time", execution_time)
        span.set_attribute("performance.memory_after", memory_after)
        span.set_attribute("performance.memory_delta", memory_after - memory_before)
        span.set_attribute("performance.output_size", len(str(result)))

        # Performance classification (thresholds in seconds)
        if execution_time > 10.0:
            classification = "slow"
        elif execution_time > 5.0:
            classification = "moderate"
        else:
            classification = "fast"
        span.set_attribute("performance.classification", classification)

        return result
System-Wide Coordination Metrics
@noveum_trace.trace_workflow(name="system_coordination_metrics")
def track_system_coordination(agents: List[str]) -> Dict:
    """Measure startup time and pairwise message latency across ``agents``.

    Each agent is initialized and timed, then one test message is sent for
    every unordered pair of agents. Averages are computed with plain Python
    so no numeric library is needed, and an empty ``agents`` list yields
    zeroed metrics instead of raising.

    Args:
        agents: Names of the agents to initialize and probe.

    Returns:
        A dict with ``coordination_successful`` and ``performance_metrics``
        (startup times per agent, average message latency, efficiency score).
    """
    with noveum_trace.trace("system_coordination") as span:
        span.set_attribute("system.agent_count", len(agents))
        span.set_attribute("system.coordination_start", time.time())

        # Track agent startup times
        startup_times = {}
        for agent in agents:
            start = time.time()
            initialize_agent(agent)
            startup_times[agent] = time.time() - start

        durations = list(startup_times.values())
        avg_startup_time = sum(durations) / len(durations) if durations else 0.0
        span.set_attribute("system.avg_startup_time", avg_startup_time)
        span.set_attribute("system.max_startup_time", max(durations, default=0.0))

        # Track message passing efficiency: one probe per unordered pair,
        # in the same (i < j) order as a nested index loop.
        message_count = 0
        total_message_latency = 0
        for sender, receiver in itertools.combinations(agents, 2):
            start = time.time()
            send_test_message(sender, receiver)
            total_message_latency += time.time() - start
            message_count += 1

        avg_message_latency = total_message_latency / message_count if message_count > 0 else 0

        # Keep the score in a local so the return value does not depend on
        # reading attributes back from the span.
        efficiency_score = calculate_efficiency_score(startup_times, avg_message_latency)

        span.set_attribute("system.message_count", message_count)
        span.set_attribute("system.avg_message_latency", avg_message_latency)
        span.set_attribute("system.coordination_efficiency", efficiency_score)

        return {
            "coordination_successful": True,
            "performance_metrics": {
                "startup_times": startup_times,
                "message_latency": avg_message_latency,
                "efficiency_score": efficiency_score,
            },
        }
🔧 Best Practices for Multi-Agent Tracing
1. Agent Identification
Always clearly identify agents in your traces:
# Clear agent identification
@noveum_trace.trace_agent(
    agent_id="main_coordinator_v1",
    agent_type="coordinator",
    agent_version="1.2.0",
)
def main_coordinator(task: str) -> Dict:
    """Placeholder coordinator showing explicit agent identification.

    The body is intentionally empty; the function returns ``None``.
    """
2. Communication Tracing
Trace all inter-agent communications:
# Comprehensive communication tracing
def trace_agent_communication(sender: str, receiver: str, message: Dict):
    """Record one inter-agent message on an ``agent_communication`` span.

    Tags the span with sender, receiver and the message's ``type``,
    ``priority`` (default ``"normal"``) and ``requires_response`` (default
    ``False``). Returns nothing.
    """
    with noveum_trace.trace("agent_communication") as comm_span:
        tag = comm_span.set_attribute
        tag("sender_agent", sender)
        tag("receiver_agent", receiver)
        tag("message_type", message.get("type"))
        tag("message_priority", message.get("priority", "normal"))
        tag("requires_response", message.get("requires_response", False))
3. Error Propagation Tracking
Monitor how errors propagate through agent systems:
@noveum_trace.trace_agent(agent_type="error_resilient")
def resilient_agent(task: str) -> Dict:
    """Run ``execute_task`` and attempt one traced recovery if it fails.

    Args:
        task: The task passed to ``execute_task`` (and to ``attempt_recovery``
            on failure).

    Returns:
        The task result, or the recovery result when the task failed but
        recovery succeeded.

    Raises:
        Exception: The recovery error, re-raised unchanged when recovery
            fails (the original error remains attached as its context).
    """
    try:
        return execute_task(task)
    except Exception as e:
        with noveum_trace.trace("error_handling") as span:
            span.set_attribute("error.type", type(e).__name__)
            span.set_attribute("error.message", str(e))
            span.set_attribute("error.propagation_level", "contained")
            span.set_attribute("error.recovery_attempted", True)

            # Attempt recovery; only the recovery call itself is guarded so a
            # tracing problem is never mistaken for a failed recovery.
            try:
                recovery_result = attempt_recovery(task, e)
            except Exception as recovery_error:
                span.set_attribute("error.recovery_successful", False)
                span.set_attribute("error.propagation_level", "escalated")
                # Record why recovery failed, not just that it did.
                span.set_attribute("error.recovery_error_type", type(recovery_error).__name__)
                span.set_attribute("error.recovery_error_message", str(recovery_error))
                raise

            span.set_attribute("error.recovery_successful", True)
            return recovery_result
4. Resource Coordination
Track shared resource usage:
class ResourceCoordinationTracer:
    """Static helpers that wrap shared-resource acquire/release in spans."""

    @staticmethod
    def trace_resource_acquisition(agent_name: str, resource_type: str, resource_id: str):
        """Call ``acquire_resource`` inside a ``resource.acquire`` span.

        Records the agent, resource type, id and timestamp, then the outcome.
        A falsy outcome is additionally flagged as a conflict that requires
        waiting.

        Returns:
            The result of ``acquire_resource``.
        """
        with noveum_trace.trace("resource.acquire") as acquire_span:
            tag = acquire_span.set_attribute
            tag("resource.agent", agent_name)
            tag("resource.type", resource_type)
            tag("resource.id", resource_id)
            tag("resource.timestamp", time.time())

            acquired = acquire_resource(resource_type, resource_id)
            tag("resource.acquired", acquired)

            if not acquired:
                for conflict_flag in ("resource.conflict", "resource.wait_required"):
                    tag(conflict_flag, True)

            return acquired

    @staticmethod
    def trace_resource_release(agent_name: str, resource_type: str, resource_id: str):
        """Call ``release_resource`` inside a ``resource.release`` span.

        Records the agent, resource type, id and release timestamp, then marks
        the resource as released. Returns nothing.
        """
        with noveum_trace.trace("resource.release") as release_span:
            tag = release_span.set_attribute
            tag("resource.agent", agent_name)
            tag("resource.type", resource_type)
            tag("resource.id", resource_id)
            tag("resource.release_timestamp", time.time())

            release_resource(resource_type, resource_id)
            tag("resource.released", True)
🎯 Advanced Multi-Agent Patterns
Self-Organizing Agent Systems
@noveum_trace.trace_workflow(name="self_organizing_agents")
def self_organizing_system(initial_task: str) -> Dict:
    """Analyze a task, allocate agents to its capabilities, and let them self-organize.

    Returns whatever ``trace_agent_self_organization`` produces for the
    allocated agents and ``initial_task``.
    """
    with noveum_trace.trace("system_initialization") as init_span:
        init_span.set_attribute("system.type", "self_organizing")
        init_span.set_attribute("system.initial_task", initial_task)

        # Derive the capabilities the task needs.
        capabilities = analyze_task_requirements(initial_task)["capabilities"]
        init_span.set_attribute("system.required_capabilities", capabilities)

        # Dynamic agent allocation
        allocation = allocate_agents_to_capabilities(capabilities)
        init_span.set_attribute("system.allocated_agents", len(allocation))

        # Trace the self-organization process
        return trace_agent_self_organization(allocation, initial_task)
def trace_agent_self_organization(agent_allocation: Dict, task: str) -> Dict:
    """Negotiate a role per agent, then execute ``task`` with those roles.

    Each agent in ``agent_allocation`` (agent id -> capabilities) gets its own
    ``role_negotiation.<id>`` span. The assigned roles are recorded on the
    outer span and passed to ``execute_with_organization``.

    Returns:
        A dict with a success flag, the role assignments and the execution
        result.
    """
    with noveum_trace.trace("agent_self_organization") as org_span:
        org_span.set_attribute("organization.agent_count", len(agent_allocation))
        org_span.set_attribute("organization.task", task)

        # Agents negotiate roles and responsibilities
        roles_by_agent = {}
        for agent_id in agent_allocation:
            agent_capabilities = agent_allocation[agent_id]
            with noveum_trace.trace(f"role_negotiation.{agent_id}") as negotiation_span:
                negotiation_span.set_attribute("agent.id", agent_id)
                negotiation_span.set_attribute("agent.capabilities", agent_capabilities)

                role = negotiate_role(agent_id, agent_capabilities, task)
                roles_by_agent[agent_id] = role
                negotiation_span.set_attribute("agent.assigned_role", role)

        org_span.set_attribute("organization.role_assignments", list(roles_by_agent.values()))

        # Execute with self-organized structure
        outcome = execute_with_organization(roles_by_agent, task)

        return {
            "organization_successful": True,
            "role_assignments": roles_by_agent,
            "execution_result": outcome,
        }
Adaptive Agent Workflows
@noveum_trace.trace_workflow(name="adaptive_agents")
def adaptive_agent_workflow(dynamic_task: str, max_iterations: int = 5) -> Dict:
    """Run a task repeatedly, adapting the strategy until no adaptation is needed.

    Each iteration executes the task with the current strategy and evaluates
    the result. If the evaluation requests adaptation, the strategy and the
    execution context are updated and another iteration runs; otherwise the
    loop stops and the convergence iteration is recorded.

    Args:
        dynamic_task: The task passed to ``execute_with_strategy``.
        max_iterations: Upper bound on adaptation iterations (default 5,
            matching the previous hard-coded limit).

    Returns:
        A dict with the final strategy, the number of iterations run and the
        last iteration's result.

    Raises:
        ValueError: If ``max_iterations`` is less than 1, since no result
            could be produced.
    """
    if max_iterations < 1:
        raise ValueError("max_iterations must be at least 1")

    with noveum_trace.trace("adaptive_coordination") as span:
        span.set_attribute("workflow.type", "adaptive")
        span.set_attribute("workflow.initial_task", dynamic_task)

        # Start with initial strategy
        current_strategy = "default"
        execution_context = initialize_execution_context()
        span.set_attribute("workflow.initial_strategy", current_strategy)

        for iteration in range(max_iterations):
            with noveum_trace.trace(f"adaptation_iteration_{iteration}") as iteration_span:
                iteration_span.set_attribute("iteration.number", iteration)
                iteration_span.set_attribute("iteration.strategy", current_strategy)

                # Execute with current strategy
                result = execute_with_strategy(current_strategy, dynamic_task, execution_context)

                # Evaluate results and adapt if necessary
                performance_metrics = evaluate_performance(result)
                iteration_span.set_attribute("iteration.performance", performance_metrics["score"])

                if not performance_metrics["requires_adaptation"]:
                    iteration_span.set_attribute("iteration.adapted", False)
                    span.set_attribute("workflow.converged_at_iteration", iteration)
                    break

                new_strategy = adapt_strategy(current_strategy, performance_metrics, execution_context)
                iteration_span.set_attribute("iteration.adapted", True)
                iteration_span.set_attribute("iteration.new_strategy", new_strategy)
                current_strategy = new_strategy

                # Update execution context based on learning
                execution_context = update_execution_context(execution_context, performance_metrics)

        # NOTE(review): "adaptation_successful" is reported as True even when
        # the loop exhausts max_iterations without converging - confirm that
        # this is the intended meaning of the flag.
        return {
            "adaptation_successful": True,
            "final_strategy": current_strategy,
            "iterations_required": iteration + 1,
            "final_result": result,
        }
📊 Monitoring and Alerts
Set up monitoring for multi-agent systems:
# Configure alerts for multi-agent systems
noveum_trace.configure_alerts(
    {
        # Alert at a threshold of 0.1 (10% failure rate) over a "5m" window.
        "agent_failure_rate": dict(threshold=0.1, window="5m", action="alert"),
        # Alert at a threshold of 1000 (1 second) over a "1m" window.
        "communication_latency": dict(threshold=1000, window="1m", action="alert"),
        # Alert at a threshold of 0.7 (70% efficiency) over a "10m" window.
        "coordination_efficiency": dict(threshold=0.7, window="10m", action="alert"),
    }
)
Multi-agent tracing with Noveum.ai provides the visibility needed to understand, optimize, and scale complex agent systems. By implementing comprehensive tracing across all agent interactions, communications, and coordination patterns, you can build more reliable and efficient multi-agent AI applications.
🔗 Next Steps
- RAG Pipeline Observability - Monitor retrieval and generation systems
- Custom Instrumentation - Add domain-specific tracing
- Performance Optimization - Optimize based on tracing insights
Get Early Access to Noveum.ai Platform
Be the first to be notified when we open the Noveum Platform to more users. All users get free access to the Observability suite; early users also get free eval jobs and premium support for the first year.