wshub

package module
v1.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2026 License: MIT Imports: 16 Imported by: 1

README

wshub

Go Reference Go Report Card Go Version CI GitHub tag codecov

A production-ready, scalable WebSocket package for Go with support for rooms, broadcasting, multi-node clustering, middleware, hooks, and extensibility.

Documentation | API Reference

Features

  • Production-Ready: Proper concurrency, graceful shutdown & drain, error handling
  • Horizontally Scalable: Multi-node support via adapter pattern (Redis, NATS, or custom)
  • Pluggable: Bring your own logger, metrics
  • Middleware System: Chain handlers with custom logic
  • Lifecycle Hooks: Hook into connection, message, room, and backpressure events
  • Room Support: Group clients into rooms for targeted broadcasting
  • Metrics & Logging: Built-in interfaces for observability; official Prometheus subpackage (wshub/prometheus)
  • Configurable: Extensive configuration with builder pattern
  • Limits & Rate Limiting: Control connections, rooms, and message rates
  • Backpressure Control: Configurable drop policies with notification hooks
  • Write Coalescing: Opt-in batching of text messages into single frames for reduced syscalls
  • Health Probes: Built-in /healthz and /readyz handlers with JSON responses for Kubernetes
  • Global Counts: Cluster-wide client and room counts via presence gossip
  • Zero Business Logic: Pure infrastructure, bring your own logic

Performance Highlights

Hot-path operations are zero-allocation; the dispatch loop iterates a lock-free snapshot. The numbers below are in-process dispatch overhead measured with mock clients — they show how fast the hub iterates its registry and pushes to client channels, not end-to-end delivery latency over real WebSocket connections. For end-to-end numbers see Real-world load tests.

Operation Scale Time Allocs
SendToClient 1,000,000 clients 130 ns 0
SendToUser 1,000,000 users 192 ns 1
GetClient 1,000 clients 17.7 ns 0
GlobalClientCount 500 nodes 4.2 μs 0
Middleware chain (built) 3 middlewares 14.3 ns 0
Broadcast dispatch 1,000,000 clients 263 ms 0

The Broadcast row measures how long the hub takes to enqueue a message to 1M client channels — actual delivery to remote clients is bounded by TCP, writePump throughput, and the Go scheduler. See full benchmarks for detail.

Installation

go get github.com/KARTIKrocks/wshub

Quick Start

package main

import (
    "context"
    "log"
    "net/http"
    "time"

    "github.com/KARTIKrocks/wshub"
)

func main() {
    // Create hub with configuration
    config := wshub.DefaultConfig().
        WithMaxMessageSize(1024 * 1024).
        WithCompression(true)

    hub := wshub.NewHub(
        wshub.WithConfig(config),
        wshub.WithMessageHandler(func(client *wshub.Client, msg *wshub.Message) error {
            log.Printf("Message from %s: %s", client.ID, msg.Text())
            return client.Send(msg.Data)
        }),
    )

    // Start the hub
    go hub.Run()

    // Set up HTTP handler
    http.HandleFunc("/ws", hub.HandleHTTP())

    log.Println("Server starting on :8080")
    if err := http.ListenAndServe(":8080", nil); err != nil {
        log.Fatal(err)
    }

    // Graceful drain + shutdown
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    hub.Drain(ctx)    // stop new connections, wait for existing ones
    hub.Shutdown(ctx) // force-close anything remaining
}

Configuration

Basic Configuration
config := wshub.DefaultConfig()

// Or customize
config := wshub.Config{
    ReadBufferSize:    4096,
    WriteBufferSize:   4096,
    WriteWait:         10 * time.Second,
    PongWait:          60 * time.Second,
    PingPeriod:        54 * time.Second,
    MaxMessageSize:    1024 * 1024,
    SendChannelSize:   512,
    EnableCompression: true,
    CheckOrigin:       wshub.AllowAllOrigins,
}
Builder Pattern
config := wshub.DefaultConfig().
    WithBufferSizes(4096, 4096).
    WithMaxMessageSize(1024 * 1024).
    WithCompression(true).
    WithCheckOrigin(wshub.AllowOrigins("https://example.com"))
Origin Checking
// Allow all origins (default)
config.CheckOrigin = wshub.AllowAllOrigins

// Allow same origin only
config.CheckOrigin = wshub.AllowSameOrigin

// Allow specific origins
config.CheckOrigin = wshub.AllowOrigins("https://example.com", "https://app.example.com")

// Custom checker
config.CheckOrigin = func(r *http.Request) bool {
    return strings.HasSuffix(r.Header.Get("Origin"), ".example.com")
}

Hub API

Client Management
// Get all clients
clients := hub.Clients()
count := hub.ClientCount()

// Find client
client, ok := hub.GetClient(clientID)
client, ok := hub.GetClientByUserID(userID)
clients := hub.GetClientsByUserID(userID)
Broadcasting
// Broadcast to all
hub.Broadcast([]byte("Hello everyone"))
hub.BroadcastText("Hello everyone")
hub.BroadcastJSON(map[string]string{"message": "Hello"})

// Broadcast pre-encoded JSON (zero-alloc, ideal for fan-out)
data, _ := json.Marshal(map[string]string{"message": "Hello"})
hub.BroadcastRawJSON(data)

// Broadcast with context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
hub.BroadcastWithContext(ctx, data)

// Broadcast except one client
hub.BroadcastExcept(data, excludeClient)

// Send to specific client
hub.SendToClient(clientID, data)

// Send to all connections of a user
hub.SendToUser(userID, data)
Rooms
// Join/leave rooms
hub.JoinRoom(client, "general")
hub.LeaveRoom(client, "general")
hub.LeaveAllRooms(client)

// Broadcast to room
hub.BroadcastToRoom("general", data)
hub.BroadcastToRoomExcept("general", data, exceptClient)

// Room info
clients := hub.RoomClients("general")
count := hub.RoomCount("general")
rooms := hub.RoomNames()
exists := hub.RoomExists("general")

Client API

// Client properties
client.ID       // Unique client ID

// Set user ID
client.SetUserID("user-123")
userID := client.GetUserID()

// Metadata
client.SetMetadata("role", "admin")
role, ok := client.GetMetadata("role")
client.DeleteMetadata("role")

// Send messages
client.Send([]byte("Hello"))
client.SendText("Hello")
client.SendJSON(map[string]string{"message": "Hello"})
client.SendRawJSON(preEncodedJSON) // skip marshaling
client.SendBinary(data)
client.SendWithContext(ctx, data)

// Close connection
client.Close()
client.CloseWithCode(websocket.CloseNormalClosure, "Goodbye")

// Room membership
rooms := client.Rooms()
inRoom := client.InRoom("general")
count := client.RoomCount()

// Status
closed := client.IsClosed()
closedAt := client.ClosedAt()

// Client-specific handlers
client.OnMessage(func(c *wshub.Client, msg *wshub.Message) {
    // Handle message
})

client.OnClose(func(c *wshub.Client) {
    // Handle close
})

client.OnError(func(c *wshub.Client, err error) {
    // Handle error
})

Hooks System

hub := wshub.NewHub(
    wshub.WithHooks(wshub.Hooks{
        // Before connection upgrade
        BeforeConnect: func(r *http.Request) error {
            token := r.Header.Get("Authorization")
            if !validateToken(token) {
                return wshub.ErrAuthenticationFailed
            }
            return nil
        },

        // After successful connection
        AfterConnect: func(client *wshub.Client) {
            log.Printf("Client connected: %s", client.ID)
        },

        // Before message processing
        BeforeMessage: func(client *wshub.Client, msg *wshub.Message) (*wshub.Message, error) {
            if len(msg.Data) > 1000 {
                return nil, errors.New("message too large")
            }
            return msg, nil
        },

        // After message processing
        AfterMessage: func(client *wshub.Client, msg *wshub.Message, err error) {
            if err != nil {
                log.Printf("Message error: %v", err)
            }
        },

        // Before room join
        BeforeRoomJoin: func(client *wshub.Client, room string) error {
            if !canJoinRoom(client, room) {
                return wshub.ErrUnauthorized
            }
            return nil
        },

        // After room join
        AfterRoomJoin: func(client *wshub.Client, room string) {
            hub.BroadcastToRoomExcept(room,
                []byte(fmt.Sprintf("%s joined", client.ID)),
                client,
            )
        },

        // On error
        OnError: func(client *wshub.Client, err error) {
            log.Printf("Client error: %v", err)
        },
    }),
)

Middleware System

// Create middleware chain
chain := wshub.NewMiddlewareChain(handleMessage).
    Use(wshub.RecoveryMiddleware(logger)).
    Use(wshub.LoggingMiddleware(logger)).
    Use(wshub.MetricsMiddleware(metrics)).
    Build()

// Use in message handler
hub := wshub.NewHub(
    wshub.WithMessageHandler(chain.Execute),
)
Built-in Middlewares
// Logging
wshub.LoggingMiddleware(logger)

// Panic recovery
wshub.RecoveryMiddleware(logger)

// Metrics
wshub.MetricsMiddleware(metrics)
Custom Middleware
func RateLimitMiddleware(limiter RateLimiter) wshub.Middleware {
    return func(next wshub.HandlerFunc) wshub.HandlerFunc {
        return func(client *wshub.Client, msg *wshub.Message) error {
            if !limiter.Allow(client.ID) {
                return wshub.ErrRateLimitExceeded
            }
            return next(client, msg)
        }
    }
}

func AuthMiddleware(auth AuthService) wshub.Middleware {
    return func(next wshub.HandlerFunc) wshub.HandlerFunc {
        return func(client *wshub.Client, msg *wshub.Message) error {
            if client.GetUserID() == "" {
                return wshub.ErrUnauthorized
            }
            return next(client, msg)
        }
    }
}

Logging

// Implement the Logger interface
type ZapLogger struct {
    logger *zap.Logger
}

func (l *ZapLogger) Debug(msg string, args ...any) {
    l.logger.Sugar().Debugw(msg, args...)
}

func (l *ZapLogger) Info(msg string, args ...any) {
    l.logger.Sugar().Infow(msg, args...)
}

func (l *ZapLogger) Warn(msg string, args ...any) {
    l.logger.Sugar().Warnw(msg, args...)
}

func (l *ZapLogger) Error(msg string, args ...any) {
    l.logger.Sugar().Errorw(msg, args...)
}

// Use it
hub := wshub.NewHub(wshub.WithLogger(&ZapLogger{logger}))

Metrics

Use the official Prometheus subpackage for production metrics:

import (
    wshubprom "github.com/KARTIKrocks/wshub/prometheus"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

reg := prometheus.NewRegistry()
collector := wshubprom.New(wshubprom.WithRegistry(reg))
hub := wshub.NewHub(wshub.WithMetrics(collector))
go hub.Run()

http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))

Or implement the MetricsCollector interface yourself (e.g. for StatsD):

type MyMetrics struct{}

func (m *MyMetrics) IncrementConnections()                          {}
func (m *MyMetrics) DecrementConnections()                          {}
func (m *MyMetrics) IncrementMessagesReceived()                     {}
func (m *MyMetrics) IncrementMessagesSent(count int)                {}
func (m *MyMetrics) IncrementMessagesDropped()                      {}
func (m *MyMetrics) RecordMessageSize(size int)                     {}
func (m *MyMetrics) RecordLatency(d time.Duration)                  {}
func (m *MyMetrics) RecordBroadcastDuration(d time.Duration)        {}
func (m *MyMetrics) IncrementErrors(errorType string)               {}
func (m *MyMetrics) IncrementRoomJoins()                            {}
func (m *MyMetrics) IncrementRoomLeaves()                           {}
func (m *MyMetrics) IncrementRooms()                                {}
func (m *MyMetrics) DecrementRooms()                                {}

hub := wshub.NewHub(wshub.WithMetrics(&MyMetrics{}))

For development, use DebugMetrics for an in-memory snapshot:

metrics := wshub.NewDebugMetrics()
hub := wshub.NewHub(wshub.WithMetrics(metrics))

// Later
stats := metrics.Stats()
fmt.Println(stats.ActiveConnections, stats.TotalMessagesRecv, stats.AvgBroadcast)

Limits

limits := wshub.DefaultLimits().
    WithMaxConnections(10000).
    WithMaxConnectionsPerUser(5).
    WithMaxRoomsPerClient(10).
    WithMaxClientsPerRoom(100).
    WithMaxMessageRate(100)

hub := wshub.NewHub(wshub.WithLimits(limits))

Multi-Node Scaling

Scale horizontally by connecting multiple hub instances through a shared message bus. All broadcasts and targeted sends are automatically relayed across nodes.

import wshubredis "github.com/KARTIKrocks/wshub/adapter/redis"

rdb := goredis.NewClient(&goredis.Options{Addr: "localhost:6379"})
adapter := wshubredis.New(rdb)

hub := wshub.NewHub(
    wshub.WithAdapter(adapter),
    wshub.WithNodeID("pod-web-1"), // optional: stable ID for debugging
)
go hub.Run()
Available Adapters
Adapter Install Best For
Redis go get github.com/KARTIKrocks/wshub/adapter/redis Most deployments, easy setup
NATS go get github.com/KARTIKrocks/wshub/adapter/nats Low-latency, high-throughput
Custom Implement wshub.Adapter interface Any message bus

Adapters are separate Go modules -- importing the core wshub package never pulls in Redis or NATS dependencies.

What Gets Relayed Across Nodes
Operation Cross-Node
Broadcast, BroadcastBinary, BroadcastText, BroadcastJSON, BroadcastRawJSON Yes
BroadcastExcept Yes
BroadcastToRoom, BroadcastToRoomExcept Yes
SendToUser Yes
SendToClient Yes
JoinRoom, LeaveRoom No (local per hub)
GetClient, ClientCount No (local per hub)
Global Counts (Presence)

Enable presence gossip to get cluster-wide totals:

hub := wshub.NewHub(
    wshub.WithAdapter(adapter),
    wshub.WithPresence(5 * time.Second), // publish stats every 5s
)

hub.GlobalClientCount()          // total across all nodes
hub.GlobalRoomCount("general")   // room members across all nodes

Nodes that miss 3 consecutive heartbeats are automatically evicted from the totals.

Graceful Draining

For zero-downtime rolling deploys (e.g. Kubernetes), call Drain before Shutdown. Drain stops accepting new connections (HTTP 503) while letting existing connections finish their in-flight messages. Idle connections are proactively closed after the drain timeout.

// preStop / SIGTERM handler
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
hub.Drain(ctx)    // stop new connections, wait for existing ones
hub.Shutdown(ctx) // force-close anything remaining
Configuration
hub := wshub.NewHub(
    // Configure idle connection reaper timeout (default: 30s).
    // Connections idle for this duration during drain are closed with CloseGoingAway.
    // Set to 0 to disable the reaper entirely.
    wshub.WithDrainTimeout(15 * time.Second),
)
Health & Readiness Probes
// Drop-in HTTP handlers — respond with JSON and correct status codes
http.Handle("/healthz", hub.HealthHandler()) // 200 while Run() is alive, else 503
http.Handle("/readyz", hub.ReadyHandler())   // 200 while running, 503 when draining/stopped

// Programmatic access
hs := hub.Health()  // HealthStatus{Alive, Ready, State, Uptime, Clients}
hub.Alive()         // true only while Run() goroutine is executing
hub.Ready()         // true when alive and in StateRunning
hub.Uptime()        // time.Duration since Run() started (0 if not started or exited)

Backpressure Control

When a client's send buffer is full, configure how messages are handled:

hub := wshub.NewHub(
    // DropNewest (default): discard the new message
    // DropOldest: evict the oldest queued message to make room
    wshub.WithDropPolicy(wshub.DropOldest),

    wshub.WithHooks(wshub.Hooks{
        OnSendDropped: func(client *wshub.Client, data []byte) {
            log.Printf("dropped %d bytes for client %s", len(data), client.ID)
            // Options: disconnect slow client, log, queue externally
            // client.Close()
        },
    }),
)
Policy Behavior Best For
DropNewest Discards the new message Default, safe
DropOldest Evicts oldest queued message Real-time data (dashboards, tickers, game state)

Write Coalescing

When throughput is high and messages queue up, enable write coalescing to batch multiple text messages into a single WebSocket frame separated by newlines (\n). This reduces syscalls at the cost of receivers needing to split frames:

cfg := wshub.DefaultConfig().WithCoalesceWrites(true)
hub := wshub.NewHub(wshub.WithConfig(cfg))
  • Only text messages are coalesced; binary messages are always sent as individual frames
  • Receivers must split coalesced frames on \n to recover individual messages
  • When disabled (default), every message is its own frame — no behavior change

Error Handling

err := hub.JoinRoom(client, room)
switch err {
case wshub.ErrClientNotFound:
    // Client not registered
case wshub.ErrAlreadyInRoom:
    // Client already in room
case wshub.ErrEmptyRoomName:
    // Empty room name
case wshub.ErrRoomNotFound:
    // Room doesn't exist
case wshub.ErrNotInRoom:
    // Client not in room
case wshub.ErrConnectionClosed:
    // Connection was closed
case wshub.ErrSendBufferFull:
    // Send buffer full
case wshub.ErrHubNotStarted:
    // Hub Run() has not been called yet
case wshub.ErrHubDraining:
    // Hub is draining, not accepting new connections
case wshub.ErrHubStopped:
    // Hub has been shut down
case wshub.ErrMaxConnectionsReached:
    // Connection limit reached
case wshub.ErrMaxRoomsReached:
    // Room limit per client reached
case wshub.ErrRoomFull:
    // Room is full
case wshub.ErrRateLimitExceeded:
    // Rate limit exceeded
case wshub.ErrAuthenticationFailed:
    // Authentication failed
case wshub.ErrUnauthorized:
    // Unauthorized action
}

Complete Example: Chat Application

See examples/chat/ for a complete chat application demonstrating:

  • Room management
  • Username tracking
  • Message broadcasting
  • Middleware (recovery + logging)
  • Rate limiting
  • Connection limits

Test Client

Save as index.html and open in a browser while the server is running:

<!DOCTYPE html>
<html>
  <head>
    <title>WebSocket Test</title>
  </head>
  <body>
    <h1>WebSocket Test</h1>
    <div>
      <input type="text" id="message" placeholder="Type a message" />
      <button onclick="send()">Send</button>
    </div>
    <div id="messages"></div>

    <script>
      const ws = new WebSocket("ws://localhost:8080/ws");

      ws.onopen = () => {
        console.log("Connected");
        addMessage("Connected to server");
      };

      ws.onmessage = (event) => {
        addMessage("Received: " + event.data);
      };

      ws.onclose = () => {
        addMessage("Disconnected");
      };

      ws.onerror = (error) => {
        console.error("WebSocket error:", error);
        addMessage("Error occurred");
      };

      function send() {
        const input = document.getElementById("message");
        ws.send(input.value);
        addMessage("Sent: " + input.value);
        input.value = "";
      }

      function addMessage(msg) {
        const div = document.getElementById("messages");
        div.innerHTML += "<p>" + msg + "</p>";
      }
    </script>
  </body>
</html>

Best Practices

  1. Always use middleware for cross-cutting concerns (logging, metrics, auth)
  2. Use hooks for lifecycle events instead of wrapping the hub
  3. Implement proper logging and metrics for production observability
  4. Set appropriate limits to prevent resource exhaustion
  5. Use Drain then Shutdown for zero-downtime deploys
  6. Handle errors appropriately - don't ignore send failures
  7. Use rooms for targeted messaging instead of filtering in handlers
  8. Set user ID after authentication for multi-device support
  9. Use metadata for request-scoped data instead of global state
  10. Test with concurrent clients to ensure thread safety

Performance Tips

  • Increase SendChannelSize for high-throughput scenarios
  • Enable CoalesceWrites to batch queued text frames into a single WebSocket write — reduces syscalls under sustained broadcast load
  • Enable compression for large messages
  • Use BroadcastWithContext for timeout control
  • Batch messages when possible
  • Monitor send buffer sizes via metrics
  • For per-node fanout above ~5K clients, prefer scaling horizontally (multi-node via the Redis or NATS adapter) over WithParallelBroadcast — see Real-world load tests

Benchmarks

Two kinds of numbers below:

  1. In-process dispatch (Go benchmarks with mock clients) — measures hub bookkeeping and channel push cost. Useful for spotting allocation regressions, not for predicting real throughput.
  2. End-to-end load tests (real httptest.Server + gorilla/websocket dialer) — measures what an actual deployment will see.

Measured on an Intel i5-11400H @ 2.70GHz (12 cores), Go 1.26, Linux.

Run them yourself:

go test -bench=. -benchmem ./...      # in-process micro-benchmarks
make loadtest LOADTEST_ARGS="..."     # end-to-end load tests
In-process dispatch (mock clients)

These measure how fast the hub iterates its snapshot and pushes to client channels. They do not include TCP, writePump, or remote-client work.

Broadcast dispatch (zero allocations)
Operation Clients Time Allocs
Broadcast 100,000 22.0 ms 0
Broadcast 1,000,000 263 ms 0
BroadcastToRoom 100,000 23.2 ms 0
BroadcastToRoom 1,000,000 260 ms 0
BroadcastExcept 100,000 25.9 ms 1
BroadcastExcept 1,000,000 294 ms 1
BroadcastToRoomExcept 100,000 26.0 ms 1
BroadcastToRoomExcept 1,000,000 277 ms 1
Targeted Send (O(1) at any scale, zero allocations)
Operation Scale Time Allocs
SendToClient 100,000 clients 129 ns 0
SendToClient 1,000,000 clients 130 ns 0
SendToUser 100,000 users 198 ns 1
SendToUser 1,000,000 users 192 ns 1
Global Counts — Presence (zero allocations)
Operation Nodes Time Allocs
GlobalClientCount 5 63 ns 0
GlobalClientCount 50 397 ns 0
GlobalClientCount 100 715 ns 0
GlobalClientCount 500 4.2 μs 0
GlobalRoomCount 5 118 ns 0
GlobalRoomCount 50 823 ns 0
GlobalRoomCount 100 1.7 μs 0
GlobalRoomCount 500 9.7 μs 0
Client & Room Lookups (zero allocations)
Operation Time Allocs
GetClient (1,000 clients) 17.7 ns 0
ClientCount 0.28 ns 0
GetClientByUserID 51.3 ns 0
RoomExists 23.6 ns 0
RoomCount 22.1 ns 0
GetMetadata 17.0 ns 0
SetMetadata 30.6 ns 0
Client Send
Operation Time Allocs
Send (text) 82.9 ns 1
SendJSON 495 ns 5
Middleware Chain
Mode Time Allocs
Built (cached) 14.3 ns 0
Unbuilt (on-the-fly) 17.0 ns 0
Real-world load tests

End-to-end timings using real WebSocket connections via httptest.Server and gorilla/websocket.Dialer. Latency is measured by embedding a unix-nano timestamp in the payload and computing now - sent on receive. Reproduce with make loadtest.

Connect — handshake throughput
Clients Connect time Rate Mem/conn
1,000 122 ms 8,205 conn/s 24.4 KB
5,000 371 ms 13,486 conn/s 20.5 KB
10,000 485 ms 20,609 conn/s 24.4 KB
Fanout — single broadcaster, 100 msg/s for 10s, 128 B payload
Clients Throughput p50 p95 p99
1,000 100,000 msg/s 2.53 ms 4.83 ms 6.68 ms
5,000 497,000 msg/s 44.04 ms 396.9 ms 632.6 ms
10,000 397,284 msg/s 3.22 s 6.03 s 6.33 s

Past ~5K clients on a single node, fanout latency grows steeply — the bottleneck is Go scheduler pressure across 3 × clients goroutines (readPump + writePump

  • handshake server), not the hub's dispatch loop. For higher per-node fanout, tune SendChannelSize, enable CoalesceWrites, or scale horizontally.
Rooms — broadcast scoped to a room (100 msg/s, 10s)
Clients Rooms Per-room p50 p99
5,000 100 11.01 ms 15.19 ms
10,000 100 29.15 ms 36.05 ms
Echo — per-connection round-trip (5,000 clients, 10s)
RTT/sec p50 p95 p99
228,380 19.93 ms 35.35 ms 72.52 ms

Note on WithParallelBroadcast: in real load tests, parallel dispatch is consistently slower than the default serial path because the per-call cost of trySend (RLock + defer/recover) dominates and parallel batching can't overcome it. The option remains for backward compatibility but is no longer recommended — use the default serial broadcast.

Always call Build() on your middleware chain for best performance.

Concurrent Access (parallel goroutines)
Operation Time Allocs
GetClient 31.0 ns 0
ClientCount 0.23 ns 0
Metadata (set+get) 76.5 ns 0
Broadcast (100 clients) 5.9 μs 120
Message Creation
Operation Time Allocs
NewMessage 30.5 ns 0
NewTextMessage 32.0 ns 0
NewBinaryMessage 30.2 ns 0
NewJSONMessage 820 ns 9
NewRawJSONMessage 30.9 ns 0

Thread Safety

All Hub and Client methods are thread-safe. The package uses:

  • RWMutex for client/room maps
  • Separate mutexes for callbacks
  • Channels for cross-goroutine communication
  • WaitGroups for graceful shutdown

License

MIT

Contributing

Contributions welcome! Please read CONTRIBUTING.md for guidelines.

Documentation

Overview

Package wshub provides production-ready WebSocket connection management with support for rooms, broadcasting, middleware, hooks, and extensibility.

The central type is Hub, which manages all connected clients and provides broadcasting, room management, and lifecycle hooks. Create one with NewHub and configure it using functional options:

hub := wshub.NewHub(
    wshub.WithConfig(wshub.DefaultConfig().WithCompression(true)),
    wshub.WithLogger(myLogger),
    wshub.WithLimits(wshub.DefaultLimits().WithMaxConnections(10000)),
    wshub.WithHooks(wshub.Hooks{
        AfterConnect: func(c *wshub.Client) { /* ... */ },
    }),
    wshub.WithMessageHandler(handler),
)
go hub.Run()

Connection Lifecycle

Upgrade HTTP connections with Hub.UpgradeConnection or the convenience Hub.HandleHTTP handler. Each connection spawns a read pump and write pump goroutine managed by the hub. Use Hub.Shutdown for graceful teardown.

Graceful Draining

For zero-downtime rolling deploys (e.g. Kubernetes), call Hub.Drain before Hub.Shutdown. Drain stops accepting new connections (returning HTTP 503) while letting existing connections finish their in-flight messages. Idle connections are proactively closed after the configured drain timeout (see WithDrainTimeout). Inspect the hub's lifecycle state with Hub.State, Hub.IsRunning, and Hub.IsDraining to implement health and readiness probes:

// preStop / SIGTERM handler
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
hub.Drain(ctx)    // stop new connections, wait for existing ones
hub.Shutdown(ctx) // force-close anything remaining

Health Probes

Use Hub.HealthHandler and Hub.ReadyHandler as HTTP handlers for liveness and readiness probes. For programmatic access, Hub.Health returns a HealthStatus snapshot, and Hub.Alive / Hub.Ready provide simple boolean checks:

http.Handle("/healthz", hub.HealthHandler())
http.Handle("/readyz", hub.ReadyHandler())

Rooms

Clients can join and leave named rooms via Hub.JoinRoom and Hub.LeaveRoom. Broadcast to a room with Hub.BroadcastToRoom. Rooms are created lazily and removed automatically when the last client leaves.

Middleware

Use MiddlewareChain to compose message-processing pipelines. Built-in middlewares include LoggingMiddleware, RecoveryMiddleware, and MetricsMiddleware. Call MiddlewareChain.Build to pre-compute the chain for better performance.

Message Routing

Router dispatches messages to per-event handlers based on an extractor function, decoupling routing from any specific message format.

Multi-Node Scaling

Scale horizontally by setting an Adapter via WithAdapter. All broadcast and targeted send methods automatically relay messages to other nodes through the adapter's message bus. The core package ships no adapter implementations — import a subpackage to avoid pulling in unwanted dependencies:

  • github.com/KARTIKrocks/wshub/adapter/redis — Redis Pub/Sub
  • github.com/KARTIKrocks/wshub/adapter/nats — NATS core Pub/Sub

Implement the Adapter interface to integrate any other message bus.

Enable WithPresence to exchange periodic heartbeats between nodes. Hub.GlobalClientCount and Hub.GlobalRoomCount then return cluster-wide totals. Nodes that miss three consecutive heartbeats are automatically evicted from the totals.

Backpressure

When a client's send buffer is full, the hub applies the configured DropPolicy (set via WithDropPolicy):

  • DropNewest (default) discards the new message.
  • DropOldest evicts the oldest queued message to make room.

In both cases the Hooks.OnSendDropped callback fires so the application can log, disconnect slow clients, or take other corrective action.

Write Coalescing

When Config.CoalesceWrites is true, the write pump batches queued text messages into a single WebSocket frame separated by newline bytes (\n). This reduces the number of syscalls under high throughput. Binary messages are always sent as individual frames. Receivers must split coalesced frames on \n to recover individual messages.

cfg := wshub.DefaultConfig().WithCoalesceWrites(true)
hub := wshub.NewHub(wshub.WithConfig(cfg))

Index

Examples

Constants

View Source
const (
	AdapterBroadcast       = "broadcast"
	AdapterBroadcastExcept = "broadcast_except"
	AdapterRoom            = "room"
	AdapterRoomExcept      = "room_except"
	AdapterUser            = "user"
	AdapterClient          = "client"
	AdapterPresence        = "presence"
)

Adapter message type constants identify the operation being relayed.

Variables

View Source
var (
	// Connection errors
	ErrConnectionClosed = errors.New("wshub: connection closed")
	ErrInvalidMessage   = errors.New("wshub: invalid message")
	ErrSendBufferFull   = errors.New("wshub: send buffer full")

	// Client errors
	ErrClientNotFound = errors.New("wshub: client not found")

	// Room errors
	ErrEmptyRoomName = errors.New("wshub: empty room name")
	ErrRoomNotFound  = errors.New("wshub: room not found")
	ErrAlreadyInRoom = errors.New("wshub: client already in room")
	ErrNotInRoom     = errors.New("wshub: client not in room")
	ErrRoomFull      = errors.New("wshub: room is full")

	// Limit errors
	ErrMaxConnectionsReached = errors.New("wshub: max connections reached")
	ErrMaxRoomsReached       = errors.New("wshub: max rooms per client reached")
	// ErrRateLimitExceeded is provided for use in application hooks and
	// handlers. The library's internal rate limiter drops messages silently
	// without returning this error.
	ErrRateLimitExceeded         = errors.New("wshub: rate limit exceeded")
	ErrMaxUserConnectionsReached = errors.New("wshub: max connections per user reached")

	// Hub state errors
	ErrHubNotStarted = errors.New("wshub: hub not started")
	ErrHubDraining   = errors.New("wshub: hub is draining")
	ErrHubStopped    = errors.New("wshub: hub is stopped")

	// Authentication errors — provided for use in BeforeConnect hooks and
	// application-level handlers. The library does not return these directly.
	ErrAuthenticationFailed = errors.New("wshub: authentication failed")
	ErrUnauthorized         = errors.New("wshub: unauthorized")
)

Sentinel errors for WebSocket operations.

Functions

func AllowAllOrigins

func AllowAllOrigins(r *http.Request) bool

AllowAllOrigins is a CheckOrigin function that allows all origins.

func AllowOrigins

func AllowOrigins(origins ...string) func(r *http.Request) bool

AllowOrigins returns a CheckOrigin function that allows specific origins. Requests without an Origin header are allowed (see AllowSameOrigin for rationale).

func AllowSameOrigin

func AllowSameOrigin(r *http.Request) bool

AllowSameOrigin is a CheckOrigin function that only allows same-origin requests. It parses the Origin header as a URL and compares the host (including port) against the request's Host header, handling mismatched ports correctly.

Requests without an Origin header are allowed because non-browser clients (mobile apps, CLI tools) typically omit it. If your threat model requires rejecting originless requests, use a custom CheckOrigin function.

Types

type Adapter added in v1.1.0

type Adapter interface {
	// Publish sends a message to all other nodes.
	Publish(ctx context.Context, msg AdapterMessage) error

	// Subscribe begins receiving messages from other nodes. The provided
	// handler is invoked for every message received. Subscribe must not
	// block; it should spawn its own goroutine(s) internally.
	// The context controls the subscription lifetime — cancelling it
	// stops the subscriber.
	Subscribe(ctx context.Context, handler func(AdapterMessage)) error

	// Close shuts down the adapter, releasing all resources.
	Close() error
}

Adapter enables multi-node Hub communication through a shared message bus. When set via WithAdapter, every public broadcast and send method publishes to the adapter after delivering locally, so that other nodes can relay the message to their own clients.

Thread safety requirements:

  • Publish must be safe for concurrent calls from multiple goroutines.
  • Subscribe is called once by Hub.Run and must not be called concurrently.
  • Publish and Subscribe may be called concurrently with each other.
  • Close may be called concurrently with Publish; implementations should handle this gracefully (e.g., return an error after close).

type AdapterMessage added in v1.1.0

type AdapterMessage struct {
	// NodeID identifies the originating hub node (used for deduplication).
	NodeID string `json:"node_id"`

	// Type identifies the operation. Use the Adapter* constants.
	Type string `json:"type"`

	// Room is the target room name (for room-scoped operations).
	Room string `json:"room,omitempty"`

	// UserID is the target user (for SendToUser).
	UserID string `json:"user_id,omitempty"`

	// ClientID is the target client (for SendToClient).
	ClientID string `json:"client_id,omitempty"`

	// ExceptClientIDs lists client IDs to exclude from delivery.
	ExceptClientIDs []string `json:"except,omitempty"`

	// MsgType is the WebSocket message type (TextMessage or BinaryMessage).
	MsgType int `json:"msg_type"`

	// Data is the raw message payload.
	Data []byte `json:"data"`
}

AdapterMessage is the wire format for inter-node messages.

type Client

type Client struct {
	// ID is the unique identifier for this client.
	ID string
	// contains filtered or unexported fields
}

Client represents a WebSocket client connection.

func (*Client) Close

func (c *Client) Close() error

Close closes the client connection.

func (*Client) CloseWithCode

func (c *Client) CloseWithCode(code int, reason string) error

CloseWithCode closes the client connection with a specific close code and reason. It uses mu+closed rather than sync.Once because it needs to atomically set closeCode/closeReason and the closed flag before closing the send channel.

func (*Client) ClosedAt

func (c *Client) ClosedAt() time.Time

ClosedAt returns the time when the connection was closed.

func (*Client) ConnectedAt

func (c *Client) ConnectedAt() time.Time

ConnectedAt returns the time when this client connected.

func (*Client) DeleteMetadata

func (c *Client) DeleteMetadata(key string)

DeleteMetadata removes a metadata value.

func (*Client) GetMetadata

func (c *Client) GetMetadata(key string) (any, bool)

GetMetadata returns a metadata value.

func (*Client) GetUserID

func (c *Client) GetUserID() string

GetUserID returns the user ID.

func (*Client) InRoom

func (c *Client) InRoom(room string) bool

InRoom checks if the client is in a specific room.

func (*Client) IsClosed

func (c *Client) IsClosed() bool

IsClosed returns true if the connection is closed.

func (*Client) OnClose

func (c *Client) OnClose(fn func(*Client))

OnClose sets the close handler for this client.

func (*Client) OnError

func (c *Client) OnError(fn func(*Client, error))

OnError sets the error handler for this client.

func (*Client) OnMessage

func (c *Client) OnMessage(fn func(*Client, *Message))

OnMessage sets the message handler for this client.

func (*Client) Request

func (c *Client) Request() *http.Request

Request returns the HTTP request that initiated this WebSocket connection. Use it to access headers, query params, remote address, and other request data.

func (*Client) RoomCount

func (c *Client) RoomCount() int

RoomCount returns the number of rooms the client is in.

func (*Client) Rooms

func (c *Client) Rooms() []string

Rooms returns the list of rooms the client is in.

func (*Client) Send

func (c *Client) Send(data []byte) error

Send sends a text message to the client.

func (*Client) SendBinary

func (c *Client) SendBinary(data []byte) error

SendBinary sends a binary message to the client.

func (*Client) SendJSON

func (c *Client) SendJSON(v any) error

SendJSON sends a JSON-encoded message to the client.

func (*Client) SendMessage

func (c *Client) SendMessage(msgType MessageType, data []byte) error

SendMessage sends a message with the specified type. The behavior when the send buffer is full depends on the hub's DropPolicy.

func (*Client) SendMessageWithContext added in v1.1.0

func (c *Client) SendMessageWithContext(ctx context.Context, msgType MessageType, data []byte) (err error)

SendMessageWithContext sends a message with the specified type and context support. It blocks until the message is enqueued or the context is cancelled.

Unlike SendMessage, this method does not apply the hub's DropPolicy. When the send buffer is full it waits for space rather than dropping messages, giving callers explicit control over the timeout via ctx.

func (*Client) SendRawJSON added in v1.1.3

func (c *Client) SendRawJSON(data []byte) error

SendRawJSON sends pre-encoded JSON data as a text message to the client. Use this instead of SendJSON when the JSON is already marshaled to avoid redundant serialization.

func (*Client) SendText

func (c *Client) SendText(text string) error

SendText sends a text message to the client.

func (*Client) SendWithContext

func (c *Client) SendWithContext(ctx context.Context, data []byte) error

SendWithContext sends a text message with context support. It blocks until the message is enqueued or the context is cancelled.

func (*Client) SetMetadata

func (c *Client) SetMetadata(key string, value any)

SetMetadata sets a metadata value.

func (*Client) SetUserID

func (c *Client) SetUserID(userID string) error

SetUserID sets the user ID for authenticated clients. Returns an error if MaxConnectionsPerUser limit would be exceeded.

type Config

type Config struct {
	// ReadBufferSize is the size of the read buffer (default: 1024).
	ReadBufferSize int

	// WriteBufferSize is the size of the write buffer (default: 1024).
	WriteBufferSize int

	// WriteWait is the time allowed to write a message (default: 10s).
	WriteWait time.Duration

	// PongWait is the time allowed to read the next pong message (default: 60s).
	PongWait time.Duration

	// PingPeriod is the period between pings (default: 54s, must be < PongWait).
	PingPeriod time.Duration

	// MaxMessageSize is the maximum message size allowed (default: 512KB).
	MaxMessageSize int64

	// SendChannelSize is the size of the send channel buffer (default: 256).
	SendChannelSize int

	// EnableCompression enables per-message compression (default: false).
	EnableCompression bool

	// CoalesceWrites batches queued text messages into a single WebSocket
	// frame separated by newline bytes (\n), reducing syscalls under high
	// throughput. Binary messages are always sent as individual frames.
	// Receivers must split coalesced frames on \n. Default: false.
	CoalesceWrites bool

	// CheckOrigin is a function to validate the request origin.
	CheckOrigin func(r *http.Request) bool

	// Subprotocols specifies the server's supported protocols.
	Subprotocols []string
}

Config holds configuration for WebSocket connections.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a default WebSocket configuration.

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	config := wshub.DefaultConfig()
	fmt.Println("compression:", config.EnableCompression)

	custom := config.
		WithMaxMessageSize(1024).
		WithCompression(true)
	fmt.Println("custom compression:", custom.EnableCompression)
}
Output:
compression: false
custom compression: true

func (Config) WithBufferSizes

func (c Config) WithBufferSizes(read, write int) Config

WithBufferSizes returns a new config with the specified buffer sizes.

func (Config) WithCheckOrigin

func (c Config) WithCheckOrigin(fn func(r *http.Request) bool) Config

WithCheckOrigin returns a new config with a custom origin checker.

func (Config) WithCoalesceWrites added in v1.3.0

func (c Config) WithCoalesceWrites(enabled bool) Config

WithCoalesceWrites returns a new config with write coalescing enabled/disabled.

func (Config) WithCompression

func (c Config) WithCompression(enabled bool) Config

WithCompression returns a new config with compression enabled/disabled.

func (Config) WithMaxMessageSize

func (c Config) WithMaxMessageSize(size int64) Config

WithMaxMessageSize returns a new config with the specified max message size.

func (Config) WithSendChannelSize

func (c Config) WithSendChannelSize(size int) Config

WithSendChannelSize returns a new config with the specified send channel size.

func (Config) WithSubprotocols

func (c Config) WithSubprotocols(protocols ...string) Config

WithSubprotocols returns a new config with the specified subprotocols.

func (Config) WithTimeouts

func (c Config) WithTimeouts(writeWait, pongWait, pingPeriod time.Duration) Config

WithTimeouts returns a new config with the specified timeouts.

type DebugMetrics

type DebugMetrics struct {
	// contains filtered or unexported fields
}

DebugMetrics is a thread-safe in-memory MetricsCollector for development and testing. Use Stats() to read a snapshot or String() to print a summary.

Usage:

m := wshub.NewDebugMetrics()
hub := wshub.NewHub(wshub.WithMetrics(m))
...
fmt.Println(m)           // pretty-print summary
stats := m.Stats()       // programmatic access

func NewDebugMetrics

func NewDebugMetrics() *DebugMetrics

NewDebugMetrics creates a new DebugMetrics instance.

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	metrics := wshub.NewDebugMetrics()
	metrics.IncrementConnections()
	metrics.IncrementMessagesReceived()
	metrics.RecordMessageSize(128)

	stats := metrics.Stats()
	fmt.Println("connections:", stats.ActiveConnections)
	fmt.Println("messages:", stats.TotalMessagesRecv)
}
Output:
connections: 1
messages: 1

func (*DebugMetrics) DecrementConnections

func (d *DebugMetrics) DecrementConnections()

func (*DebugMetrics) DecrementRooms added in v1.5.0

func (d *DebugMetrics) DecrementRooms()

func (*DebugMetrics) IncrementConnections

func (d *DebugMetrics) IncrementConnections()

func (*DebugMetrics) IncrementErrors

func (d *DebugMetrics) IncrementErrors(errorType string)

func (*DebugMetrics) IncrementMessagesDropped added in v1.5.0

func (d *DebugMetrics) IncrementMessagesDropped()

func (*DebugMetrics) IncrementMessagesReceived added in v1.5.0

func (d *DebugMetrics) IncrementMessagesReceived()

func (*DebugMetrics) IncrementMessagesSent added in v1.5.0

func (d *DebugMetrics) IncrementMessagesSent(count int)

func (*DebugMetrics) IncrementRoomJoins

func (d *DebugMetrics) IncrementRoomJoins()

func (*DebugMetrics) IncrementRoomLeaves

func (d *DebugMetrics) IncrementRoomLeaves()

func (*DebugMetrics) IncrementRooms added in v1.5.0

func (d *DebugMetrics) IncrementRooms()

func (*DebugMetrics) RecordBroadcastDuration added in v1.5.0

func (d *DebugMetrics) RecordBroadcastDuration(duration time.Duration)

func (*DebugMetrics) RecordLatency

func (d *DebugMetrics) RecordLatency(duration time.Duration)

func (*DebugMetrics) RecordMessageSize

func (d *DebugMetrics) RecordMessageSize(size int)

func (*DebugMetrics) Reset

func (d *DebugMetrics) Reset()

Reset zeroes all counters and resets the uptime clock.

func (*DebugMetrics) Stats

func (d *DebugMetrics) Stats() DebugStats

Stats returns a point-in-time snapshot of all metrics.

func (*DebugMetrics) String

func (d *DebugMetrics) String() string

String returns a human-readable summary of all metrics. Implements fmt.Stringer so it prints naturally with fmt.Println(m).

type DebugStats

type DebugStats struct {
	ActiveConnections int64
	TotalConnections  int64
	TotalMessagesRecv int64
	TotalMessagesSent int64
	TotalDropped      int64
	TotalMessageBytes int64
	TotalRoomJoins    int64
	TotalRoomLeaves   int64
	ActiveRooms       int64
	AvgLatency        time.Duration
	AvgBroadcast      time.Duration
	Errors            map[string]int64
	Uptime            time.Duration
}

DebugStats is a point-in-time snapshot returned by DebugMetrics.Stats().

type DropPolicy added in v1.1.0

type DropPolicy int

DropPolicy controls what happens when a client's send buffer is full.

const (
	// DropNewest drops the new message when the send buffer is full.
	// This is the default and matches the original behavior.
	DropNewest DropPolicy = iota

	// DropOldest evicts the oldest queued message to make room for the new one.
	// This ensures the client always receives the most recent data, at the cost
	// of losing older messages.
	DropOldest
)

type HandlerFunc

type HandlerFunc func(*Client, *Message) error

HandlerFunc is a function that handles WebSocket messages.

type HealthStatus added in v1.4.0

type HealthStatus struct {
	// Alive is true when the Run() goroutine is executing.
	Alive bool

	// Ready is true when the hub is alive and accepting new connections.
	Ready bool

	// State is the hub's lifecycle state as a human-readable string
	// ("running", "draining", or "stopped").
	State string

	// Uptime is how long the Run() goroutine has been executing.
	// Zero if Run() has not been called.
	Uptime time.Duration

	// Clients is the current number of connected clients.
	Clients int
}

HealthStatus is a point-in-time snapshot of hub health, suitable for liveness and readiness probes.

type Hooks

type Hooks struct {
	// BeforeConnect is called before upgrading the connection.
	// Return an error to reject the connection.
	BeforeConnect func(*http.Request) error

	// AfterConnect is called after a client successfully connects.
	// It runs in its own goroutine, so the client may receive messages
	// before this callback returns.
	AfterConnect func(*Client)

	// BeforeDisconnect is called before a client disconnects.
	// The hook runs in a goroutine with a timeout (default 5s, configurable
	// via WithHookTimeout). If the hook does not complete within the timeout
	// the hub proceeds with the disconnect; the goroutine is NOT cancelled
	// and will continue running until it returns. Keep this hook fast to
	// avoid leaking goroutines.
	BeforeDisconnect func(*Client)

	// AfterDisconnect is called after a client disconnects.
	AfterDisconnect func(*Client)

	// BeforeMessage is called before processing a message.
	// Can modify the message or return an error to reject it.
	BeforeMessage func(*Client, *Message) (*Message, error)

	// AfterMessage is called after processing a message.
	AfterMessage func(*Client, *Message, error)

	// OnError is called when an error occurs.
	OnError func(*Client, error)

	// OnSendDropped is called when a message is dropped because the client's
	// send buffer is full. The application can use this to decide whether to
	// disconnect the slow client, log the event, or queue the data externally.
	// The hook is called synchronously in the sender's goroutine — keep it
	// fast to avoid blocking broadcasts.
	OnSendDropped func(client *Client, data []byte)

	// BeforeRoomJoin is called before a client joins a room.
	// Return an error to prevent joining.
	BeforeRoomJoin func(*Client, string) error

	// AfterRoomJoin is called after a client joins a room.
	AfterRoomJoin func(*Client, string)

	// BeforeRoomLeave is called before a client leaves a room.
	BeforeRoomLeave func(*Client, string)

	// AfterRoomLeave is called after a client leaves a room.
	AfterRoomLeave func(*Client, string)
}

Hooks defines lifecycle callbacks for WebSocket operations.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub maintains the set of active clients and broadcasts messages.

Lock ordering (acquire in this order to prevent deadlocks):

mu (hub clients) → roomsMu → Room.mu → Client.mu → userIndexMu

Not all paths acquire every lock; the rule is that when multiple locks from this list are held simultaneously, the earlier one must be acquired first. Individual locks that are never held together (e.g. Client.mu and userIndexMu acquired sequentially, not nested) are safe regardless of order.

func NewHub

func NewHub(opts ...Option) *Hub

NewHub creates a new WebSocket hub.

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	hub := wshub.NewHub()
	fmt.Println("clients:", hub.ClientCount())
}
Output:
clients: 0
Example (WithOptions)
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	hub := wshub.NewHub(
		wshub.WithConfig(wshub.DefaultConfig().WithMaxMessageSize(4096)),
		wshub.WithLimits(wshub.DefaultLimits().WithMaxConnections(1000)),
		wshub.WithLogger(&wshub.NoOpLogger{}),
		wshub.WithMetrics(wshub.NewDebugMetrics()),
	)
	fmt.Println("clients:", hub.ClientCount())
}
Output:
clients: 0

func (*Hub) Alive added in v1.4.0

func (h *Hub) Alive() bool

Alive reports whether the Hub.Run goroutine is currently executing. Returns false before Run is called or after it exits. The read is a single atomic load — safe for concurrent use on hot paths.

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	hub := wshub.NewHub()
	fmt.Println("alive before Run:", hub.Alive())
}
Output:
alive before Run: false

func (*Hub) Broadcast

func (h *Hub) Broadcast(data []byte)

Broadcast sends a text message to all connected clients. In multi-node mode the message is also relayed to other nodes via the adapter.

func (*Hub) BroadcastBinary

func (h *Hub) BroadcastBinary(data []byte)

BroadcastBinary sends a binary message to all connected clients. In multi-node mode the message is also relayed to other nodes via the adapter.

func (*Hub) BroadcastBinaryExcept added in v1.1.0

func (h *Hub) BroadcastBinaryExcept(data []byte, except ...*Client)

BroadcastBinaryExcept sends a binary message to all clients except those specified. In multi-node mode the message is also relayed to other nodes via the adapter.

func (*Hub) BroadcastBinaryToRoom added in v1.1.0

func (h *Hub) BroadcastBinaryToRoom(roomName string, data []byte) error

BroadcastBinaryToRoom sends a binary message to all clients in a room. In multi-node mode the message is also relayed to other nodes via the adapter.

func (*Hub) BroadcastBinaryToRoomExcept added in v1.1.0

func (h *Hub) BroadcastBinaryToRoomExcept(roomName string, data []byte, except ...*Client) error

BroadcastBinaryToRoomExcept sends a binary message to all clients in a room except those specified. In multi-node mode the message is also relayed to other nodes via the adapter.

func (*Hub) BroadcastExcept

func (h *Hub) BroadcastExcept(data []byte, except ...*Client)

BroadcastExcept sends a text message to all clients except those specified. In multi-node mode the message is also relayed to other nodes via the adapter.

func (*Hub) BroadcastJSON

func (h *Hub) BroadcastJSON(v any) error

BroadcastJSON sends a JSON message to all connected clients.

func (*Hub) BroadcastRawJSON added in v1.1.3

func (h *Hub) BroadcastRawJSON(data []byte)

BroadcastRawJSON sends pre-encoded JSON data as a text message to all connected clients. Use this instead of BroadcastJSON when the JSON is already marshaled to avoid redundant serialization. In multi-node mode the message is also relayed to other nodes via the adapter.

func (*Hub) BroadcastText

func (h *Hub) BroadcastText(text string)

BroadcastText sends a text message to all connected clients.

func (*Hub) BroadcastToRoom

func (h *Hub) BroadcastToRoom(roomName string, data []byte) error

BroadcastToRoom sends a text message to all clients in a room. In multi-node mode the message is also relayed to other nodes via the adapter so that room members on other nodes receive it.

func (*Hub) BroadcastToRoomExcept

func (h *Hub) BroadcastToRoomExcept(roomName string, data []byte, except ...*Client) error

BroadcastToRoomExcept sends a text message to all clients in a room except those specified. In multi-node mode the message is also relayed to other nodes via the adapter.

func (*Hub) BroadcastToRoomWithContext added in v1.1.0

func (h *Hub) BroadcastToRoomWithContext(ctx context.Context, roomName string, data []byte) error

BroadcastToRoomWithContext sends a text message to all clients in a room with context support. Returns ctx.Err() if the context is cancelled mid-broadcast.

func (*Hub) BroadcastWithContext

func (h *Hub) BroadcastWithContext(ctx context.Context, data []byte) error

BroadcastWithContext sends a message to all clients with context support. If the context is cancelled mid-broadcast, remaining local clients are skipped but the adapter publish still fires so other nodes can deliver. The returned error (if any) is the context error.

func (*Hub) ClientCount

func (h *Hub) ClientCount() int

ClientCount returns the number of connected clients. Uses an atomic counter — no locking required.

func (*Hub) Clients

func (h *Hub) Clients() []*Client

Clients returns all connected clients using the lock-free broadcast snapshot, avoiding contention on h.mu.

func (*Hub) Drain added in v1.2.0

func (h *Hub) Drain(ctx context.Context) error

Drain initiates graceful connection draining. It stops accepting new connections (Hub.UpgradeConnection returns HTTP 503) and waits for all existing connections to disconnect or for the context to expire.

During drain, idle connections whose send buffers have been empty for the configured drain timeout (see WithDrainTimeout) are proactively closed with CloseGoingAway (1001). This prevents indefinite waiting for clients that are connected but doing nothing.

Drain returns nil when all clients have disconnected, or the context error if the context expires first. Calling Drain on an already-draining hub blocks on the same completion signal. Calling Drain after Hub.Shutdown returns immediately.

Typical usage in a Kubernetes preStop hook:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
hub.Drain(ctx)    // stop new connections, wait for existing ones
hub.Shutdown(ctx) // force-close anything remaining

func (*Hub) GetClient

func (h *Hub) GetClient(id string) (*Client, bool)

GetClient returns a client by ID (O(1) lookup).

func (*Hub) GetClientByUserID

func (h *Hub) GetClientByUserID(userID string) (*Client, bool)

GetClientByUserID returns a client by user ID.

func (*Hub) GetClientsByUserID

func (h *Hub) GetClientsByUserID(userID string) []*Client

GetClientsByUserID returns all clients for a user ID.

func (*Hub) GlobalClientCount added in v1.1.0

func (h *Hub) GlobalClientCount() int

GlobalClientCount returns the total number of connected clients across all nodes. In single-node mode or when presence is not enabled it returns the local count.

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	// Without presence, GlobalClientCount returns local count.
	hub := wshub.NewHub()
	fmt.Println("global:", hub.GlobalClientCount())
}
Output:
global: 0

func (*Hub) GlobalRoomCount added in v1.1.0

func (h *Hub) GlobalRoomCount(roomName string) int

GlobalRoomCount returns the total number of clients in a room across all nodes. In single-node mode or when presence is not enabled it returns the local room count.

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	// Without presence, GlobalRoomCount returns local room count.
	hub := wshub.NewHub()
	fmt.Println("room count:", hub.GlobalRoomCount("lobby"))
}
Output:
room count: 0

func (*Hub) HandleHTTP

func (h *Hub) HandleHTTP() http.HandlerFunc

HandleHTTP returns an HTTP handler that upgrades connections to WebSocket. Upgrade errors are logged via the hub's logger.

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	hub := wshub.NewHub()

	handler := hub.HandleHTTP()
	fmt.Println("handler is nil:", handler == nil)
}
Output:
handler is nil: false

func (*Hub) Health added in v1.4.0

func (h *Hub) Health() HealthStatus

Health returns a point-in-time HealthStatus snapshot. All reads are lock-free atomic loads.

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	hub := wshub.NewHub()
	hs := hub.Health()
	fmt.Println("alive:", hs.Alive)
	fmt.Println("ready:", hs.Ready)
}
Output:
alive: false
ready: false

func (*Hub) HealthHandler added in v1.4.0

func (h *Hub) HealthHandler() http.HandlerFunc

HealthHandler returns an HTTP handler that reports the hub's liveness status. Returns 200 OK when the Hub.Run goroutine is alive, 503 Service Unavailable otherwise. The response body is a JSON object with the same fields as HealthStatus.

Typical usage with Kubernetes:

http.Handle("/healthz", hub.HealthHandler())
Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	hub := wshub.NewHub()

	handler := hub.HealthHandler()
	fmt.Println("handler is nil:", handler == nil)
}
Output:
handler is nil: false

func (*Hub) IsDraining added in v1.2.0

func (h *Hub) IsDraining() bool

IsDraining reports whether the hub is in the draining state.

func (*Hub) IsRunning added in v1.2.0

func (h *Hub) IsRunning() bool

IsRunning reports whether the hub is in the running state and accepting new connections. Returns false when draining or stopped.

func (*Hub) JoinRoom

func (h *Hub) JoinRoom(client *Client, roomName string) error

JoinRoom adds a client to a room.

func (*Hub) LeaveAllRooms

func (h *Hub) LeaveAllRooms(client *Client)

LeaveAllRooms removes a client from all rooms, firing BeforeRoomLeave/AfterRoomLeave hooks for each room.

func (*Hub) LeaveRoom

func (h *Hub) LeaveRoom(client *Client, roomName string) error

LeaveRoom removes a client from a room.

func (*Hub) NodeID added in v1.1.0

func (h *Hub) NodeID() string

NodeID returns this hub's unique node identifier. In multi-node setups each hub has a distinct ID used for message deduplication.

func (*Hub) Ready added in v1.4.0

func (h *Hub) Ready() bool

Ready reports whether the hub is alive and in the StateRunning state, meaning it can accept and process new connections. Returns false before Hub.Run is called, while draining, or after shutdown.

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	hub := wshub.NewHub()
	fmt.Println("ready before Run:", hub.Ready())
}
Output:
ready before Run: false

func (*Hub) ReadyHandler added in v1.4.0

func (h *Hub) ReadyHandler() http.HandlerFunc

ReadyHandler returns an HTTP handler that reports the hub's readiness status. Returns 200 OK when the hub is alive and in StateRunning (accepting connections), 503 Service Unavailable otherwise.

Typical usage with Kubernetes:

http.Handle("/readyz", hub.ReadyHandler())
Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	hub := wshub.NewHub()

	handler := hub.ReadyHandler()
	fmt.Println("handler is nil:", handler == nil)
}
Output:
handler is nil: false

func (*Hub) RoomClients

func (h *Hub) RoomClients(roomName string) []*Client

RoomClients returns all clients in a room.

func (*Hub) RoomCount

func (h *Hub) RoomCount(roomName string) int

RoomCount returns the number of clients in a room.

func (*Hub) RoomExists

func (h *Hub) RoomExists(roomName string) bool

RoomExists checks if a room exists.

func (*Hub) RoomNames

func (h *Hub) RoomNames() []string

RoomNames returns all room names.

func (*Hub) Run

func (h *Hub) Run()

Run starts the hub's main loop.

func (*Hub) SendBinaryToClient added in v1.1.0

func (h *Hub) SendBinaryToClient(clientID string, data []byte) error

SendBinaryToClient sends a binary message to a specific client by ID. In multi-node mode the message is also relayed to other nodes via the adapter.

func (*Hub) SendBinaryToUser added in v1.1.0

func (h *Hub) SendBinaryToUser(userID string, data []byte)

SendBinaryToUser sends a binary message to all clients of a specific user. In multi-node mode the message is also relayed to other nodes via the adapter.

func (*Hub) SendToClient

func (h *Hub) SendToClient(clientID string, data []byte) error

SendToClient sends a text message to a specific client by ID. In multi-node mode the message is also relayed to other nodes via the adapter, allowing delivery to clients connected to a different node.

func (*Hub) SendToClientWithContext added in v1.1.0

func (h *Hub) SendToClientWithContext(ctx context.Context, clientID string, data []byte) error

SendToClientWithContext sends a text message to a specific client by ID with context support. It blocks until the message is enqueued or the context is cancelled. In multi-node mode the message is also relayed via the adapter.

func (*Hub) SendToUser

func (h *Hub) SendToUser(userID string, data []byte)

SendToUser sends a text message to all clients of a specific user. In multi-node mode the message is also relayed to other nodes via the adapter.

func (*Hub) SendToUserWithContext added in v1.1.0

func (h *Hub) SendToUserWithContext(ctx context.Context, userID string, data []byte) error

SendToUserWithContext sends a text message to all clients of a specific user with context support. It blocks until messages are enqueued or the context is cancelled. In multi-node mode the message is also relayed via the adapter.

func (*Hub) Shutdown

func (h *Hub) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the hub. It force-closes all remaining connections and waits for goroutines to exit. If the hub is draining, Shutdown unblocks any pending Hub.Drain call.

func (*Hub) State added in v1.2.0

func (h *Hub) State() HubState

State returns the current lifecycle state of the hub. Safe for concurrent use; the read is a single atomic load.

func (*Hub) UpgradeConnection

func (h *Hub) UpgradeConnection(w http.ResponseWriter, r *http.Request, opts ...UpgradeOption) (*Client, error)

UpgradeConnection upgrades an HTTP connection to WebSocket.

func (*Hub) Uptime added in v1.4.0

func (h *Hub) Uptime() time.Duration

Uptime returns how long the Hub.Run goroutine has been executing. Returns zero if Run has not been called or has already exited.

type HubState added in v1.2.0

type HubState int32

HubState represents the lifecycle state of a Hub.

const (
	// StateRunning means the hub is accepting new connections and processing
	// messages normally.
	StateRunning HubState = iota

	// StateDraining means the hub has stopped accepting new connections
	// (returning HTTP 503) but is allowing existing connections to finish
	// their in-flight messages. Initiated by [Hub.Drain].
	StateDraining

	// StateStopped means the hub has been shut down via [Hub.Shutdown].
	// All connections are closed and the Run loop has exited.
	StateStopped
)

func (HubState) String added in v1.2.0

func (s HubState) String() string

String returns a human-readable name for the hub state.

type Limits

type Limits struct {
	// MaxConnections is the maximum number of concurrent connections.
	// 0 means unlimited.
	MaxConnections int

	// MaxConnectionsPerUser is the maximum number of connections per user ID.
	// 0 means unlimited.
	MaxConnectionsPerUser int

	// MaxRoomsPerClient is the maximum number of rooms a client can join.
	// 0 means unlimited.
	MaxRoomsPerClient int

	// MaxClientsPerRoom is the maximum number of clients in a room.
	// 0 means unlimited.
	MaxClientsPerRoom int

	// MaxMessageRate is the maximum messages per second per client.
	// 0 means unlimited.
	MaxMessageRate int
}

Limits defines various limits for WebSocket operations.

func DefaultLimits

func DefaultLimits() Limits

DefaultLimits returns default limits (all unlimited).

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	limits := wshub.DefaultLimits()
	fmt.Println("max connections:", limits.MaxConnections)

	custom := limits.
		WithMaxConnections(5000).
		WithMaxRoomsPerClient(10)
	fmt.Println("custom max connections:", custom.MaxConnections)
	fmt.Println("custom max rooms:", custom.MaxRoomsPerClient)
}
Output:
max connections: 0
custom max connections: 5000
custom max rooms: 10

func (Limits) WithMaxClientsPerRoom

func (l Limits) WithMaxClientsPerRoom(max int) Limits

WithMaxClientsPerRoom sets the maximum clients per room limit.

func (Limits) WithMaxConnections

func (l Limits) WithMaxConnections(max int) Limits

WithMaxConnections sets the maximum connections limit.

func (Limits) WithMaxConnectionsPerUser

func (l Limits) WithMaxConnectionsPerUser(max int) Limits

WithMaxConnectionsPerUser sets the maximum connections per user limit.

func (Limits) WithMaxMessageRate

func (l Limits) WithMaxMessageRate(rate int) Limits

WithMaxMessageRate sets the maximum message rate limit.

func (Limits) WithMaxRoomsPerClient

func (l Limits) WithMaxRoomsPerClient(max int) Limits

WithMaxRoomsPerClient sets the maximum rooms per client limit.

type Logger

type Logger interface {
	Debug(msg string, args ...any)
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
	Error(msg string, args ...any)
}

Logger is an interface for logging. Applications can plug in their own logger (zap, logrus, slog, etc.)

type Message

type Message struct {
	// Type is the message type (text, binary, etc.).
	Type MessageType

	// Data is the raw message data.
	Data []byte

	// ClientID is the ID of the client that sent the message.
	ClientID string

	// Time is when the message was received.
	Time time.Time
}

Message represents a WebSocket message.

func NewBinaryMessage

func NewBinaryMessage(data []byte) *Message

NewBinaryMessage creates a new binary message.

func NewJSONMessage

func NewJSONMessage(v any) (*Message, error)

NewJSONMessage creates a new JSON message.

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	payload := map[string]string{"event": "chat", "text": "hello"}
	msg, err := wshub.NewJSONMessage(payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("has data:", len(msg.Data) > 0)

	var decoded map[string]string
	msg.JSON(&decoded)
	fmt.Println("event:", decoded["event"])
}
Output:
has data: true
event: chat

func NewMessage

func NewMessage(data []byte) *Message

NewMessage creates a new text message.

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	msg := wshub.NewMessage([]byte("hello world"))
	fmt.Println("text:", msg.Text())
	fmt.Println("type:", msg.Type)
}
Output:
text: hello world
type: 1

func NewRawJSONMessage added in v1.1.3

func NewRawJSONMessage(data []byte) *Message

NewRawJSONMessage creates a text message from pre-encoded JSON data. The caller is responsible for ensuring data is valid JSON. This avoids the marshaling cost of NewJSONMessage when the JSON is already available (e.g., cached or encoded once for fan-out).

func NewTextMessage

func NewTextMessage(text string) *Message

NewTextMessage creates a new text message from a string.

func (*Message) JSON

func (m *Message) JSON(v any) error

JSON unmarshals the message data into the provided value.

func (*Message) Text

func (m *Message) Text() string

Text returns the message data as a string.

type MessageType

type MessageType int

MessageType represents the type of WebSocket message.

type MetricsCollector

type MetricsCollector interface {
	// Connections
	IncrementConnections()
	DecrementConnections()

	// Messages
	IncrementMessagesReceived()
	IncrementMessagesSent(count int)
	IncrementMessagesDropped()

	// Observations
	RecordMessageSize(size int)
	RecordLatency(duration time.Duration)
	RecordBroadcastDuration(duration time.Duration)

	// Rooms
	IncrementRoomJoins()
	IncrementRoomLeaves()
	IncrementRooms()
	DecrementRooms()

	// Errors
	IncrementErrors(errorType string)
}

MetricsCollector is an interface for collecting metrics. Applications can implement this with Prometheus, StatsD, etc.

type Middleware

type Middleware func(HandlerFunc) HandlerFunc

Middleware wraps a HandlerFunc to add additional functionality.

func LoggingMiddleware

func LoggingMiddleware(logger Logger) Middleware

LoggingMiddleware logs incoming messages.

func MetricsMiddleware

func MetricsMiddleware(metrics MetricsCollector) Middleware

MetricsMiddleware records handler-level metrics for message processing. Note: message count and size are already tracked by the readPump, so this middleware only records handler errors and processing latency to avoid double-counting those. However, processing latency is also recorded internally by the hub when a message handler is set via WithMessageHandler. If you use MetricsMiddleware inside a chain passed to WithMessageHandler, add WithoutHandlerLatency() to your hub options to disable the hub's built-in latency recording and avoid double-counting.

func RecoveryMiddleware

func RecoveryMiddleware(logger Logger) Middleware

RecoveryMiddleware recovers from panics in message handlers.

type MiddlewareChain

type MiddlewareChain struct {
	// contains filtered or unexported fields
}

MiddlewareChain manages a chain of middlewares. All mutations (Use) must happen before the first Execute call. Execute is safe for concurrent use once the chain is built.

func NewMiddlewareChain

func NewMiddlewareChain(handler HandlerFunc) *MiddlewareChain

NewMiddlewareChain creates a new middleware chain with the final handler.

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	handler := func(c *wshub.Client, m *wshub.Message) error {
		return nil
	}

	chain := wshub.NewMiddlewareChain(handler).
		Use(wshub.RecoveryMiddleware(&wshub.NoOpLogger{})).
		Build()

	fmt.Println("chain built:", chain != nil)
}
Output:
chain built: true

func (*MiddlewareChain) Build

func (m *MiddlewareChain) Build() *MiddlewareChain

Build pre-computes the composed handler chain and caches it. Subsequent calls to Execute will use the cached handler for better performance. Must not be called concurrently with Use. Returns the chain for method chaining.

func (*MiddlewareChain) Execute

func (m *MiddlewareChain) Execute(client *Client, msg *Message) error

Execute runs the middleware chain and final handler. Automatically builds and caches the chain on first call if Build was not called explicitly. Safe for concurrent use once the chain is built. Uses double-checked locking to ensure only one goroutine builds.

func (*MiddlewareChain) Use

func (m *MiddlewareChain) Use(middleware Middleware) *MiddlewareChain

Use adds a middleware to the chain. Adding middleware invalidates any previously cached Build result. Must not be called concurrently with Execute.

type NoOpLogger

type NoOpLogger struct{}

NoOpLogger is a default implementation that does nothing.

func (*NoOpLogger) Debug

func (n *NoOpLogger) Debug(msg string, args ...any)

func (*NoOpLogger) Error

func (n *NoOpLogger) Error(msg string, args ...any)

func (*NoOpLogger) Info

func (n *NoOpLogger) Info(msg string, args ...any)

func (*NoOpLogger) Warn

func (n *NoOpLogger) Warn(msg string, args ...any)

type NoOpMetrics

type NoOpMetrics struct{}

NoOpMetrics is a default implementation that does nothing.

func (*NoOpMetrics) DecrementConnections

func (n *NoOpMetrics) DecrementConnections()

func (*NoOpMetrics) DecrementRooms added in v1.5.0

func (n *NoOpMetrics) DecrementRooms()

func (*NoOpMetrics) IncrementConnections

func (n *NoOpMetrics) IncrementConnections()

func (*NoOpMetrics) IncrementErrors

func (n *NoOpMetrics) IncrementErrors(errorType string)

func (*NoOpMetrics) IncrementMessagesDropped added in v1.5.0

func (n *NoOpMetrics) IncrementMessagesDropped()

func (*NoOpMetrics) IncrementMessagesReceived added in v1.5.0

func (n *NoOpMetrics) IncrementMessagesReceived()

func (*NoOpMetrics) IncrementMessagesSent added in v1.5.0

func (n *NoOpMetrics) IncrementMessagesSent(count int)

func (*NoOpMetrics) IncrementRoomJoins

func (n *NoOpMetrics) IncrementRoomJoins()

func (*NoOpMetrics) IncrementRoomLeaves

func (n *NoOpMetrics) IncrementRoomLeaves()

func (*NoOpMetrics) IncrementRooms added in v1.5.0

func (n *NoOpMetrics) IncrementRooms()

func (*NoOpMetrics) RecordBroadcastDuration added in v1.5.0

func (n *NoOpMetrics) RecordBroadcastDuration(duration time.Duration)

func (*NoOpMetrics) RecordLatency

func (n *NoOpMetrics) RecordLatency(duration time.Duration)

func (*NoOpMetrics) RecordMessageSize

func (n *NoOpMetrics) RecordMessageSize(size int)

type Option

type Option func(*Hub)

Option configures a Hub during construction.

func WithAdapter added in v1.1.0

func WithAdapter(adapter Adapter) Option

WithAdapter sets the multi-node adapter for cross-node message delivery. When configured, every broadcast and targeted send is relayed to other nodes through the adapter, enabling horizontal scaling behind a load balancer.

func WithConfig

func WithConfig(config Config) Option

WithConfig sets the WebSocket configuration.

func WithDrainTimeout added in v1.2.0

func WithDrainTimeout(timeout time.Duration) Option

WithDrainTimeout sets the maximum time an idle connection can remain open after Hub.Drain is called. Connections whose send buffers have been empty for this duration are proactively closed with CloseGoingAway (1001).

Default: 30s. Set to 0 to disable the idle connection reaper entirely, relying solely on natural client disconnection during drain.

func WithDropPolicy added in v1.1.0

func WithDropPolicy(policy DropPolicy) Option

WithDropPolicy sets the behavior when a client's send buffer is full. The default is DropNewest which discards the new message. DropOldest evicts the oldest queued message to make room for the new one.

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	hub := wshub.NewHub(
		wshub.WithDropPolicy(wshub.DropOldest),
		wshub.WithHooks(wshub.Hooks{
			OnSendDropped: func(c *wshub.Client, data []byte) {
				fmt.Printf("dropped %d bytes for %s\n", len(data), c.ID)
			},
		}),
	)
	fmt.Println("drop policy hub:", hub.ClientCount())
}
Output:
drop policy hub: 0

func WithHookTimeout added in v1.1.0

func WithHookTimeout(timeout time.Duration) Option

WithHookTimeout sets the maximum time to wait for synchronous lifecycle hooks (e.g. BeforeDisconnect) before proceeding. Default: 5s.

Example
package main

import (
	"fmt"
	"time"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	hub := wshub.NewHub(
		wshub.WithHookTimeout(10 * time.Second),
	)
	fmt.Println("hub created:", hub.ClientCount())
}
Output:
hub created: 0

func WithHooks

func WithHooks(hooks Hooks) Option

WithHooks sets the lifecycle hooks for the hub.

Example
package main

import (
	"fmt"
	"net/http"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	hub := wshub.NewHub(
		wshub.WithHooks(wshub.Hooks{
			BeforeConnect: func(r *http.Request) error {
				token := r.URL.Query().Get("token")
				if token == "" {
					return fmt.Errorf("missing token")
				}
				return nil
			},
			AfterConnect: func(c *wshub.Client) {
				fmt.Println("client connected:", c.ID)
			},
		}),
	)
	fmt.Println("hub with hooks:", hub.ClientCount())
}
Output:
hub with hooks: 0

func WithLimits

func WithLimits(limits Limits) Option

WithLimits sets the limits for the hub.

func WithLogger

func WithLogger(logger Logger) Option

WithLogger sets the logger for the hub.

func WithMessageHandler

func WithMessageHandler(fn func(*Client, *Message) error) Option

WithMessageHandler sets the hub-level message handler.

func WithMetrics

func WithMetrics(metrics MetricsCollector) Option

WithMetrics sets the metrics collector for the hub.

func WithNodeID added in v1.1.0

func WithNodeID(id string) Option

WithNodeID sets a deterministic node identifier for this hub. By default a random UUID is generated. Setting a stable ID (e.g., hostname or pod name) makes logs and debugging easier in multi-node deployments.

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	hub := wshub.NewHub(
		wshub.WithNodeID("pod-web-1"),
	)
	fmt.Println("node:", hub.NodeID())
}
Output:
node: pod-web-1

func WithParallelBroadcast deprecated

func WithParallelBroadcast(batchSize int) Option

WithParallelBroadcast enables parallel broadcasting with the given batch size. batchSize determines how many clients each goroutine handles (recommended: 50-200).

Deprecated: end-to-end load tests (see `make loadtest`) show parallel dispatch is consistently slower than the default serial path — the per-call cost of the non-blocking send (RLock + defer/recover) dominates and parallel batching cannot overcome it. This option is retained for backward compatibility and may be removed in a future major version. For per-node fanout above ~5K clients, scale horizontally via the Redis or NATS adapter rather than enabling parallel broadcast.

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	hub := wshub.NewHub(
		wshub.WithParallelBroadcast(100),
	)
	fmt.Println("parallel hub created:", hub.ClientCount())
}
Output:
parallel hub created: 0

func WithParallelBroadcastWorkers deprecated added in v1.1.2

func WithParallelBroadcastWorkers(n int) Option

WithParallelBroadcastWorkers sets the number of persistent worker goroutines used for parallel broadcasting. The default is runtime.NumCPU(). This option has no effect unless WithParallelBroadcast is also set.

Deprecated: see WithParallelBroadcast. Parallel broadcast is no longer recommended; this tuning option is retained for backward compatibility.

func WithPresence added in v1.1.0

func WithPresence(interval time.Duration) Option

WithPresence enables periodic presence broadcasting for multi-node stats. Each hub publishes its local client and room counts at the given interval, allowing GlobalClientCount and GlobalRoomCount to return cluster-wide totals.

When interval is zero, the default of 5 seconds is used. Nodes that miss 3 consecutive heartbeats are considered stale and evicted from the totals.

Presence requires an adapter to be set via WithAdapter; without one it is a no-op.

func WithoutHandlerLatency added in v1.1.0

func WithoutHandlerLatency() Option

WithoutHandlerLatency disables the hub's automatic handler latency recording. Use this when your message handler chain already includes MetricsMiddleware to avoid double-counting latency.

Example
package main

import (
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	metrics := wshub.NewDebugMetrics()
	handler := func(c *wshub.Client, m *wshub.Message) error { return nil }

	chain := wshub.NewMiddlewareChain(handler).
		Use(wshub.MetricsMiddleware(metrics)).
		Build()

	hub := wshub.NewHub(
		wshub.WithMetrics(metrics),
		wshub.WithMessageHandler(chain.Execute),
		wshub.WithoutHandlerLatency(), // avoid double-counting with MetricsMiddleware
	)
	fmt.Println("latency skipped:", hub.ClientCount() == 0)
}
Output:
latency skipped: true

type Room

type Room struct {
	// contains filtered or unexported fields
}

Room represents a chat room with its own lock for better concurrency.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router dispatches incoming messages to per-event handlers based on an event name extracted from each message by a user-provided extractor function.

The extractor decouples the router from any specific message format — JSON, msgpack, binary with a leading byte, or anything else.

Usage:

router := wshub.NewRouter(func(msg *wshub.Message) string {
    var env struct{ Type string `json:"type"` }
    json.Unmarshal(msg.Data, &env)
    return env.Type
})

router.
    On("chat",  handleChat).
    On("join",  handleJoin).
    On("leave", handleLeave)

hub := wshub.NewHub(wshub.WithMessageHandler(router.Handle))

All On/OnNotFound calls should be made before the hub starts running.

func NewRouter

func NewRouter(extractor func(*Message) string) *Router

NewRouter creates a Router. The extractor is called on every incoming message to determine which registered handler to dispatch to.

Example
package main

import (
	"encoding/json"
	"fmt"

	"github.com/KARTIKrocks/wshub"
)

func main() {
	router := wshub.NewRouter(func(m *wshub.Message) string {
		var envelope struct {
			Event string `json:"event"`
		}
		json.Unmarshal(m.Data, &envelope)
		return envelope.Event
	})

	router.On("ping", func(c *wshub.Client, m *wshub.Message) error {
		return c.SendText("pong")
	})

	router.OnNotFound(func(c *wshub.Client, m *wshub.Message) error {
		return c.SendText("unknown event")
	})

	fmt.Println("router created")
}
Output:
router created

func (*Router) Handle

func (r *Router) Handle(client *Client, msg *Message) error

Handle dispatches the message to the appropriate handler. Pass this method to WithMessageHandler or use it as a HandlerFunc directly.

func (*Router) On

func (r *Router) On(event string, handler HandlerFunc) *Router

On registers a handler for the given event name. Returns the router for chaining.

func (*Router) OnNotFound

func (r *Router) OnNotFound(handler HandlerFunc) *Router

OnNotFound sets a fallback handler called when the extracted event name has no registered handler. If not set, unmatched events return ErrInvalidMessage.

type UpgradeOption added in v1.1.0

type UpgradeOption func(*Client)

UpgradeOption configures a single UpgradeConnection call.

func WithUserID added in v1.1.0

func WithUserID(userID string) UpgradeOption

WithUserID sets the user ID on the client atomically during connection upgrade, before the client is registered. This avoids the window where a client exists without a user ID, which can bypass MaxConnectionsPerUser.

Directories

Path Synopsis
adapter
redis module
cmd
loadtest command
Command loadtest runs end-to-end load tests against a real wshub server with real WebSocket connections to find bottlenecks that micro-benchmarks miss.
Command loadtest runs end-to-end load tests against a real wshub server with real WebSocket connections to find bottlenecks that micro-benchmarks miss.
examples
auth command
Example: JWT authentication with wshub.
Example: JWT authentication with wshub.
chat command
metrics command
Example: Prometheus-style metrics with wshub.
Example: Prometheus-style metrics with wshub.
simple command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL