
RAG Pipeline Observability

Monitor retrieval, generation, and context handling in RAG systems with comprehensive tracing

Retrieval-Augmented Generation (RAG) systems combine information retrieval with large language models to produce answers grounded in retrieved context. Monitoring these multi-stage pipelines requires observability into retrieval quality, context relevance, and generation effectiveness at each stage.

🎯 Why RAG Observability Matters

RAG systems introduce unique challenges that traditional monitoring can't address:

  • Retrieval Quality: Are you finding the most relevant documents?
  • Context Utilization: How effectively is retrieved context being used?
  • Generation Fidelity: Is the LLM accurately using the provided context?
  • Pipeline Performance: Where are the bottlenecks in your RAG pipeline?
  • Cost Optimization: Which components consume the most resources?

🏗️ RAG Pipeline Architecture

Core RAG Components

Noveum.ai can trace each stage of your RAG pipeline:

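# Shared imports for the Python examples on this page
import time
from typing import Any, Dict, List

import numpy as np
import noveum_trace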
 
@noveum_trace.trace_rag_pipeline(name="document_qa_pipeline")
def document_qa_rag_pipeline(question: str, collection_id: str) -> Dict[str, Any]:
    """Complete RAG pipeline with comprehensive tracing."""
 
    # Stage 1: Query Processing and Enhancement
    processed_query = trace_query_processing(question)
 
    # Stage 2: Document Retrieval
    retrieved_docs = trace_document_retrieval(processed_query, collection_id)
 
    # Stage 3: Context Preparation (takes the query text, not the full dict)
    context = trace_context_preparation(retrieved_docs, processed_query["enhanced"])
 
    # Stage 4: Generation
    response = trace_llm_generation(processed_query, context)
 
    # Stage 5: Post-processing
    final_response = trace_response_postprocessing(response, question)
 
    return final_response
 
def trace_query_processing(question: str) -> Dict[str, Any]:
    """Trace query enhancement and processing."""
    with noveum_trace.trace("rag.query_processing") as span:
        span.set_attribute("query.original", question)
        span.set_attribute("query.length", len(question))
 
        # Query enhancement
        enhanced_query = enhance_query(question)
        span.set_attribute("query.enhanced", enhanced_query)
        span.set_attribute("query.enhancement_applied", enhanced_query != question)
 
        # Query classification
        query_type = classify_query(enhanced_query)
        # Compute complexity once and reuse it for the span and the return value
        query_complexity = calculate_query_complexity(enhanced_query)
        span.set_attribute("query.type", query_type)
        span.set_attribute("query.complexity", query_complexity)
 
        return {
            "original": question,
            "enhanced": enhanced_query,
            "type": query_type,
            "complexity": query_complexity
        }

TypeScript RAG Implementation

import { trace, addAttribute } from '@noveum/trace';
 
class RAGPipeline {
  async processQuery(question: string, collectionId: string): Promise<RAGResponse> {
    return await trace('rag-pipeline', async () => {
      addAttribute('rag.pipeline_version', '2.1.0');
      addAttribute('rag.question', question);
      addAttribute('rag.collection_id', collectionId);
 
      // Trace each stage of the RAG pipeline
      const queryEmbedding = await this.traceQueryEmbedding(question);
      const retrievedDocs = await this.traceDocumentRetrieval(queryEmbedding, collectionId);
      const contextualizedResponse = await this.traceGeneration(question, retrievedDocs);
 
      return contextualizedResponse;
    });
  }
 
  private async traceQueryEmbedding(question: string): Promise<number[]> {
    return await trace('rag.query_embedding', async () => {
      addAttribute('embedding.model', 'text-embedding-ada-002');
      addAttribute('embedding.input_length', question.length);
 
      const embedding = await this.embeddingModel.embed(question);
 
      addAttribute('embedding.dimensions', embedding.length);
      addAttribute('embedding.norm', this.calculateVectorNorm(embedding));
 
      return embedding;
    });
  }
 
  private async traceDocumentRetrieval(
    queryEmbedding: number[],
    collectionId: string
  ): Promise<Document[]> {
    return await trace('rag.document_retrieval', async () => {
      addAttribute('retrieval.collection_id', collectionId);
      addAttribute('retrieval.query_embedding_dimensions', queryEmbedding.length);
      addAttribute('retrieval.search_method', 'semantic_similarity');
 
      const results = await this.vectorDB.search({
        embedding: queryEmbedding,
        collection: collectionId,
        topK: 5,
        threshold: 0.7
      });
 
      addAttribute('retrieval.results_count', results.length);
      // Guard against an empty result set: the average would be NaN and
      // Math.min/Math.max of an empty list return Infinity/-Infinity.
      if (results.length > 0) {
        const scores = results.map(r => r.score);
        addAttribute('retrieval.avg_similarity_score',
          scores.reduce((sum, s) => sum + s, 0) / scores.length);
        addAttribute('retrieval.min_similarity_score', Math.min(...scores));
        addAttribute('retrieval.max_similarity_score', Math.max(...scores));
      }
 
      return results;
    });
  }
}

📊 Tracing Retrieval Components

Vector Database Operations

@noveum_trace.trace_component(component_type="vector_db")
def trace_vector_search(query_embedding: List[float], collection: str, top_k: int = 5) -> List[Dict]:
    """Trace vector database search operations."""
 
    with noveum_trace.trace("vector_db.search") as span:
        span.set_attribute("vector_db.collection", collection)
        span.set_attribute("vector_db.query_dimensions", len(query_embedding))
        span.set_attribute("vector_db.top_k", top_k)
        span.set_attribute("vector_db.search_algorithm", "cosine_similarity")
 
        # Execute search (perf_counter is monotonic, so it suits interval timing)
        start_time = time.perf_counter()
        results = execute_vector_search(query_embedding, collection, top_k)
        search_duration = time.perf_counter() - start_time
 
        # Track search performance
        span.set_attribute("vector_db.search_duration", search_duration)
        span.set_attribute("vector_db.results_returned", len(results))
 
        if results:
            similarities = [r["similarity"] for r in results]
            span.set_attribute("vector_db.avg_similarity", np.mean(similarities))
            span.set_attribute("vector_db.max_similarity", max(similarities))
            span.set_attribute("vector_db.min_similarity", min(similarities))
            span.set_attribute("vector_db.similarity_variance", np.var(similarities))
 
        # Quality metrics
        span.set_attribute("vector_db.search_quality", calculate_search_quality(results))
 
        return results
 
def calculate_search_quality(results: List[Dict]) -> str:
    """Calculate search quality based on similarity scores."""
    if not results:
        return "no_results"
 
    avg_similarity = np.mean([r["similarity"] for r in results])
 
    if avg_similarity > 0.8:
        return "excellent"
    elif avg_similarity > 0.6:
        return "good"
    elif avg_similarity > 0.4:
        return "fair"
    else:
        return "poor"
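
The helper `execute_vector_search` is not defined on this page. As a rough illustration of what a cosine-similarity top-k search computes, the following brute-force sketch uses plain Python. The function names, the in-memory `index` dictionary, and the `id` result key are assumptions made for this example; a production vector database would normally use an approximate nearest-neighbour index rather than scoring every vector.

```python
import math
from typing import Dict, List


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two equal-length vectors (0.0 if either is all zeros)."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def brute_force_search(query: List[float], index: Dict[str, List[float]], top_k: int = 5) -> List[Dict]:
    """Score every stored vector against the query and return the top_k best matches."""
    scored = [
        {"id": doc_id, "similarity": cosine_similarity(query, vector)}
        for doc_id, vector in index.items()
    ]
    scored.sort(key=lambda r: r["similarity"], reverse=True)
    return scored[:top_k]


# Hypothetical two-dimensional embeddings, chosen only to keep the example readable
index = {
    "doc_a": [1.0, 0.0],
    "doc_b": [0.0, 1.0],
    "doc_c": [1.0, 1.0],
}
results = brute_force_search([1.0, 0.0], index, top_k=2)
print([r["id"] for r in results])
```

Each result carries a `similarity` key, so a list in this shape can be passed to `calculate_search_quality` as written above.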

Document Ranking and Reranking

@noveum_trace.trace_component(component_type="reranker")
def trace_document_reranking(query: str, initial_results: List[Dict]) -> List[Dict]:
    """Trace document reranking for improved relevance."""
 
    with noveum_trace.trace("rag.reranking") as span:
        span.set_attribute("reranking.input_count", len(initial_results))
        span.set_attribute("reranking.query", query)
        span.set_attribute("reranking.model", "cross-encoder-reranker")
 
        # Calculate initial ranking scores
        initial_scores = [doc["similarity"] for doc in initial_results]
        span.set_attribute("reranking.initial_avg_score", np.mean(initial_scores))
 
        # Apply reranking
        reranked_results = apply_cross_encoder_reranking(query, initial_results)
 
        # Track reranking impact. Similarity and rerank scores usually sit on
        # different scales, so treat the difference as a relative signal only.
        reranked_scores = [doc["rerank_score"] for doc in reranked_results]
        span.set_attribute("reranking.final_avg_score", np.mean(reranked_scores))
        span.set_attribute("reranking.score_improvement", np.mean(reranked_scores) - np.mean(initial_scores))
 
        # Calculate ranking changes
        rank_changes = calculate_rank_changes(initial_results, reranked_results)
        span.set_attribute("reranking.rank_changes", rank_changes)
        span.set_attribute("reranking.stability", calculate_ranking_stability(rank_changes))
 
        return reranked_results
 
def calculate_rank_changes(initial: List[Dict], reranked: List[Dict]) -> int:
    """Sum of absolute rank displacements between the two orderings."""
    # Map each document id to its original position for constant-time lookups
    initial_rank = {doc["id"]: i for i, doc in enumerate(initial)}
 
    changes = 0
    for i, doc in enumerate(reranked):
        original_rank = initial_rank.get(doc["id"])
        # Skip documents that were not part of the initial results
        if original_rank is not None:
            changes += abs(original_rank - i)
 
    return changes
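
`calculate_ranking_stability` is called above but not defined. One plausible way to turn the raw displacement count into a score is to divide by the largest displacement possible for a list of that size. The sketch below assumes that approach; note that it takes the item count as a second argument, which differs from the single-argument call in the example above, and both function names are hypothetical.

```python
from typing import List


def total_rank_displacement(initial_ids: List[str], reranked_ids: List[str]) -> int:
    """Sum of |old position - new position| over ids present in both lists."""
    position = {doc_id: i for i, doc_id in enumerate(initial_ids)}
    return sum(abs(position[d] - i) for i, d in enumerate(reranked_ids) if d in position)


def ranking_stability(rank_changes: int, n_items: int) -> float:
    """Map total displacement to [0, 1]; 1.0 means the order did not change.

    The largest possible total displacement for n items is floor(n^2 / 2),
    reached for example by fully reversing the list.
    """
    max_displacement = (n_items * n_items) // 2
    if max_displacement == 0:
        return 1.0
    return 1.0 - rank_changes / max_displacement


initial = ["a", "b", "c", "d"]
swapped_top_two = ["b", "a", "c", "d"]
changes = total_rank_displacement(initial, swapped_top_two)
print(changes, ranking_stability(changes, len(initial)))
```

A normalized score like this is easier to compare across queries that return different numbers of documents than the raw displacement sum.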

Context Window Management

@noveum_trace.trace_component(component_type="context_manager")
def trace_context_preparation(documents: List[Dict], query: str, max_tokens: int = 4000) -> Dict:
    """Trace context preparation and optimization."""
 
    with noveum_trace.trace("rag.context_preparation") as span:
        span.set_attribute("context.input_documents", len(documents))
        span.set_attribute("context.max_tokens", max_tokens)
        span.set_attribute("context.query", query)
 
        # Calculate token usage for each document
        document_tokens = []
        total_raw_tokens = 0
 
        for doc in documents:
            tokens = count_tokens(doc["content"])
            document_tokens.append(tokens)
            total_raw_tokens += tokens
 
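        span.set_attribute("context.total_raw_tokens", total_raw_tokens)
        # Avoid np.mean on an empty list, which returns NaN with a warning
        span.set_attribute(
            "context.avg_document_tokens",
            float(np.mean(document_tokens)) if document_tokens else 0.0
        )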
 
        # Optimize context if needed
        if total_raw_tokens > max_tokens:
            span.set_attribute("context.optimization_required", True)
            optimized_context = optimize_context(documents, query, max_tokens)
            span.set_attribute("context.optimization_method", optimized_context["method"])
            span.set_attribute("context.documents_included", optimized_context["documents_used"])
            span.set_attribute("context.final_tokens", optimized_context["token_count"])
            span.set_attribute("context.token_efficiency", optimized_context["token_count"] / total_raw_tokens)
        else:
            span.set_attribute("context.optimization_required", False)
            optimized_context = {
                "content": "\n\n".join([doc["content"] for doc in documents]),
                "documents_used": len(documents),
                "token_count": total_raw_tokens,
                "method": "no_optimization"
            }
 
        # Calculate context quality metrics
        context_quality = calculate_context_quality(optimized_context["content"], query)
        span.set_attribute("context.quality_score", context_quality["score"])
        span.set_attribute("context.relevance_score", context_quality["relevance"])
        span.set_attribute("context.coverage_score", context_quality["coverage"])
 
        return optimized_context
 
def optimize_context(documents: List[Dict], query: str, max_tokens: int) -> Dict:
    """Optimize context to fit within token limits."""
 
    # Strategy 1: Truncate documents
    if len(documents) <= 3:
        return truncate_documents_strategy(documents, max_tokens)
 
    # Strategy 2: Select most relevant documents
    return select_top_documents_strategy(documents, query, max_tokens)
 
def calculate_context_quality(content: str, query: str) -> Dict[str, float]:
    """Calculate quality metrics for the prepared context."""
 
    # Relevance: How well does context match the query
    relevance_score = calculate_relevance_score(content, query)
 
    # Coverage: How comprehensive is the context
    coverage_score = calculate_coverage_score(content, query)
 
    # Overall quality
    quality_score = (relevance_score * 0.7) + (coverage_score * 0.3)
 
    return {
        "score": quality_score,
        "relevance": relevance_score,
        "coverage": coverage_score
    }
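
The helpers `count_tokens` and `select_top_documents_strategy` are left undefined above. The sketch below shows one way a "select most relevant documents" strategy could work: rank by similarity, then greedily keep documents while they fit the token budget. The names `approx_token_count` and `pack_documents`, the `"top_documents"` method label, and the whitespace-based token estimate are all assumptions for illustration; a real pipeline would count tokens with the tokenizer of the target model.

```python
from typing import Dict, List


def approx_token_count(text: str) -> int:
    """Very rough stand-in for a tokenizer: counts whitespace-separated words."""
    return len(text.split())


def pack_documents(documents: List[Dict], max_tokens: int) -> Dict:
    """Greedily keep the highest-scoring documents that fit in the token budget."""
    ranked = sorted(documents, key=lambda d: d.get("similarity", 0.0), reverse=True)

    kept, used = [], 0
    for doc in ranked:
        cost = approx_token_count(doc["content"])
        if used + cost > max_tokens:
            continue  # skip documents that would overflow; a smaller one may still fit
        kept.append(doc["content"])
        used += cost

    # Same keys as the optimized_context dict used in trace_context_preparation
    return {
        "content": "\n\n".join(kept),
        "documents_used": len(kept),
        "token_count": used,
        "method": "top_documents",
    }


docs = [
    {"content": "alpha beta gamma", "similarity": 0.9},
    {"content": "one two three four five", "similarity": 0.8},
    {"content": "x y", "similarity": 0.5},
]
packed = pack_documents(docs, max_tokens=5)
print(packed["documents_used"], packed["token_count"])
```

Skipping an oversized document instead of stopping at it trades strict relevance order for better use of the budget; stopping at the first overflow is an equally reasonable choice.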

🤖 Tracing LLM Generation

Context-Aware Generation

@noveum_trace.trace_llm(provider="openai", model="gpt-4")
def trace_rag_generation(query: str, context: str, system_prompt: str = None) -> Dict:
    """Trace LLM generation with RAG context."""
 
    with noveum_trace.trace("rag.generation") as span:
        span.set_attribute("generation.query", query)
        span.set_attribute("generation.context_length", len(context))
        span.set_attribute("generation.context_token_count", count_tokens(context))
 
        # Prepare the prompt
        if system_prompt is None:
            system_prompt = "Answer the question based only on the provided context. If the context doesn't contain enough information, say so."
 
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"}
        ]
 
        total_input_tokens = sum(count_tokens(msg["content"]) for msg in messages)
        span.set_attribute("generation.input_tokens", total_input_tokens)
 
        # Call LLM
        response = call_llm(messages)
 
        # Track generation metrics (count output tokens once and reuse the value)
        output_tokens = count_tokens(response["content"])
        span.set_attribute("generation.output_tokens", output_tokens)
        span.set_attribute("generation.total_tokens", total_input_tokens + output_tokens)
 
        # Analyze response quality
        quality_metrics = analyze_rag_response_quality(query, context, response["content"])
 
        span.set_attribute("generation.context_utilization", quality_metrics["context_utilization"])
        span.set_attribute("generation.answer_completeness", quality_metrics["completeness"])
        span.set_attribute("generation.factual_accuracy", quality_metrics["accuracy"])
        span.set_attribute("generation.hallucination_risk", quality_metrics["hallucination_risk"])
 
        return {
            "response": response["content"],
            "usage": {
                "input_tokens": total_input_tokens,
                "output_tokens": output_tokens,
                "total_tokens": total_input_tokens + output_tokens
            },
            "quality_metrics": quality_metrics
        }
 
def analyze_rag_response_quality(query: str, context: str, response: str) -> Dict[str, float]:
    """Analyze the quality of RAG-generated responses."""
 
    # Context utilization: How much of the context was used
    context_utilization = calculate_context_utilization(context, response)
 
    # Answer completeness: How well does the response answer the query
    completeness = calculate_answer_completeness(query, response)
 
    # Factual accuracy: How accurate is the response based on context
    accuracy = calculate_factual_accuracy(context, response)
 
    # Hallucination risk: Likelihood of information not in context
    hallucination_risk = calculate_hallucination_risk(context, response)
 
    return {
        "context_utilization": context_utilization,
        "completeness": completeness,
        "accuracy": accuracy,
        "hallucination_risk": hallucination_risk
    }
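
The four scoring helpers above are not defined on this page. As a minimal sketch of what two of them might compute, the functions below use plain term overlap. This is an assumed, deliberately simple proxy: lexical overlap ignores paraphrase and is inflated by common words, so it should be read as a cheap first signal and not as a measure of factual grounding. All function names here are hypothetical.

```python
import re
from typing import Set


def _terms(text: str) -> Set[str]:
    """Lowercased alphanumeric word set; a crude stand-in for real tokenization."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def lexical_context_utilization(context: str, response: str) -> float:
    """Fraction of distinct context terms that also appear in the response."""
    context_terms = _terms(context)
    if not context_terms:
        return 0.0
    return len(context_terms & _terms(response)) / len(context_terms)


def lexical_hallucination_risk(context: str, response: str) -> float:
    """Fraction of distinct response terms that never appear in the context."""
    response_terms = _terms(response)
    if not response_terms:
        return 0.0
    return len(response_terms - _terms(context)) / len(response_terms)


context = "Paris is the capital of France"
grounded = "The capital is Paris"
ungrounded = "The capital is Lyon"
print(lexical_context_utilization(context, grounded))
print(lexical_hallucination_risk(context, grounded))
print(lexical_hallucination_risk(context, ungrounded))
```

In the example, the ungrounded answer introduces one term ("lyon") out of four that the context never mentions, which is what raises its risk score.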

Response Evaluation and Feedback

@noveum_trace.trace_component(component_type="response_evaluator")
def trace_response_evaluation(query: str, context: str, response: str, ground_truth: str = None) -> Dict:
    """Trace comprehensive response evaluation."""
 
    with noveum_trace.trace("rag.response_evaluation") as span:
        span.set_attribute("evaluation.has_ground_truth", ground_truth is not None)
        span.set_attribute("evaluation.response_length", len(response))
 
        # Automatic evaluation metrics
        auto_metrics = calculate_automatic_metrics(query, context, response)
 
        for metric_name, value in auto_metrics.items():
            span.set_attribute(f"evaluation.auto.{metric_name}", value)
 
        # Ground truth evaluation if available
        ground_truth_metrics = None
        if ground_truth:
            ground_truth_metrics = calculate_ground_truth_metrics(response, ground_truth)
 
            for metric_name, value in ground_truth_metrics.items():
                span.set_attribute(f"evaluation.ground_truth.{metric_name}", value)
 
        # Overall evaluation score (ground_truth_metrics stays None without ground truth)
        overall_score = calculate_overall_evaluation_score(auto_metrics, ground_truth_metrics)
        span.set_attribute("evaluation.overall_score", overall_score)
 
        # Classification
        if overall_score > 0.8:
            evaluation_class = "excellent"
        elif overall_score > 0.6:
            evaluation_class = "good"
        elif overall_score > 0.4:
            evaluation_class = "fair"
        else:
            evaluation_class = "poor"
 
        span.set_attribute("evaluation.classification", evaluation_class)
 
        return {
            "automatic_metrics": auto_metrics,
            "ground_truth_metrics": ground_truth_metrics if ground_truth else None,
            "overall_score": overall_score,
            "classification": evaluation_class
        }
 
def calculate_automatic_metrics(query: str, context: str, response: str) -> Dict[str, float]:
    """Calculate automatic evaluation metrics without ground truth."""
 
    return {
        "relevance_to_query": calculate_semantic_similarity(query, response),
        "context_fidelity": calculate_context_fidelity(context, response),
        "answer_completeness": calculate_answer_completeness_score(query, response),
        "coherence": calculate_coherence_score(response),
        "conciseness": calculate_conciseness_score(response),
        "hallucination_score": calculate_hallucination_score(context, response)
    }
 
def calculate_ground_truth_metrics(response: str, ground_truth: str) -> Dict[str, float]:
    """Calculate metrics comparing response to ground truth."""
 
    return {
        "bleu_score": calculate_bleu_score(response, ground_truth),
        "rouge_l": calculate_rouge_l_score(response, ground_truth),
        "semantic_similarity": calculate_semantic_similarity(response, ground_truth),
        "factual_correctness": calculate_factual_correctness(response, ground_truth),
        "information_coverage": calculate_information_coverage(response, ground_truth)
    }
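
Of the ground-truth metrics listed above, ROUGE-L has a compact definition: it scores the longest common subsequence (LCS) of tokens shared by the response and the reference. The sketch below is one assumed way `calculate_rouge_l_score` could be written. It uses whitespace tokenization and the balanced F1 form; published ROUGE implementations add options such as stemming and a recall-weighted F-measure, which are omitted here.

```python
from typing import List


def lcs_length(a: List[str], b: List[str]) -> int:
    """Length of the longest common subsequence, via dynamic programming."""
    previous = [0] * (len(b) + 1)
    for token_a in a:
        current = [0]
        for j, token_b in enumerate(b, start=1):
            if token_a == token_b:
                current.append(previous[j - 1] + 1)
            else:
                current.append(max(previous[j], current[j - 1]))
        previous = current
    return previous[-1]


def rouge_l_f1(response: str, reference: str) -> float:
    """ROUGE-L F1 over lowercased whitespace tokens."""
    response_tokens = response.lower().split()
    reference_tokens = reference.lower().split()
    if not response_tokens or not reference_tokens:
        return 0.0
    lcs = lcs_length(response_tokens, reference_tokens)
    if lcs == 0:
        return 0.0
    precision = lcs / len(response_tokens)
    recall = lcs / len(reference_tokens)
    return 2 * precision * recall / (precision + recall)


score = rouge_l_f1("the cat sat on the mat", "the cat is on the mat")
print(round(score, 4))
```

Here the shared subsequence is "the cat on the mat" (five of six tokens on each side), so precision and recall are both 5/6.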

📈 RAG Pipeline Performance Analysis

End-to-End Pipeline Metrics

@noveum_trace.trace_workflow(name="rag_pipeline_performance")
def trace_rag_pipeline_performance(questions: List[str], collection_id: str) -> Dict:
    """Trace performance metrics across multiple RAG queries."""
 
    with noveum_trace.trace("rag.pipeline_performance") as span:
        span.set_attribute("performance.question_count", len(questions))
        span.set_attribute("performance.collection_id", collection_id)
 
        pipeline_metrics = {
            "total_queries": len(questions),
            "successful_queries": 0,
            "failed_queries": 0,
            "avg_latency": 0,
            "avg_retrieval_time": 0,
            "avg_generation_time": 0,
            "avg_quality_score": 0,
            "cost_metrics": {
                "total_embedding_cost": 0,
                "total_generation_cost": 0,
                "total_cost": 0
            }
        }
 
        latencies = []
        quality_scores = []
        retrieval_times = []
        generation_times = []
 
        for i, question in enumerate(questions):
            try:
                # perf_counter is monotonic, so it suits interval timing
                start_time = time.perf_counter()
 
                # Execute RAG pipeline with detailed timing
                result = execute_rag_pipeline_with_timing(question, collection_id)
 
                total_latency = time.perf_counter() - start_time
                latencies.append(total_latency)
                retrieval_times.append(result["timings"]["retrieval_time"])
                generation_times.append(result["timings"]["generation_time"])
                quality_scores.append(result["quality_score"])
 
                # Track costs
                pipeline_metrics["cost_metrics"]["total_embedding_cost"] += result["costs"]["embedding_cost"]
                pipeline_metrics["cost_metrics"]["total_generation_cost"] += result["costs"]["generation_cost"]
 
                pipeline_metrics["successful_queries"] += 1
 
            except Exception as e:
                pipeline_metrics["failed_queries"] += 1
                span.add_event("query_failed", {"question_index": i, "error": str(e)})
 
        # Calculate aggregate metrics
        if latencies:
            pipeline_metrics["avg_latency"] = np.mean(latencies)
            pipeline_metrics["p95_latency"] = np.percentile(latencies, 95)
            pipeline_metrics["p99_latency"] = np.percentile(latencies, 99)
 
        if retrieval_times:
            pipeline_metrics["avg_retrieval_time"] = np.mean(retrieval_times)
 
        if generation_times:
            pipeline_metrics["avg_generation_time"] = np.mean(generation_times)
 
        if quality_scores:
            pipeline_metrics["avg_quality_score"] = np.mean(quality_scores)
            pipeline_metrics["min_quality_score"] = min(quality_scores)
            pipeline_metrics["max_quality_score"] = max(quality_scores)
 
        pipeline_metrics["cost_metrics"]["total_cost"] = (
            pipeline_metrics["cost_metrics"]["total_embedding_cost"] +
            pipeline_metrics["cost_metrics"]["total_generation_cost"]
        )
 
        # Set span attributes
        for key, value in pipeline_metrics.items():
            if isinstance(value, dict):
                for subkey, subvalue in value.items():
                    span.set_attribute(f"performance.{key}.{subkey}", subvalue)
            else:
                span.set_attribute(f"performance.{key}", value)
 
        return pipeline_metrics
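
The example above relies on `execute_rag_pipeline_with_timing`, which is not shown. A minimal sketch of how per-stage timings could be collected is given below, using a small context manager around each stage. The names `stage_timer` and `run_with_timing` are hypothetical, the retriever and generator are passed in as plain callables, and the `quality_score` and `costs` fields that the example above also reads are left out for brevity.

```python
import time
from contextlib import contextmanager
from typing import Callable, Dict, Iterator, List


@contextmanager
def stage_timer(timings: Dict[str, float], name: str) -> Iterator[None]:
    """Record the wall-clock duration of the enclosed block under timings[name]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[name] = time.perf_counter() - start


def run_with_timing(question: str,
                    retrieve: Callable[[str], List[str]],
                    generate: Callable[[str, List[str]], str]) -> Dict:
    """Run retrieval then generation, returning the answer plus per-stage timings."""
    timings: Dict[str, float] = {}
    with stage_timer(timings, "retrieval_time"):
        docs = retrieve(question)
    with stage_timer(timings, "generation_time"):
        answer = generate(question, docs)
    return {"answer": answer, "timings": timings}


# Usage with stub stages standing in for a real retriever and LLM call
result = run_with_timing(
    "What is RAG?",
    retrieve=lambda q: ["RAG combines retrieval with generation."],
    generate=lambda q, docs: docs[0],
)
print(result["timings"])
```

Because the timer records in a `finally` clause, a stage that raises still leaves its duration behind, which helps when diagnosing slow failures.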

Cost and Resource Tracking

@noveum_trace.trace_component(component_type="cost_tracker")
def trace_rag_costs(operation_type: str, tokens_used: int, model: str) -> Dict:
    """Track costs for RAG operations."""
 
    with noveum_trace.trace("rag.cost_tracking") as span:
        span.set_attribute("cost.operation_type", operation_type)
        span.set_attribute("cost.tokens_used", tokens_used)
        span.set_attribute("cost.model", model)
 
        # Calculate costs based on model and operation
        cost_per_token = get_model_cost_per_token(model, operation_type)
        total_cost = tokens_used * cost_per_token
 
        span.set_attribute("cost.cost_per_token", cost_per_token)
        span.set_attribute("cost.total_cost", total_cost)
        span.set_attribute("cost.currency", "USD")
 
        # Track cumulative costs
        daily_cost = get_daily_cumulative_cost()
        span.set_attribute("cost.daily_cumulative", daily_cost)
 
        # Cost efficiency metrics
        if operation_type == "generation" and tokens_used > 0:
            # Rough rule of thumb for English text: 1 token is about 0.75 words
            estimated_words = tokens_used * 0.75
            cost_per_word = total_cost / estimated_words
            span.set_attribute("cost.cost_per_word", cost_per_word)
 
        return {
            "operation_type": operation_type,
            "tokens_used": tokens_used,
            "total_cost": total_cost,
            "daily_cumulative": daily_cost
        }
 
def get_model_cost_per_token(model: str, operation_type: str) -> float:
    """Get cost per token for specific model and operation."""
 
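    # Illustrative per-token prices in USD; providers change pricing and
    # typically bill input and output tokens at different rates.
    cost_mapping = {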
        "text-embedding-ada-002": {"embedding": 0.0000001},
        "gpt-4": {"generation": 0.00003},
        "gpt-3.5-turbo": {"generation": 0.000002},
        "claude-3-opus": {"generation": 0.000015},
        "claude-3-sonnet": {"generation": 0.000003}
    }
 
    return cost_mapping.get(model, {}).get(operation_type, 0.0)
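
To make the arithmetic concrete, here is a short worked example that applies two of the per-token prices from `cost_mapping` to a single query. The token counts (a 20-token question to embed and 2,500 tokens through the generation model) are assumed values picked for illustration, and `query_cost` is a hypothetical helper.

```python
# Prices copied from cost_mapping above; token counts are assumed for illustration
EMBEDDING_PRICE = 0.0000001   # text-embedding-ada-002, USD per token
GENERATION_PRICE = 0.00003    # gpt-4, USD per token


def query_cost(embedding_tokens: int, generation_tokens: int) -> float:
    """Total USD cost of one query: tokens times price, summed over both stages."""
    return embedding_tokens * EMBEDDING_PRICE + generation_tokens * GENERATION_PRICE


cost = query_cost(embedding_tokens=20, generation_tokens=2_500)
print(f"{cost:.6f} USD per query")
print(f"{cost * 10_000:.2f} USD per 10,000 queries")
```

With these numbers the embedding call contributes 0.000002 USD and generation contributes 0.075 USD, so generation accounts for nearly all of the per-query cost. That is why context size, which drives generation tokens, is usually the first thing to look at when reducing spend.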

🔧 Best Practices for RAG Observability

1. Comprehensive Pipeline Tracing

# Trace every component of your RAG pipeline
@noveum_trace.trace_rag_pipeline(name="comprehensive_rag")
def comprehensive_rag_pipeline(query: str) -> Dict:
 
    # Always trace query preprocessing
    processed_query = trace_query_processing(query)
 
    # Always trace retrieval with quality metrics
    retrieved_docs = trace_retrieval_with_quality(processed_query)
 
    # Always trace context preparation (takes the query text, not the full dict)
    context = trace_context_preparation(retrieved_docs, processed_query["enhanced"])
 
    # Always trace generation with quality analysis
    response = trace_generation_with_analysis(processed_query, context)
 
    # Always trace post-processing
    final_response = trace_post_processing(response)
 
    return final_response

2. Quality Monitoring

# Monitor quality metrics for continuous improvement
def setup_rag_quality_monitoring():
    noveum_trace.configure_quality_alerts({
        "retrieval_quality": {
            "metric": "avg_similarity_score",
            "threshold": 0.6,
            "window": "1h",
            "action": "alert"
        },
        "generation_quality": {
            "metric": "context_utilization",
            "threshold": 0.7,
            "window": "1h",
            "action": "alert"
        },
        "hallucination_risk": {
            "metric": "hallucination_score",
            "threshold": 0.3,
            "window": "30m",
            "action": "alert"
        }
    })
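
The configuration above pairs each metric with a threshold and a time window. As a sketch of what such a rule evaluates, the class below keeps a sliding window of samples and compares their mean against the threshold. This is not Noveum's implementation: the class name, the `alert_when` direction flag, and the use of the mean as the aggregate are assumptions, since the configuration shown does not say whether an alert fires above or below the threshold.

```python
from collections import deque
from typing import Deque, Tuple


class WindowedThreshold:
    """Mean of a metric over a sliding time window, compared against a threshold."""

    def __init__(self, threshold: float, window_seconds: float, alert_when: str = "below"):
        if alert_when not in ("below", "above"):
            raise ValueError("alert_when must be 'below' or 'above'")
        self.threshold = threshold
        self.window_seconds = window_seconds
        self.alert_when = alert_when
        self._samples: Deque[Tuple[float, float]] = deque()  # (timestamp, value)

    def record(self, timestamp: float, value: float) -> bool:
        """Add a sample, drop expired ones, and return True if the alert should fire."""
        self._samples.append((timestamp, value))
        cutoff = timestamp - self.window_seconds
        while self._samples and self._samples[0][0] < cutoff:
            self._samples.popleft()
        mean = sum(v for _, v in self._samples) / len(self._samples)
        if self.alert_when == "below":
            return mean < self.threshold
        return mean > self.threshold


# Retrieval quality: alert when the 1-hour mean similarity drops below 0.6
retrieval_rule = WindowedThreshold(threshold=0.6, window_seconds=3600, alert_when="below")
fired = [
    retrieval_rule.record(0, 0.9),
    retrieval_rule.record(1800, 0.5),
    retrieval_rule.record(3700, 0.4),   # the first sample has expired by now
]
print(fired)
```

A quality metric such as similarity would use `"below"`, while a risk metric such as a hallucination score would use `"above"`.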

3. A/B Testing for RAG Components

@noveum_trace.trace_experiment(experiment_name="retrieval_strategies")
def compare_retrieval_strategies(query: str, collection_id: str) -> Dict:
    """Compare different retrieval strategies."""
 
    # Strategy A: Pure vector similarity
    results_a = trace_retrieval_strategy(query, collection_id, "vector_similarity")
 
    # Strategy B: Hybrid search (vector + keyword)
    results_b = trace_retrieval_strategy(query, collection_id, "hybrid_search")
 
    # Strategy C: Reranked results
    results_c = trace_retrieval_strategy(query, collection_id, "reranked")
 
    # Compare strategies
    comparison = compare_strategies([results_a, results_b, results_c])
 
    return comparison
 
def trace_retrieval_strategy(query: str, collection_id: str, strategy: str) -> Dict:
    """Trace a specific retrieval strategy."""
    with noveum_trace.trace(f"retrieval.strategy.{strategy}") as span:
        span.set_attribute("strategy.name", strategy)
        span.set_attribute("strategy.query", query)
 
        # Execute strategy
        results = execute_strategy(query, collection_id, strategy)
 
        # Track strategy-specific metrics
        span.set_attribute("strategy.results_count", len(results))
        # Only record an average when there are results to average
        if results:
            span.set_attribute("strategy.avg_score", np.mean([r["score"] for r in results]))
 
        return results
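
`compare_strategies` is not defined above. One simple reading is a per-strategy summary plus a pick of the highest mean score, sketched below. The function name and the dictionary input keyed by strategy name are assumptions; the call above passes a bare list, so it would need the names attached. Mean scores are only comparable when every strategy reports scores on the same scale, which reranker scores often do not share with raw similarity.

```python
from statistics import mean
from typing import Dict, List


def summarize_strategies(results_by_strategy: Dict[str, List[Dict]]) -> Dict:
    """Compute per-strategy result counts and mean scores, and name the best one."""
    summary = {}
    for name, results in results_by_strategy.items():
        scores = [r["score"] for r in results]
        summary[name] = {
            "results_count": len(results),
            "avg_score": mean(scores) if scores else 0.0,
        }
    best = max(summary, key=lambda name: summary[name]["avg_score"]) if summary else None
    return {"strategies": summary, "best_strategy": best}


comparison = summarize_strategies({
    "vector_similarity": [{"score": 0.8}, {"score": 0.6}],
    "hybrid_search": [{"score": 0.9}, {"score": 0.7}],
    "reranked": [],
})
print(comparison["best_strategy"])
```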

🎯 Advanced RAG Patterns

Multi-Modal RAG

@noveum_trace.trace_rag_pipeline(name="multimodal_rag")
def multimodal_rag_pipeline(query: str, content_types: List[str]) -> Dict:
    """RAG pipeline that handles multiple content types (text, images, documents)."""
 
    with noveum_trace.trace("multimodal_coordination") as span:
        span.set_attribute("multimodal.content_types", content_types)
        span.set_attribute("multimodal.query", query)
 
        # Process different content types
        results_by_type = {}
 
        for content_type in content_types:
            with noveum_trace.trace(f"multimodal.{content_type}") as type_span:
                type_span.set_attribute("content_type", content_type)
 
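                if content_type == "text":
                    results = trace_text_retrieval(query)
                elif content_type == "image":
                    results = trace_image_retrieval(query)
                elif content_type == "document":
                    results = trace_document_retrieval(query)
                else:
                    # Unknown content type: record it and continue with no results
                    type_span.set_attribute("unsupported", True)
                    results = []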
 
                results_by_type[content_type] = results
                type_span.set_attribute("results_count", len(results))
 
        # Merge multimodal results
        merged_context = merge_multimodal_context(results_by_type)
 
        # Generate response considering all modalities
        response = trace_multimodal_generation(query, merged_context)
 
        return response

Conversational RAG

@noveum_trace.trace_rag_pipeline(name="conversational_rag")
def conversational_rag_pipeline(current_query: str, conversation_history: List[Dict]) -> Dict:
    """RAG pipeline that maintains conversation context."""
 
    with noveum_trace.trace("conversational_rag") as span:
        span.set_attribute("conversation.current_query", current_query)
        span.set_attribute("conversation.history_length", len(conversation_history))
 
        # Extract conversation context
        conversation_context = extract_conversation_context(conversation_history)
        span.set_attribute("conversation.context_extracted", bool(conversation_context))
 
        # Enhance query with conversation context
        enhanced_query = enhance_query_with_context(current_query, conversation_context)
        span.set_attribute("conversation.query_enhanced", enhanced_query != current_query)
 
        # Retrieve with conversation-aware strategy
        retrieved_docs = trace_conversational_retrieval(enhanced_query, conversation_context)
 
        # Generate response maintaining conversation flow
        response = trace_conversational_generation(
            current_query,
            retrieved_docs,
            conversation_history
        )
 
        return response
 
def trace_conversational_retrieval(query: str, conversation_context: Dict) -> List[Dict]:
    """Retrieve documents considering conversation context."""
    with noveum_trace.trace("conversational.retrieval") as span:
        span.set_attribute("retrieval.has_conversation_context", bool(conversation_context))
 
        # Weight recent conversation topics higher
        context_weights = calculate_conversation_weights(conversation_context)
        span.set_attribute("retrieval.context_weights", len(context_weights))
 
        # Execute context-aware retrieval
        results = execute_contextual_retrieval(query, context_weights)
 
        span.set_attribute("retrieval.results_count", len(results))
 
        return results
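
The comment "weight recent conversation topics higher" can be made concrete with exponential decay over turns. The sketch below is one assumed form of `calculate_conversation_weights`: it takes a list of topics ordered from oldest to newest (the function above receives a context dictionary instead), and the name `recency_weights` and the default decay of 0.5 are hypothetical choices.

```python
from typing import Dict, List


def recency_weights(topics: List[str], decay: float = 0.5) -> Dict[str, float]:
    """Weight topics by recency with exponential decay, normalized to sum to 1.

    `topics` is ordered oldest to newest; a topic that recurs accumulates weight.
    """
    if not 0.0 < decay <= 1.0:
        raise ValueError("decay must be in (0, 1]")
    raw: Dict[str, float] = {}
    newest = len(topics) - 1
    for position, topic in enumerate(topics):
        age = newest - position          # 0 for the most recent turn
        raw[topic] = raw.get(topic, 0.0) + decay ** age
    total = sum(raw.values())
    return {topic: weight / total for topic, weight in raw.items()} if total else {}


weights = recency_weights(["billing", "login", "billing"], decay=0.5)
print(weights)
```

In this example "billing" appears both two turns ago and in the latest turn, so it receives 1.25 of the 1.75 total raw weight and "login" receives the remaining 0.5.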

RAG observability with Noveum.ai provides the deep insights needed to build, optimize, and scale retrieval-augmented generation systems. By implementing comprehensive tracing across retrieval, context preparation, and generation stages, you can ensure your RAG pipeline delivers accurate, relevant, and cost-effective responses.
