task

v1.4.3
Published: Mar 14, 2026 License: MIT Imports: 6 Imported by: 2

README

task: High-Performance In-Memory Task Runner

task is a zero-dependency, concurrency-controlled in-memory task execution library for Go.

It bridges the gap between raw go func() (which is uncontrolled and risky) and heavy distributed queues (like RabbitMQ/Redis). It is designed to provide Bulkheading (resource isolation) and Fault Tolerance (panic recovery) for your Go services.

Why use task?

  • Protection: Limits the number of concurrent goroutines to prevent OOM (Out of Memory).
  • Fault Tolerance: Automatically catches panics in tasks to keep the main process alive.
  • Graceful Shutdown: Ensures all running tasks are completed before the server exits (Drain pattern).
  • Type Safety: Uses generics to return typed results (Future[T]), eliminating type assertions.
  • Observability: Provides real-time metrics (Active, Queued, Processed, Panics).
  • Zero Dependency: Built using only the Go standard library.

Installation

go get github.com/oy3o/task

Quick Start

1. Simple Safe Goroutine (No Pool)

If you just want to run a background job safely without managing a pool:

import (
    "context"

    "github.com/oy3o/task"
)

func main() {
    // Replaces 'go func()'.
    // Automatically recovers from panics and logs stack traces.
    task.Go(context.Background(), func(ctx context.Context) {
        doWork() // placeholder for your own background job
    }, nil) // nil: no custom ErrorHandler
}

2. Worker Pool (Concurrency Limit)

For high-throughput scenarios, use a Runner to limit concurrency.

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/oy3o/task"
)

func main() {
    // Create a runner with 10 workers and a buffer queue of 1000
    r := task.NewRunner(
        task.WithMaxWorkers(10),
        task.WithQueueSize(1000),
        task.WithErrorHandler(func(ctx context.Context, p any) {
            fmt.Printf("Task panicked: %v\n", p)
        }),
    )

    // Start workers (Start returns an error, so check it)
    if err := r.Start(context.Background()); err != nil {
        fmt.Printf("Start failed: %v\n", err)
        return
    }

    // Submit tasks (Non-blocking)
    err := r.Submit(func(ctx context.Context) {
        // Do heavy work here...
        time.Sleep(time.Second)
    })

    if errors.Is(err, task.ErrQueueFull) {
        // Handle backpressure (e.g., return 503 to client)
    }

    // Graceful Stop (Waits for running tasks to finish)
    if err := r.Stop(context.Background()); err != nil {
        fmt.Printf("Stop failed: %v\n", err)
    }
}

3. Synchronous Wait (SubmitAndWait)

Useful when you need concurrency limits but must block until the task has finished. Note: if the submitted task panics, SubmitAndWait returns nil (success) as soon as the panic is recovered, which prevents deadlocks.

err := r.SubmitAndWait(ctx, func(ctx context.Context) {
    // process item...
})

if err != nil {
    // Handle error (Timeout, QueueFull, or RunnerClosed)
}

4. Async Results (Generics)

Retrieve results from background tasks in a type-safe manner using Future[T].

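// Submit a task that returns a string.
// The first argument is the caller's context, as in the documented Submit signature.
future, err := task.Submit(context.Background(), r, func(ctx context.Context) (string, error) {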
    // ... business logic ...
    return "success", nil
})

if err != nil {
    // Handle submission error (e.g., QueueFull)
}

// Wait for result with timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

val, err := future.Get(ctx) // val is string
if err != nil {
    // Handle timeout, task error, or task panic
}
fmt.Println("Result:", val)

Observability

Monitor your worker pool health in real-time.

stats := r.Stats()
fmt.Printf("Active: %d, Queued: %d, Total Processed: %d\n", 
    stats.ActiveWorkers, 
    stats.QueuedTasks, 
    stats.TotalProcessed,
)

Integration with Appx

task.Runner implements the standard Start/Stop pattern, making it easy to integrate with application lifecycle managers.

// In your main.go
func run() {
    runner := task.NewRunner(task.WithMaxWorkers(50))
    app := appx.New() // Your application
    
    // Dependency Injection
    app.AddService(runner) 
    
    app.Run() // Runner starts with app and stops gracefully with app
}
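
The Start/Stop pattern mentioned above can be sketched with a small lifecycle interface. This is only an illustration: the `service` interface, `fakeRunner`, and `runAll` are hypothetical names, and appx's real interface may differ. The two method signatures mirror those documented for Runner.Start and Runner.Stop.

```go
package main

import (
	"context"
	"fmt"
)

// service is a hypothetical lifecycle contract; appx's real interface may differ.
type service interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

// fakeRunner stands in for *task.Runner and records lifecycle calls.
type fakeRunner struct {
	name   string
	events *[]string
}

func (f fakeRunner) Start(context.Context) error {
	*f.events = append(*f.events, "start "+f.name)
	return nil
}

func (f fakeRunner) Stop(context.Context) error {
	*f.events = append(*f.events, "stop "+f.name)
	return nil
}

// runAll starts services in order, then stops them in reverse order.
func runAll(ctx context.Context, svcs ...service) error {
	for _, s := range svcs {
		if err := s.Start(ctx); err != nil {
			return err
		}
	}
	for i := len(svcs) - 1; i >= 0; i-- {
		if err := svcs[i].Stop(ctx); err != nil {
			return err
		}
	}
	return nil
}

// demo returns the recorded call order for two services.
func demo() []string {
	var events []string
	_ = runAll(context.Background(),
		fakeRunner{"a", &events},
		fakeRunner{"b", &events},
	)
	return events
}

func main() {
	fmt.Println(demo()) // [start a start b stop b stop a]
}
```

Stopping in reverse order is a common choice so that services shut down before the things they depend on.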

Options

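| Option               | Description                           | Default       |
|----------------------|---------------------------------------|---------------|
| WithMaxWorkers(n)    | Max number of concurrent goroutines.  | 10            |
| WithQueueSize(n)     | Max number of tasks waiting in queue. | 1000          |
| WithErrorHandler(fn) | Custom callback for handling panics.  | Log to stdout |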

Documentation

Index

Constants

This section is empty.

Variables

var (
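	// ErrQueueFull indicates that the task queue is full and cannot accept new tasks.
	// Callers should handle this error (for example, return 503 or degrade gracefully).
	ErrQueueFull = errors.New("task: queue is full")

	// ErrRunnerClosed indicates that the Runner has stopped or is stopping and no longer accepts new tasks.
	ErrRunnerClosed = errors.New("task: runner is closed")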
)
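
A non-blocking submit that reports a full queue is commonly built on a buffered channel with a `select`/`default`. The sketch below shows that general technique with the standard library only. It is not the package's actual implementation, and `errQueueFull` and `trySubmit` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueFull plays the role of task.ErrQueueFull in this sketch.
var errQueueFull = errors.New("sketch: queue is full")

// trySubmit enqueues fn without blocking. It reports errQueueFull
// when the buffered channel has no free slot.
func trySubmit(queue chan func(), fn func()) error {
	select {
	case queue <- fn:
		return nil
	default:
		return errQueueFull
	}
}

func main() {
	queue := make(chan func(), 1) // room for exactly one pending task

	fmt.Println(trySubmit(queue, func() {}))                          // <nil>
	fmt.Println(errors.Is(trySubmit(queue, func() {}), errQueueFull)) // true
}
```

Returning an error instead of blocking lets the caller decide how to shed load, for example by answering an HTTP request with 503.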

Functions

func DefaultConfig

func DefaultConfig() *config

DefaultConfig returns the default configuration.

func Go

func Go(ctx context.Context, fn func(context.Context), handler ErrorHandler)

Go is a safe replacement for go func(). It starts a new goroutine that runs fn with built-in panic protection.

func Join added in v1.4.1

func Join[T any](ctx context.Context, futures ...*Future[T]) ([]T, error)

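Join waits for all Futures to complete and returns their results as a slice. If any Future returns an error, Join returns that error immediately (fail fast) but does not cancel the other running tasks. It is similar to Promise.all in JavaScript.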
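
One way to get fail-fast behaviour like this is to fan every future into a single buffered channel and return on the first error seen. The sketch below assumes a minimal channel-based `future` type. That type, `resolved`, `joinAll`, and the demo helpers are hypothetical stand-ins, not the package's code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errBoom = errors.New("boom")

// future is a hypothetical stand-in for task.Future[T].
type future[T any] struct {
	done chan struct{} // closed once val and err are final
	val  T
	err  error
}

// resolved returns an already completed future (demo helper).
func resolved[T any](v T, err error) *future[T] {
	f := &future[T]{done: make(chan struct{}), val: v, err: err}
	close(f.done)
	return f
}

func (f *future[T]) get(ctx context.Context) (T, error) {
	select {
	case <-f.done:
		return f.val, f.err
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	}
}

type result[T any] struct {
	idx int
	val T
	err error
}

// joinAll waits on every future concurrently and returns the first error
// it sees. Results keep the order of the arguments.
func joinAll[T any](ctx context.Context, fs ...*future[T]) ([]T, error) {
	results := make(chan result[T], len(fs)) // buffered so senders never block
	for i, f := range fs {
		go func(i int, f *future[T]) {
			v, err := f.get(ctx)
			results <- result[T]{i, v, err}
		}(i, f)
	}
	out := make([]T, len(fs))
	for range fs {
		r := <-results
		if r.err != nil {
			return nil, r.err // fail fast; the other futures are not cancelled
		}
		out[r.idx] = r.val
	}
	return out, nil
}

func demoSuccess() ([]int, error) {
	return joinAll(context.Background(), resolved(1, nil), resolved(2, nil))
}

// demoFailFast joins a future that never completes with one that failed.
func demoFailFast() error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // releases the goroutine still waiting on the pending future
	pending := &future[int]{done: make(chan struct{})}
	_, err := joinAll(ctx, pending, resolved(0, errBoom))
	return err
}

func main() {
	fmt.Println(demoSuccess())  // [1 2] <nil>
	fmt.Println(demoFailFast()) // boom
}
```

A plain loop that calls get on each future in turn would also collect the results, but it would only notice a later failure after every earlier future had finished.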

func Safely

func Safely(ctx context.Context, fn func(context.Context), handler ErrorHandler) (panicked bool)

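Safely runs a function and recovers from any panic it raises. fn is the task function, which receives the context; handler is the callback invoked when a panic occurs. The return value panicked tells the caller precisely whether the task crashed.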
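
A wrapper with this contract is usually a deferred recover that sets a named return value. The following is a minimal sketch of that pattern, not the package's source. `safely`, `errorHandler`, and `demo` are hypothetical names that copy the documented shapes.

```go
package main

import (
	"context"
	"fmt"
)

// errorHandler mirrors the documented ErrorHandler signature.
type errorHandler func(ctx context.Context, p any)

// safely runs fn and reports whether it panicked. The handler, if any,
// receives the recovered value.
func safely(ctx context.Context, fn func(context.Context), h errorHandler) (panicked bool) {
	defer func() {
		if p := recover(); p != nil {
			panicked = true
			if h != nil {
				h(ctx, p)
			}
		}
	}()
	fn(ctx)
	return false
}

// demo runs a panicking task and returns what the handler observed.
func demo() (panicked bool, recovered any) {
	panicked = safely(context.Background(), func(context.Context) {
		panic("boom")
	}, func(_ context.Context, p any) {
		recovered = p
	})
	return panicked, recovered
}

func main() {
	fmt.Println(demo()) // true boom
}
```

The named result is what lets the deferred function change the return value after fn has already panicked.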

Types

type ErrorHandler

type ErrorHandler func(ctx context.Context, p any)

ErrorHandler handles panics or errors that occur during task execution. ctx is the context at the time of the error; p is the value returned by recover() (may be nil).

type Future added in v1.4.1

type Future[T any] struct {
	// contains filtered or unexported fields
}

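Future is a lightweight handle that holds only the result and a signal channel. Compared with a Promise, it has no locks and no callback chain, and its memory footprint is minimal.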

func Submit added in v1.4.1

func Submit[T any](callerCtx context.Context, r *Runner, fn func(ctx context.Context) (T, error)) (*Future[T], error)

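Submit is the generic submission function. It extends Runner without modifying Runner's internal structure; the wrapping happens only at the application layer.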

func (*Future[T]) Get added in v1.4.1

func (f *Future[T]) Get(ctx context.Context) (T, error)

Get blocks until the task completes. It borrows the await experience of a Promise while fitting Go's select idiom.
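
The "result plus signal channel" design described for Future, together with a select-based Get, can be sketched as follows. This is an assumption about the general shape, not the package's code. `future`, `async`, and the demo helpers are hypothetical, and panic handling is left out for brevity.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// future holds a result and a channel that is closed when the result is ready.
type future[T any] struct {
	done chan struct{}
	val  T
	err  error
}

// async runs fn on its own goroutine and returns a handle to its result.
func async[T any](fn func() (T, error)) *future[T] {
	f := &future[T]{done: make(chan struct{})}
	go func() {
		defer close(f.done) // closing publishes val and err to waiters
		f.val, f.err = fn()
	}()
	return f
}

// get blocks until the result is ready or ctx is done, whichever comes first.
func (f *future[T]) get(ctx context.Context) (T, error) {
	select {
	case <-f.done:
		return f.val, f.err
	case <-ctx.Done():
		var zero T
		return zero, ctx.Err()
	}
}

func demoValue() (string, error) {
	f := async(func() (string, error) { return "success", nil })
	return f.get(context.Background())
}

// demoTimeout reports whether get gave up with a deadline error.
func demoTimeout() bool {
	block := make(chan struct{})
	defer close(block) // lets the background goroutine finish afterwards
	f := async(func() (int, error) {
		<-block
		return 0, nil
	})
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	_, err := f.get(ctx)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(demoValue())   // success <nil>
	fmt.Println(demoTimeout()) // true
}
```

No mutex is needed here because the fields are written once before the channel is closed and read only after the close has been observed.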

type Option

type Option func(*config)

Option defines a function that modifies the configuration.

func WithErrorHandler

func WithErrorHandler(h ErrorHandler) Option

WithErrorHandler sets the panic-handling callback. This is the recommended place to integrate a logging library such as zap or zerolog.

func WithMaxWorkers

func WithMaxWorkers(n int) Option

WithMaxWorkers sets the maximum number of concurrent workers.

func WithQueueSize

func WithQueueSize(n int) Option

WithQueueSize sets the length of the waiting queue.
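
The Option type and the With* constructors follow the functional options pattern. A minimal sketch of how such options are typically applied is shown below. The lowercase names and the `config` fields are hypothetical, and the defaults are the ones listed in the Options table.

```go
package main

import "fmt"

// config is a hypothetical stand-in for the package's unexported config.
type config struct {
	maxWorkers int
	queueSize  int
}

// option mutates a config, mirroring the documented Option type.
type option func(*config)

func withMaxWorkers(n int) option { return func(c *config) { c.maxWorkers = n } }

func withQueueSize(n int) option { return func(c *config) { c.queueSize = n } }

// newConfig starts from the defaults and applies each option in order.
func newConfig(opts ...option) config {
	c := config{maxWorkers: 10, queueSize: 1000}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(withMaxWorkers(50))
	fmt.Println(c.maxWorkers, c.queueSize) // 50 1000
}
```

Options that are not passed leave the corresponding default untouched, which is why the queue size stays at 1000 here.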

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner is a concurrency-limited task executor.

func NewRunner

func NewRunner(opts ...Option) *Runner

func (*Runner) Start

func (r *Runner) Start(ctx context.Context) error

func (*Runner) Stats

func (r *Runner) Stats() Stats

Stats returns a snapshot of the Runner's current state.

func (*Runner) Stop

func (r *Runner) Stop(ctx context.Context) error
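
The README describes Stop as a graceful, drain-style shutdown. A common way to build one is to close the queue, wait for the workers on a WaitGroup, and give up if the context expires. The sketch below illustrates that idea only. `pool`, `newPool`, and `stop` are hypothetical, and it assumes queued tasks are drained as well, which the source does not confirm for Runner.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical fixed-size worker pool.
type pool struct {
	queue chan func()
	wg    sync.WaitGroup
}

func newPool(workers, queueSize int) *pool {
	p := &pool{queue: make(chan func(), queueSize)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for fn := range p.queue { // exits once the queue is closed and empty
				fn()
			}
		}()
	}
	return p
}

// stop closes the queue, then waits for the workers to finish
// or for ctx to expire, whichever comes first.
func (p *pool) stop(ctx context.Context) error {
	close(p.queue)
	drained := make(chan struct{})
	go func() {
		p.wg.Wait()
		close(drained)
	}()
	select {
	case <-drained:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo enqueues five tasks and reports how many ran before stop returned.
func demo() (int64, error) {
	var ran atomic.Int64
	p := newPool(2, 8)
	for i := 0; i < 5; i++ {
		p.queue <- func() { ran.Add(1) }
	}
	err := p.stop(context.Background())
	return ran.Load(), err
}

func main() {
	fmt.Println(demo()) // 5 <nil>
}
```

Passing a context with a deadline to stop bounds how long shutdown may take when a task hangs.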

func (*Runner) Submit

func (r *Runner) Submit(task TaskFunc) error

Submit submits an asynchronous task.

func (*Runner) SubmitAndWait

func (r *Runner) SubmitAndWait(ctx context.Context, task TaskFunc) error

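SubmitAndWait submits a task and waits for it to complete (synchronous mode). It suits scenarios such as an HTTP request that needs concurrent processing but must wait for the result.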
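
A synchronous submit can be layered on an asynchronous queue by wrapping the task so that it closes a channel when it ends. The sketch below shows that approach, including why a panicking task still releases the waiter. All names are hypothetical, and the package may implement this differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errQueueFull = errors.New("sketch: queue is full")

// submitAndWait enqueues fn, then blocks until fn has run or ctx is done.
func submitAndWait(ctx context.Context, queue chan<- func(), fn func()) error {
	done := make(chan struct{})
	wrapped := func() {
		defer close(done) // runs even if fn panics, so the waiter is always released
		fn()
	}
	select {
	case queue <- wrapped:
	default:
		return errQueueFull
	}
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// startWorker runs queued tasks one at a time and survives their panics.
func startWorker(queue <-chan func()) {
	go func() {
		for fn := range queue {
			func() {
				defer func() { _ = recover() }() // swallow the panic
				fn()
			}()
		}
	}()
}

func demo() (ran bool, err error) {
	queue := make(chan func(), 1)
	startWorker(queue)
	defer close(queue)
	err = submitAndWait(context.Background(), queue, func() { ran = true })
	return ran, err
}

func demoPanic() error {
	queue := make(chan func(), 1)
	startWorker(queue)
	defer close(queue)
	return submitAndWait(context.Background(), queue, func() { panic("boom") })
}

func main() {
	fmt.Println(demo())      // true <nil>
	fmt.Println(demoPanic()) // <nil>
}
```

Because the done channel is closed in a defer, the caller sees nil after a recovered panic, which matches the behaviour the README notes for SubmitAndWait.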

type Stats

type Stats struct {
	// Configuration parameters
	MaxWorkers int `json:"max_workers"`
	QueueSize  int `json:"queue_size"`

	// Real-time state
	ActiveWorkers int64 `json:"active_workers"` // number of workers currently executing a task
	QueuedTasks   int   `json:"queued_tasks"`   // number of tasks currently waiting in the queue

	// Cumulative counters (since start)
	TotalProcessed uint64 `json:"total_processed"` // number of tasks processed successfully
	TotalPanics    uint64 `json:"total_panics"`    // total number of panics
	TotalRefused   uint64 `json:"total_refused"`   // number of tasks rejected because the queue was full
}

Stats contains a runtime snapshot of the Runner.
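
Because Stats carries JSON tags, a snapshot can be served directly from a metrics or health endpoint. The sketch below shows one plausible way to keep such counters with sync/atomic and render a snapshot. `counters`, `snapshot`, and `render` are hypothetical, and only three of the documented fields are reproduced.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync/atomic"
)

// counters holds live values that workers update concurrently.
type counters struct {
	active    atomic.Int64
	processed atomic.Uint64
	panics    atomic.Uint64
}

// snapshot is a plain copy that is safe to marshal. The JSON tags match
// three of the fields documented on Stats.
type snapshot struct {
	ActiveWorkers  int64  `json:"active_workers"`
	TotalProcessed uint64 `json:"total_processed"`
	TotalPanics    uint64 `json:"total_panics"`
}

// read copies the current counter values into a snapshot.
func (c *counters) read() snapshot {
	return snapshot{
		ActiveWorkers:  c.active.Load(),
		TotalProcessed: c.processed.Load(),
		TotalPanics:    c.panics.Load(),
	}
}

// render returns the snapshot as JSON, or an empty string on error.
func render(c *counters) string {
	b, err := json.Marshal(c.read())
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	c := &counters{}
	c.active.Add(1)
	c.processed.Add(3)
	fmt.Println(render(c)) // {"active_workers":1,"total_processed":3,"total_panics":0}
}
```

Each field is loaded separately, so a snapshot taken under load is approximate rather than a single consistent point in time.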

type TaskFunc

type TaskFunc func(ctx context.Context)

TaskFunc defines the signature of a task function to be executed.
