patrol

v0.0.0-...-00c8f2d
This package is not in the latest version of its module.
Published: Nov 13, 2018 | License: MIT | Imports: 22 | Imported by: 0

README

Patrol

Patrol is a zero-dependency, operator-friendly, distributed rate-limiting HTTP side-car API with eventually consistent, asynchronous state replication. Underneath, it uses a modified version of the Token Bucket algorithm that supports CRDT PN-Counter semantics.

Status

This project is in alpha. Don't use it in production yet.

Design

Patrol is designed to be:

  • Easy to deploy: No dependencies on centralized stores.
  • Operator-friendly: Simple API and small configuration surface area.
  • Performant: Minimal overhead, high concurrency support.
  • Fault tolerant: Eventually consistent, best-effort state synchronization between cluster nodes.

Installation

go get github.com/tsenart/patrol/cmd/...

Deployment

Integration with edge load balancers via Lua

Patrol is meant to be deployed as a side-car to edge load balancers and reverse proxies that support dynamic routing via Lua.

The load balancer or reverse proxy must be extended so that, for every request, it asks its side-car Patrol instance whether to pass or block that request.

Replication

Nodes in the cluster actively replicate state to all other nodes over UDP, broadcasting by unicast. For each Bucket take operation triggered via the HTTP API, one message is sent to each cluster node.

The full Bucket state, which fits in less than 256 bytes, is replicated. Together with its merge semantics, this makes a Bucket a state-based Convergent Replicated Data Type (CvRDT) based on a PN-Counter.
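To make the merge semantics concrete, here is a minimal Go sketch. It assumes a Bucket holds two grow-only counters, tokens added and tokens taken, whose difference is the number of available tokens. It merges them by the "largest value for each field" rule documented for Bucket.Merge. The pnBucket type and its field names are hypothetical, since Patrol's real Bucket fields are unexported.

```go
package main

import "fmt"

// pnBucket is a hypothetical, simplified Bucket: two grow-only counters
// whose difference is the number of available tokens.
type pnBucket struct {
	Added uint64 // tokens ever added by refills; only grows
	Taken uint64 // tokens ever taken; only grows
}

// Tokens returns added minus taken, clamped at zero.
func (b pnBucket) Tokens() uint64 {
	if b.Taken > b.Added {
		return 0
	}
	return b.Added - b.Taken
}

// Merge keeps the per-field maximum. Max is commutative, associative and
// idempotent, so replicas converge regardless of message order or duplicates.
func (b *pnBucket) Merge(others ...pnBucket) {
	for _, o := range others {
		if o.Added > b.Added {
			b.Added = o.Added
		}
		if o.Taken > b.Taken {
			b.Taken = o.Taken
		}
	}
}

func main() {
	local := pnBucket{Added: 10, Taken: 3}
	remote := pnBucket{Added: 12, Taken: 2}
	local.Merge(remote)
	fmt.Println(local.Added, local.Taken, local.Tokens()) // 12 3 9
}
```

A textbook PN-Counter keeps one slot per node and sums them. Collapsing each field to a single maximum, as sketched here, is simpler and keeps the state small, but concurrent takes on different nodes can be undercounted.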

Clock synchronization

While the kind of rate limiting supported by Patrol is time-based (e.g. 100 requests per minute), it does not depend on node clocks being synchronized (e.g. via ntpd).

A Bucket stores the local time at which it was created on a given node, as well as a relative elapsed duration that represents how far the Bucket has advanced in time through successful Takes. Only the elapsed duration is replicated to other nodes, where it is merged with CRDT G-Counter semantics. This value is global across the cluster, but each node adds it to its own local, per-Bucket Created timestamp to compute the time delta between successive Take operations and, hence, the number of tokens to refill over time.

By keeping the Created timestamps local and using only relative time arithmetic, we avoid the need to synchronize clocks across the cluster.

Consistency, Availability, Partition-Tolerance (CAP)

Under a network partition, nodes can't actively replicate Bucket state to nodes on other sides of the partition. As a result, a Bucket's global rate limit is effectively multiplied by the number of sides of the partition, causing temporary policy violations until the partition heals. For example, with a limit of 100 requests per minute and a cluster split in two, up to 200 requests per minute may be accepted. In other words, Patrol fails open under netsplits: each side keeps accepting requests as long as they don't exceed the global rate limit locally, ignoring what's going on in the rest of the cluster.

This is a choice of Availability over Consistency: AP in the CAP theorem.

In the future, it might be interesting to make this trade-off configurable and to instead fail closed under network partitions, thus rejecting all requests.

Cluster discovery
static

With static configuration, specify the ip:port that each cluster node's replicator service is bound to by passing the -cluster-node flag multiple times, once per node.

A config management tool like Ansible is recommended to automate the provisioning of the OS service scripts with this configuration pre-populated.

API

POST /take/:bucket?rate=30:1m&count=1

Takes count tokens from the given :bucket (e.g. an IP address), which is replenished at the given rate. If the bucket doesn't exist, it is created.

If not enough tokens are available, an HTTP 429 Too Many Requests response code is returned. Otherwise, an HTTP 200 OK is returned.

Here are examples of configuration values for the rate parameter:

  • 1:1m: 1 token per minute
  • 100:1s: 100 tokens per second
  • 50:1h: 50 tokens per hour

Testing

go test -v ./...

Future work

  • More comprehensive tests.
  • Load test on a real cluster and iterate on results.
  • Write and publish Docker image.
  • Provide working examples of Lua integrations with nginx and Apache Traffic Server.
  • Instrument with Prometheus.

Documentation

Index

Constants

This section is empty.

Variables

var ErrNameTooLarge = fmt.Errorf("bucket name larger than %d", maxBucketNameLength)

ErrNameTooLarge is returned by Bucket.MarshalBinary if the Bucket's name exceeds the maximum length of 231.

Functions

This section is empty.

Types

type API

type API struct {
	http.Handler
	// contains filtered or unexported fields
}

API implements the Patrol service HTTP API.

func NewAPI

func NewAPI(l *zap.Logger, clock func() time.Time, repo Repo) *API

NewAPI returns a new Patrol API.

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

Bucket implements a simple Token Bucket with underlying CRDT PN-Counter semantics which allow it to be merged without coordination with other Buckets.

func (*Bucket) IsZero

func (b *Bucket) IsZero() bool

IsZero returns true if the Bucket's fields are zero-valued (apart from the Name and Created timestamp).

func (*Bucket) MarshalBinary

func (b *Bucket) MarshalBinary() ([]byte, error)

MarshalBinary implements the encoding.BinaryMarshaler interface.
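The 231 name-length limit is consistent with a fixed 256-byte budget: a one-byte name length plus three 8-byte counters takes 25 bytes, which leaves 231 for the name. The layout in this minimal sketch is an assumption chosen to match that arithmetic, not Patrol's actual wire format. The wireBucket type and its fields are hypothetical.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const maxNameLen = 231 // 256 - 1 (length byte) - 3*8 (counters); assumed layout

var errNameTooLarge = fmt.Errorf("bucket name larger than %d", maxNameLen)

// wireBucket is a hypothetical stand-in for a Bucket's replicated state.
type wireBucket struct {
	Name    string
	Added   uint64
	Taken   uint64
	Elapsed int64 // nanoseconds
}

// MarshalBinary encodes [nameLen:1][Added:8][Taken:8][Elapsed:8][name].
func (b *wireBucket) MarshalBinary() ([]byte, error) {
	if len(b.Name) > maxNameLen {
		return nil, errNameTooLarge
	}
	buf := make([]byte, 25+len(b.Name))
	buf[0] = byte(len(b.Name))
	binary.BigEndian.PutUint64(buf[1:], b.Added)
	binary.BigEndian.PutUint64(buf[9:], b.Taken)
	binary.BigEndian.PutUint64(buf[17:], uint64(b.Elapsed))
	copy(buf[25:], b.Name)
	return buf, nil
}

// UnmarshalBinary is the inverse of MarshalBinary.
func (b *wireBucket) UnmarshalBinary(data []byte) error {
	if len(data) < 25 || len(data) != 25+int(data[0]) {
		return errors.New("malformed bucket")
	}
	b.Added = binary.BigEndian.Uint64(data[1:])
	b.Taken = binary.BigEndian.Uint64(data[9:])
	b.Elapsed = int64(binary.BigEndian.Uint64(data[17:]))
	b.Name = string(data[25:])
	return nil
}

func main() {
	in := wireBucket{Name: "10.0.0.1", Added: 30, Taken: 7, Elapsed: 1e9}
	data, _ := in.MarshalBinary()
	var out wireBucket
	_ = out.UnmarshalBinary(data)
	fmt.Println(len(data), out == in) // 33 true
}
```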

func (*Bucket) MarshalLogObject

func (b *Bucket) MarshalLogObject(enc zapcore.ObjectEncoder) error

MarshalLogObject implements the zapcore.ObjectMarshaler interface.

func (*Bucket) Merge

func (b *Bucket) Merge(others ...*Bucket)

Merge merges the given Buckets into b using PN-Counter CRDT semantics, picking the largest value of each counter field.

func (*Bucket) String

func (b *Bucket) String() string

String implements the fmt.Stringer interface.

func (*Bucket) Take

func (b *Bucket) Take(now time.Time, r Rate, n uint64) (remaining uint64, ok bool)

Take attempts to take n tokens out of the Bucket, which fills at the given Rate, at time now. It returns the number of remaining tokens and whether the take succeeded.

func (*Bucket) Tokens

func (b *Bucket) Tokens() uint64

Tokens returns the number of tokens in the Bucket.

func (*Bucket) UnmarshalBinary

func (b *Bucket) UnmarshalBinary(data []byte) error

UnmarshalBinary implements the encoding.BinaryUnmarshaler interface.

type Command

type Command struct {
	Log             *zap.Logger
	APIAddr         string
	NodeAddr        string
	PeerAddrs       []string
	Clock           func() time.Time // For testing
	ShutdownTimeout time.Duration
}

Command configures and runs a Patrol node. It is used both in tests and by cmd/patrol.

func (*Command) Run

func (c *Command) Run(ctx context.Context) (err error)

Run runs the Command and blocks until completion.

type LocalRepo

type LocalRepo struct {
	// contains filtered or unexported fields
}

A LocalRepo stores Buckets locally in-memory. It's safe for concurrent use.

func NewLocalRepo

func NewLocalRepo(clock func() time.Time, bs ...*Bucket) *LocalRepo

NewLocalRepo returns a new LocalRepo with the given Buckets in it.

func (*LocalRepo) GetBucket

func (r *LocalRepo) GetBucket(_ context.Context, name string) (*Bucket, bool)

GetBucket retrieves a Bucket with the given name, creating it first if it doesn't yet exist.

func (*LocalRepo) UpsertBucket

func (r *LocalRepo) UpsertBucket(_ context.Context, b *Bucket) (upserted *Bucket, ok bool)

UpsertBucket upserts the given Bucket in the Repo. If it already exists, the given Bucket is merged with the stored Bucket.

type Rate

type Rate struct {
	Freq int
	Per  time.Duration
}

Rate defines the maximum frequency of some events. Rate is represented as number of events per unit of time. A zero Rate allows no events.

func ParseRate

func ParseRate(v string) (r Rate, err error)

ParseRate returns a new Rate parsed from the given string.

func (Rate) Interval

func (r Rate) Interval() time.Duration

Interval returns the Rate's interval between events.

func (Rate) IsZero

func (r Rate) IsZero() bool

IsZero returns true if either Freq or Per is zero-valued.

func (Rate) String

func (r Rate) String() string

String implements the fmt.Stringer interface.

func (Rate) Tokens

func (r Rate) Tokens(d time.Duration) float64

Tokens is a unit conversion function from a time duration to the number of tokens which could be accumulated during that duration at the given rate.

type ReplicatedRepo

type ReplicatedRepo struct {
	// contains filtered or unexported fields
}

A ReplicatedRepo stores, retrieves and replicates Buckets across the cluster.

func NewReplicatedRepo

func NewReplicatedRepo(log *zap.Logger, r Repo, addr string, peers []string) (*ReplicatedRepo, error)

NewReplicatedRepo returns a new ReplicatedRepo, backed by the local Repo r, that receives UDP packets on the given addr and sends them to all peers.

func (*ReplicatedRepo) GetBucket

func (r *ReplicatedRepo) GetBucket(ctx context.Context, name string) (*Bucket, bool)

GetBucket gets a Bucket by name from the local Repo. If the Bucket doesn't exist, it is created, and the cluster is asynchronously asked to send its most up-to-date version of the Bucket.

func (*ReplicatedRepo) Receive

func (r *ReplicatedRepo) Receive(ctx context.Context) error

Receive starts receiving and applying Bucket state updates from other peers.

func (*ReplicatedRepo) UpsertBucket

func (r *ReplicatedRepo) UpsertBucket(ctx context.Context, b *Bucket) (upserted *Bucket, ok bool)

UpsertBucket upserts the given Bucket and broadcasts to all nodes in the cluster.

type Repo

type Repo interface {
	GetBucket(ctx context.Context, name string) (*Bucket, bool)
	UpsertBucket(ctx context.Context, b *Bucket) (merged *Bucket, created bool)
}

A Repo creates Buckets and allows for them to be retrieved later. Implementations must be safe for concurrent use.

Directories

Path          Synopsis
cmd/patrol    command
