diff --git a/cluster.go b/cluster.go index 9853ea6..8aa2df4 100644 --- a/cluster.go +++ b/cluster.go @@ -56,7 +56,6 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin op := newOptions(opts...) ctx, cancel := context.WithCancel(ctx) - defer cancel() clu := &Cluster{ ctx: ctx, @@ -138,8 +137,17 @@ func (l *Cluster) leaderElection() { // 成为领导 func (l *Cluster) getLeaderLock() error { + + // 非当前优先级不用抢leader + if l.usePriority && !l.priority.IsLatest(l.ctx) { + return nil + } + + ctx, cancel := context.WithCancel(l.ctx) + defer cancel() + // 尝试加锁 - lock, err := lockx.NewGlobalLock(l.ctx, l.redis, l.leaderKey) + lock, err := lockx.NewGlobalLock(ctx, l.redis, l.leaderKey) if err != nil { l.logger.Errorf(l.ctx, "getLeaderLock err:%+v", err) return err @@ -160,8 +168,30 @@ func (l *Cluster) getLeaderLock() error { l.logger.Infof(l.ctx, "getLeaderLock Instance %s became leader", lock.GetValue()) + go func() { + for { + select { + case <-ctx.Done(): + return + case <-l.stopChan: + return + default: + if !l.priority.IsLatest(l.ctx) { + cancel() + } + time.Sleep(100 * time.Millisecond) + } + } + }() + // 等待超时退出 <-lock.GetCtx().Done() + + // 已过期 + // l.leaderLock.Lock() + // l.isLeader = false + // l.leaderLock.Unlock() + return nil }