diff --git a/cluster.go b/cluster.go index c4b0e73..30271e7 100644 --- a/cluster.go +++ b/cluster.go @@ -4,12 +4,14 @@ import ( "context" "encoding/json" "errors" + "fmt" "runtime/debug" "sync" "time" - "code.yun.ink/pkg/lockx" "github.com/go-redis/redis/v8" + "github.com/yuninks/cachex" + "github.com/yuninks/lockx" ) // 功能描述 @@ -26,11 +28,13 @@ var clusterOnceLimit sync.Once var clusterWorkerList sync.Map type Cluster struct { - ctx context.Context - redis *redis.Client - logger Logger + ctx context.Context + redis redis.UniversalClient + cache *cachex.Cache + logger Logger + keyPrefix string // key前缀 + lockKey string // 全局计算的key - nextKey string // 下一次执行的key zsetKey string // 有序集合的key listKey string // 可执行的任务列表的key setKey string // 重入集合的key @@ -40,19 +44,21 @@ var clu *Cluster = nil // 初始化定时器 // 全局只需要初始化一次 -func InitCluster(ctx context.Context, red *redis.Client, keyPrefix string, opts ...Option) *Cluster { - op := newOptions(opts...) +func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix string, opts ...Option) *Cluster { clusterOnceLimit.Do(func() { + op := newOptions(opts...) + clu = &Cluster{ - ctx: ctx, - redis: red, - logger: op.logger, - lockKey: keyPrefix + "timer:cluster_globalLockKey", // 定时器的全局锁 - nextKey: keyPrefix + "timer:cluster_nextKey", // 下一次 - zsetKey: keyPrefix + "timer:cluster_zsetKey", // 有序集合 - listKey: keyPrefix + "timer:cluster_listKey", // 列表 - setKey: keyPrefix + "timer:cluster_setKey", // 重入集合 + ctx: ctx, + redis: red, + cache: cachex.NewCache(), + logger: op.logger, + keyPrefix: keyPrefix, + lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 + zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 + listKey: "timer:cluster_listKey" + keyPrefix, // 列表 + setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 } // 监听任务 @@ -60,7 +66,7 @@ func InitCluster(ctx context.Context, red *redis.Client, keyPrefix string, opts timer := time.NewTicker(time.Millisecond * 200) - go func(ctx context.Context, red *redis.Client) { + go func(ctx context.Context) { Loop: for { select { @@ -71,95 +77,147 @@ func InitCluster(ctx context.Context, red *redis.Client, keyPrefix string, opts break Loop } } - }(ctx, red) + }(ctx) }) return clu } -// TODO:指定执行时间 -// 1.每月的1号2点执行(如果当月没有这个号就不执行) -// 2.每周的周一2点执行 -// 3.每天的2点执行 -// 4.每小时的2分执行 -// 5.每分钟的2秒执行 -func (c *Cluster) AddEveryMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback callback, extendData interface{}) error { +// 每月执行一次 +// @param ctx 上下文 +// @param taskId 任务ID +// @param day 每月的几号 +// @param hour 小时 +// @param minute 分钟 +// @param second 秒 +// @param callback 回调函数 +// @param extendData 扩展数据 +// @return error +func (c *Cluster) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback callback, extendData interface{}) error { nowTime := time.Now() - // 计算下一次执行的时间 - nextTime := time.Date(nowTime.Year(), nowTime.Month(), day, hour, minute, second, 0, nowTime.Location()) - if nextTime.Before(nowTime) { - nextTime = nextTime.AddDate(0, 1, 0) + + jobData := JobData{ + JobType: JobTypeEveryMonth, + CreateTime: nowTime, + Day: day, + Hour: hour, + Minute: minute, + Second: second, } - return c.addJob(ctx, taskId, nextTime, time.Hour*24*30, callback, extendData, JobTypeEveryMonth, &JobData{Day: &day, Hour: &hour, Minute: &minute, Second: &second}) + + return c.addJob(ctx, taskId, jobData, callback, extendData) } -func (c *Cluster) AddEveryWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback callback, extendData interface{}) error { +// 每周执行一次 +// @param ctx context.Context 上下文 +// @param taskId string 任务ID +// @param week time.Weekday 周 +// @param hour int 小时 +// @param minute int 分钟 +// @param second int 秒 +func (c *Cluster) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback callback, extendData interface{}) error { nowTime := time.Now() - // 计算下一次执行的时间 - nextTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), hour, minute, second, 0, nowTime.Location()) - for nextTime.Weekday() != week { - nextTime = nextTime.AddDate(0, 0, 1) + + jobData := JobData{ + JobType: JobTypeEveryWeek, + CreateTime: nowTime, + Weekday: week, + Hour: hour, + Minute: minute, + Second: second, } - if nextTime.Before(nowTime) { - nextTime = nextTime.AddDate(0, 0, 7) - } - return c.addJob(ctx, taskId, nextTime, time.Hour*24*7, callback, extendData, JobTypeInterval, nil) + + return c.addJob(ctx, taskId, jobData, callback, extendData) } -func (c *Cluster) AddEveryDay(ctx context.Context, taskId string, hour int, minute int, second int, callback callback, extendData interface{}) error { +// 每天执行一次 +func (c *Cluster) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback callback, extendData interface{}) error { nowTime := time.Now() - // 计算下一次执行的时间 - nextTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), hour, minute, second, 0, nowTime.Location()) - if nextTime.Before(nowTime) { - nextTime = nextTime.AddDate(0, 0, 1) + + jobData := JobData{ + JobType: JobTypeEveryDay, + CreateTime: nowTime, + Hour: hour, + Minute: minute, + Second: second, } - return c.addJob(ctx, taskId, nextTime, time.Hour*24, callback, extendData, JobTypeInterval, nil) + + return c.addJob(ctx, taskId, jobData, callback, extendData) } -func (c *Cluster) AddEveryHour(ctx context.Context, taskId string, minute int, second int, callback callback, extendData interface{}) error { +// 每小时执行一次 +func (c *Cluster) AddHour(ctx context.Context, taskId string, minute int, second int, callback callback, extendData interface{}) error { nowTime := time.Now() - // 计算下一次执行的时间 - nextTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), nowTime.Hour(), minute, second, 0, nowTime.Location()) - if nextTime.Before(nowTime) { - nextTime = nextTime.Add(time.Hour) + + jobData := JobData{ + JobType: JobTypeEveryHour, + CreateTime: nowTime, + Minute: minute, + Second: second, } - return c.addJob(ctx, taskId, nextTime, time.Hour, callback, extendData, JobTypeInterval, nil) + + return c.addJob(ctx, taskId, jobData, callback, extendData) } -func (c *Cluster) AddEveryMinute(ctx context.Context, taskId string, second int, callback callback, extendData interface{}) error { +// 每分钟执行一次 +func (c *Cluster) AddMinute(ctx context.Context, taskId string, second int, callback callback, extendData interface{}) error { nowTime := time.Now() - // 计算下一次执行的时间 - nextTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), nowTime.Hour(), nowTime.Minute(), second, 0, nowTime.Location()) - if nextTime.Before(nowTime) { - nextTime = nextTime.Add(time.Minute) + + jobData := JobData{ + JobType: JobTypeEveryMinute, + CreateTime: nowTime, + Second: second, } - return c.addJob(ctx, taskId, nextTime, time.Minute, callback, extendData, JobTypeInterval, nil) + + return c.addJob(ctx, taskId, jobData, callback, extendData) } -func (c *Cluster) Add(ctx context.Context, taskId string, spaceTime time.Duration, callback callback, extendData interface{}) error { - return c.addJob(ctx, taskId, time.Now(), spaceTime, callback, extendData, JobTypeInterval, nil) +// 特定时间间隔 +func (c *Cluster) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback callback, extendData interface{}) error { + nowTime := time.Now() + + if spaceTime < 0 { + c.logger.Errorf(ctx, "间隔时间不能小于0") + return errors.New("间隔时间不能小于0") + } + + // 获取当天的零点时间 + zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location()) + + jobData := JobData{ + JobType: JobTypeInterval, + BaseTime: zeroTime, // 默认当天的零点 + CreateTime: nowTime, + IntervalTime: spaceTime, + } + + return c.addJob(ctx, taskId, jobData, callback, extendData) } -// 指定时间间隔 -// TODO: -// 1.不同服务定的时间间隔不一致问题 -// 2.后起的服务计算了时间覆盖前面原有的时间问题 -func (c *Cluster) addJob(ctx context.Context, taskId string, beginTime time.Time, spaceTime time.Duration, callback callback, extendData interface{}, jobType JobType, jobData *JobData) error { +// 统一添加任务 +// @param ctx context.Context 上下文 +// @param taskId string 任务ID +// @param jobData *JobData 任务数据 +// @param callback callback 回调函数 +// @param extendData interface{} 扩展数据 +// @return error +func (c *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, callback callback, extendData interface{}) error { _, ok := clusterWorkerList.Load(taskId) if ok { c.logger.Errorf(ctx, "key已存在:%s", taskId) return errors.New("key已存在") } - if spaceTime != spaceTime.Abs() { - c.logger.Errorf(ctx, "时间间隔不能为负数:%s", taskId) - return errors.New("时间间隔不能为负数") + _, err := GetNextTime(time.Now(), time.Local, jobData) + if err != nil { + c.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error()) + return err } ctx, cancel := context.WithCancel(ctx) defer cancel() lock := lockx.NewGlobalLock(ctx, c.redis, taskId) - tB := lock.Try(10) + tB := lock.Try(2) if !tB { c.logger.Errorf(ctx, "添加失败:%s", taskId) return errors.New("添加失败") @@ -167,48 +225,23 @@ func (c *Cluster) addJob(ctx context.Context, taskId string, beginTime time.Time defer lock.Unlock() t := timerStr{ - BeginTime: beginTime, - NextTime: beginTime, - SpaceTime: spaceTime, Callback: callback, ExtendData: extendData, TaskId: taskId, - JobType: jobType, - JobData: jobData, + JobData: &jobData, } clusterWorkerList.Store(taskId, t) - cacheStr, _ := c.redis.Get(ctx, c.nextKey).Result() - execTime := make(map[string]time.Time) - json.Unmarshal([]byte(cacheStr), &execTime) - - p := c.redis.Pipeline() - - p.ZAdd(ctx, c.zsetKey, &redis.Z{ - Score: float64(nextTime.UnixMilli()), - Member: taskId, - }) - execTime[taskId] = nextTime - n, _ := json.Marshal(execTime) - // fmt.Println("execTime:", execTime, string(n)) - p.Set(ctx, c.nextKey, string(n), 0) - - _, err := p.Exec(ctx) - - // fmt.Println("添加", err) - return err } // 计算下一次执行的时间 // TODO:注册的任务需放在Redis集中存储,因为本地的话,如果有多个服务,那么就会出现不一致的情况。但是要注意服务如何进行下线,由于是主动上报的,需要有一个机制进行删除过期的任务(添加任务&定时器轮训注册) +// TODO:考虑不同实例系统时间不一样,可能计算的下次时间不一致,会有重复执行的可能 func (c *Cluster) getNextTime() { - ctx, cancel := context.WithCancel(c.ctx) - defer cancel() - - lock := lockx.NewGlobalLock(ctx, c.redis, c.lockKey) + lock := lockx.NewGlobalLock(c.ctx, c.redis, c.lockKey) // 获取锁 lockBool := lock.Lock() if !lockBool { @@ -219,53 +252,58 @@ func (c *Cluster) getNextTime() { // 计算下一次时间 - // 读取执行的缓存 - cacheStr, _ := c.redis.Get(ctx, c.nextKey).Result() - execTime := make(map[string]time.Time) - json.Unmarshal([]byte(cacheStr), &execTime) - - p := c.redis.Pipeline() - - nowTime := time.Now() + // p := c.redis.Pipeline() + // 根据内部注册的任务列表计算下一次执行的时间 clusterWorkerList.Range(func(key, value interface{}) bool { val := value.(timerStr) - beforeTime := execTime[val.TaskId] - if beforeTime.After(nowTime) { + + nextTime, _ := GetNextTime(time.Now(), time.Local, *val.JobData) + + // fmt.Println(val.ExtendData, val.JobData, nextTime) + + // 内部判定是否重复 + cacheKey := fmt.Sprintf("%s_%s_%d", c.keyPrefix, val.TaskId, nextTime.Unix()) + _, err := c.cache.Get(cacheKey) + if err == nil { + // 缓存已有值 return true } - nextTime := getNextExecTime(val) - execTime[val.TaskId] = nextTime - p.ZAdd(ctx, c.zsetKey, &redis.Z{ - Score: float64(nextTime.UnixMilli()), - Member: val.TaskId, - }) - // log.Println("computeTime add", c.zsetKey, val.taskId, nextTime.UnixMilli()) + // redis lua脚本,尝试设置nx锁时间为一分钟,如果能设置进去则添加到有序集合zsetKey + script := ` + local zsetKey = KEYS[1] + + local cacheKey = ARGV[1] + local expireTime = ARGV[2] + + local score = ARGV[3] + local member = ARGV[4] + + local res = redis.call('set', cacheKey, '', 'nx', 'ex', expireTime) + + if res then + redis.call('zadd', zsetKey, score, member) + return "SUCCESS" + end + return "ERROR" + ` + + // TODO: + expireTime := time.Minute + + res, err := c.redis.Eval(c.ctx, script, []string{c.zsetKey}, cacheKey, expireTime.Seconds(), nextTime.UnixMilli(), val.TaskId).Result() + + if err == nil && res.(string) == "SUCCESS" { + // 设置成功 + c.cache.Set(cacheKey, "", expireTime) + } + return true }) - // 更新缓存 - b, _ := json.Marshal(execTime) - p.Set(ctx, c.nextKey, string(b), 0) - - _, err := p.Exec(ctx) - _ = err -} - -// 递归遍历获取执行时间 -// TODO:需要根据不同的任务类型计算下次定时时间 -func getNextExecTime(ts timerStr) time.Time { - nowTime := time.Now() - if ts.NextTime.After(nowTime) { - return ts.NextTime - } - nextTime := ts.NextTime.Add(ts.SpaceTime) - ts.NextTime = nextTime - if nextTime.Before(nowTime) { - nextTime = getNextExecTime(ts) - } - return nextTime + // _, err := p.Exec(ctx) + // _ = err } // 获取任务 @@ -297,17 +335,25 @@ func (c *Cluster) watch() { } _, ok := clusterWorkerList.Load(keys[1]) if !ok { - c.logger.Errorf(c.ctx, "watch timer:任务不存在", keys[1]) - c.redis.SAdd(c.ctx, c.setKey, keys[1]) + c.logger.Errorf(c.ctx, "watch timer:任务不存在%+v", keys[1]) + + rd := ReJobData{ + TaskId: keys[1], + Times: 1, + } + rdb, _ := json.Marshal(rd) + + c.redis.SAdd(c.ctx, c.setKey, string(rdb)) continue } - go c.doTask(c.ctx, c.redis, keys[1]) + go c.doTask(c.ctx, keys[1]) } }() + // 处理重入任务 go func() { for { - taskId, err := c.redis.SPop(c.ctx, c.setKey).Result() + res, err := c.redis.SPop(c.ctx, c.setKey).Result() if err != nil { if err == redis.Nil { // 已经是空了就不要浪费资源了 @@ -317,20 +363,61 @@ func (c *Cluster) watch() { } continue } - _, ok := clusterWorkerList.Load(taskId) - if !ok { - c.logger.Errorf(c.ctx, "watch timer:任务不存在", taskId) - c.redis.SAdd(c.ctx, c.setKey, taskId) + + var rd ReJobData + err = json.Unmarshal([]byte(res), &rd) + if err != nil { + c.logger.Errorf(c.ctx, "json.Unmarshal err:%+v", err) continue } - go c.doTask(c.ctx, c.redis, taskId) + + _, ok := clusterWorkerList.Load(rd.TaskId) + if !ok { + c.logger.Errorf(c.ctx, "watch timer:任务不存在%+v", rd.TaskId) + + if rd.Times >= 3 { + // 重试3次还是失败就不执行了 + continue + } + rd.Times++ + + rdb, _ := json.Marshal(rd) + + c.redis.SAdd(c.ctx, c.setKey, string(rdb)) + continue + } + go c.doTask(c.ctx, rd.TaskId) } }() } +type ReJobData struct { + TaskId string + Times int +} + // 执行任务 -func (c *Cluster) doTask(ctx context.Context, red *redis.Client, taskId string) { +func (c *Cluster) doTask(ctx context.Context, taskId string) { + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + val, ok := clusterWorkerList.Load(taskId) + if !ok { + c.logger.Errorf(ctx, "doTask timer:任务不存在:%s", taskId) + return + } + t := val.(timerStr) + + // 这里加一个全局锁 + lock := lockx.NewGlobalLock(ctx, c.redis, taskId) + tB := lock.Lock() + if !tB { + c.logger.Errorf(ctx, "doTask timer:获取锁失败:%s", taskId) + return + } + defer lock.Unlock() defer func() { if err := recover(); err != nil { @@ -338,22 +425,6 @@ func (c *Cluster) doTask(ctx context.Context, red *redis.Client, taskId string) } }() - val, ok := clusterWorkerList.Load(taskId) - if !ok { - c.logger.Errorf(ctx, "doTask timer:任务不存在", taskId) - return - } - t := val.(timerStr) - - // 这里加一个全局锁 - lock := lockx.NewGlobalLock(ctx, red, taskId) - tB := lock.Lock() - if !tB { - c.logger.Errorf(ctx, "doTask timer:获取锁失败", taskId) - return - } - defer lock.Unlock() - // 执行任务 t.Callback(ctx, t.ExtendData) } diff --git a/cluster_test.go b/cluster_test.go index ad7e218..34bd72c 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1,25 +1,185 @@ -package timerx +package timerx_test import ( + "context" "fmt" "testing" + "time" "github.com/go-redis/redis/v8" + "github.com/yuninks/timerx" ) -// 示例测试 +func TestCluster_AddEveryMonth(t *testing.T) { + ctx := context.Background() + redis := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer redis.Close() -// func exampleDemo(ctx context.Context) bool { -// fmt.Println("fff") -// return false -// } + cluster := timerx.InitCluster(ctx, redis, "test") -// func ExampleB() { -// ctx := context.Background() -// timer.InitSingle(ctx) -// timer.AddToTimer(1, exampleDemo) -// // OutPut: -// } + taskId := "testTask" + hour := 2 + minute := 3 + second := 4 + callback := func(ctx context.Context, data interface{}) error { + // do something + fmt.Println("Task executed:", data) + return nil + } + extendData := "testData" + + err := cluster.AddMonth(ctx, taskId, 1, hour, minute, second, callback, extendData) + if err != nil { + t.Errorf("AddEveryMonth failed, err: %v", err) + } + + // TODO: verify the job is added to the cluster and can be executed at the specified time +} + +func TestCluster_AddEveryWeek(t *testing.T) { + ctx := context.Background() + redis := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer redis.Close() + + cluster := timerx.InitCluster(ctx, redis, "test") + + taskId := "testTask" + week := time.Sunday + hour := 2 + minute := 3 + second := 4 + callback := func(ctx context.Context, data interface{}) error { + // do something + fmt.Println("Task executed:", data) + return nil + } + extendData := "testData" + + err := cluster.AddWeek(ctx, taskId, week, hour, minute, second, callback, extendData) + if err != nil { + t.Errorf("AddEveryWeek failed, err: %v", err) + } + + // TODO: verify the job is added to the cluster and can be executed at the specified time +} + +func TestCluster_AddEveryDay(t *testing.T) { + ctx := context.Background() + redis := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer redis.Close() + + cluster := timerx.InitCluster(ctx, redis, "test") + + taskId := "testTask" + hour := 2 + minute := 3 + second := 4 + callback := func(ctx context.Context, data interface{}) error { + // do something + fmt.Println("Task executed:", data) + return nil + } + extendData := "testData" + + err := cluster.AddDay(ctx, taskId, hour, minute, second, callback, extendData) + if err != nil { + t.Errorf("AddEveryDay failed, err: %v", err) + } + + // TODO: verify the job is added to the cluster and can be executed at the specified time +} + +func TestCluster_AddEveryHour(t *testing.T) { + ctx := context.Background() + redis := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer redis.Close() + + cluster := timerx.InitCluster(ctx, redis, "test") + + taskId := "testTask" + minute := 3 + second := 4 + callback := func(ctx context.Context, data interface{}) error{ + // do something + fmt.Println("Task executed:", data) + return nil + } + extendData := "testData" + + err := cluster.AddHour(ctx, taskId, minute, second, callback, extendData) + if err != nil { + t.Errorf("AddEveryHour failed, err: %v", err) + } + + // TODO: verify the job is added to the cluster and can be executed at the specified time +} + +func TestCluster_AddEveryMinute(t *testing.T) { + ctx := context.Background() + redis := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer redis.Close() + + cluster := timerx.InitCluster(ctx, redis, "test") + + taskId := "testTask" + second := 4 + callback := func(ctx context.Context, data interface{}) error{ + // do something + fmt.Println("Task executed:", data) + return nil + } + extendData := "testData" + + err := cluster.AddMinute(ctx, taskId, second, callback, extendData) + if err != nil { + t.Errorf("AddEveryMinute failed, err: %v", err) + } + + // TODO: verify the job is added to the cluster and can be executed at the specified time +} + +func TestCluster_Add(t *testing.T) { + fmt.Println("66666") + ctx := context.Background() + fmt.Println("66666") + redis := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer redis.Close() + + t.Log("6666") + + cluster := timerx.InitCluster(ctx, redis, "test") + + taskId := "testTask" + dur := time.Second + callback := func(ctx context.Context, data interface{}) error { + // do something + fmt.Println("Task executed:", data) + return nil + } + extendData := "testData" + + err := cluster.AddSpace(ctx, taskId, dur, callback, extendData) + if err != nil { + t.Errorf("Add failed, err: %v", err) + } + + time.Sleep(time.Second * 20) + + + // TODO: verify the job is added to the cluster and can be executed after the specified duration +} func TestMain(m *testing.M) { client := redis.NewClient(&redis.Options{ diff --git a/cmd/main.go b/cmd/main.go index ff811de..9ccc702 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -5,8 +5,8 @@ import ( "fmt" "time" - "code.yun.ink/pkg/timerx" "github.com/go-redis/redis/v8" + "github.com/yuninks/timerx" ) func main() { @@ -24,8 +24,24 @@ func main() { // re() // d() - worker() + cluster() + select {} + +} + +func cluster() { + client := getRedis() + ctx := context.Background() + cluster := timerx.InitCluster(ctx, client, "test") + err := cluster.AddSpace(ctx, "test_space", 1*time.Second, aa, "这是秒任务") + fmt.Println(err) + err = cluster.AddMinute(ctx, "test_min", 15, aa, "这是分钟任务") + fmt.Println(err) + err = cluster.AddHour(ctx, "test_hour", 30, 0, aa, "这是小时任务") + fmt.Println(err) + err = cluster.AddDay(ctx, "test_day", 11, 0, 0, aa, "这是天任务") + fmt.Println(err) } func worker() { @@ -77,12 +93,12 @@ func re() { ctx := context.Background() cl := timerx.InitCluster(ctx, client, "kkkk") - cl.Add(ctx, "test1", 1*time.Millisecond, aa, "data") - cl.Add(ctx, "test2", 1*time.Millisecond, aa, "data") - cl.Add(ctx, "test3", 1*time.Millisecond, aa, "data") - cl.Add(ctx, "test4", 1*time.Millisecond, aa, "data") - cl.Add(ctx, "test5", 1*time.Millisecond, aa, "data") - cl.Add(ctx, "test6", 1*time.Millisecond, aa, "data") + cl.AddSpace(ctx, "test1", 1*time.Millisecond, aa, "data") + cl.AddSpace(ctx, "test2", 1*time.Millisecond, aa, "data") + cl.AddSpace(ctx, "test3", 1*time.Millisecond, aa, "data") + cl.AddSpace(ctx, "test4", 1*time.Millisecond, aa, "data") + cl.AddSpace(ctx, "test5", 1*time.Millisecond, aa, "data") + cl.AddSpace(ctx, "test6", 1*time.Millisecond, aa, "data") select {} } @@ -90,7 +106,7 @@ func re() { func aa(ctx context.Context, data interface{}) error { fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05")) fmt.Println(data) - time.Sleep(time.Second * 5) + // time.Sleep(time.Second * 5) return nil } diff --git a/go.mod b/go.mod index aaf2df9..bd6a1c1 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,9 @@ module github.com/yuninks/timerx go 1.19 require ( - code.yun.ink/pkg/lockx v1.0.0 github.com/go-redis/redis/v8 v8.11.5 + github.com/yuninks/cachex v1.0.5 + github.com/yuninks/lockx v1.0.2 ) require ( diff --git a/go.sum b/go.sum index 1584736..91e4db7 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,3 @@ -code.yun.ink/open/timer v1.0.1 h1:ZWecU5K0rFB15p8DZubozTEwo1vrO4mUCRwEoD1tbEQ= -code.yun.ink/pkg/lockx v1.0.0 h1:xoLyf05PrOAhLID2LbJsEXA8YYURJTK/7spEk/hu/Rs= -code.yun.ink/pkg/lockx v1.0.0/go.mod h1:0xUU5xD8fui0Kf7g4TnFmaxUDo59CH2WM+sitko2SLc= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= @@ -11,6 +8,10 @@ github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/yuninks/cachex v1.0.5 h1:Y2NmTsuEgwEVYb7FVFh5tUN67kmrUioeksQqLbOAwsM= +github.com/yuninks/cachex v1.0.5/go.mod h1:5357qz18UvHTJSgZzkMamUzZoFzGeKG9+4tIUBXRSVM= +github.com/yuninks/lockx v1.0.2 h1:p0n791WmsU8D7YF2tQaNLwPE75jdd774unlJZRTNfaw= +github.com/yuninks/lockx v1.0.2/go.mod h1:J6wvuUELLcMn6FCmiZFt7K5w1QQAh1myL7h3JrZaQiQ= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= diff --git a/next_time.go b/next_time.go new file mode 100644 index 0000000..b78eb4b --- /dev/null +++ b/next_time.go @@ -0,0 +1,116 @@ +package timerx + +import ( + "errors" + "time" +) + +// 计算该任务下次执行时间 +// @param job *JobData 任务数据 +// @return time.Time 下次执行时间 +func GetNextTime(t time.Time, loc *time.Location, job JobData) (*time.Time, error) { + + var next time.Time + + switch job.JobType { + case JobTypeEveryMonth: + next = calculateNextMonthTime(t, job, loc) + case JobTypeEveryWeek: + next = calculateNextWeekTime(t, job, loc) + case JobTypeEveryDay: + next = calculateNextDayTime(t, job, loc) + case JobTypeEveryHour: + next = calculateNextHourTime(t, job, loc) + case JobTypeEveryMinute: + next = calculateNextMinuteTime(t, job, loc) + case JobTypeInterval: + next = calculateNextInterval(t, job) + default: + return nil, errors.New("未知的任务类型: " + string(job.JobType)) + } + + return &next, nil +} + +func calculateNextInterval(t time.Time, job JobData) time.Time { + // 从创建的时候开始计算 + cycle := t.Sub(job.BaseTime).Microseconds() / job.IntervalTime.Microseconds() + return job.BaseTime.Add(job.IntervalTime * time.Duration(cycle+1)) +} + +func calculateNextMonthTime(t time.Time, job JobData, loc *time.Location) time.Time { + // 判断是否可执行并返回下一个执行时间 + if canRun(t, job) { + return time.Date(t.Year(), t.Month(), job.Day, job.Hour, job.Minute, job.Second, 0, loc) + } + // 下一个周期(下个月) + return time.Date(t.Year(), t.Month()+1, job.Day, job.Hour, job.Minute, job.Second, 0, loc) +} + +func calculateNextWeekTime(t time.Time, job JobData, loc *time.Location) time.Time { + weekday := t.Weekday() + days := int(job.Weekday - weekday) + if days < 0 { + days += 7 + } + // 判断是否可执行并返回下一个执行时间 + if canRun(t, job) { + return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, loc) + } + // 下一个周期(下周) + return time.Date(t.Year(), t.Month(), t.Day()+days+7, job.Hour, job.Minute, job.Second, 0, loc) +} + +func calculateNextDayTime(t time.Time, job JobData, loc *time.Location) time.Time { + // 判断是否可执行并返回下一个执行时间 + if canRun(t, job) { + return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, loc) + } + // 下一个周期(明天) + return time.Date(t.Year(), t.Month(), t.Day()+1, job.Hour, job.Minute, job.Second, 0, loc) +} + +func calculateNextHourTime(t time.Time, job JobData, loc *time.Location) time.Time { + // 判断是否可执行并返回下一个执行时间 + if canRun(t, job) { + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), job.Minute, job.Second, 0, loc) + } + // 下一个周期(下个小时) + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour()+1, job.Minute, job.Second, 0, loc) +} + +func calculateNextMinuteTime(t time.Time, job JobData, loc *time.Location) time.Time { + // 判断是否可执行并返回下一个执行时间 + if canRun(t, job) { + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), job.Second, 0, loc) + } + // 下一个周期(下分钟) + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute()+1, job.Second, 0, loc) +} + +// 检查是否本周期可以运行 +func canRun(t time.Time, job JobData) bool { + switch job.JobType { + case JobTypeEveryMonth: + return t.Day() < job.Day || + (t.Day() == job.Day && t.Hour() < job.Hour) || + (t.Day() == job.Day && t.Hour() == job.Hour && t.Minute() < job.Minute) || + (t.Day() == job.Day && t.Hour() == job.Hour && t.Minute() == job.Minute && t.Second() <= job.Second) + case JobTypeEveryWeek: + return t.Weekday() < job.Weekday || + (t.Weekday() == job.Weekday && t.Hour() < job.Hour) || + (t.Weekday() == job.Weekday && t.Hour() == job.Hour && t.Minute() < job.Minute) || + (t.Weekday() == job.Weekday && t.Hour() == job.Hour && t.Minute() == job.Minute && t.Second() <= job.Second) + case JobTypeEveryDay: + return t.Hour() < job.Hour || + (t.Hour() == job.Hour && t.Minute() < job.Minute) || + (t.Hour() == job.Hour && t.Minute() == job.Minute && t.Second() <= job.Second) + case JobTypeEveryHour: + return t.Minute() < job.Minute || + (t.Minute() == job.Minute && t.Second() <= job.Second) + case JobTypeEveryMinute: + return t.Second() <= job.Second + default: + return false + } +} diff --git a/next_time_test.go b/next_time_test.go new file mode 100644 index 0000000..234ceab --- /dev/null +++ b/next_time_test.go @@ -0,0 +1,108 @@ +package timerx_test + +import ( + "errors" + "testing" + "time" + + "github.com/yuninks/timerx" +) + +func TestGetNextTime(t *testing.T) { + // Test cases + tests := []struct { + name string + job timerx.JobData + expectedTime time.Time + expectedError error + }{ + { + name: "Test JobTypeEveryMonth", + job: timerx.JobData{ + JobType: timerx.JobTypeEveryMonth, + Day: 15, + Hour: 10, + Minute: 0, + Second: 0, + }, + expectedTime: time.Date(2022, 3, 15, 10, 0, 0, 0, time.Local), + expectedError: nil, + }, + { + name: "Test JobTypeEveryWeek", + job: timerx.JobData{ + JobType: timerx.JobTypeEveryWeek, + Weekday: time.Tuesday, + Hour: 10, + Minute: 0, + Second: 0, + }, + expectedTime: time.Date(2022, 3, 8, 10, 0, 0, 0, time.Local), // Assuming current date is March 7, 2022 + expectedError: nil, + }, + { + name: "Test JobTypeEveryDay", + job: timerx.JobData{ + JobType: timerx.JobTypeEveryDay, + Hour: 10, + Minute: 0, + Second: 0, + }, + expectedTime: time.Date(2022, 3, 8, 10, 0, 0, 0, time.Local), // Assuming current date is March 7, 2022 + expectedError: nil, + }, + { + name: "Test JobTypeEveryHour", + job: timerx.JobData{ + JobType: timerx.JobTypeEveryHour, + Minute: 0, + Second: 0, + }, + expectedTime: time.Date(2022, 3, 7, 11, 0, 0, 0, time.Local), // Assuming current date is March 7, 2022, 10:30 AM + expectedError: nil, + }, + { + name: "Test JobTypeEveryMinute", + job: timerx.JobData{ + JobType: timerx.JobTypeEveryMinute, + Second: 0, + }, + expectedTime: time.Date(2022, 3, 7, 10, 31, 0, 0, time.Local), // Assuming current date is March 7, 2022, 10:30 AM + expectedError: nil, + }, + { + name: "Test JobTypeInterval", + job: timerx.JobData{ + JobType: timerx.JobTypeInterval, + IntervalTime: 1 * time.Hour, + }, + expectedTime: time.Date(2022, 3, 7, 11, 30, 0, 0, time.Local), // Assuming current date is March 7, 2022, 10:30 AM + expectedError: nil, + }, + { + name: "Test unknown JobType", + job: timerx.JobData{ + JobType: timerx.JobType(100), + }, + expectedTime: time.Time{}, + expectedError: errors.New("未知的任务类型: 100"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + now := time.Now() + loc := time.FixedZone("CST", 8*3600) + nextTime, err := timerx.GetNextTime(now,loc,test.job) + if err != nil { + if test.expectedError == nil || err.Error() != test.expectedError.Error() { + t.Errorf("Expected error: %v, Got error: %v", test.expectedError, err) + } + } else { + if nextTime.IsZero() != (test.expectedTime == time.Time{}) || (nextTime != &test.expectedTime) { + t.Errorf("Expected time: %v, Got time: %v", test.expectedTime, nextTime) + } + } + }) + } +} diff --git a/once.go b/once.go index cfbaa25..8674ee7 100644 --- a/once.go +++ b/once.go @@ -19,11 +19,12 @@ import ( // 3. 任务执行失败支持快捷重新加入队列 // 单次的任务队列 -type worker struct { +type Once struct { ctx context.Context + logger Logger zsetKey string listKey string - redis *redis.Client + redis redis.UniversalClient worker Callback } @@ -37,13 +38,15 @@ const ( // 需要考虑执行失败重新放入队列的情况 type Callback interface { // 任务执行 - // uniqueKey: 任务唯一标识 - // jobType: 任务类型,用于区分任务 - // data: 任务数据 - Worker(jobType string, uniqueKey string, data interface{}) (WorkerCode, time.Duration) + // @param jobType string 任务类型 + // @param uniTaskId string 任务唯一标识 + // @param data interface{} 任务数据 + // @return WorkerCode 任务执行结果 + // @return time.Duration 任务执行时间间隔 + Worker(jobType string, uniTaskId string, attachData interface{}) (WorkerCode, time.Duration) } -var wo *worker = nil +var wo *Once = nil var once sync.Once type extendData struct { @@ -52,15 +55,16 @@ type extendData struct { } // 初始化 -func InitOnce(ctx context.Context, re *redis.Client, jobGlobalName string, jobCallback Callback) *worker { - +func InitOnce(ctx context.Context, re *redis.Client, keyPrefix string, call Callback, opts ...Option) *Once { + op := newOptions(opts...) once.Do(func() { - wo = &worker{ + wo = &Once{ ctx: ctx, - zsetKey: "timer:once_zsetkey" + jobGlobalName, - listKey: "timer:once_listkey" + jobGlobalName, + logger: op.logger, + zsetKey: "timer:once_zsetkey" + keyPrefix, + listKey: "timer:once_listkey" + keyPrefix, redis: re, - worker: jobCallback, + worker: call, } go wo.getTask() go wo.watch() @@ -71,7 +75,11 @@ func InitOnce(ctx context.Context, re *redis.Client, jobGlobalName string, jobCa // 添加任务 // 重复插入就代表覆盖 -func (w *worker) Add(jobType string, uniqueKey string, delayTime time.Duration, data interface{}) error { +// @param jobType string 任务类型 +// @param uniTaskId string 任务唯一标识 +// @param delayTime time.Duration 延迟时间 +// @param attachData interface{} 附加数据 +func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, attachData interface{}) error { if delayTime.Abs() != delayTime { return fmt.Errorf("时间间隔不能为负数") } @@ -79,19 +87,21 @@ func (w *worker) Add(jobType string, uniqueKey string, delayTime time.Duration, return fmt.Errorf("时间间隔不能为0") } - redisKey := fmt.Sprintf("%s[:]%s", jobType, uniqueKey) + redisKey := fmt.Sprintf("%s[:]%s", jobType, uniTaskId) ed := extendData{ Delay: delayTime, - Data: data, + Data: attachData, } b, _ := json.Marshal(ed) + // 写入附加数据 _, err := w.redis.SetEX(w.ctx, redisKey, b, delayTime+time.Second*5).Result() if err != nil { return err } + // 吸入执行时间 _, err = w.redis.ZAdd(w.ctx, w.zsetKey, &redis.Z{ Score: float64(time.Now().Add(delayTime).UnixMilli()), Member: redisKey, @@ -101,8 +111,8 @@ func (w *worker) Add(jobType string, uniqueKey string, delayTime time.Duration, } // 删除任务 -func (w *worker) Del(jobType string, uniqueKey string) error { - redisKey := fmt.Sprintf("%s[:]%s", jobType, uniqueKey) +func (w *Once) Del(jobType string, uniTaskId string) error { + redisKey := fmt.Sprintf("%s[:]%s", jobType, uniTaskId) w.redis.Del(w.ctx, redisKey).Result() @@ -112,7 +122,7 @@ func (w *worker) Del(jobType string, uniqueKey string) error { } // 获取任务 -func (w *worker) getTask() { +func (w *Once) getTask() { timer := time.NewTicker(time.Millisecond * 200) defer timer.Stop() @@ -138,7 +148,7 @@ Loop: } // 监听任务 -func (w *worker) watch() { +func (w *Once) watch() { for { keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result() if err != nil { @@ -151,7 +161,8 @@ func (w *worker) watch() { } } -func (w *worker) doTask(key string) { +// 执行任务 +func (w *Once) doTask(key string) { defer func() { if err := recover(); err != nil { fmt.Println("timer:定时器出错", err) diff --git a/readme.md b/readme.md index 8a25d15..8a44edd 100644 --- a/readme.md +++ b/readme.md @@ -1,14 +1,25 @@ -开发目标 +# 功能支持 -1. 支持单机定时 -2. 支持集群定时 -3. 支持间隔定时 -4. 支持固定时间 -5. 支持全局唯一 +1. 支持本地任务 +2. 支持集群任务 +3. 支持单次任务 + +# 功能说明 -设计思想 -1. 不再单独区分单机还是集群,统一按集群处理,单机只是集群里面只有一个节点 -2. 计算和执行分离,计算只负责计算,执行只负责执行,计算和执行之间通过消息队列进行通信 + +# 功能实现 + +1. 集群间任务调度和任务的唯一依赖于redis进行实现 +# 缺陷 + +1. 集群部署时,存在新旧的代码混合问题,任务调度可能存在问题(需要根据实际需要进行版本上线/下线操作) +## 方案一 +1. 启动的时候定时向redis注册任务项 +2. 每次计算执行时间的时候根据注册的任务项进行任务计算 +3. 注册任务项需要有下线机制,避免能运行它的节点下线了它还被执行 + + +现在有根据要求根据系统时间整点运行任务的要求,这个比简单的定时重复更复杂,因为不但要按时执行,并且不能重复执行,需要全局记录任务执行的状态,由于任务的间隔时间不确定,这个任务执行状态的保存周期也是有变化的 diff --git a/single.go b/single.go index 1b1266d..889d4ad 100644 --- a/single.go +++ b/single.go @@ -5,21 +5,21 @@ package timerx import ( "context" "errors" - "fmt" "runtime/debug" "sync" "time" ) -// 定时器 +// 简单定时器 // 1. 这个定时器的作用范围是本机 +// 2. 适用简单的时间间隔定时任务 -// uuid -> timerStr -var timerMap = make(map[string]*timerStr) -var timerMapMux sync.Mutex +// 定时器结构体 +var singleWorkerList sync.Map -var timerCount int // 当前定时数目 -var onceLimit sync.Once // 实现单例 +var singleTimerIndex int // 当前定时数目 + +var singleOnceLimit sync.Once // 实现单例 type Single struct { ctx context.Context @@ -28,10 +28,13 @@ type Single struct { var sin *Single = nil -// 定时器类 -func InitSingle(ctx context.Context, opts ...Option) *Single { - onceLimit.Do(func() { +var singleNextTime = time.Now() // 下一次执行的时间 +// 定时器类 +// @param ctx context.Context 上下文 +// @param opts ...Option 配置项 +func InitSingle(ctx context.Context, opts ...Option) *Single { + singleOnceLimit.Do(func() { op := newOptions(opts...) sin = &Single{ @@ -45,7 +48,7 @@ func InitSingle(ctx context.Context, opts ...Option) *Single { for { select { case t := <-timer.C: - if t.Before(nextTime) { + if t.Before(singleNextTime) { // 当前时间小于下次发送时间:跳过 continue } @@ -64,89 +67,175 @@ func InitSingle(ctx context.Context, opts ...Option) *Single { return sin } +// 每月执行一次 +// @param ctx 上下文 +// @param taskId 任务ID +// @param day 每月的几号 +// @param hour 小时 +// @param minute 分钟 +// @param second 秒 +// @param callback 回调函数 +// @param extendData 扩展数据 +// @return error +func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback callback, extendData interface{}) (int, error) { + nowTime := time.Now() + + jobData := JobData{ + JobType: JobTypeEveryMonth, + CreateTime: nowTime, + Day: day, + Hour: hour, + Minute: minute, + Second: second, + } + + return c.addJob(ctx, jobData, callback, extendData) +} + +// 每周执行一次 +// @param ctx context.Context 上下文 +// @param taskId string 任务ID +// @param week time.Weekday 周 +// @param hour int 小时 +// @param minute int 分钟 +// @param second int 秒 +func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback callback, extendData interface{}) (int, error) { + nowTime := time.Now() + + jobData := JobData{ + JobType: JobTypeEveryWeek, + CreateTime: nowTime, + Weekday: week, + Hour: hour, + Minute: minute, + Second: second, + } + + return c.addJob(ctx, jobData, callback, extendData) +} + +// 每天执行一次 +func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback callback, extendData interface{}) (int, error) { + nowTime := time.Now() + + jobData := JobData{ + JobType: JobTypeEveryDay, + CreateTime: nowTime, + Hour: hour, + Minute: minute, + Second: second, + } + + return c.addJob(ctx, jobData, callback, extendData) +} + +// 每小时执行一次 +func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second int, callback callback, extendData interface{}) (int, error) { + nowTime := time.Now() + + jobData := JobData{ + JobType: JobTypeEveryHour, + CreateTime: nowTime, + Minute: minute, + Second: second, + } + + return c.addJob(ctx, jobData, callback, extendData) +} + +// 每分钟执行一次 +func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callback callback, extendData interface{}) (int, error) { + nowTime := time.Now() + + jobData := JobData{ + JobType: JobTypeEveryMinute, + CreateTime: nowTime, + Second: second, + } + + return c.addJob(ctx, jobData, callback, extendData) +} + +// 特定时间间隔 +func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback callback, extendData interface{}) (int, error) { + nowTime := time.Now() + + if spaceTime < 0 { + c.logger.Errorf(ctx, "间隔时间不能小于0") + return 0, errors.New("间隔时间不能小于0") + } + + jobData := JobData{ + JobType: JobTypeInterval, + CreateTime: nowTime, + IntervalTime: spaceTime, + } + + return c.addJob(ctx, jobData, callback, extendData) +} + // 间隔定时器 // @param space 间隔时间 // @param call 回调函数 // @param extend 附加参数 // @return int 定时器索引 // @return error 错误 -func (s *Single) Add(space time.Duration, call callback, extend interface{}) (int, error) { - timerMapMux.Lock() - defer timerMapMux.Unlock() - - if space != space.Abs() { - s.logger.Errorf(s.ctx, "space must be positive") - return 0, errors.New("space must be positive") - } - - timerCount += 1 +func (l *Single) addJob(ctx context.Context, jobData JobData, call callback, extend interface{}) (int, error) { + singleTimerIndex += 1 nowTime := time.Now() + _, err := GetNextTime(nowTime, time.Local, jobData) + if err != nil { + l.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error()) + return 0, err + } + t := timerStr{ Callback: call, - BeginTime: nowTime, - NextTime: nowTime, // nowTime.Add(space), // 添加任务的时候就执行一次 - SpaceTime: space, CanRunning: make(chan struct{}, 1), ExtendData: extend, + JobData: &jobData, } - timerMap[fmt.Sprintf("%d", timerCount)] = &t + singleWorkerList.Store(singleTimerIndex, t) - if t.NextTime.Before(nextTime) { - // 本条规则下次需要发送的时间小于系统下次发送时间:替换 - nextTime = t.NextTime - } - - return timerCount, nil + return singleTimerIndex, nil } // 删除定时器 -func (s *Single) Del(index string) { - timerMapMux.Lock() - defer timerMapMux.Unlock() - delete(timerMap, index) +func (s *Single) Del(index int) { + singleWorkerList.Delete(index) } // 迭代定时器列表 func (s *Single) iterator(ctx context.Context, nowTime time.Time) { - timerMapMux.Lock() - defer timerMapMux.Unlock() - - // fmt.Println("nowTime:", nowTime.Format("2006-01-02 15:04:05.000")) // 默认5秒后(如果没有值就暂停进来5秒) newNextTime := nowTime.Add(time.Second * 5) index := 0 - for _, v := range timerMap { + singleWorkerList.Range(func(k, v interface{}) bool { index++ - v := v - // 判断执行的时机 - if v.NextTime.Before(nowTime) { - // fmt.Println("NextTime", v.NextTime.Format("2006-01-02 15:04:05.000")) + timeStr := v.(timerStr) - v.NextTime = v.NextTime.Add(v.SpaceTime) - - // 判断下次执行时间与当前时间 - if v.NextTime.Before(nowTime) { - v.NextTime = nowTime.Add(v.SpaceTime) - } + if timeStr.JobData.NextTime.Before(nowTime) || timeStr.JobData.NextTime.Equal(nowTime) { + // 可执行 + nextTime, _ := GetNextTime(nowTime, time.Local, *timeStr.JobData) + timeStr.JobData.NextTime = *nextTime if index == 1 { // 循环的第一个需要替换默认值 - newNextTime = v.NextTime + newNextTime = timeStr.JobData.NextTime } - // 获取最小的 - if v.NextTime.Before(newNextTime) { + if nextTime.Before(newNextTime) { // 本规则下次发送时间小于系统下次需要执行的时间:替换 - newNextTime = v.NextTime + newNextTime = *nextTime } // 处理中就跳过本次 - go func(ctx context.Context, v *timerStr) { + go func(ctx context.Context, v timerStr) { select { case v.CanRunning <- struct{}{}: defer func() { @@ -164,18 +253,22 @@ func (s *Single) iterator(ctx context.Context, nowTime time.Time) { // fmt.Printf("timer: 已在执行 %v %v \n", k, v.Tag) return } - }(ctx, v) + }(ctx, timeStr) + } - } + + return true + + }) // 实际下次时间小于预期下次时间:替换 - if nextTime.Before(newNextTime) { + if singleNextTime.Before(newNextTime) { // 判断一下避免异常 if newNextTime.Before(nowTime) { // 比当前时间小 - nextTime = nowTime + singleNextTime = nowTime } else { - nextTime = newNextTime + singleNextTime = newNextTime } } diff --git a/types.go b/types.go index d4a7aaa..74b79fe 100644 --- a/types.go +++ b/types.go @@ -7,38 +7,37 @@ import ( type timerStr struct { Callback callback // 需要回调的方法 - CanRunning chan (struct{}) // 是否允许执行 - BeginTime time.Time // 初始化任务的时间 - NextTime time.Time // [删]下一次执行的时间 - SpaceTime time.Duration // 任务间隔时间 - TaskId string // 任务ID 全局唯一键 + CanRunning chan (struct{}) // 是否允许执行(only single) + TaskId string // 任务ID 全局唯一键(only cluster) ExtendData interface{} // 附加参数 - JobType JobType // 任务类型 JobData *JobData // 任务时间数据 } type JobType string const ( - JobTypeEveryDay JobType = "every_day" - JobTypeEveryHour JobType = "every_hour" - JobTypeEveryMinute JobType = "every_minute" - JobTypeEverySecond JobType = "every_second" - JobTypeEveryMonth JobType = "every_month" - // 根据间隔时间执行 - JobTypeInterval JobType = "interval" + JobTypeEveryMonth JobType = "every_month" // 每月 + JobTypeEveryWeek JobType = "every_week" // 每周 + JobTypeEveryDay JobType = "every_day" // 每天 + JobTypeEveryHour JobType = "every_hour" // 每小时 + JobTypeEveryMinute JobType = "every_minute" // 每分钟 + JobTypeEverySecond JobType = "every_second" // 每秒 + JobTypeInterval JobType = "interval" // 指定时间间隔 ) type JobData struct { - Month *time.Month // 每年的第几个月 - Weekday *time.Weekday // 每周的周几 - Day *int // 每月的第几天 - Hour *int // 每天的第几个小时 - Minute *int // 每小时的第几分钟 - Second *int // 每分钟的第几秒 + JobType JobType // 任务类型 + NextTime time.Time // 下次执行时间 + BaseTime time.Time // 基准时间(间隔的基准时间) + CreateTime time.Time // 任务创建时间 + IntervalTime time.Duration // 任务间隔时间 + Month time.Month // 每年的第几个月 + Weekday time.Weekday // 每周的周几 + Day int // 每月的第几天 + Hour int // 每天的第几个小时 + Minute int // 每小时的第几分钟 + Second int // 每分钟的第几秒 } -var nextTime = time.Now() // 下一次执行的时间 - // 定义各个回调函数 type callback func(ctx context.Context, extendData interface{}) error