Documentation
¶
Overview ¶
Package tasker is a light distribute producer&consumer task model based on beego.
Task ¶
the main description of message or job. the state machine like this:
+----> failed
|
pending ----+----> running -----+----> success
^ |
| v
+---------------- retry
Index ¶
- Constants
- Variables
- func Consume(topic string, fn ConsumeFn, concurency ...int) (int, error)
- func FQDN() string
- func Init(MachineID func() (uint16, error), CheckMachineID func(uint16) bool) (err error)
- func InitAllTask()
- func InitIDGEN(MachineID func() (uint16, error), CheckMachineID func(uint16) bool) error
- func MsgQConsume(m MsgQ) error
- func MsgQInitTask(m MsgQ)
- func MsgQPublish(m MsgQ) error
- func MsgQPublishWithRetry(m MsgQ, timeout time.Duration, retry int) error
- func RegisterModel()
- func RegisterTask(item MsgQ)
- func SetStats(s Stats)
- func Stat(topic, status string, duration time.Duration)
- type ConsumeFn
- type Core
- type MsgQ
- type Stats
- type Task
Constants ¶
const ( TaskStatPending = "pending" TaskStatRunning = "running" TaskStatRetry = "retry" TaskStatFailed = "failed" TaskStatSuccess = "success" )
Variables ¶
var InstanceID uint16
InstanceID is the tasker instance uniq key.
var IsMaster bool
IsMaster when true, the instance is master instance.
var RegisteredTask map[string]MsgQ
RegisteredTask is a map record all consumer task
var UniqID *sonyflake.Sonyflake
UniqID use for generate worker id.
Functions ¶
func FQDN ¶
func FQDN() string
FQDN Get Fully Qualified Domain Name returns "unknown" or hostanme in case of error
func Init ¶
Init will initialize the tasker instance, include:
- generate the InstanceID use MachineID func, use instance private ip address when MachineID is nil
- start race master in goroutine
- initialize all task
func InitAllTask ¶
func InitAllTask()
InitAllTask will init all RegisteredTask to beego toolbox, this will start consume task
func MsgQInitTask ¶
func MsgQInitTask(m MsgQ)
MsgQInitTask add consume task to beego toolbox task, run consumer interval.
func MsgQPublishWithRetry ¶
MsgQPublishWithRetry represents publish with retry and timeout set
func RegisterModel ¶
func RegisterModel()
func RegisterTask ¶
func RegisterTask(item MsgQ)
RegisterTask is used for consumer task at init func, register them self to the tasker
Types ¶
type Core ¶
type Core struct {
Id int
MasterInstanceID uint16 `orm:"column(master_instance_id)"`
MasterFQDN string `orm:"column(master_fqdn)"`
Updated time.Time `orm:"auto_now"`
MasterOutOfDate int64 // ms
InstanceHeartbeat int64 // ms
}
Core is the task package config table.
`MasterOutOfDate` means the time master state can be save, instance can race to be master when the duration of `Updated` to now bigger than this. `InstanceHeartbeat` is the max interval Instance should be check Master, should be less than `MasterOutOfDate`
type MsgQ ¶
type MsgQ interface {
New() MsgQ
Topic() string
TaskSpec() string
Concurency() int
Exec(uint64) error
}
MsgQ is interface for msg, all messages should be implements these interfaces.
type Task ¶
type Task struct {
Id int
Topic string
Status string
Timeout int // 超时ms
Retry int
Input string `orm:"type(text)"`
WorkerId uint64
Created time.Time `orm:"auto_now_add"`
Updated time.Time `orm:"auto_now"`
Log string `orm:"type(text)"`
}
Task is the core object for tasker package.
func NewSimpleTask ¶
NewSimpleTask is new task with default settings.