workerpool

Version: v1.0.0
Published: Jun 9, 2022 License: MIT Imports: 10 Imported by: 0

README

workerpool

Package workerpool provides a service for running small pieces of code (called jobs) in the background.

Jobs can have contexts, timeouts, and rich retry strategies.

Examples


pool := New()
pool.Start()

job := func(ctx context.Context) error {
    fmt.Println("hello")
    return nil
}

pool.Run(job)

Advanced usage

pool := New()
pool.Start()

job := func(ctx context.Context) error {
    //
    // some tricky logic goes here
    //

    return nil
}
// add a 3-second timeout to the job execution
job = AddTimeout(job, time.Second*3)
// retry the job execution, up to 5 attempts
job = AddRetry(job, strategy.Limit(5))

pool.Run(job)

Documentation

Types

type Job

type Job func(ctx context.Context) error

Job is a function that receives a context and is run asynchronously by the worker pool.

func AddContext

func AddContext(job Job, c context.Context) Job

AddContext replaces the default context (context.Background()) with the specified one, for this job only.

Example
pool := New()
pool.Start()

ctxKey := struct{}{}

job := func(ctx context.Context) error {
	if v, ok := ctx.Value(ctxKey).(string); ok {
		fmt.Println(v)
	}
	return nil
}
// adding custom context to job
ctx := context.WithValue(context.Background(), ctxKey, "from context")
job = AddContext(job, ctx)

pool.Run(job)
pool.Stop()
Output:
from context

func AddLogger

func AddLogger(job Job, l zerolog.Logger) Job

AddLogger replaces the zerolog logger in the job context with the specified one. This can be useful for HTTP handlers.

Example
// mock time for test purposes
zerolog.TimestampFunc = func() time.Time {
	t, _ := time.Parse("2006-01-02", "2021-01-01")
	return t
}

l := zerolog.New(os.Stdout).Level(zerolog.WarnLevel).With().Timestamp().Logger()
l1 := l.With().Str("logger", "logger1").Logger()
l2 := l.With().Str("logger", "logger2").Logger()

pool := New(WithLogger(l))
pool.Start()

job1 := func(ctx context.Context) error {
	time.Sleep(5 * time.Millisecond)
	zerolog.Ctx(ctx).Warn().Msg("hello from job1")
	return nil
}
job2 := func(ctx context.Context) error {
	time.Sleep(10 * time.Millisecond)
	zerolog.Ctx(ctx).Warn().Msg("hello from job2")
	return nil
}
// adding custom loggers to jobs
job1 = AddLogger(job1, l1)
job2 = AddLogger(job2, l2)

pool.Run(job1)
pool.Run(job2)

pool.Stop()
Output:
{"level":"warn","logger":"logger1","time":"2021-01-01T00:00:00Z","message":"hello from job1"}
{"level":"warn","logger":"logger2","time":"2021-01-01T00:00:00Z","message":"hello from job2"}

func AddPanicRecovery

func AddPanicRecovery(job Job) Job

AddPanicRecovery adds panic recovery to your job. If a panic occurs, it is converted into an error.

Example
pool := New()
pool.Start()

job := func(ctx context.Context) error {
	panic("oops")
}
pool.Run(job)
// this works fine: the panic is recovered (and reaches the logger)

pool.Stop()

func AddPostRun

func AddPostRun(job Job, hook func(err error)) Job

AddPostRun middleware lets you run some logic once the job has completed or failed. The err parameter of the hook contains the error returned by the job.

Example
pool := New()
pool.Start()

job := func(ctx context.Context) error {
	// example job will always fail
	return errors.New("unrecoverable error")
}
job = AddPostRun(job, func(err error) {
	if err != nil {
		fmt.Println(err.Error())
	}
})

pool.Run(job)
time.Sleep(time.Millisecond * 100)
pool.Stop()
Output:
unrecoverable error

func AddRetry

func AddRetry(job Job, strategies ...strategy.Strategy) Job

AddRetry middleware allows you to apply retry strategies to your job.

See https://github.com/Rican7/retry for details.

Example
pool := New()
pool.Start()

retryCount := 0

job := func(ctx context.Context) error {
	retryCount++
	// example job will always fail
	return errors.New("unrecoverable error")
}
job = AddRetry(job, strategy.Limit(5))

pool.Run(job)
time.Sleep(time.Millisecond * 100)
pool.Stop()

fmt.Println(retryCount)
Output:
5

func AddTimeout

func AddTimeout(job Job, timeout time.Duration) Job

AddTimeout middleware allows you to add a timeout to your job. The timeout is set on the job context.

Example
pool := New()
pool.Start()

job := func(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(200 * time.Millisecond):
		fmt.Println("job ok")
		return nil
	}
}
// adding timeout to job
job = AddTimeout(job, time.Millisecond*100)
// adding post run hook to job to output timeout error
job = AddPostRun(job, func(err error) {
	if err != nil {
		fmt.Println(err.Error())
	}
})

pool.Run(job)
time.Sleep(time.Millisecond * 300)
pool.Stop()
Output:
context deadline exceeded

type Pool

type Pool struct {

	// DefaultContext is a context factory that generates the default context for every job
	DefaultContext func() context.Context
	// contains filtered or unexported fields
}

func New

func New(opts ...PoolOption) *Pool

New is the Pool constructor.

func (*Pool) Run

func (s *Pool) Run(job Job)

Run runs the job within the pool.

func (*Pool) Start

func (s *Pool) Start()

Start starts the required number of workers.

func (*Pool) Stop

func (s *Pool) Stop()

Stop stops all active workers. Don't forget to call it on application shutdown.

This call is blocking and waits for all the jobs to complete.

type PoolOption

type PoolOption func(service *Pool)

PoolOption is a functional option passed to the constructor.

func WithLogger

func WithLogger(zl zerolog.Logger) PoolOption

WithLogger replaces the default logger with the specified one.
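
PoolOption follows the functional options pattern: the constructor sets defaults, then applies each option in order. The sketch below shows the pattern with the standard library; the pool struct, its fields, and the withWorkers and withName options are all hypothetical, since WithLogger is the only option documented here.

```go
package main

import "fmt"

// pool is a hypothetical configuration target.
type pool struct {
	name    string
	workers int
}

// option mirrors the shape of PoolOption: a function that mutates the pool.
type option func(p *pool)

// withWorkers and withName are hypothetical options, for illustration only.
func withWorkers(n int) option {
	return func(p *pool) { p.workers = n }
}

func withName(name string) option {
	return func(p *pool) { p.name = name }
}

// newPool applies defaults first, then every option in the order given,
// so later options override earlier ones.
func newPool(opts ...option) *pool {
	p := &pool{name: "default", workers: 1}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	p := newPool(withWorkers(4), withName("mail"))
	fmt.Println(p.name, p.workers)
}
```

The pattern keeps the constructor signature stable: new settings can be added as new option functions without breaking existing callers.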
