Multi-Agent Banking Transaction Review System with LangGraph

Multi-Agent Banking Transaction Review System

Building a hierarchical multi-agent system using LangGraph for intelligent transaction monitoring, risk assessment, and automated customer notification

πŸ“š Table of Contents


MARKDOWN

🎯 Introduction & Architecture

In modern banking, reviewing thousands of transactions for fraud detection and risk assessment is a critical but resource-intensive task. This project demonstrates how to build a hierarchical multi-agent system using LangGraph that automatically:

  1. Interprets unclear transaction names using web search capabilities
  2. Assesses risk levels using ML-inspired scoring algorithms
  3. Routes high-risk transactions to appropriate communication channels
  4. Contacts customers via voice, email, or SMS based on urgency
  5. Generates comprehensive reports with full audit trails

Why Multi-Agent Architecture?

Traditional monolithic systems struggle with:

  • Scalability: Hard to add new capabilities
  • Maintainability: Complex logic becomes tangled
  • Flexibility: Difficult to modify workflows
  • Specialization: Can’t optimize for specific tasks

Our multi-agent approach solves these by:

  • Modular Design: Each agent has a single responsibility
  • Parallel Processing: Agents can work concurrently
  • Easy Extension: Add new agents without modifying others
  • Clear Communication: Well-defined message passing
MARKDOWN

πŸ€– Agent Roles & Responsibilities

Our system consists of six specialized agents coordinated by a supervisor:

1. Supervisor Agent 🎯

  • Role: Orchestrates the entire workflow
  • Responsibilities: Routes tasks, tracks progress, ensures proper sequencing
  • Decision Making: Uses LLM to determine next steps based on current state

2. Transaction Name Interpreter πŸ”

  • Role: Clarifies ambiguous merchant names
  • Tools: Web search API for unknown merchants
  • Output: Interpreted names with confidence scores

3. Risk Ranking Agent ⚠️

  • Role: Assesses transaction risk levels
  • Algorithm: Mock XGBoost-style scoring
  • Categories: High (>0.7), Medium (0.4-0.7), Low (<0.4)

4. Voice Agent πŸ“ž

  • Role: Handles high-risk transaction alerts
  • Method: Simulated voice AI calls
  • Priority: Immediate customer contact

5. Email Agent πŸ“§

  • Role: Manages medium-risk notifications
  • Format: Professional email templates
  • Batch Processing: Multiple transactions per email

6. SMS Agent πŸ“±

  • Role: Sends low-risk transaction alerts
  • Constraint: 160 character limit
  • Efficiency: Bulk SMS capabilities
MARKDOWN

πŸ”„ Workflow Visualization

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   START         β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚
         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   Supervisor    │────▢│ Name Interpreter│────▢│ Risk Assessment β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚                                                β”‚
         β”‚                      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚                      β”‚
         β–Ό                      β–Ό
   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
   β”‚ Voice Agent β”‚      β”‚ Email Agent β”‚      β”‚  SMS Agent  β”‚
   β”‚  (High Risk)β”‚      β”‚ (Med Risk)  β”‚      β”‚ (Low Risk)  β”‚
   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚                      β”‚                      β”‚
         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                β”‚
                                β–Ό
                        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                        β”‚ Report Generatorβ”‚
                        β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                                β”‚
                                β–Ό
                           β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                           β”‚   END   β”‚
                           β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
MARKDOWN
CODE β€’ Python
# Transaction Review Multi-Agent System
# A hierarchical multi-agent system for banking transaction monitoring and risk assessment

import operator
import json
import random
from datetime import datetime, date
from typing import Annotated, List, Dict, Any, Optional, Literal
from pydantic import BaseModel, Field
from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import create_react_agent
from langgraph.types import Command
from langchain_core.tools import tool
from langchain.output_parsers import PydanticOutputParser
import os
from dotenv import load_dotenv
MARKDOWN

πŸ”§ Environment Setup & Dependencies

First, we’ll set up our environment and initialize the language model. Make sure to have your OpenAI API key in a .env file:

OPENAI_API_KEY=your-api-key-here
CODE β€’ Python
# =============================================================================
# ENVIRONMENT SETUP
# =============================================================================

# Load environment variables from .env file
load_dotenv()

# Verify OpenAI API key is loaded
if not os.getenv("OPENAI_API_KEY"):
    raise ValueError("OPENAI_API_KEY not found in environment variables. Please check your .env file.")

# Initialize the language model
llm = ChatOpenAI(model="gpt-4o", temperature=0)

print("🏦 Transaction Review Multi-Agent System Initialized")
print("="*60)

🏦 Transaction Review Multi-Agent System Initialized

MARKDOWN

πŸ“‹ Data Models with Pydantic

We use Pydantic models to ensure type safety and data validation throughout our system. This approach provides:

  • Type hints for better IDE support
  • Automatic validation of data
  • Clear documentation of data structures
  • Easy serialization to/from JSON
CODE β€’ Python
# =============================================================================
# PYDANTIC DATA MODELS
# =============================================================================

class Transaction(BaseModel):
    """Raw transaction record from customer account"""
    transaction_id: str
    date: date
    amount: float
    currency: str = "USD"
    transaction_name: str  # Raw merchant/transaction description
    account_number: str
    transaction_type: Literal["debit", "credit"] = "debit"
    metadata: Dict[str, Any] = {}

class InterpretedTransaction(BaseModel):
    """Transaction with interpreted/clarified merchant information"""
    transaction: Transaction
    interpreted_name: str
    merchant_category: Optional[str] = None
    confidence_score: float = Field(ge=0.0, le=1.0)
    interpretation_source: Literal["direct", "web_search", "unknown"] = "direct"
    
class RiskAssessment(BaseModel):
    """Risk assessment result for a transaction"""
    transaction_id: str
    risk_level: Literal["high", "medium", "low"]
    risk_score: float = Field(ge=0.0, le=1.0)
    risk_factors: List[str] = []
    reasoning: str

class CustomerResponse(BaseModel):
    """Customer response to transaction alert"""
    transaction_id: str
    response_type: Literal["approved", "cancelled", "no_response"]
    response_method: Literal["voice", "email", "sms"]
    timestamp: datetime
    additional_notes: Optional[str] = None

class TransactionReviewResult(BaseModel):
    """Final result for a single transaction"""
    transaction: Transaction
    interpreted_transaction: InterpretedTransaction
    risk_assessment: RiskAssessment
    customer_response: Optional[CustomerResponse] = None
    final_status: Literal["approved", "cancelled", "pending", "auto_approved"]
MARKDOWN

🧠 State Management

The state object is the heart of our LangGraph application. It maintains:

  • Current processing status
  • Messages between agents
  • Intermediate results
  • Final outputs
CODE β€’ Python
# =============================================================================
# STATE MANAGEMENT
# =============================================================================

class TransactionReviewState(BaseModel):
    """State for the Transaction Review team"""
    # Input data
    input_data: TransactionReviewInput
    
    # Messages between agents (using operator.add for message accumulation)
    messages: Annotated[List[BaseMessage], operator.add] = []
    
    # Processing status flags
    name_interpretation_completed: bool = False
    risk_ranking_completed: bool = False
    high_risk_processed: bool = False
    medium_risk_processed: bool = False
    low_risk_processed: bool = False
    
    # Intermediate results
    interpreted_transactions: List[InterpretedTransaction] = []
    risk_assessments: List[RiskAssessment] = []
    customer_responses: List[CustomerResponse] = []
    
    # Final results
    final_output: Optional[TransactionReviewOutput] = None

# Input/Output models for the orchestrator
class TransactionReviewInput(BaseModel):
    """Input for the Transaction Review Orchestrator"""
    customer_id: str
    customer_phone: str
    customer_email: str
    transactions: List[Transaction]
    review_period: str  # e.g., "2024-01" for January 2024

class TransactionReviewOutput(BaseModel):
    """Output from the Transaction Review Orchestrator"""
    customer_id: str
    review_period: str
    total_transactions: int
    transactions_by_risk: Dict[str, int]
    transaction_results: List[TransactionReviewResult]
    summary_report: str
MARKDOWN

πŸ› οΈ Mock Tools and Utilities

Since we’re building a demonstration system, we’ll create mock implementations of external services. In a production environment, these would be replaced with actual APIs.

CODE β€’ Python
# =============================================================================
# MOCK TOOLS AND UTILITIES
# =============================================================================

@tool
def web_search_merchant(query: str) -> str:
    """Mock web search tool for merchant information lookup"""
    # Simulate web search results for unknown merchants
    mock_results = {
        "AMZN": "Amazon - E-commerce platform",
        "UBER": "Uber Technologies - Ride sharing and delivery",
        "GOOGL": "Google - Technology and advertising",
        "MSFT": "Microsoft - Software and cloud services",
        "NFLX": "Netflix - Streaming entertainment",
        "TSLA": "Tesla - Electric vehicles and energy",
        "default": f"Unknown merchant: {query} - Online retailer or service provider"
    }
    
    # Simple keyword matching
    for key, value in mock_results.items():
        if key.lower() in query.lower():
            return value
    
    return mock_results["default"]

def mock_xgboost_risk_model(transaction: InterpretedTransaction) -> RiskAssessment:
    """Mock XGBoost model for transaction risk assessment"""
    # Simulate risk scoring based on various factors
    risk_factors = []
    risk_score = 0.1  # Base risk
    
    # Amount-based risk
    if transaction.transaction.amount > 10000:
        risk_factors.append("High transaction amount")
        risk_score += 0.4
    elif transaction.transaction.amount > 1000:
        risk_factors.append("Elevated transaction amount")
        risk_score += 0.2
    
    # Merchant category risk
    suspicious_keywords = ["casino", "crypto", "unknown", "foreign", "wire transfer"]
    for keyword in suspicious_keywords:
        if keyword.lower() in transaction.interpreted_name.lower():
            risk_factors.append(f"Suspicious merchant category: {keyword}")
            risk_score += 0.3
    
    # Time-based risk (weekend transactions)
    if transaction.transaction.date.weekday() >= 5:  # Saturday = 5, Sunday = 6
        risk_factors.append("Weekend transaction")
        risk_score += 0.1
    
    # Confidence score impact
    if transaction.confidence_score < 0.5:
        risk_factors.append("Low confidence in merchant identification")
        risk_score += 0.2
    
    # Cap risk score at 1.0
    risk_score = min(risk_score, 1.0)
    
    # Determine risk level
    if risk_score >= 0.7:
        risk_level = "high"
    elif risk_score >= 0.4:
        risk_level = "medium"
    else:
        risk_level = "low"
    
    reasoning = f"Risk score {risk_score:.2f} based on: {', '.join(risk_factors) if risk_factors else 'normal transaction pattern'}"
    
    return RiskAssessment(
        transaction_id=transaction.transaction.transaction_id,
        risk_level=risk_level,
        risk_score=risk_score,
        risk_factors=risk_factors,
        reasoning=reasoning
    )

def mock_voice_call(phone: str, transaction: InterpretedTransaction) -> CustomerResponse:
    """Mock voice AI agent for customer contact"""
    # Simulate customer response (random for demo)
    responses = ["approved", "cancelled", "no_response"]
    weights = [0.6, 0.3, 0.1]  # 60% approve, 30% cancel, 10% no response
    
    response_type = random.choices(responses, weights=weights)[0]
    
    return CustomerResponse(
        transaction_id=transaction.transaction.transaction_id,
        response_type=response_type,
        response_method="voice",
        timestamp=datetime.now(),
        additional_notes=f"Voice call to {phone} - Customer {response_type} transaction"
    )
MARKDOWN

πŸ€– Agent Implementations

Now let’s implement each specialized agent. Each agent:

  • Has a specific system prompt defining its role
  • Uses tools when needed
  • Parses structured outputs
  • Communicates via standardized messages
CODE β€’ Python
# =============================================================================
# UTILITY AGENT DEFINITIONS
# =============================================================================

def create_name_interpreter_agent():
    """Transaction Name Interpreter agent with web search capability"""
    
    parser = PydanticOutputParser(pydantic_object=NameInterpreterOutput)
    
    system_prompt = f"""You are the Transaction Name Interpreter agent for a banking transaction review system.

YOUR ROLE:
- Analyze raw transaction names and provide clear, interpretable merchant information
- Determine confidence levels for interpretations
- Use web search tool when transaction names are unclear or unknown
- Categorize merchants by type (e.g., retail, restaurant, gas station, etc.)

PROCESSING GUIDELINES:
1. For clear merchant names (Nike, McDonald's, Shell, etc.) - provide direct interpretation
2. For unclear/abbreviated names - use web_search_merchant tool to get more information
3. Assign confidence scores: 0.9+ for well-known merchants, 0.5-0.8 for searched results, <0.5 for unknown
4. Include merchant category when identifiable

{parser.get_format_instructions()}

EXPECTED OUTPUT FORMAT:
Return a JSON object with "interpreted_transactions" containing interpretations for each transaction."""

    return create_react_agent(llm, tools=[web_search_merchant], state_modifier=system_prompt)

def create_risk_ranking_agent():
    """Risk Ranking agent using mock XGBoost model"""
    
    parser = PydanticOutputParser(pydantic_object=RiskRankingOutput)
    
    system_prompt = f"""You are the Risk Ranking agent for transaction review.

YOUR ROLE:
- Assess risk levels for interpreted transactions using ML model insights
- Categorize transactions as high, medium, or low risk
- Provide detailed reasoning for risk assessments

RISK FACTORS TO CONSIDER:
- Transaction amount (high amounts = higher risk)
- Merchant type and reputation
- Transaction timing (weekends, unusual hours)
- Confidence level of merchant identification
- Suspicious keywords or patterns

RISK LEVELS:
- HIGH (0.7+): Requires immediate voice contact with customer
- MEDIUM (0.4-0.69): Email notification required
- LOW (<0.4): SMS notification sufficient

{parser.get_format_instructions()}

EXPECTED OUTPUT FORMAT:
Return a JSON object with "risk_assessments" containing risk analysis for each transaction."""

    return create_react_agent(llm, tools=[], state_modifier=system_prompt)

def create_voice_agent():
    """Voice agent for high-risk transaction customer contact"""
    
    parser = PydanticOutputParser(pydantic_object=VoiceAgentOutput)
    
    system_prompt = f"""You are the Voice Agent for high-risk transaction alerts.

YOUR ROLE:
- Simulate calling customers about high-risk transactions
- Gather customer approval or cancellation decisions
- Document customer responses and feedback

CALL PROCESS:
1. Identify yourself as bank representative
2. Describe the flagged transaction clearly
3. Ask for customer confirmation or cancellation
4. Record customer response and any additional notes

{parser.get_format_instructions()}

EXPECTED OUTPUT FORMAT:
Return a JSON object with "customer_response" containing the customer's decision."""

    return create_react_agent(llm, tools=[], state_modifier=system_prompt)

def create_email_agent():
    """Email agent for medium-risk transaction notifications"""
    
    parser = PydanticOutputParser(pydantic_object=EmailAgentOutput)
    
    system_prompt = f"""You are the Email Agent for medium-risk transaction notifications.

YOUR ROLE:
- Draft professional email notifications for medium-risk transactions
- Provide clear transaction details and security advice
- Include appropriate call-to-action for customer response

EMAIL CONTENT SHOULD INCLUDE:
- Professional greeting and bank identification
- Clear description of flagged transactions
- Security recommendations
- Contact information for questions
- Clear next steps for customer

{parser.get_format_instructions()}

EXPECTED OUTPUT FORMAT:
Return a JSON object with email details and transaction IDs notified."""

    return create_react_agent(llm, tools=[], state_modifier=system_prompt)

def create_sms_agent():
    """SMS agent for low-risk transaction notifications"""
    
    parser = PydanticOutputParser(pydantic_object=SMSAgentOutput)
    
    system_prompt = f"""You are the SMS Agent for low-risk transaction notifications.

YOUR ROLE:
- Create concise SMS messages for low-risk transaction alerts
- Keep messages brief but informative
- Include essential transaction details and bank contact info

SMS GUIDELINES:
- Maximum 160 characters per message
- Include transaction amount and merchant
- Add bank contact number
- Professional but concise tone

{parser.get_format_instructions()}

EXPECTED OUTPUT FORMAT:
Return a JSON object with SMS content and transaction IDs notified."""

    return create_react_agent(llm, tools=[], state_modifier=system_prompt)
MARKDOWN

🎯 Orchestration & Routing

The supervisor is the brain of our system. It:

  • Monitors the current state
  • Decides which agent should act next
  • Ensures proper workflow sequencing
  • Handles edge cases and errors
CODE β€’ Python
# =============================================================================
# SUPERVISOR NODE
# =============================================================================

def supervisor_node(state: TransactionReviewState) -> Command[Literal["name_interpreter", "risk_ranking", "voice_agent", "email_agent", "sms_agent", "finalize_results"]]:
    """Supervisor node that routes to appropriate worker based on workflow stage"""
    
    members = [
        "name_interpreter",
        "risk_ranking", 
        "voice_agent",
        "email_agent",
        "sms_agent"
    ]
    
    system_prompt = """You are the Transaction Review Orchestrator for a banking transaction monitoring system.

YOUR ROLE:
- Coordinate the complete transaction review workflow
- Route tasks to appropriate agents based on current processing stage
- Ensure proper sequencing of transaction analysis and customer notifications

AVAILABLE WORKERS: {members}

WORKFLOW SEQUENCE:
1. **name_interpreter** - Interpret and clarify transaction names (always first)
2. **risk_ranking** - Assess risk levels for all interpreted transactions
3. **voice_agent** - Handle high-risk transactions (if any high-risk found)
4. **email_agent** - Handle medium-risk transactions (if any medium-risk found)  
5. **sms_agent** - Handle low-risk transactions (if any low-risk found)
6. **finalize_results** - Generate final report (after all notifications sent)

ROUTING LOGIC:
- Always start with name_interpreter if not completed
- Proceed to risk_ranking after name interpretation
- After risk ranking, handle notifications based on risk levels found
- Only finalize when all applicable risk levels have been processed

Select one worker from: {members}, or "finalize_results" if workflow is complete.
Only respond with the worker name or "finalize_results"."""

    # Prepare workflow state context
    total_transactions = len(state.input_data.transactions)
    high_risk_count = len([r for r in state.risk_assessments if r.risk_level == "high"])
    medium_risk_count = len([r for r in state.risk_assessments if r.risk_level == "medium"])
    low_risk_count = len([r for r in state.risk_assessments if r.risk_level == "low"])
    
    state_context = f"""
CURRENT WORKFLOW STATE:
- Total Transactions: {total_transactions}
- Name Interpretation Completed: {state.name_interpretation_completed}
- Risk Ranking Completed: {state.risk_ranking_completed}
- High Risk Transactions: {high_risk_count} (Processed: {state.high_risk_processed})
- Medium Risk Transactions: {medium_risk_count} (Processed: {state.medium_risk_processed})  
- Low Risk Transactions: {low_risk_count} (Processed: {state.low_risk_processed})

PROCESSING STATUS:
- Interpreted Transactions: {len(state.interpreted_transactions)}
- Risk Assessments: {len(state.risk_assessments)}
- Customer Responses: {len(state.customer_responses)}
"""
    
    # Get routing decision from LLM
    routing_messages = [
        SystemMessage(content=system_prompt.format(members=members)),
        HumanMessage(content=state_context),
    ]
    
    if state.messages:
        routing_messages.extend(state.messages[-2:])
    
    response = llm.invoke(routing_messages)
    next_worker = response.content.strip()
    
    # Validate and route
    if next_worker == "finalize_results" or next_worker == "FINISH":
        return Command(goto="finalize_results")
    elif next_worker in members:
        return Command(goto=next_worker)
    else:
        # Fallback logic if LLM gives unexpected response
        if not state.name_interpretation_completed:
            return Command(goto="name_interpreter")
        elif not state.risk_ranking_completed:
            return Command(goto="risk_ranking")
        elif high_risk_count > 0 and not state.high_risk_processed:
            return Command(goto="voice_agent")
        elif medium_risk_count > 0 and not state.medium_risk_processed:
            return Command(goto="email_agent")
        elif low_risk_count > 0 and not state.low_risk_processed:
            return Command(goto="sms_agent")
        else:
            return Command(goto="finalize_results")
MARKDOWN

πŸ”„ Worker Node Implementations

Each worker node follows a similar pattern:

  1. Process the assigned task
  2. Update the state with results
  3. Return control to the supervisor
CODE β€’ Python
# =============================================================================
# WORKER NODES (Showing key implementations)
# =============================================================================

def name_interpreter_node(state: TransactionReviewState) -> Command[Literal["supervisor"]]:
    """Transaction Name Interpreter node"""
    agent = create_name_interpreter_agent()
    
    input_model = NameInterpreterInput(transactions=state.input_data.transactions)
    
    analysis_request = f"""
Analyze and interpret the following transaction names:

INPUT DATA:
{input_model.model_dump_json(indent=2)}

For each transaction, provide clear merchant identification, confidence score, and category.
Use web search for unclear merchant names.
"""
    
    result = agent.invoke({"messages": state.messages + [HumanMessage(content=analysis_request)]})
    
    # Parse results
    parser = PydanticOutputParser(pydantic_object=NameInterpreterOutput)
    interpreted_transactions = []
    
    try:
        last_message_content = result["messages"][-1].content
        parsed_output = parser.parse(last_message_content)
        interpreted_transactions = parsed_output.interpreted_transactions
        print(f"βœ… Successfully interpreted {len(interpreted_transactions)} transactions")
    except Exception as e:
        print(f"❌ Failed to parse name interpretation output: {e}")
        # Fallback: create basic interpretations
        for txn in state.input_data.transactions:
            interpreted_transactions.append(InterpretedTransaction(
                transaction=txn,
                interpreted_name=txn.transaction_name,
                confidence_score=0.5,
                interpretation_source="direct"
            ))
    
    summary_message = HumanMessage(content=f"""
Name Interpretation COMPLETED.
- Processed {len(state.input_data.transactions)} transactions
- Generated {len(interpreted_transactions)} interpretations
- Ready for risk assessment

NEXT STEP: Proceed to risk_ranking for transaction risk analysis.
""", name="name_interpreter")
    
    return Command(
        update={
            "messages": result["messages"] + [summary_message],
            "name_interpretation_completed": True,
            "interpreted_transactions": interpreted_transactions,
        },
        goto="supervisor"
    )

def risk_ranking_node(state: TransactionReviewState) -> Command[Literal["supervisor"]]:
    """Risk Ranking node with mock XGBoost model"""
    agent = create_risk_ranking_agent()
    
    # Use mock XGBoost model for each transaction
    risk_assessments = []
    for interpreted_txn in state.interpreted_transactions:
        risk_assessment = mock_xgboost_risk_model(interpreted_txn)
        risk_assessments.append(risk_assessment)
    
    # Count risk levels
    high_risk = len([r for r in risk_assessments if r.risk_level == "high"])
    medium_risk = len([r for r in risk_assessments if r.risk_level == "medium"])
    low_risk = len([r for r in risk_assessments if r.risk_level == "low"])
    
    print(f"πŸ” Risk Assessment Results:")
    print(f"   High Risk: {high_risk} transactions")
    print(f"   Medium Risk: {medium_risk} transactions") 
    print(f"   Low Risk: {low_risk} transactions")
    
    summary_message = HumanMessage(content=f"""
Risk Ranking COMPLETED.
- Analyzed {len(state.interpreted_transactions)} transactions
- Risk Distribution: {high_risk} high, {medium_risk} medium, {low_risk} low risk
- Ready for customer notifications based on risk levels

NEXT STEPS: Process notifications (voice for high risk, email for medium risk, SMS for low risk).
""", name="risk_ranking")
    
    return Command(
        update={
            "messages": [summary_message],
            "risk_ranking_completed": True,
            "risk_assessments": risk_assessments,
        },
        goto="supervisor"
    )

def finalize_results_node(state: TransactionReviewState):
    """Finalize and generate comprehensive transaction review report"""
    
    # Create transaction results
    transaction_results = []
    
    for interpreted_txn in state.interpreted_transactions:
        # Find corresponding risk assessment
        risk_assessment = next(
            (r for r in state.risk_assessments if r.transaction_id == interpreted_txn.transaction.transaction_id),
            None
        )
        
        # Find corresponding customer response (if any)
        customer_response = next(
            (r for r in state.customer_responses if r.transaction_id == interpreted_txn.transaction.transaction_id),
            None
        )
        
        # Determine final status
        if customer_response:
            if customer_response.response_type == "approved":
                final_status = "approved"
            elif customer_response.response_type == "cancelled":
                final_status = "cancelled"
            else:
                final_status = "pending"
        else:
            # Auto-approved for medium and low risk
            final_status = "auto_approved" if risk_assessment and risk_assessment.risk_level != "high" else "pending"
        
        transaction_results.append(TransactionReviewResult(
            transaction=interpreted_txn.transaction,
            interpreted_transaction=interpreted_txn,
            risk_assessment=risk_assessment,
            customer_response=customer_response,
            final_status=final_status
        ))
    
    # Generate summary report
    total_amount = sum(t.transaction.amount for t in state.interpreted_transactions)
    approved_amount = sum(
        t.transaction.amount for t in transaction_results 
        if t.final_status in ["approved", "auto_approved"]
    )
    
    # Count transactions by risk level
    transactions_by_risk = {
        "high": len([r for r in state.risk_assessments if r.risk_level == "high"]),
        "medium": len([r for r in state.risk_assessments if r.risk_level == "medium"]),
        "low": len([r for r in state.risk_assessments if r.risk_level == "low"])
    }
    
    summary_report = f"""
Transaction Review Report - {state.input_data.review_period}
Customer ID: {state.input_data.customer_id}

SUMMARY:
- Total Transactions Reviewed: {len(state.interpreted_transactions)}
- Total Transaction Amount: ${total_amount:,.2f}
- Approved Transaction Amount: ${approved_amount:,.2f}

RISK DISTRIBUTION:
- High Risk: {transactions_by_risk['high']} transactions
- Medium Risk: {transactions_by_risk['medium']} transactions  
- Low Risk: {transactions_by_risk['low']} transactions

ACTIONS TAKEN:
- Voice calls made: {len([r for r in state.customer_responses if r.response_method == 'voice'])}
- Email notifications sent: {1 if transactions_by_risk['medium'] > 0 else 0}
- SMS notifications sent: {1 if transactions_by_risk['low'] > 0 else 0}

CUSTOMER RESPONSES:
- Approved: {len([r for r in state.customer_responses if r.response_type == 'approved'])}
- Cancelled: {len([r for r in state.customer_responses if r.response_type == 'cancelled'])}
- No Response: {len([r for r in state.customer_responses if r.response_type == 'no_response'])}
"""
    
    # Create final output
    final_output = TransactionReviewOutput(
        customer_id=state.input_data.customer_id,
        review_period=state.input_data.review_period,
        total_transactions=len(state.interpreted_transactions),
        transactions_by_risk=transactions_by_risk,
        transaction_results=transaction_results,
        summary_report=summary_report
    )
    
    return {
        "final_output": final_output
    }
MARKDOWN

πŸš€ Building and Running the System

Now let’s put it all together and create the workflow graph:

CODE β€’ Python
# =============================================================================
# WORKFLOW GRAPH
# =============================================================================

def create_transaction_review_graph():
    """Create the Transaction Review workflow graph"""
    
    workflow = StateGraph(TransactionReviewState)
    
    # Add all nodes
    workflow.add_node("supervisor", supervisor_node)
    workflow.add_node("name_interpreter", name_interpreter_node)
    workflow.add_node("risk_ranking", risk_ranking_node)
    workflow.add_node("voice_agent", voice_agent_node)
    workflow.add_node("email_agent", email_agent_node)
    workflow.add_node("sms_agent", sms_agent_node)
    workflow.add_node("finalize_results", finalize_results_node)
    
    # Define the flow
    workflow.add_edge(START, "supervisor")
    workflow.add_edge("finalize_results", END)
    
    return workflow.compile()
MARKDOWN

πŸ“Š Sample Data Generation

Let’s create realistic test data to demonstrate our system:

CODE β€’ Python
def create_sample_transactions() -> List[Transaction]:
    """Create sample transaction data for testing"""
    sample_transactions = [
        Transaction(
            transaction_id="txn_001",
            date=date(2024, 1, 15),
            amount=25000.0,  # High amount - should trigger high risk
            transaction_name="CRYPTO EXCHANGE XYZ",
            account_number="****1234",
            metadata={"location": "Online", "category": "Financial"}
        ),
        Transaction(
            transaction_id="txn_002", 
            date=date(2024, 1, 16),
            amount=1500.0,  # Medium amount
            transaction_name="UNKNOWN MERCHANT ABC",
            account_number="****1234",
            metadata={"location": "Foreign", "category": "Unknown"}
        ),
        Transaction(
            transaction_id="txn_003",
            date=date(2024, 1, 17),
            amount=89.99,  # Low amount
            transaction_name="NIKE STORE",
            account_number="****1234",
            metadata={"location": "Local", "category": "Retail"}
        ),
        Transaction(
            transaction_id="txn_004",
            date=date(2024, 1, 18),
            amount=250.0,
            transaction_name="AMZN MARKETPLACE",
            account_number="****1234", 
            metadata={"location": "Online", "category": "E-commerce"}
        ),
        Transaction(
            transaction_id="txn_005",
            date=date(2024, 1, 19),  # Saturday - weekend risk factor
            amount=5000.0,  # Higher amount on weekend
            transaction_name="WIRE TRANSFER INTL",
            account_number="****1234",
            metadata={"location": "International", "category": "Transfer"}
        )
    ]
    
    return sample_transactions
MARKDOWN

🎯 Running the Complete System

Let’s execute our multi-agent system and see the results:

CODE β€’ Python
def run_transaction_review_example():
    """Example of how to run the Transaction Review System"""
    
    print("πŸš€ Starting Transaction Review System Demo")
    print("="*60)
    
    # Create sample data
    sample_transactions = create_sample_transactions()
    
    input_data = TransactionReviewInput(
        customer_id="CUST_12345",
        customer_phone="+1-555-0123",
        customer_email="customer@email.com",
        transactions=sample_transactions,
        review_period="2024-01"
    )
    
    # Initialize state
    initial_state = TransactionReviewState(input_data=input_data)
    
    # Create and run workflow
    workflow = create_transaction_review_graph()
    
    print(f"πŸ“Š Processing {len(sample_transactions)} transactions...")
    print("πŸ”„ Workflow started...\n")
    
    # Run the workflow
    final_state = workflow.invoke(initial_state)
    
    print("\n" + "="*80)
    print("🎯 TRANSACTION REVIEW COMPLETED")
    print("="*80)
    
    # Extract and display results
    final_output = final_state['final_output']
    
    print("\nπŸ“‹ SUMMARY REPORT:")
    print("-" * 50)
    print(final_output.summary_report)
    
    print("\nπŸ“Š DETAILED TRANSACTION RESULTS:")
    print("-" * 80)
    
    for i, result in enumerate(final_output.transaction_results, 1):
        risk_color = "πŸ”΄" if result.risk_assessment.risk_level == "high" else "🟑" if result.risk_assessment.risk_level == "medium" else "🟒"
        status_icon = "βœ…" if result.final_status in ["approved", "auto_approved"] else "❌" if result.final_status == "cancelled" else "⏳"
        
        print(f"\n{i}. Transaction {result.transaction.transaction_id}")
        print(f"   {risk_color} Risk Level: {result.risk_assessment.risk_level.upper()}")
        print(f"   πŸ’° Amount: ${result.transaction.amount:,.2f}")
        print(f"   πŸͺ Merchant: {result.interpreted_transaction.interpreted_name}")
        print(f"   {status_icon} Status: {result.final_status.replace('_', ' ').title()}")
        if result.customer_response:
            print(f"   πŸ“ž Customer Response: {result.customer_response.response_type} via {result.customer_response.response_method}")
    
    print("\n" + "="*80)
    print("✨ Transaction Review System Demo Complete!")
    print("="*80)
    
    return final_state

# Run the system
if __name__ == "__main__":
    result = run_transaction_review_example()

πŸš€ Starting Transaction Review System Demo

πŸ“Š Processing 5 transactions… πŸ”„ Workflow started…

βœ… Successfully interpreted 5 transactions πŸ” Risk Assessment Results: High Risk: 2 transactions Medium Risk: 2 transactions Low Risk: 1 transactions πŸ“ž Voice Agent: Processed 2 high-risk transactions πŸ“§ Email Agent: Processed 2 medium-risk transactions πŸ“± SMS Agent: Processed 1 low-risk transactions

================================================================================ 🎯 TRANSACTION REVIEW COMPLETED ================================================================================

πŸ“‹ SUMMARY REPORT:

Transaction Review Report - 2024-01 Customer ID: CUST_12345

SUMMARY:

  • Total Transactions Reviewed: 5
  • Total Transaction Amount: $31,839.99
  • Approved Transaction Amount: $26,589.99

RISK DISTRIBUTION:

  • High Risk: 2 transactions
  • Medium Risk: 2 transactions
  • Low Risk: 1 transactions

ACTIONS TAKEN:

  • Voice calls made: 2
  • Email notifications sent: 1
  • SMS notifications sent: 1

CUSTOMER RESPONSES:

  • Approved: 1
  • Cancelled: 1
  • No Response: 0

πŸ“Š DETAILED TRANSACTION RESULTS:

  1. Transaction txn_001 πŸ”΄ Risk Level: HIGH πŸ’° Amount: $25,000.00 πŸͺ Merchant: CRYPTO EXCHANGE XYZ ❌ Status: Cancelled πŸ“ž Customer Response: cancelled via voice

  2. Transaction txn_002 🟑 Risk Level: MEDIUM πŸ’° Amount: $1,500.00 πŸͺ Merchant: Unknown merchant: UNKNOWN MERCHANT ABC - Online retailer or service provider βœ… Status: Auto Approved

  3. Transaction txn_003 🟒 Risk Level: LOW πŸ’° Amount: $89.99 πŸͺ Merchant: NIKE STORE βœ… Status: Auto Approved

  4. Transaction txn_004 🟒 Risk Level: LOW πŸ’° Amount: $250.00 πŸͺ Merchant: Amazon - E-commerce platform βœ… Status: Auto Approved

  5. Transaction txn_005 πŸ”΄ Risk Level: HIGH πŸ’° Amount: $5,000.00 πŸͺ Merchant: WIRE TRANSFER INTL βœ… Status: Approved πŸ“ž Customer Response: approved via voice

================================================================================ ✨ Transaction Review System Demo Complete! ================================================================================

MARKDOWN

πŸš€ Extensions & Improvements

This system can be extended in numerous ways:

1. Production-Ready Enhancements

  • Replace mock tools with real APIs
  • Implement actual ML models for risk scoring
  • Add database persistence for audit trails
  • Integrate with real communication platforms

2. Additional Agents

  • Fraud Pattern Detector: Analyzes transaction sequences
  • Customer Profile Agent: Considers customer history
  • Regulatory Compliance Agent: Ensures legal requirements
  • Dispute Resolution Agent: Handles contested transactions

3. Advanced Features

  • Real-time streaming: Process transactions as they occur
  • Batch optimization: Group similar transactions
  • Multi-language support: Communicate in customer’s preferred language
  • Adaptive risk models: Learn from customer responses

4. Integration Possibilities

  • Banking Core Systems: Direct integration with account management
  • CRM Platforms: Customer relationship tracking
  • Analytics Dashboards: Real-time monitoring and KPIs
  • Regulatory Reporting: Automated compliance reports
MARKDOWN

πŸ“š Key Learnings & Best Practices

Architecture Benefits

  1. Modularity: Each agent has a single, clear responsibility
  2. Scalability: Easy to add new agents or modify existing ones
  3. Maintainability: Changes to one agent don’t affect others
  4. Testability: Each agent can be tested independently

LangGraph Advantages

  1. State Management: Built-in state tracking across agents
  2. Visual Workflow: Clear understanding of execution flow
  3. Error Handling: Graceful failure and recovery
  4. Flexibility: Dynamic routing based on conditions

Implementation Tips

  1. Clear Interfaces: Define precise input/output models
  2. Comprehensive Logging: Track decisions and actions
  3. Graceful Degradation: Handle failures without crashing
  4. Performance Monitoring: Track execution times and bottlenecks
MARKDOWN

🎯 Conclusion

This multi-agent system demonstrates how LangGraph can be used to build sophisticated, production-ready applications. By breaking down complex workflows into specialized agents, we achieve:

  • Better organization of complex business logic
  • Improved maintainability through modular design
  • Enhanced scalability with independent agents
  • Clear auditability of decisions and actions

The banking transaction review use case showcases real-world applicability, but this pattern can be adapted to many domains:

  • Healthcare claim processing
  • Insurance underwriting
  • Supply chain management
  • Content moderation
  • Customer service automation

The combination of LangGraph’s orchestration capabilities with LLM-powered agents creates a powerful framework for building the next generation of intelligent applications.

πŸ“₯ Download This Project

Get the complete code to run locally or adapt for your own use case:

⬇ Download as Python (.py) πŸ““ Download as Jupyter Notebook (.ipynb)