Azure Service Bus: The Complete Guide to Enterprise Messaging
Azure Service Bus: Complete Developer Guide
Master enterprise-grade messaging with Azure Service Bus - from basic queues to advanced patterns, with hands-on examples and real-world scenarios
π Table of Contents
- Introduction: What is Azure Service Bus?
- Core Concepts & Architecture
- Pricing Tiers & Feature Comparison
- Getting Started: Your First Service Bus
- Working with Queues
- Topics and Subscriptions
- Advanced Messaging Patterns
- Dead Letter Queues & Error Handling
- Security & Authentication
- Performance Optimization
- Real-World Scenarios
- Monitoring & Troubleshooting
- Best Practices
- Migration Guide
- Resources & Next Steps
1. Introduction: What is Azure Service Bus?
The Simple Explanation
Imagine youβre running a busy restaurant. Orders come in from multiple sources: dine-in, takeout, delivery apps. Your kitchen needs to process these orders efficiently without losing any. Azure Service Bus is like a smart order management system that:
- π₯ Receives orders from all sources reliably
- π Queues them in the right order
- π Distributes work to available chefs
- β Guarantees no order is lost
- π Retries if something goes wrong
In technical terms, Azure Service Bus is a fully managed enterprise message broker with message queues and publish-subscribe topics.
Why Use Azure Service Bus?
Traditional Direct Communication:
βββββββββββββββ Direct Call βββββββββββββββ
β App A βββββββββββββββββββββΊβ App B β
βββββββββββββββ (Tight Coupling) βββββββββββββββ
Problems:
- If App B is down, App A fails
- If App B is slow, App A waits
- Hard to scale independently
With Service Bus:
βββββββββββββββ βββββββββββββββ βββββββββββββββ
β App A βββββΊβ Service Bus βββββΊβ App B β
βββββββββββββββ βββββββββββββββ βββββββββββββββ
Benefits:
- Apps work independently
- Messages are never lost
- Easy to scale
- Built-in retry logic
Real-World Use Cases
- E-commerce Order Processing: Handle order spikes during sales
- IoT Data Ingestion: Process millions of sensor readings
- Microservice Communication: Decouple services in your architecture
- Event-Driven Workflows: Trigger actions based on events
- Batch Processing: Queue work items for background processing
2. Core Concepts & Architecture
The Building Blocks
1. Namespace
Think of a namespace as your messaging container. Itβs like having your own post office where all your messaging entities live.
// Connection string to your namespace
string connectionString = "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=myKey";
2. Messages
The data packets you send. Each message has:
- Body: Your actual data (JSON, XML, binary)
- Properties: Metadata about the message
- System Properties: Service Bus metadata
// Creating a message
var message = new ServiceBusMessage("Hello, Service Bus!")
{
ContentType = "application/json",
Subject = "OrderCreated",
ApplicationProperties = {
["OrderId"] = "12345",
["Priority"] = "High"
}
};
3. Queues (Point-to-Point)
One sender, one receiver. Like a line at a coffee shop - first in, first out (FIFO).
Sender βββΊ [Message 3][Message 2][Message 1] βββΊ Receiver
4. Topics & Subscriptions (Publish-Subscribe)
One sender, multiple receivers. Like a newsletter - one publisher, many subscribers.
βββΊ Subscription A βββΊ Receiver 1
β
Sender βββΊ Topic βββΌββΊ Subscription B βββΊ Receiver 2
β
βββΊ Subscription C βββΊ Receiver 3
How Messages Flow
1. Producer creates message
2. Sends to Service Bus (Queue/Topic)
3. Service Bus stores message durably
4. Consumer requests message
5. Service Bus delivers message
6. Consumer processes message
7. Consumer acknowledges completion
8. Service Bus removes message
3. Pricing Tiers & Feature Comparison
Choose Your Tier
Feature | Basic | Standard | Premium |
---|---|---|---|
Queues | β | β | β |
Topics | β | β | β |
Message Size | 256 KB | 256 KB | 100 MB |
Sessions | β | β | β |
Transactions | β | β | β |
Dead Letter Queue | β | β | β |
Duplicate Detection | β | β | β |
Partitioning | β | β | β |
Performance | Shared | Shared | Dedicated |
Geo-DR | β | β | β |
Use Case | Dev/Test | Production | Mission Critical |
Making the Right Choice
// Basic Tier - Simple queue operations
// Good for: Development, testing, simple scenarios
// Standard Tier - Full features
// Good for: Most production workloads
// Premium Tier - Predictable performance
// Good for: High-throughput, low-latency requirements
4. Getting Started: Your First Service Bus
Step 1: Create a Service Bus Namespace (Azure Portal)
# Using Azure CLI
az servicebus namespace create \
--resource-group myResourceGroup \
--name myServiceBusNamespace \
--location eastus \
--sku Standard
Step 2: Create a Queue
# Create a queue
az servicebus queue create \
--resource-group myResourceGroup \
--namespace-name myServiceBusNamespace \
--name myQueue
Step 3: Install the SDK
# .NET
dotnet add package Azure.Messaging.ServiceBus
# Python
pip install azure-servicebus
# Node.js
npm install @azure/service-bus
# Java
# Add to pom.xml
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.15.0</version>
</dependency>
Step 4: Send Your First Message (.NET Example)
using Azure.Messaging.ServiceBus;
using System;
using System.Threading.Tasks;
class Program
{
// Connection string from Azure Portal
const string connectionString = "Endpoint=sb://...";
const string queueName = "myQueue";
static async Task Main()
{
// Create a client
await using var client = new ServiceBusClient(connectionString);
// Create a sender
await using var sender = client.CreateSender(queueName);
// Create and send a message
var message = new ServiceBusMessage("Hello, Azure Service Bus!");
await sender.SendMessageAsync(message);
Console.WriteLine("Message sent!");
}
}
Step 5: Receive Your First Message
static async Task ReceiveMessages()
{
await using var client = new ServiceBusClient(connectionString);
// Create a receiver
await using var receiver = client.CreateReceiver(queueName);
// Receive a message
var message = await receiver.ReceiveMessageAsync();
if (message != null)
{
Console.WriteLine($"Received: {message.Body.ToString()}");
// Complete the message (remove from queue)
await receiver.CompleteMessageAsync(message);
}
}
5. Working with Queues
Understanding Queue Behavior
Queues provide reliable point-to-point communication. Think of it as a reliable pipe between two applications.
Properties of a Queue:
- FIFO ordering (with sessions)
- At-least-once delivery
- Message locking
- Dead letter support
- Duplicate detection (optional)
Complete Queue Example
public class QueueService
{
private readonly ServiceBusClient _client;
private readonly ServiceBusSender _sender;
private readonly ServiceBusProcessor _processor;
public QueueService(string connectionString, string queueName)
{
_client = new ServiceBusClient(connectionString);
_sender = _client.CreateSender(queueName);
// Create processor for receiving
_processor = _client.CreateProcessor(queueName, new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 5,
PrefetchCount = 10
});
// Configure handlers
_processor.ProcessMessageAsync += ProcessMessageHandler;
_processor.ProcessErrorAsync += ProcessErrorHandler;
}
// Send a single message
public async Task SendMessageAsync(string content)
{
var message = new ServiceBusMessage(content)
{
MessageId = Guid.NewGuid().ToString(),
TimeToLive = TimeSpan.FromHours(1)
};
await _sender.SendMessageAsync(message);
}
// Send batch of messages
public async Task SendBatchAsync(List<string> contents)
{
using var batch = await _sender.CreateMessageBatchAsync();
foreach (var content in contents)
{
if (!batch.TryAddMessage(new ServiceBusMessage(content)))
{
// Batch is full, send it
await _sender.SendMessagesAsync(batch);
// Create new batch
batch = await _sender.CreateMessageBatchAsync();
batch.TryAddMessage(new ServiceBusMessage(content));
}
}
// Send remaining messages
if (batch.Count > 0)
{
await _sender.SendMessagesAsync(batch);
}
}
// Process messages continuously
public async Task StartProcessingAsync()
{
await _processor.StartProcessingAsync();
}
private async Task ProcessMessageHandler(ProcessMessageEventArgs args)
{
try
{
string body = args.Message.Body.ToString();
Console.WriteLine($"Processing: {body}");
// Your business logic here
await ProcessBusinessLogic(body);
// Complete the message
await args.CompleteMessageAsync(args.Message);
}
catch (Exception ex)
{
// Abandon the message for retry
await args.AbandonMessageAsync(args.Message);
// Or send to dead letter queue
// await args.DeadLetterMessageAsync(args.Message, "ProcessingError", ex.Message);
}
}
private Task ProcessErrorHandler(ProcessErrorEventArgs args)
{
Console.WriteLine($"Error: {args.Exception.Message}");
return Task.CompletedTask;
}
private async Task ProcessBusinessLogic(string message)
{
// Simulate work
await Task.Delay(1000);
}
}
Python Example
from azure.servicebus import ServiceBusClient, ServiceBusMessage
import asyncio
connection_str = "Endpoint=sb://..."
queue_name = "myqueue"
async def send_message():
async with ServiceBusClient.from_connection_string(connection_str) as client:
async with client.get_queue_sender(queue_name) as sender:
# Send single message
message = ServiceBusMessage("Hello from Python!")
await sender.send_messages(message)
# Send batch
messages = [ServiceBusMessage(f"Message {i}") for i in range(10)]
await sender.send_messages(messages)
async def receive_messages():
async with ServiceBusClient.from_connection_string(connection_str) as client:
async with client.get_queue_receiver(queue_name) as receiver:
messages = await receiver.receive_messages(max_message_count=10, max_wait_time=5)
for message in messages:
print(f"Received: {str(message)}")
await receiver.complete_message(message)
# Run
asyncio.run(send_message())
asyncio.run(receive_messages())
Advanced Queue Features
1. Message Sessions (Ordered Processing)
// Enable sessions when creating queue
var queue = new CreateQueueOptions(queueName)
{
RequiresSession = true
};
// Send with session
var message = new ServiceBusMessage("Order item 1")
{
SessionId = "Order123" // All messages with same SessionId processed in order
};
// Receive from specific session
await using var receiver = await client.AcceptSessionAsync(queueName, "Order123");
2. Scheduled Messages
// Schedule a message for future delivery
var message = new ServiceBusMessage("Reminder: Meeting at 2 PM");
var scheduledTime = DateTimeOffset.UtcNow.AddHours(2);
long sequenceNumber = await sender.ScheduleMessageAsync(message, scheduledTime);
// Cancel if needed
await sender.CancelScheduledMessageAsync(sequenceNumber);
3. Message Deferral
// Defer a message for later processing
await receiver.DeferMessageAsync(message);
// Retrieve deferred message later
var deferredMessage = await receiver.ReceiveDeferredMessageAsync(sequenceNumber);
6. Topics and Subscriptions
Understanding Pub/Sub Pattern
Topics enable one-to-many communication. One publisher, multiple subscribers with filtering.
Real-world analogy: Magazine Publishing
- Topic = Magazine
- Subscriptions = Individual subscribers
- Filters = Subscriber preferences (sports, tech, etc.)
Creating Topics and Subscriptions
// Create a topic
var topic = new CreateTopicOptions("order-events")
{
DefaultMessageTimeToLive = TimeSpan.FromDays(7),
DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10),
EnablePartitioning = true
};
// Create subscriptions with filters
var allOrdersSub = new CreateSubscriptionOptions("order-events", "all-orders");
var highValueSub = new CreateSubscriptionOptions("order-events", "high-value-orders");
var highValueFilter = new CreateRuleOptions("HighValueFilter",
new SqlRuleFilter("OrderTotal > 1000"));
Complete Pub/Sub Example
public class OrderEventPublisher
{
private readonly ServiceBusSender _sender;
public OrderEventPublisher(ServiceBusClient client)
{
_sender = client.CreateSender("order-events");
}
public async Task PublishOrderEvent(Order order)
{
var message = new ServiceBusMessage(JsonSerializer.Serialize(order))
{
Subject = order.Type, // Used for filtering
ApplicationProperties =
{
["OrderTotal"] = order.Total,
["CustomerId"] = order.CustomerId,
["Region"] = order.Region
}
};
await _sender.SendMessageAsync(message);
}
}
public class OrderEventSubscriber
{
private readonly ServiceBusProcessor _processor;
public OrderEventSubscriber(ServiceBusClient client, string subscriptionName)
{
_processor = client.CreateProcessor("order-events", subscriptionName);
_processor.ProcessMessageAsync += HandleOrderEvent;
_processor.ProcessErrorAsync += HandleError;
}
public async Task StartAsync()
{
await _processor.StartProcessingAsync();
}
private async Task HandleOrderEvent(ProcessMessageEventArgs args)
{
var order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToString());
// Process based on subscription
Console.WriteLine($"Processing order {order.Id} with total {order.Total}");
await args.CompleteMessageAsync(args.Message);
}
private Task HandleError(ProcessErrorEventArgs args)
{
Console.WriteLine($"Error: {args.Exception}");
return Task.CompletedTask;
}
}
Subscription Filters
1. SQL Filters
// Complex SQL filter
var filter = new SqlRuleFilter(@"
OrderTotal > 100 AND
Region IN ('US', 'EU') AND
sys.Label = 'HighPriority'
");
2. Correlation Filters (Better Performance)
var filter = new CorrelationRuleFilter
{
Subject = "OrderCreated",
ApplicationProperties =
{
["Region"] = "US"
}
};
3. Custom Actions
// Modify message properties during filtering
var rule = new RuleProperties
{
Name = "AddRegionTag",
Filter = new SqlRuleFilter("Region = 'US'"),
Action = new SqlRuleAction("SET sys.Label = 'US-Order'")
};
7. Advanced Messaging Patterns
Pattern 1: Request-Reply
public class RequestReplyPattern
{
public async Task<string> SendRequestAsync(string request)
{
var requestMessage = new ServiceBusMessage(request)
{
ReplyTo = "reply-queue",
ReplyToSessionId = Guid.NewGuid().ToString()
};
// Send request
await _requestSender.SendMessageAsync(requestMessage);
// Wait for reply on specific session
await using var replyReceiver = await _client.AcceptSessionAsync(
"reply-queue",
requestMessage.ReplyToSessionId);
var reply = await replyReceiver.ReceiveMessageAsync(TimeSpan.FromSeconds(30));
if (reply != null)
{
await replyReceiver.CompleteMessageAsync(reply);
return reply.Body.ToString();
}
throw new TimeoutException("No reply received");
}
}
Pattern 2: Saga/Process Manager
public class OrderSaga
{
private readonly Dictionary<string, Func<OrderState, Task>> _handlers;
public OrderSaga()
{
_handlers = new Dictionary<string, Func<OrderState, Task>>
{
["OrderPlaced"] = HandleOrderPlaced,
["PaymentReceived"] = HandlePaymentReceived,
["ItemsShipped"] = HandleItemsShipped
};
}
public async Task ProcessMessage(ServiceBusReceivedMessage message)
{
var eventType = message.Subject;
var orderId = message.SessionId;
// Load saga state
var state = await LoadOrderState(orderId);
// Handle event
if (_handlers.TryGetValue(eventType, out var handler))
{
await handler(state);
await SaveOrderState(state);
}
}
private async Task HandleOrderPlaced(OrderState state)
{
state.Status = "AwaitingPayment";
// Send payment request
}
private async Task HandlePaymentReceived(OrderState state)
{
state.Status = "Paid";
// Trigger fulfillment
}
}
Pattern 3: Message Aggregator
public class MessageAggregator
{
private readonly Dictionary<string, List<ServiceBusReceivedMessage>> _batches = new();
public async Task AggregateMessage(ServiceBusReceivedMessage message)
{
var batchId = message.ApplicationProperties["BatchId"].ToString();
var totalMessages = (int)message.ApplicationProperties["TotalMessages"];
lock (_batches)
{
if (!_batches.ContainsKey(batchId))
_batches[batchId] = new List<ServiceBusReceivedMessage>();
_batches[batchId].Add(message);
if (_batches[batchId].Count == totalMessages)
{
// Process complete batch
ProcessCompleteBatch(_batches[batchId]);
_batches.Remove(batchId);
}
}
}
}
Pattern 4: Competing Consumers
public class CompetingConsumers
{
public static async Task StartMultipleConsumers(int consumerCount)
{
var tasks = new List<Task>();
for (int i = 0; i < consumerCount; i++)
{
var consumerId = i;
tasks.Add(Task.Run(async () =>
{
var processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions
{
MaxConcurrentCalls = 1,
Identifier = $"Consumer-{consumerId}"
});
processor.ProcessMessageAsync += async (args) =>
{
Console.WriteLine($"Consumer {consumerId} processing: {args.Message.MessageId}");
await Task.Delay(1000); // Simulate work
await args.CompleteMessageAsync(args.Message);
};
await processor.StartProcessingAsync();
}));
}
await Task.WhenAll(tasks);
}
}
8. Dead Letter Queues & Error Handling
Understanding Dead Letter Queues
Every queue and subscription has a dead letter queue (DLQ) - a special sub-queue where problematic messages go.
Main Queue: myqueue
DLQ Path: myqueue/$deadletterqueue
Why messages dead letter:
1. Max delivery attempts exceeded
2. TTL expired
3. Manually dead lettered
4. Header size exceeded
5. Subscription filter evaluation errors
Working with Dead Letter Queues
public class DeadLetterHandler
{
private readonly ServiceBusClient _client;
private readonly string _queueName;
public async Task ProcessDeadLetterMessages()
{
// Create DLQ receiver
var dlqReceiver = _client.CreateReceiver(
_queueName,
new ServiceBusReceiverOptions
{
SubQueue = SubQueue.DeadLetter
});
await foreach (var message in dlqReceiver.ReceiveMessagesAsync())
{
Console.WriteLine($"Dead letter reason: {message.DeadLetterReason}");
Console.WriteLine($"Dead letter description: {message.DeadLetterErrorDescription}");
try
{
// Try to process again or log for investigation
await ProcessDeadLetteredMessage(message);
await dlqReceiver.CompleteMessageAsync(message);
}
catch (Exception ex)
{
// Log and abandon if still failing
Console.WriteLine($"Still failing: {ex.Message}");
await dlqReceiver.AbandonMessageAsync(message);
}
}
}
// Resubmit dead letter messages to main queue
public async Task ResubmitDeadLetterMessages()
{
var dlqReceiver = _client.CreateReceiver(_queueName,
new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });
var sender = _client.CreateSender(_queueName);
await foreach (var message in dlqReceiver.ReceiveMessagesAsync())
{
// Create new message from dead letter
var resubmitMessage = new ServiceBusMessage(message.Body)
{
ContentType = message.ContentType,
Subject = message.Subject
};
// Copy application properties
foreach (var prop in message.ApplicationProperties)
{
resubmitMessage.ApplicationProperties[prop.Key] = prop.Value;
}
// Add resubmission tracking
resubmitMessage.ApplicationProperties["ResubmittedAt"] = DateTime.UtcNow;
resubmitMessage.ApplicationProperties["OriginalDeadLetterReason"] = message.DeadLetterReason;
await sender.SendMessageAsync(resubmitMessage);
await dlqReceiver.CompleteMessageAsync(message);
}
}
}
Implementing Retry Logic
public class RetryProcessor
{
public async Task ProcessWithRetry(ProcessMessageEventArgs args)
{
var deliveryCount = args.Message.DeliveryCount;
var maxRetries = 5;
try
{
// Exponential backoff based on delivery count
if (deliveryCount > 1)
{
var delay = TimeSpan.FromSeconds(Math.Pow(2, deliveryCount - 1));
await Task.Delay(delay);
}
// Process message
await ProcessMessage(args.Message);
await args.CompleteMessageAsync(args.Message);
}
catch (TransientException ex)
{
// Transient error - abandon for retry
if (deliveryCount < maxRetries)
{
await args.AbandonMessageAsync(args.Message);
}
else
{
// Max retries reached - dead letter
await args.DeadLetterMessageAsync(args.Message,
"MaxRetriesExceeded",
$"Failed after {maxRetries} attempts: {ex.Message}");
}
}
catch (PermanentException ex)
{
// Permanent error - dead letter immediately
await args.DeadLetterMessageAsync(args.Message,
"PermanentFailure",
ex.Message);
}
}
}
9. Security & Authentication
Connection Security
Azure Service Bus uses AMQP (Advanced Message Queuing Protocol) over TLS for secure communication.
// Connection string with SAS authentication
var connectionString = "Endpoint=sb://namespace.servicebus.windows.net/;" +
"SharedAccessKeyName=RootManageSharedAccessKey;" +
"SharedAccessKey=yourKey;" +
"TransportType=AmqpWebSockets"; // Use WebSockets for firewall-friendly
Authentication Options
1. Shared Access Signature (SAS)
// Create SAS token programmatically
public string CreateSasToken(string resourceUri, string keyName, string key)
{
var expiry = DateTimeOffset.UtcNow.AddHours(1);
var stringToSign = HttpUtility.UrlEncode(resourceUri) + "\n" + expiry.ToUnixTimeSeconds();
using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key));
var signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
var token = $"SharedAccessSignature sr={HttpUtility.UrlEncode(resourceUri)}" +
$"&sig={HttpUtility.UrlEncode(signature)}" +
$"&se={expiry.ToUnixTimeSeconds()}" +
$"&skn={keyName}";
return token;
}
2. Azure Active Directory (Recommended)
// Using Managed Identity
var client = new ServiceBusClient(
"namespace.servicebus.windows.net",
new DefaultAzureCredential());
// Using Service Principal
var client = new ServiceBusClient(
"namespace.servicebus.windows.net",
new ClientSecretCredential(tenantId, clientId, clientSecret));
Role-Based Access Control (RBAC)
# Assign roles via Azure CLI
# Sender role
az role assignment create \
--assignee <user-or-service-principal> \
--role "Azure Service Bus Data Sender" \
--scope /subscriptions/{subscription}/resourceGroups/{rg}/providers/Microsoft.ServiceBus/namespaces/{namespace}/queues/{queue}
# Receiver role
az role assignment create \
--assignee <user-or-service-principal> \
--role "Azure Service Bus Data Receiver" \
--scope /subscriptions/{subscription}/resourceGroups/{rg}/providers/Microsoft.ServiceBus/namespaces/{namespace}/queues/{queue}
Network Security
// Configure firewall rules
var namespaceManager = new ServiceBusAdministrationClient(connectionString);
// Allow specific IP ranges
await namespaceManager.CreateOrUpdateNetworkRuleSetAsync(new NetworkRuleSet
{
DefaultAction = NetworkRuleSetDefaultAction.Deny,
IpRules = new[]
{
new NWRuleSetIpRules { IpMask = "10.0.0.0/24", Action = NetworkRuleIPAction.Allow },
new NWRuleSetIpRules { IpMask = "192.168.1.0/24", Action = NetworkRuleIPAction.Allow }
}
});
10. Performance Optimization
Key Performance Factors
1. Message Size: Smaller = Better throughput
2. Batch Operations: Send/Receive multiple messages
3. Prefetching: Retrieve messages before needed
4. Connection Pooling: Reuse connections
5. Partitioning: Distribute load
Optimization Techniques
1. Batching
public class BatchOptimization
{
// Efficient batch sending
public async Task SendLargeBatchAsync(List<Order> orders)
{
var sender = client.CreateSender(queueName);
var currentBatch = await sender.CreateMessageBatchAsync();
foreach (var order in orders)
{
var message = new ServiceBusMessage(JsonSerializer.Serialize(order));
if (!currentBatch.TryAddMessage(message))
{
// Send current batch
await sender.SendMessagesAsync(currentBatch);
// Create new batch
currentBatch = await sender.CreateMessageBatchAsync();
currentBatch.TryAddMessage(message);
}
}
// Send remaining messages
if (currentBatch.Count > 0)
{
await sender.SendMessagesAsync(currentBatch);
}
}
// Efficient batch receiving
public async Task ReceiveBatchAsync()
{
var receiver = client.CreateReceiver(queueName);
// Receive up to 100 messages
var messages = await receiver.ReceiveMessagesAsync(
maxMessages: 100,
maxWaitTime: TimeSpan.FromSeconds(5));
// Process in parallel
await Parallel.ForEachAsync(messages, async (message, ct) =>
{
await ProcessMessage(message);
await receiver.CompleteMessageAsync(message);
});
}
}
2. Prefetching
// Configure prefetch
var processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions
{
PrefetchCount = 20, // Prefetch 20 messages
MaxConcurrentCalls = 5, // Process 5 at a time
ReceiveMode = ServiceBusReceiveMode.PeekLock
});
3. Connection Management
public class ConnectionPool
{
private static readonly ServiceBusClient _sharedClient;
private static readonly Dictionary<string, ServiceBusSender> _senders = new();
static ConnectionPool()
{
_sharedClient = new ServiceBusClient(connectionString, new ServiceBusClientOptions
{
TransportType = ServiceBusTransportType.AmqpWebSockets,
RetryOptions = new ServiceBusRetryOptions
{
MaxRetries = 3,
Delay = TimeSpan.FromSeconds(1),
MaxDelay = TimeSpan.FromSeconds(10),
Mode = ServiceBusRetryMode.Exponential
}
});
}
public static ServiceBusSender GetSender(string queueOrTopicName)
{
if (!_senders.ContainsKey(queueOrTopicName))
{
_senders[queueOrTopicName] = _sharedClient.CreateSender(queueOrTopicName);
}
return _senders[queueOrTopicName];
}
}
4. Message Compression
public class MessageCompression
{
public async Task SendCompressedAsync(LargePayload payload)
{
// Serialize and compress
var json = JsonSerializer.Serialize(payload);
var compressed = Compress(Encoding.UTF8.GetBytes(json));
var message = new ServiceBusMessage(compressed)
{
ApplicationProperties =
{
["Compressed"] = true,
["CompressionType"] = "gzip"
}
};
await sender.SendMessageAsync(message);
}
private byte[] Compress(byte[] data)
{
using var output = new MemoryStream();
using (var gzip = new GZipStream(output, CompressionLevel.Optimal))
{
gzip.Write(data, 0, data.Length);
}
return output.ToArray();
}
}
Performance Monitoring
public class PerformanceMonitor
{
private readonly ILogger<PerformanceMonitor> _logger;
private readonly Dictionary<string, PerformanceCounter> _counters = new();
public void RecordMessageProcessed(string queueName, TimeSpan processingTime)
{
_logger.LogInformation("Message processed from {Queue} in {Time}ms",
queueName, processingTime.TotalMilliseconds);
// Track metrics
_counters[$"{queueName}_count"].Increment();
_counters[$"{queueName}_avg_time"].RawValue = (long)processingTime.TotalMilliseconds;
}
public async Task<QueueMetrics> GetQueueMetrics(string queueName)
{
var client = new ServiceBusAdministrationClient(connectionString);
var properties = await client.GetQueueRuntimePropertiesAsync(queueName);
return new QueueMetrics
{
ActiveMessageCount = properties.Value.ActiveMessageCount,
DeadLetterMessageCount = properties.Value.DeadLetterMessageCount,
ScheduledMessageCount = properties.Value.ScheduledMessageCount,
SizeInMegabytes = properties.Value.SizeInBytes / (1024.0 * 1024.0)
};
}
}
11. Real-World Scenarios
Scenario 1: E-Commerce Order Processing
public class ECommerceOrderProcessor
{
private readonly ServiceBusClient _client;
private readonly IOrderService _orderService;
private readonly IInventoryService _inventoryService;
private readonly IPaymentService _paymentService;
public async Task ProcessOrderWorkflow(Order order)
{
// Step 1: Send order to validation queue
var validationSender = _client.CreateSender("order-validation");
await validationSender.SendMessageAsync(new ServiceBusMessage(JsonSerializer.Serialize(order))
{
SessionId = order.Id, // Ensure order processing
ApplicationProperties =
{
["OrderType"] = order.Type,
["Priority"] = order.Priority
}
});
// Step 2: Set up processors for each stage
await SetupOrderValidationProcessor();
await SetupInventoryCheckProcessor();
await SetupPaymentProcessor();
await SetupFulfillmentProcessor();
}
private async Task SetupOrderValidationProcessor()
{
var processor = _client.CreateProcessor("order-validation");
processor.ProcessMessageAsync += async (args) =>
{
var order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToString());
try
{
// Validate order
if (await _orderService.ValidateOrder(order))
{
// Send to inventory check
var sender = _client.CreateSender("inventory-check");
await sender.SendMessageAsync(new ServiceBusMessage(args.Message.Body)
{
SessionId = args.Message.SessionId
});
}
else
{
// Send to dead letter with reason
await args.DeadLetterMessageAsync(args.Message,
"ValidationFailed", "Order validation failed");
}
await args.CompleteMessageAsync(args.Message);
}
catch (Exception ex)
{
await args.DeadLetterMessageAsync(args.Message,
"ProcessingError", ex.Message);
}
};
await processor.StartProcessingAsync();
}
private async Task SetupPaymentProcessor()
{
var processor = _client.CreateProcessor("payment-processing");
processor.ProcessMessageAsync += async (args) =>
{
var order = JsonSerializer.Deserialize<Order>(args.Message.Body.ToString());
try
{
// Process payment with retry logic
var paymentResult = await ProcessPaymentWithRetry(order, args.Message.DeliveryCount);
if (paymentResult.Success)
{
// Send to fulfillment
var sender = _client.CreateSender("order-fulfillment");
await sender.SendMessageAsync(new ServiceBusMessage(args.Message.Body)
{
SessionId = args.Message.SessionId,
ApplicationProperties =
{
["PaymentId"] = paymentResult.TransactionId
}
});
await args.CompleteMessageAsync(args.Message);
}
else
{
// Retry or dead letter based on failure type
if (paymentResult.IsTransient && args.Message.DeliveryCount < 5)
{
await args.AbandonMessageAsync(args.Message);
}
else
{
await args.DeadLetterMessageAsync(args.Message,
"PaymentFailed", paymentResult.Error);
}
}
}
catch (Exception ex)
{
await args.DeadLetterMessageAsync(args.Message,
"ProcessingError", ex.Message);
}
};
await processor.StartProcessingAsync();
}
private async Task<PaymentResult> ProcessPaymentWithRetry(Order order, int attemptNumber)
{
// Exponential backoff
if (attemptNumber > 1)
{
var delay = TimeSpan.FromSeconds(Math.Pow(2, attemptNumber - 1));
await Task.Delay(delay);
}
return await _paymentService.ProcessPayment(order);
}
}
Scenario 2: IoT Telemetry Processing
public class IoTTelemetryProcessor
{
private readonly ServiceBusClient _client;
private readonly ITimeSeriesDatabase _timeSeriesDb;
private readonly IAlertingService _alertingService;
public async Task SetupTelemetryPipeline()
{
// Create topic for telemetry
var adminClient = new ServiceBusAdministrationClient(connectionString);
await adminClient.CreateTopicAsync(new CreateTopicOptions("device-telemetry")
{
EnablePartitioning = true, // For high throughput
DefaultMessageTimeToLive = TimeSpan.FromHours(24)
});
// Create subscriptions for different processing needs
await CreateSubscriptions(adminClient);
// Start processors
await StartRealTimeProcessor();
await StartBatchProcessor();
await StartAlertProcessor();
}
private async Task CreateSubscriptions(ServiceBusAdministrationClient adminClient)
{
// Real-time processing subscription
await adminClient.CreateSubscriptionAsync(
new CreateSubscriptionOptions("device-telemetry", "realtime-processing")
{
LockDuration = TimeSpan.FromSeconds(30)
});
// Batch processing subscription (for analytics)
await adminClient.CreateSubscriptionAsync(
new CreateSubscriptionOptions("device-telemetry", "batch-analytics")
{
LockDuration = TimeSpan.FromMinutes(5)
});
// Alert processing subscription with filter
await adminClient.CreateSubscriptionAsync(
new CreateSubscriptionOptions("device-telemetry", "alerts"),
new CreateRuleOptions("HighTemperature",
new SqlRuleFilter("Temperature > 30 OR Humidity > 80")));
}
private async Task StartRealTimeProcessor()
{
var processor = _client.CreateProcessor("device-telemetry", "realtime-processing",
new ServiceBusProcessorOptions
{
MaxConcurrentCalls = 50, // High concurrency for real-time
PrefetchCount = 100
});
processor.ProcessMessageAsync += async (args) =>
{
var telemetry = JsonSerializer.Deserialize<DeviceTelemetry>(args.Message.Body.ToString());
// Write to time series database
await _timeSeriesDb.WriteAsync(new TimeSeriesPoint
{
DeviceId = telemetry.DeviceId,
Timestamp = telemetry.Timestamp,
Metrics = telemetry.Metrics
});
await args.CompleteMessageAsync(args.Message);
};
await processor.StartProcessingAsync();
}
private async Task StartBatchProcessor()
{
var processor = _client.CreateProcessor("device-telemetry", "batch-analytics");
var batch = new List<DeviceTelemetry>();
var batchTimer = new Timer(async _ => await ProcessBatch(batch), null,
TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
processor.ProcessMessageAsync += async (args) =>
{
var telemetry = JsonSerializer.Deserialize<DeviceTelemetry>(args.Message.Body.ToString());
lock (batch)
{
batch.Add(telemetry);
}
await args.CompleteMessageAsync(args.Message);
// Process batch if it reaches certain size
if (batch.Count >= 1000)
{
await ProcessBatch(batch);
}
};
await processor.StartProcessingAsync();
}
private async Task ProcessBatch(List<DeviceTelemetry> batch)
{
if (batch.Count == 0) return;
List<DeviceTelemetry> processingBatch;
lock (batch)
{
processingBatch = new List<DeviceTelemetry>(batch);
batch.Clear();
}
// Perform analytics
var analytics = new TelemetryAnalytics
{
Period = DateTime.UtcNow,
DeviceCount = processingBatch.Select(t => t.DeviceId).Distinct().Count(),
AverageTemperature = processingBatch.Average(t => t.Metrics["Temperature"]),
MaxHumidity = processingBatch.Max(t => t.Metrics["Humidity"])
};
await _timeSeriesDb.WriteAnalyticsAsync(analytics);
}
}
Scenario 3: Microservices Event Bus
public class MicroservicesEventBus
{
private readonly ServiceBusClient _client;
private readonly string _serviceName;
private readonly Dictionary<Type, List<Func<object, Task>>> _handlers = new();
public MicroservicesEventBus(string connectionString, string serviceName)
{
_client = new ServiceBusClient(connectionString);
_serviceName = serviceName;
}
// Publish domain event
public async Task PublishAsync<TEvent>(TEvent domainEvent) where TEvent : IDomainEvent
{
var eventName = typeof(TEvent).Name;
var sender = _client.CreateSender("domain-events");
var message = new ServiceBusMessage(JsonSerializer.Serialize(domainEvent))
{
Subject = eventName,
ApplicationProperties =
{
["EventType"] = eventName,
["SourceService"] = _serviceName,
["EventId"] = domainEvent.EventId,
["OccurredAt"] = domainEvent.OccurredAt
}
};
await sender.SendMessageAsync(message);
}
// Subscribe to domain events
public void Subscribe<TEvent>(Func<TEvent, Task> handler) where TEvent : IDomainEvent
{
var eventType = typeof(TEvent);
if (!_handlers.ContainsKey(eventType))
_handlers[eventType] = new List<Func<object, Task>>();
_handlers[eventType].Add(async (e) => await handler((TEvent)e));
}
// Start processing events
public async Task StartAsync()
{
var processor = _client.CreateProcessor("domain-events", _serviceName);
processor.ProcessMessageAsync += async (args) =>
{
var eventType = args.Message.ApplicationProperties["EventType"].ToString();
var eventData = args.Message.Body.ToString();
// Find handler for event type
var type = AppDomain.CurrentDomain.GetAssemblies()
.SelectMany(a => a.GetTypes())
.FirstOrDefault(t => t.Name == eventType);
if (type != null && _handlers.ContainsKey(type))
{
var domainEvent = JsonSerializer.Deserialize(eventData, type);
// Execute all handlers for this event type
foreach (var handler in _handlers[type])
{
await handler(domainEvent);
}
}
await args.CompleteMessageAsync(args.Message);
};
await processor.StartProcessingAsync();
}
}
// Usage in a microservice
public class OrderService
{
private readonly MicroservicesEventBus _eventBus;
public OrderService(MicroservicesEventBus eventBus)
{
_eventBus = eventBus;
// Subscribe to events from other services
_eventBus.Subscribe<PaymentCompletedEvent>(HandlePaymentCompleted);
_eventBus.Subscribe<InventoryReservedEvent>(HandleInventoryReserved);
}
public async Task CreateOrder(CreateOrderCommand command)
{
// Create order logic...
var order = new Order { /* ... */ };
// Publish event
await _eventBus.PublishAsync(new OrderCreatedEvent
{
EventId = Guid.NewGuid(),
OccurredAt = DateTime.UtcNow,
OrderId = order.Id,
CustomerId = order.CustomerId,
TotalAmount = order.TotalAmount
});
}
private async Task HandlePaymentCompleted(PaymentCompletedEvent evt)
{
// Update order status
Console.WriteLine($"Payment completed for order {evt.OrderId}");
}
}
12. Monitoring & Troubleshooting
Key Metrics to Monitor
public class ServiceBusMonitor
{
private readonly ServiceBusAdministrationClient _adminClient;
private readonly ILogger<ServiceBusMonitor> _logger;
public async Task<HealthStatus> CheckHealth()
{
var health = new HealthStatus();
try
{
// Check namespace accessibility
var namespaceProperties = await _adminClient.GetNamespacePropertiesAsync();
health.NamespaceActive = true;
// Check queues
await foreach (var queue in _adminClient.GetQueuesAsync())
{
var runtimeProps = await _adminClient.GetQueueRuntimePropertiesAsync(queue.Name);
health.Queues.Add(new QueueHealth
{
Name = queue.Name,
ActiveMessages = runtimeProps.Value.ActiveMessageCount,
DeadLetterMessages = runtimeProps.Value.DeadLetterMessageCount,
IsHealthy = runtimeProps.Value.DeadLetterMessageCount < 100 // Threshold
});
// Alert if too many dead letters
if (runtimeProps.Value.DeadLetterMessageCount > 100)
{
_logger.LogWarning("Queue {Queue} has {Count} dead letter messages",
queue.Name, runtimeProps.Value.DeadLetterMessageCount);
}
}
return health;
}
catch (Exception ex)
{
_logger.LogError(ex, "Health check failed");
health.NamespaceActive = false;
return health;
}
}
public async Task<Dictionary<string, object>> GetDetailedMetrics(string queueName)
{
var properties = await _adminClient.GetQueueRuntimePropertiesAsync(queueName);
var queue = properties.Value;
return new Dictionary<string, object>
{
["ActiveMessageCount"] = queue.ActiveMessageCount,
["DeadLetterMessageCount"] = queue.DeadLetterMessageCount,
["ScheduledMessageCount"] = queue.ScheduledMessageCount,
["TransferDeadLetterMessageCount"] = queue.TransferDeadLetterMessageCount,
["TransferMessageCount"] = queue.TransferMessageCount,
["SizeInBytes"] = queue.SizeInBytes,
["CreatedAt"] = queue.CreatedAt,
["UpdatedAt"] = queue.UpdatedAt,
["AccessedAt"] = queue.AccessedAt
};
}
}
Application Insights Integration
public class TelemetryProcessor : ServiceBusProcessor
{
private readonly TelemetryClient _telemetryClient;
protected override async Task OnProcessMessageAsync(ProcessMessageEventArgs args)
{
using var operation = _telemetryClient.StartOperation<RequestTelemetry>("ProcessMessage");
try
{
operation.Telemetry.Properties["MessageId"] = args.Message.MessageId;
operation.Telemetry.Properties["Queue"] = args.EntityPath;
operation.Telemetry.Properties["DeliveryCount"] = args.Message.DeliveryCount.ToString();
// Process message
await base.OnProcessMessageAsync(args);
operation.Telemetry.Success = true;
_telemetryClient.TrackMetric("MessageProcessed", 1,
new Dictionary<string, string> { ["Queue"] = args.EntityPath });
}
catch (Exception ex)
{
operation.Telemetry.Success = false;
_telemetryClient.TrackException(ex);
throw;
}
}
}
Common Issues and Solutions
Issue 1: Messages Not Being Processed
public class TroubleshootingHelper
{
public async Task DiagnoseQueueIssues(string queueName)
{
Console.WriteLine($"Diagnosing queue: {queueName}");
// Check queue properties
var properties = await _adminClient.GetQueueRuntimePropertiesAsync(queueName);
Console.WriteLine($"Active messages: {properties.Value.ActiveMessageCount}");
Console.WriteLine($"Dead letter messages: {properties.Value.DeadLetterMessageCount}");
// Peek messages without removing
var receiver = _client.CreateReceiver(queueName);
var peekedMessages = await receiver.PeekMessagesAsync(10);
foreach (var message in peekedMessages)
{
Console.WriteLine($"Message ID: {message.MessageId}");
Console.WriteLine($"Enqueued: {message.EnqueuedTime}");
Console.WriteLine($"Delivery Count: {message.DeliveryCount}");
Console.WriteLine($"Lock Until: {message.LockedUntil}");
}
// Check dead letter queue
var dlqReceiver = _client.CreateReceiver(queueName,
new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter });
var dlqMessages = await dlqReceiver.PeekMessagesAsync(10);
foreach (var dlq in dlqMessages)
{
Console.WriteLine($"DLQ Reason: {dlq.DeadLetterReason}");
Console.WriteLine($"DLQ Description: {dlq.DeadLetterErrorDescription}");
}
}
}
Issue 2: Performance Problems
public class PerformanceDiagnostics
{
public async Task AnalyzePerformance()
{
var stopwatch = Stopwatch.StartNew();
var messageCount = 1000;
// Test send performance
var sendTimes = new List<long>();
for (int i = 0; i < messageCount; i++)
{
var sw = Stopwatch.StartNew();
await sender.SendMessageAsync(new ServiceBusMessage($"Test {i}"));
sendTimes.Add(sw.ElapsedMilliseconds);
}
Console.WriteLine($"Average send time: {sendTimes.Average()}ms");
Console.WriteLine($"Max send time: {sendTimes.Max()}ms");
// Test receive performance
var receiveTimes = new List<long>();
var receiver = _client.CreateReceiver(queueName);
for (int i = 0; i < messageCount; i++)
{
var sw = Stopwatch.StartNew();
var message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));
if (message != null)
{
await receiver.CompleteMessageAsync(message);
receiveTimes.Add(sw.ElapsedMilliseconds);
}
}
Console.WriteLine($"Average receive time: {receiveTimes.Average()}ms");
Console.WriteLine($"Max receive time: {receiveTimes.Max()}ms");
}
}
13. Best Practices
1. Message Design
// Good: Small, focused messages
public class OrderCreatedEvent
{
public string OrderId { get; set; }
public DateTime CreatedAt { get; set; }
public decimal TotalAmount { get; set; }
// Reference to detailed data, not the data itself
public string OrderDetailsUrl { get; set; }
}
// Bad: Large, monolithic messages
public class BadOrderEvent
{
public Order FullOrder { get; set; }
public List<OrderItem> AllItems { get; set; }
public Customer CompleteCustomerRecord { get; set; }
public byte[] AttachedPdf { get; set; } // Don't do this!
}
2. Error Handling Strategy
public class RobustMessageHandler
{
private readonly ILogger _logger;
private readonly int[] _retryDelays = { 1, 5, 10, 30, 60 }; // seconds
public async Task HandleMessageAsync(ProcessMessageEventArgs args)
{
var attemptCount = args.Message.DeliveryCount;
try
{
// Add delay for retries (exponential backoff)
if (attemptCount > 1)
{
var delayIndex = Math.Min(attemptCount - 2, _retryDelays.Length - 1);
await Task.Delay(TimeSpan.FromSeconds(_retryDelays[delayIndex]));
}
// Process with timeout
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
await ProcessBusinessLogicAsync(args.Message, cts.Token);
// Success - complete the message
await args.CompleteMessageAsync(args.Message);
}
catch (BusinessValidationException ex)
{
// Business rule violation - don't retry
_logger.LogWarning(ex, "Business validation failed");
await args.DeadLetterMessageAsync(args.Message,
"ValidationFailed", ex.Message);
}
catch (TransientException ex) when (attemptCount < 5)
{
// Transient error - retry
_logger.LogWarning(ex, "Transient error, attempt {Attempt}", attemptCount);
await args.AbandonMessageAsync(args.Message);
}
catch (Exception ex)
{
// Unexpected error - dead letter after max attempts
_logger.LogError(ex, "Failed to process message after {Attempts} attempts", attemptCount);
await args.DeadLetterMessageAsync(args.Message,
"ProcessingFailed", $"Failed after {attemptCount} attempts: {ex.Message}");
}
}
}
3. Connection Management
public class ServiceBusConnectionManager : IAsyncDisposable
{
private readonly ServiceBusClient _client;
private readonly Dictionary<string, ServiceBusSender> _senders = new();
private readonly Dictionary<string, ServiceBusProcessor> _processors = new();
private readonly SemaphoreSlim _semaphore = new(1, 1);
public ServiceBusConnectionManager(string connectionString)
{
_client = new ServiceBusClient(connectionString, new ServiceBusClientOptions
{
TransportType = ServiceBusTransportType.AmqpWebSockets,
RetryOptions = new ServiceBusRetryOptions
{
Mode = ServiceBusRetryMode.Exponential,
MaxRetries = 5,
Delay = TimeSpan.FromSeconds(1),
MaxDelay = TimeSpan.FromMinutes(1)
}
});
}
public async Task<ServiceBusSender> GetSenderAsync(string entityPath)
{
if (_senders.TryGetValue(entityPath, out var sender))
return sender;
await _semaphore.WaitAsync();
try
{
if (!_senders.TryGetValue(entityPath, out sender))
{
sender = _client.CreateSender(entityPath);
_senders[entityPath] = sender;
}
return sender;
}
finally
{
_semaphore.Release();
}
}
public async ValueTask DisposeAsync()
{
foreach (var processor in _processors.Values)
{
await processor.StopProcessingAsync();
await processor.DisposeAsync();
}
foreach (var sender in _senders.Values)
{
await sender.DisposeAsync();
}
await _client.DisposeAsync();
}
}
4. Testing Strategies
public class ServiceBusTestHelper
{
// Use in-memory test double for unit tests
public interface IServiceBusService
{
Task SendMessageAsync(string queueName, object message);
Task<T> ReceiveMessageAsync<T>(string queueName);
}
// Integration test with real Service Bus
[TestClass]
public class ServiceBusIntegrationTests
{
private ServiceBusClient _client;
private ServiceBusAdministrationClient _adminClient;
private string _testQueueName;
[TestInitialize]
public async Task Setup()
{
_testQueueName = $"test-queue-{Guid.NewGuid()}";
_adminClient = new ServiceBusAdministrationClient(TestConnectionString);
// Create test queue
await _adminClient.CreateQueueAsync(_testQueueName);
_client = new ServiceBusClient(TestConnectionString);
}
[TestCleanup]
public async Task Cleanup()
{
// Delete test queue
await _adminClient.DeleteQueueAsync(_testQueueName);
await _client.DisposeAsync();
}
[TestMethod]
public async Task Should_Send_And_Receive_Message()
{
// Arrange
var sender = _client.CreateSender(_testQueueName);
var receiver = _client.CreateReceiver(_testQueueName);
var expectedMessage = "Test message";
// Act
await sender.SendMessageAsync(new ServiceBusMessage(expectedMessage));
var receivedMessage = await receiver.ReceiveMessageAsync();
// Assert
Assert.IsNotNull(receivedMessage);
Assert.AreEqual(expectedMessage, receivedMessage.Body.ToString());
await receiver.CompleteMessageAsync(receivedMessage);
}
}
}
14. Migration Guide
Migrating from Legacy SDK
// Old SDK (WindowsAzure.ServiceBus)
var factory = MessagingFactory.CreateFromConnectionString(connectionString);
var client = factory.CreateQueueClient(queueName);
var message = new BrokeredMessage("Hello");
await client.SendAsync(message);
// New SDK (Azure.Messaging.ServiceBus)
await using var client = new ServiceBusClient(connectionString);
await using var sender = client.CreateSender(queueName);
await sender.SendMessageAsync(new ServiceBusMessage("Hello"));
Migration Checklist
- Update NuGet Packages ```xml
2. **Update Connection Strings**
```csharp
// Add TransportType if using AMQP over WebSockets
connectionString += ";TransportType=AmqpWebSockets";
- Update Message Types ```csharp // Old BrokeredMessage β ServiceBusMessage MessageReceiver β ServiceBusReceiver MessageSender β ServiceBusSender QueueClient β ServiceBusClient + ServiceBusSender/Receiver
// New patterns await using var client = new ServiceBusClient(connectionString); await using var sender = client.CreateSender(queueName); await using var receiver = client.CreateReceiver(queueName); ```
15. Resources & Next Steps
Official Resources
Code Samples
Tools and Libraries
- Service Bus Explorer: GUI tool for managing Service Bus
- MassTransit: Open-source distributed application framework
- NServiceBus: Commercial messaging framework
- Rebus: Simple and lean service bus implementation
Next Steps
- Start Small: Create a simple queue and send/receive messages
- Implement Patterns: Try request-reply or pub/sub patterns
- Add Resilience: Implement retry logic and dead letter handling
- Monitor: Set up Application Insights or Azure Monitor
- Scale: Test performance and optimize for your workload
Community
Summary
Azure Service Bus is a powerful enterprise messaging solution that enables reliable, scalable communication between distributed applications. By following the patterns and practices in this guide, you can build robust messaging solutions that handle failures gracefully and scale with your needs.
Remember:
- Start with the right tier for your needs
- Design small, focused messages
- Implement proper error handling
- Monitor your queues and topics
- Use batching for performance
- Leverage dead letter queues
Happy messaging! π