调整Key以支持Redis集群版
This commit is contained in:
+10
-10
@@ -81,12 +81,12 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
|
||||
logger: op.logger,
|
||||
keyPrefix: keyPrefix,
|
||||
location: op.location,
|
||||
lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁
|
||||
zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合
|
||||
listKey: "timer:cluster_listKey" + keyPrefix, // 列表
|
||||
setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合
|
||||
priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key
|
||||
executeInfoKey: "timer:cluster_executeInfoKey" + keyPrefix, // 执行情况的key 有序集合
|
||||
lockKey: fmt.Sprintf("timer:{%s}:cluster_lock", keyPrefix),
|
||||
zsetKey: fmt.Sprintf("timer:{%s}:cluster_zset", keyPrefix),
|
||||
listKey: fmt.Sprintf("timer:{%s}:cluster_list", keyPrefix),
|
||||
setKey: fmt.Sprintf("timer:{%s}:cluster_set", keyPrefix),
|
||||
priorityKey: fmt.Sprintf("timer:{%s}:cluster_priority", keyPrefix),
|
||||
executeInfoKey: fmt.Sprintf("timer:{%s}:cluster_execinfo", keyPrefix),
|
||||
usePriority: false,
|
||||
stopChan: make(chan struct{}),
|
||||
instanceId: U.String(),
|
||||
@@ -462,10 +462,10 @@ func (l *Cluster) calculateNextTimes() {
|
||||
// 使用Lua脚本原子性添加任务
|
||||
script := `
|
||||
local zsetKey = KEYS[1]
|
||||
local lockKey = KEYS[2]
|
||||
local score = ARGV[1]
|
||||
local taskID = ARGV[2]
|
||||
local expireTime = ARGV[3]
|
||||
local lockKey = ARGV[4]
|
||||
|
||||
-- 检查是否已存在
|
||||
local existing = redis.call('zscore', zsetKey, taskID)
|
||||
@@ -483,9 +483,9 @@ func (l *Cluster) calculateNextTimes() {
|
||||
return 1
|
||||
`
|
||||
|
||||
lockKey := fmt.Sprintf("%s_%s_%d", l.keyPrefix, val.TaskId, nextTime.UnixMilli())
|
||||
_, err = pipe.Eval(l.ctx, script, []string{l.zsetKey},
|
||||
nextTime.UnixMilli(), val.TaskId, 60, lockKey).Result()
|
||||
lockKey := fmt.Sprintf("timer:{%s}:cluster_calc_lock:%s:%d", l.keyPrefix, val.TaskId, nextTime.UnixMilli())
|
||||
_, err = pipe.Eval(l.ctx, script, []string{l.zsetKey, lockKey},
|
||||
nextTime.UnixMilli(), val.TaskId, 60).Result()
|
||||
if err != nil {
|
||||
l.logger.Errorf(l.ctx, "Failed to schedule task: %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user