Documentation
¶
Overview ¶
Package db 提供数据库连接与管理的核心能力。
Package db provides core capabilities for database connection and management.
主要特性 / Features:
- 多驱动支持:MySQL、SQLite、ClickHouse Multi-driver support: MySQL, SQLite, ClickHouse
- Options 结构体配置连接参数,支持 YAML 反序列化 Options struct for connection configuration, supports YAML deserialization
- 连接池:MaxIdleConns、MaxOpenConns、ConnMaxLifetime、ConnMaxIdleTime Connection pool: MaxIdleConns, MaxOpenConns, ConnMaxLifetime, ConnMaxIdleTime
- 懒连接:首次 GetDB 时建立连接,连接失败每 10 秒重试 Lazy connection: connect on first GetDB call; retry every 10s on failure
- 双通道日志:SQL 走标准 log(Output),业务(连接/重试)走 Logger(对接 slog) Dual-channel logging: SQL via Output (std log), business (connect/retry) via Logger (slog)
- 全局多实例:按 Name 注册,按名获取 Global multi-instance: register by Name, fetch by name
典型用法 / Typical usage:
// 通过 CreateDB 统一入口,自动从 URL 解析驱动类型
// Unified entry via CreateDB, auto-parses driver type from URL
db.CreateDB(db.Options{Name: "default", URL: "mysql://user:pass@tcp(host:3306)/db?charset=utf8mb4&parseTime=True&loc=Local"})
// 或直接调用具体驱动的构造函数 / Or call driver-specific constructors directly
db.NewMySQL(db.Options{Name: "default", URL: "user:pass@tcp(host:3306)/db?charset=utf8mb4"})
// 业务层按名获取 GORM 实例(自动懒连接 + 带 context)
// Get GORM instance by name (lazy connect + context-aware)
gormDB := db.GetDB(ctx, "default")
stream.go 提供基于原生 database/sql 的流式查询能力,绕过 GORM ORM 层, 适用于大结果集的逐行或分批处理场景,避免一次性加载所有数据到内存。
主要函数:
- Stream: 逐行流式处理,每行回调 handle(index, row)
- StreamBatch: 分批流式处理,每 batch 行回调 fun(rows)
- Query: 基于 StreamBatch 的便捷查询,返回所有结果
Index ¶
- func CreateDB(options Options) error
- func GetDB(ctx context.Context, name string) *gorm.DB
- func Load(db *db) *db
- func NewClickHouse(options Options) error
- func NewMySQL(options Options) error
- func NewSQLite(options Options) error
- func OpenClickHouse(ctx context.Context, options Options) (*gorm.DB, error)
- func OpenMySQL(ctx context.Context, options Options) (*gorm.DB, error)
- func OpenSQLite(ctx context.Context, options Options) (*gorm.DB, error)
- func Query(ctx context.Context, name string, query string, args ...any) ([]map[string]any, error)
- func Stream(ctx context.Context, name string, query string, args []any, ...) (int64, error)
- func StreamBatch(ctx context.Context, name, query string, args []any, batch int, ...) (int64, error)
- type DB
- type DBLogger
- func (l *DBLogger) Error(ctx context.Context, msg string, data ...any)
- func (l *DBLogger) Info(ctx context.Context, msg string, data ...any)
- func (l *DBLogger) LogMode(level logger.LogLevel) logger.Interface
- func (l *DBLogger) Trace(ctx context.Context, begin time.Time, ...)
- func (l *DBLogger) Warn(ctx context.Context, msg string, data ...any)
- type Logger
- type Option
- func ConnMaxIdleTime(d time.Duration) Option
- func ConnMaxLifeTime(d time.Duration) Option
- func GenMySQL(host string, port int, username, password, database, charset, location string) Option
- func Log(l Logger) Option
- func LogLevel(lv logger.LogLevel) Option
- func LongQueryTime(d time.Duration) Option
- func MaxIdleConns(n int) Option
- func MaxOpenConns(n int) Option
- func Name(name string) Option
- func TimeZone(tz string) Option
- func URL(url string) Option
- func WithOutput(w io.Writer) Option
- type Options
- type Output
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateDB ¶
CreateDB 根据 URL 中的 scheme 自动创建对应类型的数据库连接并注册到全局 dbm。 这是一个统一入口函数,内部使用 strings.Cut 以 "://" 为分隔符将 Options.URL 拆分为 Schema(驱动类型)和 DSN(数据源名称),然后按 Schema(大小写无关,通过 strings.ToLower) 分别调用 NewMySQL、NewSQLite 或 NewClickHouse。
CreateDB automatically creates and registers a database connection based on the scheme parsed from Options.URL. It uses strings.Cut to split URL at "://" into Schema (driver type) and DSN (data source name), then dispatches (case-insensitive via strings.ToLower) to NewMySQL, NewSQLite, or NewClickHouse.
支持的 Schema / Supported Schema values:
- "mysql" → NewMySQL
- "sqlite" → NewSQLite
- "clickhouse" → NewClickHouse
返回值 / Returns:
- error: 若 Schema 不在支持列表中,返回 "unknown database schema: ..." 错误; 若底层 NewXxx 注册失败也会返回相应 error。 Returns "unknown database schema: ..." if Schema is not supported; also propagates errors from underlying NewXxx calls.
参数 / Parameters:
- options: Options 结构体,必须设置 URL(格式 scheme://dsn)和 Name。 Schema 由函数内部通过 strings.Cut 从 URL 自动解析,无需外部设置。 Options struct; URL (scheme://dsn) and Name are required. Schema is auto-parsed from URL via strings.Cut; no need to set externally.
示例 / Example:
db.CreateDB(db.Options{Name: "default", URL: "mysql://user:pass@tcp(host:3306)/mydb?charset=utf8mb4"})
db.CreateDB(db.Options{Name: "local", URL: "sqlite://:memory:"})
db.CreateDB(db.Options{Name: "analytics", URL: "clickhouse://user:pass@localhost:9000/mydb"})
func GetDB ¶
GetDB 按名称获取 GORM 实例,懒连接 + 带 context。业务层主要入口。 Gets GORM instance by name with lazy connect and context. Main entry for business layer.
func Load ¶
func Load(db *db) *db
Load 将 DB 实例注册到全局 dbm。由 NewMySQL/NewSQLite 内部调用。 Registers DB instance into dbm. Called internally by NewMySQL/NewSQLite.
func NewClickHouse ¶
NewClickHouse 创建 ClickHouse 数据库连接并注册到全局 dbm。 Creates ClickHouse connection and registers it to global dbm.
参数 / Parameters:
- options: Options 结构体,需设置 Name 和 URL 字段。 若通过 CreateDB 调用,URL 为 DSN 部分(不含 scheme "clickhouse://"),Schema 已被拆分。 若直接调用,URL 应为纯 ClickHouse DSN。 Options struct; Name and URL are required. When called via CreateDB, URL is the DSN part (without "clickhouse://" scheme). When called directly, URL should be a raw ClickHouse DSN.
返回值 / Returns:
- error: Name 为空或注册失败时返回错误 / Error if Name is empty or registration fails
行为 / Behavior:
- 内部调用 setup(options).open(clickhouse.Open);setup 通过 LoadOptions 与 newOptions 合并非零字段与默认值 Calls setup(options).open(clickhouse.Open); setup merges non-zero fields with defaults via LoadOptions and newOptions
- 连接为懒加载:首次调用 db.GetDB(ctx, name) 时才真正建连 Lazy connect: connects on first db.GetDB(ctx, name) call
- ClickHouse 不能跳过默认事务,否则无法插入数据(在 OpenDB 中处理) ClickHouse requires default transaction for inserts (handled in OpenDB)
示例 / Example:
// 通过 CreateDB 统一入口(推荐)/ Via CreateDB (recommended)
db.CreateDB(db.Options{Name: "analytics", URL: "clickhouse://user:pass@localhost:9000/mydb"})
// 直接调用(URL 为纯 DSN,不含 scheme)/ Direct call (URL is raw DSN, no scheme)
db.NewClickHouse(db.Options{Name: "clickhouse", URL: "user:pass@localhost:9000/mydb"})
gormDB := db.GetDB(ctx, "clickhouse")
func NewMySQL ¶
NewMySQL 创建 MySQL 数据库连接并注册到全局 dbm。 Creates MySQL connection and registers it to global dbm.
参数 / Parameters:
- options: Options 结构体,需设置 Name 和 URL 字段。 若通过 CreateDB 调用,URL 为 DSN 部分(不含 scheme "mysql://"),Schema 已被拆分。 若直接调用,URL 应为纯 MySQL DSN(如 "user:pass@tcp(host:3306)/db?charset=utf8mb4")。 Options struct; Name and URL are required. When called via CreateDB, URL is the DSN part (without "mysql://" scheme). When called directly, URL should be a raw MySQL DSN.
返回值 / Returns:
- error: Name 为空或注册失败时返回错误 / Error if Name is empty or registration fails
行为 / Behavior:
- 内部调用 setup(options).open(mysql.Open);setup 通过 LoadOptions 与 newOptions 合并非零字段与默认值 Calls setup(options).open(mysql.Open); setup merges non-zero fields with defaults via LoadOptions and newOptions
- 连接为懒加载:首次调用 db.GetDB(ctx, name) 时才真正建连 Lazy connect: connects on first db.GetDB(ctx, name) call
- 连接失败时每 10 秒自动重试,Logger 输出重试日志 Retries every 10s on connect failure; Logger logs retries
示例 / Example:
// 通过 CreateDB 统一入口(推荐)/ Via CreateDB (recommended)
db.CreateDB(db.Options{Name: "default", URL: "mysql://user:pass@tcp(127.0.0.1:3306)/mydb?charset=utf8mb4&parseTime=True&loc=Local"})
// 直接调用(URL 为纯 DSN,不含 scheme)/ Direct call (URL is raw DSN, no scheme)
db.NewMySQL(db.Options{Name: "default", URL: "user:pass@tcp(127.0.0.1:3306)/mydb?charset=utf8mb4&parseTime=True&loc=Local"})
gormDB := db.GetDB(ctx, "default")
func NewSQLite ¶
NewSQLite 创建 SQLite 数据库连接并注册到全局 dbm。 Creates SQLite connection and registers it to global dbm.
参数 / Parameters:
- options: Options 结构体,需设置 Name 和 URL 字段。 若通过 CreateDB 调用,URL 为 DSN 部分(不含 scheme "sqlite://"),Schema 已被拆分。 若直接调用,URL 应为纯 SQLite DSN。 Options struct; Name and URL are required. When called via CreateDB, URL is the DSN part (without "sqlite://" scheme). When called directly, URL should be a raw SQLite DSN.
URL(DSN)格式 / URL (DSN) format:
- ":memory:" - 内存数据库,单测常用 In-memory database, common for unit tests
- "/path/to/file.db" - 文件数据库,开发/轻量部署 File-based database, for dev or light deployment
返回值 / Returns:
- error: Name 为空或注册失败时返回错误 / Error if Name is empty or registration fails
行为 / Behavior:
- 内部调用 setup(options).open(sqlite.Open);setup 通过 LoadOptions 与 newOptions 合并非零字段与默认值 Calls setup(options).open(sqlite.Open); setup merges non-zero fields with defaults via LoadOptions and newOptions
- 连接为懒加载:首次 db.GetDB(ctx, name) 时才建连 Lazy connect: connects on first db.GetDB(ctx, name) call
示例 / Example:
// 通过 CreateDB 统一入口(推荐)/ Via CreateDB (recommended)
db.CreateDB(db.Options{Name: "default", URL: "sqlite://:memory:"})
// 直接调用(URL 为纯 DSN,不含 scheme)/ Direct call (URL is raw DSN, no scheme)
db.NewSQLite(db.Options{Name: "default", URL: ":memory:"})
db.NewSQLite(db.Options{Name: "local", URL: "/data/iot-mesh.db"})
func OpenClickHouse ¶
OpenClickHouse 立即打开 ClickHouse 连接并返回 *gorm.DB,不向全局 dbm 注册。 适用于单次任务、迁移脚本等不需要 GetDB 按名取实例的场景。
Opens ClickHouse immediately and returns *gorm.DB without registering to global dbm. Use for one-off jobs, migrations, etc., when name-based GetDB is not needed.
func OpenMySQL ¶
OpenMySQL 立即打开 MySQL 连接并返回 *gorm.DB,不向全局 dbm 注册。 适用于单次任务、迁移脚本等不需要 GetDB 按名取实例的场景。
Opens MySQL immediately and returns *gorm.DB without registering to global dbm. Use for one-off jobs, migrations, etc., when name-based GetDB is not needed.
func OpenSQLite ¶
OpenSQLite 立即打开 SQLite 连接并返回 *gorm.DB,不向全局 dbm 注册。 适用于单次任务、迁移脚本、本地工具等不需要 GetDB 按名取实例的场景。
Opens SQLite immediately and returns *gorm.DB without registering to global dbm. Use for one-off jobs, migrations, local tools, etc., when name-based GetDB is not needed.
func Query ¶
Query 便捷查询函数,基于 StreamBatch 实现,内部以 100 行为一批, 将所有结果收集到 []map[string]any 后一次性返回。 适合结果集较小、不需要流式处理的普通查询场景。
results, err := db.Query(ctx, "default", "SELECT id, name FROM users WHERE status = ?", "active")
func Stream ¶
func Stream(ctx context.Context, name string, query string, args []any, handle func(index int64, row []any) error) (int64, error)
Stream 逐行流式查询,对每一行调用 handle 回调。 使用原生 database/sql 接口绕过 GORM,每行数据根据列的 Go 类型自动格式转换。
参数:
- ctx: 上下文,支持超时与取消
- name: 数据库实例名(已通过 NewMySQL/NewSQLite 等注册)
- query: SQL 查询语句
- args: 参数化查询的参数(防止 SQL 注入)
- handle: 回调函数,index 为当前行号(从 1 开始),row 为该行各列的值
返回处理的总行数和错误。
cnt, err := db.Stream(ctx, "default", "SELECT id, name FROM users WHERE age > ?", []any{18},
func(index int64, row []any) error {
fmt.Printf("Row %d: %v\n", index, row)
return nil
})
func StreamBatch ¶
func StreamBatch(ctx context.Context, name, query string, args []any, batch int, fun func(rows []map[string]any) error) (int64, error)
StreamBatch 分批流式查询,每积累 batch 行后调用一次 fun 回调。 结果组织为 []map[string]any(列名→值),适合批量插入、批量发送等场景。 最后不足 batch 的剩余行也会触发一次回调。
参数:
- ctx: 上下文,支持超时与取消
- name: 数据库实例名
- query: SQL 查询语句
- args: 参数化查询的参数
- batch: 每批处理的行数
- fun: 批处理回调函数,接收当前批次的所有行数据
返回处理的总行数和错误。
cnt, err := db.StreamBatch(ctx, "default", "SELECT * FROM logs", nil, 500,
func(rows []map[string]any) error {
return bulkInsert(rows)
})
Types ¶
type DBLogger ¶
type DBLogger struct {
LogLevel logger.LogLevel // 当前日志级别 Silent|Error|Warn|Info / Current log level
LongQueryTime time.Duration // 慢查询阈值,超过打 SLOW 前缀 / Slow query threshold for SLOW prefix
// contains filtered or unexported fields
}
DBLogger 实现 gorm.io/gorm/logger.Interface,将 GORM 的 SQL 执行日志写入 Output。 Implements gorm/logger.Interface; writes SQL execution logs to Output.
日志前缀 / Log prefixes:
- SQL: 普通 SQL 语句及执行时间、影响行数 Normal SQL with execution time and rows affected
- SLOW: 执行时间超过 LongQueryTime 的慢查询 Queries exceeding LongQueryTime
- ERROR: 执行出错的 SQL 及错误信息 Failed SQL with error
- I/W/E: GORM 内部 Info/Warn/Error 级别消息 GORM internal Info/Warn/Error messages
func (*DBLogger) Error ¶
Error 输出 GORM Error 级别消息。带调用者 文件:行号。 Logs GORM Error message with caller file:line.
func (*DBLogger) Info ¶
Info 输出 GORM Info 级别消息。带调用者 文件:行号。 Logs GORM Info message with caller file:line.
func (*DBLogger) LogMode ¶
LogMode 返回指定日志级别的新 DBLogger 实例。实现 logger.Interface 所需。 Returns new DBLogger at given log level. Required by logger.Interface.
func (*DBLogger) Trace ¶
func (l *DBLogger) Trace(ctx context.Context, begin time.Time, fc func() (sql string, rowsAffected int64), err error)
Trace 记录每次 SQL 执行的详情,是 logger.Interface 的核心方法。 Logs each SQL execution; core method of logger.Interface.
输出格式 / Output format:
- 正常:SQL: file.go:42 0.35ms | 5 rows | SELECT … Normal: SQL: file.go:42 0.35ms | 5 rows | SELECT …
- 慢查询:SLOW: file.go:42 1520.00ms (>3s) | 100 rows | SELECT … Slow: SLOW: file.go:42 1520.00ms (>3s) | 100 rows | SELECT …
- 出错:ERROR: file.go:42 record not found | 0.12ms | 0 rows | SELECT … Error: ERROR: file.go:42 record not found | 0.12ms | 0 rows | SELECT …
type Logger ¶
type Logger interface {
Info(ctx context.Context, msg string, args ...any)
Warn(ctx context.Context, msg string, args ...any)
Error(ctx context.Context, msg string, args ...any)
}
Logger 业务日志接口(连接、重试等),外部注入,通常对接 slog。 Business logger for connect/retry etc.; injected, typically wired to slog.
type Option ¶
type Option func(*Options)
Option 函数式选项,用于灵活配置 Options。 Functional option for configuring Options.
func ConnMaxIdleTime ¶
ConnMaxIdleTime 设置连接的最大空闲时间。默认 5 分钟。空闲超过该时长的连接会被关闭。 主要用于回收被上游(MySQL wait_timeout / 中间 LB 空闲超时)静默关闭的空闲连接, 避免 "write tcp ... broken pipe" 错误。取值应小于链路上最小的空闲超时(一般 LB 为 900s)。 Sets max idle duration of a connection. Default 5 minutes. Reclaims idle connections that may have been silently closed by upstream (MySQL wait_timeout / LB idle timeout), preventing "broken pipe" errors.
func ConnMaxLifeTime ¶
ConnMaxLifeTime 设置连接的最大存活时间。默认 30 分钟。超时的连接会被关闭并回收。 取值建议严格小于 MySQL 的 wait_timeout 以及链路上任何 LB 的 TCP 空闲超时,避免拿到已被对端关闭的死连接(broken pipe)。 Sets max connection lifetime. Default 30 minutes. Should be strictly less than MySQL's wait_timeout and any LB TCP idle timeout to avoid using connections silently closed by upstream (broken pipe).
func GenMySQL ¶
GenMySQL 根据 host、port、user、password、database 等参数生成 MySQL 连接 URL。 Generates MySQL connection URL from host, port, user, password, database, etc.
示例 / Example: GenMySQL("localhost", 3306, "root", "pwd", "mydb", "utf8mb4", "Local")
func Log ¶
Log 设置业务日志(连接、重试等),通常对接 slog。 Sets business logger (connect, retry, etc.), typically wired to slog.
func LogLevel ¶
LogLevel 设置 GORM 日志级别:Silent、Error、Warn、Info。 Sets GORM log level: Silent, Error, Warn, Info.
func LongQueryTime ¶
LongQueryTime 设置慢查询阈值。超过此时间的 SQL 会在日志中标记为 "SLOW:",默认 3 秒。 Sets slow query threshold. SQLs exceeding this duration are logged with "SLOW:" prefix, default 3s.
func MaxIdleConns ¶
MaxIdleConns 设置连接池最大空闲连接数。默认 200。 Sets max idle connections in pool. Default 200.
func MaxOpenConns ¶
MaxOpenConns 设置连接池最大打开连接数。默认 200,0 表示不限。 Sets max open connections in pool. Default 200; 0 means unlimited.
func Name ¶
Name 设置数据库实例名称。用于全局注册与按名获取。 Sets the database instance name for global registration and lookup.
func TimeZone ¶
TimeZone 设置数据库时区,如 "Local"、"UTC"、"Asia/Shanghai"。 Sets database timezone, e.g. "Local", "UTC", "Asia/Shanghai".
func URL ¶
URL 直接设置连接串,格式 scheme://dsn。内部会解析 scheme 填充 Schema 字段。 Sets connection URL (scheme://dsn). Parses scheme to populate Schema.
示例 / Example: URL("mysql://user:pass@tcp(host:3306)/db") 或 URL("sqlite://:memory:")
func WithOutput ¶
WithOutput 设置 SQL 日志输出目标,使用标准 log 格式(不走 slog)。 Sets SQL log output target using standard log format (not slog).
type Options ¶
type Options struct {
Name string `yaml:"Name"` // 实例名,用于 GetDB(ctx, name) / Instance name for lookup
URL string `yaml:"URL"` // 连接串 scheme://dsn,如 mysql://... 或 sqlite://... / Connection URL
TimeZone string `yaml:"TimeZone"` // 时区,如 "Local" / Timezone
MaxIdleConns int `yaml:"MaxIdleConns"` // 连接池最大空闲连接数,默认 10 / Max idle connections
MaxOpenConns int `yaml:"MaxOpenConns"` // 连接池最大打开连接数,默认 50 / Max open connections
ConnMaxLifeTime time.Duration `yaml:"ConnMaxLifeTime"` // 连接最大存活时间,默认 30m / Conn max lifetime
ConnMaxIdleTime time.Duration `yaml:"ConnMaxIdleTime"` // 连接最大空闲时间,默认 5m;用于回收被上游(MySQL wait_timeout / LB 空闲超时)静默关闭的连接 / Conn max idle time; reclaim conns silently closed by upstream
LongQueryTime time.Duration `yaml:"LongQueryTime"` // 慢查询阈值,超过打 SLOW 日志,默认 3s / Slow query threshold
Output io.Writer `yaml:"-"` // SQL 日志输出,默认 os.Stdout / SQL log output
Log Logger `yaml:"-"` // 业务日志(连接、重试),对接 slog / Business logger
LogLevel logger.LogLevel `yaml:"-"` // GORM 日志级别,默认 Warn / GORM log level
// contains filtered or unexported fields
}
Options 数据库连接配置,支持 YAML 反序列化与函数式选项。 Database connection options, supports YAML deserialization and functional options.
func (*Options) LoadOptions ¶
LoadOptions 将 Options 中所有已设置的字段转为 []Option 切片,供 setup 使用。 仅包含非零值字段,未设置的字段由 newOptions 的默认值兜底。
Converts all set fields in Options to []Option slice for use with setup. Only includes non-zero-value fields; unset fields fall back to newOptions defaults.