diff --git a/example/once/main.go b/example/once/main.go index e82b3a0..6c40c7d 100644 --- a/example/once/main.go +++ b/example/once/main.go @@ -30,7 +30,7 @@ func main() { ch := make(chan ChanStatus, 1000) go func() { - for a := 0; a < 100; a++ { + for a := 0; a < 10; a++ { go func(a int) { for status := range ch { // fmt.Println("协程", a, "处理任务", status) @@ -47,9 +47,9 @@ func main() { go func() { // 一千万任务,每个任务间隔1秒 - for i := 0; i < 100; i++ { + for i := 0; i < 10; i++ { runTime := t.Add(time.Duration(i) * time.Second) - for j := 0; j < 100; j++ { + for j := 0; j < 10; j++ { ch <- ChanStatus{ I: i, J: j, diff --git a/once.go b/once.go index c04eb04..570652e 100644 --- a/once.go +++ b/once.go @@ -88,6 +88,13 @@ const ( jobTypeList = "list" ) +type createSource string + +const ( + createSourceDefault = "default" + createSourceRetry = "retry" +) + // 初始化 func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, call Callback, opts ...Option) (*Once, error) { op := newOptions(opts...) @@ -411,20 +418,20 @@ func (l *Once) Create(ctx context.Context, taskType OnceTaskType, taskId string, return ErrDelayTime } execTime := time.Now().Add(delayTime) - return l.create(ctx, jobTypeOnce, taskType, taskId, []time.Time{execTime}, attachData, 0) + return l.create(ctx, createSourceDefault, jobTypeOnce, taskType, taskId, []time.Time{execTime}, attachData, 0) } // 指定时间执行(不覆盖) func (l *Once) CreateByTime(ctx context.Context, taskType OnceTaskType, taskId string, executeTime time.Time, attachData any) error { - return l.create(ctx, jobTypeOnce, taskType, taskId, []time.Time{executeTime}, attachData, 0) + return l.create(ctx, createSourceDefault, jobTypeOnce, taskType, taskId, []time.Time{executeTime}, attachData, 0) } // 指定多个时间执行(不覆盖) func (l *Once) CreateByList(ctx context.Context, taskType OnceTaskType, taskId string, taskTimes []time.Time, attachData any) error { - return l.create(ctx, jobTypeList, taskType, taskId, taskTimes, attachData, 0) + return l.create(ctx, createSourceDefault, jobTypeList, taskType, taskId, taskTimes, attachData, 0) } -func (l *Once) create(ctx context.Context, jobType jobType, taskType OnceTaskType, taskId string, taskTimes []time.Time, attachData any, runCount int) error { +func (l *Once) create(ctx context.Context, source createSource, jobType jobType, taskType OnceTaskType, taskId string, taskTimes []time.Time, attachData any, runCount int) error { if len(taskTimes) <= 0 { l.logger.Errorf(ctx, "delay time must be positive taskType:%v taskId:%v attachData:%v runCount:%v", taskType, taskId, attachData, runCount) return ErrExecuteTime @@ -432,6 +439,20 @@ func (l *Once) create(ctx context.Context, jobType jobType, taskType OnceTaskTyp redisKey := l.buildRedisKey(taskType, taskId) + if source != createSourceRetry { + // 这里加一个全局锁 从Retry来的不需要 因为已经加锁了 + lock, err := lockx.NewGlobalLock(ctx, l.redis, l.globalLockPrefix+redisKey) + if err != nil { + l.logger.Errorf(ctx, "processTask timer:获取锁失败:%s", taskId) + return err + } + if b, err := lock.Lock(); !b { + l.logger.Errorf(ctx, "processTask timer:获取锁失败:%s %+v", taskId, err) + return err + } + defer lock.Unlock() + } + score, err := l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Result() if err != nil { if errors.Is(err, redis.Nil) { @@ -596,5 +617,5 @@ func (l *Once) handleRetry(ctx context.Context, taskType OnceTaskType, taskId st taskType, taskId, ed.RunCount) // 不覆盖的新建 - return l.create(ctx, ed.JobType, taskType, taskId, ed.TaskTimes, ed.Data, ed.RunCount) + return l.create(ctx, createSourceRetry, ed.JobType, taskType, taskId, ed.TaskTimes, ed.Data, ed.RunCount) }