From bdd0ee714b33365d515c97717b023f089a0d4177 Mon Sep 17 00:00:00 2001 From: Yun <995116474@qq.com> Date: Tue, 28 May 2024 15:02:57 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=BF=BD=E5=8A=A0=E6=AC=A1?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 21 ++++++++++++++++++--- cmd/main.go | 8 ++++---- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/cluster.go b/cluster.go index 30271e7..e7c4a8d 100644 --- a/cluster.go +++ b/cluster.go @@ -263,12 +263,21 @@ func (c *Cluster) getNextTime() { // fmt.Println(val.ExtendData, val.JobData, nextTime) // 内部判定是否重复 - cacheKey := fmt.Sprintf("%s_%s_%d", c.keyPrefix, val.TaskId, nextTime.Unix()) - _, err := c.cache.Get(cacheKey) + cacheKey := fmt.Sprintf("%s_%s_%d", c.keyPrefix, val.TaskId, nextTime.UnixMilli()) + cacheVal, err := c.cache.Get(cacheKey) if err == nil { // 缓存已有值 return true } + valueNum := int(0) + if cacheVal != nil { + valueNum = cacheVal.(int) + } + if valueNum > 2 { + // 重试2次还是失败就不执行了 + return true + } + // fmt.Println("计算时间1", val.ExtendData, time.UnixMilli(nextTime.UnixMilli()).Format("2006-01-02 15:04:05")) // redis lua脚本,尝试设置nx锁时间为一分钟,如果能设置进去则添加到有序集合zsetKey script := ` @@ -294,11 +303,17 @@ func (c *Cluster) getNextTime() { res, err := c.redis.Eval(c.ctx, script, []string{c.zsetKey}, cacheKey, expireTime.Seconds(), nextTime.UnixMilli(), val.TaskId).Result() + valueNum++ + if err == nil && res.(string) == "SUCCESS" { // 设置成功 - c.cache.Set(cacheKey, "", expireTime) + valueNum = 10 + + // fmt.Println("计算时间2", val.ExtendData, time.UnixMilli(nextTime.UnixMilli()).Format("2006-01-02 15:04:05")) } + c.cache.Set(cacheKey, valueNum, expireTime) + return true }) diff --git a/cmd/main.go b/cmd/main.go index 9ccc702..16c3474 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -78,8 +78,8 @@ func (w *Worker) Worker(jobType string, uniqueKey string, data interface{}) (tim func getRedis() *redis.Client { client := redis.NewClient(&redis.Options{ Addr: "127.0.0.1" + ":" + "6379", - Password: "", // no password set - DB: 0, // use default DB + Password: "123456", // no password set + DB: 0, // use default DB }) if client == nil { panic("redis init error") @@ -104,8 +104,8 @@ func re() { } func aa(ctx context.Context, data interface{}) error { - fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05")) - fmt.Println(data) + fmt.Println("-执行时间:", data, time.Now().Format("2006-01-02 15:04:05")) + // fmt.Println(data) // time.Sleep(time.Second * 5) return nil }