nats

package module
v0.0.0-...-f88d5f4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 2, 2026 License: Apache-2.0 Imports: 5 Imported by: 0

README

github.com/taouniverse/tao-nats

Go Report Card GoDoc

Tao Universe 组件单元(Unit),基于泛型工厂模式封装 NATS 消息系统。

安装

go get github.com/taouniverse/tao-nats

使用

导入
import _ "github.com/taouniverse/tao-nats"
配置
# 单实例配置
nats:
  url: nats://localhost:4222

# 多实例配置(如连接多个 NATS 集群)
nats:
  default_instance: primary
  primary:
    url: nats://primary:4222
    user: admin
    password: secret
  secondary:
    url: nats://backup:4222
    token: my-token
配置字段说明
字段 类型 默认值 说明
url string nats://localhost:4222 NATS 服务器地址
user string - 用户名
password string - 密码
token string - 认证 Token(与 user/password 二选一)
name string - 连接名称,用于服务端识别
reconnect bool true 是否自动重连
max_reconnect int 5 最大重连次数
reconnect_wait duration 2s 重连等待时间
timeout duration 10s 连接超时

工厂模式 API

API 说明
nats.M 配置实例 *Config
nats.Factory *tao.BaseFactory[*nats.Conn] 工厂实例
nats.Nats() 获取默认 NATS 连接 (*nats.Conn, error)
nats.GetNats(name) 获取指定名称的连接 (*nats.Conn, error)

使用示例

获取连接并发布消息
package main

import (
    "log"
    
    "github.com/taouniverse/tao-nats"
)

func main() {
    // 获取默认实例
    conn, err := nats.Nats()
    if err != nil {
        log.Fatal(err)
    }
    
    // 发布消息
    err = conn.Publish("orders.created", []byte(`{"id": "123"}`))
    if err != nil {
        log.Fatal(err)
    }
}
订阅消息
conn, _ := nats.Nats()

// 普通订阅(所有消费者都收到)
sub, err := conn.Subscribe("orders.created", func(msg *nats.Msg) {
    log.Printf("Received: %s", msg.Data)
})
if err != nil {
    log.Fatal(err)
}
defer sub.Unsubscribe()

// 队列订阅(负载均衡,只有一个消费者收到)
queueSub, err := conn.QueueSubscribe("orders.process", "workers", func(msg *nats.Msg) {
    log.Println("Processing order...")
})
if err != nil {
    log.Fatal(err)
}
defer queueSub.Unsubscribe()
请求-响应模式
conn, _ := nats.Nats()

// 发送请求并等待响应
msg, err := conn.Request("inventory.check", []byte(`{"sku": "abc"}`), 2*time.Second)
if err != nil {
    log.Fatal(err)
}
log.Printf("Response: %s", msg.Data)

// 响应请求
conn.Subscribe("inventory.check", func(msg *nats.Msg) {
    msg.Respond([]byte(`{"available": true}`))
})
多实例使用
// 获取不同集群的连接
primary, _ := nats.GetNats("primary")
backup, _ := nats.GetNats("backup")

// 向主集群发布
primary.Publish("events", data)

// 向备份集群发布
backup.Publish("events", data)

单元测试

快速测试(无需 Docker)
# 仅运行配置相关测试
go test -v -run "TestConfig" ./...
完整集成测试(需要 Docker)
# 启动 NATS 并运行单实例测试
make test

# 启动 NATS 并运行多实例测试
make test-multi

# 启动 NATS 并运行所有测试
make test-all

# 生成覆盖率报告
make coverage

# 停止 NATS 服务
make down
手动测试
# 1. 启动 NATS
docker-compose up -d

# 2. 运行单实例测试
go test -v ./...

# 3. 运行多实例测试
TAO_TEST_MULTI_INSTANCE=true go test -v ./...

# 4. 停止 NATS
docker-compose down

开发指南

文件 说明
config.go InstanceConfig 字段 + ValidSelf 默认值
nats.go NewNats 构造器 + 工厂注册

Documentation

Index

Constants

View Source
const ConfigKey = "nats"

ConfigKey for this repo

Variables

Factory manages all instances

View Source
var M = &Config{}

M config of nats

Functions

func GetNats

func GetNats(name string) (*nats.Conn, error)

GetNats returns the instance by name

func Nats

func Nats() (*nats.Conn, error)

Nats returns the default instance

func NewNats

func NewNats(name string, cfg InstanceConfig) (*nats.Conn, func() error, error)

NewNats constructor for factory pattern

Types

type Config

type Config struct {
	tao.BaseMultiConfig[InstanceConfig]
	RunAfters []string `json:"run_after,omitempty"`
}

Config is the total configuration, implements tao.MultiConfig interface

func (*Config) Name

func (n *Config) Name() string

Name returns the config key name

func (*Config) RunAfter

func (n *Config) RunAfter() []string

RunAfter returns the list of tasks to run after

func (*Config) ToTask

func (n *Config) ToTask() tao.Task

ToTask converts the config to a tao.Task

func (*Config) ValidSelf

func (n *Config) ValidSelf()

ValidSelf validates and sets default values for the config

type InstanceConfig

type InstanceConfig struct {
	URL           string        `json:"url" yaml:"url"`
	User          string        `json:"user,omitempty" yaml:"user,omitempty"`
	Password      string        `json:"password,omitempty" yaml:"password,omitempty"`
	Token         string        `json:"token,omitempty" yaml:"token,omitempty"`
	Name          string        `json:"name,omitempty" yaml:"name,omitempty"`
	Reconnect     bool          `json:"reconnect" yaml:"reconnect"`
	MaxReconnect  int           `json:"max_reconnect" yaml:"max_reconnect"`
	ReconnectWait time.Duration `json:"reconnect_wait" yaml:"reconnect_wait"`
	Timeout       time.Duration `json:"timeout" yaml:"timeout"`
}

InstanceConfig 单实例配置

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL