From 2fa0430403f0c74f6925479791d5786692731534 Mon Sep 17 00:00:00 2001 From: Yun Date: Fri, 25 Jul 2025 22:58:45 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BC=98=E5=85=88=E7=BA=A7?= =?UTF-8?q?=E7=AD=96=E7=95=A5=E5=9C=A8=E5=90=84=E9=9B=86=E7=BE=A4=E4=B8=AD?= =?UTF-8?q?=E7=9A=84=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 124 ++++--------------------------------------- once.go | 44 ++++++++++----- priority/option.go | 2 +- priority/priority.go | 39 +++++++++----- 4 files changed, 65 insertions(+), 144 deletions(-) diff --git a/cluster.go b/cluster.go index b49cd95..226dffe 100644 --- a/cluster.go +++ b/cluster.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "runtime/debug" - "strconv" "sync" "time" @@ -15,6 +14,7 @@ import ( "github.com/yuninks/cachex" "github.com/yuninks/lockx" "github.com/yuninks/timerx/logger" + "github.com/yuninks/timerx/priority" ) // 功能描述 @@ -44,8 +44,8 @@ type Cluster struct { listKey string // 可执行的任务列表的key setKey string // 重入集合的key - priority int // 全局优先级 - priorityKey string // 全局优先级的key + priority *priority.Priority // 全局优先级 + priorityKey string // 全局优先级的key } var clu *Cluster = nil @@ -65,7 +65,6 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin logger: op.logger, keyPrefix: keyPrefix, location: op.location, - priority: op.priority, lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 listKey: "timer:cluster_listKey" + keyPrefix, // 列表 @@ -76,23 +75,12 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin // 设置锁的超时时间 lockx.InitOption(lockx.SetTimeout(op.timeout)) + // 初始化优先级 + clu.priority = priority.InitPriority(ctx, red, clu.priorityKey, op.priority, priority.SetLogger(clu.logger)) + // 监听任务 go clu.watch() - priorityTime := time.NewTicker(time.Second * 10) - go func(ctx context.Context) { - clu.setPriority() - Loop: - for { - select { - case <-priorityTime.C: - clu.setPriority() - case <-ctx.Done(): - break Loop - } - } - }(ctx) - timer := time.NewTicker(time.Millisecond * 200) go func(ctx context.Context) { @@ -100,7 +88,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin for { select { case <-timer.C: - if !clu.canRun() { + if !clu.priority.IsLatest(ctx) { continue } clu.getTask() @@ -114,100 +102,6 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin return clu } -// 判断是否可执行 -func (l *Cluster) canRun() bool { - // 加缓存 - str, err := l.redis.Get(l.ctx, l.priorityKey).Result() - - fmt.Println(str, err) - - if err != nil { - if err == redis.Nil && l.priority == 0 { - return true - } - l.logger.Errorf(l.ctx, "获取全局优先级失败:%s", err.Error()) - return false - } - - strPriority, err := strconv.Atoi(str) - if err != nil { - l.logger.Errorf(l.ctx, "全局优先级转换失败:%s", err.Error()) - return false - } - if l.priority >= strPriority { - return true - } - return false -} - -// 设置全局优先级 -func (l *Cluster) setPriority() bool { - // redis lua脚本 - // 如果redis的可以不存在则设置值,如果存在且redis内的值比当前大则不处理,如果存在redis内的值比当前小或等于则更新值且更新ttl - script := ` - -- KEYS[1] 是全局优先级的key - local priorityKey = KEYS[1] - -- ARGV[1] 是新的优先级 - local priority = ARGV[1] - -- ARGV[2] 是过期时间 - local expireTime = ARGV[2] - - -- 校验参数完整性 - if not priorityKey or not priority or not expireTime then - return redis.error_reply("Missing required arguments") - end - - -- 尝试将字符串转换为数字 - local currentPriority = redis.call('get', priorityKey) - local currentPriorityNum = tonumber(currentPriority) - local newPriorityNum = tonumber(priority) - - if not currentPriority then - -- 如果当前优先级不存在,则设置新优先级并设置TTL - redis.call('set', priorityKey, priority, 'ex', expireTime) - return { "SET", expireTime } - elseif currentPriorityNum < newPriorityNum then - -- 如果当前优先级小于新优先级,则更新优先级并更新TTL - redis.call('set', priorityKey, priority, 'ex', expireTime) - return { "RESET", expireTime } - elseif currentPriorityNum == newPriorityNum then - -- 优先级相同则更新TTL - redis.call('expire', priorityKey, expireTime) - return { "UPDATE", expireTime } - else - -- 如果当前优先级大于新优先级,则不更新 - return { "NOAUCH", '0' } - end - ` - priority := fmt.Sprintf("%d", l.priority) - - expireTime := (time.Second*30).Seconds() // 设置过期时间为1分钟 - res, err := l.redis.Eval(l.ctx, script, []string{l.priorityKey}, priority, expireTime).Result() - if err != nil { - l.logger.Errorf(l.ctx, "设置全局优先级失败:%s", err.Error()) - return false - } - - fmt.Printf("设置全局优先级返回值:%+v", res) - - // 处理返回值,包含操作结果和 TTL - resultArray := res.([]interface{}) - if len(resultArray) < 2 { - l.logger.Errorf(l.ctx, "设置全局优先级失败: 返回值格式不正确") - return false - } - operationResult := resultArray[0].(string) - ttl := resultArray[1].(string) - - if operationResult == "SET" || operationResult == "UPDATE" { - l.logger.Infof(l.ctx, "设置全局优先级成功:%s", priority) - return true - } - _ = ttl - l.logger.Infof(l.ctx, "设置全局优先级未更新:%s", priority) - return false -} - // 每月执行一次 // @param ctx 上下文 // @param taskId 任务ID @@ -468,7 +362,7 @@ func (c *Cluster) watch() { go func() { for { - if !c.canRun() { + if !c.priority.IsLatest(c.ctx) { // 如果全局优先级不满足就不执行 time.Sleep(time.Second * 5) continue @@ -502,7 +396,7 @@ func (c *Cluster) watch() { go func() { for { - if !c.canRun() { + if !c.priority.IsLatest(c.ctx) { // 如果全局优先级不满足就不执行 time.Sleep(time.Second * 5) continue diff --git a/once.go b/once.go index 5295e78..1b3b11c 100644 --- a/once.go +++ b/once.go @@ -12,6 +12,7 @@ import ( "github.com/go-redis/redis/v8" uuid "github.com/satori/go.uuid" "github.com/yuninks/timerx/logger" + "github.com/yuninks/timerx/priority" ) // 功能描述 @@ -21,13 +22,15 @@ import ( // 单次的任务队列 type Once struct { - ctx context.Context - logger logger.Logger - zsetKey string - listKey string - redis redis.UniversalClient - worker Callback - keyPrefix string + ctx context.Context + logger logger.Logger + zsetKey string + listKey string + redis redis.UniversalClient + worker Callback + keyPrefix string + priority *priority.Priority // 全局优先级 + priorityKey string // 全局优先级的key } type OnceWorkerResp struct { @@ -64,14 +67,18 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c op := newOptions(opts...) once.Do(func() { wo = &Once{ - ctx: ctx, - logger: op.logger, - zsetKey: "timer:once_zsetkey" + keyPrefix, - listKey: "timer:once_listkey" + keyPrefix, - redis: re, - worker: call, - keyPrefix: keyPrefix, + ctx: ctx, + logger: op.logger, + zsetKey: "timer:once_zsetkey" + keyPrefix, + listKey: "timer:once_listkey" + keyPrefix, + priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key + redis: re, + worker: call, + keyPrefix: keyPrefix, } + // 初始化优先级 + wo.priority = priority.InitPriority(ctx, re, wo.priorityKey, op.priority, priority.SetLogger(wo.logger)) + go wo.getTask() go wo.watch() }) @@ -170,6 +177,10 @@ Loop: for { select { case <-timer.C: + if !w.priority.IsLatest(w.ctx) { + continue + } + script := ` local token = redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2]) for i,v in ipairs(token) do @@ -190,6 +201,11 @@ Loop: // 监听任务 func (w *Once) watch() { for { + if !w.priority.IsLatest(w.ctx) { + time.Sleep(time.Second * 5) + continue + } + keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result() if err != nil { // fmt.Println("watch err:", err) diff --git a/priority/option.go b/priority/option.go index f99f133..e99462a 100644 --- a/priority/option.go +++ b/priority/option.go @@ -8,7 +8,7 @@ import ( type Options struct { updateInterval time.Duration // 更新间隔 - expireTime time.Duration + expireTime time.Duration // 有效时间 logger logger.Logger } diff --git a/priority/priority.go b/priority/priority.go index f7c9bb4..aa14222 100644 --- a/priority/priority.go +++ b/priority/priority.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strconv" + "sync/atomic" "time" "github.com/go-redis/redis/v8" @@ -13,34 +14,37 @@ import ( // 多版本场景判断当前是否最新版本 type Priority struct { - ctx context.Context - priority int // 优先级 - redis redis.UniversalClient - redisKey string - logger logger.Logger - expireTime time.Duration + ctx context.Context + priority int // 优先级 + redis redis.UniversalClient + redisKey string + logger logger.Logger + expireTime time.Duration + updateInterval time.Duration // 更新间隔 + deadTime int64 // 缓存时间戳,单位秒 } func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int, opts ...Option) *Priority { conf := newOptions(opts...) pro := &Priority{ - ctx: ctx, - priority: priority, - redis: re, - logger: conf.logger, - redisKey: "timer:priority_" + keyPrefix, - expireTime: conf.expireTime, + ctx: ctx, + priority: priority, + redis: re, + logger: conf.logger, + redisKey: "timer:priority_" + keyPrefix, + expireTime: conf.expireTime, + updateInterval: conf.updateInterval, } // 更新间隔 - updateTnterval := time.NewTicker(conf.updateInterval) + ut := time.NewTicker(conf.updateInterval) go func(ctx context.Context) { pro.setPriority() Loop: for { select { - case <-updateTnterval.C: + case <-ut.C: pro.setPriority() case <-ctx.Done(): break Loop @@ -53,6 +57,10 @@ func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix strin func (l *Priority) IsLatest(ctx context.Context) bool { // 加缓存 + if atomic.LoadInt64(&l.deadTime) > time.Now().Unix() { + return true + } + str, err := l.redis.Get(l.ctx, l.redisKey).Result() if err != nil { @@ -133,6 +141,9 @@ func (l *Priority) setPriority() bool { if operationResult == "SET" || operationResult == "UPDATE" { l.logger.Infof(l.ctx, "设置全局优先级成功:%s", priority) + + atomic.StoreInt64(&l.deadTime, time.Now().Add(l.updateInterval).Unix()) + return true } _ = ttl