
Basic Pipecat Voice Pipeline

Learn how to trace a Pipecat voice pipeline with Noveum Trace

This guide shows a complete Pipecat voice pipeline (STT + LLM function calling + TTS) instrumented with Noveum Trace. You will see per-turn spans for STT, LLM, and TTS, plus (optionally) uploaded audio for each span.

🎯 Use Case

Drive-Thru Voice Agent: A voice-powered ordering bot that:

  • Listens for customer speech (STT)
  • Uses an LLM with function calling to build an order
  • Speaks back responses (TTS)
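Before any Pipecat plumbing, note that the bot's order state is nothing more than a running list of line items and a total. This standalone sketch mirrors the order logic used in the full example below (the menu and prices are this example's own):

```python
# Minimal order bookkeeping, mirroring the drive-thru example's handlers.
MENU = {"burger": 5.99, "fries": 2.99}

def add_item(order, item, quantity=1):
    """Append a priced line item and update the running total."""
    if item not in MENU:
        return {"success": False, "message": f"Sorry, we don't have {item} on our menu."}
    price = MENU[item]
    order["items"].append({"name": item, "quantity": quantity, "price": price})
    order["total"] += price * quantity
    return {"success": True, "item": item, "current_total": order["total"]}

order = {"items": [], "total": 0.0}
add_item(order, "burger", 2)
add_item(order, "fries")
print(round(order["total"], 2))  # 14.97
```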

🚀 Complete Working Example

import os
 
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMContextAggregatorPair,
    LLMUserAggregatorParams,
)
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
 
import noveum_trace
from noveum_trace.integrations.pipecat import NoveumTraceObserver
 
load_dotenv(override=True)
 
 
# Simple simulated order storage
current_order = {"items": [], "total": 0.0}
MENU = {
    "burger": 5.99,
    "fries": 2.99,
}
 
 
async def add_item_to_order(params: FunctionCallParams):
    item = params.arguments.get("item", "").lower().replace(" ", "_")
    quantity = params.arguments.get("quantity", 1)
 
    if item not in MENU:
        await params.result_callback(
            {"success": False, "message": f"Sorry, we don't have {item} on our menu."}
        )
        return
 
    price = MENU[item]
    current_order["items"].append(
        {"name": item, "quantity": quantity, "price": price}
    )
    current_order["total"] += price * quantity
 
    await params.result_callback(
        {
            "success": True,
            "item": item,
            "quantity": quantity,
            "price": price,
            "current_total": current_order["total"],
        }
    )
 
 
async def view_current_order(params: FunctionCallParams):
    items_summary = [
        {"name": i["name"], "quantity": i["quantity"], "price": i["price"]}
        for i in current_order["items"]
    ]
 
    await params.result_callback(
        {
            "success": True,
            "items": items_summary,
            "total": current_order["total"],
        }
    )
 
 
async def confirm_order(params: FunctionCallParams):
    order_total = current_order["total"]
    current_order["items"] = []
    current_order["total"] = 0.0
 
    await params.result_callback(
        {
            "success": True,
            "message": f"Order confirmed. Total was ${order_total:.2f}.",
        }
    )
 
 
transport_params = {
    "daily": lambda: DailyParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
    "twilio": lambda: FastAPIWebsocketParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
    "webrtc": lambda: TransportParams(
        audio_in_enabled=True,
        audio_out_enabled=True,
        vad_analyzer=SileroVADAnalyzer(),
    ),
}
 
 
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    logger.info("Starting pipecat drive-thru bot")
 
    # Initialize Noveum Trace before the pipeline starts.
    # In production, call noveum_trace.init() once at module level
    # rather than inside a per-connection function.
    noveum_trace.init(
        api_key=os.getenv("NOVEUM_API_KEY"),
        project=os.getenv("NOVEUM_PROJECT", "pipecat-drive-thru"),
    )
 
    stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id="71a7ad14-091c-4e8e-a314-022ece01c121",
    )
    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")

    # Register function handlers for the LLM
    llm.register_function("add_item_to_order", add_item_to_order)
    llm.register_function("view_current_order", view_current_order)
    llm.register_function("confirm_order", confirm_order)

    # Define function schemas for Pipecat's tool/function calling
    add_item_schema = FunctionSchema(
        name="add_item_to_order",
        description="Add an item to the customer's order.",
        properties={
            "item": {"type": "string", "description": "Menu item name (e.g., 'burger', 'fries')"},
            "quantity": {"type": "integer", "description": "Quantity (default 1)", "default": 1},
        },
        required=["item"],
    )

    view_order_schema = FunctionSchema(
        name="view_current_order",
        description="View the current order with all items and total price.",
        properties={},
        required=[],
    )

    confirm_order_schema = FunctionSchema(
        name="confirm_order",
        description="Confirm and finalize the customer's order.",
        properties={},
        required=[],
    )

    tools = ToolsSchema(
        standard_tools=[add_item_schema, view_order_schema, confirm_order_schema]
    )

    # The system prompt lives in the shared LLM context, alongside the tools.
    messages = [
        {
            "role": "system",
            "content": (
                "You are a friendly drive-thru order taker. "
                "Use add_item_to_order to build the order, "
                "use view_current_order to check it, and "
                "use confirm_order when the customer is done."
            ),
        }
    ]
    context = LLMContext(messages=messages, tools=tools)
    context_aggregator = LLMContextAggregatorPair(context)
    user_aggregator = context_aggregator.user()
    assistant_aggregator = context_aggregator.assistant()
 
    # AudioBufferProcessor captures the full stereo conversation WAV
    # (left channel = user, right channel = bot).
    # attach_to_task() auto-detects it and produces a pipecat.full_conversation span.
    audio_buffer = AudioBufferProcessor(num_channels=2)
 
    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            user_aggregator,
            llm,
            tts,
            transport.output(),
            assistant_aggregator,
            audio_buffer,
        ]
    )
 
    trace_obs = NoveumTraceObserver(record_audio=True)
 
    # IMPORTANT: attach_to_task() is what wires up turn tracking boundaries
    task = PipelineTask(
        pipeline,
        params=PipelineParams(enable_metrics=True, enable_usage_metrics=True),
        observers=[trace_obs],
    )
 
    await trace_obs.attach_to_task(task)
 
    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("Customer connected to drive-thru")
        context.add_message({"role": "user", "content": "Greet the customer and take their order."})
        await task.queue_frames([LLMRunFrame()])
 
    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("Customer disconnected")
        await task.cancel()
 
    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)
 
 
async def bot(runner_args: RunnerArguments):
    transport = await create_transport(runner_args, transport_params)
    await run_bot(transport, runner_args)
 
 
if __name__ == "__main__":
    from pipecat.runner.run import main
 
    main()
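Each registered function handler receives a `FunctionCallParams` object and reports its result through the async `result_callback`. To exercise that contract in isolation, here is a self-contained sketch in which `FakeFunctionCallParams` is a hypothetical stub standing in for Pipecat's real object (the real one carries more fields, but a handler only needs `.arguments` and `.result_callback`):

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any

# Hypothetical stand-in for pipecat's FunctionCallParams, used only to
# demonstrate the handler contract outside a running pipeline.
@dataclass
class FakeFunctionCallParams:
    arguments: dict
    results: list = field(default_factory=list)

    async def result_callback(self, result: Any):
        self.results.append(result)

MENU = {"burger": 5.99, "fries": 2.99}
current_order = {"items": [], "total": 0.0}

async def add_item_to_order(params):
    # Same logic as the example's handler: validate, update state, report back.
    item = params.arguments.get("item", "").lower().replace(" ", "_")
    quantity = params.arguments.get("quantity", 1)
    if item not in MENU:
        await params.result_callback({"success": False, "message": f"No {item} on the menu."})
        return
    price = MENU[item]
    current_order["items"].append({"name": item, "quantity": quantity, "price": price})
    current_order["total"] += price * quantity
    await params.result_callback({"success": True, "current_total": current_order["total"]})

params = FakeFunctionCallParams(arguments={"item": "burger", "quantity": 2})
asyncio.run(add_item_to_order(params))
print(params.results[0])
```

Driving the handler this way, without a transport or LLM, is also a convenient pattern for unit-testing the order logic.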

📋 Prerequisites

pip install "noveum-trace[pipecat]"

Set your environment variables:

export NOVEUM_API_KEY="your-noveum-api-key"
export NOVEUM_PROJECT="pipecat-drive-thru"
export DEEPGRAM_API_KEY="your-deepgram-api-key"
export CARTESIA_API_KEY="your-cartesia-api-key"
export OPENAI_API_KEY="your-openai-api-key"

🔧 How It Works

  1. noveum_trace.init() configures where traces go. In production, call this once at module level rather than inside a per-connection function.
  2. trace_obs = NoveumTraceObserver(record_audio=True) sets what the observer captures.
  3. task = PipelineTask(..., observers=[trace_obs]) attaches the observer to the pipeline task.
  4. await trace_obs.attach_to_task(task) wires turn boundaries so you get per-turn STT/LLM/TTS spans.
  5. AudioBufferProcessor(num_channels=2) in the pipeline combined with record_audio=True enables full-conversation audio capture. attach_to_task() auto-detects the processor and uploads a stereo WAV as a pipecat.full_conversation span when the session ends.

📊 What You'll See in the Dashboard

  • pipecat.conversation trace with one pipecat.turn per user→bot exchange
  • Child spans per turn:
    • pipecat.stt (recognized speech/text, model, confidence, latency)
    • pipecat.llm (prompt, output, token usage, cost, function calls when emitted)
    • pipecat.tts (spoken text, voice, audio timing)
  • When record_audio=True, per-span audio uploads add stt.audio_uuid and tts.audio_uuid attributes
  • A pipecat.full_conversation span at the trace root containing the full stereo WAV of the session (left channel = user, right channel = bot) — enabled because this example includes AudioBufferProcessor(num_channels=2) in the pipeline

🔍 Troubleshooting

If turns don’t split correctly, verify that:

  • await trace_obs.attach_to_task(task) runs after PipelineTask is created
  • noveum_trace.init() is called before the pipeline starts
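Another common cause of missing traces is an unset or empty API key. This small helper (this guide's own, not part of Noveum Trace) reports any missing variables before the bot starts:

```python
import os

# Hypothetical pre-flight check: list required environment variables
# that are unset or empty before starting the pipeline.
REQUIRED_VARS = ("NOVEUM_API_KEY", "DEEPGRAM_API_KEY", "CARTESIA_API_KEY", "OPENAI_API_KEY")

def missing_env_vars(required=REQUIRED_VARS):
    return [name for name in required if not os.getenv(name)]

for name in missing_env_vars():
    print(f"Warning: {name} is not set")
```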
