push

package module
v0.0.0-...-9396c32 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 8, 2018 License: MIT Imports: 1 Imported by: 4

README

go-push-components

standard-readme compliant

Push Components replace the need for golang chan and select constructs, which require no polling and are instead designed to "push" new work to the client when available.

Table of Contents

Background

A common golang idiom when a component needs to receive messages from another component is to poll a chan using a select loop. In this construct, the message sender publishes to the chan and the receiver pulls from it. chan is a bare-bones component. It does not manage concurrency and it requires continuous CPU work by the receiver for the polling the chan.

Push Components decouple the sender and receiver, as chan does, but it also helpfully manages other aspects of such an exchange pattern:

  • concurrency
  • overload
  • drain, and graceful shutdown
  • counting items in the component
  • emptying the component

Push Components provided in this package:

  • PushQueue
  • PushStack

Install

go get -u github.com/blocktop/go-push-components

Usage

Create the push component (in this case a queue), tell it concurrency and capacity attributes, and provide a worker function.

q := push.NewPushQueue(2, 50, worker)
q.Start()
   
func worker(item Item) {
  doWorkWithItem(item)
}

API

Maintainers

@strobus

Contribute

PRs accepted.

Small note: If editing the README, please conform to the standard-readme specification.

License

MIT © 2018 J. Strobus White

Documentation

Overview

Package push provides components that can replace the need for chan loop and select constructs. These components require no polling and are instead designed to "push" new work to the client when available.

The components provide easy concurrency and buffer size monitoring and eventing.

Example

	  q := push.NewPushQueue(2, 50, worker)
   q.Start()

   func worker(item Item) {
       doWorkWithItem(item)
   }

In the above example, the worker will be called as a goroutine at most two times concurrently.

Events

The following events are provided for client programs to respond to what is happening in the push component. * Overload(Item) -- fired if the client attempts to write more items into the push component than its configured buffer size. The items will not be added to the component's buffer. However, they will be sent to the client's Overload event handler.

* FirstOverload(Item) -- same as Overload except that it happens only on the first occurance.

* Drained -- fired when the push component finishes processing all items after the client calls the Drain method. This signals to the client that (for example) the program may be safely without interrupting processing.

An example of the client's program blocking until all items are finished processing:

	   done := make(chan bool, 1)
		 q.OnDrained(func() {
		     done <- true
    })
    q.Drain()
    <-done

    os.Exit(0) // etc..

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type PushBatchQueue

type PushBatchQueue struct {
	// contains filtered or unexported fields
}

PushBatchQueue holds the processing and state information of a PushBatchQueue.

func NewPushBatchQueue

func NewPushBatchQueue(concurrency int, depth int, batchSize int, worker func([]interface{})) *PushBatchQueue

NewPushBatchQueue creates a new PushBatchQueue with the given concurrency, depth and worker. The worker is the function that will be called to process a queue item. The concurrency is the number of times the worker function will be called in parallel. The depth is the maximum capacity of the queue.

func (*PushBatchQueue) Count

func (q *PushBatchQueue) Count() int

Count returns the current number of items in the queue.

func (*PushBatchQueue) Depth

func (q *PushBatchQueue) Depth() int

Depth returns the maximum capacity of the queue.

func (*PushBatchQueue) Drain

func (q *PushBatchQueue) Drain()

Drain processes remaining items in the queue and prevents new items from being put onto the queue.

func (*PushBatchQueue) DropOldestOnOverload

func (q *PushBatchQueue) DropOldestOnOverload()

DropOldestOnOverload tells the queue to drop the oldest item in the queue on the floor when an overload occurs. The default behavior is to drop the item being added.

func (*PushBatchQueue) Empty

func (q *PushBatchQueue) Empty()

Empty removes all items currently in the queue. This method does not affect the started, stopped, or draining state of the queue.

func (*PushBatchQueue) IsFull

func (q *PushBatchQueue) IsFull() bool

IsFull indicates whether the queue can accept new items.

func (*PushBatchQueue) IsStarted

func (q *PushBatchQueue) IsStarted() bool

IsStarted indicates whether the queue is started. This method returns true when the queue is available to clients to Put items. IsStarted returns false when the queue is draining.

func (*PushBatchQueue) OnDrained

func (q *PushBatchQueue) OnDrained(f func())

OnDrained sets an event handler that will be called when the draining is complete.

func (*PushBatchQueue) OnFirstOverload

func (q *PushBatchQueue) OnFirstOverload(f func(interface{}))

OnFirstOverload sets an event handler that will be called the first time a client attempts to overload the queue.

func (*PushBatchQueue) OnOverload

func (q *PushBatchQueue) OnOverload(f func(interface{}))

OnOverload sets an event handler that will be called *every time* a client attempts to overload the queue. The handler is passed the value of the Overload register.

func (*PushBatchQueue) OverloadCount

func (q *PushBatchQueue) OverloadCount() int

OverloadCount returns the number of times that clients attempted to Put items exceeding queue depth or while the queue was draining. The exceeding items were dropped on the floor. This count is reset when Start is called.

func (*PushBatchQueue) Put

func (q *PushBatchQueue) Put(item interface{})

Put adds an item to the queue for processing. If the count of items in the queue is at the queue depth, then the Overload flag is set and the item is dropped on the floor.

func (*PushBatchQueue) Start

func (q *PushBatchQueue) Start()

Start begins queue processing. Start panics if no worker has been set.

func (*PushBatchQueue) Stop

func (q *PushBatchQueue) Stop()

Stop ends processing of queue items. This also ends draining of items if Drain has been called.

type PushQueue

type PushQueue struct {
	// contains filtered or unexported fields
}

PushQueue holds the processing and state information of a PushQueue.

func NewPushQueue

func NewPushQueue(concurrency int, depth int, worker func(interface{})) *PushQueue

NewPushQueue creates a new PushQueue with the given concurrency, depth and worker. The worker is the function that will be called to process a queue item. The concurrency is the number of times the worker function will be called in parallel. The depth is the maximum capacity of the queue.

func (*PushQueue) Count

func (q *PushQueue) Count() int

Count returns the current number of items in the queue.

func (*PushQueue) Depth

func (q *PushQueue) Depth() int

Depth returns the maximum capacity of the queue.

func (*PushQueue) Drain

func (q *PushQueue) Drain()

Drain processes remaining items in the queue and prevents new items from being put onto the queue.

func (*PushQueue) DropOldestOnOverload

func (q *PushQueue) DropOldestOnOverload()

DropOldestOnOverload tells the queue to drop the oldest item in the queue on the floor when an overload occurs. The default behavior is to drop the item being added.

func (*PushQueue) Empty

func (q *PushQueue) Empty()

Empty removes all items currently in the queue. This method does not affect the started, stopped, or draining state of the queue.

func (*PushQueue) IsFull

func (q *PushQueue) IsFull() bool

IsFull indicates whether the queue can accept new items.

func (*PushQueue) IsStarted

func (q *PushQueue) IsStarted() bool

IsStarted indicates whether the queue is started. This method returns true when the queue is available to clients to Put items. IsStarted returns false when the queue is draining.

func (*PushQueue) OnDrained

func (q *PushQueue) OnDrained(f func())

OnDrained sets an event handler that will be called when the draining is complete.

func (*PushQueue) OnFirstOverload

func (q *PushQueue) OnFirstOverload(f func(interface{}))

OnFirstOverload sets an event handler that will be called the first time a client attempts to overload the queue.

func (*PushQueue) OnOverload

func (q *PushQueue) OnOverload(f func(interface{}))

OnOverload sets an event handler that will be called *every time* a client attempts to overload the queue. The handler is passed the value of the Overload register.

func (*PushQueue) OverloadCount

func (q *PushQueue) OverloadCount() int

OverloadCount returns the number of times that clients attempted to Put items exceeding queue depth or while the queue was draining. The exceeding items were dropped on the floor. This count is reset when Start is called.

func (*PushQueue) Put

func (q *PushQueue) Put(item interface{})

Put adds an item to the queue for processing. If the count of items in the queue is at the queue depth, then the Overload flag is set and the item is dropped on the floor.

func (*PushQueue) PutItems

func (q *PushQueue) PutItems(items ...interface{})

func (*PushQueue) Start

func (q *PushQueue) Start()

Start begins queue processing. Start panics if no worker has been set.

func (*PushQueue) Stop

func (q *PushQueue) Stop()

Stop ends processing of queue items. This also ends draining of items if Drain has been called.

type PushQueuePut

type PushQueuePut interface {
	Put(interface{})
	Depth() int
	Count() int
	IsStarted() bool
}

PushQueuePut provides an interface that can be passed to clients to work with the queue. It contains only the methods required by a client.

type PushStack

type PushStack struct {
	// contains filtered or unexported fields
}

PushStack holds the processing and state information of a PushStack.

func NewPushStack

func NewPushStack(concurrency int, height int, worker func(interface{})) *PushStack

NewPushStack creates a new PushStack with the given concurrency, height and worker. The worker is the function that will be called to process a stack item. The concurrency is the number of times the worker function will be called in parallel. The height is the maximum capacity of the stack.

func (*PushStack) Count

func (s *PushStack) Count() int

Count returns the current number of items in the stack.

func (*PushStack) Drain

func (s *PushStack) Drain()

Drain processes remaining items in the stack and prevents new items from being put onto the stack.

func (*PushStack) Empty

func (s *PushStack) Empty()

Empty removes all items currently in the stack. This method does not affect the started, stopped, or draining state of the stack.

func (*PushStack) Height

func (s *PushStack) Height() int

Height returns the maximum capacity of the stack.

func (*PushStack) IsFull

func (s *PushStack) IsFull() bool

IsFull indicates whether the stack can accept new items.

func (*PushStack) IsStarted

func (s *PushStack) IsStarted() bool

IsStarted indicates whether the stack is started. This method returns true when the stack is available to clients to Put items. IsStarted returns false when the stack is draining.

func (*PushStack) OnDrained

func (s *PushStack) OnDrained(f func())

OnDrained sets an event handler that will be called when the draining is complete.

func (*PushStack) OnFirstOverload

func (s *PushStack) OnFirstOverload(f func(interface{}))

OnFirstOverload sets an event handler that will be called the first time a client attempts to overload the stack.

func (*PushStack) OnOverload

func (s *PushStack) OnOverload(f func(interface{}))

OnOverload sets an event handler that will be called *every time* a client attempts to overload the stack. The handler is passed the value of the Overload register.

func (*PushStack) Overload

func (s *PushStack) Overload() int

Overload returns the number of times that clients attempted to Put items exceeding stack height or while the stack was draining. The exceeding items were dropped on the floor. This count is reset when Start is called.

func (*PushStack) Push

func (s *PushStack) Push(item interface{})

Push adds an item to the stack for processing. If the count of items in the stack is at the stack height, then the Overload flag is set and the first item added is dropped on the floor. The dropped item is sent to the OnOverload and OnFirstOverload (if this is the first time) event handlers.

func (*PushStack) Start

func (s *PushStack) Start()

Start begins stack processing. Start panics if no worker has been set.

func (*PushStack) Stop

func (s *PushStack) Stop()

Stop ends processing of stack items. This also ends draining of items if Drain has been called.

type PushStackPut

type PushStackPut interface {
	Push(interface{})
	Height() int
	Count() int
	IsStarted() bool
}

PushStackPut provides an interface that can be passed to clients to work with the stack. It contains only the methods required by a client.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL