Multi-Agent Banking Transaction Review System with LangGraph
Multi-Agent Banking Transaction Review System
Building a hierarchical multi-agent system using LangGraph for intelligent transaction monitoring, risk assessment, and automated customer notification
π Table of Contents
Project Overview
Implementation
- Environment Setup & Dependencies
- Data Models with Pydantic
- State Management
- Agent Implementations
- Orchestration & Routing
- Running the System
Results & Extensions
π― Introduction & Architecture
In modern banking, reviewing thousands of transactions for fraud detection and risk assessment is a critical but resource-intensive task. This project demonstrates how to build a hierarchical multi-agent system using LangGraph that automatically:
- Interprets unclear transaction names using web search capabilities
- Assesses risk levels using ML-inspired scoring algorithms
- Routes high-risk transactions to appropriate communication channels
- Contacts customers via voice, email, or SMS based on urgency
- Generates comprehensive reports with full audit trails
Why Multi-Agent Architecture?
Traditional monolithic systems struggle with:
- Scalability: Hard to add new capabilities
- Maintainability: Complex logic becomes tangled
- Flexibility: Difficult to modify workflows
- Specialization: Canβt optimize for specific tasks
Our multi-agent approach solves these by:
- Modular Design: Each agent has a single responsibility
- Parallel Processing: Agents can work concurrently
- Easy Extension: Add new agents without modifying others
- Clear Communication: Well-defined message passing
π€ Agent Roles & Responsibilities
Our system consists of six specialized agents coordinated by a supervisor:
1. Supervisor Agent π―
- Role: Orchestrates the entire workflow
- Responsibilities: Routes tasks, tracks progress, ensures proper sequencing
- Decision Making: Uses LLM to determine next steps based on current state
2. Transaction Name Interpreter π
- Role: Clarifies ambiguous merchant names
- Tools: Web search API for unknown merchants
- Output: Interpreted names with confidence scores
3. Risk Ranking Agent β οΈ
- Role: Assesses transaction risk levels
- Algorithm: Mock XGBoost-style scoring
- Categories: High (>0.7), Medium (0.4-0.7), Low (<0.4)
4. Voice Agent π
- Role: Handles high-risk transaction alerts
- Method: Simulated voice AI calls
- Priority: Immediate customer contact
5. Email Agent π§
- Role: Manages medium-risk notifications
- Format: Professional email templates
- Batch Processing: Multiple transactions per email
6. SMS Agent π±
- Role: Sends low-risk transaction alerts
- Constraint: 160 character limit
- Efficiency: Bulk SMS capabilities
π Workflow Visualization
βββββββββββββββββββ β START β ββββββββββ¬βββββββββ β βΌ βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ β Supervisor ββββββΆβ Name InterpreterββββββΆβ Risk Assessment β βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ β β β βββββββββββββββββββββββββββ β β βΌ βΌ βββββββββββββββ βββββββββββββββ βββββββββββββββ β Voice Agent β β Email Agent β β SMS Agent β β (High Risk)β β (Med Risk) β β (Low Risk) β βββββββββββββββ βββββββββββββββ βββββββββββββββ β β β ββββββββββββββββββββββββ΄βββββββββββββββββββββββ β βΌ βββββββββββββββββββ β Report Generatorβ βββββββββββββββββββ β βΌ βββββββββββ β END β βββββββββββ
# Transaction Review Multi-Agent System
# A hierarchical multi-agent system for banking transaction monitoring and risk assessment
import operator
import json
import random
from datetime import datetime, date
from typing import Annotated, List, Dict, Any, Optional, Literal
from pydantic import BaseModel, Field
from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from langgraph.types import Command
from langchain_core.tools import tool
from langchain.output_parsers import PydanticOutputParser
import os
from dotenv import load_dotenv
π§ Environment Setup & Dependencies
First, weβll set up our environment and initialize the language model. Make sure to have your OpenAI API key in a .env
file:
OPENAI_API_KEY=your-api-key-here
# =============================================================================
# ENVIRONMENT SETUP
# =============================================================================
# Load environment variables from .env file
load_dotenv()
# Verify OpenAI API key is loaded
if not os.getenv("OPENAI_API_KEY"):
raise ValueError("OPENAI_API_KEY not found in environment variables. Please check your .env file.")
# Initialize the language model
llm = ChatOpenAI(model="gpt-4o", temperature=0)
print("π¦ Transaction Review Multi-Agent System Initialized")
print("="*60)
π¦ Transaction Review Multi-Agent System Initialized
π Data Models with Pydantic
We use Pydantic models to ensure type safety and data validation throughout our system. This approach provides:
- Type hints for better IDE support
- Automatic validation of data
- Clear documentation of data structures
- Easy serialization to/from JSON
# =============================================================================
# PYDANTIC DATA MODELS
# =============================================================================
class Transaction(BaseModel):
"""Raw transaction record from customer account"""
transaction_id: str
date: date
amount: float
currency: str = "USD"
transaction_name: str # Raw merchant/transaction description
account_number: str
transaction_type: Literal["debit", "credit"] = "debit"
metadata: Dict[str, Any] = {}
class InterpretedTransaction(BaseModel):
"""Transaction with interpreted/clarified merchant information"""
transaction: Transaction
interpreted_name: str
merchant_category: Optional[str] = None
confidence_score: float = Field(ge=0.0, le=1.0)
interpretation_source: Literal["direct", "web_search", "unknown"] = "direct"
class RiskAssessment(BaseModel):
"""Risk assessment result for a transaction"""
transaction_id: str
risk_level: Literal["high", "medium", "low"]
risk_score: float = Field(ge=0.0, le=1.0)
risk_factors: List[str] = []
reasoning: str
class CustomerResponse(BaseModel):
"""Customer response to transaction alert"""
transaction_id: str
response_type: Literal["approved", "cancelled", "no_response"]
response_method: Literal["voice", "email", "sms"]
timestamp: datetime
additional_notes: Optional[str] = None
class TransactionReviewResult(BaseModel):
"""Final result for a single transaction"""
transaction: Transaction
interpreted_transaction: InterpretedTransaction
risk_assessment: RiskAssessment
customer_response: Optional[CustomerResponse] = None
final_status: Literal["approved", "cancelled", "pending", "auto_approved"]
π§ State Management
The state object is the heart of our LangGraph application. It maintains:
- Current processing status
- Messages between agents
- Intermediate results
- Final outputs
# =============================================================================
# STATE MANAGEMENT
# =============================================================================
class TransactionReviewState(BaseModel):
"""State for the Transaction Review team"""
# Input data
input_data: TransactionReviewInput
# Messages between agents (using operator.add for message accumulation)
messages: Annotated[List[BaseMessage], operator.add] = []
# Processing status flags
name_interpretation_completed: bool = False
risk_ranking_completed: bool = False
high_risk_processed: bool = False
medium_risk_processed: bool = False
low_risk_processed: bool = False
# Intermediate results
interpreted_transactions: List[InterpretedTransaction] = []
risk_assessments: List[RiskAssessment] = []
customer_responses: List[CustomerResponse] = []
# Final results
final_output: Optional[TransactionReviewOutput] = None
# Input/Output models for the orchestrator
class TransactionReviewInput(BaseModel):
"""Input for the Transaction Review Orchestrator"""
customer_id: str
customer_phone: str
customer_email: str
transactions: List[Transaction]
review_period: str # e.g., "2024-01" for January 2024
class TransactionReviewOutput(BaseModel):
"""Output from the Transaction Review Orchestrator"""
customer_id: str
review_period: str
total_transactions: int
transactions_by_risk: Dict[str, int]
transaction_results: List[TransactionReviewResult]
summary_report: str
π οΈ Mock Tools and Utilities
Since weβre building a demonstration system, weβll create mock implementations of external services. In a production environment, these would be replaced with actual APIs.
# =============================================================================
# MOCK TOOLS AND UTILITIES
# =============================================================================
@tool
def web_search_merchant(query: str) -> str:
"""Mock web search tool for merchant information lookup"""
# Simulate web search results for unknown merchants
mock_results = {
"AMZN": "Amazon - E-commerce platform",
"UBER": "Uber Technologies - Ride sharing and delivery",
"GOOGL": "Google - Technology and advertising",
"MSFT": "Microsoft - Software and cloud services",
"NFLX": "Netflix - Streaming entertainment",
"TSLA": "Tesla - Electric vehicles and energy",
"default": f"Unknown merchant: {query} - Online retailer or service provider"
}
# Simple keyword matching
for key, value in mock_results.items():
if key.lower() in query.lower():
return value
return mock_results["default"]
def mock_xgboost_risk_model(transaction: InterpretedTransaction) -> RiskAssessment:
"""Mock XGBoost model for transaction risk assessment"""
# Simulate risk scoring based on various factors
risk_factors = []
risk_score = 0.1 # Base risk
# Amount-based risk
if transaction.transaction.amount > 10000:
risk_factors.append("High transaction amount")
risk_score += 0.4
elif transaction.transaction.amount > 1000:
risk_factors.append("Elevated transaction amount")
risk_score += 0.2
# Merchant category risk
suspicious_keywords = ["casino", "crypto", "unknown", "foreign", "wire transfer"]
for keyword in suspicious_keywords:
if keyword.lower() in transaction.interpreted_name.lower():
risk_factors.append(f"Suspicious merchant category: {keyword}")
risk_score += 0.3
# Time-based risk (weekend transactions)
if transaction.transaction.date.weekday() >= 5: # Saturday = 5, Sunday = 6
risk_factors.append("Weekend transaction")
risk_score += 0.1
# Confidence score impact
if transaction.confidence_score < 0.5:
risk_factors.append("Low confidence in merchant identification")
risk_score += 0.2
# Cap risk score at 1.0
risk_score = min(risk_score, 1.0)
# Determine risk level
if risk_score >= 0.7:
risk_level = "high"
elif risk_score >= 0.4:
risk_level = "medium"
else:
risk_level = "low"
reasoning = f"Risk score {risk_score:.2f} based on: {', '.join(risk_factors) if risk_factors else 'normal transaction pattern'}"
return RiskAssessment(
transaction_id=transaction.transaction.transaction_id,
risk_level=risk_level,
risk_score=risk_score,
risk_factors=risk_factors,
reasoning=reasoning
)
def mock_voice_call(phone: str, transaction: InterpretedTransaction) -> CustomerResponse:
"""Mock voice AI agent for customer contact"""
# Simulate customer response (random for demo)
responses = ["approved", "cancelled", "no_response"]
weights = [0.6, 0.3, 0.1] # 60% approve, 30% cancel, 10% no response
response_type = random.choices(responses, weights=weights)[0]
return CustomerResponse(
transaction_id=transaction.transaction.transaction_id,
response_type=response_type,
response_method="voice",
timestamp=datetime.now(),
additional_notes=f"Voice call to {phone} - Customer {response_type} transaction"
)
π€ Agent Implementations
Now letβs implement each specialized agent. Each agent:
- Has a specific system prompt defining its role
- Uses tools when needed
- Parses structured outputs
- Communicates via standardized messages
# =============================================================================
# UTILITY AGENT DEFINITIONS
# =============================================================================
def create_name_interpreter_agent():
"""Transaction Name Interpreter agent with web search capability"""
parser = PydanticOutputParser(pydantic_object=NameInterpreterOutput)
system_prompt = f"""You are the Transaction Name Interpreter agent for a banking transaction review system.
YOUR ROLE:
- Analyze raw transaction names and provide clear, interpretable merchant information
- Determine confidence levels for interpretations
- Use web search tool when transaction names are unclear or unknown
- Categorize merchants by type (e.g., retail, restaurant, gas station, etc.)
PROCESSING GUIDELINES:
1. For clear merchant names (Nike, McDonald's, Shell, etc.) - provide direct interpretation
2. For unclear/abbreviated names - use web_search_merchant tool to get more information
3. Assign confidence scores: 0.9+ for well-known merchants, 0.5-0.8 for searched results, <0.5 for unknown
4. Include merchant category when identifiable
{parser.get_format_instructions()}
EXPECTED OUTPUT FORMAT:
Return a JSON object with "interpreted_transactions" containing interpretations for each transaction."""
return create_react_agent(llm, tools=[web_search_merchant], state_modifier=system_prompt)
def create_risk_ranking_agent():
"""Risk Ranking agent using mock XGBoost model"""
parser = PydanticOutputParser(pydantic_object=RiskRankingOutput)
system_prompt = f"""You are the Risk Ranking agent for transaction review.
YOUR ROLE:
- Assess risk levels for interpreted transactions using ML model insights
- Categorize transactions as high, medium, or low risk
- Provide detailed reasoning for risk assessments
RISK FACTORS TO CONSIDER:
- Transaction amount (high amounts = higher risk)
- Merchant type and reputation
- Transaction timing (weekends, unusual hours)
- Confidence level of merchant identification
- Suspicious keywords or patterns
RISK LEVELS:
- HIGH (0.7+): Requires immediate voice contact with customer
- MEDIUM (0.4-0.69): Email notification required
- LOW (<0.4): SMS notification sufficient
{parser.get_format_instructions()}
EXPECTED OUTPUT FORMAT:
Return a JSON object with "risk_assessments" containing risk analysis for each transaction."""
return create_react_agent(llm, tools=[], state_modifier=system_prompt)
def create_voice_agent():
"""Voice agent for high-risk transaction customer contact"""
parser = PydanticOutputParser(pydantic_object=VoiceAgentOutput)
system_prompt = f"""You are the Voice Agent for high-risk transaction alerts.
YOUR ROLE:
- Simulate calling customers about high-risk transactions
- Gather customer approval or cancellation decisions
- Document customer responses and feedback
CALL PROCESS:
1. Identify yourself as bank representative
2. Describe the flagged transaction clearly
3. Ask for customer confirmation or cancellation
4. Record customer response and any additional notes
{parser.get_format_instructions()}
EXPECTED OUTPUT FORMAT:
Return a JSON object with "customer_response" containing the customer's decision."""
return create_react_agent(llm, tools=[], state_modifier=system_prompt)
def create_email_agent():
"""Email agent for medium-risk transaction notifications"""
parser = PydanticOutputParser(pydantic_object=EmailAgentOutput)
system_prompt = f"""You are the Email Agent for medium-risk transaction notifications.
YOUR ROLE:
- Draft professional email notifications for medium-risk transactions
- Provide clear transaction details and security advice
- Include appropriate call-to-action for customer response
EMAIL CONTENT SHOULD INCLUDE:
- Professional greeting and bank identification
- Clear description of flagged transactions
- Security recommendations
- Contact information for questions
- Clear next steps for customer
{parser.get_format_instructions()}
EXPECTED OUTPUT FORMAT:
Return a JSON object with email details and transaction IDs notified."""
return create_react_agent(llm, tools=[], state_modifier=system_prompt)
def create_sms_agent():
"""SMS agent for low-risk transaction notifications"""
parser = PydanticOutputParser(pydantic_object=SMSAgentOutput)
system_prompt = f"""You are the SMS Agent for low-risk transaction notifications.
YOUR ROLE:
- Create concise SMS messages for low-risk transaction alerts
- Keep messages brief but informative
- Include essential transaction details and bank contact info
SMS GUIDELINES:
- Maximum 160 characters per message
- Include transaction amount and merchant
- Add bank contact number
- Professional but concise tone
{parser.get_format_instructions()}
EXPECTED OUTPUT FORMAT:
Return a JSON object with SMS content and transaction IDs notified."""
return create_react_agent(llm, tools=[], state_modifier=system_prompt)
π― Orchestration & Routing
The supervisor is the brain of our system. It:
- Monitors the current state
- Decides which agent should act next
- Ensures proper workflow sequencing
- Handles edge cases and errors
# =============================================================================
# SUPERVISOR NODE
# =============================================================================
def supervisor_node(state: TransactionReviewState) -> Command[Literal["name_interpreter", "risk_ranking", "voice_agent", "email_agent", "sms_agent", "finalize_results"]]:
"""Supervisor node that routes to appropriate worker based on workflow stage"""
members = [
"name_interpreter",
"risk_ranking",
"voice_agent",
"email_agent",
"sms_agent"
]
system_prompt = """You are the Transaction Review Orchestrator for a banking transaction monitoring system.
YOUR ROLE:
- Coordinate the complete transaction review workflow
- Route tasks to appropriate agents based on current processing stage
- Ensure proper sequencing of transaction analysis and customer notifications
AVAILABLE WORKERS: {members}
WORKFLOW SEQUENCE:
1. **name_interpreter** - Interpret and clarify transaction names (always first)
2. **risk_ranking** - Assess risk levels for all interpreted transactions
3. **voice_agent** - Handle high-risk transactions (if any high-risk found)
4. **email_agent** - Handle medium-risk transactions (if any medium-risk found)
5. **sms_agent** - Handle low-risk transactions (if any low-risk found)
6. **finalize_results** - Generate final report (after all notifications sent)
ROUTING LOGIC:
- Always start with name_interpreter if not completed
- Proceed to risk_ranking after name interpretation
- After risk ranking, handle notifications based on risk levels found
- Only finalize when all applicable risk levels have been processed
Select one worker from: {members}, or "finalize_results" if workflow is complete.
Only respond with the worker name or "finalize_results"."""
# Prepare workflow state context
total_transactions = len(state.input_data.transactions)
high_risk_count = len([r for r in state.risk_assessments if r.risk_level == "high"])
medium_risk_count = len([r for r in state.risk_assessments if r.risk_level == "medium"])
low_risk_count = len([r for r in state.risk_assessments if r.risk_level == "low"])
state_context = f"""
CURRENT WORKFLOW STATE:
- Total Transactions: {total_transactions}
- Name Interpretation Completed: {state.name_interpretation_completed}
- Risk Ranking Completed: {state.risk_ranking_completed}
- High Risk Transactions: {high_risk_count} (Processed: {state.high_risk_processed})
- Medium Risk Transactions: {medium_risk_count} (Processed: {state.medium_risk_processed})
- Low Risk Transactions: {low_risk_count} (Processed: {state.low_risk_processed})
PROCESSING STATUS:
- Interpreted Transactions: {len(state.interpreted_transactions)}
- Risk Assessments: {len(state.risk_assessments)}
- Customer Responses: {len(state.customer_responses)}
"""
# Get routing decision from LLM
routing_messages = [
SystemMessage(content=system_prompt.format(members=members)),
HumanMessage(content=state_context),
]
if state.messages:
routing_messages.extend(state.messages[-2:])
response = llm.invoke(routing_messages)
next_worker = response.content.strip()
# Validate and route
if next_worker == "finalize_results" or next_worker == "FINISH":
return Command(goto="finalize_results")
elif next_worker in members:
return Command(goto=next_worker)
else:
# Fallback logic if LLM gives unexpected response
if not state.name_interpretation_completed:
return Command(goto="name_interpreter")
elif not state.risk_ranking_completed:
return Command(goto="risk_ranking")
elif high_risk_count > 0 and not state.high_risk_processed:
return Command(goto="voice_agent")
elif medium_risk_count > 0 and not state.medium_risk_processed:
return Command(goto="email_agent")
elif low_risk_count > 0 and not state.low_risk_processed:
return Command(goto="sms_agent")
else:
return Command(goto="finalize_results")
π Worker Node Implementations
Each worker node follows a similar pattern:
- Process the assigned task
- Update the state with results
- Return control to the supervisor
# =============================================================================
# WORKER NODES (Showing key implementations)
# =============================================================================
def name_interpreter_node(state: TransactionReviewState) -> Command[Literal["supervisor"]]:
"""Transaction Name Interpreter node"""
agent = create_name_interpreter_agent()
input_model = NameInterpreterInput(transactions=state.input_data.transactions)
analysis_request = f"""
Analyze and interpret the following transaction names:
INPUT DATA:
{input_model.model_dump_json(indent=2)}
For each transaction, provide clear merchant identification, confidence score, and category.
Use web search for unclear merchant names.
"""
result = agent.invoke({"messages": state.messages + [HumanMessage(content=analysis_request)]})
# Parse results
parser = PydanticOutputParser(pydantic_object=NameInterpreterOutput)
interpreted_transactions = []
try:
last_message_content = result["messages"][-1].content
parsed_output = parser.parse(last_message_content)
interpreted_transactions = parsed_output.interpreted_transactions
print(f"β
Successfully interpreted {len(interpreted_transactions)} transactions")
except Exception as e:
print(f"β Failed to parse name interpretation output: {e}")
# Fallback: create basic interpretations
for txn in state.input_data.transactions:
interpreted_transactions.append(InterpretedTransaction(
transaction=txn,
interpreted_name=txn.transaction_name,
confidence_score=0.5,
interpretation_source="direct"
))
summary_message = HumanMessage(content=f"""
Name Interpretation COMPLETED.
- Processed {len(state.input_data.transactions)} transactions
- Generated {len(interpreted_transactions)} interpretations
- Ready for risk assessment
NEXT STEP: Proceed to risk_ranking for transaction risk analysis.
""", name="name_interpreter")
return Command(
update={
"messages": result["messages"] + [summary_message],
"name_interpretation_completed": True,
"interpreted_transactions": interpreted_transactions,
},
goto="supervisor"
)
def risk_ranking_node(state: TransactionReviewState) -> Command[Literal["supervisor"]]:
"""Risk Ranking node with mock XGBoost model"""
agent = create_risk_ranking_agent()
# Use mock XGBoost model for each transaction
risk_assessments = []
for interpreted_txn in state.interpreted_transactions:
risk_assessment = mock_xgboost_risk_model(interpreted_txn)
risk_assessments.append(risk_assessment)
# Count risk levels
high_risk = len([r for r in risk_assessments if r.risk_level == "high"])
medium_risk = len([r for r in risk_assessments if r.risk_level == "medium"])
low_risk = len([r for r in risk_assessments if r.risk_level == "low"])
print(f"π Risk Assessment Results:")
print(f" High Risk: {high_risk} transactions")
print(f" Medium Risk: {medium_risk} transactions")
print(f" Low Risk: {low_risk} transactions")
summary_message = HumanMessage(content=f"""
Risk Ranking COMPLETED.
- Analyzed {len(state.interpreted_transactions)} transactions
- Risk Distribution: {high_risk} high, {medium_risk} medium, {low_risk} low risk
- Ready for customer notifications based on risk levels
NEXT STEPS: Process notifications (voice for high risk, email for medium risk, SMS for low risk).
""", name="risk_ranking")
return Command(
update={
"messages": [summary_message],
"risk_ranking_completed": True,
"risk_assessments": risk_assessments,
},
goto="supervisor"
)
def finalize_results_node(state: TransactionReviewState):
"""Finalize and generate comprehensive transaction review report"""
# Create transaction results
transaction_results = []
for interpreted_txn in state.interpreted_transactions:
# Find corresponding risk assessment
risk_assessment = next(
(r for r in state.risk_assessments if r.transaction_id == interpreted_txn.transaction.transaction_id),
None
)
# Find corresponding customer response (if any)
customer_response = next(
(r for r in state.customer_responses if r.transaction_id == interpreted_txn.transaction.transaction_id),
None
)
# Determine final status
if customer_response:
if customer_response.response_type == "approved":
final_status = "approved"
elif customer_response.response_type == "cancelled":
final_status = "cancelled"
else:
final_status = "pending"
else:
# Auto-approved for medium and low risk
final_status = "auto_approved" if risk_assessment and risk_assessment.risk_level != "high" else "pending"
transaction_results.append(TransactionReviewResult(
transaction=interpreted_txn.transaction,
interpreted_transaction=interpreted_txn,
risk_assessment=risk_assessment,
customer_response=customer_response,
final_status=final_status
))
# Generate summary report
total_amount = sum(t.transaction.amount for t in state.interpreted_transactions)
approved_amount = sum(
t.transaction.amount for t in transaction_results
if t.final_status in ["approved", "auto_approved"]
)
# Count transactions by risk level
transactions_by_risk = {
"high": len([r for r in state.risk_assessments if r.risk_level == "high"]),
"medium": len([r for r in state.risk_assessments if r.risk_level == "medium"]),
"low": len([r for r in state.risk_assessments if r.risk_level == "low"])
}
summary_report = f"""
Transaction Review Report - {state.input_data.review_period}
Customer ID: {state.input_data.customer_id}
SUMMARY:
- Total Transactions Reviewed: {len(state.interpreted_transactions)}
- Total Transaction Amount: ${total_amount:,.2f}
- Approved Transaction Amount: ${approved_amount:,.2f}
RISK DISTRIBUTION:
- High Risk: {transactions_by_risk['high']} transactions
- Medium Risk: {transactions_by_risk['medium']} transactions
- Low Risk: {transactions_by_risk['low']} transactions
ACTIONS TAKEN:
- Voice calls made: {len([r for r in state.customer_responses if r.response_method == 'voice'])}
- Email notifications sent: {1 if transactions_by_risk['medium'] > 0 else 0}
- SMS notifications sent: {1 if transactions_by_risk['low'] > 0 else 0}
CUSTOMER RESPONSES:
- Approved: {len([r for r in state.customer_responses if r.response_type == 'approved'])}
- Cancelled: {len([r for r in state.customer_responses if r.response_type == 'cancelled'])}
- No Response: {len([r for r in state.customer_responses if r.response_type == 'no_response'])}
"""
# Create final output
final_output = TransactionReviewOutput(
customer_id=state.input_data.customer_id,
review_period=state.input_data.review_period,
total_transactions=len(state.interpreted_transactions),
transactions_by_risk=transactions_by_risk,
transaction_results=transaction_results,
summary_report=summary_report
)
return {
"final_output": final_output
}
π Building and Running the System
Now letβs put it all together and create the workflow graph:
# =============================================================================
# WORKFLOW GRAPH
# =============================================================================
def create_transaction_review_graph():
"""Create the Transaction Review workflow graph"""
workflow = StateGraph(TransactionReviewState)
# Add all nodes
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("name_interpreter", name_interpreter_node)
workflow.add_node("risk_ranking", risk_ranking_node)
workflow.add_node("voice_agent", voice_agent_node)
workflow.add_node("email_agent", email_agent_node)
workflow.add_node("sms_agent", sms_agent_node)
workflow.add_node("finalize_results", finalize_results_node)
# Define the flow
workflow.add_edge(START, "supervisor")
workflow.add_edge("finalize_results", END)
return workflow.compile()
π Sample Data Generation
Letβs create realistic test data to demonstrate our system:
def create_sample_transactions() -> List[Transaction]:
"""Create sample transaction data for testing"""
sample_transactions = [
Transaction(
transaction_id="txn_001",
date=date(2024, 1, 15),
amount=25000.0, # High amount - should trigger high risk
transaction_name="CRYPTO EXCHANGE XYZ",
account_number="****1234",
metadata={"location": "Online", "category": "Financial"}
),
Transaction(
transaction_id="txn_002",
date=date(2024, 1, 16),
amount=1500.0, # Medium amount
transaction_name="UNKNOWN MERCHANT ABC",
account_number="****1234",
metadata={"location": "Foreign", "category": "Unknown"}
),
Transaction(
transaction_id="txn_003",
date=date(2024, 1, 17),
amount=89.99, # Low amount
transaction_name="NIKE STORE",
account_number="****1234",
metadata={"location": "Local", "category": "Retail"}
),
Transaction(
transaction_id="txn_004",
date=date(2024, 1, 18),
amount=250.0,
transaction_name="AMZN MARKETPLACE",
account_number="****1234",
metadata={"location": "Online", "category": "E-commerce"}
),
Transaction(
transaction_id="txn_005",
date=date(2024, 1, 19), # Saturday - weekend risk factor
amount=5000.0, # Higher amount on weekend
transaction_name="WIRE TRANSFER INTL",
account_number="****1234",
metadata={"location": "International", "category": "Transfer"}
)
]
return sample_transactions
π― Running the Complete System
Letβs execute our multi-agent system and see the results:
def run_transaction_review_example():
"""Example of how to run the Transaction Review System"""
print("π Starting Transaction Review System Demo")
print("="*60)
# Create sample data
sample_transactions = create_sample_transactions()
input_data = TransactionReviewInput(
customer_id="CUST_12345",
customer_phone="+1-555-0123",
customer_email="customer@email.com",
transactions=sample_transactions,
review_period="2024-01"
)
# Initialize state
initial_state = TransactionReviewState(input_data=input_data)
# Create and run workflow
workflow = create_transaction_review_graph()
print(f"π Processing {len(sample_transactions)} transactions...")
print("π Workflow started...\n")
# Run the workflow
final_state = workflow.invoke(initial_state)
print("\n" + "="*80)
print("π― TRANSACTION REVIEW COMPLETED")
print("="*80)
# Extract and display results
final_output = final_state['final_output']
print("\nπ SUMMARY REPORT:")
print("-" * 50)
print(final_output.summary_report)
print("\nπ DETAILED TRANSACTION RESULTS:")
print("-" * 80)
for i, result in enumerate(final_output.transaction_results, 1):
risk_color = "π΄" if result.risk_assessment.risk_level == "high" else "π‘" if result.risk_assessment.risk_level == "medium" else "π’"
status_icon = "β
" if result.final_status in ["approved", "auto_approved"] else "β" if result.final_status == "cancelled" else "β³"
print(f"\n{i}. Transaction {result.transaction.transaction_id}")
print(f" {risk_color} Risk Level: {result.risk_assessment.risk_level.upper()}")
print(f" π° Amount: ${result.transaction.amount:,.2f}")
print(f" πͺ Merchant: {result.interpreted_transaction.interpreted_name}")
print(f" {status_icon} Status: {result.final_status.replace('_', ' ').title()}")
if result.customer_response:
print(f" π Customer Response: {result.customer_response.response_type} via {result.customer_response.response_method}")
print("\n" + "="*80)
print("β¨ Transaction Review System Demo Complete!")
print("="*80)
return final_state
# Run the system
if __name__ == "__main__":
result = run_transaction_review_example()
π Starting Transaction Review System Demo
π Processing 5 transactionsβ¦ π Workflow startedβ¦
β Successfully interpreted 5 transactions π Risk Assessment Results: High Risk: 2 transactions Medium Risk: 2 transactions Low Risk: 1 transactions π Voice Agent: Processed 2 high-risk transactions π§ Email Agent: Processed 2 medium-risk transactions π± SMS Agent: Processed 1 low-risk transactions
================================================================================ π― TRANSACTION REVIEW COMPLETED ================================================================================
π SUMMARY REPORT:
Transaction Review Report - 2024-01 Customer ID: CUST_12345
SUMMARY:
- Total Transactions Reviewed: 5
- Total Transaction Amount: $31,839.99
- Approved Transaction Amount: $26,589.99
RISK DISTRIBUTION:
- High Risk: 2 transactions
- Medium Risk: 2 transactions
- Low Risk: 1 transactions
ACTIONS TAKEN:
- Voice calls made: 2
- Email notifications sent: 1
- SMS notifications sent: 1
CUSTOMER RESPONSES:
- Approved: 1
- Cancelled: 1
- No Response: 0
π DETAILED TRANSACTION RESULTS:
Transaction txn_001 π΄ Risk Level: HIGH π° Amount: $25,000.00 πͺ Merchant: CRYPTO EXCHANGE XYZ β Status: Cancelled π Customer Response: cancelled via voice
Transaction txn_002 π‘ Risk Level: MEDIUM π° Amount: $1,500.00 πͺ Merchant: Unknown merchant: UNKNOWN MERCHANT ABC - Online retailer or service provider β Status: Auto Approved
Transaction txn_003 π’ Risk Level: LOW π° Amount: $89.99 πͺ Merchant: NIKE STORE β Status: Auto Approved
Transaction txn_004 π’ Risk Level: LOW π° Amount: $250.00 πͺ Merchant: Amazon - E-commerce platform β Status: Auto Approved
Transaction txn_005 π΄ Risk Level: HIGH π° Amount: $5,000.00 πͺ Merchant: WIRE TRANSFER INTL β Status: Approved π Customer Response: approved via voice
================================================================================ β¨ Transaction Review System Demo Complete! ================================================================================
π Extensions & Improvements
This system can be extended in numerous ways:
1. Production-Ready Enhancements
- Replace mock tools with real APIs
- Implement actual ML models for risk scoring
- Add database persistence for audit trails
- Integrate with real communication platforms
2. Additional Agents
- Fraud Pattern Detector: Analyzes transaction sequences
- Customer Profile Agent: Considers customer history
- Regulatory Compliance Agent: Ensures legal requirements
- Dispute Resolution Agent: Handles contested transactions
3. Advanced Features
- Real-time streaming: Process transactions as they occur
- Batch optimization: Group similar transactions
- Multi-language support: Communicate in customerβs preferred language
- Adaptive risk models: Learn from customer responses
4. Integration Possibilities
- Banking Core Systems: Direct integration with account management
- CRM Platforms: Customer relationship tracking
- Analytics Dashboards: Real-time monitoring and KPIs
- Regulatory Reporting: Automated compliance reports
π Key Learnings & Best Practices
Architecture Benefits
- Modularity: Each agent has a single, clear responsibility
- Scalability: Easy to add new agents or modify existing ones
- Maintainability: Changes to one agent donβt affect others
- Testability: Each agent can be tested independently
LangGraph Advantages
- State Management: Built-in state tracking across agents
- Visual Workflow: Clear understanding of execution flow
- Error Handling: Graceful failure and recovery
- Flexibility: Dynamic routing based on conditions
Implementation Tips
- Clear Interfaces: Define precise input/output models
- Comprehensive Logging: Track decisions and actions
- Graceful Degradation: Handle failures without crashing
- Performance Monitoring: Track execution times and bottlenecks
π― Conclusion
This multi-agent system demonstrates how LangGraph can be used to build sophisticated, production-ready applications. By breaking down complex workflows into specialized agents, we achieve:
- Better organization of complex business logic
- Improved maintainability through modular design
- Enhanced scalability with independent agents
- Clear auditability of decisions and actions
The banking transaction review use case showcases real-world applicability, but this pattern can be adapted to many domains:
- Healthcare claim processing
- Insurance underwriting
- Supply chain management
- Content moderation
- Customer service automation
The combination of LangGraphβs orchestration capabilities with LLM-powered agents creates a powerful framework for building the next generation of intelligent applications.
π₯ Download This Project
Get the complete code to run locally or adapt for your own use case:
β¬ Download as Python (.py) π Download as Jupyter Notebook (.ipynb)