This commit is contained in:
Yun
2025-09-24 21:07:06 +08:00
parent 970a1ca33c
commit 16a392a266
+31 -29
View File
@@ -27,18 +27,19 @@ import (
// 单次的任务队列
type Once struct {
ctx context.Context // ctx
cancel context.CancelFunc // cancel
logger logger.Logger // 日志
zsetKey string // 任务列表 有序集合
listKey string //
redis redis.UniversalClient // Redis
worker Callback // 回调
keyPrefix string //
priority *priority.Priority // 全局优先级
usePriority bool //
batchSize int // 每批大小
wg sync.WaitGroup //
ctx context.Context // ctx
cancel context.CancelFunc // cancel
logger logger.Logger // 日志
zsetKey string // 任务列表 有序集合
listKey string // 列表
globalLockPrefix string // 全局锁的前缀
redis redis.UniversalClient // Redis
worker Callback // 回调
keyPrefix string //
priority *priority.Priority // 全局优先级
usePriority bool //
batchSize int // 每批大小
wg sync.WaitGroup //
stopChan chan struct{} //
instanceId string // 实例ID
@@ -92,22 +93,23 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
u, _ := uuid.NewV7()
wo := &Once{
ctx: ctx,
cancel: cancel,
logger: op.logger,
zsetKey: "timer:once_zsetkey" + keyPrefix,
listKey: "timer:once_listkey" + keyPrefix,
executeInfoKey: "timer:once_executeInfoKey" + keyPrefix,
usePriority: op.usePriority,
redis: re,
worker: call,
keyPrefix: keyPrefix,
batchSize: op.batchSize,
stopChan: make(chan struct{}),
instanceId: u.String(),
keySeparator: "[:]",
timeout: op.timeout,
maxRetryCount: op.maxRetryCount,
ctx: ctx,
cancel: cancel,
logger: op.logger,
zsetKey: "timer:once_zsetkey" + keyPrefix,
listKey: "timer:once_listkey" + keyPrefix,
executeInfoKey: "timer:once_executeInfoKey" + keyPrefix,
globalLockPrefix: "timer:once_globalLockPrefix" + keyPrefix,
usePriority: op.usePriority,
redis: re,
worker: call,
keyPrefix: keyPrefix,
batchSize: op.batchSize,
stopChan: make(chan struct{}),
instanceId: u.String(),
keySeparator: "[:]",
timeout: op.timeout,
maxRetryCount: op.maxRetryCount,
}
// 初始化优先级
@@ -434,7 +436,7 @@ func (l *Once) processTask(key string) {
}
// 这里加一个全局锁
lock, err := lockx.NewGlobalLock(ctx, l.redis, key)
lock, err := lockx.NewGlobalLock(ctx, l.redis, l.globalLockPrefix+key)
if err != nil {
l.logger.Errorf(ctx, "processTask timer:获取锁失败:%s", taskId)
return