Building on the fundamentals, let's explore advanced LangChain concepts that enable production-grade AI applications. We'll cover LCEL patterns, memory systems, agent architectures, and robust error handling.
Understanding LCEL
LangChain Expression Language (LCEL)
LCEL provides a declarative way to build complex workflows:
from langchain_core.runnables import (
RunnablePassthrough,
RunnableParallel,
RunnableLambda,
RunnableBranch,
)
from operator import itemgetter
class LCELPatterns:
    """Common LCEL patterns for production use.

    Both factories reference ``ChatPromptTemplate`` and a module-level ``llm``
    that must already be in scope where this class is defined.
    """

    @staticmethod
    def parallel_processing():
        """Build a chain that runs three prompts over the same input.

        Returns:
            A ``RunnableParallel`` whose result dict has the keys ``summary``,
            ``sentiment`` and ``keywords``. Every branch formats its prompt
            from the input's ``text`` value and pipes it into ``llm``.
        """
        return RunnableParallel(
            summary=ChatPromptTemplate.from_template("Summarize: {text}") | llm,
            sentiment=ChatPromptTemplate.from_template("Analyze sentiment: {text}") | llm,
            keywords=ChatPromptTemplate.from_template("Extract keywords: {text}") | llm,
        )

    @staticmethod
    def conditional_routing():
        """Build a branch that routes a query to one of two prompt chains.

        Returns:
            A ``RunnableBranch`` that sends the input to the technical chain
            when its ``query`` value contains "technical" (case-insensitive)
            and to the general chain otherwise.
        """

        def is_technical(x):
            # Single routing predicate; the input must carry a "query" string.
            return "technical" in x["query"].lower()

        technical_chain = ChatPromptTemplate.from_template(
            "Technical expert response: {query}"
        ) | llm
        general_chain = ChatPromptTemplate.from_template(
            "General response: {query}"
        ) | llm

        return RunnableBranch(
            (is_technical, technical_chain),
            general_chain,  # Default when no condition matches
        )
Advanced Chain Composition
from typing import Dict, List, Any
class ChainOrchestrator:
    """Orchestrate complex multi-step chains.

    All steps built by this class share one chat model created with
    ``temperature=0``.
    """

    def __init__(self):
        self.llm = ChatOpenAI(temperature=0)

    def build_research_chain(self):
        """Build the understand -> plan -> synthesize research chain.

        The returned runnable takes a dict with a ``query`` key. It may also
        carry a ``research_results`` key holding findings gathered by the
        caller; when that key is absent the synthesis step falls back to the
        text of the generated research plan instead of raising ``KeyError``.

        Returns:
            A runnable whose output is the input dict extended with
            ``parsed`` (the JSON-decoded topic breakdown), ``plan`` (the
            model's research plan) and ``synthesis`` (the model's synthesis).
        """
        # Step 1: Query understanding. The prompt names the JSON keys that the
        # planning step reads, so JsonOutputParser receives parseable output.
        understand_query = (
            ChatPromptTemplate.from_messages([
                (
                    "system",
                    "Extract the main topic and subtopics from the query. "
                    "Respond with a JSON object that has exactly two keys: "
                    '"topic" (a string) and "subtopics" (a list of strings).',
                ),
                ("human", "{query}"),
            ])
            | self.llm
            | JsonOutputParser()
        )

        # Step 2: Research planning
        plan_research = ChatPromptTemplate.from_messages([
            ("system", "Create a research plan with steps"),
            ("human", "Topic: {topic}\nSubtopics: {subtopics}"),
        ]) | self.llm

        # Step 3: Synthesize results
        synthesize = ChatPromptTemplate.from_messages([
            ("system", "Synthesize research findings"),
            ("human", "Findings: {findings}"),
        ]) | self.llm

        def plan_inputs(state):
            # Map the parsed query onto the planning prompt's variables.
            parsed = state["parsed"]
            return {"topic": parsed["topic"], "subtopics": parsed["subtopics"]}

        def synthesis_inputs(state):
            # Prefer caller-supplied findings; nothing in this chain produces
            # "research_results", so fall back to the plan text otherwise.
            if "research_results" in state:
                findings = state["research_results"]
            else:
                plan = state["plan"]
                findings = getattr(plan, "content", plan)
            return {"findings": findings}

        # Piping the mapping functions into the sub-chains (rather than calling
        # .invoke inside a lambda) keeps the sub-chains part of the runnable
        # graph, so async invocation reaches them as well.
        return (
            RunnablePassthrough.assign(parsed=understand_query)
            | RunnablePassthrough.assign(
                plan=RunnableLambda(plan_inputs) | plan_research
            )
            | RunnablePassthrough.assign(
                synthesis=RunnableLambda(synthesis_inputs) | synthesize
            )
        )
Memory Systems
Conversation Memory Implementation
from langchain.memory import (
ConversationBufferMemory,
ConversationSummaryMemory,
ConversationBufferWindowMemory,
)
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from typing import List, Optional
import redis
import json
class ProductionMemorySystem:
    """Scalable memory system for production.

    Combines in-process LangChain memory objects with a Redis store in which
    whole conversations are kept as JSON under ``conversation:<session_id>``.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.llm = ChatOpenAI(temperature=0)

    def create_hybrid_memory(self, session_id: str):
        """Create a short-term buffer and a long-term summary memory.

        Args:
            session_id: Conversation identifier. It is not used by this
                method; the returned memories start out empty.

        Returns:
            A dict with ``buffer`` (window memory over the last 5 exchanges,
            exposed as ``recent_history``) and ``summary`` (LLM-backed summary
            memory, exposed as ``summary_history``).
        """
        # Short-term: Buffer for recent messages
        buffer_memory = ConversationBufferWindowMemory(
            k=5,  # Keep last 5 exchanges
            return_messages=True,
            memory_key="recent_history",
        )
        # Long-term: Summary of older conversations
        summary_memory = ConversationSummaryMemory(
            llm=self.llm,
            return_messages=True,
            memory_key="summary_history",
        )
        return {
            "buffer": buffer_memory,
            "summary": summary_memory,
        }

    def save_to_redis(self, session_id: str, messages: List[BaseMessage]):
        """Persist a conversation to Redis with a 24 hour TTL.

        Each message is stored as ``{"type": <class name>, "content": ...}``;
        the whole list is written as one JSON string.

        Args:
            session_id: Conversation identifier used to build the Redis key.
            messages: Messages to store, in order.
        """
        key = f"conversation:{session_id}"
        serialized = [
            {
                "type": msg.__class__.__name__,
                "content": msg.content,
            }
            for msg in messages
        ]
        self.redis_client.setex(
            key,
            3600 * 24,  # 24 hour TTL
            json.dumps(serialized),
        )

    def load_from_redis(self, session_id: str) -> List[BaseMessage]:
        """Load a conversation previously written by ``save_to_redis``.

        Args:
            session_id: Conversation identifier used to build the Redis key.

        Returns:
            The stored human and AI messages in their original order, or an
            empty list when nothing is stored under the key. Records whose
            type is neither ``HumanMessage`` nor ``AIMessage`` are skipped.
        """
        key = f"conversation:{session_id}"
        data = self.redis_client.get(key)
        if not data:
            return []

        messages = []
        for msg_data in json.loads(data):
            msg_type = msg_data["type"]
            if msg_type == "HumanMessage":
                messages.append(HumanMessage(content=msg_data["content"]))
            elif msg_type == "AIMessage":
                messages.append(AIMessage(content=msg_data["content"]))
            # Any other stored type cannot be rebuilt here. Skipping it avoids
            # appending an unbound or stale message object in its place.
        return messages
Agent Architectures
ReAct Agent Implementation
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import Tool, StructuredTool
from pydantic import BaseModel, Field
class ReActAgent:
    """Production ReAct (Reasoning + Acting) agent.

    Wires a GPT-4 chat model, a calculator tool and a search tool into an
    ``AgentExecutor`` that is driven by a custom ReAct prompt.
    """

    def __init__(self):
        self.llm = ChatOpenAI(temperature=0, model="gpt-4")
        self.tools = self._setup_tools()
        self.agent = self._create_agent()

    def _setup_tools(self) -> List[Tool]:
        """Configure agent tools.

        Returns:
            Two structured tools, ``Calculator`` and ``Search``, each bound to
            the matching private method and a pydantic argument schema.
        """

        # Define tool schemas
        class CalculatorInput(BaseModel):
            expression: str = Field(description="Mathematical expression")

        class SearchInput(BaseModel):
            query: str = Field(description="Search query")
            max_results: int = Field(default=5)

        return [
            StructuredTool.from_function(
                func=self._calculator,
                args_schema=CalculatorInput,
                name="Calculator",
                description="Perform mathematical calculations",
            ),
            StructuredTool.from_function(
                func=self._search,
                args_schema=SearchInput,
                name="Search",
                description="Search for information",
            ),
        ]

    def _create_agent(self):
        """Create the ReAct agent and wrap it in an executor.

        Returns:
            An ``AgentExecutor`` limited to 5 iterations and 60 seconds that
            tolerates output-parsing errors.
        """
        from langchain_core.prompts import PromptTemplate

        # create_react_agent requires the prompt to expose {tools},
        # {tool_names} and {agent_scratchpad}; {tool_names} is what lets the
        # model pick a valid value for the Action line.
        prompt = PromptTemplate.from_template(
            "\n"
            "You are an AI assistant with access to tools.\n"
            "Tools available:\n"
            "{tools}\n"
            "To use a tool, follow this format:\n"
            "Thought: I need to [reasoning about what to do]\n"
            "Action: [tool to use, one of: {tool_names}]\n"
            "Action Input: [input to the tool]\n"
            "Observation: [tool response]\n"
            "Thought: I now have the final answer\n"
            "Final Answer: [your final response]\n"
            "Question: {input}\n"
            "{agent_scratchpad}\n"
        )
        agent = create_react_agent(
            llm=self.llm,
            tools=self.tools,
            prompt=prompt,
        )
        return AgentExecutor(
            agent=agent,
            tools=self.tools,
            verbose=True,
            max_iterations=5,
            max_execution_time=60,
            handle_parsing_errors=True,
        )

    def _calculator(self, expression: str) -> str:
        """Evaluate a mathematical expression with numexpr.

        Args:
            expression: Expression text, typically written by the model.

        Returns:
            The result as a string, or ``"Calculation error: ..."`` when
            numexpr is unavailable or evaluation fails.
        """
        try:
            import numexpr as ne

            # Empty namespaces stop the expression from resolving names out of
            # this frame's locals/globals (numexpr's default lookup).
            result = ne.evaluate(expression.strip(), global_dict={}, local_dict={})
            return str(result)
        except Exception as e:
            return f"Calculation error: {e}"

    def _search(self, query: str, max_results: int = 5) -> str:
        """Return a placeholder search result string for ``query``.

        ``max_results`` is accepted for the tool schema but not used yet.
        """
        return f"Search results for '{query}'"

    def run(self, query: str) -> str:
        """Execute the agent on ``query`` and return its ``output`` value."""
        return self.agent.invoke({"input": query})["output"]
Plan-and-Execute Agent
class PlanAndExecuteAgent:
    """Two-phase agent: draft a list of steps first, then work through them."""

    def __init__(self):
        self.planner = self._create_planner()
        self.executor = self._create_executor()

    def _create_planner(self):
        """Assemble the chain that turns a task into plan text."""
        system_text = (
            "You are a planning assistant.\n"
            "Break down the task into clear steps.\n"
            "Output a numbered list of steps."
        )
        prompt = ChatPromptTemplate.from_messages(
            [("system", system_text), ("human", "{task}")]
        )
        model = ChatOpenAI(temperature=0)
        return prompt | model | StrOutputParser()

    def _create_executor(self):
        """Assemble the chain that carries out one step given prior context."""
        prompt = ChatPromptTemplate.from_messages(
            [
                ("system", "Execute this step: {step}"),
                ("human", "Context: {context}"),
            ]
        )
        model = ChatOpenAI(temperature=0)
        return prompt | model

    async def run(self, task: str) -> Dict[str, Any]:
        """Plan the task, then execute every step in order.

        Each non-blank line of the planner's output counts as one step. A
        step is executed with the accumulated results of all earlier steps as
        its context.

        Returns:
            A dict with ``task``, ``plan`` (the step strings), ``execution``
            (one record per step) and ``final_output`` (the last step's
            result, or ``None`` when the plan is empty).
        """
        plan_text = await self.planner.ainvoke({"task": task})
        stripped_lines = map(str.strip, plan_text.split("\n"))
        steps = [line for line in stripped_lines if line]

        execution_log = []
        context_parts = []  # one entry per finished step, joined on demand
        for number, description in enumerate(steps, start=1):
            payload = {"step": description, "context": "".join(context_parts)}
            reply = await self.executor.ainvoke(payload)
            execution_log.append(
                {
                    "step": number,
                    "description": description,
                    "result": reply.content,
                }
            )
            context_parts.append(f"\nStep {number} result: {reply.content}")

        if execution_log:
            final_output = execution_log[-1]["result"]
        else:
            final_output = None

        return {
            "task": task,
            "plan": steps,
            "execution": execution_log,
            "final_output": final_output,
        }
Production Patterns
Error Handling and Retry Logic
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
import logging
class RobustChain:
    """Chain runner that adds retries and a simpler fallback model."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.fallback_llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_if_exception_type(Exception),
    )
    async def execute_with_retry(self, chain, input_data):
        """Invoke ``chain`` asynchronously, retrying on any exception.

        The decorator allows up to 3 attempts with an exponential wait
        bounded to 4-10 seconds. Every failed attempt is logged at error
        level and re-raised so the retry policy can see it.
        """
        try:
            outcome = await chain.ainvoke(input_data)
        except Exception as exc:
            self.logger.error(f"Chain execution failed: {exc}")
            raise
        return outcome

    async def execute_with_fallback(self, primary_chain, input_data):
        """Run ``primary_chain`` with retries; on failure use a simple chain.

        When the retried primary chain still fails, a warning is logged and
        ``input_data`` is sent through a minimal prompt (which reads its
        ``input`` value) piped into the fallback model.
        """
        try:
            return await self.execute_with_retry(primary_chain, input_data)
        except Exception as primary_error:
            self.logger.warning(f"Primary failed, using fallback: {primary_error}")
            # Built on demand: only needed once the primary path has failed.
            simple_prompt = ChatPromptTemplate.from_template(
                "Simple response: {input}"
            )
            backup_chain = simple_prompt | self.fallback_llm
            return await backup_chain.ainvoke(input_data)
Summary
In this second part, we've explored advanced LangChain concepts:
- LCEL composition patterns
- Complex chain orchestration
- Production memory systems
- Agent architectures (ReAct, Plan-and-Execute)
- Error handling with retry and fallback logic
These patterns form the backbone of production AI applications, providing the structure and reliability needed for real-world deployment.
Series Navigation
This is Part 2 of the LangChain Series.
Previous: ← Part 1 - LangChain Fundamentals: Core Components | Next: Part 3 - LangGraph: Building Stateful Applications →
Complete Series:
- Part 0: Introduction to LangChain Ecosystem
- Part 1: LangChain Fundamentals - Core Components
- Part 2: LangChain Fundamentals - Chains and Agents (You are here)
- Part 3: LangGraph - Building Stateful Applications
- Part 4: LangGraph - Advanced Patterns
- Part 5: LangSmith - Production Monitoring
Tags: #LangChain #AI #Agents #LCEL #Python

