orcaq

package module
v0.1.4
Warning

This package is not in the latest version of its module.

Published: Aug 2, 2020 License: MIT Imports: 12 Imported by: 0

README

OrcaQ | A Lightweight Durable Worker Queue

Many thanks to the Original Author (@chiefnoah) for this awesome project!

View it here: https://github.com/chiefnoah/goalpost

TODO:

  • convert queue to use nutsdb instead of boltdb

  • add 'schedule' job feature, to schedule jobs into the future

  • add 'repeat' job feature, to repeat the job X number of times

  • add all required methods for managing the job queue

  • add metadata/tags property to jobs

  • add human friendly label to a job

  • update test to reflect changes to package

  • add audit log to job and task state

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AuditMessage added in v0.1.3

type AuditMessage struct {
	Message string
	Time    time.Time
}

An AuditMessage records a single event and the time at which it happened.

type Job

type Job struct {
	Label    string          // human friendly label
	Status   string          // current status of the job. Will always be overwritten.
	ID       []byte          // this will be a UUID, stored as bytes. Will always be overwritten.
	Tasks    []*Task         // the actual tasks that make up this job.
	Deadline time.Time       // Deadline is the time by when the job should be executed. Default is now.
	AuditLog []*AuditMessage // the audit log will contain the full history of the job
}

A Job is one or more Tasks that represent work to be done by the Agent.

func DecodeJob

func DecodeJob(b []byte) (*Job, error)

DecodeJob decodes a gob-encoded byte slice into a Job struct and returns a pointer to it.

func (*Job) Bytes

func (j *Job) Bytes() []byte

Bytes will encode the job to bytes
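
The round trip that Bytes and DecodeJob describe can be sketched with encoding/gob from the standard library. This is a minimal illustration, not the package's implementation: the job type, encodeJob, and decodeJob are hypothetical stand-ins that mirror only a subset of the documented Job fields, and the "queued" status string is an assumed value.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"time"
)

// job is a hypothetical stand-in mirroring a subset of the documented Job fields.
type job struct {
	Label    string
	Status   string
	ID       []byte
	Deadline time.Time
}

// encodeJob gob-encodes j into a byte slice.
func encodeJob(j *job) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(j); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeJob reverses encodeJob and returns a pointer to the decoded job.
func decodeJob(b []byte) (*job, error) {
	var j job
	if err := gob.NewDecoder(bytes.NewReader(b)).Decode(&j); err != nil {
		return nil, err
	}
	return &j, nil
}

func main() {
	// "queued" is an assumed status value used only for illustration.
	in := &job{Label: "nightly-backup", Status: "queued", ID: []byte("abc"), Deadline: time.Now()}
	b, err := encodeJob(in)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	out, err := decodeJob(b)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(out.Label, out.Status)
}
```

Returning an error from the encoder, rather than only a byte slice, is a choice made for this sketch; the documented Bytes method returns only []byte.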

type Queue

type Queue struct {
	//ID is a unique identifier for a Queue
	ID string

	//PollRate is the duration each worker sleeps before checking the
	//queue for jobs again.
	//Default: 500 milliseconds
	PollRate time.Duration
	// contains filtered or unexported fields
}

Queue represents a queue
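
To make the PollRate field concrete, here is a rough sketch of a poll loop driven by a time.Ticker. How orcaq's workers actually wait between checks is not shown in this documentation, so pollLoop and demo are hypothetical names, and the 5 millisecond rate is chosen only to keep the demonstration fast (the documented default is 500 milliseconds).

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollLoop calls check once per pollRate tick until ctx is cancelled.
// It returns the number of times check ran.
func pollLoop(ctx context.Context, pollRate time.Duration, check func()) int {
	ticker := time.NewTicker(pollRate)
	defer ticker.Stop()
	n := 0
	for {
		select {
		case <-ctx.Done():
			return n
		case <-ticker.C:
			// Re-check cancellation so a tick that races with Done is ignored.
			if ctx.Err() != nil {
				return n
			}
			check()
			n++
		}
	}
}

// demo polls until check has run limit times, then cancels the loop.
// limit must be at least 1, otherwise the loop never stops.
func demo(limit int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	seen := 0
	return pollLoop(ctx, 5*time.Millisecond, func() {
		seen++
		if seen == limit {
			cancel()
		}
	})
}

func main() {
	fmt.Println("polls before cancel:", demo(3))
}
```

A shorter poll rate lowers the delay before a job is picked up at the cost of more frequent wake-ups.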

func Init

func Init(filepath string) (*Queue, error)

Init creates a connection to the internal database and initializes the Queue type. filepath must be a valid path to a file, and that file cannot be shared between instances of a Queue. If the file cannot be opened for reading and writing, an error is returned.

func (*Queue) Close

func (q *Queue) Close() error

Close attempts to gracefully shutdown all workers in a queue and shutdown the db connection

func (*Queue) GetJobByID

func (q *Queue) GetJobByID(id []byte) (*Job, error)

GetJobByID returns a pointer to a Job based on the primary key identifier id

func (*Queue) ListJobs added in v0.1.3

func (q *Queue) ListJobs(postJobs bool) ([]*Job, error)

ListJobs will return a list of jobs within the selected queue

func (*Queue) PushJob

func (q *Queue) PushJob(j *Job) ([]byte, error)

PushJob pushes a job to the queue and notifies workers. Job.ID is always overwritten.

func (*Queue) RegisterWorker

func (q *Queue) RegisterWorker(w Worker)

RegisterWorker registers a Worker to handle queued Jobs

func (*Queue) SaveJobState added in v0.1.3

func (q *Queue) SaveJobState(job *Job, ttl uint32) error

SaveJobState will write the job state to disk

type RecoverableWorkerError

type RecoverableWorkerError struct {
	// contains filtered or unexported fields
}

RecoverableWorkerError defines an error that a worker DoWork func can return that indicates the message should be retried

func NewRecoverableWorkerError

func NewRecoverableWorkerError(message string) RecoverableWorkerError

NewRecoverableWorkerError creates a new RecoverableWorkerError

func (RecoverableWorkerError) Error

func (e RecoverableWorkerError) Error() string
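
One way a caller might treat a recoverable error differently from a fatal one is with errors.As and a bounded retry loop. The sketch below is an assumption about usage, not the queue's actual retry logic: recoverableError is a hypothetical stand-in for RecoverableWorkerError, and runWithRetry and errFatal are illustrative names.

```go
package main

import (
	"errors"
	"fmt"
)

// recoverableError is a hypothetical stand-in for RecoverableWorkerError.
type recoverableError struct{ message string }

func (e recoverableError) Error() string { return e.message }

// errFatal represents any error that should not be retried.
var errFatal = errors.New("fatal failure")

// runWithRetry calls work until it succeeds, returns a non-recoverable
// error, or has been retried retryMax times. It returns the number of
// retries performed and the final error, if any.
func runWithRetry(retryMax int, work func(attempt int) error) (int, error) {
	retries := 0
	for {
		err := work(retries)
		if err == nil {
			return retries, nil
		}
		var rec recoverableError
		if !errors.As(err, &rec) || retries >= retryMax {
			return retries, err
		}
		retries++
	}
}

func main() {
	retries, err := runWithRetry(3, func(attempt int) error {
		if attempt < 2 {
			// Wrapping with %w keeps the error detectable by errors.As.
			return fmt.Errorf("attempt %d: %w", attempt, recoverableError{"resource busy"})
		}
		return nil
	})
	fmt.Println(retries, err)
}
```

The retries and retryMax values play the same role as the RetryCount and RetryMax fields documented on Task.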

type Task added in v0.1.3

type Task struct {
	ID         string          // this will be a UUID that is generated at runtime.
	Label      string          // optional human friendly label
	Status     string          // current status of the task
	RetryCount int             // number of times this task has been retried
	RetryMax   int             // number of times this task can be retried before failing hard.
	Cmd        string          // the command that is to be run
	Arg        []string        // any arguments that the command requires
	Stdout     bytes.Buffer    // results of the cmd that would be sent to stdout
	Stderr     bytes.Buffer    // results of the cmd that would be sent to stderr
	ExitCode   int             // exit code of the task
	AuditLog   []*AuditMessage // the audit log will contain all events that are associated with this task
}

A Task is a single unit of work.

func (*Task) Run added in v0.1.3

func (t *Task) Run()

Run will attempt to execute the cmd on the task struct.

type Worker

type Worker interface {
	//Start is called when a worker picks up a job from the queue
	//Context can be used for cancelling jobs early when Close
	//is called on the Queue
	Start(context.Context, *Job) error
	//ID is a semi-unique identifier for a worker.
	ID() string
}

Worker represents a worker for handling Jobs
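
A type satisfying an interface of this shape might look like the sketch below. To stay self-contained it declares local stand-ins (job, worker) that mirror the documented Job and Worker shapes rather than importing the package; printWorker and startWith are hypothetical names introduced for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// job and worker are hypothetical stand-ins mirroring the documented
// Job type and Worker interface.
type job struct{ Label string }

type worker interface {
	Start(context.Context, *job) error
	ID() string
}

// printWorker handles a job by printing its label.
type printWorker struct{ id string }

// Compile-time check that printWorker satisfies worker.
var _ worker = printWorker{}

func (w printWorker) ID() string { return w.id }

func (w printWorker) Start(ctx context.Context, j *job) error {
	// Stop early if the context was cancelled, for example during shutdown.
	if err := ctx.Err(); err != nil {
		return err
	}
	fmt.Printf("worker %s handling job %q\n", w.id, j.Label)
	return nil
}

// startWith runs w.Start with a context that is optionally cancelled first.
func startWith(w worker, j *job, cancelled bool) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelled {
		cancel()
	}
	return w.Start(ctx, j)
}

func main() {
	w := printWorker{id: "w1"}
	j := &job{Label: "nightly-backup"}
	fmt.Println("live context error:", startWith(w, j, false))
	fmt.Println("cancelled context error:", startWith(w, j, true))
}
```

With the real package, a value like this would be handed to RegisterWorker, and its Start method would receive *Job values from the queue.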
