From 464b467868e0ed1fb05d2929deb866aee9a81d92 Mon Sep 17 00:00:00 2001 From: Yun Date: Sun, 14 Sep 2025 19:05:10 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BA=86=E6=9C=AC=E5=9C=B0?= =?UTF-8?q?=E5=AE=9A=E6=97=B6=E5=99=A8+=E4=B8=8B=E6=AC=A1=E7=9A=84?= =?UTF-8?q?=E5=88=A4=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 7 +- error.go | 25 ++ go.mod | 4 + go.sum | 9 + logger/logger.go | 13 +- next_time.go | 251 +++++++++++++---- next_time_test.go | 702 ++++++++++++++++++++++++++++++++++++++++++++-- single.go | 370 +++++++++++++++++------- single_test.go | 479 +++++++++++++++++++++++++++++-- types.go | 11 +- 10 files changed, 1662 insertions(+), 209 deletions(-) create mode 100644 error.go diff --git a/cluster.go b/cluster.go index e33c33b..050e712 100644 --- a/cluster.go +++ b/cluster.go @@ -464,7 +464,12 @@ func (c *Cluster) doTask(ctx context.Context, taskId string) { c.logger.Errorf(ctx, "doTask timer:任务不存在:%s", taskId) return } - t := val.(timerStr) + t,ok := val.(timerStr) + if !ok { + c.logger.Errorf(ctx, "doTask timer:任务不存在:%s", taskId) + return + } + // 这里加一个全局锁 lock := lockx.NewGlobalLock(ctx, c.redis, taskId) diff --git a/error.go b/error.go new file mode 100644 index 0000000..61e8329 --- /dev/null +++ b/error.go @@ -0,0 +1,25 @@ +package timerx + +import "errors" + +var ( + ErrTimerNotFound = errors.New("timer not found") + // 任务ID不能为空 + ErrTaskIdEmpty = errors.New("taskId can not be empty") + // 每月的天数必须在0-31之间 + ErrMonthDay = errors.New("month day must be between 0 and 31") + // 小时必须在0-23之间 + ErrHour = errors.New("hour must be between 0 and 23") + // 分钟必须在0-59之间 + ErrMinute = errors.New("minute must be between 0 and 59") + // 秒必须在0-59之间 + ErrSecond = errors.New("second must be between 0 and 59") + // 回调函数不能为空 + ErrCallbackEmpty = errors.New("callback can not be empty") + // 星期必须在0-6之间 + ErrWeekday = errors.New("weekday must be between Sunday and Saturday") + // 创建时间不能为空 + ErrCreateTime = errors.New("create time can not be empty") + // 间隔时间必须大于0 + ErrIntervalTime = errors.New("interval time must be greater than 0") +) diff --git a/go.mod b/go.mod index 689e1c9..78150e6 100644 --- a/go.mod +++ b/go.mod @@ -5,12 +5,16 @@ go 1.19 require ( github.com/go-redis/redis/v8 v8.11.5 github.com/satori/go.uuid v1.2.0 + github.com/stretchr/testify v1.11.1 github.com/yuninks/cachex v1.0.5 github.com/yuninks/lockx v1.0.2 ) require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8b561b9..f8e2ba0 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= @@ -13,8 +15,12 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/yuninks/cachex v1.0.5 h1:Y2NmTsuEgwEVYb7FVFh5tUN67kmrUioeksQqLbOAwsM= github.com/yuninks/cachex v1.0.5/go.mod h1:5357qz18UvHTJSgZzkMamUzZoFzGeKG9+4tIUBXRSVM= github.com/yuninks/lockx v1.0.2 h1:p0n791WmsU8D7YF2tQaNLwPE75jdd774unlJZRTNfaw= @@ -22,7 +28,10 @@ github.com/yuninks/lockx v1.0.2/go.mod h1:J6wvuUELLcMn6FCmiZFt7K5w1QQAh1myL7h3Jr golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/logger/logger.go b/logger/logger.go index 825f247..2038acd 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -6,8 +6,9 @@ import ( ) type Logger interface { - Infof(ctx context.Context, format string, v ...interface{}) - Errorf(ctx context.Context, format string, v ...interface{}) + Infof(ctx context.Context, format string, v ...any) + Warnf(ctx context.Context, format string, v ...any) + Errorf(ctx context.Context, format string, v ...any) } type defaultLogger struct{} @@ -16,10 +17,14 @@ func NewLogger() *defaultLogger { return &defaultLogger{} } -func (l *defaultLogger) Infof(ctx context.Context, format string, v ...interface{}) { +func (l *defaultLogger) Infof(ctx context.Context, format string, v ...any) { log.Printf("[INFO] "+format, v...) } -func (l *defaultLogger) Errorf(ctx context.Context, format string, v ...interface{}) { +func (l *defaultLogger) Warnf(ctx context.Context, format string, v ...any) { + log.Printf("[WARN] "+format, v...) +} + +func (l *defaultLogger) Errorf(ctx context.Context, format string, v ...any) { log.Printf("[ERROR] "+format, v...) } diff --git a/next_time.go b/next_time.go index 0f616c1..e1e45d2 100644 --- a/next_time.go +++ b/next_time.go @@ -7,110 +7,237 @@ import ( // 计算该任务下次执行时间 // @param job *JobData 任务数据 +// @param t time.Time 当前时间 // @return time.Time 下次执行时间 +// @return error 错误信息 func GetNextTime(t time.Time, job JobData) (*time.Time, error) { - var next time.Time + if err := validateJobData(job); err != nil { + return nil, err + } + + var next *time.Time + var err error switch job.JobType { case JobTypeEveryMonth: - next = calculateNextMonthTime(t, job) + next, err = calculateNextMonthTime(t, job) case JobTypeEveryWeek: - next = calculateNextWeekTime(t, job) + next, err = calculateNextWeekTime(t, job) case JobTypeEveryDay: - next = calculateNextDayTime(t, job) + next, err = calculateNextDayTime(t, job) case JobTypeEveryHour: - next = calculateNextHourTime(t, job) + next, err = calculateNextHourTime(t, job) case JobTypeEveryMinute: - next = calculateNextMinuteTime(t, job) + next, err = calculateNextMinuteTime(t, job) case JobTypeInterval: - next = calculateNextInterval(t, job) + next, err = calculateNextInterval(t, job) default: return nil, errors.New("未知的任务类型: " + string(job.JobType)) } + if err != nil { + return nil, err + } + + return next, nil +} + +// 参数校验 +func validateJobData(job JobData) error { + switch job.JobType { + case JobTypeEveryMonth: + if job.Day < 1 || job.Day > 31 { + return ErrMonthDay + } + case JobTypeEveryWeek: + if job.Weekday < time.Sunday || job.Weekday > time.Saturday { + return ErrWeekday + } + case JobTypeEveryDay: + if job.Hour < 0 || job.Hour > 23 { + return ErrHour + } + case JobTypeEveryHour: + if job.Minute < 0 || job.Minute > 59 { + return ErrMinute + } + case JobTypeEveryMinute: + if job.Second < 0 || job.Second > 59 { + return ErrSecond + } + case JobTypeInterval: + if job.IntervalTime <= 0 { + return ErrIntervalTime + } + if job.CreateTime.IsZero() { + return ErrCreateTime + } + } + + if job.Hour < 0 || job.Hour > 23 { + return ErrHour + } + if job.Minute < 0 || job.Minute > 59 { + return ErrMinute + } + if job.Second < 0 || job.Second > 59 { + return ErrSecond + } + + return nil +} + +func calculateNextInterval(t time.Time, job JobData) (*time.Time, error) { + if job.CreateTime.IsZero() { + return nil, ErrCreateTime + } + if job.IntervalTime <= 0 { + return nil, ErrIntervalTime + } + // 计算从创建时间到当前时间经过了多少个间隔 + elapsed := t.Sub(job.CreateTime) + intervals := elapsed / job.IntervalTime + + // 计算下一个执行时间 + next := job.CreateTime.Add((intervals + 1) * job.IntervalTime) + + // 确保下次执行时间不早于当前时间 + if next.Before(t) || next.Equal(t) { + next = next.Add(job.IntervalTime) + } + return &next, nil } -func calculateNextInterval(t time.Time, job JobData) time.Time { - // 从创建的时候开始计算 - cycle := t.Sub(job.BaseTime).Microseconds() / job.IntervalTime.Microseconds() - return job.BaseTime.Add(job.IntervalTime * time.Duration(cycle+1)) +func calculateNextMonthTime(t time.Time, job JobData) (*time.Time, error) { + // 尝试光剑本月的执行四件 + currentMonthTime := time.Date(t.Year(), t.Month(), job.Day, job.Hour, job.Minute, job.Second, 0, t.Location()) + + // 如果日期无效(比如2月30号),则调整到该月最后一天 + if currentMonthTime.Day() != job.Day { + // 获取该月的最后一天 + lastDay := time.Date(t.Year(), t.Month()+1, 0, 0, 0, 0, 0, t.Location()).Day() + if job.Day > lastDay { + currentMonthTime = time.Date(t.Year(), t.Month(), lastDay, job.Hour, job.Minute, job.Second, 0, t.Location()) + } + } + + if currentMonthTime.After(t) { + return ¤tMonthTime, nil + } + + // 计算下个月的同一天 + nextMonth := t.Month() + 1 + year := t.Year() + if nextMonth > 12 { + nextMonth = 1 + year++ + } + + nextMonthTime := time.Date(year, nextMonth, job.Day, job.Hour, job.Minute, job.Second, 0, t.Location()) + // 如果日期无效,调整到下个月的最后一天 + if nextMonthTime.Day() != job.Day { + lastDay := time.Date(year, nextMonth+1, 0, 0, 0, 0, 0, t.Location()).Day() + if job.Day > lastDay { + nextMonthTime = time.Date(year, nextMonth, lastDay, job.Hour, job.Minute, job.Second, 0, t.Location()) + } + } + + return &nextMonthTime, nil } -func calculateNextMonthTime(t time.Time, job JobData) time.Time { - // 判断是否可执行并返回下一个执行时间 +func calculateNextWeekTime(t time.Time, job JobData) (*time.Time, error) { + currentWeekday := t.Weekday() + targetWeekday := job.Weekday - if canRun(t, job) { - return time.Date(t.Year(), t.Month(), job.Day, job.Hour, job.Minute, job.Second, 0, t.Location()) + // 计算距离目标星期几的天数 + daysToAdd := int(targetWeekday - currentWeekday) + if daysToAdd < 0 { + daysToAdd += 7 } - // 下一个周期(下个月) - return time.Date(t.Year(), t.Month()+1, job.Day, job.Hour, job.Minute, job.Second, 0, t.Location()) + // 本周的目标时间 + thisWeekTime := time.Date(t.Year(), t.Month(), t.Day()+daysToAdd, job.Hour, job.Minute, job.Second, 0, t.Location()) + + if thisWeekTime.After(t) { + return &thisWeekTime, nil + } + + // 下周的目标时间 + nextWeekTime := time.Date(t.Year(), t.Month(), t.Day()+daysToAdd+7, job.Hour, job.Minute, job.Second, 0, t.Location()) + return &nextWeekTime, nil } -func calculateNextWeekTime(t time.Time, job JobData) time.Time { - weekday := t.Weekday() - days := int(job.Weekday - weekday) - if days < 0 { - days += 7 +func calculateNextDayTime(t time.Time, job JobData) (*time.Time, error) { + // 今天的目标时间 + todayTime := time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, t.Location()) + + if todayTime.After(t) { + return &todayTime, nil } - // 判断是否可执行并返回下一个执行时间 - if canRun(t, job) { - return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, t.Location()) - } - // 下一个周期(下周) - return time.Date(t.Year(), t.Month(), t.Day()+days+7, job.Hour, job.Minute, job.Second, 0, t.Location()) + + // 明天的时间 + nextDayTime := time.Date(t.Year(), t.Month(), t.Day()+1, job.Hour, job.Minute, job.Second, 0, t.Location()) + return &nextDayTime, nil + } -func calculateNextDayTime(t time.Time, job JobData) time.Time { - // 判断是否可执行并返回下一个执行时间 - if canRun(t, job) { - return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, t.Location()) +func calculateNextHourTime(t time.Time, job JobData) (*time.Time, error) { + // 计算当前小时的目标时间 + currentHourTime := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), job.Minute, job.Second, 0, t.Location()) + + if currentHourTime.After(t) { + return ¤tHourTime, nil } - // 下一个周期(明天) - return time.Date(t.Year(), t.Month(), t.Day()+1, job.Hour, job.Minute, job.Second, 0, t.Location()) + + // 下一个小时的时间 + nextHourTime := time.Date(t.Year(), t.Month(), t.Day(), t.Hour()+1, job.Minute, job.Second, 0, t.Location()) + return &nextHourTime, nil } -func calculateNextHourTime(t time.Time, job JobData) time.Time { - // 判断是否可执行并返回下一个执行时间 - if canRun(t, job) { - return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), job.Minute, job.Second, 0, t.Location()) - } - // 下一个周期(下个小时) - return time.Date(t.Year(), t.Month(), t.Day(), t.Hour()+1, job.Minute, job.Second, 0, t.Location()) -} +func calculateNextMinuteTime(t time.Time, job JobData) (*time.Time, error) { + // 计算当前分钟的目标时间 -func calculateNextMinuteTime(t time.Time, job JobData) time.Time { - // 判断是否可执行并返回下一个执行时间 - if canRun(t, job) { - return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), job.Second, 0, t.Location()) + currentMinuteTime := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), job.Second, 0, t.Location()) + + if currentMinuteTime.After(t) { + return ¤tMinuteTime, nil } - // 下一个周期(下分钟) - return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute()+1, job.Second, 0, t.Location()) + + // 下一分钟的时间 + nextMinuteTime := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute()+1, job.Second, 0, t.Location()) + return &nextMinuteTime, nil } // 检查是否本周期可以运行 +// 检查是否本周期可以运行(已弃用,使用新的时间比较逻辑) +// 保留此函数用于向后兼容,但建议使用新的时间计算逻辑 func canRun(t time.Time, job JobData) bool { + targetTime := time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, t.Location()) + switch job.JobType { case JobTypeEveryMonth: - return t.Day() < job.Day || - (t.Day() == job.Day && t.Hour() < job.Hour) || - (t.Day() == job.Day && t.Hour() == job.Hour && t.Minute() < job.Minute) || - (t.Day() == job.Day && t.Hour() == job.Hour && t.Minute() == job.Minute && t.Second() <= job.Second) + // 对于月任务,需要比较日期 + targetTime = time.Date(t.Year(), t.Month(), job.Day, job.Hour, job.Minute, job.Second, 0, t.Location()) + return !targetTime.Before(t) case JobTypeEveryWeek: - return t.Weekday() < job.Weekday || - (t.Weekday() == job.Weekday && t.Hour() < job.Hour) || - (t.Weekday() == job.Weekday && t.Hour() == job.Hour && t.Minute() < job.Minute) || - (t.Weekday() == job.Weekday && t.Hour() == job.Hour && t.Minute() == job.Minute && t.Second() <= job.Second) + // 对于周任务,需要比较星期 + currentWeekday := t.Weekday() + if currentWeekday < job.Weekday { + return true + } + if currentWeekday == job.Weekday { + return targetTime.After(t) || targetTime.Equal(t) + } + return false case JobTypeEveryDay: - return t.Hour() < job.Hour || - (t.Hour() == job.Hour && t.Minute() < job.Minute) || - (t.Hour() == job.Hour && t.Minute() == job.Minute && t.Second() <= job.Second) + return targetTime.After(t) || targetTime.Equal(t) case JobTypeEveryHour: - return t.Minute() < job.Minute || - (t.Minute() == job.Minute && t.Second() <= job.Second) + hourTarget := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), job.Minute, job.Second, 0, t.Location()) + return hourTarget.After(t) || hourTarget.Equal(t) case JobTypeEveryMinute: - return t.Second() <= job.Second + minuteTarget := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), job.Second, 0, t.Location()) + return minuteTarget.After(t) || minuteTarget.Equal(t) default: return false } diff --git a/next_time_test.go b/next_time_test.go index f1aa161..dd94bbc 100644 --- a/next_time_test.go +++ b/next_time_test.go @@ -1,25 +1,25 @@ -package timerx_test +package timerx import ( "errors" "testing" "time" - "github.com/yuninks/timerx" + "github.com/stretchr/testify/assert" ) func TestGetNextTime(t *testing.T) { // Test cases tests := []struct { name string - job timerx.JobData + job JobData expectedTime time.Time expectedError error }{ { name: "Test JobTypeEveryMonth", - job: timerx.JobData{ - JobType: timerx.JobTypeEveryMonth, + job: JobData{ + JobType: JobTypeEveryMonth, Day: 15, Hour: 10, Minute: 0, @@ -30,8 +30,8 @@ func TestGetNextTime(t *testing.T) { }, { name: "Test JobTypeEveryWeek", - job: timerx.JobData{ - JobType: timerx.JobTypeEveryWeek, + job: JobData{ + JobType: JobTypeEveryWeek, Weekday: time.Tuesday, Hour: 10, Minute: 0, @@ -42,8 +42,8 @@ func TestGetNextTime(t *testing.T) { }, { name: "Test JobTypeEveryDay", - job: timerx.JobData{ - JobType: timerx.JobTypeEveryDay, + job: JobData{ + JobType: JobTypeEveryDay, Hour: 10, Minute: 0, Second: 0, @@ -53,8 +53,8 @@ func TestGetNextTime(t *testing.T) { }, { name: "Test JobTypeEveryHour", - job: timerx.JobData{ - JobType: timerx.JobTypeEveryHour, + job: JobData{ + JobType: JobTypeEveryHour, Minute: 0, Second: 0, }, @@ -63,8 +63,8 @@ func TestGetNextTime(t *testing.T) { }, { name: "Test JobTypeEveryMinute", - job: timerx.JobData{ - JobType: timerx.JobTypeEveryMinute, + job: JobData{ + JobType: JobTypeEveryMinute, Second: 0, }, expectedTime: time.Date(2022, 3, 7, 10, 31, 0, 0, time.Local), // Assuming current date is March 7, 2022, 10:30 AM @@ -72,8 +72,8 @@ func TestGetNextTime(t *testing.T) { }, { name: "Test JobTypeInterval", - job: timerx.JobData{ - JobType: timerx.JobTypeInterval, + job: JobData{ + JobType: JobTypeInterval, IntervalTime: 1 * time.Hour, }, expectedTime: time.Date(2022, 3, 7, 11, 30, 0, 0, time.Local), // Assuming current date is March 7, 2022, 10:30 AM @@ -81,8 +81,8 @@ func TestGetNextTime(t *testing.T) { }, { name: "Test unknown JobType", - job: timerx.JobData{ - JobType: timerx.JobType("100"), + job: JobData{ + JobType: JobType("100"), }, expectedTime: time.Time{}, expectedError: errors.New("未知的任务类型: 100"), @@ -93,7 +93,7 @@ func TestGetNextTime(t *testing.T) { t.Run(test.name, func(t *testing.T) { now := time.Now() // loc := time.FixedZone("CST", 8*3600) - nextTime, err := timerx.GetNextTime(now, test.job) + nextTime, err := GetNextTime(now, test.job) if err != nil { if test.expectedError == nil || err.Error() != test.expectedError.Error() { t.Errorf("Expected error: %v, Got error: %v", test.expectedError, err) @@ -106,3 +106,669 @@ func TestGetNextTime(t *testing.T) { }) } } + +// 测试参数验证 +func TestValidateJobData(t *testing.T) { + tests := []struct { + name string + job JobData + expected error + }{ + { + name: "有效月任务", + job: JobData{ + JobType: JobTypeEveryMonth, + Day: 15, + Hour: 12, + Minute: 30, + Second: 0, + }, + expected: nil, + }, + { + name: "无效月任务-日期太小", + job: JobData{ + JobType: JobTypeEveryMonth, + Day: 0, + Hour: 12, + Minute: 30, + Second: 0, + }, + expected: ErrMonthDay, + }, + { + name: "无效月任务-日期太大", + job: JobData{ + JobType: JobTypeEveryMonth, + Day: 32, + Hour: 12, + Minute: 30, + Second: 0, + }, + expected: ErrMonthDay, + }, + { + name: "有效周任务", + job: JobData{ + JobType: JobTypeEveryWeek, + Weekday: time.Monday, + Hour: 12, + Minute: 30, + Second: 0, + }, + expected: nil, + }, + { + name: "无效周任务-星期超出范围", + job: JobData{ + JobType: JobTypeEveryWeek, + Weekday: time.Weekday(7), // 超出范围 + Hour: 12, + Minute: 30, + Second: 0, + }, + expected: ErrWeekday, + }, + { + name: "有效间隔任务", + job: JobData{ + JobType: JobTypeInterval, + CreateTime: time.Now(), + IntervalTime: time.Minute, + Hour: 12, + Minute: 30, + Second: 0, + }, + expected: nil, + }, + { + name: "无效间隔任务-间隔时间为0", + job: JobData{ + JobType: JobTypeInterval, + CreateTime: time.Now(), + IntervalTime: 0, + Hour: 12, + Minute: 30, + Second: 0, + }, + expected: ErrIntervalTime, + }, + { + name: "无效间隔任务-创建时间为空", + job: JobData{ + JobType: JobTypeInterval, + CreateTime: time.Time{}, + IntervalTime: time.Minute, + Hour: 12, + Minute: 30, + Second: 0, + }, + expected: ErrCreateTime, + }, + { + name: "无效小时", + job: JobData{ + JobType: JobTypeEveryDay, + Hour: 24, // 无效小时 + Minute: 30, + Second: 0, + }, + expected: ErrHour, + }, + { + name: "无效分钟", + job: JobData{ + JobType: JobTypeEveryDay, + Hour: 12, + Minute: 60, // 无效分钟 + Second: 0, + }, + expected: ErrMinute, + }, + { + name: "无效秒数", + job: JobData{ + JobType: JobTypeEveryDay, + Hour: 12, + Minute: 30, + Second: 60, // 无效秒数 + }, + expected: ErrSecond, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateJobData(tt.job) + assert.Equal(t, tt.expected, err) + }) + } +} + +// 测试间隔任务 +func TestCalculateNextInterval(t *testing.T) { + now := time.Date(2023, 6, 15, 12, 0, 0, 0, time.UTC) + createTime := time.Date(2023, 6, 15, 10, 0, 0, 0, time.UTC) + + tests := []struct { + name string + job JobData + currentTime time.Time + expected time.Time + }{ + { + name: "间隔1小时-当前时间在创建时间之后", + job: JobData{ + JobType: JobTypeInterval, + CreateTime: createTime, + IntervalTime: time.Hour, + }, + currentTime: now, + expected: time.Date(2023, 6, 15, 13, 0, 0, 0, time.UTC), + }, + { + name: "间隔30分钟-刚好在间隔点上", + job: JobData{ + JobType: JobTypeInterval, + CreateTime: createTime, + IntervalTime: 30 * time.Minute, + }, + currentTime: time.Date(2023, 6, 15, 12, 30, 0, 0, time.UTC), + expected: time.Date(2023, 6, 15, 13, 0, 0, 0, time.UTC), + }, + { + name: "间隔1天-跨天", + job: JobData{ + JobType: JobTypeInterval, + CreateTime: createTime, + IntervalTime: 24 * time.Hour, + }, + currentTime: now, + expected: time.Date(2023, 6, 16, 10, 0, 0, 0, time.UTC), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := calculateNextInterval(tt.currentTime, tt.job) + assert.NoError(t, err) + assert.Equal(t, tt.expected, *result) + }) + } +} + +// 测试月任务 +func TestCalculateNextMonthTime(t *testing.T) { + baseTime := time.Date(2023, 6, 15, 12, 30, 45, 0, time.UTC) + + tests := []struct { + name string + job JobData + currentTime time.Time + expected time.Time + }{ + { + name: "本月还能执行", + job: JobData{ + JobType: JobTypeEveryMonth, + Day: 20, + Hour: 12, + Minute: 30, + Second: 45, + }, + currentTime: baseTime, + expected: time.Date(2023, 6, 20, 12, 30, 45, 0, time.UTC), + }, + { + name: "本月已过,下个月执行", + job: JobData{ + JobType: JobTypeEveryMonth, + Day: 10, + Hour: 12, + Minute: 30, + Second: 45, + }, + currentTime: baseTime, + expected: time.Date(2023, 7, 10, 12, 30, 45, 0, time.UTC), + }, + { + name: "2月30日调整到2月28日", + job: JobData{ + JobType: JobTypeEveryMonth, + Day: 30, + Hour: 12, + Minute: 30, + Second: 45, + }, + currentTime: time.Date(2023, 1, 15, 12, 30, 45, 0, time.UTC), + expected: time.Date(2023, 2, 28, 12, 30, 45, 0, time.UTC), + }, + { + name: "闰年2月29日", + job: JobData{ + JobType: JobTypeEveryMonth, + Day: 29, + Hour: 12, + Minute: 30, + Second: 45, + }, + currentTime: time.Date(2024, 1, 15, 12, 30, 45, 0, time.UTC), // 2024是闰年 + expected: time.Date(2024, 2, 29, 12, 30, 45, 0, time.UTC), + }, + { + name: "跨年", + job: JobData{ + JobType: JobTypeEveryMonth, + Day: 15, + Hour: 12, + Minute: 30, + Second: 45, + }, + currentTime: time.Date(2023, 12, 20, 12, 30, 45, 0, time.UTC), + expected: time.Date(2024, 1, 15, 12, 30, 45, 0, time.UTC), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := calculateNextMonthTime(tt.currentTime, tt.job) + assert.NoError(t, err) + assert.Equal(t, tt.expected, *result) + }) + } +} + +// 测试周任务 +func TestCalculateNextWeekTime(t *testing.T) { + baseTime := time.Date(2023, 6, 15, 12, 30, 45, 0, time.UTC) // 星期四 + + tests := []struct { + name string + job JobData + currentTime time.Time + expected time.Time + }{ + { + name: "本周还能执行-周五", + job: JobData{ + JobType: JobTypeEveryWeek, + Weekday: time.Friday, + Hour: 12, + Minute: 30, + Second: 45, + }, + currentTime: baseTime, + expected: time.Date(2023, 6, 16, 12, 30, 45, 0, time.UTC), + }, + { + name: "本周已过,下周执行-周三", + job: JobData{ + JobType: JobTypeEveryWeek, + Weekday: time.Wednesday, + Hour: 12, + Minute: 30, + Second: 45, + }, + currentTime: baseTime, + expected: time.Date(2023, 6, 21, 12, 30, 45, 0, time.UTC), + }, + { + name: "同一天但时间已过", + job: JobData{ + JobType: JobTypeEveryWeek, + Weekday: time.Thursday, + Hour: 10, // 早于当前时间 + Minute: 30, + Second: 45, + }, + currentTime: baseTime, + expected: time.Date(2023, 6, 22, 10, 30, 45, 0, time.UTC), + }, + { + name: "跨月", + job: JobData{ + JobType: JobTypeEveryWeek, + Weekday: time.Monday, + Hour: 12, + Minute: 30, + Second: 45, + }, + currentTime: time.Date(2023, 6, 30, 12, 30, 45, 0, time.UTC), // 周五 + expected: time.Date(2023, 7, 3, 12, 30, 45, 0, time.UTC), // 下周一 + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := calculateNextWeekTime(tt.currentTime, tt.job) + assert.NoError(t, err) + assert.Equal(t, tt.expected, *result) + }) + } +} + +// 测试日任务 +func TestCalculateNextDayTime(t *testing.T) { + baseTime := time.Date(2023, 6, 15, 12, 30, 45, 0, time.UTC) + + tests := []struct { + name string + job JobData + currentTime time.Time + expected time.Time + }{ + { + name: "今天还能执行", + job: JobData{ + JobType: JobTypeEveryDay, + Hour: 14, + Minute: 30, + Second: 45, + }, + currentTime: baseTime, + expected: time.Date(2023, 6, 15, 14, 30, 45, 0, time.UTC), + }, + { + name: "今天已过,明天执行", + job: JobData{ + JobType: JobTypeEveryDay, + Hour: 10, + Minute: 30, + Second: 45, + }, + currentTime: baseTime, + expected: time.Date(2023, 6, 16, 10, 30, 45, 0, time.UTC), + }, + { + name: "跨月", + job: JobData{ + JobType: JobTypeEveryDay, + Hour: 10, + Minute: 30, + Second: 45, + }, + currentTime: time.Date(2023, 6, 30, 12, 30, 45, 0, time.UTC), + expected: time.Date(2023, 7, 1, 10, 30, 45, 0, time.UTC), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := calculateNextDayTime(tt.currentTime, tt.job) + assert.NoError(t, err) + assert.Equal(t, tt.expected, *result) + }) + } +} + +// 测试小时任务 +func TestCalculateNextHourTime(t *testing.T) { + baseTime := time.Date(2023, 6, 15, 12, 30, 45, 0, time.UTC) + + tests := []struct { + name string + job JobData + currentTime time.Time + expected time.Time + }{ + { + name: "本小时还能执行", + job: JobData{ + JobType: JobTypeEveryHour, + Minute: 45, + Second: 0, + }, + currentTime: baseTime, + expected: time.Date(2023, 6, 15, 12, 45, 0, 0, time.UTC), + }, + { + name: "本小时已过,下小时执行", + job: JobData{ + JobType: JobTypeEveryHour, + Minute: 15, + Second: 0, + }, + currentTime: baseTime, + expected: time.Date(2023, 6, 15, 13, 15, 0, 0, time.UTC), + }, + { + name: "跨天", + job: JobData{ + JobType: JobTypeEveryHour, + Minute: 15, + Second: 0, + }, + currentTime: time.Date(2023, 6, 15, 23, 30, 45, 0, time.UTC), + expected: time.Date(2023, 6, 16, 0, 15, 0, 0, time.UTC), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := calculateNextHourTime(tt.currentTime, tt.job) + assert.NoError(t, err) + assert.Equal(t, tt.expected, *result) + }) + } +} + +// 测试分钟任务 +func TestCalculateNextMinuteTime(t *testing.T) { + baseTime := time.Date(2023, 6, 15, 12, 30, 45, 0, time.UTC) + + tests := []struct { + name string + job JobData + currentTime time.Time + expected time.Time + }{ + { + name: "本分钟还能执行", + job: JobData{ + JobType: JobTypeEveryMinute, + Second: 50, + }, + currentTime: baseTime, + expected: time.Date(2023, 6, 15, 12, 30, 50, 0, time.UTC), + }, + { + name: "本分钟已过,下分钟执行", + job: JobData{ + JobType: JobTypeEveryMinute, + Second: 30, + }, + currentTime: baseTime, + expected: time.Date(2023, 6, 15, 12, 31, 30, 0, time.UTC), + }, + { + name: "跨小时", + job: JobData{ + JobType: JobTypeEveryMinute, + Second: 30, + }, + currentTime: time.Date(2023, 6, 15, 12, 59, 45, 0, time.UTC), + expected: time.Date(2023, 6, 15, 13, 0, 30, 0, time.UTC), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := calculateNextMinuteTime(tt.currentTime, tt.job) + assert.NoError(t, err) + assert.Equal(t, tt.expected, *result) + }) + } +} + +// 测试GetNextTime集成函数 +func TestGetNextTime_Integration(t *testing.T) { + now := time.Date(2023, 6, 15, 12, 30, 45, 0, time.UTC) + + tests := []struct { + name string + job JobData + expected time.Time + }{ + { + name: "月任务集成测试", + job: JobData{ + JobType: JobTypeEveryMonth, + Day: 20, + Hour: 12, + Minute: 30, + Second: 45, + }, + expected: time.Date(2023, 6, 20, 12, 30, 45, 0, time.UTC), + }, + { + name: "周任务集成测试", + job: JobData{ + JobType: JobTypeEveryWeek, + Weekday: time.Friday, + Hour: 12, + Minute: 30, + Second: 45, + }, + expected: time.Date(2023, 6, 16, 12, 30, 45, 0, time.UTC), + }, + { + name: "间隔任务集成测试", + job: JobData{ + JobType: JobTypeInterval, + CreateTime: time.Date(2023, 6, 15, 10, 0, 0, 0, time.UTC), + IntervalTime: time.Hour, + }, + expected: time.Date(2023, 6, 15, 13, 0, 0, 0, time.UTC), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := GetNextTime(now, tt.job) + assert.NoError(t, err) + assert.Equal(t, tt.expected, *result) + }) + } +} + +// 测试错误情况 +func TestGetNextTime_ErrorCases(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + job JobData + expected error + }{ + { + name: "未知任务类型", + job: JobData{ + JobType: "99", // 无效类型 + }, + expected: errors.New("未知的任务类型: 99"), + }, + { + name: "无效月任务日期", + job: JobData{ + JobType: "每月", + Day: 32, // 无效日期 + Hour: 12, + Minute: 30, + Second: 0, + }, + expected: ErrMonthDay, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := GetNextTime(now, tt.job) + assert.Nil(t, result) + assert.Equal(t, tt.expected, err) + }) + } +} + +// 测试边界条件 +func TestGetNextTime_EdgeCases(t *testing.T) { + tests := []struct { + name string + job JobData + currentTime time.Time + expected time.Time + }{ + { + name: "刚好在执行时间点上-应该到下一个周期", + job: JobData{ + JobType: JobTypeEveryDay, + Hour: 12, + Minute: 30, + Second: 45, + }, + currentTime: time.Date(2023, 6, 15, 12, 30, 45, 0, time.UTC), + expected: time.Date(2023, 6, 16, 12, 30, 45, 0, time.UTC), + }, + { + name: "闰年2月29日", + job: JobData{ + JobType: JobTypeEveryMonth, + Day: 29, + Hour: 12, + Minute: 30, + Second: 45, + }, + currentTime: time.Date(2024, 1, 15, 12, 30, 45, 0, time.UTC), // 闰年 + expected: time.Date(2024, 2, 29, 12, 30, 45, 0, time.UTC), + }, + { + name: "非闰年2月29日调整到28日", + job: JobData{ + JobType: JobTypeEveryMonth, + Day: 29, + Hour: 12, + Minute: 30, + Second: 45, + }, + currentTime: time.Date(2023, 1, 15, 12, 30, 45, 0, time.UTC), // 非闰年 + expected: time.Date(2023, 2, 28, 12, 30, 45, 0, time.UTC), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := GetNextTime(tt.currentTime, tt.job) + assert.NoError(t, err) + assert.Equal(t, tt.expected, *result) + }) + } +} + +// 测试时区处理 +func TestGetNextTime_Timezone(t *testing.T) { + // 测试不同时区 + locations := []*time.Location{ + time.UTC, + time.FixedZone("TEST+8", 8*60*60), + time.FixedZone("TEST-5", -5*60*60), + } + + for _, loc := range locations { + t.Run(loc.String(), func(t *testing.T) { + currentTime := time.Date(2023, 6, 15, 12, 30, 45, 0, loc) + + job := JobData{ + JobType: JobTypeEveryDay, + Hour: 14, + Minute: 30, + Second: 45, + } + + result, err := GetNextTime(currentTime, job) + assert.NoError(t, err) + + expected := time.Date(2023, 6, 15, 14, 30, 45, 0, loc) + assert.Equal(t, expected, *result) + assert.Equal(t, loc, result.Location()) + }) + } +} diff --git a/single.go b/single.go index 2496305..89f75ad 100644 --- a/single.go +++ b/single.go @@ -5,8 +5,10 @@ package timerx import ( "context" "errors" + "fmt" "runtime/debug" "sync" + "sync/atomic" "time" uuid "github.com/satori/go.uuid" @@ -17,61 +19,141 @@ import ( // 1. 这个定时器的作用范围是本机 // 2. 适用简单的时间间隔定时任务 -// 定时器结构体 -var singleWorkerList sync.Map - -var singleTimerIndex int // 当前定时数目 - -// var singleOnceLimit sync.Once // 实现单例 +// 避免执行重复 +var singleHasRun sync.Map type Single struct { - ctx context.Context - logger logger.Logger - location *time.Location + ctx context.Context + cancel context.CancelFunc + logger logger.Logger + location *time.Location + nextTime time.Time + nextTimeMux sync.RWMutex + wg sync.WaitGroup + workerList sync.Map + timerIndex int64 + stopChan chan struct{} + hasRun sync.Map } -// var sin *Single = nil - -var singleNextTime = time.Now() // 下一次执行的时间 - // 定时器类 // @param ctx context.Context 上下文 // @param opts ...Option 配置项 func InitSingle(ctx context.Context, opts ...Option) *Single { - // singleOnceLimit.Do(func() { op := newOptions(opts...) + ctx, cancel := context.WithCancel(ctx) sin := &Single{ ctx: ctx, + cancel: cancel, logger: op.logger, location: op.location, + nextTime: time.Now(), + stopChan: make(chan struct{}), } - timer := time.NewTicker(time.Millisecond * 200) - go func(ctx context.Context) { - Loop: - for { - select { - case t := <-timer.C: - if t.Before(singleNextTime) { - // 当前时间小于下次发送时间:跳过 - continue - } - // 迭代定时器 - sin.iterator(ctx) - // fmt.Println("timer: 执行") - case <-ctx.Done(): - // 跳出循环 - break Loop - } - } - sin.logger.Infof(ctx, "timer: initend") - }(ctx) - // }) + sin.wg.Add(1) + go sin.timerLoop(ctx) + + sin.wg.Add(1) + go sin.cleanupLoop(ctx) return sin } +// 停止所有定时任务 +func (s *Single) Stop() { + if s.cancel != nil { + s.cancel() + } + close(s.stopChan) + s.wg.Wait() + + // 清理所有资源 + s.workerList.Range(func(k, v interface{}) bool { + if timer, ok := v.(timerStr); ok { + close(timer.CanRunning) + } + s.workerList.Delete(k) + return true + }) +} + +// 获取任务数量 +func (s *Single) TaskCount() int { + count := 0 + s.workerList.Range(func(k, v interface{}) bool { + count++ + return true + }) + return count +} + +func (l *Single) MaxIndex() int64 { + return atomic.LoadInt64(&l.timerIndex) + 1 +} + +// 定时器主循环 +func (s *Single) timerLoop(ctx context.Context) { + defer s.wg.Done() + + ticker := time.NewTicker(100 * time.Millisecond) // 提高精度到100ms + defer ticker.Stop() + + for { + select { + case t := <-ticker.C: + s.nextTimeMux.RLock() + nextTime := s.nextTime + s.nextTimeMux.RUnlock() + + if t.Before(nextTime) { + continue + } + + s.iterator(ctx) + + case <-ctx.Done(): + s.logger.Infof(ctx, "timer: context cancelled, stopping timer loop") + return + case <-s.stopChan: + s.logger.Infof(ctx, "timer: received stop signal, stopping timer loop") + return + } + } +} + +// 清理循环 +func (s *Single) cleanupLoop(ctx context.Context) { + defer s.wg.Done() + + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + now := time.Now() + cleanupTime := now.Add(-2 * time.Minute) // 清理2分钟前的记录 + + s.hasRun.Range(func(k, v interface{}) bool { + t, ok := v.(time.Time) + if !ok || t.Before(cleanupTime) { + s.hasRun.Delete(k) + } + return true + }) + + case <-ctx.Done(): + s.logger.Infof(ctx, "timer: context cancelled, stopping cleanup loop") + return + case <-s.stopChan: + s.logger.Infof(ctx, "timer: received stop signal, stopping cleanup loop") + return + } + } +} + // 每月执行一次 // @param ctx 上下文 // @param taskId 任务ID @@ -82,11 +164,13 @@ func InitSingle(ctx context.Context, opts ...Option) *Single { // @param callback 回调函数 // @param extendData 扩展数据 // @return error -func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) { - nowTime := time.Now() +func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { + + nowTime := time.Now().In(c.location) jobData := JobData{ JobType: JobTypeEveryMonth, + TaskId: taskId, CreateTime: nowTime, Day: day, Hour: hour, @@ -104,11 +188,12 @@ func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int, // @param hour int 小时 // @param minute int 分钟 // @param second int 秒 -func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) { - nowTime := time.Now() +func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { + nowTime := time.Now().In(c.location) jobData := JobData{ JobType: JobTypeEveryWeek, + TaskId: taskId, CreateTime: nowTime, Weekday: week, Hour: hour, @@ -120,11 +205,12 @@ func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, } // 每天执行一次 -func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) { - nowTime := time.Now() +func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { + nowTime := time.Now().In(c.location) jobData := JobData{ JobType: JobTypeEveryDay, + TaskId: taskId, CreateTime: nowTime, Hour: hour, Minute: minute, @@ -135,11 +221,12 @@ func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int } // 每小时执行一次 -func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) { - nowTime := time.Now() +func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { + nowTime := time.Now().In(c.location) jobData := JobData{ JobType: JobTypeEveryHour, + TaskId: taskId, CreateTime: nowTime, Minute: minute, Second: second, @@ -149,11 +236,12 @@ func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second } // 每分钟执行一次 -func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) { - nowTime := time.Now() +func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { + nowTime := time.Now().In(c.location) jobData := JobData{ JobType: JobTypeEveryMinute, + TaskId: taskId, CreateTime: nowTime, Second: second, } @@ -162,8 +250,8 @@ func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callb } // 特定时间间隔 -func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) { - nowTime := time.Now() +func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) { + nowTime := time.Now().In(c.location) if spaceTime < 0 { c.logger.Errorf(ctx, "间隔时间不能小于0") @@ -172,6 +260,7 @@ func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Dur jobData := JobData{ JobType: JobTypeInterval, + TaskId: taskId, CreateTime: nowTime, IntervalTime: spaceTime, } @@ -185,80 +274,113 @@ func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Dur // @param extend 附加参数 // @return int 定时器索引 // @return error 错误 -func (l *Single) addJob(ctx context.Context, jobData JobData, call func(ctx context.Context, extendData interface{}) error, extend interface{}) (int, error) { - singleTimerIndex += 1 +func (l *Single) addJob(ctx context.Context, jobData JobData, call func(ctx context.Context, extendData interface{}) error, extend interface{}) (int64, error) { + if jobData.TaskId == "" { + l.logger.Errorf(ctx, "任务ID不能为空") + return 0, ErrTaskIdEmpty + } + if jobData.Day < 0 || jobData.Day > 31 { + l.logger.Errorf(ctx, "每月的天数必须在0-31之间") + return 0, ErrMonthDay + } + if jobData.Hour < 0 || jobData.Hour > 23 { + l.logger.Errorf(ctx, "小时必须在0-23之间") + return 0, ErrHour + } + if jobData.Minute < 0 || jobData.Minute > 59 { + l.logger.Errorf(ctx, "分钟必须在0-59之间") + return 0, ErrMinute + } + if jobData.Second < 0 || jobData.Second > 59 { + l.logger.Errorf(ctx, "秒必须在0-59之间") + return 0, ErrSecond + } + if call == nil { + l.logger.Errorf(ctx, "回调函数不能为空") + return 0, ErrCallbackEmpty + } - _, err := GetNextTime(time.Now().In(l.location), jobData) + nextTime, err := GetNextTime(time.Now().In(l.location), jobData) if err != nil { l.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error()) return 0, err } + jobData.NextTime = *nextTime + + // 生成唯一索引 + index := atomic.AddInt64(&l.timerIndex, 1) t := timerStr{ Callback: call, CanRunning: make(chan struct{}, 1), ExtendData: extend, + TaskId: jobData.TaskId, JobData: &jobData, } - singleWorkerList.Store(singleTimerIndex, t) + l.workerList.Store(index, t) - return singleTimerIndex, nil + // 计算下次执行时间(全局) + l.updateNextTimeIfEarlier(*nextTime) + + return index, nil +} + +// 如果更早则更新下次执行时间(全局) +func (s *Single) updateNextTimeIfEarlier(candidate time.Time) { + s.nextTimeMux.Lock() + defer s.nextTimeMux.Unlock() + + if candidate.Before(s.nextTime) { + s.nextTime = candidate + } } // 删除定时器 -func (s *Single) Del(index int) { - singleWorkerList.Delete(index) +func (l *Single) Del(index int64) { + if val, ok := l.workerList.Load(index); ok { + if timer, ok := val.(timerStr); ok { + close(timer.CanRunning) + } + l.workerList.Delete(index) + } } // 迭代定时器列表 -func (s *Single) iterator(ctx context.Context) { - - nowTime := time.Now().In(s.location) +func (l *Single) iterator(ctx context.Context) { + // 当前时间 + nowTime := time.Now().In(l.location) // 默认5秒后(如果没有值就暂停进来5秒) newNextTime := nowTime.Add(time.Second * 5) - index := 0 - singleWorkerList.Range(func(k, v interface{}) bool { - index++ - timeStr := v.(timerStr) + l.workerList.Range(func(k, v interface{}) bool { + timeStr, ok := v.(timerStr) + if !ok { + l.logger.Errorf(ctx, "timer: 类型断言失败,跳过该任务") + l.workerList.Delete(k) + return true + } if timeStr.JobData.NextTime.Before(nowTime) || timeStr.JobData.NextTime.Equal(nowTime) { - // 可执行 - nextTime, _ := GetNextTime(nowTime, *timeStr.JobData) - timeStr.JobData.NextTime = *nextTime - if index == 1 { - // 循环的第一个需要替换默认值 - newNextTime = timeStr.JobData.NextTime + originTime := timeStr.JobData.NextTime + + // 计算下次执行时间 + nextTime, err := GetNextTime(nowTime, *timeStr.JobData) + if err != nil { + l.logger.Errorf(ctx, "timer: 计算下次执行时间失败:%s", err.Error()) + return true } + // 更新下次执行时间 + timeStr.JobData.NextTime = *nextTime if nextTime.Before(newNextTime) { // 本规则下次发送时间小于系统下次需要执行的时间:替换 newNextTime = *nextTime } - // 处理中就跳过本次 - go func(ctx context.Context, v timerStr) { - select { - case v.CanRunning <- struct{}{}: - defer func() { - // fmt.Printf("timer: 执行完成 %v %v \n", k, v.Tag) - select { - case <-v.CanRunning: - return - default: - return - } - }() - // fmt.Printf("timer: 准备执行 %v %v \n", k, v.Tag) - s.doTask(ctx, v.Callback, v.ExtendData) - default: - // fmt.Printf("timer: 已在执行 %v %v \n", k, v.Tag) - return - } - }(ctx, timeStr) + go l.executeTask(ctx, timeStr, originTime) } @@ -266,30 +388,76 @@ func (s *Single) iterator(ctx context.Context) { }) - // 实际下次时间小于预期下次时间:替换 - if singleNextTime.Before(newNextTime) { - // 判断一下避免异常 - if newNextTime.Before(nowTime) { - // 比当前时间小 - singleNextTime = nowTime - } else { - singleNextTime = newNextTime - } - } + l.updateNextTime(newNextTime) +} - // fmt.Println("timer: one finish") +// 执行任务 +func (s *Single) executeTask(ctx context.Context, timer timerStr, originTime time.Time) { + select { + case timer.CanRunning <- struct{}{}: + defer func() { + select { + case <-timer.CanRunning: + default: + } + }() + + // 检查任务是否已执行 + taskKey := fmt.Sprintf("%s:%d", timer.TaskId, originTime.UnixNano()) + if _, loaded := s.hasRun.LoadOrStore(taskKey, time.Now()); loaded { + s.logger.Warnf(ctx, "timer: 任务已执行,跳过本次执行 %s", timer.TaskId) + return + } + + // 创建带追踪ID的上下文 + traceCtx := context.WithValue(ctx, "trace_id", uuid.NewV4().String()) + traceCtx, cancel := context.WithTimeout(traceCtx, 30*time.Second) // 设置执行超时 + defer cancel() + + // 执行回调 + if err := s.doTask(traceCtx, timer, originTime); err != nil { + s.logger.Errorf(traceCtx, "timer: 任务执行失败: %s", err.Error()) + } + + default: + // 任务正在执行中,跳过本次 + } } // 定时器操作类 // 这里不应painc -func (s *Single) doTask(ctx context.Context, call func(ctx context.Context, extendData interface{}) error, extend interface{}) error { +func (l *Single) doTask(ctx context.Context, timeStr timerStr, originTime time.Time) error { defer func() { if err := recover(); err != nil { - s.logger.Errorf(ctx, "timer:回调任务panic err:%+v stack:%s", err, string(debug.Stack())) + l.logger.Errorf(ctx, "timer:回调任务panic err:%+v stack:%s", err, string(debug.Stack())) } }() + nuiKey := timeStr.TaskId + originTime.String() + + // timeStr.TaskId + + _, loaded := singleHasRun.LoadOrStore(nuiKey, time.Now()) + if loaded { + // 已经存在,说明已经执行过了 + l.logger.Errorf(ctx, "timer: 任务已执行,跳过本次执行 %s", nuiKey) + return nil + } + ctx = context.WithValue(ctx, "trace_id", uuid.NewV4().String()) - return call(ctx, extend) + return timeStr.Callback(ctx, timeStr.ExtendData) +} + +// 更新下次执行时间 +func (s *Single) updateNextTime(newTime time.Time) { + s.nextTimeMux.Lock() + defer s.nextTimeMux.Unlock() + + now := time.Now() + if newTime.Before(now) { + s.nextTime = now + } else { + s.nextTime = newTime + } } diff --git a/single_test.go b/single_test.go index 488b4f2..b18fb89 100644 --- a/single_test.go +++ b/single_test.go @@ -1,26 +1,469 @@ package timerx_test import ( + "context" "fmt" + "sync" + "sync/atomic" "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/yuninks/timerx" ) -// 单元测试 - -func TestHelloWorld(t *testing.T) { - // 日志 - // t.Log("hello world") - - fmt.Println("hello world") - - // s := "ddd" - // t.Logf("Log测试%s", s) - // t.Errorf("ErrorF %s", s) - - // 标记错误(继续运行) - // t.Fail() - - // 终止运行 - // t.FailNow() - +// MockLogger 用于测试的日志记录器 +type MockLogger struct { + Infos []string + Errors []string + Warns []string + mu sync.Mutex +} + +func (m *MockLogger) Infof(ctx context.Context, format string, args ...interface{}) { + m.mu.Lock() + defer m.mu.Unlock() + m.Infos = append(m.Infos, fmt.Sprintf(format, args...)) +} + +func (m *MockLogger) Errorf(ctx context.Context, format string, args ...interface{}) { + m.mu.Lock() + defer m.mu.Unlock() + m.Errors = append(m.Errors, fmt.Sprintf(format, args...)) +} + +func (m *MockLogger) Warnf(ctx context.Context, format string, args ...interface{}) { + m.mu.Lock() + defer m.mu.Unlock() + m.Warns = append(m.Warns, fmt.Sprintf(format, args...)) +} + +func (m *MockLogger) Clear() { + m.mu.Lock() + defer m.mu.Unlock() + m.Infos = nil + m.Errors = nil + m.Warns = nil +} + +// 测试基础功能 +func TestSingleTimer_Basic(t *testing.T) { + ctx := context.Background() + mockLogger := &MockLogger{} + + timer := timerx.InitSingle(ctx, + timerx.SetLogger(mockLogger), + timerx.SetTimeZone(time.UTC)) + defer timer.Stop() + + // 测试任务计数 + assert.Equal(t, 0, timer.TaskCount()) + + var executionCount int32 + taskFunc := func(ctx context.Context, data interface{}) error { + atomic.AddInt32(&executionCount, 1) + return nil + } + + // 添加间隔任务 + index, err := timer.AddSpace(ctx, "test-task", 100*time.Millisecond, taskFunc, nil) + assert.NoError(t, err) + assert.Greater(t, index, int64(0)) + assert.Equal(t, 1, timer.TaskCount()) + + // 等待任务执行 + time.Sleep(300 * time.Millisecond) + assert.GreaterOrEqual(t, atomic.LoadInt32(&executionCount), int32(2)) + + // 删除任务 + timer.Del(index) + assert.Equal(t, 0, timer.TaskCount()) +} + +// 测试错误参数 +func TestSingleTimer_InvalidParams(t *testing.T) { + ctx := context.Background() + timer := timerx.InitSingle(ctx) + defer timer.Stop() + + validFunc := func(ctx context.Context, data interface{}) error { return nil } + + // 测试空taskId + _, err := timer.AddSpace(ctx, "", time.Second, validFunc, nil) + assert.Error(t, err) + + // 测试nil回调函数 + _, err = timer.AddSpace(ctx, "test", time.Second, nil, nil) + assert.Error(t, err) + + // 测试无效间隔时间 + _, err = timer.AddSpace(ctx, "test", -time.Second, validFunc, nil) + assert.Error(t, err) + _, err = timer.AddSpace(ctx, "test", 0, validFunc, nil) + assert.Error(t, err) +} + +// 测试任务去重 +func TestSingleTimer_Deduplication(t *testing.T) { + ctx := context.Background() + mockLogger := &MockLogger{} + + timer := timerx.InitSingle(ctx, timerx.SetLogger(mockLogger)) + defer timer.Stop() + + var executionCount int32 + taskFunc := func(ctx context.Context, data interface{}) error { + atomic.AddInt32(&executionCount, 1) + time.Sleep(100 * time.Millisecond) // 模拟耗时任务 + return nil + } + + // 添加短间隔任务 + _, err := timer.AddSpace(ctx, "dedup-test", 50*time.Millisecond, taskFunc, nil) + assert.NoError(t, err) + + // 等待一段时间,检查去重是否生效 + time.Sleep(200 * time.Millisecond) + + // 应该只有1次执行(因为任务执行需要100ms,50ms的间隔会被去重) + assert.Equal(t, int32(1), atomic.LoadInt32(&executionCount)) + + t.Logf("warn: %v", mockLogger.Warns) + t.Logf("info: %v", mockLogger.Infos) + fmt.Println("info:", mockLogger.Infos) + fmt.Println("warn:", mockLogger.Warns) + + // 检查是否有去重日志 + assert.Contains(t, mockLogger.Warns, "timer: 任务已执行,跳过本次执行 dedup-test") +} + +// 测试并发安全 +func TestSingleTimer_Concurrency(t *testing.T) { + ctx := context.Background() + timer := timerx.InitSingle(ctx) + defer timer.Stop() + + var wg sync.WaitGroup + var executionCount int32 + + // 并发添加任务 + for i := 0; i < 10; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + taskFunc := func(ctx context.Context, data interface{}) error { + atomic.AddInt32(&executionCount, 1) + return nil + } + + _, err := timer.AddSpace(ctx, fmt.Sprintf("concurrent-%d", i), + time.Duration(i+1)*100*time.Millisecond, taskFunc, nil) + assert.NoError(t, err) + }(i) + } + + wg.Wait() + assert.Equal(t, 10, timer.TaskCount()) + + // 等待任务执行 + time.Sleep(500 * time.Millisecond) + assert.Greater(t, atomic.LoadInt32(&executionCount), int32(0)) + + // 并发删除任务 + timer.TaskCount() + maxIndex := timer.MaxIndex() + for i := int64(1); i < maxIndex; i++ { + wg.Add(1) + go func(index int64) { + defer wg.Done() + timer.Del(index) + }(i) + } + + wg.Wait() + assert.Equal(t, 0, timer.TaskCount()) +} + +// 测试任务超时 +func TestSingleTimer_Timeout(t *testing.T) { + ctx := context.Background() + mockLogger := &MockLogger{} + + timer := timerx.InitSingle(ctx, timerx.SetLogger(mockLogger)) + defer timer.Stop() + + // 长时间运行的任务 + longTask := func(ctx context.Context, data interface{}) error { + select { + case <-time.After(2 * time.Second): // 超过超时时间 + case <-ctx.Done(): + return ctx.Err() + } + return nil + } + + _, err := timer.AddSpace(ctx, "timeout-test", 100*time.Millisecond, longTask, nil) + assert.NoError(t, err) + + time.Sleep(500 * time.Millisecond) + + // 检查是否有超时相关的错误日志 + assert.Contains(t, mockLogger.Errors, "context deadline exceeded") +} + +// 测试panic恢复 +func TestSingleTimer_PanicRecovery(t *testing.T) { + ctx := context.Background() + mockLogger := &MockLogger{} + + timer := timerx.InitSingle(ctx, timerx.SetLogger(mockLogger)) + defer timer.Stop() + + panicTask := func(ctx context.Context, data interface{}) error { + panic("test panic") + } + + _, err := timer.AddSpace(ctx, "panic-test", 100*time.Millisecond, panicTask, nil) + assert.NoError(t, err) + + time.Sleep(200 * time.Millisecond) + + // 检查是否有panic恢复日志 + assert.Contains(t, mockLogger.Errors, "timer: 回调任务panic err") +} + +// 测试不同时间类型的任务 +func TestSingleTimer_DifferentJobTypes(t *testing.T) { + ctx := context.Background() + timer := timerx.InitSingle(ctx, timerx.SetTimeZone(time.UTC)) + defer timer.Stop() + + var counts struct { + month int32 + week int32 + day int32 + hour int32 + minute int32 + space int32 + } + + now := time.Now().UTC() + + // 月任务(下个月同一天) + _, err := timer.AddMonth(ctx, "month-job", now.Day(), now.Hour(), now.Minute(), now.Second()+1, + func(ctx context.Context, data interface{}) error { + atomic.AddInt32(&counts.month, 1) + return nil + }, nil) + assert.NoError(t, err) + + // 周任务(下周同一天) + _, err = timer.AddWeek(ctx, "week-job", now.Weekday(), now.Hour(), now.Minute(), now.Second()+1, + func(ctx context.Context, data interface{}) error { + atomic.AddInt32(&counts.week, 1) + return nil + }, nil) + assert.NoError(t, err) + + // 间隔任务(立即执行) + _, err = timer.AddSpace(ctx, "space-job", 100*time.Millisecond, + func(ctx context.Context, data interface{}) error { + atomic.AddInt32(&counts.space, 1) + return nil + }, nil) + assert.NoError(t, err) + + time.Sleep(300 * time.Millisecond) + + // 只有间隔任务应该执行 + assert.Equal(t, int32(1), atomic.LoadInt32(&counts.space)) + assert.Equal(t, int32(0), atomic.LoadInt32(&counts.month)) + assert.Equal(t, int32(0), atomic.LoadInt32(&counts.week)) +} + +// 测试上下文取消 +func TestSingleTimer_ContextCancellation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + mockLogger := &MockLogger{} + + timer := timerx.InitSingle(ctx, timerx.SetLogger(mockLogger)) + + var executionCount int32 + _, err := timer.AddSpace(ctx, "cancel-test", 100*time.Millisecond, + func(ctx context.Context, data interface{}) error { + atomic.AddInt32(&executionCount, 1) + return nil + }, nil) + assert.NoError(t, err) + + // 让任务执行一次 + time.Sleep(150 * time.Millisecond) + initialCount := atomic.LoadInt32(&executionCount) + + // 取消上下文 + cancel() + time.Sleep(100 * time.Millisecond) // 等待停止 + + // 检查是否停止了执行 + finalCount := atomic.LoadInt32(&executionCount) + assert.Equal(t, initialCount, finalCount) // 计数不应该再增加 + + // 检查是否有停止日志 + assert.Contains(t, mockLogger.Infos, "timer: context cancelled, stopping timer loop") +} + +// 测试扩展数据传递 +func TestSingleTimer_ExtendData(t *testing.T) { + ctx := context.Background() + timer := timerx.InitSingle(ctx) + defer timer.Stop() + + type TestData struct { + Message string + Count int + } + + testData := &TestData{Message: "hello", Count: 42} + var receivedData *TestData + + _, err := timer.AddSpace(ctx, "data-test", 100*time.Millisecond, + func(ctx context.Context, data interface{}) error { + if data != nil { + receivedData = data.(*TestData) + } + return nil + }, testData) + assert.NoError(t, err) + + time.Sleep(150 * time.Millisecond) + + assert.NotNil(t, receivedData) + assert.Equal(t, "hello", receivedData.Message) + assert.Equal(t, 42, receivedData.Count) +} + +// 测试任务删除 +func TestSingleTimer_TaskDeletion(t *testing.T) { + ctx := context.Background() + timer := timerx.InitSingle(ctx) + defer timer.Stop() + + var executionCount int32 + + // 添加多个任务 + index1, err := timer.AddSpace(ctx, "task-1", 100*time.Millisecond, + func(ctx context.Context, data interface{}) error { + atomic.AddInt32(&executionCount, 1) + return nil + }, nil) + assert.NoError(t, err) + + index2, err := timer.AddSpace(ctx, "task-2", 100*time.Millisecond, + func(ctx context.Context, data interface{}) error { + atomic.AddInt32(&executionCount, 1) + return nil + }, nil) + assert.NoError(t, err) + + assert.Equal(t, 2, timer.TaskCount()) + + // 删除一个任务 + timer.Del(index1) + assert.Equal(t, 1, timer.TaskCount()) + + // 等待执行 + time.Sleep(200 * time.Millisecond) + count := atomic.LoadInt32(&executionCount) + + // 应该只有task-2执行 + assert.True(t, count >= 1 && count <= 2) + + // 删除另一个任务 + timer.Del(index2) + assert.Equal(t, 0, timer.TaskCount()) +} + +// 测试GetNextTime函数(需要根据实际实现调整) +func TestGetNextTime2(t *testing.T) { + now := time.Now().UTC() + + // 测试间隔任务 + jobData := timerx.JobData{ + JobType: timerx.JobTypeInterval, + IntervalTime: time.Minute, + } + + nextTime, err := timerx.GetNextTime(now, jobData) + assert.NoError(t, err) + assert.WithinDuration(t, now.Add(time.Minute), *nextTime, time.Second) +} + +// 基准测试 +func BenchmarkSingleTimer_AddAndExecute(b *testing.B) { + ctx := context.Background() + timer := timerx.InitSingle(ctx) + defer timer.Stop() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + timer.AddSpace(ctx, fmt.Sprintf("bench-%d", i), time.Millisecond, + func(ctx context.Context, data interface{}) error { + return nil + }, nil) + } +} + +// 测试日志记录 +func TestSingleTimer_Logging(t *testing.T) { + ctx := context.Background() + mockLogger := &MockLogger{} + + timer := timerx.InitSingle(ctx, timerx.SetLogger(mockLogger)) + defer timer.Stop() + + // 添加会panic的任务 + _, err := timer.AddSpace(ctx, "logging-test", 100*time.Millisecond, + func(ctx context.Context, data interface{}) error { + panic("test panic for logging") + }, nil) + assert.NoError(t, err) + + time.Sleep(200 * time.Millisecond) + + // 检查日志记录 + assert.NotEmpty(t, mockLogger.Errors) + assert.Contains(t, mockLogger.Errors[0], "timer: 回调任务panic err") +} + +// 测试时区处理 +func TestSingleTimer_Timezone(t *testing.T) { + // 测试不同时区 + locations := []*time.Location{ + time.UTC, + time.FixedZone("TEST+8", 8*60*60), + time.FixedZone("TEST-5", -5*60*60), + } + + for _, loc := range locations { + t.Run(loc.String(), func(t *testing.T) { + ctx := context.Background() + timer := timerx.InitSingle(ctx, timerx.SetTimeZone(loc)) + defer timer.Stop() + + var executed bool + // now := time.Now().In(loc) + + // 添加下一秒执行的任务 + _, err := timer.AddSpace(ctx, "tz-test", time.Second, + func(ctx context.Context, data interface{}) error { + executed = true + return nil + }, nil) + assert.NoError(t, err) + + time.Sleep(1500 * time.Millisecond) + assert.True(t, executed) + }) + } } diff --git a/types.go b/types.go index a6b9b78..d9c216b 100644 --- a/types.go +++ b/types.go @@ -6,11 +6,11 @@ import ( ) type timerStr struct { - Callback func(ctx context.Context, extendData interface{}) error // 需要回调的方法 - CanRunning chan (struct{}) // 是否允许执行(only single) - TaskId string // 任务ID 全局唯一键(only cluster) - ExtendData interface{} // 附加参数 - JobData *JobData // 任务时间数据 + Callback func(ctx context.Context, extendData interface{}) error // 需要回调的方法 + CanRunning chan (struct{}) // 是否允许执行(only single) + TaskId string // 任务ID 全局唯一键(only cluster) + ExtendData interface{} // 附加参数 + JobData *JobData // 任务时间数据 } type JobType string @@ -27,6 +27,7 @@ const ( type JobData struct { JobType JobType // 任务类型 + TaskId string // 任务ID 全局唯一键(only cluster) NextTime time.Time // 下次执行时间 BaseTime time.Time // 基准时间(间隔的基准时间) CreateTime time.Time // 任务创建时间