添加支持cron表达式

This commit is contained in:
Yun
2025-10-05 16:30:41 +08:00
parent 27717c26be
commit 372033cfa3
10 changed files with 170 additions and 32 deletions
+41
View File
@@ -11,6 +11,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
"github.com/yuninks/cachex" "github.com/yuninks/cachex"
"github.com/yuninks/lockx" "github.com/yuninks/lockx"
"github.com/yuninks/timerx/heartbeat" "github.com/yuninks/timerx/heartbeat"
@@ -51,6 +52,7 @@ type Cluster struct {
leader *leader.Leader // Leader leader *leader.Leader // Leader
heartbeat *heartbeat.HeartBeat // 心跳 heartbeat *heartbeat.HeartBeat // 心跳
cache *cachex.Cache // 本地缓存 cache *cachex.Cache // 本地缓存
cronParser *cron.Parser // cron表达式解析器
} }
// 初始化定时器 // 初始化定时器
@@ -85,6 +87,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
usePriority: op.usePriority, usePriority: op.usePriority,
stopChan: make(chan struct{}), stopChan: make(chan struct{}),
instanceId: U.String(), instanceId: U.String(),
cronParser: op.cronParser,
} }
// 初始化优先级 // 初始化优先级
@@ -346,6 +349,44 @@ func (c *Cluster) EverySpace(ctx context.Context, taskId string, spaceTime time.
return c.addJob(ctx, taskId, jobData, callback, extendData) return c.addJob(ctx, taskId, jobData, callback, extendData)
} }
// 定时任务
// 使用的是秒级cron表达式,可以使用Option设置cronParser
// @param ctx context.Context 上下文
// @param taskId string 任务ID
// @param cronExpression string cron表达式
// @param callback callback 回调函数
// @param extendData interface{} 扩展数据
// @return error
func (l *Cluster) Cron(ctx context.Context, taskId string, cronExpression string, callback func(ctx context.Context, extendData any) error, extendData any, opt ...Option) error {
nowTime := time.Now().In(l.location)
// 获取当天的零点时间
zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location())
options := Options{}
for _, o := range opt {
o(&options)
}
cronParser := l.cronParser
if options.cronParser != nil {
cronParser = options.cronParser
}
sche, err := GetCronSche(cronExpression, cronParser)
if err != nil {
l.logger.Errorf(ctx, "Cron cronExpression error:%s", err.Error())
return err
}
jobData := JobData{
JobType: JobTypeCron,
BaseTime: zeroTime, // 默认当天的零点
CronExpression: cronExpression,
CronSchedule: sche,
}
return l.addJob(ctx, taskId, jobData, callback, extendData)
}
// 统一添加任务 // 统一添加任务
// @param ctx context.Context 上下文 // @param ctx context.Context 上下文
// @param taskId string 任务ID // @param taskId string 任务ID
+15 -2
View File
@@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
"github.com/yuninks/timerx" "github.com/yuninks/timerx"
"github.com/yuninks/timerx/priority" "github.com/yuninks/timerx/priority"
) )
@@ -101,7 +102,7 @@ type OnceWorker struct{}
func (l OnceWorker) Worker(ctx context.Context, taskType timerx.OnceTaskType, taskId string, attachData interface{}) *timerx.OnceWorkerResp { func (l OnceWorker) Worker(ctx context.Context, taskType timerx.OnceTaskType, taskId string, attachData interface{}) *timerx.OnceWorkerResp {
// 追加写入文件 // 追加写入文件
file, err := os.OpenFile("./test.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) file, err := os.OpenFile("./test3.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -142,7 +143,7 @@ func cluster() {
// log := loggerx.NewLogger(ctx,loggerx.SetToConsole(),loggerx.SetEscapeHTML(false)) // log := loggerx.NewLogger(ctx,loggerx.SetToConsole(),loggerx.SetEscapeHTML(false))
// _ = log // _ = log
cluster, _ := timerx.InitCluster(ctx, client, "test", timerx.WithPriority(103)) cluster, _ := timerx.InitCluster(ctx, client, "test2", timerx.WithPriority(104))
err := cluster.EverySpace(ctx, "test_space1", 1*time.Second, aa, "这是秒任务1") err := cluster.EverySpace(ctx, "test_space1", 1*time.Second, aa, "这是秒任务1")
fmt.Println(err) fmt.Println(err)
err = cluster.EverySpace(ctx, "test_space2", 2*time.Second, aa, "这是秒任务2") err = cluster.EverySpace(ctx, "test_space2", 2*time.Second, aa, "这是秒任务2")
@@ -166,6 +167,18 @@ func cluster() {
fmt.Println(err) fmt.Println(err)
err = cluster.EveryDay(ctx, "test_day3", 10, 30, 30, aa, "这是天任务3") err = cluster.EveryDay(ctx, "test_day3", 10, 30, 30, aa, "这是天任务3")
fmt.Println(err) fmt.Println(err)
// 默认秒级表达式
err = cluster.Cron(ctx, "test_cron1", "*/5 * * * * ?", aa, "这是cron任务1")
fmt.Println(err)
err = cluster.Cron(ctx, "test_cron2", "0/5 * * * * ?", aa, "这是cron任务2")
fmt.Println("这是cron任务2:", err)
// 自定义解析器
err = cluster.Cron(ctx, "test_cron3", "@every 2s", aa, "这是cron任务3", timerx.WithCronParserOption(cron.Descriptor))
fmt.Println("这是cron任务3:", err)
// Linux标准解析器
err = cluster.Cron(ctx, "test_cron4", "*/5 * * * *", aa, "这是cron任务4", timerx.WithCronParserLinux())
fmt.Println("这是cron任务4:", err)
} }
func worker() { func worker() {
+4
View File
@@ -29,4 +29,8 @@ var (
ErrTaskIdExists = errors.New("taskId already exists") ErrTaskIdExists = errors.New("taskId already exists")
// 任务已执行 // 任务已执行
ErrTaskExecuted = errors.New("task already executed") ErrTaskExecuted = errors.New("task already executed")
// cron表达式错误
ErrCronExpression = errors.New("cron expression error")
// ErrCronParser 错误
ErrCronParser = errors.New("cron parser error")
) )
+1
View File
@@ -5,6 +5,7 @@ go 1.24
require ( require (
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/redis/go-redis/v9 v9.14.0 github.com/redis/go-redis/v9 v9.14.0
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.11.1 github.com/stretchr/testify v1.11.1
github.com/yuninks/cachex v1.0.5 github.com/yuninks/cachex v1.0.5
github.com/yuninks/lockx v1.1.3 github.com/yuninks/lockx v1.1.3
+2
View File
@@ -19,6 +19,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.14.0 h1:u4tNCjXOyzfgeLN+vAZaW1xUooqWDqVEsZN0U01jfAE= github.com/redis/go-redis/v9 v9.14.0 h1:u4tNCjXOyzfgeLN+vAZaW1xUooqWDqVEsZN0U01jfAE=
github.com/redis/go-redis/v9 v9.14.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/redis/go-redis/v9 v9.14.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+1 -1
View File
@@ -62,7 +62,7 @@ func InitLeader(ctx context.Context, ref redis.UniversalClient, keyPrefix string
l.wg.Add(1) l.wg.Add(1)
go l.leaderElection() go l.leaderElection()
l.logger.Infof(l.ctx, "InitLeader InstanceId %s lockKey:%s", l.instanceId, l.leaderUniLockKey) l.logger.Infof(l.ctx, "InitLeader InstanceId %s lockKey:%s leaderKey:%s", l.instanceId, l.leaderUniLockKey, l.leaderKey)
return l, nil return l, nil
} }
+38
View File
@@ -3,6 +3,8 @@ package timerx
import ( import (
"errors" "errors"
"time" "time"
"github.com/robfig/cron/v3"
) )
// 计算该任务下次执行时间 // 计算该任务下次执行时间
@@ -32,6 +34,8 @@ func GetNextTime(t time.Time, job JobData) (*time.Time, error) {
next, err = calculateNextMinuteTime(t, job) next, err = calculateNextMinuteTime(t, job)
case JobTypeInterval: case JobTypeInterval:
next, err = calculateNextInterval(t, job) next, err = calculateNextInterval(t, job)
case JobTypeCron:
next, err = calculateNextCronTime(t, job)
default: default:
return nil, errors.New("未知的任务类型: " + string(job.JobType)) return nil, errors.New("未知的任务类型: " + string(job.JobType))
} }
@@ -73,6 +77,14 @@ func validateJobData(job JobData) error {
if job.BaseTime.IsZero() { if job.BaseTime.IsZero() {
return ErrBaseTime return ErrBaseTime
} }
case JobTypeCron:
if job.CronExpression == "" {
return ErrCronExpression
}
_, err := calculateNextCronTime(time.Now(), job)
if err != nil {
return err
}
} }
if job.Hour < 0 || job.Hour > 23 { if job.Hour < 0 || job.Hour > 23 {
@@ -216,6 +228,32 @@ func calculateNextMinuteTime(t time.Time, job JobData) (*time.Time, error) {
return &nextMinuteTime, nil return &nextMinuteTime, nil
} }
// 计算cron任务下下次执行时间
func calculateNextCronTime(t time.Time, job JobData) (*time.Time, error) {
if job.CronExpression == "" {
return nil, ErrCronExpression
}
s := *job.CronSchedule
next := s.Next(t)
return &next, nil
}
func GetCronSche(CronExpression string, cronParser *cron.Parser) (*cron.Schedule, error) {
if CronExpression == "" {
return nil, ErrCronExpression
}
if cronParser == nil {
return nil, ErrCronParser
}
sche, err := cronParser.Parse(CronExpression)
if err != nil {
return nil, err
}
return &sche, nil
}
// 检查是否本周期可以运行 // 检查是否本周期可以运行
// 检查是否本周期可以运行(已弃用,使用新的时间比较逻辑) // 检查是否本周期可以运行(已弃用,使用新的时间比较逻辑)
// 保留此函数用于向后兼容,但建议使用新的时间计算逻辑 // 保留此函数用于向后兼容,但建议使用新的时间计算逻辑
+42
View File
@@ -3,6 +3,7 @@ package timerx
import ( import (
"time" "time"
"github.com/robfig/cron/v3"
"github.com/yuninks/timerx/logger" "github.com/yuninks/timerx/logger"
) )
@@ -14,9 +15,13 @@ type Options struct {
priorityVal int64 priorityVal int64
batchSize int batchSize int
maxRetryCount int maxRetryCount int
cronParser *cron.Parser // cron表达式解析器
} }
func defaultOptions() Options { func defaultOptions() Options {
parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
return Options{ return Options{
logger: logger.NewLogger(), logger: logger.NewLogger(),
location: time.Local, location: time.Local,
@@ -25,6 +30,7 @@ func defaultOptions() Options {
priorityVal: 0, priorityVal: 0,
batchSize: 100, batchSize: 100,
maxRetryCount: 0, maxRetryCount: 0,
cronParser: &parser,
} }
} }
@@ -84,3 +90,39 @@ func WithMaxRetryCount(count int) Option {
o.maxRetryCount = count o.maxRetryCount = count
} }
} }
// 添加cron表达式解析器
func WithCronParser(parser cron.Parser) Option {
return func(o *Options) {
o.cronParser = &parser
}
}
// 设置cron表达式解析器 秒级
// "*/5 * * * * ?" => 每隔5秒执行一次
// "0 0 0 * * ?" => 每天零点执行一次
// "0 0 0 1 * ?" => 每月1日零点执行一次
// "0 */5 * * * ?" => 每隔5分钟执行一次
func WithCronParserSecond() Option {
return func(o *Options) {
parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
o.cronParser = &parser
}
}
// 设置cron表达式解析器
// cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor
func WithCronParserOption(options cron.ParseOption) Option {
return func(o *Options) {
parser := cron.NewParser(options)
o.cronParser = &parser
}
}
// Cron表达式 与Linux的定时任务兼容
func WithCronParserLinux() Option {
return func(o *Options) {
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
o.cronParser = &parser
}
}
+7 -15
View File
@@ -1,36 +1,28 @@
# 功能支持 # 功能支持
1. 支持本地任务 1. [X] 支持本地任务
2. 支持集群任务 2. [X] 支持集群任务
3. 支持单次任务 3. [X] 支持单次任务
# 功能说明 # 功能说明
# 功能实现 # 功能实现
1. 集群间任务调度和任务的唯一依赖于redis进行实现 1. 集群间任务调度和任务的唯一依赖于redis进行实现
# 缺陷 # 缺陷
1. 针对月的任务,需要注意日期有效性,且在月末的最后一天,需要考虑月末的最后一天的下一个任务执行时间 1. 针对月的任务,需要注意日期有效性,且在月末的最后一天,需要考虑月末的最后一天的下一个任务执行时间
2. 集群部署时,存在新旧的代码混合问题,任务调度可能存在问题(需要根据实际需要进行版本上线/下线操作)
1. 集群部署时,存在新旧的代码混合问题,任务调度可能存在问题(需要根据实际需要进行版本上线/下线操作)
## 方案一 ## 方案一
1. 启动的时候定时向redis注册任务项 1. 启动的时候定时向redis注册任务项
2. 每次计算执行时间的时候根据注册的任务项进行任务计算 2. 每次计算执行时间的时候根据注册的任务项进行任务计算
3. 注册任务项需要有下线机制,避免能运行它的节点下线了它还被执行 3. 注册任务项需要有下线机制,避免能运行它的节点下线了它还被执行
现在有根据要求根据系统时间整点运行任务的要求,这个比简单的定时重复更复杂,因为不但要按时执行,并且不能重复执行,需要全局记录任务执行的状态,由于任务的间隔时间不确定,这个任务执行状态的保存周期也是有变化的 现在有根据要求根据系统时间整点运行任务的要求,这个比简单的定时重复更复杂,因为不但要按时执行,并且不能重复执行,需要全局记录任务执行的状态,由于任务的间隔时间不确定,这个任务执行状态的保存周期也是有变化的
# 待实现 # 待实现
- [ ] 允许执行完重置任务倒计时 - [ ] 允许执行完重置任务倒计时
+5
View File
@@ -3,6 +3,8 @@ package timerx
import ( import (
"context" "context"
"time" "time"
"github.com/robfig/cron/v3"
) )
type timerStr struct { type timerStr struct {
@@ -23,6 +25,7 @@ const (
JobTypeEveryMinute JobType = "every_minute" // 每分钟 JobTypeEveryMinute JobType = "every_minute" // 每分钟
JobTypeEverySecond JobType = "every_second" // 每秒 JobTypeEverySecond JobType = "every_second" // 每秒
JobTypeInterval JobType = "interval" // 指定时间间隔 JobTypeInterval JobType = "interval" // 指定时间间隔
JobTypeCron JobType = "cron" // cron表达式
) )
type JobData struct { type JobData struct {
@@ -37,6 +40,8 @@ type JobData struct {
Hour int // 每天的第几个小时 Hour int // 每天的第几个小时
Minute int // 每小时的第几分钟 Minute int // 每小时的第几分钟
Second int // 每分钟的第几秒 Second int // 每分钟的第几秒
CronExpression string // cron表达式
CronSchedule *cron.Schedule // cron表达式解析后的数据
} }
// 定义各个回调函数 // 定义各个回调函数