From f46e8e220af5a11a55bd2f45dbe7dc24b0745ce2 Mon Sep 17 00:00:00 2001 From: Yun Date: Wed, 8 May 2024 13:01:55 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=80=E4=BA=9B=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 14 +++++++++----- cmd/main.go | 2 +- once.go | 29 ++++++++++++++++------------- readme.md | 23 ++++++++++++++--------- single.go | 3 ++- 5 files changed, 42 insertions(+), 29 deletions(-) diff --git a/cluster.go b/cluster.go index c4b0e73..afe3014 100644 --- a/cluster.go +++ b/cluster.go @@ -48,11 +48,11 @@ func InitCluster(ctx context.Context, red *redis.Client, keyPrefix string, opts ctx: ctx, redis: red, logger: op.logger, - lockKey: keyPrefix + "timer:cluster_globalLockKey", // 定时器的全局锁 - nextKey: keyPrefix + "timer:cluster_nextKey", // 下一次 - zsetKey: keyPrefix + "timer:cluster_zsetKey", // 有序集合 - listKey: keyPrefix + "timer:cluster_listKey", // 列表 - setKey: keyPrefix + "timer:cluster_setKey", // 重入集合 + lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 + nextKey: "timer:cluster_nextKey" + keyPrefix, // 下一次 + zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 + listKey: "timer:cluster_listKey" + keyPrefix, // 列表 + setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 } // 监听任务 @@ -228,6 +228,7 @@ func (c *Cluster) getNextTime() { nowTime := time.Now() + // 根据内部注册的任务列表计算下一次执行的时间 clusterWorkerList.Range(func(key, value interface{}) bool { val := value.(timerStr) beforeTime := execTime[val.TaskId] @@ -262,6 +263,8 @@ func getNextExecTime(ts timerStr) time.Time { } nextTime := ts.NextTime.Add(ts.SpaceTime) ts.NextTime = nextTime + + // 递归计算直到拿到下一次执行的时间 if nextTime.Before(nowTime) { nextTime = getNextExecTime(ts) } @@ -305,6 +308,7 @@ func (c *Cluster) watch() { } }() + // 处理重入任务 go func() { for { taskId, err := c.redis.SPop(c.ctx, c.setKey).Result() diff --git a/cmd/main.go b/cmd/main.go index ff811de..39aacf7 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "code.yun.ink/pkg/timerx" + "github.com/yuninks/timerx" "github.com/go-redis/redis/v8" ) diff --git a/once.go b/once.go index cfbaa25..699764f 100644 --- a/once.go +++ b/once.go @@ -19,8 +19,9 @@ import ( // 3. 任务执行失败支持快捷重新加入队列 // 单次的任务队列 -type worker struct { +type Once struct { ctx context.Context + logger Logger zsetKey string listKey string redis *redis.Client @@ -43,7 +44,7 @@ type Callback interface { Worker(jobType string, uniqueKey string, data interface{}) (WorkerCode, time.Duration) } -var wo *worker = nil +var wo *Once = nil var once sync.Once type extendData struct { @@ -52,15 +53,16 @@ type extendData struct { } // 初始化 -func InitOnce(ctx context.Context, re *redis.Client, jobGlobalName string, jobCallback Callback) *worker { - +func InitOnce(ctx context.Context, re *redis.Client, keyPrefix string, call Callback, opts ...Option) *Once { + op := newOptions(opts...) once.Do(func() { - wo = &worker{ + wo = &Once{ ctx: ctx, - zsetKey: "timer:once_zsetkey" + jobGlobalName, - listKey: "timer:once_listkey" + jobGlobalName, + logger: op.logger, + zsetKey: "timer:once_zsetkey" + keyPrefix, + listKey: "timer:once_listkey" + keyPrefix, redis: re, - worker: jobCallback, + worker: call, } go wo.getTask() go wo.watch() @@ -71,7 +73,7 @@ func InitOnce(ctx context.Context, re *redis.Client, jobGlobalName string, jobCa // 添加任务 // 重复插入就代表覆盖 -func (w *worker) Add(jobType string, uniqueKey string, delayTime time.Duration, data interface{}) error { +func (w *Once) Add(jobType string, uniqueKey string, delayTime time.Duration, data interface{}) error { if delayTime.Abs() != delayTime { return fmt.Errorf("时间间隔不能为负数") } @@ -101,7 +103,7 @@ func (w *worker) Add(jobType string, uniqueKey string, delayTime time.Duration, } // 删除任务 -func (w *worker) Del(jobType string, uniqueKey string) error { +func (w *Once) Del(jobType string, uniqueKey string) error { redisKey := fmt.Sprintf("%s[:]%s", jobType, uniqueKey) w.redis.Del(w.ctx, redisKey).Result() @@ -112,7 +114,7 @@ func (w *worker) Del(jobType string, uniqueKey string) error { } // 获取任务 -func (w *worker) getTask() { +func (w *Once) getTask() { timer := time.NewTicker(time.Millisecond * 200) defer timer.Stop() @@ -138,7 +140,7 @@ Loop: } // 监听任务 -func (w *worker) watch() { +func (w *Once) watch() { for { keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result() if err != nil { @@ -151,7 +153,8 @@ func (w *worker) watch() { } } -func (w *worker) doTask(key string) { +// 执行任务 +func (w *Once) doTask(key string) { defer func() { if err := recover(); err != nil { fmt.Println("timer:定时器出错", err) diff --git a/readme.md b/readme.md index 8a25d15..5adb637 100644 --- a/readme.md +++ b/readme.md @@ -1,14 +1,19 @@ -开发目标 +# 功能支持 -1. 支持单机定时 -2. 支持集群定时 -3. 支持间隔定时 -4. 支持固定时间 -5. 支持全局唯一 +1. 支持本地任务 +2. 支持集群任务 +3. 支持单次任务 + +# 功能说明 -设计思想 -1. 不再单独区分单机还是集群,统一按集群处理,单机只是集群里面只有一个节点 -2. 计算和执行分离,计算只负责计算,执行只负责执行,计算和执行之间通过消息队列进行通信 + +# 功能实现 + +1. 集群间任务调度和任务的唯一依赖于redis进行实现 +# 缺陷 + +1. 集群部署时,存在新旧的代码混合问题,任务调度可能存在问题(需要根据实际需要进行版本上线/下线操作) + diff --git a/single.go b/single.go index 1b1266d..d8569ec 100644 --- a/single.go +++ b/single.go @@ -11,8 +11,9 @@ import ( "time" ) -// 定时器 +// 简单定时器 // 1. 这个定时器的作用范围是本机 +// 2. 适用简单的时间间隔定时任务 // uuid -> timerStr var timerMap = make(map[string]*timerStr)