subscriber

v0.7.7
Published: May 6, 2026 License: MIT Imports: 45 Imported by: 0

README

mqsubscriber

A daemon that subscribes to message queues (SAKURA Cloud SimpleMQ or RabbitMQ), dispatches messages to external commands based on header matching, and publishes results back to a response queue.

Designed to work with mqbridge for bridging on-premises RabbitMQ messaging to the cloud.

Architecture

SimpleMQ backend (poll-based):

┌────────────────────────────────────┐
│            SimpleMQ                │
│  [request queue]  [response queue] │
└────────┬──────────────────▲────────┘
         │                  │
         ▼                  │
┌────────────────────────────────────┐
│          mqsubscriber              │
│  poll → match → execute → publish  │
└────────────────┬───────────────────┘
                 │
                 ▼
          ┌──────────────┐
          │   command    │
          │  stdin: body │
          │  env: headers│
          │  stdout: resp│
          └──────────────┘

RabbitMQ backend (push-based):

┌────────────────────────────────────┐
│            RabbitMQ                │
│  [request queue]  [response queue] │
└────────┬──────────────────▲────────┘
         │ consume          │ publish
         ▼                  │
┌────────────────────────────────────┐
│          mqsubscriber              │
│  recv → match → execute → publish  │
└────────────────┬───────────────────┘
                 │
                 ▼
          ┌──────────────┐
          │   command    │
          │  stdin: body │
          │  env: headers│
          │  stdout: resp│
          └──────────────┘

With mqbridge (SimpleMQ + RabbitMQ bridging):

           On-premises                Cloud (SAKURA Cloud)
        ┌──────────────┐
        │   RabbitMQ   │
        └──────┬───────┘
  Request      │  ▲      Response
               ▼  │
        ┌──────────────┐
        │   mqbridge   │
        └──────┬───────┘
               │  ▲
    ═══════════╪══╪═══════════════════════════════
               │  │
               ▼  │
        ┌──────────────────────────────┐
        │  SimpleMQ + mqsubscriber     │
        └──────────────────────────────┘

Message Format

SimpleMQ backend

mqsubscriber uses the same wire format as mqbridge (mqbridge.Message). Messages on SimpleMQ are base64-encoded JSON with the following structure:

{
  "headers": {
    "rabbitmq.exchange": "my-exchange",
    "rabbitmq.routing_key": "my.routing.key",
    "rabbitmq.header.x-custom": "value"
  },
  "body": "message body text",
  "body_encoding": "base64"
}
  • headers: Key-value metadata. When originating from RabbitMQ via mqbridge, headers are prefixed with rabbitmq. (e.g., rabbitmq.exchange, rabbitmq.routing_key, rabbitmq.correlation_id, rabbitmq.header.* for custom AMQP headers)
  • body: The message payload. Plain string if valid UTF-8, or base64-encoded for binary data
  • body_encoding: Set to "base64" when the body is base64-encoded (binary-safe). Omitted for plain text
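
The envelope described above can be sketched in Go as follows. This is a minimal illustration, not the mqbridge implementation: `wireMessage`, `encodeWire`, and `decodeWire` are hypothetical names, and the use of the standard base64 alphabet is an assumption.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"unicode/utf8"
)

// wireMessage mirrors the JSON structure shown above. It is a hypothetical
// stand-in for illustration, not the actual mqbridge.Message type.
type wireMessage struct {
	Headers      map[string]string `json:"headers"`
	Body         string            `json:"body"`
	BodyEncoding string            `json:"body_encoding,omitempty"`
}

// encodeWire builds the base64-encoded JSON envelope. The body stays a plain
// string when it is valid UTF-8 and is base64-encoded otherwise.
func encodeWire(headers map[string]string, body []byte) (string, error) {
	m := wireMessage{Headers: headers, Body: string(body)}
	if !utf8.Valid(body) {
		m.Body = base64.StdEncoding.EncodeToString(body)
		m.BodyEncoding = "base64"
	}
	data, err := json.Marshal(m)
	if err != nil {
		return "", fmt.Errorf("marshal envelope: %w", err)
	}
	return base64.StdEncoding.EncodeToString(data), nil
}

// decodeWire reverses encodeWire and returns the headers and raw body bytes.
func decodeWire(raw string) (map[string]string, []byte, error) {
	data, err := base64.StdEncoding.DecodeString(raw)
	if err != nil {
		return nil, nil, fmt.Errorf("decode envelope: %w", err)
	}
	var m wireMessage
	if err := json.Unmarshal(data, &m); err != nil {
		return nil, nil, fmt.Errorf("parse envelope: %w", err)
	}
	if m.BodyEncoding == "base64" {
		body, err := base64.StdEncoding.DecodeString(m.Body)
		if err != nil {
			return nil, nil, fmt.Errorf("decode body: %w", err)
		}
		return m.Headers, body, nil
	}
	return m.Headers, []byte(m.Body), nil
}

func main() {
	// A binary body forces the body_encoding: "base64" path.
	raw, err := encodeWire(map[string]string{"rabbitmq.routing_key": "my.routing.key"}, []byte{0xff, 0x00, 0x01})
	if err != nil {
		panic(err)
	}
	headers, body, err := decodeWire(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(headers["rabbitmq.routing_key"], body)
}
```
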
RabbitMQ backend

Messages are native AMQP deliveries. AMQP metadata (exchange, routing key, reply-to, etc.) and custom headers are mapped to rabbitmq.* headers internally, providing the same handler interface regardless of backend.

Message flow detail
  1. Receive: The backend delivers a message → mqsubscriber parses it into headers + body
  2. Dispatch: The headers are used for handler matching (e.g., match on rabbitmq.routing_key)
  3. Execute: body is passed to the command's stdin. headers are available as MQ_HEADER_* environment variables
  4. Respond: Command stdout becomes the new body. If rabbitmq.reply_to is present, the response is routed to the reply queue via the default exchange (RPC pattern). Otherwise, the response is sent to the configured response queue
  5. Publish: The response is published via the same backend

Installation

Homebrew
brew install fujiwara/tap/mqsubscriber
Binary releases

Download the latest binary from GitHub Releases.

Go install
go install github.com/fujiwara/mqsubscriber/cmd/mqsubscriber@latest

Usage

# Run the subscriber daemon
mqsubscriber run -c config.jsonnet

# Validate configuration
mqsubscriber validate -c config.jsonnet

# Render configuration as JSON
mqsubscriber render -c config.jsonnet

# Publish a message to the request queue
mqsubscriber publish -c config.jsonnet --body 'hello' -H rabbitmq.routing_key=upper
Options
  • -c, --config (required): Config file path (Jsonnet/JSON). Env: MQSUBSCRIBER_CONFIG
  • -e, --envfile: Environment file to load. Variables defined in this file are exported to the process environment before config evaluation. Env: MQSUBSCRIBER_ENVFILE
  • --log-format: Log format (text or json, default: text). Env: MQSUBSCRIBER_LOG_FORMAT
  • --log-level: Log level (debug, info, warn, error, default: info). Env: MQSUBSCRIBER_LOG_LEVEL

Configuration

Configuration is written in Jsonnet (plain JSON is also supported). Jsonnet evaluation is powered by jsonnet-armed, which provides built-in functions for environment variables, hashing, and more. See the jsonnet-armed README for the full list of available functions.

Only one of simplemq or rabbitmq can be configured per process.

SimpleMQ backend
{
  simplemq: {
    api_url: "",  // optional, uses default SimpleMQ API URL
    timeout: "30s",  // optional, default: 30s — timeout for queue operations
  },
  request: {
    queue: "request-queue",
    api_key: must_env("REQUEST_API_KEY"),
    polling_interval: "1s",  // optional, default: 1s
  },
  // response queue is optional — required only when any handler has response: true
  response: {
    queue: "response-queue",
    api_key: must_env("RESPONSE_API_KEY"),
  },
  drop_unmatched: false,  // optional, default: false — when true, ack (delete) messages that match no handler
  handlers: [
    // ... (see Handler Configuration below)
  ],
}
RabbitMQ backend
{
  rabbitmq: {
    url: must_env("AMQP_URL"),  // e.g. "amqp://user:pass@host:5672/"
    timeout: "30s",  // optional, default: 30s — timeout for dial/publish/ack/nack
  },
  request: {
    queue: "request-queue",
    exchange: "my-exchange",       // optional
    exchange_type: "direct",       // optional, default: "direct"
    routing_key: ["deploy", "notify"],  // optional, default: ["#"]
    exchange_passive: false,       // optional, default: false
  },
  // response is optional — required only when any handler has response: true
  response: {
    queue: "response-queue",
    exchange: "",          // optional, default: "" (default exchange)
    routing_key: "",       // optional, default: response queue name
    reply_to: false,       // optional, default: false (see below)
  },
  drop_unmatched: false,  // optional, default: false — when true, ack (delete) messages that match no handler
  handlers: [
    // ... (see Handler Configuration below)
  ],
}
Handler Configuration
{
  handlers: [
    {
      name: "deploy",
      match: {
        "rabbitmq.routing_key": "deploy",
        "rabbitmq.header.x-env": "production",
      },
      command: ["/usr/local/bin/deploy.sh"],
      env: {              // optional: custom environment variables for this handler
        "DEPLOY_TARGET": "production",
      },
      timeout: "60s",     // optional, default: 30s
      blocking: true,     // wait for completion before processing next message
      response: true,     // send response back (requires response queue or reply_to)
      response_ignore: {  // optional: suppress response for specific exit code
        exit_code: 99,    // if command exits with 99, no response is sent
      },
    },
    {
      name: "notify",
      match: {
        "rabbitmq.routing_key": "notify",
      },
      command: ["/usr/local/bin/notify.sh"],
      timeout: "10s",
      blocking: false,       // run in background goroutine
      max_concurrency: 5,    // max concurrent executions (default: 1)
      reject_on_full: false, // optional: when true, reject (no wait) at max_concurrency — disposition follows drop_unmatched
      // response defaults to false (fire-and-forget)
      log_message: "processing notification",  // optional: custom log message per handler
      log_header_fields: ["rabbitmq.routing_key"],  // optional: header keys to include in log
      log_body_fields: ["notification_id", "channel"],  // optional: JSON body fields to include in log
    },
  ],
}
Custom Handler Logging

Each handler can emit a custom log message when it starts processing a message, with selected fields extracted from message headers and/or the JSON body.

  • log_message (string): Custom message to log at Info level. If not set, no custom log is emitted.
  • log_header_fields ([]string): List of message header keys to include in the log as header.<key> attributes. Missing headers are silently skipped.
  • log_body_fields ([]string): List of top-level JSON field names to extract from the message body and include in the log as body.<field> attributes. Only parsed when log_body_fields is set. If the body is not valid JSON, a warning is logged. Missing fields are silently skipped.

Example log output:

INFO processing notification  handler=notify message_id=abc123 header.rabbitmq.routing_key=notify body.notification_id=N-001 body.channel=slack
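
The field extraction described above might look roughly like this in Go. It is a sketch under stated assumptions: `bodyAttrs` and `headerAttrs` are hypothetical helpers, the attributes are rendered as plain `key=value` strings rather than structured log attributes, and a body that is valid JSON but not an object is treated as an error.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// headerAttrs returns "header.<key>=<value>" for each requested header.
// Missing headers are silently skipped.
func headerAttrs(headers map[string]string, keys []string) []string {
	attrs := make([]string, 0, len(keys))
	for _, k := range keys {
		if v, ok := headers[k]; ok {
			attrs = append(attrs, fmt.Sprintf("header.%s=%s", k, v))
		}
	}
	return attrs
}

// bodyAttrs extracts the named top-level fields from a JSON body and formats
// them as "body.<field>=<value>". Missing fields are skipped. An error is
// returned when the body cannot be parsed, so the caller can log a warning.
func bodyAttrs(body []byte, fields []string) ([]string, error) {
	if len(fields) == 0 {
		return nil, nil // the body is only parsed when fields are requested
	}
	var obj map[string]any
	if err := json.Unmarshal(body, &obj); err != nil {
		return nil, fmt.Errorf("body is not a JSON object: %w", err)
	}
	attrs := make([]string, 0, len(fields))
	for _, f := range fields {
		if v, ok := obj[f]; ok {
			attrs = append(attrs, fmt.Sprintf("body.%s=%v", f, v))
		}
	}
	return attrs, nil
}

func main() {
	headers := map[string]string{"rabbitmq.routing_key": "notify"}
	body := []byte(`{"notification_id":"N-001","channel":"slack"}`)

	attrs := headerAttrs(headers, []string{"rabbitmq.routing_key"})
	fromBody, err := bodyAttrs(body, []string{"notification_id", "channel"})
	if err != nil {
		fmt.Println("warning:", err)
	}
	fmt.Println(append(attrs, fromBody...))
}
```
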
Handler Matching
  • match defines header key-value pairs that must all match (AND condition)
  • By default, values must match exactly
  • Set match_pattern: true to enable AMQP topic-style pattern matching on all match values:
    • * matches exactly one dot-delimited word (e.g., order.* matches order.created but not order.created.v2)
    • # matches zero or more dot-delimited words (e.g., order.# matches order, order.created, and order.created.v2)
    • Values without * or # still match exactly, so you can mix patterns and literal values
  • Handlers are evaluated in order; the first match wins
  • Messages that match no handler are nacked by default (SimpleMQ: redelivered after visibility timeout; RabbitMQ: nack without requeue, routed to dead-letter exchange if configured). Set drop_unmatched: true to ack (delete) them instead

Example with pattern matching:

{
  handlers: [
    {
      name: "order-handler",
      match: {
        "rabbitmq.routing_key": "order.*",   // matches order.created, order.updated, etc.
        "rabbitmq.header.x-env": "production",  // exact match
      },
      match_pattern: true,
      command: ["/usr/local/bin/handle-order.sh"],
    },
  ],
}
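
The matching rules above (AND across keys, `*` for exactly one word, `#` for zero or more words) can be sketched in Go as below. `topicMatch` and `handlerMatches` are hypothetical names for illustration; the package's own Handler.Match may be implemented differently.

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatch reports whether key matches an AMQP topic-style pattern.
// Both are split into dot-delimited words before comparison.
func topicMatch(pattern, key string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(key, "."))
}

func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// "#" may consume zero or more words: try every possible split.
		for i := 0; i <= len(k); i++ {
			if matchWords(p[1:], k[i:]) {
				return true
			}
		}
		return false
	case "*":
		// "*" consumes exactly one word.
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

// handlerMatches applies the AND condition: every key in match must be
// present in headers and its value must match (exactly, or by pattern).
func handlerMatches(match, headers map[string]string, pattern bool) bool {
	for key, want := range match {
		got, ok := headers[key]
		if !ok {
			return false
		}
		if pattern {
			if !topicMatch(want, got) {
				return false
			}
		} else if want != got {
			return false
		}
	}
	return true
}

func main() {
	match := map[string]string{
		"rabbitmq.routing_key":  "order.*",
		"rabbitmq.header.x-env": "production",
	}
	headers := map[string]string{
		"rabbitmq.routing_key":  "order.created",
		"rabbitmq.header.x-env": "production",
	}
	fmt.Println(handlerMatches(match, headers, true))
}
```
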
Blocking vs Non-blocking
  • blocking: true — The subscriber waits for the command to complete before processing the next message
  • blocking: false — The command runs in a goroutine. The subscriber immediately proceeds to the next message. When max_concurrency is reached, the subscriber blocks until a slot is available
  • reject_on_full: true (non-blocking only) — Instead of waiting when max_concurrency is reached, the message is immediately rejected without invoking the command. Disposition follows the top-level drop_unmatched: drop_unmatched: true acks (drops) the message; drop_unmatched: false nacks it (SimpleMQ: redelivered after visibility timeout; RabbitMQ: nack without requeue). Useful when waiting would build unwanted backpressure on the receive loop
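
One common way to get this behavior in Go is a buffered channel used as a semaphore. The sketch below assumes that approach; `slots`, `errSlotFull`, and `disposition` are hypothetical names, and the package's actual Acquire/Release internals are not shown in this document.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errSlotFull plays the role of the package's ErrSlotFull in this sketch.
var errSlotFull = errors.New("handler at max_concurrency")

// slots limits concurrent executions with a buffered channel.
type slots struct {
	sem          chan struct{}
	rejectOnFull bool
}

func newSlots(maxConcurrency int, rejectOnFull bool) *slots {
	return &slots{sem: make(chan struct{}, maxConcurrency), rejectOnFull: rejectOnFull}
}

// tryAcquire takes a slot only if one is free right now.
func (s *slots) tryAcquire() bool {
	select {
	case s.sem <- struct{}{}:
		return true
	default:
		return false
	}
}

// acquire blocks until a slot is free, unless rejectOnFull is set, in which
// case it returns errSlotFull immediately when no slot is available.
func (s *slots) acquire(ctx context.Context) error {
	if s.rejectOnFull {
		if s.tryAcquire() {
			return nil
		}
		return errSlotFull
	}
	select {
	case s.sem <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// release frees a slot previously taken by acquire or tryAcquire.
func (s *slots) release() { <-s.sem }

// disposition maps a rejected message to "ack" or "nack", following the
// top-level drop_unmatched setting.
func disposition(dropUnmatched bool) string {
	if dropUnmatched {
		return "ack"
	}
	return "nack"
}

func main() {
	ctx := context.Background()
	s := newSlots(1, true)
	fmt.Println(s.acquire(ctx)) // the single slot is free
	fmt.Println(s.acquire(ctx)) // full: rejected instead of blocking
	fmt.Println(disposition(false))
}
```
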
Command Execution
  • Message body is passed via stdin
  • Environment variables are inherited from the mqsubscriber process, with the following additions (later entries override earlier ones):
    1. Parent process environment — all environment variables from the mqsubscriber process
    2. Handler env — per-handler custom environment variables (see Handler Configuration)
    3. Message headers — passed as MQ_HEADER_* variables (dots and hyphens are converted to underscores, uppercased). e.g., rabbitmq.routing_key → MQ_HEADER_RABBITMQ_ROUTING_KEY
  • Command stdout becomes the response message body
  • Command stderr is logged
  • If the command times out, SIGTERM is sent to the entire process group (including child processes) to allow graceful shutdown. If the process does not exit within 30 seconds, it is forcibly killed (SIGKILL)
  • If the command fails (non-zero exit) and response is disabled, the message is nacked (SimpleMQ: not deleted, redelivered after visibility timeout; RabbitMQ: nack without requeue, routed to dead-letter exchange if configured). If circuit_breaker is configured, the message is dropped (acked) after reaching the error threshold (see Circuit Breaker)
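
The environment layering and header name conversion described above can be sketched like this. `headerEnvName` and `buildEnv` are hypothetical helpers; the sketch relies on the documented os/exec behavior that the last value wins when Cmd.Env contains duplicate keys.

```go
package main

import (
	"fmt"
	"strings"
)

// headerEnvName converts a message header key to its environment variable
// name: dots and hyphens become underscores, the result is uppercased and
// prefixed with MQ_HEADER_.
func headerEnvName(key string) string {
	r := strings.NewReplacer(".", "_", "-", "_")
	return "MQ_HEADER_" + strings.ToUpper(r.Replace(key))
}

// buildEnv layers the three sources in order. Later entries override earlier
// ones because os/exec uses the last value for a duplicated key in Cmd.Env.
func buildEnv(parent []string, handlerEnv, headers map[string]string) []string {
	env := append([]string{}, parent...)
	for k, v := range handlerEnv {
		env = append(env, k+"="+v)
	}
	for k, v := range headers {
		env = append(env, headerEnvName(k)+"="+v)
	}
	return env
}

func main() {
	env := buildEnv(
		[]string{"PATH=/usr/bin"},
		map[string]string{"DEPLOY_TARGET": "production"},
		map[string]string{"rabbitmq.routing_key": "deploy"},
	)
	for _, kv := range env {
		fmt.Println(kv)
	}
}
```
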
Circuit Breaker

When a fire-and-forget handler (response: false) encounters repeated errors for the same message, the message is normally nacked and redelivered indefinitely. The circuit_breaker option prevents this by dropping (acking) the message after a configured number of failures.

{
  name: "process-task",
  match: { "type": "task" },
  command: ["./process.sh"],
  circuit_breaker: {
    max_errors: 5,    // drop the message after 5 failures
    ttl: "10m",       // reset error count after 10 minutes (default: 10m)
  },
}
Field       Type    Default     Description
max_errors  int     (required)  Number of errors before dropping the message
ttl         string  "10m"       Duration after which the error count resets

Message identity across redeliveries:

  • SimpleMQ: Uses the message ID (stable across redeliveries)
  • RabbitMQ: Uses the rabbitmq.message_id header (mapped from AMQP MessageId property). Publishers must set MessageId for the circuit breaker to work; without it, each redelivery gets a new delivery tag and the circuit breaker cannot track the message

Error counts are stored in-memory per handler (up to 1024 tracked messages, LRU eviction). Counts reset on process restart.

circuit_breaker cannot be used with response: true (response mode already acks on error).
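
The counting behavior can be illustrated with a simplified Go sketch. It is not the package's CircuitBreaker: `breaker` is a hypothetical type that uses a plain map with a mutex instead of an expiring LRU, has no 1024-entry cap, and assumes the TTL window starts at the first recorded failure.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type entry struct {
	count   int
	expires time.Time
}

// breaker counts errors per message key and signals a drop at maxErrors.
type breaker struct {
	mu        sync.Mutex
	maxErrors int
	ttl       time.Duration
	entries   map[string]entry
}

// newBreaker parses ttl in the same string form used in the config ("10m").
func newBreaker(maxErrors int, ttl string) (*breaker, error) {
	d, err := time.ParseDuration(ttl)
	if err != nil {
		return nil, fmt.Errorf("invalid ttl %q: %w", ttl, err)
	}
	if maxErrors < 1 {
		return nil, fmt.Errorf("max_errors must be at least 1, got %d", maxErrors)
	}
	return &breaker{maxErrors: maxErrors, ttl: d, entries: map[string]entry{}}, nil
}

// recordError increments the count for key and returns true once the
// threshold is reached, meaning the message should be dropped (acked).
func (b *breaker) recordError(key string) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	now := time.Now()
	e := b.entries[key]
	if now.After(e.expires) {
		e = entry{expires: now.Add(b.ttl)} // first failure or expired window
	}
	e.count++
	if e.count >= b.maxErrors {
		delete(b.entries, key)
		return true
	}
	b.entries[key] = e
	return false
}

// clear forgets the key, as would happen after a successful execution.
func (b *breaker) clear(key string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.entries, key)
}

func main() {
	b, err := newBreaker(5, "10m")
	if err != nil {
		panic(err)
	}
	for i := 1; i <= 5; i++ {
		fmt.Printf("failure %d: drop=%v\n", i, b.recordError("msg-1"))
	}
}
```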

Response Publishing

Response messages are published with retry (3 attempts, exponential backoff: 1s then 2s between attempts, capped at 4s). If all retries are exhausted, the request message is still acknowledged to prevent command re-execution on redelivery.
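
A minimal Go sketch of this retry policy follows. `retrier` and its fields are hypothetical names; the sleep function is injectable only so the example can run without waiting, and a real implementation would presumably also stop on context cancellation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTransient = errors.New("transient publish failure")

// retrier describes the policy: a number of attempts and a doubling delay
// between them, capped at maxDelay.
type retrier struct {
	attempts int
	base     time.Duration
	maxDelay time.Duration
	sleep    func(time.Duration)
}

// defaultRetrier uses the values stated above: 3 attempts, 1s base, 4s cap.
func defaultRetrier() retrier {
	return retrier{attempts: 3, base: time.Second, maxDelay: 4 * time.Second, sleep: time.Sleep}
}

// delays lists the waits between attempts: base, 2*base, ... up to maxDelay.
func (r retrier) delays() []time.Duration {
	var out []time.Duration
	d := r.base
	for i := 1; i < r.attempts; i++ {
		out = append(out, d)
		d *= 2
		if d > r.maxDelay {
			d = r.maxDelay
		}
	}
	return out
}

// do runs op until it succeeds or the attempts are exhausted.
func (r retrier) do(op func() error) error {
	waits := r.delays()
	var err error
	for i := 0; i < r.attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		if i < len(waits) {
			r.sleep(waits[i])
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", r.attempts, err)
}

func main() {
	r := defaultRetrier()
	r.sleep = func(time.Duration) {} // skip real waiting in this demo
	calls := 0
	err := r.do(func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println(calls, err, r.delays())
}
```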

Response Chain Guard

When a response message is published, mqsubscriber sets the mqsubscriber.responded header with an incrementing counter (starting at 1). On receiving a message, if this counter reaches the configured max_response_chain limit, the message is logged as a warning and dropped (acked without processing). This prevents infinite loops when a response is accidentally routed back to the request queue.

The max_response_chain setting is a top-level config option:

{
  // ...
  max_response_chain: 0,  // default: 0 (no chaining allowed)
  handlers: [ /* ... */ ],
}
Value        Behavior
0 (default)  Responses that arrive back as requests are dropped
1            One chain hop is allowed (response → re-process → drop)
N            Up to N chain hops are allowed
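
The guard can be sketched in Go as below. The comparison is an interpretation of the table above (a message is dropped once its counter exceeds max_response_chain), not a confirmed detail of the implementation; `chainCount`, `shouldDrop`, and `markResponse` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strconv"
)

const headerResponseChain = "mqsubscriber.responded"

// chainCount reads the counter from the headers. A missing or malformed
// value is treated as 0 (a fresh request) in this sketch.
func chainCount(headers map[string]string) int {
	n, err := strconv.Atoi(headers[headerResponseChain])
	if err != nil || n < 0 {
		return 0
	}
	return n
}

// shouldDrop reports whether an incoming message has used up its hops.
// Assumption: drop when the counter is greater than maxResponseChain.
func shouldDrop(headers map[string]string, maxResponseChain int) bool {
	return chainCount(headers) > maxResponseChain
}

// markResponse sets the counter on a response to the request's value plus one.
func markResponse(request, response map[string]string) {
	response[headerResponseChain] = strconv.Itoa(chainCount(request) + 1)
}

func main() {
	request := map[string]string{}
	response := map[string]string{}
	markResponse(request, response)

	fmt.Println(response[headerResponseChain])
	fmt.Println(shouldDrop(request, 0), shouldDrop(response, 0), shouldDrop(response, 1))
}
```
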
Response Status Headers

Response messages include status headers to indicate success or failure:

Header       Description
x-status     success or error
x-exit-code  Exit code (only set on error, e.g. 1)

When the message originates from RabbitMQ (has rabbitmq.exchange header), these headers use the rabbitmq.header. prefix (e.g. rabbitmq.header.x-status) so they are mapped to AMQP headers.

Error handling by response setting:

  • response: true: On command failure, an error response is sent with x-status: error, the last 4KB of stderr as the body, and the message is acknowledged. This ensures the caller is not left waiting indefinitely.
  • response: false (default): On command failure, no response is sent and the message is nacked for redelivery. No response queue is needed.
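
As a rough Go illustration of the status headers and the error body, consider the sketch below. `statusHeaders` and `stderrTail` are hypothetical helpers, and reading "4KB" as 4096 bytes is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
)

const stderrLimit = 4096 // assumption: "4KB" means 4096 bytes

// statusHeaders builds the status headers for a response. When the request
// carries a rabbitmq.exchange header (even an empty one), the keys get the
// rabbitmq.header. prefix so they map to AMQP headers.
func statusHeaders(request map[string]string, exitCode int) map[string]string {
	prefix := ""
	if _, ok := request["rabbitmq.exchange"]; ok {
		prefix = "rabbitmq.header."
	}
	h := map[string]string{}
	if exitCode == 0 {
		h[prefix+"x-status"] = "success"
		return h
	}
	h[prefix+"x-status"] = "error"
	h[prefix+"x-exit-code"] = strconv.Itoa(exitCode)
	return h
}

// stderrTail returns at most the last stderrLimit bytes of stderr, used as
// the body of an error response.
func stderrTail(stderr []byte) []byte {
	if len(stderr) <= stderrLimit {
		return stderr
	}
	return stderr[len(stderr)-stderrLimit:]
}

func main() {
	fromRabbit := map[string]string{"rabbitmq.exchange": "my-exchange"}
	fmt.Println(statusHeaders(fromRabbit, 1))
	fmt.Println(statusHeaders(map[string]string{}, 0))
	fmt.Println(len(stderrTail(make([]byte, 10000))))
}
```
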
Suppressing Response (response_ignore)

When response: true is set, you can selectively suppress the response based on the command's exit code using response_ignore. This is useful when running multiple subscriber instances where only one should respond (e.g., the command acquires a lock and only the winner responds).

{
  name: "my-handler",
  response: true,
  response_ignore: {
    exit_code: 99,  // suppress response when command exits with 99
  },
  // ...
}

When the command exits with the specified exit_code:

  • No response message is published
  • The request message is acknowledged (the command already ran)
  • The event is logged at Info level

For any other exit code, the normal behavior applies (success response for exit 0, error response for other non-zero codes).
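
The decision can be summarized in a few lines of Go. This is a sketch only: `action` and `decide` are hypothetical names, and a nil pointer stands for a handler without response_ignore.

```go
package main

import "fmt"

// action names what happens after a handler with response: true finishes.
// In all three cases the request message is acknowledged.
type action string

const (
	sendSuccess action = "success response"
	sendError   action = "error response"
	suppress    action = "no response"
)

// decide picks the action from the command's exit code. ignoreExitCode is
// nil when response_ignore is not configured.
func decide(exitCode int, ignoreExitCode *int) action {
	switch {
	case ignoreExitCode != nil && exitCode == *ignoreExitCode:
		return suppress
	case exitCode == 0:
		return sendSuccess
	default:
		return sendError
	}
}

func main() {
	ignore := 99
	for _, code := range []int{0, 1, 99} {
		fmt.Printf("exit %d: %s\n", code, decide(code, &ignore))
	}
}
```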

Response reply_to Mode (RabbitMQ)

When using the RabbitMQ backend with the RPC pattern, you can set response.reply_to: true instead of specifying a response.queue. In this mode, responses are routed exclusively via the rabbitmq.reply_to header from each incoming message — no static response queue is needed.

{
  rabbitmq: { url: must_env("AMQP_URL") },
  request: { queue: "request-queue" },
  response: { reply_to: true },
  handlers: [
    { name: "rpc", match: { /* ... */ }, command: ["./handler.sh"], response: true },
  ],
}

response.reply_to and response.queue cannot both be set — they are mutually exclusive.

RPC Response Routing

When a message contains a rabbitmq.reply_to header (set by RabbitMQ RPC clients), the response is automatically routed to the reply queue:

  • rabbitmq.exchange is set to "" (default exchange)
  • rabbitmq.routing_key is set to the rabbitmq.reply_to value
  • rabbitmq.reply_to is removed from the response headers
  • rabbitmq.correlation_id is preserved as-is

This enables the standard RabbitMQ RPC pattern: the response is delivered directly to the caller's exclusive reply queue via the default exchange.

If rabbitmq.reply_to is not present, the request routing headers (rabbitmq.exchange, rabbitmq.routing_key) are removed from the response so that the publisher uses the configured response queue instead.
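
The header rewriting described in this section can be sketched in Go as follows. `routeResponse` is a hypothetical helper, and starting the response headers as a copy of the request headers is an assumption made for the illustration.

```go
package main

import "fmt"

// routeResponse derives response headers from request headers.
// With rabbitmq.reply_to present, the response targets the reply queue via
// the default exchange. Otherwise the request routing headers are removed so
// the publisher falls back to the configured response queue.
func routeResponse(request map[string]string) map[string]string {
	resp := make(map[string]string, len(request))
	for k, v := range request {
		resp[k] = v
	}
	if replyTo := resp["rabbitmq.reply_to"]; replyTo != "" {
		resp["rabbitmq.exchange"] = "" // default exchange
		resp["rabbitmq.routing_key"] = replyTo
		delete(resp, "rabbitmq.reply_to")
		return resp // rabbitmq.correlation_id is left untouched
	}
	delete(resp, "rabbitmq.reply_to")
	delete(resp, "rabbitmq.exchange")
	delete(resp, "rabbitmq.routing_key")
	return resp
}

func main() {
	rpc := map[string]string{
		"rabbitmq.exchange":       "my-exchange",
		"rabbitmq.routing_key":    "deploy",
		"rabbitmq.reply_to":       "amq.gen-reply",
		"rabbitmq.correlation_id": "abc123",
	}
	fmt.Println(routeResponse(rpc))
}
```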

Jsonnet Built-in Functions

The following functions are available in config files via jsonnet-armed:

  • must_env("VAR") — Read environment variable (error if not set)
  • env("VAR", "default") — Read environment variable with default
  • secret("vault-id", "name") — Read from SAKURA Cloud Secret Manager
  • sha256(str), md5(str) — Hash functions
  • See jsonnet-armed README for more

Observability

OpenTelemetry metrics and traces are automatically enabled when OTEL_EXPORTER_OTLP_ENDPOINT is set.

Traces

Distributed tracing is supported via W3C Trace Context propagation through message headers.

Trace context propagation:

  • On receive: extracts traceparent/tracestate from message headers (falls back to rabbitmq.header.traceparent for mqbridge compatibility)
  • On response: injects traceparent/tracestate into response message headers
  • On command execution: sets TRACEPARENT/TRACESTATE environment variables for child processes (W3C standard)
  • On publish subcommand: reads TRACEPARENT/TRACESTATE environment variables to continue the trace from the parent process
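
The receive-side fallback and the environment hand-off can be sketched in Go without any OpenTelemetry dependency. `traceValue` and `traceEnv` are hypothetical helpers; applying the rabbitmq.header. fallback to tracestate as well is an assumption, since the text above only names traceparent.

```go
package main

import (
	"fmt"
	"strings"
)

// traceValue looks up a W3C Trace Context header, falling back to the
// rabbitmq.header.-prefixed form used for messages bridged by mqbridge.
func traceValue(headers map[string]string, name string) string {
	if v := headers[name]; v != "" {
		return v
	}
	return headers["rabbitmq.header."+name]
}

// traceEnv returns TRACEPARENT/TRACESTATE entries for a child process.
func traceEnv(headers map[string]string) []string {
	var env []string
	for _, name := range []string{"traceparent", "tracestate"} {
		if v := traceValue(headers, name); v != "" {
			env = append(env, strings.ToUpper(name)+"="+v)
		}
	}
	return env
}

func main() {
	headers := map[string]string{
		"rabbitmq.header.traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
	}
	fmt.Println(traceEnv(headers))
}
```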

Spans:

Span                         Description             Key Attributes
mqsubscriber.handle_message  Per-message processing  handler, message_id, blocking, request.header.*
mqsubscriber.execute         Command execution       handler, command, command.timed_out, exit_code
mqsubscriber.publish         Response publish        queue, response.header.*
publish                      Publish subcommand      messaging.destination.name, messaging.message.body.size

Errors (command failure, publish failure) are recorded on spans with Error status.

Metrics
Metric                                Type       Description                                                                       Attributes
mqsubscriber.messages.received        Counter    Messages received from request queue                                              -
mqsubscriber.messages.processed       Counter    Messages successfully processed                                                   handler
mqsubscriber.messages.errors          Counter    Message processing errors                                                         handler
mqsubscriber.messages.dropped         Counter    Messages dropped/acked with no matching handler (drop_unmatched: true)            -
mqsubscriber.messages.unmatched       Counter    Messages nacked with no matching handler (drop_unmatched: false)                  -
mqsubscriber.messages.circuit_broken  Counter    Messages dropped by circuit breaker after repeated failures                       handler
mqsubscriber.messages.rejected        Counter    Messages rejected because handler reached max_concurrency (reject_on_full: true)  handler
mqsubscriber.command.duration         Histogram  Command execution duration (seconds)                                              handler
mqsubscriber.command.timeouts         Counter    Command execution timeouts                                                        handler
mqsubscriber.log.messages             Counter    Number of log messages by level                                                   level

Publish Subcommand

The publish subcommand sends a message to the request or response queue. This is useful for debugging handlers, testing configurations, and self-invoking commands.

mqsubscriber publish -c config.jsonnet [flags]
Flags
Flag                Description
-H key=value        Message header (repeatable)
--body <string>     Message body as a string
--body-file <path>  Read message body from a file
--request           Publish to the request queue (default)
--response          Publish to the response queue
(stdin)             If neither --body nor --body-file is given, body is read from stdin

--body and --body-file are mutually exclusive. --request and --response are mutually exclusive.

Retry

The publish subcommand opens a fresh connection to the backend on every invocation, so transient dial failures (TCP timeouts, intermediate network devices closing idle paths, etc.) are the common failure mode. Publishes are therefore attempted up to 3 times with exponential backoff (1s then 2s between attempts), the same policy used for response publishing. If all attempts fail, the command exits with a non-zero status. Interrupting the command (e.g. Ctrl+C) during the backoff aborts immediately.

Destination Routing

By default, messages are sent to the request queue. Use --response to send to the response queue instead.

RabbitMQ backend:

By default, messages are sent to request.queue via the default exchange. You can override the destination by setting headers — this uses the same routing logic as response publishing.

Headers provided                                     Destination
(none)                                               Default exchange → request.queue
-H rabbitmq.routing_key=KEY                          Default exchange → KEY
-H rabbitmq.exchange=EX -H rabbitmq.routing_key=KEY  Exchange EX → routing key KEY
Usage Examples
# Send to the request queue (default)
mqsubscriber publish -c config.jsonnet --body 'hello'

# Send to the response queue
mqsubscriber publish -c config.jsonnet --response --body 'hello'

# Send with routing key (matched by handler's match condition)
mqsubscriber publish -c config.jsonnet --body 'hello' \
  -H rabbitmq.routing_key=upper

# Send to a named exchange with a routing key
mqsubscriber publish -c config.jsonnet --body 'hello' \
  -H rabbitmq.exchange=my-exchange -H rabbitmq.routing_key=deploy

# Set custom headers (works for both SimpleMQ and RabbitMQ)
# For RabbitMQ, non-rabbitmq.* headers are sent as AMQP headers
# and received as rabbitmq.header.* (e.g. x-priority → rabbitmq.header.x-priority)
mqsubscriber publish -c config.jsonnet --body 'hello' \
  -H rabbitmq.routing_key=upper -H x-priority=high

# Pipe body from stdin
echo '{"action":"deploy"}' | mqsubscriber publish -c config.jsonnet \
  -H rabbitmq.routing_key=deploy

# Read body from file
mqsubscriber publish -c config.jsonnet \
  --body-file payload.json -H rabbitmq.routing_key=deploy

LICENSE

MIT

Author

fujiwara

Documentation

Constants

const (
	// DefaultCircuitBreakerMaxEntries is the maximum number of message keys tracked
	// by a single circuit breaker instance (LRU eviction when exceeded).
	DefaultCircuitBreakerMaxEntries = 1024

	// DefaultCircuitBreakerTTL is the default TTL for circuit breaker entries.
	DefaultCircuitBreakerTTL = 10 * time.Minute
)
const (
	// DefaultPollingInterval is the default interval for polling the request queue.
	DefaultPollingInterval = time.Second
	// DefaultCommandTimeout is the default timeout for command execution.
	DefaultCommandTimeout = 30 * time.Second
	// DefaultMaxConcurrency is the default max concurrency for non-blocking handlers.
	DefaultMaxConcurrency = 1
	// DefaultMaxResponseChain is the default maximum number of allowed response chain hops.
	// A value of 0 means responses that arrive back as requests are dropped (no chaining).
	DefaultMaxResponseChain = 0
	// DefaultQueueTimeout is the default timeout for queue operations (publish, ack, nack, receive).
	DefaultQueueTimeout = 30 * time.Second
)
const (
	BackendSimpleMQ = "simplemq"
	BackendRabbitMQ = "rabbitmq"
)

Backend type constants.

const HeaderResponseChain = "mqsubscriber.responded"

HeaderResponseChain is the header key used to track response chain depth. When a response is published, this value is incremented. On receive, if the value reaches max_response_chain, the message is dropped to prevent loops.

Variables

var ErrSlotFull = errors.New("handler at max_concurrency")

ErrSlotFull is returned by Acquire when the handler is at max_concurrency and reject_on_full is enabled. The caller must dispose of the message (ack or nack) instead of waiting.

var Version = "v0.7.7"

Functions

func RenderConfig

func RenderConfig(ctx context.Context, path string) ([]byte, error)

RenderConfig evaluates a Jsonnet config file and returns the resulting JSON.

func RenderConfigTo

func RenderConfigTo(ctx context.Context, path string, w io.Writer) error

RenderConfigTo evaluates a config file and writes pretty-printed JSON to w.

func Run

func Run(ctx context.Context) error

Run is the main entry point for the subscriber.

func RunCLI

func RunCLI(ctx context.Context) error

RunCLI parses command-line arguments and executes the appropriate subcommand.

Types

type App

type App struct {
	// contains filtered or unexported fields
}

App holds the application state.

func New

func New(ctx context.Context, cfg *Config) (*App, error)

New creates a new App from a config.

func (*App) Run

func (a *App) Run(ctx context.Context) error

Run starts the subscriber loop.

type CLI

type CLI struct {
	EnvFile   string           `kong:"short='e',env='MQSUBSCRIBER_ENVFILE',help='Environment file to load'" `
	Config    string           `kong:"required,short='c',env='MQSUBSCRIBER_CONFIG',help='Config file path (Jsonnet/JSON)'" `
	LogFormat string           `kong:"default='text',enum='text,json',env='MQSUBSCRIBER_LOG_FORMAT',help='Log format (text or json)'" `
	LogLevel  string           `kong:"default='info',enum='debug,info,warn,error',env='MQSUBSCRIBER_LOG_LEVEL',help='Log level (debug, info, warn, error)'" `
	Run       RunCmd           `cmd:"" default:"1" help:"Run the subscriber"`
	Validate  ValidateCmd      `cmd:"" help:"Validate config"`
	Render    RenderCmd        `cmd:"" help:"Render config as JSON to stdout"`
	Publish   PublishCmd       `cmd:"" help:"Publish a message to the request or response queue"`
	Version   kong.VersionFlag `help:"Show version"`
}

CLI defines the command-line interface.

type CircuitBreaker added in v0.7.3

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker tracks per-message error counts and drops messages after a configured threshold is reached. Thread-safe via expirable.LRU (internally synchronized).

func NewCircuitBreaker added in v0.7.3

func NewCircuitBreaker(threshold int, ttl time.Duration) *CircuitBreaker

NewCircuitBreaker creates a CircuitBreaker with the given error threshold and TTL.

func (*CircuitBreaker) Clear added in v0.7.3

func (cb *CircuitBreaker) Clear(key string)

Clear removes the error count for the given key (called on success).

func (*CircuitBreaker) RecordError added in v0.7.3

func (cb *CircuitBreaker) RecordError(key string) bool

RecordError increments the error count for the given key. Returns true if the threshold has been reached (message should be dropped).

type CircuitBreakerConfig added in v0.7.3

type CircuitBreakerConfig struct {
	MaxErrors int    `json:"max_errors"`
	TTL       string `json:"ttl"`
}

CircuitBreakerConfig defines circuit breaker settings for a handler. When a message causes repeated errors, it is dropped (acked) after max_errors failures.

func (*CircuitBreakerConfig) GetTTL added in v0.7.3

func (c *CircuitBreakerConfig) GetTTL() time.Duration

GetTTL returns the TTL as a time.Duration.

type CommandResult

type CommandResult struct {
	Stdout   []byte
	Stderr   []byte
	ExitCode int
	Err      error // non-nil if command failed (non-zero exit or execution error)
	Elapsed  time.Duration
	TimedOut bool // true if the command was terminated due to timeout
}

CommandResult holds the output of a command execution.

type Config

type Config struct {
	SimpleMQ *SimpleMQConfig
	RabbitMQ *RabbitMQConfig

	RequestQueue  string
	ResponseQueue string

	SMQRequest  *SMQRequestConfig
	SMQResponse *SMQResponseConfig
	RMQRequest  *RMQRequestConfig
	RMQResponse *RMQResponseConfig

	Handlers         []HandlerConfig
	MaxResponseChain int
	DropUnmatched    bool
}

Config is the resolved top-level configuration. Backend-specific fields are in SMQ*/RMQ* structs (exactly one pair is non-nil).

func LoadConfig

func LoadConfig(ctx context.Context, path string) (*Config, error)

LoadConfig loads and parses a configuration file (Jsonnet or JSON).

func (*Config) BackendType

func (c *Config) BackendType() string

BackendType returns the MQ backend type based on configuration.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks the configuration for correctness.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler matches messages by headers and executes a command.

func NewHandler

func NewHandler(cfg HandlerConfig, logger *slog.Logger, m *Metrics) (*Handler, error)

NewHandler creates a Handler from config.

func (*Handler) Acquire

func (h *Handler) Acquire(ctx context.Context) error

Acquire acquires a semaphore slot for non-blocking handlers. Blocks until a slot is free, unless reject_on_full is enabled — in which case it returns ErrSlotFull immediately when the semaphore is full.

func (*Handler) Execute

func (h *Handler) Execute(ctx context.Context, msg *mqbridge.Message) *CommandResult

Execute runs the command with the message body as stdin and returns the result.

func (*Handler) Match

func (h *Handler) Match(msg *mqbridge.Message) bool

Match returns true if all match conditions are satisfied by the message headers. When match_pattern is enabled, values are matched using AMQP topic-style patterns (* matches one dot-delimited word, # matches zero or more words). Otherwise, values must match exactly.

func (*Handler) Release

func (h *Handler) Release()

Release releases a semaphore slot for non-blocking handlers.

type HandlerConfig

type HandlerConfig struct {
	Name            string                `json:"name"`
	Match           map[string]string     `json:"match"`
	MatchPattern    bool                  `json:"match_pattern"`
	Command         []string              `json:"command"`
	Timeout         string                `json:"timeout"`
	Blocking        bool                  `json:"blocking"`
	MaxConcurrency  int                   `json:"max_concurrency"`
	RejectOnFull    bool                  `json:"reject_on_full"`
	Response        bool                  `json:"response"`
	ResponseIgnore  *ResponseIgnoreConfig `json:"response_ignore"`
	CircuitBreaker  *CircuitBreakerConfig `json:"circuit_breaker"`
	Env             map[string]string     `json:"env"`
	LogMessage      string                `json:"log_message"`
	LogHeaderFields []string              `json:"log_header_fields"`
	LogBodyFields   []string              `json:"log_body_fields"`
}

HandlerConfig defines a handler that matches messages and executes a command.

func (*HandlerConfig) GetMaxConcurrency

func (c *HandlerConfig) GetMaxConcurrency() int

GetMaxConcurrency returns the max concurrency for non-blocking handlers.

func (*HandlerConfig) GetTimeout

func (c *HandlerConfig) GetTimeout() time.Duration

GetTimeout returns the command timeout as a time.Duration.

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics holds OpenTelemetry metric instruments.

type PublishCmd added in v0.5.0

type PublishCmd struct {
	Header   map[string]string `short:"H" help:"Message header as key=value (repeatable)"`
	Body     string            `help:"Message body string"`
	BodyFile string            `help:"Read message body from file" type:"existingfile"`
	Request  bool              `help:"Publish to the request queue (default)" xor:"queue"`
	Response bool              `help:"Publish to the response queue" xor:"queue"`
}

PublishCmd is the "publish" subcommand.

func (*PublishCmd) Run added in v0.5.0

func (c *PublishCmd) Run(ctx context.Context, globals *CLI) error

type QueueClient

type QueueClient interface {
	// Receive returns a single message from the queue.
	// Returns (nil, nil) when no messages are available.
	Receive(ctx context.Context) (*QueueMessage, error)

	// Publish sends a message to the response queue.
	Publish(ctx context.Context, msg *mqbridge.Message) error

	// Ack acknowledges a message (SimpleMQ: DELETE, RabbitMQ: Ack).
	Ack(ctx context.Context, qmsg *QueueMessage) error

	// Nack negatively acknowledges a message.
	// SimpleMQ: no-op (visibility timeout handles redelivery).
	// RabbitMQ: Nack without requeue (message routed to dead-letter exchange if configured).
	Nack(ctx context.Context, qmsg *QueueMessage) error

	// Close releases resources held by the client.
	Close() error
}

QueueClient abstracts queue operations for different MQ backends.
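
A consumer built on this interface typically loops over Receive, treats a (nil, nil) result as "queue empty", and finishes each message with Ack or Nack. The following is a minimal sketch under stated assumptions: the types are local stand-ins (the fields of message are hypothetical and do not mirror mqbridge.Message), memQueue is an in-memory fake, and calling Nack on handler failure is an illustrative choice rather than confirmed mqsubscriber behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// message and queueMessage are local stand-ins for the package types.
type message struct {
	Headers map[string]string
	Body    string
}

type queueMessage struct {
	ID      string
	Message *message
}

// receiver is the subset of QueueClient that a consumer loop needs.
type receiver interface {
	Receive(ctx context.Context) (*queueMessage, error)
	Ack(ctx context.Context, qmsg *queueMessage) error
	Nack(ctx context.Context, qmsg *queueMessage) error
}

// memQueue is an in-memory fake used only to exercise the loop.
type memQueue struct {
	pending       []*queueMessage
	acked, nacked []string
}

func (q *memQueue) Receive(ctx context.Context) (*queueMessage, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if len(q.pending) == 0 {
		return nil, nil // same "no messages" convention as QueueClient
	}
	m := q.pending[0]
	q.pending = q.pending[1:]
	return m, nil
}

func (q *memQueue) Ack(_ context.Context, m *queueMessage) error {
	q.acked = append(q.acked, m.ID)
	return nil
}

func (q *memQueue) Nack(_ context.Context, m *queueMessage) error {
	q.nacked = append(q.nacked, m.ID)
	return nil
}

// drain processes messages until the queue reports that it is empty.
func drain(ctx context.Context, c receiver, handle func(*message) error) error {
	for {
		qmsg, err := c.Receive(ctx)
		if err != nil {
			return err
		}
		if qmsg == nil {
			return nil // nothing left to process
		}
		if err := handle(qmsg.Message); err != nil {
			if nerr := c.Nack(ctx, qmsg); nerr != nil {
				return nerr
			}
			continue
		}
		if err := c.Ack(ctx, qmsg); err != nil {
			return err
		}
	}
}

// runDemo feeds one good and one failing message through drain.
func runDemo() (acked, nacked []string, err error) {
	q := &memQueue{pending: []*queueMessage{
		{ID: "1", Message: &message{Body: "ok"}},
		{ID: "2", Message: &message{Body: "bad"}},
	}}
	err = drain(context.Background(), q, func(m *message) error {
		if m.Body == "bad" {
			return errors.New("handler failed")
		}
		return nil
	})
	return q.acked, q.nacked, err
}

func main() {
	acked, nacked, err := runDemo()
	fmt.Println(acked, nacked, err) // [1] [2] <nil>
}
```

A long-running daemon would wait and poll again on an empty result instead of returning; drain stops there only to keep the example finite.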

type QueueMessage

type QueueMessage struct {
	ID      string
	Message *mqbridge.Message
	// contains filtered or unexported fields
}

QueueMessage represents a message received from a queue.

type RMQRequestConfig

type RMQRequestConfig struct {
	Queue           string      `json:"queue"`
	Exchange        string      `json:"exchange"`
	ExchangeType    string      `json:"exchange_type"`
	RoutingKey      RoutingKeys `json:"routing_key"`
	ExchangePassive bool        `json:"exchange_passive"`
}

RMQRequestConfig defines the RabbitMQ request (inbound) queue.

type RMQResponseConfig

type RMQResponseConfig struct {
	Queue      string `json:"queue"`
	ReplyTo    bool   `json:"reply_to"`
	Exchange   string `json:"exchange"`
	RoutingKey string `json:"routing_key"`
}

RMQResponseConfig defines the RabbitMQ response (outbound) queue.

type RabbitMQConfig

type RabbitMQConfig struct {
	URL     string `json:"url"`
	Timeout string `json:"timeout"`
}

RabbitMQConfig holds the global RabbitMQ settings.

func (*RabbitMQConfig) GetTimeout added in v0.5.0

func (c *RabbitMQConfig) GetTimeout() time.Duration

GetTimeout returns the timeout as a time.Duration.

type RabbitMQPublisher

type RabbitMQPublisher struct {
	// contains filtered or unexported fields
}

RabbitMQPublisher implements QueueClient for publishing to RabbitMQ.

func NewRabbitMQPublisher

func NewRabbitMQPublisher(cfg *Config) *RabbitMQPublisher

NewRabbitMQPublisher creates a new RabbitMQPublisher.

func (*RabbitMQPublisher) Ack

Ack is not supported on RabbitMQPublisher.

func (*RabbitMQPublisher) Close

func (p *RabbitMQPublisher) Close() error

Close closes the RabbitMQ publisher connection.

func (*RabbitMQPublisher) Nack

Nack is not supported on RabbitMQPublisher.

func (*RabbitMQPublisher) Publish

func (p *RabbitMQPublisher) Publish(ctx context.Context, msg *mqbridge.Message) error

Publish sends a message to RabbitMQ. It uses the rabbitmq.exchange and rabbitmq.routing_key message headers when present, and otherwise falls back to the config-level exchange and routing_key, if set.

func (*RabbitMQPublisher) Receive

Receive is not supported on RabbitMQPublisher.

type RabbitMQReceiver

type RabbitMQReceiver struct {
	// contains filtered or unexported fields
}

RabbitMQReceiver implements QueueClient for receiving from RabbitMQ.

func NewRabbitMQReceiver

func NewRabbitMQReceiver(cfg *Config, prefetch int) *RabbitMQReceiver

NewRabbitMQReceiver creates a new RabbitMQReceiver.

func (*RabbitMQReceiver) Ack

Ack acknowledges the message.

func (*RabbitMQReceiver) Close

func (r *RabbitMQReceiver) Close() error

Close closes the RabbitMQ connection.

func (*RabbitMQReceiver) Nack

Nack negatively acknowledges the message without requeue. The message is routed to the dead-letter exchange if configured.

func (*RabbitMQReceiver) Publish

Publish is not supported on RabbitMQReceiver. Use RabbitMQPublisher instead.

func (*RabbitMQReceiver) Receive

func (r *RabbitMQReceiver) Receive(ctx context.Context) (*QueueMessage, error)

Receive returns a single message from the RabbitMQ queue. It blocks until a message is available or the context is cancelled.

type RenderCmd

type RenderCmd struct{}

RenderCmd is the "render" subcommand.

func (*RenderCmd) Run

func (c *RenderCmd) Run(ctx context.Context, globals *CLI) error

type ResponseIgnoreConfig

type ResponseIgnoreConfig struct {
	ExitCode *int `json:"exit_code"`
}

ResponseIgnoreConfig defines conditions under which a response is suppressed.
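
ExitCode is a pointer, which lets a configuration distinguish "no exit code configured" from an explicit 0. The sketch below illustrates that distinction. The responseIgnore type and both helpers are hypothetical, and the rule that a response is suppressed when the command's exit code equals the configured value is an assumption about the semantics.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// responseIgnore is a local copy of the ResponseIgnoreConfig shape.
type responseIgnore struct {
	ExitCode *int `json:"exit_code"`
}

// shouldSuppress reports whether a response would be dropped.
// Assumption: suppression applies only when exit_code is set and equal
// to the command's exit code.
func shouldSuppress(ri *responseIgnore, exitCode int) bool {
	return ri != nil && ri.ExitCode != nil && *ri.ExitCode == exitCode
}

// parseIgnore decodes a response_ignore object from JSON.
func parseIgnore(data string) (*responseIgnore, error) {
	var ri responseIgnore
	if err := json.Unmarshal([]byte(data), &ri); err != nil {
		return nil, err
	}
	return &ri, nil
}

func main() {
	unset, _ := parseIgnore(`{}`)
	zero, _ := parseIgnore(`{"exit_code": 0}`)
	fmt.Println(shouldSuppress(unset, 0)) // false: no condition configured
	fmt.Println(shouldSuppress(zero, 0))  // true: explicit 0 is a real condition
	fmt.Println(shouldSuppress(zero, 1))  // false: exit code differs
}
```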

type RoutingKeys added in v0.3.1

type RoutingKeys []string

RoutingKeys is a []string that accepts both a single string and an array of strings in JSON.

func (*RoutingKeys) UnmarshalJSON added in v0.3.1

func (r *RoutingKeys) UnmarshalJSON(data []byte) error

UnmarshalJSON implements custom unmarshalling to accept both a string and an array of strings.
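
One common way to accept either form is to try decoding a string first and fall back to a string array. The sketch below uses a local routingKeys type and a hypothetical decodeKeys helper; it shows the general technique and is not copied from the package source.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// routingKeys mirrors the shape of RoutingKeys for illustration.
type routingKeys []string

// UnmarshalJSON accepts "a.b" as well as ["a.b", "c.d"].
func (r *routingKeys) UnmarshalJSON(data []byte) error {
	if bytes.Equal(bytes.TrimSpace(data), []byte("null")) {
		return nil // by convention, null leaves the value unchanged
	}
	var single string
	if err := json.Unmarshal(data, &single); err == nil {
		*r = routingKeys{single}
		return nil
	}
	var many []string
	if err := json.Unmarshal(data, &many); err != nil {
		return err
	}
	*r = routingKeys(many)
	return nil
}

// decodeKeys extracts routing_key from a small JSON document.
func decodeKeys(doc string) (routingKeys, error) {
	var cfg struct {
		RoutingKey routingKeys `json:"routing_key"`
	}
	err := json.Unmarshal([]byte(doc), &cfg)
	return cfg.RoutingKey, err
}

func main() {
	one, _ := decodeKeys(`{"routing_key": "orders.created"}`)
	many, _ := decodeKeys(`{"routing_key": ["orders.created", "orders.updated"]}`)
	fmt.Println(len(one), len(many)) // 1 2
}
```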

type RunCmd

type RunCmd struct{}

RunCmd is the "run" subcommand.

func (*RunCmd) Run

func (c *RunCmd) Run(ctx context.Context, globals *CLI) error

type SMQRequestConfig

type SMQRequestConfig struct {
	Queue           string `json:"queue"`
	APIKey          string `json:"api_key"`
	APIURL          string `json:"api_url"`
	PollingInterval string `json:"polling_interval"`
}

SMQRequestConfig defines the SimpleMQ request (inbound) queue.

func (*SMQRequestConfig) GetPollingInterval

func (c *SMQRequestConfig) GetPollingInterval() time.Duration

GetPollingInterval returns the polling interval as a time.Duration.

type SMQResponseConfig

type SMQResponseConfig struct {
	Queue  string `json:"queue"`
	APIKey string `json:"api_key"`
	APIURL string `json:"api_url"`
}

SMQResponseConfig defines the SimpleMQ response (outbound) queue.

type SimpleMQConfig

type SimpleMQConfig struct {
	APIURL  string `json:"api_url"`
	Timeout string `json:"timeout"`
}

SimpleMQConfig holds the global SimpleMQ settings.

func (*SimpleMQConfig) GetTimeout added in v0.5.0

func (c *SimpleMQConfig) GetTimeout() time.Duration

GetTimeout returns the timeout as a time.Duration.

type SimpleMQPublisher

type SimpleMQPublisher struct {
	// contains filtered or unexported fields
}

SimpleMQPublisher implements QueueClient for publishing to SimpleMQ.

func NewSimpleMQPublisher

func NewSimpleMQPublisher(apiURL, apiKey, queue string, timeout time.Duration) (*SimpleMQPublisher, error)

NewSimpleMQPublisher creates a new SimpleMQPublisher.

func (*SimpleMQPublisher) Ack

Ack is not supported on SimpleMQPublisher.

func (*SimpleMQPublisher) Close

func (p *SimpleMQPublisher) Close() error

Close is a no-op for SimpleMQ (HTTP client, no persistent connection).

func (*SimpleMQPublisher) Nack

Nack is not supported on SimpleMQPublisher.

func (*SimpleMQPublisher) Publish

func (p *SimpleMQPublisher) Publish(ctx context.Context, msg *mqbridge.Message) error

Publish sends a message to the SimpleMQ response queue.

func (*SimpleMQPublisher) Receive

Receive is not supported on SimpleMQPublisher.

type SimpleMQReceiver

type SimpleMQReceiver struct {
	// contains filtered or unexported fields
}

SimpleMQReceiver implements QueueClient for receiving from SimpleMQ.

func NewSimpleMQReceiver

func NewSimpleMQReceiver(apiURL, apiKey, queue string, timeout time.Duration) (*SimpleMQReceiver, error)

NewSimpleMQReceiver creates a new SimpleMQReceiver.

func (*SimpleMQReceiver) Ack

func (r *SimpleMQReceiver) Ack(ctx context.Context, qmsg *QueueMessage) error

Ack deletes the message from the SimpleMQ queue.

func (*SimpleMQReceiver) Close

func (r *SimpleMQReceiver) Close() error

Close is a no-op for SimpleMQ (HTTP client, no persistent connection).

func (*SimpleMQReceiver) Nack

Nack is a no-op for SimpleMQ (messages are redelivered after visibility timeout).

func (*SimpleMQReceiver) Publish

Publish is not supported on SimpleMQReceiver. Use SimpleMQPublisher instead.

func (*SimpleMQReceiver) Receive

func (r *SimpleMQReceiver) Receive(ctx context.Context) (*QueueMessage, error)

Receive returns a single message from the queue. It internally buffers the messages returned by a single poll and hands them out one at a time.
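
The buffering idea can be sketched independently of SimpleMQ: poll for a batch only when the local buffer is empty, then serve messages from the buffer. The bufferedReceiver type and its poll callback are hypothetical stand-ins, with plain strings in place of real messages.

```go
package main

import "fmt"

// bufferedReceiver hands out one item per call while polling in batches.
type bufferedReceiver struct {
	poll func() []string // fetches a batch; an empty result means "no messages"
	buf  []string
}

// receive returns the next buffered item, polling only when the buffer
// is empty. The boolean is false when no message is available.
func (r *bufferedReceiver) receive() (string, bool) {
	if len(r.buf) == 0 {
		r.buf = r.poll()
	}
	if len(r.buf) == 0 {
		return "", false
	}
	m := r.buf[0]
	r.buf = r.buf[1:]
	return m, true
}

// newDemoReceiver returns a receiver whose first poll yields three
// messages and whose later polls yield none, plus a poll counter.
func newDemoReceiver() (*bufferedReceiver, *int) {
	polls := 0
	r := &bufferedReceiver{poll: func() []string {
		polls++
		if polls == 1 {
			return []string{"a", "b", "c"}
		}
		return nil
	}}
	return r, &polls
}

func main() {
	r, polls := newDemoReceiver()
	for {
		m, ok := r.receive()
		if !ok {
			break
		}
		fmt.Println(m)
	}
	fmt.Println("polls:", *polls) // polls: 2
}
```

Three messages are delivered with two polls: one that fills the buffer and one that finds the queue empty.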

type ValidateCmd

type ValidateCmd struct{}

ValidateCmd is the "validate" subcommand.

func (*ValidateCmd) Run

func (c *ValidateCmd) Run(ctx context.Context, globals *CLI) error

Directories

Path: cmd/mqsubscriber (command)
