trabbits

v0.0.18
Published: Apr 6, 2026 License: BSD-2-Clause, MIT Imports: 42 Imported by: 0


This project is currently in ALPHA status and should NOT be used in production environments.

The API and behavior may change significantly before reaching stable release.


trabbits is a proxy server for sending and receiving messages using the AMQP protocol. This project supports RabbitMQ's AMQP 0-9-1 protocol.

trabbits can have multiple upstreams, which can be single RabbitMQ servers or RabbitMQ clusters that it connects to. It can also route messages to different upstreams based on the routing key.

Purpose of this project

trabbits is helpful for migrating from one RabbitMQ server to another. You can use trabbits to proxy messages from the old RabbitMQ server to the new RabbitMQ server. This allows you to migrate your RabbitMQ server without any downtime.

Architecture

+-----------------+     +-----------------+
| RabbitMQ Server |     | RabbitMQ Server |
+-----------------+     +-----------------+
                  \     /
                   \   /  (routed by routing key)
                    \ /
            +-----------------+
            |     trabbits    |
            +-----------------+
                     |
                     | Publish/Consume
                     |
           +-------------------+
           | AMQP 0.9.1 client |
           +-------------------+
  1. The AMQP 0.9.1 client connects to trabbits.
  2. trabbits connects to multiple RabbitMQ servers (upstreams).
  3. trabbits routes messages published with the Basic.Publish method to different upstreams based on the routing key.
  4. trabbits consumes messages from all upstreams and sends them to the AMQP 0.9.1 client.

Your clients can connect to trabbits and send and receive messages without knowing the actual RabbitMQ server.

Features

  • Support for AMQP 0-9-1 protocol
  • Proxy functionality between client and upstream
  • Support for both single RabbitMQ servers and RabbitMQ clusters as upstreams
  • Automatic failover within RabbitMQ clusters
  • Health checking for cluster nodes with automatic node isolation and recovery
  • Routing publishing messages to different upstreams based on the routing key
  • Consuming messages from multiple upstreams
  • Prometheus exporter for monitoring
  • Dynamic configuration reloading
  • CLI and API for managing configuration

Limitations

  • Authentication mechanism support is limited to PLAIN and AMQPLAIN.
  • Not all AMQP 0-9-1 methods are supported. See the Supported Methods section for details.

Installation

TODO

Usage

Usage: trabbits <command> [flags]

Flags:
  -h, --help                    Show context-sensitive help.
      --config="config.json"    Path to the configuration file ($TRABBITS_CONFIG).
      --port=6672               Port to listen on ($TRABBITS_PORT).
      --metrics-port=16692      Port to listen on for metrics ($TRABBITS_METRICS_PORT)
      --api-socket="/tmp/trabbits.sock"
                                Path to the API socket ($TRABBITS_API_SOCKET).
      --debug                   Enable debug mode ($DEBUG).
      --enable-pprof            Enable pprof ($ENABLE_PPROF).
      --version                 Show version.

Commands:
  run [flags]
    Run the trabbits server.

  manage config <command>
    Manage the configuration.

  test match-routing <pattern> <key>
    Test routing pattern matching.

Run "trabbits <command> --help" for more information on a command.

To start trabbits server, run the following command:

trabbits run --config config.json

By default, trabbits listens on port 6672 and connects to an upstream RabbitMQ server.

Configuration

By default, trabbits reads its configuration from config.json. The following example shows the available fields:

{
    "handshake_timeout": "5s",
    "connection_close_timeout": "1s",
    "upstreams": [
        {
            "name": "primary",
            "address": "localhost:5672"
        },
        {
            "name": "secondary-cluster",
            "cluster": {
                "nodes": [
                    "localhost:5673",
                    "localhost:5674",
                    "localhost:5675"
                ]
            },
            "timeout": "10s",
            "health_check": {
                "enabled": true,
                "interval": "30s",
                "timeout": "5s",
                "unhealthy_threshold": 3,
                "recovery_interval": "60s"
            },
            "routing": {
                "key_patterns": [
                    "test.queue.another.*"
                ]
            },
            "queue_attributes": {
                "durable": true,
                "auto_delete": false,
                "exclusive": false,
                "arguments": {
                    "x-queue-type": "quorum"
                }
            },
            "queue_options": {
                "try_passive": true,
                "emulate_auto_delete": true
            }
        }
    ]
}

trabbits supports dynamic configuration reloading. You can apply a new configuration by sending an HTTP PUT request to the /config endpoint. See the API Server section for more details.

Upstreams section

The upstreams section contains an array of upstreams.

The first upstream is used as the default. If the routing key does not match any patterns, trabbits will use the default upstream to publish messages.

Each upstream has the following fields:

  • name: (Required) A unique name for the upstream.
  • address: The address of the RabbitMQ server in host:port format (required for single server configuration). Supports IPv6 addresses (e.g., [::1]:5672).
  • cluster: Configuration for RabbitMQ cluster connection (alternative to address).
    • nodes: An array of cluster node addresses in host:port format.
  • timeout: Connection timeout duration (optional, default: 5s). Accepts Go duration format (e.g., "10s", "1m").
  • health_check: Health check configuration for cluster upstreams (optional).
    • interval: Health check interval (default: 30s). Accepts Go duration format.
    • timeout: Health check timeout (default: 5s). Accepts Go duration format.
    • unhealthy_threshold: Number of consecutive failures before marking node as unhealthy (default: 3).
    • recovery_interval: Interval for checking unhealthy nodes for recovery (default: 60s).
    • username: Username for health check authentication (required).
    • password: Password for health check authentication (required).
  • routing: The routing rules for this upstream.
    • key_patterns: An array of routing key patterns. If the routing key matches any of these patterns, trabbits will use this upstream to publish. The patterns follow the same syntax as RabbitMQ's topic exchange routing key patterns, including the wildcard characters * and #.
  • queue_attributes: The attributes of the queue that will be declared on this upstream. All of the attributes are optional.
    • durable: Override the durable flag. If not specified (null), the client's value is used.
    • auto_delete: Override the auto_delete flag. If not specified (null), the client's value is used.
    • exclusive: Override the exclusive flag. If not specified (null), the client's value is used.
    • arguments: A map of arguments for the queue. Arguments are merged with client-provided arguments. The keys are strings and the values are any type. If the value is null, the argument will be removed from the client's arguments.
  • queue_options: Options for queue declaration behavior on this upstream. All options are optional.
    • try_passive: (Optional, default: false) When set to true, trabbits will first attempt a passive queue declare to check if the queue already exists. If the queue exists, it will be used as-is regardless of the configured attributes. If the queue doesn't exist (404 error), trabbits will fall back to creating the queue with the configured attributes. This is useful for avoiding PRECONDITION_FAILED errors when working with existing queues that may have different attributes.
    • emulate_auto_delete: (Optional, default: false) When set to true, trabbits will emulate auto_delete behavior for durable queues. This is useful for quorum queues which don't support the real auto_delete flag. When a client requests auto_delete=true and you override it to false in the configuration, enabling this option will make trabbits automatically delete the queue when the connection closes. Note: This differs from RabbitMQ's standard auto_delete which deletes when the last consumer disconnects - trabbits emulates per-connection deletion instead.
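
The override rules for queue_attributes can be sketched in Go as follows. This is a minimal illustration, not trabbits' actual implementation: the queueAttributes, queueDeclare, and applyOverrides names are hypothetical, and only the behavior described above is modeled (unspecified flags keep the client's value, arguments are merged, and a null argument removes the client's argument).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// queueAttributes mirrors the queue_attributes fields described above.
// Pointer fields distinguish "not specified" (nil) from an explicit false.
// The type name is hypothetical.
type queueAttributes struct {
	Durable    *bool          `json:"durable"`
	AutoDelete *bool          `json:"auto_delete"`
	Exclusive  *bool          `json:"exclusive"`
	Arguments  map[string]any `json:"arguments"`
}

// queueDeclare is a hypothetical, simplified Queue.Declare request from a client.
type queueDeclare struct {
	Durable    bool
	AutoDelete bool
	Exclusive  bool
	Arguments  map[string]any
}

// applyOverrides returns the declaration that would be sent upstream.
func applyOverrides(client queueDeclare, attrs queueAttributes) queueDeclare {
	out := client
	if attrs.Durable != nil {
		out.Durable = *attrs.Durable
	}
	if attrs.AutoDelete != nil {
		out.AutoDelete = *attrs.AutoDelete
	}
	if attrs.Exclusive != nil {
		out.Exclusive = *attrs.Exclusive
	}
	// Copy the client's arguments so the caller's map is left untouched.
	out.Arguments = make(map[string]any, len(client.Arguments)+len(attrs.Arguments))
	for k, v := range client.Arguments {
		out.Arguments[k] = v
	}
	for k, v := range attrs.Arguments {
		if v == nil {
			delete(out.Arguments, k) // a null value removes the client's argument
			continue
		}
		out.Arguments[k] = v
	}
	return out
}

func main() {
	raw := `{"durable": true, "auto_delete": false, "arguments": {"x-queue-type": "quorum", "x-expires": null}}`
	var attrs queueAttributes
	if err := json.Unmarshal([]byte(raw), &attrs); err != nil {
		panic(err)
	}
	client := queueDeclare{AutoDelete: true, Arguments: map[string]any{"x-expires": 60000}}
	fmt.Printf("%+v\n", applyOverrides(client, attrs))
}
```

Using pointer fields for the flags is one way to represent the "not specified (null)" case; a JSON null in arguments decodes to a nil map value, which the merge step treats as a removal.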
Cluster Connection Behavior

When connecting to a cluster upstream, trabbits will:

  1. Health-based Selection: Prioritize healthy nodes for connections
  2. Random Selection: Randomly shuffle available nodes to distribute connection load
  3. Failover: Try each node in the shuffled order until a successful connection is established
  4. Connection Reuse: Once connected to a cluster node, that connection is used for all operations
  5. Timeout: Use the configured timeout (default: 5s) for each connection attempt
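
The selection and failover steps above can be sketched in Go. This is an illustration under stated assumptions, not the project's code: candidateOrder and connectCluster are hypothetical names, "prioritize healthy nodes" is interpreted as "use only healthy nodes unless none are healthy", and a plain TCP dial stands in for the full AMQP connection.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"net"
	"time"
)

// candidateOrder returns the nodes to try, in order: healthy nodes only
// (or all nodes when none is healthy), randomly shuffled.
func candidateOrder(nodes []string, healthy map[string]bool) []string {
	candidates := make([]string, 0, len(nodes))
	for _, n := range nodes {
		if healthy[n] {
			candidates = append(candidates, n)
		}
	}
	if len(candidates) == 0 {
		// Assumption: fall back to every configured node.
		candidates = append(candidates, nodes...)
	}
	rand.Shuffle(len(candidates), func(i, j int) {
		candidates[i], candidates[j] = candidates[j], candidates[i]
	})
	return candidates
}

// connectCluster tries each candidate with the given per-attempt timeout
// until one TCP connection succeeds.
func connectCluster(nodes []string, healthy map[string]bool, timeout time.Duration) (net.Conn, error) {
	if len(nodes) == 0 {
		return nil, errors.New("no cluster nodes configured")
	}
	var errs []error
	for _, addr := range candidateOrder(nodes, healthy) {
		conn, err := net.DialTimeout("tcp", addr, timeout)
		if err == nil {
			return conn, nil // this connection is then reused for all operations
		}
		errs = append(errs, fmt.Errorf("%s: %w", addr, err))
	}
	return nil, errors.Join(errs...)
}

func main() {
	nodes := []string{"localhost:5673", "localhost:5674", "localhost:5675"}
	healthy := map[string]bool{"localhost:5673": true, "localhost:5675": true}
	// Prints the two healthy nodes in a random order.
	fmt.Println(candidateOrder(nodes, healthy))
}
```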
Health Check Behavior

For cluster upstreams with health checking enabled:

  1. Background Monitoring: Run health checks in a separate goroutine at configured intervals
  2. Node Isolation: Mark nodes as unhealthy after consecutive failures exceed the threshold
  3. Automatic Recovery: Periodically check unhealthy nodes and restore them when they recover
  4. Graceful Degradation: Fall back to all nodes if no healthy nodes are available
  5. Metrics Export: Expose healthy/unhealthy node counts via Prometheus metrics

Health checks use simple AMQP connection attempts with immediate disconnection to minimize overhead.
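
The bookkeeping behind node isolation and recovery can be sketched as a small Go type. This is a hypothetical sketch: the healthTracker name and its methods are not from trabbits, the probe itself is left out (its result is passed in as a boolean), and a node is assumed to become unhealthy once its consecutive failures reach unhealthy_threshold.

```go
package main

import (
	"fmt"
	"sync"
)

// healthTracker keeps per-node failure counts (hypothetical type).
type healthTracker struct {
	mu        sync.Mutex
	threshold int
	order     []string
	failures  map[string]int
	unhealthy map[string]bool
}

func newHealthTracker(nodes []string, threshold int) *healthTracker {
	return &healthTracker{
		threshold: threshold,
		order:     append([]string(nil), nodes...),
		failures:  make(map[string]int),
		unhealthy: make(map[string]bool),
	}
}

// report records the result of one health check for addr.
// A success resets the counter and restores the node.
func (t *healthTracker) report(addr string, ok bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if ok {
		t.failures[addr] = 0
		t.unhealthy[addr] = false
		return
	}
	t.failures[addr]++
	if t.failures[addr] >= t.threshold {
		t.unhealthy[addr] = true
	}
}

// healthyNodes returns the healthy nodes, or all nodes when none is healthy.
func (t *healthTracker) healthyNodes() []string {
	t.mu.Lock()
	defer t.mu.Unlock()
	var healthy []string
	for _, n := range t.order {
		if !t.unhealthy[n] {
			healthy = append(healthy, n)
		}
	}
	if len(healthy) == 0 {
		return append([]string(nil), t.order...)
	}
	return healthy
}

func main() {
	t := newHealthTracker([]string{"localhost:5673", "localhost:5674"}, 3)
	for i := 0; i < 3; i++ {
		t.report("localhost:5673", false)
	}
	fmt.Println(t.healthyNodes()) // [localhost:5674]
	t.report("localhost:5673", true)
	fmt.Println(t.healthyNodes()) // [localhost:5673 localhost:5674]
}
```

The mutex is there because the checks are described as running in a background goroutine while connection attempts read the same state.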

Routing Algorithm

You can specify routing rules in the configuration file. The routing rules are based on the routing key. If the routing key matches the specified pattern, trabbits will route the message to the corresponding upstream.

Supported patterns are equivalent to RabbitMQ's topic exchange routing key patterns:

  • * matches a single word
  • # matches zero or more words

trabbits matches the routing key against the patterns in the order they are defined in the configuration file. As soon as a pattern matches, trabbits uses the corresponding upstream and does not check the remaining patterns.

If the routing key does not match any patterns, trabbits will use the first upstream as the default.
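
The routing algorithm described in this section can be sketched in Go. This is an illustrative re-implementation of topic-style matching, not trabbits' own matcher: the upstream type and the matchPattern and selectUpstream functions are hypothetical names, and an empty routing key is assumed to contain zero words.

```go
package main

import (
	"fmt"
	"strings"
)

// upstream is a hypothetical, simplified view of one configured upstream.
type upstream struct {
	Name        string
	KeyPatterns []string
}

// splitWords splits a dot-separated key; an empty string has no words.
func splitWords(s string) []string {
	if s == "" {
		return nil
	}
	return strings.Split(s, ".")
}

// matchWords reports whether the key words match the pattern words.
// "*" matches exactly one word; "#" matches zero or more words.
func matchWords(pattern, key []string) bool {
	if len(pattern) == 0 {
		return len(key) == 0
	}
	switch pattern[0] {
	case "#":
		// Let "#" absorb 0..len(key) words and try to match the rest.
		for i := 0; i <= len(key); i++ {
			if matchWords(pattern[1:], key[i:]) {
				return true
			}
		}
		return false
	case "*":
		return len(key) > 0 && matchWords(pattern[1:], key[1:])
	default:
		return len(key) > 0 && pattern[0] == key[0] && matchWords(pattern[1:], key[1:])
	}
}

func matchPattern(pattern, key string) bool {
	return matchWords(splitWords(pattern), splitWords(key))
}

// selectUpstream returns the first upstream with a matching pattern,
// or the first upstream as the default. upstreams must not be empty.
func selectUpstream(upstreams []upstream, routingKey string) upstream {
	for _, u := range upstreams {
		for _, p := range u.KeyPatterns {
			if matchPattern(p, routingKey) {
				return u
			}
		}
	}
	return upstreams[0]
}

func main() {
	ups := []upstream{
		{Name: "primary"},
		{Name: "secondary-cluster", KeyPatterns: []string{"test.queue.another.*"}},
	}
	fmt.Println(selectUpstream(ups, "test.queue.another.x").Name) // secondary-cluster
	fmt.Println(selectUpstream(ups, "test.queue.other").Name)     // primary
}
```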

Global Configuration Options
Timeout Settings (Advanced)

These timeout settings control internal connection behavior and typically do not need to be modified:

  • handshake_timeout: (Optional) Maximum time to wait for each frame during the AMQP handshake process (Connection.Start-Ok, Connection.Tune-Ok, Connection.Open) from client connections (default: 5s). Accepts Go duration format (e.g., "5s", "10s").
  • connection_close_timeout: (Optional) Maximum time to wait for Connection.Close-Ok response during graceful connection shutdown (default: 1s). Accepts Go duration format.

Note: These are advanced settings that should only be adjusted if you experience specific timeout-related issues. The default values are suitable for most use cases. Normal message processing uses a longer fixed timeout (5 minutes) to accommodate low-frequency client communication patterns.

Graceful Shutdown Settings

trabbits implements graceful shutdown when receiving termination or reload signals. On SIGTERM, the server first closes the listener to prevent new connections, then gracefully disconnects all existing connections using the AMQP Connection.Close protocol. On SIGHUP (configuration reload), connections using outdated configuration are gracefully disconnected with rate limiting.

The default configuration can handle approximately 1000 connections within a 10-second timeout, which covers most typical deployments. For larger-scale environments with thousands of connections, you may need to adjust the timeout and rate limiting parameters based on your requirements.

Important: If graceful shutdown cannot complete within the configured timeout (e.g., due to unresponsive clients or too many connections), remaining connections will be forcibly terminated.

  • graceful_shutdown: Configuration for graceful shutdown behavior
    • shutdown_timeout: Maximum time to wait for graceful shutdown on SIGTERM (default: 10s). Accepts Go duration format.
    • reload_timeout: Maximum time to wait for graceful disconnection during configuration reload on SIGHUP (default: 30s). Accepts Go duration format.
    • rate_limit: Maximum number of connections to disconnect per second (default: 100). Controls disconnection rate to prevent connection storms.
    • burst_size: Initial burst size for rate limiting (default: 10). Allows quick disconnection of small connection pools.

Example configuration for large-scale deployments:

{
  "graceful_shutdown": {
    "shutdown_timeout": "30s",
    "reload_timeout": "60s",
    "rate_limit": 300,
    "burst_size": 50
  },
  "upstreams": [...]
}
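
As a rough sanity check when tuning these values, the number of connections that can be disconnected within a timeout can be estimated as burst_size + rate_limit × timeout. The Go sketch below assumes token-bucket semantics for the rate limiter, which the text above does not spell out; disconnectCapacity is a hypothetical helper, not part of trabbits.

```go
package main

import (
	"fmt"
	"time"
)

// disconnectCapacity estimates how many connections can be disconnected
// within the timeout, assuming a token bucket that starts with burstSize
// tokens and refills at rateLimit tokens per second.
func disconnectCapacity(burstSize, rateLimit int, timeout string) (int, error) {
	d, err := time.ParseDuration(timeout)
	if err != nil {
		return 0, err
	}
	return burstSize + int(float64(rateLimit)*d.Seconds()), nil
}

func main() {
	def, _ := disconnectCapacity(10, 100, "10s")   // default settings
	large, _ := disconnectCapacity(50, 300, "30s") // large-scale example
	fmt.Println(def, large)                        // 1010 9050
}
```

Under this assumption the defaults yield about 1000 connections in 10 seconds, which is consistent with the figure quoted above. Unresponsive clients reduce the real number, since each disconnection also waits for the peer's Connection.Close-Ok.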

Configuration File Formats

trabbits supports two configuration file formats:

JSON Configuration

Standard JSON format configuration files are supported with environment variable expansion using the ${VAR} syntax.

Jsonnet Configuration

trabbits also supports Jsonnet configuration files (.jsonnet extension), which provides:

  • Dynamic configuration generation
  • Functions and conditionals
  • Imports and includes
  • Environment variable access via std.native('env')
  • Additional utility functions from jsonnet-armed such as file reading, JSON/YAML parsing, and more
Jsonnet Example
local env = std.native('env');
{
  upstreams: [
    {
      name: 'cluster',
      cluster: {
        nodes: [
          'localhost:5672',
        ],
      },
      health_check: {
        username: env('RABBITMQ_HEALTH_USER', 'guest'),
        password: env('RABBITMQ_HEALTH_PASS', 'guest'),
        interval: '30s',
        timeout: '5s',
        unhealthy_threshold: 3,
        recovery_interval: '60s',
      },
    },
  ],
}

To use a Jsonnet configuration:

export RABBITMQ_HEALTH_USER=healthcheck
export RABBITMQ_HEALTH_PASS=secretpassword
trabbits run --config config.jsonnet

Environment Variable Expansion

For JSON configuration files, trabbits supports environment variable expansion using the ${VAR} syntax. This is particularly useful for sensitive information like passwords that should not be stored in plain text in configuration files.

Example

Instead of storing credentials directly in the configuration file:

{
    "upstreams": [
        {
            "name": "cluster",
            "cluster": {
                "nodes": [
                    "localhost:5672"
                ]
            },
            "health_check": {
                "username": "${RABBITMQ_HEALTH_USER}",
                "password": "${RABBITMQ_HEALTH_PASS}",
                "interval": "30s",
                "timeout": "5s",
                "unhealthy_threshold": 3,
                "recovery_interval": "60s"
            }
        }
    ]
}

Set the environment variables before running trabbits:

export RABBITMQ_HEALTH_USER=healthcheck
export RABBITMQ_HEALTH_PASS=secretpassword
trabbits run --config config.json
Notes
  • If an environment variable is not set, it expands to an empty string in JSON files
  • For Jsonnet files, use std.native('env') function with default values
  • Environment variable expansion works for any string field in the configuration
  • Variable names are case-sensitive
  • Use double quotes around the ${VAR} syntax in JSON
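
The expansion rules in these notes can be illustrated with a short Go sketch. It is not trabbits' implementation: expandEnv and the regular expression are assumptions made for illustration, and only the ${VAR} form is handled, with unset variables expanding to an empty string.

```go
package main

import (
	"fmt"
	"os"
	"regexp"
)

// envPattern matches ${VAR} references; names are case-sensitive.
var envPattern = regexp.MustCompile(`\$\{([A-Za-z_][A-Za-z0-9_]*)\}`)

// expandEnv replaces every ${VAR} in s with lookup(VAR).
// With os.Getenv as lookup, unset variables become empty strings.
func expandEnv(s string, lookup func(string) string) string {
	return envPattern.ReplaceAllStringFunc(s, func(m string) string {
		name := m[2 : len(m)-1] // strip the leading "${" and trailing "}"
		return lookup(name)
	})
}

func main() {
	raw := `{"username": "${RABBITMQ_HEALTH_USER}", "password": "${RABBITMQ_HEALTH_PASS}"}`
	fmt.Println(expandEnv(raw, os.Getenv))
}
```

Because an unset variable silently becomes an empty string, it can be worth validating required values such as health check credentials after expansion.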

Setting Log Level

By default, the log level is set to info. You can change the log level to debug by setting the DEBUG environment variable to true.

DEBUG=true trabbits run

Supported Methods

trabbits currently supports the following AMQP methods:

  • ChannelOpen
  • ChannelClose
  • QueueDeclare
  • QueueDelete
  • QueueBind
  • QueueUnbind
  • QueuePurge
  • ExchangeDeclare
  • BasicPublish
  • BasicConsume
  • BasicGet
  • BasicAck
  • BasicNack
  • BasicCancel
  • BasicQos
  • ConfirmSelect (Publisher Confirms)

Server-named queues emulation

trabbits can emulate server-named queues. If you declare a queue with an empty name, trabbits will generate a unique name for the queue.

This is not a feature of the AMQP 0.9.1 protocol, but a feature in RabbitMQ. See Server-named queues.

RabbitMQ generates a unique name such as amq.gen-(random string). trabbits generates a unique name in the format trabbits.gen-(random string) because the amq.gen- prefix is reserved by RabbitMQ.

The queue generated by trabbits is not a temporary queue on the upstream RabbitMQ server. It is created as a normal queue with the specified attributes, so RabbitMQ will not delete it when the client disconnects. trabbits therefore emulates the server-named queue behavior itself.

trabbits deletes the queue when the connection that declared it is closed, which mimics the behavior of an exclusive queue.
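
A minimal Go sketch of the name generation follows. Only the trabbits.gen- prefix comes from the description above; the resolveQueueName function, the 16 random bytes, and the URL-safe base64 encoding are assumptions chosen for illustration.

```go
package main

import (
	"crypto/rand"
	"encoding/base64"
	"fmt"
)

// generatedPrefix is the prefix described above; amq.gen- is reserved by RabbitMQ.
const generatedPrefix = "trabbits.gen-"

// resolveQueueName returns the requested name, or a generated one when the
// client declared a queue with an empty name. generated tells the caller
// that the queue should be deleted when the declaring connection closes.
func resolveQueueName(requested string) (name string, generated bool, err error) {
	if requested != "" {
		return requested, false, nil
	}
	buf := make([]byte, 16) // assumption: 128 bits of randomness
	if _, err := rand.Read(buf); err != nil {
		return "", false, err
	}
	return generatedPrefix + base64.RawURLEncoding.EncodeToString(buf), true, nil
}

func main() {
	name, generated, err := resolveQueueName("")
	if err != nil {
		panic(err)
	}
	fmt.Println(name, generated)
}
```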

Monitoring server

trabbits provides a monitoring server that exposes metrics about the proxy server. You can access the metrics at http://localhost:16692/metrics.

The metrics format is compatible with Prometheus.

The monitoring server is enabled by default and listens on port 16692. You can change the port by using the --metrics-port option.

API Server

trabbits provides an HTTP API server that allows you to manage the configuration.

The API server listens on a Unix domain socket by default. You can change the socket path by using the --api-socket option.

Configuration API / CLI

You can update the configuration of trabbits via the API server. Send HTTP requests to the /config endpoint to get the current configuration or to replace it with a new one.

The trabbits CLI also supports configuration management through the trabbits manage config command. The CLI accesses the API server on the local host.

Usage: trabbits manage config <command> [<file>]

Manage the configuration.

Arguments:
  <command>    Command to run (get, diff, put, reload).
  <file>       Configuration file (required for diff/put commands).

You can also manage connected clients using the trabbits manage clients command. This provides both CLI commands for scripting and a Terminal User Interface (TUI) for interactive management:

Usage: trabbits manage clients <command>

Manage connected clients.

Commands:
  list                     Get connected clients information
  info <proxy-id>          Get detailed information for a specific proxy
  probe <proxy-id>         Stream real-time probe logs for a specific proxy
  shutdown <proxy-id>      Shutdown a specific proxy
  tui                      Interactive terminal interface for managing clients
Interactive Client Management (TUI)

For interactive client management, trabbits provides a modern Terminal User Interface built with Bubble Tea:

$ trabbits manage clients tui

The TUI provides a real-time, top-like interface with the following features:

Main Interface:

  • Fixed Header: Server statistics (active clients, total clients, last update time) always visible at top
  • Client List: Scrollable table showing connected clients with key information:
    • Client ID (shortened for display)
    • Username and Virtual Host
    • Client address
    • Connection status (active/shutting_down)
    • Connected time (relative)
    • Method and frame statistics
  • Real-time Updates: All information refreshes automatically every 2 seconds
  • Pagination: Handles large numbers of clients with proper scrolling and pagination indicators

Navigation:

  • ↑↓ or k/j: Navigate through client list
  • Page Up/Down: Jump by pages for large client lists
  • Home/End: Jump to first/last client
  • Enter: View detailed client information
  • p: Start probe log streaming for selected client
  • l: View server logs (dedicated full-screen view)
  • Shift+K: Shutdown selected client (with confirmation)
  • r: Force refresh
  • q: Quit

Client Detail View:

  • Complete client information including properties and capabilities
  • Method-level statistics breakdown showing usage patterns
  • Frame counters and connection duration
  • Scrollable content for clients with many properties
  • p: Start probe log streaming for current client
  • Shift+K: Shutdown client directly from detail view
  • Real-time updates of statistics and status

Probe Log View:

  • Real-time streaming of AMQP protocol events for selected client
  • Shows method calls with structured attributes in JSON format (routing keys, message properties, etc.)
  • Scrollable log viewer with auto-scroll behavior
  • Navigation controls: ↑↓/kj to scroll, Home/End to jump, PgUp/PgDn for page navigation
  • Auto-scroll control: SPACE to toggle auto-scroll on/off
    • Auto-scroll automatically disables when scrolling up to view older logs
    • Press SPACE or End to re-enable auto-scroll and jump to latest logs
    • Current auto-scroll status displayed in help text
  • Save logs to file: s to open save dialog
    • Default filename format: {client-id}-{timestamp}.log
    • Edit file path with e key (supports full text editing with cursor navigation)
    • Logs saved in JSON Lines format for easy processing
    • ENTER to save, ESC/n to cancel
  • q/Esc: Return to main client list

Server Logs View:

  • Dedicated full-screen view for reviewing server logs
  • Shows all recent server logs (last 100 entries) with structured attributes in JSON format
  • Scrollable viewer with full navigation support
  • Navigation controls: ↑↓/kj to scroll, Home/End to jump, PgUp/PgDn for page navigation
  • Each log entry shows timestamp, level (INFO/DEBUG/ERROR), message, and attributes
  • Useful for monitoring server health and debugging issues
  • q/Esc: Return to main client list

Client Shutdown:

  • Confirmation dialog showing client details before shutdown
  • Optional reason (defaults to "TUI shutdown")
  • Immediate feedback on success/failure
  • Graceful disconnection with proper AMQP close protocol

Benefits:

  • No Terminal Disruption: Unlike curl commands, the TUI doesn't clutter your terminal with JSON output
  • Real-time Monitoring: See client connections, disconnections, and activity as it happens
  • Live Protocol Inspection: Stream real-time AMQP protocol events and method calls for debugging
  • Server Log Integration: View server logs directly in TUI for comprehensive monitoring
  • Log Persistence: Save probe logs to file for offline analysis and troubleshooting
  • Efficient Navigation: Quickly browse through many clients without manual ID copying
  • Comprehensive Information: All client details in an easy-to-read format
  • Safe Operations: Confirmation dialogs prevent accidental shutdowns
Configuration Versioning and Graceful Disconnection

When a new configuration is loaded (via API PUT request or SIGHUP signal), trabbits implements a graceful proxy management system:

  • Each active proxy connection maintains a hash of the configuration it was created with
  • Upon configuration update, trabbits calculates a new configuration hash
  • Proxy connections using outdated configurations are gracefully disconnected with connection-forced (320) error code
  • Clients receive a "Configuration updated, please reconnect" message and can immediately reconnect with the new configuration
  • This ensures that configuration changes are reflected quickly across all connections while maintaining service availability

This behavior helps ensure that configuration updates (such as upstream changes, routing rule modifications, or credential updates) are applied consistently across all active connections without requiring a full server restart.
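
The versioning idea can be sketched in Go as follows. The hash algorithm and data structures trabbits actually uses are not stated here, so this sketch assumes SHA-256 over the JSON encoding of the configuration; proxyConn, configHash, and outdatedProxies are hypothetical names.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// proxyConn is a hypothetical record of one active proxy connection.
type proxyConn struct {
	ID         string
	ConfigHash string // hash of the configuration the proxy was created with
}

// configHash hashes the JSON encoding of cfg. encoding/json sorts map keys,
// so equal configurations produce equal hashes.
func configHash(cfg any) (string, error) {
	b, err := json.Marshal(cfg)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:]), nil
}

// outdatedProxies returns the connections created with a different configuration.
func outdatedProxies(conns []proxyConn, currentHash string) []proxyConn {
	var out []proxyConn
	for _, c := range conns {
		if c.ConfigHash != currentHash {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	oldHash, _ := configHash(map[string]any{"upstreams": []string{"localhost:5672"}})
	newHash, _ := configHash(map[string]any{"upstreams": []string{"localhost:5673"}})
	conns := []proxyConn{
		{ID: "proxy-12345678", ConfigHash: oldHash},
		{ID: "proxy-87654321", ConfigHash: newHash},
	}
	for _, c := range outdatedProxies(conns, newHash) {
		fmt.Println("disconnect with connection-forced (320):", c.ID)
	}
}
```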

Use curl to access the API server

You can use curl to access the API server through its Unix domain socket (default: /tmp/trabbits.sock, configurable with the --api-socket option).

$ curl --unix-socket /tmp/trabbits.sock http://localhost/config
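Note

Reloading the configuration does not modify existing connections in place. Connections established under an outdated configuration are gracefully disconnected (see Configuration Versioning and Graceful Disconnection), and the new configuration takes effect when clients reconnect.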

Get the current configuration

You can get the current configuration by sending a GET request to the /config endpoint.

$ curl --unix-socket /tmp/trabbits.sock http://localhost/config

trabbits returns the current configuration in JSON format.

You can also use the trabbits CLI to get the current configuration.

$ trabbits manage config get
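
For programs that need the same access as the curl command above, a Go HTTP client can be pointed at the Unix domain socket. This is a sketch using only the standard library: newAPIClient and getConfig are hypothetical helper names, and the socket path is the documented default.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"time"
)

// newAPIClient returns an HTTP client that dials the given Unix socket
// regardless of the host in the request URL.
func newAPIClient(socketPath string) *http.Client {
	return &http.Client{
		Timeout: 5 * time.Second,
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				var d net.Dialer
				return d.DialContext(ctx, "unix", socketPath)
			},
		},
	}
}

// getConfig fetches the current configuration as raw JSON.
func getConfig(client *http.Client) ([]byte, error) {
	resp, err := client.Get("http://localhost/config")
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return io.ReadAll(resp.Body)
}

func main() {
	body, err := getConfig(newAPIClient("/tmp/trabbits.sock"))
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Println(string(body))
}
```

The host part of the URL is ignored by the custom dialer, which is why http://localhost works just as it does with curl's --unix-socket option.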
Update the configuration

You can update the configuration by sending a PUT request with a new configuration in JSON format.

$ curl --unix-socket /tmp/trabbits.sock -X PUT -d @new_config.json \
    -H "Content-Type: application/json" http://localhost/config

trabbits will reload the configuration and apply the new configuration.

You can also use the trabbits CLI to update the configuration.

$ trabbits manage config put new_config.json
Diff the configuration

You can diff the current configuration against a new configuration using the trabbits CLI.

$ trabbits manage config diff new_config.json
Reload the configuration

You can reload the configuration from the original configuration file using the trabbits CLI.

$ trabbits manage config reload

This command reloads the configuration from the file specified by --config option (default: config.json).

Clients API

trabbits provides an API to get information about connected clients.

Get connected clients information

You can get information about all connected clients by sending a GET request to the /clients endpoint.

$ curl --unix-socket /tmp/trabbits.sock http://localhost/clients

The response includes the following information for each client:

[
  {
    "id": "proxy-12345678",
    "client_address": "127.0.0.1:54321",
    "user": "guest",
    "virtual_host": "/",
    "client_banner": "Platform/Product/Version",
    "connected_at": "2025-01-01T12:00:00Z",
    "status": "active",
    "shutdown_reason": ""
  },
  {
    "id": "proxy-87654321",
    "client_address": "127.0.0.1:54322",
    "user": "guest",
    "virtual_host": "/",
    "client_banner": "Platform/Product/Version",
    "connected_at": "2025-01-01T12:01:00Z",
    "status": "shutting_down",
    "shutdown_reason": "Configuration updated, please reconnect"
  }
]

Fields description:

  • id: Unique proxy identifier
  • client_address: Client's IP address and port
  • user: Authenticated username
  • virtual_host: Virtual host the client is connected to
  • client_banner: Client platform/product/version information
  • connected_at: Timestamp when the client connected
  • status: Connection status - either active or shutting_down
  • shutdown_reason: Reason for shutdown (only present when status is shutting_down)
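
A Go program consuming this endpoint could decode the response into a struct like the one below. The JSON field names come from the example response above; the clientInfo type and the helper functions are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// clientInfo maps the fields of one entry in the /clients response.
type clientInfo struct {
	ID             string    `json:"id"`
	ClientAddress  string    `json:"client_address"`
	User           string    `json:"user"`
	VirtualHost    string    `json:"virtual_host"`
	ClientBanner   string    `json:"client_banner"`
	ConnectedAt    time.Time `json:"connected_at"`
	Status         string    `json:"status"`
	ShutdownReason string    `json:"shutdown_reason"`
}

// parseClients decodes the JSON array returned by GET /clients.
func parseClients(data []byte) ([]clientInfo, error) {
	var clients []clientInfo
	if err := json.Unmarshal(data, &clients); err != nil {
		return nil, err
	}
	return clients, nil
}

// shuttingDown filters the clients whose status is shutting_down.
func shuttingDown(clients []clientInfo) []clientInfo {
	var out []clientInfo
	for _, c := range clients {
		if c.Status == "shutting_down" {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	sample := []byte(`[
	  {"id": "proxy-12345678", "user": "guest", "connected_at": "2025-01-01T12:00:00Z", "status": "active"},
	  {"id": "proxy-87654321", "user": "guest", "connected_at": "2025-01-01T12:01:00Z",
	   "status": "shutting_down", "shutdown_reason": "Configuration updated, please reconnect"}
	]`)
	clients, err := parseClients(sample)
	if err != nil {
		panic(err)
	}
	for _, c := range shuttingDown(clients) {
		fmt.Printf("%s (%s): %s\n", c.ID, c.User, c.ShutdownReason)
	}
}
```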

You can also use the trabbits CLI to get clients information:

$ trabbits manage clients list
Get detailed client information

You can get comprehensive information about a specific client by sending a GET request to the /clients/{proxy_id} endpoint.

$ curl --unix-socket /tmp/trabbits.sock http://localhost/clients/proxy-12345678

This returns complete client information including:

{
  "id": "proxy-12345678",
  "client_address": "127.0.0.1:54321",
  "user": "admin",
  "virtual_host": "/",
  "client_banner": "golang/golang/AMQP 0.9.1 Client/1.10.0",
  "client_properties": {
    "product": "my-app",
    "version": "1.0.0",
    "capabilities": {
      "publisher_confirms": true,
      "consumer_cancel_notify": true
    }
  },
  "connected_at": "2023-11-20T10:30:00Z",
  "status": "active",
  "stats": {
    "started_at": "2023-11-20T10:30:00Z",
    "methods": {
      "Basic.Publish": 150,
      "Basic.Consume": 5,
      "Queue.Declare": 3
    },
    "total_methods": 158,
    "received_frames": 320,
    "sent_frames": 285,
    "total_frames": 605,
    "duration": "45m30s"
  }
}

Using the CLI:

$ trabbits manage clients info proxy-12345678

The response includes:

  • Complete client properties and capabilities
  • Detailed statistics with method-level breakdown
  • Frame counters (received/sent frames)
  • Connection duration and timestamps
Shutdown a specific proxy

You can gracefully shutdown a specific proxy by sending a DELETE request to the /clients/{proxy_id} endpoint.

$ curl -X DELETE --unix-socket /tmp/trabbits.sock http://localhost/clients/proxy-12345678

You can also provide a custom shutdown reason using the reason query parameter:

$ curl -X DELETE --unix-socket /tmp/trabbits.sock "http://localhost/clients/proxy-12345678?reason=Maintenance"

The response includes the shutdown status:

{
  "status": "shutdown_initiated",
  "proxy_id": "proxy-12345678",
  "reason": "Maintenance"
}

Using the CLI:

# Shutdown a proxy with default reason
$ trabbits manage clients shutdown proxy-12345678

# Shutdown a proxy with custom reason
$ trabbits manage clients shutdown proxy-12345678 --reason "Scheduled maintenance"

The proxy will be gracefully disconnected, allowing it to properly close ongoing operations before termination.

Stream real-time probe logs

trabbits provides a probe log system for real-time monitoring and debugging of AMQP protocol events. This allows you to observe what operations a specific client is performing in real-time.

API Access:

You can stream real-time probe logs for a specific proxy using Server-Sent Events (SSE):

$ curl --unix-socket /tmp/trabbits.sock http://localhost/clients/proxy-12345678/probe

The response uses Server-Sent Events (SSE) format with JSON payloads:

data: {"type":"connected","proxy_id":"proxy-12345678"}

data: {"timestamp":"2025-01-20T15:04:05.123Z","message":"Channel.Open","attrs":{"channel":1,"message":{"Reserved1":0}}}

data: {"timestamp":"2025-01-20T15:04:05.234Z","message":"Queue.Declare","attrs":{"message":{"Queue":"test.queue","Passive":false,"Durable":true,"AutoDelete":false,"Exclusive":false,"NoWait":false,"Arguments":null}}}

data: {"timestamp":"2025-01-20T15:04:05.345Z","message":"Basic.Publish","attrs":{"message":{"Exchange":"","RoutingKey":"test.queue","Mandatory":false,"Immediate":false,"Properties":{"ContentType":"text/plain","DeliveryMode":2},"Body":"SGVsbG8gV29ybGQh"}}}

data: {"type":"proxy_ended"}
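
A client reading this stream only needs to pick out the data: lines and decode each payload. The Go sketch below shows one way to do that. The probeEvent type and function names are hypothetical, the field set is taken from the sample events above, and the reader is assumed to stop at the proxy_ended event.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// probeEvent covers the fields seen in the sample stream above.
type probeEvent struct {
	Type      string         `json:"type"`
	ProxyID   string         `json:"proxy_id"`
	Timestamp string         `json:"timestamp"`
	Message   string         `json:"message"`
	Attrs     map[string]any `json:"attrs"`
}

// readProbeEvents collects events from an SSE stream until it ends
// or a proxy_ended event arrives.
func readProbeEvents(r io.Reader) ([]probeEvent, error) {
	var events []probeEvent
	sc := bufio.NewScanner(r)
	// Allow long lines, since published message bodies are included.
	sc.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	for sc.Scan() {
		line := sc.Text()
		if !strings.HasPrefix(line, "data:") {
			continue // skip blank separators and other SSE fields
		}
		payload := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		var ev probeEvent
		if err := json.Unmarshal([]byte(payload), &ev); err != nil {
			return events, fmt.Errorf("decode %q: %w", payload, err)
		}
		events = append(events, ev)
		if ev.Type == "proxy_ended" {
			break
		}
	}
	return events, sc.Err()
}

// parseProbeText is a convenience wrapper for in-memory streams.
func parseProbeText(stream string) ([]probeEvent, error) {
	return readProbeEvents(strings.NewReader(stream))
}

func main() {
	sample := "data: {\"type\":\"connected\",\"proxy_id\":\"proxy-12345678\"}\n\n" +
		"data: {\"timestamp\":\"2025-01-20T15:04:05.123Z\",\"message\":\"Channel.Open\",\"attrs\":{\"channel\":1}}\n\n" +
		"data: {\"type\":\"proxy_ended\"}\n"
	events, err := parseProbeText(sample)
	if err != nil {
		panic(err)
	}
	for _, ev := range events {
		fmt.Printf("type=%q message=%q\n", ev.Type, ev.Message)
	}
}
```

For a long-lived stream, a callback invoked per event would be a better fit than collecting a slice; the slice keeps this sketch short. The response body of an HTTP request to the probe endpoint can be passed directly as the io.Reader.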

CLI Access:

Stream probe logs using the CLI for human-readable output:

# Stream probe logs in text format (default)
$ trabbits manage clients probe proxy-12345678

# Stream probe logs in JSON format
$ trabbits manage clients probe proxy-12345678 --format json

Example text output:

Connected to probe stream for proxy: proxy-12345678
Press Ctrl+C to stop...
✓ Connected to probe stream
15:04:05.123 Channel.Open channel=1
15:04:05.234 Queue.Declare queue=test.queue durable=true
15:04:05.345 Basic.Publish exchange= routing_key=test.queue mandatory=false
15:04:05.456 Basic.Consume queue=test.queue consumer_tag=ctag-1
15:04:05.567 Basic.Deliver delivery_tag=1 exchange= routing_key=test.queue

What Probe Logs Capture:

Probe logs capture all major AMQP method calls with structured attributes:

  • Channel operations: Open/Close with channel IDs
  • Queue operations: Declare, Bind, Delete with queue names and attributes
  • Exchange operations: Declare with exchange names and properties
  • Basic operations: Publish (with routing keys, properties), Consume, Deliver, Ack/Nack
  • Message properties: Content type, delivery mode, routing keys, exchange names
  • Delivery tags: Both client-side and upstream delivery tag mappings
  • Error conditions: Protocol errors and connection issues

Use Cases:

  • Protocol Debugging: See exactly what AMQP methods clients are calling
  • Message Flow Analysis: Track publish/consume patterns and routing behavior
  • Performance Investigation: Identify bottlenecks in client operations
  • Integration Testing: Verify that applications are using AMQP correctly
  • Troubleshooting: Debug unexpected client behavior or routing issues

Important Notes:

  • Probe logs have minimal performance impact as they use non-blocking channels
  • Only the latest 100 log entries are buffered per client; older logs are discarded
  • Probe logs are retained after client disconnection: when a proxy disconnects, its probe logs remain accessible, which helps when debugging short-lived connections. Logs for up to the 1000 most recently disconnected proxies are kept in an LRU cache
  • For disconnected proxies, the API returns all buffered logs and immediately ends the stream with {"type":"proxy_ended","status":"disconnected"}
  • Use Ctrl+C to stop CLI streaming; the connection will be gracefully closed
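
The first two notes describe two common techniques: a fixed-size buffer that keeps only the newest entries, and a channel send that never blocks the sender. The sketch below illustrates both in isolation. The names ringBuffer and trySend are hypothetical, and dropping an entry when the channel is full is an assumption of this sketch; trabbits' own ProbeLogBuffer may be implemented differently.

```go
package main

import "fmt"

// ringBuffer keeps only the most recent entries. It is a hypothetical
// stand-in for illustration, not trabbits' ProbeLogBuffer.
type ringBuffer struct {
	entries []string
	next    int  // index where the next entry is written
	full    bool // true once the buffer has wrapped around
}

// newRingBuffer creates a buffer; size must be greater than zero.
func newRingBuffer(size int) *ringBuffer {
	return &ringBuffer{entries: make([]string, size)}
}

// add stores entry, overwriting the oldest one once the buffer is full.
func (b *ringBuffer) add(entry string) {
	b.entries[b.next] = entry
	b.next = (b.next + 1) % len(b.entries)
	if b.next == 0 {
		b.full = true
	}
}

// snapshot returns a copy of the buffered entries, oldest first.
func (b *ringBuffer) snapshot() []string {
	if !b.full {
		return append([]string(nil), b.entries[:b.next]...)
	}
	out := make([]string, 0, len(b.entries))
	out = append(out, b.entries[b.next:]...)
	return append(out, b.entries[:b.next]...)
}

// trySend delivers entry without blocking. It reports false when the
// channel is full and the entry was dropped (an assumption of this sketch).
func trySend(ch chan<- string, entry string) bool {
	select {
	case ch <- entry:
		return true
	default:
		return false
	}
}

func main() {
	buf := newRingBuffer(3)
	for i := 1; i <= 5; i++ {
		buf.add(fmt.Sprintf("log-%d", i))
	}
	fmt.Println(buf.snapshot()) // [log-3 log-4 log-5]

	ch := make(chan string, 1)
	fmt.Println(trySend(ch, "a"), trySend(ch, "b")) // true false
}
```

With a buffer size of 100, the same structure would hold the latest 100 entries and silently discard older ones, which matches the behavior described in the notes.
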
Reload configuration with SIGHUP

You can also reload the configuration by sending a SIGHUP signal to the trabbits process.

For better reliability, use a PID file to ensure you target the correct process:

$ trabbits run --pid-file /var/run/trabbits.pid --config config.json
$ kill -HUP $(cat /var/run/trabbits.pid)

Alternatively, you can use process discovery (less reliable):

$ kill -HUP $(pidof trabbits)

or

$ pkill -HUP trabbits

This will reload the configuration from the original configuration file specified when trabbits was started.

Example diff between the running configuration and new_config.json:

--- http://localhost:16692/config
+++ new_config.json
@@ -7,10 +7,10 @@
     },
     {
-      "address": "localhost:5673",
+      "address": "localhost:5674",
       "routing": {
         "key_patterns": [
-          "#"
+          "test.queue.example.*"
         ]
       },
       "queue_attributes": {

Test utilities

Test routing pattern matching

The test match-routing command allows you to test whether a routing key matches a given binding pattern. This is useful for debugging routing rules before applying them in your configuration.

# Test pattern matching
$ trabbits test match-routing "logs.*.error" "logs.app.error"
✓ MATCHED: pattern 'logs.*.error' matches key 'logs.app.error'

$ trabbits test match-routing "logs.*.error" "metrics.app.error"
✗ NOT MATCHED: pattern 'logs.*.error' does not match key 'metrics.app.error'

# Exit code is 0 for match, 1 for no match
$ trabbits test match-routing "logs.#" "logs.app.error.critical"
✓ MATCHED: pattern 'logs.#' matches key 'logs.app.error.critical'
$ echo $?
0

$ trabbits test match-routing "metrics.*" "metrics.cpu.usage"  
✗ NOT MATCHED: pattern 'metrics.*' does not match key 'metrics.cpu.usage'
$ echo $?
1

Pattern matching follows RabbitMQ's topic exchange rules with extensions:

  • * (star) matches exactly one word
  • # (hash) matches zero or more words
  • % (percent) matches zero or more characters within a single word (substring matching)
  • Words are delimited by dots (.)

Examples with % wildcard:

  • app.%service matches app.webservice, app.apiservice, app.service
  • foo.*.a% matches foo.xxx.aaa, foo.yyy.app
  • %.server.* matches web.server.01, api.server.prod

Support for multiple instances

trabbits can run multiple instances on the same server. You can use the same port (--port) for multiple instances.

This feature uses the SO_REUSEPORT socket option, which allows multiple processes to bind to the same port. The kernel distributes incoming connections among the processes.

This feature is useful for deploying a new version without downtime. You can start a new instance with a new version and configuration and stop the old instance.

Note

The --metrics-port and --api-socket options must differ between instances, because each instance needs its own metrics port and API socket.

License

This project is licensed under the BSD-style license. See the LICENSE file for details.

Contributing

Bug reports and pull requests are welcome. For contribution guidelines, see CONTRIBUTING.md.

  • 2021 VMware, Inc. or its affiliates
  • 2012-2021 Sean Treadway, SoundCloud Ltd.
  • 2025 fujiwara

All rights reserved.

Documentation

Constants

const (
	APIContentType        = "application/json"
	APIContentTypeJsonnet = "application/jsonnet"
)
const AutoGenerateQueueNamePrefix = "trabbits.gen-"

Variables

var (
	ChannelMax                    = uint16(1023)
	HeartbeatInterval             = uint16(60)
	FrameMax                      = uint32(128 * 1024)
	UpstreamDefaultTimeout        = 5 * time.Second
	DefaultHandshakeTimeout       = 5 * time.Second
	DefaultProcessTimeout         = 5 * time.Minute // Long timeout for process loop (low-frequency clients)
	DefaultConnectionCloseTimeout = 1 * time.Second

	// Shutdown messages
	ShutdownMsgServerShutdown = "Server shutting down"
	ShutdownMsgConfigUpdate   = "Configuration updated, please reconnect"
	ShutdownMsgDefault        = "Connection closed"

	// ProbeLog retention
	ProbeLogBufferSize    = 100  // Number of logs to keep per proxy
	ProbeLogRetentionSize = 1000 // Number of proxies to retain logs for (LRU)
)
var Debug bool
var Version = "v0.0.18"

Functions

func ColoredDiff added in v0.0.9

func ColoredDiff(src string) string

ColoredDiff adds color to diff output (+/- lines)

func NewMetricSlogHandler

func NewMetricSlogHandler(base slog.Handler, logCounter *prometheus.CounterVec) slog.Handler

func Run

func Run(ctx context.Context) error

Types

type AMQPError

type AMQPError interface {
	AMQPMessage() amqp091.Message
	Error() string
	Code() uint16
}

type CLI

type CLI struct {
	Run    *RunOptions    `cmd:"" help:"Run the trabbits server."`
	Manage *ManageOptions `cmd:"" help:"Manage the trabbits server."`
	Test   *TestOptions   `cmd:"" help:"Test utilities for trabbits."`

	Config      string `help:"Path to the configuration file." default:"config.json" env:"TRABBITS_CONFIG"`
	Port        int    `help:"Port to listen on." default:"6672" env:"TRABBITS_PORT"`
	MetricsPort int    `help:"Port to listen on for metrics" default:"16692" env:"TRABBITS_METRICS_PORT"`
	APISocket   string `help:"Path to the API socket." default:"/tmp/trabbits.sock" env:"TRABBITS_API_SOCKET"`

	Debug       bool             `help:"Enable debug mode." env:"DEBUG"`
	EnablePprof bool             `help:"Enable pprof." env:"ENABLE_PPROF"`
	Version     kong.VersionFlag `help:"Show version."`
}

type ChannelError

type ChannelError struct {
	Error
}

func NewChannelError

func NewChannelError(code uint16, message string) *ChannelError

func (*ChannelError) AMQPMessage

func (e *ChannelError) AMQPMessage() amqp091.Message

type Error

type Error struct {
	// contains filtered or unexported fields
}

func NewError

func NewError(code uint16, message string) *Error

func (*Error) AMQPMessage added in v0.0.6

func (e *Error) AMQPMessage() amqp091.Message

func (*Error) Code

func (e *Error) Code() uint16

func (*Error) Error

func (e *Error) Error() string

type LogBuffer added in v0.0.15

type LogBuffer struct {
	// contains filtered or unexported fields
}

LogBuffer stores recent log entries and broadcasts them to listeners

func NewLogBuffer added in v0.0.15

func NewLogBuffer(maxSize int) *LogBuffer

NewLogBuffer creates a new log buffer with the specified maximum size

func (*LogBuffer) Add added in v0.0.15

func (b *LogBuffer) Add(entry types.ProbeLogEntry)

Add adds a log entry to the buffer and broadcasts to all listeners

func (*LogBuffer) Subscribe added in v0.0.15

func (b *LogBuffer) Subscribe(ctx context.Context, listenerID string) <-chan types.ProbeLogEntry

Subscribe creates a new listener channel

func (*LogBuffer) Unsubscribe added in v0.0.15

func (b *LogBuffer) Unsubscribe(listenerID string)

Unsubscribe removes a listener
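
The store-and-broadcast idea behind LogBuffer can be sketched as follows. This is a simplified, hypothetical illustration and not the package's implementation: the broadcaster type uses plain strings instead of types.ProbeLogEntry, omits the context parameter of Subscribe, does not replay old entries to new listeners, and drops entries for listeners whose channel is full. All of these are assumptions of the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster keeps the most recent entries and fans new ones out to
// listeners. It is a hypothetical stand-in for illustration only.
type broadcaster struct {
	mu        sync.Mutex
	maxSize   int
	recent    []string
	listeners map[string]chan string
}

func newBroadcaster(maxSize int) *broadcaster {
	return &broadcaster{maxSize: maxSize, listeners: make(map[string]chan string)}
}

// add stores entry, trims the buffer to maxSize, and offers the entry to
// every listener without blocking.
func (b *broadcaster) add(entry string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.recent = append(b.recent, entry)
	if len(b.recent) > b.maxSize {
		b.recent = b.recent[1:]
	}
	for _, ch := range b.listeners {
		select {
		case ch <- entry:
		default: // slow listener: drop rather than block the producer
		}
	}
}

// subscribe registers a listener and returns its receive-only channel.
func (b *broadcaster) subscribe(id string) <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan string, 16)
	b.listeners[id] = ch
	return ch
}

// unsubscribe removes the listener and closes its channel. Holding the
// mutex guarantees that add never sends on a closed channel.
func (b *broadcaster) unsubscribe(id string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if ch, ok := b.listeners[id]; ok {
		delete(b.listeners, id)
		close(ch)
	}
}

// snapshot returns a copy of the retained entries, oldest first.
func (b *broadcaster) snapshot() []string {
	b.mu.Lock()
	defer b.mu.Unlock()
	return append([]string(nil), b.recent...)
}

func main() {
	b := newBroadcaster(2)
	b.add("one")
	ch := b.subscribe("viewer")
	b.add("two")
	b.add("three")
	b.unsubscribe("viewer")
	for entry := range ch {
		fmt.Println("received:", entry)
	}
	fmt.Println(b.snapshot()) // [two three]
}
```
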

type ManageOptions

type ManageOptions struct {
	Config struct {
		Command string `arg:"" enum:"get,diff,put,reload" help:"Command to run (get, diff, put, reload)."`
		File    string `arg:"" optional:"" help:"Configuration file (required for diff/put commands)."`
	} `cmd:"" help:"Manage the configuration."`
	Clients struct {
		List     struct{} `cmd:"" help:"Get connected clients information."`
		TUI      struct{} `cmd:"" default:"true" help:"Interactive TUI for managing clients (default)."`
		Shutdown struct {
			ProxyID string `arg:"" required:"" help:"Proxy ID to shutdown."`
			Reason  string `help:"Optional shutdown reason."`
		} `cmd:"" help:"Shutdown a specific proxy."`
		Info struct {
			ProxyID string `arg:"" required:"" help:"Proxy ID to get detailed information for."`
		} `cmd:"" help:"Get detailed information for a specific proxy."`
		Probe struct {
			ProxyID string `arg:"" required:"" help:"Proxy ID to monitor probe logs for."`
			Format  string `help:"Output format (json|text)." default:"text" enum:"json,text"`
		} `cmd:"" help:"Stream real-time probe logs for a specific proxy."`
	} `cmd:"" help:"Manage connected clients."`
}

type MetricSlogHandler

type MetricSlogHandler struct {
	slog.Handler
	// contains filtered or unexported fields
}

func (*MetricSlogHandler) Handle

func (h *MetricSlogHandler) Handle(ctx context.Context, r slog.Record) error

type ProbeLogBuffer added in v0.0.17

type ProbeLogBuffer struct {
	// contains filtered or unexported fields
}

ProbeLogBuffer stores probe logs for a proxy with a circular buffer

func NewProbeLogBuffer added in v0.0.17

func NewProbeLogBuffer(maxSize int) *ProbeLogBuffer

NewProbeLogBuffer creates a new probe log buffer

func (*ProbeLogBuffer) Add added in v0.0.17

func (b *ProbeLogBuffer) Add(log probeLog)

Add adds a probe log to the buffer

func (*ProbeLogBuffer) GetDisconnectedAt added in v0.0.17

func (b *ProbeLogBuffer) GetDisconnectedAt() time.Time

GetDisconnectedAt returns the timestamp when the proxy disconnected

func (*ProbeLogBuffer) GetLogs added in v0.0.17

func (b *ProbeLogBuffer) GetLogs() []probeLog

GetLogs returns a copy of all logs

func (*ProbeLogBuffer) GetProxyInfo added in v0.0.17

func (b *ProbeLogBuffer) GetProxyInfo() (clientAddr, user, virtualHost, clientBanner string, connectedAt time.Time)

GetProxyInfo returns the stored proxy information

func (*ProbeLogBuffer) IsActive added in v0.0.17

func (b *ProbeLogBuffer) IsActive() bool

IsActive returns whether the proxy is still active

func (*ProbeLogBuffer) MarkInactive added in v0.0.17

func (b *ProbeLogBuffer) MarkInactive()

MarkInactive marks the buffer as inactive (proxy disconnected)

func (*ProbeLogBuffer) SetProxyInfo added in v0.0.17

func (b *ProbeLogBuffer) SetProxyInfo(clientAddr, user, virtualHost, clientBanner string, connectedAt time.Time)

SetProxyInfo sets the proxy information (should be called once at registration)

type Proxy

type Proxy struct {
	VirtualHost string
	// contains filtered or unexported fields
}

func (*Proxy) ClientAddr

func (p *Proxy) ClientAddr() string

func (*Proxy) ClientBanner

func (p *Proxy) ClientBanner() string

func (*Proxy) Close

func (p *Proxy) Close()

func (*Proxy) CloseChannel

func (p *Proxy) CloseChannel(id uint16) error

func (*Proxy) ConnectToUpstreams

func (p *Proxy) ConnectToUpstreams(ctx context.Context, upstreamConfigs []config.Upstream, props amqp091.Table) error

func (*Proxy) GetChannel

func (p *Proxy) GetChannel(id uint16, routingKey string) (*rabbitmq.Channel, error)

func (*Proxy) GetChannelByDeliveryTag

func (s *Proxy) GetChannelByDeliveryTag(channelID uint16, tag uint64) (*rabbitmq.Channel, error)

func (*Proxy) GetChannelWithIndex added in v0.0.18

func (p *Proxy) GetChannelWithIndex(id uint16, routingKey string) (*rabbitmq.Channel, int, error)

func (*Proxy) GetChannels

func (p *Proxy) GetChannels(id uint16) ([]*rabbitmq.Channel, error)

func (*Proxy) GetProbeChan added in v0.0.9

func (p *Proxy) GetProbeChan() chan probeLog

GetProbeChan returns the probe channel for external access

func (*Proxy) ID added in v0.0.6

func (p *Proxy) ID() string

func (*Proxy) MonitorUpstreamConnection added in v0.0.6

func (p *Proxy) MonitorUpstreamConnection(ctx context.Context, upstream *Upstream)

MonitorUpstreamConnection monitors an upstream connection and notifies when it closes

func (*Proxy) NewChannel

func (p *Proxy) NewChannel(id uint16) error

func (*Proxy) Upstream

func (p *Proxy) Upstream(i int) *Upstream

func (*Proxy) UpstreamDeliveryTag

func (s *Proxy) UpstreamDeliveryTag(tag uint64) uint64

func (*Proxy) Upstreams

func (p *Proxy) Upstreams() []*Upstream

type ProxyStats added in v0.0.7

type ProxyStats struct {
	// contains filtered or unexported fields
}

ProxyStats holds statistics for a single proxy connection

func NewProxyStats added in v0.0.7

func NewProxyStats() *ProxyStats

NewProxyStats creates a new ProxyStats instance

func (*ProxyStats) GetAllMethodCounts added in v0.0.7

func (s *ProxyStats) GetAllMethodCounts() map[string]int64

GetAllMethodCounts returns a map of all method counts

func (*ProxyStats) GetMethodCount added in v0.0.7

func (s *ProxyStats) GetMethodCount(method string) int64

GetMethodCount returns the count for a specific method

func (*ProxyStats) GetReceivedFrames added in v0.0.7

func (s *ProxyStats) GetReceivedFrames() int64

GetReceivedFrames returns the number of frames received from client

func (*ProxyStats) GetSentFrames added in v0.0.7

func (s *ProxyStats) GetSentFrames() int64

GetSentFrames returns the number of frames sent to client

func (*ProxyStats) GetStartedAt added in v0.0.7

func (s *ProxyStats) GetStartedAt() time.Time

GetStartedAt returns when the stats collection started

func (*ProxyStats) GetTotalFrames added in v0.0.7

func (s *ProxyStats) GetTotalFrames() int64

GetTotalFrames returns the total number of frames (received + sent)

func (*ProxyStats) GetTotalMethods added in v0.0.7

func (s *ProxyStats) GetTotalMethods() int64

GetTotalMethods returns the total number of methods processed

func (*ProxyStats) IncrementMethod added in v0.0.7

func (s *ProxyStats) IncrementMethod(method string)

IncrementMethod increments the counter for a specific AMQP method

func (*ProxyStats) IncrementReceivedFrames added in v0.0.7

func (s *ProxyStats) IncrementReceivedFrames()

IncrementReceivedFrames increments the received frame counter

func (*ProxyStats) IncrementSentFrames added in v0.0.7

func (s *ProxyStats) IncrementSentFrames()

IncrementSentFrames increments the sent frame counter

func (*ProxyStats) Snapshot added in v0.0.7

func (s *ProxyStats) Snapshot() types.StatsSnapshot

Snapshot returns a snapshot of current statistics

type RunOptions

type RunOptions struct {
	PidFile string `help:"Path to write the process ID file." env:"TRABBITS_PID_FILE"`
}

type Server added in v0.0.6

type Server struct {
	// contains filtered or unexported fields
}

func NewServer added in v0.0.6

func NewServer(config *config.Config, apiSocket string) *Server

NewServer creates a new Server instance

func (*Server) CountActiveProxies added in v0.0.6

func (s *Server) CountActiveProxies() int

CountActiveProxies returns the number of active proxies for this server instance

func (*Server) GetClientInfo added in v0.0.7

func (s *Server) GetClientInfo(proxyID string) (*types.FullClientInfo, bool)

GetClientInfo returns full client information including ClientProperties and complete stats

func (*Server) GetClientsInfo added in v0.0.7

func (s *Server) GetClientsInfo() []types.ClientInfo

GetClientsInfo returns information about all connected clients

func (*Server) GetConfig added in v0.0.6

func (s *Server) GetConfig() *config.Config

GetConfig returns the current config (thread-safe)

func (*Server) GetConfigHash added in v0.0.6

func (s *Server) GetConfigHash() string

GetConfigHash returns the current config hash (thread-safe)

func (*Server) GetHealthManager added in v0.0.6

func (s *Server) GetHealthManager(upstreamName string) *health.Manager

GetHealthManager returns the health manager for the given upstream name

func (*Server) GetProbeLogBuffer added in v0.0.17

func (s *Server) GetProbeLogBuffer(proxyID string) (*ProbeLogBuffer, bool)

GetProbeLogBuffer returns the probe log buffer for a given proxy ID. It first checks active proxies, then falls back to the LRU cache for disconnected proxies.

func (*Server) GetProxy added in v0.0.6

func (s *Server) GetProxy(id string) *Proxy

GetProxy retrieves a proxy by ID from this server instance

func (*Server) Metrics added in v0.0.10

func (s *Server) Metrics() *metrics.Metrics

Metrics returns the metrics instance for this server

func (*Server) MetricsStore added in v0.0.10

func (s *Server) MetricsStore() *metrics.Store

MetricsStore returns the metrics store for this server

func (*Server) NewProxy added in v0.0.6

func (s *Server) NewProxy(conn net.Conn) *Proxy

NewProxy creates a new proxy associated with this server

func (*Server) RegisterProxy added in v0.0.6

func (s *Server) RegisterProxy(proxy *Proxy, cancel context.CancelFunc)

RegisterProxy adds a proxy and its cancel function to the server's active proxy list

func (*Server) ShutdownProxy added in v0.0.7

func (s *Server) ShutdownProxy(proxyID string, shutdownReason string) bool

ShutdownProxy gracefully shuts down a specific proxy by proxy ID

func (*Server) UnregisterProxy added in v0.0.6

func (s *Server) UnregisterProxy(proxy *Proxy)

UnregisterProxy removes a proxy from the server's active proxy list and adds its probe log buffer to LRU cache for retention

func (*Server) UpdateConfig added in v0.0.6

func (s *Server) UpdateConfig(config *config.Config)

UpdateConfig updates the config and hash atomically

type ServerLogHandler added in v0.0.15

type ServerLogHandler struct {
	// contains filtered or unexported fields
}

ServerLogHandler is a custom slog.Handler that sends logs to the log buffer

func NewServerLogHandler added in v0.0.15

func NewServerLogHandler(logBuffer *LogBuffer, next slog.Handler) *ServerLogHandler

NewServerLogHandler creates a new handler that sends logs to the buffer

func (*ServerLogHandler) Enabled added in v0.0.15

func (h *ServerLogHandler) Enabled(ctx context.Context, level slog.Level) bool

Enabled reports whether the handler handles records at the given level

func (*ServerLogHandler) Handle added in v0.0.15

func (h *ServerLogHandler) Handle(ctx context.Context, r slog.Record) error

Handle handles the Record

func (*ServerLogHandler) WithAttrs added in v0.0.15

func (h *ServerLogHandler) WithAttrs(attrs []slog.Attr) slog.Handler

WithAttrs returns a new Handler whose attributes consist of both the receiver's attributes and the arguments

func (*ServerLogHandler) WithGroup added in v0.0.15

func (h *ServerLogHandler) WithGroup(name string) slog.Handler

WithGroup returns a new Handler with the given group appended to the receiver's existing groups

type TestOptions added in v0.0.6

type TestOptions struct {
	MatchRouting struct {
		Pattern string `arg:"" required:"" help:"Binding pattern to test (e.g., 'logs.*.error', 'metrics.#')."`
		Key     string `arg:"" required:"" help:"Routing key to match against the pattern."`
	} `cmd:"" help:"Test routing pattern matching."`
}

type Upstream

type Upstream struct {
	// contains filtered or unexported fields
}

func NewUpstream

func NewUpstream(conn *rabbitmq.Connection, logger *slog.Logger, conf config.Upstream, address string, metrics *metrics.Metrics, probeLogFunc func(string, ...any)) *Upstream

func (*Upstream) CleanupAutoGeneratedQueue

func (u *Upstream) CleanupAutoGeneratedQueue(name string)

func (*Upstream) Close

func (u *Upstream) Close() error

func (*Upstream) CloseChannel

func (u *Upstream) CloseChannel(id uint16) error

func (*Upstream) GetChannel

func (u *Upstream) GetChannel(id uint16) (*rabbitmq.Channel, error)

func (*Upstream) NewChannel

func (u *Upstream) NewChannel(id uint16) (*rabbitmq.Channel, error)

func (*Upstream) NotifyClose added in v0.0.6

func (u *Upstream) NotifyClose() <-chan *rabbitmq.Error

NotifyClose returns a channel that will be closed when the upstream connection is closed

func (*Upstream) QueueDeclareArgs

func (u *Upstream) QueueDeclareArgs(m *amqp091.QueueDeclare) (name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args rabbitmq.Table)

QueueDeclareArgs generates the arguments for rabbitmq.QueueDeclare(name, durable, autoDelete, exclusive, noWait bool, args Table). noWait is always false because we need to wait for the response from multiple upstreams.

func (*Upstream) QueueDeclareWithTryPassive added in v0.0.14

func (u *Upstream) QueueDeclareWithTryPassive(ch *rabbitmq.Channel, m *amqp091.QueueDeclare) (rabbitmq.Queue, error)

QueueDeclareWithTryPassive attempts a passive declare first, then falls back to normal declare if queue doesn't exist
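
The passive-then-normal fallback can be sketched against a small interface. Everything here is hypothetical and for illustration only: queueDeclarer, fakeBroker, errQueueNotFound and declareWithTryPassive are not part of trabbits or of any AMQP client library. Note also that in AMQP 0-9-1 a failed passive declare raises a channel exception that closes the channel, so a real implementation has to obtain a usable channel before the second attempt; this sketch leaves that detail out.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueNotFound stands in for the broker's "not found" reply.
var errQueueNotFound = errors.New("queue not found")

// queueDeclarer is a hypothetical abstraction over a broker channel.
type queueDeclarer interface {
	DeclarePassive(name string) error // fails if the queue does not exist
	Declare(name string) error        // creates the queue if needed
}

// declareWithTryPassive checks for the queue passively and only creates
// it when the passive check reports that it is missing. The boolean
// result tells whether the queue was created by this call.
func declareWithTryPassive(d queueDeclarer, name string) (bool, error) {
	err := d.DeclarePassive(name)
	switch {
	case err == nil:
		return false, nil // queue already exists, nothing to do
	case errors.Is(err, errQueueNotFound):
		if err := d.Declare(name); err != nil {
			return false, err
		}
		return true, nil
	default:
		return false, err // unrelated failure: do not mask it
	}
}

// fakeBroker is an in-memory set of queue names used for the demo.
type fakeBroker map[string]bool

func (b fakeBroker) DeclarePassive(name string) error {
	if !b[name] {
		return errQueueNotFound
	}
	return nil
}

func (b fakeBroker) Declare(name string) error {
	b[name] = true
	return nil
}

func main() {
	broker := fakeBroker{"existing": true}
	fmt.Println(declareWithTryPassive(broker, "existing")) // false <nil>
	fmt.Println(declareWithTryPassive(broker, "fresh"))    // true <nil>
}
```
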

func (*Upstream) RegisterEmulatedAutoDeleteQueue added in v0.0.14

func (u *Upstream) RegisterEmulatedAutoDeleteQueue(queueName string)

RegisterEmulatedAutoDeleteQueue registers a queue for deletion when the connection closes. This emulates auto_delete behavior for durable queues.

func (*Upstream) String

func (u *Upstream) String() string

Directories

Path
cmd/trabbits (command)
