diff --git a/cluster.go b/cluster.go index 1ccb6f0..dba663f 100644 --- a/cluster.go +++ b/cluster.go @@ -33,7 +33,8 @@ type Cluster struct { redis redis.UniversalClient cache *cachex.Cache logger Logger - keyPrefix string // key前缀 + keyPrefix string // key前缀 + location *time.Location // 根据时区计算的时间 lockKey string // 全局计算的key zsetKey string // 有序集合的key @@ -56,6 +57,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin cache: cachex.NewCache(), logger: op.logger, keyPrefix: keyPrefix, + location: op.location, lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 listKey: "timer:cluster_listKey" + keyPrefix, // 列表 @@ -208,7 +210,7 @@ func (c *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, ca return errors.New("key已存在") } - _, err := GetNextTime(time.Now(), time.Local, jobData) + _, err := GetNextTime(time.Now().In(c.location), jobData) if err != nil { c.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error()) return err @@ -259,7 +261,7 @@ func (c *Cluster) getNextTime() { clusterWorkerList.Range(func(key, value interface{}) bool { val := value.(timerStr) - nextTime, _ := GetNextTime(time.Now(), time.Local, *val.JobData) + nextTime, _ := GetNextTime(time.Now().In(c.location), *val.JobData) // fmt.Println(val.ExtendData, val.JobData, nextTime) diff --git a/next_time.go b/next_time.go index b78eb4b..0f616c1 100644 --- a/next_time.go +++ b/next_time.go @@ -8,21 +8,21 @@ import ( // 计算该任务下次执行时间 // @param job *JobData 任务数据 // @return time.Time 下次执行时间 -func GetNextTime(t time.Time, loc *time.Location, job JobData) (*time.Time, error) { +func GetNextTime(t time.Time, job JobData) (*time.Time, error) { var next time.Time switch job.JobType { case JobTypeEveryMonth: - next = calculateNextMonthTime(t, job, loc) + next = calculateNextMonthTime(t, job) case JobTypeEveryWeek: - next = calculateNextWeekTime(t, job, loc) + next = calculateNextWeekTime(t, job) case JobTypeEveryDay: - next = calculateNextDayTime(t, job, loc) + next = calculateNextDayTime(t, job) case JobTypeEveryHour: - next = calculateNextHourTime(t, job, loc) + next = calculateNextHourTime(t, job) case JobTypeEveryMinute: - next = calculateNextMinuteTime(t, job, loc) + next = calculateNextMinuteTime(t, job) case JobTypeInterval: next = calculateNextInterval(t, job) default: @@ -38,16 +38,17 @@ func calculateNextInterval(t time.Time, job JobData) time.Time { return job.BaseTime.Add(job.IntervalTime * time.Duration(cycle+1)) } -func calculateNextMonthTime(t time.Time, job JobData, loc *time.Location) time.Time { +func calculateNextMonthTime(t time.Time, job JobData) time.Time { // 判断是否可执行并返回下一个执行时间 + if canRun(t, job) { - return time.Date(t.Year(), t.Month(), job.Day, job.Hour, job.Minute, job.Second, 0, loc) + return time.Date(t.Year(), t.Month(), job.Day, job.Hour, job.Minute, job.Second, 0, t.Location()) } // 下一个周期(下个月) - return time.Date(t.Year(), t.Month()+1, job.Day, job.Hour, job.Minute, job.Second, 0, loc) + return time.Date(t.Year(), t.Month()+1, job.Day, job.Hour, job.Minute, job.Second, 0, t.Location()) } -func calculateNextWeekTime(t time.Time, job JobData, loc *time.Location) time.Time { +func calculateNextWeekTime(t time.Time, job JobData) time.Time { weekday := t.Weekday() days := int(job.Weekday - weekday) if days < 0 { @@ -55,37 +56,37 @@ func calculateNextWeekTime(t time.Time, job JobData, loc *time.Location) time.Ti } // 判断是否可执行并返回下一个执行时间 if canRun(t, job) { - return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, loc) + return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, t.Location()) } // 下一个周期(下周) - return time.Date(t.Year(), t.Month(), t.Day()+days+7, job.Hour, job.Minute, job.Second, 0, loc) + return time.Date(t.Year(), t.Month(), t.Day()+days+7, job.Hour, job.Minute, job.Second, 0, t.Location()) } -func calculateNextDayTime(t time.Time, job JobData, loc *time.Location) time.Time { +func calculateNextDayTime(t time.Time, job JobData) time.Time { // 判断是否可执行并返回下一个执行时间 if canRun(t, job) { - return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, loc) + return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, t.Location()) } // 下一个周期(明天) - return time.Date(t.Year(), t.Month(), t.Day()+1, job.Hour, job.Minute, job.Second, 0, loc) + return time.Date(t.Year(), t.Month(), t.Day()+1, job.Hour, job.Minute, job.Second, 0, t.Location()) } -func calculateNextHourTime(t time.Time, job JobData, loc *time.Location) time.Time { +func calculateNextHourTime(t time.Time, job JobData) time.Time { // 判断是否可执行并返回下一个执行时间 if canRun(t, job) { - return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), job.Minute, job.Second, 0, loc) + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), job.Minute, job.Second, 0, t.Location()) } // 下一个周期(下个小时) - return time.Date(t.Year(), t.Month(), t.Day(), t.Hour()+1, job.Minute, job.Second, 0, loc) + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour()+1, job.Minute, job.Second, 0, t.Location()) } -func calculateNextMinuteTime(t time.Time, job JobData, loc *time.Location) time.Time { +func calculateNextMinuteTime(t time.Time, job JobData) time.Time { // 判断是否可执行并返回下一个执行时间 if canRun(t, job) { - return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), job.Second, 0, loc) + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), job.Second, 0, t.Location()) } // 下一个周期(下分钟) - return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute()+1, job.Second, 0, loc) + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute()+1, job.Second, 0, t.Location()) } // 检查是否本周期可以运行 diff --git a/next_time_test.go b/next_time_test.go index 234ceab..fdb167c 100644 --- a/next_time_test.go +++ b/next_time_test.go @@ -92,8 +92,8 @@ func TestGetNextTime(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { now := time.Now() - loc := time.FixedZone("CST", 8*3600) - nextTime, err := timerx.GetNextTime(now,loc,test.job) + // loc := time.FixedZone("CST", 8*3600) + nextTime, err := timerx.GetNextTime(now, test.job) if err != nil { if test.expectedError == nil || err.Error() != test.expectedError.Error() { t.Errorf("Expected error: %v, Got error: %v", test.expectedError, err) diff --git a/option.go b/option.go index 050d203..e7cc476 100644 --- a/option.go +++ b/option.go @@ -1,12 +1,16 @@ package timerx +import "time" + type Options struct { - logger Logger + logger Logger + location *time.Location } func defaultOptions() Options { return Options{ - logger: NewLogger(), + logger: NewLogger(), + location: time.Local, } } @@ -28,8 +32,8 @@ func SetLogger(log Logger) Option { } // 设定时区 -func SetTimeZone(zone string) Option { +func SetTimeZone(zone *time.Location) Option { return func(o *Options) { - // todo + o.location = zone } } diff --git a/single.go b/single.go index a415320..153aeea 100644 --- a/single.go +++ b/single.go @@ -24,8 +24,9 @@ var singleTimerIndex int // 当前定时数目 var singleOnceLimit sync.Once // 实现单例 type Single struct { - ctx context.Context - logger Logger + ctx context.Context + logger Logger + location *time.Location } var sin *Single = nil @@ -40,8 +41,9 @@ func InitSingle(ctx context.Context, opts ...Option) *Single { op := newOptions(opts...) sin = &Single{ - ctx: ctx, - logger: op.logger, + ctx: ctx, + logger: op.logger, + location: op.location, } timer := time.NewTicker(time.Millisecond * 200) @@ -55,7 +57,7 @@ func InitSingle(ctx context.Context, opts ...Option) *Single { continue } // 迭代定时器 - sin.iterator(ctx, t) + sin.iterator(ctx) // fmt.Println("timer: 执行") case <-ctx.Done(): // 跳出循环 @@ -185,9 +187,7 @@ func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Dur func (l *Single) addJob(ctx context.Context, jobData JobData, call callback, extend interface{}) (int, error) { singleTimerIndex += 1 - nowTime := time.Now() - - _, err := GetNextTime(nowTime, time.Local, jobData) + _, err := GetNextTime(time.Now().In(l.location), jobData) if err != nil { l.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error()) return 0, err @@ -211,7 +211,9 @@ func (s *Single) Del(index int) { } // 迭代定时器列表 -func (s *Single) iterator(ctx context.Context, nowTime time.Time) { +func (s *Single) iterator(ctx context.Context) { + + nowTime := time.Now().In(s.location) // 默认5秒后(如果没有值就暂停进来5秒) newNextTime := nowTime.Add(time.Second * 5) @@ -223,7 +225,7 @@ func (s *Single) iterator(ctx context.Context, nowTime time.Time) { if timeStr.JobData.NextTime.Before(nowTime) || timeStr.JobData.NextTime.Equal(nowTime) { // 可执行 - nextTime, _ := GetNextTime(nowTime, time.Local, *timeStr.JobData) + nextTime, _ := GetNextTime(nowTime, *timeStr.JobData) timeStr.JobData.NextTime = *nextTime if index == 1 {