添加example&修改集群模式调用信息

This commit is contained in:
Yun
2025-10-05 21:43:41 +08:00
parent 1f9684080d
commit 9caa984846
6 changed files with 290 additions and 6 deletions
+7 -5
View File
@@ -53,6 +53,7 @@ type Cluster struct {
heartbeat *heartbeat.HeartBeat // 心跳
cache *cachex.Cache // 本地缓存
cronParser *cron.Parser // cron表达式解析器
batchSize int // 批量获取任务的数量
}
// 初始化定时器
@@ -88,6 +89,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
stopChan: make(chan struct{}),
instanceId: U.String(),
cronParser: op.cronParser,
batchSize: op.batchSize,
}
// 初始化优先级
@@ -135,7 +137,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
heartbeat.WithLeader(clu.leader),
heartbeat.WithLogger(clu.logger),
heartbeat.WithPriority(clu.priority),
heartbeat.WithSource("once"),
heartbeat.WithSource("cluster"),
)
if err != nil {
clu.logger.Errorf(ctx, "InitHeartBeat err:%v", err)
@@ -450,10 +452,10 @@ func (l *Cluster) calculateNextTimes() {
// 使用Lua脚本原子性添加任务
script := `
local zsetKey = KEYS[1]
local lockKey = KEYS[2]
local score = ARGV[1]
local taskID = ARGV[2]
local expireTime = ARGV[3]
local lockKey = ARGV[4]
-- 检查是否已存在
local existing = redis.call('zscore', zsetKey, taskID)
@@ -472,8 +474,8 @@ func (l *Cluster) calculateNextTimes() {
`
lockKey := fmt.Sprintf("%s_%s_%d", l.keyPrefix, val.TaskId, nextTime.UnixMilli())
_, err = pipe.Eval(l.ctx, script, []string{l.zsetKey, lockKey},
nextTime.UnixMilli(), val.TaskId, 60).Result()
_, err = pipe.Eval(l.ctx, script, []string{l.zsetKey},
nextTime.UnixMilli(), val.TaskId, 60, lockKey).Result()
if err != nil {
l.logger.Errorf(l.ctx, "Failed to schedule task: %v", err)
}
@@ -504,7 +506,7 @@ func (c *Cluster) moveReadyTasks() {
`
result, err := c.redis.Eval(c.ctx, script, []string{c.zsetKey, c.listKey},
time.Now().UnixMilli(), 100).Result()
time.Now().UnixMilli(), c.batchSize).Result()
if err != nil && err != redis.Nil {
c.logger.Errorf(c.ctx, "Failed to move ready tasks: %v", err)
return