MCP (Model Context Protocol): The Complete Developer Guide

Master MCP to build context-aware AI applications that seamlessly connect LLMs with external tools, data sources, and systems

📚 Table of Contents

  1. Introduction to MCP
  2. Core Concepts
  3. Architecture Overview
  4. Getting Started
  5. Building MCP Servers
  6. Creating MCP Clients
  7. Resources and Tools
  8. Prompts and Sampling
  9. Advanced Features
  10. Testing and Debugging
  11. Security Best Practices
  12. Deployment Guide
  13. Real-World Examples
  14. Troubleshooting
  15. Additional Resources

1. What is MCP?

The Simple Explanation

Model Context Protocol (MCP) is an open protocol that provides a standard way for AI assistants (like Claude, ChatGPT, or your custom LLM app) to connect with external data sources and tools.

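Think of MCP as a universal adapter that lets AI models reach external data sources and tools through one standard interface instead of a separate custom integration for each.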

Visual Concept

Without MCP:
┌─────────┐     Custom API 1    ┌──────────┐
│   LLM   │◄───────────────────►│  Tool 1  │
└─────────┘                     └──────────┘
     │          Custom API 2    ┌──────────┐
     └─────────────────────────►│  Tool 2  │
                                └──────────┘

With MCP:
┌─────────┐     Standard MCP     ┌────────────┐
│   LLM   │◄────────────────────►│ MCP Server │
└─────────┘                      └────────────┘
                                       │
                    ┌──────────────────┼──────────────────┐
                    ▼                  ▼                  ▼
                ┌────────┐         ┌────────┐         ┌────────┐
                │ Tool 1 │         │ Tool 2 │         │Database│
                └────────┘         └────────┘         └────────┘

Key Components

  1. MCP Servers: Expose resources, tools, and prompts to AI models
  2. MCP Clients: AI applications that connect to MCP servers
  3. Resources: Data sources like files, databases, APIs
  4. Tools: Functions the AI can call (search, calculate, transform)
  5. Prompts: Pre-configured templates for common tasks

2. Why MCP Changes Everything

Before MCP: The Problems

  1. Every integration is custom: Each tool needs its own API integration
  2. No standardization: Every developer reinvents the wheel
  3. Limited context: AI can't easily access your local data
  4. Poor reusability: Can't share integrations between AI apps

After MCP: The Solutions

  1. Write once, use everywhere: One MCP server works with any MCP client
  2. Standardized protocol: Common patterns for all integrations
  3. Rich context: AI can access any data you expose
  4. Community ecosystem: Share and reuse MCP servers

Real Impact Examples

3. Core Concepts & Architecture

The MCP Protocol Stack

┌──────────────────────────────────────────────┐
│            Application Layer                 │
│         (Your AI App / Claude Desktop)       │
├──────────────────────────────────────────────┤
│            MCP Client Layer                  │
│    (Handles protocol, connection, auth)      │
├──────────────────────────────────────────────┤
│           Transport Layer                    │
│     (stdio, HTTP, WebSocket - planned)       │
├──────────────────────────────────────────────┤
│            MCP Server Layer                  │
│    (Exposes resources, tools, prompts)       │
├──────────────────────────────────────────────┤
│           Resource Layer                     │
│   (Files, DBs, APIs, External Services)      │
└──────────────────────────────────────────────┘

Core Concepts Explained

1. Resources

Resources are data sources your AI can read from:

// Example resource definition
{
  uri: "file:///home/user/projects/data.json",
  name: "Project Data",
  description: "Current project configuration and data",
  mimeType: "application/json"
}
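
As a rough illustration, a descriptor like the one above can be derived from a file path. This is a minimal Python sketch using only the standard library; `describe_resource` is a hypothetical helper and not part of any MCP SDK, and the fallback to `text/plain` is an assumption.

```python
import mimetypes
from pathlib import PurePosixPath


def describe_resource(path: PurePosixPath, name: str, description: str) -> dict:
    """Build a resource descriptor with the same four fields as the example."""
    mime_type, _encoding = mimetypes.guess_type(path.name)
    return {
        # as_uri() requires an absolute path and produces a file:// URI.
        "uri": path.as_uri(),
        "name": name,
        "description": description,
        # Assumption: treat unknown extensions as plain text.
        "mimeType": mime_type or "text/plain",
    }


if __name__ == "__main__":
    resource = describe_resource(
        PurePosixPath("/home/user/projects/data.json"),
        name="Project Data",
        description="Current project configuration and data",
    )
    print(resource)
```

`PurePosixPath` keeps the sketch independent of the operating system it runs on; a real server would more likely work with `pathlib.Path` objects for files that exist on disk.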

2. Tools

Tools are functions your AI can execute:

// Example tool definition
{
  name: "calculate_metrics",
  description: "Calculate statistics for a dataset",
  inputSchema: {
    type: "object",
    properties: {
      dataset: { type: "string" },
      metrics: { type: "array", items: { type: "string" } }
    }
  }
}
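
The `inputSchema` is what lets a server reject malformed calls before running anything. The sketch below shows the idea in Python with a deliberately tiny checker; it is a stand-in for a real JSON Schema validator, `check_arguments` is a hypothetical helper, and the `required` list is an assumption added here because the definition above does not declare one.

```python
# The tool's input schema from above, plus an assumed "required" list so the
# sketch has something to enforce.
CALCULATE_METRICS_SCHEMA = {
    "type": "object",
    "properties": {
        "dataset": {"type": "string"},
        "metrics": {"type": "array", "items": {"type": "string"}},
    },
    "required": ["dataset"],
}

# Small subset of JSON Schema type names mapped to Python types.
JSON_TYPES = {"string": str, "array": list, "object": dict, "boolean": bool}


def check_arguments(schema: dict, arguments: dict) -> list:
    """Return a list of problems; an empty list means the arguments pass."""
    problems = []
    for key in schema.get("required", []):
        if key not in arguments:
            problems.append(f"missing required argument: {key}")
    properties = schema.get("properties", {})
    for key, value in arguments.items():
        expected = JSON_TYPES.get(properties.get(key, {}).get("type"))
        if expected is not None and not isinstance(value, expected):
            problems.append(f"{key}: expected {properties[key]['type']}")
    return problems


if __name__ == "__main__":
    print(check_arguments(CALCULATE_METRICS_SCHEMA, {"dataset": "sales", "metrics": ["mean"]}))
    print(check_arguments(CALCULATE_METRICS_SCHEMA, {"metrics": "mean"}))
```

This only checks required keys and top-level types. Nested rules such as `items` are ignored, which is why a full validator is the better choice outside of a sketch.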

3. Prompts

Prompts are reusable templates:

// Example prompt definition
{
  name: "analyze_code",
  description: "Analyze code for improvements",
  arguments: [
    {
      name: "language",
      description: "Programming language",
      required: true
    }
  ]
}
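
To make "reusable template" concrete, here is a minimal Python sketch of how a server might turn this definition and a set of arguments into a message. `render_prompt` is a hypothetical helper and the template wording is invented for illustration; the returned shape (a description plus a list of role/content messages) mirrors the prompt results used in the Python server later in this guide.

```python
ANALYZE_CODE_PROMPT = {
    "name": "analyze_code",
    "description": "Analyze code for improvements",
    "arguments": [
        {"name": "language", "description": "Programming language", "required": True},
    ],
}


def render_prompt(prompt: dict, arguments: dict) -> dict:
    """Check required arguments, then fill in the (assumed) template text."""
    missing = [
        arg["name"]
        for arg in prompt["arguments"]
        if arg.get("required") and arg["name"] not in arguments
    ]
    if missing:
        raise ValueError(f"missing required prompt arguments: {', '.join(missing)}")

    # Assumption: the template wording below is illustrative only.
    text = f"Analyze the following {arguments['language']} code for improvements."
    return {
        "description": prompt["description"],
        "messages": [
            {"role": "user", "content": {"type": "text", "text": text}},
        ],
    }


if __name__ == "__main__":
    print(render_prompt(ANALYZE_CODE_PROMPT, {"language": "Python"}))
```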

Message Flow

1. Client → Server: Initialize connection
2. Server → Client: Capabilities announcement
3. Client → Server: List available resources
4. Server → Client: Resource list
5. Client → Server: Execute tool with parameters
6. Server → Client: Tool execution result
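
On the wire, these steps are JSON-RPC 2.0 messages. The Python sketch below builds the client-side half of the flow (steps 1, 3, and 5), plus the `notifications/initialized` message a client sends once the handshake completes. The client name, version, and tool arguments are placeholders, and the protocol version string is one published revision used here as an example; a real client would normally let the SDK construct these messages.

```python
import json
from typing import Optional


def request(msg_id: int, method: str, params: Optional[dict] = None) -> dict:
    """Build a JSON-RPC 2.0 request; the server replies using the same id."""
    message = {"jsonrpc": "2.0", "id": msg_id, "method": method}
    if params is not None:
        message["params"] = params
    return message


def notification(method: str) -> dict:
    """Build a JSON-RPC 2.0 notification (no id, so no reply is expected)."""
    return {"jsonrpc": "2.0", "method": method}


def client_messages() -> list:
    """Messages the client sends, in order."""
    return [
        request(1, "initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            # Placeholder client identity.
            "clientInfo": {"name": "example-client", "version": "0.1.0"},
        }),
        notification("notifications/initialized"),
        request(2, "resources/list"),
        request(3, "tools/call", {
            "name": "calculate_metrics",
            "arguments": {"dataset": "sales", "metrics": ["mean"]},
        }),
    ]


if __name__ == "__main__":
    for message in client_messages():
        print(json.dumps(message))
```

Each server reply (steps 2, 4, and 6) carries the `id` of the request it answers, which is how a client matches results to calls.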

4. Complete Setup Guide

Prerequisites

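Before starting, ensure you have Node.js and npm (for the TypeScript option) or Python with pip (for the Python option). Claude Desktop is only needed if you want to connect your server to it.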

Option 1: TypeScript/JavaScript Setup

# Create a new project
mkdir my-mcp-server
cd my-mcp-server

# Initialize package.json
npm init -y

# Install MCP SDK
npm install @modelcontextprotocol/sdk

# Install development dependencies
npm install -D typescript @types/node tsx

# Initialize TypeScript
npx tsc --init

Replace the contents of the tsconfig.json generated by the previous command with:

{
  "compilerOptions": {
    "target": "ES2022",
    "module": "Node16",
    "moduleResolution": "Node16",
    "lib": ["ES2022"],
    "outDir": "./dist",
    "rootDir": "./src",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "resolveJsonModule": true,
    "declaration": true,
    "declarationMap": true
  },
  "include": ["src/**/*"],
  "exclude": ["node_modules", "dist"]
}

Option 2: Python Setup

# Create a new project
mkdir my-mcp-server
cd my-mcp-server

# Create virtual environment
python -m venv venv

# Activate virtual environment
# On Windows:
venv\Scripts\activate
# On macOS/Linux:
source venv/bin/activate

# Install MCP SDK
pip install mcp

# Install additional dependencies
pip install python-dotenv aiofiles
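
A quick way to confirm the virtual environment picked up these packages is to query their installed versions. This is a small standard-library sketch; `installed_version` is a hypothetical helper name.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # The three distributions installed by the commands above.
    for package in ("mcp", "python-dotenv", "aiofiles"):
        version = installed_version(package)
        print(f"{package}: {version or 'not installed'}")
```

Run it with the virtual environment activated; anything reported as "not installed" usually means pip ran against a different interpreter.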

Setting Up Claude Desktop

  1. Install Claude Desktop from claude.ai

  2. Configure MCP by editing the config file:

macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
Windows: %APPDATA%\Claude\claude_desktop_config.json

{
  "mcpServers": {
    "my-server": {
      "command": "node",
      "args": ["/absolute/path/to/your/server.js"]
    }
  }
}
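
Once several servers are involved, it can be easier to generate this file than to edit it by hand. The Python sketch below adds entries under `mcpServers` without touching existing ones. `add_server` is a hypothetical helper, the paths are the same placeholders used above, and the `env` block for passing environment variables to the server process is shown on the assumption that your Claude Desktop version supports it.

```python
import json
from typing import Optional


def add_server(
    config: dict,
    name: str,
    command: str,
    args: list,
    env: Optional[dict] = None,
) -> dict:
    """Return a copy of `config` with one more entry under "mcpServers"."""
    servers = dict(config.get("mcpServers", {}))
    entry = {"command": command, "args": list(args)}
    if env:
        entry["env"] = dict(env)
    servers[name] = entry
    return {**config, "mcpServers": servers}


if __name__ == "__main__":
    config = {
        "mcpServers": {
            "my-server": {
                "command": "node",
                "args": ["/absolute/path/to/your/server.js"],
            }
        }
    }
    # Register a Python server next to the Node one.
    config = add_server(
        config, "database-server", "python", ["/absolute/path/to/your/server.py"]
    )
    # Register a server that needs an API key in its environment.
    config = add_server(
        config,
        "weather-server",
        "node",
        ["/absolute/path/to/your/weather-server.js"],
        env={"OPENWEATHER_API_KEY": "your_api_key_here"},
    )
    print(json.dumps(config, indent=2))
```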

5. Building Your First MCP Server

Example 1: Basic File System Server (TypeScript)

Create src/server.ts:

#!/usr/bin/env node
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  CallToolRequestSchema,
  ErrorCode,
  ListResourcesRequestSchema,
  ListToolsRequestSchema,
  McpError,
  ReadResourceRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import fs from 'fs/promises';
import path from 'path';

// Define the base directory for file operations
const BASE_DIR = process.env.MCP_BASE_DIR || process.cwd();

// Create server instance
const server = new Server(
  {
    name: 'filesystem-server',
    version: '1.0.0',
  },
  {
    capabilities: {
      resources: {},
      tools: {},
    },
  }
);

// List available resources (files in the directory)
server.setRequestHandler(ListResourcesRequestSchema, async () => {
  try {
    const files = await fs.readdir(BASE_DIR);
    const resources = await Promise.all(
      files.map(async (file) => {
        const filePath = path.join(BASE_DIR, file);
        const stats = await fs.stat(filePath);
        
        if (stats.isFile()) {
          return {
            uri: `file://${filePath}`,
            name: file,
            description: `File: ${file} (${stats.size} bytes)`,
            mimeType: getMimeType(file),
          };
        }
        return null;
      })
    );

    return {
      resources: resources.filter(r => r !== null),
    };
  } catch (error) {
    throw new McpError(
      ErrorCode.InternalError,
      `Failed to list resources: ${error}`
    );
  }
});

// Read resource content
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const { uri } = request.params;
  
  if (!uri.startsWith('file://')) {
    throw new McpError(
      ErrorCode.InvalidRequest,
      'Only file:// URIs are supported'
    );
  }

  const filePath = uri.slice(7); // Remove 'file://'
  
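  // Security check: ensure the file is within BASE_DIR.
  // A plain startsWith() prefix test would also accept sibling paths such as
  // "<BASE_DIR>-backup", so compare via the relative path instead.
  const resolvedPath = path.resolve(filePath);
  const resolvedBase = path.resolve(BASE_DIR);
  const relativePath = path.relative(resolvedBase, resolvedPath);
  
  if (
    relativePath === '..' ||
    relativePath.startsWith(`..${path.sep}`) ||
    path.isAbsolute(relativePath)
  ) {
    throw new McpError(
      ErrorCode.InvalidRequest,
      'Access denied: file outside base directory'
    );
  }

  try {
    const content = await fs.readFile(resolvedPath, 'utf-8');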
    return {
      contents: [
        {
          uri,
          mimeType: getMimeType(filePath),
          text: content,
        },
      ],
    };
  } catch (error) {
    throw new McpError(
      ErrorCode.InternalError,
      `Failed to read file: ${error}`
    );
  }
});

// List available tools
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: 'create_file',
        description: 'Create a new file with specified content',
        inputSchema: {
          type: 'object',
          properties: {
            filename: {
              type: 'string',
              description: 'Name of the file to create',
            },
            content: {
              type: 'string',
              description: 'Content to write to the file',
            },
          },
          required: ['filename', 'content'],
        },
      },
      {
        name: 'search_files',
        description: 'Search for files containing specific text',
        inputSchema: {
          type: 'object',
          properties: {
            query: {
              type: 'string',
              description: 'Text to search for',
            },
            extension: {
              type: 'string',
              description: 'File extension to filter (optional)',
            },
          },
          required: ['query'],
        },
      },
    ],
  };
});

// Handle tool execution
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  switch (name) {
    case 'create_file': {
      const { filename, content } = args as { filename: string; content: string };
      const filePath = path.join(BASE_DIR, filename);
      
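      // Security check: reject paths that resolve outside BASE_DIR.
      // (A startsWith() prefix test would let "<BASE_DIR>-backup" through.)
      const resolvedPath = path.resolve(filePath);
      const resolvedBase = path.resolve(BASE_DIR);
      const relativePath = path.relative(resolvedBase, resolvedPath);
      
      if (
        relativePath === '..' ||
        relativePath.startsWith(`..${path.sep}`) ||
        path.isAbsolute(relativePath)
      ) {
        throw new McpError(
          ErrorCode.InvalidRequest,
          'Cannot create file outside base directory'
        );
      }

      try {
        await fs.writeFile(resolvedPath, content, 'utf-8');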
        return {
          content: [
            {
              type: 'text',
              text: `Successfully created file: ${filename}`,
            },
          ],
        };
      } catch (error) {
        throw new McpError(
          ErrorCode.InternalError,
          `Failed to create file: ${error}`
        );
      }
    }

    case 'search_files': {
      const { query, extension } = args as { query: string; extension?: string };
      
      try {
        const results = await searchFiles(BASE_DIR, query, extension);
        return {
          content: [
            {
              type: 'text',
              text: results.length > 0
                ? `Found ${results.length} matches:\n${results.join('\n')}`
                : 'No matches found',
            },
          ],
        };
      } catch (error) {
        throw new McpError(
          ErrorCode.InternalError,
          `Search failed: ${error}`
        );
      }
    }

    default:
      throw new McpError(
        ErrorCode.MethodNotFound,
        `Unknown tool: ${name}`
      );
  }
});

// Helper functions
function getMimeType(filename: string): string {
  const ext = path.extname(filename).toLowerCase();
  const mimeTypes: { [key: string]: string } = {
    '.txt': 'text/plain',
    '.json': 'application/json',
    '.js': 'application/javascript',
    '.ts': 'application/typescript',
    '.py': 'text/x-python',
    '.md': 'text/markdown',
    '.html': 'text/html',
    '.css': 'text/css',
    '.csv': 'text/csv',
    '.xml': 'application/xml',
    '.yaml': 'application/yaml',
    '.yml': 'application/yaml',
  };
  return mimeTypes[ext] || 'text/plain';
}

async function searchFiles(
  dir: string,
  query: string,
  extension?: string
): Promise<string[]> {
  const results: string[] = [];
  const files = await fs.readdir(dir);

  for (const file of files) {
    const filePath = path.join(dir, file);
    const stats = await fs.stat(filePath);

    if (stats.isFile()) {
      if (extension && !file.endsWith(extension)) {
        continue;
      }

      try {
        const content = await fs.readFile(filePath, 'utf-8');
        if (content.toLowerCase().includes(query.toLowerCase())) {
          results.push(`${file}: ${getMatchContext(content, query)}`);
        }
      } catch (error) {
        // Skip files that can't be read
      }
    }
  }

  return results;
}

function getMatchContext(content: string, query: string): string {
  const index = content.toLowerCase().indexOf(query.toLowerCase());
  if (index === -1) return '';
  
  const start = Math.max(0, index - 30);
  const end = Math.min(content.length, index + query.length + 30);
  
  return '...' + content.slice(start, end).replace(/\n/g, ' ') + '...';
}

// Start the server
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error('MCP Server running on stdio');
}

main().catch((error) => {
  console.error('Server error:', error);
  process.exit(1);
});
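
The base-directory check is the part of a file server most worth testing in isolation, because a plain string-prefix comparison also accepts sibling directories (for example `/srv/data-backup` when the base is `/srv/data`). Here is a minimal Python sketch of a containment check that compares path components instead; `resolve_inside` is a hypothetical helper, not part of any MCP SDK.

```python
import tempfile
from pathlib import Path


def resolve_inside(base_dir: Path, requested: str) -> Path:
    """Resolve `requested` against `base_dir`, refusing anything outside it."""
    base = base_dir.resolve()
    # Joining an absolute `requested` discards `base`; the check below catches that.
    candidate = (base / requested).resolve()
    # Compare whole path components rather than string prefixes.
    if candidate != base and base not in candidate.parents:
        raise PermissionError(f"{requested!r} resolves outside {base}")
    return candidate


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        base = Path(tmp) / "data"
        base.mkdir()
        print("allowed:", resolve_inside(base, "notes/todo.txt"))
        for attempt in ("../data-backup/secret.txt", "/etc/passwd"):
            try:
                resolve_inside(base, attempt)
            except PermissionError as exc:
                print("rejected:", exc)
```

Because both paths are resolved first, `..` segments and symlinked base directories are normalized before the comparison is made.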

Example 2: Database Query Server (Python)

Create server.py:

#!/usr/bin/env python3
import asyncio
import json
import sqlite3
from typing import Any, Dict, List, Optional
import mcp.types as types
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
import mcp.server.stdio

# Initialize SQLite connection
DB_PATH = "example.db"

def init_database():
    """Initialize example database with sample data"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    # Create tables
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            price REAL NOT NULL,
            stock INTEGER DEFAULT 0
        )
    ''')
    
    # Insert sample data if tables are empty
    cursor.execute("SELECT COUNT(*) FROM users")
    if cursor.fetchone()[0] == 0:
        sample_users = [
            ("Alice Johnson", "alice@example.com"),
            ("Bob Smith", "bob@example.com"),
            ("Charlie Brown", "charlie@example.com")
        ]
        cursor.executemany("INSERT INTO users (name, email) VALUES (?, ?)", sample_users)
        
        sample_products = [
            ("Laptop", 999.99, 10),
            ("Mouse", 29.99, 50),
            ("Keyboard", 79.99, 30),
            ("Monitor", 299.99, 15)
        ]
        cursor.executemany("INSERT INTO products (name, price, stock) VALUES (?, ?, ?)", sample_products)
    
    conn.commit()
    conn.close()

class DatabaseServer:
    def __init__(self):
        self.server = Server("database-server")
        init_database()
        self._setup_handlers()
    
    def _setup_handlers(self):
        @self.server.list_resources()
        async def handle_list_resources() -> list[types.Resource]:
            return [
                types.Resource(
                    uri="db://users",
                    name="Users Table",
                    description="User information database",
                    mimeType="application/json",
                ),
                types.Resource(
                    uri="db://products",
                    name="Products Table",
                    description="Product inventory database",
                    mimeType="application/json",
                ),
            ]
        
        @self.server.read_resource()
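        async def handle_read_resource(uri: str) -> str:
            # Depending on the SDK version, `uri` may arrive as a pydantic URL
            # object rather than a plain str, so normalize before comparing.
            uri = str(uri)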
            conn = sqlite3.connect(DB_PATH)
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            
            try:
                if uri == "db://users":
                    cursor.execute("SELECT * FROM users")
                    rows = cursor.fetchall()
                    data = [dict(row) for row in rows]
                    return json.dumps(data, indent=2)
                
                elif uri == "db://products":
                    cursor.execute("SELECT * FROM products")
                    rows = cursor.fetchall()
                    data = [dict(row) for row in rows]
                    return json.dumps(data, indent=2)
                
                else:
                    raise ValueError(f"Unknown resource: {uri}")
            
            finally:
                conn.close()
        
        @self.server.list_tools()
        async def handle_list_tools() -> list[types.Tool]:
            return [
                types.Tool(
                    name="query_database",
                    description="Execute a SELECT query on the database",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "SQL SELECT query to execute",
                            },
                        },
                        "required": ["query"],
                    },
                ),
                types.Tool(
                    name="add_user",
                    description="Add a new user to the database",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "name": {
                                "type": "string",
                                "description": "User's full name",
                            },
                            "email": {
                                "type": "string",
                                "description": "User's email address",
                            },
                        },
                        "required": ["name", "email"],
                    },
                ),
                types.Tool(
                    name="update_product_stock",
                    description="Update product stock quantity",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "product_id": {
                                "type": "integer",
                                "description": "Product ID",
                            },
                            "stock": {
                                "type": "integer",
                                "description": "New stock quantity",
                            },
                        },
                        "required": ["product_id", "stock"],
                    },
                ),
            ]
        
        @self.server.call_tool()
        async def handle_call_tool(
            name: str, arguments: Optional[Dict[str, Any]]
        ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
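            # Tool calls may arrive without arguments; treat that as an empty dict.
            arguments = arguments or {}
            conn = sqlite3.connect(DB_PATH)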
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            
            try:
                if name == "query_database":
                    query = arguments.get("query", "")
                    
                    # Security: Only allow SELECT queries
                    if not query.strip().upper().startswith("SELECT"):
                        return [types.TextContent(
                            type="text",
                            text="Error: Only SELECT queries are allowed"
                        )]
                    
                    try:
                        cursor.execute(query)
                        rows = cursor.fetchall()
                        
                        if rows:
                            # Convert rows to list of dicts
                            data = [dict(row) for row in rows]
                            result = json.dumps(data, indent=2)
                            
                            return [types.TextContent(
                                type="text",
                                text=f"Query executed successfully. Results:\n{result}"
                            )]
                        else:
                            return [types.TextContent(
                                type="text",
                                text="Query executed successfully. No results found."
                            )]
                    
                    except sqlite3.Error as e:
                        return [types.TextContent(
                            type="text",
                            text=f"Query error: {str(e)}"
                        )]
                
                elif name == "add_user":
                    # Use a separate variable so the tool name in `name` is not shadowed.
                    user_name = arguments.get("name", "")
                    email = arguments.get("email", "")
                    
                    try:
                        cursor.execute(
                            "INSERT INTO users (name, email) VALUES (?, ?)",
                            (user_name, email)
                        )
                        conn.commit()
                        
                        return [types.TextContent(
                            type="text",
                            text=f"Successfully added user: {user_name} ({email})"
                        )]
                    
                    except sqlite3.IntegrityError:
                        return [types.TextContent(
                            type="text",
                            text=f"Error: User with email {email} already exists"
                        )]
                
                elif name == "update_product_stock":
                    product_id = arguments.get("product_id", 0)
                    stock = arguments.get("stock", 0)
                    
                    cursor.execute(
                        "UPDATE products SET stock = ? WHERE id = ?",
                        (stock, product_id)
                    )
                    
                    if cursor.rowcount > 0:
                        conn.commit()
                        return [types.TextContent(
                            type="text",
                            text=f"Successfully updated stock for product ID {product_id} to {stock}"
                        )]
                    else:
                        return [types.TextContent(
                            type="text",
                            text=f"Error: Product with ID {product_id} not found"
                        )]
                
                else:
                    return [types.TextContent(
                        type="text",
                        text=f"Unknown tool: {name}"
                    )]
            
            finally:
                conn.close()
        
        @self.server.list_prompts()
        async def handle_list_prompts() -> list[types.Prompt]:
            return [
                types.Prompt(
                    name="analyze_sales",
                    description="Generate SQL query to analyze sales data",
                    arguments=[
                        types.PromptArgument(
                            name="time_period",
                            description="Time period for analysis (e.g., 'last month', 'Q1 2024')",
                            required=True,
                        ),
                    ],
                ),
                types.Prompt(
                    name="inventory_report",
                    description="Generate inventory status report",
                    arguments=[
                        types.PromptArgument(
                            name="threshold",
                            description="Low stock threshold",
                            required=False,
                        ),
                    ],
                ),
            ]
        
        @self.server.get_prompt()
        async def handle_get_prompt(
            name: str, arguments: Optional[Dict[str, str]]
        ) -> types.GetPromptResult:
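            # Prompts may be requested without arguments; treat that as an empty dict.
            arguments = arguments or {}
            if name == "analyze_sales":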
                time_period = arguments.get("time_period", "all time")
                return types.GetPromptResult(
                    description="Generate sales analysis query",
                    messages=[
                        types.PromptMessage(
                            role="user",
                            content=types.TextContent(
                                type="text",
                                text=f"Generate a SQL query to analyze product sales for {time_period}. Include total revenue, units sold, and top-selling products."
                            ),
                        )
                    ],
                )
            
            elif name == "inventory_report":
                threshold = arguments.get("threshold", "10")
                return types.GetPromptResult(
                    description="Generate inventory report",
                    messages=[
                        types.PromptMessage(
                            role="user",
                            content=types.TextContent(
                                type="text",
                                text=f"Create an inventory report showing all products with stock below {threshold} units. Include current stock levels and suggested reorder quantities."
                            ),
                        )
                    ],
                )
            
            else:
                raise ValueError(f"Unknown prompt: {name}")
    
    async def run(self):
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="database-server",
                    server_version="1.0.0",
                    capabilities=self.server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )

if __name__ == "__main__":
    server = DatabaseServer()
    asyncio.run(server.run())
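
The query_database tool relies on a string check to keep queries read-only. As a defense-in-depth sketch, the connection used for that tool can also be opened in SQLite's read-only mode, so the engine itself refuses writes. `connect_read_only` is a hypothetical helper, and the throwaway database below exists only to demonstrate the behavior.

```python
import sqlite3
import tempfile
from pathlib import Path


def connect_read_only(db_path: str) -> sqlite3.Connection:
    """Open an existing SQLite file so that the engine itself refuses writes."""
    # as_uri() produces a file:// URI; mode=ro asks SQLite for read-only access.
    uri = Path(db_path).resolve().as_uri() + "?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    conn.row_factory = sqlite3.Row
    return conn


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        db_path = str(Path(tmp) / "example.db")

        # Create a throwaway database with one table and one row.
        setup = sqlite3.connect(db_path)
        setup.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
        setup.execute("INSERT INTO users (name) VALUES ('Alice Johnson')")
        setup.commit()
        setup.close()

        conn = connect_read_only(db_path)
        try:
            print([dict(row) for row in conn.execute("SELECT * FROM users")])
            conn.execute("DELETE FROM users")
        except sqlite3.OperationalError as exc:
            print("write rejected:", exc)
        finally:
            conn.close()
```

The write tools (add_user, update_product_stock) would keep using a normal connection; only the free-form query path needs the read-only one.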

Example 3: API Integration Server (TypeScript)

Create src/weather-server.ts:

#!/usr/bin/env node
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
  CallToolRequestSchema,
  ErrorCode,
  ListToolsRequestSchema,
  McpError,
} from '@modelcontextprotocol/sdk/types.js';
import axios from 'axios';

// You need to get an API key from OpenWeatherMap
const API_KEY = process.env.OPENWEATHER_API_KEY || 'your_api_key_here';
const BASE_URL = 'https://api.openweathermap.org/data/2.5';

interface WeatherData {
  temperature: number;
  feels_like: number;
  humidity: number;
  description: string;
  wind_speed: number;
  city: string;
  country: string;
}

const server = new Server(
  {
    name: 'weather-server',
    version: '1.0.0',
  },
  {
    capabilities: {
      tools: {},
    },
  }
);

// List available tools
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: 'get_weather',
        description: 'Get current weather for a city',
        inputSchema: {
          type: 'object',
          properties: {
            city: {
              type: 'string',
              description: 'City name',
            },
            country: {
              type: 'string',
              description: 'ISO 3166 country code (e.g., US, GB)',
              default: '',
            },
            units: {
              type: 'string',
              enum: ['metric', 'imperial'],
              description: 'Temperature units',
              default: 'metric',
            },
          },
          required: ['city'],
        },
      },
      {
        name: 'get_forecast',
        description: 'Get 5-day weather forecast',
        inputSchema: {
          type: 'object',
          properties: {
            city: {
              type: 'string',
              description: 'City name',
            },
            country: {
              type: 'string',
              description: 'ISO 3166 country code (e.g., US, GB)',
              default: '',
            },
            units: {
              type: 'string',
              enum: ['metric', 'imperial'],
              description: 'Temperature units',
              default: 'metric',
            },
          },
          required: ['city'],
        },
      },
      {
        name: 'compare_weather',
        description: 'Compare weather between multiple cities',
        inputSchema: {
          type: 'object',
          properties: {
            cities: {
              type: 'array',
              items: { type: 'string' },
              description: 'List of cities to compare',
              minItems: 2,
              maxItems: 5,
            },
            units: {
              type: 'string',
              enum: ['metric', 'imperial'],
              description: 'Temperature units',
              default: 'metric',
            },
          },
          required: ['cities'],
        },
      },
    ],
  };
});

// Handle tool execution
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  try {
    switch (name) {
      case 'get_weather': {
        const { city, country = '', units = 'metric' } = args as {
          city: string;
          country?: string;
          units?: string;
        };

        const weather = await getCurrentWeather(city, country, units);
        
        return {
          content: [
            {
              type: 'text',
              text: formatWeatherReport(weather, units),
            },
          ],
        };
      }

      case 'get_forecast': {
        const { city, country = '', units = 'metric' } = args as {
          city: string;
          country?: string;
          units?: string;
        };

        const forecast = await getForecast(city, country, units);
        
        return {
          content: [
            {
              type: 'text',
              text: formatForecastReport(forecast, units),
            },
          ],
        };
      }

      case 'compare_weather': {
        const { cities, units = 'metric' } = args as {
          cities: string[];
          units?: string;
        };

        const weatherData = await Promise.all(
          cities.map(city => getCurrentWeather(city, '', units))
        );

        return {
          content: [
            {
              type: 'text',
              text: formatComparisonReport(weatherData, units),
            },
          ],
        };
      }

      default:
        throw new McpError(
          ErrorCode.MethodNotFound,
          `Unknown tool: ${name}`
        );
    }
  } catch (error: any) {
    if (error instanceof McpError) {
      throw error;
    }
    
    throw new McpError(
      ErrorCode.InternalError,
      `Weather API error: ${error.message}`
    );
  }
});

// Weather API functions
async function getCurrentWeather(
  city: string,
  country: string,
  units: string
): Promise<WeatherData> {
  const location = country ? `${city},${country}` : city;
  
  try {
    const response = await axios.get(`${BASE_URL}/weather`, {
      params: {
        q: location,
        appid: API_KEY,
        units: units,
      },
    });

    const data = response.data;
    
    return {
      temperature: Math.round(data.main.temp),
      feels_like: Math.round(data.main.feels_like),
      humidity: data.main.humidity,
      description: data.weather[0].description,
      wind_speed: data.wind.speed,
      city: data.name,
      country: data.sys.country,
    };
  } catch (error: any) {
    if (error.response?.status === 404) {
      throw new Error(`City not found: ${location}`);
    }
    throw error;
  }
}

async function getForecast(
  city: string,
  country: string,
  units: string
): Promise<any[]> {
  const location = country ? `${city},${country}` : city;
  
  try {
    const response = await axios.get(`${BASE_URL}/forecast`, {
      params: {
        q: location,
        appid: API_KEY,
        units: units,
        cnt: 40, // 5 days * 8 (3-hour intervals)
      },
    });

    // Group forecasts by day
    const dailyForecasts = new Map<string, any[]>();
    
    response.data.list.forEach((item: any) => {
      const date = new Date(item.dt * 1000).toLocaleDateString();
      if (!dailyForecasts.has(date)) {
        dailyForecasts.set(date, []);
      }
      dailyForecasts.get(date)!.push(item);
    });

    // Calculate daily summaries
    const forecasts = Array.from(dailyForecasts.entries()).map(([date, items]) => {
      const temps = items.map(item => item.main.temp);
      const descriptions = items.map(item => item.weather[0].description);
      
      return {
        date,
        min_temp: Math.round(Math.min(...temps)),
        max_temp: Math.round(Math.max(...temps)),
        description: getMostCommonElement(descriptions),
        city: response.data.city.name,
        country: response.data.city.country,
      };
    });

    return forecasts.slice(0, 5); // Return 5 days
  } catch (error: any) {
    if (error.response?.status === 404) {
      throw new Error(`City not found: ${location}`);
    }
    throw error;
  }
}

// Helper functions
function formatWeatherReport(weather: WeatherData, units: string): string {
  const tempUnit = units === 'metric' ? '°C' : '°F';
  const speedUnit = units === 'metric' ? 'm/s' : 'mph';
  
  return `
🌤️ **Current Weather in ${weather.city}, ${weather.country}**

🌡️ Temperature: ${weather.temperature}${tempUnit} (feels like ${weather.feels_like}${tempUnit})
☁️ Conditions: ${weather.description}
💧 Humidity: ${weather.humidity}%
💨 Wind Speed: ${weather.wind_speed} ${speedUnit}
  `.trim();
}

function formatForecastReport(forecasts: any[], units: string): string {
  const tempUnit = units === 'metric' ? '°C' : '°F';
  
  let report = `📅 **5-Day Forecast for ${forecasts[0].city}, ${forecasts[0].country}**\n\n`;
  
  forecasts.forEach(forecast => {
    report += `**${forecast.date}**\n`;
    report += `  🌡️ ${forecast.min_temp}-${forecast.max_temp}${tempUnit}\n`;
    report += `  ☁️ ${forecast.description}\n\n`;
  });
  
  return report.trim();
}

function formatComparisonReport(weatherData: WeatherData[], units: string): string {
  const tempUnit = units === 'metric' ? '°C' : '°F';
  
  let report = '🌍 **Weather Comparison**\n\n';
  
  // Sort by temperature
  weatherData.sort((a, b) => b.temperature - a.temperature);
  
  weatherData.forEach((weather, index) => {
    const emoji = index === 0 ? '🥇' : index === 1 ? '🥈' : index === 2 ? '🥉' : '📍';
    
    report += `${emoji} **${weather.city}, ${weather.country}**\n`;
    report += `   🌡️ ${weather.temperature}${tempUnit} - ${weather.description}\n`;
    report += `   💧 Humidity: ${weather.humidity}%\n\n`;
  });
  
  return report.trim();
}

function getMostCommonElement(arr: string[]): string {
  const counts = new Map<string, number>();
  
  arr.forEach(element => {
    counts.set(element, (counts.get(element) || 0) + 1);
  });
  
  let maxCount = 0;
  let mostCommon = arr[0];
  
  counts.forEach((count, element) => {
    if (count > maxCount) {
      maxCount = count;
      mostCommon = element;
    }
  });
  
  return mostCommon;
}

// Start the server
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error('Weather MCP Server running on stdio');
}

main().catch((error) => {
  console.error('Server error:', error);
  process.exit(1);
});
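
The forecast grouping above keys each entry by `toLocaleDateString()`, so day boundaries depend on the host's locale and time zone. A minimal sketch of a locale-independent alternative, assuming that grouping by UTC calendar day is acceptable; `ForecastEntry` and `groupByUtcDay` are illustrative names, not part of the server above:

```typescript
interface ForecastEntry {
  dt: number;   // Unix timestamp in seconds, as in the forecast list above
  temp: number;
}

// Group entries by UTC calendar day, keyed as "YYYY-MM-DD"
function groupByUtcDay(entries: ForecastEntry[]): Map<string, ForecastEntry[]> {
  const days = new Map<string, ForecastEntry[]>();
  for (const entry of entries) {
    // toISOString() always reports UTC, so the key is host-independent
    const day = new Date(entry.dt * 1000).toISOString().slice(0, 10);
    let bucket = days.get(day);
    if (!bucket) {
      bucket = [];
      days.set(day, bucket);
    }
    bucket.push(entry);
  }
  return days;
}
```

Keys in this form also sort chronologically as plain strings.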

6. Resources, Tools & Prompts

Understanding the Three Primitives

Resources: Your Data Sources

Resources are read-only data that your AI can access:

// Example: Exposing configuration files
const configResource = {
  uri: "config://app/settings",
  name: "Application Settings",
  description: "Current app configuration",
  mimeType: "application/json"
};

// Example: Database views
const dbResource = {
  uri: "db://analytics/user_metrics",
  name: "User Analytics",
  description: "Read-only view of user metrics",
  mimeType: "application/json"
};

// Example: API endpoints
const apiResource = {
  uri: "api://github/repos",
  name: "GitHub Repositories",
  description: "List of user's GitHub repos",
  mimeType: "application/json"
};

Tools: Your Action Handlers

Tools allow the AI to perform actions:

// Example: System administration tool
const systemTool = {
  name: "restart_service",
  description: "Restart a system service",
  inputSchema: {
    type: "object",
    properties: {
      service_name: {
        type: "string",
        enum: ["web", "database", "cache"],
        description: "Service to restart"
      },
      force: {
        type: "boolean",
        default: false,
        description: "Force restart without graceful shutdown"
      }
    },
    required: ["service_name"]
  }
};

// Example: Data processing tool
const dataTool = {
  name: "transform_dataset",
  description: "Apply transformations to a dataset",
  inputSchema: {
    type: "object",
    properties: {
      dataset_id: { type: "string" },
      transformations: {
        type: "array",
        items: {
          type: "object",
          properties: {
            type: { type: "string", enum: ["filter", "map", "aggregate"] },
            config: { type: "object" }
          }
        }
      }
    },
    required: ["dataset_id", "transformations"]
  }
};
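
Declaring an `inputSchema` does not by itself guarantee that a handler receives well-formed arguments. A minimal sketch of checking arguments against a schema shaped like `restart_service` above, assuming only `required`, `type`, and `enum` need to be enforced; `validateArgs` and the two interfaces are hypothetical, and a real server would more likely use a full JSON Schema validator:

```typescript
interface PropertySchema {
  type: 'string' | 'boolean' | 'number';
  enum?: string[];
}

interface InputSchema {
  properties: Record<string, PropertySchema>;
  required?: string[];
}

// Return a list of human-readable problems (empty when the arguments are valid)
function validateArgs(schema: InputSchema, args: Record<string, unknown>): string[] {
  const errors: string[] = [];

  for (const key of schema.required || []) {
    if (!(key in args)) errors.push(`missing required property: ${key}`);
  }

  for (const key of Object.keys(args)) {
    const prop = schema.properties[key];
    const value = args[key];
    if (!prop) {
      errors.push(`unknown property: ${key}`);
    } else if (typeof value !== prop.type) {
      errors.push(`${key} must be of type ${prop.type}`);
    } else if (prop.enum && !prop.enum.includes(String(value))) {
      errors.push(`${key} must be one of: ${prop.enum.join(', ')}`);
    }
  }

  return errors;
}

const restartSchema: InputSchema = {
  properties: {
    service_name: { type: 'string', enum: ['web', 'database', 'cache'] },
    force: { type: 'boolean' },
  },
  required: ['service_name'],
};
```

Returning every problem at once, rather than throwing on the first, gives the model enough detail to correct its next call.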

Prompts: Your Templates

Prompts provide reusable conversation starters:

// Example: Code review prompt
const codeReviewPrompt = {
  name: "code_review",
  description: "Comprehensive code review template",
  arguments: [
    {
      name: "language",
      description: "Programming language",
      required: true
    },
    {
      name: "focus_areas",
      description: "Specific areas to focus on",
      required: false
    }
  ],
  template: `Please review the following {language} code focusing on:
- Code quality and best practices
- Performance implications
- Security vulnerabilities
- Maintainability
{focus_areas ? "Additional focus: " + focus_areas : ""}`
};

// Example: Data analysis prompt
const analysisPrompt = {
  name: "data_analysis",
  description: "Statistical data analysis template",
  arguments: [
    {
      name: "dataset",
      description: "Dataset identifier",
      required: true
    },
    {
      name: "hypothesis",
      description: "Hypothesis to test",
      required: false
    }
  ]
};
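
A prompt definition only declares its arguments; the server still has to turn them into text when the prompt is requested. A minimal sketch of that step, assuming `{name}` placeholders like those in the `code_review` template above; `renderPrompt` and `PromptArgument` are hypothetical names:

```typescript
interface PromptArgument {
  name: string;
  required: boolean;
}

// Fill {name} placeholders in a template after checking required arguments
function renderPrompt(
  template: string,
  declared: PromptArgument[],
  values: Record<string, string>
): string {
  for (const arg of declared) {
    if (arg.required && !(arg.name in values)) {
      throw new Error(`Missing required argument: ${arg.name}`);
    }
  }

  // Omitted optional arguments are rendered as empty strings
  return template.replace(/\{(\w+)\}/g, (_match: string, name: string) => {
    const value = values[name];
    return value === undefined ? '' : value;
  });
}
```

For example, `renderPrompt('Review this {language} code.', [{ name: 'language', required: true }], { language: 'Go' })` yields `Review this Go code.`.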

Best Practices for Each Primitive

Resource Best Practices

  1. Use Clear URI Schemes:
    Good: file:///path/to/file, db://table/users, api://service/endpoint
    Bad: /path/to/file, users, endpoint
    
  2. Provide Accurate MIME Types:
    const mimeTypes = {
      '.json': 'application/json',
      '.csv': 'text/csv',
      '.md': 'text/markdown',
      '.sql': 'application/sql'
    };
    
  3. Implement Pagination for Large Resources:
    // Return partial data with continuation tokens
    return {
      contents: [{
        uri: resource.uri,
        mimeType: 'application/json',
        text: JSON.stringify({
          data: items.slice(0, 100),
          next_page: items.length > 100 ? 'page2' : null
        })
      }]
    };
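
The pagination snippet above hard-codes the first page. A minimal sketch of a reusable version, assuming an in-memory array and a cursor that is simply the next start offset encoded as a string; `paginate` and `Page` are hypothetical names:

```typescript
interface Page<T> {
  data: T[];
  nextCursor: string | null; // null when there are no further pages
}

// Return one page of items starting at the offset encoded in the cursor
function paginate<T>(items: T[], cursor: string | null, pageSize: number = 100): Page<T> {
  const start = cursor === null ? 0 : Number.parseInt(cursor, 10);
  if (!Number.isInteger(start) || start < 0 || start > items.length) {
    throw new Error(`Invalid cursor: ${cursor}`);
  }

  const end = start + pageSize;
  return {
    data: items.slice(start, end),
    nextCursor: end < items.length ? String(end) : null,
  };
}
```

Clients should treat the cursor as opaque and pass it back unchanged, which leaves the server free to change the encoding later.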
    

Tool Best Practices

  1. Validate Input Thoroughly:
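    async function handleToolCall(name: string, args: unknown) {
      // Look up the tool definition first (`tools` is the list returned by
      // the tools/list handler; `validateSchema` is an assumed helper)
      const tool = tools.find(t => t.name === name);
      if (!tool) {
        throw new McpError(ErrorCode.MethodNotFound, `Unknown tool: ${name}`);
      }

      // Always validate before doing any work
      const validated = validateSchema(args, tool.inputSchema);
      if (!validated.valid) {
        throw new McpError(
          ErrorCode.InvalidParams,
          `Invalid parameters: ${validated.errors.join(', ')}`
        );
      }
    }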
    
  2. Implement Idempotency:
    // Use request IDs to prevent duplicate operations
    const requestId = args.request_id || generateId();
    if (processedRequests.has(requestId)) {
      return processedRequests.get(requestId);
    }
    
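  3. Provide Progress for Long Operations:
    // Start the work in the background and return an operation ID that the
    // client can poll. (MCP also defines progress notifications for requests
    // that carry a progress token.)
    return {
      content: [{
        type: 'text',
        text: `Operation ${operationId} started. Check status with the get_operation_status tool.`
      }]
    };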
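
The idempotency snippet in item 2 leaves open how results are stored. A minimal sketch, assuming an in-memory cache keyed by request ID that holds the pending promise so concurrent duplicates share a single execution; `runOnce` is a hypothetical helper:

```typescript
// Run `operation` at most once per request ID; repeat calls get the same promise
function runOnce<T>(
  cache: Map<string, Promise<T>>,
  requestId: string,
  operation: () => Promise<T>
): Promise<T> {
  const existing = cache.get(requestId);
  if (existing) return existing;

  const pending = operation();
  cache.set(requestId, pending);

  // Forget failed attempts so the caller can retry with the same ID
  pending.catch(() => {
    cache.delete(requestId);
  });

  return pending;
}
```

Caching the promise rather than the resolved value closes the window in which two identical requests arrive before the first has finished. A long-running server would also need to evict old entries.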
    

Prompt Best Practices

  1. Make Prompts Composable:
    // Base prompts that can be extended
    const basePrompts = {
      error_analysis: "Analyze this error and suggest fixes",
      optimization: "Identify optimization opportunities",
      documentation: "Generate comprehensive documentation"
    };
    
  2. Include Examples in Prompts:
    const prompt = {
      name: "api_design",
      template: `Design a REST API for {purpose}.
         
    Example good practice:
    GET /api/v1/users?page=1&limit=20
       
    Include: endpoints, methods, request/response formats`
    };
    

7. Client Integration

Integrating with Claude Desktop

  1. Update Configuration:
    {
      "mcpServers": {
        "filesystem": {
          "command": "node",
          "args": ["/path/to/filesystem-server.js"],
          "env": {
            "MCP_BASE_DIR": "/Users/me/projects"
          }
        },
        "database": {
          "command": "python",
          "args": ["/path/to/database-server.py"],
          "env": {
            "DB_PATH": "/Users/me/data/app.db"
          }
        },
        "weather": {
          "command": "npx",
          "args": ["ts-node", "/path/to/weather-server.ts"],
          "env": {
            "OPENWEATHER_API_KEY": "your_api_key"
          }
        }
      }
    }
    
  2. Restart Claude Desktop to load the new servers

  3. Test the Integration:
    • Ask: “What files are in my projects directory?”
    • Ask: “Show me all users in the database”
    • Ask: “What’s the weather in Tokyo?”
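
A malformed entry in the configuration can keep a server from loading. A minimal sketch of a pre-flight check, assuming only the `command`, `args`, and `env` fields used in the configuration above; `checkServers` and `ServerEntry` are hypothetical and not part of Claude Desktop or the SDK:

```typescript
interface ServerEntry {
  command: string;
  args?: string[];
  env?: Record<string, string>;
}

// Return a list of problems found in an "mcpServers" map (empty when valid)
function checkServers(servers: Record<string, unknown>): string[] {
  const problems: string[] = [];

  for (const name of Object.keys(servers)) {
    const entry = servers[name] as Partial<ServerEntry> | null;
    if (typeof entry !== 'object' || entry === null) {
      problems.push(`${name}: entry must be an object`);
      continue;
    }
    if (typeof entry.command !== 'string' || entry.command === '') {
      problems.push(`${name}: "command" must be a non-empty string`);
    }
    if (
      entry.args !== undefined &&
      !(Array.isArray(entry.args) && entry.args.every(a => typeof a === 'string'))
    ) {
      problems.push(`${name}: "args" must be an array of strings`);
    }
  }

  return problems;
}
```

Running a check like this against the parsed JSON before restarting gives a faster signal than waiting for a server to be missing from the tool list.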

Building a Custom Client

import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';

async function createMCPClient() {
  // The stdio transport spawns the server process itself and talks to it
  // over the child's stdin/stdout, so no manual spawn() call is needed
  const transport = new StdioClientTransport({
    command: 'node',
    args: ['path/to/server.js'],
  });

  // Create client
  const client = new Client({
    name: 'my-mcp-client',
    version: '1.0.0',
  }, {
    capabilities: {}
  });

  // Connect
  await client.connect(transport);

  // List available resources
  const resources = await client.listResources();
  console.log('Available resources:', resources);

  // List available tools
  const tools = await client.listTools();
  console.log('Available tools:', tools);

  // Call a tool
  const result = await client.callTool({
    name: 'create_file',
    arguments: {
      filename: 'test.txt',
      content: 'Hello from MCP!'
    }
  });
  console.log('Tool result:', result);

  return client;
}

// Usage
const client = await createMCPClient();

Integration with LangChain

import asyncio
import json

from langchain.agents import initialize_agent
from langchain.llms import OpenAI
from langchain.tools import Tool
from mcp import ClientSession


async def make_mcp_tool(session: ClientSession, tool_name: str) -> Tool:
    """Wrap a single MCP tool so a LangChain agent can call it."""
    # Get tool info from MCP
    listing = await session.list_tools()
    info = next((t for t in listing.tools if t.name == tool_name), None)
    if info is None:
        raise ValueError(f"Tool {tool_name} not found")

    async def _arun(tool_input: str) -> str:
        """Execute the MCP tool.

        Assumption: the agent passes the arguments as a JSON object string,
        for example '{"city": "Paris"}'.
        """
        result = await session.call_tool(tool_name, arguments=json.loads(tool_input))
        return result.content[0].text

    return Tool(
        name=info.name,
        description=f"{info.description or ''} Input must be a JSON object of arguments.",
        func=None,  # async only: run the agent with arun()
        coroutine=_arun,
    )


async def main():
    # create_mcp_client() is assumed to return a connected, initialized ClientSession
    session = await create_mcp_client()

    # Wrap MCP tools for LangChain
    weather_tool = await make_mcp_tool(session, 'get_weather')
    db_tool = await make_mcp_tool(session, 'query_database')

    # Use in LangChain agent
    llm = OpenAI(temperature=0)
    agent = initialize_agent(
        tools=[weather_tool, db_tool],
        llm=llm,
        agent="zero-shot-react-description"
    )

    # Run agent
    result = await agent.arun("What's the weather in Paris? Also show me all users from France.")
    print(result)


asyncio.run(main())

8. Real-World Examples

Example 1: Git Repository Manager

// git-server.ts
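import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import { exec } from 'child_process';
import { promisify } from 'util';

const execAsync = promisify(exec);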

class GitServer {
  private server: Server;
  private repoPath: string;

  constructor(repoPath: string) {
    this.repoPath = repoPath;
    this.server = new Server({
      name: 'git-manager',
      version: '1.0.0',
    }, {
      // Declare the capabilities this server implements
      capabilities: { tools: {} },
    });
    
    this.setupHandlers();
  }

  private setupHandlers() {
    // List tools
    this.server.setRequestHandler(ListToolsRequestSchema, async () => {
      return {
        tools: [
          {
            name: 'git_status',
            description: 'Get current git status',
            inputSchema: { type: 'object', properties: {} }
          },
          {
            name: 'git_log',
            description: 'Get commit history',
            inputSchema: {
              type: 'object',
              properties: {
                limit: { type: 'number', default: 10 },
                oneline: { type: 'boolean', default: false }
              }
            }
          },
          {
            name: 'git_diff',
            description: 'Show changes in working directory',
            inputSchema: {
              type: 'object',
              properties: {
                staged: { type: 'boolean', default: false }
              }
            }
          },
          {
            name: 'git_branch',
            description: 'List or create branches',
            inputSchema: {
              type: 'object',
              properties: {
                create: { type: 'string' },
                delete: { type: 'string' },
                list: { type: 'boolean', default: true }
              }
            }
          }
        ]
      };
    });

    // Handle tool calls
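    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      const { name, arguments: args = {} } = request.params;

      // Only allow plain branch names so that input cannot inject extra
      // options or shell syntax into the git command line
      const safeRef = (ref: unknown): string => {
        if (typeof ref !== 'string' || !/^[A-Za-z0-9][\w.\/-]*$/.test(ref)) {
          throw new Error(`Invalid branch name: ${String(ref)}`);
        }
        return ref;
      };

      try {
        let result: string;

        switch (name) {
          case 'git_status':
            result = await this.gitCommand('status --porcelain');
            break;

          case 'git_log': {
            // Coerce to a bounded positive integer before interpolating
            const requested = Math.trunc(Number(args.limit)) || 10;
            const limit = Math.min(Math.max(requested, 1), 1000);
            const format = args.oneline ? '--oneline' : '';
            result = await this.gitCommand(`log -${limit} ${format}`);
            break;
          }

          case 'git_diff': {
            const staged = args.staged ? '--cached' : '';
            result = await this.gitCommand(`diff ${staged}`);
            break;
          }

          case 'git_branch':
            if (args.create) {
              result = await this.gitCommand(`checkout -b ${safeRef(args.create)}`);
            } else if (args.delete) {
              result = await this.gitCommand(`branch -d ${safeRef(args.delete)}`);
            } else {
              result = await this.gitCommand('branch -a');
            }
            break;

          default:
            throw new Error(`Unknown tool: ${name}`);
        }

        return {
          content: [{
            type: 'text',
            text: result || 'Command completed successfully'
          }]
        };
      } catch (error: any) {
        // Flag the result as an error so the client can tell it from output
        return {
          content: [{
            type: 'text',
            text: `Git error: ${error.message}`
          }],
          isError: true
        };
      }
    });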
  }

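  private async gitCommand(command: string): Promise<string> {
    // The promisified exec rejects when git exits with a non-zero status, so
    // stderr is not treated as a failure: git also writes informational
    // messages there (e.g. "Switched to a new branch")
    const { stdout, stderr } = await execAsync(
      `git ${command}`,
      { cwd: this.repoPath }
    );

    return stdout || stderr;
  }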
}
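
Building a shell command from tool arguments is risky, because a value such as `x; rm -rf /` would be interpreted by the shell. A minimal sketch of a safer shape, assuming the argument list is later passed to Node's `child_process.execFile('git', args, { cwd })`, which does not spawn a shell by default; `buildBranchArgs` and `SAFE_REF` are hypothetical names:

```typescript
// Conservative allow-list: must start with an alphanumeric character, so a
// value can never be mistaken for an option such as "--force"
const SAFE_REF = /^[A-Za-z0-9][A-Za-z0-9._\/-]*$/;

function checkRef(ref: string): string {
  if (!SAFE_REF.test(ref)) {
    throw new Error(`Invalid branch name: ${ref}`);
  }
  return ref;
}

// Translate git_branch tool arguments into an argv array for git
function buildBranchArgs(args: { create?: string; delete?: string }): string[] {
  if (args.create) return ['checkout', '-b', checkRef(args.create)];
  if (args.delete) return ['branch', '-d', checkRef(args.delete)];
  return ['branch', '-a'];
}
```

The allow-list is stricter than git's own rules for reference names; that trade-off favors rejecting a few legal names over accepting a dangerous one.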

Example 2: Slack Integration

# slack_server.py
import asyncio
from typing import Any, Dict, List, Optional
from slack_sdk.web.async_client import AsyncWebClient
from slack_sdk.errors import SlackApiError
import mcp.types as types
from mcp.server import Server

class SlackServer:
    def __init__(self, slack_token: str):
        self.server = Server("slack-integration")
        self.slack = AsyncWebClient(token=slack_token)
        self._setup_handlers()
    
    def _setup_handlers(self):
        @self.server.list_tools()
        async def handle_list_tools() -> list[types.Tool]:
            return [
                types.Tool(
                    name="send_message",
                    description="Send a message to a Slack channel",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "channel": {
                                "type": "string",
                                "description": "Channel name or ID (e.g., #general or C1234567890)"
                            },
                            "text": {
                                "type": "string",
                                "description": "Message text"
                            },
                            "thread_ts": {
                                "type": "string",
                                "description": "Thread timestamp to reply to (optional)"
                            }
                        },
                        "required": ["channel", "text"]
                    }
                ),
                types.Tool(
                    name="search_messages",
                    description="Search for messages in Slack",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Search query"
                            },
                            "count": {
                                "type": "integer",
                                "description": "Number of results to return",
                                "default": 10
                            }
                        },
                        "required": ["query"]
                    }
                ),
                types.Tool(
                    name="list_channels",
                    description="List all channels in the workspace",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "types": {
                                "type": "string",
                                "description": "Comma-separated channel types",
                                "default": "public_channel,private_channel"
                            }
                        }
                    }
                ),
                types.Tool(
                    name="get_user_info",
                    description="Get information about a user",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "user_id": {
                                "type": "string",
                                "description": "User ID (e.g., U1234567890)"
                            }
                        },
                        "required": ["user_id"]
                    }
                )
            ]
        
        @self.server.call_tool()
        async def handle_call_tool(
            name: str, arguments: Optional[Dict[str, Any]]
        ) -> list[types.TextContent]:
            # Tools without required inputs may be called with no arguments
            arguments = arguments or {}
            try:
                if name == "send_message":
                    response = await self.slack.chat_postMessage(
                        channel=arguments["channel"],
                        text=arguments["text"],
                        thread_ts=arguments.get("thread_ts")
                    )
                    
                    return [types.TextContent(
                        type="text",
                        text=f"Message sent successfully. Timestamp: {response['ts']}"
                    )]
                
                elif name == "search_messages":
                    response = await self.slack.search_messages(
                        query=arguments["query"],
                        count=arguments.get("count", 10)
                    )
                    
                    messages = []
                    for match in response["messages"]["matches"]:
                        messages.append(
                            f"[{match['username']}] {match['text'][:100]}..."
                        )
                    
                    result = "\n".join(messages) if messages else "No messages found"
                    
                    return [types.TextContent(type="text", text=result)]
                
                elif name == "list_channels":
                    response = await self.slack.conversations_list(
                        types=arguments.get("types", "public_channel,private_channel")
                    )
                    
                    channels = []
                    for channel in response["channels"]:
                        channels.append(
                            f"#{channel['name']} ({channel['id']})"
                        )
                    
                    return [types.TextContent(
                        type="text",
                        text="\n".join(channels)
                    )]
                
                elif name == "get_user_info":
                    response = await self.slack.users_info(
                        user=arguments["user_id"]
                    )
                    
                    user = response["user"]
                    info = f"""
User: {user['real_name']} (@{user['name']})
Email: {user.get('profile', {}).get('email', 'N/A')}
Title: {user.get('profile', {}).get('title', 'N/A')}
Status: {user.get('profile', {}).get('status_text', 'No status')}
                    """.strip()
                    
                    return [types.TextContent(type="text", text=info)]
                
                else:
                    return [types.TextContent(
                        type="text",
                        text=f"Unknown tool: {name}"
                    )]
            
            except SlackApiError as e:
                return [types.TextContent(
                    type="text",
                    text=f"Slack API error: {e.response['error']}"
                )]

Example 3: Jupyter Notebook Integration

// jupyter-server.ts
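import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import {
  CallToolRequestSchema,
  ListResourcesRequestSchema,
  ListToolsRequestSchema,
  ReadResourceRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import axios from 'axios';
import WebSocket from 'ws';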

interface JupyterConfig {
  baseUrl: string;
  token: string;
}

class JupyterServer {
  private server: Server;
  private config: JupyterConfig;
  private kernels: Map<string, any> = new Map();

  constructor(config: JupyterConfig) {
    this.config = config;
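    this.server = new Server({
      name: 'jupyter-integration',
      version: '1.0.0',
    }, {
      // Declare the capabilities this server implements
      capabilities: { resources: {}, tools: {} },
    });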
    
    this.setupHandlers();
  }

  private setupHandlers() {
    // List resources (notebooks)
    this.server.setRequestHandler(ListResourcesRequestSchema, async () => {
      const notebooks = await this.listNotebooks();
      
      return {
        resources: notebooks.map(nb => ({
          uri: `notebook://${nb.path}`,
          name: nb.name,
          description: `Jupyter notebook: ${nb.name}`,
          mimeType: 'application/x-ipynb+json'
        }))
      };
    });

    // Read notebook content
    this.server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
      const { uri } = request.params;
      const path = uri.replace('notebook://', '');
      
      const content = await this.getNotebook(path);
      
      return {
        contents: [{
          uri,
          mimeType: 'application/x-ipynb+json',
          text: JSON.stringify(content, null, 2)
        }]
      };
    });

    // List tools
    this.server.setRequestHandler(ListToolsRequestSchema, async () => {
      return {
        tools: [
          {
            name: 'execute_code',
            description: 'Execute code in a Jupyter kernel',
            inputSchema: {
              type: 'object',
              properties: {
                code: {
                  type: 'string',
                  description: 'Code to execute'
                },
                kernel_name: {
                  type: 'string',
                  description: 'Kernel name (e.g., python3)',
                  default: 'python3'
                }
              },
              required: ['code']
            }
          },
          {
            name: 'create_notebook',
            description: 'Create a new Jupyter notebook',
            inputSchema: {
              type: 'object',
              properties: {
                name: {
                  type: 'string',
                  description: 'Notebook name'
                },
                content: {
                  type: 'object',
                  description: 'Initial notebook content'
                }
              },
              required: ['name']
            }
          },
          {
            name: 'list_kernels',
            description: 'List available Jupyter kernels',
            inputSchema: {
              type: 'object',
              properties: {}
            }
          }
        ]
      };
    });

    // Handle tool execution
    this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
      const { name, arguments: args } = request.params;

      switch (name) {
        case 'execute_code':
          return await this.executeCode(args.code, args.kernel_name);
        
        case 'create_notebook':
          return await this.createNotebook(args.name, args.content);
        
        case 'list_kernels':
          return await this.listKernels();
        
        default:
          throw new Error(`Unknown tool: ${name}`);
      }
    });
  }

  private async executeCode(code: string, kernelName: string = 'python3') {
    // Start or get kernel
    let kernel = this.kernels.get(kernelName);
    if (!kernel) {
      kernel = await this.startKernel(kernelName);
      this.kernels.set(kernelName, kernel);
    }

    // Execute code
    const result = await this.executeInKernel(kernel, code);
    
    return {
      content: [{
        type: 'text',
        text: this.formatExecutionResult(result)
      }]
    };
  }

  private formatExecutionResult(result: any): string {
    let output = '';
    
    // Add stdout
    if (result.stdout) {
      output += `Output:\n${result.stdout}\n`;
    }
    
    // Add result data
    if (result.data) {
      if (result.data['text/plain']) {
        output += `Result:\n${result.data['text/plain']}\n`;
      }
    }
    
    // Add errors
    if (result.error) {
      output += `Error:\n${result.error.ename}: ${result.error.evalue}\n`;
    }
    
    return output || 'Code executed successfully (no output)';
  }

  // ... Additional implementation details
}
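
The resource handler above turns a `notebook://` URI into a path with a plain string replace. A minimal sketch of a stricter conversion, assuming notebook paths are always relative and should never escape the notebook root; `notebookPathFromUri` is a hypothetical helper:

```typescript
const NOTEBOOK_PREFIX = 'notebook://';

// Extract a relative notebook path from a notebook:// URI, rejecting anything
// with the wrong scheme, an empty path, an absolute path, or ".." segments
function notebookPathFromUri(uri: string): string {
  if (!uri.startsWith(NOTEBOOK_PREFIX)) {
    throw new Error(`Unsupported URI scheme: ${uri}`);
  }

  const path = uri.slice(NOTEBOOK_PREFIX.length);
  const segments = path.split('/');
  if (path === '' || path.startsWith('/') || segments.includes('..')) {
    throw new Error(`Invalid notebook path: ${path}`);
  }

  return path;
}
```

Since the URI arrives from the client, validating it before it reaches the Jupyter API keeps a crafted request from reading files outside the intended directory.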

9. Advanced Patterns

Pattern 1: Stateful Conversations

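// Maintain conversation state across MCP calls.
// Note: addTool() is used here as a hypothetical registration helper, not a
// method of the SDK's Server class; SessionState and generateId are likewise
// assumed to be defined elsewhere.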
class StatefulMCPServer extends Server {
  private sessions: Map<string, SessionState> = new Map();

  constructor() {
    super({ name: 'stateful-server', version: '1.0.0' });
    this.setupStateManagement();
  }

  private setupStateManagement() {
    // Tool to start a session
    this.addTool({
      name: 'start_session',
      handler: async (args) => {
        const sessionId = generateId();
        this.sessions.set(sessionId, {
          id: sessionId,
          startTime: Date.now(),
          context: {},
          history: []
        });
        
        return {
          sessionId,
          message: 'Session started. Include session_id in future calls.'
        };
      }
    });

    // Stateful tool example
    this.addTool({
      name: 'remember',
      handler: async (args) => {
        const session = this.sessions.get(args.session_id);
        if (!session) {
          throw new Error('Invalid session ID');
        }
        
        // Store information
        session.context[args.key] = args.value;
        session.history.push({
          action: 'remember',
          key: args.key,
          value: args.value,
          timestamp: Date.now()
        });
        
        return {
          message: `Remembered: ${args.key} = ${args.value}`,
          total_memories: Object.keys(session.context).length
        };
      }
    });
  }
}
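
Sessions kept in a `Map` grow without bound unless something removes them. A minimal sketch of idle-timeout expiry, assuming a session is dropped once it has gone unused for longer than a fixed TTL; `SessionStore` and its methods are hypothetical, and the clock is passed in explicitly to keep the logic easy to test:

```typescript
interface Session {
  id: string;
  lastSeen: number; // milliseconds since the epoch
  context: Record<string, unknown>;
}

class SessionStore {
  private sessions = new Map<string, Session>();
  private ttlMs: number;

  constructor(ttlMs: number) {
    this.ttlMs = ttlMs;
  }

  create(id: string, now: number = Date.now()): Session {
    const session: Session = { id, lastSeen: now, context: {} };
    this.sessions.set(id, session);
    return session;
  }

  // Return the session and refresh its timestamp, or undefined if it expired
  get(id: string, now: number = Date.now()): Session | undefined {
    const session = this.sessions.get(id);
    if (!session) return undefined;
    if (now - session.lastSeen > this.ttlMs) {
      this.sessions.delete(id);
      return undefined;
    }
    session.lastSeen = now;
    return session;
  }

  count(): number {
    return this.sessions.size;
  }
}
```

Expiry here happens lazily on lookup; a periodic sweep would be needed to reclaim sessions that are never requested again.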

Pattern 2: Tool Composition

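// Compose complex operations from simple tools.
// Note: addTool() and callTool() are assumed helpers here, not methods of the
// SDK's Server class.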
class ComposableMCPServer extends Server {
  constructor() {
    super({ name: 'composable-server', version: '1.0.0' });
    this.setupComposableTools();
  }

  private setupComposableTools() {
    // Base tools
    this.addTool({
      name: 'fetch_data',
      handler: async (args) => {
        // Fetch from source
        return { data: await fetchFromSource(args.source) };
      }
    });

    this.addTool({
      name: 'transform_data',
      handler: async (args) => {
        // Apply transformation
        return { data: transform(args.data, args.transformation) };
      }
    });

    this.addTool({
      name: 'validate_data',
      handler: async (args) => {
        // Validate against schema
        return { valid: validate(args.data, args.schema) };
      }
    });

    // Composite tool that uses other tools
    this.addTool({
      name: 'etl_pipeline',
      handler: async (args) => {
        // Compose operations
        const fetchResult = await this.callTool('fetch_data', {
          source: args.source
        });
        
        const transformResult = await this.callTool('transform_data', {
          data: fetchResult.data,
          transformation: args.transformation
        });
        
        const validateResult = await this.callTool('validate_data', {
          data: transformResult.data,
          schema: args.schema
        });
        
        if (!validateResult.valid) {
          throw new Error('Validation failed');
        }
        
        return {
          data: transformResult.data,
          metadata: {
            source: args.source,
            transformations: args.transformation,
            validated: true
          }
        };
      }
    });
  }
}
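
The composite tool above re-enters the server through `callTool` for each step. An alternative is to keep each step as a plain function and compose those directly, exposing them as tools separately. The sketch below assumes that approach; `runPipeline`, `fetchRows`, `doubleRows`, and `requireNonEmpty` are hypothetical names standing in for the fetch, transform, and validate steps.

```typescript
// A step maps one value to the next, synchronously or asynchronously.
type Step<In, Out> = (input: In) => Out | Promise<Out>;

// Runs three steps in order. An error thrown by any step rejects the
// returned promise, so later steps never run on bad input.
async function runPipeline<A, B, C, D>(
  input: A,
  first: Step<A, B>,
  second: Step<B, C>,
  third: Step<C, D>
): Promise<D> {
  const afterFirst = await first(input);
  const afterSecond = await second(afterFirst);
  return third(afterSecond);
}

// Hypothetical stand-ins for fetch_data, transform_data and validate_data.
const fetchRows = async (source: string): Promise<number[]> =>
  source === 'demo' ? [1, 2, 3] : [];
const doubleRows = (rows: number[]): number[] => rows.map((n) => n * 2);
const requireNonEmpty = (rows: number[]): number[] => {
  if (rows.length === 0) throw new Error('Validation failed');
  return rows;
};
```

Calling `runPipeline('demo', fetchRows, doubleRows, requireNonEmpty)` resolves to the transformed rows, while an unknown source fails at the validation step.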

Pattern 3: Event-Driven MCP

// Implement event streaming and webhooks
class EventDrivenMCPServer extends Server {
  private eventEmitter = new EventEmitter();
  private subscriptions = new Map<string, Set<string>>();

  constructor() {
    super({ name: 'event-server', version: '1.0.0' });
    this.setupEventSystem();
  }

  private setupEventSystem() {
    // Subscribe to events
    this.addTool({
      name: 'subscribe',
      handler: async (args) => {
        const { event_type, webhook_url } = args;
        
        if (!this.subscriptions.has(event_type)) {
          this.subscriptions.set(event_type, new Set());
        }
        
        this.subscriptions.get(event_type)!.add(webhook_url);
        
        // No per-subscription listener is registered here. emit_event already
        // calls notifySubscribers directly, so a listener added on every
        // subscribe call would deliver each event more than once.
        
        return {
          message: `Subscribed to ${event_type}`,
          subscription_id: generateId()
        };
      }
    });

    // Emit events
    this.addTool({
      name: 'emit_event',
      handler: async (args) => {
        const { event_type, data } = args;
        
        // Emit internally
        this.eventEmitter.emit(event_type, data);
        
        // Notify external subscribers
        await this.notifySubscribers(event_type, data);
        
        return {
          message: `Event ${event_type} emitted`,
          subscriber_count: this.subscriptions.get(event_type)?.size || 0
        };
      }
    });
  }

  private async notifySubscribers(eventType: string, data: any) {
    const subscribers = this.subscriptions.get(eventType);
    if (!subscribers) return;
    
    const notifications = Array.from(subscribers).map(webhook => 
      fetch(webhook, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ event: eventType, data })
      })
    );
    
    await Promise.allSettled(notifications);
  }
}
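
The `subscribe` tool above returns a `subscription_id` but never stores it, so a client has no way to cancel a subscription later. A small registry keyed by ID would close that gap. This is a sketch under that assumption; `SubscriptionRegistry` and its method names are hypothetical and not part of any MCP SDK.

```typescript
interface Subscription {
  id: string;
  eventType: string;
  webhookUrl: string;
}

class SubscriptionRegistry {
  private byId = new Map<string, Subscription>();
  private nextId = 1;

  // Registers a webhook for an event type. A duplicate (event, URL) pair
  // returns the existing subscription instead of creating a second one.
  subscribe(eventType: string, webhookUrl: string): Subscription {
    const existing = Array.from(this.byId.values()).find(
      (sub) => sub.eventType === eventType && sub.webhookUrl === webhookUrl
    );
    if (existing) return existing;

    const sub: Subscription = {
      id: `sub-${this.nextId++}`,
      eventType,
      webhookUrl,
    };
    this.byId.set(sub.id, sub);
    return sub;
  }

  // Returns true if the subscription existed and was removed.
  unsubscribe(id: string): boolean {
    return this.byId.delete(id);
  }

  // Lists the webhook URLs that should be notified for an event type.
  webhooksFor(eventType: string): string[] {
    return Array.from(this.byId.values())
      .filter((sub) => sub.eventType === eventType)
      .map((sub) => sub.webhookUrl);
  }
}
```

With this in place, `notifySubscribers` could iterate over `webhooksFor(eventType)`, and an `unsubscribe` tool could accept the ID returned by `subscribe`.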

Pattern 4: Security & Rate Limiting

# Implement security layers
from datetime import datetime, timedelta
from collections import defaultdict
import hashlib
import hmac

class SecureMCPServer:
    def __init__(self, api_keys: dict):
        self.server = Server("secure-server")
        self.api_keys = api_keys  # {key: permissions}
        self.rate_limits = defaultdict(list)
        self._setup_security()
    
    def _setup_security(self):
        # Middleware for authentication
        @self.server.middleware
        async def authenticate(request):
            api_key = request.headers.get("X-API-Key")
            
            if not api_key or api_key not in self.api_keys:
                raise McpError(
                    ErrorCode.Unauthorized,
                    "Invalid API key"
                )
            
            # Add permissions to request context
            request.context["permissions"] = self.api_keys[api_key]
            request.context["api_key"] = api_key
        
        # Middleware for rate limiting
        @self.server.middleware
        async def rate_limit(request):
            api_key = request.context.get("api_key")
            
            # Clean old entries
            cutoff = datetime.now() - timedelta(minutes=1)
            self.rate_limits[api_key] = [
                t for t in self.rate_limits[api_key] if t > cutoff
            ]
            
            # Check rate limit (60 requests per minute)
            if len(self.rate_limits[api_key]) >= 60:
                raise McpError(
                    ErrorCode.RateLimitExceeded,
                    "Rate limit exceeded. Please try again later."
                )
            
            self.rate_limits[api_key].append(datetime.now())
        
        # Protected tool example
        @self.server.tool(
            name="sensitive_operation",
            requires_permission="admin"
        )
        async def handle_sensitive(request, args):
            # Permission check
            permissions = request.context.get("permissions", [])
            if "admin" not in permissions:
                raise McpError(
                    ErrorCode.Forbidden,
                    "Insufficient permissions"
                )
            
            # Audit log
            self.audit_log(
                api_key=request.context["api_key"],
                action="sensitive_operation",
                args=args
            )
            
            # Perform operation
            return {"status": "completed"}
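
The rate-limiting middleware above keeps a list of request timestamps per API key and rejects a request once the last minute already holds 60 of them. The same sliding-window check can be sketched in TypeScript as a standalone class. `SlidingWindowLimiter` and its injectable clock are assumptions for illustration, not an SDK feature.

```typescript
class SlidingWindowLimiter {
  private hits = new Map<string, number[]>();
  private limit: number;
  private windowMs: number;
  private now: () => number;

  // limit: maximum requests per window; windowMs: window length.
  // The clock parameter exists only to make the class easy to test.
  constructor(limit: number, windowMs: number, now: () => number = Date.now) {
    this.limit = limit;
    this.windowMs = windowMs;
    this.now = now;
  }

  // Returns true and records the request if `key` is under its limit;
  // returns false without recording anything otherwise.
  allow(key: string): boolean {
    const cutoff = this.now() - this.windowMs;
    // Keep only the timestamps that still fall inside the window.
    const recent = (this.hits.get(key) ?? []).filter((t) => t > cutoff);
    const allowed = recent.length < this.limit;
    if (allowed) recent.push(this.now());
    this.hits.set(key, recent);
    return allowed;
  }
}
```

Constructing it as `new SlidingWindowLimiter(60, 60_000)` matches the 60 requests per minute used above. Because state lives in process memory, each replica enforces its own limit; a shared store would be needed for a global one.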

10. Deployment & Production

Deployment Options

Option 1: Standalone Process (Docker Compose)

# docker-compose.yml
version: '3.8'

services:
  mcp-filesystem:
    build: ./servers/filesystem
    environment:
      - MCP_BASE_DIR=/data
    volumes:
      - ./data:/data:ro
    restart: unless-stopped

  mcp-database:
    build: ./servers/database
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/myapp
    depends_on:
      - db
    restart: unless-stopped

  mcp-router:
    image: mcp/router:latest
    ports:
      - "8080:8080"
    environment:
      - MCP_SERVERS=filesystem:3000,database:3001
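    depends_on:
      - mcp-filesystem
      - mcp-database

  # PostgreSQL instance that mcp-database depends on and DATABASE_URL points to
  db:
    image: postgres:16
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=myapp
    restart: unless-stopped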

Option 2: Kubernetes Deployment

# mcp-server-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server
  labels:
    app: mcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-server
  template:
    metadata:
      labels:
        app: mcp-server
    spec:
      containers:
      - name: mcp-server
        image: myregistry/mcp-server:latest
        ports:
        - containerPort: 3000
        env:
        - name: MCP_MODE
          value: "production"
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: mcp-secrets
              key: database-url
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 3000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 3000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: mcp-server-service
spec:
  selector:
    app: mcp-server
  ports:
    - protocol: TCP
      port: 80
      targetPort: 3000
  type: LoadBalancer

Option 3: Serverless Deployment

// serverless-mcp.ts
import { APIGatewayProxyHandler } from 'aws-lambda';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { LambdaTransport } from './lambda-transport';

let server: Server;

const initServer = () => {
  if (!server) {
    server = new Server({
      name: 'serverless-mcp',
      version: '1.0.0'
    });
    
    // Configure server
    setupTools(server);
    setupResources(server);
  }
  return server;
};

export const handler: APIGatewayProxyHandler = async (event, context) => {
  const server = initServer();
  const transport = new LambdaTransport(event, context);
  
  try {
    const response = await server.handleRequest(transport);
    
    return {
      statusCode: 200,
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify(response),
    };
  } catch (error: any) {
    return {
      statusCode: error.statusCode || 500,
      body: JSON.stringify({
        error: error.message,
      }),
    };
  }
};

Production Best Practices

1. Health Checks

class ProductionMCPServer extends Server {
  private startTime = Date.now();
  private requestCount = 0;

  constructor() {
    super({ name: 'production-server', version: '1.0.0' });
    this.setupHealthChecks();
  }

  private setupHealthChecks() {
    // Liveness probe
    this.addEndpoint('/health', async () => {
      return {
        status: 'healthy',
        uptime: Date.now() - this.startTime,
        version: this.version
      };
    });

    // Readiness probe
    this.addEndpoint('/ready', async () => {
      try {
        // Check dependencies
        await this.checkDatabase();
        await this.checkExternalAPIs();
        
        return { ready: true };
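      } catch (error) {
        // Kubernetes httpGet probes judge readiness by HTTP status alone, so
        // the endpoint layer must turn ready: false into an error status
        // (for example 503) for the probe to fail.
        return { 
          ready: false, 
          reason: error instanceof Error ? error.message : String(error)
        };
      }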
    });

    // Metrics endpoint
    this.addEndpoint('/metrics', async () => {
      return {
        requests_total: this.requestCount,
        uptime_seconds: (Date.now() - this.startTime) / 1000,
        memory_usage: process.memoryUsage(),
        active_connections: this.getActiveConnections()
      };
    });
  }
}

2. Logging & Monitoring

import logging
import json
from datetime import datetime

class LoggingMCPServer:
    def __init__(self):
        self.server = Server("logged-server")
        self.setup_logging()
        
    def setup_logging(self):
        # Structured logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(message)s',
            handlers=[
                logging.StreamHandler(),
                logging.FileHandler('mcp-server.log')
            ]
        )
        
        # Log all requests
        @self.server.middleware
        async def log_request(request):
            start_time = datetime.now()
            
            # Log request
            logging.info(json.dumps({
                "timestamp": start_time.isoformat(),
                "type": "request",
                "method": request.method,
                "params": request.params,
                "session_id": request.session_id
            }))
            
            try:
                # Process request
                response = await request.next()
                
                # Log response
                duration = (datetime.now() - start_time).total_seconds()
                logging.info(json.dumps({
                    "timestamp": datetime.now().isoformat(),
                    "type": "response",
                    "session_id": request.session_id,
                    "duration_seconds": duration,
                    "status": "success"
                }))
                
                return response
                
            except Exception as e:
                # Log error
                logging.error(json.dumps({
                    "timestamp": datetime.now().isoformat(),
                    "type": "error",
                    "session_id": request.session_id,
                    "error": str(e),
                    "error_type": type(e).__name__
                }))
                raise

3. Configuration Management

// config.ts
interface MCPConfig {
  server: {
    name: string;
    version: string;
    port?: number;
  };
  features: {
    rateLimit: boolean;
    authentication: boolean;
    metrics: boolean;
  };
  limits: {
    maxRequestSize: number;
    maxResponseSize: number;
    requestTimeout: number;
  };
  external: {
    databaseUrl?: string;
    redisUrl?: string;
    apiKeys?: Record<string, string>;
  };
}

// Load configuration with validation
function loadConfig(): MCPConfig {
  const config: MCPConfig = {
    server: {
      name: process.env.MCP_SERVER_NAME || 'mcp-server',
      version: process.env.MCP_SERVER_VERSION || '1.0.0',
      port: parseInt(process.env.MCP_PORT || '3000', 10),
    },
    features: {
      rateLimit: process.env.MCP_RATE_LIMIT === 'true',
      authentication: process.env.MCP_AUTH === 'true',
      metrics: process.env.MCP_METRICS === 'true',
    },
    limits: {
      maxRequestSize: parseInt(process.env.MCP_MAX_REQUEST_SIZE || '1048576', 10),
      maxResponseSize: parseInt(process.env.MCP_MAX_RESPONSE_SIZE || '10485760', 10),
      requestTimeout: parseInt(process.env.MCP_REQUEST_TIMEOUT || '30000', 10),
    },
    external: {
      databaseUrl: process.env.DATABASE_URL,
      redisUrl: process.env.REDIS_URL,
      apiKeys: process.env.MCP_API_KEYS ? 
        JSON.parse(process.env.MCP_API_KEYS) : undefined,
    },
  };

  // Validate configuration
  validateConfig(config);
  
  return config;
}
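
`loadConfig` calls `validateConfig`, which is not shown. The sketch below is one possible shape for it, assuming that validation should reject non-numeric environment values (which `parseInt` turns into `NaN`) and out-of-range ports. `ConfigToValidate` and `findConfigErrors` are hypothetical names, and the interface covers only the fields checked here.

```typescript
// Only the fields checked below; the full MCPConfig has more.
interface ConfigToValidate {
  server: { name: string; port?: number };
  limits: {
    maxRequestSize: number;
    maxResponseSize: number;
    requestTimeout: number;
  };
}

// Collects every problem instead of stopping at the first one.
function findConfigErrors(config: ConfigToValidate): string[] {
  const errors: string[] = [];
  // Number.isInteger(NaN) is false, so unparsable values are caught here.
  const isPositiveInt = (n: number): boolean => Number.isInteger(n) && n > 0;

  if (config.server.name.trim() === '') {
    errors.push('server.name must not be empty');
  }

  const port = config.server.port;
  if (port !== undefined && !(isPositiveInt(port) && port <= 65535)) {
    errors.push('server.port must be an integer between 1 and 65535');
  }

  const limitKeys = ['maxRequestSize', 'maxResponseSize', 'requestTimeout'] as const;
  limitKeys.forEach((key) => {
    if (!isPositiveInt(config.limits[key])) {
      errors.push(`limits.${key} must be a positive integer`);
    }
  });

  return errors;
}

// Throws a single error that lists every problem found.
function validateConfig(config: ConfigToValidate): void {
  const errors = findConfigErrors(config);
  if (errors.length > 0) {
    throw new Error(`Invalid configuration: ${errors.join('; ')}`);
  }
}
```

Failing at startup with the full list of problems is usually easier to debug than discovering a `NaN` timeout at request time.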

11. Troubleshooting

Common Issues and Solutions

Issue 1: Server Won't Start

Symptoms: Error when launching server

Debugging steps:

# Check if port is in use
lsof -i :3000

# Run with debug logging
DEBUG=mcp:* node server.js

# Check permissions
ls -la server.js

# Verify dependencies
npm list

Solution:

// Add error handling to server startup
async function startServer() {
  try {
    const transport = new StdioServerTransport();
    await server.connect(transport);
    console.error('Server started successfully');
  } catch (error: any) {
    console.error('Failed to start server:', error);
    
    // Common issues
    if (error.code === 'EADDRINUSE') {
      console.error('Port already in use');
    } else if (error.code === 'EACCES') {
      console.error('Permission denied');
    }
    
    process.exit(1);
  }
}

Issue 2: Client Can't Connect

Symptoms: "Connection refused" or timeout errors

Debugging:

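# Test that the server responds over stdio (official `mcp` Python SDK)
import asyncio
import subprocess

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

SERVER_COMMAND = "node"
SERVER_ARGS = ["path/to/server.js"]

async def test_connection():
    params = StdioServerParameters(command=SERVER_COMMAND, args=SERVER_ARGS)
    try:
        # Spawn the server and open a client session over its stdin/stdout
        async with stdio_client(params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()

                # Test basic operation
                response = await session.list_tools()
                print(f"Connected! Found {len(response.tools)} tools")

    except Exception as e:
        print(f"Connection failed: {e}")

        # Debug info: run the server directly and capture its output.
        # stdin is closed and a timeout is set so the call cannot hang
        # if the server ignores --version and waits for input.
        result = subprocess.run(
            [SERVER_COMMAND, *SERVER_ARGS, "--version"],
            capture_output=True,
            text=True,
            stdin=subprocess.DEVNULL,
            timeout=10,
        )
        print(f"Server output: {result.stdout}")
        print(f"Server errors: {result.stderr}")

asyncio.run(test_connection())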

Issue 3: Tool Execution Fails

Symptoms: Tools listed but fail when called

Debugging:

// Add comprehensive error handling
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;
  
  console.error(`Executing tool: ${name}`, args);
  
  try {
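    // Validate tool exists. `tools` is the Map of registered tools kept in the
    // enclosing scope; `this` is not bound inside this arrow function.
    const tool = tools.get(name);
    if (!tool) {
      throw new McpError(
        ErrorCode.MethodNotFound,
        `Tool '${name}' not found. Available tools: ${Array.from(tools.keys()).join(', ')}`
      );
    }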
    
    // Validate arguments
    const validation = validateArguments(args, tool.inputSchema);
    if (!validation.valid) {
      throw new McpError(
        ErrorCode.InvalidParams,
        `Invalid arguments: ${validation.errors.join(', ')}`
      );
    }
    
    // Execute with timeout
    const result = await withTimeout(
      tool.handler(args),
      30000,
      `Tool '${name}' execution timed out`
    );
    
    return result;
    
  } catch (error) {
    console.error(`Tool execution error:`, error);
    
    // Provide helpful error messages
    if (error instanceof McpError) {
      throw error;
    }
    
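    // The stack trace is already logged above; send only the message to the
    // client so internal details are not exposed.
    const message = error instanceof Error ? error.message : String(error);
    throw new McpError(
      ErrorCode.InternalError,
      `Tool '${name}' failed: ${message}`
    );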
  }
});
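
The handler above relies on a `withTimeout` helper that is not defined in this section. A minimal sketch follows, assuming the same argument order as the call above (promise, milliseconds, message). It races the promise against a timer and clears the timer once either side settles.

```typescript
// Rejects with `message` if `promise` has not settled within `ms` milliseconds.
function withTimeout<T>(
  promise: Promise<T>,
  ms: number,
  message: string
): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;

  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(message)), ms);
  });

  // Whichever promise settles first wins; the timer is always cleared so it
  // cannot keep the process alive after the race is over.
  return Promise.race([promise, timeout]).finally(() => {
    clearTimeout(timer);
  });
}
```

Note that this only stops waiting. The underlying work keeps running after the timeout fires unless the tool handler supports cancellation itself.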

Issue 4: Performance Issues

Symptoms: Slow responses, timeouts

Profiling:

// Add performance monitoring
class PerformanceMonitor {
  private metrics = new Map<string, number[]>();

  async measure<T>(name: string, fn: () => Promise<T>): Promise<T> {
    const start = performance.now();
    
    try {
      const result = await fn();
      const duration = performance.now() - start;
      
      // Store metric
      if (!this.metrics.has(name)) {
        this.metrics.set(name, []);
      }
      this.metrics.get(name)!.push(duration);
      
      // Log slow operations
      if (duration > 1000) {
        console.warn(`Slow operation '${name}': ${duration}ms`);
      }
      
      return result;
    } catch (error) {
      const duration = performance.now() - start;
      console.error(`Operation '${name}' failed after ${duration}ms`);
      throw error;
    }
  }

  getStats(name: string) {
    const times = this.metrics.get(name) || [];
    if (times.length === 0) return null;
    
    return {
      count: times.length,
      avg: times.reduce((a, b) => a + b) / times.length,
      min: Math.min(...times),
      max: Math.max(...times),
      p95: percentile(times, 95)
    };
  }
}
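
`getStats` calls a `percentile` function that is not defined above. The sketch below uses the nearest-rank method, which is one of several common definitions and an assumption here; interpolating variants give slightly different results on small samples.

```typescript
// Nearest-rank percentile: the smallest value such that at least p percent
// of the samples are less than or equal to it.
function percentile(values: number[], p: number): number {
  if (values.length === 0) {
    throw new Error('percentile of an empty array is undefined');
  }
  if (p < 0 || p > 100) {
    throw new RangeError('p must be between 0 and 100');
  }

  // Sort a copy numerically so the caller's array is left untouched.
  const sorted = [...values].sort((a, b) => a - b);

  // Rank is 1-based; p = 0 maps to the first (smallest) element.
  const rank = Math.max(1, Math.ceil((p * sorted.length) / 100));
  return sorted[rank - 1];
}
```

Because `PerformanceMonitor` stores every duration, the sort cost grows with uptime; capping each metric's array to a recent window would keep both memory and sorting bounded.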

Debug Mode

// Enable debug mode with environment variable
if (process.env.MCP_DEBUG === 'true') {
  // Log all messages
  server.on('message', (msg) => {
    console.error('Message:', JSON.stringify(msg, null, 2));
  });
  
  // Log all errors
  server.on('error', (error) => {
    console.error('Error:', error);
  });
  
  // Memory usage monitoring
  setInterval(() => {
    const usage = process.memoryUsage();
    console.error('Memory:', {
      rss: `${Math.round(usage.rss / 1024 / 1024)}MB`,
      heap: `${Math.round(usage.heapUsed / 1024 / 1024)}MB`
    });
  }, 30000);
}

12. MCP Ecosystem & Resources

Official Resources

  1. MCP Specification
  2. Example Servers

Community Resources

  1. Tutorials & Guides
  2. Tools & Libraries
  3. Integration Examples

Servers by Category

  1. Data & Storage
    • mcp-server-sqlite: SQLite database access
    • mcp-server-postgres: PostgreSQL integration
    • mcp-server-filesystem: Local file system access
    • mcp-server-s3: AWS S3 integration
  2. Development Tools
    • mcp-server-git: Git repository management
    • mcp-server-github: GitHub API integration
    • mcp-server-docker: Docker container management
    • mcp-server-kubernetes: K8s cluster operations
  3. Communication
    • mcp-server-slack: Slack workspace integration
    • mcp-server-email: Email sending and reading
    • mcp-server-discord: Discord bot integration
  4. AI/ML Tools
    • mcp-server-huggingface: Hugging Face model access
    • mcp-server-openai: OpenAI API integration
    • mcp-server-langchain: LangChain integration

Contributing to MCP

  1. Create Your Own Server
    # Use the template
    npx create-mcp-server my-awesome-server
    cd my-awesome-server
    npm install
    npm run dev
    
  2. Share with Community
    • Publish to npm/PyPI
    • Add to awesome-mcp list
    • Write documentation
    • Create examples
  3. Report Issues

Conclusion

MCP changes how AI applications interact with external systems: instead of a custom integration per tool, it provides one standardized protocol for connecting LLMs to external tools, data sources, and systems.

Next Steps

  1. Start Simple: Build a basic file system server
  2. Experiment: Try different types of resources and tools
  3. Integrate: Connect to Claude Desktop or your app
  4. Share: Contribute your servers to the community
  5. Scale: Deploy to production when ready

Remember: MCP is not just a protocol; it's an ecosystem that grows with every server you build and share.

Happy building! 🚀