Merge remote-tracking branch 'refs/remotes/origin/dev' into dev

This commit is contained in:
yun
2026-05-17 21:48:38 +08:00
11 changed files with 393 additions and 109 deletions
+48 -27
View File
@@ -52,7 +52,9 @@ type Once struct {
keySeparator string // 分割符
timeout time.Duration // 任务执行超时时间
maxRunCount int // 最大重试次数 0代表不限
maxRunCount int // 最大重试次数 0代表不限
workerChan chan struct{} // worker
maxWorkers int // 最大worker数量
}
type OnceWorkerResp struct {
@@ -116,7 +118,7 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
listKey: "timer:once_listkey" + keyPrefix,
executeInfoKey: "timer:once_executeInfoKey" + keyPrefix,
globalLockPrefix: "timer:once_globalLockPrefix" + keyPrefix,
usePriority: op.usePriority,
usePriority: false,
redis: re,
worker: call,
keyPrefix: keyPrefix,
@@ -126,10 +128,24 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
keySeparator: "[:]",
timeout: op.timeout,
maxRunCount: op.maxRunCount,
workerChan: make(chan struct{}, op.maxWorkers),
maxWorkers: op.maxWorkers,
}
// 初始化优先级
if wo.usePriority {
if op.priorityType != priorityTypeNone {
wo.usePriority = true
if op.priorityType == priorityTypeVersion {
pVal, err := priority.PriorityByVersion(op.priorityVersion)
if err != nil {
wo.logger.Errorf(ctx, "PriorityByVersion version:%s err:%v", op.priorityVersion, err)
return nil, err
}
op.priorityVal = pVal
}
pri, err := priority.InitPriority(
ctx,
re,
@@ -280,38 +296,40 @@ func (l *Once) executeTasks() {
for {
if l.usePriority {
if !l.priority.IsLatest(l.ctx) {
time.Sleep(time.Second * 5)
continue
}
}
select {
case <-l.stopChan:
return
case <-l.ctx.Done():
return
default:
case l.workerChan <- struct{}{}:
func() {
defer func() {
<-l.workerChan
}()
keys, err := l.redis.BLPop(l.ctx, time.Second*10, l.listKey).Result()
if err != nil {
if err != redis.Nil {
l.logger.Errorf(l.ctx, "Failed to pop task: %v", err)
// Redis 异常,休眠一会儿再重试
if l.usePriority && !l.priority.IsLatest(l.ctx) {
time.Sleep(time.Second * 5)
return
}
continue
}
if len(keys) < 2 {
l.logger.Errorf(l.ctx, "Invalid task data: %v", keys)
// 数据异常,继续下一个
continue
}
// 处理任务
go l.processTask(keys[1])
keys, err := l.redis.BLPop(l.ctx, time.Second*10, l.listKey).Result()
if err != nil {
if err != redis.Nil {
l.logger.Errorf(l.ctx, "Failed to pop task: %v", err)
// Redis 异常,休眠一会儿再重试
time.Sleep(time.Second * 5)
}
return
}
if len(keys) < 2 {
l.logger.Errorf(l.ctx, "Invalid task data: %v", keys)
// 数据异常,继续下一个
return
}
// 处理任务
go l.processTask(keys[1])
}()
}
}
@@ -338,6 +356,9 @@ func (l *Once) parseRedisKey(key string) (OnceTaskType, string, error) {
// @param delayTime time.Duration 延迟时间
// @param attachData interface{} 附加数据
func (l *Once) Save(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any) error {
if delayTime < 0 {
delayTime = 0
}
execTime := time.Now().Add(delayTime)
return l.save(ctx, jobTypeOnce, taskType, taskId, []time.Time{execTime}, attachData, 0)
}
@@ -414,8 +435,7 @@ func (w *Once) save(ctx context.Context, jobType jobType, taskType OnceTaskType,
// 添加任务(不覆盖)
func (l *Once) Create(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any) error {
if delayTime <= 0 {
l.logger.Errorf(ctx, "delay time must be positive taskType:%v taskId:%v attachData:%v", taskType, taskId, attachData)
return ErrDelayTime
delayTime = 0
}
execTime := time.Now().Add(delayTime)
return l.create(ctx, createSourceDefault, jobTypeOnce, taskType, taskId, []time.Time{execTime}, attachData, 0)
@@ -523,6 +543,7 @@ func (l *Once) batchGetTasks() {
// 执行任务
func (l *Once) processTask(key string) {
begin := time.Now()
ctx, cancel := context.WithTimeout(l.ctx, l.timeout)