Merge branch 'dev'
This commit is contained in:
+18
-3
@@ -263,12 +263,21 @@ func (c *Cluster) getNextTime() {
|
|||||||
// fmt.Println(val.ExtendData, val.JobData, nextTime)
|
// fmt.Println(val.ExtendData, val.JobData, nextTime)
|
||||||
|
|
||||||
// 内部判定是否重复
|
// 内部判定是否重复
|
||||||
cacheKey := fmt.Sprintf("%s_%s_%d", c.keyPrefix, val.TaskId, nextTime.Unix())
|
cacheKey := fmt.Sprintf("%s_%s_%d", c.keyPrefix, val.TaskId, nextTime.UnixMilli())
|
||||||
_, err := c.cache.Get(cacheKey)
|
cacheVal, err := c.cache.Get(cacheKey)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// 缓存已有值
|
// 缓存已有值
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
valueNum := int(0)
|
||||||
|
if cacheVal != nil {
|
||||||
|
valueNum = cacheVal.(int)
|
||||||
|
}
|
||||||
|
if valueNum > 2 {
|
||||||
|
// 重试2次还是失败就不执行了
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
// fmt.Println("计算时间1", val.ExtendData, time.UnixMilli(nextTime.UnixMilli()).Format("2006-01-02 15:04:05"))
|
||||||
|
|
||||||
// redis lua脚本,尝试设置nx锁时间为一分钟,如果能设置进去则添加到有序集合zsetKey
|
// redis lua脚本,尝试设置nx锁时间为一分钟,如果能设置进去则添加到有序集合zsetKey
|
||||||
script := `
|
script := `
|
||||||
@@ -294,11 +303,17 @@ func (c *Cluster) getNextTime() {
|
|||||||
|
|
||||||
res, err := c.redis.Eval(c.ctx, script, []string{c.zsetKey}, cacheKey, expireTime.Seconds(), nextTime.UnixMilli(), val.TaskId).Result()
|
res, err := c.redis.Eval(c.ctx, script, []string{c.zsetKey}, cacheKey, expireTime.Seconds(), nextTime.UnixMilli(), val.TaskId).Result()
|
||||||
|
|
||||||
|
valueNum++
|
||||||
|
|
||||||
if err == nil && res.(string) == "SUCCESS" {
|
if err == nil && res.(string) == "SUCCESS" {
|
||||||
// 设置成功
|
// 设置成功
|
||||||
c.cache.Set(cacheKey, "", expireTime)
|
valueNum = 10
|
||||||
|
|
||||||
|
// fmt.Println("计算时间2", val.ExtendData, time.UnixMilli(nextTime.UnixMilli()).Format("2006-01-02 15:04:05"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.cache.Set(cacheKey, valueNum, expireTime)
|
||||||
|
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
+4
-4
@@ -78,8 +78,8 @@ func (w *Worker) Worker(jobType string, uniqueKey string, data interface{}) (tim
|
|||||||
func getRedis() *redis.Client {
|
func getRedis() *redis.Client {
|
||||||
client := redis.NewClient(&redis.Options{
|
client := redis.NewClient(&redis.Options{
|
||||||
Addr: "127.0.0.1" + ":" + "6379",
|
Addr: "127.0.0.1" + ":" + "6379",
|
||||||
Password: "", // no password set
|
Password: "123456", // no password set
|
||||||
DB: 0, // use default DB
|
DB: 0, // use default DB
|
||||||
})
|
})
|
||||||
if client == nil {
|
if client == nil {
|
||||||
panic("redis init error")
|
panic("redis init error")
|
||||||
@@ -104,8 +104,8 @@ func re() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func aa(ctx context.Context, data interface{}) error {
|
func aa(ctx context.Context, data interface{}) error {
|
||||||
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
|
fmt.Println("-执行时间:", data, time.Now().Format("2006-01-02 15:04:05"))
|
||||||
fmt.Println(data)
|
// fmt.Println(data)
|
||||||
// time.Sleep(time.Second * 5)
|
// time.Sleep(time.Second * 5)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user