From b98a4211162a6e455abbebd29c299d482071e221 Mon Sep 17 00:00:00 2001 From: Yun <995116474@qq.com> Date: Mon, 27 May 2024 20:28:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=84=E7=90=86=E6=B6=88=E6=81=AF=E9=87=8D?= =?UTF-8?q?=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 44 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/cluster.go b/cluster.go index d5c3d7b..30271e7 100644 --- a/cluster.go +++ b/cluster.go @@ -2,6 +2,7 @@ package timerx import ( "context" + "encoding/json" "errors" "fmt" "runtime/debug" @@ -335,7 +336,14 @@ func (c *Cluster) watch() { _, ok := clusterWorkerList.Load(keys[1]) if !ok { c.logger.Errorf(c.ctx, "watch timer:任务不存在%+v", keys[1]) - c.redis.SAdd(c.ctx, c.setKey, keys[1]) + + rd := ReJobData{ + TaskId: keys[1], + Times: 1, + } + rdb, _ := json.Marshal(rd) + + c.redis.SAdd(c.ctx, c.setKey, string(rdb)) continue } go c.doTask(c.ctx, keys[1]) @@ -345,7 +353,7 @@ func (c *Cluster) watch() { // 处理重入任务 go func() { for { - taskId, err := c.redis.SPop(c.ctx, c.setKey).Result() + res, err := c.redis.SPop(c.ctx, c.setKey).Result() if err != nil { if err == redis.Nil { // 已经是空了就不要浪费资源了 @@ -355,18 +363,40 @@ func (c *Cluster) watch() { } continue } - _, ok := clusterWorkerList.Load(taskId) - if !ok { - c.logger.Errorf(c.ctx, "watch timer:任务不存在%+v", taskId) - c.redis.SAdd(c.ctx, c.setKey, taskId) + + var rd ReJobData + err = json.Unmarshal([]byte(res), &rd) + if err != nil { + c.logger.Errorf(c.ctx, "json.Unmarshal err:%+v", err) continue } - go c.doTask(c.ctx, taskId) + + _, ok := clusterWorkerList.Load(rd.TaskId) + if !ok { + c.logger.Errorf(c.ctx, "watch timer:任务不存在%+v", rd.TaskId) + + if rd.Times >= 3 { + // 重试3次还是失败就不执行了 + continue + } + rd.Times++ + + rdb, _ := json.Marshal(rd) + + c.redis.SAdd(c.ctx, c.setKey, string(rdb)) + continue + } + go c.doTask(c.ctx, rd.TaskId) } }() } +type ReJobData struct { + TaskId string + Times int +} + // 执行任务 func (c *Cluster) doTask(ctx context.Context, taskId string) {