添加全局锁
This commit is contained in:
+4
-4
@@ -110,12 +110,12 @@ func (l OnceWorker) Worker(ctx context.Context, taskType timerx.OnceTaskType, ta
|
||||
file.WriteString(fmt.Sprintf("执行时间:%s\n", time.Now().Format("2006-01-02 15:04:05")))
|
||||
|
||||
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
|
||||
fmt.Println(taskType, taskId)
|
||||
// fmt.Println(taskType, taskId)
|
||||
|
||||
fmt.Printf("原来的参数:%+v %T\n", attachData, attachData)
|
||||
// fmt.Printf("原来的参数:%+v %T\n", attachData, attachData)
|
||||
|
||||
v, ok := attachData.(int64)
|
||||
fmt.Println("vvvvvvv", v, ok)
|
||||
// v, ok := attachData.(int64)
|
||||
// fmt.Println("vvvvvvv", v, ok)
|
||||
// fmt.Printf()
|
||||
|
||||
// d := OnceData{}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/google/uuid"
|
||||
"github.com/yuninks/lockx"
|
||||
"github.com/yuninks/timerx/heartbeat"
|
||||
"github.com/yuninks/timerx/leader"
|
||||
"github.com/yuninks/timerx/logger"
|
||||
@@ -335,7 +336,11 @@ func (w *Once) save(taskType OnceTaskType, taskId string, delayTime time.Duratio
|
||||
}
|
||||
|
||||
// 添加任务(不覆盖)
|
||||
func (l *Once) Create(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error {
|
||||
func (l *Once) Create(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any) error {
|
||||
return l.create(taskType, taskId, delayTime, attachData, 0)
|
||||
}
|
||||
|
||||
func (l *Once) create(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any, retryCount int) error {
|
||||
if delayTime <= 0 {
|
||||
return fmt.Errorf("delay time must be positive")
|
||||
}
|
||||
@@ -354,7 +359,7 @@ func (l *Once) Create(taskType OnceTaskType, taskId string, delayTime time.Durat
|
||||
return fmt.Errorf("task already exists")
|
||||
}
|
||||
|
||||
return l.Save(taskType, taskId, delayTime, attachData)
|
||||
return l.save(taskType, taskId, delayTime, attachData, retryCount)
|
||||
}
|
||||
|
||||
// 删除任务
|
||||
@@ -428,6 +433,18 @@ func (l *Once) processTask(key string) {
|
||||
return
|
||||
}
|
||||
|
||||
// 这里加一个全局锁
|
||||
lock, err := lockx.NewGlobalLock(ctx, l.redis, key)
|
||||
if err != nil {
|
||||
l.logger.Errorf(ctx, "processTask timer:获取锁失败:%s", taskId)
|
||||
return
|
||||
}
|
||||
if b, err := lock.Lock(); !b {
|
||||
l.logger.Errorf(ctx, "processTask timer:获取锁失败:%s %+v", taskId, err)
|
||||
return
|
||||
}
|
||||
defer lock.Unlock()
|
||||
|
||||
// 上报执行情况
|
||||
executeVal := fmt.Sprintf("%s|%s|%s|%s", key, l.instanceId, u.String(), begin.Format(time.RFC3339Nano))
|
||||
l.redis.ZAdd(ctx, l.executeInfoKey, &redis.Z{
|
||||
@@ -491,5 +508,6 @@ func (l *Once) handleRetry(ctx context.Context, taskType OnceTaskType, taskId st
|
||||
l.logger.Infof(ctx, "handleRetry retrying task: %s:%s, retry count: %d",
|
||||
taskType, taskId, ed.RetryCount)
|
||||
|
||||
return l.save(taskType, taskId, ed.Delay, ed.Data, ed.RetryCount)
|
||||
// 不覆盖的新建
|
||||
return l.create(taskType, taskId, ed.Delay, ed.Data, ed.RetryCount)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user