From db1735de304312f8025c89cf181a38f7c3d1b781 Mon Sep 17 00:00:00 2001 From: Yun Date: Thu, 18 Sep 2025 16:23:04 +0800 Subject: [PATCH] =?UTF-8?q?=E9=9B=86=E7=BE=A4=E9=9C=80=E8=A6=81=E5=88=A4?= =?UTF-8?q?=E6=96=AD=E8=83=BD=E5=90=A6=E6=89=A7=E8=A1=8Creader?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 34 ++++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/cluster.go b/cluster.go index 9853ea6..8aa2df4 100644 --- a/cluster.go +++ b/cluster.go @@ -56,7 +56,6 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin op := newOptions(opts...) ctx, cancel := context.WithCancel(ctx) - defer cancel() clu := &Cluster{ ctx: ctx, @@ -138,8 +137,17 @@ func (l *Cluster) leaderElection() { // 成为领导 func (l *Cluster) getLeaderLock() error { + + // 非当前优先级不用抢leader + if l.usePriority && !l.priority.IsLatest(l.ctx) { + return nil + } + + ctx, cancel := context.WithCancel(l.ctx) + defer cancel() + // 尝试加锁 - lock, err := lockx.NewGlobalLock(l.ctx, l.redis, l.leaderKey) + lock, err := lockx.NewGlobalLock(ctx, l.redis, l.leaderKey) if err != nil { l.logger.Errorf(l.ctx, "getLeaderLock err:%+v", err) return err @@ -160,8 +168,30 @@ func (l *Cluster) getLeaderLock() error { l.logger.Infof(l.ctx, "getLeaderLock Instance %s became leader", lock.GetValue()) + go func() { + for { + select { + case <-ctx.Done(): + return + case <-l.stopChan: + return + default: + if !l.priority.IsLatest(l.ctx) { + cancel() + } + time.Sleep(100 * time.Millisecond) + } + } + }() + // 等待超时退出 <-lock.GetCtx().Done() + + // 已过期 + // l.leaderLock.Lock() + // l.isLeader = false + // l.leaderLock.Unlock() + return nil }