LangSmith: Production Monitoring, Debugging, and Observability

Cover Image for LangSmith: Production Monitoring, Debugging, and Observability
AI & Machine Learning · 5 min read

LangSmith is the observability platform for LangChain applications, providing complete visibility into LLM operations, debugging tools, and production monitoring capabilities.

Why LangSmith?

LangSmith provides:

  • Complete Tracing: Every LLM call, chain execution, and tool use
  • Debugging Tools: Replay failed runs, inspect prompts and outputs
  • Dataset Management: Version and manage test datasets
  • Evaluation Framework: Automated testing and quality metrics
  • Cost Analysis: Track token usage and API costs
  • Performance Monitoring: Latency, throughput, and error rates
import os
from langsmith import Client
from langchain_core.tracers.context import tracing_v2_enabled

# Initialize LangSmith.
# NOTE(review): "your-api-key" is a placeholder — never hardcode real API
# keys; load them from a secret store or the deployment environment.
os.environ["LANGCHAIN_TRACING_V2"] = "true"  # enable v2 tracing globally
os.environ["LANGCHAIN_API_KEY"] = "your-api-key"
os.environ["LANGCHAIN_PROJECT"] = "production"  # traces land in this project

client = Client()  # picks up LANGCHAIN_API_KEY from the environment

Setting Up LangSmith

Configuration for Production

from langsmith import Client
from langchain.callbacks.tracers import LangChainTracer
from typing import Optional, Dict, Any
import logging

class LangSmithConfig:
    """Production LangSmith configuration.

    Sets the standard LangSmith environment variables, initializes the
    API client, and configures a project-scoped logger.
    """

    def __init__(
        self,
        api_key: str,
        project_name: str,
        environment: str = "production"
    ):
        """
        Args:
            api_key: LangSmith API key.
            project_name: Base project name; the environment is appended.
            environment: Deployment environment label (e.g. "production").
        """
        self.api_key = api_key
        # Suffix the environment so traces from different deployments
        # land in separate LangSmith projects.
        self.project_name = f"{project_name}_{environment}"
        self.environment = environment

        # Environment variables read by LangChain's tracing hooks.
        os.environ["LANGCHAIN_TRACING_V2"] = "true"
        os.environ["LANGCHAIN_API_KEY"] = api_key
        os.environ["LANGCHAIN_PROJECT"] = self.project_name
        os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"

        # Initialize client
        self.client = Client(api_key=api_key)

        # Configure logging
        self.setup_logging()

    def setup_logging(self) -> None:
        """Configure a project-scoped logger.

        FIX: this method was called from __init__ but never defined,
        which raised AttributeError on construction.
        """
        # Verbose logs outside production; quieter in production.
        level = logging.INFO if self.environment == "production" else logging.DEBUG
        logging.getLogger(self.project_name).setLevel(level)

    def get_tracer(self, run_name: Optional[str] = None) -> "LangChainTracer":
        """Return a tracer bound to this project, tagged with the environment."""
        # NOTE(review): depending on the installed langchain version,
        # LangChainTracer may not accept `run_name`/`tags` kwargs —
        # confirm against the release in use.
        return LangChainTracer(
            project_name=self.project_name,
            client=self.client,
            run_name=run_name,
            tags=[self.environment]
        )

Environment-Specific Configuration

class MultiEnvironmentConfig:
    """Manage LangSmith settings across dev/staging/production."""

    def __init__(self):
        # Per-environment project name, trace sampling rate, and log level.
        self.configs = {
            "development": {
                "project": "my_app_dev",
                "sampling_rate": 1.0,  # Trace everything
                "log_level": "DEBUG"
            },
            "staging": {
                "project": "my_app_staging",
                "sampling_rate": 0.5,  # Sample 50%
                "log_level": "INFO"
            },
            "production": {
                "project": "my_app_prod",
                "sampling_rate": 0.1,  # Sample 10%
                "log_level": "WARNING"
            }
        }

    def configure(self, environment: str) -> dict:
        """Configure tracing for `environment` and return its config.

        Unknown environment names fall back to the production settings.
        """
        import random  # FIX: `random` was used without being imported

        config = self.configs.get(environment, self.configs["production"])

        os.environ["LANGCHAIN_PROJECT"] = config["project"]

        # Probabilistic trace sampling.
        # FIX: the original only ever set the flag to "false", so once a
        # process was sampled out it could never trace again; set the
        # flag explicitly on both outcomes.
        sampled_in = random.random() <= config["sampling_rate"]
        os.environ["LANGCHAIN_TRACING_V2"] = "true" if sampled_in else "false"

        logging.getLogger().setLevel(config["log_level"])

        return config

Tracing and Debugging

Advanced Tracing Patterns

from langsmith.run_helpers import traceable
from contextlib import contextmanager
from typing import Generator

class TracingManager:
    """Helpers for scoped tracing and per-run custom metadata."""

    @staticmethod
    @contextmanager
    def trace_context(
        name: str,
        metadata: Optional[Dict[str, Any]] = None,
        tags: Optional[list] = None
    ) -> Generator:
        """Context manager that enables v2 tracing for the wrapped code.

        FIX: the `tags` annotation was `Optional[List[str]]`, but `List`
        is not imported in this snippet, which raises NameError when the
        class body is executed; the builtin `list` avoids that.

        Args:
            name: Label for the traced span. NOTE(review): currently
                unused — confirm whether it should be forwarded to the
                tracing context.
            metadata: Extra metadata attached to the trace.
            tags: Tags attached to the trace.
        """
        with tracing_v2_enabled(
            project_name=os.environ.get("LANGCHAIN_PROJECT"),
            tags=tags or [],
            metadata=metadata or {}
        ) as tracer:
            try:
                yield tracer
            except Exception as e:
                # Log error to LangSmith.
                # NOTE(review): assumes the yielded tracer exposes
                # on_error(); confirm against the installed langchain.
                tracer.on_error(error=e)
                raise

    @staticmethod
    @traceable(run_type="chain", name="custom_operation")
    async def traced_operation(
        input_data: dict,
        config: Optional[dict] = None
    ) -> dict:
        """Traced async operation that attaches custom run metadata."""
        import time  # FIX: `time` was used below without being imported

        # Operation logic. NOTE(review): process_data and
        # calculate_metric are placeholders not defined in this file.
        result = await process_data(input_data)

        # Attach custom metadata to the current run tree, if any.
        from langsmith.run_trees import get_current_run_tree
        run_tree = get_current_run_tree()

        if run_tree:
            run_tree.add_metadata({
                "custom_metric": calculate_metric(result),
                "processing_time": time.time(),
                "config": config or {}
            })

        return result

class DebugTracer:
    """Enhanced debugging capabilities: replay historical runs and
    aggregate recent failures from a LangSmith project.

    NOTE(review): several helpers used below (`debug_mode`,
    `_recreate_chain`, `_compare_outputs`, `_classify_error`,
    `_get_common_errors`) are not defined in the visible source —
    they must be supplied elsewhere for this class to work.
    """

    def __init__(self, client: Client):
        # LangSmith API client used to fetch run records.
        self.client = client

    async def replay_run(self, run_id: str) -> dict:
        """Replay a specific run for debugging.

        Fetches the recorded run, re-executes the recreated chain with
        the original inputs, and returns original vs. replay outputs
        plus a diff.
        """

        # Fetch run details
        run = self.client.read_run(run_id)

        # Extract inputs and config
        inputs = run.inputs
        config = run.extra.get("config", {})

        # Re-run with debug mode
        # NOTE(review): debug_mode() is not defined on this class — TODO confirm.
        with self.debug_mode():
            # Recreate the chain/agent from its serialized form.
            chain = self._recreate_chain(run.serialized)

            # Execute with same inputs
            result = await chain.ainvoke(inputs, config=config)

            return {
                "original": run.outputs,
                "replay": result,
                "diff": self._compare_outputs(run.outputs, result)
            }

    def analyze_failures(self, project_name: str, hours: int = 24) -> dict:
        """Analyze recent failures.

        Args:
            project_name: LangSmith project to query.
            hours: Look-back window (default: last 24 hours).

        Returns:
            Dict with the total failure count, a histogram keyed by
            classified error type, and the most common error details.
        """

        from datetime import datetime, timedelta

        # Query failed runs within the look-back window.
        end_time = datetime.now()
        start_time = end_time - timedelta(hours=hours)

        runs = self.client.list_runs(
            project_name=project_name,
            start_time=start_time,
            end_time=end_time,
            error=True  # Only failed runs
        )

        # Analyze failure patterns: count runs per classified error type.
        failures = {}
        for run in runs:
            error_type = self._classify_error(run.error)
            failures[error_type] = failures.get(error_type, 0) + 1

        return {
            "total_failures": sum(failures.values()),
            "failure_types": failures,
            "common_errors": self._get_common_errors(runs)
        }

Dataset Management

Creating and Managing Test Datasets

from langsmith.schemas import Dataset, Example
from typing import List, Tuple
import pandas as pd

class DatasetManager:
    """Create, version, and populate LangSmith datasets."""

    def __init__(self, client: Client):
        # LangSmith API client used for all dataset operations.
        self.client = client

    def create_dataset_from_csv(
        self,
        csv_path: str,
        dataset_name: str,
        input_columns: List[str],
        output_columns: List[str],
        metadata: Optional[Dict] = None
    ) -> Dataset:
        """Create a dataset from a CSV file.

        Args:
            csv_path: Path to the CSV file.
            dataset_name: Name of the new dataset.
            input_columns: Columns mapped to example inputs.
            output_columns: Columns mapped to example outputs.
            metadata: Optional dataset-level metadata.
        """
        # Read CSV
        df = pd.read_csv(csv_path)

        # Create dataset
        dataset = self.client.create_dataset(
            dataset_name=dataset_name,
            description=f"Dataset from {csv_path}",
            metadata=metadata or {}
        )

        # One example per CSV row.
        for _, row in df.iterrows():
            inputs = {col: row[col] for col in input_columns}
            outputs = {col: row[col] for col in output_columns}

            self.client.create_example(
                dataset_id=dataset.id,
                inputs=inputs,
                outputs=outputs
            )

        return dataset

    def create_golden_dataset(
        self,
        name: str,
        examples: List[Tuple[dict, dict]]
    ) -> Dataset:
        """Create a golden (reference) dataset for regression testing.

        Args:
            name: Dataset name.
            examples: (inputs, expected_outputs) pairs.
        """
        dataset = self.client.create_dataset(
            dataset_name=name,
            description="Golden test dataset",
            metadata={"type": "golden", "version": "1.0"}
        )

        for inputs, expected_outputs in examples:
            self.client.create_example(
                dataset_id=dataset.id,
                inputs=inputs,
                outputs=expected_outputs,
                metadata={"golden": True}
            )

        return dataset

    def version_dataset(
        self,
        dataset_name: str,
        version: str
    ) -> Dataset:
        """Create a versioned copy of an existing dataset.

        The copy is named "<dataset_name>_v<version>" and records its
        parent and creation time in metadata.
        """
        # FIX: `datetime` was used below but not in scope in this snippet.
        from datetime import datetime

        # Get original dataset
        original = self.client.read_dataset(dataset_name=dataset_name)

        # Create new version
        versioned_name = f"{dataset_name}_v{version}"
        new_dataset = self.client.create_dataset(
            dataset_name=versioned_name,
            description=f"Version {version} of {dataset_name}",
            metadata={
                "version": version,
                "parent": dataset_name,
                "created_at": datetime.now().isoformat()
            }
        )

        # Copy every example into the new version.
        examples = self.client.list_examples(dataset_id=original.id)
        for example in examples:
            self.client.create_example(
                dataset_id=new_dataset.id,
                inputs=example.inputs,
                outputs=example.outputs,
                metadata=example.metadata
            )

        return new_dataset

Testing and Evaluation

Automated Testing Framework

from langsmith.evaluation import evaluate, RunEvaluator
from langchain.smith import RunEvalConfig
from typing import Optional

class EvaluationFramework:
    """Comprehensive evaluation framework built on LangSmith evaluators."""

    def __init__(self, client: "Client"):
        self.client = client
        self.evaluators = self._setup_evaluators()

    def _setup_evaluators(self) -> list:
        """Configure the default set of evaluation metrics."""
        return [
            self.create_accuracy_evaluator(),
            self.create_latency_evaluator(),
            self.create_hallucination_evaluator(),
        ]

    def create_accuracy_evaluator(self) -> "RunEvaluator":
        """Evaluate output similarity against the reference example."""

        class AccuracyEvaluator(RunEvaluator):
            # FIX: `Run`/`Example` annotations are quoted — neither name
            # is imported in this snippet, so bare annotations raised
            # NameError when the class body executed.
            def evaluate_run(
                self,
                run: "Run",
                example: Optional["Example"] = None
            ) -> dict:
                if not example:
                    return {"score": None}

                # Compare predicted vs. expected outputs.
                predicted = run.outputs.get("output", "")
                expected = example.outputs.get("output", "")

                # Character-level similarity ratio in [0, 1].
                from difflib import SequenceMatcher
                similarity = SequenceMatcher(None, predicted, expected).ratio()

                return {
                    "score": similarity,
                    "pass": similarity > 0.8,
                    "feedback": f"Similarity: {similarity:.2%}"
                }

        return AccuracyEvaluator()

    def create_latency_evaluator(self) -> "RunEvaluator":
        """Evaluate run latency.

        FIX: referenced by _setup_evaluators but was never defined.
        """

        class LatencyEvaluator(RunEvaluator):
            def evaluate_run(
                self,
                run: "Run",
                example: Optional["Example"] = None
            ) -> dict:
                # NOTE(review): assumes the run exposes start_time /
                # end_time datetimes — confirm against langsmith schema.
                if not run.start_time or not run.end_time:
                    return {"score": None}
                latency = (run.end_time - run.start_time).total_seconds()
                return {
                    "score": latency,
                    "pass": latency < 5.0,
                    "feedback": f"Latency: {latency:.2f}s"
                }

        return LatencyEvaluator()

    def create_hallucination_evaluator(self) -> "RunEvaluator":
        """Crude groundedness check against the reference output.

        FIX: referenced by _setup_evaluators but was never defined.
        Scores the fraction of predicted tokens that also appear in the
        reference output; a low score suggests unsupported content.
        """

        class HallucinationEvaluator(RunEvaluator):
            def evaluate_run(
                self,
                run: "Run",
                example: Optional["Example"] = None
            ) -> dict:
                if not example:
                    return {"score": None}
                predicted = str(run.outputs.get("output", "")).lower().split()
                reference = set(str(example.outputs.get("output", "")).lower().split())
                if not predicted:
                    return {"score": None}
                grounded = sum(1 for tok in predicted if tok in reference) / len(predicted)
                return {
                    "score": grounded,
                    "pass": grounded > 0.5,
                    "feedback": f"Grounded tokens: {grounded:.2%}"
                }

        return HallucinationEvaluator()

    async def run_evaluation(
        self,
        chain,
        dataset_name: str,
        eval_config: Optional["RunEvalConfig"] = None
    ) -> dict:
        """Run a full evaluation of `chain` against a named dataset."""
        # FIX: `datetime` was used below but not in scope in this snippet.
        from datetime import datetime

        config = eval_config or RunEvalConfig(
            evaluators=self.evaluators,
            batch_size=5,
            max_concurrency=3
        )

        results = await evaluate(
            lambda inputs: chain.invoke(inputs),
            data=dataset_name,
            evaluators=config.evaluators,
            client=self.client,
            project_name=f"eval_{dataset_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        )

        return self._analyze_results(results)

    def _analyze_results(self, results) -> dict:
        """Summarize evaluation results.

        FIX: referenced by run_evaluation but was never defined.
        NOTE(review): the shape of `results` depends on the langsmith
        version (ExperimentResults vs. dict) — extend this summary once
        the installed version is confirmed.
        """
        return {"results": results}

Performance Monitoring

Real-Time Performance Tracking

from prometheus_client import Counter, Histogram, Gauge
import time

class PerformanceMonitor:
    """Monitor LangChain application performance via Prometheus metrics."""

    def __init__(self, client: Client):
        self.client = client

        # Prometheus metrics
        self.request_count = Counter(
            'langchain_requests_total',
            'Total requests',
            ['chain_name', 'status']
        )

        self.request_duration = Histogram(
            'langchain_request_duration_seconds',
            'Request duration',
            ['chain_name']
        )

        # NOTE(review): this counter is declared but never incremented
        # anywhere in the visible source — TODO wire it up to the token
        # usage reported by each run.
        self.token_usage = Counter(
            'langchain_tokens_total',
            'Total tokens used',
            ['model', 'type']
        )

    async def monitor_chain_execution(self, chain, inputs: dict, chain_name: str):
        """Invoke `chain` with `inputs`, recording count and duration metrics.

        Returns the chain's result; re-raises any exception after
        recording a failure.
        """
        start_time = time.time()

        try:
            # Execute chain
            result = await chain.ainvoke(inputs)

            # Record success
            self.request_count.labels(
                chain_name=chain_name,
                status="success"
            ).inc()

            return result

        except Exception:
            # Record failure, then propagate (exception value was unused).
            self.request_count.labels(
                chain_name=chain_name,
                status="failure"
            ).inc()
            raise

        finally:
            # Record duration on both success and failure paths.
            duration = time.time() - start_time
            self.request_duration.labels(chain_name=chain_name).observe(duration)

    def create_dashboard_metrics(self, project_name: str, hours: int = 24) -> dict:
        """Summarize recent runs for a dashboard or daily report.

        FIX: ProductionLangSmith.generate_daily_report calls this method,
        but it was never defined on this class.
        """
        from datetime import datetime, timedelta

        end_time = datetime.now()
        runs = list(self.client.list_runs(
            project_name=project_name,
            start_time=end_time - timedelta(hours=hours),
            end_time=end_time
        ))

        total = len(runs)
        # NOTE(review): assumes run records expose an `error` field —
        # confirm against the installed langsmith schema.
        errors = sum(1 for r in runs if r.error)

        return {
            "total_runs": total,
            "error_count": errors,
            "error_rate": errors / total if total else 0.0,
        }

Cost Tracking

Token Usage and Cost Analysis

class CostTracker:
    """Track and optimize LLM API costs from LangSmith run records."""

    # Pricing per 1k tokens in USD (as of 2024).
    PRICING = {
        "gpt-4": {"prompt": 0.03, "completion": 0.06},
        "gpt-4-turbo": {"prompt": 0.01, "completion": 0.03},
        "gpt-3.5-turbo": {"prompt": 0.0005, "completion": 0.0015},
        "claude-3-opus": {"prompt": 0.015, "completion": 0.075},
        "claude-3-sonnet": {"prompt": 0.003, "completion": 0.015},
    }

    # FIX: annotations referencing `Client`, `Run`, and `datetime` are
    # quoted throughout — none of those names is imported in this
    # snippet, so bare annotations raised NameError at def time.
    def __init__(self, client: "Client"):
        self.client = client

    def calculate_run_cost(self, run: "Run") -> float:
        """Return the USD cost of a single run (0.0 if no token usage).

        NOTE(review): assumes the run object exposes `.model` and a
        `.token_usage` with prompt/completion counts — confirm against
        the installed langsmith schema.
        """
        if not run.token_usage:
            return 0.0

        # Unknown models fall back to gpt-3.5-turbo pricing.
        model = run.model or "gpt-3.5-turbo"
        pricing = self.PRICING.get(model, self.PRICING["gpt-3.5-turbo"])

        prompt_cost = (run.token_usage.prompt_tokens / 1000) * pricing["prompt"]
        completion_cost = (run.token_usage.completion_tokens / 1000) * pricing["completion"]

        return prompt_cost + completion_cost

    def analyze_costs(
        self,
        project_name: str,
        start_date: "datetime",
        end_date: "datetime"
    ) -> dict:
        """Aggregate cost and token usage by model over a time window."""
        runs = list(self.client.list_runs(
            project_name=project_name,
            start_time=start_date,
            end_time=end_date
        ))

        # Calculate costs by model
        costs_by_model = {}
        tokens_by_model = {}

        for run in runs:
            if run.token_usage:
                model = run.model or "unknown"
                cost = self.calculate_run_cost(run)

                costs_by_model[model] = costs_by_model.get(model, 0) + cost
                tokens_by_model[model] = tokens_by_model.get(model, 0) + run.token_usage.total_tokens

        return {
            "total_cost": sum(costs_by_model.values()),
            "costs_by_model": costs_by_model,
            "tokens_by_model": tokens_by_model,
            # Averages over ALL runs, including those without token usage.
            "avg_cost_per_run": sum(costs_by_model.values()) / len(runs) if runs else 0
        }

    def optimize_costs(self, project_name: str) -> List[dict]:
        """Generate cost optimization recommendations for recent runs."""
        recommendations = []

        # Analyze recent runs
        runs = list(self.client.list_runs(
            project_name=project_name,
            limit=1000
        ))

        # Flag small gpt-4 runs that could use a cheaper model.
        for run in runs:
            if run.model == "gpt-4" and run.token_usage:
                if run.token_usage.total_tokens < 500:
                    recommendations.append({
                        "type": "model_downgrade",
                        "run_id": run.id,
                        "current_model": "gpt-4",
                        "recommended_model": "gpt-3.5-turbo",
                        "potential_savings": self._calculate_savings(run, "gpt-3.5-turbo")
                    })

        return recommendations

    def _calculate_savings(self, run: "Run", target_model: str) -> float:
        """Estimate USD savings from re-running `run` on `target_model`.

        FIX: referenced by optimize_costs but was never defined.
        """
        current_cost = self.calculate_run_cost(run)
        pricing = self.PRICING.get(target_model, self.PRICING["gpt-3.5-turbo"])
        target_cost = (
            (run.token_usage.prompt_tokens / 1000) * pricing["prompt"]
            + (run.token_usage.completion_tokens / 1000) * pricing["completion"]
        )
        return max(current_cost - target_cost, 0.0)

Production Best Practices

Complete Production Setup

class ProductionLangSmith:
    """Production-ready LangSmith setup combining monitoring, cost
    tracking, evaluation, and debugging."""

    def __init__(self, config: dict):
        """
        Args:
            config: Must contain "api_key", "environment", "version",
                and "project_name".
        """
        self.config = config
        self.client = Client(api_key=config["api_key"])
        self.setup_production_features()

    def setup_production_features(self):
        """Instantiate the monitoring, cost, evaluation, and debug helpers."""
        self.monitor = PerformanceMonitor(self.client)
        self.cost_tracker = CostTracker(self.client)
        self.evaluator = EvaluationFramework(self.client)
        self.debugger = DebugTracer(self.client)

    async def production_chain_wrapper(
        self,
        chain,
        chain_name: str
    ):
        """Wrap `chain` with tracing and metrics.

        Returns an async callable. Kept `async` for backward
        compatibility with existing `await`-ing callers, although it
        awaits nothing itself.
        """

        async def wrapped_chain(inputs: dict) -> dict:
            # Custom metadata attached to every traced invocation.
            metadata = {
                "environment": self.config["environment"],
                "version": self.config["version"],
                "user_id": inputs.get("user_id"),
            }

            # FIX: monitor_chain_execution is a coroutine, not a context
            # manager — the original wrapped it in `with`, which raises
            # TypeError at runtime. Await it inside the tracing context
            # instead; it invokes the chain and records the metrics.
            with TracingManager.trace_context(
                name=chain_name,
                metadata=metadata,
                tags=[self.config["environment"]]
            ):
                return await self.monitor.monitor_chain_execution(
                    chain, inputs, chain_name
                )

        return wrapped_chain

    def generate_daily_report(self) -> dict:
        """Generate a daily operations report covering the last 24 hours."""
        # FIX: datetime/timedelta were used but not in scope in this snippet.
        from datetime import datetime, timedelta

        end_date = datetime.now()
        start_date = end_date - timedelta(days=1)

        return {
            "date": end_date.date().isoformat(),
            "performance": self.monitor.create_dashboard_metrics(
                self.config["project_name"]
            ),
            "costs": self.cost_tracker.analyze_costs(
                self.config["project_name"],
                start_date,
                end_date
            ),
            "failures": self.debugger.analyze_failures(
                self.config["project_name"],
                hours=24
            )
        }

Summary

LangSmith provides comprehensive observability for production LangChain applications:

  • Complete Tracing: Every operation is tracked and searchable
  • Dataset Management: Version-controlled test data
  • Automated Testing: Regression testing and quality metrics
  • Performance Monitoring: Real-time metrics and alerting
  • Cost Optimization: Token usage tracking and recommendations
  • Debugging Tools: Replay and analyze failed runs

These capabilities are essential for maintaining reliable, cost-effective AI applications in production.


Series Navigation

This is Part 5 of the LangChain Series.

Previous: ← Part 4 - LangGraph: Advanced Patterns

Complete Series:


Tags: #LangSmith #Monitoring #Observability #Production #LangChain