diff --git a/cmd/main.go b/cmd/main.go index a6575ff..0b1c30b 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -110,12 +110,12 @@ func (l OnceWorker) Worker(ctx context.Context, taskType timerx.OnceTaskType, ta file.WriteString(fmt.Sprintf("执行时间:%s\n", time.Now().Format("2006-01-02 15:04:05"))) fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05")) - fmt.Println(taskType, taskId) + // fmt.Println(taskType, taskId) - fmt.Printf("原来的参数:%+v %T\n", attachData, attachData) + // fmt.Printf("原来的参数:%+v %T\n", attachData, attachData) - v, ok := attachData.(int64) - fmt.Println("vvvvvvv", v, ok) + // v, ok := attachData.(int64) + // fmt.Println("vvvvvvv", v, ok) // fmt.Printf() // d := OnceData{} diff --git a/once.go b/once.go index a341b58..86fe7e5 100644 --- a/once.go +++ b/once.go @@ -13,6 +13,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/google/uuid" + "github.com/yuninks/lockx" "github.com/yuninks/timerx/heartbeat" "github.com/yuninks/timerx/leader" "github.com/yuninks/timerx/logger" @@ -335,7 +336,11 @@ func (w *Once) save(taskType OnceTaskType, taskId string, delayTime time.Duratio } // 添加任务(不覆盖) -func (l *Once) Create(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error { +func (l *Once) Create(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any) error { + return l.create(taskType, taskId, delayTime, attachData, 0) +} + +func (l *Once) create(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any, retryCount int) error { if delayTime <= 0 { return fmt.Errorf("delay time must be positive") } @@ -354,7 +359,7 @@ func (l *Once) Create(taskType OnceTaskType, taskId string, delayTime time.Durat return fmt.Errorf("task already exists") } - return l.Save(taskType, taskId, delayTime, attachData) + return l.save(taskType, taskId, delayTime, attachData, retryCount) } // 删除任务 @@ -428,6 +433,18 @@ func (l *Once) processTask(key string) { return } + // 这里加一个全局锁 + lock, err := lockx.NewGlobalLock(ctx, l.redis, key) + if err != nil { + l.logger.Errorf(ctx, "processTask timer:获取锁失败:%s", taskId) + return + } + if b, err := lock.Lock(); !b { + l.logger.Errorf(ctx, "processTask timer:获取锁失败:%s %+v", taskId, err) + return + } + defer lock.Unlock() + // 上报执行情况 executeVal := fmt.Sprintf("%s|%s|%s|%s", key, l.instanceId, u.String(), begin.Format(time.RFC3339Nano)) l.redis.ZAdd(ctx, l.executeInfoKey, &redis.Z{ @@ -491,5 +508,6 @@ func (l *Once) handleRetry(ctx context.Context, taskType OnceTaskType, taskId st l.logger.Infof(ctx, "handleRetry retrying task: %s:%s, retry count: %d", taskType, taskId, ed.RetryCount) - return l.save(taskType, taskId, ed.Delay, ed.Data, ed.RetryCount) + // 不覆盖的新建 + return l.create(taskType, taskId, ed.Delay, ed.Data, ed.RetryCount) }