本地定时任务添加Cron
This commit is contained in:
+20
-13
@@ -254,6 +254,7 @@ func (c *Cluster) EveryMonth(ctx context.Context, taskId string, day int, hour i
|
|||||||
|
|
||||||
jobData := JobData{
|
jobData := JobData{
|
||||||
JobType: JobTypeEveryMonth,
|
JobType: JobTypeEveryMonth,
|
||||||
|
TaskId: taskId,
|
||||||
// CreateTime: nowTime,
|
// CreateTime: nowTime,
|
||||||
Day: day,
|
Day: day,
|
||||||
Hour: hour,
|
Hour: hour,
|
||||||
@@ -261,7 +262,7 @@ func (c *Cluster) EveryMonth(ctx context.Context, taskId string, day int, hour i
|
|||||||
Second: second,
|
Second: second,
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.addJob(ctx, taskId, jobData, callback, extendData)
|
return c.addJob(ctx, jobData, callback, extendData)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 每周执行一次
|
// 每周执行一次
|
||||||
@@ -276,6 +277,7 @@ func (c *Cluster) EveryWeek(ctx context.Context, taskId string, week time.Weekda
|
|||||||
|
|
||||||
jobData := JobData{
|
jobData := JobData{
|
||||||
JobType: JobTypeEveryWeek,
|
JobType: JobTypeEveryWeek,
|
||||||
|
TaskId: taskId,
|
||||||
// CreateTime: nowTime,
|
// CreateTime: nowTime,
|
||||||
Weekday: week,
|
Weekday: week,
|
||||||
Hour: hour,
|
Hour: hour,
|
||||||
@@ -283,7 +285,7 @@ func (c *Cluster) EveryWeek(ctx context.Context, taskId string, week time.Weekda
|
|||||||
Second: second,
|
Second: second,
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.addJob(ctx, taskId, jobData, callback, extendData)
|
return c.addJob(ctx, jobData, callback, extendData)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 每天执行一次
|
// 每天执行一次
|
||||||
@@ -292,13 +294,14 @@ func (c *Cluster) EveryDay(ctx context.Context, taskId string, hour int, minute
|
|||||||
|
|
||||||
jobData := JobData{
|
jobData := JobData{
|
||||||
JobType: JobTypeEveryDay,
|
JobType: JobTypeEveryDay,
|
||||||
|
TaskId: taskId,
|
||||||
// CreateTime: nowTime,
|
// CreateTime: nowTime,
|
||||||
Hour: hour,
|
Hour: hour,
|
||||||
Minute: minute,
|
Minute: minute,
|
||||||
Second: second,
|
Second: second,
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.addJob(ctx, taskId, jobData, callback, extendData)
|
return c.addJob(ctx, jobData, callback, extendData)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 每小时执行一次
|
// 每小时执行一次
|
||||||
@@ -307,12 +310,13 @@ func (c *Cluster) EveryHour(ctx context.Context, taskId string, minute int, seco
|
|||||||
|
|
||||||
jobData := JobData{
|
jobData := JobData{
|
||||||
JobType: JobTypeEveryHour,
|
JobType: JobTypeEveryHour,
|
||||||
|
TaskId: taskId,
|
||||||
// CreateTime: nowTime,
|
// CreateTime: nowTime,
|
||||||
Minute: minute,
|
Minute: minute,
|
||||||
Second: second,
|
Second: second,
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.addJob(ctx, taskId, jobData, callback, extendData)
|
return c.addJob(ctx, jobData, callback, extendData)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 每分钟执行一次
|
// 每分钟执行一次
|
||||||
@@ -321,11 +325,12 @@ func (c *Cluster) EveryMinute(ctx context.Context, taskId string, second int, ca
|
|||||||
|
|
||||||
jobData := JobData{
|
jobData := JobData{
|
||||||
JobType: JobTypeEveryMinute,
|
JobType: JobTypeEveryMinute,
|
||||||
|
TaskId: taskId,
|
||||||
// CreateTime: nowTime,
|
// CreateTime: nowTime,
|
||||||
Second: second,
|
Second: second,
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.addJob(ctx, taskId, jobData, callback, extendData)
|
return c.addJob(ctx, jobData, callback, extendData)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 特定时间间隔
|
// 特定时间间隔
|
||||||
@@ -342,11 +347,12 @@ func (c *Cluster) EverySpace(ctx context.Context, taskId string, spaceTime time.
|
|||||||
|
|
||||||
jobData := JobData{
|
jobData := JobData{
|
||||||
JobType: JobTypeInterval,
|
JobType: JobTypeInterval,
|
||||||
|
TaskId: taskId,
|
||||||
BaseTime: zeroTime, // 默认当天的零点
|
BaseTime: zeroTime, // 默认当天的零点
|
||||||
IntervalTime: spaceTime,
|
IntervalTime: spaceTime,
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.addJob(ctx, taskId, jobData, callback, extendData)
|
return c.addJob(ctx, jobData, callback, extendData)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 定时任务
|
// 定时任务
|
||||||
@@ -379,12 +385,13 @@ func (l *Cluster) Cron(ctx context.Context, taskId string, cronExpression string
|
|||||||
|
|
||||||
jobData := JobData{
|
jobData := JobData{
|
||||||
JobType: JobTypeCron,
|
JobType: JobTypeCron,
|
||||||
|
TaskId: taskId,
|
||||||
BaseTime: zeroTime, // 默认当天的零点
|
BaseTime: zeroTime, // 默认当天的零点
|
||||||
CronExpression: cronExpression,
|
CronExpression: cronExpression,
|
||||||
CronSchedule: sche,
|
CronSchedule: sche,
|
||||||
}
|
}
|
||||||
|
|
||||||
return l.addJob(ctx, taskId, jobData, callback, extendData)
|
return l.addJob(ctx, jobData, callback, extendData)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 统一添加任务
|
// 统一添加任务
|
||||||
@@ -394,11 +401,11 @@ func (l *Cluster) Cron(ctx context.Context, taskId string, cronExpression string
|
|||||||
// @param callback callback 回调函数
|
// @param callback callback 回调函数
|
||||||
// @param extendData interface{} 扩展数据
|
// @param extendData interface{} 扩展数据
|
||||||
// @return error
|
// @return error
|
||||||
func (l *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
|
func (l *Cluster) addJob(ctx context.Context, jobData JobData, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
|
||||||
// 判断是否重复
|
// 判断是否重复
|
||||||
_, ok := l.workerList.Load(taskId)
|
_, ok := l.workerList.Load(jobData.TaskId)
|
||||||
if ok {
|
if ok {
|
||||||
l.logger.Errorf(ctx, "Cluster addJob taskId exits:%s", taskId)
|
l.logger.Errorf(ctx, "Cluster addJob taskId exits:%s", jobData.TaskId)
|
||||||
return ErrTaskIdExists
|
return ErrTaskIdExists
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -412,13 +419,13 @@ func (l *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, ca
|
|||||||
t := timerStr{
|
t := timerStr{
|
||||||
Callback: callback,
|
Callback: callback,
|
||||||
ExtendData: extendData,
|
ExtendData: extendData,
|
||||||
TaskId: taskId,
|
TaskId: jobData.TaskId,
|
||||||
JobData: &jobData,
|
JobData: &jobData,
|
||||||
}
|
}
|
||||||
|
|
||||||
l.workerList.Store(taskId, t)
|
l.workerList.Store(jobData.TaskId, t)
|
||||||
|
|
||||||
l.logger.Infof(ctx, "Cluster addJob taskId:%s", taskId)
|
l.logger.Infof(ctx, "Cluster addJob taskId:%s", jobData.TaskId)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
+5
-2
@@ -169,9 +169,9 @@ func cluster() {
|
|||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
|
|
||||||
// 默认秒级表达式
|
// 默认秒级表达式
|
||||||
err = cluster.Cron(ctx, "test_cron1", "*/5 * * * * ?", aa, "这是cron任务1")
|
err = cluster.Cron(ctx, "test_cron1", "*/5 * * * * ?", aa, "这是cron任务1", timerx.WithCronParserSecond())
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
err = cluster.Cron(ctx, "test_cron2", "0/5 * * * * ?", aa, "这是cron任务2")
|
err = cluster.Cron(ctx, "test_cron2", "0/5 * * * * ?", aa, "这是cron任务2", timerx.WithCronParserSecond())
|
||||||
fmt.Println("这是cron任务2:", err)
|
fmt.Println("这是cron任务2:", err)
|
||||||
// 自定义解析器
|
// 自定义解析器
|
||||||
err = cluster.Cron(ctx, "test_cron3", "@every 2s", aa, "这是cron任务3", timerx.WithCronParserOption(cron.Descriptor))
|
err = cluster.Cron(ctx, "test_cron3", "@every 2s", aa, "这是cron任务3", timerx.WithCronParserOption(cron.Descriptor))
|
||||||
@@ -179,6 +179,9 @@ func cluster() {
|
|||||||
// Linux标准解析器
|
// Linux标准解析器
|
||||||
err = cluster.Cron(ctx, "test_cron4", "*/5 * * * *", aa, "这是cron任务4", timerx.WithCronParserLinux())
|
err = cluster.Cron(ctx, "test_cron4", "*/5 * * * *", aa, "这是cron任务4", timerx.WithCronParserLinux())
|
||||||
fmt.Println("这是cron任务4:", err)
|
fmt.Println("这是cron任务4:", err)
|
||||||
|
// 仅符号解析器
|
||||||
|
err = cluster.Cron(ctx, "test_cron5", "@every 5s", aa, "这是cron任务5", timerx.WithCronParserDescriptor())
|
||||||
|
fmt.Println("这是cron任务5:", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func worker() {
|
func worker() {
|
||||||
|
|||||||
@@ -127,3 +127,21 @@ func WithCronParserLinux() Option {
|
|||||||
o.cronParser = &parser
|
o.cronParser = &parser
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cron表达式 符号
|
||||||
|
// @yearly @annually => 每年执行一次,等同于 "0 0 0 1 1 *"
|
||||||
|
// @monthly => 每月执行一次,等同于 "0 0 0 1 * *"
|
||||||
|
// @weekly => 每周执行一次,等同于 "0 0 0 * * 0"
|
||||||
|
// @daily @midnight => 每天执行一次,等同于 "0 0 0 * * *"
|
||||||
|
// @hourly => 每小时执行一次,等同于 "0 0 * * * *"
|
||||||
|
// @minutely => 每分钟执行一次,等同于 "0 * * * * *"
|
||||||
|
// @secondly => 每秒执行一次,等同于 "* * * * * *"
|
||||||
|
// @every(time.Duration) => 每隔指定时间执行一次,等同于 "@every 5s"
|
||||||
|
|
||||||
|
func WithCronParserDescriptor() Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
parser := cron.NewParser(cron.Descriptor)
|
||||||
|
o.cronParser = &parser
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
"github.com/robfig/cron/v3"
|
||||||
"github.com/yuninks/timerx/logger"
|
"github.com/yuninks/timerx/logger"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -31,6 +32,7 @@ type Single struct {
|
|||||||
stopChan chan struct{}
|
stopChan chan struct{}
|
||||||
hasRun sync.Map
|
hasRun sync.Map
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
|
cronParser *cron.Parser // cron表达式解析器
|
||||||
}
|
}
|
||||||
|
|
||||||
// 定时器类
|
// 定时器类
|
||||||
@@ -48,6 +50,7 @@ func InitSingle(ctx context.Context, opts ...Option) *Single {
|
|||||||
nextTime: time.Now(),
|
nextTime: time.Now(),
|
||||||
stopChan: make(chan struct{}),
|
stopChan: make(chan struct{}),
|
||||||
timeout: op.timeout,
|
timeout: op.timeout,
|
||||||
|
cronParser: op.cronParser,
|
||||||
}
|
}
|
||||||
|
|
||||||
sin.startDaemon()
|
sin.startDaemon()
|
||||||
@@ -270,6 +273,36 @@ func (c *Single) EverySpace(ctx context.Context, taskId string, spaceTime time.D
|
|||||||
return c.addJob(ctx, jobData, callback, extendData)
|
return c.addJob(ctx, jobData, callback, extendData)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *Single) Cron(ctx context.Context, taskId string, cronExpression string, callback func(ctx context.Context, extendData any) error, extendData any, opt ...Option) (int64, error) {
|
||||||
|
nowTime := time.Now().In(l.location)
|
||||||
|
// 获取当天的零点时间
|
||||||
|
zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location())
|
||||||
|
|
||||||
|
options := Options{}
|
||||||
|
for _, o := range opt {
|
||||||
|
o(&options)
|
||||||
|
}
|
||||||
|
cronParser := l.cronParser
|
||||||
|
if options.cronParser != nil {
|
||||||
|
cronParser = options.cronParser
|
||||||
|
}
|
||||||
|
|
||||||
|
sche, err := GetCronSche(cronExpression, cronParser)
|
||||||
|
if err != nil {
|
||||||
|
l.logger.Errorf(ctx, "timer Single Cron cronExpression error:%s", err.Error())
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
jobData := JobData{
|
||||||
|
JobType: JobTypeCron,
|
||||||
|
TaskId: taskId,
|
||||||
|
BaseTime: zeroTime, // 默认当天的零点
|
||||||
|
CronExpression: cronExpression,
|
||||||
|
CronSchedule: sche,
|
||||||
|
}
|
||||||
|
return l.addJob(ctx, jobData, callback, extendData)
|
||||||
|
}
|
||||||
|
|
||||||
// 间隔定时器
|
// 间隔定时器
|
||||||
// @param space 间隔时间
|
// @param space 间隔时间
|
||||||
// @param call 回调函数
|
// @param call 回调函数
|
||||||
|
|||||||
Reference in New Issue
Block a user