package timerx import ( "context" "errors" "runtime/debug" "sync" "time" "code.yun.ink/pkg/lockx" "github.com/go-redis/redis/v8" ) // 功能描述 // 这是基于Redis的定时任务调度器,能够有效的在服务集群里面调度任务,避免了单点压力过高或单点故障问题 // 由于所有的服务代码是一致的,也就是一个定时任务将在所有的服务都有注册,具体调度到哪个服务运行看调度结果 // 暂不支持删除定时器,因为这个定时器的设计是基于全局的,如果删除了,那么其他服务就不知道了 // 单例模式 var clusterOnceLimit sync.Once // 已注册的任务列表 var clusterWorkerList sync.Map type Cluster struct { ctx context.Context redis *redis.Client logger Logger lockKey string // 全局计算的key zsetKey string // 有序集合的key listKey string // 可执行的任务列表的key setKey string // 重入集合的key } var clu *Cluster = nil // 初始化定时器 // 全局只需要初始化一次 func InitCluster(ctx context.Context, red *redis.Client, keyPrefix string, opts ...Option) *Cluster { clusterOnceLimit.Do(func() { op := newOptions(opts...) clu = &Cluster{ ctx: ctx, redis: red, logger: op.logger, lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 listKey: "timer:cluster_listKey" + keyPrefix, // 列表 setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 } // 监听任务 go clu.watch() timer := time.NewTicker(time.Millisecond * 200) go func(ctx context.Context, red *redis.Client) { Loop: for { select { case <-timer.C: clu.getTask() clu.getNextTime() case <-ctx.Done(): break Loop } } }(ctx, red) }) return clu } // 每月执行一次 // @param ctx 上下文 // @param taskId 任务ID // @param day 每月的几号 // @param hour 小时 // @param minute 分钟 // @param second 秒 // @param callback 回调函数 // @param extendData 扩展数据 // @return error func (c *Cluster) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback callback, extendData interface{}) error { nowTime := time.Now() jobData := JobData{ JobType: JobTypeEveryMonth, CreateTime: nowTime, Day: day, Hour: hour, Minute: minute, Second: second, } return c.addJob(ctx, taskId, jobData, callback, extendData) } // 每周执行一次 // @param ctx context.Context 上下文 // @param taskId string 任务ID // @param week time.Weekday 周 // @param hour int 小时 // @param minute int 分钟 // @param second int 秒 func (c *Cluster) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback callback, extendData interface{}) error { nowTime := time.Now() jobData := JobData{ JobType: JobTypeEveryMonth, CreateTime: nowTime, Weekday: week, Hour: hour, Minute: minute, Second: second, } return c.addJob(ctx, taskId, jobData, callback, extendData) } // 每天执行一次 func (c *Cluster) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback callback, extendData interface{}) error { nowTime := time.Now() jobData := JobData{ JobType: JobTypeEveryMonth, CreateTime: nowTime, Hour: hour, Minute: minute, Second: second, } return c.addJob(ctx, taskId, jobData, callback, extendData) } // 每小时执行一次 func (c *Cluster) AddHour(ctx context.Context, taskId string, minute int, second int, callback callback, extendData interface{}) error { nowTime := time.Now() jobData := JobData{ JobType: JobTypeEveryMonth, CreateTime: nowTime, Minute: minute, Second: second, } return c.addJob(ctx, taskId, jobData, callback, extendData) } // 每分钟执行一次 func (c *Cluster) AddMinute(ctx context.Context, taskId string, second int, callback callback, extendData interface{}) error { nowTime := time.Now() jobData := JobData{ JobType: JobTypeEveryMonth, CreateTime: nowTime, Second: second, } return c.addJob(ctx, taskId, jobData, callback, extendData) } // 特定时间间隔 func (c *Cluster) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback callback, extendData interface{}) error { nowTime := time.Now() if spaceTime < 0 { c.logger.Errorf(ctx, "间隔时间不能小于0") return errors.New("间隔时间不能小于0") } jobData := JobData{ JobType: JobTypeEveryMonth, CreateTime: nowTime, IntervalTime: spaceTime, } return c.addJob(ctx, taskId, jobData, callback, extendData) } // 统一添加任务 // @param ctx context.Context 上下文 // @param taskId string 任务ID // @param jobData *JobData 任务数据 // @param callback callback 回调函数 // @param extendData interface{} 扩展数据 // @return error func (c *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, callback callback, extendData interface{}) error { _, ok := clusterWorkerList.Load(taskId) if ok { c.logger.Errorf(ctx, "key已存在:%s", taskId) return errors.New("key已存在") } _, err := GetNextTime(time.Now(), time.Local, jobData) if err != nil { c.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error()) return err } ctx, cancel := context.WithCancel(ctx) defer cancel() lock := lockx.NewGlobalLock(ctx, c.redis, taskId) tB := lock.Try(10) if !tB { c.logger.Errorf(ctx, "添加失败:%s", taskId) return errors.New("添加失败") } defer lock.Unlock() t := timerStr{ Callback: callback, ExtendData: extendData, TaskId: taskId, JobData: &jobData, } clusterWorkerList.Store(taskId, t) return err } // 计算下一次执行的时间 // TODO:注册的任务需放在Redis集中存储,因为本地的话,如果有多个服务,那么就会出现不一致的情况。但是要注意服务如何进行下线,由于是主动上报的,需要有一个机制进行删除过期的任务(添加任务&定时器轮训注册) // TODO:考虑不同实例系统时间不一样,可能计算的下次时间不一致,会有重复执行的可能 func (c *Cluster) getNextTime() { ctx, cancel := context.WithCancel(c.ctx) defer cancel() lock := lockx.NewGlobalLock(ctx, c.redis, c.lockKey) // 获取锁 lockBool := lock.Lock() if !lockBool { // log.Println("timer:获取锁失败") return } defer lock.Unlock() // 计算下一次时间 p := c.redis.Pipeline() // 根据内部注册的任务列表计算下一次执行的时间 clusterWorkerList.Range(func(key, value interface{}) bool { val := value.(timerStr) nextTime, _ := GetNextTime(time.Now(), time.Local, *val.JobData) p.ZAdd(ctx, c.zsetKey, &redis.Z{ Score: float64(nextTime.UnixMilli()), Member: val.TaskId, }) // log.Println("computeTime add", c.zsetKey, val.taskId, nextTime.UnixMilli()) return true }) _, err := p.Exec(ctx) _ = err } // 获取任务 func (c *Cluster) getTask() { // 定时去Redis获取任务 script := ` local token = redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2]) for i,v in ipairs(token) do redis.call('zrem',KEYS[1],v) redis.call('lpush',KEYS[2],v) end return "OK" ` c.redis.Eval(c.ctx, script, []string{c.zsetKey, c.listKey}, 0, time.Now().UnixMilli()).Result() } // 监听任务 func (c *Cluster) watch() { // 执行任务 go func() { for { keys, err := c.redis.BLPop(c.ctx, time.Second*10, c.listKey).Result() if err != nil { if err != redis.Nil { c.logger.Errorf(c.ctx, "BLPop watch err:%+v", err) } continue } _, ok := clusterWorkerList.Load(keys[1]) if !ok { c.logger.Errorf(c.ctx, "watch timer:任务不存在", keys[1]) c.redis.SAdd(c.ctx, c.setKey, keys[1]) continue } go c.doTask(c.ctx, c.redis, keys[1]) } }() // 处理重入任务 go func() { for { taskId, err := c.redis.SPop(c.ctx, c.setKey).Result() if err != nil { if err == redis.Nil { // 已经是空了就不要浪费资源了 time.Sleep(time.Second) } else { c.logger.Errorf(c.ctx, "SPop watch err:%+v", err) } continue } _, ok := clusterWorkerList.Load(taskId) if !ok { c.logger.Errorf(c.ctx, "watch timer:任务不存在", taskId) c.redis.SAdd(c.ctx, c.setKey, taskId) continue } go c.doTask(c.ctx, c.redis, taskId) } }() } // 执行任务 func (c *Cluster) doTask(ctx context.Context, red *redis.Client, taskId string) { defer func() { if err := recover(); err != nil { c.logger.Errorf(ctx, "timer:定时器出错 err:%+v stack:%s", err, string(debug.Stack())) } }() val, ok := clusterWorkerList.Load(taskId) if !ok { c.logger.Errorf(ctx, "doTask timer:任务不存在", taskId) return } t := val.(timerStr) // 这里加一个全局锁 lock := lockx.NewGlobalLock(ctx, red, taskId) tB := lock.Lock() if !tB { c.logger.Errorf(ctx, "doTask timer:获取锁失败", taskId) return } defer lock.Unlock() // 执行任务 t.Callback(ctx, t.ExtendData) }