LangSmith is the observability platform for LangChain applications, providing complete visibility into LLM operations, debugging tools, and production monitoring capabilities.
Why LangSmith?
LangSmith provides:
- Complete Tracing: Every LLM call, chain execution, and tool use
- Debugging Tools: Replay failed runs, inspect prompts and outputs
- Dataset Management: Version and manage test datasets
- Evaluation Framework: Automated testing and quality metrics
- Cost Analysis: Track token usage and API costs
- Performance Monitoring: Latency, throughput, and error rates
import os
from langsmith import Client
from langchain_core.tracers.context import tracing_v2_enabled

# Initialize LangSmith
# Enabling v2 tracing sends every chain/LLM run to LangSmith.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
# NOTE(review): placeholder value — load the real key from a secret store;
# never commit API keys in source.
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
# All traces are grouped under this project name in the LangSmith UI.
os.environ["LANGCHAIN_PROJECT"] = "production"
# Client() takes no explicit key here — presumably it picks up
# LANGCHAIN_API_KEY from the environment set above; confirm against SDK docs.
client = Client()
Setting Up LangSmith
Configuration for Production
from langsmith import Client
from langchain.callbacks.tracers import LangChainTracer
from typing import Optional, Dict, Any
import logging
class LangSmithConfig:
    """Production LangSmith configuration.

    Sets the LANGCHAIN_* environment variables that enable v2 tracing,
    creates a LangSmith ``Client``, and configures a per-project logger.
    """

    def __init__(
        self,
        api_key: str,
        project_name: str,
        environment: str = "production"
    ):
        self.api_key = api_key
        # Suffix the project with the environment so traces from
        # dev/staging/prod never mix (e.g. "my_app_production").
        self.project_name = f"{project_name}_{environment}"
        self.environment = environment

        # Environment variables read by LangChain's tracing layer.
        os.environ["LANGCHAIN_TRACING_V2"] = "true"
        os.environ["LANGCHAIN_API_KEY"] = api_key
        os.environ["LANGCHAIN_PROJECT"] = self.project_name
        os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"

        # Initialize client
        self.client = Client(api_key=api_key)

        # Configure logging
        self.setup_logging()

    def setup_logging(self) -> None:
        """Create and level a logger named after the project.

        Fix: this method was called from ``__init__`` but never defined,
        so every instantiation raised AttributeError.
        """
        self.logger = logging.getLogger(self.project_name)
        # Avoid stacking duplicate handlers on repeated construction.
        if not self.logger.handlers:
            self.logger.addHandler(logging.StreamHandler())
        # Production default; callers may adjust afterwards.
        self.logger.setLevel(logging.INFO)

    def get_tracer(self, run_name: Optional[str] = None) -> "LangChainTracer":
        """Return a tracer bound to this project's client, tagged with
        the environment.

        NOTE(review): ``run_name`` is forwarded to ``LangChainTracer`` —
        confirm the installed langchain version accepts this keyword.
        """
        return LangChainTracer(
            project_name=self.project_name,
            client=self.client,
            run_name=run_name,
            tags=[self.environment]
        )
Environment-Specific Configuration
class MultiEnvironmentConfig:
    """Manage tracing configuration for multiple environments.

    Each environment maps to a LangSmith project, a trace sampling
    rate, and a root log level.
    """

    def __init__(self):
        self.configs = {
            "development": {
                "project": "my_app_dev",
                "sampling_rate": 1.0,  # Trace everything
                "log_level": "DEBUG"
            },
            "staging": {
                "project": "my_app_staging",
                "sampling_rate": 0.5,  # Sample 50%
                "log_level": "INFO"
            },
            "production": {
                "project": "my_app_prod",
                "sampling_rate": 0.1,  # Sample 10%
                "log_level": "WARNING"
            }
        }

    def configure(self, environment: str):
        """Apply the settings for *environment* and return them.

        Unknown environments fall back to the (most conservative)
        production settings.
        """
        import random  # Fix: `random` was used here without being imported anywhere.

        config = self.configs.get(environment, self.configs["production"])
        os.environ["LANGCHAIN_PROJECT"] = config["project"]

        # Probabilistic trace sampling.
        # NOTE(review): this draws once per configure() call and disables
        # tracing for the whole process, not per-request — confirm intent.
        if random.random() > config["sampling_rate"]:
            os.environ["LANGCHAIN_TRACING_V2"] = "false"

        logging.getLogger().setLevel(config["log_level"])
        return config
Tracing and Debugging
Advanced Tracing Patterns
from langsmith.run_helpers import traceable
from contextlib import contextmanager
from typing import Generator
class TracingManager:
    """Advanced tracing management"""

    @staticmethod
    @contextmanager
    def trace_context(
        name: str,
        metadata: Optional[Dict[str, Any]] = None,
        tags: Optional[List[str]] = None
    ) -> Generator:
        """Context manager for tracing"""
        # NOTE(review): `List` is not among the visible typing imports
        # (Optional, Dict, Any) — confirm it is imported elsewhere,
        # otherwise this definition raises NameError.
        # NOTE(review): the `name` parameter is accepted but never used
        # in this body — confirm whether it should be forwarded.
        with tracing_v2_enabled(
            project_name=os.environ.get("LANGCHAIN_PROJECT"),
            tags=tags or [],
            metadata=metadata or {}
        ) as tracer:
            try:
                yield tracer
            except Exception as e:
                # Log error to LangSmith
                # NOTE(review): confirm the object yielded by
                # tracing_v2_enabled exposes on_error(error=...).
                tracer.on_error(error=e)
                raise

    @staticmethod
    @traceable(run_type="chain", name="custom_operation")
    async def traced_operation(
        input_data: dict,
        config: Optional[dict] = None
    ) -> dict:
        """Traced async operation"""
        # Operation logic
        # NOTE(review): `process_data`, `calculate_metric` and `time`
        # are not defined/imported in the visible code — confirm they
        # exist in the surrounding module.
        result = await process_data(input_data)
        # Add custom metadata to the active LangSmith run, if any.
        from langsmith.run_trees import get_current_run_tree
        run_tree = get_current_run_tree()
        if run_tree:
            run_tree.add_metadata({
                "custom_metric": calculate_metric(result),
                "processing_time": time.time(),
                "config": config or {}
            })
        return result
class DebugTracer:
    """Enhanced debugging capabilities: replay recorded runs and analyze
    recent failures via the LangSmith API."""

    def __init__(self, client: Client):
        # LangSmith API client used for all run queries.
        self.client = client

    async def replay_run(self, run_id: str) -> dict:
        """Re-execute a recorded run with its original inputs and diff
        the new output against the recorded one.

        NOTE(review): ``debug_mode``, ``_recreate_chain`` and
        ``_compare_outputs`` are not defined in this class as shown —
        confirm they are provided elsewhere.
        """
        # Fetch the recorded run and extract its inputs/config.
        run = self.client.read_run(run_id)
        inputs = run.inputs
        config = run.extra.get("config", {})
        # Re-run with debug mode enabled.
        with self.debug_mode():
            # Recreate the chain/agent from the serialized run.
            chain = self._recreate_chain(run.serialized)
            # Execute with the same inputs.
            result = await chain.ainvoke(inputs, config=config)
        return {
            "original": run.outputs,
            "replay": result,
            "diff": self._compare_outputs(run.outputs, result)
        }

    def analyze_failures(self, project_name: str, hours: int = 24) -> dict:
        """Group the project's failed runs from the last *hours* hours
        by classified error type.

        NOTE(review): ``_classify_error`` and ``_get_common_errors`` are
        not defined in this class as shown — confirm they exist.
        """
        from datetime import datetime, timedelta

        end_time = datetime.now()
        start_time = end_time - timedelta(hours=hours)
        # Fix: materialize the query results. list_runs yields an
        # iterator (other code in this module wraps it in list()); the
        # original consumed it in the tally loop below and then passed
        # the exhausted iterator to _get_common_errors.
        runs = list(self.client.list_runs(
            project_name=project_name,
            start_time=start_time,
            end_time=end_time,
            error=True  # Only failed runs
        ))
        # Tally failures by classified error type.
        failures = {}
        for run in runs:
            error_type = self._classify_error(run.error)
            failures[error_type] = failures.get(error_type, 0) + 1
        return {
            "total_failures": sum(failures.values()),
            "failure_types": failures,
            "common_errors": self._get_common_errors(runs)
        }
Dataset Management
Creating and Managing Test Datasets
from langsmith.schemas import Dataset, Example
from typing import List, Tuple
import pandas as pd
class DatasetManager:
    """Create, populate, and version LangSmith datasets."""

    def __init__(self, client: "Client"):
        # LangSmith API client used for all dataset operations.
        self.client = client

    def create_dataset_from_csv(
        self,
        csv_path: str,
        dataset_name: str,
        input_columns: List[str],
        output_columns: List[str],
        metadata: Optional[Dict] = None
    ) -> "Dataset":
        """Create a dataset from a CSV file.

        Each CSV row becomes one example: *input_columns* map to example
        inputs and *output_columns* to reference outputs.
        """
        df = pd.read_csv(csv_path)
        dataset = self.client.create_dataset(
            dataset_name=dataset_name,
            description=f"Dataset from {csv_path}",
            metadata=metadata or {}
        )
        # Add one example per CSV row.
        for _, row in df.iterrows():
            inputs = {col: row[col] for col in input_columns}
            outputs = {col: row[col] for col in output_columns}
            self.client.create_example(
                dataset_id=dataset.id,
                inputs=inputs,
                outputs=outputs
            )
        return dataset

    def create_golden_dataset(
        self,
        name: str,
        examples: List[Tuple[dict, dict]]
    ) -> "Dataset":
        """Create a golden dataset for regression testing.

        *examples* is a list of ``(inputs, expected_outputs)`` pairs.
        """
        dataset = self.client.create_dataset(
            dataset_name=name,
            description="Golden test dataset",
            metadata={"type": "golden", "version": "1.0"}
        )
        for inputs, expected_outputs in examples:
            self.client.create_example(
                dataset_id=dataset.id,
                inputs=inputs,
                outputs=expected_outputs,
                metadata={"golden": True}
            )
        return dataset

    def version_dataset(
        self,
        dataset_name: str,
        version: str
    ) -> "Dataset":
        """Create a versioned copy (``<name>_v<version>``) of a dataset,
        copying all examples."""
        # Fix: `datetime` was used below but never imported in this
        # scope (the file only imports it locally inside another class).
        from datetime import datetime

        original = self.client.read_dataset(dataset_name=dataset_name)
        versioned_name = f"{dataset_name}_v{version}"
        new_dataset = self.client.create_dataset(
            dataset_name=versioned_name,
            description=f"Version {version} of {dataset_name}",
            metadata={
                "version": version,
                "parent": dataset_name,
                "created_at": datetime.now().isoformat()
            }
        )
        # Copy every example into the new version.
        examples = self.client.list_examples(dataset_id=original.id)
        for example in examples:
            self.client.create_example(
                dataset_id=new_dataset.id,
                inputs=example.inputs,
                outputs=example.outputs,
                metadata=example.metadata
            )
        return new_dataset
Testing and Evaluation
Automated Testing Framework
from langsmith.evaluation import evaluate, RunEvaluator
from langchain.smith import RunEvalConfig
from typing import Optional
class EvaluationFramework:
    """Comprehensive evaluation framework"""

    def __init__(self, client: Client):
        self.client = client
        # NOTE(review): _setup_evaluators calls create_latency_evaluator
        # and create_hallucination_evaluator, which are not defined in
        # this class as shown — instantiation will fail unless they are
        # provided elsewhere.
        self.evaluators = self._setup_evaluators()

    def _setup_evaluators(self) -> List[RunEvaluator]:
        """Configure evaluation metrics"""
        # NOTE(review): `List` is not in the visible typing imports for
        # this section — confirm it is imported.
        return [
            self.create_accuracy_evaluator(),
            self.create_latency_evaluator(),
            self.create_hallucination_evaluator(),
        ]

    def create_accuracy_evaluator(self) -> RunEvaluator:
        """Evaluate accuracy"""
        class AccuracyEvaluator(RunEvaluator):
            # NOTE(review): `Run` and `Example` are not imported in the
            # visible code — confirm (likely langsmith.schemas).
            def evaluate_run(
                self,
                run: Run,
                example: Optional[Example] = None
            ) -> dict:
                # Without a reference example there is nothing to score.
                if not example:
                    return {"score": None}
                # Compare predicted vs expected "output" fields.
                predicted = run.outputs.get("output", "")
                expected = example.outputs.get("output", "")
                # Character-level similarity ratio in [0, 1].
                from difflib import SequenceMatcher
                similarity = SequenceMatcher(None, predicted, expected).ratio()
                return {
                    "score": similarity,
                    # 0.8 similarity is the pass threshold.
                    "pass": similarity > 0.8,
                    "feedback": f"Similarity: {similarity:.2%}"
                }
        return AccuracyEvaluator()

    async def run_evaluation(
        self,
        chain,
        dataset_name: str,
        eval_config: Optional[RunEvalConfig] = None
    ) -> dict:
        """Run comprehensive evaluation"""
        config = eval_config or RunEvalConfig(
            evaluators=self.evaluators,
            batch_size=5,
            max_concurrency=3
        )
        # NOTE(review): recent langsmith versions expose a synchronous
        # evaluate() — confirm it is awaitable here. `datetime` and
        # `_analyze_results` are also not defined in the visible code.
        results = await evaluate(
            lambda inputs: chain.invoke(inputs),
            data=dataset_name,
            evaluators=config.evaluators,
            client=self.client,
            project_name=f"eval_{dataset_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        )
        return self._analyze_results(results)
Performance Monitoring
Real-Time Performance Tracking
from prometheus_client import Counter, Histogram, Gauge
import time
class PerformanceMonitor:
"""Monitor LangChain application performance"""
def __init__(self, client: Client):
self.client = client
# Prometheus metrics
self.request_count = Counter(
'langchain_requests_total',
'Total requests',
['chain_name', 'status']
)
self.request_duration = Histogram(
'langchain_request_duration_seconds',
'Request duration',
['chain_name']
)
self.token_usage = Counter(
'langchain_tokens_total',
'Total tokens used',
['model', 'type']
)
async def monitor_chain_execution(self, chain, inputs: dict, chain_name: str):
"""Monitor chain execution with metrics"""
start_time = time.time()
try:
# Execute chain
result = await chain.ainvoke(inputs)
# Record success
self.request_count.labels(
chain_name=chain_name,
status="success"
).inc()
return result
except Exception as e:
# Record failure
self.request_count.labels(
chain_name=chain_name,
status="failure"
).inc()
raise
finally:
# Record duration
duration = time.time() - start_time
self.request_duration.labels(chain_name=chain_name).observe(duration)
Cost Tracking
Token Usage and Cost Analysis
class CostTracker:
    """Track and optimize LLM API costs from LangSmith run data."""

    # Pricing per 1k tokens (as of 2024), in USD.
    PRICING = {
        "gpt-4": {"prompt": 0.03, "completion": 0.06},
        "gpt-4-turbo": {"prompt": 0.01, "completion": 0.03},
        "gpt-3.5-turbo": {"prompt": 0.0005, "completion": 0.0015},
        "claude-3-opus": {"prompt": 0.015, "completion": 0.075},
        "claude-3-sonnet": {"prompt": 0.003, "completion": 0.015},
    }

    # NOTE: annotations for Run/datetime/Client are quoted — they are
    # not importable at module scope in this file, and unquoted they are
    # evaluated at class-definition time (NameError).
    def __init__(self, client: "Client"):
        self.client = client

    def calculate_run_cost(self, run: "Run") -> float:
        """Return the USD cost of a single run (0.0 if no token usage).

        NOTE(review): assumes the run object exposes ``token_usage``
        (with ``prompt_tokens``/``completion_tokens``) and ``model`` —
        confirm against the LangSmith Run schema.
        """
        if not run.token_usage:
            return 0.0
        model = run.model or "gpt-3.5-turbo"
        # Unknown models fall back to gpt-3.5-turbo pricing.
        pricing = self.PRICING.get(model, self.PRICING["gpt-3.5-turbo"])
        prompt_cost = (run.token_usage.prompt_tokens / 1000) * pricing["prompt"]
        completion_cost = (run.token_usage.completion_tokens / 1000) * pricing["completion"]
        return prompt_cost + completion_cost

    def _cost_for_model(self, run: "Run", model: str) -> float:
        """Cost of *run*'s token usage if it had been served by *model*."""
        pricing = self.PRICING.get(model, self.PRICING["gpt-3.5-turbo"])
        usage = run.token_usage
        return (
            (usage.prompt_tokens / 1000) * pricing["prompt"]
            + (usage.completion_tokens / 1000) * pricing["completion"]
        )

    def _calculate_savings(self, run: "Run", cheaper_model: str) -> float:
        """Estimated savings from serving *run* with *cheaper_model*.

        Fix: this helper was called by ``optimize_costs`` but never
        defined, so generating recommendations raised AttributeError.
        """
        return self.calculate_run_cost(run) - self._cost_for_model(run, cheaper_model)

    def analyze_costs(
        self,
        project_name: str,
        start_date: "datetime",
        end_date: "datetime"
    ) -> dict:
        """Aggregate cost and token totals for a project over a period.

        Returns total cost, per-model cost/token breakdowns, and the
        average cost per run (0 when there are no runs).
        """
        runs = list(self.client.list_runs(
            project_name=project_name,
            start_time=start_date,
            end_time=end_date
        ))
        # Accumulate per-model costs and token counts.
        costs_by_model = {}
        tokens_by_model = {}
        for run in runs:
            if run.token_usage:
                model = run.model or "unknown"
                cost = self.calculate_run_cost(run)
                costs_by_model[model] = costs_by_model.get(model, 0) + cost
                tokens_by_model[model] = tokens_by_model.get(model, 0) + run.token_usage.total_tokens
        return {
            "total_cost": sum(costs_by_model.values()),
            "costs_by_model": costs_by_model,
            "tokens_by_model": tokens_by_model,
            "avg_cost_per_run": sum(costs_by_model.values()) / len(runs) if runs else 0
        }

    def optimize_costs(self, project_name: str) -> "List[dict]":
        """Generate cost-optimization recommendations.

        Flags short gpt-4 runs (< 500 total tokens) that could likely be
        served by gpt-3.5-turbo, with the estimated per-run savings.
        """
        recommendations = []
        # Analyze the most recent runs.
        runs = list(self.client.list_runs(
            project_name=project_name,
            limit=1000
        ))
        # Expensive model on a small task → suggest a downgrade.
        for run in runs:
            if run.model == "gpt-4" and run.token_usage:
                if run.token_usage.total_tokens < 500:
                    recommendations.append({
                        "type": "model_downgrade",
                        "run_id": run.id,
                        "current_model": "gpt-4",
                        "recommended_model": "gpt-3.5-turbo",
                        "potential_savings": self._calculate_savings(run, "gpt-3.5-turbo")
                    })
        return recommendations
Production Best Practices
Complete Production Setup
class ProductionLangSmith:
    """Production-ready LangSmith setup: wires together monitoring,
    cost tracking, evaluation, and debugging around one client."""

    def __init__(self, config: dict):
        # Expects at least: api_key, environment, version, project_name.
        self.config = config
        self.client = Client(api_key=config["api_key"])
        self.setup_production_features()

    def setup_production_features(self):
        """Configure all production features"""
        # Prometheus request/latency metrics.
        self.monitor = PerformanceMonitor(self.client)
        # Token usage and cost accounting.
        self.cost_tracker = CostTracker(self.client)
        # Automated evaluation runs.
        self.evaluator = EvaluationFramework(self.client)
        # Failed-run replay and analysis.
        self.debugger = DebugTracer(self.client)

    async def production_chain_wrapper(
        self,
        chain,
        chain_name: str
    ):
        """Return an async callable that runs *chain* with tracing and
        metrics.

        Fix: the original used the async method
        ``monitor_chain_execution`` as a synchronous context manager
        (``with ...``), which raises TypeError at runtime; it executes
        the chain itself and must be awaited instead.
        """
        async def wrapped_chain(inputs: dict) -> dict:
            # Custom metadata attached to the trace.
            metadata = {
                "environment": self.config["environment"],
                "version": self.config["version"],
                "user_id": inputs.get("user_id"),
            }
            # Execute under a LangSmith trace; the monitor invokes the
            # chain and records the Prometheus metrics.
            with TracingManager.trace_context(
                name=chain_name,
                metadata=metadata,
                tags=[self.config["environment"]]
            ):
                result = await self.monitor.monitor_chain_execution(
                    chain, inputs, chain_name
                )
            return result

        return wrapped_chain

    def generate_daily_report(self) -> dict:
        """Generate an operations report covering the last 24 hours."""
        # Fix: datetime/timedelta were used without any import in scope.
        from datetime import datetime, timedelta

        end_date = datetime.now()
        start_date = end_date - timedelta(days=1)
        return {
            "date": end_date.date().isoformat(),
            # NOTE(review): create_dashboard_metrics is not defined on
            # PerformanceMonitor in this file — confirm it exists.
            "performance": self.monitor.create_dashboard_metrics(
                self.config["project_name"]
            ),
            "costs": self.cost_tracker.analyze_costs(
                self.config["project_name"],
                start_date,
                end_date
            ),
            "failures": self.debugger.analyze_failures(
                self.config["project_name"],
                hours=24
            )
        }
Summary
LangSmith provides comprehensive observability for production LangChain applications:
- Complete Tracing: Every operation is tracked and searchable
- Dataset Management: Version-controlled test data
- Automated Testing: Regression testing and quality metrics
- Performance Monitoring: Real-time metrics and alerting
- Cost Optimization: Token usage tracking and recommendations
- Debugging Tools: Replay and analyze failed runs
These capabilities are essential for maintaining reliable, cost-effective AI applications in production.
Series Navigation
This is Part 5 of the LangChain Series.
Previous: ← Part 4 - LangGraph: Advanced Patterns
Complete Series:
- Part 0: Introduction to LangChain Ecosystem
- Part 1: LangChain Fundamentals - Core Components
- Part 2: LangChain Fundamentals - Chains and Agents
- Part 3: LangGraph - Building Stateful Applications
- Part 4: LangGraph - Advanced Patterns
- Part 5: LangSmith - Production Monitoring (You are here)
Tags: #LangSmith #Monitoring #Observability #Production #LangChain

