mqbridge

package module
v0.4.3 Latest
Published: Apr 16, 2026 License: MIT Imports: 36 Imported by: 2

README

mqbridge

A message bridge between RabbitMQ and SimpleMQ (Sakura Cloud). Define multiple forwarding rules (bridges) in a configuration file and run them concurrently.

Overview

                         mqbridge
              ┌─────────────────────────┐
              │                         │
              │   ┌───────────────┐     │
 RabbitMQ ────┼──►│  Bridge (1:N) │──┬──┼────► SimpleMQ
   queue      │   └───────────────┘  │  │       queue
              │                      └──┼────► SimpleMQ
              │                         │       queue
              │   ┌───────────────┐     │
 SimpleMQ ────┼──►│  Bridge (1:1) │─────┼────► RabbitMQ
   queue      │   └───────────────┘     │    exchange/routing_key
              │                         │
              │   ┌───────────────┐     │
 SimpleMQ ────┼──►│  Bridge (1:N) │──┬──┼────► SimpleMQ
   queue      │   └───────────────┘  │  │       queue
              │                      └──┼────► SimpleMQ
              │                         │       queue
              │          ...            │
              └─────────────────────────┘
                    (concurrent bridges)

Each bridge has one subscriber (source) and one or more publishers (destinations). Multiple bridges run concurrently within a single mqbridge process.

Features

  • RabbitMQ → SimpleMQ: Consume from a RabbitMQ queue and forward messages to one or more SimpleMQ queues (fan-out).
  • SimpleMQ → RabbitMQ: Poll a SimpleMQ queue and publish messages to RabbitMQ with exchange/routing key determined by message content.
  • SimpleMQ → SimpleMQ: Poll a SimpleMQ queue and forward messages to one or more SimpleMQ queues (fan-out). Useful as a fan-out mechanism since SimpleMQ has no native fan-out support.
  • RabbitMQ → RabbitMQ: Not supported. Use RabbitMQ's built-in features such as exchange bindings or the shovel plugin instead.
  • Automatic reconnection: RabbitMQ subscriber and publisher automatically reconnect with exponential backoff (1s–30s) on connection loss.
  • Graceful shutdown: On SIGTERM/SIGINT, waits for in-flight messages to complete before exiting.
  • Named bridges: Optional name field per bridge for readable log output.
  • OpenTelemetry metrics and tracing: Built-in metrics (received, published, errors, duration) and distributed tracing auto-enabled via OTEL_EXPORTER_OTLP_ENDPOINT. Trace context is propagated through messages using W3C Trace Context (traceparent header).
  • Structured logging with trace correlation: Text (colored) or JSON format with configurable log level. When tracing is active, trace_id and span_id are automatically included in log output.
  • Jsonnet configuration: Use jsonnet-armed for configuration with environment variable support (env(), must_env()).
  • Secret Manager integration: Retrieve credentials from Sakura Cloud Secret Manager using the secret() native function in Jsonnet.

Message Delivery

mqbridge provides at-least-once delivery semantics. Messages are acknowledged to the source only after all destinations have been published to successfully.

When a bridge has multiple destinations (fan-out), publishes are performed sequentially. If a publish to one destination fails, the remaining destinations are skipped, and the message is returned to the source queue for redelivery. This means destinations that were already published to before the failure will receive duplicate messages on retry. Consumers should handle messages idempotently.

Installation

Homebrew
$ brew install fujiwara/tap/mqbridge

Binary releases

Download the latest binary from GitHub Releases.

Go install
$ go install github.com/fujiwara/mqbridge/cmd/mqbridge@latest

Usage

Usage: mqbridge <command> [flags]

Commands:
  run         Run the bridge
  validate    Validate config (unknown fields cause error)
  render      Render config as JSON to stdout

Flags:
  --config, -c    Config file path (Jsonnet/JSON) (required) [$MQBRIDGE_CONFIG]
  --log-format    Log format: text (default, colored with source) or json [$MQBRIDGE_LOG_FORMAT]
  --log-level     Log level: debug, info (default), warn, error [$MQBRIDGE_LOG_LEVEL]
  --version       Show version
  --help          Show help

Configuration

Configuration is written in Jsonnet (plain JSON is also accepted).

local must_env = std.native('must_env');
{
  rabbitmq: {
    // AMQP URI: amqp://user:pass@host:port/vhost
    // See https://www.rabbitmq.com/docs/uri-spec
    url: must_env('RABBITMQ_URL'),
  },
  simplemq: {
    api_url: 'http://localhost:18080',  // optional, default: official endpoint
  },
  bridges: [
    {
      name: 'rmq-to-smq',  // optional, used in log output (defaults to bridge index)
      // RabbitMQ → SimpleMQ (fan-out)
      from: {
        rabbitmq: {
          queue: 'source-queue',
          exchange: 'source-exchange',
          exchange_type: 'topic',     // direct, fanout, topic, headers
          routing_key: '#',
          exchange_passive: false,    // true to verify exchange exists without declaring
        },
      },
      to: [
        { simplemq: { queue: 'dest-queue-1', api_key: must_env('SIMPLEMQ_API_KEY_1') } },
        { simplemq: { queue: 'dest-queue-2', api_key: must_env('SIMPLEMQ_API_KEY_2') } },
      ],
    },
    {
      name: 'smq-to-rmq',
      // SimpleMQ → RabbitMQ (routing by message content)
      from: {
        simplemq: {
          queue: 'inbound-queue',
          api_key: must_env('SIMPLEMQ_API_KEY_INBOUND'),
          polling_interval: '1s',  // default: 1s
        },
      },
      to: [
        { rabbitmq: {} },  // destination determined by message JSON
      ],
    },
    {
      name: 'smq-to-smq',
      // SimpleMQ → SimpleMQ (fan-out)
      from: {
        simplemq: {
          queue: 'source-queue',
          api_key: must_env('SIMPLEMQ_API_KEY_SOURCE'),
          polling_interval: '1s',
        },
      },
      to: [
        { simplemq: { queue: 'fanout-queue-1', api_key: must_env('SIMPLEMQ_API_KEY_FANOUT_1') } },
        { simplemq: { queue: 'fanout-queue-2', api_key: must_env('SIMPLEMQ_API_KEY_FANOUT_2') } },
      ],
    },
  ],
}

Per-bridge URL Override

By default, all bridges share the global rabbitmq.url and simplemq.api_url settings. You can override these per bridge by specifying url or api_url directly in each bridge's from / to block. If a per-bridge URL is set, it takes precedence over the global setting.

{
  rabbitmq: {
    url: 'amqp://default-host:5672/',  // global default
  },
  bridges: [
    {
      name: 'bridge-to-other-host',
      from: {
        rabbitmq: {
          url: 'amqp://other-host:5672/',  // overrides global
          queue: 'source-queue',
          exchange: 'source-exchange',
        },
      },
      to: [
        { simplemq: { queue: 'dest-queue', api_key: 'key' } },
      ],
    },
    {
      name: 'bridge-using-global',
      from: {
        rabbitmq: {
          // no url: inherits amqp://default-host:5672/ from global
          queue: 'another-queue',
          exchange: 'another-exchange',
        },
      },
      to: [
        { simplemq: { queue: 'dest-queue-2', api_key: 'key2' } },
      ],
    },
  ],
}
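The precedence rule in this example can be modelled in a few lines. This sketch assumes that an empty string means "not set"; effectiveURL is a hypothetical helper, not part of the mqbridge API.

```go
package main

import "fmt"

// effectiveURL returns the per-bridge URL when it is set and the global URL
// otherwise. Treating the empty string as "unset" is an assumption.
func effectiveURL(global, perBridge string) string {
	if perBridge != "" {
		return perBridge
	}
	return global
}

func main() {
	global := "amqp://default-host:5672/"
	fmt.Println(effectiveURL(global, "amqp://other-host:5672/")) // bridge-to-other-host
	fmt.Println(effectiveURL(global, ""))                        // bridge-using-global
}
```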

Configuration Notes
  • rabbitmq.url is required for each bridge that uses RabbitMQ (either from the global setting or from a per-bridge override).
  • RabbitMQ source requires SimpleMQ destinations. SimpleMQ source supports both RabbitMQ and SimpleMQ destinations. RabbitMQ → RabbitMQ bridging is not supported; use RabbitMQ's built-in features such as exchange bindings or the shovel plugin instead.
  • exchange_type defaults to direct if omitted.
  • exchange_passive (default: false): When true, the subscriber uses ExchangeDeclarePassive instead of ExchangeDeclare. This verifies the exchange exists without attempting to create it or modify its properties. Useful when the exchange is managed externally and its properties (type, durable, etc.) may differ from what mqbridge would declare, avoiding PRECONDITION_FAILED errors.
  • routing_key accepts a single string or an array of strings for multiple bindings. Defaults to # if omitted. Example: routing_key: ['foo.*', 'bar.*']
  • polling_interval defaults to 1s. Invalid values silently fall back to 1s.

Secret Manager Integration

You can use the secret() native function to retrieve credentials from Sakura Cloud Secret Manager. This requires the SAKURA_ACCESS_TOKEN and SAKURA_ACCESS_TOKEN_SECRET environment variables.

local secret = std.native('secret');
secret('vault-id', 'name')      // latest version
secret('vault-id', 'name:1')    // specific version

local must_env = std.native('must_env');
local secret = std.native('secret');
{
  rabbitmq: {
    url: must_env('RABBITMQ_URL'),
  },
  bridges: [
    {
      from: {
        rabbitmq: {
          queue: 'source-queue',
          exchange: 'source-exchange',
          exchange_type: 'topic',
          routing_key: '#',
        },
      },
      to: [
        {
          simplemq: {
            queue: 'dest-queue',
            api_key: secret('vault-id-xxx', 'simplemq-api-key'),
          },
        },
      ],
    },
  ],
}

Message Format

mqbridge uses a structured Message type internally, consisting of Body (raw bytes) and Headers (string key-value pairs). This allows metadata to flow through the bridge without loss.

Wire format on SimpleMQ

Messages stored in SimpleMQ are base64-encoded JSON:

{
  "headers": {
    "rabbitmq.exchange": "my-exchange",
    "rabbitmq.routing_key": "my.routing.key",
    "rabbitmq.header.x-custom": "value"
  },
  "body": "message body as UTF-8 string"
}

  • headers is optional. When absent, the message has no metadata.
  • body is the message body. For UTF-8 text, it is stored as a plain string. For binary data (non-UTF-8), it is automatically base64-encoded.
  • body_encoding (auto-set): When body is base64-encoded, this field is set to "base64" so the receiver can decode it. No configuration is needed — encoding is determined automatically based on whether the body is valid UTF-8.

For backward compatibility, if the base64-decoded content is not valid JSON in this format, it is treated as a plain body with no headers.

RabbitMQ → SimpleMQ

When consuming from RabbitMQ, the subscriber captures delivery metadata into Message.Headers with rabbitmq.* prefixed keys:

Header key               AMQP field
rabbitmq.exchange        Exchange
rabbitmq.routing_key     RoutingKey
rabbitmq.reply_to        ReplyTo
rabbitmq.correlation_id  CorrelationId
rabbitmq.content_type    ContentType
rabbitmq.message_id      MessageId
rabbitmq.header.<key>    Custom AMQP headers

If the original RabbitMQ message does not have a MessageId, mqbridge automatically generates a UUID v4 and assigns it to the rabbitmq.message_id header. This ensures every bridged message has a traceable identifier.

This means RPC-related fields (reply_to, correlation_id) are preserved when forwarding to SimpleMQ, enabling request-reply patterns across the bridge.

SimpleMQ → RabbitMQ

When publishing to RabbitMQ, the publisher reads the destination from Message.Headers:

  • rabbitmq.exchange (required key) — target exchange; may be the empty string ("") to use the AMQP default exchange
  • rabbitmq.routing_key (required key) — routing key; may be empty (e.g. for fanout exchanges)
  • rabbitmq.reply_to, rabbitmq.correlation_id, rabbitmq.content_type, rabbitmq.message_id — mapped to the corresponding AMQP fields
  • rabbitmq.header.<key> — published as custom AMQP headers (prefix stripped)

The AMQP MessageId is determined by the following priority:

  1. rabbitmq.message_id header in the message — used as-is
  2. SimpleMQ message ID — inherited from the source message when consumed from SimpleMQ
  3. UUID v4 — auto-generated as a fallback

This ensures every message published to RabbitMQ has a traceable identifier. When bridging from SimpleMQ, the original SimpleMQ message ID is carried over to RabbitMQ for end-to-end traceability.
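The three-step priority can be expressed as a short function. This is an illustrative sketch: resolveMessageID and newUUIDv4 are hypothetical names, an empty header value is treated the same as a missing one (an assumption), and the UUID is generated with the standard library rather than whichever package mqbridge uses.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// newUUIDv4 generates a random (version 4) UUID using only the standard library.
func newUUIDv4() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set the version to 4
	b[8] = (b[8] & 0x3f) | 0x80 // set the RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// resolveMessageID applies the priority listed above:
// header value, then SimpleMQ message ID, then a fresh UUID v4.
func resolveMessageID(headers map[string]string, simpleMQID string) (string, error) {
	if id := headers["rabbitmq.message_id"]; id != "" {
		return id, nil
	}
	if simpleMQID != "" {
		return simpleMQID, nil
	}
	return newUUIDv4()
}

func main() {
	id, err := resolveMessageID(map[string]string{}, "")
	if err != nil {
		panic(err)
	}
	fmt.Println("generated:", id)
}
```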

External producers sending messages to a SimpleMQ queue for RabbitMQ delivery must use the wire format above. The rabbitmq.exchange and rabbitmq.routing_key header keys must be present; to target the default exchange, set "rabbitmq.exchange": "" in the headers.
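The header rules in this section (both destination keys present, empty values allowed, rabbitmq.header. prefix stripped) can be sketched as below. The function publishParams is a hypothetical stand-in that follows the documented behaviour of Message.RabbitMQPublishParams; the error text is made up.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

const (
	keyExchange   = "rabbitmq.exchange"
	keyRoutingKey = "rabbitmq.routing_key"
	headerPrefix  = "rabbitmq.header."
)

// publishParams extracts the destination and the custom AMQP headers.
// Both destination keys must exist, but their values may be empty strings.
func publishParams(h map[string]string) (string, string, map[string]string, error) {
	exchange, okExchange := h[keyExchange]
	routingKey, okRoutingKey := h[keyRoutingKey]
	if !okExchange || !okRoutingKey {
		return "", "", nil, errors.New("rabbitmq.exchange and rabbitmq.routing_key are required")
	}
	custom := map[string]string{}
	for k, v := range h {
		if name, ok := strings.CutPrefix(k, headerPrefix); ok {
			custom[name] = v // e.g. "rabbitmq.header.x-custom" becomes "x-custom"
		}
	}
	return exchange, routingKey, custom, nil
}

func main() {
	exchange, routingKey, custom, err := publishParams(map[string]string{
		keyExchange:                "", // AMQP default exchange
		keyRoutingKey:              "my-queue",
		"rabbitmq.header.x-custom": "value",
	})
	fmt.Printf("%q %q %v %v\n", exchange, routingKey, custom, err)
}
```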

SimpleMQ → SimpleMQ

Messages are forwarded as-is (re-serialized in the same wire format). No RabbitMQ-specific headers are added unless the original message already contained them.

Programmatic Message Construction

You can use mqbridge.MarshalMessage() and mqbridge.UnmarshalMessage() to construct or parse messages programmatically. For messages destined for RabbitMQ, use msg.ValidateForRabbitMQ() to verify that the required headers are present before sending.

msg := &mqbridge.Message{
    Body: []byte("hello"),
    Headers: map[string]string{
        mqbridge.HeaderRabbitMQExchange:   "my-exchange",
        mqbridge.HeaderRabbitMQRoutingKey: "my.key",
    },
}
if err := msg.ValidateForRabbitMQ(); err != nil {
    log.Fatal(err) // rabbitmq.exchange or rabbitmq.routing_key is missing
}
data, _ := mqbridge.MarshalMessage(msg)
// send data to SimpleMQ queue (base64-encoded)

Messages that fail validation at the bridge are logged and dropped (not retried), and counted in the mqbridge.messages.dropped metric.

High Availability

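You can run multiple mqbridge instances with the same configuration to achieve high availability. Each message is handed to only one instance at a time, so adding instances does not by itself duplicate messages; the at-least-once semantics described under Message Delivery still apply when a publish fails and the message is redelivered.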

               +------------+
               | mqbridge   |---+
               | instance 1 |   |
        +------+            |   |   +-------------+
        |      +------------+   +-->| destination |
 source |                       |   |   queue(s)  |
 queue--+                       +-->|             |
        |      +------------+   |   +-------------+
        +------+ mqbridge   |---+
               | instance 2 |
               +------------+

  • No configuration changes are needed; just run multiple instances with the same config.
  • RabbitMQ source: RabbitMQ distributes messages across consumers using round-robin (competing consumers pattern). Unacknowledged messages are automatically redelivered to the remaining instances on failure.
  • SimpleMQ source: Each message is received by one poller and deleted after successful processing. Other instances will not see the same message.

Observability

mqbridge supports OpenTelemetry metrics and distributed tracing. Both are auto-enabled when the OTEL_EXPORTER_OTLP_ENDPOINT environment variable is set. When not set, they are disabled with zero overhead.

$ OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 mqbridge run -c config.jsonnet

Both HTTP and gRPC protocols are supported. Set OTEL_EXPORTER_OTLP_PROTOCOL to grpc for gRPC transport (default: http/protobuf). All standard OTEL_* environment variables are supported.

Metrics

The following metrics are exported:

Metric                                Type       Description                                           Attributes
mqbridge.messages.received            Counter    Messages received from subscriber                     bridge, source_type, source_queue
mqbridge.messages.published           Counter    Messages published to destination                     bridge, destination_type, destination_queue
mqbridge.messages.errors              Counter    Message processing errors (retriable)                 bridge, source_type, source_queue
mqbridge.messages.dropped             Counter    Messages dropped due to unrecoverable content errors  bridge, source_type, source_queue
mqbridge.message.processing.duration  Histogram  Time to publish to all destinations (seconds)         bridge, source_type, source_queue
mqbridge.log.messages                 Counter    Number of log messages by level                       level

processing.duration measures only the publish phase (from after the message is received to after all destinations have been published to). It does not include subscriber wait time.

Attribute values are derived from the bridge configuration and message content. bridge is the bridge name (or index if unnamed). source_type / destination_type is rabbitmq or simplemq. source_queue is the source queue name. destination_queue is the SimpleMQ queue name, or the exchange name for RabbitMQ (from each message).

Tracing

mqbridge creates a span (mqbridge.bridge) for each message processed through a bridge. Trace context is propagated using W3C Trace Context format.

Trace context extraction (incoming messages):

  1. Checks for traceparent header in the message
  2. Falls back to rabbitmq.header.traceparent (set when RabbitMQ messages carry trace context in custom AMQP headers)

Trace context injection (outgoing messages):

  • traceparent (and tracestate if present) are injected into message headers before publishing

This is compatible with simplemq-subscriber, which uses the same trace context propagation format.
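The extraction order above can be sketched without any OpenTelemetry dependency. Both helper names are hypothetical. The traceparent layout (version-traceid-parentid-flags) follows W3C Trace Context, and the sketch checks only field count and lengths, which is far less than a real propagator does.

```go
package main

import (
	"fmt"
	"strings"
)

// traceparentFrom returns the traceparent value carried by a message, checking
// the plain header first and the RabbitMQ custom-header form second.
func traceparentFrom(headers map[string]string) (string, bool) {
	for _, key := range []string{"traceparent", "rabbitmq.header.traceparent"} {
		if v := headers[key]; v != "" {
			return v, true
		}
	}
	return "", false
}

// traceID extracts the trace-id field from a traceparent value.
func traceID(traceparent string) (string, bool) {
	parts := strings.Split(traceparent, "-")
	if len(parts) != 4 || len(parts[1]) != 32 || len(parts[2]) != 16 {
		return "", false
	}
	return parts[1], true
}

func main() {
	headers := map[string]string{
		"rabbitmq.header.traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
	}
	if tp, ok := traceparentFrom(headers); ok {
		id, valid := traceID(tp)
		fmt.Println(id, valid)
	}
}
```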

Log Trace Correlation

When tracing is active, trace_id and span_id from the current span context are automatically added to all log records that have a span in their context. This works with both text and JSON log formats.

LICENSE

MIT

Author

fujiwara

Documentation

Constants

const (
	HeaderRabbitMQExchange      = "rabbitmq.exchange"
	HeaderRabbitMQRoutingKey    = "rabbitmq.routing_key"
	HeaderRabbitMQReplyTo       = "rabbitmq.reply_to"
	HeaderRabbitMQCorrelationID = "rabbitmq.correlation_id"
	HeaderRabbitMQContentType   = "rabbitmq.content_type"
	HeaderRabbitMQMessageID     = "rabbitmq.message_id"
	// HeaderRabbitMQHeaderPrefix is used for custom AMQP headers.
	// e.g. an AMQP header "x-foo" becomes "rabbitmq.header.x-foo".
	HeaderRabbitMQHeaderPrefix = "rabbitmq.header."
)

Header key constants for RabbitMQ-specific metadata.

Variables

var Version = "v0.4.3"

Functions

func MarshalMessage added in v0.1.0

func MarshalMessage(msg *Message) ([]byte, error)

MarshalMessage serializes a Message to JSON (for SimpleMQ transport). If the body is valid UTF-8, it is stored as a plain string. Otherwise, it is base64-encoded and body_encoding is set to "base64".

func RenderConfig

func RenderConfig(ctx context.Context, path string) ([]byte, error)

RenderConfig evaluates a Jsonnet config file and returns the resulting JSON.

func RenderConfigTo

func RenderConfigTo(ctx context.Context, path string, w io.Writer) error

RenderConfigTo evaluates a config file and writes pretty-printed JSON to w.

func RunCLI

func RunCLI(ctx context.Context) error

RunCLI parses command-line arguments and executes the appropriate subcommand.

Types

type App

type App struct {
	Config *Config
	// contains filtered or unexported fields
}

App holds the application state.

func New

func New(ctx context.Context, cfg *Config) (*App, error)

New creates a new App from a config. It validates the config (which also applies global defaults to per-bridge settings).

func (*App) Run

func (a *App) Run(ctx context.Context) error

Run starts all bridges concurrently and waits for context cancellation.

type Bridge

type Bridge struct {
	From Subscriber
	To   []Publisher
	// contains filtered or unexported fields
}

Bridge connects one Subscriber to one or more Publishers.

func (*Bridge) Close

func (b *Bridge) Close() error

Close closes the subscriber and all publishers.

func (*Bridge) Run

func (b *Bridge) Run(ctx context.Context) error

Run starts the bridge, consuming messages from the subscriber and publishing to all publishers.

type BridgeConfig

type BridgeConfig struct {
	Name string     `json:"name,omitempty"`
	From FromConfig `json:"from"`
	To   []ToConfig `json:"to"`
}

BridgeConfig defines a single bridge rule.

type CLI

type CLI struct {
	Config    string           `kong:"required,short='c',env='MQBRIDGE_CONFIG',help='Config file path (Jsonnet/JSON)'" `
	LogFormat string           `kong:"default='text',enum='text,json',env='MQBRIDGE_LOG_FORMAT',help='Log format (text or json)'" `
	LogLevel  string           `kong:"default='info',enum='debug,info,warn,error',env='MQBRIDGE_LOG_LEVEL',help='Log level (debug, info, warn, error)'" `
	Run       RunCmd           `cmd:"" help:"Run the bridge"`
	Validate  ValidateCmd      `cmd:"" help:"Validate config"`
	Render    RenderCmd        `cmd:"" help:"Render config as JSON to stdout"`
	Version   kong.VersionFlag `help:"Show version"`
}

CLI defines the command-line interface for mqbridge.

type Config

type Config struct {
	RabbitMQ RabbitMQConfig `json:"rabbitmq"`
	SimpleMQ SimpleMQConfig `json:"simplemq"`
	Bridges  []BridgeConfig `json:"bridges"`
}

Config is the top-level configuration for mqbridge.

func LoadConfig

func LoadConfig(ctx context.Context, path string) (*Config, error)

LoadConfig loads and parses a configuration file (Jsonnet or JSON).

func (*Config) Validate

func (c *Config) Validate() error

Validate checks the configuration for correctness.

type FromConfig

type FromConfig struct {
	RabbitMQ *FromRabbitMQConfig `json:"rabbitmq,omitempty"`
	SimpleMQ *FromSimpleMQConfig `json:"simplemq,omitempty"`
}

FromConfig defines the source of a bridge.

type FromRabbitMQConfig

type FromRabbitMQConfig struct {
	RabbitMQConfig              // embedded: url (overrides global)
	Queue           string      `json:"queue"`
	Exchange        string      `json:"exchange"`
	ExchangeType    string      `json:"exchange_type"`
	RoutingKey      RoutingKeys `json:"routing_key"`
	ExchangePassive bool        `json:"exchange_passive"`
}

FromRabbitMQConfig defines a RabbitMQ source.

type FromSimpleMQConfig

type FromSimpleMQConfig struct {
	SimpleMQConfig         // embedded: api_url (overrides global)
	Queue           string `json:"queue"`
	APIKey          string `json:"api_key"`
	PollingInterval string `json:"polling_interval"`
}

FromSimpleMQConfig defines a SimpleMQ source.

func (*FromSimpleMQConfig) GetPollingInterval

func (c *FromSimpleMQConfig) GetPollingInterval() time.Duration

GetPollingInterval returns the polling interval as a time.Duration.

type Message added in v0.1.0

type Message struct {
	Body    []byte
	Headers map[string]string
	// contains filtered or unexported fields
}

Message is the common message type passed between Subscriber and Publisher.

func UnmarshalMessage added in v0.1.0

func UnmarshalMessage(data []byte) *Message

UnmarshalMessage deserializes a Message from JSON. If body_encoding is "base64", the body is decoded from base64. If the data is not valid messageWire JSON or the body field is absent, it falls back to treating the entire data as the message body (backward compatibility).

func (*Message) MessageID added in v0.4.2

func (m *Message) MessageID() string

MessageID returns the message identifier. It prefers the in-memory id (e.g. SimpleMQ message ID), then falls back to the rabbitmq.message_id header. Returns empty string if neither is set.

func (*Message) RabbitMQPublishParams added in v0.1.0

func (m *Message) RabbitMQPublishParams() (exchange, routingKey string, headers map[string]string, err error)

RabbitMQPublishParams extracts RabbitMQ publishing parameters from a Message. Returns exchange, routingKey, and custom AMQP headers. Exchange may be empty (AMQP default exchange). RoutingKey may also be empty (e.g. for fanout exchanges). Both header keys must be present.

func (*Message) ValidateForRabbitMQ added in v0.1.0

func (m *Message) ValidateForRabbitMQ() error

ValidateForRabbitMQ checks that the message has the required headers for publishing to RabbitMQ (rabbitmq.exchange and rabbitmq.routing_key). Both keys must be present but may be empty strings.

type MessageError added in v0.1.0

type MessageError struct {
	Err error
}

MessageError represents an error caused by the message content itself (e.g. missing required headers). Messages that cause this error cannot be processed regardless of retries and should be dropped.

func (*MessageError) Error added in v0.1.0

func (e *MessageError) Error() string

func (*MessageError) Unwrap added in v0.1.0

func (e *MessageError) Unwrap() error

type Metrics added in v0.0.2

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics holds OpenTelemetry metric instruments for mqbridge.

type PublishResult added in v0.0.2

type PublishResult struct {
	Destination string // destination identifier (e.g. queue name, "exchange/routing_key")
}

PublishResult holds metadata about a published message.

type Publisher

type Publisher interface {
	Publish(ctx context.Context, msg *Message) (*PublishResult, error)
	// Type returns the publisher type name (e.g. "rabbitmq", "simplemq").
	Type() string
	Close() error
}

Publisher sends messages to a destination.

type RabbitMQConfig

type RabbitMQConfig struct {
	URL string `json:"url"`
}

RabbitMQConfig holds the global RabbitMQ connection settings.

type RabbitMQPublisher

type RabbitMQPublisher struct {
	// contains filtered or unexported fields
}

RabbitMQPublisher publishes messages to RabbitMQ. The destination exchange and routing key are read from the message headers (rabbitmq.exchange and rabbitmq.routing_key).

func NewRabbitMQPublisher

func NewRabbitMQPublisher(config ToRabbitMQConfig, logger *slog.Logger) *RabbitMQPublisher

NewRabbitMQPublisher creates a new RabbitMQPublisher.

func (*RabbitMQPublisher) Close

func (p *RabbitMQPublisher) Close() error

Close closes the RabbitMQ connection.

func (*RabbitMQPublisher) Publish

func (p *RabbitMQPublisher) Publish(ctx context.Context, msg *Message) (*PublishResult, error)

Publish sends a Message to RabbitMQ. The destination exchange, routing key, and custom AMQP headers are read from the Message headers.

func (*RabbitMQPublisher) Type added in v0.0.2

func (p *RabbitMQPublisher) Type() string

Type returns the publisher type name.

type RabbitMQSubscriber

type RabbitMQSubscriber struct {
	// contains filtered or unexported fields
}

RabbitMQSubscriber consumes messages from a RabbitMQ queue.

func NewRabbitMQSubscriber

func NewRabbitMQSubscriber(config FromRabbitMQConfig, logger *slog.Logger) *RabbitMQSubscriber

NewRabbitMQSubscriber creates a new RabbitMQSubscriber.

func (*RabbitMQSubscriber) Close

func (s *RabbitMQSubscriber) Close() error

Close closes the RabbitMQ connection.

func (*RabbitMQSubscriber) Subscribe

func (s *RabbitMQSubscriber) Subscribe(ctx context.Context, handler func(ctx context.Context, msg *Message) error) error

Subscribe starts consuming messages from the RabbitMQ queue with automatic reconnection on connection loss. It uses exponential backoff between retries.

type RenderCmd

type RenderCmd struct{}

RenderCmd is the "render" subcommand.

func (*RenderCmd) Run

func (c *RenderCmd) Run(ctx context.Context, globals *CLI) error

type RoutingKeys added in v0.3.0

type RoutingKeys []string

RoutingKeys holds one or more routing keys. It accepts both a single string and an array of strings in JSON/Jsonnet.

func (*RoutingKeys) UnmarshalJSON added in v0.3.0

func (rk *RoutingKeys) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler. It accepts both a single string ("key") and an array (["key1", "key2"]).

type RunCmd

type RunCmd struct{}

RunCmd is the "run" subcommand.

func (*RunCmd) Run

func (c *RunCmd) Run(ctx context.Context, globals *CLI) error

type SimpleMQConfig

type SimpleMQConfig struct {
	APIURL string `json:"api_url"`
}

SimpleMQConfig holds the global SimpleMQ settings.

type SimpleMQPublisher

type SimpleMQPublisher struct {
	// contains filtered or unexported fields
}

SimpleMQPublisher sends messages to a SimpleMQ queue.

func NewSimpleMQPublisher

func NewSimpleMQPublisher(config ToSimpleMQConfig) (*SimpleMQPublisher, error)

NewSimpleMQPublisher creates a new SimpleMQPublisher.

func (*SimpleMQPublisher) Close

func (p *SimpleMQPublisher) Close() error

Close is a no-op for SimpleMQPublisher.

func (*SimpleMQPublisher) Publish

func (p *SimpleMQPublisher) Publish(ctx context.Context, msg *Message) (*PublishResult, error)

Publish sends a Message to the SimpleMQ queue. The Message is serialized to JSON and then base64-encoded before sending.

func (*SimpleMQPublisher) Type added in v0.0.2

func (p *SimpleMQPublisher) Type() string

Type returns the publisher type name.

type SimpleMQSubscriber

type SimpleMQSubscriber struct {
	// contains filtered or unexported fields
}

SimpleMQSubscriber receives messages from a SimpleMQ queue via polling.

func NewSimpleMQSubscriber

func NewSimpleMQSubscriber(config FromSimpleMQConfig, logger *slog.Logger) (*SimpleMQSubscriber, error)

NewSimpleMQSubscriber creates a new SimpleMQSubscriber.

func (*SimpleMQSubscriber) Close

func (s *SimpleMQSubscriber) Close() error

Close is a no-op for SimpleMQSubscriber (HTTP client needs no explicit close).

func (*SimpleMQSubscriber) Subscribe

func (s *SimpleMQSubscriber) Subscribe(ctx context.Context, handler func(ctx context.Context, msg *Message) error) error

Subscribe polls the SimpleMQ queue and calls the handler for each message.

type Subscriber

type Subscriber interface {
	Subscribe(ctx context.Context, handler func(ctx context.Context, msg *Message) error) error
	Close() error
}

Subscriber consumes messages from a source.

type ToConfig

type ToConfig struct {
	RabbitMQ *ToRabbitMQConfig `json:"rabbitmq,omitempty"`
	SimpleMQ *ToSimpleMQConfig `json:"simplemq,omitempty"`
}

ToConfig defines a destination of a bridge.

type ToRabbitMQConfig

type ToRabbitMQConfig struct {
	RabbitMQConfig // embedded: url (overrides global)
}

ToRabbitMQConfig defines a RabbitMQ destination. The actual exchange, routing key, and custom headers are read from each message's headers (the rabbitmq.* keys).

type ToSimpleMQConfig

type ToSimpleMQConfig struct {
	SimpleMQConfig        // embedded: api_url (overrides global)
	Queue          string `json:"queue"`
	APIKey         string `json:"api_key"`
}

ToSimpleMQConfig defines a SimpleMQ destination.

type ValidateCmd

type ValidateCmd struct{}

ValidateCmd is the "validate" subcommand.

func (*ValidateCmd) Run

func (c *ValidateCmd) Run(ctx context.Context, globals *CLI) error

Directories

Path          Synopsis
cmd/mqbridge  command
