PostGreTransport
Brian Lehnen edited this page May 16, 2026 · 6 revisions
The PostGre transport stores messages in PostgreSQL tables. It supports the following features:
- Delayed messages
- Message expiration
- Message priority
- Routing
- History tracking
- User-defined metadata columns
- Transactional, status-based, or FIFO processing
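To make the delayed, expiration, and priority features concrete, here is a minimal in-memory sketch of how a table-backed queue could choose the next message to hand to a consumer. It is an illustration only, not DotNetWorkQueue's implementation: `QueuedRow`, `DequeueSketch`, the column names, and the rule that a lower priority number is dequeued first are all assumptions made for the example.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of one queued row; not a DotNetWorkQueue type.
public sealed record QueuedRow(
    long Id,
    int Priority,               // assumption: lower value is dequeued first
    DateTime QueuedUtc,
    DateTime? ProcessAfterUtc,  // delayed message: hidden until this time
    DateTime? ExpiresUtc);      // expiration: never delivered at or after this time

public static class DequeueSketch
{
    // Returns the next eligible row, or null when nothing can be delivered yet.
    public static QueuedRow? Next(IEnumerable<QueuedRow> rows, DateTime nowUtc) =>
        rows.Where(r => r.ProcessAfterUtc is null || r.ProcessAfterUtc <= nowUtc)
            .Where(r => r.ExpiresUtc is null || r.ExpiresUtc > nowUtc)
            .OrderBy(r => r.Priority)      // priority first
            .ThenBy(r => r.QueuedUtc)      // then oldest first
            .ThenBy(r => r.Id)             // stable tie-breaker
            .FirstOrDefault();
}

public static class Program
{
    public static void Main()
    {
        var now = new DateTime(2026, 1, 1, 12, 0, 0, DateTimeKind.Utc);
        var rows = new List<QueuedRow>
        {
            new(1, 5, now.AddMinutes(-10), null, null),               // eligible, priority 5
            new(2, 1, now.AddMinutes(-5), now.AddMinutes(30), null),  // delayed, skipped
            new(3, 1, now.AddMinutes(-20), null, now.AddMinutes(-1)), // expired, skipped
            new(4, 2, now.AddMinutes(-1), null, null),                // eligible, priority 2
        };

        Console.WriteLine(DequeueSketch.Next(rows, now)?.Id); // prints 4
    }
}
```

Rows 2 and 3 have the best priority but are filtered out because one is still delayed and the other has expired, so row 4 is selected ahead of the older but lower-priority row 1. A real transport would express the same filter and ordering in SQL against the queue tables.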
The PostGre transport requires the queue to be created before it is used. This can be done manually or via the queue creation classes.
// Create the queue if it doesn't exist
var queueName = "testing";
var connectionString = "Server=V-SQL;Port=5432;Database=IntegrationTesting;Integrated Security=true;";
var queueConnection = new QueueConnection(queueName, connectionString);
using (var createQueueContainer = new QueueCreationContainer<PostgreSqlMessageQueueInit>())
{
    using (var createQueue = createQueueContainer.GetQueueCreation<PostgreSqlMessageQueueCreation>(queueConnection))
    {
        if (!createQueue.QueueExists)
        {
            createQueue.CreateQueue();
        }
    }
}

[Producer]
var queueConnection = new QueueConnection(queueName, connectionString);
using (var queueContainer = new QueueContainer<PostgreSqlMessageQueueInit>())
{
    using (var queue = queueContainer.CreateProducer<SimpleMessage>(queueConnection))
    {
        queue.Send(new SimpleMessage {Message = "Hello World"});
    }
}

[Consumer]
var queueConnection = new QueueConnection(queueName, connectionString);
using (var queueContainer = new QueueContainer<PostgreSqlMessageQueueInit>())
{
    using (var queue = queueContainer.CreateConsumer(queueConnection))
    {
        // Callbacks invoked by the queue as messages fail, roll back, or complete
        var notifications = new ConsumerQueueNotifications(
            (n) => Console.WriteLine($"Error: {n.Error}"),
            (n) => Console.WriteLine($"Receive error: {n.Error}"),
            (n) => Console.WriteLine($"Moved to error queue: {n.MessageId}"),
            (n) => Console.WriteLine($"Poison message: {n.MessageId}"),
            (n) => Console.WriteLine($"Rollback: {n.MessageId}"),
            (n) => Console.WriteLine($"Completed: {n.MessageId}"));
        queue.Start<SimpleMessage>(HandleMessages, notifications);
        Console.WriteLine("Processing messages - press any key to stop");
        Console.ReadKey(true);
    }
}

// Called once for each received message
private void HandleMessages(IReceivedMessage<SimpleMessage> message, IWorkerNotification notifications)
{
    notifications.Log.LogDebug($"Processing Message {message.Body.Message}");
}

For more consumer patterns, see ConsumerMethod and ConsumerAsync.
If history tracking is enabled, see MessageHistory for retention and query options.
To enqueue a message inside your own business transaction (the transactional outbox pattern), see Outbox Pattern.
See the PostgreSQL samples in the DotNetWorkQueue.Samples repository.
For any issues, please use the GitHub issue tracker.