marina

package module
v0.0.0-...-7671b10 Latest
Published: Jul 29, 2020 License: MIT Imports: 8 Imported by: 0

README

marina

An abstract library that implements a topic-based publish/subscribe mechanism. It uses the concept of digital twins, which include the interfaces for binding remote service providers, to decouple the library from specific network protocol libraries.

test coverage

  • 100.0% of statements

few dependencies

  1. cabinet (Provides the tree-structured topic manager.)
  2. kademlia (Used for the twin's identity, to support distributed systems.)
  3. bytesutil (Used for the binary codec.)
  4. testify (Used in testing.)
  5. goleak (Used in testing.)
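
To illustrate what a tree-structured topic manager does, here is a minimal sketch that stores subscribers under '/'-separated topic levels. It is not cabinet's API: the names TopicTree, Add, and EntitiesFor (on this type) are hypothetical, entities are plain strings, and wildcards are not handled.

```go
package main

import (
	"fmt"
	"strings"
)

// node is one level of a '/'-separated topic path.
type node struct {
	children map[string]*node
	entities []string // entities registered exactly at this level
}

func newNode() *node { return &node{children: make(map[string]*node)} }

// TopicTree is a hypothetical stand-in for a tree-structured topic manager.
type TopicTree struct{ root *node }

func NewTopicTree() *TopicTree { return &TopicTree{root: newNode()} }

// Add registers an entity under topic, creating missing levels on the way.
func (t *TopicTree) Add(topic, entity string) {
	n := t.root
	for _, level := range strings.Split(topic, "/") {
		child, ok := n.children[level]
		if !ok {
			child = newNode()
			n.children[level] = child
		}
		n = child
	}
	n.entities = append(n.entities, entity)
}

// EntitiesFor returns the entities registered exactly at topic (no wildcards).
func (t *TopicTree) EntitiesFor(topic string) []string {
	n := t.root
	for _, level := range strings.Split(topic, "/") {
		child, ok := n.children[level]
		if !ok {
			return nil
		}
		n = child
	}
	return n.entities
}

func main() {
	tree := NewTopicTree()
	tree.Add("sensors/kitchen/temp", "twin-a")
	tree.Add("sensors/kitchen/temp", "twin-b")
	tree.Add("sensors/garage/temp", "twin-c")
	fmt.Println(tree.EntitiesFor("sensors/kitchen/temp"))    // [twin-a twin-b]
	fmt.Println(len(tree.EntitiesFor("sensors/attic/temp"))) // 0
}
```

Sharing prefixes this way keeps lookups proportional to the number of topic levels rather than the number of registered topics.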

Source of inspiration

At first, the goal was to build a stream-based message-hub service over a peer-to-peer network that does not rely on the MQTT protocol. With the introduction of the twin concept, the library can be completely decoupled from the communication module, so a peer-to-peer network is only one option: the deployment can also be a traditional cluster or a single-point server, and it can use either TCP or UDP. This gives the library greater flexibility and versatility.

Design Architecture Diagram ( pdf // keynote )


% sysctl -a | grep machdep.cpu | grep 'brand_'
machdep.cpu.brand_string: Intel(R) Core(TM) i5-7267U CPU @ 3.10GHz

% go test . -cover -v
=== RUN   TestPacket
--- PASS: TestPacket (0.00s)
=== RUN   TestPublishWorker
--- PASS: TestPublishWorker (0.01s)
=== RUN   TestPublishWorkerForMultipleSubscribe
--- PASS: TestPublishWorkerForMultipleSubscribe (0.01s)
=== RUN   TestSubscribeWorker
--- PASS: TestSubscribeWorker (0.00s)
=== RUN   TestTaskPool
--- PASS: TestTaskPool (0.00s)
=== RUN   TestTwinsPool
--- PASS: TestTwinsPool (0.02s)
PASS
coverage: 100.0% of statements
ok      github.com/TheSmallBoat/marina  0.138s  coverage: 100.0% of statements

% go test -bench=. -benchtime=10s
goos: darwin
goarch: amd64
pkg: github.com/TheSmallBoat/marina
BenchmarkTaskPool-4     34860289               348 ns/op               0 B/op          0 allocs/op
BenchmarkTwinsPool-4    19888471               599 ns/op        2336.85 MB/s          40 B/op          3 allocs/op
PASS
ok      github.com/TheSmallBoat/marina  25.257s

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type MessagePacket

type MessagePacket struct {
	// contains filtered or unexported fields
}

func NewMessagePacket

func NewMessagePacket(pubKadId *kademlia.ID, mid uint32, qos byte, topic []byte, payLoad []byte) *MessagePacket

func UnmarshalMessagePacket

func UnmarshalMessagePacket(buf []byte) (*MessagePacket, error)

func (*MessagePacket) AppendTo

func (mp *MessagePacket) AppendTo(dst []byte) []byte
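
The AppendTo(dst []byte) []byte shape is a common Go convention for encoding into a caller-supplied buffer. The sketch below shows that convention and a matching decoder on a simplified packet. The type packet, its field layout, and the helpers appendBytes, readBytes, and unmarshalPacket are hypothetical; this is not marina's wire format, and it assumes each field is shorter than 65536 bytes.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
)

// packet is a hypothetical, simplified message; not marina's wire format.
type packet struct {
	mid     uint32
	qos     byte
	topic   []byte
	payload []byte
}

// appendBytes writes a 2-byte big-endian length followed by the bytes.
func appendBytes(dst, b []byte) []byte {
	var l [2]byte
	binary.BigEndian.PutUint16(l[:], uint16(len(b)))
	return append(append(dst, l[:]...), b...)
}

// AppendTo appends the encoded packet to dst and returns the extended slice.
func (p *packet) AppendTo(dst []byte) []byte {
	var mid [4]byte
	binary.BigEndian.PutUint32(mid[:], p.mid)
	dst = append(dst, mid[:]...)
	dst = append(dst, p.qos)
	dst = appendBytes(dst, p.topic)
	return appendBytes(dst, p.payload)
}

// readBytes decodes one length-prefixed field and returns the remaining input.
func readBytes(buf []byte) (field, rest []byte, err error) {
	if len(buf) < 2 {
		return nil, nil, errors.New("short buffer: missing length")
	}
	n := int(binary.BigEndian.Uint16(buf))
	buf = buf[2:]
	if len(buf) < n {
		return nil, nil, errors.New("short buffer: truncated field")
	}
	return buf[:n], buf[n:], nil
}

// unmarshalPacket is the inverse of AppendTo. The returned fields alias buf.
func unmarshalPacket(buf []byte) (*packet, error) {
	if len(buf) < 5 {
		return nil, errors.New("short buffer: missing header")
	}
	p := &packet{mid: binary.BigEndian.Uint32(buf), qos: buf[4]}
	var err error
	if p.topic, buf, err = readBytes(buf[5:]); err != nil {
		return nil, err
	}
	if p.payload, _, err = readBytes(buf); err != nil {
		return nil, err
	}
	return p, nil
}

func main() {
	in := &packet{mid: 7, qos: 1, topic: []byte("a/b"), payload: []byte("hello")}
	buf := in.AppendTo(nil) // pass a reused buffer here to avoid allocations
	out, err := unmarshalPacket(buf)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(buf), out.mid, out.qos, string(out.topic)) // 17 7 1 a/b
	fmt.Println(bytes.Equal(in.payload, out.payload))          // true
}
```

Appending to dst lets the caller reuse one buffer across many packets, which is typically why this signature is preferred over returning a fresh slice.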

func (*MessagePacket) Release

func (mp *MessagePacket) Release()
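
A Release method with no arguments and no result often means the value is returned to an object pool for reuse. The source does not confirm that marina works this way, so the following is only a sketch of that general pattern using sync.Pool. The names pooledPacket, packetPool, and acquirePacket are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// pooledPacket is a hypothetical packet type that is recycled through a pool.
type pooledPacket struct {
	topic   []byte
	payload []byte
}

// packetPool hands out recycled packets and allocates one when it is empty.
var packetPool = sync.Pool{New: func() interface{} { return new(pooledPacket) }}

// acquirePacket takes a packet from the pool and copies the inputs into it,
// reusing the packet's existing backing arrays when they are large enough.
func acquirePacket(topic, payload []byte) *pooledPacket {
	p := packetPool.Get().(*pooledPacket)
	p.topic = append(p.topic[:0], topic...)
	p.payload = append(p.payload[:0], payload...)
	return p
}

// Release clears the packet and returns it to the pool.
// The caller must not use the packet after calling Release.
func (p *pooledPacket) Release() {
	p.topic = p.topic[:0]
	p.payload = p.payload[:0]
	packetPool.Put(p)
}

func main() {
	p := acquirePacket([]byte("a/b"), []byte("hello"))
	fmt.Println(string(p.topic), string(p.payload)) // a/b hello
	p.Release()
}
```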

func (*MessagePacket) SetBrokerKadId

func (mp *MessagePacket) SetBrokerKadId(kadId *kademlia.ID)

func (*MessagePacket) SetSubscriberKadId

func (mp *MessagePacket) SetSubscriberKadId(kadId *kademlia.ID)

type PublishWorker

type PublishWorker struct {
	// contains filtered or unexported fields
}

PublishWorker processes the publish message packets that come from producers.

func NewPublishWorker

func NewPublishWorker(bKadId *kademlia.ID, tTree *cabinet.TTree) *PublishWorker

func (*PublishWorker) Close

func (p *PublishWorker) Close()

func (*PublishWorker) EntitiesFor

func (p *PublishWorker) EntitiesFor(topic []byte) []interface{}

func (*PublishWorker) EntitiesNumFor

func (p *PublishWorker) EntitiesNumFor(topic []byte) int

func (*PublishWorker) Wait

func (p *PublishWorker) Wait()

func (*PublishWorker) WorkFor

func (p *PublishWorker) WorkFor(pkt *MessagePacket)
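
The WorkFor, Close, and Wait methods suggest a background worker with an explicit lifecycle. The source does not describe the internals, so this is a minimal sketch of one common way to build such a worker, assuming a single goroutine that drains a channel. The worker type, its string jobs, and the handled field are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical background worker; jobs are plain strings here.
type worker struct {
	jobs    chan string
	wg      sync.WaitGroup
	handled []string // written only by the goroutine, read only after Wait
}

// newWorker starts one goroutine that processes jobs until the channel closes.
func newWorker() *worker {
	w := &worker{jobs: make(chan string, 16)}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for j := range w.jobs {
			w.handled = append(w.handled, j)
		}
	}()
	return w
}

// WorkFor queues a job. It must not be called after Close.
func (w *worker) WorkFor(job string) { w.jobs <- job }

// Close signals that no more jobs will be queued.
func (w *worker) Close() { close(w.jobs) }

// Wait blocks until all queued jobs have been processed.
func (w *worker) Wait() { w.wg.Wait() }

func main() {
	w := newWorker()
	w.WorkFor("a")
	w.WorkFor("b")
	w.Close()
	w.Wait()
	fmt.Println(w.handled) // [a b]
}
```

Calling Close before Wait matters in this sketch: Wait only returns once the channel is closed and drained.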

type SubscribeWorker

type SubscribeWorker struct {
	// contains filtered or unexported fields
}

SubscribeWorker processes the subscribe packets that come from peer nodes.

func NewSubscribeWorker

func NewSubscribeWorker(twp *TwinsPool, tTree *cabinet.TTree) *SubscribeWorker

func (*SubscribeWorker) Close

func (s *SubscribeWorker) Close()

func (*SubscribeWorker) PeerNodeSubscribe

func (s *SubscribeWorker) PeerNodeSubscribe(prd *TwinServiceProvider, qos byte, topic []byte)

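kid: the kadId of the subscribing peer node. The signature above has no kid parameter; the ID is presumably obtained from prd through KadID.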

func (*SubscribeWorker) PeerNodeUnSubscribe

func (s *SubscribeWorker) PeerNodeUnSubscribe(pubK kademlia.PublicKey, qos byte, topic []byte)

func (*SubscribeWorker) Wait

func (s *SubscribeWorker) Wait()

type TwinServiceProvider

type TwinServiceProvider interface {
	KadID() *kademlia.ID
	Push(data []byte) error
}

TwinServiceProvider is the remote service provider bound to a twin.
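
Because the interface only asks for an identity and a Push method, any transport can sit behind it. The sketch below shows an in-memory implementation of an interface with the same shape, which is the kind of stand-in one might use in tests. To stay self-contained it replaces *kademlia.ID with a hypothetical nodeID type, so it does not satisfy the real TwinServiceProvider; the names provider, memoryProvider, and deliver are also hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// nodeID is a hypothetical stand-in for *kademlia.ID.
type nodeID string

// provider mirrors the shape of TwinServiceProvider with the stand-in ID type.
type provider interface {
	KadID() nodeID
	Push(data []byte) error
}

// memoryProvider records pushed data in memory instead of using a network.
type memoryProvider struct {
	id     nodeID
	closed bool
	inbox  [][]byte
}

func (m *memoryProvider) KadID() nodeID { return m.id }

// Push stores a copy of data, or fails if the provider is closed.
func (m *memoryProvider) Push(data []byte) error {
	if m.closed {
		return errors.New("provider closed")
	}
	m.inbox = append(m.inbox, append([]byte(nil), data...))
	return nil
}

// deliver pushes payload to every provider and counts the failed pushes.
func deliver(payload []byte, providers ...provider) (failed int) {
	for _, p := range providers {
		if err := p.Push(payload); err != nil {
			failed++
		}
	}
	return failed
}

func main() {
	a := &memoryProvider{id: "node-a"}
	b := &memoryProvider{id: "node-b", closed: true}
	failed := deliver([]byte("hello"), a, b)
	fmt.Println(a.KadID(), len(a.inbox), failed) // node-a 1 1
}
```

A TCP or UDP backed provider would implement the same two methods, which is how the twin keeps the publish/subscribe logic independent of the transport.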

type TwinsPool

type TwinsPool struct {
	// contains filtered or unexported fields
}

func NewTwinsPool

func NewTwinsPool() *TwinsPool

func (*TwinsPool) Close

func (tp *TwinsPool) Close()
