diff --git a/go.mod b/go.mod index ed1fe1d..a0b6456 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.24 require ( github.com/go-redis/redis/v8 v8.11.5 github.com/google/uuid v1.6.0 - github.com/satori/go.uuid v1.2.0 github.com/stretchr/testify v1.11.1 github.com/yuninks/cachex v1.0.5 github.com/yuninks/lockx v1.1.2 diff --git a/go.sum b/go.sum index 131368f..cb52100 100644 --- a/go.sum +++ b/go.sum @@ -23,8 +23,6 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= -github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= diff --git a/single.go b/single.go index 909953a..652f6b4 100644 --- a/single.go +++ b/single.go @@ -11,7 +11,7 @@ import ( "sync/atomic" "time" - uuid "github.com/satori/go.uuid" + "github.com/google/uuid" "github.com/yuninks/timerx/logger" ) @@ -68,20 +68,13 @@ func (l *Single) startDaemon() { // 停止所有定时任务 func (s *Single) Stop() { + close(s.stopChan) + if s.cancel != nil { s.cancel() } - close(s.stopChan) - s.wg.Wait() - // 清理所有资源 - s.workerList.Range(func(k, v interface{}) bool { - if timer, ok := v.(timerStr); ok { - close(timer.CanRunning) - } - s.workerList.Delete(k) - return true - }) + s.wg.Wait() } // 获取任务数量 @@ -99,8 +92,8 @@ func (l *Single) MaxIndex() int64 { } // 定时器主循环 -func (s *Single) timerLoop() { - defer s.wg.Done() +func (l *Single) timerLoop() { + defer l.wg.Done() ticker := time.NewTicker(100 * time.Millisecond) // 提高精度到100ms defer ticker.Stop() @@ -108,21 +101,21 @@ func (s *Single) timerLoop() { for { select { case t := <-ticker.C: - s.nextTimeMux.RLock() - nextTime := s.nextTime - s.nextTimeMux.RUnlock() + l.nextTimeMux.RLock() + nextTime := l.nextTime + l.nextTimeMux.RUnlock() if t.Before(nextTime) { continue } - s.iterator(s.ctx) + l.iterator(l.ctx) - case <-s.ctx.Done(): - s.logger.Infof(s.ctx, "timer: context cancelled, stopping timer loop") + case <-l.ctx.Done(): + l.logger.Infof(l.ctx, "timer: context cancelled, stopping timer loop") return - case <-s.stopChan: - s.logger.Infof(s.ctx, "timer: received stop signal, stopping timer loop") + case <-l.stopChan: + l.logger.Infof(l.ctx, "timer: received stop signal, stopping timer loop") return } } @@ -343,14 +336,21 @@ func (s *Single) updateNextTimeIfEarlier(candidate time.Time) { // 删除定时器 func (l *Single) Del(index int64) { - if val, ok := l.workerList.Load(index); ok { - if timer, ok := val.(timerStr); ok { - close(timer.CanRunning) - } + if _, ok := l.workerList.Load(index); ok { l.workerList.Delete(index) } } +func (l *Single) DelByTaskId(taskId string) { + l.workerList.Range(func(k, v interface{}) bool { + timeStr, ok := v.(timerStr) + if ok && timeStr.TaskId == taskId { + l.workerList.Delete(k) + } + return true + }) +} + // 迭代定时器列表 func (l *Single) iterator(ctx context.Context) { // 当前时间 @@ -399,7 +399,10 @@ func (l *Single) iterator(ctx context.Context) { // 执行任务 func (s *Single) executeTask(ctx context.Context, timer timerStr, originTime time.Time) { // 创建带追踪ID的上下文 - traceCtx := context.WithValue(ctx, "trace_id", uuid.NewV4().String()) + + u, _ := uuid.NewV7() + + traceCtx := context.WithValue(ctx, "trace_id", u.String()) s.logger.Infof(traceCtx, "timer Single begin taskId:%s originTime:%d", timer.TaskId, originTime.UnixMilli()) traceCtx, cancel := context.WithTimeout(traceCtx, s.timeout) // 设置执行超时 defer cancel() diff --git a/types.go b/types.go index d9c216b..f407de3 100644 --- a/types.go +++ b/types.go @@ -6,11 +6,11 @@ import ( ) type timerStr struct { - Callback func(ctx context.Context, extendData interface{}) error // 需要回调的方法 - CanRunning chan (struct{}) // 是否允许执行(only single) - TaskId string // 任务ID 全局唯一键(only cluster) - ExtendData interface{} // 附加参数 - JobData *JobData // 任务时间数据 + Callback func(ctx context.Context, extendData any) error // 需要回调的方法 + CanRunning chan (struct{}) // 是否允许执行(only single) + TaskId string // 任务ID 全局唯一键 + ExtendData any // 附加参数 + JobData *JobData // 任务时间数据 } type JobType string