LangGraph: Building Stateful AI Applications with Graph Architecture

Cover Image for LangGraph: Building Stateful AI Applications with Graph Architecture
AI & Machine Learning · 5 min read

Traditional LangChain chains are linear and stateless. LangGraph introduces a revolutionary graph-based approach that enables stateful execution, cyclic workflows, and complex multi-agent systems.

Why LangGraph?

The Evolution from Chains to Graphs

LangGraph introduces:

  • Stateful execution: Maintain context across complex workflows
  • Cyclic graphs: Support loops and iterative refinement
  • Conditional branching: Dynamic routing based on state
  • Checkpointing: Save and restore execution state
  • Parallel execution: Run multiple nodes concurrently
  • Human-in-the-loop: Pause for human intervention
# StateGraph builds the workflow graph; END is LangGraph's terminal sentinel.
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

# Traditional Chain (Limited)
chain = prompt | llm | parser  # Linear, no state, no cycles

# LangGraph (Powerful): nodes + edges, including a cycle back to "analyze"
graph = StateGraph(State)
graph.add_node("analyze", analyze_fn)
graph.add_node("decide", decide_fn)
graph.add_edge("analyze", "decide")
# route_fn inspects the state and returns a key of the mapping below:
# "refine" loops back for another pass, "complete" terminates the run.
graph.add_conditional_edges(
    "decide",
    route_fn,
    {"refine": "analyze", "complete": END}
)

Core Concepts

Understanding State Graphs

import operator

from typing import TypedDict, Annotated, List, Optional

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

class GraphState(TypedDict):
    """State schema shared by every node in the graph.

    Fields annotated with ``operator.add`` are accumulated: each node's
    returned value is appended to the existing one. Plain fields are
    replaced wholesale by the latest node output.
    """

    # Message history — accumulated via the operator.add reducer
    messages: Annotated[List[str], operator.add]

    # Current context — replaced on each update
    context: str

    # Decision variables consulted by conditional routing
    should_continue: bool
    confidence_score: float

    # Accumulator for results — also appended via operator.add
    results: Annotated[List[dict], operator.add]

    # Error tracking — no reducer, so the list is replaced, not extended
    errors: List[str]

    # Loop-control metadata for bounding iterative refinement
    iteration_count: int
    max_iterations: int

class ProductionGraph:
    """Production-ready graph implementation.

    Wires five processing nodes into a pipeline with a conditional
    decision step, and compiles the graph with checkpointing enabled.
    """

    def __init__(self):
        self.graph = StateGraph(GraphState)
        self.checkpointer = MemorySaver()
        self._build_graph()

    def _build_graph(self):
        """Construct the graph topology and compile the runnable app."""

        # Register every processing node in a single pass.
        handlers = {
            "input_processor": self.process_input,
            "analyzer": self.analyze,
            "decision_maker": self.make_decision,
            "executor": self.execute_action,
            "validator": self.validate_result,
        }
        for node_name, handler in handlers.items():
            self.graph.add_node(node_name, handler)

        # Linear backbone: input -> analysis -> decision.
        self.graph.set_entry_point("input_processor")
        self.graph.add_edge("input_processor", "analyzer")
        self.graph.add_edge("analyzer", "decision_maker")

        # The decision node routes dynamically: run the action, loop back
        # to analysis for refinement, or terminate.
        self.graph.add_conditional_edges(
            "decision_maker",
            self.route_decision,
            {
                "execute": "executor",
                "refine": "analyzer",
                "end": END,
            },
        )

        # Compile with checkpointing so runs can be paused and resumed.
        self.app = self.graph.compile(checkpointer=self.checkpointer)

Graph Architecture

Designing Complex Workflows

from langgraph.prebuilt import ToolExecutor
from langchain_core.messages import HumanMessage, AIMessage
import asyncio

class MultiAgentGraph:
    """Multi-agent system using LangGraph.

    Coordinator fans work out to three parallel research agents; their
    results converge on analyze -> synthesize -> validate, and validation
    may approve, loop back to synthesis, or restart the whole run.
    """

    def __init__(self):
        self.state_spec = self._define_state()
        self.graph = StateGraph(self.state_spec)
        self.agents = self._initialize_agents()

    def _define_state(self):
        """Define shared state for all agents.

        Returns the TypedDict class used as the graph's state schema.
        """

        class SharedState(TypedDict):
            task: str
            research_data: List[dict]
            analysis: Optional[dict]
            synthesis: Optional[str]
            validation: Optional[dict]
            final_output: Optional[str]
            # operator.add accumulates messages contributed by every agent
            agent_messages: Annotated[List[dict], operator.add]

        return SharedState

    def build_graph(self):
        """Build the multi-agent workflow and return the compiled app."""

        # Coordinator decides task allocation
        self.graph.add_node("coordinator", self.coordinate)

        # Parallel research nodes
        self.graph.add_node("research_web", self.research_web)
        self.graph.add_node("research_docs", self.research_docs)
        self.graph.add_node("research_data", self.research_data)

        # Analysis node
        self.graph.add_node("analyze", self.analyze_research)

        # Synthesis node
        self.graph.add_node("synthesize", self.synthesize_findings)

        # Validation node
        self.graph.add_node("validate", self.validate_output)

        # Set up flow
        self.graph.set_entry_point("coordinator")

        # Fan-out: parallel execution from coordinator
        self.graph.add_edge("coordinator", "research_web")
        self.graph.add_edge("coordinator", "research_docs")
        self.graph.add_edge("coordinator", "research_data")

        # Fan-in: all research branches converge on analysis
        self.graph.add_edge("research_web", "analyze")
        self.graph.add_edge("research_docs", "analyze")
        self.graph.add_edge("research_data", "analyze")

        # Sequential processing
        self.graph.add_edge("analyze", "synthesize")

        # BUG FIX: this edge was missing, leaving "validate" unreachable —
        # nothing ever flowed into it, so the conditional routing below
        # could never fire and the graph always ended after synthesis.
        self.graph.add_edge("synthesize", "validate")

        # Validation with potential loop back to synthesis or full restart
        self.graph.add_conditional_edges(
            "validate",
            self.validation_routing,
            {
                "approved": END,
                "refine": "synthesize",
                "restart": "coordinator"
            }
        )

        return self.graph.compile()

State Management

Advanced State Patterns

from langgraph.checkpoint import Checkpoint
from typing import Any, Dict
import pickle
import redis

class StateManager:
    """Advanced state management for LangGraph.

    Provides a reusable state reducer (list merge / dict deep-merge /
    replace) and Redis-backed checkpoint persistence with a 24h TTL.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        # NOTE(review): redis.from_url returns a synchronous client; the
        # async methods below offload its calls to a worker thread so the
        # event loop is not blocked.
        self.redis_client = redis.from_url(redis_url)

    def create_reducer(self):
        """Create custom state reducer.

        The returned callable(current, update):
          * concatenates two lists, dropping duplicates while preserving
            order (equality-based, so unhashable items like dicts work),
          * deep-merges two dicts,
          * otherwise replaces ``current`` with ``update``.
        """

        def reducer(current: Any, update: Any) -> Any:
            """Custom reducer for complex state updates."""

            if isinstance(current, list) and isinstance(update, list):
                # BUG FIX: the original used list(set(current + update)),
                # which raises TypeError on unhashable items (the graph
                # state accumulates List[dict]) and scrambles order.
                merged: list = []
                for item in current + update:
                    if item not in merged:  # `in` uses ==, so dicts are fine
                        merged.append(item)
                return merged
            elif isinstance(current, dict) and isinstance(update, dict):
                # Deep merge dictionaries
                return self._deep_merge(current, update)
            else:
                # Default replacement
                return update

        return reducer

    def _deep_merge(self, dict1: dict, dict2: dict) -> dict:
        """Recursively merge ``dict2`` into a copy of ``dict1``.

        Nested dicts are merged; any other value in ``dict2`` wins.
        """

        result = dict1.copy()

        for key, value in dict2.items():
            if key in result and isinstance(result[key], dict) and isinstance(value, dict):
                result[key] = self._deep_merge(result[key], value)
            else:
                result[key] = value

        return result

    async def save_checkpoint(self, thread_id: str, checkpoint: "Checkpoint"):
        """Save a checkpoint to Redis under ``checkpoint:<thread_id>``."""

        # Local import: this snippet's header does not import asyncio.
        import asyncio

        key = f"checkpoint:{thread_id}"
        # SECURITY: pickle is acceptable only because we both write and
        # read these values; never unpickle data from an untrusted source.
        serialized = pickle.dumps(checkpoint)

        # Run the blocking Redis call off the event loop.
        await asyncio.to_thread(
            self.redis_client.setex,
            key,
            86400,  # 24 hour TTL
            serialized,
        )

    async def load_checkpoint(self, thread_id: str) -> "Optional[Checkpoint]":
        """Load a checkpoint from Redis, or return None if absent/expired."""

        import asyncio

        key = f"checkpoint:{thread_id}"
        data = await asyncio.to_thread(self.redis_client.get, key)

        if data:
            return pickle.loads(data)
        return None

Conditional Routing

Dynamic Workflow Control

class ConditionalRouter:
    """Factory for routing functions used by conditional graph edges."""

    def __init__(self):
        # temperature=0 keeps routing decisions deterministic.
        self.llm = ChatOpenAI(temperature=0)

    def create_llm_router(self, routing_prompt: str):
        """Build a router that asks the LLM which branch to take."""

        def router(state: dict) -> str:
            # Summarize the current state for the model.
            context = self._build_context(state)

            prompt = ChatPromptTemplate.from_template(
                routing_prompt + "\n\nContext: {context}\n\nRoute:"
            )
            decision = (prompt | self.llm | StrOutputParser()).invoke(
                {"context": context}
            )

            # Guard against hallucinated branch names.
            normalized = decision.lower()
            if normalized in ["continue", "retry", "escalate", "end"]:
                return normalized
            return "continue"  # Default

        return router

    def create_score_based_router(
        self,
        score_field: str,
        thresholds: Dict[str, float]
    ):
        """Build a router returning the highest threshold the score clears."""

        def router(state: dict) -> str:
            score = state.get(score_field, 0)

            # Check thresholds from highest to lowest; first match wins.
            ranked = sorted(thresholds.items(), key=lambda kv: kv[1], reverse=True)
            matches = (name for name, cutoff in ranked if score >= cutoff)
            return next(matches, "low_score_path")

        return router

    def create_multi_condition_router(self):
        """Build a router combining error, iteration, and confidence checks."""

        def router(state: dict) -> str:
            # Priority order matters: errors trump the iteration cap,
            # which trumps confidence-based routing.
            error_count = len(state.get("errors", []))
            if error_count > 3:
                return "error_handler"

            if state.get("iteration_count", 0) >= state.get("max_iterations", 5):
                return "force_complete"

            confidence = state.get("confidence_score", 0)
            if confidence > 0.9:
                return "high_confidence_path"
            if confidence > 0.6:
                return "medium_confidence_path"
            return "retry"

        return router

Complete RAG System with LangGraph

class RAGGraph:
    """Production RAG system using LangGraph.

    Pipeline: retrieve -> rerank, then either generate -> validate (with
    regeneration / re-retrieval loops) or ask the user for clarification.
    """

    def __init__(self, vector_store):
        self.vector_store = vector_store
        self.graph = StateGraph(self.RAGState)
        self.llm = ChatOpenAI(temperature=0)
        self._build()

    class RAGState(TypedDict):
        # NOTE: Document annotations are quoted because `Document` (from
        # langchain_core) is not imported in this snippet; unquoted, the
        # class definition itself would raise NameError.
        query: str
        retrieved_docs: "List[Document]"
        reranked_docs: "List[Document]"
        answer: str
        confidence: float
        sources: List[str]
        needs_clarification: bool
        clarification_questions: List[str]

    def _build(self):
        """Build the RAG graph and compile it into ``self.app``."""

        # Nodes
        self.graph.add_node("retrieve", self.retrieve_documents)
        self.graph.add_node("rerank", self.rerank_documents)
        self.graph.add_node("generate", self.generate_answer)
        self.graph.add_node("validate", self.validate_answer)
        self.graph.add_node("clarify", self.generate_clarification)

        # Linear start of the flow
        self.graph.set_entry_point("retrieve")
        self.graph.add_edge("retrieve", "rerank")

        # After reranking: proceed only if the documents look good enough;
        # otherwise ask the user for clarification instead of hallucinating.
        self.graph.add_conditional_edges(
            "rerank",
            self.check_document_quality,
            {
                "sufficient": "generate",
                "insufficient": "clarify"
            }
        )

        self.graph.add_edge("generate", "validate")

        # Validation routing: accept, regenerate the answer, or loop all
        # the way back to retrieval for more evidence.
        self.graph.add_conditional_edges(
            "validate",
            self.validation_router,
            {
                "accept": END,
                "regenerate": "generate",
                "need_more_docs": "retrieve"
            }
        )

        self.graph.add_edge("clarify", END)

        self.app = self.graph.compile()

    async def retrieve_documents(self, state: dict) -> dict:
        """Retrieve the top-10 documents relevant to ``state["query"]``.

        Returns a new state dict with ``retrieved_docs`` populated; the
        rest of the state is passed through unchanged.
        """

        docs = await self.vector_store.asimilarity_search(
            state["query"],
            k=10
        )

        return {
            **state,
            "retrieved_docs": docs
        }

    async def run(self, query: str) -> dict:
        """Execute the RAG pipeline for a single query and return the final state."""

        # BUG FIX: `uuid` was used below but never imported in this
        # snippet; a function-scope import keeps the example self-contained.
        import uuid

        result = await self.app.ainvoke(
            {"query": query},
            config={"configurable": {"thread_id": str(uuid.uuid4())}}
        )

        return result

Summary

LangGraph represents a paradigm shift in building AI applications:

  • Stateful execution enables complex, multi-step workflows
  • Graph architecture provides flexibility beyond linear chains
  • Conditional routing allows dynamic behavior
  • Checkpointing enables pause/resume and recovery
  • Production patterns ensure reliability and observability

The graph-based approach is particularly powerful for:

  • Multi-agent systems
  • Complex decision trees
  • Iterative refinement workflows
  • Human-in-the-loop applications
  • Stateful conversation systems

Series Navigation

This is Part 3 of the LangChain Series.

Previous: ← Part 2 - LangChain Fundamentals: Chains and Agents

Next: Part 4 - LangGraph: Advanced Patterns →

Complete Series:


Tags: #LangGraph #StateGraph #MultiAgent #LangChain #AI