mq

v0.0.0-...-8eef57f
Published: Aug 5, 2025 | License: MIT | Imports: 20 | Imported by: 0

README

📨 MQ - Unified Message Queue Module

A unified Starlark module for message queue operations across AWS SQS and Azure Service Bus. It provides one consistent API for queue management, sending and receiving messages, and dead letter queue handling.

⭐ Features

Unified API
  • Single Interface: Same API for AWS SQS and Azure Service Bus
  • Auto-Detection: Automatically detects service type based on credentials
  • Consistent Error Handling: Unified error messages across services
Message Queue Operations
  • Queue Management: Create, delete, list, and inspect queues
  • Message Operations: Send, receive, delete, and schedule messages
  • Batch Operations: Send multiple messages efficiently
  • Dead Letter Queues: Handle failed message processing
Advanced Features
  • Message Scheduling: Schedule messages for future delivery
  • Session Support: Ordered message processing (Azure)
  • Duplicate Detection: Prevent duplicate message processing
  • Lock Management: Control message visibility and processing time

🚀 Installation

Add to your Go module:

go get github.com/starpkg/mq

⚙️ Configuration

Environment Variables
Variable                   Description                                       Default
MQ_SERVICE_TYPE            Service type (aws_sqs, azure_servicebus, auto)    auto
MQ_TIMEOUT                 Connection timeout in seconds                     30
MQ_MAX_RETRIES             Maximum retry attempts                            3
MQ_CONNECTION_STRING       Azure Service Bus connection string               ""
MQ_AWS_REGION              AWS region for SQS                                us-east-1
MQ_AWS_ACCESS_KEY          AWS access key ID                                 ""
MQ_AWS_SECRET_KEY          AWS secret access key                             ""
MQ_AWS_SESSION_TOKEN       AWS session token                                 ""
MQ_DEFAULT_LOCK_DURATION   Default message lock duration (seconds)           30
MQ_DEFAULT_BATCH_SIZE      Default batch size for operations                 10
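
The table above pairs each variable with a default. As a rough illustration of how such defaults can be applied on the Go side, the sketch below reads a few of the variables through a lookup function and falls back to the documented default when a value is missing or malformed. The `envSettings` type and the helper names are hypothetical and are not part of this package; how the module itself parses these variables is not shown in this document.

```go
package main

import (
	"fmt"
	"strconv"
)

// envSettings is a hypothetical, trimmed-down stand-in for the module's
// configuration; it is not the package's ClientConfig type.
type envSettings struct {
	ServiceType string
	Timeout     int
	MaxRetries  int
	AWSRegion   string
}

// intOr returns the integer value of key, or def when the variable is unset
// or not a valid integer.
func intOr(lookup func(string) (string, bool), key string, def int) int {
	if raw, ok := lookup(key); ok {
		if n, err := strconv.Atoi(raw); err == nil {
			return n
		}
	}
	return def
}

// strOr returns the value of key, or def when the variable is unset or empty.
func strOr(lookup func(string) (string, bool), key, def string) string {
	if raw, ok := lookup(key); ok && raw != "" {
		return raw
	}
	return def
}

// loadEnvSettings applies the defaults listed in the table above.
func loadEnvSettings(lookup func(string) (string, bool)) envSettings {
	return envSettings{
		ServiceType: strOr(lookup, "MQ_SERVICE_TYPE", "auto"),
		Timeout:     intOr(lookup, "MQ_TIMEOUT", 30),
		MaxRetries:  intOr(lookup, "MQ_MAX_RETRIES", 3),
		AWSRegion:   strOr(lookup, "MQ_AWS_REGION", "us-east-1"),
	}
}

func main() {
	// A map-backed lookup keeps the example deterministic; os.LookupEnv has
	// the same signature and reads the real environment instead.
	env := map[string]string{"MQ_TIMEOUT": "45"}
	lookup := func(k string) (string, bool) { v, ok := env[k]; return v, ok }
	fmt.Printf("%+v\n", loadEnvSettings(lookup))
}
```

Passing the lookup as a function keeps the defaults easy to test without touching the process environment.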

📚 API Reference

Module Functions

These functions are available directly from the mq module:

connect(service_type?, connection_string?, aws_region?, aws_access_key?, aws_secret_key?, timeout?, max_retries?) -> Client

Creates and returns a message queue client.

Parameters:

  • service_type (string, optional): Service type - "aws_sqs", "azure_servicebus", or "auto" (default: "auto")
  • connection_string (string, optional): Azure Service Bus connection string (default: "")
  • aws_region (string, optional): AWS region for SQS (default: "us-east-1")
  • aws_access_key (string, optional): AWS access key ID (default: "")
  • aws_secret_key (string, optional): AWS secret access key (default: "")
  • timeout (int, optional): Connection timeout in seconds (default: 30)
  • max_retries (int, optional): Maximum retry attempts (default: 3)

Returns: Client object for message queue operations

Example:

# AWS SQS
client = mq.connect(
    service_type="aws_sqs",
    aws_region="us-west-2",
    aws_access_key="your-access-key",
    aws_secret_key="your-secret-key"
)

# Azure Service Bus
client = mq.connect(
    service_type="azure_servicebus",
    connection_string="Endpoint=sb://..."
)

# Auto-detect (based on provided credentials)
client = mq.connect(aws_region="us-east-1")  # Will use AWS SQS

get_supported_services() -> list

Returns a list of supported message queue services.

Returns: List of supported service types: ["aws_sqs", "azure_servicebus"]

Example:

services = mq.get_supported_services()
print(services)  # ["aws_sqs", "azure_servicebus"]

get_client_info(client) -> dict

Returns information about a client instance.

Parameters:

  • client (Client): The client object to inspect

Returns: Dictionary containing client information

Example:

info = mq.get_client_info(client)
print(info["service_type"])  # "aws_sqs" or "azure_servicebus"

Client Methods

Once you have a client object, you can call these methods:

Queue Operations
create_queue(name, lock_duration?, retention_period?, max_delivery_count?, dead_letter_config?, enable_sessions?, duplicate_detection?, duplicate_window_secs?, max_queue_size?) -> Queue

Creates a new message queue.

Parameters:

  • name (string, required): Queue name
  • lock_duration (int, optional): Message lock duration in seconds (default: 30)
  • retention_period (int, optional): Message retention period in seconds (default: 1209600 = 14 days)
  • max_delivery_count (int, optional): Maximum delivery attempts before moving to DLQ (default: 10)
  • dead_letter_config (dict, optional): Dead letter queue configuration
  • enable_sessions (bool, optional): Enable message sessions/ordering (default: false)
  • duplicate_detection (bool, optional): Enable duplicate detection (default: false)
  • duplicate_window_secs (int, optional): Duplicate detection window in seconds (default: 300)
  • max_queue_size (int, optional): Maximum queue size in bytes (default: 0 = unlimited)

Dead Letter Config Structure:

{
    "enabled": True,
    "queue_name": "my-dlq",  # Dead letter queue name
    "max_delivery_count": 5   # Max attempts before moving to DLQ
}

Returns: Queue object with queue information

Example:

# Basic queue
queue = client.create_queue("orders")

# Queue with DLQ
queue = client.create_queue(
    "priority-orders",
    lock_duration=60,
    max_delivery_count=3,
    dead_letter_config={
        "enabled": True,
        "queue_name": "priority-orders-dlq",
        "max_delivery_count": 3
    }
)

delete_queue(name) -> bool

Deletes a queue.

Parameters:

  • name (string, required): Queue name to delete

Returns: True if successful

list_queues(prefix?) -> list

Lists queues, optionally filtered by name prefix.

Parameters:

  • prefix (string, optional): Filter queues by name prefix (default: "" = all queues)

Returns: List of Queue objects

get_queue(name) -> Queue

Gets information about a specific queue.

Parameters:

  • name (string, required): Queue name

Returns: Queue object or None if not found

exists(name) -> bool

Checks if a queue exists.

Parameters:

  • name (string, required): Queue name

Returns: True if queue exists, False otherwise

purge(name) -> bool

Purges all messages from a queue.

Parameters:

  • name (string, required): Queue name

Returns: True if successful

Note: ⚠️ AWS SQS implementation is not yet complete.

get_info(name) -> dict

Gets detailed queue statistics and information.

Parameters:

  • name (string, required): Queue name

Returns: Dictionary with queue statistics

Message Operations
send(queue_name, body, properties?, scheduled_time?, session_id?, correlation_id?, reply_to?, time_to_live?, message_id?) -> MessageResult

Sends a message to a queue.

Parameters:

  • queue_name (string, required): Target queue name
  • body (string, required): Message body content
  • properties (dict, optional): Custom message properties/attributes
  • scheduled_time (string, optional): ISO 8601 timestamp for scheduled delivery
  • session_id (string, optional): Session ID for message ordering
  • correlation_id (string, optional): Correlation ID for request-response patterns
  • reply_to (string, optional): Reply queue name
  • time_to_live (int, optional): Message TTL in seconds
  • message_id (string, optional): Custom message ID

Returns: MessageResult object with send result

Example:

# Basic message
result = client.send("orders", "New order received")

# Message with properties
result = client.send(
    "orders",
    "Priority order",
    properties={"priority": "high", "customer_id": "12345"},
    time_to_live=3600
)

# Scheduled message
result = client.send(
    "reminders",
    "Daily reminder",
    scheduled_time="2024-01-01T10:00:00Z"
)

receive(queue_name, max_count?, wait_time?, lock_duration?, peek_only?) -> list

Receives messages from a queue.

Parameters:

  • queue_name (string, required): Source queue name
  • max_count (int, optional): Maximum number of messages to receive (default: 1)
  • wait_time (int, optional): Long polling wait time in seconds (default: 0)
  • lock_duration (int, optional): Message lock duration in seconds (default: uses queue default)
  • peek_only (bool, optional): Peek without removing messages (default: false)

Returns: List of MessageResult objects

Example:

# Receive one message
messages = client.receive("orders")

# Receive multiple with long polling
messages = client.receive("orders", max_count=10, wait_time=20)

# Peek without removing
messages = client.receive("orders", peek_only=True)

delete(queue_name, message_ids) -> bool

Deletes messages from a queue.

Parameters:

  • queue_name (string, required): Source queue name
  • message_ids (string or list, required): Message ID(s) to delete

Returns: True if successful

Example:

# Delete single message
client.delete("orders", "msg-123")

# Delete multiple messages
client.delete("orders", ["msg-123", "msg-456"])

batch_send(queue_name, messages) -> list

Sends multiple messages in a batch operation.

Parameters:

  • queue_name (string, required): Target queue name
  • messages (list, required): List of message dictionaries

Message Dictionary Structure:

{
    "body": "Message content",
    "properties": {"key": "value"},  # optional
    "session_id": "session1",        # optional
    "scheduled_time": "2024-01-01T10:00:00Z"  # optional
}

Returns: List of MessageResult objects

Note: ⚠️ AWS SQS implementation is not yet complete.

schedule(queue_name, body, scheduled_time, properties?, session_id?) -> MessageResult

Schedules a message for future delivery.

Parameters:

  • queue_name (string, required): Target queue name
  • body (string, required): Message body content
  • scheduled_time (string, required): ISO 8601 timestamp for delivery
  • properties (dict, optional): Custom message properties
  • session_id (string, optional): Session ID for ordering

Returns: MessageResult object

cancel(queue_name, message_id) -> bool

Cancels a scheduled message.

Parameters:

  • queue_name (string, required): Queue name
  • message_id (string, required): Message ID to cancel

Returns: True if successful

Note: ⚠️ Azure Service Bus implementation is not yet complete.

peek(queue_name, max_count?) -> list

Peeks at messages without receiving them.

Parameters:

  • queue_name (string, required): Source queue name
  • max_count (int, optional): Maximum messages to peek (default: 1)

Returns: List of MessageResult objects

Note: ⚠️ Azure Service Bus implementation is not yet complete.

Message Lock Management
lock(queue_name, message_id, lock_duration) -> bool

Extends the lock duration of a message.

Parameters:

  • queue_name (string, required): Queue name
  • message_id (string, required): Message ID
  • lock_duration (int, required): New lock duration in seconds

Returns: True if successful

Note: ⚠️ AWS SQS implementation is not yet complete.

unlock(queue_name, message_id) -> bool

Releases the lock on a message.

Parameters:

  • queue_name (string, required): Queue name
  • message_id (string, required): Message ID

Returns: True if successful

Note: ⚠️ AWS SQS implementation is not yet complete.

Dead Letter Queue Operations
dead_letter_receive(queue_name, max_count?) -> list

Receives messages from the dead letter queue.

Parameters:

  • queue_name (string, required): Main queue name (DLQ is auto-resolved)
  • max_count (int, optional): Maximum messages to receive (default: 10)

Returns: List of MessageResult objects from DLQ

dead_letter_requeue(queue_name, message_id) -> bool

Moves a message back from dead letter queue to main queue.

Parameters:

  • queue_name (string, required): Main queue name
  • message_id (string, required): Message ID to requeue

Returns: True if successful

Note: ⚠️ Neither the AWS SQS nor the Azure Service Bus implementation is complete yet.

dead_letter_purge(queue_name) -> bool

Purges all messages from the dead letter queue.

Parameters:

  • queue_name (string, required): Main queue name (DLQ is auto-resolved)

Returns: True if successful

Client Information
get_client_info() -> dict

Returns information about the client.

Returns: Dictionary containing client information


Data Structures
Queue Object

Represents a message queue with unified properties across services.

Properties:

  • name (string): Queue name
  • service_type (string): Service type ("aws_sqs" or "azure_servicebus")
  • url (string): Service-specific queue URL/identifier
  • message_count (int): Number of messages in queue
  • lock_duration (int): Message lock duration in seconds
  • retention_period (int): Message retention period in seconds
  • max_delivery_count (int): Maximum delivery attempts before DLQ
  • dead_letter_config (dict): Dead letter queue configuration
  • enable_sessions (bool): Whether message sessions are enabled
  • duplicate_detection (dict): Duplicate detection configuration
  • max_queue_size (int): Maximum queue size in bytes
  • created_time (string): Queue creation timestamp
  • modified_time (string): Last modification timestamp
MessageResult Object

Represents a message received from or sent to a queue.

Properties:

  • message_id (string): Unique message identifier
  • body (string): Message body content
  • properties (dict): Custom message properties/attributes
  • session_id (string): Session ID for message ordering
  • correlation_id (string): Correlation ID for request-response
  • reply_to (string): Reply queue name
  • enqueue_time (string): When message was enqueued
  • scheduled_time (string): Scheduled delivery time (if any)
  • lock_expires_at (string): When message lock expires
  • delivery_count (int): Number of delivery attempts
  • time_to_live (int): Message TTL in seconds
  • receipt_handle (string): Service-specific receipt handle
  • success (bool): Whether operation was successful
  • error (string): Error message (if any)

Implementation Status
✅ Fully Implemented
  • Basic queue operations (create, delete, list, get, exists)
  • Message send and receive operations
  • Dead letter queue receive and purge
  • Client connection and configuration
  • Azure Service Bus core functionality
⚠️ Partially Implemented
  • AWS SQS: purge, lock/unlock, batch_send, dead_letter_requeue
  • Azure Service Bus: cancel, peek, dead_letter_requeue
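
📋 Feature Compatibility Matrix

Feature            AWS SQS     Azure Service Bus
Queue Operations   ✅ Full     ✅ Full
Send Message       ✅ Full     ✅ Full
Receive Message    ✅ Full     ✅ Full
Delete Message     ✅ Full     ✅ Full
Batch Send         ⚠️ TODO     ✅ Full
Schedule Message   ✅ Full     ✅ Full
Cancel Message     ✅ Full     ⚠️ TODO
Peek Message       ✅ Full     ⚠️ TODO
Lock Management    ⚠️ TODO     ✅ Full
DLQ Receive        ✅ Full     ✅ Full
DLQ Requeue        ⚠️ TODO     ⚠️ TODO
DLQ Purge          ✅ Full     ✅ Full
Queue Purge        ⚠️ TODO     ✅ Full

Note: The Go documentation for AWSSQSClient.Cancel and AWSSQSClient.Peek describes both operations as not supported by SQS, so treat the "✅ Full" entries in the AWS SQS column for Cancel Message and Peek Message with caution.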

🎯 Quick Start

Basic Usage
load("mq", "connect")

def main():
    # Connect to your message queue service
    client = connect(
        service_type="azure_servicebus",
        connection_string="Endpoint=sb://..."
    )
    
    # Create a queue
    queue = client.create_queue("orders")
    
    # Send a message
    result = client.send("orders", "New order received")
    print("Message sent: {}".format(result["message_id"]))
    
    # Receive and process messages
    messages = client.receive("orders", max_count=10)
    for msg in messages:
        print("Processing: {}".format(msg["body"]))
        # Process message here...
        client.delete("orders", msg["message_id"])

main()

Connection Examples
load("mq", "connect", "get_supported_services")

def main():
    # Check supported services
    print("Supported: {}".format(get_supported_services()))
    
    # AWS SQS
    aws_client = connect(
        service_type="aws_sqs",
        aws_region="us-west-2",
        aws_access_key="YOUR_ACCESS_KEY",
        aws_secret_key="YOUR_SECRET_KEY"
    )
    
    # Azure Service Bus
    azure_client = connect(
        service_type="azure_servicebus", 
        connection_string="Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=..."
    )
    
    # Auto-detect (uses first available credentials)
    auto_client = connect(aws_region="us-east-1")  # Will use AWS SQS

main()

More Examples
# Dead Letter Queue Example
queue = client.create_queue(
    "orders",
    max_delivery_count=3,
    dead_letter_config={
        "enabled": True,
        "queue_name": "orders-dlq",
        "max_delivery_count": 3
    }
)

# Batch sending
messages = [
    {"body": "Order #1", "properties": {"priority": "high"}},
    {"body": "Order #2", "properties": {"priority": "normal"}}
]
results = client.batch_send("orders", messages)

# Scheduled messages
client.schedule(
    "reminders",
    "Meeting reminder", 
    "2024-12-31T09:00:00Z",
    properties={"type": "meeting"}
)

🚨 Service-Specific Notes

AWS SQS
  • Maximum message delay: 15 minutes
  • Maximum batch size: 10 messages
  • FIFO queues require .fifo suffix
  • Message deduplication only for FIFO queues
Azure Service Bus
  • Message scheduling up to 7 days in advance
  • Sessions enable ordered processing
  • Built-in duplicate detection per queue
  • Native dead letter queue support

📈 Performance Tips

  1. Use batch operations for sending multiple messages
  2. Configure appropriate timeouts for your use case
  3. Enable long polling (a non-zero wait_time on receive, for example wait_time=20) to cut down on empty receives, which reduces cost and improves responsiveness
  4. Use sessions/FIFO queues only when message ordering is required
  5. Monitor dead letter queues for failed message processing
  6. Set appropriate message TTL to prevent queue bloat
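
For tip 1, a caller that holds more messages than a single batch allows (10 on SQS, per the notes above) has to split them into groups first. The generic helper below is a minimal sketch of that splitting step. `chunk` is a hypothetical name, and the module's own `BatchSend` may already split batches internally, so treat this as an illustration of the technique rather than something callers are required to do.

```go
package main

import "fmt"

// chunk splits items into consecutive slices of at most size elements.
// The returned slices share memory with items. A non-positive size or an
// empty input yields nil.
func chunk[T any](items []T, size int) [][]T {
	if size <= 0 {
		return nil
	}
	var out [][]T
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[start:end])
	}
	return out
}

func main() {
	bodies := make([]string, 23)
	for i := range bodies {
		bodies[i] = fmt.Sprintf("Order #%d", i+1)
	}
	for i, group := range chunk(bodies, 10) {
		fmt.Printf("batch %d: %d messages\n", i+1, len(group))
	}
}
```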

🧪 Testing Considerations

When writing tests for your message queue operations, consider a shorter lock duration and a lower max_delivery_count so that redelivery and dead-letter behavior can be exercised quickly:

# For testing: Use shorter lock duration to speed up tests
test_queue = client.create_queue(
    "test-queue",
    lock_duration=5,        # 5 seconds instead of default 30
    max_delivery_count=2    # Lower threshold for DLQ testing
)

# Production: Use default values for reliability
prod_queue = client.create_queue("prod-queue")  # Uses 30-second default

Testing Best Practices
  1. Use explicit short timeouts in test queues to speed up test execution
  2. Clean up test queues after each test to avoid conflicts
  3. Use unique queue names with timestamps to prevent collisions
  4. Test DLQ behavior with lower max_delivery_count values
  5. Mock external dependencies when testing queue integration

🐛 Troubleshooting

Common Issues

Connection timeouts:

  • Increase the timeout configuration
  • Check network connectivity
  • Verify credentials and permissions

Message not appearing:

  • Check visibility timeout settings
  • Verify queue exists and is correct
  • Check dead letter queue for failed messages

Authentication errors:

  • Verify AWS credentials or Azure connection string
  • Check IAM permissions for AWS SQS
  • Verify shared access policy for Azure Service Bus

For more detailed troubleshooting, enable debug logging and check the error messages returned by the module functions.

📄 License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

Documentation

Overview

Package mq provides a Starlark module for unified message queue operations. It supports AWS SQS and Azure Service Bus with a consistent API.

Constants

const (
	ServiceTypeAWSSQS          = "aws_sqs"
	ServiceTypeAzureServiceBus = "azure_servicebus"
	ServiceTypeAuto            = "auto"
)

Service type constants

const ModuleName = "mq"

ModuleName defines the expected name for this module when used in Starlark's load() function

Variables

var (
	// ErrQueueNotFound indicates that the specified queue does not exist
	ErrQueueNotFound = errors.New("queue not found")

	// ErrQueueAlreadyExists indicates that the queue already exists
	ErrQueueAlreadyExists = errors.New("queue already exists")

	// ErrMessageNotFound indicates that the specified message does not exist
	ErrMessageNotFound = errors.New("message not found")

	// ErrMessageTooLarge indicates that the message exceeds size limits
	ErrMessageTooLarge = errors.New("message too large")

	// ErrAccessDenied indicates insufficient permissions
	ErrAccessDenied = errors.New("access denied")

	// ErrThrottled indicates that the request was throttled
	ErrThrottled = errors.New("request throttled")

	// ErrServiceUnavailable indicates that the service is temporarily unavailable
	ErrServiceUnavailable = errors.New("service unavailable")

	// ErrInvalidParameter indicates invalid parameter values
	ErrInvalidParameter = errors.New("invalid parameter")

	// ErrUnsupportedOperation indicates that the operation is not supported by the service
	ErrUnsupportedOperation = errors.New("unsupported operation")

	// ErrConnectionFailed indicates that connection to the service failed
	ErrConnectionFailed = errors.New("connection failed")

	// ErrTimeout indicates that the operation timed out
	ErrTimeout = errors.New("operation timed out")
)

Common error types for unified error handling across services

Functions

func IsRetryableError

func IsRetryableError(err error) bool

IsRetryableError determines if an error is retryable

func IsTemporaryError

func IsTemporaryError(err error) bool

IsTemporaryError determines if an error is temporary

func NormalizeError

func NormalizeError(service, operation string, err error) error

NormalizeError converts service-specific errors to unified MQError types

Types

type AWSSQSClient

type AWSSQSClient struct {
	// contains filtered or unexported fields
}

AWSSQSClient implements the Client interface for AWS SQS

func (*AWSSQSClient) BatchSend

func (c *AWSSQSClient) BatchSend(ctx context.Context, queueName string, messages []BatchMessage) ([]*MessageResult, error)

BatchSend sends multiple messages in batches

func (*AWSSQSClient) Cancel

func (c *AWSSQSClient) Cancel(ctx context.Context, queueName, messageID string) error

Cancel cancels a scheduled message (not supported by SQS)

func (*AWSSQSClient) Close

func (c *AWSSQSClient) Close() error

Close closes the client connection

func (*AWSSQSClient) CreateQueue

func (c *AWSSQSClient) CreateQueue(ctx context.Context, name string, options QueueOptions) (*Queue, error)

CreateQueue creates a new SQS queue

func (*AWSSQSClient) DeadLetterPurge

func (c *AWSSQSClient) DeadLetterPurge(ctx context.Context, queueName string) error

DeadLetterPurge purges all messages from the dead letter queue

func (*AWSSQSClient) DeadLetterReceive

func (c *AWSSQSClient) DeadLetterReceive(ctx context.Context, queueName string, maxCount int) ([]*MessageResult, error)

DeadLetterReceive receives messages from the dead letter queue

func (*AWSSQSClient) DeadLetterRequeue

func (c *AWSSQSClient) DeadLetterRequeue(ctx context.Context, queueName, messageID string) error

DeadLetterRequeue moves a message back from DLQ to main queue

func (*AWSSQSClient) Delete

func (c *AWSSQSClient) Delete(ctx context.Context, queueName string, messageIDs []string) ([]bool, error)

Delete deletes messages from a queue

func (*AWSSQSClient) DeleteQueue

func (c *AWSSQSClient) DeleteQueue(ctx context.Context, name string) error

DeleteQueue deletes an SQS queue

func (*AWSSQSClient) Exists

func (c *AWSSQSClient) Exists(ctx context.Context, name string) (bool, error)

Exists checks if a queue exists

func (*AWSSQSClient) GetClientInfo

func (c *AWSSQSClient) GetClientInfo() map[string]interface{}

GetClientInfo returns information about the client

func (*AWSSQSClient) GetInfo

func (c *AWSSQSClient) GetInfo(ctx context.Context, name string) (*Queue, error)

GetInfo gets detailed queue information

func (*AWSSQSClient) GetQueue

func (c *AWSSQSClient) GetQueue(ctx context.Context, name string) (*Queue, error)

GetQueue gets information about a specific queue

func (*AWSSQSClient) ListQueues

func (c *AWSSQSClient) ListQueues(ctx context.Context, prefix string) ([]*Queue, error)

ListQueues lists SQS queues

func (*AWSSQSClient) Lock

func (c *AWSSQSClient) Lock(ctx context.Context, queueName, messageID string, duration int) error

Lock extends the visibility timeout of a message

func (*AWSSQSClient) Peek

func (c *AWSSQSClient) Peek(ctx context.Context, queueName string, maxCount int) ([]*MessageResult, error)

Peek peeks at messages without receiving them (not supported by SQS)

func (*AWSSQSClient) Purge

func (c *AWSSQSClient) Purge(ctx context.Context, name string) error

Purge purges all messages from a queue

func (*AWSSQSClient) Receive

func (c *AWSSQSClient) Receive(ctx context.Context, queueName string, options ReceiveOptions) ([]*MessageResult, error)

Receive receives messages from a queue

func (*AWSSQSClient) Schedule

func (c *AWSSQSClient) Schedule(ctx context.Context, queueName, body string, scheduledTime time.Time, options MessageOptions) (*MessageResult, error)

Schedule schedules a message for future delivery

func (*AWSSQSClient) Send

func (c *AWSSQSClient) Send(ctx context.Context, queueName, body string, options MessageOptions) (*MessageResult, error)

Send sends a message to a queue

func (*AWSSQSClient) Unlock

func (c *AWSSQSClient) Unlock(ctx context.Context, queueName, messageID string) error

Unlock releases a message by setting visibility timeout to 0

type AzureServiceBusClient

type AzureServiceBusClient struct {
	// contains filtered or unexported fields
}

AzureServiceBusClient implements the Client interface for Azure Service Bus

func (*AzureServiceBusClient) AbandonMessage

func (c *AzureServiceBusClient) AbandonMessage(ctx context.Context, queueName string, msgResult *MessageResult) error

AbandonMessage abandons a message using the original Azure ReceivedMessage

func (*AzureServiceBusClient) BatchSend

func (c *AzureServiceBusClient) BatchSend(ctx context.Context, queueName string, messages []BatchMessage) ([]*MessageResult, error)

BatchSend sends multiple messages in batches

func (*AzureServiceBusClient) Cancel

func (c *AzureServiceBusClient) Cancel(ctx context.Context, queueName, messageID string) error

Cancel cancels a scheduled message

func (*AzureServiceBusClient) Close

func (c *AzureServiceBusClient) Close() error

Close closes the client connection

func (*AzureServiceBusClient) CompleteMessage

func (c *AzureServiceBusClient) CompleteMessage(ctx context.Context, queueName string, msgResult *MessageResult) error

CompleteMessage completes a message using the original Azure ReceivedMessage

func (*AzureServiceBusClient) CreateQueue

func (c *AzureServiceBusClient) CreateQueue(ctx context.Context, name string, options QueueOptions) (*Queue, error)

CreateQueue creates a new Service Bus queue using admin client

func (*AzureServiceBusClient) DeadLetterPurge

func (c *AzureServiceBusClient) DeadLetterPurge(ctx context.Context, queueName string) error

DeadLetterPurge purges all messages from the dead letter queue

func (*AzureServiceBusClient) DeadLetterReceive

func (c *AzureServiceBusClient) DeadLetterReceive(ctx context.Context, queueName string, maxCount int) ([]*MessageResult, error)

DeadLetterReceive receives messages from the dead letter queue

func (*AzureServiceBusClient) DeadLetterRequeue

func (c *AzureServiceBusClient) DeadLetterRequeue(ctx context.Context, queueName, messageID string) error

DeadLetterRequeue moves a message back from DLQ to main queue

func (*AzureServiceBusClient) Delete

func (c *AzureServiceBusClient) Delete(ctx context.Context, queueName string, messageIDs []string) ([]bool, error)

Delete deletes messages from a queue (completes them in Service Bus terms)

func (*AzureServiceBusClient) DeleteQueue

func (c *AzureServiceBusClient) DeleteQueue(ctx context.Context, name string) error

DeleteQueue deletes a Service Bus queue

func (*AzureServiceBusClient) Exists

func (c *AzureServiceBusClient) Exists(ctx context.Context, name string) (bool, error)

Exists checks if a queue exists

func (*AzureServiceBusClient) GetClientInfo

func (c *AzureServiceBusClient) GetClientInfo() map[string]interface{}

GetClientInfo returns information about the client

func (*AzureServiceBusClient) GetInfo

func (c *AzureServiceBusClient) GetInfo(ctx context.Context, name string) (*Queue, error)

GetInfo gets detailed queue information

func (*AzureServiceBusClient) GetQueue

func (c *AzureServiceBusClient) GetQueue(ctx context.Context, name string) (*Queue, error)

GetQueue gets information about a specific queue

func (*AzureServiceBusClient) ListQueues

func (c *AzureServiceBusClient) ListQueues(ctx context.Context, prefix string) ([]*Queue, error)

ListQueues lists Service Bus queues using admin client

func (*AzureServiceBusClient) Lock

func (c *AzureServiceBusClient) Lock(ctx context.Context, queueName, messageID string, duration int) error

Lock renews the lock on a message

func (*AzureServiceBusClient) Peek

func (c *AzureServiceBusClient) Peek(ctx context.Context, queueName string, maxCount int) ([]*MessageResult, error)

Peek peeks at messages without receiving them

func (*AzureServiceBusClient) Purge

func (c *AzureServiceBusClient) Purge(ctx context.Context, name string) error

Purge purges all messages from a queue

func (*AzureServiceBusClient) Receive

func (c *AzureServiceBusClient) Receive(ctx context.Context, queueName string, options ReceiveOptions) ([]*MessageResult, error)

Receive receives messages from a queue

func (*AzureServiceBusClient) RenewMessageLock

func (c *AzureServiceBusClient) RenewMessageLock(ctx context.Context, queueName string, msgResult *MessageResult) error

RenewMessageLock renews the lock on a message using the original Azure ReceivedMessage

func (*AzureServiceBusClient) Schedule

func (c *AzureServiceBusClient) Schedule(ctx context.Context, queueName, body string, scheduledTime time.Time, options MessageOptions) (*MessageResult, error)

Schedule schedules a message for future delivery

func (*AzureServiceBusClient) Send

func (c *AzureServiceBusClient) Send(ctx context.Context, queueName, body string, options MessageOptions) (*MessageResult, error)

Send sends a message to a queue

func (*AzureServiceBusClient) Unlock

func (c *AzureServiceBusClient) Unlock(ctx context.Context, queueName, messageID string) error

Unlock abandons a message (releases the lock)

type BatchMessage

type BatchMessage struct {
	Body          string                 // Message body
	Properties    map[string]interface{} // Message properties
	SessionID     string                 // Session ID
	CorrelationID string                 // Correlation ID
	ReplyTo       string                 // Reply to queue
	TimeToLive    int                    // TTL in seconds
	MessageID     string                 // Message ID
	ScheduledTime *time.Time             // Scheduled delivery time
}

BatchMessage represents a message for batch sending

type Client

type Client interface {
	// Queue operations
	CreateQueue(ctx context.Context, name string, options QueueOptions) (*Queue, error)
	DeleteQueue(ctx context.Context, name string) error
	ListQueues(ctx context.Context, prefix string) ([]*Queue, error)
	GetQueue(ctx context.Context, name string) (*Queue, error)
	Exists(ctx context.Context, name string) (bool, error)
	Purge(ctx context.Context, name string) error
	GetInfo(ctx context.Context, name string) (*Queue, error)

	// Message operations
	Send(ctx context.Context, queueName, body string, options MessageOptions) (*MessageResult, error)
	Receive(ctx context.Context, queueName string, options ReceiveOptions) ([]*MessageResult, error)
	Delete(ctx context.Context, queueName string, messageIDs []string) ([]bool, error)

	// Message lock management
	Lock(ctx context.Context, queueName, messageID string, duration int) error
	Unlock(ctx context.Context, queueName, messageID string) error

	// Batch operations
	BatchSend(ctx context.Context, queueName string, messages []BatchMessage) ([]*MessageResult, error)

	// Specialized message operations
	Schedule(ctx context.Context, queueName, body string, scheduledTime time.Time, options MessageOptions) (*MessageResult, error)
	Cancel(ctx context.Context, queueName, messageID string) error
	Peek(ctx context.Context, queueName string, maxCount int) ([]*MessageResult, error)

	// Dead letter queue operations
	DeadLetterReceive(ctx context.Context, queueName string, maxCount int) ([]*MessageResult, error)
	DeadLetterRequeue(ctx context.Context, queueName, messageID string) error
	DeadLetterPurge(ctx context.Context, queueName string) error

	// Connection management
	Close() error
	GetClientInfo() map[string]interface{}
}

Client interface defines the unified operations for message queue services
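
The following sketch shows one way a consumer might combine Receive and Delete: receive a batch, run a handler on each message, and delete only the messages that were handled successfully. To stay self-contained it declares a narrow consumer interface with the two documented method shapes, plus local stand-in types and an in-memory fakeClient; all of these names are hypothetical. It also assumes that the []bool returned by Delete reports success per message ID, which this documentation does not confirm.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins mirroring a subset of the documented mq types.
type receiveOptions struct {
	MaxCount int
	WaitTime int
}

type messageResult struct {
	MessageID string
	Body      string
}

// consumer is the subset of the documented Client interface used by this loop.
type consumer interface {
	Receive(ctx context.Context, queueName string, options receiveOptions) ([]*messageResult, error)
	Delete(ctx context.Context, queueName string, messageIDs []string) ([]bool, error)
}

// drainOnce receives one batch, runs handle on each message, and deletes the
// messages whose handler returned nil. It returns the number of deletions.
func drainOnce(ctx context.Context, c consumer, queue string, handle func(*messageResult) error) (int, error) {
	msgs, err := c.Receive(ctx, queue, receiveOptions{MaxCount: 10, WaitTime: 5})
	if err != nil {
		return 0, fmt.Errorf("receive from %s: %w", queue, err)
	}
	var done []string
	for _, m := range msgs {
		if handle(m) == nil {
			done = append(done, m.MessageID)
		}
	}
	if len(done) == 0 {
		return 0, nil
	}
	oks, err := c.Delete(ctx, queue, done)
	if err != nil {
		return 0, fmt.Errorf("delete from %s: %w", queue, err)
	}
	deleted := 0
	for _, ok := range oks {
		if ok {
			deleted++
		}
	}
	return deleted, nil
}

// fakeClient is an in-memory consumer used only to exercise the loop.
type fakeClient struct{ pending map[string]*messageResult }

func (f *fakeClient) Receive(_ context.Context, _ string, o receiveOptions) ([]*messageResult, error) {
	var out []*messageResult
	for _, m := range f.pending {
		if len(out) == o.MaxCount {
			break
		}
		out = append(out, m)
	}
	return out, nil
}

func (f *fakeClient) Delete(_ context.Context, _ string, ids []string) ([]bool, error) {
	oks := make([]bool, len(ids))
	for i, id := range ids {
		_, oks[i] = f.pending[id]
		delete(f.pending, id)
	}
	return oks, nil
}

// demo processes two messages, one of which fails in the handler.
func demo() (deleted, remaining int, err error) {
	c := &fakeClient{pending: map[string]*messageResult{
		"1": {MessageID: "1", Body: "ok"},
		"2": {MessageID: "2", Body: "bad"},
	}}
	deleted, err = drainOnce(context.Background(), c, "orders", func(m *messageResult) error {
		if m.Body == "bad" {
			return fmt.Errorf("cannot process %s", m.MessageID)
		}
		return nil
	})
	return deleted, len(c.pending), err
}

func main() {
	fmt.Println(demo())
}
```

Messages whose handler fails are left in the queue, so they become visible again once their lock expires and can eventually reach a dead letter queue if one is configured.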

func NewAWSSQSClient

func NewAWSSQSClient(ctx context.Context, config *ClientConfig) (Client, error)

NewAWSSQSClient creates a new AWS SQS client

func NewAzureServiceBusClient

func NewAzureServiceBusClient(ctx context.Context, config *ClientConfig) (Client, error)

NewAzureServiceBusClient creates a new Azure Service Bus client

type ClientConfig

type ClientConfig struct {
	// Service configuration
	ServiceType      string // Service type (aws_sqs, azure_servicebus, auto)
	ConnectionString string // Azure Service Bus connection string

	// AWS specific configuration
	AWSRegion       string // AWS region
	AWSAccessKey    string // AWS access key ID
	AWSSecretKey    string // AWS secret access key
	AWSSessionToken string // AWS session token

	// Connection and performance settings
	Timeout    int // Connection timeout in seconds
	MaxRetries int // Maximum retry attempts

	// Default operation settings
	DefaultLockDuration int // Default message lock duration in seconds
	DefaultBatchSize    int // Default batch size for operations
}

ClientConfig contains configuration for a message queue client

func (*ClientConfig) Copy

func (c *ClientConfig) Copy() *ClientConfig

Copy creates a copy of the configuration

func (*ClientConfig) GetDefaultLockDuration

func (c *ClientConfig) GetDefaultLockDuration() time.Duration

GetDefaultLockDuration returns the default lock duration as a time.Duration

func (*ClientConfig) GetTimeout

func (c *ClientConfig) GetTimeout() time.Duration

GetTimeout returns the timeout as a time.Duration

func (*ClientConfig) Validate

func (c *ClientConfig) Validate() error

Validate validates the configuration
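
As an illustration of how the configuration fields relate to the environment variables listed in the README, the sketch below fills a small config struct from a lookup function and applies the documented defaults. The clientConfig type, configFrom, and the timeout method are hypothetical stand-ins; in particular, multiplying the seconds field by time.Second is only an assumption about what GetTimeout does.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// clientConfig is a local stand-in mirroring a few documented ClientConfig fields.
type clientConfig struct {
	ServiceType string
	AWSRegion   string
	Timeout     int // seconds
	MaxRetries  int
}

// configFrom builds a config from getenv, using the defaults documented in
// the README when a variable is unset or, for integers, malformed.
func configFrom(getenv func(string) string) clientConfig {
	str := func(key, def string) string {
		if v := getenv(key); v != "" {
			return v
		}
		return def
	}
	num := func(key string, def int) int {
		if v, err := strconv.Atoi(getenv(key)); err == nil {
			return v
		}
		return def
	}
	return clientConfig{
		ServiceType: str("MQ_SERVICE_TYPE", "auto"),
		AWSRegion:   str("MQ_AWS_REGION", "us-east-1"),
		Timeout:     num("MQ_TIMEOUT", 30),
		MaxRetries:  num("MQ_MAX_RETRIES", 3),
	}
}

// timeout converts the seconds field to a time.Duration.
func (c clientConfig) timeout() time.Duration {
	return time.Duration(c.Timeout) * time.Second
}

func main() {
	cfg := configFrom(os.Getenv)
	fmt.Printf("%+v timeout=%s\n", cfg, cfg.timeout())
}
```

Passing the lookup function as a parameter keeps the loader testable without touching the process environment.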

type ClientWrapper

type ClientWrapper struct {
	// contains filtered or unexported fields
}

ClientWrapper wraps the message queue client for Starlark

func NewClientWrapper

func NewClientWrapper(client Client) *ClientWrapper

NewClientWrapper creates a new ClientWrapper with initialized method maps

func (*ClientWrapper) Attr

func (cw *ClientWrapper) Attr(name string) (starlark.Value, error)

Attr implements the starlark.HasAttrs interface

func (*ClientWrapper) AttrNames

func (cw *ClientWrapper) AttrNames() []string

func (*ClientWrapper) Freeze

func (cw *ClientWrapper) Freeze()

func (*ClientWrapper) Hash

func (cw *ClientWrapper) Hash() (uint32, error)

func (*ClientWrapper) String

func (cw *ClientWrapper) String() string

String implements the starlark.Value interface

func (*ClientWrapper) Truth

func (cw *ClientWrapper) Truth() starlark.Bool

func (*ClientWrapper) Type

func (cw *ClientWrapper) Type() string

type DeadLetterConfig

type DeadLetterConfig struct {
	Enabled          bool   // Whether DLQ is enabled
	QueueName        string // Dead letter queue name
	MaxDeliveryCount int    // Maximum delivery count before moving to DLQ
}

DeadLetterConfig contains dead letter queue configuration

type DuplicateDetection

type DuplicateDetection struct {
	Enabled       bool `json:"enabled"`
	WindowSeconds int  `json:"window_seconds"` // Deduplication window in seconds
}

DuplicateDetection represents duplicate detection configuration

type ErrorType

type ErrorType string

ErrorType categorizes different types of errors

const (
	ErrorTypeNotFound      ErrorType = "not_found"
	ErrorTypeAlreadyExists ErrorType = "already_exists"
	ErrorTypePermission    ErrorType = "permission"
	ErrorTypeThrottling    ErrorType = "throttling"
	ErrorTypeValidation    ErrorType = "validation"
	ErrorTypeConnection    ErrorType = "connection"
	ErrorTypeTimeout       ErrorType = "timeout"
	ErrorTypeService       ErrorType = "service"
	ErrorTypeUnsupported   ErrorType = "unsupported"
	ErrorTypeUnknown       ErrorType = "unknown"
)

type MQError

type MQError struct {
	// Type categorizes the error
	Type ErrorType

	// Message provides a human-readable description
	Message string

	// Service indicates which service reported the error
	Service string

	// Operation indicates which operation failed
	Operation string

	// Underlying error from the service
	Err error

	// Additional context
	Context map[string]interface{}
}

MQError represents a message queue operation error with additional context

func NewMQError

func NewMQError(errType ErrorType, service, operation, message string, err error) *MQError

NewMQError creates a new MQError

func (*MQError) Error

func (e *MQError) Error() string

Error implements the error interface

func (*MQError) Is

func (e *MQError) Is(target error) bool

Is checks if the error matches a target error

func (*MQError) Unwrap

func (e *MQError) Unwrap() error

Unwrap returns the underlying error

func (*MQError) WithContext

func (e *MQError) WithContext(key string, value interface{}) *MQError

WithContext adds context to the error
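
Because MQError exposes Unwrap and a Type field, callers can presumably inspect a wrapped error with the standard errors.As function. The sketch below demonstrates the pattern on a local mqError stand-in with the same field names. The Error string format and the choice of which categories count as retryable are assumptions made for illustration, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// errorType and the constants below mirror a subset of the documented ErrorType values.
type errorType string

const (
	errorTypeNotFound   errorType = "not_found"
	errorTypeThrottling errorType = "throttling"
	errorTypeTimeout    errorType = "timeout"
)

// mqError is a local stand-in for MQError with the fields used in this sketch.
type mqError struct {
	Type      errorType
	Service   string
	Operation string
	Message   string
	Err       error
}

// Error formats the error; the real format is not documented.
func (e *mqError) Error() string {
	return fmt.Sprintf("%s %s: %s (%s)", e.Service, e.Operation, e.Message, e.Type)
}

// Unwrap exposes the underlying error to errors.Is and errors.As.
func (e *mqError) Unwrap() error { return e.Err }

// retryable reports whether err wraps an mqError whose category suggests
// that retrying may help. The category choice is an assumption.
func retryable(err error) bool {
	var me *mqError
	if !errors.As(err, &me) {
		return false
	}
	switch me.Type {
	case errorTypeThrottling, errorTypeTimeout:
		return true
	}
	return false
}

// demo classifies one wrapped throttling error and one not-found error.
func demo() (throttledRetry, missingRetry bool) {
	throttled := fmt.Errorf("send failed: %w", &mqError{
		Type: errorTypeThrottling, Service: "aws_sqs", Operation: "Send", Message: "rate exceeded",
	})
	missing := &mqError{
		Type: errorTypeNotFound, Service: "azure_servicebus", Operation: "GetQueue", Message: "no such queue",
	}
	return retryable(throttled), retryable(missing)
}

func main() {
	fmt.Println(demo())
}
```

errors.As walks the wrap chain, so the check still works when the error has been annotated with fmt.Errorf and %w.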

type MessageOptions

type MessageOptions struct {
	// Message properties and metadata
	Properties map[string]interface{} // Message properties/attributes

	// Message scheduling
	ScheduledTime *time.Time // When message should become available for processing

	// Message grouping and correlation
	SessionID     string // Session ID for ordered processing
	CorrelationID string // Correlation ID for request tracking
	ReplyTo       string // Response destination queue

	// Message lifecycle
	TimeToLive int    // Message TTL in seconds
	MessageID  string // Message ID for deduplication
}

MessageOptions contains options for sending messages

type MessageResult

type MessageResult struct {
	// Message identification
	MessageID string `json:"message_id"`
	Body      string `json:"body"`

	// Message metadata and properties
	Properties map[string]interface{} `json:"properties"`

	// Message grouping and correlation
	SessionID     string `json:"session_id"`
	CorrelationID string `json:"correlation_id"`
	ReplyTo       string `json:"reply_to"`

	// Timing information
	EnqueueTime   time.Time  `json:"enqueue_time"`
	ScheduledTime *time.Time `json:"scheduled_time,omitempty"`
	LockExpiresAt *time.Time `json:"lock_expires_at,omitempty"`

	// Delivery information
	DeliveryCount int `json:"delivery_count"`
	TimeToLive    int `json:"time_to_live"` // TTL in seconds

	// Service-specific information (internal use)
	ReceiptHandle   string      `json:"receipt_handle"` // Service-specific handle for acknowledgment
	OriginalMessage interface{} `json:"-"`              // Original service-specific message object (not serialized)

	// Operation result
	Success bool   `json:"success"`
	Error   string `json:"error,omitempty"`
}

MessageResult represents a message received from or sent to a queue

func NewMessageResult

func NewMessageResult(messageID, body string) *MessageResult

NewMessageResult creates a new MessageResult with basic information

func (*MessageResult) IsExpired

func (m *MessageResult) IsExpired() bool

IsExpired checks if the message lock has expired

func (*MessageResult) IsScheduled

func (m *MessageResult) IsScheduled() bool

IsScheduled checks if the message is scheduled for future delivery

func (*MessageResult) Struct

func (m *MessageResult) Struct() (starlark.Value, error)

Struct converts MessageResult to a Starlark dict value for compatibility

func (*MessageResult) TimeUntilExpiry

func (m *MessageResult) TimeUntilExpiry() time.Duration

TimeUntilExpiry returns the duration until the message lock expires

func (*MessageResult) TimeUntilScheduled

func (m *MessageResult) TimeUntilScheduled() time.Duration

TimeUntilScheduled returns the duration until the message becomes available
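
The timing helpers above operate on optional *time.Time fields. The sketch below shows one plausible nil-safe way to compute the remaining lock time and to decide when a consumer should extend the lock, for example by calling Client.Lock. The result type, remaining, and shouldRenew are hypothetical, and returning zero for a nil or past expiry is an assumed convention that may differ from TimeUntilExpiry.

```go
package main

import (
	"fmt"
	"time"
)

// result mirrors only the documented LockExpiresAt field of MessageResult.
type result struct {
	LockExpiresAt *time.Time
}

// remaining returns how long the lock is still held relative to now.
// A nil LockExpiresAt or a past expiry yields zero (assumed convention).
func remaining(r result, now time.Time) time.Duration {
	if r.LockExpiresAt == nil {
		return 0
	}
	if d := r.LockExpiresAt.Sub(now); d > 0 {
		return d
	}
	return 0
}

// shouldRenew reports whether a held lock has less than margin left.
func shouldRenew(r result, now time.Time, margin time.Duration) bool {
	return r.LockExpiresAt != nil && remaining(r, now) < margin
}

// demo uses a fixed clock so the outcome does not depend on wall time.
func demo() (secondsLeft int, renew bool) {
	now := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
	expires := now.Add(20 * time.Second)
	r := result{LockExpiresAt: &expires}
	return int(remaining(r, now).Seconds()), shouldRenew(r, now, 30*time.Second)
}

func main() {
	fmt.Println(demo())
}
```

Passing now explicitly, rather than calling time.Now inside the helpers, keeps the logic deterministic in tests.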

type Module

type Module struct {
	// contains filtered or unexported fields
}

Module wraps the ConfigurableModule with specific functionality for MQ operations

func NewModule

func NewModule() *Module

NewModule creates a new instance of Module with default configurations

func (*Module) LoadModule

func (m *Module) LoadModule() starlet.ModuleLoader

LoadModule returns the Starlark module loader with MQ-specific functions

type Queue

type Queue struct {
	// Basic queue information
	Name        string `json:"name"`
	ServiceType string `json:"service_type"` // Which service backs this queue
	URL         string `json:"url"`          // Service-specific queue URL/identifier

	// Queue statistics
	MessageCount int `json:"message_count"`

	// Queue configuration (unified across services)
	LockDuration    int `json:"lock_duration"`    // Message lock duration in seconds
	RetentionPeriod int `json:"retention_period"` // Message retention in seconds

	// Dead letter queue configuration
	MaxDeliveryCount int               `json:"max_delivery_count"`
	DeadLetterConfig *DeadLetterConfig `json:"dead_letter_config"`

	// Message ordering and deduplication
	EnableSessions     bool                `json:"enable_sessions"`     // Ordered processing support
	DuplicateDetection *DuplicateDetection `json:"duplicate_detection"` // Deduplication configuration

	// Queue limits
	MaxQueueSize int64 `json:"max_queue_size"` // Queue size in bytes (-1 for unlimited)

	// Timestamps
	CreatedTime  time.Time `json:"created_time"`
	ModifiedTime time.Time `json:"modified_time"`
}

Queue represents a message queue with unified properties across services

func NewQueue

func NewQueue(name, serviceType string) *Queue

NewQueue creates a new Queue with default values

func (*Queue) Copy

func (q *Queue) Copy() *Queue

Copy creates a copy of the Queue

func (*Queue) FromStarlark

func (q *Queue) FromStarlark(val starlark.Value) error

FromStarlark populates Queue from a Starlark value

func (*Queue) GetLockDuration

func (q *Queue) GetLockDuration() time.Duration

GetLockDuration returns the lock duration as a time.Duration

func (*Queue) GetRetentionPeriod

func (q *Queue) GetRetentionPeriod() time.Duration

GetRetentionPeriod returns the retention period as a time.Duration

func (*Queue) IsDeadLetterEnabled

func (q *Queue) IsDeadLetterEnabled() bool

IsDeadLetterEnabled checks if dead letter queue is enabled

func (*Queue) IsDuplicateDetectionEnabled

func (q *Queue) IsDuplicateDetectionEnabled() bool

IsDuplicateDetectionEnabled checks if duplicate detection is enabled

func (*Queue) Struct

func (q *Queue) Struct() (starlark.Value, error)

Struct converts Queue to a Starlark dict value for compatibility

type QueueOptions

type QueueOptions struct {
	// Lock duration for messages (unified visibility timeout/lock duration)
	LockDuration int // Message lock duration in seconds

	// Message retention settings
	RetentionPeriod int // Message retention period in seconds

	// Dead letter queue configuration
	MaxDeliveryCount int               // Maximum delivery attempts before moving to DLQ
	DeadLetterConfig *DeadLetterConfig // Dead letter queue configuration

	// Message ordering and deduplication
	EnableSessions      bool // Enable sessions for message ordering (Azure) or FIFO (AWS)
	DuplicateDetection  bool // Enable duplicate message detection
	DuplicateWindowSecs int  // Duplicate detection window in seconds

	// Queue size limits
	MaxQueueSize int64 // Maximum queue size in bytes (-1 for unlimited)
}

QueueOptions contains options for creating or configuring a queue

type ReceiveOptions

type ReceiveOptions struct {
	// Receive behavior
	MaxCount int  // Maximum number of messages to receive
	WaitTime int  // Long polling wait time in seconds
	PeekOnly bool // Peek messages without receiving them

	// Message lock settings
	LockDuration *int // Override default lock duration
}

ReceiveOptions contains options for receiving messages
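
LockDuration is declared as *int, which lets a caller distinguish "not set" (nil) from an explicit value. A minimal sketch of that pattern follows; receiveOptions is a local stand-in, and intPtr and effectiveLock are hypothetical helpers. How the package itself resolves the override, and whether zero is an accepted value, is not documented here.

```go
package main

import "fmt"

// receiveOptions is a local stand-in mirroring the documented ReceiveOptions fields.
type receiveOptions struct {
	MaxCount     int
	WaitTime     int
	PeekOnly     bool
	LockDuration *int
}

// intPtr returns a pointer to v, which is convenient for optional fields.
func intPtr(v int) *int { return &v }

// effectiveLock returns the per-call override when set, otherwise def.
func effectiveLock(o receiveOptions, def int) int {
	if o.LockDuration != nil {
		return *o.LockDuration
	}
	return def
}

func main() {
	defaults := receiveOptions{MaxCount: 5, WaitTime: 10}
	override := receiveOptions{MaxCount: 5, WaitTime: 10, LockDuration: intPtr(120)}
	// 30 matches the documented MQ_DEFAULT_LOCK_DURATION default.
	fmt.Println(effectiveLock(defaults, 30), effectiveLock(override, 30))
}
```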
