diff --git a/cluster.go b/cluster.go index 6d60d58..36b8be5 100644 --- a/cluster.go +++ b/cluster.go @@ -254,6 +254,7 @@ func (c *Cluster) EveryMonth(ctx context.Context, taskId string, day int, hour i jobData := JobData{ JobType: JobTypeEveryMonth, + TaskId: taskId, // CreateTime: nowTime, Day: day, Hour: hour, @@ -261,7 +262,7 @@ func (c *Cluster) EveryMonth(ctx context.Context, taskId string, day int, hour i Second: second, } - return c.addJob(ctx, taskId, jobData, callback, extendData) + return c.addJob(ctx, jobData, callback, extendData) } // 每周执行一次 @@ -276,6 +277,7 @@ func (c *Cluster) EveryWeek(ctx context.Context, taskId string, week time.Weekda jobData := JobData{ JobType: JobTypeEveryWeek, + TaskId: taskId, // CreateTime: nowTime, Weekday: week, Hour: hour, @@ -283,7 +285,7 @@ func (c *Cluster) EveryWeek(ctx context.Context, taskId string, week time.Weekda Second: second, } - return c.addJob(ctx, taskId, jobData, callback, extendData) + return c.addJob(ctx, jobData, callback, extendData) } // 每天执行一次 @@ -292,13 +294,14 @@ func (c *Cluster) EveryDay(ctx context.Context, taskId string, hour int, minute jobData := JobData{ JobType: JobTypeEveryDay, + TaskId: taskId, // CreateTime: nowTime, Hour: hour, Minute: minute, Second: second, } - return c.addJob(ctx, taskId, jobData, callback, extendData) + return c.addJob(ctx, jobData, callback, extendData) } // 每小时执行一次 @@ -307,12 +310,13 @@ func (c *Cluster) EveryHour(ctx context.Context, taskId string, minute int, seco jobData := JobData{ JobType: JobTypeEveryHour, + TaskId: taskId, // CreateTime: nowTime, Minute: minute, Second: second, } - return c.addJob(ctx, taskId, jobData, callback, extendData) + return c.addJob(ctx, jobData, callback, extendData) } // 每分钟执行一次 @@ -321,11 +325,12 @@ func (c *Cluster) EveryMinute(ctx context.Context, taskId string, second int, ca jobData := JobData{ JobType: JobTypeEveryMinute, + TaskId: taskId, // CreateTime: nowTime, Second: second, } - return c.addJob(ctx, taskId, jobData, callback, extendData) + return c.addJob(ctx, jobData, callback, extendData) } // 特定时间间隔 @@ -342,11 +347,12 @@ func (c *Cluster) EverySpace(ctx context.Context, taskId string, spaceTime time. jobData := JobData{ JobType: JobTypeInterval, + TaskId: taskId, BaseTime: zeroTime, // 默认当天的零点 IntervalTime: spaceTime, } - return c.addJob(ctx, taskId, jobData, callback, extendData) + return c.addJob(ctx, jobData, callback, extendData) } // 定时任务 @@ -379,12 +385,13 @@ func (l *Cluster) Cron(ctx context.Context, taskId string, cronExpression string jobData := JobData{ JobType: JobTypeCron, + TaskId: taskId, BaseTime: zeroTime, // 默认当天的零点 CronExpression: cronExpression, CronSchedule: sche, } - return l.addJob(ctx, taskId, jobData, callback, extendData) + return l.addJob(ctx, jobData, callback, extendData) } // 统一添加任务 @@ -394,11 +401,11 @@ func (l *Cluster) Cron(ctx context.Context, taskId string, cronExpression string // @param callback callback 回调函数 // @param extendData interface{} 扩展数据 // @return error -func (l *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error { +func (l *Cluster) addJob(ctx context.Context, jobData JobData, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error { // 判断是否重复 - _, ok := l.workerList.Load(taskId) + _, ok := l.workerList.Load(jobData.TaskId) if ok { - l.logger.Errorf(ctx, "Cluster addJob taskId exits:%s", taskId) + l.logger.Errorf(ctx, "Cluster addJob taskId exits:%s", jobData.TaskId) return ErrTaskIdExists } @@ -412,13 +419,13 @@ func (l *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, ca t := timerStr{ Callback: callback, ExtendData: extendData, - TaskId: taskId, + TaskId: jobData.TaskId, JobData: &jobData, } - l.workerList.Store(taskId, t) + l.workerList.Store(jobData.TaskId, t) - l.logger.Infof(ctx, "Cluster addJob taskId:%s", taskId) + l.logger.Infof(ctx, "Cluster addJob taskId:%s", jobData.TaskId) return nil } diff --git a/cmd/main.go b/cmd/main.go index 129376f..ef4c28e 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -169,9 +169,9 @@ func cluster() { fmt.Println(err) // 默认秒级表达式 - err = cluster.Cron(ctx, "test_cron1", "*/5 * * * * ?", aa, "这是cron任务1") + err = cluster.Cron(ctx, "test_cron1", "*/5 * * * * ?", aa, "这是cron任务1", timerx.WithCronParserSecond()) fmt.Println(err) - err = cluster.Cron(ctx, "test_cron2", "0/5 * * * * ?", aa, "这是cron任务2") + err = cluster.Cron(ctx, "test_cron2", "0/5 * * * * ?", aa, "这是cron任务2", timerx.WithCronParserSecond()) fmt.Println("这是cron任务2:", err) // 自定义解析器 err = cluster.Cron(ctx, "test_cron3", "@every 2s", aa, "这是cron任务3", timerx.WithCronParserOption(cron.Descriptor)) @@ -179,6 +179,9 @@ func cluster() { // Linux标准解析器 err = cluster.Cron(ctx, "test_cron4", "*/5 * * * *", aa, "这是cron任务4", timerx.WithCronParserLinux()) fmt.Println("这是cron任务4:", err) + // 仅符号解析器 + err = cluster.Cron(ctx, "test_cron5", "@every 5s", aa, "这是cron任务5", timerx.WithCronParserDescriptor()) + fmt.Println("这是cron任务5:", err) } func worker() { diff --git a/option.go b/option.go index b82d562..38959cf 100644 --- a/option.go +++ b/option.go @@ -127,3 +127,21 @@ func WithCronParserLinux() Option { o.cronParser = &parser } } + +// Cron表达式 符号 +// @yearly @annually => 每年执行一次,等同于 "0 0 0 1 1 *" +// @monthly => 每月执行一次,等同于 "0 0 0 1 * *" +// @weekly => 每周执行一次,等同于 "0 0 0 * * 0" +// @daily @midnight => 每天执行一次,等同于 "0 0 0 * * *" +// @hourly => 每小时执行一次,等同于 "0 0 * * * *" +// @minutely => 每分钟执行一次,等同于 "0 * * * * *" +// @secondly => 每秒执行一次,等同于 "* * * * * *" +// @every(time.Duration) => 每隔指定时间执行一次,等同于 "@every 5s" + +func WithCronParserDescriptor() Option { + return func(o *Options) { + parser := cron.NewParser(cron.Descriptor) + o.cronParser = &parser + } +} + diff --git a/single.go b/single.go index cbdbcc0..4a80ba0 100644 --- a/single.go +++ b/single.go @@ -11,6 +11,7 @@ import ( "time" "github.com/google/uuid" + "github.com/robfig/cron/v3" "github.com/yuninks/timerx/logger" ) @@ -31,6 +32,7 @@ type Single struct { stopChan chan struct{} hasRun sync.Map timeout time.Duration + cronParser *cron.Parser // cron表达式解析器 } // 定时器类 @@ -41,13 +43,14 @@ func InitSingle(ctx context.Context, opts ...Option) *Single { ctx, cancel := context.WithCancel(ctx) sin := &Single{ - ctx: ctx, - cancel: cancel, - logger: op.logger, - location: op.location, - nextTime: time.Now(), - stopChan: make(chan struct{}), - timeout: op.timeout, + ctx: ctx, + cancel: cancel, + logger: op.logger, + location: op.location, + nextTime: time.Now(), + stopChan: make(chan struct{}), + timeout: op.timeout, + cronParser: op.cronParser, } sin.startDaemon() @@ -270,6 +273,36 @@ func (c *Single) EverySpace(ctx context.Context, taskId string, spaceTime time.D return c.addJob(ctx, jobData, callback, extendData) } +func (l *Single) Cron(ctx context.Context, taskId string, cronExpression string, callback func(ctx context.Context, extendData any) error, extendData any, opt ...Option) (int64, error) { + nowTime := time.Now().In(l.location) + // 获取当天的零点时间 + zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location()) + + options := Options{} + for _, o := range opt { + o(&options) + } + cronParser := l.cronParser + if options.cronParser != nil { + cronParser = options.cronParser + } + + sche, err := GetCronSche(cronExpression, cronParser) + if err != nil { + l.logger.Errorf(ctx, "timer Single Cron cronExpression error:%s", err.Error()) + return 0, err + } + + jobData := JobData{ + JobType: JobTypeCron, + TaskId: taskId, + BaseTime: zeroTime, // 默认当天的零点 + CronExpression: cronExpression, + CronSchedule: sche, + } + return l.addJob(ctx, jobData, callback, extendData) +} + // 间隔定时器 // @param space 间隔时间 // @param call 回调函数