diff --git a/cluster.go b/cluster.go index 5f7b5fb..be93567 100644 --- a/cluster.go +++ b/cluster.go @@ -14,13 +14,19 @@ import ( "github.com/go-redis/redis/v8" ) +// 功能描述 +// 这是基于Redis的定时任务调度器,能够有效的在服务集群里面调度任务,避免了单点压力过高或单点故障问题 +// 由于所有的服务代码是一致的,也就是一个定时任务将在所有的服务都有注册,具体调度到哪个服务运行看调度结果 + +// 暂不支持删除定时器,因为这个定时器的设计是基于全局的,如果删除了,那么其他服务就不知道了 + // 单例模式 var clusterOnceLimit sync.Once // 已注册的任务列表 var clusterWorkerList sync.Map -type cluster struct { +type Cluster struct { ctx context.Context redis *redis.Client lockKey string // 全局计算的key @@ -29,14 +35,14 @@ type cluster struct { listKey string // 可执行的任务列表的key } -var clu *cluster = nil +var clu *Cluster = nil -func InitCluster(ctx context.Context, red *redis.Client) *cluster { +func InitCluster(ctx context.Context, red *redis.Client) *Cluster { clusterOnceLimit.Do(func() { - clu = &cluster{ + clu = &Cluster{ ctx: ctx, redis: red, - lockKey: "timer:cluster_globalLockKey", + lockKey: "timer:cluster_globalLockKey", // 定时器的全局锁 nextKey: "timer:cluster_nextKey", zsetKey: "timer:cluster_zsetKey", listKey: "timer:cluster_listKey", @@ -63,7 +69,7 @@ func InitCluster(ctx context.Context, red *redis.Client) *cluster { return clu } -func (c *cluster) AddTimer(ctx context.Context, uniqueKey string, spaceTime time.Duration, callback callback, extend ExtendParams) error { +func (c *Cluster) Add(ctx context.Context, uniqueKey string, spaceTime time.Duration, callback callback, extendData interface{}) error { _, ok := clusterWorkerList.Load(uniqueKey) if ok { return errors.New("key已存在") @@ -86,12 +92,12 @@ func (c *cluster) AddTimer(ctx context.Context, uniqueKey string, spaceTime time nowTime := time.Now() t := timerStr{ - BeginTime: nowTime, - NextTime: nowTime, - SpaceTime: spaceTime, - Callback: callback, - Extend: extend, - UniqueKey: uniqueKey, + BeginTime: nowTime, + NextTime: nowTime, + SpaceTime: spaceTime, + Callback: callback, + ExtendData: extendData, + UniqueKey: uniqueKey, } clusterWorkerList.Store(uniqueKey, t) @@ -119,7 +125,7 @@ func (c *cluster) AddTimer(ctx context.Context, uniqueKey string, spaceTime time } // 计算下一次执行的时间 -func (c *cluster) getNextTime() { +func (c *Cluster) getNextTime() { // log.Println("begin computer") ctx, cancel := context.WithCancel(c.ctx) @@ -184,31 +190,8 @@ func getNextExecTime(beforeTime time.Time, spaceTime time.Duration) time.Time { } // 获取任务 -func (c *cluster) getTask() { +func (c *Cluster) getTask() { // 定时去Redis获取任务 - // zb := redis.ZRangeBy{ - // Min: "0", - // Max: fmt.Sprintf("%+v", time.Now().UnixMilli()), - // } - - // taskList, _ := c.redis.ZRangeByScore(c.ctx, c.zsetKey, &zb).Result() - - // if len(taskList) == 0 { - // return - // } - - // p := c.redis.Pipeline() - - // for _, val := range taskList { - // // 添加到可执行队列 - // p.LPush(c.ctx, c.listKey, val) - // // 删除有序集合 - // p.ZRem(c.ctx, c.zsetKey, val) - // } - // _, err := p.Exec(c.ctx) - // // fmt.Println(err) - // _ = err - script := ` local token = redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2]) for i,v in ipairs(token) do @@ -222,7 +205,7 @@ func (c *cluster) getTask() { } // 监听任务 -func (c *cluster) watch() { +func (c *Cluster) watch() { // 执行任务 for { keys, err := c.redis.BLPop(c.ctx, time.Second*10, c.listKey).Result() @@ -236,8 +219,6 @@ func (c *cluster) watch() { // 执行任务 func doTask(ctx context.Context, red *redis.Client, taskId string) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() defer func() { if err := recover(); err != nil { @@ -262,8 +243,6 @@ func doTask(ctx context.Context, red *redis.Client, taskId string) { } defer lock.Unlock() - ctx = context.WithValue(ctx, extendParamKey, t.Extend) - // 执行任务 - t.Callback(ctx) + t.Callback(ctx,t.ExtendData) } diff --git a/example_test.go b/cluster_test.go similarity index 100% rename from example_test.go rename to cluster_test.go diff --git a/worker.go b/once.go similarity index 57% rename from worker.go rename to once.go index e23ffea..afdbe79 100644 --- a/worker.go +++ b/once.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "fmt" + "log" + "runtime/debug" "strings" "sync" "time" @@ -11,25 +13,34 @@ import ( "github.com/go-redis/redis/v8" ) +// 功能描述 +// 1. 任务全局唯一 +// 2. 任务只执行一次 +// 3. 任务执行失败可以重新放入队列 + // 单次的任务队列 type worker struct { ctx context.Context zsetKey string listKey string redis *redis.Client - worker WorkerInterface + worker Callback } type WorkerCode int const ( - WorkerCodeSuccess WorkerCode = 0 - WorkerCodeAgain WorkerCode = -1 + WorkerCodeSuccess WorkerCode = 0 // 处理完成(不需要重入) + WorkerCodeAgain WorkerCode = -1 // 需要继续定时,默认原来的时间 ) // 需要考虑执行失败重新放入队列的情况 -type WorkerInterface interface { - Worker(uniqueKey string, jobType string, data map[string]interface{}) WorkerCode +type Callback interface { + // 任务执行 + // uniqueKey: 任务唯一标识 + // jobType: 任务类型,用于区分任务 + // data: 任务数据 + Worker(uniqueKey string, jobType string, data interface{}) (WorkerCode, time.Duration) } var wo *worker = nil @@ -37,21 +48,22 @@ var once sync.Once type extendData struct { Delay time.Duration - Data map[string]interface{} + Data interface{} } -func InitWorker(ctx context.Context, re *redis.Client, w WorkerInterface) *worker { +// 初始化 +func InitOnce(ctx context.Context, re *redis.Client, w Callback) *worker { once.Do(func() { wo = &worker{ ctx: ctx, - zsetKey: "timer:job_zsetkey", - listKey: "timer:job_listkey", + zsetKey: "timer:once_zsetkey", + listKey: "timer:once_listkey", redis: re, worker: w, } go wo.getTask() - go wo.execTask() + go wo.watch() }) return wo @@ -59,7 +71,7 @@ func InitWorker(ctx context.Context, re *redis.Client, w WorkerInterface) *worke // 添加任务 // 重复插入就代表覆盖 -func (w *worker) Add(uniqueKey string, jobType string, delayTime time.Duration, data map[string]interface{}) error { +func (w *worker) Add(uniqueKey string, jobType string, delayTime time.Duration, data interface{}) error { if delayTime.Abs() != delayTime { return fmt.Errorf("时间间隔不能为负数") } @@ -125,8 +137,8 @@ Loop: } } -// 执行任务 -func (w *worker) execTask() { +// 监听任务 +func (w *worker) watch() { for { keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result() if err != nil { @@ -134,26 +146,39 @@ func (w *worker) execTask() { continue } - go func() { - s := strings.Split(keys[1], "[:]") + go w.doTask(keys[1]) - // 读取数据 - str, err := w.redis.Get(w.ctx, keys[1]).Result() - if err != nil { - fmt.Println("execJob err:", err) - return - } - ed := extendData{} - json.Unmarshal([]byte(str), &ed) - - fmt.Println("开始时间:", time.Now().Format("2006-01-02 15:04:05")) - code := w.worker.Worker(s[0], s[1], ed.Data) - - if code == WorkerCodeAgain { - // 重新放入队列 - fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05")) - w.Add(s[0], s[1], ed.Delay, ed.Data) - } - }() + } +} + +func (w *worker) doTask(key string) { + defer func() { + if err := recover(); err != nil { + fmt.Println("timer:定时器出错", err) + log.Println("errStack", string(debug.Stack())) + } + }() + + s := strings.Split(key, "[:]") + + // 读取数据 + str, err := w.redis.Get(w.ctx, key).Result() + if err != nil { + fmt.Println("execJob err:", err) + return + } + ed := extendData{} + json.Unmarshal([]byte(str), &ed) + + fmt.Println("开始时间:", time.Now().Format("2006-01-02 15:04:05")) + code, t := w.worker.Worker(s[0], s[1], ed.Data) + + if code == WorkerCodeAgain { + // 重新放入队列 + fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05")) + if t != 0 && t == t.Abs() { + ed.Delay = t + } + w.Add(s[0], s[1], ed.Delay, ed.Data) } } diff --git a/once_test.go b/once_test.go new file mode 100644 index 0000000..2b97dd1 --- /dev/null +++ b/once_test.go @@ -0,0 +1,2 @@ +package timer + diff --git a/single.go b/single.go index 638dc02..827a8a9 100644 --- a/single.go +++ b/single.go @@ -14,27 +14,23 @@ import ( // 定时器 // 原理:每毫秒的时间触发 +// 单机版重复时间间隔定时器 // uuid -> timerStr var timerMap = make(map[string]*timerStr) var timerMapMux sync.Mutex -var timerCount int // 当前定时数目 -var onceLimit sync.Once // 实现单例 +var timerCount int // 当前定时数目 +var onceLimit sync.Once // 实现单例 +type Single struct{} - - - - -type single struct{} - -var sin *single = nil +var sin *Single = nil // 定时器类 -func InitSingle(ctx context.Context) *single { +func InitSingle(ctx context.Context) *Single { onceLimit.Do(func() { - sin = &single{} + sin = &Single{} timer := time.NewTicker(time.Millisecond * 200) go func(ctx context.Context) { @@ -47,7 +43,7 @@ func InitSingle(ctx context.Context) *single { continue } // 迭代定时器 - sin.iteratorTimer(ctx, t) + sin.iterator(ctx, t) // fmt.Println("timer: 执行") case <-ctx.Done(): // 跳出循环 @@ -62,7 +58,12 @@ func InitSingle(ctx context.Context) *single { } // 间隔定时器 -func (s *single) AddTimer(space time.Duration, call callback, extend ExtendParams) (int, error) { +// @param space 间隔时间 +// @param call 回调函数 +// @param extend 附加参数 +// @return int 定时器索引 +// @return error 错误 +func (s *Single) Add(space time.Duration, call callback, extend interface{}) (int, error) { timerMapMux.Lock() defer timerMapMux.Unlock() @@ -81,7 +82,7 @@ func (s *single) AddTimer(space time.Duration, call callback, extend ExtendParam SpaceTime: space, CanRunning: make(chan struct{}, 1), UniqueKey: "", - Extend: extend, + ExtendData: extend, } timerMap[fmt.Sprintf("%d", timerCount)] = &t @@ -94,21 +95,15 @@ func (s *single) AddTimer(space time.Duration, call callback, extend ExtendParam return timerCount, nil } -// 添加需要定时的规则 -func (s *single) AddToTimer(space time.Duration, call callback) int { - extend := ExtendParams{} - count, _ := s.AddTimer(space, call, extend) - return count -} - -func (s *single) DelToTimer(index string) { +// 删除定时器 +func (s *Single) Del(index string) { timerMapMux.Lock() defer timerMapMux.Unlock() delete(timerMap, index) } // 迭代定时器列表 -func (s *single) iteratorTimer(ctx context.Context, nowTime time.Time) { +func (s *Single) iterator(ctx context.Context, nowTime time.Time) { timerMapMux.Lock() defer timerMapMux.Unlock() @@ -157,7 +152,7 @@ func (s *single) iteratorTimer(ctx context.Context, nowTime time.Time) { } }() // fmt.Printf("timer: 准备执行 %v %v \n", k, v.Tag) - s.timerAction(ctx, v.Callback, v.UniqueKey, v.Extend) + s.doTask(ctx, v.Callback, v.UniqueKey, v.ExtendData) default: // fmt.Printf("timer: 已在执行 %v %v \n", k, v.Tag) return @@ -182,28 +177,12 @@ func (s *single) iteratorTimer(ctx context.Context, nowTime time.Time) { // 定时器操作类 // 这里不应painc -func (s *single) timerAction(ctx context.Context, call callback, uniqueKey string, extend ExtendParams) bool { +func (s *Single) doTask(ctx context.Context, call callback, uniqueKey string, extend interface{}) error { defer func() { if err := recover(); err != nil { fmt.Println("timer:定时器出错", err) log.Println("errStack", string(debug.Stack())) } }() - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - // 附加数据 - ctx = context.WithValue(ctx, extendParamKey, extend) - - return call(ctx) -} - -// 快捷方法 -func GetExtendParams(ctx context.Context) (*ExtendParams, error) { - val := ctx.Value(extendParamKey) - params, ok := val.(ExtendParams) - if !ok { - return nil, errors.New("没找到参数") - } - return ¶ms, nil + return call(ctx, extend) } diff --git a/timer_test.go b/single_test.go similarity index 100% rename from timer_test.go rename to single_test.go diff --git a/types.go b/types.go index 60a186f..4fc5600 100644 --- a/types.go +++ b/types.go @@ -12,13 +12,14 @@ type timerStr struct { NextTime time.Time // [删]下一次执行的时间 SpaceTime time.Duration // 任务间隔时间 UniqueKey string // 全局唯一键 - Extend ExtendParams // 附加参数 + ExtendData interface{} // 附加参数 } // 扩展参数 -type ExtendParams struct { - Params map[string]interface{} // 带出去的参数 -} +// +// type ExtendParams struct { +// Params map[string]interface{} // 带出去的参数 +// } var nextTime = time.Now() // 下一次执行的时间 type ContextValueKey string // 定义context 传递的Key类型 @@ -27,4 +28,4 @@ const ( ) // 定义各个回调函数 -type callback func(ctx context.Context) bool +type callback func(ctx context.Context, extendData interface{}) error