LangChain Fundamentals - Part 2: Chains, Agents, and Memory

Cover Image for LangChain Fundamentals - Part 2: Chains, Agents, and Memory
AI & Machine Learning · 5 min read

Building on the fundamentals, let's explore advanced LangChain concepts that enable production-grade AI applications. We'll cover LCEL patterns, memory systems, agent architectures, and robust error handling.

Understanding LCEL

LangChain Expression Language (LCEL)

LCEL provides a declarative way to build complex workflows:

from langchain_core.runnables import (
    RunnablePassthrough,
    RunnableParallel,
    RunnableLambda,
    RunnableBranch,
)
from operator import itemgetter

class LCELPatterns:
    """Common LCEL patterns for production use.

    NOTE(review): these helpers rely on module-level ``ChatPromptTemplate``
    and ``llm`` being in scope — confirm the full file's imports.
    """

    @staticmethod
    def parallel_processing():
        """Build a RunnableParallel that fans one input out to three chains.

        Returns a runnable that takes ``{"text": ...}`` and produces a dict
        with ``summary``, ``sentiment`` and ``keywords`` keys.
        """

        chain = RunnableParallel(
            # All three prompt|llm chains receive the same input and
            # execute concurrently.
            summary=ChatPromptTemplate.from_template("Summarize: {text}") | llm,
            sentiment=ChatPromptTemplate.from_template("Analyze sentiment: {text}") | llm,
            keywords=ChatPromptTemplate.from_template("Extract keywords: {text}") | llm,
        )

        return chain

    @staticmethod
    def conditional_routing():
        """Route a query to a technical or a general chain.

        Returns a RunnableBranch that selects the technical chain when the
        query mentions "technical", otherwise falls through to the general
        chain. (The original defined an unused ``route_function`` and then
        duplicated its check unsafely with ``x["query"]``, which raised
        KeyError when the key was absent; the single predicate below uses
        ``.get`` so a missing key routes to the default.)
        """

        def is_technical(x) -> bool:
            # .get() so a missing "query" key falls through to the default.
            return "technical" in x.get("query", "").lower()

        technical_chain = ChatPromptTemplate.from_template(
            "Technical expert response: {query}"
        ) | llm

        general_chain = ChatPromptTemplate.from_template(
            "General response: {query}"
        ) | llm

        branch = RunnableBranch(
            (is_technical, technical_chain),
            general_chain,  # Default
        )

        return branch

Advanced Chain Composition

from typing import Dict, List, Any

class ChainOrchestrator:
    """Orchestrate complex multi-step chains."""

    def __init__(self):
        # Deterministic output for planning/synthesis steps.
        self.llm = ChatOpenAI(temperature=0)

    def build_research_chain(self):
        """Multi-step research chain: understand → plan → synthesize.

        Returns a runnable that expects ``{"query": str}`` and
        progressively adds ``parsed``, ``plan`` and ``synthesis`` keys to
        the input dict via ``RunnablePassthrough.assign``.
        """

        # Step 1: Query understanding — expects the model to emit JSON
        # with "topic" and "subtopics" keys (consumed by step 2).
        understand_query = ChatPromptTemplate.from_messages([
            ("system", "Extract the main topic and subtopics from the query"),
            ("human", "{query}")
        ]) | self.llm | JsonOutputParser()

        # Step 2: Research planning
        plan_research = ChatPromptTemplate.from_messages([
            ("system", "Create a research plan with steps"),
            ("human", "Topic: {topic}\nSubtopics: {subtopics}")
        ]) | self.llm

        # Step 3: Synthesize results
        synthesize = ChatPromptTemplate.from_messages([
            ("system", "Synthesize research findings"),
            ("human", "Findings: {findings}")
        ]) | self.llm

        # Compose: each assign() adds one key and passes the rest through.
        chain = (
            RunnablePassthrough.assign(
                parsed=understand_query
            )
            | RunnablePassthrough.assign(
                plan=lambda x: plan_research.invoke({
                    "topic": x["parsed"]["topic"],
                    "subtopics": x["parsed"]["subtopics"]
                })
            )
            | RunnablePassthrough.assign(
                # Bug fix: the original read x["research_results"], a key no
                # upstream step ever produced, so this step always raised
                # KeyError. Feed the generated plan as the findings instead.
                # (plan is a chat-model message; .content is its text —
                # confirm against the installed langchain version.)
                synthesis=lambda x: synthesize.invoke({
                    "findings": x["plan"].content
                })
            )
        )

        return chain

Memory Systems

Conversation Memory Implementation

from langchain.memory import (
    ConversationBufferMemory,
    ConversationSummaryMemory,
    ConversationBufferWindowMemory,
)
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from typing import List, Optional
import redis
import json

class ProductionMemorySystem:
    """Scalable conversation-memory system for production.

    Combines a short-term window buffer with a long-term LLM summary,
    and persists serialized message history to Redis with a 24h TTL.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.llm = ChatOpenAI(temperature=0)

    def create_hybrid_memory(self, session_id: str):
        """Hybrid memory combining multiple strategies.

        NOTE(review): ``session_id`` is currently unused here — these
        memories are in-process only; pair with save_to_redis /
        load_from_redis for per-session persistence.
        """

        # Short-term: verbatim buffer of the most recent exchanges.
        buffer_memory = ConversationBufferWindowMemory(
            k=5,  # Keep last 5 exchanges
            return_messages=True,
            memory_key="recent_history"
        )

        # Long-term: LLM-generated summary of older conversation turns.
        summary_memory = ConversationSummaryMemory(
            llm=self.llm,
            return_messages=True,
            memory_key="summary_history"
        )

        return {
            "buffer": buffer_memory,
            "summary": summary_memory,
        }

    def save_to_redis(self, session_id: str, messages: List[BaseMessage]):
        """Persist a conversation to Redis under a 24-hour TTL.

        Only the message class name and text content are stored; any
        message metadata is intentionally not round-tripped.
        """

        key = f"conversation:{session_id}"

        # Serialize messages
        serialized = [
            {
                "type": msg.__class__.__name__,
                "content": msg.content,
            }
            for msg in messages
        ]

        self.redis_client.setex(
            key,
            3600 * 24,  # 24 hour TTL
            json.dumps(serialized)
        )

    def load_from_redis(self, session_id: str) -> List[BaseMessage]:
        """Load a conversation from Redis.

        Returns an empty list when no history exists. Entries whose type
        is neither HumanMessage nor AIMessage are skipped — the original
        code left ``msg`` unbound for unknown types, which re-appended
        the previous message (or raised NameError on the first entry).
        """

        key = f"conversation:{session_id}"
        data = self.redis_client.get(key)

        if not data:
            return []

        messages: List[BaseMessage] = []

        for msg_data in json.loads(data):
            msg_type = msg_data.get("type")
            if msg_type == "HumanMessage":
                messages.append(HumanMessage(content=msg_data["content"]))
            elif msg_type == "AIMessage":
                messages.append(AIMessage(content=msg_data["content"]))
            # Unknown/unsupported message types are ignored.

        return messages

Agent Architectures

ReAct Agent Implementation

from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import Tool, StructuredTool
from pydantic import BaseModel, Field

class ReActAgent:
    """Production ReAct (Reasoning + Acting) agent.

    Wires two structured tools (calculator, search) into a ReAct-style
    AgentExecutor with bounded iterations and execution time.
    """

    def __init__(self):
        self.llm = ChatOpenAI(temperature=0, model="gpt-4")
        self.tools = self._setup_tools()
        self.agent = self._create_agent()

    def _setup_tools(self) -> List[Tool]:
        """Configure agent tools with typed argument schemas."""

        # Pydantic schemas describing each tool's arguments to the agent.
        class CalculatorInput(BaseModel):
            expression: str = Field(description="Mathematical expression")

        class SearchInput(BaseModel):
            query: str = Field(description="Search query")
            max_results: int = Field(default=5)

        tools = [
            StructuredTool.from_function(
                func=self._calculator,
                args_schema=CalculatorInput,
                name="Calculator",
                description="Perform mathematical calculations"
            ),
            StructuredTool.from_function(
                func=self._search,
                args_schema=SearchInput,
                name="Search",
                description="Search for information"
            ),
        ]

        return tools

    def _create_agent(self):
        """Create a ReAct agent with a custom prompt.

        Bug fix: the original prompt omitted the ``{tool_names}``
        placeholder, but create_react_agent validates that the prompt
        declares the ``tools``, ``tool_names`` and ``agent_scratchpad``
        input variables and raises if any are missing.
        """

        from langchain_core.prompts import PromptTemplate

        prompt = PromptTemplate.from_template("""
        You are an AI assistant with access to tools.

        Tools available:
        {tools}

        To use a tool, follow this format:
        Thought: I need to [reasoning about what to do]
        Action: [tool_name, one of: {tool_names}]
        Action Input: [input to the tool]
        Observation: [tool response]

        Thought: I now have the final answer
        Final Answer: [your final response]

        Question: {input}
        {agent_scratchpad}
        """)

        agent = create_react_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=prompt
        )

        # Bounded executor: caps iterations/time and recovers from
        # malformed LLM output instead of raising.
        return AgentExecutor(
            agent=agent,
            tools=self.tools,
            verbose=True,
            max_iterations=5,
            max_execution_time=60,
            handle_parsing_errors=True,
        )

    def _calculator(self, expression: str) -> str:
        """Evaluate a math expression via numexpr (no eval of arbitrary code)."""
        try:
            import numexpr as ne
            result = ne.evaluate(expression)
            return str(result)
        except Exception as e:
            # Surface the failure to the agent as an observation string.
            return f"Calculation error: {e}"

    def _search(self, query: str, max_results: int = 5) -> str:
        """Search stub — replace with a real search backend."""
        return f"Search results for '{query}'"

    def run(self, query: str) -> str:
        """Execute the agent on a query and return its final answer text."""
        return self.agent.invoke({"input": query})["output"]

Plan-and-Execute Agent

class PlanAndExecuteAgent:
    """Agent that first drafts a step-by-step plan, then executes it."""

    def __init__(self):
        self.planner = self._create_planner()
        self.executor = self._create_executor()

    def _create_planner(self):
        """Build the planning chain (task -> numbered list of steps as text)."""

        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a planning assistant.
            Break down the task into clear steps.
            Output a numbered list of steps."""),
            ("human", "{task}")
        ])
        model = ChatOpenAI(temperature=0)
        return prompt | model | StrOutputParser()

    def _create_executor(self):
        """Build the per-step execution chain (step + context -> message)."""

        prompt = ChatPromptTemplate.from_messages([
            ("system", "Execute this step: {step}"),
            ("human", "Context: {context}")
        ])
        return prompt | ChatOpenAI(temperature=0)

    async def run(self, task: str) -> Dict[str, Any]:
        """Plan the task, execute each step in order, and return a report.

        Each step sees an accumulated context of all prior step results.
        """

        # Draft the plan and keep only non-blank lines as steps.
        plan_text = await self.planner.ainvoke({"task": task})
        steps = [line.strip() for line in plan_text.split("\n") if line.strip()]

        execution = []
        context = ""

        for number, description in enumerate(steps, start=1):
            outcome = await self.executor.ainvoke({
                "step": description,
                "context": context
            })

            execution.append({
                "step": number,
                "description": description,
                "result": outcome.content
            })

            # Feed this step's result into subsequent steps.
            context += f"\nStep {number} result: {outcome.content}"

        return {
            "task": task,
            "plan": steps,
            "execution": execution,
            "final_output": execution[-1]["result"] if execution else None
        }

Production Patterns

Error Handling and Retry Logic

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)
import logging

class RobustChain:
    """Production-ready chain wrapper: retries first, then a cheaper fallback."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # Cheaper model used only when the primary chain keeps failing.
        self.fallback_llm = ChatOpenAI(
            model="gpt-3.5-turbo",
            temperature=0
        )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_if_exception_type(Exception)
    )
    async def execute_with_retry(self, chain, input_data):
        """Invoke the chain; tenacity retries up to 3 times with backoff."""

        try:
            result = await chain.ainvoke(input_data)
        except Exception as e:
            # Log, then re-raise so the retry decorator can act on it.
            self.logger.error(f"Chain execution failed: {e}")
            raise
        else:
            return result

    async def execute_with_fallback(self, primary_chain, input_data):
        """Run the primary chain (with retries); on failure use a simple fallback."""

        try:
            return await self.execute_with_retry(primary_chain, input_data)
        except Exception as e:
            self.logger.warning(f"Primary failed, using fallback: {e}")

            # Minimal prompt on the cheaper model — only reached after
            # the primary chain has exhausted its retries.
            backup_chain = (
                ChatPromptTemplate.from_template("Simple response: {input}")
                | self.fallback_llm
            )
            return await backup_chain.ainvoke(input_data)

Summary

In this second part, we've explored advanced LangChain concepts:

  • LCEL composition patterns
  • Complex chain orchestration
  • Production memory systems
  • Agent architectures (ReAct, Plan-and-Execute)
  • Error handling and monitoring

These patterns form the backbone of production AI applications, providing the structure and reliability needed for real-world deployment.


Series Navigation

This is Part 2 of the LangChain Series.

Previous: ← Part 1 - LangChain Fundamentals: Core Components

Next: Part 3 - LangGraph: Building Stateful Applications →

Complete Series:


Tags: #LangChain #AI #Agents #LCEL #Python