diff --git a/cluster.go b/cluster.go index 8d4fc1d..3a69a89 100644 --- a/cluster.go +++ b/cluster.go @@ -529,7 +529,7 @@ func (c *Cluster) executeTasks() { return case <-c.ctx.Done(): return - case <-c.workerChan: + case c.workerChan <- struct{}{}: if c.usePriority && !c.priority.IsLatest(c.ctx) { time.Sleep(5 * time.Second) continue diff --git a/once.go b/once.go index eab1134..493e638 100644 --- a/once.go +++ b/once.go @@ -121,8 +121,8 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c keySeparator: "[:]", timeout: op.timeout, maxRunCount: op.maxRunCount, - workerChan: make(chan struct{}, op.maxRunCount), - maxWorkers: op.maxRunCount, + workerChan: make(chan struct{}, op.maxWorkers), + maxWorkers: op.maxWorkers, } // 初始化优先级 @@ -289,7 +289,8 @@ func (l *Once) executeTasks() { return case <-l.ctx.Done(): return - case <-l.workerChan: + case l.workerChan <- struct{}{}: + keys, err := l.redis.BLPop(l.ctx, time.Second*10, l.listKey).Result() if err != nil { if err != redis.Nil {