优化部分使用
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
@@ -12,19 +13,25 @@ import (
|
||||
|
||||
// 全局锁
|
||||
type GlobalLock struct {
|
||||
redis redis.UniversalClient
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
uniqueKey string
|
||||
value string
|
||||
isClosed bool
|
||||
closeLock sync.RWMutex
|
||||
options *option
|
||||
stopRefresh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
redis redis.UniversalClient
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
uniqueKey string
|
||||
value string
|
||||
isClosed bool
|
||||
closeLock sync.RWMutex
|
||||
options *option
|
||||
refreshErrCount int64 // 刷新错误次数
|
||||
}
|
||||
|
||||
func NewGlobalLock(ctx context.Context, red redis.UniversalClient, uniqueKey string, opts ...Option) (*GlobalLock, error) {
|
||||
if uniqueKey == "" {
|
||||
return nil, fmt.Errorf("uniqueKey is empty")
|
||||
}
|
||||
if red == nil {
|
||||
return nil, fmt.Errorf("redis is nil")
|
||||
}
|
||||
|
||||
options := defaultOption()
|
||||
for _, opt := range opts {
|
||||
opt(options)
|
||||
@@ -39,13 +46,12 @@ func NewGlobalLock(ctx context.Context, red redis.UniversalClient, uniqueKey str
|
||||
}
|
||||
|
||||
return &GlobalLock{
|
||||
redis: red,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
uniqueKey: uniqueKey,
|
||||
value: u.String(),
|
||||
stopRefresh: make(chan struct{}),
|
||||
options: options,
|
||||
redis: red,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
uniqueKey: uniqueKey,
|
||||
value: u.String(),
|
||||
options: options,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -109,17 +115,11 @@ func (g *GlobalLock) Try() (bool, error) {
|
||||
|
||||
// 删除锁
|
||||
func (g *GlobalLock) Unlock() error {
|
||||
g.closeLock.Lock()
|
||||
defer g.closeLock.Unlock()
|
||||
|
||||
if g.isClosed {
|
||||
// 已经关闭就不需要重复关闭
|
||||
if g.setOrGetClose() {
|
||||
g.options.logger.Infof(g.ctx, "global lock already closed, key: %s, value: %s", g.uniqueKey, g.value)
|
||||
return nil
|
||||
}
|
||||
g.isClosed = true
|
||||
|
||||
// 停止刷新goroutine
|
||||
close(g.stopRefresh)
|
||||
g.wg.Wait()
|
||||
|
||||
script := `
|
||||
if redis.call('get', KEYS[1]) == ARGV[1] then
|
||||
@@ -129,7 +129,10 @@ func (g *GlobalLock) Unlock() error {
|
||||
end
|
||||
`
|
||||
|
||||
resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value).Result()
|
||||
// 避免上游已取消执行不了
|
||||
ctx := context.WithoutCancel(g.ctx)
|
||||
|
||||
resp, err := g.redis.Eval(ctx, script, []string{g.uniqueKey}, g.value).Result()
|
||||
if err != nil {
|
||||
if g.options.logger != nil {
|
||||
g.options.logger.Infof(g.ctx, "global unlock may have failed: %v, key: %s, value: %s", err, g.uniqueKey, g.value)
|
||||
@@ -137,8 +140,6 @@ func (g *GlobalLock) Unlock() error {
|
||||
// 即使删除失败也继续执行,因为锁可能会自动过期
|
||||
}
|
||||
|
||||
g.cancel()
|
||||
|
||||
if delCount, ok := resp.(int64); ok && delCount == 1 {
|
||||
return nil
|
||||
}
|
||||
@@ -147,9 +148,8 @@ func (g *GlobalLock) Unlock() error {
|
||||
|
||||
// 启动刷新goroutine
|
||||
func (g *GlobalLock) startRefresh() {
|
||||
g.wg.Add(1)
|
||||
|
||||
go func() {
|
||||
defer g.wg.Done()
|
||||
|
||||
ticker := time.NewTicker(g.options.RefreshPeriod)
|
||||
defer ticker.Stop()
|
||||
@@ -157,12 +157,9 @@ func (g *GlobalLock) startRefresh() {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if !g.refreshExec() {
|
||||
return
|
||||
}
|
||||
case <-g.stopRefresh:
|
||||
return
|
||||
g.refreshExec()
|
||||
case <-g.ctx.Done():
|
||||
g.options.logger.Infof(g.ctx, "global lock refresh canceled, key: %s, value: %s", g.uniqueKey, g.value)
|
||||
g.Unlock()
|
||||
return
|
||||
}
|
||||
@@ -170,12 +167,26 @@ func (g *GlobalLock) startRefresh() {
|
||||
}()
|
||||
}
|
||||
|
||||
// 返回原来的
|
||||
func (l *GlobalLock) setOrGetClose() (readyClosed bool) {
|
||||
l.closeLock.Lock()
|
||||
defer l.closeLock.Unlock()
|
||||
|
||||
if l.isClosed {
|
||||
return true
|
||||
}
|
||||
|
||||
l.isClosed = true
|
||||
|
||||
l.cancel()
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// 执行刷新操作
|
||||
func (g *GlobalLock) refreshExec() bool {
|
||||
g.closeLock.RLock()
|
||||
defer g.closeLock.RUnlock()
|
||||
|
||||
if g.isClosed {
|
||||
if g.IsClosed() {
|
||||
g.options.logger.Infof(g.ctx, "global lock already closed, key: %s, value: %s", g.uniqueKey, g.value)
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -193,6 +204,13 @@ func (g *GlobalLock) refreshExec() bool {
|
||||
if g.options.logger != nil {
|
||||
g.options.logger.Errorf(g.ctx, "global refresh failed: %v, key: %s, value: %s", err, g.uniqueKey, g.value)
|
||||
}
|
||||
|
||||
newCount := atomic.AddInt64(&g.refreshErrCount, 1)
|
||||
if newCount >= 3 {
|
||||
// 关闭锁
|
||||
g.setOrGetClose()
|
||||
g.options.logger.Errorf(g.ctx, "global refresh failed, lock may be lost 3 times, key: %s, value: %s", g.uniqueKey, g.value)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -200,6 +218,8 @@ func (g *GlobalLock) refreshExec() bool {
|
||||
if g.options.logger != nil {
|
||||
g.options.logger.Errorf(g.ctx, "global refresh failed, lock may be lost, key: %s, value: %s", g.uniqueKey, g.value)
|
||||
}
|
||||
// 关闭锁
|
||||
g.setOrGetClose()
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user