处理消息重入
This commit is contained in:
+37
-7
@@ -2,6 +2,7 @@ package timerx
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
@@ -335,7 +336,14 @@ func (c *Cluster) watch() {
|
|||||||
_, ok := clusterWorkerList.Load(keys[1])
|
_, ok := clusterWorkerList.Load(keys[1])
|
||||||
if !ok {
|
if !ok {
|
||||||
c.logger.Errorf(c.ctx, "watch timer:任务不存在%+v", keys[1])
|
c.logger.Errorf(c.ctx, "watch timer:任务不存在%+v", keys[1])
|
||||||
c.redis.SAdd(c.ctx, c.setKey, keys[1])
|
|
||||||
|
rd := ReJobData{
|
||||||
|
TaskId: keys[1],
|
||||||
|
Times: 1,
|
||||||
|
}
|
||||||
|
rdb, _ := json.Marshal(rd)
|
||||||
|
|
||||||
|
c.redis.SAdd(c.ctx, c.setKey, string(rdb))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
go c.doTask(c.ctx, keys[1])
|
go c.doTask(c.ctx, keys[1])
|
||||||
@@ -345,7 +353,7 @@ func (c *Cluster) watch() {
|
|||||||
// 处理重入任务
|
// 处理重入任务
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
taskId, err := c.redis.SPop(c.ctx, c.setKey).Result()
|
res, err := c.redis.SPop(c.ctx, c.setKey).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == redis.Nil {
|
if err == redis.Nil {
|
||||||
// 已经是空了就不要浪费资源了
|
// 已经是空了就不要浪费资源了
|
||||||
@@ -355,18 +363,40 @@ func (c *Cluster) watch() {
|
|||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
_, ok := clusterWorkerList.Load(taskId)
|
|
||||||
if !ok {
|
var rd ReJobData
|
||||||
c.logger.Errorf(c.ctx, "watch timer:任务不存在%+v", taskId)
|
err = json.Unmarshal([]byte(res), &rd)
|
||||||
c.redis.SAdd(c.ctx, c.setKey, taskId)
|
if err != nil {
|
||||||
|
c.logger.Errorf(c.ctx, "json.Unmarshal err:%+v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
go c.doTask(c.ctx, taskId)
|
|
||||||
|
_, ok := clusterWorkerList.Load(rd.TaskId)
|
||||||
|
if !ok {
|
||||||
|
c.logger.Errorf(c.ctx, "watch timer:任务不存在%+v", rd.TaskId)
|
||||||
|
|
||||||
|
if rd.Times >= 3 {
|
||||||
|
// 重试3次还是失败就不执行了
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rd.Times++
|
||||||
|
|
||||||
|
rdb, _ := json.Marshal(rd)
|
||||||
|
|
||||||
|
c.redis.SAdd(c.ctx, c.setKey, string(rdb))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
go c.doTask(c.ctx, rd.TaskId)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ReJobData struct {
|
||||||
|
TaskId string
|
||||||
|
Times int
|
||||||
|
}
|
||||||
|
|
||||||
// 执行任务
|
// 执行任务
|
||||||
func (c *Cluster) doTask(ctx context.Context, taskId string) {
|
func (c *Cluster) doTask(ctx context.Context, taskId string) {
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user