poly

package module
v0.0.0-...-35dcfdc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2026 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package poly provides a generic worker pool with independent operations.

A WorkerPool executes a user-supplied function concurrently across a fixed number of goroutines. Callers create independent Op instances via NewOperation; each operation tracks its own pending requests, results, errors, and processing metrics.

Architecture

The pool owns a channel of closures (in) and a set of worker goroutines that pull from it. Each Op has its own [sendQueue] — a single sender goroutine batches requests and feeds them into the pool channel. This avoids spawning a goroutine per request.

Pool (N workers)
  ^
  | wp.in <- func()
  |
Op sender goroutine
  ^
  | q.push(req)
  |
AddRequest

Lifecycle

The pool lives until its context is cancelled. Operations are independent: each has its own context, and the end function returned by NewOperation cancels only that operation. Workers process requests from any active operation. When the pool context is cancelled, all workers exit, all sender goroutines drain their queues, and pending request counters are decremented so that Op.Wait and Op.Results unblock.

Error handling

If the user function returns an error, the operation is cancelled with that error as cause. Subsequent requests for the same operation are dropped. The error is retrievable via Op.Err.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrOperationEnded = errors.New("operation ended")

ErrOperationEnded is the cause set on an operation's context when the end function returned by NewOperation is called.

Functions

This section is empty.

Types

type Metrics

type Metrics struct {
	// OperationsTotal is the number of requests that completed
	// successfully and whose results were sent to the consumer.
	OperationsTotal int

	// AverageProcessingDuration is the mean wall-clock time of
	// successful fn calls, computed as calcTimeSum / OperationsTotal.
	AverageProcessingDuration time.Duration
}

Metrics holds processing statistics for an Op.

type Op

type Op[ReqType, RespType any] struct {
	// contains filtered or unexported fields
}

Op is a handle to an independent batch of requests submitted to a WorkerPool. It tracks the number of pending requests, collects results, and accumulates processing metrics.

An Op is created by NewOperation and should not be copied. Use Op.AddRequest to submit work, Op.Wait or Op.Results to consume results, and the end function to release resources.

func NewOperation

func NewOperation[ReqType, RespType any](wp *WorkerPool[ReqType, RespType], ctx context.Context) (*Op[ReqType, RespType], func())

NewOperation creates an independent Op bound to the pool. The returned end function cancels the operation with ErrOperationEnded and must be called when the operation is no longer needed.

Each Op gets a dedicated sender goroutine that batches calls to Op.AddRequest and feeds them into the pool's worker channel. This avoids creating a goroutine per request.

op, end := poly.NewOperation(wp, ctx)
defer end()
Example

Two independent operations share a single pool. Each operation tracks its own requests and results separately.

package main

import (
	"context"
	"fmt"

	"github.com/s4bb4t/poly"
)

func main() {
	wp := poly.New(context.Background(), func(_ context.Context, n int) (int, error) {
		return n * 10, nil
	}, 4)

	op1, end1 := poly.NewOperation(wp, context.Background())
	defer end1()
	op2, end2 := poly.NewOperation(wp, context.Background())
	defer end2()

	op1.AddRequest(1)
	op2.AddRequest(2)

	m1 := op1.Wait()
	m2 := op2.Wait()

	fmt.Println("op1 processed:", m1.OperationsTotal)
	fmt.Println("op2 processed:", m2.OperationsTotal)
}
Output:
op1 processed: 1
op2 processed: 1

func (*Op[ReqType, RespType]) AddRequest

func (o *Op[ReqType, RespType]) AddRequest(req ReqType)

AddRequest submits a request for processing by the pool. It is safe to call from multiple goroutines. Requests submitted after the operation or pool context is cancelled are dropped.

Example
package main

import (
	"context"
	"fmt"

	"github.com/s4bb4t/poly"
)

func main() {
	wp := poly.New(context.Background(), func(_ context.Context, s string) (string, error) {
		return "hello " + s, nil
	}, 4)

	op, end := poly.NewOperation(wp, context.Background())
	defer end()

	op.AddRequest("world")

	for v := range op.Results() {
		fmt.Println(v)
	}
}
Output:
hello world

func (*Op[ReqType, RespType]) Err

func (o *Op[ReqType, RespType]) Err() error

Err returns the error that caused the operation to stop, or nil if the operation is still running or completed without error. After the end function is called, Err returns ErrOperationEnded.

Example

Err returns the error that cancelled the operation, or nil.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/s4bb4t/poly"
)

func main() {
	myErr := errors.New("something went wrong")

	wp := poly.New(context.Background(), func(_ context.Context, n int) (int, error) {
		if n == 1 {
			return 0, myErr
		}
		return n, nil
	}, 1)

	op, end := poly.NewOperation(wp, context.Background())
	defer end()

	op.AddRequest(1)
	op.Wait()

	fmt.Println(op.Err())
}
Output:
something went wrong
Example (Ended)

Err returns ErrOperationEnded after the end function is called.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/s4bb4t/poly"
)

func main() {
	wp := poly.New(context.Background(), func(_ context.Context, n int) (int, error) {
		return n, nil
	}, 4)

	op, end := poly.NewOperation(wp, context.Background())

	op.AddRequest(1)
	op.Wait()
	end()

	fmt.Println(errors.Is(op.Err(), poly.ErrOperationEnded))
}
Output:
true

func (*Op[ReqType, RespType]) Metrics

func (o *Op[ReqType, RespType]) Metrics(resetOnRead bool) Metrics

Metrics returns a snapshot of processing statistics. If resetOnRead is true, the counters are atomically reset to zero after reading.

Example

Metrics returns processing statistics. Pass true to reset counters atomically after reading.

package main

import (
	"context"
	"fmt"

	"github.com/s4bb4t/poly"
)

func main() {
	wp := poly.New(context.Background(), func(_ context.Context, n int) (int, error) {
		return n, nil
	}, 4)

	op, end := poly.NewOperation(wp, context.Background())
	defer end()

	for i := 0; i < 5; i++ {
		op.AddRequest(i)
	}
	op.Wait()

	m := op.Metrics(true) // read and reset
	fmt.Println("total:", m.OperationsTotal)
	fmt.Println("avg > 0:", m.AverageProcessingDuration > 0)

	after := op.Metrics(false) // counters were reset
	fmt.Println("after reset:", after.OperationsTotal)
}
Output:
total: 5
avg > 0: true
after reset: 0

func (*Op[ReqType, RespType]) Results

func (o *Op[ReqType, RespType]) Results() <-chan RespType

Results returns a channel that receives each successful result as it becomes available. The channel is closed when all requests have been consumed or the operation context is cancelled.

Results must not be called concurrently with Op.Wait on the same Op.

Example

Results returns a channel that yields each successful result. The channel closes when all requests are consumed.

package main

import (
	"context"
	"fmt"
	"sort"

	"github.com/s4bb4t/poly"
)

func main() {
	wp := poly.New(context.Background(), func(_ context.Context, n int) (int, error) {
		return n * 2, nil
	}, 4)

	op, end := poly.NewOperation(wp, context.Background())
	defer end()

	op.AddRequest(3)
	op.AddRequest(7)

	var results []int
	for v := range op.Results() {
		results = append(results, v)
	}

	sort.Ints(results)
	fmt.Println(results)
}
Output:
[6 14]

func (*Op[ReqType, RespType]) Wait

func (o *Op[ReqType, RespType]) Wait() (m Metrics)

Wait blocks until all submitted requests have been processed or the operation context is cancelled. Results are drained internally (not forwarded to the caller). On successful completion it returns the accumulated Metrics; on cancellation it returns the zero value.

Wait must not be called concurrently with Op.Results on the same Op.

Example

Wait blocks until every submitted request has been processed, then returns the accumulated metrics.

package main

import (
	"context"
	"fmt"

	"github.com/s4bb4t/poly"
)

func main() {
	wp := poly.New(context.Background(), func(_ context.Context, n int) (int, error) {
		return n, nil
	}, 4)

	op, end := poly.NewOperation(wp, context.Background())
	defer end()

	for i := 0; i < 10; i++ {
		op.AddRequest(i)
	}

	m := op.Wait()
	fmt.Println("total:", m.OperationsTotal)
}
Output:
total: 10

type WorkerPool

type WorkerPool[ReqType, RespType any] struct {
	// contains filtered or unexported fields
}

WorkerPool executes a user-supplied function fn across a fixed number of worker goroutines. Work is submitted through Op instances created by NewOperation. Multiple operations share the same pool and workers.

The pool lives until ctx is cancelled; after that, workers exit and new requests are dropped.

func New

func New[ReqType, RespType any](ctx context.Context, fn func(context.Context, ReqType) (RespType, error), workersCnt int) *WorkerPool[ReqType, RespType]

New creates a WorkerPool with workersCnt goroutines and returns it. Workers start immediately and run until ctx is cancelled.

wp := poly.New(ctx, fetchURL, 20)
Example

Create a pool of 4 workers that squares integers, then collect all results.

package main

import (
	"context"
	"fmt"
	"sort"

	"github.com/s4bb4t/poly"
)

func main() {
	wp := poly.New(context.Background(), func(_ context.Context, n int) (int, error) {
		return n * n, nil
	}, 4)

	op, end := poly.NewOperation(wp, context.Background())
	defer end()

	for i := 1; i <= 5; i++ {
		op.AddRequest(i)
	}

	var results []int
	for v := range op.Results() {
		results = append(results, v)
	}

	sort.Ints(results)
	fmt.Println(results)
}
Output:
[1 4 9 16 25]

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL