wskit

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2026 License: MIT Imports: 10 Imported by: 0

README

go-wskit

CI Go Reference Go Report Card

WebSocket hub-and-spoke server on coder/websocket with optional Redis Pub/Sub for multi-instance broadcast.

Install

go get github.com/TakuyaYagam1/go-wskit
import "github.com/TakuyaYagam1/go-wskit"

Features

  • Hub — single-goroutine dispatcher with register/unregister/broadcast channels, graceful shutdown via context, atomic client count
  • Client — ReadPump (drain incoming) + WritePump (send + ping/pong), safe concurrent Send with close protection
  • Redis Pub/Sub — BroadcastEvent with JSON serialization, fallback to local broadcast on Redis failure, SubscribeToRedis for horizontal scaling
  • Event envelopeEvent{Type, Payload, Timestamp} standard JSON format, BroadcastJSON helper
  • Accept helper — upgrade HTTP to WebSocket + create client + register in one call
  • Functional options — configurable timeouts, buffer sizes, callbacks

Example

hub := wskit.NewHub(
    wskit.WithRedis(redisClient, "ws:events"),
    wskit.WithOnConnect(func(c *wskit.Client) {
        data, _ := json.Marshal(wskit.NewEvent("connected", nil))
        c.Send(data)
    }),
)
go hub.Run(ctx)
go hub.SubscribeToRedis(ctx)

http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
    client, err := wskit.Accept(r.Context(), w, r, hub, nil)
    if err != nil {
        return
    }
    go client.ReadPump()
    go client.WritePump()
})

hub.BroadcastJSON(ctx, "notification", map[string]string{"message": "hello"})

Options

Hub: WithRedis, WithBroadcastBuf, WithRegisterBuf, WithChannelTimeout, WithOnTimeout, WithOnConnect

Client: WithWriteWait, WithPingInterval, WithMaxMessageSize, WithSendBufSize

Documentation

Overview

Package wskit provides a WebSocket hub-and-spoke server built on coder/websocket.

Hub and Client

Create a Hub with NewHub (optionally WithRedis for multi-instance broadcast), run Hub.Run(ctx) in a goroutine, then use Accept to upgrade HTTP connections and register clients. Run Client.ReadPump and Client.WritePump in separate goroutines per connection.

Event envelope

Event and NewEvent provide a standard JSON envelope (type, payload, timestamp). Use Hub.BroadcastEvent or Hub.BroadcastJSON to send to all clients.

Redis Pub/Sub

WithRedis(client, channel) enables publishing to Redis on BroadcastEvent/BroadcastJSON; other instances run SubscribeToRedis(ctx) to receive and broadcast locally.

Options

Hub: WithRedis, WithBroadcastBuf, WithRegisterBuf, WithChannelTimeout, WithOnTimeout, WithOnConnect. Client: WithWriteWait, WithPingInterval, WithMaxMessageSize, WithSendBufSize.

Index

Constants

View Source
const (
	DefaultWriteWait      = 10 * time.Second
	DefaultPingInterval   = 30 * time.Second
	DefaultMaxMessageSize = 512
	DefaultSendBufSize    = 256
)
View Source
const (
	DefaultBroadcastBuf   = 256
	DefaultRegisterBuf    = 64
	DefaultChannelTimeout = 5 * time.Second
)

Variables

View Source
var (
	ErrHubStopped = errors.New("wskit: hub is stopped")
)

Functions

This section is empty.

Types

type AcceptFunc

type AcceptFunc func(w http.ResponseWriter, r *http.Request, opts *websocket.AcceptOptions) (*websocket.Conn, error)

AcceptFunc is the type of websocket.Accept for dependency injection.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a single WebSocket connection attached to a Hub.

func Accept

func Accept(ctx context.Context, w http.ResponseWriter, r *http.Request, hub *Hub, acceptOpts *websocket.AcceptOptions, clientOpts ...ClientOption) (*Client, error)

Accept upgrades the HTTP connection to WebSocket, creates a Client, and registers it with the hub. Caller should run ReadPump and WritePump in goroutines. acceptOpts may be nil for default upgrade options.

func NewClient

func NewClient(hub *Hub, conn *websocket.Conn, ctx context.Context, opts ...ClientOption) *Client

NewClient creates a client for the given hub and connection. Call Register on the hub, then run ReadPump and WritePump in separate goroutines.

func (*Client) ReadPump

func (c *Client) ReadPump()

ReadPump reads messages from the connection until it closes or errors. On exit it unregisters the client and closes the connection. Run in a goroutine.

func (*Client) Send

func (c *Client) Send(data []byte) (sent bool)

Send enqueues data for writing. Non-blocking; returns false if the send buffer is full or the client is unregistered.

func (*Client) SendErr

func (c *Client) SendErr(data []byte) error

SendErr is like Send but returns ErrHubStopped when the client is unregistered.

func (*Client) WritePump

func (c *Client) WritePump()

WritePump writes messages from the send channel and sends ping frames at the configured interval. Run in a goroutine.

type ClientConfig

type ClientConfig struct {
	WriteWait      time.Duration
	PingInterval   time.Duration
	MaxMessageSize int64
	SendBufSize    int
}

type ClientOption

type ClientOption func(*ClientConfig)

ClientOption configures a Client.

func WithMaxMessageSize

func WithMaxMessageSize(n int64) ClientOption

WithMaxMessageSize sets the maximum size of a single incoming message.

func WithPingInterval

func WithPingInterval(d time.Duration) ClientOption

WithPingInterval sets the interval between ping frames.

func WithSendBufSize

func WithSendBufSize(n int) ClientOption

WithSendBufSize sets the send channel buffer size.

func WithWriteWait

func WithWriteWait(d time.Duration) ClientOption

WithWriteWait sets the timeout for writing a message or ping.

type Conn

type Conn interface {
	Read(ctx context.Context) (websocket.MessageType, []byte, error)
	Writer(ctx context.Context, typ websocket.MessageType) (io.WriteCloser, error)
	Close(code websocket.StatusCode, reason string) error
	Ping(ctx context.Context) error
	SetReadLimit(limit int64)
}

Conn abstracts a WebSocket connection for testing and alternate implementations.

type Event

type Event struct {
	Type      string    `json:"type"`
	Payload   any       `json:"payload"`
	Timestamp time.Time `json:"timestamp"`
}

Event is the default envelope for WebSocket messages: type, payload, and timestamp.

func NewEvent

func NewEvent(typ string, payload any) Event

NewEvent builds an Event with Type, Payload, and Timestamp set to now.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub is the central dispatcher for WebSocket clients. Run one goroutine with Run(ctx).

func NewHub

func NewHub(opts ...HubOption) *Hub

NewHub creates a Hub with the given options.

func (*Hub) Broadcast

func (h *Hub) Broadcast(data []byte)

Broadcast sends data to all connected clients. Non-blocking with timeout.

func (*Hub) BroadcastEvent

func (h *Hub) BroadcastEvent(ctx context.Context, event any) error

BroadcastEvent marshals event as JSON and broadcasts it. If Redis is configured, publishes to Redis first; on failure falls back to local Broadcast. Returns an error only when JSON marshaling fails.

func (*Hub) BroadcastJSON

func (h *Hub) BroadcastJSON(ctx context.Context, typ string, payload any) error

BroadcastJSON marshals Event{typ, payload, now} and broadcasts it via the hub. Returns an error if JSON marshaling fails.

func (*Hub) ClientCount

func (h *Hub) ClientCount() int

ClientCount returns the number of registered clients.

func (*Hub) Register

func (h *Hub) Register(client *Client)

Register adds the client to the hub. Non-blocking with timeout.

func (*Hub) Run

func (h *Hub) Run(ctx context.Context)

Run runs the hub loop until ctx is cancelled. Closes all clients on exit.

func (*Hub) SubscribeToRedis

func (h *Hub) SubscribeToRedis(ctx context.Context)

SubscribeToRedis subscribes to the hub's Redis channel and broadcasts received messages to all clients. Run in a goroutine; it returns when ctx is cancelled.

func (*Hub) Unregister

func (h *Hub) Unregister(client *Client)

Unregister removes the client from the hub. Non-blocking with timeout.

type HubOption

type HubOption func(*Hub)

HubOption configures a Hub.

func WithBroadcastBuf

func WithBroadcastBuf(n int) HubOption

WithBroadcastBuf sets the broadcast channel buffer size.

func WithChannelTimeout

func WithChannelTimeout(d time.Duration) HubOption

WithChannelTimeout sets the timeout for Register, Unregister, and Broadcast operations.

func WithOnConnect

func WithOnConnect(fn OnConnect) HubOption

WithOnConnect sets the callback invoked when a client registers.

func WithOnTimeout

func WithOnTimeout(fn func(op string)) HubOption

WithOnTimeout sets a callback when a channel operation times out (e.g. for logging).

func WithRedis

func WithRedis(client *redis.Client, channel string) HubOption

WithRedis configures Redis Pub/Sub for multi-instance broadcast. If client is nil, Redis is disabled.

func WithRegisterBuf

func WithRegisterBuf(n int) HubOption

WithRegisterBuf sets the register/unregister channel buffer size.

type OnConnect

type OnConnect func(client *Client)

OnConnect is called when a client is registered; use it to send a welcome message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL