From 14eb90bf7d2359c0e6787449e2b09df6b0b55f9c Mon Sep 17 00:00:00 2001 From: Yun Date: Sat, 4 Oct 2025 20:44:16 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=9B=86=E7=BE=A4=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E4=BD=BF=E7=94=A8=E5=B0=81=E8=A3=85=E7=9A=84=E6=96=B9?= =?UTF-8?q?=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 282 ++++++++++++----------------------------- heartbeat/heartbeat.go | 2 +- heartbeat/options.go | 13 +- leader/leader.go | 2 +- leader/options.go | 7 + once.go | 4 +- priority/option.go | 21 ++- priority/priority.go | 23 ++-- 8 files changed, 136 insertions(+), 218 deletions(-) diff --git a/cluster.go b/cluster.go index 92359cc..8863490 100644 --- a/cluster.go +++ b/cluster.go @@ -13,6 +13,8 @@ import ( "github.com/google/uuid" "github.com/yuninks/cachex" "github.com/yuninks/lockx" + "github.com/yuninks/timerx/heartbeat" + "github.com/yuninks/timerx/leader" "github.com/yuninks/timerx/logger" "github.com/yuninks/timerx/priority" ) @@ -26,7 +28,6 @@ type Cluster struct { ctx context.Context // context cancel context.CancelFunc // 取消函数 redis redis.UniversalClient // redis - cache *cachex.Cache // 本地缓存 timeout time.Duration // job执行超时时间 logger logger.Logger // 日志 keyPrefix string // key前缀 @@ -36,21 +37,20 @@ type Cluster struct { zsetKey string // 有序集合的key listKey string // 可执行的任务列表的key setKey string // 重入集合的key - heartbeatKey string // 心跳的Key - leaderKey string // 上报当前的Leader executeInfoKey string // 执行情况的key + wg sync.WaitGroup // 等待组 + workerList sync.Map // 注册的任务列表 + stopChan chan struct{} // + instanceId string // 实例ID + priority *priority.Priority // 全局优先级 priorityKey string // 全局优先级的key usePriority bool // 是否使用优先级 - wg sync.WaitGroup // 等待组 - isLeader bool // 是否是领导 - leaderLock sync.RWMutex // 领导锁 - leaderUniLockKey string // 领导唯一锁 - workerList sync.Map // 注册的任务列表 - stopChan chan struct{} // - instanceId string // 实例ID + leader *leader.Leader // Leader + heartbeat 
*heartbeat.HeartBeat // 心跳 + cache *cachex.Cache // 本地缓存 } // 初始化定时器 @@ -68,33 +68,38 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin U, _ := uuid.NewV7() clu := &Cluster{ - ctx: ctx, - cancel: cancel, - redis: red, - cache: cachex.NewCache(), - timeout: op.timeout, - logger: op.logger, - keyPrefix: keyPrefix, - location: op.location, - lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 - zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 - listKey: "timer:cluster_listKey" + keyPrefix, // 列表 - setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 - priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key - leaderUniLockKey: "timer:cluster_leaderUniLockKey" + keyPrefix, // 领导唯一锁 - leaderKey: "timer:cluster_leaderKey" + keyPrefix, // 上报当前Leader - heartbeatKey: "timer:cluster_heartbeatKey" + keyPrefix, // 心跳 有序集合 - executeInfoKey: "timer:cluster_executeInfoKey" + keyPrefix, // 执行情况的key 有序集合 - usePriority: op.usePriority, - stopChan: make(chan struct{}), - instanceId: U.String(), + ctx: ctx, + cancel: cancel, + redis: red, + cache: cachex.NewCache(), + timeout: op.timeout, + logger: op.logger, + keyPrefix: keyPrefix, + location: op.location, + lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 + zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 + listKey: "timer:cluster_listKey" + keyPrefix, // 列表 + setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 + priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key + executeInfoKey: "timer:cluster_executeInfoKey" + keyPrefix, // 执行情况的key 有序集合 + usePriority: op.usePriority, + stopChan: make(chan struct{}), + instanceId: U.String(), } // 初始化优先级 if clu.usePriority { - pri, err := priority.InitPriority(ctx, red, clu.priorityKey, op.priorityVal, priority.WithLogger(clu.logger)) + pri, err := priority.InitPriority( + ctx, + red, + clu.priorityKey, + op.priorityVal, + priority.WithLogger(clu.logger), + priority.WithInstanceId(clu.instanceId), + 
priority.WithSource("cluster"), + ) if err != nil { clu.logger.Errorf(ctx, "InitPriority err:%v", err) return nil, err @@ -102,6 +107,39 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin clu.priority = pri } + // 初始化leader + le, err := leader.InitLeader( + ctx, + clu.redis, + keyPrefix, + leader.WithLogger(clu.logger), + leader.WithPriority(clu.priority), + leader.WithInstanceId(clu.instanceId), + leader.WithSource("cluster"), + ) + if err != nil { + clu.logger.Errorf(ctx, "InitLeader err:%v", err) + return nil, err + } + clu.leader = le + + // 初始化心跳 + heart, err := heartbeat.InitHeartBeat( + ctx, + clu.redis, + clu.keyPrefix, + heartbeat.WithInstanceId(clu.instanceId), + heartbeat.WithLeader(clu.leader), + heartbeat.WithLogger(clu.logger), + heartbeat.WithPriority(clu.priority), + heartbeat.WithSource("cluster"), + ) + if err != nil { + clu.logger.Errorf(ctx, "InitHeartBeat err:%v", err) + return nil, err + } + clu.heartbeat = heart + // 启动守护进程 clu.startDaemon() @@ -119,25 +157,13 @@ func (l *Cluster) Stop() { if l.usePriority && l.priority != nil { l.priority.Close() } - l.cleanHeartbeat(true) + l.wg.Wait() } // 守护任务 func (l *Cluster) startDaemon() { - // 领导选举 - l.wg.Add(1) - go l.leaderElection() - - // 心跳上报 - l.wg.Add(1) - go l.heartbeatLoop() - - // 心跳过期清理 - l.wg.Add(1) - go l.cleanHeartbeatLoop() - // 任务调度 l.wg.Add(1) go l.scheduleTasks() @@ -146,17 +172,15 @@ func (l *Cluster) startDaemon() { l.wg.Add(1) go l.executeTasks() + l.wg.Add(1) + go l.cleanExecuteInfoLoop() + } -// 心跳上报 -// 需要确定当前存活的实例&当前实例是否是领导 -func (l *Cluster) heartbeatLoop() { - defer l.wg.Done() +func (l *Cluster) cleanExecuteInfoLoop() { + defer l.wg.Done() - // 先执行一次 - l.heartbeat() - - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(time.Minute * 5) defer ticker.Stop() for { @@ -166,164 +190,20 @@ func (l *Cluster) startDaemon() { case <-l.ctx.Done(): return case <-ticker.C: - l.heartbeat() - } - } - -} - -// 单次心跳 -func (l *Cluster) 
heartbeat() error { - err := l.redis.ZAdd(l.ctx, l.heartbeatKey, &redis.Z{ - Score: float64(time.Now().UnixMilli()), - Member: l.instanceId, - }).Err() - - if err != nil { - l.logger.Errorf(l.ctx, "heartbeat redis.ZAdd err:%v", err) - return err - } - - return nil -} - -// 心跳清理(leader可操作) -func (l *Cluster) cleanHeartbeatLoop() { - defer l.wg.Done() - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-l.stopChan: - return - case <-l.ctx.Done(): - return - case <-ticker.C: - if l.isCurrentLeader() { - l.cleanHeartbeat(false) + if l.leader.IsLeader() { + l.cleanExecuteInfo() } } } - } -// 单次清理 -func (l *Cluster) cleanHeartbeat(cleanSelf bool) error { - - if cleanSelf { - return l.redis.ZRem(l.ctx, l.heartbeatKey, l.instanceId).Err() - } - - // 移除心跳 - l.redis.ZRemRangeByScore(l.ctx, l.heartbeatKey, "0", strconv.FormatInt(time.Now().Add(-15*time.Second).UnixMilli(), 10)).Err() - +// 清除过期任务 +func (l *Cluster) cleanExecuteInfo() error { // 移除执行信息 l.redis.ZRemRangeByScore(l.ctx, l.executeInfoKey, "0", strconv.FormatInt(time.Now().Add(-15*time.Minute).UnixMilli(), 10)).Err() return nil } -// 领导选举 -// 领导作用:全局推选一个人计算执行时间&移入队列,避免每个都进行计算浪费资源 -func (l *Cluster) leaderElection() { - defer l.wg.Done() - - // 先执行一次 - l.getLeaderLock() - - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - l.getLeaderLock() - case <-l.stopChan: - return - case <-l.ctx.Done(): - return - } - } - -} - -// 成为领导 -func (l *Cluster) getLeaderLock() error { - - // 非当前优先级不用抢leader - if l.usePriority && !l.priority.IsLatest(l.ctx) { - return nil - } - - ctx, cancel := context.WithCancel(l.ctx) - defer cancel() - - // 尝试加锁 - lock, err := lockx.NewGlobalLock(ctx, l.redis, l.leaderUniLockKey) - if err != nil { - l.logger.Errorf(l.ctx, "getLeaderLock err:%+v", err) - return err - } - if b, _ := lock.Lock(); !b { - // 加锁失败 非Reader - l.leaderLock.Lock() - l.isLeader = false - l.leaderLock.Unlock() - return nil - } 
- defer lock.Unlock() - - // 加锁成功 - l.leaderLock.Lock() - l.isLeader = true - l.leaderLock.Unlock() - - // 上报当前的Leader实例 - l.redis.Set(l.ctx, l.leaderKey, l.instanceId, time.Hour*24) - - l.logger.Infof(l.ctx, "getLeaderLock Instance %s became leader", lock.GetValue()) - - go func() { - if !l.usePriority || l.priority == nil { - return - } - - for { - select { - case <-ctx.Done(): - return - case <-l.stopChan: - return - default: - if !l.priority.IsLatest(l.ctx) { - cancel() - return - } - time.Sleep(100 * time.Millisecond) - } - } - }() - - // 等待超时退出 - <-lock.GetCtx().Done() - - // 已过期 - // l.leaderLock.Lock() - // l.isLeader = false - // l.leaderLock.Unlock() - - return nil - -} - -// isCurrentLeader 检查当前实例是否是leader -func (c *Cluster) isCurrentLeader() bool { - c.leaderLock.RLock() - defer c.leaderLock.RUnlock() - return c.isLeader -} - // scheduleTasks 调度任务(只有leader执行) func (c *Cluster) scheduleTasks() { defer c.wg.Done() @@ -334,7 +214,7 @@ func (c *Cluster) scheduleTasks() { for { select { case <-ticker.C: - if !c.isCurrentLeader() { + if !c.leader.IsLeader() { continue } if c.usePriority && !c.priority.IsLatest(c.ctx) { diff --git a/heartbeat/heartbeat.go b/heartbeat/heartbeat.go index f478f69..4c86df7 100644 --- a/heartbeat/heartbeat.go +++ b/heartbeat/heartbeat.go @@ -47,7 +47,7 @@ func InitHeartBeat(ctx context.Context, ref redis.UniversalClient, keyPrefix str ctx: ctx, cancel: cancel, - heartbeatKey: "timer:heartbeat_key" + keyPrefix, + heartbeatKey: "timer:heartbeat_key" + op.source + keyPrefix, priority: op.priority, redis: ref, diff --git a/heartbeat/options.go b/heartbeat/options.go index 24c12ef..b2a8c7c 100644 --- a/heartbeat/options.go +++ b/heartbeat/options.go @@ -8,10 +8,11 @@ import ( ) type Options struct { - logger logger.Logger - instanceId string + logger logger.Logger // 日志 + instanceId string // 实例ID priority *priority.Priority // 全局优先级 - leader *leader.Leader + leader *leader.Leader // Leader + source string // 来源服务 } func 
defaultOptions() Options { @@ -57,3 +58,9 @@ func WithInstanceId(instanceId string) Option { o.instanceId = instanceId } } + +func WithSource(source string) Option { + return func(o *Options) { + o.source = source + } +} diff --git a/leader/leader.go b/leader/leader.go index cde8c3f..c5b9f11 100644 --- a/leader/leader.go +++ b/leader/leader.go @@ -52,7 +52,7 @@ func InitLeader(ctx context.Context, ref redis.UniversalClient, keyPrefix string ctx: ctx, cancel: cancel, redis: ref, - leaderUniLockKey: "timer:leader_lockKey" + keyPrefix, + leaderUniLockKey: "timer:leader_lockKey" + op.source + keyPrefix, priority: op.priority, instanceId: op.instanceId, logger: op.logger, diff --git a/leader/options.go b/leader/options.go index bb3e4c9..01ea794 100644 --- a/leader/options.go +++ b/leader/options.go @@ -10,6 +10,7 @@ type Options struct { logger logger.Logger instanceId string priority *priority.Priority // 全局优先级 + source string // 来源服务 } func defaultOptions() Options { @@ -49,3 +50,9 @@ func WithInstanceId(instanceId string) Option { o.instanceId = instanceId } } + +func WithSource(source string) Option { + return func(o *Options) { + o.source = source + } +} diff --git a/once.go b/once.go index 62ee01b..5ca74fa 100644 --- a/once.go +++ b/once.go @@ -114,7 +114,8 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c // 初始化优先级 if wo.usePriority { - pri, err := priority.InitPriority(ctx, + pri, err := priority.InitPriority( + ctx, re, keyPrefix, op.priorityVal, @@ -151,6 +152,7 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c heartbeat.WithLeader(wo.leader), heartbeat.WithLogger(wo.logger), heartbeat.WithPriority(wo.priority), + heartbeat.WithSource("once"), ) if err != nil { wo.logger.Errorf(ctx, "InitHeartBeat err:%v", err) diff --git a/priority/option.go b/priority/option.go index dffad0b..bef428f 100644 --- a/priority/option.go +++ b/priority/option.go @@ -3,6 +3,7 @@ package priority import ( "time" + 
"github.com/google/uuid" "github.com/yuninks/timerx/logger" ) @@ -10,15 +11,21 @@ type Options struct { getInterval time.Duration // 查询周期 updateInterval time.Duration // 更新间隔 expireTime time.Duration // 有效时间 - logger logger.Logger + logger logger.Logger // 日志 + source string // 来源服务 + instanceId string // 实例ID } func defaultOptions() Options { + + u, _ := uuid.NewV7() + return Options{ getInterval: time.Second * 2, updateInterval: time.Second * 4, expireTime: time.Second * 8, logger: logger.NewLogger(), + instanceId: u.String(), } } @@ -49,3 +56,15 @@ func WithUpdateInterval(d time.Duration) Option { o.getInterval = d / 3 } } + +func WithInstanceId(instanceId string) Option { + return func(o *Options) { + o.instanceId = instanceId + } +} + +func WithSource(s string) Option { + return func(o *Options) { + o.source = s + } +} diff --git a/priority/priority.go b/priority/priority.go index ccdfe32..0c63785 100644 --- a/priority/priority.go +++ b/priority/priority.go @@ -15,21 +15,23 @@ import ( // 多版本场景判断当前是否最新版本 type Priority struct { - ctx context.Context - cancel context.CancelFunc - priority int64 // 优先级 - redis redis.UniversalClient - redisKey string - logger logger.Logger - expireTime time.Duration + ctx context.Context // 上下文 + cancel context.CancelFunc // 取消函数 + priority int64 // 优先级 + redis redis.UniversalClient // redis + redisKey string // redis key + logger logger.Logger // 日志 + expireTime time.Duration // 过期时间 setInterval time.Duration // 尝试set的间隔 getInterval time.Duration // 尝试get的间隔 wg sync.WaitGroup - isLatest bool - latestMux sync.RWMutex + isLatest bool // 是否是最新版本 + latestMux sync.RWMutex // 最新版本锁 + + instanceId string // 实例ID } func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int64, opts ...Option) (*Priority, error) { @@ -48,10 +50,11 @@ func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix strin priority: priority, redis: re, logger: conf.logger, - redisKey: "timer:priority_" + 
keyPrefix, + redisKey: "timer:priority_" + conf.source + keyPrefix, expireTime: conf.expireTime, setInterval: conf.updateInterval, getInterval: conf.getInterval, + instanceId: conf.instanceId, } pro.startDaemon()