agscheduler

package module
v0.5.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 22, 2024 License: MIT Imports: 20 Imported by: 0

README

AGScheduler

test codecov Go Report Card Go Reference GitHub release (with filter) GitHub go.mod Go version (subdirectory of monorepo) license

Advanced Golang Scheduler (AGScheduler) is a task scheduling library for Golang that supports multiple scheduling types, dynamically changing and persistent jobs, remote call, and cluster

English | 简体中文

Features

  • Supports three scheduling types
    • One-off execution
    • Interval execution
    • Cron-style scheduling
  • Supports multiple job store methods
  • Supports remote call
  • Supports cluster
    • Remote worker nodes
    • Scheduler high availability (Experimental)

Framework

Framework

Usage

package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/kwkwc/agscheduler"
	"github.com/kwkwc/agscheduler/stores"
)

func printMsg(ctx context.Context, j agscheduler.Job) {
	slog.Info(fmt.Sprintf("Run job `%s` %s\n\n", j.FullName(), j.Args))
}

func main() {
	agscheduler.RegisterFuncs(
		agscheduler.FuncPkg{Func: printMsg},
	)

	store := &stores.MemoryStore{}
	scheduler := &agscheduler.Scheduler{}
	scheduler.SetStore(store)

	job1 := agscheduler.Job{
		Name:     "Job1",
		Type:     agscheduler.TYPE_INTERVAL,
		Interval: "2s",
		Timezone: "UTC",
		Func:     printMsg,
		Args:     map[string]any{"arg1": "1", "arg2": "2", "arg3": "3"},
	}
	job1, _ = scheduler.AddJob(job1)
	slog.Info(fmt.Sprintf("%s.\n\n", job1))

	job2 := agscheduler.Job{
		Name:     "Job2",
		Type:     agscheduler.TYPE_CRON,
		CronExpr: "*/1 * * * *",
		Timezone: "Asia/Shanghai",
		FuncName: "main.printMsg",
		Args:     map[string]any{"arg4": "4", "arg5": "5", "arg6": "6", "arg7": "7"},
	}
	job2, _ = s.AddJob(job2)
	slog.Info(fmt.Sprintf("%s.\n\n", job2))

	job3 := agscheduler.Job{
		Name:     "Job3",
		Type:     agscheduler.TYPE_DATETIME,
		StartAt:  "2023-09-22 07:30:08",
		Timezone: "America/New_York",
		Func:     printMsg,
		Args:     map[string]any{"arg8": "8", "arg9": "9"},
	}
	job3, _ = s.AddJob(job3)
	slog.Info(fmt.Sprintf("%s.\n\n", job3))

	jobs, _ := s.GetAllJobs()
	slog.Info(fmt.Sprintf("Scheduler get all jobs %s.\n\n", jobs))

	scheduler.Start()

	select {}
}

Register Funcs

Since golang can't serialize functions, you need to register them with RegisterFuncs before scheduler.Start()

gRPC

// Server
grservice := services.GRPCService{
	Scheduler: scheduler,
	Address:   "127.0.0.1:36360",
}
grservice.Start()

// Client
conn, _ := grpc.Dial("127.0.0.1:36360", grpc.WithTransportCredentials(insecure.NewCredentials()))
client := pb.NewSchedulerClient(conn)
client.AddJob(ctx, job)

HTTP

// Server
hservice := services.HTTPService{
	Scheduler: scheduler,
	Address:   "127.0.0.1:36370",
}
hservice.Start()

// Client
mJob := map[string]any{...}
bJob, _ := json.Marshal(mJob)
resp, _ := http.Post("http://127.0.0.1:36370/scheduler/job", "application/json", bytes.NewReader(bJob))

Cluster

// Main Node
cnMain := &agscheduler.ClusterNode{
	Endpoint:     "127.0.0.1:36380",
	EndpointGRPC: "127.0.0.1:36360",
	EndpointHTTP: "127.0.0.1:36370",
	Queue:        "default",
}
schedulerMain.SetStore(storeMain)
schedulerMain.SetClusterNode(ctx, cnMain)
cserviceMain := &services.ClusterService{Cn: cnMain}
cserviceMain.Start()

// Worker Node
cnNode := &agscheduler.ClusterNode{
	EndpointMain: "127.0.0.1:36380",
	Endpoint:     "127.0.0.1:36381",
	EndpointGRPC: "127.0.0.1:36361",
	EndpointHTTP: "127.0.0.1:36371",
	Queue:        "worker",
}
schedulerNode.SetStore(storeNode)
schedulerNode.SetClusterNode(ctx, cnNode)
cserviceNode := &services.ClusterService{Cn: cnNode}
cserviceNode.Start()

Cluster HA (High Availability, Experimental)


// HA requires the following conditions to be met:
//
// 1. The number of HA nodes in the cluster must be odd
// 2. All HA nodes need to connect to the same Store (excluding `MemoryStore`)
// 3. The `Mode` of the `ClusterNode` needs to be set to `HA`
// 4. The main HA node must be started first

// Main HA Node
cnMain := &agscheduler.ClusterNode{..., Mode: "HA"}

// HA Node
cnNode1 := &agscheduler.ClusterNode{..., Mode: "HA"}
cnNode2 := &agscheduler.ClusterNode{..., Mode: "HA"}

// Worker Node
cnNode3 := &agscheduler.ClusterNode{...}

Base API

gRPC Function HTTP Method HTTP Endpoint
GetInfo GET /info
GetFuncs GET /funcs

Scheduler API

gRPC Function HTTP Method HTTP Endpoint
AddJob POST /scheduler/job
GetJob GET /scheduler/job/:id
GetAllJobs GET /scheduler/jobs
UpdateJob PUT /scheduler/job
DeleteJob DELETE /scheduler/job/:id
DeleteAllJobs DELETE /scheduler/jobs
PauseJob POST /scheduler/job/:id/pause
ResumeJob POST /scheduler/job/:id/resume
RunJob POST /scheduler/job/run
ScheduleJob POST /scheduler/job/schedule
Start POST /scheduler/start
Stop POST /scheduler/stop

Cluster API

gRPC Function HTTP Method HTTP Endpoint
GetNodes GET /cluster/nodes

Examples

Complete example

Development

# Clone code
git clone git@github.com:kwkwc/agscheduler.git

# Working directory
cd agscheduler

# Install dependencies
make install

# Up CI services
make up-ci-services

# Run check
make check-all

Thanks

APScheduler

simple-raft

Documentation

Index

Constants

View Source
const (
	TYPE_DATETIME = "datetime"
	TYPE_INTERVAL = "interval"
	TYPE_CRON     = "cron"
)

constant indicating a job's type

View Source
const (
	STATUS_RUNNING = "running"
	STATUS_PAUSED  = "paused"
)

constant indicating a job's status

View Source
const Version = "0.5.3"

Variables

View Source
var FuncMap = make(map[string]FuncPkg)

Record the actual path of function and the corresponding function. Since golang can't serialize functions, need to register them with `RegisterFuncs` before using it.

View Source
var GetClusterNode = (*Scheduler).getClusterNode
View Source
var GetStore = (*Scheduler).getStore

Functions

func CalcNextRunTime

func CalcNextRunTime(j Job) (time.Time, error)

Calculate the next run time, different job type will be calculated in different ways, when the job is paused, will return `9999-09-09 09:09:09`.

func FuncMapReadable added in v0.5.3

func FuncMapReadable() []map[string]string

func GetNextRunTimeMax added in v0.5.3

func GetNextRunTimeMax() (time.Time, error)

func JobToPbJobPtr added in v0.0.9

func JobToPbJobPtr(j Job) (*pb.Job, error)

Used to gRPC Protobuf

func JobsToPbJobsPtr added in v0.0.9

func JobsToPbJobsPtr(js []Job) (*pb.Jobs, error)

Used to gRPC Protobuf

func RegisterFuncs

func RegisterFuncs(fps ...FuncPkg)

func StateDump added in v0.0.9

func StateDump(j Job) ([]byte, error)

Serialize Job and convert to Bytes

Types

type ClusterNode added in v0.2.0

type ClusterNode struct {
	// Main node RPC listening address.
	// If you are the main, `EndpointMain` is the same as `Endpoint`.
	// Default: `127.0.0.1:36380`
	EndpointMain string
	// The unique identifier of this node.
	// RPC listening address.
	// Used to expose the cluster's internal API.
	// Default: `127.0.0.1:36380`
	Endpoint string
	// gRPC listening address.
	// Used to expose the external API.
	// Default: `127.0.0.1:36360`
	EndpointGRPC string
	// HTTP listening address.
	// Used to expose the external API.
	// Default: `127.0.0.1:36370`
	EndpointHTTP string
	// Useful when a job specifies a queue.
	// A queue can correspond to multiple nodes.
	// Default: `default`
	Queue string
	// Node mode, for Scheduler high availability.
	// If the value is `HA`, the node will join the raft group.
	// Default: “, Options `HA`
	Mode string

	// Bind to each other and the Scheduler.
	Scheduler *Scheduler

	// For Scheduler high availability.
	// Bind to each other and the Raft.
	Raft *Raft
	// Used to mark the status of Cluster Scheduler operation.
	SchedulerCanStart bool
	// contains filtered or unexported fields
}

Each node provides `Cluster RPC`, `gRPC`, `HTTP` services, but only the main node starts the scheduler, the other worker nodes register with the main node and then run jobs from the main node via the RPC's `RunJob` API.

func (*ClusterNode) GetEndpointMain added in v0.4.0

func (cn *ClusterNode) GetEndpointMain() string

func (*ClusterNode) HANodeMap added in v0.3.0

func (cn *ClusterNode) HANodeMap() TypeNodeMap

func (*ClusterNode) IsMainNode added in v0.3.0

func (cn *ClusterNode) IsMainNode() bool

func (*ClusterNode) MainNode added in v0.3.0

func (cn *ClusterNode) MainNode() map[string]any

func (*ClusterNode) NodeMapCopy added in v0.3.0

func (cn *ClusterNode) NodeMapCopy() TypeNodeMap

func (*ClusterNode) NodeMapToPbNodesPtr added in v0.4.0

func (cn *ClusterNode) NodeMapToPbNodesPtr() *pb.Nodes

Used to gRPC Protobuf

func (*ClusterNode) RPCHeartbeat added in v0.4.0

func (cn *ClusterNode) RPCHeartbeat(args *Node, reply *Node)

RPC API

func (*ClusterNode) RPCRegister added in v0.2.0

func (cn *ClusterNode) RPCRegister(args *Node, reply *Node)

RPC API

func (*ClusterNode) RegisterNodeRemote added in v0.2.0

func (cn *ClusterNode) RegisterNodeRemote(ctx context.Context) error

Used for worker node

After initialization, node need to register with the main node and synchronize cluster node information.

func (*ClusterNode) SetEndpointMain added in v0.4.0

func (cn *ClusterNode) SetEndpointMain(endpoint string)

type FuncPkg added in v0.5.3

type FuncPkg struct {
	Func func(context.Context, Job)
	// About this function.
	Info string
}

type FuncUnregisteredError added in v0.0.7

type FuncUnregisteredError string

func (FuncUnregisteredError) Error added in v0.0.7

func (e FuncUnregisteredError) Error() string

type HeartbeatArgs added in v0.3.0

type HeartbeatArgs struct {
	Term           int
	LeaderEndpoint string

	SchedulerCanStart bool
}

type HeartbeatReply added in v0.3.0

type HeartbeatReply struct {
	Term int
}

type Job

type Job struct {
	// The unique identifier of this job, automatically generated.
	// It should not be set manually.
	Id string `json:"id"`
	// User defined.
	Name string `json:"name"`
	// Optional: `TYPE_DATETIME` | `TYPE_INTERVAL` | `TYPE_CRON`
	Type string `json:"type"`
	// It can be used when Type is `TYPE_DATETIME`.
	StartAt string `json:"start_at"`
	// This field is useless.
	EndAt string `json:"end_at"`
	// It can be used when Type is `TYPE_INTERVAL`.
	Interval string `json:"interval"`
	// It can be used when Type is `TYPE_CRON`.
	CronExpr string `json:"cron_expr"`
	// Refer to `time.LoadLocation`.
	// Default: `UTC`
	Timezone string `json:"timezone"`
	// The job actually runs the function,
	// and you need to register it through 'RegisterFuncs' before using it.
	// Since it cannot be stored by serialization,
	// when using gRPC or HTTP calls, you should use `FuncName`.
	Func func(context.Context, Job) `json:"-"`
	// The actual path of `Func`.
	// This field has a higher priority than `Func`
	FuncName string `json:"func_name"`
	// Arguments for `Func`.
	Args map[string]any `json:"args"`
	// The running timeout of `Func`.
	// Default: `1h`
	Timeout string `json:"timeout"`
	// Used in cluster mode, if empty, randomly pick a node to run `Func`.
	Queues []string `json:"queues"`

	// Automatic update, not manual setting.
	LastRunTime time.Time `json:"last_run_time"`
	// Automatic update, not manual setting.
	// When the job is paused, this field is set to `9999-09-09 09:09:09`.
	NextRunTime time.Time `json:"next_run_time"`
	// Optional: `STATUS_RUNNING` | `STATUS_PAUSED`
	// It should not be set manually.
	Status string `json:"status"`
}

Carry the information of the scheduled job

func PbJobPtrToJob added in v0.0.9

func PbJobPtrToJob(pbJob *pb.Job) Job

Used to gRPC Protobuf

func PbJobsPtrToJobs added in v0.0.9

func PbJobsPtrToJobs(pbJs *pb.Jobs) []Job

Used to gRPC Protobuf

func StateLoad added in v0.0.9

func StateLoad(state []byte) (Job, error)

Deserialize Bytes and convert to Job

func (*Job) FullName added in v0.0.9

func (j *Job) FullName() string

func (*Job) LastRunTimeWithTimezone added in v0.0.8

func (j *Job) LastRunTimeWithTimezone() time.Time

func (*Job) NextRunTimeWithTimezone added in v0.0.8

func (j *Job) NextRunTimeWithTimezone() time.Time

func (Job) String

func (j Job) String() string

type JobNotFoundError added in v0.0.6

type JobNotFoundError string

func (JobNotFoundError) Error added in v0.0.6

func (e JobNotFoundError) Error() string

type JobSlice added in v0.1.2

type JobSlice []Job

`sort.Interface`, sorted by 'NextRunTime', ascend.

func (JobSlice) Len added in v0.1.2

func (js JobSlice) Len() int

func (JobSlice) Less added in v0.1.2

func (js JobSlice) Less(i, j int) bool

func (JobSlice) Swap added in v0.1.2

func (js JobSlice) Swap(i, j int)

type JobTimeoutError added in v0.2.1

type JobTimeoutError struct {
	FullName string
	Timeout  string
	Err      error
}

func (*JobTimeoutError) Error added in v0.2.1

func (e *JobTimeoutError) Error() string

type Node added in v0.2.0

type Node struct {
	EndpointMain string
	Endpoint     string
	EndpointGRPC string
	EndpointHTTP string
	Queue        string
	Mode         string

	NodeMap TypeNodeMap
}

type Raft added in v0.3.0

type Raft struct {
	// contains filtered or unexported fields
}

func (*Raft) RPCHeartbeat added in v0.3.0

func (rf *Raft) RPCHeartbeat(args HeartbeatArgs, reply *HeartbeatReply) error

func (*Raft) RPCRequestVote added in v0.3.0

func (rf *Raft) RPCRequestVote(args VoteArgs, reply *VoteReply) error

type Role added in v0.3.0

type Role int
const (
	Follower Role = iota + 1
	Candidate
	Leader
)

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

In standalone mode, the scheduler only needs to run jobs on a regular basis. In cluster mode, the scheduler also needs to be responsible for allocating jobs to cluster nodes.

func (*Scheduler) AddJob

func (s *Scheduler) AddJob(j Job) (Job, error)

func (*Scheduler) DeleteAllJobs added in v0.0.3

func (s *Scheduler) DeleteAllJobs() error

func (*Scheduler) DeleteJob

func (s *Scheduler) DeleteJob(id string) error

func (*Scheduler) GetAllJobs added in v0.0.6

func (s *Scheduler) GetAllJobs() ([]Job, error)

func (*Scheduler) GetJob

func (s *Scheduler) GetJob(id string) (Job, error)

func (*Scheduler) Info added in v0.4.0

func (s *Scheduler) Info() map[string]any

func (*Scheduler) IsClusterMode added in v0.3.0

func (s *Scheduler) IsClusterMode() bool

func (*Scheduler) IsRunning added in v0.3.1

func (s *Scheduler) IsRunning() bool

func (*Scheduler) PauseJob

func (s *Scheduler) PauseJob(id string) (Job, error)

func (*Scheduler) ResumeJob

func (s *Scheduler) ResumeJob(id string) (Job, error)

func (*Scheduler) RunJob added in v0.1.5

func (s *Scheduler) RunJob(j Job) error

func (*Scheduler) ScheduleJob added in v0.2.2

func (s *Scheduler) ScheduleJob(j Job) error

Used in cluster mode. Select a worker node

func (*Scheduler) SetClusterNode added in v0.2.0

func (s *Scheduler) SetClusterNode(ctx context.Context, cn *ClusterNode) error

Bind the cluster node

func (*Scheduler) SetStore

func (s *Scheduler) SetStore(sto Store) error

Bind the store

func (*Scheduler) Start

func (s *Scheduler) Start()

In addition to being called manually, it is also called after `AddJob`.

func (*Scheduler) Stop

func (s *Scheduler) Stop()

In addition to being called manually, there is no job in store that will also be called.

func (*Scheduler) UpdateJob

func (s *Scheduler) UpdateJob(j Job) (Job, error)

type Store

type Store interface {
	// Initialization functions for each store,
	// called when the scheduler run `SetStore`.
	Init() error

	// Add job to this store.
	AddJob(j Job) error

	// Get the job from this store.
	//  @return error `JobNotFoundError` if there are no job.
	GetJob(id string) (Job, error)

	// Get all jobs from this store.
	GetAllJobs() ([]Job, error)

	// Update job in store with a newer version.
	UpdateJob(j Job) error

	// Delete the job from this store.
	DeleteJob(id string) error

	// Delete all jobs from this store.
	DeleteAllJobs() error

	// Get the earliest next run time of all the jobs stored in this store,
	// or `time.Time{}` if there are no job.
	// Used to set the wakeup interval for the scheduler.
	GetNextRunTime() (time.Time, error)

	// Clear all resources bound to this store.
	Clear() error
}

Defines the interface that each store must implement.

type TypeNodeMap added in v0.3.0

type TypeNodeMap map[string]map[string]any

type VoteArgs added in v0.3.0

type VoteArgs struct {
	Term              int
	CandidateEndpoint string
}

type VoteReply added in v0.3.0

type VoteReply struct {
	Term        int
	VoteGranted bool
}

Directories

Path Synopsis
cluster command
grpc command
http command
stores command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL