本地定时任务

This commit is contained in:
Yun
2025-10-02 17:54:38 +08:00
parent 304d27e0ac
commit 2d6e77352f
4 changed files with 34 additions and 34 deletions
-1
View File
@@ -5,7 +5,6 @@ go 1.24
require ( require (
github.com/go-redis/redis/v8 v8.11.5 github.com/go-redis/redis/v8 v8.11.5
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/satori/go.uuid v1.2.0
github.com/stretchr/testify v1.11.1 github.com/stretchr/testify v1.11.1
github.com/yuninks/cachex v1.0.5 github.com/yuninks/cachex v1.0.5
github.com/yuninks/lockx v1.1.2 github.com/yuninks/lockx v1.1.2
-2
View File
@@ -23,8 +23,6 @@ github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+29 -26
View File
@@ -11,7 +11,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
uuid "github.com/satori/go.uuid" "github.com/google/uuid"
"github.com/yuninks/timerx/logger" "github.com/yuninks/timerx/logger"
) )
@@ -68,20 +68,13 @@ func (l *Single) startDaemon() {
// 停止所有定时任务 // 停止所有定时任务
func (s *Single) Stop() { func (s *Single) Stop() {
close(s.stopChan)
if s.cancel != nil { if s.cancel != nil {
s.cancel() s.cancel()
} }
close(s.stopChan)
s.wg.Wait()
// 清理所有资源 s.wg.Wait()
s.workerList.Range(func(k, v interface{}) bool {
if timer, ok := v.(timerStr); ok {
close(timer.CanRunning)
}
s.workerList.Delete(k)
return true
})
} }
// 获取任务数量 // 获取任务数量
@@ -99,8 +92,8 @@ func (l *Single) MaxIndex() int64 {
} }
// 定时器主循环 // 定时器主循环
func (s *Single) timerLoop() { func (l *Single) timerLoop() {
defer s.wg.Done() defer l.wg.Done()
ticker := time.NewTicker(100 * time.Millisecond) // 提高精度到100ms ticker := time.NewTicker(100 * time.Millisecond) // 提高精度到100ms
defer ticker.Stop() defer ticker.Stop()
@@ -108,21 +101,21 @@ func (s *Single) timerLoop() {
for { for {
select { select {
case t := <-ticker.C: case t := <-ticker.C:
s.nextTimeMux.RLock() l.nextTimeMux.RLock()
nextTime := s.nextTime nextTime := l.nextTime
s.nextTimeMux.RUnlock() l.nextTimeMux.RUnlock()
if t.Before(nextTime) { if t.Before(nextTime) {
continue continue
} }
s.iterator(s.ctx) l.iterator(l.ctx)
case <-s.ctx.Done(): case <-l.ctx.Done():
s.logger.Infof(s.ctx, "timer: context cancelled, stopping timer loop") l.logger.Infof(l.ctx, "timer: context cancelled, stopping timer loop")
return return
case <-s.stopChan: case <-l.stopChan:
s.logger.Infof(s.ctx, "timer: received stop signal, stopping timer loop") l.logger.Infof(l.ctx, "timer: received stop signal, stopping timer loop")
return return
} }
} }
@@ -343,14 +336,21 @@ func (s *Single) updateNextTimeIfEarlier(candidate time.Time) {
// 删除定时器 // 删除定时器
func (l *Single) Del(index int64) { func (l *Single) Del(index int64) {
if val, ok := l.workerList.Load(index); ok { if _, ok := l.workerList.Load(index); ok {
if timer, ok := val.(timerStr); ok {
close(timer.CanRunning)
}
l.workerList.Delete(index) l.workerList.Delete(index)
} }
} }
func (l *Single) DelByTaskId(taskId string) {
l.workerList.Range(func(k, v interface{}) bool {
timeStr, ok := v.(timerStr)
if ok && timeStr.TaskId == taskId {
l.workerList.Delete(k)
}
return true
})
}
// 迭代定时器列表 // 迭代定时器列表
func (l *Single) iterator(ctx context.Context) { func (l *Single) iterator(ctx context.Context) {
// 当前时间 // 当前时间
@@ -399,7 +399,10 @@ func (l *Single) iterator(ctx context.Context) {
// 执行任务 // 执行任务
func (s *Single) executeTask(ctx context.Context, timer timerStr, originTime time.Time) { func (s *Single) executeTask(ctx context.Context, timer timerStr, originTime time.Time) {
// 创建带追踪ID的上下文 // 创建带追踪ID的上下文
traceCtx := context.WithValue(ctx, "trace_id", uuid.NewV4().String())
u, _ := uuid.NewV7()
traceCtx := context.WithValue(ctx, "trace_id", u.String())
s.logger.Infof(traceCtx, "timer Single begin taskId:%s originTime:%d", timer.TaskId, originTime.UnixMilli()) s.logger.Infof(traceCtx, "timer Single begin taskId:%s originTime:%d", timer.TaskId, originTime.UnixMilli())
traceCtx, cancel := context.WithTimeout(traceCtx, s.timeout) // 设置执行超时 traceCtx, cancel := context.WithTimeout(traceCtx, s.timeout) // 设置执行超时
defer cancel() defer cancel()
+5 -5
View File
@@ -6,11 +6,11 @@ import (
) )
type timerStr struct { type timerStr struct {
Callback func(ctx context.Context, extendData interface{}) error // 需要回调的方法 Callback func(ctx context.Context, extendData any) error // 需要回调的方法
CanRunning chan (struct{}) // 是否允许执行(only single) CanRunning chan (struct{}) // 是否允许执行(only single)
TaskId string // 任务ID 全局唯一键(only cluster) TaskId string // 任务ID 全局唯一键
ExtendData interface{} // 附加参数 ExtendData any // 附加参数
JobData *JobData // 任务时间数据 JobData *JobData // 任务时间数据
} }
type JobType string type JobType string