From c3c6fd05cbfb2625ba5e89c10c9f3c75d27c0239 Mon Sep 17 00:00:00 2001 From: Yun Date: Mon, 20 Oct 2025 18:18:22 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=B8=80=E4=B8=AA=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E8=B0=83=E5=BA=A6BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 43 +++++++++++++++++++++++-------------------- once.go | 47 +++++++++++++++++++++++------------------------ 2 files changed, 46 insertions(+), 44 deletions(-) diff --git a/cluster.go b/cluster.go index 1e81c6a..6fcc135 100644 --- a/cluster.go +++ b/cluster.go @@ -541,28 +541,34 @@ func (c *Cluster) executeTasks() { case <-c.ctx.Done(): return case c.workerChan <- struct{}{}: - if c.usePriority && !c.priority.IsLatest(c.ctx) { - time.Sleep(5 * time.Second) - continue - } + func() { + defer func() { + <-c.workerChan + }() - taskID, err := c.redis.BLPop(c.ctx, 10*time.Second, c.listKey).Result() - if err != nil { - if err != redis.Nil { - c.logger.Errorf(c.ctx, "Failed to pop task: %v", err) - // Redis 异常,休眠一会儿 + if c.usePriority && !c.priority.IsLatest(c.ctx) { time.Sleep(5 * time.Second) + return } - continue - } - if len(taskID) < 2 { - c.logger.Errorf(c.ctx, "Invalid BLPop result: %v", taskID) - // 数据异常,继续下一个 - continue - } + taskID, err := c.redis.BLPop(c.ctx, 10*time.Second, c.listKey).Result() + if err != nil { + if err != redis.Nil { + c.logger.Errorf(c.ctx, "Failed to pop task: %v", err) + // Redis 异常,休眠一会儿 + time.Sleep(5 * time.Second) + } + return + } - go c.processTask(taskID[1]) + if len(taskID) < 2 { + c.logger.Errorf(c.ctx, "Invalid BLPop result: %v", taskID) + // 数据异常,继续下一个 + return + } + + go c.processTask(taskID[1]) + }() } } @@ -575,9 +581,6 @@ type ReJobData struct { // 执行任务 func (l *Cluster) processTask(taskId string) { - defer func() { - <-l.workerChan - }() begin := time.Now() diff --git a/once.go b/once.go index 4a2279b..5aba721 100644 --- a/once.go +++ b/once.go @@ -289,38 +289,40 @@ func (l *Once) executeTasks() { for { - if l.usePriority { - if !l.priority.IsLatest(l.ctx) { - time.Sleep(time.Second * 5) - continue - } - } - select { case <-l.stopChan: return case <-l.ctx.Done(): return case l.workerChan <- struct{}{}: + func() { + defer func() { + <-l.workerChan + }() - keys, err := l.redis.BLPop(l.ctx, time.Second*10, l.listKey).Result() - if err != nil { - if err != redis.Nil { - l.logger.Errorf(l.ctx, "Failed to pop task: %v", err) - // Redis 异常,休眠一会儿再重试 + if l.usePriority && !l.priority.IsLatest(l.ctx) { time.Sleep(time.Second * 5) + return } - continue - } - if len(keys) < 2 { - l.logger.Errorf(l.ctx, "Invalid task data: %v", keys) - // 数据异常,继续下一个 - continue - } - // 处理任务 - go l.processTask(keys[1]) + keys, err := l.redis.BLPop(l.ctx, time.Second*10, l.listKey).Result() + if err != nil { + if err != redis.Nil { + l.logger.Errorf(l.ctx, "Failed to pop task: %v", err) + // Redis 异常,休眠一会儿再重试 + time.Sleep(time.Second * 5) + } + return + } + if len(keys) < 2 { + l.logger.Errorf(l.ctx, "Invalid task data: %v", keys) + // 数据异常,继续下一个 + return + } + // 处理任务 + go l.processTask(keys[1]) + }() } } @@ -518,9 +520,6 @@ func (l *Once) batchGetTasks() { // 执行任务 func (l *Once) processTask(key string) { - defer func() { - <-l.workerChan - }() begin := time.Now()