mutex

package module
v0.29.0 Latest
Published: Jun 22, 2026 License: MIT Imports: 14 Imported by: 4

README

mutex

mutex is a module of infrago.

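Package role

  • Type: module
  • Purpose: distributed lock module that handles lock acquisition, release, and mutual exclusion between concurrent holders.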

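Main features

  • Exposes a unified module interface to callers
  • Connects to concrete backends through a driver interface
  • Switches driver implementations through configuration
  • Non-destructive lock status checks via Locked()
  • Check() / CheckOn() return explicit errors, distinguishing "lock already held" from "backend check failed"
  • TryLock() / WaitLock() / WaitLockContext() give explicit control over immediate versus waiting acquisition
  • Refresh() / RefreshOn() renew a lease, so long-running tasks can extend it on demand
  • locker.KeepAlive() renews the lease automatically, suited to long-running tasks
  • Stats() / StatsFrom() / ResetStats() expose basic runtime metrics
  • Capabilities() / CapabilityFrom() report each instance's capabilities explicitly
  • Debug() / DebugTokens() provide read-only debug exports
  • KeyWith() builds lock keys in a uniform way
  • The locker returned by Lock() has token-aware unlock semantics: an expired old lock cannot delete a newer lock by mistake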

Quick start

import _ "github.com/infrago/mutex"

[mutex]
driver = "default"
expire = "3s"
cleanup_interval = "30s"
token_grace = "1m"

Driver interfaces

The following interfaces are implemented by drivers (from the module's driver.go):

Driver
  • Connect(*Instance) (Connection, error)
Connection
  • Open() error
  • Close() error
  • Lock(key string, expires time.Duration) error
  • Unlock(key string) error
Optional interfaces
  • Checker: Locked(key string) (bool, error)
  • Refresher: Refresh(key string, expires time.Duration) error
  • TokenConnection: LockToken(key string, expires time.Duration) (string, error) / UnlockToken(key, token string) error
  • TokenRefresher: RefreshToken(key, token string, expires time.Duration) error
  • CapabilityProvider: Capabilities() Capability
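
As a rough illustration of the method sets listed above, the sketch below implements the Connection methods plus the optional Checker against an in-process map. The interface is redeclared locally so the block compiles on its own; memConn, newMemConn, errLocked, and lease are hypothetical names, not part of the package. A real driver would also provide Driver.Connect(*Instance) and talk to a shared backend rather than process memory.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// errLocked stands in for the package's ErrLocked sentinel (assumption).
var errLocked = errors.New("mutex already locked")

// lease is an arbitrary demo lease length.
const lease = time.Minute

// connection mirrors the Connection method set from the listing above.
type connection interface {
	Open() error
	Close() error
	Lock(key string, expires time.Duration) error
	Unlock(key string) error
}

// memConn is a hypothetical single-process backend: key -> lease deadline.
type memConn struct {
	mu    sync.Mutex
	locks map[string]time.Time
}

func newMemConn() *memConn { return &memConn{locks: map[string]time.Time{}} }

func (c *memConn) Open() error  { return nil }
func (c *memConn) Close() error { return nil }

// Lock succeeds only if the key is free or its previous lease has expired.
func (c *memConn) Lock(key string, expires time.Duration) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if deadline, ok := c.locks[key]; ok && time.Now().Before(deadline) {
		return errLocked
	}
	c.locks[key] = time.Now().Add(expires)
	return nil
}

// Unlock removes the key unconditionally (no token check in this sketch).
func (c *memConn) Unlock(key string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.locks, key)
	return nil
}

// Locked implements the optional Checker method without mutating state.
func (c *memConn) Locked(key string) (bool, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	deadline, ok := c.locks[key]
	return ok && time.Now().Before(deadline), nil
}

var _ connection = (*memConn)(nil) // compile-time interface check

func main() {
	c := newMemConn()
	fmt.Println(c.Lock("job", lease)) // <nil>
	fmt.Println(c.Lock("job", lease)) // mutex already locked
	held, _ := c.Locked("job")
	fmt.Println(held) // true
}
```

Because Unlock here deletes by key alone, a holder whose lease has expired could remove a lock that someone else has since acquired; that is the gap the TokenConnection interface is meant to close.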

Global configuration (all config keys)

Config section: [mutex]

  • driver
  • weight
  • prefix
  • expire
  • cleanup_interval
  • token_grace
  • setting

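Notes

  • setting is generally used to pass driver-specific parameters through to the concrete driver
  • For multi-instance configuration, see the Config/configure handling logic in the module source
  • Locked() is now a read-only check; it is no longer implemented as "try to acquire, then release"
  • Locked() / LockedOn() keep the legacy interface: when the check fails with an error they still conservatively return true
  • To tell "actually locked" apart from "check failed", use Check() / CheckOn()
  • TryLock() has the same semantics as the current Lock(): try once immediately and return on failure
  • WaitLock() polls for the lock every interval until the timeout elapses, then returns ErrTimeout
  • WaitLockContext() can be cancelled through an external context.Context; on timeout or cancellation it returns ctx.Err() directly
  • Long-running tasks should call Refresh() periodically while holding the lock; otherwise, once expire has elapsed, another node may acquire it
  • Explicitly passing a negative lease returns ErrInvalidLease
  • Operations after the module is closed return ErrClosed
  • Backend timeouts are normalized to ErrTimeout where possible
  • Prefer lok, err := mutex.Lock(...) followed by lok.Unlock(); this path keeps the lock token, so an expired old locker cannot delete a newer lock by mistake
  • lok.Refresh(...) renews precisely using that locker's token, which is safer than the token-less helpers
  • lok.KeepAlive(interval) renews in the background and stops automatically on Unlock()
  • KeyWith(":", "order", id, "pay") produces a more stable hierarchical key, which helps aggregate contention statistics
  • mutex.Unlock(...) / mutex.UnlockOn(...) now also prefer a safe unlock using the token recorded in the current process; if the current driver requires a token and none is recorded locally, they return ErrTokenRequired
  • cleanup_interval sets how often the main package's background token cleanup runs; default 30s
  • token_grace sets the compatibility retention window for expired tokens; default 1m; tokens past that window are removed by the background cleanup
  • To support delayed release by legacy helpers, the main package briefly retains recently expired local tokens; ActiveTokens in Stats() counts only tokens that are still valid
  • Stats() returns module-wide totals and per-instance statistics, currently Lock/Unlock/Refresh/Check/Contention/Error/Cleanup/ActiveTokens
  • Contention statistics are broken down into ContentionByKey, ContentionByPrefix, and HotKeys
  • Capabilities() returns the Check/Refresh/Token/TokenRefresh capabilities of each instance; by default they are detected from the interfaces the driver implements, and a driver can implement CapabilityProvider to override this
  • Debug() returns a snapshot of module state, instance configuration, capabilities, statistics, and token debug data; DebugTokens() returns only token counts and remaining time, never the raw token values
  • Debug() / DebugTokens() are intended for troubleshooting and observability only; do not depend on them in business logic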
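
The WaitLock() / WaitLockContext() behaviour described in the notes (poll every interval, stop on timeout or cancellation) can be pictured roughly as follows. This is a sketch of the polling pattern only, not the module's actual implementation: waitLock, waitLockContext, the try callback, and the lowercase sentinel errors are hypothetical stand-ins, and the interval is assumed to be positive because time.NewTicker panics otherwise.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Stand-ins for the package's ErrLocked and ErrTimeout (assumption).
var (
	errLocked  = errors.New("mutex already locked")
	errTimeout = errors.New("mutex timeout")
)

// waitLockContext calls try immediately, then once per interval, until it
// succeeds, fails with something other than contention, or ctx is done.
func waitLockContext(ctx context.Context, interval time.Duration, try func() error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		err := try()
		if err == nil || !errors.Is(err, errLocked) {
			return err // acquired, or a real backend error
		}
		select {
		case <-ctx.Done():
			return ctx.Err() // surfaced as-is, like WaitLockContext
		case <-ticker.C:
		}
	}
}

// waitLock wraps the context variant and maps a deadline to errTimeout.
func waitLock(timeout, interval time.Duration, try func() error) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	err := waitLockContext(ctx, interval, try)
	if errors.Is(err, context.DeadlineExceeded) {
		return errTimeout
	}
	return err
}

// demo runs one contended-then-free scenario and one always-busy scenario.
func demo() (okErr error, attempts int, busyErr error) {
	try := func() error {
		attempts++
		if attempts < 3 {
			return errLocked // held by someone else on the first two tries
		}
		return nil
	}
	okErr = waitLock(time.Second, 10*time.Millisecond, try)
	busyErr = waitLock(50*time.Millisecond, 10*time.Millisecond, func() error { return errLocked })
	return
}

func main() {
	okErr, attempts, busyErr := demo()
	fmt.Println(okErr, attempts) // <nil> 3
	fmt.Println(busyErr)         // mutex timeout
}
```

A shorter interval shortens the wait after the lock frees up but adds load on the backend; which trade-off suits a given workload is a judgment call the caller has to make.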

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrInvalidConnection  = errors.New("invalid mutex connection")
	ErrNotReady           = errors.New("mutex is not ready")
	ErrClosed             = errors.New("mutex is closed")
	ErrLocked             = errors.New("mutex already locked")
	ErrLostLock           = errors.New("mutex lock is lost")
	ErrTimeout            = errors.New("mutex timeout")
	ErrInvalidLease       = errors.New("invalid mutex lease")
	ErrUnsupportedCheck   = errors.New("mutex locked check is unsupported")
	ErrUnsupportedRefresh = errors.New("mutex refresh is unsupported")
	ErrTokenRequired      = errors.New("mutex unlock requires token")
)
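
The README distinguishes the legacy Locked() helpers, which conservatively report true on any check error, from Check(), which returns the error. A small sketch of that difference, under the assumption that the legacy helper is a thin wrapper over the checking call; lockedCompat, describe, and errUnsupportedCheck are hypothetical local names.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for the package's ErrUnsupportedCheck (assumption).
var errUnsupportedCheck = errors.New("mutex locked check is unsupported")

// lockedCompat mirrors the documented legacy behaviour: if the check itself
// fails, report "locked" so callers do not proceed by accident.
func lockedCompat(check func() (bool, error)) bool {
	held, err := check()
	if err != nil {
		return true
	}
	return held
}

// describe shows how a Check()-style result lets the caller tell the
// outcomes apart instead of collapsing them into one boolean.
func describe(held bool, err error) string {
	switch {
	case errors.Is(err, errUnsupportedCheck):
		return "backend cannot check"
	case err != nil:
		return "check failed: " + err.Error()
	case held:
		return "locked"
	default:
		return "free"
	}
}

func main() {
	backendDown := errors.New("connection refused")
	failing := func() (bool, error) { return false, backendDown }

	fmt.Println(lockedCompat(failing))                // true
	fmt.Println(describe(false, backendDown))         // check failed: connection refused
	fmt.Println(describe(true, nil))                  // locked
	fmt.Println(describe(false, errUnsupportedCheck)) // backend cannot check
}
```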

Functions

func Capabilities added in v0.26.0

func Capabilities() map[string]Capability

func Check added in v0.26.0

func Check(args ...base.Any) (bool, error)

func CheckOn added in v0.26.0

func CheckOn(conn string, args ...base.Any) (bool, error)

func Key

func Key(args ...base.Any) string

func KeyWith added in v0.26.0

func KeyWith(sep string, args ...base.Any) string
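
The page does not spell out how KeyWith formats its arguments. One plausible reading, given the README example KeyWith(":", "order", id, "pay"), is that each argument is stringified and joined with the separator. The sketch below assumes exactly that; keyWith and prefixOf are hypothetical helpers, and the real function may treat some argument types differently.

```go
package main

import (
	"fmt"
	"strings"
)

// keyWith is a hypothetical stand-in: stringify every argument and join
// the parts with sep.
func keyWith(sep string, args ...interface{}) string {
	parts := make([]string, 0, len(args))
	for _, a := range args {
		parts = append(parts, fmt.Sprint(a))
	}
	return strings.Join(parts, sep)
}

// prefixOf returns the first segment of a key, the kind of grouping a
// per-prefix contention counter could aggregate on.
func prefixOf(key, sep string) string {
	if i := strings.Index(key, sep); i >= 0 {
		return key[:i]
	}
	return key
}

func main() {
	key := keyWith(":", "order", 1001, "pay")
	fmt.Println(key)                // order:1001:pay
	fmt.Println(prefixOf(key, ":")) // order
}
```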

func Lock

func Lock(args ...base.Any) (*locker, error)

func LockOn

func LockOn(conn string, args ...base.Any) (*locker, error)

func Locked

func Locked(args ...base.Any) bool

func LockedOn

func LockedOn(conn string, args ...base.Any) bool

func Refresh added in v0.26.0

func Refresh(args ...base.Any) error

func RefreshOn added in v0.26.0

func RefreshOn(conn string, args ...base.Any) error

func ResetStats added in v0.26.0

func ResetStats()

func TryLock added in v0.26.0

func TryLock(args ...base.Any) (*locker, error)

func TryLockOn added in v0.26.0

func TryLockOn(conn string, args ...base.Any) (*locker, error)

func Unlock

func Unlock(args ...base.Any) error

func UnlockOn

func UnlockOn(conn string, args ...base.Any) error

func WaitLock added in v0.26.0

func WaitLock(timeout, interval time.Duration, args ...base.Any) (*locker, error)

func WaitLockContext added in v0.26.0

func WaitLockContext(ctx context.Context, interval time.Duration, args ...base.Any) (*locker, error)

func WaitLockOn added in v0.26.0

func WaitLockOn(conn string, timeout, interval time.Duration, args ...base.Any) (*locker, error)

func WaitLockOnContext added in v0.26.0

func WaitLockOnContext(conn string, ctx context.Context, interval time.Duration, args ...base.Any) (*locker, error)

Types

type Capability added in v0.26.0

type Capability struct {
	Check        bool
	Refresh      bool
	Token        bool
	TokenRefresh bool
}

func CapabilityFrom added in v0.26.0

func CapabilityFrom(conn string) (Capability, error)

type CapabilityProvider added in v0.26.0

type CapabilityProvider interface {
	Capabilities() Capability
}
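
According to the README, capabilities are detected from the interfaces a driver implements unless the driver provides CapabilityProvider. A minimal sketch of how such detection could work with type assertions; the types are redeclared locally so the block compiles alone, and detect, checkOnly, and declared are hypothetical names rather than the module's own code.

```go
package main

import (
	"fmt"
	"time"
)

// Local mirrors of the package's capability types.
type Capability struct {
	Check        bool
	Refresh      bool
	Token        bool
	TokenRefresh bool
}

type CapabilityProvider interface{ Capabilities() Capability }
type Checker interface{ Locked(key string) (bool, error) }
type Refresher interface {
	Refresh(key string, expires time.Duration) error
}
type TokenConnection interface {
	LockToken(key string, expires time.Duration) (string, error)
	UnlockToken(key, token string) error
}
type TokenRefresher interface {
	RefreshToken(key, token string, expires time.Duration) error
}

// detect prefers an explicit declaration and otherwise probes each
// optional interface with a type assertion.
func detect(conn interface{}) Capability {
	if p, ok := conn.(CapabilityProvider); ok {
		return p.Capabilities()
	}
	var c Capability
	_, c.Check = conn.(Checker)
	_, c.Refresh = conn.(Refresher)
	_, c.Token = conn.(TokenConnection)
	_, c.TokenRefresh = conn.(TokenRefresher)
	return c
}

// checkOnly implements only the Checker method.
type checkOnly struct{}

func (checkOnly) Locked(string) (bool, error) { return false, nil }

// declared also has Locked (via embedding) but overrides detection by
// explicitly declaring no capabilities.
type declared struct{ checkOnly }

func (declared) Capabilities() Capability { return Capability{} }

func main() {
	fmt.Printf("%+v\n", detect(checkOnly{})) // {Check:true Refresh:false Token:false TokenRefresh:false}
	fmt.Printf("%+v\n", detect(declared{}))  // {Check:false Refresh:false Token:false TokenRefresh:false}
}
```

The override is useful when a driver satisfies an interface syntactically but the backend it wraps cannot honour it at runtime.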

type Checker added in v0.26.0

type Checker interface {
	Locked(key string) (bool, error)
}

type Config

type Config struct {
	Driver          string
	Weight          int
	Prefix          string
	Expire          time.Duration
	CleanupInterval time.Duration
	TokenGrace      time.Duration
	Setting         base.Map
}

type Configs

type Configs map[string]Config

type Connection added in v0.7.0

type Connection interface {
	Open() error
	Close() error

	Lock(key string, expires time.Duration) error
	Unlock(key string) error
}

Connection defines a mutex connection.

type ContentionStat added in v0.26.0

type ContentionStat struct {
	Name  string
	Count uint64
}

type DebugInfo added in v0.26.0

type DebugInfo struct {
	Opened          bool
	Started         bool
	CleanupInterval time.Duration
	Instances       map[string]DebugInstance
	Tokens          []DebugToken
	Stats           Statistics
}

func Debug added in v0.26.0

func Debug() DebugInfo

type DebugInstance added in v0.26.0

type DebugInstance struct {
	Name            string
	Driver          string
	Weight          int
	Prefix          string
	Expire          time.Duration
	CleanupInterval time.Duration
	TokenGrace      time.Duration
	Setting         base.Map
	Capability      Capability
	Stats           Statistics
}

type DebugToken added in v0.26.0

type DebugToken struct {
	Conn    string
	Key     string
	Count   int
	Active  int
	Expired int
	Entries []DebugTokenEntry
}

func DebugTokens added in v0.26.0

func DebugTokens() []DebugToken

type DebugTokenEntry added in v0.26.0

type DebugTokenEntry struct {
	Active         bool
	Remaining      time.Duration
	GraceRemaining time.Duration
}

type Driver

type Driver interface {
	Connect(*Instance) (Connection, error)
}

Driver defines a mutex driver.

type Instance

type Instance struct {
	Name    string
	Config  Config
	Setting base.Map
	// contains filtered or unexported fields
}

Instance is the driver instance context.

type Module

type Module struct {
	// contains filtered or unexported fields
}

func (*Module) Capabilities added in v0.26.0

func (m *Module) Capabilities() map[string]Capability

func (*Module) CapabilityFrom added in v0.26.0

func (m *Module) CapabilityFrom(conn string) (Capability, error)

func (*Module) Close added in v0.7.0

func (m *Module) Close()

Close closes connections.

func (*Module) Config

func (m *Module) Config(global base.Map)

Config parses global config for mutex.

func (*Module) Debug added in v0.26.0

func (m *Module) Debug() DebugInfo

func (*Module) DebugTokens added in v0.26.0

func (m *Module) DebugTokens() []DebugToken

func (*Module) Lock

func (m *Module) Lock(key string, expires ...time.Duration) error

Lock locks with auto-selected connection.

func (*Module) LockOn

func (m *Module) LockOn(conn string, key string, expires ...time.Duration) error

LockOn locks to a specific connection.

func (*Module) Locked added in v0.26.0

func (m *Module) Locked(key string) (bool, error)

Locked checks lock status with auto-selected connection.

func (*Module) LockedOn added in v0.26.0

func (m *Module) LockedOn(conn, key string) (bool, error)

LockedOn checks lock status on a specific connection without mutating it.

func (*Module) Open added in v0.7.0

func (m *Module) Open()

Open connects all mutex drivers.

func (*Module) Refresh added in v0.26.0

func (m *Module) Refresh(key string, expires ...time.Duration) error

Refresh refreshes a lock lease with auto-selected connection.

func (*Module) RefreshOn added in v0.26.0

func (m *Module) RefreshOn(conn, key string, expires ...time.Duration) error

RefreshOn refreshes a lock lease on a specific connection.

func (*Module) Register

func (m *Module) Register(name string, value base.Any)

Register dispatches registrations.

func (*Module) RegisterConfig added in v0.7.0

func (m *Module) RegisterConfig(name string, cfg Config)

RegisterConfig registers a named mutex config.

func (*Module) RegisterConfigs added in v0.7.0

func (m *Module) RegisterConfigs(configs Configs)

RegisterConfigs registers multiple named mutex configs.

func (*Module) RegisterDriver added in v0.7.0

func (m *Module) RegisterDriver(name string, driver Driver)

RegisterDriver registers a mutex driver.

func (*Module) ResetStats added in v0.26.0

func (m *Module) ResetStats()

func (*Module) Setup added in v0.7.0

func (m *Module) Setup()

Setup initializes defaults.

func (*Module) Start added in v0.7.0

func (m *Module) Start()

Start launches module (no-op).

func (*Module) Stats added in v0.26.0

func (m *Module) Stats() Statistics

func (*Module) StatsFrom added in v0.26.0

func (m *Module) StatsFrom(conn string) (Statistics, error)

func (*Module) Stop added in v0.7.0

func (m *Module) Stop()

Stop stops module (no-op).

func (*Module) Unlock

func (m *Module) Unlock(key string) error

Unlock unlocks with auto-selected connection.

func (*Module) UnlockOn

func (m *Module) UnlockOn(conn, key string) error

UnlockOn unlocks on a specific connection.

type Refresher added in v0.26.0

type Refresher interface {
	Refresh(key string, expires time.Duration) error
}
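
The README recommends refreshing the lease periodically during long tasks and mentions lok.KeepAlive(interval) for doing so in the background. The general shape of such a renewal loop is sketched below; keepAlive and its refresh callback are hypothetical, and stopping on the first refresh error is an assumption made for the sketch, not documented behaviour.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// keepAlive calls refresh once per interval in a goroutine until the
// returned stop function is called or refresh reports an error.
// interval must be positive (time.NewTicker panics otherwise).
func keepAlive(interval time.Duration, refresh func() error) (stop func()) {
	done := make(chan struct{})
	var once sync.Once
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				if err := refresh(); err != nil {
					return // lease lost or backend error: stop renewing
				}
			}
		}
	}()
	// stop is idempotent and waits for the goroutine to exit.
	return func() {
		once.Do(func() { close(done) })
		wg.Wait()
	}
}

// demo counts refresh calls while running and after stop.
func demo() (before, after int64) {
	var n int64
	stop := keepAlive(5*time.Millisecond, func() error {
		atomic.AddInt64(&n, 1)
		return nil
	})
	time.Sleep(40 * time.Millisecond) // simulated long task
	stop()
	before = atomic.LoadInt64(&n)
	time.Sleep(20 * time.Millisecond)
	after = atomic.LoadInt64(&n)
	return
}

func main() {
	before, after := demo()
	fmt.Println(before > 0, before == after) // true true
}
```

Choosing an interval comfortably shorter than the lease leaves room for a slow or retried refresh before the lock expires.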

type Statistics added in v0.26.0

type Statistics struct {
	Lock               uint64
	Unlock             uint64
	Refresh            uint64
	Check              uint64
	Contention         uint64
	Error              uint64
	Cleanup            uint64
	ActiveTokens       int
	ContentionByKey    map[string]uint64
	ContentionByPrefix map[string]uint64
	HotKeys            []ContentionStat
	Instances          map[string]Statistics
}
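
How HotKeys is derived is not documented on this page. One natural reading is a top-N view over ContentionByKey, which the sketch below computes; hotKeys is a hypothetical helper, ContentionStat is redeclared locally, and the tie-break by name is an assumption added only to make the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// ContentionStat mirrors the package type of the same name.
type ContentionStat struct {
	Name  string
	Count uint64
}

// hotKeys returns up to n keys with the highest contention counts,
// ordered by count descending and then by name.
func hotKeys(byKey map[string]uint64, n int) []ContentionStat {
	if n < 0 {
		n = 0
	}
	stats := make([]ContentionStat, 0, len(byKey))
	for k, c := range byKey {
		stats = append(stats, ContentionStat{Name: k, Count: c})
	}
	sort.Slice(stats, func(i, j int) bool {
		if stats[i].Count != stats[j].Count {
			return stats[i].Count > stats[j].Count
		}
		return stats[i].Name < stats[j].Name
	})
	if len(stats) > n {
		stats = stats[:n]
	}
	return stats
}

func main() {
	byKey := map[string]uint64{
		"order:1:pay": 7,
		"order:2:pay": 3,
		"user:9":      5,
	}
	fmt.Println(hotKeys(byKey, 2)) // [{order:1:pay 7} {user:9 5}]
}
```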

func Stats added in v0.26.0

func Stats() Statistics

func StatsFrom added in v0.26.0

func StatsFrom(conn string) (Statistics, error)

type TokenConnection added in v0.26.0

type TokenConnection interface {
	LockToken(key string, expires time.Duration) (string, error)
	UnlockToken(key, token string) error
}
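
The point of the token methods is that a holder whose lease has expired cannot release a lock that someone else has since acquired. A self-contained sketch of that idea against an in-memory store; tokenStore, newToken, and demo are hypothetical, and returning errLostLock for a stale token is an assumption about how a backend might signal the mismatch.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
	"sync"
	"time"
)

// Stand-ins for the package's ErrLocked and ErrLostLock (assumption).
var (
	errLocked   = errors.New("mutex already locked")
	errLostLock = errors.New("mutex lock is lost")
)

type entry struct {
	token    string
	deadline time.Time
}

// tokenStore is a hypothetical in-memory backend keyed by lock name.
type tokenStore struct {
	mu    sync.Mutex
	locks map[string]entry
}

func newTokenStore() *tokenStore { return &tokenStore{locks: map[string]entry{}} }

// newToken returns a random hex string identifying one acquisition.
func newToken() string {
	b := make([]byte, 8)
	_, _ = rand.Read(b)
	return hex.EncodeToString(b)
}

// LockToken acquires the key if it is free or expired and returns a token.
func (s *tokenStore) LockToken(key string, expires time.Duration) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if e, ok := s.locks[key]; ok && time.Now().Before(e.deadline) {
		return "", errLocked
	}
	tok := newToken()
	s.locks[key] = entry{token: tok, deadline: time.Now().Add(expires)}
	return tok, nil
}

// UnlockToken releases the key only if the caller's token still owns it.
func (s *tokenStore) UnlockToken(key, token string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	e, ok := s.locks[key]
	if !ok || e.token != token {
		return errLostLock
	}
	delete(s.locks, key)
	return nil
}

// demo: holder A's lease expires, B takes over, A's late unlock is rejected.
func demo() (staleErr error, stillHeld bool) {
	s := newTokenStore()
	oldTok, _ := s.LockToken("job", 10*time.Millisecond) // holder A
	time.Sleep(20 * time.Millisecond)                    // A's lease expires
	newTok, _ := s.LockToken("job", time.Minute)         // holder B takes over
	staleErr = s.UnlockToken("job", oldTok)              // A unlocks too late
	stillHeld = s.UnlockToken("job", newTok) == nil      // B's lock was untouched
	return
}

func main() {
	fmt.Println(demo()) // mutex lock is lost true
}
```

With a shared backend the compare-and-delete has to happen atomically on the server side; the process-local mutex here only stands in for that guarantee.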

type TokenRefresher added in v0.26.0

type TokenRefresher interface {
	RefreshToken(key, token string, expires time.Duration) error
}
