Azure Service Bus: The Complete Guide to Enterprise Messaging

Master enterprise-grade messaging with Azure Service Bus - from basic queues to advanced patterns, with hands-on examples and real-world scenarios

📚 Table of Contents

  1. Introduction: What is Azure Service Bus?
  2. Core Concepts & Architecture
  3. Pricing Tiers & Feature Comparison
  4. Getting Started: Your First Service Bus
  5. Working with Queues
  6. Topics and Subscriptions
  7. Advanced Messaging Patterns
  8. Dead Letter Queues & Error Handling
  9. Security & Authentication
  10. Performance Optimization
  11. Real-World Scenarios
  12. Monitoring & Troubleshooting
  13. Best Practices
  14. Migration Guide
  15. Resources & Next Steps

1. Introduction: What is Azure Service Bus?

The Simple Explanation

Imagine you’re running a busy restaurant. Orders come in from multiple sources: dine-in, takeout, delivery apps. Your kitchen needs to process these orders efficiently without losing any. Azure Service Bus is like a smart order management system for those orders: it accepts them from every source, holds them safely, and hands them to the kitchen as capacity allows.

In technical terms, Azure Service Bus is a fully managed enterprise message broker with message queues and publish-subscribe topics.

Why Use Azure Service Bus?

Traditional Direct Communication:
┌─────────────┐     Direct Call      ┌─────────────┐
│    App A    │─────────────────────►│    App B    │
└─────────────┘   (Tight Coupling)   └─────────────┘
Problems: 
- If App B is down, App A fails
- If App B is slow, App A waits
- Hard to scale independently

With Service Bus:
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│    App A    │───►│ Service Bus │───►│    App B    │
└─────────────┘    └─────────────┘    └─────────────┘
Benefits:
- Apps work independently
- Messages are stored durably until a consumer processes them
- Easy to scale
- Built-in retry logic

Real-World Use Cases

  1. E-commerce Order Processing: Handle order spikes during sales
  2. IoT Data Ingestion: Process millions of sensor readings
  3. Microservice Communication: Decouple services in your architecture
  4. Event-Driven Workflows: Trigger actions based on events
  5. Batch Processing: Queue work items for background processing

2. Core Concepts & Architecture

The Building Blocks

1. Namespace

Think of a namespace as your messaging container. It’s like having your own post office where all your messaging entities live.

// Connection string to your namespace
string connectionString = "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=myKey";

2. Messages

The data packets you send. Each message has a body plus metadata such as a content type, a subject, and custom application properties:

// Creating a message
var message = new ServiceBusMessage("Hello, Service Bus!")
{
    ContentType = "application/json",
    Subject = "OrderCreated",
    ApplicationProperties = {
        ["OrderId"] = "12345",
        ["Priority"] = "High"
    }
};

3. Queues (Point-to-Point)

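Each message is consumed by a single receiver. Like a line at a coffee shop, messages are generally served first in, first out (FIFO); strict ordering is guaranteed only when sessions are enabled.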

Sender ──► [Message 3][Message 2][Message 1] ──► Receiver

4. Topics & Subscriptions (Publish-Subscribe)

One sender, multiple receivers. Like a newsletter - one publisher, many subscribers.

                   ┌─► Subscription A ──► Receiver 1
                   │
Sender ──► Topic ──┼─► Subscription B ──► Receiver 2
                   │
                   └─► Subscription C ──► Receiver 3

How Messages Flow

1. Producer creates message
2. Sends to Service Bus (Queue/Topic)
3. Service Bus stores message durably
4. Consumer requests message
5. Service Bus delivers message
6. Consumer processes message
7. Consumer acknowledges completion
8. Service Bus removes message
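
The eight steps above can be sketched with a toy in-memory queue. This is a simplified model for illustration only, not the Service Bus SDK: `InMemoryQueue` and its methods are hypothetical names, and real message locks also expire after a timeout, which the sketch leaves out.

```python
from collections import deque
from dataclasses import dataclass, field
from itertools import count


@dataclass
class InMemoryQueue:
    """Toy stand-in for a queue with peek-lock semantics (not the real service)."""
    _ready: deque = field(default_factory=deque)
    _locked: dict = field(default_factory=dict)
    _tokens: count = field(default_factory=count)

    def send(self, body: str) -> None:
        # Steps 1-3: the producer hands the message over and the broker stores it.
        self._ready.append(body)

    def receive(self):
        # Steps 4-5: deliver the oldest message and lock it for this consumer.
        if not self._ready:
            return None
        token = next(self._tokens)
        self._locked[token] = self._ready.popleft()
        return token, self._locked[token]

    def complete(self, token: int) -> None:
        # Steps 7-8: the consumer acknowledges, so the message is removed for good.
        del self._locked[token]

    def abandon(self, token: int) -> None:
        # Processing failed: release the lock so the message is delivered again.
        self._ready.appendleft(self._locked.pop(token))

    def pending(self) -> int:
        # Messages still held by the queue, locked or not.
        return len(self._ready) + len(self._locked)


queue = InMemoryQueue()
queue.send("order-1")
token, body = queue.receive()
queue.abandon(token)            # simulate a failed processing attempt
token, body = queue.receive()   # the same message comes back
queue.complete(token)
print(body, queue.pending())
```

The abandon path is what makes delivery at-least-once: a message leaves the queue only after an explicit completion.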

3. Pricing Tiers & Feature Comparison

Choose Your Tier

Feature               Basic       Standard      Premium
--------------------  ----------  ------------  ----------------
Queues                ✅          ✅            ✅
Topics                ❌          ✅            ✅
Message Size          256 KB      256 KB        100 MB
Sessions              ❌          ✅            ✅
Transactions          ❌          ✅            ✅
Dead Letter Queue     ✅          ✅            ✅
Duplicate Detection   ❌          ✅            ✅
Partitioning          ❌          ✅            ✅
Performance           Shared      Shared        Dedicated
Geo-DR                ❌          ❌            ✅
Use Case              Dev/Test    Production    Mission Critical

Making the Right Choice

// Basic Tier - Simple queue operations
// Good for: Development, testing, simple scenarios

// Standard Tier - Full features
// Good for: Most production workloads

// Premium Tier - Predictable performance
// Good for: High-throughput, low-latency requirements
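
As a rough way to read the comparison table, the sketch below encodes each tier's feature set and returns the lowest tier that covers a list of requirements. The feature keys and the `lowest_tier_for` helper are illustrative names, and the sets simply mirror the table in this guide, so check the current Azure documentation before relying on them.

```python
# Feature sets copied from the comparison table in this guide (illustrative keys).
BASIC = {"queues", "dead_letter_queue"}
STANDARD = BASIC | {
    "topics", "sessions", "transactions", "duplicate_detection", "partitioning",
}
PREMIUM = STANDARD | {"geo_dr", "dedicated_resources", "large_messages"}

# Ordered from lowest to highest tier.
TIERS = [("Basic", BASIC), ("Standard", STANDARD), ("Premium", PREMIUM)]


def lowest_tier_for(required: set) -> str:
    """Return the first tier whose feature set contains every required feature."""
    for name, features in TIERS:
        if required <= features:
            return name
    raise ValueError(f"no tier offers: {sorted(required - PREMIUM)}")


print(lowest_tier_for({"queues"}))                        # Basic
print(lowest_tier_for({"queues", "topics", "sessions"}))  # Standard
print(lowest_tier_for({"topics", "geo_dr"}))              # Premium
```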

4. Getting Started: Your First Service Bus

Step 1: Create a Service Bus Namespace (Azure CLI)

# Using Azure CLI
az servicebus namespace create \
  --resource-group myResourceGroup \
  --name myServiceBusNamespace \
  --location eastus \
  --sku Standard

Step 2: Create a Queue

# Create a queue
az servicebus queue create \
  --resource-group myResourceGroup \
  --namespace-name myServiceBusNamespace \
  --name myQueue

Step 3: Install the SDK

# .NET
dotnet add package Azure.Messaging.ServiceBus

# Python
pip install azure-servicebus

# Node.js
npm install @azure/service-bus

# Java
# Add to pom.xml
<dependency>
  <groupId>com.azure</groupId>
  <artifactId>azure-messaging-servicebus</artifactId>
  <version>7.15.0</version>
</dependency>

Step 4: Send Your First Message (.NET Example)

using Azure.Messaging.ServiceBus;
using System;
using System.Threading.Tasks;

class Program
{
    // Connection string from Azure Portal
    const string connectionString = "Endpoint=sb://...";
    const string queueName = "myQueue";

    static async Task Main()
    {
        // Create a client
        await using var client = new ServiceBusClient(connectionString);
        
        // Create a sender
        await using var sender = client.CreateSender(queueName);
        
        // Create and send a message
        var message = new ServiceBusMessage("Hello, Azure Service Bus!");
        await sender.SendMessageAsync(message);
        
        Console.WriteLine("Message sent!");
    }
}

Step 5: Receive Your First Message

static async Task ReceiveMessages()
{
    await using var client = new ServiceBusClient(connectionString);
    
    // Create a receiver
    await using var receiver = client.CreateReceiver(queueName);
    
    // Receive a message
    var message = await receiver.ReceiveMessageAsync();
    
    if (message != null)
    {
        Console.WriteLine($"Received: {message.Body.ToString()}");
        
        // Complete the message (remove from queue)
        await receiver.CompleteMessageAsync(message);
    }
}

5. Working with Queues

Understanding Queue Behavior

Queues provide reliable point-to-point communication. Think of it as a reliable pipe between two applications.

Properties of a Queue:
- FIFO ordering (with sessions)
- At-least-once delivery
- Message locking
- Dead letter support
- Duplicate detection (optional)
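
Because delivery is at-least-once, a consumer can see the same message more than once, for example when a lock expires before the message is completed. Below is a minimal sketch of an idempotent consumer, assuming every message carries a unique message ID. `IdempotentConsumer` is a hypothetical helper that keeps IDs in memory; a real system would likely need a persistent store.

```python
class IdempotentConsumer:
    """Runs the handler at most once per message ID (in-memory sketch)."""

    def __init__(self, handler):
        self._handler = handler
        self._seen_ids = set()

    def handle(self, message_id: str, body: str) -> bool:
        """Return True if the message was processed, False if it was a duplicate."""
        if message_id in self._seen_ids:
            return False
        self._handler(body)
        # Record the ID only after the handler succeeds, so failures can be retried.
        self._seen_ids.add(message_id)
        return True


processed = []
consumer = IdempotentConsumer(processed.append)
consumer.handle("msg-1", "charge card")
consumer.handle("msg-1", "charge card")   # redelivery of the same message
consumer.handle("msg-2", "send receipt")
print(processed)
```

The service-side duplicate detection option covers resends by the producer within a time window; a check like this one covers redeliveries on the consumer side.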

Complete Queue Example

public class QueueService
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusSender _sender;
    private readonly ServiceBusProcessor _processor;

    public QueueService(string connectionString, string queueName)
    {
        _client = new ServiceBusClient(connectionString);
        _sender = _client.CreateSender(queueName);
        
        // Create processor for receiving
        _processor = _client.CreateProcessor(queueName, new ServiceBusProcessorOptions
        {
            AutoCompleteMessages = false,
            MaxConcurrentCalls = 5,
            PrefetchCount = 10
        });
        
        // Configure handlers
        _processor.ProcessMessageAsync += ProcessMessageHandler;
        _processor.ProcessErrorAsync += ProcessErrorHandler;
    }

    // Send a single message
    public async Task SendMessageAsync(string content)
    {
        var message = new ServiceBusMessage(content)
        {
            MessageId = Guid.NewGuid().ToString(),
            TimeToLive = TimeSpan.FromHours(1)
        };
        
        await _sender.SendMessageAsync(message);
    }

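    // Send a batch of messages, starting a new batch whenever the current one is full
    public async Task SendBatchAsync(List<string> contents)
    {
        // Not a "using var": the variable is reassigned below, which "using" forbids
        ServiceBusMessageBatch batch = await _sender.CreateMessageBatchAsync();

        try
        {
            foreach (var content in contents)
            {
                var message = new ServiceBusMessage(content);
                if (batch.TryAddMessage(message))
                    continue;

                // Batch is full: send it, dispose it, and start a new one
                if (batch.Count > 0)
                    await _sender.SendMessagesAsync(batch);
                batch.Dispose();
                batch = await _sender.CreateMessageBatchAsync();

                if (!batch.TryAddMessage(message))
                    throw new InvalidOperationException("Message is too large to fit in an empty batch.");
            }

            // Send remaining messages
            if (batch.Count > 0)
                await _sender.SendMessagesAsync(batch);
        }
        finally
        {
            batch.Dispose();
        }
    }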

    // Process messages continuously
    public async Task StartProcessingAsync()
    {
        await _processor.StartProcessingAsync();
    }

    private async Task ProcessMessageHandler(ProcessMessageEventArgs args)
    {
        try
        {
            string body = args.Message.Body.ToString();
            Console.WriteLine($"Processing: {body}");
            
            // Your business logic here
            await ProcessBusinessLogic(body);
            
            // Complete the message
            await args.CompleteMessageAsync(args.Message);
        }
        catch (Exception ex)
        {
            // Abandon the message for retry
            await args.AbandonMessageAsync(args.Message);
            
            // Or send to dead letter queue
            // await args.DeadLetterMessageAsync(args.Message, "ProcessingError", ex.Message);
        }
    }

    private Task ProcessErrorHandler(ProcessErrorEventArgs args)
    {
        Console.WriteLine($"Error: {args.Exception.Message}");
        return Task.CompletedTask;
    }

    private async Task ProcessBusinessLogic(string message)
    {
        // Simulate work
        await Task.Delay(1000);
    }
}

Python Example

from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient  # async client, required for "async with"
import asyncio

connection_str = "Endpoint=sb://..."
queue_name = "myqueue"

async def send_message():
    async with ServiceBusClient.from_connection_string(connection_str) as client:
        async with client.get_queue_sender(queue_name) as sender:
            # Send single message
            message = ServiceBusMessage("Hello from Python!")
            await sender.send_messages(message)
            
            # Send batch
            messages = [ServiceBusMessage(f"Message {i}") for i in range(10)]
            await sender.send_messages(messages)

async def receive_messages():
    async with ServiceBusClient.from_connection_string(connection_str) as client:
        async with client.get_queue_receiver(queue_name) as receiver:
            messages = await receiver.receive_messages(max_message_count=10, max_wait_time=5)
            
            for message in messages:
                print(f"Received: {str(message)}")
                await receiver.complete_message(message)

# Run
asyncio.run(send_message())
asyncio.run(receive_messages())

Advanced Queue Features

1. Message Sessions (Ordered Processing)

// Enable sessions when creating queue
var queue = new CreateQueueOptions(queueName)
{
    RequiresSession = true
};

// Send with session
var message = new ServiceBusMessage("Order item 1")
{
    SessionId = "Order123"  // All messages with same SessionId processed in order
};

// Receive from specific session
await using var receiver = await client.AcceptSessionAsync(queueName, "Order123");
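
To illustrate what a session guarantees, the sketch below models messages as (session_id, body) pairs and shows that grouping by session preserves the order within each session even when the sessions are interleaved on the wire. It is a plain-Python model with hypothetical names, not SDK code.

```python
from collections import defaultdict


def group_by_session(messages):
    """Group (session_id, body) pairs by session, keeping arrival order per session."""
    sessions = defaultdict(list)
    for session_id, body in messages:
        sessions[session_id].append(body)
    return dict(sessions)


# Two orders whose messages arrive interleaved.
arrivals = [
    ("Order123", "item 1"),
    ("Order456", "item 1"),
    ("Order123", "item 2"),
    ("Order456", "item 2"),
    ("Order123", "checkout"),
]

for session_id, bodies in group_by_session(arrivals).items():
    print(session_id, bodies)
```

A receiver that accepts session `Order123` sees only that session's messages, in the order they were sent, while another receiver can work on `Order456` in parallel.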

2. Scheduled Messages

// Schedule a message for future delivery
var message = new ServiceBusMessage("Reminder: Meeting at 2 PM");
var scheduledTime = DateTimeOffset.UtcNow.AddHours(2);

long sequenceNumber = await sender.ScheduleMessageAsync(message, scheduledTime);

// Cancel if needed
await sender.CancelScheduledMessageAsync(sequenceNumber);

3. Message Deferral

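// Defer a message for later processing. Save its sequence number first:
// a deferred message can only be retrieved by that number.
long sequenceNumber = message.SequenceNumber;
await receiver.DeferMessageAsync(message);

// Retrieve deferred message later
var deferredMessage = await receiver.ReceiveDeferredMessageAsync(sequenceNumber);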

6. Topics and Subscriptions

Understanding Pub/Sub Pattern

Topics enable one-to-many communication. One publisher, multiple subscribers with filtering.

Real-world analogy: Magazine Publishing
- Topic = Magazine
- Subscriptions = Individual subscribers
- Filters = Subscriber preferences (sports, tech, etc.)

Creating Topics and Subscriptions

// Create a topic
var topic = new CreateTopicOptions("order-events")
{
    DefaultMessageTimeToLive = TimeSpan.FromDays(7),
    DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10),
    EnablePartitioning = true
};

// Create subscriptions with filters
var allOrdersSub = new CreateSubscriptionOptions("order-events", "all-orders");

var highValueSub = new CreateSubscriptionOptions("order-events", "high-value-orders");
var highValueFilter = new CreateRuleOptions("HighValueFilter", 
    new SqlRuleFilter("OrderTotal > 1000"));

Complete Pub/Sub Example

public class OrderEventPublisher
{
    private readonly ServiceBusSender _sender;

    public OrderEventPublisher(ServiceBusClient client)
    {
        _sender = client.CreateSender("order-events");
    }

    public async Task PublishOrderEvent(Order order)
    {
        var message = new ServiceBusMessage(JsonSerializer.Serialize(order))
        {
            Subject = order.Type, // Used for filtering
            ApplicationProperties =
            {
                ["OrderTotal"] = order.Total,
                ["CustomerId"] = order.CustomerId,
                ["Region"] = order.Region
            }
        };

        await _sender.SendMessageAsync(message);
    }
}

public class OrderEventSubscriber
{
    private readonly ServiceBusProcessor _processor;

    public OrderEventSubscriber(ServiceBusClient client, string subscriptionName)
    {
        _processor = client.CreateProcessor("order-events", subscriptionName);
        _processor.ProcessMessageAsync += HandleOrderEvent;
        _processor.ProcessErrorAsync += HandleError;
    }

    public async Task StartAsync()
    {
        await _processor.StartProcessingAsync();
    }

    private async Task HandleOrderEvent(ProcessMessageEventArgs args)
    {
        var order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToString());
        
        // Process based on subscription
        Console.WriteLine($"Processing order {order.Id} with total {order.Total}");
        
        await args.CompleteMessageAsync(args.Message);
    }

    private Task HandleError(ProcessErrorEventArgs args)
    {
        Console.WriteLine($"Error: {args.Exception}");
        return Task.CompletedTask;
    }
}

Subscription Filters

1. SQL Filters

// Complex SQL filter
var filter = new SqlRuleFilter(@"
    OrderTotal > 100 AND 
    Region IN ('US', 'EU') AND 
    sys.Label = 'HighPriority'
");

2. Correlation Filters (Better Performance)

var filter = new CorrelationRuleFilter
{
    Subject = "OrderCreated",
    ApplicationProperties =
    {
        ["Region"] = "US"
    }
};
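
A correlation filter matches by equality: a message passes only if every property named in the filter has exactly the given value. The sketch below models that rule with plain dictionaries. The dictionary layout and `matches_correlation_filter` are assumptions for illustration, not the SDK's types.

```python
def matches_correlation_filter(filter_spec: dict, message: dict) -> bool:
    """Return True when every condition in the filter equals the message's value."""
    subject = filter_spec.get("subject")
    if subject is not None and message.get("subject") != subject:
        return False

    wanted = filter_spec.get("application_properties", {})
    actual = message.get("application_properties", {})
    # All listed properties must be present and equal (logical AND).
    return all(key in actual and actual[key] == value for key, value in wanted.items())


us_orders = {"subject": "OrderCreated", "application_properties": {"Region": "US"}}

message_us = {"subject": "OrderCreated", "application_properties": {"Region": "US", "OrderTotal": 250}}
message_eu = {"subject": "OrderCreated", "application_properties": {"Region": "EU"}}

print(matches_correlation_filter(us_orders, message_us))  # True
print(matches_correlation_filter(us_orders, message_eu))  # False
```

Since the check is a handful of equality comparisons rather than an expression to parse and evaluate, it is plausible that this is why correlation filters are described as the faster option.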

3. Custom Actions

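// Modify message properties during filtering
// (CreateRuleOptions is the type used to define a new rule; RuleProperties describes an existing one)
var rule = new CreateRuleOptions
{
    Name = "AddRegionTag",
    Filter = new SqlRuleFilter("Region = 'US'"),
    Action = new SqlRuleAction("SET sys.Label = 'US-Order'")
};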

7. Advanced Messaging Patterns

Pattern 1: Request-Reply

public class RequestReplyPattern
{
    public async Task<string> SendRequestAsync(string request)
    {
        var requestMessage = new ServiceBusMessage(request)
        {
            ReplyTo = "reply-queue",
            ReplyToSessionId = Guid.NewGuid().ToString()
        };

        // Send request
        await _requestSender.SendMessageAsync(requestMessage);

        // Wait for reply on specific session
        await using var replyReceiver = await _client.AcceptSessionAsync(
            "reply-queue", 
            requestMessage.ReplyToSessionId);

        var reply = await replyReceiver.ReceiveMessageAsync(TimeSpan.FromSeconds(30));
        
        if (reply != null)
        {
            await replyReceiver.CompleteMessageAsync(reply);
            return reply.Body.ToString();
        }

        throw new TimeoutException("No reply received");
    }
}

Pattern 2: Saga/Process Manager

public class OrderSaga
{
    private readonly Dictionary<string, Func<OrderState, Task>> _handlers;

    public OrderSaga()
    {
        _handlers = new Dictionary<string, Func<OrderState, Task>>
        {
            ["OrderPlaced"] = HandleOrderPlaced,
            ["PaymentReceived"] = HandlePaymentReceived,
            ["ItemsShipped"] = HandleItemsShipped
        };
    }

    public async Task ProcessMessage(ServiceBusReceivedMessage message)
    {
        var eventType = message.Subject;
        var orderId = message.SessionId;
        
        // Load saga state
        var state = await LoadOrderState(orderId);
        
        // Handle event
        if (_handlers.TryGetValue(eventType, out var handler))
        {
            await handler(state);
            await SaveOrderState(state);
        }
    }

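    private Task HandleOrderPlaced(OrderState state)
    {
        state.Status = "AwaitingPayment";
        // Send payment request
        return Task.CompletedTask;
    }

    private Task HandlePaymentReceived(OrderState state)
    {
        state.Status = "Paid";
        // Trigger fulfillment
        return Task.CompletedTask;
    }

    // Placeholder so the handler registered in the constructor exists;
    // the status value is illustrative
    private Task HandleItemsShipped(OrderState state)
    {
        state.Status = "Shipped";
        return Task.CompletedTask;
    }
}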

Pattern 3: Message Aggregator

public class MessageAggregator
{
    private readonly Dictionary<string, List<ServiceBusReceivedMessage>> _batches = new();

    public async Task AggregateMessage(ServiceBusReceivedMessage message)
    {
        var batchId = message.ApplicationProperties["BatchId"].ToString();
        var totalMessages = (int)message.ApplicationProperties["TotalMessages"];

        lock (_batches)
        {
            if (!_batches.ContainsKey(batchId))
                _batches[batchId] = new List<ServiceBusReceivedMessage>();
            
            _batches[batchId].Add(message);

            if (_batches[batchId].Count == totalMessages)
            {
                // Process complete batch
                ProcessCompleteBatch(_batches[batchId]);
                _batches.Remove(batchId);
            }
        }
    }
}

Pattern 4: Competing Consumers

public class CompetingConsumers
{
    public static async Task StartMultipleConsumers(int consumerCount)
    {
        var tasks = new List<Task>();

        for (int i = 0; i < consumerCount; i++)
        {
            var consumerId = i;
            tasks.Add(Task.Run(async () =>
            {
                var processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions
                {
                    MaxConcurrentCalls = 1,
                    Identifier = $"Consumer-{consumerId}"
                });

                processor.ProcessMessageAsync += async (args) =>
                {
                    Console.WriteLine($"Consumer {consumerId} processing: {args.Message.MessageId}");
                    await Task.Delay(1000); // Simulate work
                    await args.CompleteMessageAsync(args.Message);
                };

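                // A processor cannot start without an error handler
                processor.ProcessErrorAsync += args =>
                {
                    Console.WriteLine($"Consumer {consumerId} error: {args.Exception.Message}");
                    return Task.CompletedTask;
                };

                await processor.StartProcessingAsync();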
            }));
        }

        await Task.WhenAll(tasks);
    }
}

8. Dead Letter Queues & Error Handling

Understanding Dead Letter Queues

Every queue and subscription has a dead letter queue (DLQ) - a special sub-queue where problematic messages go.

Main Queue: myqueue
DLQ Path:  myqueue/$deadletterqueue

Why messages dead letter:
1. Max delivery attempts exceeded
2. TTL expired (when dead-lettering on message expiration is enabled)
3. Manually dead lettered
4. Header size exceeded
5. Subscription filter evaluation errors

Working with Dead Letter Queues

public class DeadLetterHandler
{
    private readonly ServiceBusClient _client;
    private readonly string _queueName;

    public async Task ProcessDeadLetterMessages()
    {
        // Create DLQ receiver
        var dlqReceiver = _client.CreateReceiver(
            _queueName, 
            new ServiceBusReceiverOptions
            {
                SubQueue = SubQueue.DeadLetter
            });

        await foreach (var message in dlqReceiver.ReceiveMessagesAsync())
        {
            Console.WriteLine($"Dead letter reason: {message.DeadLetterReason}");
            Console.WriteLine($"Dead letter description: {message.DeadLetterErrorDescription}");
            
            try
            {
                // Try to process again or log for investigation
                await ProcessDeadLetteredMessage(message);
                await dlqReceiver.CompleteMessageAsync(message);
            }
            catch (Exception ex)
            {
                // Log and abandon if still failing
                Console.WriteLine($"Still failing: {ex.Message}");
                await dlqReceiver.AbandonMessageAsync(message);
            }
        }
    }

    // Resubmit dead letter messages to main queue
    public async Task ResubmitDeadLetterMessages()
    {
        var dlqReceiver = _client.CreateReceiver(_queueName, 
            new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });
        
        var sender = _client.CreateSender(_queueName);

        await foreach (var message in dlqReceiver.ReceiveMessagesAsync())
        {
            // Create new message from dead letter
            var resubmitMessage = new ServiceBusMessage(message.Body)
            {
                ContentType = message.ContentType,
                Subject = message.Subject
            };

            // Copy application properties
            foreach (var prop in message.ApplicationProperties)
            {
                resubmitMessage.ApplicationProperties[prop.Key] = prop.Value;
            }

            // Add resubmission tracking
            resubmitMessage.ApplicationProperties["ResubmittedAt"] = DateTime.UtcNow;
            resubmitMessage.ApplicationProperties["OriginalDeadLetterReason"] = message.DeadLetterReason;

            await sender.SendMessageAsync(resubmitMessage);
            await dlqReceiver.CompleteMessageAsync(message);
        }
    }
}

Implementing Retry Logic

public class RetryProcessor
{
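    // TransientException and PermanentException below are application-defined exception types.
    // The queue's MaxDeliveryCount must be at least maxRetries, or the broker dead-letters first.
    public async Task ProcessWithRetry(ProcessMessageEventArgs args)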
    {
        var deliveryCount = args.Message.DeliveryCount;
        var maxRetries = 5;

        try
        {
            // Exponential backoff based on delivery count
            if (deliveryCount > 1)
            {
                var delay = TimeSpan.FromSeconds(Math.Pow(2, deliveryCount - 1));
                await Task.Delay(delay);
            }

            // Process message
            await ProcessMessage(args.Message);
            await args.CompleteMessageAsync(args.Message);
        }
        catch (TransientException ex)
        {
            // Transient error - abandon for retry
            if (deliveryCount < maxRetries)
            {
                await args.AbandonMessageAsync(args.Message);
            }
            else
            {
                // Max retries reached - dead letter
                await args.DeadLetterMessageAsync(args.Message, 
                    "MaxRetriesExceeded", 
                    $"Failed after {maxRetries} attempts: {ex.Message}");
            }
        }
        catch (PermanentException ex)
        {
            // Permanent error - dead letter immediately
            await args.DeadLetterMessageAsync(args.Message,
                "PermanentFailure",
                ex.Message);
        }
    }
}
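
The delay schedule used above, 2^(deliveryCount - 1) seconds from the second delivery onward, can be tabulated with a small helper. The `cap_seconds` parameter is an addition that is not in the original code; it is included on the assumption that the wait should stay well inside the message lock duration, since the message remains locked while the handler sleeps.

```python
def backoff_delay_seconds(delivery_count: int, cap_seconds: float = 60.0) -> float:
    """Delay before processing, given how many times the message has been delivered."""
    if delivery_count <= 1:
        return 0.0  # first delivery: process immediately
    return min(2.0 ** (delivery_count - 1), cap_seconds)


for attempt in range(1, 6):
    print(attempt, backoff_delay_seconds(attempt))
```

With five attempts the delays are 0, 2, 4, 8, and 16 seconds, so the longest wait still fits inside a typical lock window.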

9. Security & Authentication

Connection Security

Azure Service Bus uses AMQP (Advanced Message Queuing Protocol) over TLS for secure communication.

// Connection string with SAS authentication
var connectionString = "Endpoint=sb://namespace.servicebus.windows.net/;" +
                      "SharedAccessKeyName=RootManageSharedAccessKey;" +
                      "SharedAccessKey=yourKey;" +
                      "TransportType=AmqpWebSockets"; // Use WebSockets for firewall-friendly

Authentication Options

1. Shared Access Signature (SAS)

// Create SAS token programmatically
public string CreateSasToken(string resourceUri, string keyName, string key)
{
    var expiry = DateTimeOffset.UtcNow.AddHours(1);
    var stringToSign = HttpUtility.UrlEncode(resourceUri) + "\n" + expiry.ToUnixTimeSeconds();
    
    using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key));
    var signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
    
    var token = $"SharedAccessSignature sr={HttpUtility.UrlEncode(resourceUri)}" +
                $"&sig={HttpUtility.UrlEncode(signature)}" +
                $"&se={expiry.ToUnixTimeSeconds()}" +
                $"&skn={keyName}";
    
    return token;
}
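
For comparison, the same token can be built with the Python standard library. This is a sketch that mirrors the C# method above. The `ttl_seconds` and `now` parameters are added assumptions that make the expiry configurable and deterministic for testing, and the namespace and key in the usage lines are placeholders.

```python
import base64
import hashlib
import hmac
import time
import urllib.parse


def create_sas_token(resource_uri, key_name, key, ttl_seconds=3600, now=None):
    """Build a SharedAccessSignature token signed with HMAC-SHA256."""
    issued_at = time.time() if now is None else now
    expiry = int(issued_at + ttl_seconds)

    # The string to sign is the URL-encoded resource URI, a newline, and the expiry.
    encoded_uri = urllib.parse.quote_plus(resource_uri)
    string_to_sign = f"{encoded_uri}\n{expiry}".encode("utf-8")

    digest = hmac.new(key.encode("utf-8"), string_to_sign, hashlib.sha256).digest()
    signature = urllib.parse.quote_plus(base64.b64encode(digest).decode("ascii"))

    return (
        f"SharedAccessSignature sr={encoded_uri}"
        f"&sig={signature}&se={expiry}&skn={key_name}"
    )


token = create_sas_token(
    "sb://mynamespace.servicebus.windows.net/myqueue",
    "RootManageSharedAccessKey",
    "placeholder-key",
)
print(token.split("&sig=")[0])
```

Scoping `resource_uri` to a single queue, as in the usage lines, yields a token that is only valid for that entity.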

2. Azure Active Directory (Microsoft Entra ID)

// Using Managed Identity
var client = new ServiceBusClient(
    "namespace.servicebus.windows.net",
    new DefaultAzureCredential());

// Using Service Principal
var servicePrincipalClient = new ServiceBusClient(
    "namespace.servicebus.windows.net",
    new ClientSecretCredential(tenantId, clientId, clientSecret));

Role-Based Access Control (RBAC)

# Assign roles via Azure CLI
# Sender role
az role assignment create \
  --assignee <user-or-service-principal> \
  --role "Azure Service Bus Data Sender" \
  --scope /subscriptions/{subscription}/resourceGroups/{rg}/providers/Microsoft.ServiceBus/namespaces/{namespace}/queues/{queue}

# Receiver role
az role assignment create \
  --assignee <user-or-service-principal> \
  --role "Azure Service Bus Data Receiver" \
  --scope /subscriptions/{subscription}/resourceGroups/{rg}/providers/Microsoft.ServiceBus/namespaces/{namespace}/queues/{queue}

Network Security

// Configure firewall rules
// Firewall rules are a management-plane setting. ServiceBusAdministrationClient
// manages queues, topics, subscriptions, and rules, and does not expose network
// rule sets. Configure them through Azure Resource Manager instead, for example
// with the Azure portal, the Azure CLI, or the Azure.ResourceManager.ServiceBus package.
//
// Intended rule set for the namespace, shown as pseudocode:
//   DefaultAction = Deny
//   IpRules:
//     Allow 10.0.0.0/24
//     Allow 192.168.1.0/24

10. Performance Optimization

Key Performance Factors

1. Message Size: Smaller = Better throughput
2. Batch Operations: Send/Receive multiple messages
3. Prefetching: Retrieve messages before needed
4. Connection Pooling: Reuse connections
5. Partitioning: Distribute load

Optimization Techniques

1. Batching

public class BatchOptimization
{
    // Efficient batch sending
    public async Task SendLargeBatchAsync(List<Order> orders)
    {
        var sender = client.CreateSender(queueName);
        var currentBatch = await sender.CreateMessageBatchAsync();

        foreach (var order in orders)
        {
            var message = new ServiceBusMessage(JsonSerializer.Serialize(order));
            
            if (!currentBatch.TryAddMessage(message))
            {
                // Send current batch
                await sender.SendMessagesAsync(currentBatch);
                
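                // Dispose the sent batch and create a new one
                currentBatch.Dispose();
                currentBatch = await sender.CreateMessageBatchAsync();
                if (!currentBatch.TryAddMessage(message))
                    throw new InvalidOperationException("Message is too large to fit in an empty batch.");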
            }
        }

        // Send remaining messages
        if (currentBatch.Count > 0)
        {
            await sender.SendMessagesAsync(currentBatch);
        }
    }

    // Efficient batch receiving
    public async Task ReceiveBatchAsync()
    {
        var receiver = client.CreateReceiver(queueName);
        
        // Receive up to 100 messages
        var messages = await receiver.ReceiveMessagesAsync(
            maxMessages: 100, 
            maxWaitTime: TimeSpan.FromSeconds(5));

        // Process in parallel
        await Parallel.ForEachAsync(messages, async (message, ct) =>
        {
            await ProcessMessage(message);
            await receiver.CompleteMessageAsync(message);
        });
    }
}
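
The send loop above follows a simple greedy rule: keep adding messages until the next one would not fit, then send the batch and start over. Below is a sketch of that rule on raw byte payloads, assuming a fixed byte limit per batch. The real `TryAddMessage` also accounts for message headers and properties, which this model ignores, and `split_into_batches` is a hypothetical name.

```python
def split_into_batches(payloads, max_batch_bytes):
    """Greedily pack payloads into batches whose total size stays within the limit."""
    batches, current, current_size = [], [], 0
    for payload in payloads:
        if len(payload) > max_batch_bytes:
            # Mirrors the case where a message does not fit even in an empty batch.
            raise ValueError("payload is larger than an empty batch can hold")
        if current_size + len(payload) > max_batch_bytes:
            # Current batch is full: close it and start a new one.
            batches.append(current)
            current, current_size = [], 0
        current.append(payload)
        current_size += len(payload)
    if current:
        batches.append(current)  # remaining messages
    return batches


batches = split_into_batches([b"aaaa", b"bbbb", b"cc"], max_batch_bytes=8)
print([len(batch) for batch in batches])
```

Each resulting batch corresponds to one send call, so fewer and fuller batches mean fewer round trips to the broker.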

2. Prefetching

// Configure prefetch
var processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions
{
    PrefetchCount = 20, // Prefetch 20 messages
    MaxConcurrentCalls = 5, // Process 5 at a time
    ReceiveMode = ServiceBusReceiveMode.PeekLock
});

3. Connection Management

// Requires: using System.Collections.Concurrent;
public class ConnectionPool
{
    private static readonly ServiceBusClient _sharedClient;
    // ConcurrentDictionary keeps the sender cache safe when several threads call GetSender
    private static readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new();
    
    static ConnectionPool()
    {
        _sharedClient = new ServiceBusClient(connectionString, new ServiceBusClientOptions
        {
            TransportType = ServiceBusTransportType.AmqpWebSockets,
            RetryOptions = new ServiceBusRetryOptions
            {
                MaxRetries = 3,
                Delay = TimeSpan.FromSeconds(1),
                MaxDelay = TimeSpan.FromSeconds(10),
                Mode = ServiceBusRetryMode.Exponential
            }
        });
    }

    public static ServiceBusSender GetSender(string queueOrTopicName)
    {
        // Create the sender on first use and reuse it afterwards
        return _senders.GetOrAdd(queueOrTopicName, name => _sharedClient.CreateSender(name));
    }
}

4. Message Compression

public class MessageCompression
{
    public async Task SendCompressedAsync(LargePayload payload)
    {
        // Serialize and compress
        var json = JsonSerializer.Serialize(payload);
        var compressed = Compress(Encoding.UTF8.GetBytes(json));

        var message = new ServiceBusMessage(compressed)
        {
            ApplicationProperties = 
            {
                ["Compressed"] = true,
                ["CompressionType"] = "gzip"
            }
        };

        await sender.SendMessageAsync(message);
    }

    private byte[] Compress(byte[] data)
    {
        using var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionLevel.Optimal))
        {
            gzip.Write(data, 0, data.Length);
        }
        return output.ToArray();
    }
}
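
The receiving side has to reverse the compression, using the `Compressed` and `CompressionType` properties set above to decide whether to decompress. Below is a round-trip sketch with the Python standard library. The function names are hypothetical, and the properties are modeled as a plain dictionary rather than a real message object.

```python
import gzip
import json


def compress_payload(payload: dict):
    """Serialize to JSON, gzip it, and return (body, application_properties)."""
    raw = json.dumps(payload).encode("utf-8")
    return gzip.compress(raw), {"Compressed": True, "CompressionType": "gzip"}


def decompress_payload(body: bytes, properties: dict) -> dict:
    """Undo compress_payload, leaving uncompressed messages untouched."""
    if properties.get("Compressed") and properties.get("CompressionType") == "gzip":
        body = gzip.decompress(body)
    return json.loads(body.decode("utf-8"))


order = {"orderId": "12345", "items": ["widget"] * 500}
body, properties = compress_payload(order)

raw_size = len(json.dumps(order).encode("utf-8"))
print(len(body) < raw_size)                          # repetitive JSON shrinks well
print(decompress_payload(body, properties) == order)
```

Compression pays off mainly for large, repetitive payloads; very small messages can end up larger once the gzip header is added.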

Performance Monitoring

public class PerformanceMonitor
{
    private readonly ILogger<PerformanceMonitor> _logger;
    private readonly Dictionary<string, PerformanceCounter> _counters = new();

    public void RecordMessageProcessed(string queueName, TimeSpan processingTime)
    {
        _logger.LogInformation("Message processed from {Queue} in {Time}ms", 
            queueName, processingTime.TotalMilliseconds);

        // Track metrics
        _counters[$"{queueName}_count"].Increment();
        _counters[$"{queueName}_avg_time"].RawValue = (long)processingTime.TotalMilliseconds;
    }

    public async Task<QueueMetrics> GetQueueMetrics(string queueName)
    {
        var client = new ServiceBusAdministrationClient(connectionString);
        var properties = await client.GetQueueRuntimePropertiesAsync(queueName);

        return new QueueMetrics
        {
            ActiveMessageCount = properties.Value.ActiveMessageCount,
            DeadLetterMessageCount = properties.Value.DeadLetterMessageCount,
            ScheduledMessageCount = properties.Value.ScheduledMessageCount,
            SizeInMegabytes = properties.Value.SizeInBytes / (1024.0 * 1024.0)
        };
    }
}

11. Real-World Scenarios

Scenario 1: E-Commerce Order Processing

public class ECommerceOrderProcessor
{
    private readonly ServiceBusClient _client;
    private readonly IOrderService _orderService;
    private readonly IInventoryService _inventoryService;
    private readonly IPaymentService _paymentService;

    // Call once at application startup; processors are long-lived and must not be created per order
    public async Task StartProcessorsAsync()
    {
        await SetupOrderValidationProcessor();
        await SetupInventoryCheckProcessor();
        await SetupPaymentProcessor();
        await SetupFulfillmentProcessor();
    }

    public async Task ProcessOrderWorkflow(Order order)
    {
        // Send order to validation queue; the processors move it through the later stages
        var validationSender = _client.CreateSender("order-validation");
        await validationSender.SendMessageAsync(new ServiceBusMessage(JsonSerializer.Serialize(order))
        {
            // Groups messages by order. FIFO delivery additionally requires a session-enabled
            // queue that is consumed through CreateSessionProcessor.
            SessionId = order.Id,
            ApplicationProperties = 
            {
                ["OrderType"] = order.Type,
                ["Priority"] = order.Priority
            }
        });
    }

    private async Task SetupOrderValidationProcessor()
    {
        var processor = _client.CreateProcessor("order-validation");
        
        processor.ProcessMessageAsync += async (args) =>
        {
            var order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToString());
            
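            try
            {
                // Validate order
                if (await _orderService.ValidateOrder(order))
                {
                    // Send to inventory check, then settle the original message
                    var sender = _client.CreateSender("inventory-check");
                    await sender.SendMessageAsync(new ServiceBusMessage(args.Message.Body)
                    {
                        SessionId = args.Message.SessionId
                    });

                    await args.CompleteMessageAsync(args.Message);
                }
                else
                {
                    // Dead-lettering already settles the message, so it must not be completed afterwards
                    await args.DeadLetterMessageAsync(args.Message, 
                        "ValidationFailed", "Order validation failed");
                }
            }
            catch (Exception ex)
            {
                await args.DeadLetterMessageAsync(args.Message, 
                    "ProcessingError", ex.Message);
            }
        };

        // StartProcessingAsync throws unless an error handler is registered
        processor.ProcessErrorAsync += errorArgs =>
        {
            Console.Error.WriteLine($"Processor error on {errorArgs.EntityPath}: {errorArgs.Exception.Message}");
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync();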
    }

    private async Task SetupPaymentProcessor()
    {
        var processor = _client.CreateProcessor("payment-processing");
        
        processor.ProcessMessageAsync += async (args) =>
        {
            var order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToString());
            
            try
            {
                // Process payment with retry logic
                var paymentResult = await ProcessPaymentWithRetry(order, args.Message.DeliveryCount);
                
                if (paymentResult.Success)
                {
                    // Send to fulfillment
                    var sender = _client.CreateSender("order-fulfillment");
                    await sender.SendMessageAsync(new ServiceBusMessage(args.Message.Body)
                    {
                        SessionId = args.Message.SessionId,
                        ApplicationProperties = 
                        {
                            ["PaymentId"] = paymentResult.TransactionId
                        }
                    });
                    
                    await args.CompleteMessageAsync(args.Message);
                }
                else
                {
                    // Retry or dead letter based on failure type
                    if (paymentResult.IsTransient && args.Message.DeliveryCount < 5)
                    {
                        await args.AbandonMessageAsync(args.Message);
                    }
                    else
                    {
                        await args.DeadLetterMessageAsync(args.Message,
                            "PaymentFailed", paymentResult.Error);
                    }
                }
            }
            catch (Exception ex)
            {
                await args.DeadLetterMessageAsync(args.Message,
                    "ProcessingError", ex.Message);
            }
        };

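        // StartProcessingAsync throws unless an error handler is registered
        processor.ProcessErrorAsync += errorArgs =>
        {
            Console.Error.WriteLine($"Processor error on {errorArgs.EntityPath}: {errorArgs.Exception.Message}");
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync();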
    }

    private async Task<PaymentResult> ProcessPaymentWithRetry(Order order, int attemptNumber)
    {
        // Exponential backoff
        if (attemptNumber > 1)
        {
            var delay = TimeSpan.FromSeconds(Math.Pow(2, attemptNumber - 1));
            await Task.Delay(delay);
        }

        return await _paymentService.ProcessPayment(order);
    }
}
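
The retry helper above waits 2^(attempt - 1) seconds inside the message handler, which grows quickly as the delivery count rises. The sketch below shows the same formula with an upper bound; `RetryBackoff` and its `maxDelay` parameter are assumptions added for illustration, not part of the original processor.

```csharp
using System;

// Hypothetical helper: exponential backoff derived from the delivery count, with a cap
public static class RetryBackoff
{
    public static TimeSpan DelayFor(int deliveryCount, TimeSpan maxDelay)
    {
        if (maxDelay < TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(maxDelay));

        // The first delivery is not a retry, so it is processed immediately
        if (deliveryCount <= 1)
            return TimeSpan.Zero;

        // 2, 4, 8, ... seconds, never longer than the cap
        var seconds = Math.Pow(2, deliveryCount - 1);
        return TimeSpan.FromSeconds(Math.Min(seconds, maxDelay.TotalSeconds));
    }
}
```

With a 30-second cap, delivery counts 2 through 6 yield 2, 4, 8, 16, and 30 seconds. A cap like this is one way to keep the wait comfortably inside the window during which the message lock is held or renewed.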

Scenario 2: IoT Telemetry Processing

public class IoTTelemetryProcessor
{
    private readonly ServiceBusClient _client;
    private readonly ITimeSeriesDatabase _timeSeriesDb;
    private readonly IAlertingService _alertingService;

    public async Task SetupTelemetryPipeline()
    {
        // Create topic for telemetry (connectionString is assumed to be loaded from configuration)
        var adminClient = new ServiceBusAdministrationClient(connectionString);
        
        await adminClient.CreateTopicAsync(new CreateTopicOptions("device-telemetry")
        {
            EnablePartitioning = true, // For high throughput
            DefaultMessageTimeToLive = TimeSpan.FromHours(24)
        });

        // Create subscriptions for different processing needs
        await CreateSubscriptions(adminClient);
        
        // Start processors
        await StartRealTimeProcessor();
        await StartBatchProcessor();
        await StartAlertProcessor();
    }

    private async Task CreateSubscriptions(ServiceBusAdministrationClient adminClient)
    {
        // Real-time processing subscription
        await adminClient.CreateSubscriptionAsync(
            new CreateSubscriptionOptions("device-telemetry", "realtime-processing")
            {
                LockDuration = TimeSpan.FromSeconds(30)
            });

        // Batch processing subscription (for analytics)
        await adminClient.CreateSubscriptionAsync(
            new CreateSubscriptionOptions("device-telemetry", "batch-analytics")
            {
                LockDuration = TimeSpan.FromMinutes(5)
            });

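        // Alert processing subscription with filter. SQL filters evaluate message
        // properties, not the JSON body, so the publisher must copy Temperature and
        // Humidity into ApplicationProperties for this rule to match.
        await adminClient.CreateSubscriptionAsync(
            new CreateSubscriptionOptions("device-telemetry", "alerts"),
            new CreateRuleOptions("HighTemperatureOrHumidity", 
                new SqlRuleFilter("Temperature > 30 OR Humidity > 80")));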
    }

    private async Task StartRealTimeProcessor()
    {
        var processor = _client.CreateProcessor("device-telemetry", "realtime-processing",
            new ServiceBusProcessorOptions
            {
                MaxConcurrentCalls = 50, // High concurrency for real-time
                PrefetchCount = 100
            });

        processor.ProcessMessageAsync += async (args) =>
        {
            var telemetry = JsonSerializer.Deserialize<DeviceTelemetry>(args.Message.Body.ToString());
            
            // Write to time series database
            await _timeSeriesDb.WriteAsync(new TimeSeriesPoint
            {
                DeviceId = telemetry.DeviceId,
                Timestamp = telemetry.Timestamp,
                Metrics = telemetry.Metrics
            });

            await args.CompleteMessageAsync(args.Message);
        };

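        // StartProcessingAsync throws unless an error handler is registered
        processor.ProcessErrorAsync += errorArgs =>
        {
            Console.Error.WriteLine($"Processor error on {errorArgs.EntityPath}: {errorArgs.Exception.Message}");
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync();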
    }

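    // Held in a field so the timer is not garbage-collected once setup returns
    private Timer _batchTimer;

    private async Task StartBatchProcessor()
    {
        var processor = _client.CreateProcessor("device-telemetry", "batch-analytics");
        var batch = new List<DeviceTelemetry>();
        _batchTimer = new Timer(async _ => await ProcessBatch(batch), null, 
            TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));

        processor.ProcessMessageAsync += async (args) =>
        {
            var telemetry = JsonSerializer.Deserialize<DeviceTelemetry>(args.Message.Body.ToString());
            
            bool batchFull;
            lock (batch)
            {
                batch.Add(telemetry);
                batchFull = batch.Count >= 1000; // Read the count under the same lock
            }

            // Trade-off: completing here means buffered telemetry is lost if the process crashes
            await args.CompleteMessageAsync(args.Message);

            // Process batch if it reaches certain size
            if (batchFull)
            {
                await ProcessBatch(batch);
            }
        };

        // StartProcessingAsync throws unless an error handler is registered
        processor.ProcessErrorAsync += errorArgs =>
        {
            Console.Error.WriteLine($"Processor error on {errorArgs.EntityPath}: {errorArgs.Exception.Message}");
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync();
    }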

    private async Task ProcessBatch(List<DeviceTelemetry> batch)
    {
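        List<DeviceTelemetry> processingBatch;
        lock (batch)
        {
            // Check inside the lock: a concurrent caller may already have drained the buffer,
            // and Average()/Max() below throw on an empty list
            if (batch.Count == 0) return;

            processingBatch = new List<DeviceTelemetry>(batch);
            batch.Clear();
        }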

        // Perform analytics
        var analytics = new TelemetryAnalytics
        {
            Period = DateTime.UtcNow,
            DeviceCount = processingBatch.Select(t => t.DeviceId).Distinct().Count(),
            AverageTemperature = processingBatch.Average(t => t.Metrics["Temperature"]),
            MaxHumidity = processingBatch.Max(t => t.Metrics["Humidity"])
        };

        await _timeSeriesDb.WriteAnalyticsAsync(analytics);
    }
}
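
The batch processor above shares one list between the message handler and a timer. One way to keep that hand-off safe is to put the locking inside a small buffer type that swaps the list out atomically. The sketch below is illustrative only; `BatchBuffer<T>` is a hypothetical name and does not come from the Service Bus SDK.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical buffer: collects items and hands over a whole batch under a single lock
public sealed class BatchBuffer<T>
{
    private readonly object _gate = new();
    private readonly int _maxSize;
    private List<T> _items = new();

    public BatchBuffer(int maxSize)
    {
        if (maxSize <= 0) throw new ArgumentOutOfRangeException(nameof(maxSize));
        _maxSize = maxSize;
    }

    // Returns the full batch when the size threshold is reached, otherwise null
    public IReadOnlyList<T>? Add(T item)
    {
        lock (_gate)
        {
            _items.Add(item);
            return _items.Count >= _maxSize ? Swap() : null;
        }
    }

    // Returns whatever is buffered (possibly empty); intended for the periodic timer
    public IReadOnlyList<T> Drain()
    {
        lock (_gate)
        {
            return Swap();
        }
    }

    // Must be called while holding the lock
    private List<T> Swap()
    {
        var full = _items;
        _items = new List<T>();
        return full;
    }
}
```

Because each batch is returned to exactly one caller, the size-triggered path and the timer-triggered path cannot process the same items twice, and neither has to read the count outside the lock.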

Scenario 3: Microservices Event Bus

public class MicroservicesEventBus
{
    private readonly ServiceBusClient _client;
    private readonly string _serviceName;
    private readonly Dictionary<Type, List<Func<object, Task>>> _handlers = new();

    public MicroservicesEventBus(string connectionString, string serviceName)
    {
        _client = new ServiceBusClient(connectionString);
        _serviceName = serviceName;
    }

    // Publish domain event
    public async Task PublishAsync<TEvent>(TEvent domainEvent) where TEvent : IDomainEvent
    {
        var eventName = typeof(TEvent).Name;
        var sender = _client.CreateSender("domain-events");

        var message = new ServiceBusMessage(JsonSerializer.Serialize(domainEvent))
        {
            Subject = eventName,
            ApplicationProperties =
            {
                ["EventType"] = eventName,
                ["SourceService"] = _serviceName,
                ["EventId"] = domainEvent.EventId,
                ["OccurredAt"] = domainEvent.OccurredAt
            }
        };

        await sender.SendMessageAsync(message);
    }

    // Subscribe to domain events
    public void Subscribe<TEvent>(Func<TEvent, Task> handler) where TEvent : IDomainEvent
    {
        var eventType = typeof(TEvent);
        
        if (!_handlers.ContainsKey(eventType))
            _handlers[eventType] = new List<Func<object, Task>>();

        _handlers[eventType].Add(async (e) => await handler((TEvent)e));
    }

    // Start processing events
    public async Task StartAsync()
    {
        var processor = _client.CreateProcessor("domain-events", _serviceName);

        processor.ProcessMessageAsync += async (args) =>
        {
            var eventType = args.Message.ApplicationProperties["EventType"].ToString();
            var eventData = args.Message.Body.ToString();

            // Resolve the event type from the subscribed handlers instead of
            // scanning every loaded assembly on each message
            var type = _handlers.Keys.FirstOrDefault(t => t.Name == eventType);

            if (type != null && _handlers.ContainsKey(type))
            {
                var domainEvent = JsonSerializer.Deserialize(eventData, type);
                
                // Execute all handlers for this event type
                foreach (var handler in _handlers[type])
                {
                    await handler(domainEvent);
                }
            }

            await args.CompleteMessageAsync(args.Message);
        };

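        // StartProcessingAsync throws unless an error handler is registered
        processor.ProcessErrorAsync += errorArgs =>
        {
            Console.Error.WriteLine($"Processor error on {errorArgs.EntityPath}: {errorArgs.Exception.Message}");
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync();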
    }
}

// Usage in a microservice
public class OrderService
{
    private readonly MicroservicesEventBus _eventBus;

    public OrderService(MicroservicesEventBus eventBus)
    {
        _eventBus = eventBus;
        
        // Subscribe to events from other services
        _eventBus.Subscribe<PaymentCompletedEvent>(HandlePaymentCompleted);
        _eventBus.Subscribe<InventoryReservedEvent>(HandleInventoryReserved);
    }

    public async Task CreateOrder(CreateOrderCommand command)
    {
        // Create order logic...
        var order = new Order { /* ... */ };

        // Publish event
        await _eventBus.PublishAsync(new OrderCreatedEvent
        {
            EventId = Guid.NewGuid(),
            OccurredAt = DateTime.UtcNow,
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            TotalAmount = order.TotalAmount
        });
    }

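    private Task HandlePaymentCompleted(PaymentCompletedEvent evt)
    {
        // Update order status
        Console.WriteLine($"Payment completed for order {evt.OrderId}");
        return Task.CompletedTask;
    }

    private Task HandleInventoryReserved(InventoryReservedEvent evt)
    {
        // Mark the order's items as reserved
        Console.WriteLine($"Inventory reserved (event {evt.EventId})");
        return Task.CompletedTask;
    }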
}
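
The subscribe-and-dispatch logic of the event bus can be exercised without a Service Bus namespace. The sketch below reproduces the routing step in memory, keyed by event type name; `InMemoryEventDispatcher` and `OrderShipped` are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical in-memory dispatcher: routes a JSON payload to handlers by event type name
public sealed class InMemoryEventDispatcher
{
    private readonly Dictionary<string, (Type Type, List<Func<object, Task>> Handlers)> _routes = new();

    public void Subscribe<TEvent>(Func<TEvent, Task> handler)
    {
        var name = typeof(TEvent).Name;
        if (!_routes.TryGetValue(name, out var route))
        {
            route = (typeof(TEvent), new List<Func<object, Task>>());
            _routes[name] = route;
        }
        route.Handlers.Add(e => handler((TEvent)e));
    }

    // Returns the number of handlers invoked (0 when nothing is subscribed to the event type)
    public async Task<int> DispatchAsync(string eventType, string json)
    {
        if (!_routes.TryGetValue(eventType, out var route)) return 0;

        var domainEvent = JsonSerializer.Deserialize(json, route.Type);
        if (domainEvent is null) return 0;

        foreach (var handler in route.Handlers)
            await handler(domainEvent);

        return route.Handlers.Count;
    }
}

// Illustrative event type
public sealed class OrderShipped
{
    public string OrderId { get; set; } = "";
}
```

Storing the `Type` next to its handlers at subscription time means dispatch is a single dictionary lookup, and events nobody subscribed to are skipped without any reflection.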

12. Monitoring & Troubleshooting

Key Metrics to Monitor

public class ServiceBusMonitor
{
    private readonly ServiceBusAdministrationClient _adminClient;
    private readonly ILogger<ServiceBusMonitor> _logger;

    public async Task<HealthStatus> CheckHealth()
    {
        var health = new HealthStatus();

        try
        {
            // Check namespace accessibility
            var namespaceProperties = await _adminClient.GetNamespacePropertiesAsync();
            health.NamespaceActive = true;

            // Check queues
            await foreach (var queue in _adminClient.GetQueuesAsync())
            {
                var runtimeProps = await _adminClient.GetQueueRuntimePropertiesAsync(queue.Name);
                
                health.Queues.Add(new QueueHealth
                {
                    Name = queue.Name,
                    ActiveMessages = runtimeProps.Value.ActiveMessageCount,
                    DeadLetterMessages = runtimeProps.Value.DeadLetterMessageCount,
                    IsHealthy = runtimeProps.Value.DeadLetterMessageCount <= 100 // Same threshold as the alert below
                });

                // Alert if too many dead letters
                if (runtimeProps.Value.DeadLetterMessageCount > 100)
                {
                    _logger.LogWarning("Queue {Queue} has {Count} dead letter messages",
                        queue.Name, runtimeProps.Value.DeadLetterMessageCount);
                }
            }

            return health;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Health check failed");
            health.NamespaceActive = false;
            return health;
        }
    }

    public async Task<Dictionary<string, object>> GetDetailedMetrics(string queueName)
    {
        var properties = await _adminClient.GetQueueRuntimePropertiesAsync(queueName);
        var queue = properties.Value;

        return new Dictionary<string, object>
        {
            ["ActiveMessageCount"] = queue.ActiveMessageCount,
            ["DeadLetterMessageCount"] = queue.DeadLetterMessageCount,
            ["ScheduledMessageCount"] = queue.ScheduledMessageCount,
            ["TransferDeadLetterMessageCount"] = queue.TransferDeadLetterMessageCount,
            ["TransferMessageCount"] = queue.TransferMessageCount,
            ["SizeInBytes"] = queue.SizeInBytes,
            ["CreatedAt"] = queue.CreatedAt,
            ["UpdatedAt"] = queue.UpdatedAt,
            ["AccessedAt"] = queue.AccessedAt
        };
    }
}

Application Insights Integration

public class TelemetryProcessor : ServiceBusProcessor
{
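    private readonly TelemetryClient _telemetryClient;

    // Uses the protected ServiceBusProcessor constructor intended for derived types
    public TelemetryProcessor(ServiceBusClient client, string queueName,
        ServiceBusProcessorOptions options, TelemetryClient telemetryClient)
        : base(client, queueName, options)
    {
        _telemetryClient = telemetryClient;
    }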

    protected override async Task OnProcessMessageAsync(ProcessMessageEventArgs args)
    {
        using var operation = _telemetryClient.StartOperation<RequestTelemetry>("ProcessMessage");
        
        try
        {
            operation.Telemetry.Properties["MessageId"] = args.Message.MessageId;
            operation.Telemetry.Properties["Queue"] = args.EntityPath;
            operation.Telemetry.Properties["DeliveryCount"] = args.Message.DeliveryCount.ToString();

            // Process message
            await base.OnProcessMessageAsync(args);

            operation.Telemetry.Success = true;
            _telemetryClient.TrackMetric("MessageProcessed", 1, 
                new Dictionary<string, string> { ["Queue"] = args.EntityPath });
        }
        catch (Exception ex)
        {
            operation.Telemetry.Success = false;
            _telemetryClient.TrackException(ex);
            throw;
        }
    }
}

Common Issues and Solutions

Issue 1: Messages Not Being Processed

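public class TroubleshootingHelper
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusAdministrationClient _adminClient;
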
    public async Task DiagnoseQueueIssues(string queueName)
    {
        Console.WriteLine($"Diagnosing queue: {queueName}");

        // Check queue properties
        var properties = await _adminClient.GetQueueRuntimePropertiesAsync(queueName);
        Console.WriteLine($"Active messages: {properties.Value.ActiveMessageCount}");
        Console.WriteLine($"Dead letter messages: {properties.Value.DeadLetterMessageCount}");

        // Peek messages without removing
        var receiver = _client.CreateReceiver(queueName);
        var peekedMessages = await receiver.PeekMessagesAsync(10);

        foreach (var message in peekedMessages)
        {
            Console.WriteLine($"Message ID: {message.MessageId}");
            Console.WriteLine($"Enqueued: {message.EnqueuedTime}");
            Console.WriteLine($"Delivery Count: {message.DeliveryCount}");
            // Peeking does not lock messages, so LockedUntil is not meaningful for peeked messages
            Console.WriteLine($"Lock Until: {message.LockedUntil}");
        }

        // Check dead letter queue
        var dlqReceiver = _client.CreateReceiver(queueName, 
            new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });
        
        var dlqMessages = await dlqReceiver.PeekMessagesAsync(10);
        foreach (var dlq in dlqMessages)
        {
            Console.WriteLine($"DLQ Reason: {dlq.DeadLetterReason}");
            Console.WriteLine($"DLQ Description: {dlq.DeadLetterErrorDescription}");
        }
    }
}

Issue 2: Performance Problems

public class PerformanceDiagnostics
{
    private readonly ServiceBusClient _client;

    public async Task AnalyzePerformance(string queueName)
    {
        var messageCount = 1000;
        var sender = _client.CreateSender(queueName);

        // Test send performance
        var sendTimes = new List<long>();
        for (int i = 0; i < messageCount; i++)
        {
            var sw = Stopwatch.StartNew();
            await sender.SendMessageAsync(new ServiceBusMessage($"Test {i}"));
            sendTimes.Add(sw.ElapsedMilliseconds);
        }

        Console.WriteLine($"Average send time: {sendTimes.Average()}ms");
        Console.WriteLine($"Max send time: {sendTimes.Max()}ms");

        // Test receive performance
        var receiveTimes = new List<long>();
        var receiver = _client.CreateReceiver(queueName);

        for (int i = 0; i < messageCount; i++)
        {
            var sw = Stopwatch.StartNew();
            var message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));
            if (message != null)
            {
                await receiver.CompleteMessageAsync(message);
                receiveTimes.Add(sw.ElapsedMilliseconds);
            }
        }

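        // Average() and Max() throw on an empty sequence, so guard against receiving nothing
        if (receiveTimes.Count > 0)
        {
            Console.WriteLine($"Average receive time: {receiveTimes.Average()}ms");
            Console.WriteLine($"Max receive time: {receiveTimes.Max()}ms");
        }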
    }
}
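
Average and maximum alone can hide how latency is distributed: a single slow call dominates the maximum while barely moving the average. As a complement, the sketch below computes nearest-rank percentiles over the collected timings; `LatencySummary` is a hypothetical helper, not part of the diagnostics class above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: nearest-rank percentile over latency samples in milliseconds
public static class LatencySummary
{
    // percentile is a whole number from 1 to 100
    public static long Percentile(IReadOnlyCollection<long> samples, int percentile)
    {
        if (samples.Count == 0)
            throw new ArgumentException("No samples recorded.", nameof(samples));
        if (percentile < 1 || percentile > 100)
            throw new ArgumentOutOfRangeException(nameof(percentile));

        var sorted = samples.OrderBy(s => s).ToArray();

        // Integer ceiling of (percentile * n / 100) gives the 1-based rank
        var rank = (percentile * sorted.Length + 99) / 100;
        return sorted[rank - 1];
    }
}
```

Passing `sendTimes` or `receiveTimes` with a percentile of 95 reports the latency that 95% of the calls stayed at or below, which is usually a more stable tuning target than the single worst call.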

13. Best Practices

1. Message Design

// Good: Small, focused messages
public class OrderCreatedEvent
{
    public string OrderId { get; set; }
    public DateTime CreatedAt { get; set; }
    public decimal TotalAmount { get; set; }
    // Reference to detailed data, not the data itself
    public string OrderDetailsUrl { get; set; }
}

// Bad: Large, monolithic messages
public class BadOrderEvent
{
    public Order FullOrder { get; set; }
    public List<OrderItem> AllItems { get; set; }
    public Customer CompleteCustomerRecord { get; set; }
    public byte[] AttachedPdf { get; set; } // Don't do this!
}
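
Keeping a reference in the message instead of the data itself is often called the claim-check pattern, and it matters because the Standard tier caps messages at 256 KB. The sketch below shows the decision in isolation; `InMemoryPayloadStore`, `Envelope`, and `ClaimCheck` are hypothetical names, and the in-memory store merely stands in for something like blob storage.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for external storage such as a blob container
public sealed class InMemoryPayloadStore
{
    private readonly Dictionary<string, byte[]> _blobs = new();

    public string Save(byte[] payload)
    {
        var key = Guid.NewGuid().ToString("N");
        _blobs[key] = payload;
        return key;
    }

    public byte[] Load(string key) => _blobs[key];
}

// Carries either the payload itself or a reference to it, never both
public sealed class Envelope
{
    public byte[] InlineBody { get; set; }
    public string PayloadReference { get; set; }
}

public static class ClaimCheck
{
    // Small payloads travel inline; larger ones are stored and replaced by a reference
    public static Envelope Wrap(byte[] payload, int maxInlineBytes, InMemoryPayloadStore store) =>
        payload.Length <= maxInlineBytes
            ? new Envelope { InlineBody = payload }
            : new Envelope { PayloadReference = store.Save(payload) };

    public static byte[] Unwrap(Envelope envelope, InMemoryPayloadStore store) =>
        envelope.InlineBody ?? store.Load(envelope.PayloadReference);
}
```

The inline threshold is a parameter here because the right value depends on the tier and on how much headroom the message properties need.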

2. Error Handling Strategy

public class RobustMessageHandler
{
    private readonly ILogger _logger;
    private readonly int[] _retryDelays = { 1, 5, 10, 30, 60 }; // seconds

    public async Task HandleMessageAsync(ProcessMessageEventArgs args)
    {
        var attemptCount = args.Message.DeliveryCount;

        try
        {
            // Add delay for retries (stepped backoff taken from _retryDelays)
            if (attemptCount > 1)
            {
                var delayIndex = Math.Min(attemptCount - 2, _retryDelays.Length - 1);
                await Task.Delay(TimeSpan.FromSeconds(_retryDelays[delayIndex]));
            }

            // Process with timeout
            using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
            await ProcessBusinessLogicAsync(args.Message, cts.Token);

            // Success - complete the message
            await args.CompleteMessageAsync(args.Message);
        }
        catch (BusinessValidationException ex)
        {
            // Business rule violation - don't retry
            _logger.LogWarning(ex, "Business validation failed");
            await args.DeadLetterMessageAsync(args.Message, 
                "ValidationFailed", ex.Message);
        }
        catch (TransientException ex) when (attemptCount < 5)
        {
            // Transient error - retry
            _logger.LogWarning(ex, "Transient error, attempt {Attempt}", attemptCount);
            await args.AbandonMessageAsync(args.Message);
        }
        catch (Exception ex)
        {
            // Unexpected error, or a transient error that has used up its retries - dead letter
            _logger.LogError(ex, "Failed to process message after {Attempts} attempts", attemptCount);
            await args.DeadLetterMessageAsync(args.Message,
                "ProcessingFailed", $"Failed after {attemptCount} attempts: {ex.Message}");
        }
    }
}
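
The handler above makes one of three settlement choices: complete, abandon for retry, or dead-letter. Pulling that choice into a pure function makes it easy to unit test without a broker. The following is a sketch under that assumption; `FailureKind`, `Settlement`, and `SettlementPolicy` are hypothetical names, not SDK types.

```csharp
using System;

public enum FailureKind { None, Validation, Transient, Unexpected }

public enum Settlement { Complete, Abandon, DeadLetter }

// Hypothetical policy mirroring the catch blocks of the handler above
public static class SettlementPolicy
{
    public static Settlement Decide(FailureKind failure, int deliveryCount, int maxAttempts) =>
        failure switch
        {
            // Processing succeeded
            FailureKind.None => Settlement.Complete,
            // Transient failures are retried until the attempt budget is used up
            FailureKind.Transient when deliveryCount < maxAttempts => Settlement.Abandon,
            // Validation failures, unexpected errors, and exhausted retries go to the dead letter queue
            _ => Settlement.DeadLetter
        };
}
```

A handler would map its caught exception to a `FailureKind`, call `Decide` with `args.Message.DeliveryCount`, and then invoke the matching settlement method.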

3. Connection Management

public class ServiceBusConnectionManager : IAsyncDisposable
{
    private readonly ServiceBusClient _client;
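    // Concurrent dictionaries keep the lock-free TryGetValue in GetSenderAsync safe
    // while another thread is adding an entry
    private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new();
    private readonly ConcurrentDictionary<string, ServiceBusProcessor> _processors = new();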
    private readonly SemaphoreSlim _semaphore = new(1, 1);

    public ServiceBusConnectionManager(string connectionString)
    {
        _client = new ServiceBusClient(connectionString, new ServiceBusClientOptions
        {
            // AMQP over WebSockets uses port 443; the default, AmqpTcp, needs port 5671 open
            TransportType = ServiceBusTransportType.AmqpWebSockets,
            RetryOptions = new ServiceBusRetryOptions
            {
                Mode = ServiceBusRetryMode.Exponential,
                MaxRetries = 5,
                Delay = TimeSpan.FromSeconds(1),
                MaxDelay = TimeSpan.FromMinutes(1)
            }
        });
    }

    public async Task<ServiceBusSender> GetSenderAsync(string entityPath)
    {
        if (_senders.TryGetValue(entityPath, out var sender))
            return sender;

        await _semaphore.WaitAsync();
        try
        {
            if (!_senders.TryGetValue(entityPath, out sender))
            {
                sender = _client.CreateSender(entityPath);
                _senders[entityPath] = sender;
            }
            return sender;
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public async ValueTask DisposeAsync()
    {
        foreach (var processor in _processors.Values)
        {
            await processor.StopProcessingAsync();
            await processor.DisposeAsync();
        }

        foreach (var sender in _senders.Values)
        {
            await sender.DisposeAsync();
        }

        await _client.DisposeAsync();
    }
}

4. Testing Strategies

public class ServiceBusTestHelper
{
    // Use in-memory test double for unit tests
    public interface IServiceBusService
    {
        Task SendMessageAsync(string queueName, object message);
        Task<T> ReceiveMessageAsync<T>(string queueName);
    }

    // Integration test with real Service Bus
    [TestClass]
    public class ServiceBusIntegrationTests
    {
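        // Assumption: the connection string comes from an environment variable (the name is illustrative)
        private static readonly string TestConnectionString =
            Environment.GetEnvironmentVariable("SERVICEBUS_TEST_CONNECTION_STRING");

        private ServiceBusClient _client;
        private ServiceBusAdministrationClient _adminClient;
        private string _testQueueName;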

        [TestInitialize]
        public async Task Setup()
        {
            _testQueueName = $"test-queue-{Guid.NewGuid()}";
            _adminClient = new ServiceBusAdministrationClient(TestConnectionString);
            
            // Create test queue
            await _adminClient.CreateQueueAsync(_testQueueName);
            _client = new ServiceBusClient(TestConnectionString);
        }

        [TestCleanup]
        public async Task Cleanup()
        {
            // Delete test queue
            await _adminClient.DeleteQueueAsync(_testQueueName);
            await _client.DisposeAsync();
        }

        [TestMethod]
        public async Task Should_Send_And_Receive_Message()
        {
            // Arrange
            var sender = _client.CreateSender(_testQueueName);
            var receiver = _client.CreateReceiver(_testQueueName);
            var expectedMessage = "Test message";

            // Act
            await sender.SendMessageAsync(new ServiceBusMessage(expectedMessage));
            var receivedMessage = await receiver.ReceiveMessageAsync();

            // Assert
            Assert.IsNotNull(receivedMessage);
            Assert.AreEqual(expectedMessage, receivedMessage.Body.ToString());
            
            await receiver.CompleteMessageAsync(receivedMessage);
        }
    }
}
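
For unit tests, the `IServiceBusService` abstraction can be backed by an in-memory test double so that no namespace or network access is needed. The sketch below is one possible shape; the interface is repeated so the example compiles on its own, and `InMemoryServiceBusService` and `Ping` are hypothetical names.

```csharp
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading.Tasks;

// Same shape as the interface shown above, repeated to keep the sketch self-contained
public interface IServiceBusService
{
    Task SendMessageAsync(string queueName, object message);
    Task<T> ReceiveMessageAsync<T>(string queueName);
}

// Hypothetical test double: one FIFO queue of JSON strings per queue name
public sealed class InMemoryServiceBusService : IServiceBusService
{
    private readonly ConcurrentDictionary<string, ConcurrentQueue<string>> _queues = new();

    public Task SendMessageAsync(string queueName, object message)
    {
        // Serialize on send so tests also exercise the JSON round trip
        var json = JsonSerializer.Serialize(message, message.GetType());
        _queues.GetOrAdd(queueName, _ => new ConcurrentQueue<string>()).Enqueue(json);
        return Task.CompletedTask;
    }

    public Task<T> ReceiveMessageAsync<T>(string queueName)
    {
        if (_queues.TryGetValue(queueName, out var queue) && queue.TryDequeue(out var json))
            return Task.FromResult<T>(JsonSerializer.Deserialize<T>(json));

        // An empty or unknown queue yields the default value instead of blocking
        return Task.FromResult<T>(default);
    }
}

// Illustrative message type
public sealed class Ping
{
    public int Sequence { get; set; }
}
```

The double deliberately ignores locks, delivery counts, and dead-lettering, so it suits tests of application logic only; behaviour that depends on broker semantics still belongs in integration tests like the one above.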

14. Migration Guide

Migrating from Legacy SDK

// Old SDK (WindowsAzure.ServiceBus)
var factory = MessagingFactory.CreateFromConnectionString(connectionString);
var client = factory.CreateQueueClient(queueName);
var message = new BrokeredMessage("Hello");
await client.SendAsync(message);

// New SDK (Azure.Messaging.ServiceBus)
await using var client = new ServiceBusClient(connectionString);
await using var sender = client.CreateSender(queueName);
await sender.SendMessageAsync(new ServiceBusMessage("Hello"));

Migration Checklist

  1. Update NuGet Packages: remove the legacy WindowsAzure.ServiceBus package and add Azure.Messaging.ServiceBus.

  2. Update Connection Strings: if you use AMQP over WebSockets, set the transport type through ServiceBusClientOptions rather than appending it to the connection string.

await using var client = new ServiceBusClient(connectionString, new ServiceBusClientOptions
{
    TransportType = ServiceBusTransportType.AmqpWebSockets
});

  3. Update Message Types

// Old → New
// BrokeredMessage → ServiceBusMessage
// MessageReceiver → ServiceBusReceiver
// MessageSender → ServiceBusSender
// QueueClient → ServiceBusClient + ServiceBusSender/Receiver

// New patterns
await using var client = new ServiceBusClient(connectionString);
await using var sender = client.CreateSender(queueName);
await using var receiver = client.CreateReceiver(queueName);

15. Resources & Next Steps

Official Resources

Code Samples

Tools and Libraries

  1. Service Bus Explorer: GUI tool for managing Service Bus
  2. MassTransit: Open-source distributed application framework
  3. NServiceBus: Commercial messaging framework
  4. Rebus: Simple and lean service bus implementation

Next Steps

  1. Start Small: Create a simple queue and send/receive messages
  2. Implement Patterns: Try request-reply or pub/sub patterns
  3. Add Resilience: Implement retry logic and dead letter handling
  4. Monitor: Set up Application Insights or Azure Monitor
  5. Scale: Test performance and optimize for your workload

Community


Summary

Azure Service Bus is a powerful enterprise messaging solution that enables reliable, scalable communication between distributed applications. By following the patterns and practices in this guide, you can build robust messaging solutions that handle failures gracefully and scale with your needs.

Remember:

  • Start with the right tier for your needs
  • Design small, focused messages
  • Implement proper error handling
  • Monitor your queues and topics
  • Use batching for performance
  • Leverage dead letter queues

Happy messaging! 🚀