修复一个任务调度BUG

This commit is contained in:
Yun
2025-10-20 18:18:22 +08:00
parent a9f37bc4e7
commit c3c6fd05cb
2 changed files with 46 additions and 44 deletions
+9 -6
View File
@@ -541,9 +541,14 @@ func (c *Cluster) executeTasks() {
case <-c.ctx.Done(): case <-c.ctx.Done():
return return
case c.workerChan <- struct{}{}: case c.workerChan <- struct{}{}:
func() {
defer func() {
<-c.workerChan
}()
if c.usePriority && !c.priority.IsLatest(c.ctx) { if c.usePriority && !c.priority.IsLatest(c.ctx) {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
continue return
} }
taskID, err := c.redis.BLPop(c.ctx, 10*time.Second, c.listKey).Result() taskID, err := c.redis.BLPop(c.ctx, 10*time.Second, c.listKey).Result()
@@ -553,16 +558,17 @@ func (c *Cluster) executeTasks() {
// Redis 异常,休眠一会儿 // Redis 异常,休眠一会儿
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
} }
continue return
} }
if len(taskID) < 2 { if len(taskID) < 2 {
c.logger.Errorf(c.ctx, "Invalid BLPop result: %v", taskID) c.logger.Errorf(c.ctx, "Invalid BLPop result: %v", taskID)
// 数据异常,继续下一个 // 数据异常,继续下一个
continue return
} }
go c.processTask(taskID[1]) go c.processTask(taskID[1])
}()
} }
} }
@@ -575,9 +581,6 @@ type ReJobData struct {
// 执行任务 // 执行任务
func (l *Cluster) processTask(taskId string) { func (l *Cluster) processTask(taskId string) {
defer func() {
<-l.workerChan
}()
begin := time.Now() begin := time.Now()
+12 -13
View File
@@ -289,19 +289,21 @@ func (l *Once) executeTasks() {
for { for {
if l.usePriority {
if !l.priority.IsLatest(l.ctx) {
time.Sleep(time.Second * 5)
continue
}
}
select { select {
case <-l.stopChan: case <-l.stopChan:
return return
case <-l.ctx.Done(): case <-l.ctx.Done():
return return
case l.workerChan <- struct{}{}: case l.workerChan <- struct{}{}:
func() {
defer func() {
<-l.workerChan
}()
if l.usePriority && !l.priority.IsLatest(l.ctx) {
time.Sleep(time.Second * 5)
return
}
keys, err := l.redis.BLPop(l.ctx, time.Second*10, l.listKey).Result() keys, err := l.redis.BLPop(l.ctx, time.Second*10, l.listKey).Result()
if err != nil { if err != nil {
@@ -310,17 +312,17 @@ func (l *Once) executeTasks() {
// Redis 异常,休眠一会儿再重试 // Redis 异常,休眠一会儿再重试
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
} }
continue return
} }
if len(keys) < 2 { if len(keys) < 2 {
l.logger.Errorf(l.ctx, "Invalid task data: %v", keys) l.logger.Errorf(l.ctx, "Invalid task data: %v", keys)
// 数据异常,继续下一个 // 数据异常,继续下一个
continue return
} }
// 处理任务 // 处理任务
go l.processTask(keys[1]) go l.processTask(keys[1])
}()
} }
} }
@@ -518,9 +520,6 @@ func (l *Once) batchGetTasks() {
// 执行任务 // 执行任务
func (l *Once) processTask(key string) { func (l *Once) processTask(key string) {
defer func() {
<-l.workerChan
}()
begin := time.Now() begin := time.Now()