komi

package module
v0.1.2 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 6, 2023 License: Apache-2.0 Imports: 8 Imported by: 2

README

Komi - subarashi go pooling 🍡

Motivation

Go makes it easy to set up parallel jobs and processes. It gets harder once one starts confusing concurrency with parallelism and ends up endlessly fighting race conditions. komi is a generic pooling library that will satisfy your hunger.

Usage

Say you want to run a function foo(v) that performs some kind of work on parameter v, be it a database operation, a syscall, an IO operation, etc. (the possibilities are endless!). Setting up a pool and sending jobs is as simple as

pool := komi.New(komi.WorkSimple(foo))
defer pool.Close()
// other code...
err = pool.Submit(v) // will block if pool is full

Notice that pool.Close() will gracefully free all the resources and channels occupied by pool by waiting for final jobs to complete. pool.Close(true) will force pool closure.
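
To make the blocking Submit and the graceful Close described above concrete, here is a minimal stdlib-only sketch of a bounded pool. It is not komi's implementation; miniPool, newMiniPool, submit, shutdown and sumWithPool are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// miniPool is a hypothetical stand-in for a pool built around simple work:
// a bounded job queue drained by a fixed number of workers.
type miniPool[I any] struct {
	jobs chan I
	wg   sync.WaitGroup
}

// newMiniPool starts `laborers` goroutines that run work on queued jobs.
// `size` is the queue capacity; submit blocks once the queue is full.
func newMiniPool[I any](work func(I), laborers, size int) *miniPool[I] {
	p := &miniPool[I]{jobs: make(chan I, size)}
	for i := 0; i < laborers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for job := range p.jobs {
				work(job)
			}
		}()
	}
	return p
}

// submit queues a job, blocking while the queue is full.
func (p *miniPool[I]) submit(job I) { p.jobs <- job }

// shutdown stops accepting jobs and waits for queued jobs to finish.
func (p *miniPool[I]) shutdown() {
	close(p.jobs)
	p.wg.Wait()
}

// sumWithPool adds up values by submitting each one as a job.
func sumWithPool(values []int) int {
	var mu sync.Mutex
	total := 0
	pool := newMiniPool(func(v int) {
		mu.Lock()
		total += v
		mu.Unlock()
	}, 3, 2)
	for _, v := range values {
		pool.submit(v)
	}
	pool.shutdown() // graceful: returns only after every queued job has run
	return total
}

func main() {
	fmt.Println(sumWithPool([]int{1, 2, 3, 4, 5})) // 15
}
```

The sketch has no forced shutdown; a forced variant would have to drop queued jobs instead of draining them.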

But what if you want to collect the outputs of the work performed on v, that is, foo has the form foo(v) w?

pool := komi.New(komi.Work(foo))
defer pool.Close()
// collect outputs with pool.Outputs() channel
go func() {
	// Errors omitted for brevity
	outputs, _ := pool.Outputs()
	for output := range outputs {
		// output is the result of `foo(v)`
	}
}()
// other code...
err = pool.Submit(v) // will block if pool is full

But what if you want to collect errors as well? Consider foo(v) error

pool := komi.New(komi.WorkSimpleWithErrors(foo))
defer pool.Close()
// collect errors with pool.Errors() channel...
// other code...
err = pool.Submit(v) // will block if pool is full

Or with foo(v) (w, error)!

pool := komi.New(komi.WorkWithErrors(foo))
defer pool.Close()
// collect outputs with pool.Outputs() channel
// collect errors with pool.Errors() channel...
// other code...
err = pool.Submit(v) // will block if pool is full

So, depending on the function you give, any type of work is handled by the pool on the fly! If the given work doesn't produce outputs, pool.Outputs() will return nil. Similarly, if the given work doesn't produce errors, pool.Errors() will return nil.

Note: if the work produces outputs or errors, the user needs to consume the corresponding channels. Otherwise, once an active channel holds size elements, work will block until the user reads from that channel.
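
The backpressure in this note can be reproduced with plain buffered channels. The following stdlib-only sketch is an illustration under assumptions, not komi's code; squareAll and its parameters are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// squareAll runs a small worker pool whose outputs channel holds at most
// `size` results. A consumer goroutine drains it while jobs are submitted;
// without that consumer, workers would block once `size` results piled up,
// and submissions would stall behind them.
func squareAll(inputs []int, laborers, size int) []int {
	jobs := make(chan int, size)
	outputs := make(chan int, size)

	var workers sync.WaitGroup
	for i := 0; i < laborers; i++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			for v := range jobs {
				outputs <- v * v // blocks while outputs is full
			}
		}()
	}

	// Consumer: start draining before submitting anything.
	var results []int
	done := make(chan struct{})
	go func() {
		defer close(done)
		for out := range outputs {
			results = append(results, out)
		}
	}()

	for _, v := range inputs {
		jobs <- v // blocks while the job queue is full
	}
	close(jobs)
	workers.Wait()
	close(outputs)
	<-done

	sort.Ints(results) // completion order is not deterministic
	return results
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4, 5, 6}, 2, 2)) // [1 4 9 16 25 36]
}
```

Starting the consumer before the first submission is the important design choice here: six jobs exceed the combined capacity of both channels, so the loop that submits jobs would never finish without it.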

Connectors

A unique feature of komi is that pools can be connected to each other. Say you have two functions, where one reads a file's contents, openFile(filename string) (string, error), and the other counts the number of words, countWords(contents string) int.

Two pools can be created,

opener := komi.NewWithSettings(komi.WorkWithErrors(openFile), &komi.Settings{
	Name:     "Opener 📂 ",
	Laborers: 1,
	Size:     4,
})

counter := komi.NewWithSettings(komi.Work(countWords), &komi.Settings{
	Name:     "Counter 📚 ",
	Laborers: 10,
	Size:     20,
})

We can wire the outputs of opener to be automatically fed into counter with

opener.Connect(counter)

So now, those two pools are "connected". In this relationship, opener is the dependent (child) pool and counter is the connected (parent) pool.

              Opener 📂                   Counter 📚
filenames  ┌─────────────┐  contents   ┌──────────────┐  word counts
 ───────>  │  openFile   │  ────────>  │  countWords  │  ────────>
 .Submit   └─────────────┘  .Connect   └──────────────┘  .Outputs
            pool: opener                 pool: counter 
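
The data flow in the diagram can be sketched with plain channels. This is a stdlib-only illustration, not how komi wires pools internally; startStage and totalWords are hypothetical names, and an in-memory map stands in for the file system.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// startStage runs `laborers` goroutines applying work to each input and
// returns the channel on which results appear. The returned channel is
// closed once `in` is closed and every input has been processed.
func startStage[I, O any](in <-chan I, work func(I) O, laborers, size int) <-chan O {
	out := make(chan O, size)
	var wg sync.WaitGroup
	for i := 0; i < laborers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- work(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// totalWords feeds names through an "opener" stage and a "counter" stage.
// files is an in-memory stand-in for the file system (an assumption).
func totalWords(files map[string]string, names []string) int {
	filenames := make(chan string, 4)
	contents := startStage[string, string](filenames,
		func(name string) string { return files[name] }, 1, 4)
	counts := startStage[string, int](contents,
		func(text string) int { return len(strings.Fields(text)) }, 10, 20)

	go func() {
		for _, n := range names {
			filenames <- n
		}
		close(filenames)
	}()

	total := 0
	for c := range counts {
		total += c
	}
	return total
}

func main() {
	files := map[string]string{"a.txt": "hello komi pools", "b.txt": "dango"}
	fmt.Println(totalWords(files, []string{"a.txt", "b.txt"})) // 4
}
```

The laborer counts and sizes (1 and 4, 10 and 20) simply mirror the settings used for opener and counter above.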

komi's pools are smart enough that calling counter.Close() alone is sufficient: counter will issue a shutdown command back to opener and wait until it's closed. This closing procedure works with any number of connected pools.

If you have pools 1,2,...,N-1,N connected in the form 1->2->...->N-1->N, the user can only call .Close() on pool N, as it is responsible for sending a closure request to N-1, and so on until 2 sends the shutdown request to 1. Once 1 is closed, the closure resumes on 2, then continues up through N-1 and N, at which point the original Close() call returns.

Please note that none of the pools 1,2,...,N-1 above will honor the user's closure request, as it must come from their connected (parent) pool.
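
The closure order can be modeled without any job processing at all. The sketch below is a simplified stand-in, not komi's code: node, connect, requestClose, shutdown and closeChain are hypothetical, and returning an error for a refused request is a choice made for this sketch (komi's Close returns nothing).

```go
package main

import (
	"errors"
	"fmt"
)

// node models one pool in a chain 1 -> 2 -> ... -> N. Only the bookkeeping
// needed to show the closure order is kept.
type node struct {
	name   string
	child  *node // dependent pool that feeds this one (nil if none)
	parent *node // connected pool that receives this one's outputs (nil if none)
	closed bool
}

// connect wires n's outputs into parent.
func (n *node) connect(parent *node) {
	n.parent = parent
	parent.child = n
}

var errConnected = errors.New("pool is connected; close its parent instead")

// requestClose is the user-facing call: only the parent-most pool honors it.
func (n *node) requestClose(order *[]string) error {
	if n.parent != nil {
		return errConnected
	}
	n.shutdown(order)
	return nil
}

// shutdown first asks the child to shut down, then closes this node, so
// pools finish closing in the order 1, 2, ..., N.
func (n *node) shutdown(order *[]string) {
	if n.child != nil {
		n.child.shutdown(order)
	}
	n.closed = true
	*order = append(*order, n.name)
}

// closeChain builds a chain from the given names and closes it from the end.
func closeChain(names ...string) ([]string, error) {
	if len(names) == 0 {
		return nil, nil
	}
	nodes := make([]*node, len(names))
	for i, name := range names {
		nodes[i] = &node{name: name}
		if i > 0 {
			nodes[i-1].connect(nodes[i])
		}
	}
	var order []string
	// A close request on an inner pool is refused...
	if len(nodes) > 1 {
		if err := nodes[0].requestClose(&order); err == nil {
			return nil, errors.New("expected inner close to be refused")
		}
	}
	// ...while closing the parent-most pool cascades down the chain.
	err := nodes[len(nodes)-1].requestClose(&order)
	return order, err
}

func main() {
	order, err := closeChain("opener", "counter")
	fmt.Println(order, err) // [opener counter] <nil>
}
```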

Quirks

When the parent-most pool is closing, it will wait for all the child pools to complete their jobs. This behavior can be overridden by calling .Close(true), which skips waiting for both the children's queued jobs and the parent-most pool's own queued jobs.

Operations

Pools support waiting (blocking) until the pool has no jobs waiting for completion with pool.Wait().

Some other quality of life operations are also provided,

  • Submit(v) will submit job v to be performed by the pool.
  • Close() will close the pool if and only if it's disconnected or the parent-most pool.
  • Close(true) will close the pool ignoring any pending jobs.
  • Outputs() will return the channel that the user should listen to for outputs (if work generates them).
  • Errors() will return the channel that the user should listen to for errors (if work generates them).
  • IsConnected() will return true if the pool is a child of another pool and thus sends its outputs to that pool.
  • IsClosed() will return true if the pool has gracefully shut down.
  • JobsCompleted() will return the number of jobs this pool has completed.
  • JobsWaiting() will return the number of jobs waiting in the queue or currently in progress.
  • JobsSucceeded() will return the number of jobs completed without an error (nil error).
  • Name() will return the pool's name (defaults to Komi 🍡 ).
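
As a rough illustration of how the waiting and counting operations relate, here is a stdlib-only sketch. It is not komi's implementation; tracker, run, wait, rejectOdd and runJobs are hypothetical names, and counting a job as succeeded when its work returns a nil error is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// tracker is hypothetical bookkeeping, loosely mirroring JobsWaiting,
// JobsCompleted, JobsSucceeded and Wait.
type tracker struct {
	waiting   int64
	completed int64
	succeeded int64
	pending   sync.WaitGroup
}

// run executes work(job) on its own goroutine and updates the counters.
func (tr *tracker) run(job int, work func(int) error) {
	atomic.AddInt64(&tr.waiting, 1)
	tr.pending.Add(1)
	go func() {
		defer tr.pending.Done()
		err := work(job)
		atomic.AddInt64(&tr.waiting, -1)
		atomic.AddInt64(&tr.completed, 1)
		if err == nil {
			atomic.AddInt64(&tr.succeeded, 1)
		}
	}()
}

// wait blocks until no job is queued or in progress.
func (tr *tracker) wait() { tr.pending.Wait() }

// rejectOdd is sample work that fails on odd inputs.
func rejectOdd(v int) error {
	if v%2 != 0 {
		return errors.New("odd input")
	}
	return nil
}

// runJobs runs every job and reports the counters after waiting.
func runJobs(jobs []int) (waiting, completed, succeeded int64) {
	var tr tracker
	for _, j := range jobs {
		tr.run(j, rejectOdd)
	}
	tr.wait()
	return atomic.LoadInt64(&tr.waiting),
		atomic.LoadInt64(&tr.completed),
		atomic.LoadInt64(&tr.succeeded)
}

func main() {
	fmt.Println(runJobs([]int{1, 2, 3, 4})) // 0 4 2
}
```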

Settings

You can tune the performance and behavior of the pool with komi.NewWithSettings by providing *komi.Settings,

  • Laborers sets the number of pool's laborers.
  • Size sets the size of the pool (how many jobs can wait until pool.Submit is blocked).
  • Ratio sets the ratio in the equation size = ratio * number of laborers (only used if size has not been set manually).
  • LogLevel sets the pool's logging level to level.
  • Debug sets the pool's logging level to DebugLevel.
  • Name sets the pool's name as shown in logs.
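
The relationship between Size, Ratio and Laborers can be written out as a small function. This is only a sketch of the stated equation: poolSettings and effectiveSize are hypothetical, and treating a zero Size as "not set manually" is an assumption, since the library's actual defaults are not documented here.

```go
package main

import "fmt"

// poolSettings is a hypothetical subset of the settings listed above.
type poolSettings struct {
	Laborers int
	Size     int
	Ratio    int
}

// effectiveSize returns the manually set Size if there is one, and
// otherwise applies size = ratio * number of laborers.
// Assumption: Size <= 0 means "not set manually".
func effectiveSize(s poolSettings) int {
	if s.Size > 0 {
		return s.Size
	}
	return s.Ratio * s.Laborers
}

func main() {
	fmt.Println(effectiveSize(poolSettings{Laborers: 10, Ratio: 2}))          // 20
	fmt.Println(effectiveSize(poolSettings{Laborers: 10, Ratio: 2, Size: 4})) // 4
}
```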

Stability

This is a brand new library I built for my static website generator, where it's used extensively and in production. However, no guarantees are provided for this library until something like v1.0 is out, at which point I would promise to maintain backward compatibility.

Please use it knowing the risks. However, if you pin a tagged version or a commit hash in your go.mod, you should be fine.

Future work

Some future items in mind:

  • Adding an error handler to komi pools, which, if given, will be invoked when work returns a non-nil error.
  • More tests

Developers

Please consider giving it a try and filing an issue or a pull request.

Thank you!

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Work

func Work[I, O any](work func(I) O) poolWork[I, O]

Work should be used to set work with outputs but no errors.

func WorkSimple

func WorkSimple[I any](work func(I)) poolWork[I, noValue]

WorkSimple should be used to set work with no outputs nor errors.

func WorkSimpleWithErrors

func WorkSimpleWithErrors[I any](work func(I) error) poolWork[I, noValue]

WorkSimpleWithErrors should be used to set work with no outputs but with errors.

func WorkWithErrors

func WorkWithErrors[I, O any](work func(I) (O, error)) poolWork[I, O]

WorkWithErrors should be used to set work with both outputs and errors.
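
One way four function shapes can end up behind a single pool type is to adapt each of them to one common form. The sketch below shows that idea with stdlib-only generics; it is an assumption about the general technique, not komi's actual poolWork type. The names job, none, work, workSimple, workSimpleWithErrors, workWithErrors, double and mustBePositive are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// none is a placeholder output type for work that returns no value.
type none struct{}

// job is the single normalized shape every adapter produces.
type job[I, O any] struct {
	run        func(I) (O, error)
	hasOutputs bool
	hasErrors  bool
}

// work adapts func(I) O: outputs, no errors.
func work[I, O any](f func(I) O) job[I, O] {
	return job[I, O]{
		run:        func(in I) (O, error) { return f(in), nil },
		hasOutputs: true,
	}
}

// workSimple adapts func(I): neither outputs nor errors.
func workSimple[I any](f func(I)) job[I, none] {
	return job[I, none]{
		run: func(in I) (none, error) { f(in); return none{}, nil },
	}
}

// workSimpleWithErrors adapts func(I) error: errors, no outputs.
func workSimpleWithErrors[I any](f func(I) error) job[I, none] {
	return job[I, none]{
		run:       func(in I) (none, error) { return none{}, f(in) },
		hasErrors: true,
	}
}

// workWithErrors adapts func(I) (O, error): both outputs and errors.
func workWithErrors[I, O any](f func(I) (O, error)) job[I, O] {
	return job[I, O]{run: f, hasOutputs: true, hasErrors: true}
}

func double(v int) int { return v * 2 }

func mustBePositive(v int) error {
	if v <= 0 {
		return errors.New("value must be positive")
	}
	return nil
}

func main() {
	a := work(double)
	out, _ := a.run(21)
	fmt.Println(out, a.hasOutputs, a.hasErrors) // 42 true false

	b := workSimpleWithErrors(mustBePositive)
	_, err := b.run(-1)
	fmt.Println(err != nil, b.hasOutputs, b.hasErrors) // true false true
}
```

The two flags are what would let a pool decide whether an outputs or errors channel is worth creating at all.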

Types

type Pool

type Pool[I, O any] struct {
	// contains filtered or unexported fields
}

Pool is a fantastic golang pool that can take work of any form and perform it in a go-idiomatic way of producing outputs (optional) and errors (optional).

func New

func New[I, O any](optionWork poolWork[I, O]) *Pool[I, O]

New creates a new pool with sensible defaults.

func NewWithSettings

func NewWithSettings[I, O any](optionWork poolWork[I, O], settings *Settings) *Pool[I, O]

NewWithSettings creates a new pool with custom pool tunings enabled.

func (*Pool[_, _]) Close

func (p *Pool[_, _]) Close(force ...bool)

Close issues a pool closure request. It takes an optional bool value: if true, any pending jobs are ignored and the pool is closed forcefully. Note that the user can request a pool closure if and only if the pool is not connected to another pool. If it is connected, the parent pool has to issue the closure request.

func (*Pool[I, O]) Connect

func (p *Pool[I, O]) Connect(parent PoolConnector[O]) error

func (*Pool[_, _]) Debug

func (p *Pool[_, _]) Debug()

Debug enables the debug logging in the pool.

func (Pool[I, _]) Errors

func (p Pool[I, _]) Errors() (chan PoolError[I], error)

Errors returns a channel of errors generated by the pool, nil if pool is closed, doesn't produce errors, or an error handler is set.

func (Pool[_, _]) IsClosed

func (p Pool[_, _]) IsClosed() bool

IsClosed returns true if the pool is closed, false otherwise.

func (Pool[_, _]) IsConnected

func (p Pool[_, _]) IsConnected() bool

IsConnected will return true if this pool already has an active connector. This is equivalent to having a connected (parent) pool.

func (Pool[_, _]) JobsCompleted

func (p Pool[_, _]) JobsCompleted() int64

JobsCompleted will return the number of jobs completed by the pool.

func (Pool[_, _]) JobsSucceeded

func (p Pool[_, _]) JobsSucceeded() int64

JobsSucceeded will return the number of jobs that succeeded (nil errors).

func (Pool[_, _]) JobsWaiting

func (p Pool[_, _]) JobsWaiting() int64

JobsWaiting will return the number of jobs currently waiting or being executed by the pool.

func (*Pool[_, _]) Name

func (p *Pool[_, _]) Name() string

Name returns the name of the pool.

func (Pool[_, O]) Outputs

func (p Pool[_, O]) Outputs() (chan O, error)

Outputs returns a channel of outputs generated by the pool, nil if pool is closed or doesn't produce outputs.

func (*Pool[_, _]) SetLevel

func (p *Pool[_, _]) SetLevel(level log.Level)

SetLevel sets the logging level of the pool.

func (Pool[I, _]) Submit

func (p Pool[I, _]) Submit(job I) error

Submit sends a job to the pool for processing.

func (Pool[_, _]) Wait

func (p Pool[_, _]) Wait()

Wait will block until the pool has no waiting jobs, see `With...` options.

type PoolConnector

type PoolConnector[O any] interface {
	// Submit will submit a job to the connected (parent) pool.
	Submit(O) error

	// IsClosed returns true if the connected (parent) pool is closed,
	// false otherwise.
	IsClosed() bool

	// Name returns the name of the connected (parent) pool.
	Name() string
	// contains filtered or unexported methods
}

PoolConnector is the interface through which other pools connect to a pool. Users can also use it to pass pools around.

type PoolError

type PoolError[I any] struct {
	// Job is the job that returned a non-nil error when work was performed on it.
	Job I

	// Error is the error returned by pool's work performer.
	Error error
}

PoolError is produced by the pool when work performed by the pool fails with a non-nil error, so the user can debug and see what happened.
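
Pairing each error with the job that caused it is what makes failures traceable. The stdlib-only sketch below illustrates the idea; jobError and parseAll are hypothetical names, and the code is not komi's implementation.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// jobError is a hypothetical analogue of PoolError: it keeps the failing
// job next to the error it produced.
type jobError[I any] struct {
	Job I
	Err error
}

// parseAll converts every input concurrently, summing the successes and
// collecting a jobError for each failure.
func parseAll(inputs []string) (int, []jobError[string]) {
	// Both channels are large enough that no worker ever blocks.
	outputs := make(chan int, len(inputs))
	errs := make(chan jobError[string], len(inputs))

	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(s string) {
			defer wg.Done()
			n, err := strconv.Atoi(s)
			if err != nil {
				errs <- jobError[string]{Job: s, Err: err}
				return
			}
			outputs <- n
		}(in)
	}
	wg.Wait()
	close(outputs)
	close(errs)

	sum := 0
	for n := range outputs {
		sum += n
	}
	var failed []jobError[string]
	for e := range errs {
		failed = append(failed, e)
	}
	return sum, failed
}

func main() {
	sum, failed := parseAll([]string{"1", "x", "2"})
	fmt.Println(sum, len(failed), failed[0].Job) // 3 1 x
}
```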

type Settings

type Settings struct {
	// Laborers is the number of laborers (work performers) that should
	// run in parallel.
	Laborers int

	// Size sets the size of the pool, or how many inputs and outputs (each
	// has a separate queue of this size) can be held by the pool. If Size is
	// reached, submissions or work results will be blocked.
	Size int

	// Ratio is the ratio that is used (unless size is set manually)
	// for setting the size to the number of laborers.
	Ratio int

	// Debug will set the logger to show all Debug logs too.
	Debug bool

	// Name is the Name of the pool.
	Name string

	// LogLevel defaults to warn, can be set by the user.
	LogLevel log.Level
	// contains filtered or unexported fields
}

Settings is an internal struct that tunes the pool as requested.

type Signal

type Signal struct{}

Signal is a cheap type to be sent through channels.
