Documentation

Custom Instrumentation

Add custom spans and attributes for domain-specific observability and advanced tracing patterns

While Noveum.ai's automatic instrumentation covers common AI operations, custom instrumentation allows you to add domain-specific observability, track business metrics, and create detailed traces for unique workflows. This guide covers advanced techniques for implementing custom tracing patterns.

🎯 Why Custom Instrumentation?

Custom instrumentation enables you to:

  • Track Business Metrics: Monitor domain-specific KPIs alongside technical metrics
  • Trace Complex Workflows: Create detailed observability for unique business logic
  • Add Context: Enrich traces with application-specific attributes
  • Monitor Custom Components: Instrument proprietary algorithms and processes
  • Optimize Performance: Track specific bottlenecks in your application

🛠️ Custom Span Creation

Basic Custom Spans

import noveum_trace
import time
 
# Manual span creation for custom operations
@noveum_trace.trace_custom(operation_name="data_preprocessing")
def preprocess_user_data(user_data: Dict) -> Dict:
    """Validate and transform ``user_data``, tracing each phase in its own span.

    Args:
        user_data: Raw user record; its ``"user_id"`` entry (if any) is
            recorded on the validation span.

    Returns:
        Whatever ``apply_custom_transformations`` produces for ``user_data``.

    Raises:
        ValueError: If ``validate_user_data`` reports the record as invalid.
    """
    # Phase 1: validation. The span records what was checked and the outcome.
    with noveum_trace.trace("data_validation") as check_span:
        check_span.set_attribute("validation.input_fields", len(user_data))
        check_span.set_attribute("validation.user_id", user_data.get("user_id"))

        outcome = validate_user_data(user_data)
        is_valid = outcome["valid"]
        check_span.set_attribute("validation.success", is_valid)
        check_span.set_attribute("validation.errors_count", len(outcome["errors"]))

        if not is_valid:
            # Record the failure categories on the span before aborting
            check_span.set_attribute("validation.error_types", outcome["error_types"])
            raise ValueError("Invalid user data")

    # Phase 2: transformation. Sizes are measured on the string form of the data.
    with noveum_trace.trace("data_transformation") as shape_span:
        shape_span.set_attribute("transformation.input_size", len(str(user_data)))

        transformed = apply_custom_transformations(user_data)

        output_size = len(str(transformed))
        shape_span.set_attribute("transformation.output_size", output_size)
        shape_span.set_attribute(
            "transformation.compression_ratio",
            output_size / len(str(user_data)),
        )

    return transformed
 
# Advanced span configuration with custom attributes
def custom_span_with_rich_attributes(operation_data: Dict):
    """Run ``execute_custom_operation`` inside a span carrying rich attributes.

    Attributes are grouped by dotted prefix: ``operation.*``, ``input.*``,
    ``business.*``, ``performance.*`` and ``result.*``.

    Args:
        operation_data: Input mapping for the operation; optional keys
            ``customer_tier``, ``enabled_features``, ``session_id`` and
            ``metadata`` are inspected for attributes.

    Returns:
        The value returned by ``execute_custom_operation(operation_data)``.
    """
    with noveum_trace.trace("rich_custom_operation") as op_span:
        # Everything known before the operation runs, in recording order
        upfront_attributes = {
            # Basic attributes
            "operation.type": "data_processing",
            "operation.version": "2.1.0",
            "operation.environment": "production",
            # Input description (dot notation groups related attributes)
            "input.size": len(operation_data),
            "input.type": type(operation_data).__name__,
            "input.has_metadata": "metadata" in operation_data,
            # Business context
            "business.customer_tier": operation_data.get("customer_tier", "standard"),
            "business.feature_flags": operation_data.get("enabled_features", []),
            "business.session_id": operation_data.get("session_id"),
        }
        for attribute_name, attribute_value in upfront_attributes.items():
            op_span.set_attribute(attribute_name, attribute_value)

        # Memory reading taken immediately before the operation
        memory_before = get_memory_usage()
        op_span.set_attribute("performance.memory_start", memory_before)

        outcome = execute_custom_operation(operation_data)

        # Memory reading taken immediately after, plus the difference
        memory_after = get_memory_usage()
        op_span.set_attribute("performance.memory_end", memory_after)
        op_span.set_attribute("performance.memory_delta", memory_after - memory_before)

        # Result characteristics, with defaults for keys the result omits
        outcome_attributes = {
            "result.success": outcome.get("success", False),
            "result.items_processed": outcome.get("items_count", 0),
            "result.processing_time": outcome.get("processing_time", 0),
        }
        for attribute_name, attribute_value in outcome_attributes.items():
            op_span.set_attribute(attribute_name, attribute_value)

        return outcome

TypeScript Custom Instrumentation

import { trace, addAttribute, addEvent } from '@noveum/trace';
 
class CustomInstrumentationExample {

  /**
   * Runs the workflow stages in order inside one `custom-workflow-processing`
   * trace and records per-stage and aggregate attributes.
   *
   * A stage that throws aborts the whole workflow: `executeStage` rethrows,
   * so the aggregate attributes are only recorded when every stage succeeded.
   */
  async processWorkflow(workflowData: WorkflowData): Promise<WorkflowResult> {
    return await trace('custom-workflow-processing', async () => {
      // Set basic workflow attributes
      addAttribute('workflow.id', workflowData.id);
      addAttribute('workflow.type', workflowData.type);
      addAttribute('workflow.priority', workflowData.priority);
      addAttribute('workflow.user_id', workflowData.userId);

      // Track workflow stages
      const stages = ['validation', 'processing', 'enrichment', 'finalization'];
      const stageResults: Record<string, any> = {};

      for (const stage of stages) {
        const stageResult = await this.executeStage(stage, workflowData);
        stageResults[stage] = stageResult;

        // Add stage-specific attributes
        addAttribute(`stages.${stage}.duration`, stageResult.duration);
        addAttribute(`stages.${stage}.success`, stageResult.success);
        addAttribute(`stages.${stage}.items_processed`, stageResult.itemsProcessed);
      }

      // Calculate overall metrics
      const totalDuration = Object.values(stageResults)
        .reduce((sum, result) => sum + result.duration, 0);
      const successfulStages = Object.values(stageResults)
        .filter(result => result.success).length;

      addAttribute('workflow.total_duration', totalDuration);
      addAttribute('workflow.success_rate', successfulStages / stages.length);
      addAttribute('workflow.completed_stages', successfulStages);

      return {
        workflowId: workflowData.id,
        results: stageResults,
        success: successfulStages === stages.length,
        totalDuration
      };
    });
  }

  /**
   * Executes a single stage inside its own `workflow-stage-<name>` trace.
   *
   * On success, records duration/output size and a `stage_completed` event.
   * On failure, records the error message and a `stage_failed` event, then
   * rethrows the original error unchanged.
   */
  private async executeStage(stageName: string, data: WorkflowData): Promise<StageResult> {
    return await trace(`workflow-stage-${stageName}`, async () => {
      addAttribute('stage.name', stageName);
      addAttribute('stage.input_size', JSON.stringify(data).length);

      const startTime = Date.now();

      try {
        // Stage-specific processing
        const result = await this.processStage(stageName, data);

        const duration = Date.now() - startTime;

        addAttribute('stage.success', true);
        addAttribute('stage.duration', duration);
        addAttribute('stage.output_size', JSON.stringify(result).length);

        // Add stage completion event
        addEvent('stage_completed', {
          stage: stageName,
          duration,
          itemsProcessed: result.itemsProcessed || 0
        });

        return {
          success: true,
          duration,
          itemsProcessed: result.itemsProcessed || 0,
          data: result
        };

      } catch (error) {
        const duration = Date.now() - startTime;

        // Catch variables are `unknown` under strict TypeScript, and any value
        // can be thrown, so narrow before reading `.message`.
        const errorMessage = error instanceof Error ? error.message : String(error);

        addAttribute('stage.success', false);
        addAttribute('stage.duration', duration);
        addAttribute('stage.error', errorMessage);

        // Add error event
        addEvent('stage_failed', {
          stage: stageName,
          error: errorMessage,
          duration
        });

        throw error;
      }
    });
  }
}

📊 Custom Metrics and Attributes

Business Metrics Integration

class BusinessMetricsTracer:
    """Custom tracer for business-specific metrics."""

    @staticmethod
    def trace_customer_interaction(customer_id: str, interaction_type: str):
        """Trace a customer interaction together with its business context.

        Args:
            customer_id: Identifier passed to ``get_customer_data``.
            interaction_type: Kind of interaction (e.g. ``"purchase_attempt"``).

        Returns:
            A dict with ``customer_id``, ``interaction_value`` and
            ``business_impact``; the latter two are the same values that were
            recorded on the span.
        """
        with noveum_trace.trace("customer_interaction") as span:
            # Customer context
            span.set_attribute("customer.id", customer_id)
            span.set_attribute("customer.interaction_type", interaction_type)

            # Fetch customer context
            customer_data = get_customer_data(customer_id)
            span.set_attribute("customer.tier", customer_data.get("tier", "standard"))
            span.set_attribute("customer.lifetime_value", customer_data.get("ltv", 0))
            span.set_attribute("customer.account_age_days", customer_data.get("account_age", 0))
            span.set_attribute("customer.previous_interactions", customer_data.get("interaction_count", 0))

            # Business metrics. The revenue impact is computed once so the span
            # attribute and the returned value can never disagree.
            interaction_value = calculate_interaction_value(interaction_type, customer_data)
            revenue_impact = calculate_revenue_impact(interaction_type)
            span.set_attribute("business.interaction_value", interaction_value)
            span.set_attribute("business.revenue_impact", revenue_impact)

            # Contextual attributes
            span.set_attribute("context.timestamp", time.time())
            span.set_attribute("context.timezone", customer_data.get("timezone", "UTC"))
            span.set_attribute("context.device_type", get_device_type())
            span.set_attribute("context.session_duration", get_session_duration())

            return {
                "customer_id": customer_id,
                "interaction_value": interaction_value,
                "business_impact": revenue_impact
            }

    @staticmethod
    def trace_conversion_funnel(user_id: str, funnel_stage: str, funnel_data: Dict):
        """Trace conversion funnel progression for one stage.

        Args:
            user_id: User moving through the funnel.
            funnel_stage: Name of the stage being recorded.
            funnel_data: Funnel state; ``"completed_stages"`` (optional list)
                is used to compute the completion rate.

        Returns:
            The metrics dict produced by ``calculate_stage_metrics``.
        """
        with noveum_trace.trace("conversion_funnel") as span:
            span.set_attribute("funnel.user_id", user_id)
            span.set_attribute("funnel.stage", funnel_stage)
            span.set_attribute("funnel.stage_order", get_stage_order(funnel_stage))

            # Funnel progression data
            previous_stages = funnel_data.get("completed_stages", [])
            total_stages = get_total_stages()
            # Guard against an empty funnel definition instead of raising
            # ZeroDivisionError in the middle of the trace.
            completion_rate = len(previous_stages) / total_stages if total_stages else 0.0
            span.set_attribute("funnel.completed_stages", len(previous_stages))
            span.set_attribute("funnel.completion_rate", completion_rate)

            # Stage-specific metrics
            stage_metrics = calculate_stage_metrics(funnel_stage, funnel_data)
            span.set_attribute("funnel.stage_duration", stage_metrics["duration"])
            span.set_attribute("funnel.stage_interactions", stage_metrics["interactions"])
            span.set_attribute("funnel.stage_conversion_probability", stage_metrics["conversion_prob"])

            # Behavioral tracking
            span.set_attribute("behavior.time_on_stage", stage_metrics["time_spent"])
            span.set_attribute("behavior.bounce_risk", stage_metrics["bounce_risk"])
            span.set_attribute("behavior.engagement_score", stage_metrics["engagement"])

            return stage_metrics
 
# Usage example
@noveum_trace.trace_custom(operation_name="ecommerce_purchase")
def process_purchase(user_id: str, cart_data: Dict) -> Dict:
    """Trace an e-commerce purchase: interaction, validation, funnel outcome.

    Args:
        user_id: Customer making the purchase.
        cart_data: Cart mapping; must contain ``items``, ``total``,
            ``currency`` and ``payment_method``.

    Returns:
        The result of ``process_successful_purchase(cart_data)``.

    Raises:
        ValueError: If ``validate_purchase`` reports the cart as invalid.
    """
    # Record the attempt itself before any validation happens
    BusinessMetricsTracer.trace_customer_interaction(user_id, "purchase_attempt")

    with noveum_trace.trace("purchase_validation") as cart_span:
        cart_span.set_attribute("purchase.item_count", len(cart_data["items"]))
        cart_span.set_attribute("purchase.total_value", cart_data["total"])
        cart_span.set_attribute("purchase.currency", cart_data["currency"])
        cart_span.set_attribute("purchase.payment_method", cart_data["payment_method"])

        check = validate_purchase(cart_data)
        cart_span.set_attribute("purchase.validation_success", check["valid"])

    if not check["valid"]:
        # Funnel abandonment is recorded before the error is raised
        BusinessMetricsTracer.trace_conversion_funnel(user_id, "purchase_abandoned", cart_data)
        raise ValueError("Purchase validation failed")

    # Successful funnel completion
    BusinessMetricsTracer.trace_conversion_funnel(user_id, "purchase_completed", cart_data)
    return process_successful_purchase(cart_data)

Performance Profiling Integration

import functools
from typing import Any, Callable, Dict, Optional


class PerformanceProfiler:
    """Custom performance profiling with tracing integration."""

    @staticmethod
    def profile_function_execution(
        func_name: str,
        execution_context: Dict,
        operation: Optional[Callable[[], Any]] = None,
    ):
        """Profile one operation and record the metrics on a span.

        Args:
            func_name: Name used in the span name and attributes.
            execution_context: Descriptive context; recorded as a string.
            operation: Optional zero-argument callable to execute and profile.
                When omitted, ``execute_profiled_operation(func_name,
                execution_context)`` is called instead, as before.

        Returns:
            A dict with ``"result"`` (the operation's return value) and
            ``"performance_metrics"`` (execution time, memory delta, CPU delta
            and performance class).

        Raises:
            Exception: Anything raised while executing or measuring is
                recorded on the span and re-raised unchanged.
        """
        with noveum_trace.trace(f"performance_profile.{func_name}") as span:
            span.set_attribute("profiling.function_name", func_name)
            span.set_attribute("profiling.context", str(execution_context))

            # Pre-execution metrics
            start_time = time.perf_counter()
            start_memory = get_memory_usage()
            start_cpu = get_cpu_usage()

            span.set_attribute("profiling.start_memory_mb", start_memory)
            span.set_attribute("profiling.start_cpu_percent", start_cpu)

            try:
                # Execute the profiled operation
                if operation is not None:
                    result = operation()
                else:
                    result = execute_profiled_operation(func_name, execution_context)

                # Post-execution metrics
                end_time = time.perf_counter()
                end_memory = get_memory_usage()
                end_cpu = get_cpu_usage()

                execution_time = end_time - start_time
                memory_delta = end_memory - start_memory
                cpu_delta = end_cpu - start_cpu

                # Performance attributes
                span.set_attribute("profiling.execution_time_seconds", execution_time)
                span.set_attribute("profiling.memory_delta_mb", memory_delta)
                span.set_attribute("profiling.cpu_delta_percent", cpu_delta)
                span.set_attribute("profiling.peak_memory_mb", get_peak_memory())

                # Performance classification
                performance_class = classify_performance(execution_time, memory_delta)
                span.set_attribute("profiling.performance_class", performance_class)

                # Efficiency metrics
                operations_per_second = calculate_ops_per_second(result, execution_time)
                span.set_attribute("profiling.operations_per_second", operations_per_second)

                return {
                    "result": result,
                    "performance_metrics": {
                        "execution_time": execution_time,
                        "memory_delta": memory_delta,
                        "cpu_delta": cpu_delta,
                        "performance_class": performance_class
                    }
                }

            except Exception as e:
                # Error profiling
                end_time = time.perf_counter()
                error_time = end_time - start_time

                span.set_attribute("profiling.error_occurred", True)
                span.set_attribute("profiling.error_time_seconds", error_time)
                span.set_attribute("profiling.error_message", str(e))

                raise


# Decorator for automatic profiling
def profile_with_tracing(operation_name: str):
    """Return a decorator that profiles and traces each call of a function.

    Args:
        operation_name: Label used in the ``profiled.<operation_name>`` span.

    Returns:
        A decorator. The decorated function keeps its name and docstring,
        receives the caller's arguments, and returns its own return value.
    """

    def decorator(func):
        @functools.wraps(func)  # keep __name__/__doc__ of the wrapped function
        def wrapper(*args, **kwargs):
            execution_context = {
                "args_count": len(args),
                "kwargs_keys": list(kwargs.keys()),
                "function_module": func.__module__,
                "function_name": func.__name__
            }

            with noveum_trace.trace(f"profiled.{operation_name}") as span:
                span.set_attribute("profiled.operation_name", operation_name)
                span.set_attribute("profiled.function_name", func.__name__)

                # Hand the actual call (with the caller's arguments) to the
                # profiler; otherwise the decorated function would never run.
                profiler_result = PerformanceProfiler.profile_function_execution(
                    func.__name__,
                    execution_context,
                    operation=functools.partial(func, *args, **kwargs),
                )

                # Add profiling results to span
                for key, value in profiler_result["performance_metrics"].items():
                    span.set_attribute(f"profiled.{key}", value)

                return profiler_result["result"]

        return wrapper
    return decorator
 
# Usage example
@profile_with_tracing("complex_algorithm")
def complex_data_processing(data: List[Dict]) -> Dict:
    """Transform every item of ``data`` and index the results by item id.

    Args:
        data: Items to process; each must have an ``"id"`` key.

    Returns:
        A dict mapping each item's ``"id"`` to ``transform_item(item)``.
    """
    results_by_id = {}
    for record in data:
        transformed = transform_item(record)
        results_by_id[record["id"]] = transformed
    return results_by_id

🔧 Custom Context Propagation

Thread-Safe Context Management

import threading
from contextlib import contextmanager
from typing import Optional, Dict, Any
 
class CustomContextManager:
    """Per-thread key/value store for custom trace attributes.

    All state lives in a ``threading.local`` object, so each thread reads and
    writes only its own context dict.
    """

    _local = threading.local()

    @classmethod
    def set_context(cls, key: str, value: Any):
        """Store ``value`` under ``key`` in the current thread's context."""
        try:
            store = cls._local.context
        except AttributeError:
            # First write on this thread: create its context dict
            store = cls._local.context = {}
        store[key] = value

    @classmethod
    def get_context(cls, key: str, default: Any = None) -> Any:
        """Return the current thread's value for ``key``, or ``default``."""
        try:
            store = cls._local.context
        except AttributeError:
            return default
        return store.get(key, default)

    @classmethod
    def get_all_context(cls) -> Dict[str, Any]:
        """Return a shallow copy of the current thread's context."""
        try:
            store = cls._local.context
        except AttributeError:
            return {}
        return store.copy()

    @classmethod
    def clear_context(cls):
        """Remove every entry from the current thread's context."""
        try:
            store = cls._local.context
        except AttributeError:
            return
        store.clear()
 
@contextmanager
def custom_trace_context(**context_attributes):
    """Open a span with extra context values set for the current thread.

    The keyword arguments are merged into ``CustomContextManager`` for the
    duration of the ``with`` block and every resulting context entry is copied
    onto the span as ``context.<key>``. On exit the context that existed
    before entry is restored.

    Yields:
        The span opened by ``noveum_trace.trace("custom_context_operation")``.
    """
    # Snapshot taken up front so it can be restored on exit
    saved_context = CustomContextManager.get_all_context()

    try:
        for name, value in context_attributes.items():
            CustomContextManager.set_context(name, value)

        with noveum_trace.trace("custom_context_operation") as span:
            # Mirror the merged context (previous + new) onto the span
            for name, value in CustomContextManager.get_all_context().items():
                span.set_attribute(f"context.{name}", value)

            yield span

    finally:
        # Drop everything set during the block, then put the snapshot back
        CustomContextManager.clear_context()
        for name, value in saved_context.items():
            CustomContextManager.set_context(name, value)
 
# Usage example
def process_user_request(user_id: str, request_data: Dict):
    """Process a user request inside a ``custom_trace_context`` block.

    Args:
        user_id: User issuing the request.
        request_data: Request mapping; ``"type"`` and ``"session_id"`` are
            read with ``.get`` and may be absent.

    Returns:
        The result of ``perform_request_processing(request_data)``.
    """
    request_context = dict(
        user_id=user_id,
        request_type=request_data.get("type"),
        session_id=request_data.get("session_id"),
        api_version="v2.1",
        feature_flags=get_user_feature_flags(user_id),
    )

    with custom_trace_context(**request_context):
        # perform_request_processing runs in the same thread, so it can read
        # these values back through CustomContextManager.
        return perform_request_processing(request_data)
 
def perform_request_processing(request_data: Dict) -> Dict:
    """Run the processing logic in a span that notes the inherited context.

    Args:
        request_data: Request mapping passed on to ``execute_processing_logic``.

    Returns:
        The result of ``execute_processing_logic(request_data)``.
    """
    with noveum_trace.trace("request_processing") as processing_span:
        # Context set earlier on this thread is read back explicitly
        inherited_keys = list(CustomContextManager.get_all_context().keys())

        # Processing-specific attributes
        processing_span.set_attribute("processing.data_size", len(str(request_data)))
        processing_span.set_attribute("processing.inherited_context_keys", inherited_keys)

        return execute_processing_logic(request_data)

Async Context Propagation

import asyncio
from contextvars import ContextVar
from typing import Any, Dict
 
# Context variables for async operations.
# NOTE: both share a mutable ``{}`` default. The helpers below never mutate the
# dict returned by ``.get()``; they always store a freshly built dict instead.
current_operation_context: ContextVar[Dict[str, Any]] = ContextVar('operation_context', default={})
current_user_context: ContextVar[Dict[str, Any]] = ContextVar('user_context', default={})


class AsyncContextManager:
    """Helpers for reading and updating the two async context variables."""

    @staticmethod
    def set_operation_context(**kwargs):
        """Merge ``kwargs`` into the operation context (new keys win)."""
        merged = dict(current_operation_context.get())
        merged.update(kwargs)
        current_operation_context.set(merged)

    @staticmethod
    def set_user_context(**kwargs):
        """Merge ``kwargs`` into the user context (new keys win)."""
        merged = dict(current_user_context.get())
        merged.update(kwargs)
        current_user_context.set(merged)

    @staticmethod
    def get_full_context() -> Dict[str, Any]:
        """Return operation and user context combined into one new dict.

        On a key collision the user-context value overrides the
        operation-context value.
        """
        combined = dict(current_operation_context.get())
        combined.update(current_user_context.get())
        return combined
 
@noveum_trace.trace_async(operation_name="async_operation_with_context")
async def async_operation_with_context(operation_id: str, user_id: str, data: Dict) -> Dict:
    """Set async context, then run three subtasks concurrently in one span.

    Args:
        operation_id: Identifier stored in the operation context.
        user_id: Identifier stored in the user context and used to look up
            the user's tier.
        data: Payload handed to every subtask; ``"session_id"`` is read
            with ``.get`` and may be absent.

    Returns:
        A dict with ``"results"`` (subtask results in submission order) and
        ``"context"`` (the combined context recorded on the span).
    """
    # Operation-level context for this call
    AsyncContextManager.set_operation_context(
        operation_id=operation_id,
        operation_type="data_processing",
        start_time=time.time(),
    )

    # User-level context; the tier lookup is awaited before it is stored
    tier = await get_user_tier(user_id)
    AsyncContextManager.set_user_context(
        user_id=user_id,
        user_tier=tier,
        session_id=data.get("session_id"),
    )

    with noveum_trace.trace("async_processing") as span:
        # Copy the combined context onto the span
        context_snapshot = AsyncContextManager.get_full_context()
        for name, value in context_snapshot.items():
            span.set_attribute(f"async_context.{name}", value)

        # Run the subtasks concurrently and wait for all of them
        results = await asyncio.gather(
            async_subtask_1(data),
            async_subtask_2(data),
            async_subtask_3(data),
        )

        span.set_attribute("async_processing.subtasks_completed", len(results))

        return {"results": results, "context": context_snapshot}
 
async def async_subtask_1(data: Dict) -> Dict:
    """Subtask that reads the async context and records it on its span.

    Args:
        data: Payload for the subtask (not inspected here).

    Returns:
        ``{"subtask": "1", "context_inherited": <bool>}`` where the flag is
        True when the combined context is non-empty.
    """
    async with noveum_trace.trace_async("subtask_1") as subtask_span:
        inherited = AsyncContextManager.get_full_context()

        subtask_span.set_attribute("subtask.inherited_context_keys", list(inherited.keys()))
        for attribute, context_key in (
            ("subtask.user_id", "user_id"),
            ("subtask.operation_id", "operation_id"),
        ):
            subtask_span.set_attribute(attribute, inherited.get(context_key))

        # Simulate async work
        await asyncio.sleep(0.1)

        return {"subtask": "1", "context_inherited": bool(inherited)}

📈 Advanced Custom Patterns

Event-Driven Instrumentation

class EventDrivenTracer:
    """Event-driven tracing for complex workflows.

    Attributes:
        active_workflows: Maps workflow id to its mutable state dict
            (``type``, ``state``, ``start_time``, ``event_count``,
            ``last_event``, ``last_event_time``).
        event_handlers: Maps event type to its handlers, in registration order.
    """

    def __init__(self):
        self.active_workflows = {}
        self.event_handlers = {}

    def register_event_handler(self, event_type: str, handler_func):
        """Register ``handler_func`` to be called for ``event_type`` events.

        Handlers are invoked as ``handler_func(event_data, workflow_id)``.
        """
        self.event_handlers.setdefault(event_type, []).append(handler_func)

    def emit_event(self, event_type: str, event_data: Dict, workflow_id: Optional[str] = None):
        """Emit an event inside a span and run its registered handlers.

        Args:
            event_type: Event name; selects the handlers to run.
            event_data: Payload passed to every handler.
            workflow_id: Optional id of a workflow started with
                ``start_workflow``; its event counters are updated.

        Returns:
            The list of handler return values, in registration order. The list
            is empty when no handler is registered for ``event_type``.

        Raises:
            Exception: The first handler failure is recorded on that handler's
                span and re-raised; later handlers do not run.
        """
        with noveum_trace.trace(f"event.{event_type}") as span:
            span.set_attribute("event.type", event_type)
            span.set_attribute("event.timestamp", time.time())
            span.set_attribute("event.workflow_id", workflow_id)
            span.set_attribute("event.data_size", len(str(event_data)))

            # Track workflow state if applicable
            if workflow_id and workflow_id in self.active_workflows:
                workflow_state = self.active_workflows[workflow_id]
                span.set_attribute("workflow.current_state", workflow_state["state"])
                span.set_attribute("workflow.events_processed", workflow_state["event_count"])

                # Update workflow state
                workflow_state["event_count"] += 1
                workflow_state["last_event"] = event_type
                workflow_state["last_event_time"] = time.time()

            # Execute event handlers (none registered -> empty result list)
            handler_results = []
            for handler in self.event_handlers.get(event_type, []):
                # functools.partial objects and other callables may lack __name__
                handler_name = getattr(handler, "__name__", repr(handler))
                with noveum_trace.trace(f"event_handler.{handler_name}") as handler_span:
                    handler_span.set_attribute("handler.function_name", handler_name)

                    try:
                        result = handler(event_data, workflow_id)
                        handler_results.append(result)
                        handler_span.set_attribute("handler.success", True)
                    except Exception as e:
                        handler_span.set_attribute("handler.success", False)
                        handler_span.set_attribute("handler.error", str(e))
                        raise

            span.set_attribute("event.handlers_executed", len(handler_results))

            return handler_results

    def start_workflow(self, workflow_id: str, workflow_type: str):
        """Start tracking a workflow and record a ``workflow.start`` span.

        An existing entry with the same ``workflow_id`` is overwritten.
        """
        with noveum_trace.trace("workflow.start") as span:
            span.set_attribute("workflow.id", workflow_id)
            span.set_attribute("workflow.type", workflow_type)
            span.set_attribute("workflow.start_time", time.time())

            self.active_workflows[workflow_id] = {
                "type": workflow_type,
                "state": "started",
                "start_time": time.time(),
                "event_count": 0,
                "last_event": None,
                "last_event_time": None
            }

    def end_workflow(self, workflow_id: str, final_state: str = "completed"):
        """Record a ``workflow.end`` span and stop tracking the workflow.

        Does nothing when ``workflow_id`` is not currently tracked.
        """
        if workflow_id not in self.active_workflows:
            return

        workflow_data = self.active_workflows[workflow_id]

        with noveum_trace.trace("workflow.end") as span:
            # Measure once so duration and average interval are consistent
            elapsed = time.time() - workflow_data["start_time"]

            span.set_attribute("workflow.id", workflow_id)
            span.set_attribute("workflow.final_state", final_state)
            span.set_attribute("workflow.duration", elapsed)
            span.set_attribute("workflow.total_events", workflow_data["event_count"])
            span.set_attribute("workflow.last_event", workflow_data["last_event"])

            # Calculate workflow metrics; max(..., 1) avoids dividing by zero
            # for workflows that saw no events
            avg_event_interval = elapsed / max(workflow_data["event_count"], 1)
            span.set_attribute("workflow.avg_event_interval", avg_event_interval)

        del self.active_workflows[workflow_id]
 
# Usage example: one module-level tracer instance, shared by the handler
# registrations and process_user_workflow below
event_tracer = EventDrivenTracer()
 
# Register event handlers
def handle_user_action(event_data: Dict, workflow_id: str):
    """Handle a user action event inside its own span.

    Args:
        event_data: Event payload; ``"action_type"`` and ``"user_id"`` are
            read with ``.get`` and may be absent.
        workflow_id: Id of the emitting workflow (unused here).

    Returns:
        The result of ``process_user_action(event_data)``.
    """
    with noveum_trace.trace("user_action_handler") as action_span:
        for attribute, field in (
            ("action.type", "action_type"),
            ("action.user_id", "user_id"),
        ):
            action_span.set_attribute(attribute, event_data.get(field))

        # Process user action
        return process_user_action(event_data)
 
def handle_system_event(event_data: Dict, workflow_id: str):
    """Handle a system event inside its own span.

    Args:
        event_data: Event payload; ``"source"`` and ``"severity"`` are read
            with ``.get`` and may be absent.
        workflow_id: Id of the emitting workflow (unused here).

    Returns:
        The result of ``process_system_event(event_data)``.
    """
    with noveum_trace.trace("system_event_handler") as system_span:
        for attribute, field in (
            ("system.event_source", "source"),
            ("system.severity", "severity"),
        ):
            system_span.set_attribute(attribute, event_data.get(field))

        # Process system event
        return process_system_event(event_data)
 
# Register handlers: "user_action" events are dispatched to handle_user_action
# and "system_event" events to handle_system_event
event_tracer.register_event_handler("user_action", handle_user_action)
event_tracer.register_event_handler("system_event", handle_system_event)
 
# Use in application
def process_user_workflow(user_id: str, workflow_data: Dict):
    """Process a user workflow with event-driven tracing.

    Emits a ``user_action`` event at the start, before each step and on
    completion. On failure a ``system_event`` is emitted and the workflow is
    closed in the ``"failed"`` state.

    Args:
        user_id: User owning the workflow; part of the generated workflow id.
        workflow_data: Workflow description; ``"steps"`` (optional list) holds
            the steps passed to ``execute_workflow_step``.

    Raises:
        Exception: Any error from a step or an event handler is re-raised
            after the workflow has been closed.
    """
    # NOTE: second-resolution timestamp; two workflows started for the same
    # user within one second get the same id.
    workflow_id = f"user_workflow_{user_id}_{int(time.time())}"

    # Start workflow tracking
    event_tracer.start_workflow(workflow_id, "user_interaction")

    try:
        # Emit events throughout the workflow
        event_tracer.emit_event("user_action", {
            "action_type": "workflow_start",
            "user_id": user_id,
            "workflow_data": workflow_data
        }, workflow_id)

        # Process workflow steps
        for step in workflow_data.get("steps", []):
            event_tracer.emit_event("user_action", {
                "action_type": "step_execution",
                "user_id": user_id,
                "step": step
            }, workflow_id)

            # Execute step
            execute_workflow_step(step)

        # Emit completion event
        event_tracer.emit_event("user_action", {
            "action_type": "workflow_complete",
            "user_id": user_id
        }, workflow_id)

        # End workflow tracking
        event_tracer.end_workflow(workflow_id, "completed")

    except Exception as e:
        try:
            # Emit error event
            event_tracer.emit_event("system_event", {
                "source": "workflow_processor",
                "severity": "error",
                "error": str(e),
                "user_id": user_id
            }, workflow_id)
        finally:
            # Close the workflow even if the error-event handlers raise, so it
            # never lingers in event_tracer.active_workflows
            event_tracer.end_workflow(workflow_id, "failed")

        raise

Custom Sampling Strategies

import random
from contextlib import contextmanager


class CustomSamplingStrategy:
    """Implement custom sampling strategies for different scenarios.

    Attributes:
        sampling_rules: Rule dicts (``condition``, ``sample_rate``,
            ``priority``, ``name``) kept sorted by priority, highest first.
        default_sample_rate: Rate used when no rule's condition matches.
    """

    def __init__(self):
        self.sampling_rules = []
        self.default_sample_rate = 0.1  # 10% default sampling

    def add_sampling_rule(self, condition_func, sample_rate: float, priority: int = 0):
        """Add a custom sampling rule.

        Args:
            condition_func: Predicate called with the operation data.
            sample_rate: Probability in ``[0, 1]`` of sampling a match.
            priority: Higher values are evaluated first; ties keep
                insertion order.
        """
        self.sampling_rules.append({
            "condition": condition_func,
            "sample_rate": sample_rate,
            "priority": priority,
            # Recorded on spans as "sampling.rule_applied"
            "name": getattr(condition_func, "__name__", repr(condition_func)),
        })

        # Sort by priority (higher priority first)
        self.sampling_rules.sort(key=lambda x: x["priority"], reverse=True)

    def should_sample(self, operation_data: Dict) -> bool:
        """Decide randomly whether the operation should be sampled.

        The first matching rule (by priority) supplies the rate; otherwise
        ``default_sample_rate`` is used.
        """
        applied_rule = self.get_applied_rule(operation_data)
        if applied_rule is not None:
            sample_rate = applied_rule["sample_rate"]
        else:
            sample_rate = self.default_sample_rate
        return random.random() < sample_rate

    @contextmanager
    def trace_with_custom_sampling(self, operation_name: str, operation_data: Dict):
        """Context manager that opens a span only for sampled operations.

        Yields:
            The span when the operation is sampled, otherwise ``None`` so
            callers can guard tracing code with ``if span:``.
        """
        if not self.should_sample(operation_data):
            # Unsampled: the body of the ``with`` block still runs, untraced
            yield None
            return

        with noveum_trace.trace(operation_name) as span:
            span.set_attribute("sampling.sampled", True)
            span.set_attribute("sampling.strategy", "custom")

            # Add sampling context
            applied_rule = self.get_applied_rule(operation_data)
            if applied_rule:
                # Rules appended by hand may lack "name"; fall back to the
                # condition's own name
                rule_name = applied_rule.get("name") or getattr(
                    applied_rule["condition"], "__name__", "unknown"
                )
                span.set_attribute("sampling.rule_applied", rule_name)
                span.set_attribute("sampling.sample_rate", applied_rule["sample_rate"])

            yield span

    def get_applied_rule(self, operation_data: Dict) -> Optional[Dict]:
        """Return the first rule whose condition matches, or ``None``."""
        for rule in self.sampling_rules:
            if rule["condition"](operation_data):
                return rule
        return None
 
# Define custom sampling conditions
def high_value_customer_condition(operation_data: Dict) -> bool:
    """Match operations whose customer tier is premium or enterprise.

    A missing ``customer_tier`` key is treated as ``"standard"``.
    """
    return operation_data.get("customer_tier", "standard") in ("premium", "enterprise")
 
def error_condition(operation_data: Dict) -> bool:
    """Match operations flagged with ``has_error`` or ``error_occurred``.

    The first truthy flag value is returned as-is; otherwise the value of
    ``error_occurred`` (``False`` when absent) is returned.
    """
    primary_flag = operation_data.get("has_error", False)
    if primary_flag:
        return primary_flag
    return operation_data.get("error_occurred", False)
 
def slow_operation_condition(operation_data: Dict) -> bool:
    """Match operations whose ``duration`` exceeds five seconds.

    A missing ``duration`` key counts as 0, so such operations never match.
    """
    # Strictly greater than: an operation of exactly 5.0 seconds is not slow.
    return operation_data.get("duration", 0) > 5.0
 
def debug_mode_condition(operation_data: Dict) -> bool:
    """Match operations that carry a truthy ``debug_mode`` flag.

    The flag value is returned unchanged; ``False`` is returned when the
    key is absent.
    """
    debug_flag = operation_data.get("debug_mode", False)
    return debug_flag
 
# Set up custom sampling: one shared strategy instance used by the example below
sampler = CustomSamplingStrategy()

# Add sampling rules with priorities.
# Rules are kept sorted by priority (highest first) and the first rule whose
# condition matches decides the sample rate, so the error rule takes
# precedence over every other rule registered here.
sampler.add_sampling_rule(error_condition, 1.0, priority=100)  # Always sample errors
sampler.add_sampling_rule(debug_mode_condition, 1.0, priority=90)  # Always sample debug mode
sampler.add_sampling_rule(high_value_customer_condition, 1.0, priority=80)  # Always sample VIP customers
sampler.add_sampling_rule(slow_operation_condition, 0.5, priority=70)  # 50% of slow operations
 
# Usage example
def process_operation_with_sampling(operation_data: Dict):
    """Run the operation, recording trace attributes only when it is sampled.

    The operation logic itself always executes; the span-related code is
    skipped whenever the sampler hands back a falsy span.
    """
    kind = operation_data.get("type", "unknown")
    operation_name = f"custom_operation_{kind}"

    with sampler.trace_with_custom_sampling(operation_name, operation_data) as span:
        if span:
            # Describe the request on the span before the work starts.
            request_attributes = {
                "operation.type": operation_data.get("type"),
                "operation.customer_tier": operation_data.get("customer_tier"),
                "operation.duration": operation_data.get("duration", 0),
            }
            for attribute_name, attribute_value in request_attributes.items():
                span.set_attribute(attribute_name, attribute_value)

        # The business logic runs regardless of the sampling decision.
        outcome = execute_operation_logic(operation_data)

        if span:
            span.set_attribute("operation.success", outcome.get("success", False))

        return outcome

🔗 Integration with External Systems

Database Operation Tracing

class DatabaseTracer:
    """Custom tracing for database operations."""

    @staticmethod
    def trace_query_execution(
        query: str,
        params: Optional[Dict] = None,
        database: str = "default",
        db_system: str = "postgresql",
    ):
        """Run a query inside a ``database.query`` span and record its outcome.

        Args:
            query: SQL text. Only the first 500 characters are stored on the
                span.
            params: Query parameters forwarded to ``execute_database_query``.
            database: Name recorded as ``db.name`` and forwarded to
                ``execute_database_query``.
            db_system: Value recorded as ``db.system``; set it to your
                database type.

        Returns:
            Whatever ``execute_database_query`` returns.

        Raises:
            Exception: Any error raised by ``execute_database_query`` is
                recorded on the span and re-raised unchanged.
        """
        with noveum_trace.trace("database.query") as span:
            span.set_attribute("db.system", db_system)
            span.set_attribute("db.name", database)
            span.set_attribute("db.operation", "query")
            span.set_attribute("db.statement", query[:500])  # Truncate long queries
            span.set_attribute("db.has_parameters", params is not None)

            if params:
                span.set_attribute("db.parameter_count", len(params))

            # Execute query with timing
            start_time = time.perf_counter()

            # Keep the try body to the query call alone, so that only genuine
            # query failures are recorded as database errors. Problems in the
            # span bookkeeping below are not mislabelled as db failures.
            try:
                result = execute_database_query(query, params, database)
            except Exception as exc:
                span.set_attribute("db.execution_time", time.perf_counter() - start_time)
                span.set_attribute("db.success", False)
                span.set_attribute("db.error", str(exc))
                span.set_attribute("db.error_type", type(exc).__name__)
                raise

            execution_time = time.perf_counter() - start_time

            span.set_attribute("db.execution_time", execution_time)
            # Reported as the length of the result; an empty or None result
            # counts as zero rows.
            span.set_attribute("db.rows_affected", len(result) if result else 0)
            span.set_attribute("db.success", True)

            # Performance classification (thresholds in seconds)
            if execution_time > 1.0:
                span.set_attribute("db.performance", "slow")
            elif execution_time > 0.1:
                span.set_attribute("db.performance", "moderate")
            else:
                span.set_attribute("db.performance", "fast")

            return result
 
# Decorator for automatic database tracing
def trace_database_operation(operation_type: str = "query"):
    """Build a decorator that runs the wrapped function inside a ``db.*`` span.

    Args:
        operation_type: Suffix of the span name (``db.<operation_type>``),
            also recorded as the ``db.operation_type`` attribute.

    Returns:
        A decorator. The decorated function keeps its name and docstring,
        returns the wrapped function's result, and re-raises any exception
        after marking the span as unsuccessful.
    """
    # Local import keeps the snippet self-contained.
    from functools import wraps

    def decorator(func):
        @wraps(func)  # preserve __name__/__doc__ of the decorated function
        def wrapper(*args, **kwargs):
            with noveum_trace.trace(f"db.{operation_type}") as span:
                span.set_attribute("db.operation_type", operation_type)
                span.set_attribute("db.function_name", func.__name__)

                try:
                    result = func(*args, **kwargs)
                except Exception:
                    # Record the failure explicitly; otherwise the attribute
                    # would simply be missing on failed operations.
                    span.set_attribute("db.operation_success", False)
                    raise

                span.set_attribute("db.operation_success", True)

                return result

        return wrapper
    return decorator
 
# Usage examples
@trace_database_operation("user_lookup")
def get_user_by_id(user_id: str) -> Dict:
    """Look up a user by id, traced via the decorator and ``DatabaseTracer``.

    Args:
        user_id: Value bound to the ``id`` query parameter.

    Returns:
        The result of ``DatabaseTracer.trace_query_execution`` for the lookup.
    """
    # Parameters are passed as a mapping, so the placeholder must be the
    # named (pyformat) form; a positional %s does not bind to a dict key.
    query = "SELECT * FROM users WHERE id = %(id)s"
    return DatabaseTracer.trace_query_execution(query, {"id": user_id})

Custom instrumentation with Noveum.ai provides the flexibility to create detailed, domain-specific observability that goes beyond standard LLM and AI operation tracing. By implementing custom spans, attributes, context propagation, and advanced patterns, you can build comprehensive monitoring tailored to your specific application needs.

🔗 Next Steps

Exclusive Early Access

Get Early Access to Noveum.ai Platform

Be among the first to be notified when we open the Noveum Platform to more users. All users get access to the Observability suite for free; early users also get free eval jobs and premium support for the first year.

Sign up now. We grant access to a new batch of users every week.

Early access members receive premium onboarding support and influence our product roadmap. Limited spots available.