wts

Version: v0.0.0-...-448db01
Published: Apr 15, 2022 License: MIT Imports: 12 Imported by: 1

README

Websub Transport System

Websub transport system (WTS) is a janky data communication library written in Go. It transports Go structs over websub as JSON objects, along with some metadata: the date and time sent, the event type, and the sender.

Importing

Go version 1.18+ is required.

go get github.com/notnotquinn/wts

What does it do?

WTS sends Go structs as JSON over websub and has three major parts: Node, the Actor interface, and the Emitter interface.

A node is a host for Actors and Emitters, and is the unit of communication. Nodes communicate with each other based on the needs of their Actors and Emitters.

An Emitter just sends events, and other nodes can subscribe to those "data" events.

An Actor listens for action "request" events. When a request is received, the node calls the actor's callback and, if the callback succeeds, sends an "executed" event. If the callback does not complete successfully (or the request is denied), the executed event is not sent.
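The request flow described above can be sketched without the library. The `request` type, the `lamp` actor, and `handleRequest` are hypothetical names made up for this illustration; only the ShouldAct/Act split comes from the documented Actor interface.

```go
package main

import "fmt"

// request is a stand-in for the payload an actor receives.
// It is NOT the package's EventPayload type.
type request struct {
	Sender string
	Data   string
}

// actor mirrors the two decision methods of the Actor interface, minus generics.
type actor interface {
	ShouldAct(msg *request) bool
	Act(msg *request) bool
}

// lamp is a hypothetical actor that only accepts "on" and "off".
type lamp struct{ on bool }

func (l *lamp) ShouldAct(msg *request) bool {
	return msg.Data == "on" || msg.Data == "off"
}

func (l *lamp) Act(msg *request) bool {
	l.on = msg.Data == "on"
	return true
}

// handleRequest returns the event type to publish, or "" when nothing is sent.
func handleRequest(a actor, msg *request) string {
	if !a.ShouldAct(msg) {
		return "" // request denied: no executed event
	}
	if !a.Act(msg) {
		return "" // action failed: no executed event
	}
	return "executed"
}

func main() {
	l := &lamp{}
	sender := "https://example.com/switch"
	fmt.Printf("%q\n", handleRequest(l, &request{Sender: sender, Data: "on"}))  // prints "executed"
	fmt.Printf("%q\n", handleRequest(l, &request{Sender: sender, Data: "dim"})) // prints ""
}
```

How the real Node wires this up internally is not shown in this documentation, so treat the control flow as an approximation.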

Actors and Emitters are identified by their "Entity URLs", which an emitter and an actor may share. An Entity URL has three Event URLs associated with it, one for each event type (request, executed, and data). These Event URLs are the websub topics that are published and subscribed to. To subscribe to a remote node's events, you must know the Entity URL and whether the entity is an emitter, an actor, or both. You must also provide a Go struct to unmarshal the data into (or you may provide interface{}).
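As a rough illustration of the Entity URL to Event URL relationship, the sketch below derives the three topics by appending the event type as a path suffix. `eventURLs` is a hypothetical helper, and the exact construction is an assumption based on the documented /data, /request, and /executed suffixes; the library may build these URLs differently.

```go
package main

import (
	"fmt"
	"strings"
)

// EventType repeats the documented string type so the sketch compiles on its own.
type EventType string

const (
	Request  EventType = "request"
	Executed EventType = "executed"
	Data     EventType = "data"
)

// eventURLs is a hypothetical helper: it assumes each Event URL is the
// Entity URL followed by "/" and the event type.
func eventURLs(entityURL string) map[EventType]string {
	base := strings.TrimSuffix(entityURL, "/")
	return map[EventType]string{
		Request:  base + "/" + string(Request),
		Executed: base + "/" + string(Executed),
		Data:     base + "/" + string(Data),
	}
}

func main() {
	urls := eventURLs("https://example.com/thermostat")
	for _, t := range []EventType{Request, Executed, Data} {
		fmt.Println(t, "->", urls[t])
	}
}
```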

The only current working example is in ./test/, if you would like to have a look. It's quite janky.

Documentation

Constants

const (
	// websub Content-Type used for event payloads.
	PayloadContentType string = "application/vnd.wts-event-payload.v1+json"
)

Variables

var (
	ErrMismatchedTypeHash = errors.New("mismatched type hash while decoding EventPayload")
)

Functions

func AddActor

func AddActor[MsgType any](node *Node, a Actor[MsgType]) error

func AddActorHook

func AddActorHook[MsgType any](
	node *Node,
	actorURL string,
	onRequested func(eventURL string, msg *EventPayload[MsgType]),
	onExecuted func(eventURL string, msg *EventPayload[MsgType]),
) (broadcastRequest func(msg MsgType) error, err error)

func AddEmitter

func AddEmitter[MsgType any](node *Node, e Emitter[MsgType]) error

func AddEmitterHook

func AddEmitterHook[MsgType any](
	node *Node,
	actorURL string,
	onData OnEventFunc[MsgType],
) (broadcastData func(msg MsgType) error, err error)

func EncodeMessage

func EncodeMessage[MsgType any](
	msg MsgType,
	eventType EventType,
	sender string,
) ([]byte, error)

EncodeMessage encodes a message to JSON.

func NewEncoderProxy

func NewEncoderProxy[MsgType any]() *encoderProxy

Types

type Actor

type Actor[MsgType any] interface {
	// Returns a human-readable, URL safe name for this actor. (can contain slashes)
	Name() string
	// ShouldAct returns whether the Act method
	// should be called for this message.
	ShouldAct(msg *EventPayload[MsgType]) (ok bool)
	// Act performs an action, and returns whether the
	// action was completed successfully.
	Act(msg *EventPayload[MsgType]) (ok bool)
}

Actor performs an action on behalf of the node, which gets requests from other services.

If you would like to trigger the actor programmatically please do it through the Node the actor uses, so the proper events can be fired.

func NewFuncActor

func NewFuncActor[MsgType any](
	name string,
	shouldAct IndicatorFunc[MsgType],
	act IndicatorFunc[MsgType],
) Actor[MsgType]

NewFuncActor creates a new actor that calls the passed functions for its methods.

type BasicEmitter

type BasicEmitter[MsgType any] struct {
	EmitterName string
	DataChannel <-chan MsgType
}

BasicEmitter is a basic implementation of an Emitter.

func (*BasicEmitter[MsgType]) DataEvents

func (e *BasicEmitter[MsgType]) DataEvents() <-chan MsgType

DataEvents returns a channel that passes dataEvents to be published.

func (*BasicEmitter[MsgType]) Name

func (e *BasicEmitter[MsgType]) Name() string

Name returns a human-readable constant name for this emitter.

type Emitter

type Emitter[MsgType any] interface {
	// Returns a human-readable constant name for this emitter.
	Name() string
	// Returns a channel that passes dataEvents to be published.
	DataEvents() <-chan MsgType
}

An Emitter is a data source for a Node to publish data events about.
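A custom emitter only needs the two methods above. The sketch below repeats the interface locally so it compiles without the library; `sliceEmitter` and `drain` are hypothetical names, and `drain` merely stands in for whatever the Node does with the channel.

```go
package main

import "fmt"

// Emitter repeats the documented interface so the sketch is self-contained.
type Emitter[MsgType any] interface {
	Name() string
	DataEvents() <-chan MsgType
}

// sliceEmitter is a hypothetical emitter that replays a fixed list of values.
type sliceEmitter[MsgType any] struct {
	name string
	ch   chan MsgType
}

func newSliceEmitter[MsgType any](name string, values []MsgType) *sliceEmitter[MsgType] {
	ch := make(chan MsgType, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch) // closing the channel ends the consumer's range loop
	return &sliceEmitter[MsgType]{name: name, ch: ch}
}

func (e *sliceEmitter[MsgType]) Name() string               { return e.name }
func (e *sliceEmitter[MsgType]) DataEvents() <-chan MsgType { return e.ch }

// drain stands in for the node: it reads until the channel is closed
// and hands each value to publish.
func drain[MsgType any](e Emitter[MsgType], publish func(name string, msg MsgType)) {
	for msg := range e.DataEvents() {
		publish(e.Name(), msg)
	}
}

func main() {
	e := newSliceEmitter("thermostat", []float64{20.5, 21, 21.5})
	drain[float64](e, func(name string, msg float64) {
		fmt.Println(name, msg)
	})
}
```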

func NewBasicEmitter

func NewBasicEmitter[MsgType any](name string, ch <-chan MsgType) Emitter[MsgType]

NewBasicEmitter creates and returns a new BasicEmitter as Emitter[MsgType]

type EventPayload

type EventPayload[MsgType any] struct {
	// The event data.
	Data      MsgType   `json:"data"`
	DateSent  time.Time `json:"dateSent"`
	EventType EventType `json:"eventType"`
	Sender    string    `json:"sender"`
}

EventPayload is the message that is sent over websub.
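To show what the struct tags above produce on the wire, here is a minimal sketch that marshals a local copy of the struct with encoding/json. `Reading` and `encodeExample` are hypothetical names for this illustration. The real EncodeMessage may add or check more than this (ErrMismatchedTypeHash suggests a type hash is involved), so treat the output as the shape of the documented fields only.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type EventType string

// EventPayload is a local copy of the documented struct, tags included.
type EventPayload[MsgType any] struct {
	Data      MsgType   `json:"data"`
	DateSent  time.Time `json:"dateSent"`
	EventType EventType `json:"eventType"`
	Sender    string    `json:"sender"`
}

// Reading is a hypothetical message type.
type Reading struct {
	Celsius float64 `json:"celsius"`
}

// encodeExample marshals a fixed payload and decodes it again as a round trip.
func encodeExample() (string, error) {
	p := EventPayload[Reading]{
		Data:      Reading{Celsius: 21.5},
		DateSent:  time.Date(2022, time.April, 15, 12, 0, 0, 0, time.UTC),
		EventType: "data",
		Sender:    "https://example.com/thermostat",
	}
	b, err := json.Marshal(p)
	if err != nil {
		return "", err
	}
	var back EventPayload[Reading]
	if err := json.Unmarshal(b, &back); err != nil {
		return "", err
	}
	if back.Data != p.Data {
		return "", fmt.Errorf("round trip changed data: %+v", back.Data)
	}
	return string(b), nil
}

func main() {
	out, err := encodeExample()
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
	// prints {"data":{"celsius":21.5},"dateSent":"2022-04-15T12:00:00Z","eventType":"data","sender":"https://example.com/thermostat"}
}
```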

func DecodeMessage

func DecodeMessage[MsgType any](bytes []byte) (*EventPayload[MsgType], error)

DecodeMessage decodes a message from JSON.

func (*EventPayload[MsgType]) CopyToAny

func (e *EventPayload[MsgType]) CopyToAny() *EventPayload[any]

CopyToAny creates a new copy of e with the [any] type parameter.
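The effect can be approximated with a field-by-field copy. In this sketch `copyToAny` is a hypothetical free function over a simplified local struct, not the library's method, and it assumes a shallow copy.

```go
package main

import (
	"fmt"
	"time"
)

// EventPayload is a simplified local copy of the documented struct.
type EventPayload[MsgType any] struct {
	Data      MsgType
	DateSent  time.Time
	EventType string
	Sender    string
}

// copyToAny is a hypothetical equivalent of CopyToAny: same fields,
// but Data is stored as any. The copy is shallow.
func copyToAny[MsgType any](e *EventPayload[MsgType]) *EventPayload[any] {
	return &EventPayload[any]{
		Data:      e.Data,
		DateSent:  e.DateSent,
		EventType: e.EventType,
		Sender:    e.Sender,
	}
}

func main() {
	typed := &EventPayload[int]{Data: 42, EventType: "data", Sender: "https://example.com/counter"}
	untyped := copyToAny(typed)

	// A type assertion recovers the concrete value on the receiving side.
	n, ok := untyped.Data.(int)
	fmt.Println(n, ok) // prints 42 true
}
```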

type EventType

type EventType string

An EventType is associated with an EventPayload and tells the receiver what type of event the payload is.

const (
	// A request event is received by the node when
	// another service requested the action to be performed
	//
	// The node consults the actor and performs the action,
	// once the action is complete the node will send an
	// executed event.
	Request EventType = "request"
	// An 'executed' event is sent by the node when the
	// action is performed successfully.
	Executed EventType = "executed"
	// A data event is sent by the node when an emitter
	// emits a data event locally.
	Data EventType = "data"
)

func ParseEventURL

func ParseEventURL(eventURL string) (entityURL string, eventType EventType, err error)

ParseEventURL parses the event type and entity URL out of an Event URL.

An Event URL is any URL that ends with /data, /request, or /executed.
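A minimal sketch of that rule, using only the standard library: `splitEventURL` and `errNotEventURL` are hypothetical names, and the real ParseEventURL may validate more (or less) than this.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

type EventType string

const (
	Request  EventType = "request"
	Executed EventType = "executed"
	Data     EventType = "data"
)

// errNotEventURL is a hypothetical sentinel for this sketch.
var errNotEventURL = errors.New("not an event URL")

// splitEventURL cuts the URL at its last slash and accepts it only if the
// final segment is one of the three known event types.
func splitEventURL(eventURL string) (entityURL string, eventType EventType, err error) {
	i := strings.LastIndex(eventURL, "/")
	if i < 0 {
		return "", "", errNotEventURL
	}
	switch t := EventType(eventURL[i+1:]); t {
	case Request, Executed, Data:
		return eventURL[:i], t, nil
	}
	return "", "", errNotEventURL
}

func main() {
	entity, typ, err := splitEventURL("https://example.com/lamp/executed")
	fmt.Println(entity, typ, err) // prints https://example.com/lamp executed <nil>

	_, _, err = splitEventURL("https://example.com/lamp")
	fmt.Println(err) // prints not an event URL
}
```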

type FuncActor

type FuncActor[MsgType any] struct {
	ActorName     string
	ShouldActFunc IndicatorFunc[MsgType]
	ActFunc       IndicatorFunc[MsgType]
}

FuncActor is a basic implementation of an actor.

func (FuncActor[MsgType]) Act

func (a FuncActor[MsgType]) Act(msg *EventPayload[MsgType]) (ok bool)

Act performs an action, and returns whether the action was completed successfully.

func (FuncActor[MsgType]) Name

func (a FuncActor[MsgType]) Name() string

Name returns a human-readable name for this actor.

func (FuncActor[MsgType]) ShouldAct

func (a FuncActor[MsgType]) ShouldAct(msg *EventPayload[MsgType]) (ok bool)

ShouldAct returns whether the Act method should be called for this message.

type IndicatorFunc

type IndicatorFunc[MsgType any] func(msg *EventPayload[MsgType]) (ok bool)

IndicatorFunc indicates something in relation to an event payload.

type Node

type Node struct {
	// websub publisher used to publish websub events for communication
	*websub.Publisher
	// websub subscriber used to listen for websub events for communication
	*websub.Subscriber
	// contains filtered or unexported fields
}

Node acts as a GenericActor or a GenericEmitter or both.

func NewNode

func NewNode(baseURL, hubURL string, options ...NodeOption) *Node

NewNode creates a new node with the provided options.

func (*Node) BaseURL

func (n *Node) BaseURL() string

func (*Node) Broadcast

func (n *Node) Broadcast(eventURL string, msgData any) error

func (*Node) BroadcastAny

func (n *Node) BroadcastAny(eventURL string, msgData any) error

BroadcastAny does not perform type checks

func (*Node) ServeHTTP

func (n *Node) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*Node) SubscribeAll

func (n *Node) SubscribeAll() error

SubscribeAll subscribes to topics required by the node to function.

func (*Node) UnsubscribeAll

func (n *Node) UnsubscribeAll() error

UnsubscribeAll removes all required subscriptions for the node

type NodeOption

type NodeOption func(n *Node)

func WithPublisherOptions

func WithPublisherOptions(opts ...websub.PublisherOption) NodeOption

WithPublisherOptions defaults to

[]websub.PublisherOption{
  websub.PAdvertiseInvalidTopics(true),
  websub.PWithPostBodyAsContent(true),
}

Defaults take precedence over options set here, and the base URL is set in relation to the Node's baseURL.
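One way "defaults take precedence" can come about with functional options is to apply the caller's options first and the defaults last, so the defaults overwrite anything they touch. The sketch below demonstrates that ordering with hypothetical types (`config`, `option`, `build`); whether the library does it this way is an assumption.

```go
package main

import "fmt"

// config is a hypothetical stand-in for publisher settings.
type config struct {
	advertiseInvalidTopics bool
	postBodyAsContent      bool
	label                  string
}

type option func(*config)

func withAdvertiseInvalidTopics(v bool) option {
	return func(c *config) { c.advertiseInvalidTopics = v }
}

func withPostBodyAsContent(v bool) option {
	return func(c *config) { c.postBodyAsContent = v }
}

func withLabel(s string) option {
	return func(c *config) { c.label = s }
}

// build applies caller options first and defaults last. The last writer
// wins, so a default overrides a conflicting caller option, while caller
// options the defaults do not touch are kept.
func build(userOpts ...option) config {
	defaults := []option{
		withAdvertiseInvalidTopics(true),
		withPostBodyAsContent(true),
	}
	var c config
	for _, o := range userOpts {
		o(&c)
	}
	for _, o := range defaults {
		o(&c)
	}
	return c
}

func main() {
	c := build(withAdvertiseInvalidTopics(false), withLabel("custom"))
	fmt.Println(c.advertiseInvalidTopics, c.postBodyAsContent, c.label) // prints true true custom
}
```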

func WithSubscriberOptions

func WithSubscriberOptions(
	opts ...websub.SubscriberOption,
) NodeOption

WithSubscriberOptions defaults to

[]websub.SubscriberOption{}

Defaults take precedence over options set here, and the base URL is set in relation to the Node's baseURL.

type OnEventFunc

type OnEventFunc[MsgType any] func(eventURL string, msg *EventPayload[MsgType])

Directories

Path: test (command)
