Tracing
Tracing is the foundation of evaluating, debugging, and optimizing your RAG application. RAG Workbench provides a unique capability to trace both your ingestion and query pipelines, offering a comprehensive view of your system. The two fundamental components in tracing are spans and traces.
- Spans represent individual tasks with a specific start and end time. They can include optional fields such as input, output, metadata, and evaluation metrics. Common examples of spans include LLM calls, vector searches, agent chain steps, and model evaluations.
- Traces are composed of a collection of spans that together represent a single, standalone request. Well-structured traces make it easier to understand the flow of your application and facilitate issue resolution when problems arise.
Annotating your code
To log a trace, wrap the code you want to trace with the traced decorator. RAG Workbench automatically captures and logs information behind the scenes. Some useful features of tracing:
- Connect your ingestion and retrieval traces
- Register key parameters to a paramSet to easily manage experimentation on parameters
- Auto-instrumentation for popular libraries like OpenAI, LlamaIndex, and Langchain
Tracing Ingestion Pipeline
Instantiate your LastMile Tracer object.
from lastmile_eval.rag.debugger.api.tracing import LastMileTracer
from lastmile_eval.rag.debugger.tracing.sdk import get_lastmile_tracer

tracer: LastMileTracer = get_lastmile_tracer("My-Project")
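The ingestion example below stores chunks in a Chroma collection, so it also assumes a Chroma client has been created. A minimal sketch (the in-memory client is illustrative; substitute your own setup):
import chromadb

chroma_client = chromadb.Client()  # in-memory client for this example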
The tracer.trace_function() decorator will create a span underneath the currently-active span.
@tracer.trace_function()  # Decorate the function with the tracer
def chunk_document(file_path: str, chunk_size: int = 1000) -> list[str]:
    with open(file_path, "r") as file:
        text = file.read()
    chunks: list[str] = []
    for i in range(0, len(text), chunk_size):
        chunks.append(text[i:i + chunk_size])
    return chunks
@tracer.trace_function()
def run_ingestion_flow() -> chromadb.Collection:
    collection = chroma_client.create_collection(name="paul_graham_collection")
    tracer.log("Ingesting Paul Graham's essay...")
    document_chunks = chunk_document("data/paul_graham/paul_graham_essay.txt")
    document_ids = [f"chunk_{i}" for i in range(len(document_chunks))]
    collection.add(
        ids=document_ids,
        documents=document_chunks,  # ex: ["What I Worked On", "February 2021", ...]
    )
    return collection
collection: chromadb.Collection = run_ingestion_flow()
The trace data for the ingestion pipeline has a unique trace ID associated with it. We can use this ID to link the traces from the ingestion step and the query step, giving a comprehensive overview of the entire RAG system.
Here is how you get the latest ingestion trace ID, which you can use when setting up tracing for the Query Pipeline.
ingestion_trace_id = get_latest_ingestion_trace_id()
Tracing Query Pipeline
Set up tracing for your query pipeline similarly to the ingestion pipeline. Here are some key differences:
- Specify ingestion_trace_id for the span or trace events to link the event to your ingestion trace. This way your entire system is tracked and managed by a single trace, even though you have separate pipelines for ingestion and retrieval.
The code below uses register_param to keep track of a key parameter of our RAG system (ex. chunk size). This function adds that parameter to the param_set of your trace – a dictionary maintaining the key parameters of your RAG system. Using param_set is an efficient way to track changes between experiments: it records the varying states of the system, offering clarity on which configurations performed well and which didn't, which is critical for rapid experimentation and debugging.
from lastmile_eval.rag.debugger.api import (
LastMileTracer,
RetrievedNode,
)
LLM_NAME = "gpt-3.5-turbo"
PROMPT_TEMPLATE = """
Context information is below.
---------------------
{context_str}
---------------------
Given the context information and not prior knowledge, answer the query.
Query: {query_str}
Answer:
"""
@tracer.trace_function(name="retrieve-context") #Decorate the function with the tracer
def retrieve_context(query_string: str, top_k: int = 5) -> list[RetrievedNode]:
"""
Retrieve the top-k most relevant contexts based on the query string
from the chroma db collection
"""
tracer.register_param("similarity_top_k", top_k) #Register parameters associated with your RAG pipeline setup
chroma_retrival_results = collection.query(query_texts=query_string, n_results=top_k)
documents_as_retrieved_nodes = [
RetrievedNode(
id=<get_id_from_document(document)>,
title=<get_title_from_document(document)>,
text=document[0],
score=<get_score_from_document(document)>,
)
for document in chroma_retrival_results.get("documents")
]
tracer.add_retrieval_event(
query=query_string,
retrieved_nodes=documents_as_retrieved_nodes,
)
return documents_as_retrieved_nodes
@tracer.trace_function(name="resolve-prompt")
def resolve_prompt(user_query: str, retrieved_nodes: list[RetrievedNode]):
resolved_prompt = PROMPT_TEMPLATE.replace(
"{context_str}", "\n\n\n".join(
[doc.text for doc in retrieved_nodes]
)
).replace("{query_str}", user_query)
tracer.add_template_event(
prompt_template=PROMPT_TEMPLATE,
resolved_prompt=resolved_prompt,
)
return resolved_prompt
@traced(tracer=tracer, name="query-root-span")  # You can provide a custom name for the root span
def run_query_flow(user_query: str, ingestion_trace_id: str):
    retrieved_nodes = retrieve_context(user_query, top_k=3)
    resolved_prompt = resolve_prompt(user_query, retrieved_nodes)
    with tracer.start_as_current_span("call-llm") as _llm_span:
        openai_client = openai.Client(api_key=os.getenv("OPENAI_API_KEY"))
        response = openai_client.chat.completions.create(
            model=LLM_NAME,
            messages=[{"role": "user", "content": resolved_prompt}],
        )
        output: str = response.choices[0].message.content
        tracer.add_query_event(
            query=user_query,
            llm_output=output,
        )
    return output
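To run the flow end to end and link it to the ingestion trace captured earlier, pass in the ingestion trace ID (the query string below is just illustrative):
ingestion_trace_id = get_latest_ingestion_trace_id()
answer = run_query_flow("What did the author work on?", ingestion_trace_id)
print(answer)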
Register Parameters
The register_param function allows you to track key parameters of your RAG system, such as chunk size, by adding them to the paramSet of your trace.
The paramSet is a dictionary that stores the crucial parameters of your RAG system, enabling you to monitor changes between experiments. By registering parameters using register_param("chunk_size", 512), you can easily identify optimal configurations, facilitate debugging, and enhance reproducibility. The paramSet can be viewed in the RAG Workbench UI, providing a clear record of the parameter values used in each trace or evaluation run.
tracer.register_param("similarity_top_k", top_k)
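You can register as many parameters as you like on the same tracer. For example (the parameter names below are illustrative):
tracer.register_param("chunk_size", 512)
tracer.register_param("llm_name", LLM_NAME)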
Tracing Integrations
OpenAI Wrapper
The LastMile Tracing SDK provides a wrapper for the OpenAI API that automatically logs your requests.
import openai

from lastmile_eval.rag.debugger.api.tracing import LastMileTracer
from lastmile_eval.rag.debugger.tracing import openai as openai_tracing
from lastmile_eval.rag.debugger.tracing.sdk import get_lastmile_tracer

tracer: LastMileTracer = get_lastmile_tracer(
    tracer_name="OpenAI Function Calling",
)
client = openai_tracing.wrap(openai.OpenAI(), tracer)
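Requests made through the wrapped client are then logged automatically. A quick sketch, assuming the wrapper preserves the standard OpenAI client interface and OPENAI_API_KEY is set:
response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Tell me a riddle."}],
)
print(response.choices[0].message.content)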
LlamaIndex
The LastMile Tracing SDK provides an instrumentor for LlamaIndex, allowing you to automatically trace and monitor the execution of LlamaIndex operations without modifying the library's code.
import llama_index.core
from lastmile_eval.rag.debugger.tracing.auto_instrumentation import (
LlamaIndexCallbackHandler,
)
llama_index.core.global_handler = LlamaIndexCallbackHandler(
project_name="LlamaIndex Paul Graham QA",
)
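Any LlamaIndex pipeline run after setting the global handler is traced automatically. A minimal sketch, assuming the Paul Graham essay from the ingestion example is on disk and an OpenAI key is configured:
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex

documents = SimpleDirectoryReader("data/paul_graham").load_data()
index = VectorStoreIndex.from_documents(documents)
response = index.as_query_engine().query("What did the author work on?")
print(response)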
Langchain
The LastMile Tracing SDK provides an instrumentor for Langchain, allowing you to automatically trace and monitor the execution of Langchain operations without modifying the library's code.
from lastmile_eval.rag.debugger.tracing.auto_instrumentation import LangChainInstrumentor
instrumentor = LangChainInstrumentor(project_name="Plan-and-Execute Example")
instrumentor.instrument()
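Once instrumented, Langchain runs are traced automatically. A minimal sketch, assuming the langchain-core and langchain-openai packages are installed and an OpenAI key is configured:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("Answer concisely: {question}")
chain = prompt | ChatOpenAI(model="gpt-3.5-turbo")
result = chain.invoke({"question": "What is distributed tracing?"})
print(result.content)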
Distributed Tracing
Distributed Tracing allows you to start a trace in one process and seamlessly continue it in another, providing a unified view of the application's execution flow across multiple services running on different processes or machines.
The export_span() function in the LastMile Tracing SDK serializes the current span context into an opaque string identifier, capturing the information needed to resume the trace in a different process. By passing this exported span context between processes, you ensure accurate propagation of the trace context.
Server Code
import json

from flask import Flask, request

tracer: LastMileTracer = get_lastmile_tracer(tracer_name="generate_riddle")
app = Flask(__name__)

@app.route("/generate")
def generate() -> str:
    span_context = request.headers.get("span")
    with tracer.start_as_current_span("generate_endpoint", context=span_context):
        # Generate riddle using OpenAI's GPT-3.5-turbo model
        # ...
        return json.dumps(riddle)

app.run(port=1234, debug=False)
Client Code
import requests
from opentelemetry import trace

tracer2: LastMileTracer = get_lastmile_tracer(tracer_name="generate_riddle")

@tracer2.start_as_current_span("client")
def client_say_riddle():
    try:
        print("sending request to subprocess server, with span context.")
        response = requests.get(
            "http://127.0.0.1:1234/generate",
            headers={"span": export_span(trace.get_current_span())},
        )
        return response.text
    except Exception as e:
        print(e)

client_say_riddle()
Managing Traces
RAG Workbench offers a user-friendly UI for managing your traces. The UI is particularly helpful if you have run evaluation metrics on your traces or if you want to more closely inspect specific steps for debugging.
In your terminal, type rag-debug launch. This takes you to the RAG Workbench UI.
Navigate to the Traces tab. Here you will see all your traces, organized by Project and the time interval you want to look at.
Click on a Trace to see the DAG of all the spans (steps) of your RAG system. You also have access to all the metadata that was logged.
More Resources
Here are other helpful guides on how to use traces: