zenq

package module
v0.0.0-...-fdf27ea Latest
Warning

This package is not in the latest version of its module.

Published: Jul 14, 2022 License: MIT Imports: 7 Imported by: 1

README

ZenQ

A low-latency thread-safe queue in golang implemented using a lock-free ringbuffer

Features

  • Much faster than native channels in both SPSC (single-producer-single-consumer) and MPSC (multi-producer-single-consumer) modes in terms of time/op
  • More resource-efficient in terms of memory_allocation/op and num_allocations/op, which is evident when benchmarking large input batch sizes
  • Handles the case where NUM_WRITER_GOROUTINES > NUM_CPU_CORES much better than native channels
  • Selection from multiple ZenQs just like golang's select{} ensuring fair selection and no starvation
  • Closing a ZenQ

Benchmarks to support the above claims here

Installation

You need Golang 1.18.x or above since this package uses generics

$ go get github.com/alphadose/zenq@v2.6.2

Usage

  1. Simple Read/Write
package main

import (
	"fmt"

	"github.com/alphadose/zenq"
)

type payload struct {
	alpha int
	beta  string
}

func main() {
	zq := zenq.New[payload](10)

	for j := 0; j < 5; j++ {
		go func() {
			for i := 0; i < 20; i++ {
				zq.Write(payload{
					alpha: i,
					beta:  fmt.Sprint(i),
				})
			}
		}()
	}

	for i := 0; i < 100; i++ {
		if data, queueOpen := zq.Read(); queueOpen {
			fmt.Printf("%+v\n", data)
		}
	}
}
  2. Selection from multiple ZenQs just like golang's native select{}. The selection process is fair, i.e. no single ZenQ gets starved
package main

import (
	"fmt"

	"github.com/alphadose/zenq"
)

type custom1 struct {
	alpha int
	beta  string
}

type custom2 struct {
	gamma int
}

const size = 100

var (
	zq1 = zenq.New[int](size)
	zq2 = zenq.New[string](size)
	zq3 = zenq.New[custom1](size)
	zq4 = zenq.New[*custom2](size)
)

func main() {
	go looper(intProducer)
	go looper(stringProducer)
	go looper(custom1Producer)
	go looper(custom2Producer)

	for i := 0; i < 40; i++ {

		// Selection occurs here
		if data, ok := zenq.Select(zq1, zq2, zq3, zq4); ok {
			switch data.(type) {
			case int:
				fmt.Printf("Received int %d\n", data)
			case string:
				fmt.Printf("Received string %s\n", data)
			case custom1:
				fmt.Printf("Received custom data type number 1 %#v\n", data)
			case *custom2:
				fmt.Printf("Received pointer %#v\n", data)
			}
		}
	}
}

func intProducer(ctr int) { zq1.Write(ctr) }

func stringProducer(ctr int) { zq2.Write(fmt.Sprint(ctr * 10)) }

func custom1Producer(ctr int) { zq3.Write(custom1{alpha: ctr, beta: fmt.Sprint(ctr)}) }

func custom2Producer(ctr int) { zq4.Write(&custom2{gamma: 1 << ctr}) }

func looper(producer func(ctr int)) {
	for i := 0; i < 10; i++ {
		producer(i)
	}
}

Benchmarks

Benchmarking code available here

Note that if you run the benchmarks with --race flag then ZenQ will perform slower because the --race flag slows down the atomic operations in golang. Under normal circumstances, ZenQ will outperform golang native channels.

Hardware Specs
❯ neofetch
                    'c.          alphadose@ReiEki.local
                 ,xNMM.          ----------------------
               .OMMMMo           OS: macOS 12.3 21E230 arm64
               OMMM0,            Host: MacBookAir10,1
     .;loddo:' loolloddol;.      Kernel: 21.4.0
   cKMMMMMMMMMMNWMMMMMMMMMM0:    Uptime: 6 hours, 41 mins
 .KMMMMMMMMMMMMMMMMMMMMMMMWd.    Packages: 86 (brew)
 XMMMMMMMMMMMMMMMMMMMMMMMX.      Shell: zsh 5.8
;MMMMMMMMMMMMMMMMMMMMMMMM:       Resolution: 1440x900
:MMMMMMMMMMMMMMMMMMMMMMMM:       DE: Aqua
.MMMMMMMMMMMMMMMMMMMMMMMMX.      WM: Rectangle
 kMMMMMMMMMMMMMMMMMMMMMMMMWd.    Terminal: iTerm2
 .XMMMMMMMMMMMMMMMMMMMMMMMMMMk   Terminal Font: FiraCodeNerdFontComplete-Medium 16 (normal)
  .XMMMMMMMMMMMMMMMMMMMMMMMMK.   CPU: Apple M1
    kMMMMMMMMMMMMMMMMMMMMMMd     GPU: Apple M1
     ;KMMMMMMMWXXWMMMMMMMk.      Memory: 1370MiB / 8192MiB
       .cooc,.    .,coo:.

Terminology
  • NUM_WRITERS -> The number of goroutines concurrently writing to ZenQ/Channel
  • INPUT_SIZE -> The number of input payloads to be passed through ZenQ/Channel from producers to consumer
Computed from benchstat of 30 benchmarks each via go test -benchmem -bench=. benchmarks/simple/*.go

name                                     time/op
_Chan_NumWriters1_InputSize600-8          23.4µs ± 1%
_ZenQ_NumWriters1_InputSize600-8          17.7µs ± 0%
_Chan_NumWriters3_InputSize60000-8        5.54ms ± 5%
_ZenQ_NumWriters3_InputSize60000-8        2.63ms ± 2%
_Chan_NumWriters8_InputSize6000000-8       687ms ± 2%
_ZenQ_NumWriters8_InputSize6000000-8       243ms ± 4%
_Chan_NumWriters100_InputSize6000000-8     1.59s ± 4%
_ZenQ_NumWriters100_InputSize6000000-8     296ms ± 2%
_Chan_NumWriters1000_InputSize7000000-8    1.97s ± 0%
_ZenQ_NumWriters1000_InputSize7000000-8    409ms ± 2%
_Chan_Million_Blocking_Writers-8           10.4s ± 4%
_ZenQ_Million_Blocking_Writers-8           1.83s ±10%

name                                     alloc/op
_Chan_NumWriters1_InputSize600-8           0.00B
_ZenQ_NumWriters1_InputSize600-8           0.00B
_Chan_NumWriters3_InputSize60000-8          117B ±63%
_ZenQ_NumWriters3_InputSize60000-8        22.1B ±122%
_Chan_NumWriters8_InputSize6000000-8     1.01kB ±196%
_ZenQ_NumWriters8_InputSize6000000-8      1.12kB ±89%
_Chan_NumWriters100_InputSize6000000-8    42.6kB ±37%
_ZenQ_NumWriters100_InputSize6000000-8    11.3kB ±28%
_Chan_NumWriters1000_InputSize7000000-8    481kB ± 7%
_ZenQ_NumWriters1000_InputSize7000000-8   90.5kB ± 6%
_Chan_Million_Blocking_Writers-8           553MB ± 0%
_ZenQ_Million_Blocking_Writers-8           123MB ± 4%

name                                     allocs/op
_Chan_NumWriters1_InputSize600-8            0.00
_ZenQ_NumWriters1_InputSize600-8            0.00
_Chan_NumWriters3_InputSize60000-8          0.00
_ZenQ_NumWriters3_InputSize60000-8          0.00
_Chan_NumWriters8_InputSize6000000-8       3.43 ±162%
_ZenQ_NumWriters8_InputSize6000000-8        5.23 ±53%
_Chan_NumWriters100_InputSize6000000-8       158 ±20%
_ZenQ_NumWriters100_InputSize6000000-8      26.3 ±29%
_Chan_NumWriters1000_InputSize7000000-8    1.76k ± 2%
_ZenQ_NumWriters1000_InputSize7000000-8     48.3 ±28%
_Chan_Million_Blocking_Writers-8           2.00M ± 0%
_ZenQ_Million_Blocking_Writers-8           1.00M ± 0%

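The above results show that ZenQ is faster than channels in time/op for every tested case, and matches or beats them in mem_alloc/op and num_allocs/op in all but the NumWriters8 case, where ZenQ's figures are slightly higher but within the reported variance. The tested cases are: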

  1. SPSC
  2. MPSC with NUM_WRITER_GOROUTINES < NUM_CPU_CORES
  3. MPSC with NUM_WRITER_GOROUTINES > NUM_CPU_CORES

Cherry on the Cake

In SPSC mode ZenQ is faster than channels by 78 seconds for an input size of 6 * 10^8

❯ go run benchmarks/simple/main.go

With Input Batch Size: 60 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 33.417µs
ZenQ Runner completed transfer in: 19.667µs
====================================================================

With Input Batch Size: 600 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 86.5µs
ZenQ Runner completed transfer in: 54µs
====================================================================

With Input Batch Size: 6000 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 1.144208ms
ZenQ Runner completed transfer in: 842.083µs
====================================================================

With Input Batch Size: 6000000 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 1.122868875s
ZenQ Runner completed transfer in: 377.636334ms
====================================================================

With Input Batch Size: 600000000 and Num Concurrent Writers: 1

Native Channel Runner completed transfer in: 1m51.694234834s
ZenQ Runner completed transfer in: 33.276190167s
====================================================================

Documentation

Index

Constants

const (
	// Both reads and writes are possible
	StateOpen = iota
	// No further writes can be performed and you can only read up to the last committed write in this state
	StateClosedForWrites
	// Neither reads nor writes are possible, queue is fully exhausted
	StateFullyClosed
)

ZenQ global state enums
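To make the three states concrete, here is a minimal sketch of a toy queue that moves through them. It reuses the constant names documented above, but everything else (toyQueue, TryRead, State, the mutex-guarded slice) is an assumption made for illustration and says nothing about how ZenQ itself is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// Same names and order as the documented constants.
const (
	StateOpen = iota
	StateClosedForWrites
	StateFullyClosed
)

// toyQueue is a hypothetical mutex-guarded queue, not ZenQ.
type toyQueue struct {
	mu    sync.Mutex
	items []int
	state int
}

// Write appends v and reports whether the queue was closed for writes.
func (q *toyQueue) Write(v int) (closedForWrites bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.state != StateOpen {
		return true
	}
	q.items = append(q.items, v)
	return false
}

// Close forbids further writes; already written items stay readable.
func (q *toyQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.state != StateOpen {
		return
	}
	if len(q.items) == 0 {
		q.state = StateFullyClosed
	} else {
		q.state = StateClosedForWrites
	}
}

// TryRead pops the oldest item; ok is false when nothing is available.
// Reading the last item of a closed queue moves it to StateFullyClosed.
func (q *toyQueue) TryRead() (v int, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return 0, false
	}
	v, q.items = q.items[0], q.items[1:]
	if q.state == StateClosedForWrites && len(q.items) == 0 {
		q.state = StateFullyClosed
	}
	return v, true
}

// State returns the current state.
func (q *toyQueue) State() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.state
}

func main() {
	q := &toyQueue{}
	q.Write(1)
	q.Write(2)
	q.Close()
	fmt.Println(q.Write(3), q.State() == StateClosedForWrites) // true true
	q.TryRead()
	q.TryRead()
	fmt.Println(q.State() == StateFullyClosed) // true
}
```

The point of the middle state is that closing never discards data: readers can still drain whatever was committed before the close.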

const (
	// Open for being selected
	SelectionOpen = iota
	// Running state
	SelectionRunning
)

ZenQ selector state enums

const (
	SlotEmpty = iota
	SlotBusy
	SlotCommitted
	SlotClosed
)

ZenQ Slot state enums
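As a rough illustration of how per-slot states can coordinate a ring buffer without locks, the sketch below implements a single-producer single-consumer ring using only two states, empty and committed. It is a deliberately simplified assumption, not ZenQ's algorithm: ZenQ supports multiple writers and also has busy and closed states, and every name here (spscRing, TryWrite, TryRead) is hypothetical. It needs Go 1.19 or above for atomic.Uint32.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

const (
	slotEmpty uint32 = iota
	slotCommitted
)

type slot[T any] struct {
	state atomic.Uint32
	item  T
}

// spscRing is safe for exactly one writer goroutine and one reader goroutine.
type spscRing[T any] struct {
	slots []slot[T]
	mask  uint64
	wpos  uint64 // touched only by the producer
	rpos  uint64 // touched only by the consumer
}

// newSPSCRing rounds size up to a power of two so indexing can use a mask.
func newSPSCRing[T any](size uint64) *spscRing[T] {
	n := uint64(1)
	for n < size {
		n <<= 1
	}
	return &spscRing[T]{slots: make([]slot[T], n), mask: n - 1}
}

// TryWrite stores v in the next slot if it is empty; false means the ring is full.
func (q *spscRing[T]) TryWrite(v T) bool {
	s := &q.slots[q.wpos&q.mask]
	if s.state.Load() != slotEmpty {
		return false
	}
	s.item = v
	s.state.Store(slotCommitted) // publish the item to the reader
	q.wpos++
	return true
}

// TryRead takes the next committed item; false means the ring is empty.
func (q *spscRing[T]) TryRead() (T, bool) {
	var zero T
	s := &q.slots[q.rpos&q.mask]
	if s.state.Load() != slotCommitted {
		return zero, false
	}
	v := s.item
	s.item = zero
	s.state.Store(slotEmpty) // hand the slot back to the writer
	q.rpos++
	return v, true
}

func main() {
	q := newSPSCRing[int](4)
	done := make(chan int)
	go func() { // consumer
		sum := 0
		for got := 0; got < 1000; {
			if v, ok := q.TryRead(); ok {
				sum += v
				got++
			} else {
				runtime.Gosched()
			}
		}
		done <- sum
	}()
	for i := 1; i <= 1000; { // producer
		if q.TryWrite(i) {
			i++
		} else {
			runtime.Gosched()
		}
	}
	fmt.Println(<-done) // 500500
}
```

The atomic store of the slot state is what makes the item visible to the other side; the item itself is only ever touched by whichever goroutine currently owns the slot.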

Variables

This section is empty.

Functions

func Fastlog2

func Fastlog2(x float64) float64

func Fastrand

func Fastrand() uint32

func GetG

func GetG() unsafe.Pointer

GetG returns the pointer to the current goroutine defined in the asm files

func Readgstatus

func Readgstatus(gp unsafe.Pointer) uint32

func Select

func Select(streams ...Selectable) (data any, ok bool)

Select selects a single element out of multiple ZenQs. The second return value, ok, is false if all the ZenQs were closed before reading, in which case the data returned is nil.

Types

type List

type List struct {
	// contains filtered or unexported fields
}

List is a lock-free linked list.
Theory: https://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf
Pseudocode: https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html

func NewList

func NewList() List

NewList returns a new list

func (*List) Dequeue

func (l *List) Dequeue() (value *Selection)

Dequeue removes and returns the value at the head of the queue to the memory pool. It returns nil if the list is empty.

func (*List) Enqueue

func (l *List) Enqueue(value *Selection)

Enqueue inserts a value into the list

type SelectFactory

type SelectFactory struct {
	// contains filtered or unexported fields
}

type Selectable

type Selectable interface {
	IsClosed() bool
	EnqueueSelector(*Selection)
	ReadFromBackLog() (data any, ok bool)
	Signal() uint8
}

Selectable is an interface for getting selected among many others

type Selection

type Selection struct {
	ThreadPtr *unsafe.Pointer
	Data      any
	// contains filtered or unexported fields
}

Selection is an object shared by a selector and its children ZenQs. This object is used for selection notification.

func (*Selection) AllQueuesClosed

func (sel *Selection) AllQueuesClosed() (allQueuesClosed bool)

AllQueuesClosed returns whether all the queues present in selection are closed or not

func (*Selection) DecrementReferenceCount

func (sel *Selection) DecrementReferenceCount()

DecrementReferenceCount decrements the reference count by 1 and puts the object back into the pool if it reaches 0

func (*Selection) IncrementReferenceCount

func (sel *Selection) IncrementReferenceCount()

IncrementReferenceCount does exactly what it says

func (*Selection) SignalQueueClosure

func (sel *Selection) SignalQueueClosure() (allQueuesClosed bool)

SignalQueueClosure signals the closure of one ZenQ to the selector thread. It returns whether all queues are now closed, in which case the calling thread must goready() the selector thread.

type Slot

type Slot[T any] struct {
	State       uint32
	WriteParker *ThreadParker[T]
	Item        T
}

type ThreadParker

type ThreadParker[T any] struct {
	// contains filtered or unexported fields
}

ThreadParker is a data structure used for sleeping and waking up goroutines on user call. It is useful for saving resources by parking excess goroutines and waking them when required with minimal latency overhead. It uses the same lock-free linked list implementation as in `list.go`.

func NewThreadParker

func NewThreadParker[T any](n unsafe.Pointer) *ThreadParker[T]

NewThreadParker returns a new thread parker.

func (*ThreadParker[T]) Park

func (tp *ThreadParker[T]) Park(nextNode unsafe.Pointer)

Park parks the current calling goroutine. This keeps only one parked goroutine in state at all times. The parked goroutine is called with minimal overhead via goready() due to both being in userland. This ensures there is no thundering herd: https://en.wikipedia.org/wiki/Thundering_herd_problem
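The idea of waking one waiter at a time, rather than all of them, can be sketched with ordinary channels. The example below is only an analogy under stated assumptions: it uses a mutex and one channel per waiter instead of the lock-free list and runtime hooks (goready) that ThreadParker is documented to use, and the names parker and woken are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// parker is a hypothetical FIFO list of waiters, one channel per waiter.
type parker struct {
	mu      sync.Mutex
	waiters []chan struct{}
}

// Park registers a waiter and returns a channel that is closed when it is woken.
func (p *parker) Park() <-chan struct{} {
	ch := make(chan struct{})
	p.mu.Lock()
	p.waiters = append(p.waiters, ch)
	p.mu.Unlock()
	return ch
}

// Ready wakes exactly one waiter, oldest first, and reports whether one existed.
func (p *parker) Ready() bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.waiters) == 0 {
		return false
	}
	ch := p.waiters[0]
	p.waiters = p.waiters[1:]
	close(ch)
	return true
}

// woken reports, without blocking, whether a waiter has been released.
func woken(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	p := &parker{}
	a, b, c := p.Park(), p.Park(), p.Park()
	p.Ready() // one slot became free, so only one waiter is released
	fmt.Println(woken(a), woken(b), woken(c)) // true false false
}
```

Waking every waiter for a single free slot would make all but one of them go straight back to sleep, which is the wasted work the thundering herd problem refers to.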

func (*ThreadParker[T]) Ready

func (tp *ThreadParker[T]) Ready() (data T, ok bool, freeable *parkSpot[T])

Ready calls one parked goroutine from the queue if available

type ZenQ

type ZenQ[T any] struct {
	SelectFactory
	// contains filtered or unexported fields
}

ZenQ is the CPU cache optimized ringbuffer implementation

func New

func New[T any](size uint64) *ZenQ[T]

New returns a new queue given its payload type passed as a generic parameter

func (*ZenQ[T]) Close

func (self *ZenQ[T]) Close() (alreadyClosedForWrites bool)

Close closes the ZenQ for further writes. You can only read up to the last committed write after closing. This function blocks if the queue is full. ZenQ is closed from a writer goroutine by design, so Close should always be called from a writer goroutine and never from a reader goroutine, which might block the reader and cause a deadlock. It returns whether the queue was already closed for writes.

func (*ZenQ[T]) CloseAsync

func (self *ZenQ[T]) CloseAsync()

CloseAsync closes the channel asynchronously. It is useful when a user wants to close the channel from the reader end without blocking the thread.

func (*ZenQ[T]) Dump

func (self *ZenQ[T]) Dump()

Dump dumps the current queue state. Unsafe to be called from multiple goroutines.

func (*ZenQ[T]) EnqueueSelector

func (self *ZenQ[T]) EnqueueSelector(sel *Selection)

EnqueueSelector pushes a calling selector to this ZenQ's selector waitlist

func (*ZenQ[T]) IsClosed

func (self *ZenQ[T]) IsClosed() (closed bool)

IsClosed returns whether the zenq is closed for both reads and writes

func (*ZenQ[T]) Read

func (self *ZenQ[T]) Read() (data T, queueOpen bool)

Read reads a value from the queue; each object can be read only once.

func (*ZenQ[T]) ReadFromBackLog

func (self *ZenQ[T]) ReadFromBackLog() (data any, ok bool)

ReadFromBackLog tries to read a data from backlog if available

func (*ZenQ[T]) Reset

func (self *ZenQ[T]) Reset()

Reset resets the queue state. This also releases all parked goroutines, if any, and drains all committed writes.

func (*ZenQ[T]) Signal

func (self *ZenQ[T]) Signal() (sig uint8)

Signal is the mechanism by which a selector notifies this ZenQ's auxiliary thread to contest for the selection

func (*ZenQ[T]) Write

func (self *ZenQ[T]) Write(value T) (queueClosedForWrites bool)

Write writes a value to the queue. It reports whether the queue is closed for writes, as the return name queueClosedForWrites indicates. If it is closed for writes it might still be open for reads, which can be checked by calling zenq.IsClosed().

Directories

Path                  Synopsis
benchmarks/selector   command
benchmarks/simple     command
examples/selector     command
examples/simple       command
