集群需要判断能否执行reader
This commit is contained in:
+32
-2
@@ -56,7 +56,6 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
|
||||
op := newOptions(opts...)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
clu := &Cluster{
|
||||
ctx: ctx,
|
||||
@@ -138,8 +137,17 @@ func (l *Cluster) leaderElection() {
|
||||
|
||||
// 成为领导
|
||||
func (l *Cluster) getLeaderLock() error {
|
||||
|
||||
// 非当前优先级不用抢leader
|
||||
if l.usePriority && !l.priority.IsLatest(l.ctx) {
|
||||
return nil
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(l.ctx)
|
||||
defer cancel()
|
||||
|
||||
// 尝试加锁
|
||||
lock, err := lockx.NewGlobalLock(l.ctx, l.redis, l.leaderKey)
|
||||
lock, err := lockx.NewGlobalLock(ctx, l.redis, l.leaderKey)
|
||||
if err != nil {
|
||||
l.logger.Errorf(l.ctx, "getLeaderLock err:%+v", err)
|
||||
return err
|
||||
@@ -160,8 +168,30 @@ func (l *Cluster) getLeaderLock() error {
|
||||
|
||||
l.logger.Infof(l.ctx, "getLeaderLock Instance %s became leader", lock.GetValue())
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-l.stopChan:
|
||||
return
|
||||
default:
|
||||
if !l.priority.IsLatest(l.ctx) {
|
||||
cancel()
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// 等待超时退出
|
||||
<-lock.GetCtx().Done()
|
||||
|
||||
// 已过期
|
||||
// l.leaderLock.Lock()
|
||||
// l.isLeader = false
|
||||
// l.leaderLock.Unlock()
|
||||
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user