wskit

package module
v0.3.0 Latest
Published: Mar 28, 2026 License: MIT Imports: 10 Imported by: 0

README

go-wskit

WebSocket hub-and-spoke server on coder/websocket with optional Redis Pub/Sub for multi-instance broadcast.

Install

go get github.com/wahrwelt-kit/go-wskit
import "github.com/wahrwelt-kit/go-wskit"

Features

  • Hub - single-goroutine dispatcher with register/unregister/broadcast channels, graceful shutdown via context, atomic subscriber count
  • Client - ReadPump (drain incoming) + WritePump (send + ping/pong), safe concurrent Send with close protection
  • SSEClient - Server-Sent Events subscriber; AcceptSSE registers it with the hub and streams data until disconnect or shutdown
  • Redis Pub/Sub - BroadcastEvent with JSON serialization, fallback to local broadcast on Redis failure, SubscribeToRedis for horizontal scaling
  • Event envelope - Event{Type, Payload, Timestamp} standard JSON format, BroadcastJSON helper
  • Accept / AcceptSSE helpers - upgrade HTTP to WebSocket or SSE + create client + register in one call
  • Functional options - configurable timeouts, buffer sizes, callbacks

Example

hub := wskit.NewHub(
    wskit.WithRedis(redisClient, "ws:events"),
    wskit.WithOnConnect(func(sub wskit.Subscriber) {
        data, _ := json.Marshal(wskit.NewEvent("connected", nil))
        sub.Send(data)
    }),
)
go hub.Run(ctx)
go hub.SubscribeToRedis(ctx)

// WebSocket endpoint
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
    client, err := wskit.Accept(r.Context(), w, r, hub, nil)
    if err != nil {
        return
    }
    go client.ReadPump()
    go client.WritePump()
})

// SSE endpoint
http.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
    wskit.AcceptSSE(w, r, hub)
})

hub.BroadcastJSON(ctx, "notification", map[string]string{"message": "hello"})

Options

Hub: WithRedis, WithBroadcastBuf, WithRegisterBuf, WithChannelTimeout, WithOnTimeout, WithOnConnect, WithOnDisconnect

Client: WithWriteWait, WithPingInterval, WithMaxMessageSize, WithSendBufSize

Documentation

Overview

Package wskit provides a WebSocket and SSE hub-and-spoke server built on coder/websocket

Hub and Subscriber

Create a Hub with NewHub (optionally WithRedis for multi-instance broadcast) and run Hub.Run(ctx) in a goroutine. Then use Accept to upgrade HTTP connections and register WebSocket clients, or AcceptSSE for Server-Sent Events. Any type implementing the Subscriber interface (Send + Close) can participate in hub broadcasts

WebSocket Clients

Use Accept to upgrade HTTP connections and register clients. Run Client.ReadPump and Client.WritePump in separate goroutines per connection

SSE Clients

Use AcceptSSE to handle SSE connections. It registers an SSEClient with the hub and blocks until the client disconnects or the hub shuts down

Event envelope

Event and NewEvent provide a standard JSON envelope (type, payload, timestamp). Use Hub.BroadcastEvent or Hub.BroadcastJSON to send to all subscribers

Redis Pub/Sub

WithRedis(client, channel) enables publishing to Redis on BroadcastEvent/BroadcastJSON; other instances run SubscribeToRedis(ctx) to receive and broadcast locally. SubscribeToRedis automatically reconnects with exponential backoff

Options

Hub: WithRedis, WithBroadcastBuf, WithRegisterBuf, WithChannelTimeout, WithOnTimeout, WithOnConnect, WithOnDisconnect

Client: WithWriteWait, WithPingInterval, WithMaxMessageSize, WithSendBufSize

Constants

const (
	DefaultWriteWait      = 10 * time.Second // Timeout for writing a single message or ping frame to the WebSocket connection
	DefaultPingInterval   = 30 * time.Second // Interval between outgoing ping frames to keep the connection alive
	DefaultMaxMessageSize = 512              // Maximum allowed size of a single incoming message in bytes
	DefaultSendBufSize    = 256              // Default send channel buffer size (number of messages) for Client and SSEClient
)

DefaultWriteWait is the timeout for writing a single message or ping frame to the WebSocket connection

const (
	DefaultBroadcastBuf   = 256             // Default broadcast channel buffer size (number of messages)
	DefaultRegisterBuf    = 64              // Default register/unregister channel buffer size (number of operations)
	DefaultChannelTimeout = 5 * time.Second // Default timeout for Register, Unregister, and Broadcast channel operations
)

DefaultBroadcastBuf is the default broadcast channel buffer size (number of messages)

Variables

var (
	// ErrHubStopped is returned by Client.SendErr when the send channel is closed (hub shut down or client unregistered)
	ErrHubStopped = errors.New("wskit: hub is stopped")
	// ErrFlusherNotSupported is returned by AcceptSSE when the ResponseWriter does not implement http.Flusher
	ErrFlusherNotSupported = errors.New("wskit: http.Flusher not supported")
)

Functions

func AcceptSSE added in v0.3.0

func AcceptSSE(w http.ResponseWriter, r *http.Request, hub *Hub) error

AcceptSSE upgrades an HTTP request to an SSE stream. It registers with the hub, writes SSE-formatted messages, and blocks until the client disconnects or the hub shuts down
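
The documentation does not spell out how AcceptSSE frames each message, so the following is only a sketch of standard Server-Sent Events framing over an http.Flusher, using the standard library alone. The names writeSSE and demo are hypothetical and are not part of wskit.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// writeSSE writes one SSE event and flushes it to the client.
// Hypothetical helper: it illustrates the wire format, not wskit's internals.
func writeSSE(w http.ResponseWriter, data []byte) error {
	f, ok := w.(http.Flusher)
	if !ok {
		// This is the situation in which AcceptSSE is documented to
		// return ErrFlusherNotSupported.
		return errors.New("http.Flusher not supported")
	}
	// Each line of the payload gets its own "data:" field.
	for _, line := range bytes.Split(data, []byte("\n")) {
		if _, err := fmt.Fprintf(w, "data: %s\n", line); err != nil {
			return err
		}
	}
	// A blank line terminates the event.
	if _, err := io.WriteString(w, "\n"); err != nil {
		return err
	}
	f.Flush()
	return nil
}

// demo writes a single event into an in-memory recorder and returns the raw body.
func demo() string {
	rec := httptest.NewRecorder()
	rec.Header().Set("Content-Type", "text/event-stream")
	rec.Header().Set("Cache-Control", "no-cache")
	if err := writeSSE(rec, []byte(`{"type":"ping"}`)); err != nil {
		return ""
	}
	return rec.Body.String()
}

func main() {
	fmt.Printf("%q\n", demo())
}
```

Flushing after every event matters because buffered responses would otherwise hold messages back until the buffer fills or the handler returns.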

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a single WebSocket connection attached to a Hub. It implements the Subscriber interface

func Accept

func Accept(ctx context.Context, w http.ResponseWriter, r *http.Request, hub *Hub, acceptOpts *websocket.AcceptOptions, clientOpts ...ClientOption) (*Client, error)

Accept upgrades the HTTP connection to WebSocket, creates a Client, and registers it with the hub. Caller should run ReadPump and WritePump in goroutines. acceptOpts may be nil for default upgrade options

func NewClient

func NewClient(hub *Hub, conn *websocket.Conn, ctx context.Context, opts ...ClientOption) *Client

NewClient creates a client for the given hub and connection. Call Register on the hub, then run ReadPump and WritePump in separate goroutines

func (*Client) Close added in v0.3.0

func (c *Client) Close()

Close signals the client to shut down. It is idempotent and safe to call from any goroutine. The underlying WebSocket connection is closed by WritePump/ReadPump defers

func (*Client) ReadPump

func (c *Client) ReadPump()

ReadPump reads messages from the connection until it closes or errors. On exit it unregisters the client and closes the connection. Run in a goroutine

func (*Client) Send

func (c *Client) Send(data []byte) bool

Send enqueues data for writing. Non-blocking; returns false if the send buffer is full or the client has been closed
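
One common way to get a non-blocking Send that stays safe after Close is a buffered channel guarded by a mutex and a closed flag. The sketch below assumes that approach; the type queue and its fields are hypothetical, and wskit's actual implementation may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical stand-in for a client's send buffer.
type queue struct {
	mu     sync.Mutex
	closed bool
	send   chan []byte
}

func newQueue(n int) *queue {
	return &queue{send: make(chan []byte, n)}
}

// Send never blocks: it reports false when the buffer is full
// or when the queue has already been closed.
func (q *queue) Send(data []byte) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return false
	}
	select {
	case q.send <- data:
		return true
	default:
		return false
	}
}

// Close is idempotent: the flag prevents a second close of the channel,
// and the mutex prevents a concurrent Send from writing to a closed channel.
func (q *queue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return
	}
	q.closed = true
	close(q.send)
}

func main() {
	q := newQueue(1)
	fmt.Println(q.Send([]byte("a"))) // accepted
	fmt.Println(q.Send([]byte("b"))) // dropped: buffer full
	q.Close()
	q.Close() // second call is a no-op
	fmt.Println(q.Send([]byte("c"))) // dropped: closed
}
```

Callers that must not lose messages should check the boolean result and decide whether to retry, disconnect the slow client, or drop the message.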

func (*Client) SendErr

func (c *Client) SendErr(data []byte) error

SendErr is like Send but returns ErrHubStopped when the client is closed

func (*Client) WritePump

func (c *Client) WritePump()

WritePump writes messages from the send channel and sends ping frames at the configured interval. Run in a goroutine

type ClientConfig

type ClientConfig struct {
	WriteWait      time.Duration
	PingInterval   time.Duration
	MaxMessageSize int64
	SendBufSize    int
}

ClientConfig holds configuration parameters for a Client or SSEClient

type ClientOption

type ClientOption func(*ClientConfig)

ClientOption configures a Client
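
The functional-options pattern behind ClientOption can be sketched as follows. The default values are taken from the constants listed above; the lowercase names config, option, and newConfig are hypothetical, and how wskit itself applies defaults is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// config mirrors the fields of ClientConfig shown in this documentation.
type config struct {
	WriteWait      time.Duration
	PingInterval   time.Duration
	MaxMessageSize int64
	SendBufSize    int
}

// option mutates a config, in the same shape as ClientOption.
type option func(*config)

func withMaxMessageSize(n int64) option {
	return func(c *config) { c.MaxMessageSize = n }
}

func withPingInterval(d time.Duration) option {
	return func(c *config) { c.PingInterval = d }
}

// newConfig starts from the documented defaults and applies options in order,
// so a later option overrides an earlier one.
func newConfig(opts ...option) config {
	c := config{
		WriteWait:      10 * time.Second,
		PingInterval:   30 * time.Second,
		MaxMessageSize: 512,
		SendBufSize:    256,
	}
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	c := newConfig(withMaxMessageSize(4096), withPingInterval(15*time.Second))
	fmt.Println(c.MaxMessageSize, c.PingInterval, c.SendBufSize)
}
```

With the real package, the equivalent call would pass options such as WithMaxMessageSize(4096) as the trailing arguments of Accept or NewClient.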

func WithMaxMessageSize

func WithMaxMessageSize(n int64) ClientOption

WithMaxMessageSize sets the maximum size of a single incoming message

func WithPingInterval

func WithPingInterval(d time.Duration) ClientOption

WithPingInterval sets the interval between ping frames

func WithSendBufSize

func WithSendBufSize(n int) ClientOption

WithSendBufSize sets the send channel buffer size

func WithWriteWait

func WithWriteWait(d time.Duration) ClientOption

WithWriteWait sets the timeout for writing a message or ping

type Event

type Event struct {
	Type      string    `json:"type"`
	Payload   any       `json:"payload"`
	Timestamp time.Time `json:"timestamp"`
}

Event is the default envelope for WebSocket messages: type, payload, and timestamp

func NewEvent

func NewEvent(typ string, payload any) Event

NewEvent builds an Event with Type, Payload, and Timestamp set to now
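
To show what the envelope looks like on the wire, the sketch below copies the Event struct from this page and marshals it with a fixed timestamp so the output is reproducible. The helpers newEventAt and sample are hypothetical; NewEvent itself uses the current time.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Event is copied from the type definition above so the example is self-contained.
type Event struct {
	Type      string    `json:"type"`
	Payload   any       `json:"payload"`
	Timestamp time.Time `json:"timestamp"`
}

// newEventAt is like NewEvent but takes the timestamp explicitly (hypothetical helper).
func newEventAt(typ string, payload any, ts time.Time) Event {
	return Event{Type: typ, Payload: payload, Timestamp: ts}
}

// sample returns the JSON encoding of a fixed notification event.
func sample() string {
	ev := newEventAt(
		"notification",
		map[string]string{"message": "hello"},
		time.Date(2026, 3, 28, 12, 0, 0, 0, time.UTC),
	)
	b, err := json.Marshal(ev)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	fmt.Println(sample())
	// {"type":"notification","payload":{"message":"hello"},"timestamp":"2026-03-28T12:00:00Z"}
}
```

Clients can switch on the type field and decode payload into a type-specific structure.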

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub is the central dispatcher for subscribers. Run one goroutine with Run(ctx)

func NewHub

func NewHub(opts ...HubOption) *Hub

NewHub creates a Hub with the given options

func (*Hub) Broadcast

func (h *Hub) Broadcast(data []byte)

Broadcast sends data to all connected subscribers. It does not block indefinitely: the operation is bounded by the channel timeout (see WithChannelTimeout)

func (*Hub) BroadcastEvent

func (h *Hub) BroadcastEvent(ctx context.Context, event any) error

BroadcastEvent marshals event as JSON and broadcasts it. If Redis is configured, publishes to Redis first; on failure falls back to local Broadcast. Returns an error only when JSON marshaling fails
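
The publish-first, fall-back-to-local flow described above can be sketched with plain functions standing in for Redis and the hub. Everything here is hypothetical (publishFunc, broadcastEvent, demo), and the sketch assumes that a successful publish is delivered to local subscribers by the SubscribeToRedis loop rather than by a second local broadcast.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// publishFunc stands in for a Redis PUBLISH call (hypothetical).
type publishFunc func(ctx context.Context, payload []byte) error

// broadcastEvent marshals the event, tries the remote publisher first,
// and falls back to local delivery when publishing fails or is not configured.
// Only a marshaling failure is reported to the caller.
func broadcastEvent(ctx context.Context, event any, publish publishFunc, local func([]byte)) error {
	data, err := json.Marshal(event)
	if err != nil {
		return err
	}
	if publish != nil {
		if err := publish(ctx, data); err == nil {
			// Assumption: local subscribers receive it via the subscription loop.
			return nil
		}
	}
	local(data)
	return nil
}

// demo exercises the three paths: failed publish, successful publish, marshal error.
func demo() (fallback string, localOnSuccess bool, marshalFailed bool) {
	ctx := context.Background()
	failing := func(context.Context, []byte) error { return errors.New("redis down") }
	working := func(context.Context, []byte) error { return nil }

	_ = broadcastEvent(ctx, map[string]string{"type": "ping"}, failing,
		func(b []byte) { fallback = string(b) })
	_ = broadcastEvent(ctx, map[string]string{"type": "ping"}, working,
		func([]byte) { localOnSuccess = true })
	// Channels cannot be marshaled to JSON, so this call returns an error.
	err := broadcastEvent(ctx, make(chan int), nil, func([]byte) {})
	return fallback, localOnSuccess, err != nil
}

func main() {
	fmt.Println(demo())
}
```

The design keeps a Redis outage from silencing the instance that originated the event, at the cost of other instances missing it.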

func (*Hub) BroadcastJSON

func (h *Hub) BroadcastJSON(ctx context.Context, typ string, payload any) error

BroadcastJSON marshals Event{typ, payload, now} and broadcasts it via the hub. Returns an error if JSON marshaling fails

func (*Hub) Register

func (h *Hub) Register(sub Subscriber)

Register adds the subscriber to the hub. It does not block indefinitely: the operation is bounded by the channel timeout (see WithChannelTimeout)

func (*Hub) Run

func (h *Hub) Run(ctx context.Context)

Run runs the hub loop until ctx is cancelled. Closes all subscribers on exit
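
A single-goroutine dispatcher of the kind described here can be sketched with three channels and one select loop. The types miniHub and recorder are hypothetical and much simpler than the real Hub: there are no timeouts, callbacks, or Redis, and the channels are unbuffered so the demo has a deterministic order.

```go
package main

import (
	"context"
	"fmt"
)

// subscriber mirrors the two-method Subscriber interface of this package.
type subscriber interface {
	Send(data []byte) bool
	Close()
}

// miniHub is a hypothetical, simplified dispatcher; it is not wskit's Hub.
type miniHub struct {
	register   chan subscriber
	unregister chan subscriber
	broadcast  chan []byte
}

func newMiniHub() *miniHub {
	// Unbuffered channels: a send returns only once the loop has received it.
	return &miniHub{
		register:   make(chan subscriber),
		unregister: make(chan subscriber),
		broadcast:  make(chan []byte),
	}
}

// run owns the subscriber set, so no mutex is needed: only this goroutine touches it.
func (h *miniHub) run(ctx context.Context) {
	subs := make(map[subscriber]struct{})
	for {
		select {
		case <-ctx.Done():
			for s := range subs {
				s.Close() // close every subscriber on shutdown
			}
			return
		case s := <-h.register:
			subs[s] = struct{}{}
		case s := <-h.unregister:
			if _, ok := subs[s]; ok {
				delete(subs, s)
				s.Close()
			}
		case msg := <-h.broadcast:
			for s := range subs {
				s.Send(msg)
			}
		}
	}
}

// recorder is a test subscriber that stores what it receives.
type recorder struct {
	got    chan []byte
	closed chan struct{}
}

func (r *recorder) Send(data []byte) bool {
	select {
	case r.got <- data:
		return true
	default:
		return false
	}
}

func (r *recorder) Close() { close(r.closed) }

// demo registers one subscriber, broadcasts once, then shuts the hub down.
func demo() (string, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	h := newMiniHub()
	done := make(chan struct{})
	go func() { h.run(ctx); close(done) }()

	r := &recorder{got: make(chan []byte, 1), closed: make(chan struct{})}
	h.register <- r
	h.broadcast <- []byte("hello")
	msg := <-r.got
	cancel()
	<-done
	_, open := <-r.closed // a closed channel yields ok == false
	return string(msg), !open
}

func main() {
	fmt.Println(demo()) // hello true
}
```

Keeping all map mutations inside one goroutine is what lets register, unregister, and broadcast be called from any number of request handlers without extra locking.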

func (*Hub) SubscribeToRedis

func (h *Hub) SubscribeToRedis(ctx context.Context)

SubscribeToRedis subscribes to the hub's Redis channel and broadcasts received messages to all clients. It automatically reconnects with exponential backoff if the subscription is lost. Run in a goroutine; it returns when ctx is cancelled
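
The reconnect behaviour can be pictured as a retry loop whose delay doubles after each failure up to a cap. The initial delay, the cap, and the names nextBackoff, retry, and demoAttempts below are assumptions made for illustration; the values SubscribeToRedis actually uses are not documented here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// nextBackoff doubles d and clamps the result to max.
func nextBackoff(d, max time.Duration) time.Duration {
	d *= 2
	if d > max {
		return max
	}
	return d
}

// retry calls connect until it succeeds or ctx is cancelled,
// waiting an exponentially growing delay between attempts.
func retry(ctx context.Context, initial, max time.Duration, connect func() error) (attempts int, err error) {
	delay := initial
	for {
		attempts++
		if err = connect(); err == nil {
			return attempts, nil
		}
		select {
		case <-ctx.Done():
			return attempts, ctx.Err()
		case <-time.After(delay):
		}
		delay = nextBackoff(delay, max)
	}
}

// demoAttempts simulates a connection that fails twice before succeeding.
func demoAttempts() int {
	calls := 0
	connect := func() error {
		calls++
		if calls < 3 {
			return errors.New("connection refused")
		}
		return nil
	}
	n, _ := retry(context.Background(), time.Millisecond, 4*time.Millisecond, connect)
	return n
}

func main() {
	fmt.Println(demoAttempts()) // 3
}
```

Selecting on ctx.Done() during the wait is what allows such a loop to return promptly when the context is cancelled, matching the documented behaviour of SubscribeToRedis.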

func (*Hub) SubscriberCount added in v0.3.0

func (h *Hub) SubscriberCount() int

SubscriberCount returns the number of registered subscribers

func (*Hub) Unregister

func (h *Hub) Unregister(sub Subscriber)

Unregister removes the subscriber from the hub. It does not block indefinitely: the operation is bounded by the channel timeout (see WithChannelTimeout)

type HubOption

type HubOption func(*Hub)

HubOption configures a Hub

func WithBroadcastBuf

func WithBroadcastBuf(n int) HubOption

WithBroadcastBuf sets the broadcast channel buffer size

func WithChannelTimeout

func WithChannelTimeout(d time.Duration) HubOption

WithChannelTimeout sets the timeout for Register, Unregister, and Broadcast operations

func WithOnConnect

func WithOnConnect(fn func(Subscriber)) HubOption

WithOnConnect sets the callback invoked when a subscriber registers

func WithOnDisconnect added in v0.3.0

func WithOnDisconnect(fn func(Subscriber)) HubOption

WithOnDisconnect sets the callback invoked when a subscriber unregisters

func WithOnTimeout

func WithOnTimeout(fn func(op string)) HubOption

WithOnTimeout sets a callback when a channel operation times out (e.g. for logging)
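
A channel operation bounded by a timeout, with a callback on expiry, can be sketched as below. The function enqueue is hypothetical, and the operation label "broadcast" is an assumed value: the strings that wskit passes to the WithOnTimeout callback are not listed in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// enqueue tries to send data on ch and gives up after d.
// On timeout it invokes onTimeout with the operation label and returns false.
func enqueue(ch chan<- []byte, data []byte, op string, d time.Duration, onTimeout func(op string)) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case ch <- data:
		return true
	case <-timer.C:
		if onTimeout != nil {
			onTimeout(op)
		}
		return false
	}
}

// demo fills a one-slot channel, then shows the second send timing out.
func demo() (bool, bool, string) {
	ch := make(chan []byte, 1)
	var timedOut string
	onTimeout := func(op string) { timedOut = op }

	first := enqueue(ch, []byte("a"), "broadcast", 10*time.Millisecond, onTimeout)
	second := enqueue(ch, []byte("b"), "broadcast", 10*time.Millisecond, onTimeout)
	return first, second, timedOut
}

func main() {
	fmt.Println(demo()) // true false broadcast
}
```

A callback of this shape is a natural place to log or count dropped operations, which is the use the option's description suggests.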

func WithRedis

func WithRedis(client *redis.Client, channel string) HubOption

WithRedis configures Redis Pub/Sub for multi-instance broadcast. If client is nil, Redis is disabled

func WithRegisterBuf

func WithRegisterBuf(n int) HubOption

WithRegisterBuf sets the register/unregister channel buffer size

type SSEClient added in v0.3.0

type SSEClient struct {
	// contains filtered or unexported fields
}

SSEClient represents a Server-Sent Events connection attached to a Hub. It implements the Subscriber interface

func NewSSEClient added in v0.3.0

func NewSSEClient(hub *Hub, bufSize int) *SSEClient

NewSSEClient creates an SSE subscriber for the given hub with the specified buffer size

func (*SSEClient) Close added in v0.3.0

func (c *SSEClient) Close()

Close signals the SSE client to shut down. It is idempotent and safe to call from any goroutine. AcceptSSE returns once the done channel is closed

func (*SSEClient) Send added in v0.3.0

func (c *SSEClient) Send(data []byte) bool

Send enqueues data for delivery to the SSE stream. Non-blocking: returns false if the send buffer is full or the client has been closed, true if the message was accepted

type Subscriber added in v0.3.0

type Subscriber interface {
	// Send enqueues data for delivery. Returns true if accepted, false if dropped
	Send(data []byte) bool
	// Close signals the subscriber to shut down. Must be idempotent
	Close()
}

Subscriber is the interface that any client (WebSocket, SSE, etc.) must implement to participate in hub broadcasts
