diff --git a/cluster.go b/cluster.go index 2443172..622d42d 100644 --- a/cluster.go +++ b/cluster.go @@ -3,12 +3,14 @@ package timerx import ( "context" "errors" + "fmt" "runtime/debug" "sync" "time" - "code.yun.ink/pkg/lockx" "github.com/go-redis/redis/v8" + "github.com/yuninks/cachex" + "github.com/yuninks/lockx" ) // 功能描述 @@ -25,9 +27,11 @@ var clusterOnceLimit sync.Once var clusterWorkerList sync.Map type Cluster struct { - ctx context.Context - redis *redis.Client - logger Logger + ctx context.Context + redis *redis.Client + cache *cachex.Cache + logger Logger + keyPrefix string // key前缀 lockKey string // 全局计算的key zsetKey string // 有序集合的key @@ -45,13 +49,15 @@ func InitCluster(ctx context.Context, red *redis.Client, keyPrefix string, opts op := newOptions(opts...) clu = &Cluster{ - ctx: ctx, - redis: red, - logger: op.logger, - lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 - zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 - listKey: "timer:cluster_listKey" + keyPrefix, // 列表 - setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 + ctx: ctx, + redis: red, + cache: cachex.NewCache(), + logger: op.logger, + keyPrefix: keyPrefix, + lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 + zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 + listKey: "timer:cluster_listKey" + keyPrefix, // 列表 + setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 } // 监听任务 @@ -111,7 +117,7 @@ func (c *Cluster) AddWeek(ctx context.Context, taskId string, week time.Weekday, nowTime := time.Now() jobData := JobData{ - JobType: JobTypeEveryMonth, + JobType: JobTypeEveryWeek, CreateTime: nowTime, Weekday: week, Hour: hour, @@ -127,7 +133,7 @@ func (c *Cluster) AddDay(ctx context.Context, taskId string, hour int, minute in nowTime := time.Now() jobData := JobData{ - JobType: JobTypeEveryMonth, + JobType: JobTypeEveryDay, CreateTime: nowTime, Hour: hour, Minute: minute, @@ -142,7 +148,7 @@ func (c *Cluster) AddHour(ctx context.Context, taskId string, minute int, second nowTime := time.Now() jobData := JobData{ - JobType: JobTypeEveryMonth, + JobType: JobTypeEveryHour, CreateTime: nowTime, Minute: minute, Second: second, @@ -156,7 +162,7 @@ func (c *Cluster) AddMinute(ctx context.Context, taskId string, second int, call nowTime := time.Now() jobData := JobData{ - JobType: JobTypeEveryMonth, + JobType: JobTypeEveryMinute, CreateTime: nowTime, Second: second, } @@ -173,8 +179,12 @@ func (c *Cluster) AddSpace(ctx context.Context, taskId string, spaceTime time.Du return errors.New("间隔时间不能小于0") } + // 获取当天的零点时间 + zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location()) + jobData := JobData{ - JobType: JobTypeEveryMonth, + JobType: JobTypeInterval, + BaseTime: zeroTime, // 默认当天的零点 CreateTime: nowTime, IntervalTime: spaceTime, } @@ -206,7 +216,7 @@ func (c *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, ca defer cancel() lock := lockx.NewGlobalLock(ctx, c.redis, taskId) - tB := lock.Try(10) + tB := lock.Try(2) if !tB { c.logger.Errorf(ctx, "添加失败:%s", taskId) return errors.New("添加失败") @@ -244,7 +254,7 @@ func (c *Cluster) getNextTime() { // 计算下一次时间 - p := c.redis.Pipeline() + // p := c.redis.Pipeline() // 根据内部注册的任务列表计算下一次执行的时间 clusterWorkerList.Range(func(key, value interface{}) bool { @@ -252,16 +262,50 @@ func (c *Cluster) getNextTime() { nextTime, _ := GetNextTime(time.Now(), time.Local, *val.JobData) - p.ZAdd(ctx, c.zsetKey, &redis.Z{ - Score: float64(nextTime.UnixMilli()), - Member: val.TaskId, - }) - // log.Println("computeTime add", c.zsetKey, val.taskId, nextTime.UnixMilli()) + // fmt.Println(val.ExtendData, val.JobData, nextTime) + + // 内部判定是否重复 + cacheKey := fmt.Sprintf("%s_%s_%d", c.keyPrefix, val.TaskId, nextTime.Unix()) + _, err := c.cache.Get(cacheKey) + if err == nil { + // 缓存已有值 + return true + } + + // redis lua脚本,尝试设置nx锁时间为一分钟,如果能设置进去则添加到有序集合zsetKey + script := ` + local zsetKey = KEYS[1] + + local cacheKey = ARGV[1] + local expireTime = ARGV[2] + + local score = ARGV[3] + local member = ARGV[4] + + local res = redis.call('set', cacheKey, '', 'nx', 'ex', expireTime) + + if res then + redis.call('zadd', zsetKey, score, member) + return "SUCCESS" + end + return "ERROR" + ` + + // TODO: + expireTime := time.Minute + + res, err := c.redis.Eval(ctx, script, []string{c.zsetKey}, cacheKey, expireTime.Seconds(), nextTime.UnixMilli(), val.TaskId).Result() + + if err == nil && res.(string) == "SUCCESS" { + // 设置成功 + c.cache.Set(cacheKey, "", expireTime) + } + return true }) - _, err := p.Exec(ctx) - _ = err + // _, err := p.Exec(ctx) + // _ = err } // 获取任务 @@ -293,7 +337,7 @@ func (c *Cluster) watch() { } _, ok := clusterWorkerList.Load(keys[1]) if !ok { - c.logger.Errorf(c.ctx, "watch timer:任务不存在", keys[1]) + c.logger.Errorf(c.ctx, "watch timer:任务不存在%+v", keys[1]) c.redis.SAdd(c.ctx, c.setKey, keys[1]) continue } @@ -316,7 +360,7 @@ func (c *Cluster) watch() { } _, ok := clusterWorkerList.Load(taskId) if !ok { - c.logger.Errorf(c.ctx, "watch timer:任务不存在", taskId) + c.logger.Errorf(c.ctx, "watch timer:任务不存在%+v", taskId) c.redis.SAdd(c.ctx, c.setKey, taskId) continue } @@ -337,7 +381,7 @@ func (c *Cluster) doTask(ctx context.Context, red *redis.Client, taskId string) val, ok := clusterWorkerList.Load(taskId) if !ok { - c.logger.Errorf(ctx, "doTask timer:任务不存在", taskId) + c.logger.Errorf(ctx, "doTask timer:任务不存在:%s", taskId) return } t := val.(timerStr) @@ -346,7 +390,7 @@ func (c *Cluster) doTask(ctx context.Context, red *redis.Client, taskId string) lock := lockx.NewGlobalLock(ctx, red, taskId) tB := lock.Lock() if !tB { - c.logger.Errorf(ctx, "doTask timer:获取锁失败", taskId) + c.logger.Errorf(ctx, "doTask timer:获取锁失败:%s", taskId) return } defer lock.Unlock() diff --git a/cmd/main.go b/cmd/main.go index 6eafde1..9ccc702 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -5,8 +5,8 @@ import ( "fmt" "time" - "github.com/yuninks/timerx" "github.com/go-redis/redis/v8" + "github.com/yuninks/timerx" ) func main() { @@ -24,8 +24,24 @@ func main() { // re() // d() - worker() + cluster() + select {} + +} + +func cluster() { + client := getRedis() + ctx := context.Background() + cluster := timerx.InitCluster(ctx, client, "test") + err := cluster.AddSpace(ctx, "test_space", 1*time.Second, aa, "这是秒任务") + fmt.Println(err) + err = cluster.AddMinute(ctx, "test_min", 15, aa, "这是分钟任务") + fmt.Println(err) + err = cluster.AddHour(ctx, "test_hour", 30, 0, aa, "这是小时任务") + fmt.Println(err) + err = cluster.AddDay(ctx, "test_day", 11, 0, 0, aa, "这是天任务") + fmt.Println(err) } func worker() { @@ -90,7 +106,7 @@ func re() { func aa(ctx context.Context, data interface{}) error { fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05")) fmt.Println(data) - time.Sleep(time.Second * 5) + // time.Sleep(time.Second * 5) return nil } diff --git a/go.mod b/go.mod index aaf2df9..211d877 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,9 @@ module github.com/yuninks/timerx go 1.19 require ( - code.yun.ink/pkg/lockx v1.0.0 github.com/go-redis/redis/v8 v8.11.5 + github.com/yuninks/cachex v1.0.5 + github.com/yuninks/lockx v1.0.1 ) require ( diff --git a/go.sum b/go.sum index 1584736..5ec3fe7 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,4 @@ code.yun.ink/open/timer v1.0.1 h1:ZWecU5K0rFB15p8DZubozTEwo1vrO4mUCRwEoD1tbEQ= -code.yun.ink/pkg/lockx v1.0.0 h1:xoLyf05PrOAhLID2LbJsEXA8YYURJTK/7spEk/hu/Rs= -code.yun.ink/pkg/lockx v1.0.0/go.mod h1:0xUU5xD8fui0Kf7g4TnFmaxUDo59CH2WM+sitko2SLc= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= @@ -11,6 +9,10 @@ github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/yuninks/cachex v1.0.5 h1:Y2NmTsuEgwEVYb7FVFh5tUN67kmrUioeksQqLbOAwsM= +github.com/yuninks/cachex v1.0.5/go.mod h1:5357qz18UvHTJSgZzkMamUzZoFzGeKG9+4tIUBXRSVM= +github.com/yuninks/lockx v1.0.1 h1:rBUCFZIR6ABR6QLZFp27O9jpGv/TKjmjCKkk1H2lBjI= +github.com/yuninks/lockx v1.0.1/go.mod h1:AHTVVhM7nZ/p+LVmLQbLneOjM9cn/C5zZ+EpXPi0MMs= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= diff --git a/next_time.go b/next_time.go index 2d17ea3..b78eb4b 100644 --- a/next_time.go +++ b/next_time.go @@ -8,7 +8,7 @@ import ( // 计算该任务下次执行时间 // @param job *JobData 任务数据 // @return time.Time 下次执行时间 -func GetNextTime(t time.Time,loc *time.Location, job JobData) (*time.Time, error) { +func GetNextTime(t time.Time, loc *time.Location, job JobData) (*time.Time, error) { var next time.Time @@ -34,8 +34,8 @@ func GetNextTime(t time.Time,loc *time.Location, job JobData) (*time.Time, error func calculateNextInterval(t time.Time, job JobData) time.Time { // 从创建的时候开始计算 - cycle := t.Sub(job.CreateTime).Microseconds() / job.IntervalTime.Microseconds() - return job.CreateTime.Add(job.IntervalTime * time.Duration(cycle+1)) + cycle := t.Sub(job.BaseTime).Microseconds() / job.IntervalTime.Microseconds() + return job.BaseTime.Add(job.IntervalTime * time.Duration(cycle+1)) } func calculateNextMonthTime(t time.Time, job JobData, loc *time.Location) time.Time { diff --git a/single.go b/single.go index 6b4d82c..889d4ad 100644 --- a/single.go +++ b/single.go @@ -103,7 +103,7 @@ func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, nowTime := time.Now() jobData := JobData{ - JobType: JobTypeEveryMonth, + JobType: JobTypeEveryWeek, CreateTime: nowTime, Weekday: week, Hour: hour, @@ -119,7 +119,7 @@ func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int nowTime := time.Now() jobData := JobData{ - JobType: JobTypeEveryMonth, + JobType: JobTypeEveryDay, CreateTime: nowTime, Hour: hour, Minute: minute, @@ -134,7 +134,7 @@ func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second nowTime := time.Now() jobData := JobData{ - JobType: JobTypeEveryMonth, + JobType: JobTypeEveryHour, CreateTime: nowTime, Minute: minute, Second: second, @@ -148,7 +148,7 @@ func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callb nowTime := time.Now() jobData := JobData{ - JobType: JobTypeEveryMonth, + JobType: JobTypeEveryMinute, CreateTime: nowTime, Second: second, } @@ -166,7 +166,7 @@ func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Dur } jobData := JobData{ - JobType: JobTypeEveryMonth, + JobType: JobTypeInterval, CreateTime: nowTime, IntervalTime: spaceTime, } diff --git a/types.go b/types.go index 49ab5d9..74b79fe 100644 --- a/types.go +++ b/types.go @@ -28,6 +28,7 @@ const ( type JobData struct { JobType JobType // 任务类型 NextTime time.Time // 下次执行时间 + BaseTime time.Time // 基准时间(间隔的基准时间) CreateTime time.Time // 任务创建时间 IntervalTime time.Duration // 任务间隔时间 Month time.Month // 每年的第几个月