
Consumer

Brian Lehnen edited this page Apr 8, 2026 · 9 revisions

Consuming messages

You can consume messages in two ways: with dedicated worker threads, as described on this page, or with a pool of threads shared between multiple queues (see ConsumerAsync).

Creating the queue

You'll need the following:

  • The transport you're connecting to, determined by the transport init module you specify when creating the container.
  • The connection string for the transport.
  • The name of the queue.
  • A delegate that will be called for each message to process.

ConsumerQueueNotifications provides 6 callback delegates for error handling and completion tracking.

using (var queueContainer = new QueueContainer<QueueInit>())
{
    var queueConnection = new QueueConnection(queueName, connectionString);
    using (var queue = queueContainer.CreateConsumer(queueConnection))
    {
        queue.Configuration.Worker.WorkerCount = 4;

        var notifications = new ConsumerQueueNotifications(
            (notification) => Console.WriteLine($"Error: {notification.Error}"),
            (notification) => Console.WriteLine($"Receive error: {notification.Error}"),
            (notification) => Console.WriteLine($"Moved to error queue: {notification.MessageId}"),
            (notification) => Console.WriteLine($"Poison message: {notification.MessageId}"),
            (notification) => Console.WriteLine($"Rollback: {notification.MessageId}"),
            (notification) => Console.WriteLine($"Completed: {notification.MessageId}"));

        queue.Start<SimpleMessage>(HandleMessages, notifications);
        Console.WriteLine("Processing messages - press any key to stop");
        Console.ReadKey(true);
    }
}

private void HandleMessages(IReceivedMessage<SimpleMessage> message, IWorkerNotification notifications)
{
    //processing logic goes here
}

When your delegate method finishes with no errors, the message is acknowledged and considered finished. However, your logic may need to check for the following condition:

  • The queue shutting down. A cancellation token is provided for this.
    • If the transport supports rollback, you may throw an OperationCanceledException to requeue the message.

For example, here is how you can check to see if cancellation is requested, and also force a requeue. Note that we are verifying that the transport supports rollbacks first.

if (notifications.TransportSupportsRollback)
{
    var canceled = notifications.MessageCancellation.Token.WaitHandle.WaitOne(
        TimeSpan.FromMilliseconds(processingTime));
    if (canceled) throw new OperationCanceledException("Processing was canceled");
}

You could register a delegate with the cancel token instead, so that you don't have to constantly check the token throughout your code.
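
The registration approach could look like the sketch below. It assumes the token exposed by notifications.MessageCancellation.Token is a standard System.Threading.CancellationToken, which the WaitHandle usage above suggests. The class and method names are hypothetical, and a plain CancellationToken parameter stands in for the queue so the sketch runs on its own.

```csharp
using System;
using System.Threading;

public static class CancelRegistrationSketch
{
    // Runs 'work' and reports whether cancellation was signaled while it ran.
    // 'token' stands in for notifications.MessageCancellation.Token (assumption).
    public static bool Process(CancellationToken token, Action work)
    {
        var cancelSeen = 0;

        // The callback runs when the token is canceled, or immediately if it
        // already is. It may run on another thread, so the flag is written
        // with Interlocked and read with Volatile.
        using (token.Register(() => Interlocked.Exchange(ref cancelSeen, 1)))
        {
            work();
        }

        return Volatile.Read(ref cancelSeen) == 1;
    }
}
```

In a real handler, the callback would typically abort whatever the handler is waiting on (for example, by canceling an outstanding I/O call) rather than only setting a flag. Disposing the registration when the handler finishes keeps the callback from firing after the message is done.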

IWorkerNotification properties

Your handler receives an IWorkerNotification with these properties:

| Property | Type | Description |
| --- | --- | --- |
| WorkerStopping | ICancelWork | Fires when the queue is shutting down; check this to stop long-running work cooperatively |
| HeaderNames | IHeaders | Well-known header names |
| HeartBeat | IWorkerHeartBeatNotification | Heartbeat status for the current message |
| TransportSupportsRollback | bool | Whether the transport supports rolling back a de-queue; always check before throwing OperationCanceledException |
| Log | ILogger | Microsoft.Extensions.Logging logger scoped to the worker |
| Metrics | IMetrics | Metrics logging |
| Tracer | ActivitySource | OpenTelemetry ActivitySource for distributed tracing |
| MessageCancellation | IMessageCancellation | Per-message cancellation token; fires on graceful shutdown or a dashboard-initiated cancel |

Logging inside a handler

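Use the standard Microsoft.Extensions.Logging (MEL) extension methods on notifications.Log. Prefer message templates over string interpolation so that values such as the message ID are captured as structured properties:

notifications.Log.LogDebug("Starting message {MessageId}", message.MessageId.Id.Value);
notifications.Log.LogInformation("Processing complete");
notifications.Log.LogWarning("Retrying due to transient error");
notifications.Log.LogError("Unhandled exception during processing");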

For a complete working example, see SQLServerConsumer in the samples repository.

Stopping the queue

To stop the queue, call Dispose on it.

queue.Dispose();

Calling Dispose on the queue container also disposes all queues created by that container.

Dispose is a blocking operation. Depending on your configuration settings and how quickly your message-handling code responds to cancellation, it may take a while to return.
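
One way to keep Dispose from blocking for long is to break the handler's work into small steps and check the cancellation token between them. The sketch below illustrates that pattern under stated assumptions: the class, method, and parameter names are hypothetical, the token stands in for notifications.MessageCancellation.Token, and the boolean stands in for notifications.TransportSupportsRollback.

```csharp
using System;
using System.Threading;

public static class CooperativeCancelSketch
{
    // Runs up to 'steps' units of work, checking the token before each unit.
    // Returns the number of units completed.
    public static int ProcessInSteps(
        CancellationToken token,          // stand-in for the per-message token
        bool transportSupportsRollback,   // stand-in for TransportSupportsRollback
        int steps,
        Action<int> doStep)
    {
        var completed = 0;
        for (var i = 0; i < steps; i++)
        {
            if (token.IsCancellationRequested)
            {
                // With rollback support, throwing requeues the message.
                if (transportSupportsRollback)
                    throw new OperationCanceledException("Processing was canceled", token);

                // Without it, stop early; what to do here is application-specific.
                break;
            }

            doStep(i);
            completed++;
        }
        return completed;
    }
}
```

Because a handler that returns without errors causes the message to be acknowledged, stopping early on a transport without rollback means the message is treated as finished. Whether that is acceptable, or whether the handler should instead run to completion, depends on the workload.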
