discovery

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2024 License: GPL-3.0 Imports: 14 Imported by: 0

README

Red Discovery

Red Discovery is a derivative of the RedQueen project that implements simple service registration and discovery on top of RedQueen.

Get red discovery:

go get github.com/RealFax/red-discovery@latest

Note: With Go 1.21 you need to enable GOEXPERIMENT=loopvar; this limitation will be resolved in the near future.

API

Register a service
endpoint := &discovery.Endpoint{
    ID:          "node-1",
    PeerAddress: "localhost:8080",
    Metadata:    nil,
}
// set endpoint ttl
endpoint.SetTTL(1200)

// put endpoint metadata
_ = endpoint.PutMetadata(discovery.NewKVMetadataFromMap(map[string]string{
"name": "super-node-1",
}))

if err := client.Register(nil, naming, endpoint); err != nil {
// handle register error
}
Discover a service
if err := client.Discovery(nil, naming); err != nil {
	// handle discovery error
}
Invoke service
srv, found := client.Service(naming)
if !found {
	// handle service not found error (discovery not called first)
}

conn, err := srv.NextAliveConn()
if err != nil {
	// handle error
}

// invoke grpc api
// after the call is completed, conn.Release() should be called to release the connection
conn.Target()
Service status listener
listenerID, err := client.UseListener(naming, func(ready bool, conn *discovery.GrpcPoolConn, wg *sync.WaitGroup) {
	// handle listener callback
	// you should defer a call to wg.Done
})
if err != nil {
	// handle UseListener error
}

// destroy listener by listener id
client.DestroyListener(naming, listenerID)

For more usage, see Example...

Documentation

Index

Examples

Constants

View Source
// MaxEndpointSize is a package-level size limit with the value 8192.
// NOTE(review): presumably the maximum encoded size of an Endpoint, in
// bytes — confirm against the code that reads it.
const (
	MaxEndpointSize uint64 = 8192
)

Variables

View Source
// Sentinel errors exported by this package. Every message carries the "sdr:"
// prefix.
//
//   - ErrServiceNotExist: the service does not exist.
//   - ErrDiscoveryHasExist: a discovery already exists.
//   - ErrShouldDiscoveryFirst: a discovery has to be performed first.
//   - ErrServiceUnreachable: the service cannot be reached.
var (
	ErrServiceNotExist      = errors.New("sdr: service not existed")
	ErrDiscoveryHasExist    = errors.New("sdr: discovery has existed")
	ErrShouldDiscoveryFirst = errors.New("sdr: should discovery first")
	ErrServiceUnreachable   = errors.New("sdr: service unreachable")
)
View Source
// ErrInvalidEndpointPathFormat reports an endpoint path whose format is
// invalid; its message names ParseEndpointPath as the origin.
var (
	ErrInvalidEndpointPathFormat = errors.New("sdr: ParseEndpointPath invalid endpoint path format")
)

Functions

func AutoKeepAlive

func AutoKeepAlive(ctx context.Context, namespace *string, naming string, client *Client, endpoint *Endpoint) error

func ParseEndpointPath

func ParseEndpointPath(path string) (string, string, error)

Types

type Client

// Client is the handle returned by New and NewWithClient. It embeds
// DiscoveryAndRegister, so that interface's methods can be called directly
// on a Client.
type Client struct {
	// Namespace is an optional namespace; it may be nil.
	// NOTE(review): presumably used as the default when a call passes a nil
	// namespace — confirm against the implementation.
	Namespace *string

	DiscoveryAndRegister
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, endpoints []string, dialOpts ...grpc.DialOption) (*Client, error)

func NewWithClient

func NewWithClient(ctx context.Context, c *client.Client, dialOpts ...grpc.DialOption) *Client

func (*Client) Close

func (c *Client) Close() error

func (*Client) Service

func (c *Client) Service(naming string) (Service, bool)
Example
package main

import (
	"context"
	discovery "github.com/RealFax/red-discovery"
)

const (
	naming = "pkg.discovery.test"
)

var (
	client *discovery.Client
)

// init creates the package-level client from three loopback addresses and
// panics when the client cannot be created.
func init() {
	endpoints := []string{
		"127.0.0.1:5230",
		"127.0.0.1:4230",
		"127.0.0.1:3230",
	}

	var err error
	client, err = discovery.New(context.Background(), endpoints)
	if err != nil {
		panic("init sdr client error, cause: " + err.Error())
	}
}

// main looks up the service registered under naming, obtains a live gRPC
// connection from it and uses that connection.
//
// Both failure branches return early so that execution never continues with
// a service that was not found or a connection that could not be obtained.
func main() {
	srv, found := client.Service(naming)
	if !found {
		// handle service not found error (discovery not called first)
		return
	}

	conn, err := srv.NextAliveConn()
	if err != nil {
		// handle error
		return
	}

	// invoke grpc api
	// after the call is completed, conn.Release() should be called to release the connection
	// NOTE(review): Service.NextAliveConn is documented in this version as
	// returning *grpc.ClientConn, which has no Release method — confirm
	// whether the line above applies to this release.
	conn.Target()
}

func (*Client) SetNamespace

func (c *Client) SetNamespace(namespace string)

type DiscoveryAndRegister

// DiscoveryAndRegister groups the operations for discovering a Naming,
// registering and unregistering endpoints under it, and listening for
// changes in its availability.
type DiscoveryAndRegister interface {
	// ReleaseDiscovery cancels the discovery of a Naming.
	ReleaseDiscovery(naming string)

	// Discovery starts discovering a Naming. It opens a goroutine that keeps
	// discovering the Naming continuously.
	Discovery(namespace *string, naming string) error

	// Unregister removes one or more services, identified by Endpoint ID.
	Unregister(namespace *string, naming string, ids ...string) error

	// Register registers one or more services under a Naming.
	Register(namespace *string, naming string, endpoints ...*Endpoint) error

	// UseListener monitors whether a Naming is available.
	//
	// The callback receives ready == true when services are available for the
	// Naming, and ready == false when no services are available under it.
	// The returned string is the ListenerID to pass to DestroyListener.
	UseListener(naming string, callback ListenCallbackFunc) (string, error)

	// DestroyListener stops listening to a Naming, using the ListenerID
	// returned by UseListener.
	DestroyListener(naming, listenerID string)
}

func NewDiscoveryAndRegister

func NewDiscoveryAndRegister(
	ctx context.Context,
	services *maputil.Map[string, Service],
	c *client.Client,
	dialOpts ...grpc.DialOption,
) DiscoveryAndRegister

type Endpoint

// Endpoint is the value passed to Register to describe a service endpoint.
//
// Its exported fields carry JSON struct tags: ID is encoded as "id",
// PeerAddress as "peer-addr", and Metadata as "metadata" (left out when
// empty). Metadata holds a raw JSON message.
type Endpoint struct {
	ID          string              `json:"id"`
	PeerAddress string              `json:"peer-addr"`
	Metadata    jsoniter.RawMessage `json:"metadata,omitempty"`
	// contains filtered or unexported fields
}

func NewEndpoint

func NewEndpoint(id string, peerAddr string, ttl uint32, md jsoniter.RawMessage) *Endpoint

func ParseEndpoint

func ParseEndpoint(b []byte) (*Endpoint, error)

func (*Endpoint) Expired

func (e *Endpoint) Expired() bool

func (*Endpoint) Key

func (e *Endpoint) Key() string

func (*Endpoint) Marshal

func (e *Endpoint) Marshal() ([]byte, error)

func (*Endpoint) PutMetadata

func (e *Endpoint) PutMetadata(md EndpointMetadata) (err error)

func (*Endpoint) SetTTL

func (e *Endpoint) SetTTL(ttl uint32)

func (*Endpoint) TTL

func (e *Endpoint) TTL() uint32

func (*Endpoint) Value

func (e *Endpoint) Value() *Endpoint

func (*Endpoint) WithNaming

func (e *Endpoint) WithNaming(naming string) string

type EndpointMetadata

// EndpointMetadata is the interface accepted by Endpoint.PutMetadata.
type EndpointMetadata interface {
	// Entry returns the metadata as a raw JSON message, or an error.
	Entry() (jsoniter.RawMessage, error)
}

type KVMetadata

// KVMetadata is a generic key/value metadata container with key type K and
// value type V. Its Entry method has the signature required by
// EndpointMetadata.
type KVMetadata[K comparable, V any] struct {
	// contains filtered or unexported fields
}

func NewKVMetadata

func NewKVMetadata[K comparable, V any]() *KVMetadata[K, V]

func NewKVMetadataFromMap

func NewKVMetadataFromMap[K comparable, V any](m map[K]V) *KVMetadata[K, V]

func (*KVMetadata[K, V]) Clone

func (v *KVMetadata[K, V]) Clone(m map[K]V)

func (*KVMetadata[K, V]) Del

func (v *KVMetadata[K, V]) Del(key K)

func (*KVMetadata[K, V]) Entry

func (v *KVMetadata[K, V]) Entry() (jsoniter.RawMessage, error)

func (*KVMetadata[K, V]) Get

func (v *KVMetadata[K, V]) Get(key K) (V, bool)

func (*KVMetadata[K, V]) Map

func (v *KVMetadata[K, V]) Map() map[K]V

func (*KVMetadata[K, V]) Set

func (v *KVMetadata[K, V]) Set(key K, value V)

type ListenCallbackFunc

// ListenCallbackFunc is the callback type accepted by UseListener.
//
// ready is true when services are available for the Naming and false when
// none are. conn is a gRPC client connection and wg is a wait group; the
// callback is expected to defer a call to wg.Done.
// NOTE(review): which endpoint conn refers to, and whether it can be nil
// when ready is false, is not visible here — confirm.
type ListenCallbackFunc func(ready bool, conn *grpc.ClientConn, wg *sync.WaitGroup)

type Service

// Service represents one Naming together with its endpoints and the gRPC
// connections to them.
type Service interface {
	// Naming returns the service naming.
	Naming() string

	// SetNaming sets the service naming.
	SetNaming(naming string)

	// WithEndpointNaming returns a string derived from endpointID.
	// NOTE(review): presumably the service naming combined with the endpoint
	// ID — confirm against the implementation.
	WithEndpointNaming(endpointID string) string

	// Alive reports whether the service is currently available.
	Alive() bool

	// AddEndpoints adds the given endpoints.
	AddEndpoints(endpoints ...*Endpoint)

	// DelEndpoints deletes the endpoints with the given IDs.
	DelEndpoints(ids ...string)

	// RangeEndpoints calls f for the service's endpoints.
	// NOTE(review): presumably iteration stops when f returns false —
	// confirm against the implementation.
	RangeEndpoints(f func(endpoint *Endpoint) bool)

	// AliveConn returns the internal gRPC connections of all service
	// endpoints.
	//
	// DON'T CLOSE GRPC CONN
	AliveConn() map[string]*grpc.ClientConn

	// NextAliveConn returns an internal gRPC connection selected by the load
	// balancing algorithm.
	//
	// DON'T CLOSE GRPC CONN
	NextAliveConn() (*grpc.ClientConn, error)

	// CloseAliveConn closes all internal gRPC connections.
	CloseAliveConn()

	// DialContext dials a gRPC connection, using the internal round-robin
	// load balancing.
	DialContext(ctx context.Context, opts ...grpc.DialOption) (*grpc.ClientConn, error)

	// DialAll connects to all endpoints.
	DialAll(ctx context.Context, opts ...grpc.DialOption) (map[string]*grpc.ClientConn, error)
}

func NewService

func NewService(ctx context.Context, naming string, dialOpts ...grpc.DialOption) Service

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL