From 85d041753e0fbb7e22e14f7b29f5a96e7db8562c Mon Sep 17 00:00:00 2001 From: Yun Date: Wed, 24 Sep 2025 17:26:33 +0800 Subject: [PATCH] =?UTF-8?q?once=E4=BD=BF=E7=94=A8=E5=B0=81=E8=A3=85?= =?UTF-8?q?=E7=9A=84leader=E5=92=8Cheartbeat?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/main.go | 46 ++-- heartbeat/heartbeat.go | 154 ++++++++++++ heartbeat/options.go | 59 +++++ leader/leader.go | 163 ++++++++++++ leader/options.go | 51 ++++ once.go | 552 ++++++++++++++++++++++++++++------------- option.go | 6 +- 7 files changed, 844 insertions(+), 187 deletions(-) create mode 100644 heartbeat/heartbeat.go create mode 100644 heartbeat/options.go create mode 100644 leader/leader.go create mode 100644 leader/options.go diff --git a/cmd/main.go b/cmd/main.go index 2fb6de5..a6575ff 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -26,8 +26,8 @@ func main() { // re() // d() - cluster() - // once() + // cluster() + once() // prioritys() select {} @@ -38,7 +38,7 @@ func prioritys() { client := getRedis() ctx := context.Background() - pro := priority.InitPriority(ctx, client, "test", 10) + pro, _ := priority.InitPriority(ctx, client, "test", 10) for { b := pro.IsLatest(ctx) @@ -59,10 +59,13 @@ func once() { } ops := []timerx.Option{ - timerx.SetPriority(ver), + timerx.WithPriority(ver), } - one := timerx.InitOnce(ctx, client, "test", w, ops...) + one, err := timerx.InitOnce(ctx, client, "test_once", w, ops...) + if err != nil { + panic(err) + } d := OnceData{ Num: 3, @@ -76,17 +79,17 @@ func once() { // d = OnceData{ // Num: 4, // } - dd := 123 + // dd := 123 // dy, _ = json.Marshal(d) - err = one.Save("test", "test4", 2*time.Second, dd) - if err != nil { - fmt.Println(err) - } + // err = one.Save("test", "test4", 2*time.Second, dd) + // if err != nil { + // fmt.Println(err) + // } - err = one.Save("test", "test5", 5*time.Second, dd) - if err != nil { - fmt.Println(err) - } + // err = one.Save("test", "test5", 5*time.Second, dd) + // if err != nil { + // fmt.Println(err) + // } } @@ -97,6 +100,15 @@ type OnceData struct { type OnceWorker struct{} func (l OnceWorker) Worker(ctx context.Context, taskType timerx.OnceTaskType, taskId string, attachData interface{}) *timerx.OnceWorkerResp { + // 追加写入文件 + file, err := os.OpenFile("./test.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + panic(err) + } + defer file.Close() + + file.WriteString(fmt.Sprintf("执行时间:%s\n", time.Now().Format("2006-01-02 15:04:05"))) + fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05")) fmt.Println(taskType, taskId) @@ -119,7 +131,7 @@ func (l OnceWorker) Worker(ctx context.Context, taskType timerx.OnceTaskType, ta return &timerx.OnceWorkerResp{ Retry: true, AttachData: attachData, - DelayTime: 10 * time.Second, + DelayTime: time.Second, } } @@ -130,7 +142,7 @@ func cluster() { // log := loggerx.NewLogger(ctx,loggerx.SetToConsole(),loggerx.SetEscapeHTML(false)) // _ = log - cluster := timerx.InitCluster(ctx, client, "test", timerx.SetPriority(103)) + cluster, _ := timerx.InitCluster(ctx, client, "test", timerx.WithPriority(103)) err := cluster.EverySpace(ctx, "test_space1", 1*time.Second, aa, "这是秒任务1") fmt.Println(err) err = cluster.EverySpace(ctx, "test_space2", 2*time.Second, aa, "这是秒任务2") @@ -195,7 +207,7 @@ func re() { client := getRedis() ctx := context.Background() - cl := timerx.InitCluster(ctx, client, "kkkk") + cl, _ := timerx.InitCluster(ctx, client, "kkkk") cl.EverySpace(ctx, "test1", 1*time.Millisecond, aa, "data") cl.EverySpace(ctx, "test2", 1*time.Millisecond, aa, "data") cl.EverySpace(ctx, "test3", 1*time.Millisecond, aa, "data") diff --git a/heartbeat/heartbeat.go b/heartbeat/heartbeat.go new file mode 100644 index 0000000..dfb2c42 --- /dev/null +++ b/heartbeat/heartbeat.go @@ -0,0 +1,154 @@ +package heartbeat + +import ( + "context" + "errors" + "strconv" + "sync" + "time" + + "github.com/go-redis/redis/v8" + "github.com/yuninks/timerx/leader" + "github.com/yuninks/timerx/logger" + "github.com/yuninks/timerx/priority" +) + +// 心跳 +// 作用:上报实例存活状态 + +type HeartBeat struct { + ctx context.Context + cancel context.CancelFunc + + redis redis.UniversalClient // redis + logger logger.Logger + + priority *priority.Priority // (允许nil)全局优先级 + leader *leader.Leader // (允许nil)领导 + + heartbeatKey string // 心跳Key 有序集合 + instanceId string // 实例ID + + wg sync.WaitGroup +} + +func InitHeartBeat(ctx context.Context, ref redis.UniversalClient, keyPrefix string, opts ...Option) (*HeartBeat, error) { + + if ref == nil { + return nil, errors.New("redis is nil") + } + + op := newOptions(opts...) + + ctx, cancel := context.WithCancel(ctx) + + l := &HeartBeat{ + ctx: ctx, + cancel: cancel, + + heartbeatKey: "timer:heartbeat_key" + keyPrefix, + + priority: op.priority, + redis: ref, + logger: op.logger, + leader: op.leader, + instanceId: op.instanceId, + } + + l.logger.Infof(l.ctx, "InitHeartBeat InstanceId %s lockKey:%s", l.instanceId, l.heartbeatKey) + + l.startDaemon() + + return l, nil +} + +func (l *HeartBeat) Close() { + l.cancel() + l.cleanHeartbeat(true) + l.wg.Wait() +} + +func (l *HeartBeat) startDaemon() { + + l.wg.Add(1) + go l.heartbeatLoop() + + l.wg.Add(1) + go l.cleanHeartbeatLoop() + +} + +// 心跳上报 +// 需要确定当前存活的实例&当前实例是否是领导 +func (l *HeartBeat) heartbeatLoop() { + defer l.wg.Done() + + // 先执行一次 + l.heartbeat() + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-l.ctx.Done(): + return + case <-ticker.C: + l.heartbeat() + } + } + +} + +// 单次心跳 +func (l *HeartBeat) heartbeat() error { + err := l.redis.ZAdd(l.ctx, l.heartbeatKey, &redis.Z{ + Score: float64(time.Now().UnixMilli()), + Member: l.instanceId, + }).Err() + + if err != nil { + l.logger.Errorf(l.ctx, "heartbeat redis.ZAdd err:%v", err) + return err + } + + return nil +} + +// 心跳清理(leader可操作) +func (l *HeartBeat) cleanHeartbeatLoop() { + defer l.wg.Done() + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-l.ctx.Done(): + return + case <-ticker.C: + if l.leader != nil { + if !l.leader.IsLeader() { + // l.logger.Infof(l.ctx, "cleanHeartbeatLoop not leader") + continue + } + } + l.cleanHeartbeat(false) + } + } + +} + +// 单次清理 +func (l *HeartBeat) cleanHeartbeat(cleanSelf bool) error { + + // 仅移除自己 + if cleanSelf { + return l.redis.ZRem(l.ctx, l.heartbeatKey, l.instanceId).Err() + } + + // 移除心跳 + l.redis.ZRemRangeByScore(l.ctx, l.heartbeatKey, "0", strconv.FormatInt(time.Now().Add(-15*time.Second).UnixMilli(), 10)).Err() + + return nil +} diff --git a/heartbeat/options.go b/heartbeat/options.go new file mode 100644 index 0000000..24c12ef --- /dev/null +++ b/heartbeat/options.go @@ -0,0 +1,59 @@ +package heartbeat + +import ( + "github.com/google/uuid" + "github.com/yuninks/timerx/leader" + "github.com/yuninks/timerx/logger" + "github.com/yuninks/timerx/priority" +) + +type Options struct { + logger logger.Logger + instanceId string + priority *priority.Priority // 全局优先级 + leader *leader.Leader +} + +func defaultOptions() Options { + + u, _ := uuid.NewV7() + + return Options{ + logger: logger.NewLogger(), + instanceId: u.String(), + } +} + +type Option func(*Options) + +func newOptions(opts ...Option) Options { + o := defaultOptions() + for _, opt := range opts { + opt(&o) + } + return o +} + +func WithLogger(log logger.Logger) Option { + return func(o *Options) { + o.logger = log + } +} + +func WithPriority(p *priority.Priority) Option { + return func(o *Options) { + o.priority = p + } +} + +func WithLeader(l *leader.Leader) Option { + return func(o *Options) { + o.leader = l + } +} + +func WithInstanceId(instanceId string) Option { + return func(o *Options) { + o.instanceId = instanceId + } +} diff --git a/leader/leader.go b/leader/leader.go new file mode 100644 index 0000000..0e50cf4 --- /dev/null +++ b/leader/leader.go @@ -0,0 +1,163 @@ +package leader + +// 竞选Leader + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/go-redis/redis/v8" + "github.com/yuninks/lockx" + "github.com/yuninks/timerx/logger" + "github.com/yuninks/timerx/priority" +) + +type Leader struct { + ctx context.Context + cancel context.CancelFunc + + isLeader bool // 是否是领导 + leaderLock sync.RWMutex // 领导锁 + leaderUniLockKey string // 领导唯一锁 + leaderKey string // 上报当前的Leader + + redis redis.UniversalClient // redis + logger logger.Logger + + priority *priority.Priority // 全局优先级 + wg sync.WaitGroup + + instanceId string // 实例ID +} + +// leader负责选举 + +func InitLeader(ctx context.Context, ref redis.UniversalClient, keyPrefix string, opts ...Option) (*Leader, error) { + + if ref == nil { + return nil, errors.New("redis is nil") + } + + op := newOptions(opts...) + + ctx, cancel := context.WithCancel(ctx) + + l := &Leader{ + ctx: ctx, + cancel: cancel, + redis: ref, + leaderUniLockKey: "timer:leader_lockKey" + keyPrefix, + priority: op.priority, + instanceId: op.instanceId, + logger: op.logger, + } + + l.wg.Add(1) + go l.leaderElection() + + l.logger.Infof(l.ctx, "InitLeader InstanceId %s lockKey:%s", l.instanceId, l.leaderUniLockKey) + + return l, nil +} + +func (l *Leader) Close() { + l.cancel() + l.wg.Wait() +} + +// 领导选举 +// 领导作用:全局推选一个人计算执行时间&移入队列,避免每个都进行计算浪费资源 +func (l *Leader) leaderElection() { + defer l.wg.Done() + + // 先执行一次 + l.getLeaderLock() + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + l.getLeaderLock() + case <-l.ctx.Done(): + return + } + } +} + +// 成为领导 +func (l *Leader) getLeaderLock() error { + + // 非当前优先级不用抢leader + if l.priority != nil && !l.priority.IsLatest(l.ctx) { + return nil + } + + ctx, cancel := context.WithCancel(l.ctx) + defer cancel() + + // 尝试加锁 + lock, err := lockx.NewGlobalLock(ctx, l.redis, l.leaderUniLockKey) + if err != nil { + l.logger.Errorf(l.ctx, "getLeaderLock err:%+v", err) + return err + } + if b, _ := lock.Lock(); !b { + // 加锁失败 非Reader + l.leaderLock.Lock() + l.isLeader = false + l.leaderLock.Unlock() + return nil + } + defer lock.Unlock() + + // 加锁成功 + l.leaderLock.Lock() + l.isLeader = true + l.leaderLock.Unlock() + + // 上报当前的Leader实例 + l.redis.Set(l.ctx, l.leaderKey, l.instanceId, time.Hour*24) + + l.logger.Infof(l.ctx, "getLeaderLock Instance %s became leader", lock.GetValue()) + + go func() { + if l.priority == nil { + return + } + + for { + select { + case <-ctx.Done(): + return + default: + if !l.priority.IsLatest(l.ctx) { + cancel() + return + } + time.Sleep(100 * time.Millisecond) + } + } + }() + + // 等待超时退出 + <-lock.GetCtx().Done() + + // 已过期 + // l.leaderLock.Lock() + // l.isLeader = false + // l.leaderLock.Unlock() + + return nil + +} + +// isCurrentLeader 检查当前实例是否是leader +func (c *Leader) IsLeader() bool { + c.leaderLock.RLock() + defer c.leaderLock.RUnlock() + return c.isLeader +} diff --git a/leader/options.go b/leader/options.go new file mode 100644 index 0000000..bb3e4c9 --- /dev/null +++ b/leader/options.go @@ -0,0 +1,51 @@ +package leader + +import ( + "github.com/google/uuid" + "github.com/yuninks/timerx/logger" + "github.com/yuninks/timerx/priority" +) + +type Options struct { + logger logger.Logger + instanceId string + priority *priority.Priority // 全局优先级 +} + +func defaultOptions() Options { + + u, _ := uuid.NewV7() + + return Options{ + logger: logger.NewLogger(), + instanceId: u.String(), + } +} + +type Option func(*Options) + +func newOptions(opts ...Option) Options { + o := defaultOptions() + for _, opt := range opts { + opt(&o) + } + return o +} + +func WithLogger(log logger.Logger) Option { + return func(o *Options) { + o.logger = log + } +} + +func WithPriority(p *priority.Priority) Option { + return func(o *Options) { + o.priority = p + } +} + +func WithInstanceId(instanceId string) Option { + return func(o *Options) { + o.instanceId = instanceId + } +} diff --git a/once.go b/once.go index 722a0be..a341b58 100644 --- a/once.go +++ b/once.go @@ -3,13 +3,18 @@ package timerx import ( "context" "encoding/json" + "errors" "fmt" "runtime/debug" + "strconv" "strings" + "sync" "time" "github.com/go-redis/redis/v8" - uuid "github.com/satori/go.uuid" + "github.com/google/uuid" + "github.com/yuninks/timerx/heartbeat" + "github.com/yuninks/timerx/leader" "github.com/yuninks/timerx/logger" "github.com/yuninks/timerx/priority" ) @@ -21,22 +26,36 @@ import ( // 单次的任务队列 type Once struct { - ctx context.Context - logger logger.Logger - zsetKey string - listKey string - redis redis.UniversalClient - worker Callback - keyPrefix string - priority *priority.Priority // 全局优先级 - priorityKey string // 全局优先级的key - usePriority bool + ctx context.Context // ctx + cancel context.CancelFunc // cancel + logger logger.Logger // 日志 + zsetKey string // 任务列表 有序集合 + listKey string // + redis redis.UniversalClient // Redis + worker Callback // 回调 + keyPrefix string // + priority *priority.Priority // 全局优先级 + usePriority bool // + batchSize int // 每批大小 + wg sync.WaitGroup // + + stopChan chan struct{} // + instanceId string // 实例ID + + leader *leader.Leader // 领导 + heartbeat *heartbeat.HeartBeat + + executeInfoKey string // 执行情况的key 有序集合 + keySeparator string // 分割符 + timeout time.Duration // 任务执行超时时间 + + maxRetryCount int // 最大重试次数 0代表不限 } type OnceWorkerResp struct { - Retry bool // 是否重试 true - DelayTime time.Duration - AttachData interface{} + Retry bool // 是否重试 true + DelayTime time.Duration // 等待时间 + AttachData any // 附加数据 } type OnceTaskType string @@ -49,46 +68,224 @@ type Callback interface { // @param data interface{} 任务数据 // @return WorkerCode 任务执行结果 // @return time.Duration 任务执行时间间隔 - Worker(ctx context.Context, taskType OnceTaskType, taskId string, attachData interface{}) *OnceWorkerResp + Worker(ctx context.Context, taskType OnceTaskType, taskId string, attachData any) *OnceWorkerResp } -// var wo *Once = nil -// var once sync.Once - type extendData struct { - Delay time.Duration - Data interface{} + Delay time.Duration + Data any + RetryCount int // 重试次数 } // 初始化 -func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, call Callback, opts ...Option) *Once { +func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, call Callback, opts ...Option) (*Once, error) { + op := newOptions(opts...) + if re == nil { - panic("redis client is nil") + op.logger.Errorf(ctx, "redis client is nil") + return nil, errors.New("redis client is nil") } - op := newOptions(opts...) - // once.Do(func() { + ctx, cancel := context.WithCancel(ctx) + + u, _ := uuid.NewV7() + wo := &Once{ - ctx: ctx, - logger: op.logger, - zsetKey: "timer:once_zsetkey" + keyPrefix, - listKey: "timer:once_listkey" + keyPrefix, - priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key - usePriority: op.usePriority, - redis: re, - worker: call, - keyPrefix: keyPrefix, + ctx: ctx, + cancel: cancel, + logger: op.logger, + zsetKey: "timer:once_zsetkey" + keyPrefix, + listKey: "timer:once_listkey" + keyPrefix, + executeInfoKey: "timer:once_executeInfoKey" + keyPrefix, + usePriority: op.usePriority, + redis: re, + worker: call, + keyPrefix: keyPrefix, + batchSize: op.batchSize, + stopChan: make(chan struct{}), + instanceId: u.String(), + keySeparator: "[:]", + timeout: op.timeout, + maxRetryCount: op.maxRetryCount, } + // 初始化优先级 if wo.usePriority { - wo.priority = priority.InitPriority(ctx, re, wo.priorityKey, op.priorityVal, priority.SetLogger(wo.logger)) + pri, err := priority.InitPriority(ctx, + re, + keyPrefix, + op.priorityVal, + priority.WithLogger(wo.logger), + ) + if err != nil { + wo.logger.Errorf(ctx, "InitPriority err:%v", err) + return nil, err + } + wo.priority = pri } - go wo.getTask() - go wo.watch() - // }) + // 初始化leader + le, err := leader.InitLeader( + ctx, + re, + wo.keyPrefix, + leader.WithLogger(wo.logger), + leader.WithPriority(wo.priority), + leader.WithInstanceId(wo.instanceId), + ) + if err != nil { + wo.logger.Infof(ctx, "InitLeader err:%v", err) + return nil, err + } + wo.leader = le - return wo + // 初始化心跳 + heart, err := heartbeat.InitHeartBeat( + ctx, + re, + wo.keyPrefix, + heartbeat.WithInstanceId(wo.instanceId), + heartbeat.WithLeader(wo.leader), + heartbeat.WithLogger(wo.logger), + heartbeat.WithPriority(wo.priority), + ) + if err != nil { + wo.logger.Errorf(ctx, "InitHeartBeat err:%v", err) + return nil, err + } + wo.heartbeat = heart + + wo.startDaemon() + + return wo, nil +} + +// Close 停止集群定时器 +func (l *Once) Close() { + close(l.stopChan) + l.heartbeat.Close() + l.cancel() + l.wg.Wait() +} + +func (l *Once) startDaemon() { + + // 任务调度 + l.wg.Add(1) + go l.scheduleTasks() + + // 任务执行 + l.wg.Add(1) + go l.executeTasks() + + // 清理过期任务 + l.wg.Add(1) + go l.cleanExecuteInfoLoop() + +} + +func (l *Once) cleanExecuteInfoLoop() { + l.wg.Done() + + ticker := time.NewTicker(time.Minute * 5) + defer ticker.Stop() + + for { + select { + case <-l.stopChan: + return + case <-l.ctx.Done(): + return + case <-ticker.C: + if l.leader.IsLeader() { + l.cleanExecuteInfo() + } + } + } +} + +// 清除过期任务 +func (l *Once) cleanExecuteInfo() error { + // 移除执行信息 + l.redis.ZRemRangeByScore(l.ctx, l.executeInfoKey, "0", strconv.FormatInt(time.Now().Add(-15*time.Minute).UnixMilli(), 10)).Err() + return nil +} + +// 任务调度 领导 +func (l *Once) scheduleTasks() { + defer l.wg.Done() + + timer := time.NewTicker(time.Millisecond * 200) + defer timer.Stop() + + for { + select { + case <-l.stopChan: + return + case <-l.ctx.Done(): + return + case <-timer.C: + // 优先级 + if l.usePriority { + if !l.priority.IsLatest(l.ctx) { + continue + } + } + // 领导 + if !l.leader.IsLeader() { + continue + } + + l.batchGetTasks() + } + } + +} + +// 任务执行 +func (l *Once) executeTasks() { + defer l.wg.Done() + + for { + + if l.usePriority { + if !l.priority.IsLatest(l.ctx) { + time.Sleep(time.Second * 5) + continue + } + } + + select { + case <-l.stopChan: + return + case <-l.ctx.Done(): + return + default: + + keys, err := l.redis.BLPop(l.ctx, time.Second*10, l.listKey).Result() + if err != nil { + continue + } + + go l.processTask(keys[1]) + + } + } + +} + +// 构建Redis key +func (l *Once) buildRedisKey(taskType OnceTaskType, taskId string) string { + return fmt.Sprintf("%s%s%s", taskType, l.keySeparator, taskId) +} + +// 解析Redis Key +func (l *Once) parseRedisKey(key string) (OnceTaskType, string, error) { + parts := strings.Split(key, l.keySeparator) + if len(parts) < 2 { + return "", "", fmt.Errorf("invalid key format: %s", key) + } + return OnceTaskType(parts[0]), parts[1], nil } // 添加任务(覆盖) @@ -97,82 +294,82 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c // @param uniTaskId string 任务唯一标识 // @param delayTime time.Duration 延迟时间 // @param attachData interface{} 附加数据 -func (w *Once) Save(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error { - if delayTime.Abs() != delayTime { - return fmt.Errorf("时间间隔不能为负数") - } - if delayTime == 0 { - return fmt.Errorf("时间间隔不能为0") +func (l *Once) Save(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error { + return l.save(taskType, taskId, delayTime, attachData, 0) +} + +// 添加任务(覆盖) +// 重复插入就代表覆盖 +func (w *Once) save(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}, retryCount int) error { + if delayTime <= 0 { + return fmt.Errorf("delay time must be positive") } - redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId) + redisKey := w.buildRedisKey(taskType, taskId) + executeTime := time.Now().Add(delayTime) ed := extendData{ - Delay: delayTime, - Data: attachData, + Delay: delayTime, + Data: attachData, + RetryCount: retryCount, } b, _ := json.Marshal(ed) - // 写入附加数据 - _, err := w.redis.SetEX(w.ctx, w.keyPrefix+redisKey, b, delayTime+time.Minute*30).Result() - if err != nil { - w.logger.Errorf(w.ctx, "写入附加数据失败:%s", err.Error()) - return err - } + // 使用事务确保原子性 + pipe := w.redis.TxPipeline() - // 写入执行时间 - _, err = w.redis.ZAdd(w.ctx, w.zsetKey, &redis.Z{ - Score: float64(time.Now().Add(delayTime).UnixMilli()), + dataExpire := delayTime + time.Minute*30 + + pipe.SetEX(w.ctx, w.keyPrefix+redisKey, b, dataExpire) + pipe.ZAdd(w.ctx, w.zsetKey, &redis.Z{ + Score: float64(executeTime.UnixMilli()), Member: redisKey, - }).Result() - - return err -} - -// 添加任务(不覆盖) -func (l *Once) Create(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error { - if delayTime.Abs() != delayTime { - return fmt.Errorf("时间间隔不能为负数") - } - if delayTime == 0 { - return fmt.Errorf("时间间隔不能为0") - } - - redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId) - // 判断有序集合Key是否存在,存在则报错,不存在则写入 - if l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Val() == 0 { - - ed := extendData{ - Delay: delayTime, - Data: attachData, - } - b, _ := json.Marshal(ed) - - // 写入附加数据 - _, err := l.redis.SetEX(l.ctx, l.keyPrefix+redisKey, b, delayTime+time.Minute*30).Result() - if err != nil { - l.logger.Errorf(l.ctx, "写入附加数据失败:%s", err.Error()) - return err - } - _, err = l.redis.ZAdd(l.ctx, l.zsetKey, &redis.Z{ - Score: float64(time.Now().Add(delayTime).UnixMilli()), - Member: redisKey, - }).Result() - if err != nil { - return err - } + }) + _, err := pipe.Exec(w.ctx) + if err != nil { + w.logger.Errorf(w.ctx, "save task failed:%w", err) + return err } return nil } +// 添加任务(不覆盖) +func (l *Once) Create(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error { + if delayTime <= 0 { + return fmt.Errorf("delay time must be positive") + } + + redisKey := l.buildRedisKey(taskType, taskId) + + score, err := l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Result() + if err != nil { + if errors.Is(err, redis.Nil) { + return l.Save(taskType, taskId, delayTime, attachData) + } + l.logger.Errorf(l.ctx, "redis.ZScore err:%v", err) + return err + } + if score > 0 { + return fmt.Errorf("task already exists") + } + + return l.Save(taskType, taskId, delayTime, attachData) +} + // 删除任务 func (w *Once) Delete(taskType OnceTaskType, taskId string) error { - redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId) + redisKey := w.buildRedisKey(taskType, taskId) - w.redis.Del(w.ctx, redisKey).Result() + pipe := w.redis.TxPipeline() + pipe.Del(w.ctx, redisKey) + pipe.ZRem(w.ctx, w.zsetKey, redisKey) - w.redis.ZRem(w.ctx, w.zsetKey, redisKey).Result() + _, err := pipe.Exec(w.ctx) + if err != nil { + w.logger.Errorf(w.ctx, "delete task failed:%w", err) + return err + } return nil } @@ -182,96 +379,117 @@ func (l *Once) Get(taskType OnceTaskType, taskId string) { // } -// 获取任务 -func (w *Once) getTask() { - timer := time.NewTicker(time.Millisecond * 200) - defer timer.Stop() +// 批量获取任务 +func (l *Once) batchGetTasks() { + script := ` + local tasks = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'LIMIT', 0, ARGV[2]) + if #tasks == 0 then return 0 end + + for i, task in ipairs(tasks) do + redis.call('zrem', KEYS[1], task) + redis.call('lpush', KEYS[2], task) + end + return #tasks + ` -Loop: - for { - select { - case <-timer.C: - if w.usePriority { - if !w.priority.IsLatest(w.ctx) { - continue - } - } - - script := ` - local token = redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2]) - for i,v in ipairs(token) do - redis.call('zrem',KEYS[1],v) - redis.call('lpush',KEYS[2],v) - end - return "OK" - ` - w.redis.Eval(w.ctx, script, []string{w.zsetKey, w.listKey}, 0, time.Now().UnixMilli()).Result() - // fmt.Println(i, err) - - case <-w.ctx.Done(): - break Loop - } + result, err := l.redis.Eval( + l.ctx, + script, + []string{l.zsetKey, l.listKey}, + time.Now().UnixMilli(), + l.batchSize, + ).Result() + if err != nil && err != redis.Nil { + l.logger.Errorf(l.ctx, "batch get tasks failed: %s", err.Error()) + return } -} - -// 监听任务 -func (w *Once) watch() { - for { - if w.usePriority { - if !w.priority.IsLatest(w.ctx) { - time.Sleep(time.Second * 5) - continue - } - } - - keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result() - if err != nil { - // fmt.Println("watch err:", err) - continue - } - - ctx := context.WithValue(w.ctx, "trace_id", uuid.NewV4().String()) - - go w.doTask(ctx, keys[1]) + if count, ok := result.(int64); ok && count > 0 { + l.logger.Infof(l.ctx, "moved %d tasks to ready queue", count) } } // 执行任务 -func (l *Once) doTask(ctx context.Context, key string) { - defer func() { - if err := recover(); err != nil { - l.logger.Errorf(ctx, "timer:回调任务panic:%s stack:%s", err, string(debug.Stack())) - } - }() +func (l *Once) processTask(key string) { + begin := time.Now() - s := strings.Split(key, "[:]") + ctx, cancel := context.WithTimeout(l.ctx, l.timeout) + defer cancel() - // 读取数据 - redisKey := l.keyPrefix + key - str, err := l.redis.Get(ctx, redisKey).Result() + u, _ := uuid.NewV7() + + ctx = context.WithValue(ctx, "trace_id", u.String()) + + l.logger.Infof(ctx, "processTask start key:%s", key) + + taskType, taskId, err := l.parseRedisKey(key) if err != nil { - l.logger.Errorf(ctx, "获取数据失败 key:%s err:%s", key, err) + l.logger.Errorf(ctx, "processTask parseRedisKey:%w key:%s", err, key) return } - l.logger.Infof(ctx, "任务执行:%s 参数:%s", key, str) + // 上报执行情况 + executeVal := fmt.Sprintf("%s|%s|%s|%s", key, l.instanceId, u.String(), begin.Format(time.RFC3339Nano)) + l.redis.ZAdd(ctx, l.executeInfoKey, &redis.Z{ + Score: float64(begin.UnixMilli()), + Member: executeVal, + }) + + defer func() { + if err := recover(); err != nil { + l.logger.Errorf(ctx, "processTask panic:%s stack:%s", err, string(debug.Stack())) + } + }() + + // 读取数据 + redisKey := l.keyPrefix + l.buildRedisKey(taskType, taskId) + str, err := l.redis.Get(ctx, redisKey).Result() + if err != nil { + l.logger.Errorf(ctx, "processTask redis.Get key:%s err:%s", key, err) + return + } ed := extendData{} json.Unmarshal([]byte(str), &ed) - resp := l.worker.Worker(ctx, OnceTaskType(s[0]), s[1], ed.Data) - if resp == nil { + resp := l.worker.Worker(ctx, taskType, taskId, ed.Data) + l.logger.Infof(ctx, "processTask exec key:%s resp:%+v data:%s", key, resp, str) + + if resp == nil || !resp.Retry { + // 完成 删除任务 + // 删除任务 + l.logger.Infof(ctx, "processTask delete key:%s", key) + if err := l.Delete(taskType, taskId); err != nil { + l.logger.Errorf(ctx, "processTask delete errprocessTask delete err:%w", err) + } return } - if resp.Retry { - // 重新放入队列 - if resp.DelayTime != 0 && resp.DelayTime == resp.DelayTime.Abs() { - ed.Delay = resp.DelayTime - } - ed.Data = resp.AttachData - l.logger.Infof(ctx, "任务重新放入队列:%s", key) - l.Create(OnceTaskType(s[0]), s[1], ed.Delay, ed.Data) + // 重新放入队列 + if err := l.handleRetry(ctx, taskType, taskId, &ed, resp); err != nil { + l.logger.Errorf(ctx, "processTask handleRetry err:%w", err) } } + +func (l *Once) handleRetry(ctx context.Context, taskType OnceTaskType, taskId string, + ed *extendData, resp *OnceWorkerResp) error { + // 限制重试次数 + ed.RetryCount++ + if l.maxRetryCount > 0 && ed.RetryCount > l.maxRetryCount { + l.logger.Infof(ctx, "handleRetry task exceeded retry limit: %s %s %d", taskType, taskId, l.maxRetryCount) + return nil + } + + // 更新延迟时间 + if resp.DelayTime > 0 { + ed.Delay = resp.DelayTime + } + if resp.AttachData != nil { + ed.Data = resp.AttachData + } + + l.logger.Infof(ctx, "handleRetry retrying task: %s:%s, retry count: %d", + taskType, taskId, ed.RetryCount) + + return l.save(taskType, taskId, ed.Delay, ed.Data, ed.RetryCount) +} diff --git a/option.go b/option.go index 5fd17a2..5135297 100644 --- a/option.go +++ b/option.go @@ -24,7 +24,7 @@ func defaultOptions() Options { usePriority: false, priorityVal: 0, batchSize: 100, - maxRetryCount: 100, + maxRetryCount: 0, } } @@ -78,8 +78,8 @@ func WithBatchSize(size int) Option { func WithMaxRetryCount(count int) Option { return func(o *Options) { - if count <= 0 { - count = 1 + if count < 0 { + count = 0 } o.maxRetryCount = count }