streamhub

package module
v0.2.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 18, 2026 License: GPL-3.0 Imports: 8 Imported by: 0

README

streamhub


Resumable LLM streaming for Go, backed by Redis.

streamhub is meant for the fairly common case where the code producing a stream and the code consuming it don't share the same lifetime — they might not even be on the same instance. Think LLM responses, SSE endpoints, or anything where you need the stream to survive reconnects.

It uses Redis Streams for chunk persistence (so subscribers can replay what they missed) and Redis Pub/Sub for cancel signals (so you can stop a generation from anywhere). Each producer gets a generation ID as a fencing token, and only one producer can own a session at a time.
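To make the fencing-token idea concrete, here is a minimal in-memory sketch. It is not streamhub's implementation (which keeps this state in Redis); the `registry` type and its `claim`, `release`, and `publish` methods are hypothetical names used only to illustrate single ownership and how a write from a stale generation gets rejected.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical in-memory stand-in for ownership state
// that a real deployment would keep in Redis.
type registry struct {
	mu      sync.Mutex
	nextGen uint64
	owner   map[string]uint64   // session ID -> generation of the live producer
	chunks  map[string][]string // session ID -> stored chunks
}

func newRegistry() *registry {
	return &registry{owner: map[string]uint64{}, chunks: map[string][]string{}}
}

// claim hands out a fresh generation only if nobody owns the session.
func (r *registry) claim(session string) (gen uint64, created bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, taken := r.owner[session]; taken {
		return 0, false
	}
	r.nextGen++
	r.owner[session] = r.nextGen
	return r.nextGen, true
}

// release ends ownership, but only for the generation that holds it.
func (r *registry) release(session string, gen uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if cur, ok := r.owner[session]; ok && cur == gen {
		delete(r.owner, session)
	}
}

// publish appends a chunk only when gen is still the current owner.
func (r *registry) publish(session string, gen uint64, chunk string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if cur, ok := r.owner[session]; !ok || cur != gen {
		return false // stale or unknown generation: drop the write
	}
	r.chunks[session] = append(r.chunks[session], chunk)
	return true
}

func main() {
	r := newRegistry()
	gen1, _ := r.claim("chat:123")
	_, created := r.claim("chat:123")
	fmt.Println(created) // false: the session already has an owner

	r.publish("chat:123", gen1, "hello")
	r.release("chat:123", gen1)
	gen2, _ := r.claim("chat:123")

	// The old generation is fenced out; the new one can write.
	fmt.Println(r.publish("chat:123", gen1, "late"), r.publish("chat:123", gen2, "fresh")) // false true
}
```

The point of the token is that a slow or forgotten producer cannot corrupt a session that has since been taken over: its generation no longer matches, so its writes are dropped.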

Requirements

  • Go 1.26
  • Redis

Install

go get github.com/gtoxlili/streamhub

Usage

Create a Hub (New takes a rueidis.Client from github.com/redis/rueidis):

client, err := rueidis.NewClient(rueidis.ClientOption{
	InitAddress: []string{"127.0.0.1:6379"},
})
if err != nil {
	log.Fatal(err)
}
defer client.Close()

hub := streamhub.New(client)

Register a producer:

stream, created, err := hub.Register("chat:123", func() {
	// called when someone cancels this session
})
if err != nil {
	log.Fatal(err)
}
if !created {
	return // another instance already owns this session
}
defer stream.Close()

stream.SetMetadata(map[string]any{"model": "claude-sonnet-4-20250514"})

stream.Publish("hello")
stream.Publish(" world")

created is the important bit — if it's false, a producer is already running for this session.

Subscribe (from any instance):

stream := hub.Get("chat:123")
if stream == nil {
	return
}

chunks, unsubscribe := stream.Subscribe(streamhub.WithBuffer(128))
defer unsubscribe()

for chunk := range chunks {
	// replays existing chunks first, then streams live
	println(chunk)
}

Cancel:

if stream := hub.Get("chat:123"); stream != nil {
	// Pass context.Background() for fire-and-forget, or a timeout to wait
	// for the producer to finish persisting.
	stream.Cancel(context.Background())
}

API

streamhub.New(client)

Creates a Hub.

hub.Register(sessionID, cancelRuntime)

Tries to claim a session as producer. Returns (stream, created, err) — check created before writing.

hub.Get(sessionID)

Returns a handle for an existing stream, or nil.

hub.Active(sessionIDs)

Checks which sessions are still active.

hub.Remove(sessionID)

Deletes Redis keys and local state for a stream.

stream.Publish(chunk)

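Appends a chunk to the stream. Best-effort: it fails silently on a stale generation, a closed stream, or a Redis error.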

stream.Subscribe(opts...)

Subscribes. Replays existing chunks, then delivers new ones live. Options: WithBuffer(n) sets the channel buffer for live chunks (default 256 when unset or ≤ 0); WithBatchReplay() concatenates the replay into one string instead of sending chunks one by one.

stream.SetMetadata(v) / stream.Metadata(&target)

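Stores / loads per-stream JSON metadata. SetMetadata only writes while the calling generation still owns the stream; Metadata decodes into the pointer you pass.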

stream.Cancel(ctx)

Sends a cancel signal via Pub/Sub and waits for the producer to finish (or ctx to expire). Use context.Background() for fire-and-forget.

stream.Close()

Marks the stream as done and stops local subscribers.

stream.Done()

Reports whether the stream has finished.

Typical flow

  1. Register when a request comes in
  2. Only start the job if created == true
  3. Publish chunks as they're generated
  4. Consumers call Get + Subscribe from any instance
  5. Close when done, Cancel if the user aborts

Notes

  • Don't start a second producer when created == false
  • Call SetMetadata before Close
  • Always call unsubscribe
  • Ensure Close runs on every producer exit path, including panics. If the producer goroutine panics, or its input channel never closes, Close never runs, the heartbeat keeps refreshing the TTL, and the session looks like it is "streaming forever" to every client. The safe pattern:
    go func() {
        defer func() {
            // recover must be called directly inside the deferred function
            if r := recover(); r != nil { /* log */ }
            live.Close()    // mark the stream done, even after a panic
            cancelRuntime() // make sure the underlying job stops too
        }()
        for chunk := range runtime.Chunks {
            live.Publish(chunk)
        }
        live.SetMetadata(usage) // runs before the deferred Close
    }()
    

License

GPL-3.0. See LICENSE.

Documentation

Overview

Package streamhub provides resumable LLM streaming for Go, backed by Redis.

It is meant for the case where the code producing a stream and the code consuming it don't share the same lifetime — they might not even be on the same instance. Chunks are stored in Redis Streams so reconnecting subscribers can replay what they missed, and cancel signals are delivered via Redis Pub/Sub so a generation can be stopped from anywhere.

Each producer gets a generation ID as a fencing token, and only one producer can own a session at a time.

Types

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub manages Redis-backed LiveStream values.

func New

func New(client rueidis.Client) *Hub

New creates a Hub from a Redis client.

func (*Hub) Active

func (h *Hub) Active(sessionIDs []string) map[string]bool

Active reports which session IDs still have an unfinished stream.

func (*Hub) Get

func (h *Hub) Get(sessionID string) *LiveStream

Get returns a stream proxy for sessionID, or nil if it does not exist.

func (*Hub) Register

func (h *Hub) Register(sessionID string, cancelRuntime func()) (*LiveStream, bool, error)

Register tries to create a new LiveStream for sessionID. If created is false, the caller must not start another producer.

func (*Hub) Remove

func (h *Hub) Remove(sessionID string)

Remove deletes a stream's Redis keys and local state.

type LiveStream

type LiveStream struct {
	// contains filtered or unexported fields
}

LiveStream is a Redis-backed stream for one session. Its generation (the fencing token) is only set on the producer side.

func (*LiveStream) Cancel

func (s *LiveStream) Cancel(ctx context.Context)

Cancel broadcasts a cancel signal and waits for the stream to finish (or the context to expire). Pass context.Background() for fire-and-forget.

func (*LiveStream) Close

func (s *LiveStream) Close()

Close marks the stream as done and stops local subscribers. If this generation is stale, Close only shuts down local state.

func (*LiveStream) Done

func (s *LiveStream) Done() bool

Done reports whether the stream is marked done in Redis.

func (*LiveStream) Metadata

func (s *LiveStream) Metadata(target any) bool

Metadata loads stored metadata into target.

func (*LiveStream) Publish

func (s *LiveStream) Publish(chunk string)

Publish appends a chunk to Redis if the generation still matches and the stream is still active. Fails silently on stale generation, closed stream, or Redis errors — producers should treat Publish as best-effort.

func (*LiveStream) SetMetadata

func (s *LiveStream) SetMetadata(v any)

SetMetadata stores stream metadata as JSON. It only writes when the current generation still owns the stream. The ownership check and the write are atomic, so a late writer from a stale generation can't overwrite a new owner's metadata.

func (*LiveStream) Subscribe

func (s *LiveStream) Subscribe(opts ...SubscribeOption) (<-chan string, func())

Subscribe replays existing chunks, then follows new ones. The returned unsubscribe should be called when the caller is done.

type SubscribeOption

type SubscribeOption func(*subscribeConfig)

SubscribeOption configures Subscribe behaviour.

func WithBatchReplay

func WithBatchReplay() SubscribeOption

WithBatchReplay makes Subscribe concatenate all existing chunks into a single string instead of sending them one by one. Useful for reconnecting clients that don't need per-chunk granularity on replay.

func WithBuffer

func WithBuffer(n int) SubscribeOption

WithBuffer sets the extra channel buffer size for live chunks. Defaults to 256 if not specified or ≤ 0.

Directories

Path Synopsis
pkg
