From 372033cfa316fe0ea67d5477fcb749d206896b2e Mon Sep 17 00:00:00 2001 From: Yun Date: Sun, 5 Oct 2025 16:30:41 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=94=AF=E6=8C=81cron?= =?UTF-8?q?=E8=A1=A8=E8=BE=BE=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 47 ++++++++++++++++++++++++++++++++++++++++++++--- cmd/main.go | 17 +++++++++++++++-- error.go | 4 ++++ go.mod | 1 + go.sum | 2 ++ leader/leader.go | 2 +- next_time.go | 38 ++++++++++++++++++++++++++++++++++++++ option.go | 42 ++++++++++++++++++++++++++++++++++++++++++ readme.md | 22 +++++++--------------- types.go | 27 ++++++++++++++++----------- 10 files changed, 170 insertions(+), 32 deletions(-) diff --git a/cluster.go b/cluster.go index 9bb00fc..6d60d58 100644 --- a/cluster.go +++ b/cluster.go @@ -11,6 +11,7 @@ import ( "github.com/google/uuid" "github.com/redis/go-redis/v9" + "github.com/robfig/cron/v3" "github.com/yuninks/cachex" "github.com/yuninks/lockx" "github.com/yuninks/timerx/heartbeat" @@ -48,9 +49,10 @@ type Cluster struct { priorityKey string // 全局优先级的key usePriority bool // 是否使用优先级 - leader *leader.Leader // Leader - heartbeat *heartbeat.HeartBeat // 心跳 - cache *cachex.Cache // 本地缓存 + leader *leader.Leader // Leader + heartbeat *heartbeat.HeartBeat // 心跳 + cache *cachex.Cache // 本地缓存 + cronParser *cron.Parser // cron表达式解析器 } // 初始化定时器 @@ -85,6 +87,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin usePriority: op.usePriority, stopChan: make(chan struct{}), instanceId: U.String(), + cronParser: op.cronParser, } // 初始化优先级 @@ -346,6 +349,44 @@ func (c *Cluster) EverySpace(ctx context.Context, taskId string, spaceTime time. return c.addJob(ctx, taskId, jobData, callback, extendData) } +// 定时任务 +// 使用的是秒级cron表达式,可以使用Option设置cronParser +// @param ctx context.Context 上下文 +// @param taskId string 任务ID +// @param cronExpression string cron表达式 +// @param callback callback 回调函数 +// @param extendData interface{} 扩展数据 +// @return error +func (l *Cluster) Cron(ctx context.Context, taskId string, cronExpression string, callback func(ctx context.Context, extendData any) error, extendData any, opt ...Option) error { + nowTime := time.Now().In(l.location) + // 获取当天的零点时间 + zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location()) + + options := Options{} + for _, o := range opt { + o(&options) + } + cronParser := l.cronParser + if options.cronParser != nil { + cronParser = options.cronParser + } + + sche, err := GetCronSche(cronExpression, cronParser) + if err != nil { + l.logger.Errorf(ctx, "Cron cronExpression error:%s", err.Error()) + return err + } + + jobData := JobData{ + JobType: JobTypeCron, + BaseTime: zeroTime, // 默认当天的零点 + CronExpression: cronExpression, + CronSchedule: sche, + } + + return l.addJob(ctx, taskId, jobData, callback, extendData) +} + // 统一添加任务 // @param ctx context.Context 上下文 // @param taskId string 任务ID diff --git a/cmd/main.go b/cmd/main.go index 704e9f9..129376f 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -7,6 +7,7 @@ import ( "time" "github.com/redis/go-redis/v9" + "github.com/robfig/cron/v3" "github.com/yuninks/timerx" "github.com/yuninks/timerx/priority" ) @@ -101,7 +102,7 @@ type OnceWorker struct{} func (l OnceWorker) Worker(ctx context.Context, taskType timerx.OnceTaskType, taskId string, attachData interface{}) *timerx.OnceWorkerResp { // 追加写入文件 - file, err := os.OpenFile("./test.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + file, err := os.OpenFile("./test3.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { panic(err) } @@ -142,7 +143,7 @@ func cluster() { // log := loggerx.NewLogger(ctx,loggerx.SetToConsole(),loggerx.SetEscapeHTML(false)) // _ = log - cluster, _ := timerx.InitCluster(ctx, client, "test", timerx.WithPriority(103)) + cluster, _ := timerx.InitCluster(ctx, client, "test2", timerx.WithPriority(104)) err := cluster.EverySpace(ctx, "test_space1", 1*time.Second, aa, "这是秒任务1") fmt.Println(err) err = cluster.EverySpace(ctx, "test_space2", 2*time.Second, aa, "这是秒任务2") @@ -166,6 +167,18 @@ func cluster() { fmt.Println(err) err = cluster.EveryDay(ctx, "test_day3", 10, 30, 30, aa, "这是天任务3") fmt.Println(err) + + // 默认秒级表达式 + err = cluster.Cron(ctx, "test_cron1", "*/5 * * * * ?", aa, "这是cron任务1") + fmt.Println(err) + err = cluster.Cron(ctx, "test_cron2", "0/5 * * * * ?", aa, "这是cron任务2") + fmt.Println("这是cron任务2:", err) + // 自定义解析器 + err = cluster.Cron(ctx, "test_cron3", "@every 2s", aa, "这是cron任务3", timerx.WithCronParserOption(cron.Descriptor)) + fmt.Println("这是cron任务3:", err) + // Linux标准解析器 + err = cluster.Cron(ctx, "test_cron4", "*/5 * * * *", aa, "这是cron任务4", timerx.WithCronParserLinux()) + fmt.Println("这是cron任务4:", err) } func worker() { diff --git a/error.go b/error.go index e36c84e..8e78f08 100644 --- a/error.go +++ b/error.go @@ -29,4 +29,8 @@ var ( ErrTaskIdExists = errors.New("taskId already exists") // 任务已执行 ErrTaskExecuted = errors.New("task already executed") + // cron表达式错误 + ErrCronExpression = errors.New("cron expression error") + // ErrCronParser 错误 + ErrCronParser = errors.New("cron parser error") ) diff --git a/go.mod b/go.mod index 1579eec..dd33d45 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.24 require ( github.com/google/uuid v1.6.0 github.com/redis/go-redis/v9 v9.14.0 + github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.11.1 github.com/yuninks/cachex v1.0.5 github.com/yuninks/lockx v1.1.3 diff --git a/go.sum b/go.sum index 5a8b94b..ff4b110 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.14.0 h1:u4tNCjXOyzfgeLN+vAZaW1xUooqWDqVEsZN0U01jfAE= github.com/redis/go-redis/v9 v9.14.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= diff --git a/leader/leader.go b/leader/leader.go index f325d98..34144e2 100644 --- a/leader/leader.go +++ b/leader/leader.go @@ -62,7 +62,7 @@ func InitLeader(ctx context.Context, ref redis.UniversalClient, keyPrefix string l.wg.Add(1) go l.leaderElection() - l.logger.Infof(l.ctx, "InitLeader InstanceId %s lockKey:%s", l.instanceId, l.leaderUniLockKey) + l.logger.Infof(l.ctx, "InitLeader InstanceId %s lockKey:%s leaderKey:%s", l.instanceId, l.leaderUniLockKey, l.leaderKey) return l, nil } diff --git a/next_time.go b/next_time.go index 59ba710..a845ba5 100644 --- a/next_time.go +++ b/next_time.go @@ -3,6 +3,8 @@ package timerx import ( "errors" "time" + + "github.com/robfig/cron/v3" ) // 计算该任务下次执行时间 @@ -32,6 +34,8 @@ func GetNextTime(t time.Time, job JobData) (*time.Time, error) { next, err = calculateNextMinuteTime(t, job) case JobTypeInterval: next, err = calculateNextInterval(t, job) + case JobTypeCron: + next, err = calculateNextCronTime(t, job) default: return nil, errors.New("未知的任务类型: " + string(job.JobType)) } @@ -73,6 +77,14 @@ func validateJobData(job JobData) error { if job.BaseTime.IsZero() { return ErrBaseTime } + case JobTypeCron: + if job.CronExpression == "" { + return ErrCronExpression + } + _, err := calculateNextCronTime(time.Now(), job) + if err != nil { + return err + } } if job.Hour < 0 || job.Hour > 23 { @@ -216,6 +228,32 @@ func calculateNextMinuteTime(t time.Time, job JobData) (*time.Time, error) { return &nextMinuteTime, nil } +// 计算cron任务下下次执行时间 +func calculateNextCronTime(t time.Time, job JobData) (*time.Time, error) { + if job.CronExpression == "" { + return nil, ErrCronExpression + } + + s := *job.CronSchedule + + next := s.Next(t) + return &next, nil +} + +func GetCronSche(CronExpression string, cronParser *cron.Parser) (*cron.Schedule, error) { + if CronExpression == "" { + return nil, ErrCronExpression + } + if cronParser == nil { + return nil, ErrCronParser + } + sche, err := cronParser.Parse(CronExpression) + if err != nil { + return nil, err + } + return &sche, nil +} + // 检查是否本周期可以运行 // 检查是否本周期可以运行(已弃用,使用新的时间比较逻辑) // 保留此函数用于向后兼容,但建议使用新的时间计算逻辑 diff --git a/option.go b/option.go index 9882233..b819b96 100644 --- a/option.go +++ b/option.go @@ -3,6 +3,7 @@ package timerx import ( "time" + "github.com/robfig/cron/v3" "github.com/yuninks/timerx/logger" ) @@ -14,9 +15,13 @@ type Options struct { priorityVal int64 batchSize int maxRetryCount int + cronParser *cron.Parser // cron表达式解析器 } func defaultOptions() Options { + + parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) + return Options{ logger: logger.NewLogger(), location: time.Local, @@ -25,6 +30,7 @@ func defaultOptions() Options { priorityVal: 0, batchSize: 100, maxRetryCount: 0, + cronParser: &parser, } } @@ -84,3 +90,39 @@ func WithMaxRetryCount(count int) Option { o.maxRetryCount = count } } + +// 添加cron表达式解析器 +func WithCronParser(parser cron.Parser) Option { + return func(o *Options) { + o.cronParser = &parser + } +} + +// 设置cron表达式解析器 秒级 +// "*/5 * * * * ?" => 每隔5秒执行一次 +// "0 0 0 * * ?" => 每天零点执行一次 +// "0 0 0 1 * ?" => 每月1日零点执行一次 +// "0 */5 * * * ?" => 每隔5分钟执行一次 +func WithCronParserSecond() Option { + return func(o *Options) { + parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) + o.cronParser = &parser + } +} + +// 设置cron表达式解析器 +// cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor +func WithCronParserOption(options cron.ParseOption) Option { + return func(o *Options) { + parser := cron.NewParser(options) + o.cronParser = &parser + } +} + +// Cron表达式 与Linux的定时任务兼容 +func WithCronParserLinux() Option { + return func(o *Options) { + parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) + o.cronParser = &parser + } +} diff --git a/readme.md b/readme.md index bef4517..9f1fb28 100644 --- a/readme.md +++ b/readme.md @@ -1,36 +1,28 @@ # 功能支持 -1. 支持本地任务 -2. 支持集群任务 -3. 支持单次任务 +1. [X] 支持本地任务 +2. [X] 支持集群任务 +3. [X] 支持单次任务 # 功能说明 - - # 功能实现 1. 集群间任务调度和任务的唯一依赖于redis进行实现 - # 缺陷 + 1. 针对月的任务,需要注意日期有效性,且在月末的最后一天,需要考虑月末的最后一天的下一个任务执行时间 +2. 集群部署时,存在新旧的代码混合问题,任务调度可能存在问题(需要根据实际需要进行版本上线/下线操作) - -1. 集群部署时,存在新旧的代码混合问题,任务调度可能存在问题(需要根据实际需要进行版本上线/下线操作) ## 方案一 + 1. 启动的时候定时向redis注册任务项 2. 每次计算执行时间的时候根据注册的任务项进行任务计算 3. 注册任务项需要有下线机制,避免能运行它的节点下线了它还被执行 - 现在有根据要求根据系统时间整点运行任务的要求,这个比简单的定时重复更复杂,因为不但要按时执行,并且不能重复执行,需要全局记录任务执行的状态,由于任务的间隔时间不确定,这个任务执行状态的保存周期也是有变化的 # 待实现 + - [ ] 允许执行完重置任务倒计时 - - - - - - diff --git a/types.go b/types.go index 0c54f5d..4c6d77e 100644 --- a/types.go +++ b/types.go @@ -3,6 +3,8 @@ package timerx import ( "context" "time" + + "github.com/robfig/cron/v3" ) type timerStr struct { @@ -23,20 +25,23 @@ const ( JobTypeEveryMinute JobType = "every_minute" // 每分钟 JobTypeEverySecond JobType = "every_second" // 每秒 JobTypeInterval JobType = "interval" // 指定时间间隔 + JobTypeCron JobType = "cron" // cron表达式 ) type JobData struct { - JobType JobType // 任务类型 - TaskId string // 任务ID 全局唯一键(only cluster) - NextTime time.Time // 下次执行时间 - BaseTime time.Time // 基准时间(间隔的基准时间) - IntervalTime time.Duration // 任务间隔时间 - Month time.Month // 每年的第几个月 - Weekday time.Weekday // 每周的周几 - Day int // 每月的第几天 - Hour int // 每天的第几个小时 - Minute int // 每小时的第几分钟 - Second int // 每分钟的第几秒 + JobType JobType // 任务类型 + TaskId string // 任务ID 全局唯一键(only cluster) + NextTime time.Time // 下次执行时间 + BaseTime time.Time // 基准时间(间隔的基准时间) + IntervalTime time.Duration // 任务间隔时间 + Month time.Month // 每年的第几个月 + Weekday time.Weekday // 每周的周几 + Day int // 每月的第几天 + Hour int // 每天的第几个小时 + Minute int // 每小时的第几分钟 + Second int // 每分钟的第几秒 + CronExpression string // cron表达式 + CronSchedule *cron.Schedule // cron表达式解析后的数据 } // 定义各个回调函数