From 2c762dee2a0133b19d717a29843f9cd10a1d1c4a Mon Sep 17 00:00:00 2001 From: yun Date: Fri, 10 Oct 2025 21:03:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9Option=E4=B8=8Eonce=E7=9A=84?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 7 ++- error.go | 8 +++ example/once/main.go | 33 +++++++++++-- once.go | 114 ++++++++++++++++++++++++++++++------------- option.go | 45 ++++++++++------- 5 files changed, 148 insertions(+), 59 deletions(-) diff --git a/cluster.go b/cluster.go index a9230b5..e106326 100644 --- a/cluster.go +++ b/cluster.go @@ -370,10 +370,7 @@ func (l *Cluster) Cron(ctx context.Context, taskId string, cronExpression string // 获取当天的零点时间 zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location()) - options := Options{} - for _, o := range opt { - o(&options) - } + options := newEmptyOptions(opt...) cronParser := l.cronParser if options.cronParser != nil { cronParser = options.cronParser @@ -544,6 +541,8 @@ func (c *Cluster) executeTasks() { } if len(taskID) < 2 { + c.logger.Errorf(c.ctx, "Invalid BLPop result: %v", taskID) + // 数据异常,继续下一个 continue } diff --git a/error.go b/error.go index 8e78f08..74ec14f 100644 --- a/error.go +++ b/error.go @@ -33,4 +33,12 @@ var ( ErrCronExpression = errors.New("cron expression error") // ErrCronParser 错误 ErrCronParser = errors.New("cron parser error") + // ExecuteTime 错误 + ErrExecuteTime = errors.New("execute time error") + // RunCount 错误 + ErrRunCount = errors.New("run count error") + // DelayTime 错误 + ErrDelayTime = errors.New("delay time error") + // 任务已存在 + ErrTaskExists = errors.New("task already exists") ) diff --git a/example/once/main.go b/example/once/main.go index c8364fd..e82b3a0 100644 --- a/example/once/main.go +++ b/example/once/main.go @@ -27,13 +27,34 @@ func main() { panic(err) } + ch := make(chan ChanStatus, 1000) + + go func() { + for a := 0; a < 100; a++ { + go func(a int) { + for status := range ch { + // fmt.Println("协程", a, "处理任务", status) + // time.Sleep(10 * time.Millisecond) // 模拟处理时间 + err = once.SaveByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("task_%d_%d", status.I, status.J), status.T, fmt.Sprintf("任务数据_%d_%d 预期时间%s", status.I, status.J, status.T.Format("2006-01-02 15:04:05"))) + if err != nil { + fmt.Println("保存任务失败:", err) + } + } + }(a) + } + }() + go func() { // 一千万任务,每个任务间隔1秒 - for i := 0; i < 10000000; i++ { + for i := 0; i < 100; i++ { runTime := t.Add(time.Duration(i) * time.Second) - for j := 0; j < 50000; j++ { - once.CreateByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("task_%d_%d", i, j), runTime, fmt.Sprintf("任务数据_%d_%d 预期时间%s", i, j, runTime.Format("2006-01-02 15:04:05"))) + for j := 0; j < 100; j++ { + ch <- ChanStatus{ + I: i, + J: j, + T: runTime, + } } } }() @@ -42,6 +63,12 @@ func main() { } +type ChanStatus struct { + I int + J int + T time.Time +} + func getRedis() *redis.Client { client := redis.NewClient(&redis.Options{ Addr: "127.0.0.1" + ":" + "6379", diff --git a/once.go b/once.go index c99349c..ea0aa1d 100644 --- a/once.go +++ b/once.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "runtime/debug" + "sort" "strconv" "strings" "sync" @@ -51,7 +52,7 @@ type Once struct { keySeparator string // 分割符 timeout time.Duration // 任务执行超时时间 - maxRetryCount int // 最大重试次数 0代表不限 + maxRunCount int // 最大重试次数 0代表不限 } type OnceWorkerResp struct { @@ -74,11 +75,19 @@ type Callback interface { } type extendData struct { - Delay time.Duration - Data any - RetryCount int // 重试次数 + TaskTimes []time.Time + Data any + RunCount int // 运行次数 + JobType jobType } +type jobType string + +const ( + jobTypeOnce = "once" + jobTypeList = "list" +) + // 初始化 func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, call Callback, opts ...Option) (*Once, error) { op := newOptions(opts...) @@ -109,7 +118,7 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c instanceId: u.String(), keySeparator: "[:]", timeout: op.timeout, - maxRetryCount: op.maxRetryCount, + maxRunCount: op.maxRunCount, } // 初始化优先级 @@ -288,6 +297,12 @@ func (l *Once) executeTasks() { continue } + if len(keys) < 2 { + l.logger.Errorf(l.ctx, "Invalid task data: %v", keys) + // 数据异常,继续下一个 + continue + } + // 处理任务 go l.processTask(keys[1]) } @@ -316,45 +331,69 @@ func (l *Once) parseRedisKey(key string) (OnceTaskType, string, error) { // @param delayTime time.Duration 延迟时间 // @param attachData interface{} 附加数据 func (l *Once) Save(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error { - return l.save(ctx, taskType, taskId, delayTime, attachData, 0) + execTime := time.Now().Add(delayTime) + return l.save(ctx, jobTypeOnce, taskType, taskId, []time.Time{execTime}, attachData, 0) } // 指定时间添加任务(覆盖) func (l *Once) SaveByTime(ctx context.Context, taskType OnceTaskType, taskId string, executeTime time.Time, attachData interface{}) error { - return l.save(ctx, taskType, taskId, time.Until(executeTime), attachData, 0) + return l.save(ctx, jobTypeOnce, taskType, taskId, []time.Time{executeTime}, attachData, 0) } // 添加任务(覆盖) // 重复插入就代表覆盖 -func (w *Once) save(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}, retryCount int) error { - if delayTime <= 0 { - w.logger.Errorf(ctx, "delay time must be positive delayTime:%v taskType:%v taskId:%v attachData:%v retryCount:%v", delayTime, taskType, taskId, attachData, retryCount) - return fmt.Errorf("delay time must be positive") +func (w *Once) save(ctx context.Context, jobType jobType, taskType OnceTaskType, taskId string, taskTimes []time.Time, attachData interface{}, runCount int) error { + if len(taskTimes) == 0 { + w.logger.Errorf(ctx, "delay time must be positive taskType:%v taskId:%v attachData:%v runCount:%v", taskType, taskId, attachData, runCount) + return ErrExecuteTime + } + + if jobType == jobTypeList && len(taskTimes) <= runCount { + w.logger.Errorf(ctx, "delay time must be positive taskType:%v taskId:%v attachData:%v runCount:%v", taskType, taskId, attachData, runCount) + return ErrRunCount + } + + // 根据时间从小到大排序 + sort.Slice(taskTimes, func(i, j int) bool { + return taskTimes[i].Before(taskTimes[j]) + }) + + latestTime := taskTimes[len(taskTimes)-1] + if latestTime.Before(time.Now()) { + w.logger.Errorf(ctx, "delay time must be positive taskType:%v taskId:%v attachData:%v runCount:%v", taskType, taskId, attachData, runCount) + return ErrDelayTime + } + + nextTime := taskTimes[0] + if jobType == jobTypeList { + nextTime = taskTimes[runCount] } redisKey := w.buildRedisKey(taskType, taskId) - executeTime := time.Now().Add(delayTime) ed := extendData{ - Delay: delayTime, - Data: attachData, - RetryCount: retryCount, + TaskTimes: taskTimes, + Data: attachData, + RunCount: runCount, + JobType: jobType, } b, _ := json.Marshal(ed) // 使用事务确保原子性 pipe := w.redis.TxPipeline() - dataExpire := delayTime + time.Minute*30 + expiresTime := latestTime.Add(time.Minute * 30) + + dataExpire := time.Until(expiresTime) pipe.SetEx(w.ctx, w.keyPrefix+redisKey, b, dataExpire) pipe.ZAdd(w.ctx, w.zsetKey, redis.Z{ - Score: float64(executeTime.UnixMilli()), + Score: float64(nextTime.UnixMilli()), Member: redisKey, }) _, err := pipe.Exec(w.ctx) if err != nil { - w.logger.Errorf(w.ctx, "save task failed:%v taskType:%v taskId:%v attachData:%v retryCount:%v", err, taskType, taskId, attachData, retryCount) + w.logger.Errorf(w.ctx, "save task failed:%v taskType:%v taskId:%v attachData:%v retryCount:%v", err, taskType, taskId, attachData, runCount) return err } @@ -363,18 +402,23 @@ func (w *Once) save(ctx context.Context, taskType OnceTaskType, taskId string, d // 添加任务(不覆盖) func (l *Once) Create(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any) error { - return l.create(ctx, taskType, taskId, delayTime, attachData, 0) + if delayTime <= 0 { + l.logger.Errorf(ctx, "delay time must be positive taskType:%v taskId:%v attachData:%v", taskType, taskId, attachData) + return ErrDelayTime + } + execTime := time.Now().Add(delayTime) + return l.create(ctx, jobTypeOnce, taskType, taskId, []time.Time{execTime}, attachData, 0) } // 指定时间执行(不覆盖) func (l *Once) CreateByTime(ctx context.Context, taskType OnceTaskType, taskId string, executeTime time.Time, attachData any) error { - delay := time.Until(executeTime) - return l.create(ctx, taskType, taskId, delay, attachData, 0) + return l.create(ctx, jobTypeOnce, taskType, taskId, []time.Time{executeTime}, attachData, 0) } -func (l *Once) create(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any, retryCount int) error { - if delayTime <= 0 { - return fmt.Errorf("delay time must be positive") +func (l *Once) create(ctx context.Context, jobType jobType, taskType OnceTaskType, taskId string, taskTimes []time.Time, attachData any, runCount int) error { + if len(taskTimes) <= 0 { + l.logger.Errorf(ctx, "delay time must be positive taskType:%v taskId:%v attachData:%v runCount:%v", taskType, taskId, attachData, runCount) + return ErrExecuteTime } redisKey := l.buildRedisKey(taskType, taskId) @@ -382,16 +426,17 @@ func (l *Once) create(ctx context.Context, taskType OnceTaskType, taskId string, score, err := l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Result() if err != nil { if errors.Is(err, redis.Nil) { - return l.Save(ctx, taskType, taskId, delayTime, attachData) + return l.save(ctx, jobTypeOnce, taskType, taskId, taskTimes, attachData, runCount) } l.logger.Errorf(l.ctx, "redis.ZScore err:%v", err) return err } if score > 0 { - return fmt.Errorf("task already exists") + l.logger.Errorf(l.ctx, "task exists taskType:%v taskId:%v attachData:%v runCount:%v", taskType, taskId, attachData, runCount) + return ErrTaskExists } - return l.save(ctx, taskType, taskId, delayTime, attachData, retryCount) + return l.save(ctx, jobTypeOnce, taskType, taskId, taskTimes, attachData, runCount) } // 删除任务 @@ -523,23 +568,24 @@ func (l *Once) processTask(key string) { func (l *Once) handleRetry(ctx context.Context, taskType OnceTaskType, taskId string, ed *extendData, resp *OnceWorkerResp) error { // 限制重试次数 - ed.RetryCount++ - if l.maxRetryCount > 0 && ed.RetryCount > l.maxRetryCount { - l.logger.Infof(ctx, "handleRetry task exceeded retry limit: %s %s %d", taskType, taskId, l.maxRetryCount) + ed.RunCount++ + if l.maxRunCount > 0 && ed.RunCount > l.maxRunCount { + l.logger.Infof(ctx, "handleRetry task exceeded retry limit: %s %s %d", taskType, taskId, l.maxRunCount) return nil } // 更新延迟时间 - if resp.DelayTime > 0 { - ed.Delay = resp.DelayTime + if ed.JobType == jobTypeOnce { + ed.TaskTimes = []time.Time{time.Now().Add(resp.DelayTime)} } + if resp.AttachData != nil { ed.Data = resp.AttachData } l.logger.Infof(ctx, "handleRetry retrying task: %s:%s, retry count: %d", - taskType, taskId, ed.RetryCount) + taskType, taskId, ed.RunCount) // 不覆盖的新建 - return l.create(ctx, taskType, taskId, ed.Delay, ed.Data, ed.RetryCount) + return l.create(ctx, ed.JobType, taskType, taskId, ed.TaskTimes, ed.Data, ed.RunCount) } diff --git a/option.go b/option.go index 38959cf..33a6116 100644 --- a/option.go +++ b/option.go @@ -8,14 +8,14 @@ import ( ) type Options struct { - logger logger.Logger - location *time.Location - timeout time.Duration // 任务最长执行时间 - usePriority bool - priorityVal int64 - batchSize int - maxRetryCount int - cronParser *cron.Parser // cron表达式解析器 + logger logger.Logger + location *time.Location + timeout time.Duration // 任务最长执行时间 + usePriority bool + priorityVal int64 + batchSize int + maxRunCount int // 单个任务最大运行次数 0 代表不限 + cronParser *cron.Parser // cron表达式解析器 } func defaultOptions() Options { @@ -24,19 +24,20 @@ func defaultOptions() Options { parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) return Options{ - logger: logger.NewLogger(), - location: time.Local, - timeout: time.Hour, // - usePriority: false, - priorityVal: 0, - batchSize: 100, - maxRetryCount: 0, - cronParser: &parser, + logger: logger.NewLogger(), + location: time.Local, + timeout: time.Hour, // + usePriority: false, + priorityVal: 0, + batchSize: 100, + maxRunCount: 0, + cronParser: &parser, } } type Option func(*Options) +// 返回带默认值的配置 func newOptions(opts ...Option) Options { o := defaultOptions() for _, opt := range opts { @@ -45,6 +46,15 @@ func newOptions(opts ...Option) Options { return o } +// 返回空的配置 +func newEmptyOptions(opts ...Option) Options { + o := Options{} + for _, opt := range opts { + opt(&o) + } + return o +} + // 设置日志 func WithLogger(log logger.Logger) Option { return func(o *Options) { @@ -88,7 +98,7 @@ func WithMaxRetryCount(count int) Option { if count < 0 { count = 0 } - o.maxRetryCount = count + o.maxRunCount = count } } @@ -144,4 +154,3 @@ func WithCronParserDescriptor() Option { o.cronParser = &parser } } -