MCP (Model Context Protocol): The Complete Developer Guide
Master MCP to build context-aware AI applications that seamlessly connect LLMs with external tools, data sources, and systems
Table of Contents
- Introduction to MCP
- Core Concepts
- Architecture Overview
- Getting Started
- Building MCP Servers
- Creating MCP Clients
- Resources and Tools
- Prompts and Sampling
- Advanced Features
- Testing and Debugging
- Security Best Practices
- Deployment Guide
- Real-World Examples
- Troubleshooting
- Additional Resources
1. What is MCP?
The Simple Explanation
Model Context Protocol (MCP) is an open protocol that provides a standard way for AI assistants (like Claude, ChatGPT, or your custom LLM app) to connect with external data sources and tools.
Think of MCP as a universal adapter that lets AI models:
- Access your local files and databases
- Use external tools and APIs
- Remember context across conversations
- Stay synchronized with real-time data
Visual Concept
Without MCP:
┌─────────┐    Custom API 1     ┌──────────┐
│   LLM   │────────────────────►│  Tool 1  │
└────┬────┘                     └──────────┘
     │         Custom API 2     ┌──────────┐
     └─────────────────────────►│  Tool 2  │
                                └──────────┘
With MCP:
┌─────────┐    Standard MCP     ┌────────────┐
│   LLM   │◄───────────────────►│ MCP Server │
└─────────┘                     └─────┬──────┘
                                      │
                    ┌─────────────────┼─────────────────┐
                    ▼                 ▼                 ▼
                ┌────────┐        ┌────────┐        ┌────────┐
                │ Tool 1 │        │ Tool 2 │        │Database│
                └────────┘        └────────┘        └────────┘
Key Components
- MCP Servers: Expose resources (files, data, tools) to AI models
- MCP Clients: AI applications that connect to MCP servers
- Resources: Data sources like files, databases, APIs
- Tools: Functions the AI can call (search, calculate, transform)
- Prompts: Pre-configured templates for common tasks
2. Why MCP Changes Everything
Before MCP: The Problems
- Every integration is custom: Each tool needs its own API integration
- No standardization: Every developer reinvents the wheel
- Limited context: AI can't easily access your local data
- Poor reusability: Can't share integrations between AI apps
After MCP: The Solutions
- Write once, use everywhere: One MCP server works with any MCP client
- Standardized protocol: Common patterns for all integrations
- Rich context: AI can access any data you expose
- Community ecosystem: Share and reuse MCP servers
Real Impact Examples
- For Developers: Build one GitHub integration, use it in Claude, ChatGPT, and your custom app
- For Data Scientists: Give AI direct access to your Jupyter notebooks and datasets
- For Teams: Share company knowledge bases and tools across all AI assistants
- For Automation: Let AI orchestrate complex workflows across multiple systems
3. Core Concepts & Architecture
The MCP Protocol Stack
┌───────────────────────────────────────────┐
│             Application Layer             │
│      (Your AI App / Claude Desktop)       │
├───────────────────────────────────────────┤
│             MCP Client Layer              │
│   (Handles protocol, connection, auth)    │
├───────────────────────────────────────────┤
│              Transport Layer              │
│    (stdio, HTTP, WebSocket - planned)     │
├───────────────────────────────────────────┤
│             MCP Server Layer              │
│    (Exposes resources, tools, prompts)    │
├───────────────────────────────────────────┤
│              Resource Layer               │
│   (Files, DBs, APIs, External Services)   │
└───────────────────────────────────────────┘
Core Concepts Explained
1. Resources
Resources are data sources your AI can read from:
// Example resource definition
{
  uri: "file:///home/user/projects/data.json",
  name: "Project Data",
  description: "Current project configuration and data",
  mimeType: "application/json"
}
2. Tools
Tools are functions your AI can execute:
// Example tool definition
{
  name: "calculate_metrics",
  description: "Calculate statistics for a dataset",
  inputSchema: {
    type: "object",
    properties: {
      dataset: { type: "string" },
      metrics: { type: "array", items: { type: "string" } }
    }
  }
}
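Before running a tool, a server typically checks the incoming arguments against the tool's inputSchema. The sketch below is a hand-rolled check that covers only the small subset of JSON Schema used in the definition above (property types, array items, and required). The helper names `validate_arguments` and `_matches` are hypothetical, and the `required` list is an assumption added for illustration; a production server would more likely use a full JSON Schema validator.

```python
from typing import Any

# JSON Schema type names mapped to the Python types they accept.
JSON_TYPES = {
    "string": str,
    "integer": int,
    "number": (int, float),
    "boolean": bool,
    "array": list,
    "object": dict,
}


def _matches(value: Any, type_name: Any) -> bool:
    """True when value fits the named JSON type (or no known type is declared)."""
    expected = JSON_TYPES.get(type_name) if isinstance(type_name, str) else None
    if expected is None:
        return True
    if isinstance(value, bool) and type_name != "boolean":
        return False  # bool is an int subclass in Python; JSON keeps them apart
    return isinstance(value, expected)


def validate_arguments(schema: dict[str, Any], args: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the arguments are valid."""
    errors = [
        f"missing required argument: {name}"
        for name in schema.get("required", [])
        if name not in args
    ]
    for name, value in args.items():
        spec = schema.get("properties", {}).get(name)
        if spec is None:
            continue  # this sketch ignores arguments the schema does not mention
        if not _matches(value, spec.get("type")):
            errors.append(f"{name}: expected {spec['type']}")
        elif spec.get("type") == "array" and "items" in spec:
            item_type = spec["items"].get("type")
            if not all(_matches(item, item_type) for item in value):
                errors.append(f"{name}: every item must be {item_type}")
    return errors


calculate_metrics_schema = {
    "type": "object",
    "properties": {
        "dataset": {"type": "string"},
        "metrics": {"type": "array", "items": {"type": "string"}},
    },
    "required": ["dataset"],  # assumption: the definition above lists none
}

print(validate_arguments(calculate_metrics_schema, {"metrics": ["mean", 3]}))
```

Returning a list of messages rather than raising on the first problem lets the server report every issue to the model in a single error response.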
3. Prompts
Prompts are reusable templates:
// Example prompt definition
{
  name: "analyze_code",
  description: "Analyze code for improvements",
  arguments: [
    {
      name: "language",
      description: "Programming language",
      required: true
    }
  ]
}
Message Flow
1. Client → Server: Initialize connection
2. Server → Client: Capabilities announcement
3. Client → Server: List available resources
4. Server → Client: Resource list
5. Client → Server: Execute tool with parameters
6. Server → Client: Tool execution result
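On the wire, each step of this flow is a JSON-RPC 2.0 message. The sketch below builds the client-side messages only (steps 1, 3, and 5, plus the `notifications/initialized` notification a client sends once the handshake completes); the server's replies are ordinary JSON-RPC responses carrying the same `id`. The helper names, the client name, and the `protocolVersion` value are illustrative assumptions, so check the specification revision you are targeting.

```python
import itertools
import json
from typing import Any, Optional

_next_id = itertools.count(1)


def make_request(method: str, params: Optional[dict[str, Any]] = None) -> dict[str, Any]:
    """Build a JSON-RPC request; the id lets the client match the response."""
    message: dict[str, Any] = {"jsonrpc": "2.0", "id": next(_next_id), "method": method}
    if params is not None:
        message["params"] = params
    return message


def make_notification(method: str) -> dict[str, Any]:
    """Build a JSON-RPC notification: no id, so no response is expected."""
    return {"jsonrpc": "2.0", "method": method}


session = [
    # Step 1: initialize the connection
    make_request("initialize", {
        "protocolVersion": "2024-11-05",  # example value, not a recommendation
        "capabilities": {},
        "clientInfo": {"name": "example-client", "version": "1.0.0"},
    }),
    # Sent after the server has answered with its capabilities (step 2)
    make_notification("notifications/initialized"),
    # Step 3: list available resources
    make_request("resources/list"),
    # Step 5: execute a tool with parameters
    make_request("tools/call", {
        "name": "calculate_metrics",
        "arguments": {"dataset": "sales", "metrics": ["mean"]},
    }),
]

for message in session:
    print(json.dumps(message))
```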
4. Complete Setup Guide
Prerequisites
Before starting, ensure you have:
- Node.js 18+ or Python 3.10+ installed
- npm or pip package manager
- A code editor (VS Code recommended)
- Basic command line knowledge
Option 1: TypeScript/JavaScript Setup
# Create a new project
mkdir my-mcp-server
cd my-mcp-server
# Initialize package.json
npm init -y
# Install MCP SDK
npm install @modelcontextprotocol/sdk
# Install development dependencies
npm install -D typescript @types/node tsx
# Initialize TypeScript
npx tsc --init
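Create tsconfig.json (replacing the file generated by tsc --init). The server examples below import SDK subpaths such as '@modelcontextprotocol/sdk/server/index.js', which need Node16 module resolution; also add "type": "module" to package.json so Node runs the compiled output as ES modules:
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "Node16",
    "moduleResolution": "Node16",
    "lib": ["ES2022"],
    "outDir": "./dist",
    "rootDir": "./src",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "resolveJsonModule": true,
    "declaration": true,
    "declarationMap": true
  },
  "include": ["src/**/*"],
  "exclude": ["node_modules", "dist"]
}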
Option 2: Python Setup
# Create a new project
mkdir my-mcp-server
cd my-mcp-server
# Create virtual environment
python -m venv venv
# Activate virtual environment
# On Windows:
venv\Scripts\activate
# On macOS/Linux:
source venv/bin/activate
# Install MCP SDK
pip install mcp
# Install additional dependencies
pip install python-dotenv aiofiles
Setting Up Claude Desktop
Install Claude Desktop from claude.ai
Configure MCP by editing the config file:
macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
Windows: %APPDATA%\Claude\claude_desktop_config.json
{
  "mcpServers": {
    "my-server": {
      "command": "node",
      "args": ["/absolute/path/to/your/server.js"]
    }
  }
}
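If you manage several servers, editing this file by hand gets error-prone. The following is a minimal sketch of a helper that adds one entry under `mcpServers` without touching the others; `add_server` is a hypothetical function, not part of Claude Desktop or the MCP SDK, and it only handles the `command`, `args`, and `env` keys shown in this guide.

```python
import json
from typing import Any, Optional


def add_server(
    config: dict[str, Any],
    name: str,
    command: str,
    args: list[str],
    env: Optional[dict[str, str]] = None,
) -> dict[str, Any]:
    """Return a copy of config with one server entry added or replaced."""
    servers = dict(config.get("mcpServers", {}))
    entry: dict[str, Any] = {"command": command, "args": list(args)}
    if env:
        entry["env"] = dict(env)
    servers[name] = entry
    # Build a new dict so the caller's original config is left unchanged
    return {**config, "mcpServers": servers}


config = add_server({}, "my-server", "node", ["/absolute/path/to/your/server.js"])
print(json.dumps(config, indent=2))
```

To apply it to the real file, load the JSON from the path for your platform, pass it through `add_server`, and write the result back before restarting Claude Desktop.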
5. Building Your First MCP Server
Example 1: Basic File System Server (TypeScript)
Create src/server.ts:
#!/usr/bin/env node
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
CallToolRequestSchema,
ErrorCode,
ListResourcesRequestSchema,
ListToolsRequestSchema,
McpError,
ReadResourceRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import fs from 'fs/promises';
import path from 'path';
// Define the base directory for file operations
const BASE_DIR = process.env.MCP_BASE_DIR || process.cwd();
// Create server instance
const server = new Server(
{
name: 'filesystem-server',
version: '1.0.0',
},
{
capabilities: {
resources: {},
tools: {},
},
}
);
// List available resources (files in the directory)
server.setRequestHandler(ListResourcesRequestSchema, async () => {
try {
const files = await fs.readdir(BASE_DIR);
const resources = await Promise.all(
files.map(async (file) => {
const filePath = path.join(BASE_DIR, file);
const stats = await fs.stat(filePath);
if (stats.isFile()) {
return {
uri: `file://${filePath}`,
name: file,
description: `File: ${file} (${stats.size} bytes)`,
mimeType: getMimeType(file),
};
}
return null;
})
);
return {
resources: resources.filter(r => r !== null),
};
} catch (error) {
throw new McpError(
ErrorCode.InternalError,
`Failed to list resources: ${error}`
);
}
});
// Read resource content
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const { uri } = request.params;
if (!uri.startsWith('file://')) {
throw new McpError(
ErrorCode.InvalidRequest,
'Only file:// URIs are supported'
);
}
const filePath = uri.slice(7); // Remove 'file://'
// Security check: ensure the file is inside BASE_DIR. Comparing against
// resolvedBase + path.sep keeps a sibling such as "/data-backup" from
// passing the check when BASE_DIR is "/data".
const resolvedPath = path.resolve(filePath);
const resolvedBase = path.resolve(BASE_DIR);
if (resolvedPath !== resolvedBase && !resolvedPath.startsWith(resolvedBase + path.sep)) {
throw new McpError(
ErrorCode.InvalidRequest,
'Access denied: file outside base directory'
);
}
try {
const content = await fs.readFile(resolvedPath, 'utf-8');
return {
contents: [
{
uri,
mimeType: getMimeType(filePath),
text: content,
},
],
};
} catch (error) {
throw new McpError(
ErrorCode.InternalError,
`Failed to read file: ${error}`
);
}
});
// List available tools
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: 'create_file',
description: 'Create a new file with specified content',
inputSchema: {
type: 'object',
properties: {
filename: {
type: 'string',
description: 'Name of the file to create',
},
content: {
type: 'string',
description: 'Content to write to the file',
},
},
required: ['filename', 'content'],
},
},
{
name: 'search_files',
description: 'Search for files containing specific text',
inputSchema: {
type: 'object',
properties: {
query: {
type: 'string',
description: 'Text to search for',
},
extension: {
type: 'string',
description: 'File extension to filter (optional)',
},
},
required: ['query'],
},
},
],
};
});
// Handle tool execution
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
switch (name) {
case 'create_file': {
const { filename, content } = args as { filename: string; content: string };
const filePath = path.join(BASE_DIR, filename);
// Security check: a bare prefix test would accept "/data-backup/x" when
// BASE_DIR is "/data", so compare against the base plus a path separator
const resolvedPath = path.resolve(filePath);
const resolvedBase = path.resolve(BASE_DIR);
if (!resolvedPath.startsWith(resolvedBase + path.sep)) {
throw new McpError(
ErrorCode.InvalidRequest,
'Cannot create file outside base directory'
);
}
try {
await fs.writeFile(filePath, content, 'utf-8');
return {
content: [
{
type: 'text',
text: `Successfully created file: ${filename}`,
},
],
};
} catch (error) {
throw new McpError(
ErrorCode.InternalError,
`Failed to create file: ${error}`
);
}
}
case 'search_files': {
const { query, extension } = args as { query: string; extension?: string };
try {
const results = await searchFiles(BASE_DIR, query, extension);
return {
content: [
{
type: 'text',
text: results.length > 0
? `Found ${results.length} matches:\n${results.join('\n')}`
: 'No matches found',
},
],
};
} catch (error) {
throw new McpError(
ErrorCode.InternalError,
`Search failed: ${error}`
);
}
}
default:
throw new McpError(
ErrorCode.MethodNotFound,
`Unknown tool: ${name}`
);
}
});
// Helper functions
function getMimeType(filename: string): string {
const ext = path.extname(filename).toLowerCase();
const mimeTypes: { [key: string]: string } = {
'.txt': 'text/plain',
'.json': 'application/json',
'.js': 'application/javascript',
'.ts': 'application/typescript',
'.py': 'text/x-python',
'.md': 'text/markdown',
'.html': 'text/html',
'.css': 'text/css',
'.csv': 'text/csv',
'.xml': 'application/xml',
'.yaml': 'application/yaml',
'.yml': 'application/yaml',
};
return mimeTypes[ext] || 'text/plain';
}
async function searchFiles(
dir: string,
query: string,
extension?: string
): Promise<string[]> {
const results: string[] = [];
const files = await fs.readdir(dir);
for (const file of files) {
const filePath = path.join(dir, file);
const stats = await fs.stat(filePath);
if (stats.isFile()) {
if (extension && !file.endsWith(extension)) {
continue;
}
try {
const content = await fs.readFile(filePath, 'utf-8');
if (content.toLowerCase().includes(query.toLowerCase())) {
results.push(`${file}: ${getMatchContext(content, query)}`);
}
} catch (error) {
// Skip files that can't be read
}
}
}
return results;
}
function getMatchContext(content: string, query: string): string {
const index = content.toLowerCase().indexOf(query.toLowerCase());
if (index === -1) return '';
const start = Math.max(0, index - 30);
const end = Math.min(content.length, index + query.length + 30);
return '...' + content.slice(start, end).replace(/\n/g, ' ') + '...';
}
// Start the server
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error('MCP Server running on stdio');
}
main().catch((error) => {
console.error('Server error:', error);
process.exit(1);
});
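The directory-containment check is the part of this server that matters most for safety, so it is worth isolating. Here is a minimal sketch of the same idea in Python, comparing resolved path components instead of string prefixes; `resolve_inside` is a hypothetical helper name, and the sketch assumes the base directory itself is trusted.

```python
from pathlib import Path


def resolve_inside(base_dir: str, requested: str) -> Path:
    """Resolve requested relative to base_dir, refusing anything outside it."""
    base = Path(base_dir).resolve()
    # resolve() collapses ".." segments and follows symlinks before the check
    candidate = (base / requested).resolve()
    if candidate != base and base not in candidate.parents:
        raise PermissionError(f"access denied: {requested!r} is outside {base}")
    return candidate
```

Because the comparison works on whole path components, a sibling directory whose name merely starts with the base directory's name is rejected, which a plain string-prefix test would let through.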
Example 2: Database Query Server (Python)
Create server.py:
#!/usr/bin/env python3
import asyncio
import json
import os
import sqlite3
from typing import Any, Dict, Optional

import mcp.server.stdio
import mcp.types as types
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from pydantic import AnyUrl

# Path to the SQLite file; override it with the DB_PATH environment variable
DB_PATH = os.environ.get("DB_PATH", "example.db")


def init_database():
    """Initialize example database with sample data"""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    # Create tables
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            price REAL NOT NULL,
            stock INTEGER DEFAULT 0
        )
    ''')
    # Insert sample data if tables are empty
    cursor.execute("SELECT COUNT(*) FROM users")
    if cursor.fetchone()[0] == 0:
        sample_users = [
            ("Alice Johnson", "alice@example.com"),
            ("Bob Smith", "bob@example.com"),
            ("Charlie Brown", "charlie@example.com"),
        ]
        cursor.executemany("INSERT INTO users (name, email) VALUES (?, ?)", sample_users)
        sample_products = [
            ("Laptop", 999.99, 10),
            ("Mouse", 29.99, 50),
            ("Keyboard", 79.99, 30),
            ("Monitor", 299.99, 15),
        ]
        cursor.executemany("INSERT INTO products (name, price, stock) VALUES (?, ?, ?)", sample_products)
    conn.commit()
    conn.close()


class DatabaseServer:
    def __init__(self):
        self.server = Server("database-server")
        init_database()
        self._setup_handlers()

    def _setup_handlers(self):
        @self.server.list_resources()
        async def handle_list_resources() -> list[types.Resource]:
            return [
                types.Resource(
                    uri="db://users",
                    name="Users Table",
                    description="User information database",
                    mimeType="application/json",
                ),
                types.Resource(
                    uri="db://products",
                    name="Products Table",
                    description="Product inventory database",
                    mimeType="application/json",
                ),
            ]

        @self.server.read_resource()
        async def handle_read_resource(uri: AnyUrl) -> str:
            # The SDK passes a URL object, so convert it before comparing
            resource = str(uri).rstrip("/")
            conn = sqlite3.connect(DB_PATH)
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            try:
                if resource == "db://users":
                    cursor.execute("SELECT * FROM users")
                elif resource == "db://products":
                    cursor.execute("SELECT * FROM products")
                else:
                    raise ValueError(f"Unknown resource: {uri}")
                data = [dict(row) for row in cursor.fetchall()]
                return json.dumps(data, indent=2)
            finally:
                conn.close()

        @self.server.list_tools()
        async def handle_list_tools() -> list[types.Tool]:
            return [
                types.Tool(
                    name="query_database",
                    description="Execute a SELECT query on the database",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "SQL SELECT query to execute",
                            },
                        },
                        "required": ["query"],
                    },
                ),
                types.Tool(
                    name="add_user",
                    description="Add a new user to the database",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "name": {
                                "type": "string",
                                "description": "User's full name",
                            },
                            "email": {
                                "type": "string",
                                "description": "User's email address",
                            },
                        },
                        "required": ["name", "email"],
                    },
                ),
                types.Tool(
                    name="update_product_stock",
                    description="Update product stock quantity",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "product_id": {
                                "type": "integer",
                                "description": "Product ID",
                            },
                            "stock": {
                                "type": "integer",
                                "description": "New stock quantity",
                            },
                        },
                        "required": ["product_id", "stock"],
                    },
                ),
            ]

        @self.server.call_tool()
        async def handle_call_tool(
            name: str, arguments: Optional[Dict[str, Any]]
        ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
            arguments = arguments or {}  # the client may omit arguments entirely
            conn = sqlite3.connect(DB_PATH)
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            try:
                if name == "query_database":
                    query = arguments.get("query", "")
                    # Security: Only allow SELECT queries
                    if not query.strip().upper().startswith("SELECT"):
                        return [types.TextContent(
                            type="text",
                            text="Error: Only SELECT queries are allowed"
                        )]
                    try:
                        cursor.execute(query)
                        rows = cursor.fetchall()
                        if rows:
                            # Convert rows to list of dicts
                            data = [dict(row) for row in rows]
                            result = json.dumps(data, indent=2)
                            return [types.TextContent(
                                type="text",
                                text=f"Query executed successfully. Results:\n{result}"
                            )]
                        else:
                            return [types.TextContent(
                                type="text",
                                text="Query executed successfully. No results found."
                            )]
                    except sqlite3.Error as e:
                        return [types.TextContent(
                            type="text",
                            text=f"Query error: {str(e)}"
                        )]
                elif name == "add_user":
                    # Separate variable so the tool name is not overwritten
                    user_name = arguments.get("name", "")
                    email = arguments.get("email", "")
                    try:
                        cursor.execute(
                            "INSERT INTO users (name, email) VALUES (?, ?)",
                            (user_name, email)
                        )
                        conn.commit()
                        return [types.TextContent(
                            type="text",
                            text=f"Successfully added user: {user_name} ({email})"
                        )]
                    except sqlite3.IntegrityError:
                        return [types.TextContent(
                            type="text",
                            text=f"Error: User with email {email} already exists"
                        )]
                elif name == "update_product_stock":
                    product_id = arguments.get("product_id", 0)
                    stock = arguments.get("stock", 0)
                    cursor.execute(
                        "UPDATE products SET stock = ? WHERE id = ?",
                        (stock, product_id)
                    )
                    if cursor.rowcount > 0:
                        conn.commit()
                        return [types.TextContent(
                            type="text",
                            text=f"Successfully updated stock for product ID {product_id} to {stock}"
                        )]
                    else:
                        return [types.TextContent(
                            type="text",
                            text=f"Error: Product with ID {product_id} not found"
                        )]
                else:
                    return [types.TextContent(
                        type="text",
                        text=f"Unknown tool: {name}"
                    )]
            finally:
                conn.close()

        @self.server.list_prompts()
        async def handle_list_prompts() -> list[types.Prompt]:
            return [
                types.Prompt(
                    name="analyze_sales",
                    description="Generate SQL query to analyze sales data",
                    arguments=[
                        types.PromptArgument(
                            name="time_period",
                            description="Time period for analysis (e.g., 'last month', 'Q1 2024')",
                            required=True,
                        ),
                    ],
                ),
                types.Prompt(
                    name="inventory_report",
                    description="Generate inventory status report",
                    arguments=[
                        types.PromptArgument(
                            name="threshold",
                            description="Low stock threshold",
                            required=False,
                        ),
                    ],
                ),
            ]

        @self.server.get_prompt()
        async def handle_get_prompt(
            name: str, arguments: Optional[Dict[str, str]]
        ) -> types.GetPromptResult:
            arguments = arguments or {}
            if name == "analyze_sales":
                time_period = arguments.get("time_period", "all time")
                return types.GetPromptResult(
                    description="Generate sales analysis query",
                    messages=[
                        types.PromptMessage(
                            role="user",
                            content=types.TextContent(
                                type="text",
                                text=f"Generate a SQL query to analyze product sales for {time_period}. Include total revenue, units sold, and top-selling products."
                            ),
                        )
                    ],
                )
            elif name == "inventory_report":
                threshold = arguments.get("threshold", "10")
                return types.GetPromptResult(
                    description="Generate inventory report",
                    messages=[
                        types.PromptMessage(
                            role="user",
                            content=types.TextContent(
                                type="text",
                                text=f"Create an inventory report showing all products with stock below {threshold} units. Include current stock levels and suggested reorder quantities."
                            ),
                        )
                    ],
                )
            else:
                raise ValueError(f"Unknown prompt: {name}")

    async def run(self):
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="database-server",
                    server_version="1.0.0",
                    capabilities=self.server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )


if __name__ == "__main__":
    server = DatabaseServer()
    asyncio.run(server.run())
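The query_database tool relies on a text check (the statement must start with SELECT) to keep queries read-only. A complementary approach, sketched below, is to open the connection itself in read-only mode so that SQLite refuses writes no matter what the query text looks like. `run_readonly_query` is a hypothetical helper, not part of the server above; it assumes the database file already exists.

```python
import sqlite3
from pathlib import Path


def run_readonly_query(db_path: str, query: str) -> list[dict]:
    """Run one SQL statement on a connection that cannot modify the file."""
    # mode=ro makes SQLite itself reject writes, whatever the query says
    uri = Path(db_path).resolve().as_uri() + "?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    conn.row_factory = sqlite3.Row
    try:
        return [dict(row) for row in conn.execute(query).fetchall()]
    finally:
        conn.close()
```

With this in place, a statement such as DELETE FROM users fails with sqlite3.OperationalError instead of changing data, which gives the tool a second line of defense behind the SELECT prefix check.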
Example 3: API Integration Server (TypeScript)
Create src/weather-server.ts (this example imports axios, so run npm install axios first):
#!/usr/bin/env node
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
CallToolRequestSchema,
ErrorCode,
ListToolsRequestSchema,
McpError,
} from '@modelcontextprotocol/sdk/types.js';
import axios from 'axios';
// You need to get an API key from OpenWeatherMap
const API_KEY = process.env.OPENWEATHER_API_KEY || 'your_api_key_here';
const BASE_URL = 'https://api.openweathermap.org/data/2.5';
interface WeatherData {
temperature: number;
feels_like: number;
humidity: number;
description: string;
wind_speed: number;
city: string;
country: string;
}
const server = new Server(
{
name: 'weather-server',
version: '1.0.0',
},
{
capabilities: {
tools: {},
},
}
);
// List available tools
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: 'get_weather',
description: 'Get current weather for a city',
inputSchema: {
type: 'object',
properties: {
city: {
type: 'string',
description: 'City name',
},
country: {
type: 'string',
description: 'Country code (e.g., US, UK)',
default: '',
},
units: {
type: 'string',
enum: ['metric', 'imperial'],
description: 'Temperature units',
default: 'metric',
},
},
required: ['city'],
},
},
{
name: 'get_forecast',
description: 'Get 5-day weather forecast',
inputSchema: {
type: 'object',
properties: {
city: {
type: 'string',
description: 'City name',
},
country: {
type: 'string',
description: 'Country code (e.g., US, UK)',
default: '',
},
units: {
type: 'string',
enum: ['metric', 'imperial'],
description: 'Temperature units',
default: 'metric',
},
},
required: ['city'],
},
},
{
name: 'compare_weather',
description: 'Compare weather between multiple cities',
inputSchema: {
type: 'object',
properties: {
cities: {
type: 'array',
items: { type: 'string' },
description: 'List of cities to compare',
minItems: 2,
maxItems: 5,
},
units: {
type: 'string',
enum: ['metric', 'imperial'],
description: 'Temperature units',
default: 'metric',
},
},
required: ['cities'],
},
},
],
};
});
// Handle tool execution
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
try {
switch (name) {
case 'get_weather': {
const { city, country = '', units = 'metric' } = args as {
city: string;
country?: string;
units?: string;
};
const weather = await getCurrentWeather(city, country, units);
return {
content: [
{
type: 'text',
text: formatWeatherReport(weather, units),
},
],
};
}
case 'get_forecast': {
const { city, country = '', units = 'metric' } = args as {
city: string;
country?: string;
units?: string;
};
const forecast = await getForecast(city, country, units);
return {
content: [
{
type: 'text',
text: formatForecastReport(forecast, units),
},
],
};
}
case 'compare_weather': {
const { cities, units = 'metric' } = args as {
cities: string[];
units?: string;
};
const weatherData = await Promise.all(
cities.map(city => getCurrentWeather(city, '', units))
);
return {
content: [
{
type: 'text',
text: formatComparisonReport(weatherData, units),
},
],
};
}
default:
throw new McpError(
ErrorCode.MethodNotFound,
`Unknown tool: ${name}`
);
}
} catch (error: any) {
if (error instanceof McpError) {
throw error;
}
throw new McpError(
ErrorCode.InternalError,
`Weather API error: ${error.message}`
);
}
});
// Weather API functions
async function getCurrentWeather(
city: string,
country: string,
units: string
): Promise<WeatherData> {
const location = country ? `${city},${country}` : city;
try {
const response = await axios.get(`${BASE_URL}/weather`, {
params: {
q: location,
appid: API_KEY,
units: units,
},
});
const data = response.data;
return {
temperature: Math.round(data.main.temp),
feels_like: Math.round(data.main.feels_like),
humidity: data.main.humidity,
description: data.weather[0].description,
wind_speed: data.wind.speed,
city: data.name,
country: data.sys.country,
};
} catch (error: any) {
if (error.response?.status === 404) {
throw new Error(`City not found: ${location}`);
}
throw error;
}
}
async function getForecast(
city: string,
country: string,
units: string
): Promise<any[]> {
const location = country ? `${city},${country}` : city;
try {
const response = await axios.get(`${BASE_URL}/forecast`, {
params: {
q: location,
appid: API_KEY,
units: units,
cnt: 40, // 5 days * 8 (3-hour intervals)
},
});
// Group forecasts by day
const dailyForecasts = new Map<string, any[]>();
response.data.list.forEach((item: any) => {
const date = new Date(item.dt * 1000).toLocaleDateString();
if (!dailyForecasts.has(date)) {
dailyForecasts.set(date, []);
}
dailyForecasts.get(date)!.push(item);
});
// Calculate daily summaries
const forecasts = Array.from(dailyForecasts.entries()).map(([date, items]) => {
const temps = items.map(item => item.main.temp);
const descriptions = items.map(item => item.weather[0].description);
return {
date,
min_temp: Math.round(Math.min(...temps)),
max_temp: Math.round(Math.max(...temps)),
description: getMostCommonElement(descriptions),
city: response.data.city.name,
country: response.data.city.country,
};
});
return forecasts.slice(0, 5); // Return 5 days
} catch (error: any) {
if (error.response?.status === 404) {
throw new Error(`City not found: ${location}`);
}
throw error;
}
}
// Helper functions
function formatWeatherReport(weather: WeatherData, units: string): string {
const tempUnit = units === 'metric' ? '°C' : '°F';
const speedUnit = units === 'metric' ? 'm/s' : 'mph';
return `
**Current Weather in ${weather.city}, ${weather.country}**
Temperature: ${weather.temperature}${tempUnit} (feels like ${weather.feels_like}${tempUnit})
Conditions: ${weather.description}
Humidity: ${weather.humidity}%
Wind Speed: ${weather.wind_speed} ${speedUnit}
`.trim();
}
function formatForecastReport(forecasts: any[], units: string): string {
const tempUnit = units === 'metric' ? '°C' : '°F';
let report = `**5-Day Forecast for ${forecasts[0].city}, ${forecasts[0].country}**\n\n`;
forecasts.forEach(forecast => {
report += `**${forecast.date}**\n`;
report += ` Temperature: ${forecast.min_temp}-${forecast.max_temp}${tempUnit}\n`;
report += ` Conditions: ${forecast.description}\n\n`;
});
return report.trim();
}
function formatComparisonReport(weatherData: WeatherData[], units: string): string {
const tempUnit = units === 'metric' ? '°C' : '°F';
let report = '**Weather Comparison**\n\n';
// Sort by temperature, warmest first
weatherData.sort((a, b) => b.temperature - a.temperature);
weatherData.forEach((weather, index) => {
// Rank each city by its position in the sorted list
report += `${index + 1}. **${weather.city}, ${weather.country}**\n`;
report += ` Temperature: ${weather.temperature}${tempUnit} - ${weather.description}\n`;
report += ` Humidity: ${weather.humidity}%\n\n`;
});
return report.trim();
}
function getMostCommonElement(arr: string[]): string {
const counts = new Map<string, number>();
arr.forEach(element => {
counts.set(element, (counts.get(element) || 0) + 1);
});
let maxCount = 0;
let mostCommon = arr[0];
counts.forEach((count, element) => {
if (count > maxCount) {
maxCount = count;
mostCommon = element;
}
});
return mostCommon;
}
// Start the server
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error('Weather MCP Server running on stdio');
}
main().catch((error) => {
console.error('Server error:', error);
process.exit(1);
});
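The core of get_forecast is the step that turns 3-hour forecast entries into one summary per day. The sketch below reproduces that aggregation in Python on plain dictionaries shaped like the entries the server reads (`dt`, `main.temp`, `weather[0].description`). `summarize_by_day` is a hypothetical name, and unlike the TypeScript version it groups by UTC date instead of the machine's local date, which keeps the result independent of where the server runs.

```python
from collections import Counter, defaultdict
from datetime import datetime, timezone


def summarize_by_day(entries: list[dict]) -> list[dict]:
    """Group forecast entries by UTC date and summarize each day."""
    by_day: dict[str, list[dict]] = defaultdict(list)
    for entry in entries:
        day = datetime.fromtimestamp(entry["dt"], tz=timezone.utc).date().isoformat()
        by_day[day].append(entry)

    summaries = []
    for day in sorted(by_day):
        items = by_day[day]
        temps = [item["main"]["temp"] for item in items]
        descriptions = [item["weather"][0]["description"] for item in items]
        summaries.append({
            "date": day,
            "min_temp": round(min(temps)),
            "max_temp": round(max(temps)),
            # The most frequent description stands in for the whole day
            "description": Counter(descriptions).most_common(1)[0][0],
        })
    return summaries


sample = [
    {"dt": 0, "main": {"temp": 10.2}, "weather": [{"description": "clear sky"}]},
    {"dt": 10800, "main": {"temp": 14.7}, "weather": [{"description": "clear sky"}]},
    {"dt": 86400, "main": {"temp": 8.1}, "weather": [{"description": "rain"}]},
]
print(summarize_by_day(sample))
```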
6. Resources, Tools & Prompts
Understanding the Three Primitives
Resources: Your Data Sources
Resources are read-only data that your AI can access:
// Example: Exposing configuration files
const configResource = {
  uri: "config://app/settings",
  name: "Application Settings",
  description: "Current app configuration",
  mimeType: "application/json"
};
// Example: Database views
const dbResource = {
  uri: "db://analytics/user_metrics",
  name: "User Analytics",
  description: "Read-only view of user metrics",
  mimeType: "application/json"
};
// Example: API endpoints
const apiResource = {
  uri: "api://github/repos",
  name: "GitHub Repositories",
  description: "List of user's GitHub repos",
  mimeType: "application/json"
};
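Because each resource is identified by a URI, a server that exposes several kinds of data can route read requests by scheme. Here is a minimal sketch of that routing using only the standard library; `parse_resource_uri`, `read_resource`, and the `readers` table are hypothetical names, and the lambda readers are placeholders for real data access.

```python
from typing import Callable
from urllib.parse import urlparse


def parse_resource_uri(uri: str) -> tuple[str, str, str]:
    """Split a resource URI into (scheme, authority, path)."""
    parts = urlparse(uri)
    if not parts.scheme:
        raise ValueError(f"resource URI needs a scheme: {uri!r}")
    return parts.scheme, parts.netloc, parts.path


def read_resource(uri: str, readers: dict[str, Callable[[str, str], str]]) -> str:
    """Dispatch a read to the reader registered for the URI's scheme."""
    scheme, authority, path = parse_resource_uri(uri)
    reader = readers.get(scheme)
    if reader is None:
        raise ValueError(f"unsupported URI scheme: {scheme}")
    return reader(authority, path)


# Placeholder readers; a real server would query a config store or database
readers = {
    "config": lambda authority, path: f"settings for {authority}{path}",
    "db": lambda authority, path: f"rows from {authority}{path}",
}

print(read_resource("config://app/settings", readers))
print(parse_resource_uri("db://analytics/user_metrics"))
```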
Tools: Your Action Handlers
Tools allow the AI to perform actions:
// Example: System administration tool
const systemTool = {
  name: "restart_service",
  description: "Restart a system service",
  inputSchema: {
    type: "object",
    properties: {
      service_name: {
        type: "string",
        enum: ["web", "database", "cache"],
        description: "Service to restart"
      },
      force: {
        type: "boolean",
        default: false,
        description: "Force restart without graceful shutdown"
      }
    },
    required: ["service_name"]
  }
};
// Example: Data processing tool
const dataTool = {
  name: "transform_dataset",
  description: "Apply transformations to a dataset",
  inputSchema: {
    type: "object",
    properties: {
      dataset_id: { type: "string" },
      transformations: {
        type: "array",
        items: {
          type: "object",
          properties: {
            type: { type: "string", enum: ["filter", "map", "aggregate"] },
            config: { type: "object" }
          }
        }
      }
    },
    required: ["dataset_id", "transformations"]
  }
};
Prompts: Your Templates
Prompts provide reusable conversation starters:
// Example: Code review prompt
const codeReviewPrompt = {
  name: "code_review",
  description: "Comprehensive code review template",
  arguments: [
    {
      name: "language",
      description: "Programming language",
      required: true
    },
    {
      name: "focus_areas",
      description: "Specific areas to focus on",
      required: false
    }
  ],
  template: `Please review the following {language} code focusing on:
- Code quality and best practices
- Performance implications
- Security vulnerabilities
- Maintainability
{focus_areas ? "Additional focus: " + focus_areas : ""}`
};
// Example: Data analysis prompt
const analysisPrompt = {
  name: "data_analysis",
  description: "Statistical data analysis template",
  arguments: [
    {
      name: "dataset",
      description: "Dataset identifier",
      required: true
    },
    {
      name: "hypothesis",
      description: "Hypothesis to test",
      required: false
    }
  ]
};
Best Practices for Each Primitive
Resource Best Practices
- Use Clear URI Schemes:
Good: file:///path/to/file, db://table/users, api://service/endpoint
Bad: /path/to/file, users, endpoint
- Provide Accurate MIME Types:
const mimeTypes = {
  '.json': 'application/json',
  '.csv': 'text/csv',
  '.md': 'text/markdown',
  '.sql': 'application/sql'
};
- Implement Pagination for Large Resources:
// Return partial data with continuation tokens
return {
  contents: [{
    uri: resource.uri,
    mimeType: 'application/json',
    text: JSON.stringify({
      data: items.slice(0, 100),
      next_page: items.length > 100 ? 'page2' : null
    })
  }]
};
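The pagination snippet hard-codes the first page and a fixed 'page2' token. One way to generalize it is an opaque cursor that encodes where the next page starts, so the client simply echoes back whatever token it received. The sketch below assumes an in-memory list and a base64-encoded JSON offset; `paginate`, `encode_cursor`, and `decode_cursor` are hypothetical names, and the `next_cursor` key is this sketch's own choice, not a field defined by the guide.

```python
import base64
import json
from typing import Any, Optional

PAGE_SIZE = 100


def encode_cursor(offset: int) -> str:
    """Pack the next offset into an opaque, URL-safe token."""
    return base64.urlsafe_b64encode(json.dumps({"offset": offset}).encode()).decode()


def decode_cursor(cursor: Optional[str]) -> int:
    """Return the offset stored in a cursor; None means start at the beginning."""
    if cursor is None:
        return 0
    try:
        offset = json.loads(base64.urlsafe_b64decode(cursor.encode()))["offset"]
    except (ValueError, KeyError, TypeError) as exc:
        raise ValueError("invalid cursor") from exc
    if not isinstance(offset, int) or offset < 0:
        raise ValueError("invalid cursor")
    return offset


def paginate(items: list[Any], cursor: Optional[str] = None,
             page_size: int = PAGE_SIZE) -> dict[str, Any]:
    """Return one page of items plus a cursor for the next page, if any."""
    start = decode_cursor(cursor)
    end = start + page_size
    return {
        "data": items[start:end],
        "next_cursor": encode_cursor(end) if end < len(items) else None,
    }
```

A client keeps calling with the returned `next_cursor` until it comes back as None. Treating the token as opaque leaves the server free to change its encoding later without breaking clients.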
Tool Best Practices
- Validate Input Thoroughly:
async function handleToolCall(name: string, args: any) {
  // Always validate
  const validated = validateSchema(args, tool.inputSchema);
  if (!validated.valid) {
    throw new McpError(
      ErrorCode.InvalidParams,
      `Invalid parameters: ${validated.errors.join(', ')}`
    );
  }
}
- Implement Idempotency:
// Use request IDs to prevent duplicate operations
const requestId = args.request_id || generateId();
if (processedRequests.has(requestId)) {
  return processedRequests.get(requestId);
}
- Provide Progress for Long Operations:
// Return immediately and let the client poll for completion.
// isPartial and operationId are application-specific fields here, not part
// of the standard tool result, so clients must be written to look for them.
return {
  content: [{
    type: 'text',
    text: 'Operation started. Check status with get_operation_status tool.'
  }],
  isPartial: true,
  operationId: operationId
};
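The idempotency snippet assumes a `processedRequests` store without showing how results get into it. A minimal sketch of the full pattern follows: look up the request ID, run the operation only on a miss, and remember the result. `IdempotentExecutor` is a hypothetical class, and the in-memory dictionary is an assumption that suits a single-process server; it grows without bound, so a real deployment would want expiry or a shared store.

```python
import uuid
from typing import Any, Callable, Optional


class IdempotentExecutor:
    """Runs each request ID at most once and replays the stored result."""

    def __init__(self) -> None:
        self._results: dict[str, Any] = {}

    def run(self, request_id: Optional[str], operation: Callable[[], Any]) -> Any:
        # Requests without an ID get a fresh one, so they always execute
        key = request_id or str(uuid.uuid4())
        if key in self._results:
            return self._results[key]
        result = operation()
        self._results[key] = result
        return result


executor = IdempotentExecutor()
calls = []


def create_invoice() -> str:
    """Stand-in for a side-effecting tool; records each real execution."""
    calls.append("run")
    return f"invoice-{len(calls)}"


first = executor.run("req-1", create_invoice)
second = executor.run("req-1", create_invoice)  # replayed, not re-run
print(first, second, len(calls))
```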
Prompt Best Practices
- Make Prompts Composable:
// Base prompts that can be extended
const basePrompts = {
  error_analysis: "Analyze this error and suggest fixes",
  optimization: "Identify optimization opportunities",
  documentation: "Generate comprehensive documentation"
};
- Include Examples in Prompts:
const prompt = {
  name: "api_design",
  template: `Design a REST API for {purpose}.
Example good practice: GET /api/v1/users?page=1&limit=20
Include: endpoints, methods, request/response formats`
};
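The templates above use {name} placeholders but leave the substitution step implicit. The sketch below shows one way a server could fill a template when a prompt is requested: check the required arguments first, then substitute. `render_prompt` is a hypothetical helper; it uses Python's `str.format_map`, so it assumes templates contain no literal braces other than placeholders.

```python
def render_prompt(template: str, declared: list[dict], values: dict[str, str]) -> str:
    """Fill {name} placeholders after checking required arguments."""
    missing = [
        arg["name"] for arg in declared
        if arg.get("required") and arg["name"] not in values
    ]
    if missing:
        raise ValueError("missing required arguments: " + ", ".join(missing))
    # Optional arguments that were not supplied render as empty strings
    defaults = {arg["name"]: "" for arg in declared}
    return template.format_map({**defaults, **values})


api_design_args = [{"name": "purpose", "required": True}]
api_design_template = (
    "Design a REST API for {purpose}.\n"
    "Example good practice: GET /api/v1/users?page=1&limit=20\n"
    "Include: endpoints, methods, request/response formats"
)

print(render_prompt(api_design_template, api_design_args, {"purpose": "a bookstore"}))
```

Checking required arguments before substitution gives the client a clear error instead of a half-filled prompt.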
7. Client Integration
Integrating with Claude Desktop
- Update Configuration:
{
  "mcpServers": {
    "filesystem": {
      "command": "node",
      "args": ["/path/to/filesystem-server.js"],
      "env": {
        "MCP_BASE_DIR": "/Users/me/projects"
      }
    },
    "database": {
      "command": "python",
      "args": ["/path/to/database-server.py"],
      "env": {
        "DB_PATH": "/Users/me/data/app.db"
      }
    },
    "weather": {
      "command": "npx",
      "args": ["tsx", "/path/to/weather-server.ts"],
      "env": {
        "OPENWEATHER_API_KEY": "your_api_key"
      }
    }
  }
}
- Restart Claude Desktop to load the new servers
- Test the Integration:
- Ask: "What files are in my projects directory?"
- Ask: "Show me all users in the database"
- Ask: "What's the weather in Tokyo?"
Building a Custom Client
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
async function createMCPClient() {
// Create the transport. StdioClientTransport launches the server process itself,
// so no separate child_process.spawn call is needed.
const transport = new StdioClientTransport({
command: 'node',
args: ['path/to/server.js'],
});
// Create client
const client = new Client({
name: 'my-mcp-client',
version: '1.0.0',
}, {
capabilities: {}
});
// Connect
await client.connect(transport);
// List available resources
const resources = await client.listResources();
console.log('Available resources:', resources);
// List available tools
const tools = await client.listTools();
console.log('Available tools:', tools);
// Call a tool
const result = await client.callTool({
name: 'create_file',
arguments: {
filename: 'test.txt',
content: 'Hello from MCP!'
}
});
console.log('Tool result:', result);
return client;
}
// Usage
const client = await createMCPClient();
Integration with LangChain
import asyncio
import json
from langchain.agents import initialize_agent
from langchain.llms import OpenAI
from langchain.tools import Tool
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def make_mcp_tool(session: ClientSession, tool_name: str) -> Tool:
    """Wrap an MCP tool so a LangChain agent can call it"""
    # Get tool info from MCP
    listing = await session.list_tools()
    tool_info = next((t for t in listing.tools if t.name == tool_name), None)
    if tool_info is None:
        raise ValueError(f"Tool {tool_name} not found")

    async def _arun(tool_input: str) -> str:
        """Execute the MCP tool. Assumes the agent passes the arguments as a JSON object."""
        result = await session.call_tool(tool_name, arguments=json.loads(tool_input))
        return result.content[0].text

    return Tool(
        name=tool_info.name,
        description=f"{tool_info.description or ''} Input: a JSON object of arguments.",
        func=None,  # async only; run the agent with arun()
        coroutine=_arun,
    )

async def main():
    # Create MCP client session (the server path is a placeholder)
    params = StdioServerParameters(command="node", args=["path/to/server.js"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Wrap MCP tools for LangChain
            weather_tool = await make_mcp_tool(session, "get_weather")
            db_tool = await make_mcp_tool(session, "query_database")
            # Use in LangChain agent
            llm = OpenAI(temperature=0)
            agent = initialize_agent(
                tools=[weather_tool, db_tool],
                llm=llm,
                agent="zero-shot-react-description",
            )
            # Run agent
            result = await agent.arun(
                "What's the weather in Paris? Also show me all users from France."
            )
            print(result)

asyncio.run(main())
8. Real-World Examples
Example 1: Git Repository Manager
// git-server.ts
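import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { CallToolRequestSchema, ListToolsRequestSchema } from '@modelcontextprotocol/sdk/types.js';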
import { exec } from 'child_process';
import { promisify } from 'util';
const execAsync = promisify(exec);
class GitServer {
private server: Server;
private repoPath: string;
constructor(repoPath: string) {
this.repoPath = repoPath;
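this.server = new Server({
name: 'git-manager',
version: '1.0.0',
}, {
// Declare tool support so the tool request handlers below can be registered
capabilities: { tools: {} },
});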
this.setupHandlers();
}
private setupHandlers() {
// List tools
this.server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: 'git_status',
description: 'Get current git status',
inputSchema: { type: 'object', properties: {} }
},
{
name: 'git_log',
description: 'Get commit history',
inputSchema: {
type: 'object',
properties: {
limit: { type: 'number', default: 10 },
oneline: { type: 'boolean', default: false }
}
}
},
{
name: 'git_diff',
description: 'Show changes in working directory',
inputSchema: {
type: 'object',
properties: {
staged: { type: 'boolean', default: false }
}
}
},
{
name: 'git_branch',
description: 'List or create branches',
inputSchema: {
type: 'object',
properties: {
create: { type: 'string' },
delete: { type: 'string' },
list: { type: 'boolean', default: true }
}
}
}
]
};
});
// Handle tool calls
this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
try {
let result: string;
switch (name) {
case 'git_status':
result = await this.gitCommand('status --porcelain');
break;
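case 'git_log':
// Coerce limit to a positive integer so it cannot inject extra shell arguments
const limit = Number.isInteger(args.limit) && args.limit > 0 ? args.limit : 10;
const format = args.oneline ? '--oneline' : '';
result = await this.gitCommand(`log -${limit} ${format}`);
break;
case 'git_diff':
const staged = args.staged ? '--cached' : '';
result = await this.gitCommand(`diff ${staged}`);
break;
case 'git_branch':
// Branch names are interpolated into a shell command, so allow only safe characters
const branch = args.create || args.delete;
if (branch && !/^\w[\w./-]*$/.test(branch)) {
throw new Error(`Invalid branch name: ${branch}`);
}
if (args.create) {
result = await this.gitCommand(`checkout -b ${args.create}`);
} else if (args.delete) {
result = await this.gitCommand(`branch -d ${args.delete}`);
} else {
result = await this.gitCommand('branch -a');
}
break;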
default:
throw new Error(`Unknown tool: ${name}`);
}
return {
content: [{
type: 'text',
text: result || 'Command completed successfully'
}]
};
} catch (error: any) {
return {
content: [{
type: 'text',
text: `Git error: ${error.message}`
}],
isError: true // mark the result as a failure so the client can tell it apart from normal output
};
}
});
}
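private async gitCommand(command: string): Promise<string> {
// The promisified exec rejects on a non-zero exit code, so real failures
// surface as exceptions. Git also writes informational messages to stderr
// on success (for example after `checkout -b`), so stderr alone is not
// treated as an error.
const { stdout, stderr } = await execAsync(
`git ${command}`,
{ cwd: this.repoPath }
);
return stdout || stderr;
}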
}
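Because the Git example builds shell command strings from tool arguments, it is worth seeing the alternative of passing an argument list to the process directly. The following Python sketch shows that approach under stated assumptions: the allow-list pattern for branch names is deliberately conservative (git itself permits more), and `validate_branch_name`, `build_git_args`, and `run_git` are hypothetical helpers.

```python
import re
import subprocess
from typing import List

# Conservative allow-list for branch names (an assumption; git itself permits more).
_BRANCH_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9._/-]{0,99}")


def validate_branch_name(name: str) -> str:
    """Reject names that could be read as options or contain shell metacharacters."""
    if not _BRANCH_RE.fullmatch(name) or ".." in name or name.endswith(("/", ".lock")):
        raise ValueError(f"Invalid branch name: {name!r}")
    return name


def build_git_args(tool: str, args: dict) -> List[str]:
    """Translate a tool call into an argument list (never a shell string)."""
    if tool == "git_status":
        return ["status", "--porcelain"]
    if tool == "git_log":
        limit = int(args.get("limit", 10))
        if not 1 <= limit <= 1000:
            raise ValueError("limit must be between 1 and 1000")
        cmd = ["log", f"-{limit}"]
        if args.get("oneline"):
            cmd.append("--oneline")
        return cmd
    if tool == "git_branch" and args.get("create"):
        return ["checkout", "-b", validate_branch_name(args["create"])]
    raise ValueError(f"Unknown tool: {tool}")


def run_git(repo_path: str, git_args: List[str]) -> str:
    """Run git without a shell; a non-zero exit raises CalledProcessError."""
    done = subprocess.run(["git", *git_args], cwd=repo_path,
                          capture_output=True, text=True, check=True)
    return done.stdout
```

With no shell involved, a value such as `x; rm -rf /` is at worst an invalid branch name and is never interpreted as a second command.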
Example 2: Slack Integration
# slack_server.py
import asyncio
from typing import Any, Dict, List, Optional
from slack_sdk.web.async_client import AsyncWebClient
from slack_sdk.errors import SlackApiError
import mcp.types as types
from mcp.server import Server

class SlackServer:
    def __init__(self, slack_token: str):
        self.server = Server("slack-integration")
        self.slack = AsyncWebClient(token=slack_token)
        self._setup_handlers()

    def _setup_handlers(self):
        @self.server.list_tools()
        async def handle_list_tools() -> list[types.Tool]:
            return [
                types.Tool(
                    name="send_message",
                    description="Send a message to a Slack channel",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "channel": {
                                "type": "string",
                                "description": "Channel name or ID (e.g., #general or C1234567890)"
                            },
                            "text": {
                                "type": "string",
                                "description": "Message text"
                            },
                            "thread_ts": {
                                "type": "string",
                                "description": "Thread timestamp to reply to (optional)"
                            }
                        },
                        "required": ["channel", "text"]
                    }
                ),
                types.Tool(
                    name="search_messages",
                    description="Search for messages in Slack",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Search query"
                            },
                            "count": {
                                "type": "integer",
                                "description": "Number of results to return",
                                "default": 10
                            }
                        },
                        "required": ["query"]
                    }
                ),
                types.Tool(
                    name="list_channels",
                    description="List all channels in the workspace",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "types": {
                                "type": "string",
                                "description": "Comma-separated channel types",
                                "default": "public_channel,private_channel"
                            }
                        }
                    }
                ),
                types.Tool(
                    name="get_user_info",
                    description="Get information about a user",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "user_id": {
                                "type": "string",
                                "description": "User ID (e.g., U1234567890)"
                            }
                        },
                        "required": ["user_id"]
                    }
                )
            ]

        @self.server.call_tool()
        async def handle_call_tool(
            name: str, arguments: Optional[Dict[str, Any]]
        ) -> list[types.TextContent]:
            # Tools without required parameters may be called with no arguments
            arguments = arguments or {}
            try:
                if name == "send_message":
                    response = await self.slack.chat_postMessage(
                        channel=arguments["channel"],
                        text=arguments["text"],
                        thread_ts=arguments.get("thread_ts")
                    )
                    return [types.TextContent(
                        type="text",
                        text=f"Message sent successfully. Timestamp: {response['ts']}"
                    )]
                elif name == "search_messages":
                    # Slack's search.messages method requires a user token rather than a bot token
                    response = await self.slack.search_messages(
                        query=arguments["query"],
                        count=arguments.get("count", 10)
                    )
                    messages = []
                    for match in response["messages"]["matches"]:
                        messages.append(
                            f"[{match['username']}] {match['text'][:100]}..."
                        )
                    result = "\n".join(messages) if messages else "No messages found"
                    return [types.TextContent(type="text", text=result)]
                elif name == "list_channels":
                    response = await self.slack.conversations_list(
                        types=arguments.get("types", "public_channel,private_channel")
                    )
                    channels = []
                    for channel in response["channels"]:
                        channels.append(
                            f"#{channel['name']} ({channel['id']})"
                        )
                    return [types.TextContent(
                        type="text",
                        text="\n".join(channels)
                    )]
                elif name == "get_user_info":
                    response = await self.slack.users_info(
                        user=arguments["user_id"]
                    )
                    user = response["user"]
                    profile = user.get("profile", {})
                    info = "\n".join([
                        f"User: {user['real_name']} (@{user['name']})",
                        f"Email: {profile.get('email', 'N/A')}",
                        f"Title: {profile.get('title', 'N/A')}",
                        f"Status: {profile.get('status_text', 'No status')}",
                    ])
                    return [types.TextContent(type="text", text=info)]
                else:
                    return [types.TextContent(
                        type="text",
                        text=f"Unknown tool: {name}"
                    )]
            except SlackApiError as e:
                return [types.TextContent(
                    type="text",
                    text=f"Slack API error: {e.response['error']}"
                )]
Example 3: Jupyter Notebook Integration
// jupyter-server.ts
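import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import {
CallToolRequestSchema,
ListResourcesRequestSchema,
ListToolsRequestSchema,
ReadResourceRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';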
import axios from 'axios';
import WebSocket from 'ws';
interface JupyterConfig {
baseUrl: string;
token: string;
}
class JupyterServer {
private server: Server;
private config: JupyterConfig;
private kernels: Map<string, any> = new Map();
constructor(config: JupyterConfig) {
this.config = config;
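this.server = new Server({
name: 'jupyter-integration',
version: '1.0.0',
}, {
// Declare the capabilities used by the handlers below
capabilities: { resources: {}, tools: {} },
});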
this.setupHandlers();
}
private setupHandlers() {
// List resources (notebooks)
this.server.setRequestHandler(ListResourcesRequestSchema, async () => {
const notebooks = await this.listNotebooks();
return {
resources: notebooks.map(nb => ({
uri: `notebook://${nb.path}`,
name: nb.name,
description: `Jupyter notebook: ${nb.name}`,
mimeType: 'application/x-ipynb+json'
}))
};
});
// Read notebook content
this.server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const { uri } = request.params;
const path = uri.replace('notebook://', '');
const content = await this.getNotebook(path);
return {
contents: [{
uri,
mimeType: 'application/x-ipynb+json',
text: JSON.stringify(content, null, 2)
}]
};
});
// List tools
this.server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: 'execute_code',
description: 'Execute code in a Jupyter kernel',
inputSchema: {
type: 'object',
properties: {
code: {
type: 'string',
description: 'Code to execute'
},
kernel_name: {
type: 'string',
description: 'Kernel name (e.g., python3)',
default: 'python3'
}
},
required: ['code']
}
},
{
name: 'create_notebook',
description: 'Create a new Jupyter notebook',
inputSchema: {
type: 'object',
properties: {
name: {
type: 'string',
description: 'Notebook name'
},
content: {
type: 'object',
description: 'Initial notebook content'
}
},
required: ['name']
}
},
{
name: 'list_kernels',
description: 'List available Jupyter kernels',
inputSchema: {
type: 'object',
properties: {}
}
}
]
};
});
// Handle tool execution
this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
switch (name) {
case 'execute_code':
return await this.executeCode(args.code, args.kernel_name);
case 'create_notebook':
return await this.createNotebook(args.name, args.content);
case 'list_kernels':
return await this.listKernels();
default:
throw new Error(`Unknown tool: ${name}`);
}
});
}
private async executeCode(code: string, kernelName: string = 'python3') {
// Start or get kernel
let kernel = this.kernels.get(kernelName);
if (!kernel) {
kernel = await this.startKernel(kernelName);
this.kernels.set(kernelName, kernel);
}
// Execute code
const result = await this.executeInKernel(kernel, code);
return {
content: [{
type: 'text',
text: this.formatExecutionResult(result)
}]
};
}
private formatExecutionResult(result: any): string {
let output = '';
// Add stdout
if (result.stdout) {
output += `Output:\n${result.stdout}\n`;
}
// Add result data
if (result.data) {
if (result.data['text/plain']) {
output += `Result:\n${result.data['text/plain']}\n`;
}
}
// Add errors
if (result.error) {
output += `Error:\n${result.error.ename}: ${result.error.evalue}\n`;
}
return output || 'Code executed successfully (no output)';
}
// ... Additional implementation details
}
9. Advanced Patterns
Pattern 1: Stateful Conversations
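// Maintain conversation state across MCP calls.
// Note: addTool is a hypothetical registration helper used here for brevity; the SDK's
// low-level Server class registers handlers with setRequestHandler instead.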
class StatefulMCPServer extends Server {
private sessions: Map<string, SessionState> = new Map();
constructor() {
super({ name: 'stateful-server', version: '1.0.0' });
this.setupStateManagement();
}
private setupStateManagement() {
// Tool to start a session
this.addTool({
name: 'start_session',
handler: async (args) => {
const sessionId = generateId();
this.sessions.set(sessionId, {
id: sessionId,
startTime: Date.now(),
context: {},
history: []
});
return {
sessionId,
message: 'Session started. Include session_id in future calls.'
};
}
});
// Stateful tool example
this.addTool({
name: 'remember',
handler: async (args) => {
const session = this.sessions.get(args.session_id);
if (!session) {
throw new Error('Invalid session ID');
}
// Store information
session.context[args.key] = args.value;
session.history.push({
action: 'remember',
key: args.key,
value: args.value,
timestamp: Date.now()
});
return {
message: `Remembered: ${args.key} = ${args.value}`,
total_memories: Object.keys(session.context).length
};
}
});
}
}
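The session map in this pattern grows without bound, because nothing ever removes a session. One possible remedy is an idle timeout, sketched below in Python. `SessionStore`, its 30-minute default, and the injectable `clock` parameter (there to make expiry testable) are assumptions for illustration.

```python
import time
import uuid
from typing import Any, Callable, Dict, Optional


class SessionStore:
    """In-memory sessions that expire after `ttl_seconds` of inactivity."""

    def __init__(self, ttl_seconds: float = 1800.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._sessions: Dict[str, Dict[str, Any]] = {}

    def start(self) -> str:
        """Create a session and return its ID."""
        session_id = uuid.uuid4().hex
        self._sessions[session_id] = {"context": {}, "last_seen": self._clock()}
        return session_id

    def get(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Return the session context, or None if unknown or expired."""
        session = self._sessions.get(session_id)
        if session is None:
            return None
        if self._clock() - session["last_seen"] > self._ttl:
            del self._sessions[session_id]  # drop idle sessions lazily on access
            return None
        session["last_seen"] = self._clock()
        return session["context"]

    def remember(self, session_id: str, key: str, value: Any) -> int:
        """Store a value and return how many keys the session now holds."""
        context = self.get(session_id)
        if context is None:
            raise KeyError("Invalid or expired session ID")
        context[key] = value
        return len(context)
```

Expiry here happens lazily, when a session is next looked up. A periodic sweep would be needed to reclaim sessions that are never touched again.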
Pattern 2: Tool Composition
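// Compose complex operations from simple tools.
// Note: addTool and this.callTool are hypothetical helpers used here for brevity; the SDK's
// low-level Server class registers handlers with setRequestHandler instead.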
class ComposableMCPServer extends Server {
constructor() {
super({ name: 'composable-server', version: '1.0.0' });
this.setupComposableTools();
}
private setupComposableTools() {
// Base tools
this.addTool({
name: 'fetch_data',
handler: async (args) => {
// Fetch from source
return { data: await fetchFromSource(args.source) };
}
});
this.addTool({
name: 'transform_data',
handler: async (args) => {
// Apply transformation
return { data: transform(args.data, args.transformation) };
}
});
this.addTool({
name: 'validate_data',
handler: async (args) => {
// Validate against schema
return { valid: validate(args.data, args.schema) };
}
});
// Composite tool that uses other tools
this.addTool({
name: 'etl_pipeline',
handler: async (args) => {
// Compose operations
const fetchResult = await this.callTool('fetch_data', {
source: args.source
});
const transformResult = await this.callTool('transform_data', {
data: fetchResult.data,
transformation: args.transformation
});
const validateResult = await this.callTool('validate_data', {
data: transformResult.data,
schema: args.schema
});
if (!validateResult.valid) {
throw new Error('Validation failed');
}
return {
data: transformResult.data,
metadata: {
source: args.source,
transformations: args.transformation,
validated: true
}
};
}
});
}
}
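The composition idea does not depend on any particular SDK: a composite tool is a handler that looks up other handlers by name. Here is a small self-contained Python sketch. `ToolRegistry` and the stand-in base tools are hypothetical, and the `factor` argument replaces the generic `transformation` of the pattern to keep the example concrete.

```python
from typing import Any, Callable, Dict

ToolHandler = Callable[[Dict[str, Any]], Dict[str, Any]]


class ToolRegistry:
    """Minimal name -> handler map so composite tools can call other tools."""

    def __init__(self) -> None:
        self._tools: Dict[str, ToolHandler] = {}

    def register(self, name: str, handler: ToolHandler) -> None:
        self._tools[name] = handler

    def call(self, name: str, args: Dict[str, Any]) -> Dict[str, Any]:
        if name not in self._tools:
            raise KeyError(f"Unknown tool: {name}")
        return self._tools[name](args)


def build_registry() -> ToolRegistry:
    registry = ToolRegistry()
    # Stand-in base tools; real ones would fetch, transform, and validate actual data.
    registry.register("fetch_data", lambda a: {"data": [1, 2, 3]})
    registry.register("transform_data",
                      lambda a: {"data": [x * a["factor"] for x in a["data"]]})
    registry.register("validate_data",
                      lambda a: {"valid": all(isinstance(x, int) for x in a["data"])})

    def etl_pipeline(args: Dict[str, Any]) -> Dict[str, Any]:
        # Composite tool: each step goes through the registry, not a direct call.
        fetched = registry.call("fetch_data", {"source": args["source"]})
        transformed = registry.call(
            "transform_data", {"data": fetched["data"], "factor": args["factor"]})
        if not registry.call("validate_data", {"data": transformed["data"]})["valid"]:
            raise ValueError("Validation failed")
        return {"data": transformed["data"],
                "metadata": {"source": args["source"], "validated": True}}

    registry.register("etl_pipeline", etl_pipeline)
    return registry
```

Routing each step through `registry.call` means the base tools can be swapped or wrapped (for logging, say) without touching the pipeline.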
Pattern 3: Event-Driven MCP
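// Implement event streaming and webhooks.
// Note: addTool is a hypothetical registration helper used here for brevity; the SDK's
// low-level Server class registers handlers with setRequestHandler instead.
import { EventEmitter } from 'events';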
class EventDrivenMCPServer extends Server {
private eventEmitter = new EventEmitter();
private subscriptions = new Map<string, Set<string>>();
constructor() {
super({ name: 'event-server', version: '1.0.0' });
this.setupEventSystem();
}
private setupEventSystem() {
// Subscribe to events
this.addTool({
name: 'subscribe',
handler: async (args) => {
const { event_type, webhook_url } = args;
if (!this.subscriptions.has(event_type)) {
this.subscriptions.set(event_type, new Set());
}
this.subscriptions.get(event_type)!.add(webhook_url);
// No per-subscription listener is registered here: emit_event already calls
// notifySubscribers directly, and adding a listener on every subscribe would
// deliver each event to every webhook several times.
return {
message: `Subscribed to ${event_type}`,
subscription_id: generateId()
};
}
});
// Emit events
this.addTool({
name: 'emit_event',
handler: async (args) => {
const { event_type, data } = args;
// Emit internally
this.eventEmitter.emit(event_type, data);
// Notify external subscribers
await this.notifySubscribers(event_type, data);
return {
message: `Event ${event_type} emitted`,
subscriber_count: this.subscriptions.get(event_type)?.size || 0
};
}
});
}
private async notifySubscribers(eventType: string, data: any) {
const subscribers = this.subscriptions.get(eventType);
if (!subscribers) return;
const notifications = Array.from(subscribers).map(webhook =>
fetch(webhook, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ event: eventType, data })
})
);
await Promise.allSettled(notifications);
}
}
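`Promise.allSettled` keeps one failing webhook from blocking the rest, but the pattern above discards the outcomes. The Python sketch below shows the same fan-out with `asyncio.gather(..., return_exceptions=True)` and reports how many deliveries failed. The `send` callable is injected and hypothetical; a real one would perform the HTTP POST.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Iterable

Sender = Callable[[str, Dict[str, Any]], Awaitable[None]]


async def notify_subscribers(webhooks: Iterable[str], payload: Dict[str, Any],
                             send: Sender) -> Dict[str, int]:
    """Deliver payload to every webhook concurrently and count the failures."""
    urls = list(webhooks)
    # return_exceptions=True collects errors instead of aborting on the first one.
    results = await asyncio.gather(
        *(send(url, payload) for url in urls), return_exceptions=True)
    failed = [url for url, res in zip(urls, results) if isinstance(res, Exception)]
    return {"delivered": len(urls) - len(failed), "failed": len(failed)}
```

Returning the counts lets the `emit_event` tool tell the caller whether any subscriber was unreachable.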
Pattern 4: Security & Rate Limiting
# Implement security layers
# Note: server.middleware, server.tool(requires_permission=...), the request
# context, and the error codes used below illustrate the pattern. They are not
# part of the official MCP Python SDK, so adapt them to your own server framework.
from datetime import datetime, timedelta
from collections import defaultdict
import hashlib
import hmac

class SecureMCPServer:
    def __init__(self, api_keys: dict):
        self.server = Server("secure-server")
        self.api_keys = api_keys  # {key: permissions}
        self.rate_limits = defaultdict(list)
        self._setup_security()

    def _setup_security(self):
        # Middleware for authentication
        @self.server.middleware
        async def authenticate(request):
            api_key = request.headers.get("X-API-Key")
            if not api_key or api_key not in self.api_keys:
                raise McpError(
                    ErrorCode.Unauthorized,
                    "Invalid API key"
                )
            # Add permissions to request context
            request.context["permissions"] = self.api_keys[api_key]
            request.context["api_key"] = api_key

        # Middleware for rate limiting
        @self.server.middleware
        async def rate_limit(request):
            api_key = request.context.get("api_key")
            # Clean old entries
            cutoff = datetime.now() - timedelta(minutes=1)
            self.rate_limits[api_key] = [
                t for t in self.rate_limits[api_key] if t > cutoff
            ]
            # Check rate limit (60 requests per minute)
            if len(self.rate_limits[api_key]) >= 60:
                raise McpError(
                    ErrorCode.RateLimitExceeded,
                    "Rate limit exceeded. Please try again later."
                )
            self.rate_limits[api_key].append(datetime.now())

        # Protected tool example
        @self.server.tool(
            name="sensitive_operation",
            requires_permission="admin"
        )
        async def handle_sensitive(request, args):
            # Permission check
            permissions = request.context.get("permissions", [])
            if "admin" not in permissions:
                raise McpError(
                    ErrorCode.Forbidden,
                    "Insufficient permissions"
                )
            # Audit log
            self.audit_log(
                api_key=request.context["api_key"],
                action="sensitive_operation",
                args=args
            )
            # Perform operation
            return {"status": "completed"}
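The rate-limiting logic can be pulled out of the middleware into a small class that is easy to test on its own. This Python sketch is a sliding-window limiter using only the standard library. `SlidingWindowRateLimiter` and its injectable `clock` are hypothetical names, and the defaults mirror the 60 requests per minute used above.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowRateLimiter:
    """Allow at most `max_requests` per key within any `window_seconds` span."""

    def __init__(self, max_requests: int = 60, window_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._max = max_requests
        self._window = window_seconds
        self._clock = clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        """Record a request for `key` and return whether it is within the limit."""
        now = self._clock()
        hits = self._hits[key]
        # Drop timestamps that have left the window.
        while hits and now - hits[0] >= self._window:
            hits.popleft()
        if len(hits) >= self._max:
            return False
        hits.append(now)
        return True
```

A `deque` makes removing expired timestamps cheap, where the list comprehension above rebuilds the whole list on every request. `time.monotonic` also avoids surprises when the wall clock is adjusted.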
10. Deployment & Production
Deployment Options
Option 1: Standalone Process
# docker-compose.yml
version: '3.8'
services:
  mcp-filesystem:
    build: ./servers/filesystem
    environment:
      - MCP_BASE_DIR=/data
    volumes:
      - ./data:/data:ro
    restart: unless-stopped
  mcp-database:
    build: ./servers/database
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/myapp
    depends_on:
      - db  # the db service must also be defined in this file (not shown here)
    restart: unless-stopped
  mcp-router:
    image: mcp/router:latest
    ports:
      - "8080:8080"
    environment:
      - MCP_SERVERS=filesystem:3000,database:3001
    depends_on:
      - mcp-filesystem
      - mcp-database
Option 2: Kubernetes Deployment
# mcp-server-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mcp-server
  labels:
    app: mcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mcp-server
  template:
    metadata:
      labels:
        app: mcp-server
    spec:
      containers:
        - name: mcp-server
          image: myregistry/mcp-server:latest
          ports:
            - containerPort: 3000
          env:
            - name: MCP_MODE
              value: "production"
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: mcp-secrets
                  key: database-url
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 3000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 3000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: mcp-server-service
spec:
  selector:
    app: mcp-server
  ports:
    - protocol: TCP
      port: 80
      targetPort: 3000
  type: LoadBalancer
Option 3: Serverless Deployment
// serverless-mcp.ts
import { APIGatewayProxyHandler } from 'aws-lambda';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { LambdaTransport } from './lambda-transport';
let server: Server;
const initServer = () => {
if (!server) {
server = new Server({
name: 'serverless-mcp',
version: '1.0.0'
});
// Configure server
setupTools(server);
setupResources(server);
}
return server;
};
export const handler: APIGatewayProxyHandler = async (event, context) => {
const server = initServer();
const transport = new LambdaTransport(event, context);
try {
const response = await server.handleRequest(transport);
return {
statusCode: 200,
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(response),
};
} catch (error: any) {
return {
statusCode: error.statusCode || 500,
body: JSON.stringify({
error: error.message,
}),
};
}
};
Production Best Practices
1. Health Checks
class ProductionMCPServer extends Server {
private startTime = Date.now();
private requestCount = 0;
constructor() {
super({ name: 'production-server', version: '1.0.0' });
this.setupHealthChecks();
}
private setupHealthChecks() {
// Liveness probe
this.addEndpoint('/health', async () => {
return {
status: 'healthy',
uptime: Date.now() - this.startTime,
version: this.version
};
});
// Readiness probe
this.addEndpoint('/ready', async () => {
try {
// Check dependencies
await this.checkDatabase();
await this.checkExternalAPIs();
return { ready: true };
} catch (error: any) {
return {
ready: false,
reason: error.message
};
}
});
// Metrics endpoint
this.addEndpoint('/metrics', async () => {
return {
requests_total: this.requestCount,
uptime_seconds: (Date.now() - this.startTime) / 1000,
memory_usage: process.memoryUsage(),
active_connections: this.getActiveConnections()
};
});
}
}
2. Logging & Monitoring
import logging
import json
from datetime import datetime
from mcp.server import Server

class LoggingMCPServer:
    def __init__(self):
        self.server = Server("logged-server")
        self.setup_logging()

    def setup_logging(self):
        # Structured logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(message)s',
            handlers=[
                logging.StreamHandler(),  # writes to stderr, leaving stdout free for the stdio transport
                logging.FileHandler('mcp-server.log')
            ]
        )

        # Log all requests
        # Note: server.middleware and the request object illustrate the pattern;
        # they are not part of the official MCP Python SDK.
        @self.server.middleware
        async def log_request(request):
            start_time = datetime.now()
            # Log request
            logging.info(json.dumps({
                "timestamp": start_time.isoformat(),
                "type": "request",
                "method": request.method,
                "params": request.params,
                "session_id": request.session_id
            }))
            try:
                # Process request
                response = await request.next()
                # Log response
                duration = (datetime.now() - start_time).total_seconds()
                logging.info(json.dumps({
                    "timestamp": datetime.now().isoformat(),
                    "type": "response",
                    "session_id": request.session_id,
                    "duration_seconds": duration,
                    "status": "success"
                }))
                return response
            except Exception as e:
                # Log error
                logging.error(json.dumps({
                    "timestamp": datetime.now().isoformat(),
                    "type": "error",
                    "session_id": request.session_id,
                    "error": str(e),
                    "error_type": type(e).__name__
                }))
                raise
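Calling `json.dumps` at every log site works, but the same structure can be produced once in a `logging.Formatter`. The sketch below uses only the standard library. `JsonFormatter`, `build_logger`, and the convention of passing extra fields as `extra={"fields": {...}}` are assumptions chosen for this example.

```python
import json
import logging
import sys
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
        }
        # Fields passed via extra={"fields": {...}} are merged into the entry.
        entry.update(getattr(record, "fields", {}))
        if record.exc_info:
            entry["exception"] = self.formatException(record.exc_info)
        # default=str keeps logging from failing on values JSON cannot encode.
        return json.dumps(entry, default=str)


def build_logger(name: str = "mcp-server") -> logging.Logger:
    """Create a logger that writes JSON lines to stderr."""
    logger = logging.getLogger(name)
    handler = logging.StreamHandler(sys.stderr)  # stderr keeps stdout free for a stdio transport
    handler.setFormatter(JsonFormatter())
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger
```

A call such as `build_logger().info("tool call finished", extra={"fields": {"duration_seconds": 0.42}})` then emits a single JSON line containing both the message and the extra field.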
3. Configuration Management
// config.ts
interface MCPConfig {
server: {
name: string;
version: string;
port?: number;
};
features: {
rateLimit: boolean;
authentication: boolean;
metrics: boolean;
};
limits: {
maxRequestSize: number;
maxResponseSize: number;
requestTimeout: number;
};
external: {
databaseUrl?: string;
redisUrl?: string;
apiKeys?: Record<string, string>;
};
}
// Load configuration with validation
function loadConfig(): MCPConfig {
const config: MCPConfig = {
server: {
name: process.env.MCP_SERVER_NAME || 'mcp-server',
version: process.env.MCP_SERVER_VERSION || '1.0.0',
port: parseInt(process.env.MCP_PORT || '3000'),
},
features: {
rateLimit: process.env.MCP_RATE_LIMIT === 'true',
authentication: process.env.MCP_AUTH === 'true',
metrics: process.env.MCP_METRICS === 'true',
},
limits: {
maxRequestSize: parseInt(process.env.MCP_MAX_REQUEST_SIZE || '1048576'),
maxResponseSize: parseInt(process.env.MCP_MAX_RESPONSE_SIZE || '10485760'),
requestTimeout: parseInt(process.env.MCP_REQUEST_TIMEOUT || '30000'),
},
external: {
databaseUrl: process.env.DATABASE_URL,
redisUrl: process.env.REDIS_URL,
apiKeys: process.env.MCP_API_KEYS ?
JSON.parse(process.env.MCP_API_KEYS) : undefined,
},
};
// Validate configuration
validateConfig(config);
return config;
}
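`loadConfig` calls `validateConfig` without showing it. The checks such a function might perform are sketched here in Python, with the configuration as a plain dictionary using the same key names. The specific rules (port range, positive limits, API keys required when authentication is on) are assumptions, not requirements stated by the guide.

```python
from typing import Any, Dict, List


def _is_positive_int(value: Any) -> bool:
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, int) and not isinstance(value, bool) and value > 0


def validate_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the config is usable."""
    errors: List[str] = []

    port = config.get("server", {}).get("port")
    if port is not None and not (_is_positive_int(port) and port <= 65535):
        errors.append(f"server.port must be an integer in 1..65535, got {port!r}")

    limits = config.get("limits", {})
    for name in ("maxRequestSize", "maxResponseSize", "requestTimeout"):
        if not _is_positive_int(limits.get(name)):
            errors.append(f"limits.{name} must be a positive integer, got {limits.get(name)!r}")

    features = config.get("features", {})
    api_keys = config.get("external", {}).get("apiKeys")
    if features.get("authentication") and not api_keys:
        errors.append("features.authentication is enabled but external.apiKeys is empty")

    return errors
```

Collecting every problem before failing gives the operator one complete report, not a fix-and-retry loop. In the TypeScript version the numeric checks matter because `parseInt` returns `NaN` for non-numeric input, which still has type `number`.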
11. Troubleshooting
Common Issues and Solutions
Issue 1: Server Won't Start
Symptoms: Error when launching server
Debugging steps:
# Check if port is in use
lsof -i :3000
# Run with debug logging
DEBUG=mcp:* node server.js
# Check permissions
ls -la server.js
# Verify dependencies
npm list
Solution:
// Add error handling to server startup
async function startServer() {
try {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error('Server started successfully');
} catch (error: any) {
console.error('Failed to start server:', error);
// Common issues
if (error.code === 'EADDRINUSE') {
console.error('Port already in use');
} else if (error.code === 'EACCES') {
console.error('Permission denied');
}
process.exit(1);
}
}
Issue 2: Client Can't Connect
Symptoms: "Connection refused" or timeout errors
Debugging:
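# Test server is responding
import asyncio
import subprocess
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def test_connection():
    params = StdioServerParameters(command="node", args=["path/to/server.js"])
    try:
        async with stdio_client(params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()
                # Test basic operation
                response = await session.list_tools()
                print(f"Connected! Found {len(response.tools)} tools")
    except Exception as e:
        print(f"Connection failed: {e}")
        # Debug info: run the server directly and capture its output
        result = subprocess.run(
            ["node", "path/to/server.js", "--version"],
            capture_output=True,
            text=True,
            stdin=subprocess.DEVNULL,  # send EOF so a stdio server does not wait for input
            timeout=10,
        )
        print(f"Server output: {result.stdout}")
        print(f"Server errors: {result.stderr}")

asyncio.run(test_connection())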
Issue 3: Tool Execution Fails
Symptoms: Tools listed but fail when called
Debugging:
// Add comprehensive error handling
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
console.error(`Executing tool: ${name}`, args);
try {
// Validate tool exists
const tool = this.tools.get(name);
if (!tool) {
throw new McpError(
ErrorCode.MethodNotFound,
`Tool '${name}' not found. Available tools: ${Array.from(this.tools.keys()).join(', ')}`
);
}
// Validate arguments
const validation = validateArguments(args, tool.inputSchema);
if (!validation.valid) {
throw new McpError(
ErrorCode.InvalidParams,
`Invalid arguments: ${validation.errors.join(', ')}`
);
}
// Execute with timeout
const result = await withTimeout(
tool.handler(args),
30000,
`Tool '${name}' execution timed out`
);
return result;
} catch (error: any) {
console.error(`Tool execution error:`, error);
// Provide helpful error messages
if (error instanceof McpError) {
throw error;
}
throw new McpError(
ErrorCode.InternalError,
`Tool '${name}' failed: ${error.message}`,
{ originalError: error.stack }
);
}
});
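The handler above relies on a `withTimeout` helper that is not shown. Here is a Python sketch of the equivalent, built on `asyncio.wait_for`. `with_timeout` and `ToolTimeoutError` are hypothetical names.

```python
import asyncio
from typing import Awaitable, TypeVar

T = TypeVar("T")


class ToolTimeoutError(Exception):
    """Raised when a tool does not finish within its time budget."""


async def with_timeout(awaitable: Awaitable[T], seconds: float, message: str) -> T:
    """Await `awaitable`, raising ToolTimeoutError(message) if it takes too long."""
    try:
        return await asyncio.wait_for(awaitable, timeout=seconds)
    except asyncio.TimeoutError:
        raise ToolTimeoutError(message) from None
```

`asyncio.wait_for` also cancels the pending work when the deadline passes. A helper built on `Promise.race` in JavaScript would leave the losing promise running unless the handler cooperates through something like an `AbortSignal`.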
Issue 4: Performance Issues
Symptoms: Slow responses, timeouts
Profiling:
// Add performance monitoring
class PerformanceMonitor {
private metrics = new Map<string, number[]>();
async measure<T>(name: string, fn: () => Promise<T>): Promise<T> {
const start = performance.now();
try {
const result = await fn();
const duration = performance.now() - start;
// Store metric
if (!this.metrics.has(name)) {
this.metrics.set(name, []);
}
this.metrics.get(name)!.push(duration);
// Log slow operations
if (duration > 1000) {
console.warn(`Slow operation '${name}': ${duration}ms`);
}
return result;
} catch (error) {
const duration = performance.now() - start;
console.error(`Operation '${name}' failed after ${duration}ms`);
throw error;
}
}
getStats(name: string) {
const times = this.metrics.get(name) || [];
if (times.length === 0) return null;
return {
count: times.length,
avg: times.reduce((a, b) => a + b) / times.length,
min: Math.min(...times),
max: Math.max(...times),
p95: percentile(times, 95)
};
}
}
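`getStats` calls a `percentile` function that the snippet does not define. One common definition is the nearest-rank method, sketched below in Python. Other definitions interpolate between samples, so treat this as one reasonable choice and not necessarily the one the author intended.

```python
import math
from typing import Sequence


def percentile(values: Sequence[float], p: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least p% of samples at or below it."""
    if not values:
        raise ValueError("percentile() requires at least one value")
    if not 0 < p <= 100:
        raise ValueError("p must be in the range (0, 100]")
    ordered = sorted(values)
    # Multiply before dividing to avoid floating-point error in the rank.
    rank = math.ceil(p * len(ordered) / 100)
    return ordered[rank - 1]
```

With durations of 1 through 100 ms, `percentile(durations, 95)` returns 95, meaning 95% of calls finished in 95 ms or less.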
Debug Mode
// Enable debug mode with environment variable
if (process.env.MCP_DEBUG === 'true') {
// Log all messages
server.on('message', (msg) => {
console.error('Message:', JSON.stringify(msg, null, 2));
});
// Log all errors
server.on('error', (error) => {
console.error('Error:', error);
});
// Memory usage monitoring
setInterval(() => {
const usage = process.memoryUsage();
console.error('Memory:', {
rss: `${Math.round(usage.rss / 1024 / 1024)}MB`,
heap: `${Math.round(usage.heapUsed / 1024 / 1024)}MB`
});
}, 30000);
}
12. MCP Ecosystem & Resources
Official Resources
- MCP Specification
- Example Servers
Community Resources
- Tutorials & Guides
- Tools & Libraries
  - MCP Inspector: Debug MCP connections
  - MCP Router: Route between multiple servers
  - MCP Testing Framework: Test your servers
- Integration Examples
Popular MCP Servers
- Data & Storage
  - mcp-server-sqlite: SQLite database access
  - mcp-server-postgres: PostgreSQL integration
  - mcp-server-filesystem: Local file system access
  - mcp-server-s3: AWS S3 integration
- Development Tools
  - mcp-server-git: Git repository management
  - mcp-server-github: GitHub API integration
  - mcp-server-docker: Docker container management
  - mcp-server-kubernetes: K8s cluster operations
- Communication
  - mcp-server-slack: Slack workspace integration
  - mcp-server-email: Email sending and reading
  - mcp-server-discord: Discord bot integration
- AI/ML Tools
  - mcp-server-huggingface: Hugging Face model access
  - mcp-server-openai: OpenAI API integration
  - mcp-server-langchain: LangChain integration
Contributing to MCP
- Create Your Own Server
# Use the template
npx create-mcp-server my-awesome-server
cd my-awesome-server
npm install
npm run dev
- Share with Community
  - Publish to npm/PyPI
  - Add to awesome-mcp list
  - Write documentation
  - Create examples
- Report Issues
Conclusion
MCP represents a paradigm shift in how AI applications interact with external systems. By providing a standardized protocol for context sharing, MCP enables:
- Unified Integration: One protocol for all AI assistants
- Rich Context: AI can access any data you expose
- Community Innovation: Share and reuse integrations
- Future-Proof: Works with any AI model or platform that implements an MCP client
Next Steps
- Start Simple: Build a basic file system server
- Experiment: Try different types of resources and tools
- Integrate: Connect to Claude Desktop or your app
- Share: Contribute your servers to the community
- Scale: Deploy to production when ready
Remember: MCP is not just a protocol; it's an ecosystem that grows with every server you build and share.
Happy building!