diff --git a/cluster.go b/cluster.go index 8aa2df4..6329aa7 100644 --- a/cluster.go +++ b/cluster.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "runtime/debug" + "strconv" "sync" "time" @@ -31,21 +32,24 @@ type Cluster struct { keyPrefix string // key前缀 location *time.Location // 根据时区计算的时间 - lockKey string // 全局计算的key - zsetKey string // 有序集合的key - listKey string // 可执行的任务列表的key - setKey string // 重入集合的key + lockKey string // 全局计算的key + zsetKey string // 有序集合的key + listKey string // 可执行的任务列表的key + setKey string // 重入集合的key + heartbeatKey string // 心跳的Key + leaderKey string // 上报当前的Leader priority *priority.Priority // 全局优先级 priorityKey string // 全局优先级的key usePriority bool // 是否使用优先级 - wg sync.WaitGroup // 等待组 - isLeader bool // 是否是领导 - leaderLock sync.RWMutex // 领导锁 - leaderKey string // 实例Id - workerList sync.Map // 注册的任务列表 - stopChan chan struct{} + wg sync.WaitGroup // 等待组 + isLeader bool // 是否是领导 + leaderLock sync.RWMutex // 领导锁 + leaderUniLockKey string // 领导唯一锁 + workerList sync.Map // 注册的任务列表 + stopChan chan struct{} // + instanceId string // 实例ID } // 初始化定时器 @@ -57,23 +61,28 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin ctx, cancel := context.WithCancel(ctx) + U, _ := uuid.NewV7() + clu := &Cluster{ - ctx: ctx, - cancel: cancel, - redis: red, - cache: cachex.NewCache(), - timeout: op.timeout, - logger: op.logger, - keyPrefix: keyPrefix, - location: op.location, - lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 - zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 - listKey: "timer:cluster_listKey" + keyPrefix, // 列表 - setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 - priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key - leaderKey: "timer:cluster_leaderKey" + keyPrefix, // 领导 - usePriority: op.usePriority, - stopChan: make(chan struct{}), + ctx: ctx, + cancel: cancel, + redis: red, + cache: cachex.NewCache(), + timeout: op.timeout, + logger: op.logger, + keyPrefix: keyPrefix, + location: op.location, + lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 + zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 + listKey: "timer:cluster_listKey" + keyPrefix, // 列表 + setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 + priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key + leaderUniLockKey: "timer:cluster_leaderUniLockKey" + keyPrefix, // 领导唯一锁 + leaderKey: "timer:cluster_leaderKey" + keyPrefix, // 上报当前Leader + heartbeatKey: "timer:cluster_heartbeatKey" + keyPrefix, // 心跳 有序集合 + usePriority: op.usePriority, + stopChan: make(chan struct{}), + instanceId: U.String(), } // 初始化优先级 @@ -85,13 +94,16 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin // 启动守护进程 clu.startDaemon() + clu.logger.Infof(ctx, "InitCluster success keyPrefix:%s instanceId:%s", clu.keyPrefix, clu.instanceId) + return clu } // Stop 停止集群定时器 -func (c *Cluster) Stop() { - close(c.stopChan) - c.wg.Wait() +func (l *Cluster) Stop() { + close(l.stopChan) + l.cleanHeartbeat(true) + l.wg.Wait() } // 守护任务 @@ -101,6 +113,14 @@ func (l *Cluster) startDaemon() { l.wg.Add(1) go l.leaderElection() + // 心跳上报 + l.wg.Add(1) + go l.heartbeatLoop() + + // 心跳过期清理 + l.wg.Add(1) + go l.cleanHeartbeatLoop() + // 任务调度 l.wg.Add(1) go l.scheduleTasks() @@ -111,6 +131,76 @@ func (l *Cluster) startDaemon() { } +// 心跳上报 +// 需要确定当前存活的实例&当前实例是否是领导 +func (l *Cluster) heartbeatLoop() { + defer l.wg.Done() + + // 先执行一次 + l.heartbeat() + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-l.stopChan: + return + case <-l.ctx.Done(): + return + case <-ticker.C: + l.heartbeat() + } + } + +} + +// 单次心跳 +func (l *Cluster) heartbeat() error { + err := l.redis.ZAdd(l.ctx, l.heartbeatKey, &redis.Z{ + Score: float64(time.Now().UnixMilli()), + Member: l.instanceId, + }).Err() + + if err != nil { + l.logger.Errorf(l.ctx, "heartbeat redis.ZAdd err:%v", err) + return err + } + + return nil +} + +// 心跳清理(leader可操作) +func (l *Cluster) cleanHeartbeatLoop() { + defer l.wg.Done() + + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + select { + case <-l.stopChan: + return + case <-l.ctx.Done(): + return + case <-ticker.C: + if l.isCurrentLeader() { + l.cleanHeartbeat(false) + } + } + } + +} + +// 单次清理 +func (l *Cluster) cleanHeartbeat(cleanSelf bool) error { + + if cleanSelf { + return l.redis.ZRem(l.ctx, l.heartbeatKey, l.instanceId).Err() + } + return l.redis.ZRemRangeByScore(l.ctx, l.heartbeatKey, "0", strconv.FormatInt(time.Now().Add(-15*time.Second).UnixMilli(), 10)).Err() +} + // 领导选举 // 领导作用:全局推选一个人计算执行时间&移入队列,避免每个都进行计算浪费资源 func (l *Cluster) leaderElection() { @@ -147,7 +237,7 @@ func (l *Cluster) getLeaderLock() error { defer cancel() // 尝试加锁 - lock, err := lockx.NewGlobalLock(ctx, l.redis, l.leaderKey) + lock, err := lockx.NewGlobalLock(ctx, l.redis, l.leaderUniLockKey) if err != nil { l.logger.Errorf(l.ctx, "getLeaderLock err:%+v", err) return err @@ -166,6 +256,9 @@ func (l *Cluster) getLeaderLock() error { l.isLeader = true l.leaderLock.Unlock() + // 上报当前的Leader实例 + l.redis.Set(l.ctx, l.leaderKey, l.instanceId, time.Hour*24) + l.logger.Infof(l.ctx, "getLeaderLock Instance %s became leader", lock.GetValue()) go func() { @@ -417,7 +510,7 @@ func (l *Cluster) calculateNextTimes() { return 1 ` - lockKey := fmt.Sprintf("%s:lock:calc:%s:%d", l.keyPrefix, val.TaskId, nextTime.UnixMilli()) + lockKey := fmt.Sprintf("%s_%s_%d", l.keyPrefix, val.TaskId, nextTime.UnixMilli()) _, err = pipe.Eval(l.ctx, script, []string{l.zsetKey, lockKey}, nextTime.UnixMilli(), val.TaskId, 60).Result() if err != nil { diff --git a/go.mod b/go.mod index b01f324..8c8a030 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/yuninks/timerx -go 1.21 +go 1.24 require ( github.com/go-redis/redis/v8 v8.11.5 @@ -8,7 +8,7 @@ require ( github.com/satori/go.uuid v1.2.0 github.com/stretchr/testify v1.11.1 github.com/yuninks/cachex v1.0.5 - github.com/yuninks/lockx v1.1.0 + github.com/yuninks/lockx v1.1.1 ) require ( diff --git a/go.sum b/go.sum index 84bef01..8a6e23f 100644 --- a/go.sum +++ b/go.sum @@ -31,8 +31,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/yuninks/cachex v1.0.5 h1:Y2NmTsuEgwEVYb7FVFh5tUN67kmrUioeksQqLbOAwsM= github.com/yuninks/cachex v1.0.5/go.mod h1:5357qz18UvHTJSgZzkMamUzZoFzGeKG9+4tIUBXRSVM= -github.com/yuninks/lockx v1.1.0 h1:Qkf+RqYQ6Vr242h3dWxWJqIvIp9/6j5ijTkbuWvrgqU= -github.com/yuninks/lockx v1.1.0/go.mod h1:Y/MtD+4Zc79/0Qsdd9c99jxW5WWxXVZlxcJyYJFJy5A= +github.com/yuninks/lockx v1.1.1 h1:b687xJrBblvquP9czP3iiW62VPr/RerJbQMIi0zQizs= +github.com/yuninks/lockx v1.1.1/go.mod h1:tM46x/fp9046YnTIeddmUhkxFjJ/f0g4J+1zdaGKfd8= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=