添加心跳相关内容

This commit is contained in:
Yun
2025-09-18 21:18:35 +08:00
parent 549eee700e
commit f79be3955f
3 changed files with 128 additions and 35 deletions
+124 -31
View File
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"runtime/debug"
"strconv"
"sync"
"time"
@@ -31,21 +32,24 @@ type Cluster struct {
keyPrefix string // key前缀
location *time.Location // 根据时区计算的时间
lockKey string // 全局计算的key
zsetKey string // 有序集合的key
listKey string // 可执行的任务列表的key
setKey string // 重入集合的key
lockKey string // 全局计算的key
zsetKey string // 有序集合的key
listKey string // 可执行的任务列表的key
setKey string // 重入集合的key
heartbeatKey string // 心跳的Key
leaderKey string // 上报当前的Leader
priority *priority.Priority // 全局优先级
priorityKey string // 全局优先级的key
usePriority bool // 是否使用优先级
wg sync.WaitGroup // 等待组
isLeader bool // 是否是领导
leaderLock sync.RWMutex // 领导锁
leaderKey string // 实例Id
workerList sync.Map // 注册的任务列表
stopChan chan struct{}
wg sync.WaitGroup // 等待组
isLeader bool // 是否是领导
leaderLock sync.RWMutex // 领导锁
leaderUniLockKey string // 领导唯一锁
workerList sync.Map // 注册的任务列表
stopChan chan struct{} //
instanceId string // 实例ID
}
// 初始化定时器
@@ -57,23 +61,28 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
ctx, cancel := context.WithCancel(ctx)
U, _ := uuid.NewV7()
clu := &Cluster{
ctx: ctx,
cancel: cancel,
redis: red,
cache: cachex.NewCache(),
timeout: op.timeout,
logger: op.logger,
keyPrefix: keyPrefix,
location: op.location,
lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁
zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合
listKey: "timer:cluster_listKey" + keyPrefix, // 列表
setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合
priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key
leaderKey: "timer:cluster_leaderKey" + keyPrefix, // 领导
usePriority: op.usePriority,
stopChan: make(chan struct{}),
ctx: ctx,
cancel: cancel,
redis: red,
cache: cachex.NewCache(),
timeout: op.timeout,
logger: op.logger,
keyPrefix: keyPrefix,
location: op.location,
lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁
zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合
listKey: "timer:cluster_listKey" + keyPrefix, // 列表
setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合
priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key
leaderUniLockKey: "timer:cluster_leaderUniLockKey" + keyPrefix, // 领导唯一锁
leaderKey: "timer:cluster_leaderKey" + keyPrefix, // 上报当前Leader
heartbeatKey: "timer:cluster_heartbeatKey" + keyPrefix, // 心跳 有序集合
usePriority: op.usePriority,
stopChan: make(chan struct{}),
instanceId: U.String(),
}
// 初始化优先级
@@ -85,13 +94,16 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
// 启动守护进程
clu.startDaemon()
clu.logger.Infof(ctx, "InitCluster success keyPrefix:%s instanceId:%s", clu.keyPrefix, clu.instanceId)
return clu
}
// Stop 停止集群定时器
func (c *Cluster) Stop() {
close(c.stopChan)
c.wg.Wait()
func (l *Cluster) Stop() {
close(l.stopChan)
l.cleanHeartbeat(true)
l.wg.Wait()
}
// 守护任务
@@ -101,6 +113,14 @@ func (l *Cluster) startDaemon() {
l.wg.Add(1)
go l.leaderElection()
// 心跳上报
l.wg.Add(1)
go l.heartbeatLoop()
// 心跳过期清理
l.wg.Add(1)
go l.cleanHeartbeatLoop()
// 任务调度
l.wg.Add(1)
go l.scheduleTasks()
@@ -111,6 +131,76 @@ func (l *Cluster) startDaemon() {
}
// 心跳上报
// 需要确定当前存活的实例&当前实例是否是领导
func (l *Cluster) heartbeatLoop() {
defer l.wg.Done()
// 先执行一次
l.heartbeat()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-l.stopChan:
return
case <-l.ctx.Done():
return
case <-ticker.C:
l.heartbeat()
}
}
}
// 单次心跳
func (l *Cluster) heartbeat() error {
err := l.redis.ZAdd(l.ctx, l.heartbeatKey, &redis.Z{
Score: float64(time.Now().UnixMilli()),
Member: l.instanceId,
}).Err()
if err != nil {
l.logger.Errorf(l.ctx, "heartbeat redis.ZAdd err:%v", err)
return err
}
return nil
}
// 心跳清理(leader可操作)
func (l *Cluster) cleanHeartbeatLoop() {
defer l.wg.Done()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-l.stopChan:
return
case <-l.ctx.Done():
return
case <-ticker.C:
if l.isCurrentLeader() {
l.cleanHeartbeat(false)
}
}
}
}
// 单次清理
func (l *Cluster) cleanHeartbeat(cleanSelf bool) error {
if cleanSelf {
return l.redis.ZRem(l.ctx, l.heartbeatKey, l.instanceId).Err()
}
return l.redis.ZRemRangeByScore(l.ctx, l.heartbeatKey, "0", strconv.FormatInt(time.Now().Add(-15*time.Second).UnixMilli(), 10)).Err()
}
// 领导选举
// 领导作用:全局推选一个人计算执行时间&移入队列,避免每个都进行计算浪费资源
func (l *Cluster) leaderElection() {
@@ -147,7 +237,7 @@ func (l *Cluster) getLeaderLock() error {
defer cancel()
// 尝试加锁
lock, err := lockx.NewGlobalLock(ctx, l.redis, l.leaderKey)
lock, err := lockx.NewGlobalLock(ctx, l.redis, l.leaderUniLockKey)
if err != nil {
l.logger.Errorf(l.ctx, "getLeaderLock err:%+v", err)
return err
@@ -166,6 +256,9 @@ func (l *Cluster) getLeaderLock() error {
l.isLeader = true
l.leaderLock.Unlock()
// 上报当前的Leader实例
l.redis.Set(l.ctx, l.leaderKey, l.instanceId, time.Hour*24)
l.logger.Infof(l.ctx, "getLeaderLock Instance %s became leader", lock.GetValue())
go func() {
@@ -417,7 +510,7 @@ func (l *Cluster) calculateNextTimes() {
return 1
`
lockKey := fmt.Sprintf("%s:lock:calc:%s:%d", l.keyPrefix, val.TaskId, nextTime.UnixMilli())
lockKey := fmt.Sprintf("%s_%s_%d", l.keyPrefix, val.TaskId, nextTime.UnixMilli())
_, err = pipe.Eval(l.ctx, script, []string{l.zsetKey, lockKey},
nextTime.UnixMilli(), val.TaskId, 60).Result()
if err != nil {