From 62bdf4fcd2d133bf3b3aecf80e6c90394739b7d8 Mon Sep 17 00:00:00 2001 From: Yun Date: Sun, 28 Sep 2025 22:57:15 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=83=A8=E5=88=86=E9=94=99?= =?UTF-8?q?=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- error.go | 3 ++ once.go | 10 +++---- single.go | 86 ++++++++++++++++++++++++++----------------------------- 3 files changed, 49 insertions(+), 50 deletions(-) diff --git a/error.go b/error.go index 5f8ec91..988464c 100644 --- a/error.go +++ b/error.go @@ -3,6 +3,7 @@ package timerx import "errors" var ( + // 定时器不存在 ErrTimerNotFound = errors.New("timer not found") // 任务ID不能为空 ErrTaskIdEmpty = errors.New("taskId can not be empty") @@ -24,4 +25,6 @@ var ( ErrIntervalTime = errors.New("interval time must be greater than 0") // 任务Id已存在 ErrTaskIdExists = errors.New("taskId already exists") + // 任务已执行 + ErrTaskExecuted = errors.New("task already executed") ) diff --git a/once.go b/once.go index b535507..4757635 100644 --- a/once.go +++ b/once.go @@ -330,7 +330,7 @@ func (w *Once) save(taskType OnceTaskType, taskId string, delayTime time.Duratio }) _, err := pipe.Exec(w.ctx) if err != nil { - w.logger.Errorf(w.ctx, "save task failed:%w", err) + w.logger.Errorf(w.ctx, "save task failed:%v", err) return err } @@ -374,7 +374,7 @@ func (w *Once) Delete(taskType OnceTaskType, taskId string) error { _, err := pipe.Exec(w.ctx) if err != nil { - w.logger.Errorf(w.ctx, "delete task failed:%w", err) + w.logger.Errorf(w.ctx, "delete task failed:%v", err) return err } @@ -431,7 +431,7 @@ func (l *Once) processTask(key string) { taskType, taskId, err := l.parseRedisKey(key) if err != nil { - l.logger.Errorf(ctx, "processTask parseRedisKey:%w key:%s", err, key) + l.logger.Errorf(ctx, "processTask parseRedisKey:%v key:%s", err, key) return } @@ -479,14 +479,14 @@ func (l *Once) processTask(key string) { // 删除任务 l.logger.Infof(ctx, "processTask delete key:%s", key) if err := l.Delete(taskType, taskId); err != nil { - l.logger.Errorf(ctx, "processTask delete errprocessTask delete err:%w", err) + l.logger.Errorf(ctx, "processTask delete errprocessTask delete err:%v", err) } return } // 重新放入队列 if err := l.handleRetry(ctx, taskType, taskId, &ed, resp); err != nil { - l.logger.Errorf(ctx, "processTask handleRetry err:%w", err) + l.logger.Errorf(ctx, "processTask handleRetry err:%v", err) } } diff --git a/single.go b/single.go index 5cce885..909953a 100644 --- a/single.go +++ b/single.go @@ -19,9 +19,6 @@ import ( // 1. 这个定时器的作用范围是本机 // 2. 适用简单的时间间隔定时任务 -// 避免执行重复 -var singleHasRun sync.Map - type Single struct { ctx context.Context cancel context.CancelFunc @@ -54,15 +51,21 @@ func InitSingle(ctx context.Context, opts ...Option) *Single { timeout: op.timeout, } - sin.wg.Add(1) - go sin.timerLoop(ctx) - - sin.wg.Add(1) - go sin.cleanupLoop(ctx) + sin.startDaemon() return sin } +func (l *Single) startDaemon() { + + l.wg.Add(1) + go l.timerLoop() + + l.wg.Add(1) + go l.cleanupLoop() + +} + // 停止所有定时任务 func (s *Single) Stop() { if s.cancel != nil { @@ -96,7 +99,7 @@ func (l *Single) MaxIndex() int64 { } // 定时器主循环 -func (s *Single) timerLoop(ctx context.Context) { +func (s *Single) timerLoop() { defer s.wg.Done() ticker := time.NewTicker(100 * time.Millisecond) // 提高精度到100ms @@ -113,20 +116,20 @@ func (s *Single) timerLoop(ctx context.Context) { continue } - s.iterator(ctx) + s.iterator(s.ctx) - case <-ctx.Done(): - s.logger.Infof(ctx, "timer: context cancelled, stopping timer loop") + case <-s.ctx.Done(): + s.logger.Infof(s.ctx, "timer: context cancelled, stopping timer loop") return case <-s.stopChan: - s.logger.Infof(ctx, "timer: received stop signal, stopping timer loop") + s.logger.Infof(s.ctx, "timer: received stop signal, stopping timer loop") return } } } // 清理循环 -func (s *Single) cleanupLoop(ctx context.Context) { +func (s *Single) cleanupLoop() { defer s.wg.Done() ticker := time.NewTicker(time.Minute) @@ -138,7 +141,7 @@ func (s *Single) cleanupLoop(ctx context.Context) { now := time.Now() cleanupTime := now.Add(-2 * time.Minute) // 清理2分钟前的记录 - s.hasRun.Range(func(k, v interface{}) bool { + s.hasRun.Range(func(k, v any) bool { t, ok := v.(time.Time) if !ok || t.Before(cleanupTime) { s.hasRun.Delete(k) @@ -146,11 +149,11 @@ func (s *Single) cleanupLoop(ctx context.Context) { return true }) - case <-ctx.Done(): - s.logger.Infof(ctx, "timer: context cancelled, stopping cleanup loop") + case <-s.ctx.Done(): + s.logger.Infof(s.ctx, "timer: context cancelled, stopping cleanup loop") return case <-s.stopChan: - s.logger.Infof(ctx, "timer: received stop signal, stopping cleanup loop") + s.logger.Infof(s.ctx, "timer: received stop signal, stopping cleanup loop") return } } @@ -166,7 +169,7 @@ func (s *Single) cleanupLoop(ctx context.Context) { // @param callback 回调函数 // @param extendData 扩展数据 // @return error -func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { +func (c *Single) EveryMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { nowTime := time.Now().In(c.location) @@ -190,7 +193,7 @@ func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int, // @param hour int 小时 // @param minute int 分钟 // @param second int 秒 -func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { +func (c *Single) EveryWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { nowTime := time.Now().In(c.location) jobData := JobData{ @@ -207,7 +210,7 @@ func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, } // 每天执行一次 -func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { +func (c *Single) EveryDay(ctx context.Context, taskId string, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { nowTime := time.Now().In(c.location) jobData := JobData{ @@ -223,7 +226,7 @@ func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int } // 每小时执行一次 -func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { +func (c *Single) EveryHour(ctx context.Context, taskId string, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { nowTime := time.Now().In(c.location) jobData := JobData{ @@ -238,7 +241,7 @@ func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second } // 每分钟执行一次 -func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { +func (c *Single) EveryMinute(ctx context.Context, taskId string, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { nowTime := time.Now().In(c.location) jobData := JobData{ @@ -252,7 +255,7 @@ func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callb } // 特定时间间隔 -func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { +func (c *Single) EverySpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { nowTime := time.Now().In(c.location) if spaceTime < 0 { @@ -397,7 +400,7 @@ func (l *Single) iterator(ctx context.Context) { func (s *Single) executeTask(ctx context.Context, timer timerStr, originTime time.Time) { // 创建带追踪ID的上下文 traceCtx := context.WithValue(ctx, "trace_id", uuid.NewV4().String()) - s.logger.Infof(traceCtx, "timer: 开始执行任务 %s", timer.TaskId) + s.logger.Infof(traceCtx, "timer Single begin taskId:%s originTime:%d", timer.TaskId, originTime.UnixMilli()) traceCtx, cancel := context.WithTimeout(traceCtx, s.timeout) // 设置执行超时 defer cancel() @@ -410,13 +413,6 @@ func (s *Single) executeTask(ctx context.Context, timer timerStr, originTime tim } }() - // 检查任务是否已执行 - taskKey := fmt.Sprintf("%s:%d", timer.TaskId, originTime.UnixNano()) - if _, loaded := s.hasRun.LoadOrStore(taskKey, time.Now()); loaded { - s.logger.Errorf(traceCtx, "timer: 任务已执行,跳过本次执行 %s", timer.TaskId) - return - } - // 执行回调 if err := s.doTask(traceCtx, timer, originTime); err != nil { s.logger.Errorf(traceCtx, "timer: 任务执行失败: %s", err.Error()) @@ -431,26 +427,26 @@ func (s *Single) executeTask(ctx context.Context, timer timerStr, originTime tim // 定时器操作类 // 这里不应painc func (l *Single) doTask(ctx context.Context, timeStr timerStr, originTime time.Time) error { + // 检查任务是否已执行 + taskKey := fmt.Sprintf("%s:%d", timeStr.TaskId, originTime.UnixMilli()) + if _, loaded := l.hasRun.LoadOrStore(taskKey, time.Now()); loaded { + l.logger.Errorf(ctx, "timer: 任务已执行,跳过本次执行 %s", timeStr.TaskId) + return ErrTaskExecuted + } + defer func() { if err := recover(); err != nil { - l.logger.Errorf(ctx, "timer:回调任务panic err:%+v stack:%s", err, string(debug.Stack())) + l.logger.Errorf(ctx, "timer Single call panic err:%+v stack:%s", err, string(debug.Stack())) } }() - nuiKey := timeStr.TaskId + originTime.String() - - // timeStr.TaskId - - _, loaded := singleHasRun.LoadOrStore(nuiKey, time.Now()) - if loaded { - // 已经存在,说明已经执行过了 - l.logger.Errorf(ctx, "timer: 任务已执行,跳过本次执行 %s", nuiKey) - return nil + err := timeStr.Callback(ctx, timeStr.ExtendData) + if err != nil { + l.logger.Errorf(ctx, "timer Single call back %s, err: %v", timeStr.TaskId, err) + return err } - ctx = context.WithValue(ctx, "trace_id", uuid.NewV4().String()) - - return timeStr.Callback(ctx, timeStr.ExtendData) + return nil } // 更新下次执行时间