修改集群模式使用封装的方法

This commit is contained in:
Yun
2025-10-04 20:44:16 +08:00
parent 81ce4f67d3
commit 14eb90bf7d
8 changed files with 136 additions and 218 deletions
+81 -201
View File
@@ -13,6 +13,8 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/yuninks/cachex" "github.com/yuninks/cachex"
"github.com/yuninks/lockx" "github.com/yuninks/lockx"
"github.com/yuninks/timerx/heartbeat"
"github.com/yuninks/timerx/leader"
"github.com/yuninks/timerx/logger" "github.com/yuninks/timerx/logger"
"github.com/yuninks/timerx/priority" "github.com/yuninks/timerx/priority"
) )
@@ -26,7 +28,6 @@ type Cluster struct {
ctx context.Context // context ctx context.Context // context
cancel context.CancelFunc // 取消函数 cancel context.CancelFunc // 取消函数
redis redis.UniversalClient // redis redis redis.UniversalClient // redis
cache *cachex.Cache // 本地缓存
timeout time.Duration // job执行超时时间 timeout time.Duration // job执行超时时间
logger logger.Logger // 日志 logger logger.Logger // 日志
keyPrefix string // key前缀 keyPrefix string // key前缀
@@ -36,21 +37,20 @@ type Cluster struct {
zsetKey string // 有序集合的key zsetKey string // 有序集合的key
listKey string // 可执行的任务列表的key listKey string // 可执行的任务列表的key
setKey string // 重入集合的key setKey string // 重入集合的key
heartbeatKey string // 心跳的Key
leaderKey string // 上报当前的Leader
executeInfoKey string // 执行情况的key executeInfoKey string // 执行情况的key
wg sync.WaitGroup // 等待组
workerList sync.Map // 注册的任务列表
stopChan chan struct{} //
instanceId string // 实例ID
priority *priority.Priority // 全局优先级 priority *priority.Priority // 全局优先级
priorityKey string // 全局优先级的key priorityKey string // 全局优先级的key
usePriority bool // 是否使用优先级 usePriority bool // 是否使用优先级
wg sync.WaitGroup // 等待组 leader *leader.Leader // Leader
isLeader bool // 是否是领导 heartbeat *heartbeat.HeartBeat // 心跳
leaderLock sync.RWMutex // 领导锁 cache *cachex.Cache // 本地缓存
leaderUniLockKey string // 领导唯一锁
workerList sync.Map // 注册的任务列表
stopChan chan struct{} //
instanceId string // 实例ID
} }
// 初始化定时器 // 初始化定时器
@@ -68,33 +68,38 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
U, _ := uuid.NewV7() U, _ := uuid.NewV7()
clu := &Cluster{ clu := &Cluster{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
redis: red, redis: red,
cache: cachex.NewCache(), cache: cachex.NewCache(),
timeout: op.timeout, timeout: op.timeout,
logger: op.logger, logger: op.logger,
keyPrefix: keyPrefix, keyPrefix: keyPrefix,
location: op.location, location: op.location,
lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁
zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合
listKey: "timer:cluster_listKey" + keyPrefix, // 列表 listKey: "timer:cluster_listKey" + keyPrefix, // 列表
setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合
priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key
leaderUniLockKey: "timer:cluster_leaderUniLockKey" + keyPrefix, // 领导唯一锁 executeInfoKey: "timer:cluster_executeInfoKey" + keyPrefix, // 执行情况的key 有序集合
leaderKey: "timer:cluster_leaderKey" + keyPrefix, // 上报当前Leader usePriority: op.usePriority,
heartbeatKey: "timer:cluster_heartbeatKey" + keyPrefix, // 心跳 有序集合 stopChan: make(chan struct{}),
executeInfoKey: "timer:cluster_executeInfoKey" + keyPrefix, // 执行情况的key 有序集合 instanceId: U.String(),
usePriority: op.usePriority,
stopChan: make(chan struct{}),
instanceId: U.String(),
} }
// 初始化优先级 // 初始化优先级
if clu.usePriority { if clu.usePriority {
pri, err := priority.InitPriority(ctx, red, clu.priorityKey, op.priorityVal, priority.WithLogger(clu.logger)) pri, err := priority.InitPriority(
ctx,
red,
clu.priorityKey,
op.priorityVal,
priority.WithLogger(clu.logger),
priority.WithInstanceId(clu.instanceId),
priority.WithSource("cluster"),
)
if err != nil { if err != nil {
clu.logger.Errorf(ctx, "InitPriority err:%v", err) clu.logger.Errorf(ctx, "InitPriority err:%v", err)
return nil, err return nil, err
@@ -102,6 +107,39 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
clu.priority = pri clu.priority = pri
} }
// 初始化leader
le, err := leader.InitLeader(
ctx,
clu.redis,
keyPrefix,
leader.WithLogger(clu.logger),
leader.WithPriority(clu.priority),
leader.WithInstanceId(clu.instanceId),
leader.WithSource("cluster"),
)
if err != nil {
clu.logger.Infof(ctx, "InitLeader err:%v", err)
return nil, err
}
clu.leader = le
// 初始化心跳
heart, err := heartbeat.InitHeartBeat(
ctx,
clu.redis,
clu.keyPrefix,
heartbeat.WithInstanceId(clu.instanceId),
heartbeat.WithLeader(clu.leader),
heartbeat.WithLogger(clu.logger),
heartbeat.WithPriority(clu.priority),
heartbeat.WithSource("once"),
)
if err != nil {
clu.logger.Errorf(ctx, "InitHeartBeat err:%v", err)
return nil, err
}
clu.heartbeat = heart
// 启动守护进程 // 启动守护进程
clu.startDaemon() clu.startDaemon()
@@ -119,25 +157,13 @@ func (l *Cluster) Stop() {
if l.usePriority && l.priority != nil { if l.usePriority && l.priority != nil {
l.priority.Close() l.priority.Close()
} }
l.cleanHeartbeat(true)
l.wg.Wait() l.wg.Wait()
} }
// 守护任务 // 守护任务
func (l *Cluster) startDaemon() { func (l *Cluster) startDaemon() {
// 领导选举
l.wg.Add(1)
go l.leaderElection()
// 心跳上报
l.wg.Add(1)
go l.heartbeatLoop()
// 心跳过期清理
l.wg.Add(1)
go l.cleanHeartbeatLoop()
// 任务调度 // 任务调度
l.wg.Add(1) l.wg.Add(1)
go l.scheduleTasks() go l.scheduleTasks()
@@ -146,17 +172,15 @@ func (l *Cluster) startDaemon() {
l.wg.Add(1) l.wg.Add(1)
go l.executeTasks() go l.executeTasks()
l.wg.Add(1)
go l.cleanExecuteInfoLoop()
} }
// 心跳上报 func (l *Cluster) cleanExecuteInfoLoop() {
// 需要确定当前存活的实例&当前实例是否是领导 l.wg.Done()
func (l *Cluster) heartbeatLoop() {
defer l.wg.Done()
// 先执行一次 ticker := time.NewTicker(time.Minute * 5)
l.heartbeat()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop() defer ticker.Stop()
for { for {
@@ -166,164 +190,20 @@ func (l *Cluster) heartbeatLoop() {
case <-l.ctx.Done(): case <-l.ctx.Done():
return return
case <-ticker.C: case <-ticker.C:
l.heartbeat() if l.leader.IsLeader() {
} l.cleanExecuteInfo()
}
}
// 单次心跳
func (l *Cluster) heartbeat() error {
err := l.redis.ZAdd(l.ctx, l.heartbeatKey, &redis.Z{
Score: float64(time.Now().UnixMilli()),
Member: l.instanceId,
}).Err()
if err != nil {
l.logger.Errorf(l.ctx, "heartbeat redis.ZAdd err:%v", err)
return err
}
return nil
}
// 心跳清理(leader可操作)
func (l *Cluster) cleanHeartbeatLoop() {
defer l.wg.Done()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-l.stopChan:
return
case <-l.ctx.Done():
return
case <-ticker.C:
if l.isCurrentLeader() {
l.cleanHeartbeat(false)
} }
} }
} }
} }
// 单次清理 // 清除过期任务
func (l *Cluster) cleanHeartbeat(cleanSelf bool) error { func (l *Cluster) cleanExecuteInfo() error {
if cleanSelf {
return l.redis.ZRem(l.ctx, l.heartbeatKey, l.instanceId).Err()
}
// 移除心跳
l.redis.ZRemRangeByScore(l.ctx, l.heartbeatKey, "0", strconv.FormatInt(time.Now().Add(-15*time.Second).UnixMilli(), 10)).Err()
// 移除执行信息 // 移除执行信息
l.redis.ZRemRangeByScore(l.ctx, l.executeInfoKey, "0", strconv.FormatInt(time.Now().Add(-15*time.Minute).UnixMilli(), 10)).Err() l.redis.ZRemRangeByScore(l.ctx, l.executeInfoKey, "0", strconv.FormatInt(time.Now().Add(-15*time.Minute).UnixMilli(), 10)).Err()
return nil return nil
} }
// 领导选举
// 领导作用:全局推选一个人计算执行时间&移入队列,避免每个都进行计算浪费资源
func (l *Cluster) leaderElection() {
defer l.wg.Done()
// 先执行一次
l.getLeaderLock()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
l.getLeaderLock()
case <-l.stopChan:
return
case <-l.ctx.Done():
return
}
}
}
// 成为领导
func (l *Cluster) getLeaderLock() error {
// 非当前优先级不用抢leader
if l.usePriority && !l.priority.IsLatest(l.ctx) {
return nil
}
ctx, cancel := context.WithCancel(l.ctx)
defer cancel()
// 尝试加锁
lock, err := lockx.NewGlobalLock(ctx, l.redis, l.leaderUniLockKey)
if err != nil {
l.logger.Errorf(l.ctx, "getLeaderLock err:%+v", err)
return err
}
if b, _ := lock.Lock(); !b {
// 加锁失败 非Reader
l.leaderLock.Lock()
l.isLeader = false
l.leaderLock.Unlock()
return nil
}
defer lock.Unlock()
// 加锁成功
l.leaderLock.Lock()
l.isLeader = true
l.leaderLock.Unlock()
// 上报当前的Leader实例
l.redis.Set(l.ctx, l.leaderKey, l.instanceId, time.Hour*24)
l.logger.Infof(l.ctx, "getLeaderLock Instance %s became leader", lock.GetValue())
go func() {
if !l.usePriority || l.priority == nil {
return
}
for {
select {
case <-ctx.Done():
return
case <-l.stopChan:
return
default:
if !l.priority.IsLatest(l.ctx) {
cancel()
return
}
time.Sleep(100 * time.Millisecond)
}
}
}()
// 等待超时退出
<-lock.GetCtx().Done()
// 已过期
// l.leaderLock.Lock()
// l.isLeader = false
// l.leaderLock.Unlock()
return nil
}
// isCurrentLeader 检查当前实例是否是leader
func (c *Cluster) isCurrentLeader() bool {
c.leaderLock.RLock()
defer c.leaderLock.RUnlock()
return c.isLeader
}
// scheduleTasks 调度任务(只有leader执行) // scheduleTasks 调度任务(只有leader执行)
func (c *Cluster) scheduleTasks() { func (c *Cluster) scheduleTasks() {
defer c.wg.Done() defer c.wg.Done()
@@ -334,7 +214,7 @@ func (c *Cluster) scheduleTasks() {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
if !c.isCurrentLeader() { if !c.leader.IsLeader() {
continue continue
} }
if c.usePriority && !c.priority.IsLatest(c.ctx) { if c.usePriority && !c.priority.IsLatest(c.ctx) {
+1 -1
View File
@@ -47,7 +47,7 @@ func InitHeartBeat(ctx context.Context, ref redis.UniversalClient, keyPrefix str
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
heartbeatKey: "timer:heartbeat_key" + keyPrefix, heartbeatKey: "timer:heartbeat_key" + op.source + keyPrefix,
priority: op.priority, priority: op.priority,
redis: ref, redis: ref,
+10 -3
View File
@@ -8,10 +8,11 @@ import (
) )
type Options struct { type Options struct {
logger logger.Logger logger logger.Logger // 日志
instanceId string instanceId string // 实例ID
priority *priority.Priority // 全局优先级 priority *priority.Priority // 全局优先级
leader *leader.Leader leader *leader.Leader // Leader
source string // 来源服务
} }
func defaultOptions() Options { func defaultOptions() Options {
@@ -57,3 +58,9 @@ func WithInstanceId(instanceId string) Option {
o.instanceId = instanceId o.instanceId = instanceId
} }
} }
func WithSource(source string) Option {
return func(o *Options) {
o.source = source
}
}
+1 -1
View File
@@ -52,7 +52,7 @@ func InitLeader(ctx context.Context, ref redis.UniversalClient, keyPrefix string
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
redis: ref, redis: ref,
leaderUniLockKey: "timer:leader_lockKey" + keyPrefix, leaderUniLockKey: "timer:leader_lockKey" + op.source + keyPrefix,
priority: op.priority, priority: op.priority,
instanceId: op.instanceId, instanceId: op.instanceId,
logger: op.logger, logger: op.logger,
+7
View File
@@ -10,6 +10,7 @@ type Options struct {
logger logger.Logger logger logger.Logger
instanceId string instanceId string
priority *priority.Priority // 全局优先级 priority *priority.Priority // 全局优先级
source string // 来源服务
} }
func defaultOptions() Options { func defaultOptions() Options {
@@ -49,3 +50,9 @@ func WithInstanceId(instanceId string) Option {
o.instanceId = instanceId o.instanceId = instanceId
} }
} }
func WithSource(source string) Option {
return func(o *Options) {
o.source = source
}
}
+3 -1
View File
@@ -114,7 +114,8 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
// 初始化优先级 // 初始化优先级
if wo.usePriority { if wo.usePriority {
pri, err := priority.InitPriority(ctx, pri, err := priority.InitPriority(
ctx,
re, re,
keyPrefix, keyPrefix,
op.priorityVal, op.priorityVal,
@@ -151,6 +152,7 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
heartbeat.WithLeader(wo.leader), heartbeat.WithLeader(wo.leader),
heartbeat.WithLogger(wo.logger), heartbeat.WithLogger(wo.logger),
heartbeat.WithPriority(wo.priority), heartbeat.WithPriority(wo.priority),
heartbeat.WithSource("once"),
) )
if err != nil { if err != nil {
wo.logger.Errorf(ctx, "InitHeartBeat err:%v", err) wo.logger.Errorf(ctx, "InitHeartBeat err:%v", err)
+20 -1
View File
@@ -3,6 +3,7 @@ package priority
import ( import (
"time" "time"
"github.com/google/uuid"
"github.com/yuninks/timerx/logger" "github.com/yuninks/timerx/logger"
) )
@@ -10,15 +11,21 @@ type Options struct {
getInterval time.Duration // 查询周期 getInterval time.Duration // 查询周期
updateInterval time.Duration // 更新间隔 updateInterval time.Duration // 更新间隔
expireTime time.Duration // 有效时间 expireTime time.Duration // 有效时间
logger logger.Logger logger logger.Logger // 日志
source string // 来源服务
instanceId string // 实例ID
} }
func defaultOptions() Options { func defaultOptions() Options {
u, _ := uuid.NewV7()
return Options{ return Options{
getInterval: time.Second * 2, getInterval: time.Second * 2,
updateInterval: time.Second * 4, updateInterval: time.Second * 4,
expireTime: time.Second * 8, expireTime: time.Second * 8,
logger: logger.NewLogger(), logger: logger.NewLogger(),
instanceId: u.String(),
} }
} }
@@ -49,3 +56,15 @@ func WithUpdateInterval(d time.Duration) Option {
o.getInterval = d / 3 o.getInterval = d / 3
} }
} }
func WithInstanceId(instanceId string) Option {
return func(o *Options) {
o.instanceId = instanceId
}
}
func WithSource(s string) Option {
return func(o *Options) {
o.source = s
}
}
+13 -10
View File
@@ -15,21 +15,23 @@ import (
// 多版本场景判断当前是否最新版本 // 多版本场景判断当前是否最新版本
type Priority struct { type Priority struct {
ctx context.Context ctx context.Context // 上下文
cancel context.CancelFunc cancel context.CancelFunc // 取消函数
priority int64 // 优先级 priority int64 // 优先级
redis redis.UniversalClient redis redis.UniversalClient // redis
redisKey string redisKey string // redis key
logger logger.Logger logger logger.Logger // 日志
expireTime time.Duration expireTime time.Duration // 过期时间
setInterval time.Duration // 尝试set的间隔 setInterval time.Duration // 尝试set的间隔
getInterval time.Duration // 尝试get的间隔 getInterval time.Duration // 尝试get的间隔
wg sync.WaitGroup wg sync.WaitGroup
isLatest bool isLatest bool // 是否是最新版本
latestMux sync.RWMutex latestMux sync.RWMutex // 最新版本锁
instanceId string // 实例ID
} }
func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int64, opts ...Option) (*Priority, error) { func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int64, opts ...Option) (*Priority, error) {
@@ -48,10 +50,11 @@ func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix strin
priority: priority, priority: priority,
redis: re, redis: re,
logger: conf.logger, logger: conf.logger,
redisKey: "timer:priority_" + keyPrefix, redisKey: "timer:priority_" + conf.source + keyPrefix,
expireTime: conf.expireTime, expireTime: conf.expireTime,
setInterval: conf.updateInterval, setInterval: conf.updateInterval,
getInterval: conf.getInterval, getInterval: conf.getInterval,
instanceId: conf.instanceId,
} }
pro.startDaemon() pro.startDaemon()