Custom Instrumentation
Add custom spans and attributes for domain-specific observability and advanced tracing patterns
While Noveum.ai's automatic instrumentation covers common AI operations, custom instrumentation allows you to add domain-specific observability, track business metrics, and create detailed traces for unique workflows. This guide covers advanced techniques for implementing custom tracing patterns.
🎯 Why Custom Instrumentation?
Custom instrumentation enables you to:
- Track Business Metrics: Monitor domain-specific KPIs alongside technical metrics
- Trace Complex Workflows: Create detailed observability for unique business logic
- Add Context: Enrich traces with application-specific attributes
- Monitor Custom Components: Instrument proprietary algorithms and processes
- Optimize Performance: Track specific bottlenecks in your application
🛠️ Custom Span Creation
Basic Custom Spans
import random
import time
from typing import Dict, List, Optional

import noveum_trace
# Decorator-level span for the whole operation, with manual child spans per step
@noveum_trace.trace_custom(operation_name="data_preprocessing")
def preprocess_user_data(user_data: Dict) -> Dict:
    """Validate and then transform ``user_data``, one child span per step.

    Returns:
        The value returned by ``apply_custom_transformations(user_data)``.

    Raises:
        ValueError: if ``validate_user_data`` reports the input as invalid.
    """
    # Step 1: validation, recorded on the "data_validation" span.
    with noveum_trace.trace("data_validation") as vspan:
        vspan.set_attribute("validation.input_fields", len(user_data))
        vspan.set_attribute("validation.user_id", user_data.get("user_id"))

        outcome = validate_user_data(user_data)
        vspan.set_attribute("validation.success", outcome["valid"])
        vspan.set_attribute("validation.errors_count", len(outcome["errors"]))

        if not outcome["valid"]:
            vspan.set_attribute("validation.error_types", outcome["error_types"])
            raise ValueError("Invalid user data")

    # Step 2: transformation, recorded on the "data_transformation" span.
    with noveum_trace.trace("data_transformation") as tspan:
        tspan.set_attribute("transformation.input_size", len(str(user_data)))

        transformed = apply_custom_transformations(user_data)

        output_size = len(str(transformed))
        tspan.set_attribute("transformation.output_size", output_size)
        # The denominator is re-measured after the transformation runs.
        tspan.set_attribute(
            "transformation.compression_ratio",
            output_size / len(str(user_data)),
        )

    return transformed
# Advanced span configuration with custom attributes
def custom_span_with_rich_attributes(operation_data: Dict):
    """Run ``execute_custom_operation`` inside a span carrying grouped attributes.

    Attribute groups, by dotted prefix:
      * ``operation.*`` - static metadata about this code path.
      * ``input.*`` / ``business.*`` - values read from ``operation_data``.
      * ``performance.*`` - ``get_memory_usage()`` before/after and their delta.
      * ``result.*`` - values read from the mapping the operation returns.

    Returns:
        Whatever ``execute_custom_operation(operation_data)`` returned.
    """
    with noveum_trace.trace("rich_custom_operation") as span:
        put = span.set_attribute

        # Static operation metadata
        put("operation.type", "data_processing")
        put("operation.version", "2.1.0")
        put("operation.environment", "production")

        # Input shape (dot notation groups related keys)
        put("input.size", len(operation_data))
        put("input.type", type(operation_data).__name__)
        put("input.has_metadata", "metadata" in operation_data)

        # Business context, with fallbacks for missing keys
        put("business.customer_tier", operation_data.get("customer_tier", "standard"))
        put("business.feature_flags", operation_data.get("enabled_features", []))
        put("business.session_id", operation_data.get("session_id"))

        # Memory is sampled on both sides of the operation.
        mem_before = get_memory_usage()
        put("performance.memory_start", mem_before)

        outcome = execute_custom_operation(operation_data)

        mem_after = get_memory_usage()
        put("performance.memory_end", mem_after)
        put("performance.memory_delta", mem_after - mem_before)

        # Result characteristics
        put("result.success", outcome.get("success", False))
        put("result.items_processed", outcome.get("items_count", 0))
        put("result.processing_time", outcome.get("processing_time", 0))

        return outcome
TypeScript Custom Instrumentation
import { trace, addAttribute, addEvent } from '@noveum/trace';
/**
 * Custom instrumentation for a multi-stage workflow.
 *
 * `processWorkflow` wraps the whole run in one span and delegates each stage to
 * `executeStage`, which opens its own child span per stage.
 */
class CustomInstrumentationExample {
  /**
   * Run the four workflow stages in order and record per-stage and aggregate attributes.
   *
   * `executeStage` re-throws stage failures, so a failing stage rejects this promise
   * and the remaining stages are skipped.
   */
  async processWorkflow(workflowData: WorkflowData): Promise<WorkflowResult> {
    return await trace('custom-workflow-processing', async () => {
      // Basic workflow attributes
      addAttribute('workflow.id', workflowData.id);
      addAttribute('workflow.type', workflowData.type);
      addAttribute('workflow.priority', workflowData.priority);
      addAttribute('workflow.user_id', workflowData.userId);

      const stages = ['validation', 'processing', 'enrichment', 'finalization'];
      const stageResults: Record<string, any> = {};

      // Stages run sequentially; each one gets its own child span.
      for (const stage of stages) {
        const stageResult = await this.executeStage(stage, workflowData);
        stageResults[stage] = stageResult;

        addAttribute(`stages.${stage}.duration`, stageResult.duration);
        addAttribute(`stages.${stage}.success`, stageResult.success);
        addAttribute(`stages.${stage}.items_processed`, stageResult.itemsProcessed);
      }

      // Aggregate metrics across all stages
      const totalDuration = Object.values(stageResults)
        .reduce((sum, result) => sum + result.duration, 0);
      const successfulStages = Object.values(stageResults)
        .filter(result => result.success).length;

      addAttribute('workflow.total_duration', totalDuration);
      addAttribute('workflow.success_rate', successfulStages / stages.length);
      addAttribute('workflow.completed_stages', successfulStages);

      return {
        workflowId: workflowData.id,
        results: stageResults,
        success: successfulStages === stages.length,
        totalDuration
      };
    });
  }

  /**
   * Run one stage inside a `workflow-stage-<name>` span. Records timing and sizes,
   * and emits a `stage_completed` or `stage_failed` event.
   *
   * Errors are recorded on the span and then re-thrown unchanged.
   */
  private async executeStage(stageName: string, data: WorkflowData): Promise<StageResult> {
    return await trace(`workflow-stage-${stageName}`, async () => {
      addAttribute('stage.name', stageName);
      addAttribute('stage.input_size', JSON.stringify(data).length);

      const startTime = Date.now();

      try {
        const result = await this.processStage(stageName, data);
        const duration = Date.now() - startTime;
        const itemsProcessed = result.itemsProcessed || 0;

        addAttribute('stage.success', true);
        addAttribute('stage.duration', duration);
        addAttribute('stage.output_size', JSON.stringify(result).length);

        addEvent('stage_completed', {
          stage: stageName,
          duration,
          itemsProcessed
        });

        return {
          success: true,
          duration,
          itemsProcessed,
          data: result
        };
      } catch (error) {
        const duration = Date.now() - startTime;
        // Under `strict` (useUnknownInCatchVariables) `error` is `unknown`, and a
        // thrown value need not be an Error, so narrow before reading `.message`.
        const errorMessage = error instanceof Error ? error.message : String(error);

        addAttribute('stage.success', false);
        addAttribute('stage.duration', duration);
        addAttribute('stage.error', errorMessage);

        addEvent('stage_failed', {
          stage: stageName,
          error: errorMessage,
          duration
        });

        throw error;
      }
    });
  }
}
📊 Custom Metrics and Attributes
Business Metrics Integration
class BusinessMetricsTracer:
    """Helpers that attach business-level context to trace spans."""

    @staticmethod
    def trace_customer_interaction(customer_id: str, interaction_type: str):
        """Trace one customer interaction enriched with customer and business context.

        Args:
            customer_id: Passed to ``get_customer_data`` and recorded as ``customer.id``.
            interaction_type: Recorded on the span and passed to the value/impact helpers.

        Returns:
            Dict with ``customer_id``, ``interaction_value`` and ``business_impact``.
        """
        with noveum_trace.trace("customer_interaction") as span:
            # Customer context
            span.set_attribute("customer.id", customer_id)
            span.set_attribute("customer.interaction_type", interaction_type)

            customer_data = get_customer_data(customer_id)
            span.set_attribute("customer.tier", customer_data.get("tier", "standard"))
            span.set_attribute("customer.lifetime_value", customer_data.get("ltv", 0))
            span.set_attribute("customer.account_age_days", customer_data.get("account_age", 0))
            span.set_attribute("customer.previous_interactions", customer_data.get("interaction_count", 0))

            # Business metrics. The revenue impact is computed once so the span
            # attribute and the returned value always agree.
            interaction_value = calculate_interaction_value(interaction_type, customer_data)
            revenue_impact = calculate_revenue_impact(interaction_type)
            span.set_attribute("business.interaction_value", interaction_value)
            span.set_attribute("business.revenue_impact", revenue_impact)

            # Contextual attributes
            span.set_attribute("context.timestamp", time.time())
            span.set_attribute("context.timezone", customer_data.get("timezone", "UTC"))
            span.set_attribute("context.device_type", get_device_type())
            span.set_attribute("context.session_duration", get_session_duration())

            return {
                "customer_id": customer_id,
                "interaction_value": interaction_value,
                "business_impact": revenue_impact,
            }

    @staticmethod
    def trace_conversion_funnel(user_id: str, funnel_stage: str, funnel_data: Dict):
        """Trace a user's position in the conversion funnel.

        Args:
            user_id: Recorded as ``funnel.user_id``.
            funnel_stage: Stage name; also passed to ``get_stage_order`` and
                ``calculate_stage_metrics``.
            funnel_data: Mapping whose ``completed_stages`` list (default empty)
                drives the completion counters.

        Returns:
            The mapping returned by ``calculate_stage_metrics``.
        """
        with noveum_trace.trace("conversion_funnel") as span:
            span.set_attribute("funnel.user_id", user_id)
            span.set_attribute("funnel.stage", funnel_stage)
            span.set_attribute("funnel.stage_order", get_stage_order(funnel_stage))

            # Funnel progression data
            previous_stages = funnel_data.get("completed_stages", [])
            total_stages = get_total_stages()
            span.set_attribute("funnel.completed_stages", len(previous_stages))
            # Guard against an unconfigured funnel (zero stages) instead of
            # raising ZeroDivisionError from inside tracing code.
            completion_rate = len(previous_stages) / total_stages if total_stages else 0.0
            span.set_attribute("funnel.completion_rate", completion_rate)

            # Stage-specific metrics
            stage_metrics = calculate_stage_metrics(funnel_stage, funnel_data)
            span.set_attribute("funnel.stage_duration", stage_metrics["duration"])
            span.set_attribute("funnel.stage_interactions", stage_metrics["interactions"])
            span.set_attribute("funnel.stage_conversion_probability", stage_metrics["conversion_prob"])

            # Behavioral tracking
            span.set_attribute("behavior.time_on_stage", stage_metrics["time_spent"])
            span.set_attribute("behavior.bounce_risk", stage_metrics["bounce_risk"])
            span.set_attribute("behavior.engagement_score", stage_metrics["engagement"])

            return stage_metrics
# Usage example
@noveum_trace.trace_custom(operation_name="ecommerce_purchase")
def process_purchase(user_id: str, cart_data: Dict) -> Dict:
    """Custom instrumentation for an e-commerce purchase flow.

    Args:
        user_id: Customer whose interaction and funnel progress are traced.
        cart_data: Mapping with ``items``, ``total``, ``currency`` and ``payment_method``.

    Returns:
        The result of ``process_successful_purchase(cart_data)``.

    Raises:
        ValueError: if ``validate_purchase`` rejects the cart.
    """
    BusinessMetricsTracer.trace_customer_interaction(user_id, "purchase_attempt")

    # The validation span covers validation only; purchase processing below is
    # no longer nested inside it.
    with noveum_trace.trace("purchase_validation") as validation_span:
        validation_span.set_attribute("purchase.item_count", len(cart_data["items"]))
        validation_span.set_attribute("purchase.total_value", cart_data["total"])
        validation_span.set_attribute("purchase.currency", cart_data["currency"])
        validation_span.set_attribute("purchase.payment_method", cart_data["payment_method"])

        validation_result = validate_purchase(cart_data)
        is_valid = validation_result["valid"]
        validation_span.set_attribute("purchase.validation_success", is_valid)

    if not is_valid:
        # Track funnel abandonment
        BusinessMetricsTracer.trace_conversion_funnel(user_id, "purchase_abandoned", cart_data)
        raise ValueError("Purchase validation failed")

    # Mark the funnel as completed only after the purchase actually succeeded.
    purchase_result = process_successful_purchase(cart_data)
    BusinessMetricsTracer.trace_conversion_funnel(user_id, "purchase_completed", cart_data)
    return purchase_result
Performance Profiling Integration
class PerformanceProfiler:
    """Custom performance profiling with tracing integration."""

    @staticmethod
    def profile_function_execution(func_name: str, execution_context: Dict):
        """Run ``execute_profiled_operation`` inside a span and record resource metrics.

        Args:
            func_name: Used in the span name and as ``profiling.function_name``.
            execution_context: Passed to ``execute_profiled_operation``; its ``str()``
                is recorded as ``profiling.context``.

        Returns:
            ``{"result": <operation result>, "performance_metrics": {...}}``, where the
            metrics are execution time, memory delta, CPU delta and performance class.

        Raises:
            Whatever ``execute_profiled_operation`` raises, after the span has been
            tagged with ``profiling.error_*`` attributes.
        """
        with noveum_trace.trace(f"performance_profile.{func_name}") as span:
            span.set_attribute("profiling.function_name", func_name)
            span.set_attribute("profiling.context", str(execution_context))

            # Pre-execution metrics
            start_time = time.perf_counter()
            start_memory = get_memory_usage()
            start_cpu = get_cpu_usage()
            span.set_attribute("profiling.start_memory_mb", start_memory)
            span.set_attribute("profiling.start_cpu_percent", start_cpu)

            # Only the profiled operation sits in the try. A failure in the metric
            # helpers below must not be reported as an error of the operation.
            try:
                result = execute_profiled_operation(func_name, execution_context)
            except Exception as e:
                error_time = time.perf_counter() - start_time
                span.set_attribute("profiling.error_occurred", True)
                span.set_attribute("profiling.error_time_seconds", error_time)
                span.set_attribute("profiling.error_message", str(e))
                raise

            # Post-execution metrics
            end_time = time.perf_counter()
            end_memory = get_memory_usage()
            end_cpu = get_cpu_usage()

            execution_time = end_time - start_time
            memory_delta = end_memory - start_memory
            cpu_delta = end_cpu - start_cpu

            span.set_attribute("profiling.execution_time_seconds", execution_time)
            span.set_attribute("profiling.memory_delta_mb", memory_delta)
            span.set_attribute("profiling.cpu_delta_percent", cpu_delta)
            span.set_attribute("profiling.peak_memory_mb", get_peak_memory())

            performance_class = classify_performance(execution_time, memory_delta)
            span.set_attribute("profiling.performance_class", performance_class)

            operations_per_second = calculate_ops_per_second(result, execution_time)
            span.set_attribute("profiling.operations_per_second", operations_per_second)

            return {
                "result": result,
                "performance_metrics": {
                    "execution_time": execution_time,
                    "memory_delta": memory_delta,
                    "cpu_delta": cpu_delta,
                    "performance_class": performance_class,
                },
            }
# Decorator for automatic profiling
def profile_with_tracing(operation_name: str):
    """Decorator that runs the wrapped function inside a ``profiled.<operation_name>`` span.

    The decorated function itself is called with the caller's arguments, and its
    return value is passed back unchanged. Wall time, memory delta, CPU delta and
    a performance class are recorded as ``profiled.*`` span attributes.

    Args:
        operation_name: Suffix of the span name and value of ``profiled.operation_name``.

    Returns:
        A decorator. Wrapped functions keep their ``__name__``, ``__doc__`` and
        ``__wrapped__`` via ``functools.wraps``.
    """
    import functools

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            execution_context = {
                "args_count": len(args),
                "kwargs_keys": list(kwargs.keys()),
                "function_module": func.__module__,
                "function_name": func.__name__,
            }
            with noveum_trace.trace(f"profiled.{operation_name}") as span:
                span.set_attribute("profiled.operation_name", operation_name)
                span.set_attribute("profiled.function_name", func.__name__)
                span.set_attribute("profiled.context", str(execution_context))

                start_time = time.perf_counter()
                start_memory = get_memory_usage()
                start_cpu = get_cpu_usage()
                try:
                    # Execute the decorated function itself with the caller's arguments.
                    result = func(*args, **kwargs)
                except Exception as e:
                    span.set_attribute("profiled.error_occurred", True)
                    span.set_attribute("profiled.error_message", str(e))
                    span.set_attribute("profiled.execution_time", time.perf_counter() - start_time)
                    raise

                execution_time = time.perf_counter() - start_time
                memory_delta = get_memory_usage() - start_memory
                cpu_delta = get_cpu_usage() - start_cpu
                span.set_attribute("profiled.execution_time", execution_time)
                span.set_attribute("profiled.memory_delta", memory_delta)
                span.set_attribute("profiled.cpu_delta", cpu_delta)
                span.set_attribute(
                    "profiled.performance_class",
                    classify_performance(execution_time, memory_delta),
                )
                return result

        return wrapper

    return decorator
# Usage example: the decorator profiles every call automatically
@profile_with_tracing("complex_algorithm")
def complex_data_processing(data: List[Dict]) -> Dict:
    """Map each item's ``"id"`` to ``transform_item(item)``.

    When ids repeat, the later item's transformed value wins.
    """
    by_id = {}
    for entry in data:
        # Transform first, then read the key (same order as a subscript assignment).
        transformed = transform_item(entry)
        by_id[entry["id"]] = transformed
    return by_id
🔧 Custom Context Propagation
Thread-Safe Context Management
import threading
from contextlib import contextmanager
from typing import Optional, Dict, Any
class CustomContextManager:
    """Per-thread key/value store for custom trace attributes.

    Values live in a ``threading.local``, so each thread sees only the keys it
    has set itself.
    """

    _local = threading.local()

    @classmethod
    def set_context(cls, key: str, value: Any):
        """Store ``value`` under ``key`` for the calling thread."""
        store = getattr(cls._local, 'context', None)
        if store is None:
            store = cls._local.context = {}
        store[key] = value

    @classmethod
    def get_context(cls, key: str, default: Any = None) -> Any:
        """Return the calling thread's value for ``key``, or ``default``."""
        store = getattr(cls._local, 'context', None)
        return default if store is None else store.get(key, default)

    @classmethod
    def get_all_context(cls) -> Dict[str, Any]:
        """Return a shallow copy of the calling thread's whole context."""
        return dict(getattr(cls._local, 'context', {}))

    @classmethod
    def clear_context(cls):
        """Remove every key from the calling thread's context (no-op if unset)."""
        store = getattr(cls._local, 'context', None)
        if store is not None:
            store.clear()
@contextmanager
def custom_trace_context(**context_attributes):
    """Scope ``context_attributes`` onto the thread's custom context and a span.

    Yields the ``custom_context_operation`` span, which carries every key of the
    merged thread context as a ``context.<key>`` attribute. On exit (normal or
    exceptional) the thread context is reset to the snapshot taken on entry.
    """

    def _apply(mapping):
        # Push each key/value into the thread-local context store.
        for name, val in mapping.items():
            CustomContextManager.set_context(name, val)

    snapshot = CustomContextManager.get_all_context()
    try:
        _apply(context_attributes)
        with noveum_trace.trace("custom_context_operation") as span:
            for name, val in CustomContextManager.get_all_context().items():
                span.set_attribute(f"context.{name}", val)
            yield span
    finally:
        # Clearing first drops any keys introduced inside this scope.
        CustomContextManager.clear_context()
        _apply(snapshot)
# Usage example
def process_user_request(user_id: str, request_data: Dict):
    """Handle a user request inside a ``custom_trace_context`` scope.

    Only the wrapping ``custom_context_operation`` span receives the context
    keys as ``context.*`` attributes. Nested code can read the same values
    through ``CustomContextManager`` while the scope is active.
    """
    request_scope = custom_trace_context(
        user_id=user_id,
        request_type=request_data.get("type"),
        session_id=request_data.get("session_id"),
        api_version="v2.1",
        feature_flags=get_user_feature_flags(user_id),
    )
    with request_scope:
        return perform_request_processing(request_data)
def perform_request_processing(request_data: Dict) -> Dict:
    """Nested step that reads the thread's custom context explicitly.

    Nothing shown here copies ``CustomContextManager`` values onto new spans
    automatically. This function fetches them and records which keys were present.
    """
    with noveum_trace.trace("request_processing") as span:
        inherited = CustomContextManager.get_all_context()
        payload_size = len(str(request_data))
        span.set_attribute("processing.data_size", payload_size)
        span.set_attribute("processing.inherited_context_keys", list(inherited))
        return execute_processing_logic(request_data)
Async Context Propagation
import asyncio
from contextvars import ContextVar
from typing import Any, Dict
# Per-context state for async operations. The shared ``{}`` defaults are safe
# only because AsyncContextManager always replaces the dict instead of mutating it.
current_operation_context: ContextVar[Dict[str, Any]] = ContextVar(
    'operation_context',
    default={},
)
current_user_context: ContextVar[Dict[str, Any]] = ContextVar(
    'user_context',
    default={},
)
class AsyncContextManager:
    """Helpers over the two context variables used by the async examples."""

    @staticmethod
    def set_operation_context(**kwargs):
        """Merge ``kwargs`` into the operation context; new keys override old ones.

        A fresh dict is stored, so the previous value is never mutated in place.
        """
        merged = dict(current_operation_context.get())
        merged.update(kwargs)
        current_operation_context.set(merged)

    @staticmethod
    def set_user_context(**kwargs):
        """Merge ``kwargs`` into the user context; new keys override old ones."""
        merged = dict(current_user_context.get())
        merged.update(kwargs)
        current_user_context.set(merged)

    @staticmethod
    def get_full_context() -> Dict[str, Any]:
        """Return operation context overlaid with user context (user keys win)."""
        combined = dict(current_operation_context.get())
        combined.update(current_user_context.get())
        return combined
@noveum_trace.trace_async(operation_name="async_operation_with_context")
async def async_operation_with_context(operation_id: str, user_id: str, data: Dict) -> Dict:
    """Run three subtasks concurrently with operation and user context attached.

    Sets the operation and user context via ``AsyncContextManager`` and records the
    merged context on an ``async_processing`` span as ``async_context.*``. It then
    awaits ``async_subtask_1/2/3`` through ``asyncio.gather``. Each coroutine is
    wrapped in a Task that copies the current context, so the subtasks see it.

    Returns:
        ``{"results": [<subtask results>], "context": <merged context dict>}``.
    """
    # If this coroutine is awaited directly (not as its own Task), ContextVar.set()
    # changes the caller's context. Save the prior values and restore them on exit.
    saved_operation_context = current_operation_context.get()
    saved_user_context = current_user_context.get()
    try:
        AsyncContextManager.set_operation_context(
            operation_id=operation_id,
            operation_type="data_processing",
            start_time=time.time(),
        )
        AsyncContextManager.set_user_context(
            user_id=user_id,
            user_tier=await get_user_tier(user_id),
            session_id=data.get("session_id"),
        )

        with noveum_trace.trace("async_processing") as span:
            full_context = AsyncContextManager.get_full_context()
            for key, value in full_context.items():
                span.set_attribute(f"async_context.{key}", value)

            results = await asyncio.gather(
                async_subtask_1(data),
                async_subtask_2(data),
                async_subtask_3(data),
            )
            span.set_attribute("async_processing.subtasks_completed", len(results))
            return {"results": results, "context": full_context}
    finally:
        current_operation_context.set(saved_operation_context)
        current_user_context.set(saved_user_context)
async def async_subtask_1(data: Dict) -> Dict:
    """First subtask; reads the context values visible in its Task.

    ``data`` is accepted for parity with the other subtasks but is not used here.
    """
    async with noveum_trace.trace_async("subtask_1") as span:
        ctx = AsyncContextManager.get_full_context()
        span.set_attribute("subtask.inherited_context_keys", list(ctx))
        for attr_name, ctx_key in (
            ("subtask.user_id", "user_id"),
            ("subtask.operation_id", "operation_id"),
        ):
            span.set_attribute(attr_name, ctx.get(ctx_key))

        await asyncio.sleep(0.1)  # stand-in for real async work
        return {"subtask": "1", "context_inherited": bool(ctx)}
📈 Advanced Custom Patterns
Event-Driven Instrumentation
class EventDrivenTracer:
    """Event-driven tracing for complex workflows.

    Two registries are kept:
      * ``active_workflows`` maps workflow_id to a state dict. Entries are created
        by ``start_workflow`` and removed by ``end_workflow``.
      * ``event_handlers`` maps an event type to a list of handlers, which
        ``emit_event`` invokes in registration order.
    """

    def __init__(self):
        self.active_workflows = {}
        self.event_handlers = {}

    def register_event_handler(self, event_type: str, handler_func):
        """Append ``handler_func`` to the handlers for ``event_type``.

        Handlers are called as ``handler_func(event_data, workflow_id)``.
        """
        self.event_handlers.setdefault(event_type, []).append(handler_func)

    def emit_event(self, event_type: str, event_data: Dict, workflow_id: Optional[str] = None):
        """Trace an event, update its workflow's state, and run its handlers.

        Returns:
            List of handler return values in registration order. The list is empty
            when no handler is registered for ``event_type``.

        Raises:
            Re-raises the first handler exception after tagging that handler's
            span. Later handlers are not run.
        """
        now = time.time()
        with noveum_trace.trace(f"event.{event_type}") as span:
            span.set_attribute("event.type", event_type)
            span.set_attribute("event.timestamp", now)
            span.set_attribute("event.workflow_id", workflow_id)
            span.set_attribute("event.data_size", len(str(event_data)))

            workflow_state = self.active_workflows.get(workflow_id) if workflow_id else None
            if workflow_state is not None:
                # These attributes reflect the state *before* this event is counted.
                span.set_attribute("workflow.current_state", workflow_state["state"])
                span.set_attribute("workflow.events_processed", workflow_state["event_count"])
                workflow_state["event_count"] += 1
                workflow_state["last_event"] = event_type
                workflow_state["last_event_time"] = now

            handler_results = []
            for handler in self.event_handlers.get(event_type, []):
                # functools.partial objects and callable instances have no __name__.
                handler_name = getattr(handler, "__name__", type(handler).__name__)
                with noveum_trace.trace(f"event_handler.{handler_name}") as handler_span:
                    handler_span.set_attribute("handler.function_name", handler_name)
                    try:
                        result = handler(event_data, workflow_id)
                    except Exception as e:
                        handler_span.set_attribute("handler.success", False)
                        handler_span.set_attribute("handler.error", str(e))
                        raise
                    handler_results.append(result)
                    handler_span.set_attribute("handler.success", True)

            span.set_attribute("event.handlers_executed", len(handler_results))
            return handler_results

    def start_workflow(self, workflow_id: str, workflow_type: str):
        """Begin tracking ``workflow_id``; an existing entry with that id is replaced."""
        now = time.time()
        with noveum_trace.trace("workflow.start") as span:
            span.set_attribute("workflow.id", workflow_id)
            span.set_attribute("workflow.type", workflow_type)
            span.set_attribute("workflow.start_time", now)

            self.active_workflows[workflow_id] = {
                "type": workflow_type,
                "state": "started",
                "start_time": now,
                "event_count": 0,
                "last_event": None,
                "last_event_time": None,
            }

    def end_workflow(self, workflow_id: str, final_state: str = "completed"):
        """Record summary metrics on a ``workflow.end`` span, then stop tracking.

        Unknown ``workflow_id`` values are ignored (returns without tracing).
        """
        workflow_data = self.active_workflows.get(workflow_id)
        if workflow_data is None:
            return

        # A single clock read keeps the duration and the average interval consistent.
        elapsed = time.time() - workflow_data["start_time"]
        with noveum_trace.trace("workflow.end") as span:
            span.set_attribute("workflow.id", workflow_id)
            span.set_attribute("workflow.final_state", final_state)
            span.set_attribute("workflow.duration", elapsed)
            span.set_attribute("workflow.total_events", workflow_data["event_count"])
            span.set_attribute("workflow.last_event", workflow_data["last_event"])
            # max(..., 1) avoids dividing by zero for workflows with no events.
            span.set_attribute(
                "workflow.avg_event_interval",
                elapsed / max(workflow_data["event_count"], 1),
            )

        del self.active_workflows[workflow_id]
# Usage example: one shared tracer instance used by the handlers and workflow below
event_tracer: EventDrivenTracer = EventDrivenTracer()
# Register event handlers
def handle_user_action(event_data: Dict, workflow_id: str):
    """Handle ``user_action`` events by delegating to ``process_user_action``.

    ``workflow_id`` matches the ``handler(event_data, workflow_id)`` call made by
    ``EventDrivenTracer.emit_event`` but is not used here.
    """
    with noveum_trace.trace("user_action_handler") as action_span:
        for attr_name, field in (("action.type", "action_type"), ("action.user_id", "user_id")):
            action_span.set_attribute(attr_name, event_data.get(field))
        return process_user_action(event_data)
def handle_system_event(event_data: Dict, workflow_id: str):
    """Handle ``system_event`` events by delegating to ``process_system_event``.

    ``workflow_id`` is accepted to match the handler call signature but is unused.
    """
    with noveum_trace.trace("system_event_handler") as event_span:
        for attr_name, field in (("system.event_source", "source"), ("system.severity", "severity")):
            event_span.set_attribute(attr_name, event_data.get(field))
        return process_system_event(event_data)
# Register handlers: route each event type to its handler function
event_tracer.register_event_handler(event_type="user_action", handler_func=handle_user_action)
event_tracer.register_event_handler(event_type="system_event", handler_func=handle_system_event)
# Use in application
def process_user_workflow(user_id: str, workflow_data: Dict):
    """Run a user workflow, emitting a trace event for each step.

    Args:
        user_id: Included in every emitted event and in the workflow id.
        workflow_data: Mapping whose ``steps`` list (default empty) is executed in order.

    Raises:
        Re-raises any exception from step execution or event handlers, after
        emitting a ``system_event`` that describes it.
    """
    import uuid

    # Second-resolution timestamps alone collide when one user starts two
    # workflows in the same second, which would overwrite the first entry.
    workflow_id = f"user_workflow_{user_id}_{int(time.time())}_{uuid.uuid4().hex[:8]}"
    event_tracer.start_workflow(workflow_id, "user_interaction")

    final_state = "failed"
    try:
        event_tracer.emit_event("user_action", {
            "action_type": "workflow_start",
            "user_id": user_id,
            "workflow_data": workflow_data,
        }, workflow_id)

        for step in workflow_data.get("steps", []):
            event_tracer.emit_event("user_action", {
                "action_type": "step_execution",
                "user_id": user_id,
                "step": step,
            }, workflow_id)
            execute_workflow_step(step)

        event_tracer.emit_event("user_action", {
            "action_type": "workflow_complete",
            "user_id": user_id,
        }, workflow_id)
        final_state = "completed"
    except Exception as e:
        event_tracer.emit_event("system_event", {
            "source": "workflow_processor",
            "severity": "error",
            "error": str(e),
            "user_id": user_id,
        }, workflow_id)
        raise
    finally:
        # This runs even if emitting the error event itself fails, so the
        # workflow entry never leaks in event_tracer.active_workflows.
        event_tracer.end_workflow(workflow_id, final_state)
Custom Sampling Strategies
class CustomSamplingStrategy:
    """Rule-based sampling: the highest-priority matching rule decides the rate.

    Operations that match no rule are sampled at ``default_sample_rate``.
    """

    def __init__(self, default_sample_rate: float = 0.1):
        self.sampling_rules = []
        self.default_sample_rate = default_sample_rate  # 10% unless overridden

    def add_sampling_rule(self, condition_func, sample_rate: float, priority: int = 0,
                          name: Optional[str] = None):
        """Register a rule that applies ``sample_rate`` when ``condition_func(data)`` is truthy.

        Args:
            condition_func: Predicate over the operation-data mapping.
            sample_rate: Probability in [0, 1] of sampling a matching operation.
            priority: Higher-priority rules are checked first.
            name: Label reported as ``sampling.rule_applied``. Defaults to the
                predicate's ``__name__``.
        """
        self.sampling_rules.append({
            "condition": condition_func,
            "sample_rate": sample_rate,
            "priority": priority,
            "name": name if name is not None else getattr(condition_func, "__name__", repr(condition_func)),
        })
        # sort() is stable, so rules with equal priority keep insertion order.
        self.sampling_rules.sort(key=lambda rule: rule["priority"], reverse=True)

    def get_applied_rule(self, operation_data: Dict) -> Optional[Dict]:
        """Return the first (highest-priority) rule matching ``operation_data``, or None."""
        for rule in self.sampling_rules:
            if rule["condition"](operation_data):
                return rule
        return None

    def _decide(self, operation_data: Dict):
        """Return ``(sampled, rule)``; rule is the matching rule or None (default rate)."""
        rule = self.get_applied_rule(operation_data)
        rate = rule["sample_rate"] if rule is not None else self.default_sample_rate
        return random.random() < rate, rule

    def should_sample(self, operation_data: Dict) -> bool:
        """Decide whether ``operation_data`` should be sampled."""
        return self._decide(operation_data)[0]

    @contextmanager
    def trace_with_custom_sampling(self, operation_name: str, operation_data: Dict):
        """Context manager that yields a span when sampled, otherwise ``None``.

        Yielding ``None`` (rather than a placeholder object) lets callers guard
        span calls with ``if span:``. The rule is evaluated once, so the reported
        rule is always the one that made the decision.
        """
        sampled, rule = self._decide(operation_data)
        if not sampled:
            yield None
            return

        with noveum_trace.trace(operation_name) as span:
            span.set_attribute("sampling.sampled", True)
            span.set_attribute("sampling.strategy", "custom")
            if rule is not None:
                span.set_attribute("sampling.rule_applied", rule["name"])
                span.set_attribute("sampling.sample_rate", rule["sample_rate"])
            yield span
# Define custom sampling conditions
def high_value_customer_condition(operation_data: Dict) -> bool:
    """Match operations belonging to premium or enterprise customers.

    A missing ``customer_tier`` is treated as ``"standard"`` and does not match.
    """
    tier = operation_data.get("customer_tier", "standard")
    return tier in ("premium", "enterprise")
def error_condition(operation_data: Dict) -> bool:
    """Match operations flagged with ``has_error`` or ``error_occurred``.

    Returns the first truthy flag value, otherwise the ``error_occurred`` value
    (default ``False``).
    """
    has_error = operation_data.get("has_error", False)
    if has_error:
        return has_error
    return operation_data.get("error_occurred", False)
def slow_operation_condition(operation_data: Dict) -> bool:
    """Match operations whose ``duration`` exceeds 5.0 (missing counts as 0).

    This only decides whether the rule applies; the sampling rate is chosen when
    the rule is registered with ``add_sampling_rule``.
    """
    slow_threshold = 5.0
    return operation_data.get("duration", 0) > slow_threshold
def debug_mode_condition(operation_data: Dict) -> bool:
    """Match operations whose ``debug_mode`` flag is set (default ``False``)."""
    debug_flag = operation_data.get("debug_mode", False)
    return debug_flag
# Set up custom sampling
sampler = CustomSamplingStrategy()
# Rules are checked highest priority first; the first match decides the rate.
sampler.add_sampling_rule(error_condition, sample_rate=1.0, priority=100)  # always sample errors
sampler.add_sampling_rule(debug_mode_condition, sample_rate=1.0, priority=90)  # always sample debug mode
sampler.add_sampling_rule(high_value_customer_condition, sample_rate=1.0, priority=80)  # always sample VIP customers
sampler.add_sampling_rule(slow_operation_condition, sample_rate=0.5, priority=70)  # half of slow operations
# Usage example
def process_operation_with_sampling(operation_data: Dict):
    """Run ``execute_operation_logic`` with tracing only when the sampler picks it.

    The operation always executes. Span attributes are written only when the
    sampling context manager yields a truthy span.
    """
    op_type = operation_data.get('type', 'unknown')
    with sampler.trace_with_custom_sampling(f"custom_operation_{op_type}", operation_data) as span:
        if span:
            for attr_name, attr_value in (
                ("operation.type", operation_data.get("type")),
                ("operation.customer_tier", operation_data.get("customer_tier")),
                ("operation.duration", operation_data.get("duration", 0)),
            ):
                span.set_attribute(attr_name, attr_value)

        outcome = execute_operation_logic(operation_data)

        if span:
            span.set_attribute("operation.success", outcome.get("success", False))

        return outcome
🔗 Integration with External Systems
Database Operation Tracing
class DatabaseTracer:
    """Custom tracing for database operations."""

    @staticmethod
    def _classify_latency(execution_time: float) -> str:
        """Bucket a duration in seconds: > 1.0 is "slow", > 0.1 "moderate", else "fast"."""
        if execution_time > 1.0:
            return "slow"
        if execution_time > 0.1:
            return "moderate"
        return "fast"

    @staticmethod
    def trace_query_execution(query: str, params: Optional[Dict] = None, database: str = "default",
                              db_system: str = "postgresql"):
        """Execute ``query`` via ``execute_database_query`` inside a ``database.query`` span.

        Args:
            query: SQL text. Only its first 500 characters are recorded; parameter
                values are never recorded.
            params: Query parameters passed through unchanged (may be None).
            database: Logical database name (``db.name``).
            db_system: Database product recorded as ``db.system``.

        Returns:
            The value returned by ``execute_database_query``.

        Raises:
            Whatever the query raises, after the span is tagged with ``db.error*``.
        """
        with noveum_trace.trace("database.query") as span:
            span.set_attribute("db.system", db_system)
            span.set_attribute("db.name", database)
            span.set_attribute("db.operation", "query")
            span.set_attribute("db.statement", query[:500])  # truncate long statements
            span.set_attribute("db.has_parameters", params is not None)
            # Keep the count consistent with has_parameters (an empty dict counts as 0).
            if params is not None:
                span.set_attribute("db.parameter_count", len(params))

            start_time = time.perf_counter()
            try:
                result = execute_database_query(query, params, database)
            except Exception as e:
                span.set_attribute("db.execution_time", time.perf_counter() - start_time)
                span.set_attribute("db.success", False)
                span.set_attribute("db.error", str(e))
                span.set_attribute("db.error_type", type(e).__name__)
                raise

            execution_time = time.perf_counter() - start_time
            span.set_attribute("db.execution_time", execution_time)
            # NOTE(review): this is len(result), i.e. the rows *returned*; it equals
            # rows affected only if the helper returns affected rows - confirm.
            span.set_attribute("db.rows_affected", len(result) if result else 0)
            span.set_attribute("db.success", True)
            span.set_attribute("db.performance", DatabaseTracer._classify_latency(execution_time))
            return result
# Decorator for automatic database tracing
def trace_database_operation(operation_type: str = "query"):
    """Decorator that wraps a database function in a ``db.<operation_type>`` span.

    Args:
        operation_type: Suffix of the span name and value of ``db.operation_type``.

    Returns:
        A decorator. Wrapped functions keep their metadata via ``functools.wraps``.
        Failures set ``db.operation_success`` to False and ``db.error_type`` on the
        span before the exception is re-raised.
    """
    import functools

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with noveum_trace.trace(f"db.{operation_type}") as span:
                span.set_attribute("db.operation_type", operation_type)
                span.set_attribute("db.function_name", func.__name__)
                try:
                    result = func(*args, **kwargs)
                except Exception as e:
                    span.set_attribute("db.operation_success", False)
                    span.set_attribute("db.error_type", type(e).__name__)
                    raise
                span.set_attribute("db.operation_success", True)
                return result

        return wrapper

    return decorator
# Usage examples
@trace_database_operation("user_lookup")
def get_user_by_id(user_id: str) -> Dict:
    """Look up a user by id; traced by the decorator and by ``DatabaseTracer``.

    The parameters are passed as a mapping, so the statement uses the named
    ``%(id)s`` (pyformat) placeholder rather than positional ``%s``.
    NOTE(review): assumes execute_database_query uses a pyformat-style DB-API
    driver such as psycopg2 - confirm.
    """
    query = "SELECT * FROM users WHERE id = %(id)s"
    return DatabaseTracer.trace_query_execution(query, {"id": user_id})
Custom instrumentation with Noveum.ai provides the flexibility to create detailed, domain-specific observability that goes beyond standard LLM and AI operation tracing. By implementing custom spans, attributes, context propagation, and advanced patterns, you can build comprehensive monitoring tailored to your specific application needs.
🔗 Next Steps
- Performance Optimization - Use tracing insights to optimize performance
- Multi-Agent Tracing - Observe agent workflows
- RAG Pipeline Observability - Monitor retrieval and generation systems
Get Early Access to the Noveum.ai Platform
Be the first to know when we open the Noveum Platform to more users. All users get free access to the Observability suite; early users also get free eval jobs and premium support for their first year.