diff --git a/cluster.go b/cluster.go index 12c847d..007f7a2 100644 --- a/cluster.go +++ b/cluster.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "runtime/debug" + "strconv" "sync" "time" @@ -41,6 +42,9 @@ type Cluster struct { zsetKey string // 有序集合的key listKey string // 可执行的任务列表的key setKey string // 重入集合的key + + priority int // 全局优先级 + priorityKey string // 全局优先级的key } var clu *Cluster = nil @@ -53,17 +57,19 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin op := newOptions(opts...) clu = &Cluster{ - ctx: ctx, - redis: red, - cache: cachex.NewCache(), - timeout: op.timeout, - logger: op.logger, - keyPrefix: keyPrefix, - location: op.location, - lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 - zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 - listKey: "timer:cluster_listKey" + keyPrefix, // 列表 - setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 + ctx: ctx, + redis: red, + cache: cachex.NewCache(), + timeout: op.timeout, + logger: op.logger, + keyPrefix: keyPrefix, + location: op.location, + priority: op.priority, + lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 + zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 + listKey: "timer:cluster_listKey" + keyPrefix, // 列表 + setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 + priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key } // 设置锁的超时时间 @@ -72,6 +78,20 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin // 监听任务 go clu.watch() + priorityTime := time.NewTicker(time.Second * 10) + go func(ctx context.Context) { + clu.setPriority() + Loop: + for { + select { + case <-priorityTime.C: + clu.setPriority() + case <-ctx.Done(): + break Loop + } + } + }(ctx) + timer := time.NewTicker(time.Millisecond * 200) go func(ctx context.Context) { @@ -79,6 +99,9 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin for { select { case <-timer.C: + if !clu.canRun() { + continue + } clu.getTask() clu.getNextTime() case <-ctx.Done(): @@ -90,6 +113,100 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin return clu } +// 判断是否可执行 +func (l *Cluster) canRun() bool { + // 加缓存 + str, err := l.redis.Get(l.ctx, l.priorityKey).Result() + + fmt.Println(str, err) + + if err != nil { + if err == redis.Nil && l.priority == 0 { + return true + } + l.logger.Errorf(l.ctx, "获取全局优先级失败:%s", err.Error()) + return false + } + + strPriority, err := strconv.Atoi(str) + if err != nil { + l.logger.Errorf(l.ctx, "全局优先级转换失败:%s", err.Error()) + return false + } + if l.priority >= strPriority { + return true + } + return false +} + +// 设置全局优先级 +func (l *Cluster) setPriority() bool { + // redis lua脚本 + // 如果redis的可以不存在则设置值,如果存在且redis内的值比当前大则不处理,如果存在redis内的值比当前小或等于则更新值且更新ttl + script := ` + -- KEYS[1] 是全局优先级的key + local priorityKey = KEYS[1] + -- ARGV[1] 是新的优先级 + local priority = ARGV[1] + -- ARGV[2] 是过期时间 + local expireTime = ARGV[2] + + -- 校验参数完整性 + if not priorityKey or not priority or not expireTime then + return redis.error_reply("Missing required arguments") + end + + -- 尝试将字符串转换为数字 + local currentPriority = redis.call('get', priorityKey) + local currentPriorityNum = tonumber(currentPriority) + local newPriorityNum = tonumber(priority) + + if not currentPriority then + -- 如果当前优先级不存在,则设置新优先级并设置TTL + redis.call('set', priorityKey, priority, 'ex', expireTime) + return { "SET", expireTime } + elseif currentPriorityNum < newPriorityNum then + -- 如果当前优先级小于新优先级,则更新优先级并更新TTL + redis.call('set', priorityKey, priority, 'ex', expireTime) + return { "RESET", expireTime } + elseif currentPriorityNum == newPriorityNum then + -- 优先级相同则更新TTL + redis.call('expire', priorityKey, expireTime) + return { "UPDATE", expireTime } + else + -- 如果当前优先级大于新优先级,则不更新 + return { "NOAUCH", '0' } + end + ` + priority := fmt.Sprintf("%d", l.priority) + + expireTime := (time.Second*30).Seconds() // 设置过期时间为1分钟 + res, err := l.redis.Eval(l.ctx, script, []string{l.priorityKey}, priority, expireTime).Result() + if err != nil { + l.logger.Errorf(l.ctx, "设置全局优先级失败:%s", err.Error()) + return false + } + + fmt.Printf("设置全局优先级返回值:%+v", res) + + // 处理返回值,包含操作结果和 TTL + resultArray := res.([]interface{}) + if len(resultArray) < 2 { + l.logger.Errorf(l.ctx, "设置全局优先级失败: 返回值格式不正确") + return false + } + operationResult := resultArray[0].(string) + ttl := resultArray[1].(string) + + if operationResult == "SET" || operationResult == "UPDATE" { + l.logger.Infof(l.ctx, "设置全局优先级成功:%s", priority) + return true + } + _ = ttl + l.logger.Infof(l.ctx, "设置全局优先级未更新:%s", priority) + return false +} + // 每月执行一次 // @param ctx 上下文 // @param taskId 任务ID @@ -349,6 +466,13 @@ func (c *Cluster) watch() { // 执行任务 go func() { for { + + if !c.canRun() { + // 如果全局优先级不满足就不执行 + time.Sleep(time.Second * 5) + continue + } + keys, err := c.redis.BLPop(c.ctx, time.Second*10, c.listKey).Result() if err != nil { if err != redis.Nil { @@ -376,6 +500,13 @@ func (c *Cluster) watch() { // 处理重入任务 go func() { for { + + if !c.canRun() { + // 如果全局优先级不满足就不执行 + time.Sleep(time.Second * 5) + continue + } + res, err := c.redis.SPop(c.ctx, c.setKey).Result() if err != nil { if err == redis.Nil { diff --git a/cmd/main.go b/cmd/main.go index e5b099b..53e219a 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -24,8 +24,8 @@ func main() { // re() // d() - // cluster() - once() + cluster() + // once() select {} @@ -89,7 +89,7 @@ func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, func cluster() { client := getRedis() ctx := context.Background() - cluster := timerx.InitCluster(ctx, client, "test") + cluster := timerx.InitCluster(ctx, client, "test",timerx.SetPriority(101)) err := cluster.EverySpace(ctx, "test_space", 1*time.Second, aa, "这是秒任务") fmt.Println(err) err = cluster.EveryMinute(ctx, "test_min", 15, aa, "这是分钟任务") diff --git a/option.go b/option.go index cc08bcd..bf91541 100644 --- a/option.go +++ b/option.go @@ -3,16 +3,18 @@ package timerx import "time" type Options struct { - logger Logger - location *time.Location - timeout time.Duration + logger Logger + location *time.Location + timeout time.Duration + priority int } func defaultOptions() Options { return Options{ - logger: NewLogger(), - location: time.Local, - timeout: time.Hour, + logger: NewLogger(), + location: time.Local, + timeout: time.Hour, + priority: 0, } } @@ -46,3 +48,10 @@ func SetTimeout(d time.Duration) Option { o.timeout = d } } + +// 设置优先级 +func SetPriority(priority int) Option { + return func(o *Options) { + o.priority = priority + } +}