ws

package module
v0.28.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2026 License: MIT Imports: 14 Imported by: 2

README

ws

infrago 的 WebSocket 模块。

能力

  • http/web.Context.Upgrade() 直接接入 ws
  • space 隔离
  • ws.HookOpen/Receive/Send/Close
  • ws.Filter:入站消息执行链
  • ws.Message:客户端上行消息
  • ws.Command:服务端下行命令
  • ctx.Reply/Push/Broadcast/Groupcast
  • ctx.BindUserPushUser
  • PushResult/BroadcastResult/GroupcastResult
  • ctx.Answer
  • ws.Export() / ws.Metrics()
  • demo 首页会直接展示 /ws/export/ws/metrics

接入模型

http/web 只负责:

  • 路由
  • 权鉴 / 参数
  • Upgrade
  • 把升级后的连接交给 ws

大多数项目直接这样用:

infra.Register(".socket", web.Router{
    Uri: "/socket",
    Action: func(ctx *web.Context) {
        if err := ctx.Upgrade(); err != nil {
            ctx.Error(infra.Fail.With(err.Error()))
        }
    },
})

默认规则:

  • ctx.Upgrade():默认 space = ctx.Name
  • 如果 ctx.Name == "",则回退 infra.DEFAULT
  • ctx.Upgrade("name"):显式使用指定 space

自定义空间示例:

infra.Register(".socket.custom", web.Router{
    Uri: "/socket/custom",
    Action: func(ctx *web.Context) {
        _ = ctx.Upgrade("custom")
    },
})

配置

[ws]
format = "text"
codec = "json"
ping_interval = "30s"
read_timeout = "75s"
write_timeout = "10s"
max_message_size = 4194304
queue_size = 128
queue_policy = "close"
compression = false
compress_level = 0
observe_interval = "30s"
observe_log = false
observe_trace = false

默认输出结构:

{"code":0,"name":"demo.notice","data":{"text":"hello"},"time":1770000000}

注意:

  • space 是服务端内部隔离维度,不在 websocket 包体里传输
  • ws.Export() 中每条协议会同时给出 request / response 示例
  • ws.Export() 顶层带 schema.name / schema.version / schema.generated
  • ws.Export() 顶层 spaces 会汇总每个空间的消息、命令、过滤器、处理器和钩子数量

接收端兼容:

  • msg / name
  • args / data
  • 若没有 args/data,则把除消息名外的其它字段全部合并为参数

发送端固定结构:

  • name
  • data
  • code
  • text
  • time

Space

ws 的连接、用户、分组、广播都按 space 隔离。

默认情况下:

  • ctx.Upgrade() 使用当前路由名 ctx.Name 作为 space
  • 同一路由下的连接天然互通
  • 不同路由的连接天然隔离

下列能力都按 space 生效:

  • Message / Command
  • Hook / Filter / Handler
  • BindUser / PushUser
  • Join / Groupcast
  • Broadcast
  • 节点间 _ws.dispatch

其中注册规则是:

  • Message / Command / Handler:先查当前 space,再回退全局 infra.DEFAULT
  • Filter / Hook:执行全局 infra.DEFAULT + 当前 space

如果不在 ctx 中、但又要显式指定空间,可以使用:

  • ws.PushUserIn(space, uid, msg, data)
  • ws.BroadcastIn(space, msg, data)
  • ws.GroupcastIn(space, gid, msg, data)

队列优先级

ws.Command.Setting 支持:

  • priority = "high" | "normal" | "low"
  • queue_policy = "block" | "close" | "drop"

默认规则:

  • high:优先保留,默认按 block
  • normal:按模块级 queue_policy
  • low:高压时自动丢弃,默认按 drop

指标约定:

  • queued 表示当前发送队列深度,不是累计入队次数
  • dropped 表示被丢弃的消息累计次数

Hook 语义

  • Send Hook 可以读取并修改 ctx.Output,最终会写出修改后的字节内容
  • Close Hook 对同一个 session 只触发一次
  • Hook / Handler 内部 panic 会被模块捕获并记录,不会向外打断连接循环

节点间分发

ws 内部通过 _ws.dispatch 做节点间分发,并严格校验:

  • op 必须是 push / push_user / broadcast / groupcast
  • msg 不能为空
  • push 必须带 sid
  • push_user 必须带 uid
  • groupcast 必须带 gid

跨节点 push sid 时,如果当前节点没有这个 session,会按 no-op 处理并返回成功。

示例

infra.Register("demo.echo", ws.Message{
    Action: func(ctx *ws.Context) {
        _ = ctx.Reply("demo.echoed", Map{"text": ctx.Value["text"]})
    },
})

Documentation

Index

Constants

View Source
const (
	TextMessage   = 1
	BinaryMessage = 2
	CloseMessage  = 8
	PingMessage   = 9
	PongMessage   = 10
)

Variables

This section is empty.

Functions

func Accept

func Accept(opts AcceptOptions) error

func Broadcast

func Broadcast(msg string, values ...Map) error

func BroadcastIn added in v0.20.0

func BroadcastIn(space, msg string, values ...Map) error

func CompressionEnabled

func CompressionEnabled() bool

func CompressionLevel

func CompressionLevel() int

func Export

func Export() Map

func Groupcast

func Groupcast(gid, msg string, values ...Map) error

func GroupcastIn added in v0.20.0

func GroupcastIn(space, gid, msg string, values ...Map) error

func Push

func Push(sid, msg string, values ...Map) error

func PushUser

func PushUser(uid, msg string, values ...Map) error

func PushUserIn added in v0.20.0

func PushUserIn(space, uid, msg string, values ...Map) error

Types

type AcceptOptions

type AcceptOptions struct {
	Conn       Conn
	Meta       *infra.Meta
	Space      string
	Name       string
	Site       string
	Host       string
	Domain     string
	RootDomain string
	Path       string
	Uri        string
	Setting    Map
	Params     Map
	Query      Map
	Form       Map
	Value      Map
	Args       Map
	Locals     Map
}

type Command

type Command struct {
	Space    string `json:"space,omitempty"`
	Name     string `json:"name"`
	Desc     string `json:"desc"`
	Nullable bool   `json:"nullable"`
	Args     Vars   `json:"args"`
	Setting  Map    `json:"-"`
}

func (Command) RegistryComponent

func (Command) RegistryComponent() string

type Commands

type Commands map[string]Command

func (Commands) RegistryComponent

func (Commands) RegistryComponent() string

type Config

type Config struct {
	Format          string        `json:"format"`
	Codec           string        `json:"codec"`
	PingInterval    time.Duration `json:"ping_interval"`
	ReadTimeout     time.Duration `json:"read_timeout"`
	WriteTimeout    time.Duration `json:"write_timeout"`
	MaxMessageSize  int64         `json:"max_message_size"`
	QueueSize       int           `json:"queue_size"`
	QueuePolicy     string        `json:"queue_policy"`
	Compression     bool          `json:"compression"`
	CompressLevel   int           `json:"compress_level"`
	ObserveInterval time.Duration `json:"observe_interval"`
	ObserveLog      bool          `json:"observe_log"`
	ObserveTrace    bool          `json:"observe_trace"`
	Setting         Map           `json:"-"`
}

type Conn

type Conn interface {
	ReadMessage() (int, []byte, error)
	WriteMessage(messageType int, data []byte) error
	Close() error
	Raw() Any
}

type Context

type Context struct {
	*infra.Meta
	Session *Session
	Conn    Conn

	Op      string
	Space   string
	Type    int
	Name    string
	Message Message
	Command Command
	Setting Map
	Value   Map
	Args    Map
	Locals  Map
	Input   []byte
	Output  []byte
	// contains filtered or unexported fields
}

func (*Context) Answer

func (ctx *Context) Answer(msg string, data Map, results ...Res) error

func (*Context) AnswerResult

func (ctx *Context) AnswerResult(msg string, data Map, results ...Res) Delivery

func (*Context) BindUser

func (ctx *Context) BindUser(uid string)

func (*Context) Broadcast

func (ctx *Context) Broadcast(msg string, values ...Map) error

func (*Context) BroadcastResult

func (ctx *Context) BroadcastResult(msg string, values ...Map) Delivery

func (*Context) Deny

func (ctx *Context) Deny(args ...Res)

func (*Context) Error

func (ctx *Context) Error(args ...Res)

func (*Context) Fail

func (ctx *Context) Fail(args ...Res)

func (*Context) Groupcast

func (ctx *Context) Groupcast(gid, msg string, values ...Map) error

func (*Context) GroupcastResult

func (ctx *Context) GroupcastResult(gid, msg string, values ...Map) Delivery

func (*Context) Groups

func (ctx *Context) Groups() []string

func (*Context) InGroup

func (ctx *Context) InGroup(group string) bool

func (*Context) Invalid

func (ctx *Context) Invalid(args ...Res)

func (*Context) Join

func (ctx *Context) Join(groups ...string)

func (*Context) Leave

func (ctx *Context) Leave(groups ...string)

func (*Context) LeaveAll

func (ctx *Context) LeaveAll()

func (*Context) Next

func (ctx *Context) Next()

func (*Context) Push

func (ctx *Context) Push(sid, msg string, values ...Map) error

func (*Context) PushResult

func (ctx *Context) PushResult(sid, msg string, values ...Map) Delivery

func (*Context) PushUser

func (ctx *Context) PushUser(uid, msg string, values ...Map) error

func (*Context) PushUserResult

func (ctx *Context) PushUserResult(uid, msg string, values ...Map) Delivery

func (*Context) Reply

func (ctx *Context) Reply(msg string, values ...Map) error

func (*Context) ReplyResult

func (ctx *Context) ReplyResult(msg string, values ...Map) Delivery

type Delivery

type Delivery struct {
	Hit        int    `json:"hit"`
	Success    int    `json:"success"`
	Failed     int    `json:"failed"`
	FirstError string `json:"first_error,omitempty"`
}

func BroadcastResult

func BroadcastResult(msg string, values ...Map) Delivery

func BroadcastResultIn added in v0.20.0

func BroadcastResultIn(space, msg string, values ...Map) Delivery

func GroupcastResult

func GroupcastResult(gid, msg string, values ...Map) Delivery

func GroupcastResultIn added in v0.20.0

func GroupcastResultIn(space, gid, msg string, values ...Map) Delivery

func PushResult

func PushResult(sid, msg string, values ...Map) Delivery

func PushUserResult

func PushUserResult(uid, msg string, values ...Map) Delivery

func PushUserResultIn added in v0.20.0

func PushUserResultIn(space, uid, msg string, values ...Map) Delivery

type Filter

type Filter struct {
	Space   string  `json:"space,omitempty"`
	Name    string  `json:"name"`
	Desc    string  `json:"desc"`
	Message ctxFunc `json:"-"`
}

func (Filter) RegistryComponent

func (Filter) RegistryComponent() string

type Filters

type Filters map[string]Filter

func (Filters) RegistryComponent

func (Filters) RegistryComponent() string

type Handler

type Handler struct {
	Space   string  `json:"space,omitempty"`
	Name    string  `json:"name"`
	Desc    string  `json:"desc"`
	Error   ctxFunc `json:"-"`
	Invalid ctxFunc `json:"-"`
	Denied  ctxFunc `json:"-"`
}

func (Handler) RegistryComponent

func (Handler) RegistryComponent() string

type Handlers

type Handlers map[string]Handler

func (Handlers) RegistryComponent

func (Handlers) RegistryComponent() string

type Hook

type Hook struct {
	Space   string  `json:"space,omitempty"`
	Name    string  `json:"name"`
	Desc    string  `json:"desc"`
	Open    ctxFunc `json:"-"`
	Close   ctxFunc `json:"-"`
	Receive ctxFunc `json:"-"`
	Send    ctxFunc `json:"-"`
}

func (Hook) RegistryComponent

func (Hook) RegistryComponent() string

type Hooks

type Hooks map[string]Hook

func (Hooks) RegistryComponent

func (Hooks) RegistryComponent() string

type Message

type Message struct {
	Space    string  `json:"space,omitempty"`
	Name     string  `json:"name"`
	Desc     string  `json:"desc"`
	Nullable bool    `json:"nullable"`
	Args     Vars    `json:"args"`
	Setting  Map     `json:"-"`
	Action   ctxFunc `json:"-"`
}

func (Message) RegistryComponent

func (Message) RegistryComponent() string

type Messages

type Messages map[string]Message

func (Messages) RegistryComponent

func (Messages) RegistryComponent() string

type Module

type Module struct {
	// contains filtered or unexported fields
}

func (*Module) Close

func (m *Module) Close()

func (*Module) Config

func (m *Module) Config(global Map)

func (*Module) Open

func (m *Module) Open()

func (*Module) Register

func (m *Module) Register(name string, value Any)

func (*Module) RegisterCommand

func (m *Module) RegisterCommand(name string, command Command)

func (*Module) RegisterCommands

func (m *Module) RegisterCommands(prefix string, items Commands)

func (*Module) RegisterFilter

func (m *Module) RegisterFilter(name string, filter Filter)

func (*Module) RegisterFilters

func (m *Module) RegisterFilters(items Filters)

func (*Module) RegisterHandler

func (m *Module) RegisterHandler(name string, handler Handler)

func (*Module) RegisterHandlers

func (m *Module) RegisterHandlers(items Handlers)

func (*Module) RegisterHook

func (m *Module) RegisterHook(name string, hook Hook)

func (*Module) RegisterHooks

func (m *Module) RegisterHooks(items Hooks)

func (*Module) RegisterMessage

func (m *Module) RegisterMessage(name string, message Message)

func (*Module) RegisterMessages

func (m *Module) RegisterMessages(prefix string, items Messages)

func (*Module) Setup

func (m *Module) Setup()

func (*Module) Start

func (m *Module) Start()

func (*Module) Stop

func (m *Module) Stop()

type Session

type Session struct {
	ID         string         `json:"id"`
	Space      string         `json:"space"`
	User       string         `json:"user,omitempty"`
	Name       string         `json:"name"`
	Site       string         `json:"site,omitempty"`
	Host       string         `json:"host,omitempty"`
	Domain     string         `json:"domain,omitempty"`
	RootDomain string         `json:"root_domain,omitempty"`
	Path       string         `json:"path,omitempty"`
	Uri        string         `json:"uri,omitempty"`
	Setting    Map            `json:"setting,omitempty"`
	Params     Map            `json:"params,omitempty"`
	Query      Map            `json:"query,omitempty"`
	Form       Map            `json:"form,omitempty"`
	Value      Map            `json:"value,omitempty"`
	Args       Map            `json:"args,omitempty"`
	Locals     Map            `json:"locals,omitempty"`
	Groups     map[string]Any `json:"groups,omitempty"`

	Meta *infra.Meta `json:"-"`
	Conn Conn        `json:"-"`
	// contains filtered or unexported fields
}

func SessionByID

func SessionByID(id string) *Session

func Sessions

func Sessions() []*Session

func SessionsByUser

func SessionsByUser(uid string) []*Session

func SessionsByUserIn added in v0.20.0

func SessionsByUserIn(space, uid string) []*Session

type Stats

type Stats struct {
	Connections      int64 `json:"connections"`
	Users            int64 `json:"users"`
	MessagesReceived int64 `json:"messages_received"`
	MessagesSent     int64 `json:"messages_sent"`
	ReceiveFailed    int64 `json:"receive_failed"`
	SendFailed       int64 `json:"send_failed"`
	BytesReceived    int64 `json:"bytes_received"`
	BytesSent        int64 `json:"bytes_sent"`
	AvgReceiveBytes  int64 `json:"avg_receive_bytes"`
	AvgSendBytes     int64 `json:"avg_send_bytes"`
	Queued           int64 `json:"queued"`
	Dropped          int64 `json:"dropped"`
}

func Metrics

func Metrics() Stats

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL