diff --git a/cluster.go b/cluster.go index d5c3d7b..30271e7 100644 --- a/cluster.go +++ b/cluster.go @@ -2,6 +2,7 @@ package timerx import ( "context" + "encoding/json" "errors" "fmt" "runtime/debug" @@ -335,7 +336,14 @@ func (c *Cluster) watch() { _, ok := clusterWorkerList.Load(keys[1]) if !ok { c.logger.Errorf(c.ctx, "watch timer:任务不存在%+v", keys[1]) - c.redis.SAdd(c.ctx, c.setKey, keys[1]) + + rd := ReJobData{ + TaskId: keys[1], + Times: 1, + } + rdb, _ := json.Marshal(rd) + + c.redis.SAdd(c.ctx, c.setKey, string(rdb)) continue } go c.doTask(c.ctx, keys[1]) @@ -345,7 +353,7 @@ func (c *Cluster) watch() { // 处理重入任务 go func() { for { - taskId, err := c.redis.SPop(c.ctx, c.setKey).Result() + res, err := c.redis.SPop(c.ctx, c.setKey).Result() if err != nil { if err == redis.Nil { // 已经是空了就不要浪费资源了 @@ -355,18 +363,40 @@ func (c *Cluster) watch() { } continue } - _, ok := clusterWorkerList.Load(taskId) - if !ok { - c.logger.Errorf(c.ctx, "watch timer:任务不存在%+v", taskId) - c.redis.SAdd(c.ctx, c.setKey, taskId) + + var rd ReJobData + err = json.Unmarshal([]byte(res), &rd) + if err != nil { + c.logger.Errorf(c.ctx, "json.Unmarshal err:%+v", err) continue } - go c.doTask(c.ctx, taskId) + + _, ok := clusterWorkerList.Load(rd.TaskId) + if !ok { + c.logger.Errorf(c.ctx, "watch timer:任务不存在%+v", rd.TaskId) + + if rd.Times >= 3 { + // 重试3次还是失败就不执行了 + continue + } + rd.Times++ + + rdb, _ := json.Marshal(rd) + + c.redis.SAdd(c.ctx, c.setKey, string(rdb)) + continue + } + go c.doTask(c.ctx, rd.TaskId) } }() } +type ReJobData struct { + TaskId string + Times int +} + // 执行任务 func (c *Cluster) doTask(ctx context.Context, taskId string) {