diff --git a/cluster.go b/cluster.go index e106326..a3f0721 100644 --- a/cluster.go +++ b/cluster.go @@ -54,6 +54,8 @@ type Cluster struct { cache *cachex.Cache // 本地缓存 cronParser *cron.Parser // cron表达式解析器 batchSize int // 批量获取任务的数量 + workerChan chan struct{} // worker + maxWorkers int // 最大worker数量 } // 初始化定时器 @@ -85,16 +87,29 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key executeInfoKey: "timer:cluster_executeInfoKey" + keyPrefix, // 执行情况的key 有序集合 - usePriority: op.usePriority, + usePriority: false, stopChan: make(chan struct{}), instanceId: U.String(), cronParser: op.cronParser, batchSize: op.batchSize, + workerChan: make(chan struct{}, op.maxWorkers), + maxWorkers: op.maxWorkers, } // 初始化优先级 - if clu.usePriority { + if op.priorityType != priorityTypeNone { + + clu.usePriority = true + + if op.priorityType == priorityTypeVersion { + pVal, err := priority.PriorityByVersion(op.priorityVersion) + if err != nil { + clu.logger.Errorf(ctx, "PriorityByVersion version:%s err:%v", op.priorityVersion, err) + return nil, err + } + op.priorityVal = pVal + } pri, err := priority.InitPriority( ctx, @@ -337,15 +352,14 @@ func (c *Cluster) EveryMinute(ctx context.Context, taskId string, second int, ca // 特定时间间隔 func (c *Cluster) EverySpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error { - nowTime := time.Now().In(c.location) if spaceTime < 0 { c.logger.Errorf(ctx, "间隔时间不能小于0") return errors.New("间隔时间不能小于0") } - // 获取当天的零点时间 - zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location()) + // 固定时间点为20250101 00:00:00,便于计算下一次执行时间 + zeroTime := time.Date(2025, 1, 1, 0, 0, 0, 0, c.location) jobData := JobData{ JobType: JobTypeInterval, @@ -366,9 +380,8 @@ func (c *Cluster) EverySpace(ctx context.Context, taskId string, spaceTime time. // @param extendData interface{} 扩展数据 // @return error func (l *Cluster) Cron(ctx context.Context, taskId string, cronExpression string, callback func(ctx context.Context, extendData any) error, extendData any, opt ...Option) error { - nowTime := time.Now().In(l.location) - // 获取当天的零点时间 - zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location()) + // 固定时间点为20250101 00:00:00,便于计算下一次执行时间 + zeroTime := time.Date(2025, 1, 1, 0, 0, 0, 0, l.location) options := newEmptyOptions(opt...) cronParser := l.cronParser @@ -519,34 +532,41 @@ func (c *Cluster) executeTasks() { defer c.wg.Done() for { + select { case <-c.stopChan: return case <-c.ctx.Done(): return - default: - if c.usePriority && !c.priority.IsLatest(c.ctx) { - time.Sleep(5 * time.Second) - continue - } + case c.workerChan <- struct{}{}: + func() { + defer func() { + <-c.workerChan + }() - taskID, err := c.redis.BLPop(c.ctx, 10*time.Second, c.listKey).Result() - if err != nil { - if err != redis.Nil { - c.logger.Errorf(c.ctx, "Failed to pop task: %v", err) - // Redis 异常,休眠一会儿 + if c.usePriority && !c.priority.IsLatest(c.ctx) { time.Sleep(5 * time.Second) + return } - continue - } - if len(taskID) < 2 { - c.logger.Errorf(c.ctx, "Invalid BLPop result: %v", taskID) - // 数据异常,继续下一个 - continue - } + taskID, err := c.redis.BLPop(c.ctx, 10*time.Second, c.listKey).Result() + if err != nil { + if err != redis.Nil { + c.logger.Errorf(c.ctx, "Failed to pop task: %v", err) + // Redis 异常,休眠一会儿 + time.Sleep(5 * time.Second) + } + return + } - go c.processTask(taskID[1]) + if len(taskID) < 2 { + c.logger.Errorf(c.ctx, "Invalid BLPop result: %v", taskID) + // 数据异常,继续下一个 + return + } + + go c.processTask(taskID[1]) + }() } } @@ -559,6 +579,7 @@ type ReJobData struct { // 执行任务 func (l *Cluster) processTask(taskId string) { + begin := time.Now() ctx, cancel := context.WithTimeout(l.ctx, l.timeout) diff --git a/example/cluster/main.go b/example/cluster/main.go index 90d8cc6..b4a5650 100644 --- a/example/cluster/main.go +++ b/example/cluster/main.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "path/filepath" "time" "github.com/redis/go-redis/v9" @@ -19,7 +20,7 @@ func main() { ops := []timerx.Option{} - clu, err := timerx.InitCluster(ctx, client, "cluster", ops...) + clu, err := timerx.InitCluster(ctx, client, "cluster_01", ops...) if err != nil { panic(err) } @@ -39,22 +40,43 @@ func main() { // space func space(ctx context.Context, clu *timerx.Cluster) { // 每秒执行一次 - err := clu.EverySpace(ctx, "space_test_second", 1*time.Second, callback, "space 这是秒任务") + err := clu.EverySpace(ctx, "space_test_second_1", 1*time.Second, callback, "space 这是秒任务") fmt.Println(err) + err = clu.EverySpace(ctx, "space_test_second_5", 5*time.Second, callback, "space 这是5秒任务") + fmt.Println(err) + // 每分钟执行一次 - err = clu.EverySpace(ctx, "space_test_minute", 1*time.Minute, callback, "space 这是分钟任务") + err = clu.EverySpace(ctx, "space_test_minute_1", 1*time.Minute, callback, "space 这是分钟任务") fmt.Println(err) + err = clu.EverySpace(ctx, "space_test_minute_5", 5*time.Minute, callback, "space 这是5分钟任务") + fmt.Println(err) + // 每小时执行一次 - err = clu.EverySpace(ctx, "space_test_hour", 1*time.Hour, callback, "space 这是小时任务") + err = clu.EverySpace(ctx, "space_test_hour_1", 1*time.Hour, callback, "space 这是小时任务") fmt.Println(err) + err = clu.EverySpace(ctx, "space_test_hour_2", 2*time.Hour, callback, "space 这是2小时任务") + fmt.Println(err) + err = clu.EverySpace(ctx, "space_test_hour_3", 3*time.Hour, callback, "space 这是3小时任务") + fmt.Println(err) + err = clu.EverySpace(ctx, "space_test_hour_4", 4*time.Hour, callback, "space 这是4小时任务") + fmt.Println(err) + err = clu.EverySpace(ctx, "space_test_hour_5", 5*time.Hour, callback, "space 这是5小时任务") + fmt.Println(err) + // 每天执行一次 - err = clu.EverySpace(ctx, "space_test_day", 24*time.Hour, callback, "space 这是天任务") + err = clu.EverySpace(ctx, "space_test_day_1", 24*time.Hour, callback, "space 这是天任务") fmt.Println(err) + err = clu.EverySpace(ctx, "space_test_day_2", 2*24*time.Hour, callback, "space 这是2天任务") + fmt.Println(err) + err = clu.EverySpace(ctx, "space_test_day_3", 3*24*time.Hour, callback, "space 这是3天任务") + fmt.Println(err) + // 每周执行一次 - err = clu.EverySpace(ctx, "space_test_week", 7*24*time.Hour, callback, "space 这是周任务") + err = clu.EverySpace(ctx, "space_test_week_1", 7*24*time.Hour, callback, "space 这是7天任务") fmt.Println(err) + // 每月执行一次 - err = clu.EverySpace(ctx, "space_test_month", 30*24*time.Hour, callback, "space 这是月任务") + err = clu.EverySpace(ctx, "space_test_month_1", 30*24*time.Hour, callback, "space 这是30天任务") fmt.Println(err) } @@ -252,6 +274,13 @@ func getRedis() *redis.Client { func callback(ctx context.Context, extendData any) error { fmt.Println("任务执行了", extendData, "时间:", time.Now().Format("2006-01-02 15:04:05")) + // 解析文件路径,每天一个文件 + path, _ := filepath.Abs("./") + // 拼接文件路径 + save_path = filepath.Join(path,"/cache/cluster/"+time.Now().Format("2006-01-02")+".log") + // 创建文件夹 + dir := filepath.Dir(save_path) + os.MkdirAll(dir, 0755) // 追加到文件 file, err := os.OpenFile(save_path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) diff --git a/example/once/main.go b/example/once/main.go index 6c40c7d..f0f691b 100644 --- a/example/once/main.go +++ b/example/once/main.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "path/filepath" "time" "github.com/redis/go-redis/v9" @@ -20,13 +21,152 @@ const ( func main() { ctx := context.Background() - t := time.Now() + client := getRedis() - once, err := timerx.InitOnce(ctx, client, "once_test", &OnceWorker{}, timerx.WithBatchSize(1000)) + once, err := timerx.InitOnce(ctx, client, "once_01", &OnceWorker{}, timerx.WithBatchSize(1000)) if err != nil { panic(err) } + // intervalSaveTime(ctx, once) + // intervalSave(ctx, once) + // intervalcreateTask(ctx, once) + // intervalCreateTime(ctx, once) + // benchmarkJob(ctx, once) + stabilityTest(ctx, once) + + select {} + +} + +// 稳定性测试 +func stabilityTest(ctx context.Context, once *timerx.Once) { + + timer := time.NewTicker(time.Second) + + for { + select { + case t := <-timer.C: + fmt.Println("time:", t) + + str := t.Format("2006-01-02 15:04:05") + + once.Create(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_0_%d", time.Now().Unix()), 0, "Create 间隔0s ["+str+"]") + once.Create(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_1_%d", time.Now().Unix()), time.Second*1, "Create 间隔1s ["+str+"]") + once.Create(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_10_%d", time.Now().Unix()), time.Second*10, "Create 间隔10s ["+str+"]") + once.Create(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_15_%d", time.Now().Unix()), time.Second*15, "Create 间隔15s ["+str+"]") + once.Create(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_30_%d", time.Now().Unix()), time.Second*30, "Create 间隔30s ["+str+"]") + once.Create(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_60_%d", time.Now().Unix()), time.Second*60, "Create 间隔60s ["+str+"]") + once.Create(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_120_%d", time.Now().Unix()), time.Second*120, "Create 间隔120s ["+str+"]") // 2分钟 + once.Create(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_300_%d", time.Now().Unix()), time.Second*300, "Create 间隔300s ["+str+"]") // 5分钟 + once.Create(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_900_%d", time.Now().Unix()), time.Second*900, "Create 间隔900s ["+str+"]") // 15分钟 + once.Create(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_1800_%d", time.Now().Unix()), time.Second*1800, "Create 间隔1800s ["+str+"]") // 30分钟 + + once.Save(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_0_%d", time.Now().Unix()), 0, "Save 间隔0s ["+str+"]") + once.Save(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_1_%d", time.Now().Unix()), time.Second*1, "Save 间隔1s ["+str+"]") + once.Save(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_10_%d", time.Now().Unix()), time.Second*10, "Save 间隔10s ["+str+"]") + once.Save(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_15_%d", time.Now().Unix()), time.Second*15, "Save 间隔15s ["+str+"]") + once.Save(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_30_%d", time.Now().Unix()), time.Second*30, "Save 间隔30s ["+str+"]") + once.Save(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_60_%d", time.Now().Unix()), time.Second*60, "Save 间隔60s ["+str+"]") + once.Save(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_120_%d", time.Now().Unix()), time.Second*120, "Save 间隔120s ["+str+"]") // 2分钟 + once.Save(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_300_%d", time.Now().Unix()), time.Second*300, "Save 间隔300s ["+str+"]") // 5分钟 + once.Save(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_900_%d", time.Now().Unix()), time.Second*900, "Save 间隔900s ["+str+"]") // 15分钟 + once.Save(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_1800_%d", time.Now().Unix()), time.Second*1800, "Save 间隔1800s ["+str+"]") // 30分钟 + + once.CreateByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_by_0_%d", time.Now().Unix()), time.Now(), "CreateByTime 当前时间 ["+str+"]") + once.CreateByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_by_1_%d", time.Now().Unix()), time.Now().Add(time.Second*1), "CreateByTime 当前时间+1s ["+str+"]") + once.CreateByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_by_10_%d", time.Now().Unix()), time.Now().Add(time.Second*10), "CreateByTime 当前时间+10s ["+str+"]") + once.CreateByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_by_15_%d", time.Now().Unix()), time.Now().Add(time.Second*15), "CreateByTime 当前时间+15s ["+str+"]") + once.CreateByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_by_30_%d", time.Now().Unix()), time.Now().Add(time.Second*30), "CreateByTime 当前时间+30s ["+str+"]") + once.CreateByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_by_60_%d", time.Now().Unix()), time.Now().Add(time.Second*60), "CreateByTime 当前时间+60s ["+str+"]") + once.CreateByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_by_120_%d", time.Now().Unix()), time.Now().Add(time.Second*120), "CreateByTime 当前时间+120s ["+str+"]") // 2分钟 + once.CreateByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_by_300_%d", time.Now().Unix()), time.Now().Add(time.Second*300), "CreateByTime 当前时间+300s ["+str+"]") // 5分钟 + once.CreateByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_by_900_%d", time.Now().Unix()), time.Now().Add(time.Second*900), "CreateByTime 当前时间+900s ["+str+"]") // 15分钟 + once.CreateByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_create_by_1800_%d", time.Now().Unix()), time.Now().Add(time.Second*1800), "CreateByTime 当前时间+1800s ["+str+"]") // 30分钟 + + once.SaveByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_by_0_%d", time.Now().Unix()), time.Now(), "SaveByTime 当前时间 ["+str+"]") + once.SaveByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_by_1_%d", time.Now().Unix()), time.Now().Add(time.Second*1), "SaveByTime 当前时间+1s ["+str+"]") + once.SaveByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_by_10_%d", time.Now().Unix()), time.Now().Add(time.Second*10), "SaveByTime 当前时间+10s ["+str+"]") + once.SaveByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_by_15_%d", time.Now().Unix()), time.Now().Add(time.Second*15), "SaveByTime 当前时间+15s ["+str+"]") + once.SaveByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_by_30_%d", time.Now().Unix()), time.Now().Add(time.Second*30), "SaveByTime 当前时间+30s ["+str+"]") + once.SaveByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_by_60_%d", time.Now().Unix()), time.Now().Add(time.Second*60), "SaveByTime 当前时间+60s ["+str+"]") + once.SaveByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_by_120_%d", time.Now().Unix()), time.Now().Add(time.Second*120), "SaveByTime 当前时间+120s ["+str+"]") // 2分钟 + once.SaveByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_by_300_%d", time.Now().Unix()), time.Now().Add(time.Second*300), "SaveByTime 当前时间+300s ["+str+"]") // 5分钟 + once.SaveByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_by_900_%d", time.Now().Unix()), time.Now().Add(time.Second*900), "SaveByTime 当前时间+900s ["+str+"]") // 15分钟 + once.SaveByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("stabilityTest_task_save_by_1800_%d", time.Now().Unix()), time.Now().Add(time.Second*1800), "SaveByTime 当前时间+1800s ["+str+"]") // 30分钟 + } + } + +} + +// 指定时间测试 +func intervalSaveTime(ctx context.Context, once *timerx.Once) { + + beginTime := time.Now() + + for i := 1; i < 100; i++ { + + execTime := beginTime.Add(time.Second * time.Duration(i)) + + err := once.SaveByTime(ctx, timerx.OnceTaskType("intervalSaveTime"), fmt.Sprintf("intervalSaveTime_task_%d", i), execTime, fmt.Sprintf("任务数据_%d 预期时间%s", i, execTime.Format("2006-01-02 15:04:05"))) + fmt.Println(err) + } + +} + +// 间隔测试 +func intervalSave(ctx context.Context, once *timerx.Once) { + + beginTime := time.Now() + + for i := 1; i < 100; i++ { + + execTime := beginTime.Add(time.Second * time.Duration(i)) + + err := once.Save(ctx, timerx.OnceTaskType("intervalSaveTime"), fmt.Sprintf("intervalSaveTime_task_%d", i), time.Until(execTime), fmt.Sprintf("任务数据_%d 预期时间%s", i, execTime.Format("2006-01-02 15:04:05"))) + fmt.Println(err) + } + +} + +// 创建间隔测试 +func intervalcreateTask(ctx context.Context, once *timerx.Once) { + + beginTime := time.Now() + + for i := 1; i < 100; i++ { + + execTime := beginTime.Add(time.Second * time.Duration(i)) + + err := once.Create(ctx, timerx.OnceTaskType("intervalSaveTime"), fmt.Sprintf("intervalSaveTime_task_%d", i), time.Until(execTime), fmt.Sprintf("任务数据A_%d 预期时间%s", i, execTime.Format("2006-01-02 15:04:05"))) + fmt.Println(err) + err = once.Create(ctx, timerx.OnceTaskType("intervalSaveTime"), fmt.Sprintf("intervalSaveTime_task_%d", i), time.Until(execTime), fmt.Sprintf("任务数据B_%d 预期时间%s", i, execTime.Format("2006-01-02 15:04:05"))) + fmt.Println(err) + } + +} + +func intervalCreateTime(ctx context.Context, once *timerx.Once) { + + beginTime := time.Now() + + for i := 1; i < 100; i++ { + + execTime := beginTime.Add(time.Second * time.Duration(i)) + + err := once.CreateByTime(ctx, timerx.OnceTaskType("intervalSaveTime"), fmt.Sprintf("intervalSaveTime_task_%d", i), execTime, fmt.Sprintf("任务数据A_%d 预期时间%s", i, execTime.Format("2006-01-02 15:04:05"))) + fmt.Println(err) + err = once.CreateByTime(ctx, timerx.OnceTaskType("intervalSaveTime"), fmt.Sprintf("intervalSaveTime_task_%d", i), execTime, fmt.Sprintf("任务数据B_%d 预期时间%s", i, execTime.Format("2006-01-02 15:04:05"))) + fmt.Println(err) + } + +} + +// 压力测试 +func benchmarkJob(ctx context.Context, once *timerx.Once) { + + t := time.Now() + ch := make(chan ChanStatus, 1000) go func() { @@ -35,7 +175,7 @@ func main() { for status := range ch { // fmt.Println("协程", a, "处理任务", status) // time.Sleep(10 * time.Millisecond) // 模拟处理时间 - err = once.SaveByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("task_%d_%d", status.I, status.J), status.T, fmt.Sprintf("任务数据_%d_%d 预期时间%s", status.I, status.J, status.T.Format("2006-01-02 15:04:05"))) + err := once.SaveByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("task_%d_%d", status.I, status.J), status.T, fmt.Sprintf("任务数据_%d_%d 预期时间%s", status.I, status.J, status.T.Format("2006-01-02 15:04:05"))) if err != nil { fmt.Println("保存任务失败:", err) } @@ -58,9 +198,6 @@ func main() { } } }() - - select {} - } type ChanStatus struct { @@ -95,6 +232,14 @@ func callback(ctx context.Context, extendData any) error { fmt.Println("任务执行了", extendData, "时间:", time.Now().Format("2006-01-02 15:04:05")) + // 解析文件路径,每天一个文件 + path, _ := filepath.Abs("./") + // 拼接文件路径 + save_path = filepath.Join(path, "/cache/once/"+time.Now().Format("2006-01-02")+".log") + // 创建文件夹 + dir := filepath.Dir(save_path) + os.MkdirAll(dir, 0755) + // 追加到文件 file, err := os.OpenFile(save_path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { @@ -102,7 +247,7 @@ func callback(ctx context.Context, extendData any) error { return err } defer file.Close() - _, err = file.WriteString(fmt.Sprintf("执行时间:%v %s\n", extendData, time.Now().Format("2006-01-02 15:04:05"))) + _, err = file.WriteString(fmt.Sprintf("执行时间:%v [%s] \n", extendData, time.Now().Format("2006-01-02 15:04:05"))) if err != nil { fmt.Println("写入文件失败:", err) return err diff --git a/example/single/main.go b/example/single/main.go index 58f9938..d9ba9a3 100644 --- a/example/single/main.go +++ b/example/single/main.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "path/filepath" "time" "github.com/redis/go-redis/v9" @@ -35,22 +36,44 @@ func main() { // space func space(ctx context.Context, clu *timerx.Single) { // 每秒执行一次 - _, err := clu.EverySpace(ctx, "space_test_second", 1*time.Second, callback, "space 这是秒任务") + _, err := clu.EverySpace(ctx, "space_test_second_1", 1*time.Second, callback, "space 这是秒任务") fmt.Println(err) + _, err = clu.EverySpace(ctx, "space_test_second_5", 5*time.Second, callback, "space 这是5秒任务") + fmt.Println(err) + // 每分钟执行一次 - _, err = clu.EverySpace(ctx, "space_test_minute", 1*time.Minute, callback, "space 这是分钟任务") + _, err = clu.EverySpace(ctx, "space_test_minute_1", 1*time.Minute, callback, "space 这是分钟任务") fmt.Println(err) + _, err = clu.EverySpace(ctx, "space_test_minute_5", 5*time.Minute, callback, "space 这是5分钟任务") + fmt.Println(err) + // 每小时执行一次 - _, err = clu.EverySpace(ctx, "space_test_hour", 1*time.Hour, callback, "space 这是小时任务") + _, err = clu.EverySpace(ctx, "space_test_hour_1", 1*time.Hour, callback, "space 这是小时任务") fmt.Println(err) + _, err = clu.EverySpace(ctx, "space_test_hour_2", 2*time.Hour, callback, "space 这是2小时任务") + fmt.Println(err) + _, err = clu.EverySpace(ctx, "space_test_hour_3", 3*time.Hour, callback, "space 这是3小时任务") + fmt.Println(err) + _, err = clu.EverySpace(ctx, "space_test_hour_4", 4*time.Hour, callback, "space 这是4小时任务") + fmt.Println(err) + _, err = clu.EverySpace(ctx, "space_test_hour_5", 5*time.Hour, callback, "space 这是5小时任务") + fmt.Println(err) + // 每天执行一次 - _, err = clu.EverySpace(ctx, "space_test_day", 24*time.Hour, callback, "space 这是天任务") + _, err = clu.EverySpace(ctx, "space_test_day_1", 24*time.Hour, callback, "space 这是天任务") fmt.Println(err) + _, err = clu.EverySpace(ctx, "space_test_day_2", 2*24*time.Hour, callback, "space 这是2天任务") + fmt.Println(err) + _, err = clu.EverySpace(ctx, "space_test_day_3", 3*24*time.Hour, callback, "space 这是3天任务") + fmt.Println(err) + // 每周执行一次 - _, err = clu.EverySpace(ctx, "space_test_week", 7*24*time.Hour, callback, "space 这是周任务") + _, err = clu.EverySpace(ctx, "space_test_week_1", 7*24*time.Hour, callback, "space 这是周任务") fmt.Println(err) + + // 每月执行一次 - _, err = clu.EverySpace(ctx, "space_test_month", 30*24*time.Hour, callback, "space 这是月任务") + _, err = clu.EverySpace(ctx, "space_test_month_1", 30*24*time.Hour, callback, "space 这是月任务") fmt.Println(err) } @@ -249,6 +272,14 @@ func callback(ctx context.Context, extendData any) error { fmt.Println("任务执行了", extendData, "时间:", time.Now().Format("2006-01-02 15:04:05")) + // 解析文件路径,每天一个文件 + path, _ := filepath.Abs("./") + // 拼接文件路径 + save_path = filepath.Join(path, "/cache/single/"+time.Now().Format("2006-01-02")+".log") + // 创建文件夹 + dir := filepath.Dir(save_path) + os.MkdirAll(dir, 0755) + // 追加到文件 file, err := os.OpenFile(save_path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { diff --git a/go.mod b/go.mod index dd33d45..86a9323 100644 --- a/go.mod +++ b/go.mod @@ -7,8 +7,8 @@ require ( github.com/redis/go-redis/v9 v9.14.0 github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.11.1 - github.com/yuninks/cachex v1.0.5 - github.com/yuninks/lockx v1.1.3 + github.com/yuninks/cachex v1.0.6 + github.com/yuninks/lockx v1.1.4 ) require ( diff --git a/go.sum b/go.sum index ff4b110..bf39558 100644 --- a/go.sum +++ b/go.sum @@ -25,10 +25,10 @@ github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -github.com/yuninks/cachex v1.0.5 h1:Y2NmTsuEgwEVYb7FVFh5tUN67kmrUioeksQqLbOAwsM= -github.com/yuninks/cachex v1.0.5/go.mod h1:5357qz18UvHTJSgZzkMamUzZoFzGeKG9+4tIUBXRSVM= -github.com/yuninks/lockx v1.1.3 h1:OA6rb4/XOj+M+1vKLs8fqsU4ZCadvga+oARTWTHpwx0= -github.com/yuninks/lockx v1.1.3/go.mod h1:+HyRozwQHMHrykyOFlotV4Z+z2yrgRSdDl8TxxRMFzw= +github.com/yuninks/cachex v1.0.6 h1:G5dJ8O0gLAGnLwydEHVCSSGv+p1z8KfZdjb5NdxxsVA= +github.com/yuninks/cachex v1.0.6/go.mod h1:5357qz18UvHTJSgZzkMamUzZoFzGeKG9+4tIUBXRSVM= +github.com/yuninks/lockx v1.1.4 h1:Wrd4aAU5apNWAamZM7yqoNlwHDjGY/X1YYrLXcgj1+s= +github.com/yuninks/lockx v1.1.4/go.mod h1:+HyRozwQHMHrykyOFlotV4Z+z2yrgRSdDl8TxxRMFzw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/once.go b/once.go index 570652e..cb28ed1 100644 --- a/once.go +++ b/once.go @@ -52,7 +52,9 @@ type Once struct { keySeparator string // 分割符 timeout time.Duration // 任务执行超时时间 - maxRunCount int // 最大重试次数 0代表不限 + maxRunCount int // 最大重试次数 0代表不限 + workerChan chan struct{} // worker + maxWorkers int // 最大worker数量 } type OnceWorkerResp struct { @@ -116,7 +118,7 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c listKey: "timer:once_listkey" + keyPrefix, executeInfoKey: "timer:once_executeInfoKey" + keyPrefix, globalLockPrefix: "timer:once_globalLockPrefix" + keyPrefix, - usePriority: op.usePriority, + usePriority: false, redis: re, worker: call, keyPrefix: keyPrefix, @@ -126,10 +128,24 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c keySeparator: "[:]", timeout: op.timeout, maxRunCount: op.maxRunCount, + workerChan: make(chan struct{}, op.maxWorkers), + maxWorkers: op.maxWorkers, } // 初始化优先级 - if wo.usePriority { + if op.priorityType != priorityTypeNone { + + wo.usePriority = true + + if op.priorityType == priorityTypeVersion { + pVal, err := priority.PriorityByVersion(op.priorityVersion) + if err != nil { + wo.logger.Errorf(ctx, "PriorityByVersion version:%s err:%v", op.priorityVersion, err) + return nil, err + } + op.priorityVal = pVal + } + pri, err := priority.InitPriority( ctx, re, @@ -280,38 +296,40 @@ func (l *Once) executeTasks() { for { - if l.usePriority { - if !l.priority.IsLatest(l.ctx) { - time.Sleep(time.Second * 5) - continue - } - } - select { case <-l.stopChan: return case <-l.ctx.Done(): return - default: + case l.workerChan <- struct{}{}: + func() { + defer func() { + <-l.workerChan + }() - keys, err := l.redis.BLPop(l.ctx, time.Second*10, l.listKey).Result() - if err != nil { - if err != redis.Nil { - l.logger.Errorf(l.ctx, "Failed to pop task: %v", err) - // Redis 异常,休眠一会儿再重试 + if l.usePriority && !l.priority.IsLatest(l.ctx) { time.Sleep(time.Second * 5) + return } - continue - } - if len(keys) < 2 { - l.logger.Errorf(l.ctx, "Invalid task data: %v", keys) - // 数据异常,继续下一个 - continue - } - // 处理任务 - go l.processTask(keys[1]) + keys, err := l.redis.BLPop(l.ctx, time.Second*10, l.listKey).Result() + if err != nil { + if err != redis.Nil { + l.logger.Errorf(l.ctx, "Failed to pop task: %v", err) + // Redis 异常,休眠一会儿再重试 + time.Sleep(time.Second * 5) + } + return + } + if len(keys) < 2 { + l.logger.Errorf(l.ctx, "Invalid task data: %v", keys) + // 数据异常,继续下一个 + return + } + // 处理任务 + go l.processTask(keys[1]) + }() } } @@ -338,6 +356,9 @@ func (l *Once) parseRedisKey(key string) (OnceTaskType, string, error) { // @param delayTime time.Duration 延迟时间 // @param attachData interface{} 附加数据 func (l *Once) Save(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any) error { + if delayTime < 0 { + delayTime = 0 + } execTime := time.Now().Add(delayTime) return l.save(ctx, jobTypeOnce, taskType, taskId, []time.Time{execTime}, attachData, 0) } @@ -414,8 +435,7 @@ func (w *Once) save(ctx context.Context, jobType jobType, taskType OnceTaskType, // 添加任务(不覆盖) func (l *Once) Create(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any) error { if delayTime <= 0 { - l.logger.Errorf(ctx, "delay time must be positive taskType:%v taskId:%v attachData:%v", taskType, taskId, attachData) - return ErrDelayTime + delayTime = 0 } execTime := time.Now().Add(delayTime) return l.create(ctx, createSourceDefault, jobTypeOnce, taskType, taskId, []time.Time{execTime}, attachData, 0) @@ -523,6 +543,7 @@ func (l *Once) batchGetTasks() { // 执行任务 func (l *Once) processTask(key string) { + begin := time.Now() ctx, cancel := context.WithTimeout(l.ctx, l.timeout) diff --git a/option.go b/option.go index 33a6116..c96373c 100644 --- a/option.go +++ b/option.go @@ -8,30 +8,41 @@ import ( ) type Options struct { - logger logger.Logger - location *time.Location - timeout time.Duration // 任务最长执行时间 - usePriority bool - priorityVal int64 - batchSize int - maxRunCount int // 单个任务最大运行次数 0 代表不限 - cronParser *cron.Parser // cron表达式解析器 + logger logger.Logger + location *time.Location + timeout time.Duration // 任务最长执行时间 + priorityType priorityType // 策略类型 0.不使用 1.优先级 2.版本 + priorityVal int64 // 策略优先级 + priorityVersion string // 策略版本的集 + batchSize int + maxRunCount int // 单个任务最大运行次数 0代表不限 + maxWorkers int // 最大工作协程数 + cronParser *cron.Parser // cron表达式解析器 } +type priorityType int8 + +const ( + priorityTypeNone priorityType = 0 // 不使用优先级 + priorityTypePriority priorityType = 1 + priorityTypeVersion priorityType = 2 // 版本 +) + func defaultOptions() Options { // 默认使用Linux的定时任务兼容 parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) return Options{ - logger: logger.NewLogger(), - location: time.Local, - timeout: time.Hour, // - usePriority: false, - priorityVal: 0, - batchSize: 100, - maxRunCount: 0, - cronParser: &parser, + logger: logger.NewLogger(), + location: time.Local, + timeout: time.Hour, // + priorityType: priorityTypeNone, + priorityVal: 0, + batchSize: 100, + maxRunCount: 0, + maxWorkers: 100, + cronParser: &parser, } } @@ -79,11 +90,18 @@ func WithTimeout(d time.Duration) Option { // 设置优先级 func WithPriority(priority int64) Option { return func(o *Options) { - o.usePriority = true + o.priorityType = priorityTypePriority o.priorityVal = priority } } +func WithPriorityByVersion(version string) Option { + return func(o *Options) { + o.priorityType = priorityTypeVersion + o.priorityVersion = version + } +} + func WithBatchSize(size int) Option { return func(o *Options) { if size <= 1 { @@ -102,6 +120,15 @@ func WithMaxRetryCount(count int) Option { } } +func WithMaxWorkers(count int) Option { + return func(o *Options) { + if count < 0 { + count = 10 + } + o.maxWorkers = count + } +} + // 添加cron表达式解析器 func WithCronParser(parser cron.Parser) Option { return func(o *Options) { diff --git a/priority/version_test.go b/priority/version_test.go index 9bc310c..7f83959 100644 --- a/priority/version_test.go +++ b/priority/version_test.go @@ -6,7 +6,6 @@ import ( "github.com/yuninks/timerx/priority" ) - func TestVersionToPriority(t *testing.T) { tests := []struct { name string @@ -20,6 +19,18 @@ func TestVersionToPriority(t *testing.T) { want: 1002003000000, wantErr: false, }, + { + name: "standard version0", + version: "0.0.0", + want: 0, + wantErr: false, + }, + { + name: "standard version1", + version: "1.0.0", + want: 1000000000000, + wantErr: false, + }, { name: "version with v prefix", version: "v1.2.3", diff --git a/readme.md b/readme.md index 9f1fb28..35bfc56 100644 --- a/readme.md +++ b/readme.md @@ -14,6 +14,7 @@ 1. 针对月的任务,需要注意日期有效性,且在月末的最后一天,需要考虑月末的最后一天的下一个任务执行时间 2. 集群部署时,存在新旧的代码混合问题,任务调度可能存在问题(需要根据实际需要进行版本上线/下线操作) +3. 主从切换时也要做到平滑上下线 ## 方案一 diff --git a/single.go b/single.go index 4a80ba0..01a6333 100644 --- a/single.go +++ b/single.go @@ -27,12 +27,12 @@ type Single struct { nextTime time.Time nextTimeMux sync.RWMutex wg sync.WaitGroup - workerList sync.Map - timerIndex int64 - stopChan chan struct{} - hasRun sync.Map - timeout time.Duration - cronParser *cron.Parser // cron表达式解析器 + workerList sync.Map // 任务列表,key为taskId,value为worker + timerIndex int64 // 任务索引,用于生成taskId + stopChan chan struct{} // 停止信号 + hasRun sync.Map // 记录已经执行的任务,key为taskId,value为执行时间 + timeout time.Duration // 单次任务超时时间 + cronParser *cron.Parser // cron表达式解析器 } // 定时器类 @@ -252,15 +252,14 @@ func (c *Single) EveryMinute(ctx context.Context, taskId string, second int, cal // 特定时间间隔 func (c *Single) EverySpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { - nowTime := time.Now().In(c.location) if spaceTime < 0 { c.logger.Errorf(ctx, "间隔时间不能小于0") return 0, ErrIntervalTime } - // 获取当天的零点时间 - zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location()) + // 固定时间点为20250101 00:00:00,便于计算下一次执行时间 + zeroTime := time.Date(2025, 1, 1, 0, 0, 0, 0, c.location) jobData := JobData{ JobType: JobTypeInterval, @@ -274,9 +273,8 @@ func (c *Single) EverySpace(ctx context.Context, taskId string, spaceTime time.D } func (l *Single) Cron(ctx context.Context, taskId string, cronExpression string, callback func(ctx context.Context, extendData any) error, extendData any, opt ...Option) (int64, error) { - nowTime := time.Now().In(l.location) - // 获取当天的零点时间 - zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location()) + // 固定时间点为20250101 00:00:00,便于计算下一次执行时间 + zeroTime := time.Date(2025, 1, 1, 0, 0, 0, 0, l.location) options := Options{} for _, o := range opt {