From 7912bbc56c32cbc41d4bfd733af25f5faecc3aa5 Mon Sep 17 00:00:00 2001 From: Yun Date: Mon, 20 May 2024 09:35:12 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=A7=E7=89=88=E6=9C=AC=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 221 ++++++++++++++++++++++------------------------ cluster_test.go | 12 +-- cmd/main.go | 12 +-- next_time.go | 116 ++++++++++++++++++++++++ next_time_test.go | 108 ++++++++++++++++++++++ single.go | 210 ++++++++++++++++++++++++++++++------------- types.go | 40 ++++----- 7 files changed, 513 insertions(+), 206 deletions(-) create mode 100644 next_time.go create mode 100644 next_time_test.go diff --git a/cluster.go b/cluster.go index afe3014..2443172 100644 --- a/cluster.go +++ b/cluster.go @@ -2,7 +2,6 @@ package timerx import ( "context" - "encoding/json" "errors" "runtime/debug" "sync" @@ -26,11 +25,11 @@ var clusterOnceLimit sync.Once var clusterWorkerList sync.Map type Cluster struct { - ctx context.Context - redis *redis.Client - logger Logger + ctx context.Context + redis *redis.Client + logger Logger + lockKey string // 全局计算的key - nextKey string // 下一次执行的key zsetKey string // 有序集合的key listKey string // 可执行的任务列表的key setKey string // 重入集合的key @@ -41,15 +40,15 @@ var clu *Cluster = nil // 初始化定时器 // 全局只需要初始化一次 func InitCluster(ctx context.Context, red *redis.Client, keyPrefix string, opts ...Option) *Cluster { - op := newOptions(opts...) clusterOnceLimit.Do(func() { + op := newOptions(opts...) + clu = &Cluster{ ctx: ctx, redis: red, logger: op.logger, lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 - nextKey: "timer:cluster_nextKey" + keyPrefix, // 下一次 zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 listKey: "timer:cluster_listKey" + keyPrefix, // 列表 setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 @@ -76,83 +75,131 @@ func InitCluster(ctx context.Context, red *redis.Client, keyPrefix string, opts return clu } -// TODO:指定执行时间 -// 1.每月的1号2点执行(如果当月没有这个号就不执行) -// 2.每周的周一2点执行 -// 3.每天的2点执行 -// 4.每小时的2分执行 -// 5.每分钟的2秒执行 -func (c *Cluster) AddEveryMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback callback, extendData interface{}) error { +// 每月执行一次 +// @param ctx 上下文 +// @param taskId 任务ID +// @param day 每月的几号 +// @param hour 小时 +// @param minute 分钟 +// @param second 秒 +// @param callback 回调函数 +// @param extendData 扩展数据 +// @return error +func (c *Cluster) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback callback, extendData interface{}) error { nowTime := time.Now() - // 计算下一次执行的时间 - nextTime := time.Date(nowTime.Year(), nowTime.Month(), day, hour, minute, second, 0, nowTime.Location()) - if nextTime.Before(nowTime) { - nextTime = nextTime.AddDate(0, 1, 0) + + jobData := JobData{ + JobType: JobTypeEveryMonth, + CreateTime: nowTime, + Day: day, + Hour: hour, + Minute: minute, + Second: second, } - return c.addJob(ctx, taskId, nextTime, time.Hour*24*30, callback, extendData, JobTypeEveryMonth, &JobData{Day: &day, Hour: &hour, Minute: &minute, Second: &second}) + + return c.addJob(ctx, taskId, jobData, callback, extendData) } -func (c *Cluster) AddEveryWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback callback, extendData interface{}) error { +// 每周执行一次 +// @param ctx context.Context 上下文 +// @param taskId string 任务ID +// @param week time.Weekday 周 +// @param hour int 小时 +// @param minute int 分钟 +// @param second int 秒 +func (c *Cluster) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback callback, extendData interface{}) error { nowTime := time.Now() - // 计算下一次执行的时间 - nextTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), hour, minute, second, 0, nowTime.Location()) - for nextTime.Weekday() != week { - nextTime = nextTime.AddDate(0, 0, 1) + + jobData := JobData{ + JobType: JobTypeEveryMonth, + CreateTime: nowTime, + Weekday: week, + Hour: hour, + Minute: minute, + Second: second, } - if nextTime.Before(nowTime) { - nextTime = nextTime.AddDate(0, 0, 7) - } - return c.addJob(ctx, taskId, nextTime, time.Hour*24*7, callback, extendData, JobTypeInterval, nil) + + return c.addJob(ctx, taskId, jobData, callback, extendData) } -func (c *Cluster) AddEveryDay(ctx context.Context, taskId string, hour int, minute int, second int, callback callback, extendData interface{}) error { +// 每天执行一次 +func (c *Cluster) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback callback, extendData interface{}) error { nowTime := time.Now() - // 计算下一次执行的时间 - nextTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), hour, minute, second, 0, nowTime.Location()) - if nextTime.Before(nowTime) { - nextTime = nextTime.AddDate(0, 0, 1) + + jobData := JobData{ + JobType: JobTypeEveryMonth, + CreateTime: nowTime, + Hour: hour, + Minute: minute, + Second: second, } - return c.addJob(ctx, taskId, nextTime, time.Hour*24, callback, extendData, JobTypeInterval, nil) + + return c.addJob(ctx, taskId, jobData, callback, extendData) } -func (c *Cluster) AddEveryHour(ctx context.Context, taskId string, minute int, second int, callback callback, extendData interface{}) error { +// 每小时执行一次 +func (c *Cluster) AddHour(ctx context.Context, taskId string, minute int, second int, callback callback, extendData interface{}) error { nowTime := time.Now() - // 计算下一次执行的时间 - nextTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), nowTime.Hour(), minute, second, 0, nowTime.Location()) - if nextTime.Before(nowTime) { - nextTime = nextTime.Add(time.Hour) + + jobData := JobData{ + JobType: JobTypeEveryMonth, + CreateTime: nowTime, + Minute: minute, + Second: second, } - return c.addJob(ctx, taskId, nextTime, time.Hour, callback, extendData, JobTypeInterval, nil) + + return c.addJob(ctx, taskId, jobData, callback, extendData) } -func (c *Cluster) AddEveryMinute(ctx context.Context, taskId string, second int, callback callback, extendData interface{}) error { +// 每分钟执行一次 +func (c *Cluster) AddMinute(ctx context.Context, taskId string, second int, callback callback, extendData interface{}) error { nowTime := time.Now() - // 计算下一次执行的时间 - nextTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), nowTime.Hour(), nowTime.Minute(), second, 0, nowTime.Location()) - if nextTime.Before(nowTime) { - nextTime = nextTime.Add(time.Minute) + + jobData := JobData{ + JobType: JobTypeEveryMonth, + CreateTime: nowTime, + Second: second, } - return c.addJob(ctx, taskId, nextTime, time.Minute, callback, extendData, JobTypeInterval, nil) + + return c.addJob(ctx, taskId, jobData, callback, extendData) } -func (c *Cluster) Add(ctx context.Context, taskId string, spaceTime time.Duration, callback callback, extendData interface{}) error { - return c.addJob(ctx, taskId, time.Now(), spaceTime, callback, extendData, JobTypeInterval, nil) +// 特定时间间隔 +func (c *Cluster) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback callback, extendData interface{}) error { + nowTime := time.Now() + + if spaceTime < 0 { + c.logger.Errorf(ctx, "间隔时间不能小于0") + return errors.New("间隔时间不能小于0") + } + + jobData := JobData{ + JobType: JobTypeEveryMonth, + CreateTime: nowTime, + IntervalTime: spaceTime, + } + + return c.addJob(ctx, taskId, jobData, callback, extendData) } -// 指定时间间隔 -// TODO: -// 1.不同服务定的时间间隔不一致问题 -// 2.后起的服务计算了时间覆盖前面原有的时间问题 -func (c *Cluster) addJob(ctx context.Context, taskId string, beginTime time.Time, spaceTime time.Duration, callback callback, extendData interface{}, jobType JobType, jobData *JobData) error { +// 统一添加任务 +// @param ctx context.Context 上下文 +// @param taskId string 任务ID +// @param jobData *JobData 任务数据 +// @param callback callback 回调函数 +// @param extendData interface{} 扩展数据 +// @return error +func (c *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, callback callback, extendData interface{}) error { _, ok := clusterWorkerList.Load(taskId) if ok { c.logger.Errorf(ctx, "key已存在:%s", taskId) return errors.New("key已存在") } - if spaceTime != spaceTime.Abs() { - c.logger.Errorf(ctx, "时间间隔不能为负数:%s", taskId) - return errors.New("时间间隔不能为负数") + _, err := GetNextTime(time.Now(), time.Local, jobData) + if err != nil { + c.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error()) + return err } ctx, cancel := context.WithCancel(ctx) @@ -167,42 +214,20 @@ func (c *Cluster) addJob(ctx context.Context, taskId string, beginTime time.Time defer lock.Unlock() t := timerStr{ - BeginTime: beginTime, - NextTime: beginTime, - SpaceTime: spaceTime, Callback: callback, ExtendData: extendData, TaskId: taskId, - JobType: jobType, - JobData: jobData, + JobData: &jobData, } clusterWorkerList.Store(taskId, t) - cacheStr, _ := c.redis.Get(ctx, c.nextKey).Result() - execTime := make(map[string]time.Time) - json.Unmarshal([]byte(cacheStr), &execTime) - - p := c.redis.Pipeline() - - p.ZAdd(ctx, c.zsetKey, &redis.Z{ - Score: float64(nextTime.UnixMilli()), - Member: taskId, - }) - execTime[taskId] = nextTime - n, _ := json.Marshal(execTime) - // fmt.Println("execTime:", execTime, string(n)) - p.Set(ctx, c.nextKey, string(n), 0) - - _, err := p.Exec(ctx) - - // fmt.Println("添加", err) - return err } // 计算下一次执行的时间 // TODO:注册的任务需放在Redis集中存储,因为本地的话,如果有多个服务,那么就会出现不一致的情况。但是要注意服务如何进行下线,由于是主动上报的,需要有一个机制进行删除过期的任务(添加任务&定时器轮训注册) +// TODO:考虑不同实例系统时间不一样,可能计算的下次时间不一致,会有重复执行的可能 func (c *Cluster) getNextTime() { ctx, cancel := context.WithCancel(c.ctx) @@ -219,24 +244,13 @@ func (c *Cluster) getNextTime() { // 计算下一次时间 - // 读取执行的缓存 - cacheStr, _ := c.redis.Get(ctx, c.nextKey).Result() - execTime := make(map[string]time.Time) - json.Unmarshal([]byte(cacheStr), &execTime) - p := c.redis.Pipeline() - nowTime := time.Now() - // 根据内部注册的任务列表计算下一次执行的时间 clusterWorkerList.Range(func(key, value interface{}) bool { val := value.(timerStr) - beforeTime := execTime[val.TaskId] - if beforeTime.After(nowTime) { - return true - } - nextTime := getNextExecTime(val) - execTime[val.TaskId] = nextTime + + nextTime, _ := GetNextTime(time.Now(), time.Local, *val.JobData) p.ZAdd(ctx, c.zsetKey, &redis.Z{ Score: float64(nextTime.UnixMilli()), @@ -246,31 +260,10 @@ func (c *Cluster) getNextTime() { return true }) - // 更新缓存 - b, _ := json.Marshal(execTime) - p.Set(ctx, c.nextKey, string(b), 0) - _, err := p.Exec(ctx) _ = err } -// 递归遍历获取执行时间 -// TODO:需要根据不同的任务类型计算下次定时时间 -func getNextExecTime(ts timerStr) time.Time { - nowTime := time.Now() - if ts.NextTime.After(nowTime) { - return ts.NextTime - } - nextTime := ts.NextTime.Add(ts.SpaceTime) - ts.NextTime = nextTime - - // 递归计算直到拿到下一次执行的时间 - if nextTime.Before(nowTime) { - nextTime = getNextExecTime(ts) - } - return nextTime -} - // 获取任务 func (c *Cluster) getTask() { // 定时去Redis获取任务 diff --git a/cluster_test.go b/cluster_test.go index cfd51bf..34bd72c 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -30,7 +30,7 @@ func TestCluster_AddEveryMonth(t *testing.T) { } extendData := "testData" - err := cluster.AddEveryMonth(ctx, taskId, 1, hour, minute, second, callback, extendData) + err := cluster.AddMonth(ctx, taskId, 1, hour, minute, second, callback, extendData) if err != nil { t.Errorf("AddEveryMonth failed, err: %v", err) } @@ -59,7 +59,7 @@ func TestCluster_AddEveryWeek(t *testing.T) { } extendData := "testData" - err := cluster.AddEveryWeek(ctx, taskId, week, hour, minute, second, callback, extendData) + err := cluster.AddWeek(ctx, taskId, week, hour, minute, second, callback, extendData) if err != nil { t.Errorf("AddEveryWeek failed, err: %v", err) } @@ -87,7 +87,7 @@ func TestCluster_AddEveryDay(t *testing.T) { } extendData := "testData" - err := cluster.AddEveryDay(ctx, taskId, hour, minute, second, callback, extendData) + err := cluster.AddDay(ctx, taskId, hour, minute, second, callback, extendData) if err != nil { t.Errorf("AddEveryDay failed, err: %v", err) } @@ -114,7 +114,7 @@ func TestCluster_AddEveryHour(t *testing.T) { } extendData := "testData" - err := cluster.AddEveryHour(ctx, taskId, minute, second, callback, extendData) + err := cluster.AddHour(ctx, taskId, minute, second, callback, extendData) if err != nil { t.Errorf("AddEveryHour failed, err: %v", err) } @@ -140,7 +140,7 @@ func TestCluster_AddEveryMinute(t *testing.T) { } extendData := "testData" - err := cluster.AddEveryMinute(ctx, taskId, second, callback, extendData) + err := cluster.AddMinute(ctx, taskId, second, callback, extendData) if err != nil { t.Errorf("AddEveryMinute failed, err: %v", err) } @@ -170,7 +170,7 @@ func TestCluster_Add(t *testing.T) { } extendData := "testData" - err := cluster.Add(ctx, taskId, dur, callback, extendData) + err := cluster.AddSpace(ctx, taskId, dur, callback, extendData) if err != nil { t.Errorf("Add failed, err: %v", err) } diff --git a/cmd/main.go b/cmd/main.go index 39aacf7..6eafde1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -77,12 +77,12 @@ func re() { ctx := context.Background() cl := timerx.InitCluster(ctx, client, "kkkk") - cl.Add(ctx, "test1", 1*time.Millisecond, aa, "data") - cl.Add(ctx, "test2", 1*time.Millisecond, aa, "data") - cl.Add(ctx, "test3", 1*time.Millisecond, aa, "data") - cl.Add(ctx, "test4", 1*time.Millisecond, aa, "data") - cl.Add(ctx, "test5", 1*time.Millisecond, aa, "data") - cl.Add(ctx, "test6", 1*time.Millisecond, aa, "data") + cl.AddSpace(ctx, "test1", 1*time.Millisecond, aa, "data") + cl.AddSpace(ctx, "test2", 1*time.Millisecond, aa, "data") + cl.AddSpace(ctx, "test3", 1*time.Millisecond, aa, "data") + cl.AddSpace(ctx, "test4", 1*time.Millisecond, aa, "data") + cl.AddSpace(ctx, "test5", 1*time.Millisecond, aa, "data") + cl.AddSpace(ctx, "test6", 1*time.Millisecond, aa, "data") select {} } diff --git a/next_time.go b/next_time.go new file mode 100644 index 0000000..2d17ea3 --- /dev/null +++ b/next_time.go @@ -0,0 +1,116 @@ +package timerx + +import ( + "errors" + "time" +) + +// 计算该任务下次执行时间 +// @param job *JobData 任务数据 +// @return time.Time 下次执行时间 +func GetNextTime(t time.Time,loc *time.Location, job JobData) (*time.Time, error) { + + var next time.Time + + switch job.JobType { + case JobTypeEveryMonth: + next = calculateNextMonthTime(t, job, loc) + case JobTypeEveryWeek: + next = calculateNextWeekTime(t, job, loc) + case JobTypeEveryDay: + next = calculateNextDayTime(t, job, loc) + case JobTypeEveryHour: + next = calculateNextHourTime(t, job, loc) + case JobTypeEveryMinute: + next = calculateNextMinuteTime(t, job, loc) + case JobTypeInterval: + next = calculateNextInterval(t, job) + default: + return nil, errors.New("未知的任务类型: " + string(job.JobType)) + } + + return &next, nil +} + +func calculateNextInterval(t time.Time, job JobData) time.Time { + // 从创建的时候开始计算 + cycle := t.Sub(job.CreateTime).Microseconds() / job.IntervalTime.Microseconds() + return job.CreateTime.Add(job.IntervalTime * time.Duration(cycle+1)) +} + +func calculateNextMonthTime(t time.Time, job JobData, loc *time.Location) time.Time { + // 判断是否可执行并返回下一个执行时间 + if canRun(t, job) { + return time.Date(t.Year(), t.Month(), job.Day, job.Hour, job.Minute, job.Second, 0, loc) + } + // 下一个周期(下个月) + return time.Date(t.Year(), t.Month()+1, job.Day, job.Hour, job.Minute, job.Second, 0, loc) +} + +func calculateNextWeekTime(t time.Time, job JobData, loc *time.Location) time.Time { + weekday := t.Weekday() + days := int(job.Weekday - weekday) + if days < 0 { + days += 7 + } + // 判断是否可执行并返回下一个执行时间 + if canRun(t, job) { + return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, loc) + } + // 下一个周期(下周) + return time.Date(t.Year(), t.Month(), t.Day()+days+7, job.Hour, job.Minute, job.Second, 0, loc) +} + +func calculateNextDayTime(t time.Time, job JobData, loc *time.Location) time.Time { + // 判断是否可执行并返回下一个执行时间 + if canRun(t, job) { + return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, loc) + } + // 下一个周期(明天) + return time.Date(t.Year(), t.Month(), t.Day()+1, job.Hour, job.Minute, job.Second, 0, loc) +} + +func calculateNextHourTime(t time.Time, job JobData, loc *time.Location) time.Time { + // 判断是否可执行并返回下一个执行时间 + if canRun(t, job) { + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), job.Minute, job.Second, 0, loc) + } + // 下一个周期(下个小时) + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour()+1, job.Minute, job.Second, 0, loc) +} + +func calculateNextMinuteTime(t time.Time, job JobData, loc *time.Location) time.Time { + // 判断是否可执行并返回下一个执行时间 + if canRun(t, job) { + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), job.Second, 0, loc) + } + // 下一个周期(下分钟) + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute()+1, job.Second, 0, loc) +} + +// 检查是否本周期可以运行 +func canRun(t time.Time, job JobData) bool { + switch job.JobType { + case JobTypeEveryMonth: + return t.Day() < job.Day || + (t.Day() == job.Day && t.Hour() < job.Hour) || + (t.Day() == job.Day && t.Hour() == job.Hour && t.Minute() < job.Minute) || + (t.Day() == job.Day && t.Hour() == job.Hour && t.Minute() == job.Minute && t.Second() <= job.Second) + case JobTypeEveryWeek: + return t.Weekday() < job.Weekday || + (t.Weekday() == job.Weekday && t.Hour() < job.Hour) || + (t.Weekday() == job.Weekday && t.Hour() == job.Hour && t.Minute() < job.Minute) || + (t.Weekday() == job.Weekday && t.Hour() == job.Hour && t.Minute() == job.Minute && t.Second() <= job.Second) + case JobTypeEveryDay: + return t.Hour() < job.Hour || + (t.Hour() == job.Hour && t.Minute() < job.Minute) || + (t.Hour() == job.Hour && t.Minute() == job.Minute && t.Second() <= job.Second) + case JobTypeEveryHour: + return t.Minute() < job.Minute || + (t.Minute() == job.Minute && t.Second() <= job.Second) + case JobTypeEveryMinute: + return t.Second() <= job.Second + default: + return false + } +} diff --git a/next_time_test.go b/next_time_test.go new file mode 100644 index 0000000..234ceab --- /dev/null +++ b/next_time_test.go @@ -0,0 +1,108 @@ +package timerx_test + +import ( + "errors" + "testing" + "time" + + "github.com/yuninks/timerx" +) + +func TestGetNextTime(t *testing.T) { + // Test cases + tests := []struct { + name string + job timerx.JobData + expectedTime time.Time + expectedError error + }{ + { + name: "Test JobTypeEveryMonth", + job: timerx.JobData{ + JobType: timerx.JobTypeEveryMonth, + Day: 15, + Hour: 10, + Minute: 0, + Second: 0, + }, + expectedTime: time.Date(2022, 3, 15, 10, 0, 0, 0, time.Local), + expectedError: nil, + }, + { + name: "Test JobTypeEveryWeek", + job: timerx.JobData{ + JobType: timerx.JobTypeEveryWeek, + Weekday: time.Tuesday, + Hour: 10, + Minute: 0, + Second: 0, + }, + expectedTime: time.Date(2022, 3, 8, 10, 0, 0, 0, time.Local), // Assuming current date is March 7, 2022 + expectedError: nil, + }, + { + name: "Test JobTypeEveryDay", + job: timerx.JobData{ + JobType: timerx.JobTypeEveryDay, + Hour: 10, + Minute: 0, + Second: 0, + }, + expectedTime: time.Date(2022, 3, 8, 10, 0, 0, 0, time.Local), // Assuming current date is March 7, 2022 + expectedError: nil, + }, + { + name: "Test JobTypeEveryHour", + job: timerx.JobData{ + JobType: timerx.JobTypeEveryHour, + Minute: 0, + Second: 0, + }, + expectedTime: time.Date(2022, 3, 7, 11, 0, 0, 0, time.Local), // Assuming current date is March 7, 2022, 10:30 AM + expectedError: nil, + }, + { + name: "Test JobTypeEveryMinute", + job: timerx.JobData{ + JobType: timerx.JobTypeEveryMinute, + Second: 0, + }, + expectedTime: time.Date(2022, 3, 7, 10, 31, 0, 0, time.Local), // Assuming current date is March 7, 2022, 10:30 AM + expectedError: nil, + }, + { + name: "Test JobTypeInterval", + job: timerx.JobData{ + JobType: timerx.JobTypeInterval, + IntervalTime: 1 * time.Hour, + }, + expectedTime: time.Date(2022, 3, 7, 11, 30, 0, 0, time.Local), // Assuming current date is March 7, 2022, 10:30 AM + expectedError: nil, + }, + { + name: "Test unknown JobType", + job: timerx.JobData{ + JobType: timerx.JobType(100), + }, + expectedTime: time.Time{}, + expectedError: errors.New("未知的任务类型: 100"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + now := time.Now() + loc := time.FixedZone("CST", 8*3600) + nextTime, err := timerx.GetNextTime(now,loc,test.job) + if err != nil { + if test.expectedError == nil || err.Error() != test.expectedError.Error() { + t.Errorf("Expected error: %v, Got error: %v", test.expectedError, err) + } + } else { + if nextTime.IsZero() != (test.expectedTime == time.Time{}) || (nextTime != &test.expectedTime) { + t.Errorf("Expected time: %v, Got time: %v", test.expectedTime, nextTime) + } + } + }) + } +} diff --git a/single.go b/single.go index d8569ec..6b4d82c 100644 --- a/single.go +++ b/single.go @@ -5,7 +5,6 @@ package timerx import ( "context" "errors" - "fmt" "runtime/debug" "sync" "time" @@ -15,12 +14,12 @@ import ( // 1. 这个定时器的作用范围是本机 // 2. 适用简单的时间间隔定时任务 -// uuid -> timerStr -var timerMap = make(map[string]*timerStr) -var timerMapMux sync.Mutex +// 定时器结构体 +var singleWorkerList sync.Map -var timerCount int // 当前定时数目 -var onceLimit sync.Once // 实现单例 +var singleTimerIndex int // 当前定时数目 + +var singleOnceLimit sync.Once // 实现单例 type Single struct { ctx context.Context @@ -29,10 +28,13 @@ type Single struct { var sin *Single = nil -// 定时器类 -func InitSingle(ctx context.Context, opts ...Option) *Single { - onceLimit.Do(func() { +var singleNextTime = time.Now() // 下一次执行的时间 +// 定时器类 +// @param ctx context.Context 上下文 +// @param opts ...Option 配置项 +func InitSingle(ctx context.Context, opts ...Option) *Single { + singleOnceLimit.Do(func() { op := newOptions(opts...) sin = &Single{ @@ -46,7 +48,7 @@ func InitSingle(ctx context.Context, opts ...Option) *Single { for { select { case t := <-timer.C: - if t.Before(nextTime) { + if t.Before(singleNextTime) { // 当前时间小于下次发送时间:跳过 continue } @@ -65,89 +67,175 @@ func InitSingle(ctx context.Context, opts ...Option) *Single { return sin } +// 每月执行一次 +// @param ctx 上下文 +// @param taskId 任务ID +// @param day 每月的几号 +// @param hour 小时 +// @param minute 分钟 +// @param second 秒 +// @param callback 回调函数 +// @param extendData 扩展数据 +// @return error +func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback callback, extendData interface{}) (int, error) { + nowTime := time.Now() + + jobData := JobData{ + JobType: JobTypeEveryMonth, + CreateTime: nowTime, + Day: day, + Hour: hour, + Minute: minute, + Second: second, + } + + return c.addJob(ctx, jobData, callback, extendData) +} + +// 每周执行一次 +// @param ctx context.Context 上下文 +// @param taskId string 任务ID +// @param week time.Weekday 周 +// @param hour int 小时 +// @param minute int 分钟 +// @param second int 秒 +func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback callback, extendData interface{}) (int, error) { + nowTime := time.Now() + + jobData := JobData{ + JobType: JobTypeEveryMonth, + CreateTime: nowTime, + Weekday: week, + Hour: hour, + Minute: minute, + Second: second, + } + + return c.addJob(ctx, jobData, callback, extendData) +} + +// 每天执行一次 +func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback callback, extendData interface{}) (int, error) { + nowTime := time.Now() + + jobData := JobData{ + JobType: JobTypeEveryMonth, + CreateTime: nowTime, + Hour: hour, + Minute: minute, + Second: second, + } + + return c.addJob(ctx, jobData, callback, extendData) +} + +// 每小时执行一次 +func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second int, callback callback, extendData interface{}) (int, error) { + nowTime := time.Now() + + jobData := JobData{ + JobType: JobTypeEveryMonth, + CreateTime: nowTime, + Minute: minute, + Second: second, + } + + return c.addJob(ctx, jobData, callback, extendData) +} + +// 每分钟执行一次 +func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callback callback, extendData interface{}) (int, error) { + nowTime := time.Now() + + jobData := JobData{ + JobType: JobTypeEveryMonth, + CreateTime: nowTime, + Second: second, + } + + return c.addJob(ctx, jobData, callback, extendData) +} + +// 特定时间间隔 +func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback callback, extendData interface{}) (int, error) { + nowTime := time.Now() + + if spaceTime < 0 { + c.logger.Errorf(ctx, "间隔时间不能小于0") + return 0, errors.New("间隔时间不能小于0") + } + + jobData := JobData{ + JobType: JobTypeEveryMonth, + CreateTime: nowTime, + IntervalTime: spaceTime, + } + + return c.addJob(ctx, jobData, callback, extendData) +} + // 间隔定时器 // @param space 间隔时间 // @param call 回调函数 // @param extend 附加参数 // @return int 定时器索引 // @return error 错误 -func (s *Single) Add(space time.Duration, call callback, extend interface{}) (int, error) { - timerMapMux.Lock() - defer timerMapMux.Unlock() - - if space != space.Abs() { - s.logger.Errorf(s.ctx, "space must be positive") - return 0, errors.New("space must be positive") - } - - timerCount += 1 +func (l *Single) addJob(ctx context.Context, jobData JobData, call callback, extend interface{}) (int, error) { + singleTimerIndex += 1 nowTime := time.Now() + _, err := GetNextTime(nowTime, time.Local, jobData) + if err != nil { + l.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error()) + return 0, err + } + t := timerStr{ Callback: call, - BeginTime: nowTime, - NextTime: nowTime, // nowTime.Add(space), // 添加任务的时候就执行一次 - SpaceTime: space, CanRunning: make(chan struct{}, 1), ExtendData: extend, + JobData: &jobData, } - timerMap[fmt.Sprintf("%d", timerCount)] = &t + singleWorkerList.Store(singleTimerIndex, t) - if t.NextTime.Before(nextTime) { - // 本条规则下次需要发送的时间小于系统下次发送时间:替换 - nextTime = t.NextTime - } - - return timerCount, nil + return singleTimerIndex, nil } // 删除定时器 -func (s *Single) Del(index string) { - timerMapMux.Lock() - defer timerMapMux.Unlock() - delete(timerMap, index) +func (s *Single) Del(index int) { + singleWorkerList.Delete(index) } // 迭代定时器列表 func (s *Single) iterator(ctx context.Context, nowTime time.Time) { - timerMapMux.Lock() - defer timerMapMux.Unlock() - - // fmt.Println("nowTime:", nowTime.Format("2006-01-02 15:04:05.000")) // 默认5秒后(如果没有值就暂停进来5秒) newNextTime := nowTime.Add(time.Second * 5) index := 0 - for _, v := range timerMap { + singleWorkerList.Range(func(k, v interface{}) bool { index++ - v := v - // 判断执行的时机 - if v.NextTime.Before(nowTime) { - // fmt.Println("NextTime", v.NextTime.Format("2006-01-02 15:04:05.000")) + timeStr := v.(timerStr) - v.NextTime = v.NextTime.Add(v.SpaceTime) - - // 判断下次执行时间与当前时间 - if v.NextTime.Before(nowTime) { - v.NextTime = nowTime.Add(v.SpaceTime) - } + if timeStr.JobData.NextTime.Before(nowTime) || timeStr.JobData.NextTime.Equal(nowTime) { + // 可执行 + nextTime, _ := GetNextTime(nowTime, time.Local, *timeStr.JobData) + timeStr.JobData.NextTime = *nextTime if index == 1 { // 循环的第一个需要替换默认值 - newNextTime = v.NextTime + newNextTime = timeStr.JobData.NextTime } - // 获取最小的 - if v.NextTime.Before(newNextTime) { + if nextTime.Before(newNextTime) { // 本规则下次发送时间小于系统下次需要执行的时间:替换 - newNextTime = v.NextTime + newNextTime = *nextTime } // 处理中就跳过本次 - go func(ctx context.Context, v *timerStr) { + go func(ctx context.Context, v timerStr) { select { case v.CanRunning <- struct{}{}: defer func() { @@ -165,18 +253,22 @@ func (s *Single) iterator(ctx context.Context, nowTime time.Time) { // fmt.Printf("timer: 已在执行 %v %v \n", k, v.Tag) return } - }(ctx, v) + }(ctx, timeStr) + } - } + + return true + + }) // 实际下次时间小于预期下次时间:替换 - if nextTime.Before(newNextTime) { + if singleNextTime.Before(newNextTime) { // 判断一下避免异常 if newNextTime.Before(nowTime) { // 比当前时间小 - nextTime = nowTime + singleNextTime = nowTime } else { - nextTime = newNextTime + singleNextTime = newNextTime } } diff --git a/types.go b/types.go index d4a7aaa..49ab5d9 100644 --- a/types.go +++ b/types.go @@ -7,38 +7,36 @@ import ( type timerStr struct { Callback callback // 需要回调的方法 - CanRunning chan (struct{}) // 是否允许执行 - BeginTime time.Time // 初始化任务的时间 - NextTime time.Time // [删]下一次执行的时间 - SpaceTime time.Duration // 任务间隔时间 - TaskId string // 任务ID 全局唯一键 + CanRunning chan (struct{}) // 是否允许执行(only single) + TaskId string // 任务ID 全局唯一键(only cluster) ExtendData interface{} // 附加参数 - JobType JobType // 任务类型 JobData *JobData // 任务时间数据 } type JobType string const ( - JobTypeEveryDay JobType = "every_day" - JobTypeEveryHour JobType = "every_hour" - JobTypeEveryMinute JobType = "every_minute" - JobTypeEverySecond JobType = "every_second" - JobTypeEveryMonth JobType = "every_month" - // 根据间隔时间执行 - JobTypeInterval JobType = "interval" + JobTypeEveryMonth JobType = "every_month" // 每月 + JobTypeEveryWeek JobType = "every_week" // 每周 + JobTypeEveryDay JobType = "every_day" // 每天 + JobTypeEveryHour JobType = "every_hour" // 每小时 + JobTypeEveryMinute JobType = "every_minute" // 每分钟 + JobTypeEverySecond JobType = "every_second" // 每秒 + JobTypeInterval JobType = "interval" // 指定时间间隔 ) type JobData struct { - Month *time.Month // 每年的第几个月 - Weekday *time.Weekday // 每周的周几 - Day *int // 每月的第几天 - Hour *int // 每天的第几个小时 - Minute *int // 每小时的第几分钟 - Second *int // 每分钟的第几秒 + JobType JobType // 任务类型 + NextTime time.Time // 下次执行时间 + CreateTime time.Time // 任务创建时间 + IntervalTime time.Duration // 任务间隔时间 + Month time.Month // 每年的第几个月 + Weekday time.Weekday // 每周的周几 + Day int // 每月的第几天 + Hour int // 每天的第几个小时 + Minute int // 每小时的第几分钟 + Second int // 每分钟的第几秒 } -var nextTime = time.Now() // 下一次执行的时间 - // 定义各个回调函数 type callback func(ctx context.Context, extendData interface{}) error