From 16a392a266bbc425638d6e1f98491f90290c1320 Mon Sep 17 00:00:00 2001 From: Yun Date: Wed, 24 Sep 2025 21:07:06 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- once.go | 60 +++++++++++++++++++++++++++++---------------------------- 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/once.go b/once.go index 86fe7e5..b535507 100644 --- a/once.go +++ b/once.go @@ -27,18 +27,19 @@ import ( // 单次的任务队列 type Once struct { - ctx context.Context // ctx - cancel context.CancelFunc // cancel - logger logger.Logger // 日志 - zsetKey string // 任务列表 有序集合 - listKey string // - redis redis.UniversalClient // Redis - worker Callback // 回调 - keyPrefix string // - priority *priority.Priority // 全局优先级 - usePriority bool // - batchSize int // 每批大小 - wg sync.WaitGroup // + ctx context.Context // ctx + cancel context.CancelFunc // cancel + logger logger.Logger // 日志 + zsetKey string // 任务列表 有序集合 + listKey string // 列表 + globalLockPrefix string // 全局锁的前缀 + redis redis.UniversalClient // Redis + worker Callback // 回调 + keyPrefix string // + priority *priority.Priority // 全局优先级 + usePriority bool // + batchSize int // 每批大小 + wg sync.WaitGroup // stopChan chan struct{} // instanceId string // 实例ID @@ -92,22 +93,23 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c u, _ := uuid.NewV7() wo := &Once{ - ctx: ctx, - cancel: cancel, - logger: op.logger, - zsetKey: "timer:once_zsetkey" + keyPrefix, - listKey: "timer:once_listkey" + keyPrefix, - executeInfoKey: "timer:once_executeInfoKey" + keyPrefix, - usePriority: op.usePriority, - redis: re, - worker: call, - keyPrefix: keyPrefix, - batchSize: op.batchSize, - stopChan: make(chan struct{}), - instanceId: u.String(), - keySeparator: "[:]", - timeout: op.timeout, - maxRetryCount: op.maxRetryCount, + ctx: ctx, + cancel: cancel, + logger: op.logger, + zsetKey: "timer:once_zsetkey" + keyPrefix, + listKey: "timer:once_listkey" + keyPrefix, + executeInfoKey: "timer:once_executeInfoKey" + keyPrefix, + globalLockPrefix: "timer:once_globalLockPrefix" + keyPrefix, + usePriority: op.usePriority, + redis: re, + worker: call, + keyPrefix: keyPrefix, + batchSize: op.batchSize, + stopChan: make(chan struct{}), + instanceId: u.String(), + keySeparator: "[:]", + timeout: op.timeout, + maxRetryCount: op.maxRetryCount, } // 初始化优先级 @@ -434,7 +436,7 @@ func (l *Once) processTask(key string) { } // 这里加一个全局锁 - lock, err := lockx.NewGlobalLock(ctx, l.redis, key) + lock, err := lockx.NewGlobalLock(ctx, l.redis, l.globalLockPrefix+key) if err != nil { l.logger.Errorf(ctx, "processTask timer:获取锁失败:%s", taskId) return