gosync

package module
v0.0.6
Published: Aug 18, 2024 License: MIT Imports: 8 Imported by: 2

Documentation

Overview

Copyright 2016 The Go Authors. All rights reserved. Use of this source code is governed by a BSD-style license that can be found in the LICENSE file. Inspired by https://github.com/a8m/syncmap and https://github.com/smallnest/gofer/blob/master/syncx/sync_map.go

Constants

This section is empty.

Variables

This section is empty.

Functions

func OnceFunc added in v0.0.5

func OnceFunc(f func()) func()

OnceFunc returns a function wrapping f which ensures f is only executed once even if the returned function is executed multiple times.
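
The behavior described above can be sketched with sync.Once from the standard library. This is a minimal stand-in, not this package's source; the names onceFunc and countCalls are hypothetical and exist only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// onceFunc is a hypothetical stand-in for OnceFunc, built on sync.Once.
func onceFunc(f func()) func() {
	var once sync.Once
	return func() { once.Do(f) }
}

// countCalls invokes the wrapped function n times and reports how often
// the underlying function actually ran.
func countCalls(n int) int {
	calls := 0
	wrapped := onceFunc(func() { calls++ })
	for i := 0; i < n; i++ {
		wrapped()
	}
	return calls
}

func main() {
	fmt.Println(countCalls(3))
}
```

Every call after the first returns without running f again.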

Types

type AtomicDuration added in v0.0.5

type AtomicDuration struct {
	// contains filtered or unexported fields
}

AtomicDuration is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Int64 functions.

func NewAtomicDuration added in v0.0.5

func NewAtomicDuration(duration time.Duration) AtomicDuration

NewAtomicDuration initializes a new AtomicDuration with a given value.

func (*AtomicDuration) Add added in v0.0.5

func (d *AtomicDuration) Add(duration time.Duration) time.Duration

Add atomically adds duration to the value.

func (*AtomicDuration) CompareAndSwap added in v0.0.5

func (d *AtomicDuration) CompareAndSwap(oldval, newval time.Duration) (swapped bool)

CompareAndSwap atomically replaces the value with newval if the current value equals oldval, and reports whether the swap happened.

func (*AtomicDuration) Get added in v0.0.5

func (d *AtomicDuration) Get() time.Duration

Get atomically returns the current value.

func (*AtomicDuration) Set added in v0.0.5

func (d *AtomicDuration) Set(duration time.Duration)

Set atomically sets duration as new value.
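
As a rough illustration of how such a wrapper can sit on top of the sync/atomic Int64 functions, here is a minimal sketch. The type atomicDuration and the helper totalMillis are hypothetical, and the assumption that Add returns the updated value mirrors atomic.AddInt64 rather than anything confirmed about this package.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// atomicDuration is a hypothetical stand-in: a time.Duration kept in an int64.
type atomicDuration struct{ v int64 }

// Add returns the updated value (assumption, mirroring atomic.AddInt64).
func (d *atomicDuration) Add(delta time.Duration) time.Duration {
	return time.Duration(atomic.AddInt64(&d.v, int64(delta)))
}

func (d *atomicDuration) Get() time.Duration { return time.Duration(atomic.LoadInt64(&d.v)) }

func (d *atomicDuration) Set(x time.Duration) { atomic.StoreInt64(&d.v, int64(x)) }

func (d *atomicDuration) CompareAndSwap(oldval, newval time.Duration) bool {
	return atomic.CompareAndSwapInt64(&d.v, int64(oldval), int64(newval))
}

// totalMillis adds one millisecond from each of n goroutines and returns
// the accumulated total in milliseconds.
func totalMillis(n int) int64 {
	var d atomicDuration
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			d.Add(time.Millisecond)
		}()
	}
	wg.Wait()
	return d.Get().Milliseconds()
}

func main() {
	fmt.Println(totalMillis(100))
}
```

Because every update goes through an atomic operation, no increment is lost even though the goroutines run concurrently.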

type CallbackSerializer added in v0.0.5

type CallbackSerializer struct {
	// contains filtered or unexported fields
}

CallbackSerializer provides a mechanism to schedule callbacks in a synchronized manner. It provides a FIFO guarantee on the order of execution of scheduled callbacks. New callbacks can be scheduled by invoking the Schedule() method.

This type is safe for concurrent access.

func NewCallbackSerializer added in v0.0.5

func NewCallbackSerializer(ctx context.Context) *CallbackSerializer

NewCallbackSerializer returns a new CallbackSerializer instance. The provided context will be passed to the scheduled callbacks. Users should cancel the provided context to shutdown the CallbackSerializer. It is guaranteed that no callbacks will be added once this context is canceled, and any pending un-run callbacks will be executed before the serializer is shut down.

func (*CallbackSerializer) Done added in v0.0.5

func (cs *CallbackSerializer) Done() <-chan struct{}

Done returns a channel that is closed after the context passed to NewCallbackSerializer is canceled and all callbacks have been executed.

func (*CallbackSerializer) Schedule added in v0.0.5

func (cs *CallbackSerializer) Schedule(f func(ctx context.Context)) bool

Schedule adds a callback to be scheduled after existing callbacks are run.

Callbacks are expected to honor the context when performing any blocking operations, and should return early when the context is canceled.

The return value indicates whether the callback was added to the list of callbacks to be executed by the serializer. Callbacks cannot be added once the context passed to NewCallbackSerializer is canceled.
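
The scheduling contract above (FIFO order, rejection after cancellation, pending callbacks drained before shutdown) can be sketched with a mutex-guarded queue and one worker goroutine. This is an illustrative stand-in under those assumptions, not the package's implementation; serializer, newSerializer and runInOrder are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// serializer is a hypothetical stand-in for CallbackSerializer.
type serializer struct {
	ctx    context.Context
	mu     sync.Mutex
	queue  []func(context.Context)
	closed bool
	wake   chan struct{} // capacity 1: signals that the queue is non-empty
	done   chan struct{}
}

func newSerializer(ctx context.Context) *serializer {
	s := &serializer{ctx: ctx, wake: make(chan struct{}, 1), done: make(chan struct{})}
	go s.run()
	return s
}

// Schedule appends f to the queue unless the context is already canceled.
func (s *serializer) Schedule(f func(context.Context)) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed || s.ctx.Err() != nil {
		return false
	}
	s.queue = append(s.queue, f)
	select {
	case s.wake <- struct{}{}:
	default: // a wake-up is already pending
	}
	return true
}

func (s *serializer) run() {
	defer close(s.done)
	for {
		select {
		case <-s.wake:
			s.drain()
		case <-s.ctx.Done():
			s.mu.Lock()
			s.closed = true
			s.mu.Unlock()
			s.drain() // run whatever was accepted before cancellation
			return
		}
	}
}

// drain runs queued callbacks one at a time, oldest first.
func (s *serializer) drain() {
	for {
		s.mu.Lock()
		if len(s.queue) == 0 {
			s.mu.Unlock()
			return
		}
		f := s.queue[0]
		s.queue = s.queue[1:]
		s.mu.Unlock()
		f(s.ctx)
	}
}

// runInOrder schedules n callbacks, cancels, and returns the execution order.
func runInOrder(n int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	s := newSerializer(ctx)
	var got []int
	for i := 0; i < n; i++ {
		i := i
		s.Schedule(func(context.Context) { got = append(got, i) })
	}
	cancel()
	<-s.done
	return got
}

func main() {
	fmt.Println(runInOrder(3))
}
```

The callbacks append to got without extra locking because only the worker goroutine runs them, and the caller reads the slice only after done is closed.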

type Group

type Group struct {
	// contains filtered or unexported fields
}

A Group is a collection of goroutines working on subtasks that are part of the same overall task.

A zero Group is valid and does not cancel on error.

Example (JustErrors)

JustErrors illustrates the use of a Group in place of a sync.WaitGroup to simplify goroutine counting and error handling. This example is derived from the sync.WaitGroup example at https://golang.org/pkg/sync/#example_WaitGroup.

var g Group
var urls = []string{
	"http://www.golang.org/",
	"http://www.google.com/",
	"http://www.somestupidname.com/",
}
for _, url := range urls {
	// Launch a goroutine to fetch the URL.
	url := url // https://golang.org/doc/faq#closures_and_goroutines
	g.Go(func(context.Context) error {
		// Fetch the URL.
		resp, err := http.Get(url)
		if err == nil {
			resp.Body.Close()
		}
		return err
	})
}
// Wait for all HTTP fetches to complete.
if err := g.Wait(); err == nil {
	fmt.Println("Successfully fetched all URLs.")
}
Example (Parallel)

Parallel illustrates the use of a Group for synchronizing a simple parallel task: the "Google Search 2.0" function from https://talks.golang.org/2012/concurrency.slide#46, augmented with a Context and error-handling.

Google := func(ctx context.Context, query string) ([]Result, error) {
	g := WithContext(ctx)

	searches := []Search{Web, Image, Video}
	results := make([]Result, len(searches))
	for i, search := range searches {
		i, search := i, search // https://golang.org/doc/faq#closures_and_goroutines
		g.Go(func(context.Context) error {
			result, err := search(ctx, query)
			if err == nil {
				results[i] = result
			}
			return err
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return results, nil
}

results, err := Google(context.Background(), "golang")
if err != nil {
	fmt.Fprintln(os.Stderr, err)
	return
}
for _, result := range results {
	fmt.Println(result)
}
Output:
web result for "golang"
image result for "golang"
video result for "golang"

func WithCancel

func WithCancel(ctx context.Context) *Group

WithCancel creates a new Group and an associated Context derived from ctx.

Functions passed to Go receive a Context derived from ctx. The derived Context is canceled the first time a function passed to Go returns a non-nil error or the first time Wait returns, whichever occurs first.

func WithContext

func WithContext(ctx context.Context) *Group

WithContext creates a Group. Functions passed to Go receive ctx as their context.

func (*Group) GOMAXPROCS

func (g *Group) GOMAXPROCS(n int)

GOMAXPROCS sets the maximum number of goroutines the Group uses to run functions.
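
One common way to cap the number of concurrently running goroutines is a buffered channel used as a semaphore. The sketch below shows that general technique only; whether this package limits goroutines the same way is not stated here, and maxConcurrent is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// maxConcurrent runs n short tasks with at most limit running at once and
// returns the highest number of tasks observed running simultaneously.
func maxConcurrent(n, limit int) int {
	sem := make(chan struct{}, limit) // one slot per allowed goroutine
	var wg sync.WaitGroup
	var mu sync.Mutex
	running, peak := 0, 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit tasks are already running
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the task ends

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(time.Millisecond) // simulated work

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println(maxConcurrent(20, 3) <= 3)
}
```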

func (*Group) Go

func (g *Group) Go(f func(ctx context.Context) error)

Go calls the given function in a new goroutine.

The first call to return a non-nil error cancels the group; its error will be returned by Wait.

func (*Group) Wait

func (g *Group) Wait() error

Wait blocks until all function calls from the Go method have returned, then returns the first non-nil error (if any) from them.
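
The first-error semantics of Go and Wait can be sketched with sync.WaitGroup, sync.Once and a cancelable context. This is a simplified stand-in written for illustration, with hypothetical names (group, withCancel, firstError); it is not the package's source.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// group is a hypothetical, minimal stand-in for Group.
type group struct {
	wg     sync.WaitGroup
	once   sync.Once
	err    error
	ctx    context.Context
	cancel context.CancelFunc
}

func withCancel(ctx context.Context) *group {
	ctx, cancel := context.WithCancel(ctx)
	return &group{ctx: ctx, cancel: cancel}
}

// Go runs f in a new goroutine; the first non-nil error is recorded and
// cancels the shared context.
func (g *group) Go(f func(ctx context.Context) error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(g.ctx); err != nil {
			g.once.Do(func() {
				g.err = err
				g.cancel()
			})
		}
	}()
}

// Wait blocks until every function has returned, then reports the first error.
func (g *group) Wait() error {
	g.wg.Wait()
	g.cancel()
	return g.err
}

// firstError starts one failing task and one task that waits for cancellation.
func firstError() string {
	g := withCancel(context.Background())
	g.Go(func(ctx context.Context) error { return errors.New("boom") })
	g.Go(func(ctx context.Context) error {
		<-ctx.Done() // unblocked once the first task fails
		return ctx.Err()
	})
	return g.Wait().Error()
}

func main() {
	fmt.Println(firstError())
}
```

The second task also returns an error (the context's cancellation error), but only the first recorded error is kept, which is why Wait reports the original failure.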

type Map added in v0.0.2

type Map[K comparable, V any] struct {
	// contains filtered or unexported fields
}

Map is like a Go map[K]V but is safe for concurrent use by multiple goroutines without additional locking or coordination. Loads, stores, and deletes run in amortized constant time.

The Map type is specialized. Most code should use a plain Go map instead, with separate locking or coordination, for better type safety and to make it easier to maintain other invariants along with the map content.

The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex.

The zero Map is empty and ready for use. A Map must not be copied after first use.

In the terminology of the Go memory model, Map arranges that a write operation “synchronizes before” any read operation that observes the effect of the write, where read and write operations are defined as follows. Load, LoadAndDelete, LoadOrStore, Swap, CompareAndSwap, and CompareAndDelete are read operations; Delete, LoadAndDelete, Store, and Swap are write operations; LoadOrStore is a write operation when it returns loaded set to false; CompareAndSwap is a write operation when it returns swapped set to true; and CompareAndDelete is a write operation when it returns deleted set to true.
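
For readers wondering how a generic Map[K, V] relates to the untyped sync.Map in the standard library, one possible design is a thin typed wrapper. The sketch below assumes that design for illustration only; typedMap and lookup are hypothetical names, and the package may well be implemented differently.

```go
package main

import (
	"fmt"
	"sync"
)

// typedMap is a hypothetical typed wrapper around sync.Map.
type typedMap[K comparable, V any] struct{ m sync.Map }

func (t *typedMap[K, V]) Store(key K, value V) { t.m.Store(key, value) }

// Load returns the zero value of V and false when the key is absent.
func (t *typedMap[K, V]) Load(key K) (value V, ok bool) {
	v, ok := t.m.Load(key)
	if !ok {
		return value, false
	}
	value, _ = v.(V) // tolerant of a stored nil interface value
	return value, true
}

func (t *typedMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
	v, loaded := t.m.LoadOrStore(key, value)
	actual, _ = v.(V)
	return actual, loaded
}

// lookup stores one entry and then loads the requested key.
func lookup(key string) (int, bool) {
	var m typedMap[string, int] // the zero value is ready for use
	m.Store("a", 1)
	return m.Load(key)
}

func main() {
	v, ok := lookup("a")
	fmt.Println(v, ok)
	v, ok = lookup("missing")
	fmt.Println(v, ok)
}
```

The type parameters remove the type assertions that callers of sync.Map would otherwise write at every Load.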

func NewMap added in v0.0.3

func NewMap[K comparable, V any]() *Map[K, V]

NewMap returns a new, empty Map that is ready for use.

func (*Map[K, V]) Clear added in v0.0.5

func (m *Map[K, V]) Clear()

Clear removes all entries from the map.

func (*Map[K, V]) Clone added in v0.0.5

func (m *Map[K, V]) Clone() *Map[K, V]

Clone returns a shallow copy of the map.

func (*Map[K, V]) CompareAndDelete added in v0.0.5

func (m *Map[K, V]) CompareAndDelete(key K, old V) (deleted bool)

CompareAndDelete deletes the entry for key if its value is equal to old. The old value must be of a comparable type.

If there is no current value for key in the map, CompareAndDelete returns false (even if the old value is the nil interface value).

func (*Map[K, V]) CompareAndSwap added in v0.0.5

func (m *Map[K, V]) CompareAndSwap(key K, old, new V) bool

CompareAndSwap swaps the old and new values for key if the value stored in the map is equal to old. The old value must be of a comparable type.

func (*Map[K, V]) Delete added in v0.0.2

func (m *Map[K, V]) Delete(key K)

Delete deletes the value for a key.

func (*Map[K, V]) Len added in v0.0.5

func (m *Map[K, V]) Len() int

Len returns the number of entries in the map.

func (*Map[K, V]) Load added in v0.0.2

func (m *Map[K, V]) Load(key K) (value V, ok bool)

Load returns the value stored in the map for a key, or the zero value of V if no value is present. The ok result indicates whether value was found in the map.

func (*Map[K, V]) LoadAndDelete added in v0.0.2

func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool)

LoadAndDelete deletes the value for a key, returning the previous value if any. The loaded result reports whether the key was present.

func (*Map[K, V]) LoadOrStore added in v0.0.2

func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool)

LoadOrStore returns the existing value for the key if present. Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, false if stored.

func (*Map[K, V]) Range added in v0.0.2

func (m *Map[K, V]) Range(f func(key K, value V) bool)

Range calls f sequentially for each key and value present in the map. If f returns false, range stops the iteration.

Range does not necessarily correspond to any consistent snapshot of the Map's contents: no key will be visited more than once, but if the value for any key is stored or deleted concurrently (including by f), Range may reflect any mapping for that key from any point during the Range call. Range does not block other methods on the receiver; even f itself may call any method on m.

Range may be O(N) with the number of elements in the map even if f returns false after a constant number of calls.

func (*Map[K, V]) Store added in v0.0.2

func (m *Map[K, V]) Store(key K, value V)

Store sets the value for a key.

func (*Map[K, V]) Swap added in v0.0.5

func (m *Map[K, V]) Swap(key K, value V) (previous V, loaded bool)

Swap swaps the value for a key and returns the previous value if any. The loaded result reports whether the key was present.

type OnceEvent added in v0.0.5

type OnceEvent struct {
	// contains filtered or unexported fields
}

OnceEvent represents a one-time event that may occur in the future.

func NewOnceEvent added in v0.0.5

func NewOnceEvent() *OnceEvent

NewOnceEvent returns a new, ready-to-use OnceEvent.

func (*OnceEvent) Done added in v0.0.5

func (e *OnceEvent) Done() <-chan struct{}

Done returns a channel that will be closed when Fire is called.

func (*OnceEvent) Fire added in v0.0.5

func (e *OnceEvent) Fire() bool

Fire causes e to complete. It is safe to call multiple times, and concurrently. It returns true iff this call to Fire caused the signaling channel returned by Done to close.

func (*OnceEvent) HasFired added in v0.0.5

func (e *OnceEvent) HasFired() bool

HasFired returns true if Fire has been called.
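
A one-time event with these three methods can be sketched from a channel, a sync.Once and an atomic flag. The code below is an illustrative stand-in, not the package's source; onceEvent, newOnceEvent and fireTwice are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// onceEvent is a hypothetical stand-in for OnceEvent.
type onceEvent struct {
	fired atomic.Bool
	c     chan struct{}
	o     sync.Once
}

func newOnceEvent() *onceEvent { return &onceEvent{c: make(chan struct{})} }

// Fire closes the channel on the first call and reports whether this call
// was the one that closed it.
func (e *onceEvent) Fire() bool {
	first := false
	e.o.Do(func() {
		e.fired.Store(true)
		close(e.c)
		first = true
	})
	return first
}

func (e *onceEvent) Done() <-chan struct{} { return e.c }

func (e *onceEvent) HasFired() bool { return e.fired.Load() }

// fireTwice fires the event twice and reports both results plus HasFired.
func fireTwice() (first, second, fired bool) {
	e := newOnceEvent()
	first = e.Fire()
	second = e.Fire()
	<-e.Done() // does not block: the channel is already closed
	return first, second, e.HasFired()
}

func main() {
	fmt.Println(fireTwice())
}
```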

type Ontime added in v0.0.4

type Ontime struct {
	// contains filtered or unexported fields
}

Ontime provides a mechanism to cache a value for a given duration.

func (*Ontime) Do added in v0.0.4

func (o *Ontime) Do(duration time.Duration, f func())

Do executes f if the cache is empty, and caches the result for duration.

type PointerWithReset added in v0.0.6

type PointerWithReset[T any] interface {
	*T

	Reset()
}

type Pool

type Pool[T any, P PointerWithReset[T]] struct {
	New func() P
	// contains filtered or unexported fields
}

func NewPool

func NewPool[T any, P PointerWithReset[T]](new func() P) *Pool[T, P]

func (*Pool[T, P]) Get

func (p *Pool[T, P]) Get() P

func (*Pool[T, P]) Put

func (p *Pool[T, P]) Put(value P)
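
Pool and PointerWithReset carry no description, so the following is only a guess at the intent: a typed wrapper around sync.Pool whose elements are pointers with a Reset method. The sketch assumes Reset is called when a value is put back; that assumption, and the lowercase names, are illustrative and not confirmed by the package.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// pointerWithReset mirrors the shape of the PointerWithReset constraint:
// P must be *T and must have a Reset method.
type pointerWithReset[T any] interface {
	*T
	Reset()
}

// pool is a hypothetical typed wrapper around sync.Pool.
type pool[T any, P pointerWithReset[T]] struct{ p sync.Pool }

func newPool[T any, P pointerWithReset[T]](newFn func() P) *pool[T, P] {
	return &pool[T, P]{p: sync.Pool{New: func() any { return newFn() }}}
}

func (p *pool[T, P]) Get() P { return p.p.Get().(P) }

// Put resets the value before returning it to the pool (assumption).
func (p *pool[T, P]) Put(v P) {
	v.Reset()
	p.p.Put(v)
}

// reuse writes to a pooled buffer, returns it, and reports the length of
// the next buffer handed out.
func reuse() int {
	p := newPool[bytes.Buffer, *bytes.Buffer](func() *bytes.Buffer {
		return new(bytes.Buffer)
	})
	buf := p.Get()
	buf.WriteString("hello")
	p.Put(buf)
	return p.Get().Len()
}

func main() {
	fmt.Println(reuse())
}
```

*bytes.Buffer satisfies the constraint because it is a pointer type with a Reset method, so callers get a typed Get without a type assertion.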

type PubSub added in v0.0.5

type PubSub struct {
	// contains filtered or unexported fields
}

PubSub is a simple one-to-many publish-subscribe system that supports messages of arbitrary type. It guarantees that messages are delivered in the same order in which they were published.

Publisher invokes the Publish() method to publish new messages, while subscribers interested in receiving these messages register a callback via the Subscribe() method.

Once a PubSub is stopped, no more messages can be published, but any pending published messages will be delivered to the subscribers. Done may be used to determine when all published messages have been delivered.

func NewPubSub added in v0.0.5

func NewPubSub(ctx context.Context) *PubSub

NewPubSub returns a new PubSub instance. Users should cancel the provided context to shutdown the PubSub.

func (*PubSub) Done added in v0.0.5

func (ps *PubSub) Done() <-chan struct{}

Done returns a channel that is closed after the context passed to NewPubSub is canceled and all updates have been sent to subscribers.

func (*PubSub) Publish added in v0.0.5

func (ps *PubSub) Publish(msg any)

Publish publishes the provided message to the PubSub, and invokes callbacks registered by subscribers asynchronously.

func (*PubSub) Subscribe added in v0.0.5

func (ps *PubSub) Subscribe(sub Subscriber) (cancel func())

Subscribe registers the provided Subscriber to the PubSub.

If the PubSub contains a previously published message, the Subscriber's OnMessage() callback will be invoked asynchronously with the existing message to begin with, and subsequently for every newly published message.

The caller is responsible for invoking the returned cancel function to unsubscribe itself from the PubSub.
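
The delivery rules above (ordered delivery, replay of the latest message to new subscribers) can be sketched with a single worker goroutine that drains a queue of deliveries. This is a simplified stand-in with hypothetical names: it uses a fixed-size channel and a Stop method instead of a context, so it is an assumption-laden illustration and not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

type subscriber interface{ OnMessage(msg any) }

// pubSub is a hypothetical, simplified stand-in for PubSub.
type pubSub struct {
	mu   sync.Mutex
	subs map[subscriber]bool
	last any
	has  bool
	work chan func() // deliveries, run in order by one goroutine
	done chan struct{}
}

func newPubSub() *pubSub {
	ps := &pubSub{subs: map[subscriber]bool{}, work: make(chan func(), 64), done: make(chan struct{})}
	go func() {
		defer close(ps.done)
		for deliver := range ps.work {
			deliver()
		}
	}()
	return ps
}

// Subscribe registers s and replays the most recent message, if any.
func (ps *pubSub) Subscribe(s subscriber) (cancel func()) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.subs[s] = true
	if ps.has {
		msg := ps.last
		ps.work <- func() { s.OnMessage(msg) }
	}
	return func() {
		ps.mu.Lock()
		delete(ps.subs, s)
		ps.mu.Unlock()
	}
}

// Publish queues msg for every current subscriber.
func (ps *pubSub) Publish(msg any) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.last, ps.has = msg, true
	for s := range ps.subs {
		s := s
		ps.work <- func() { s.OnMessage(msg) }
	}
}

// Stop ends delivery; Publish must not be called afterwards in this sketch.
func (ps *pubSub) Stop() { close(ps.work) }

// collector records every message it receives.
type collector struct{ got []any }

func (c *collector) OnMessage(msg any) { c.got = append(c.got, msg) }

// demo subscribes one collector early and one late.
func demo() (early, late []any) {
	ps := newPubSub()
	a, b := &collector{}, &collector{}
	ps.Subscribe(a)
	ps.Publish(1)
	ps.Publish(2)
	ps.Subscribe(b) // receives the latest message (2) first
	ps.Publish(3)
	ps.Stop()
	<-ps.done
	return a.got, b.got
}

func main() {
	fmt.Println(demo())
}
```

Because a single goroutine runs every delivery, each subscriber sees messages in publication order, and the late subscriber starts from the most recent message rather than from the beginning.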

type Subscriber added in v0.0.5

type Subscriber interface {
	// OnMessage is invoked when a new message is published. Implementations
	// must not block in this method.
	OnMessage(msg any)
}

Subscriber represents an entity that is subscribed to messages published on a PubSub. It wraps the callback to be invoked by the PubSub when a new message is published.

type Unbounded added in v0.0.5

type Unbounded[T any] struct {
	// contains filtered or unexported fields
}

Unbounded is an implementation of an unbounded buffer which does not use extra goroutines. This is typically used for passing updates from one entity to another within gRPC.

All methods on this type are thread-safe and don't block on anything except the underlying mutex used for synchronization.

Unbounded supports values of any type to be stored in it by using a channel of `any`. This means that a call to Put() incurs an extra memory allocation, and also that users need a type assertion while reading. For performance critical code paths, using Unbounded is strongly discouraged and defining a new type specific implementation of this buffer is preferred. See internal/transport/transport.go for an example of this.

func NewUnbounded added in v0.0.5

func NewUnbounded[T any]() *Unbounded[T]

NewUnbounded returns a new instance of Unbounded.

func (*Unbounded[T]) Close added in v0.0.5

func (b *Unbounded[T]) Close()

Close closes the unbounded buffer. No subsequent data may be Put(), and the channel returned from Get() will be closed after all the data is read and Load() is called for the final time.

func (*Unbounded[T]) Get added in v0.0.5

func (b *Unbounded[T]) Get() <-chan T

Get returns a read channel on which values added to the buffer via Put() are sent.

Upon reading a value from this channel, users are expected to call Load() to send the next buffered value onto the channel if there is any.

If the unbounded buffer is closed, the read channel returned by this method is closed after all data is drained.

func (*Unbounded[T]) Load added in v0.0.5

func (b *Unbounded[T]) Load()

Load sends the earliest buffered data, if any, onto the read channel returned by Get(). Users are expected to call this every time they successfully read a value from the read channel.

func (*Unbounded[T]) Put added in v0.0.5

func (b *Unbounded[T]) Put(t T) error

Put adds t to the unbounded buffer.
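
The Put, Get and Load protocol is easiest to see in a small sketch: a channel of capacity one holds the next value, and a mutex-guarded backlog holds the rest. The code below is an illustrative stand-in with hypothetical names; it omits Close and the error returned by Put, and it is not the package's source.

```go
package main

import (
	"fmt"
	"sync"
)

// unbounded is a hypothetical stand-in for Unbounded.
type unbounded[T any] struct {
	c       chan T // holds at most one value ready for the reader
	mu      sync.Mutex
	backlog []T
}

func newUnbounded[T any]() *unbounded[T] {
	return &unbounded[T]{c: make(chan T, 1)}
}

// Put never blocks: the value goes to the channel if it is free and nothing
// is queued ahead of it, otherwise to the backlog.
func (b *unbounded[T]) Put(t T) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.backlog) == 0 {
		select {
		case b.c <- t:
			return
		default:
		}
	}
	b.backlog = append(b.backlog, t)
}

// Load moves the oldest backlog value onto the channel if there is room.
func (b *unbounded[T]) Load() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.backlog) > 0 {
		select {
		case b.c <- b.backlog[0]:
			b.backlog = b.backlog[1:]
		default:
		}
	}
}

func (b *unbounded[T]) Get() <-chan T { return b.c }

// drain puts n values, then reads them back, calling Load after each read.
func drain(n int) []int {
	b := newUnbounded[int]()
	for i := 0; i < n; i++ {
		b.Put(i)
	}
	out := make([]int, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, <-b.Get())
		b.Load() // refill the channel from the backlog
	}
	return out
}

func main() {
	fmt.Println(drain(3))
}
```

Skipping the Load call after a read would leave later values stuck in the backlog, which is why the reader is expected to call it every time.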
