Files
lockx/lockx.go
T

219 lines
4.3 KiB
Go
Raw Normal View History

2023-11-26 20:42:30 +08:00
package lockx
import (
"context"
2025-09-17 19:12:55 +08:00
"fmt"
"sync"
2023-11-26 20:42:30 +08:00
"time"
"github.com/go-redis/redis/v8"
2025-06-25 16:44:44 +08:00
"github.com/google/uuid"
2023-11-26 20:42:30 +08:00
)
// 全局锁
2025-09-17 19:12:55 +08:00
type GlobalLock struct {
redis redis.UniversalClient
ctx context.Context
cancel context.CancelFunc
uniqueKey string
value string
isClosed bool
closeLock sync.RWMutex
options *option
stopRefresh chan struct{}
wg sync.WaitGroup
2023-11-26 20:42:30 +08:00
}
2025-09-17 19:12:55 +08:00
func NewGlobalLock(ctx context.Context, red redis.UniversalClient, uniqueKey string, opts ...Option) (*GlobalLock, error) {
options := defaultOption()
for _, opt := range opts {
opt(options)
}
2025-06-25 16:44:44 +08:00
2025-09-17 19:12:55 +08:00
ctx, cancel := context.WithTimeout(ctx, options.lockTimeout)
2025-06-25 16:44:44 +08:00
2025-09-17 19:12:55 +08:00
u, err := uuid.NewV7()
if err != nil {
cancel()
return nil, fmt.Errorf("failed to generate UUID: %w", err)
2023-11-26 20:42:30 +08:00
}
2025-09-17 19:12:55 +08:00
return &GlobalLock{
redis: red,
ctx: ctx,
cancel: cancel,
uniqueKey: uniqueKey,
value: u.String(),
stopRefresh: make(chan struct{}),
options: options,
}, nil
2023-11-26 20:42:30 +08:00
}
2025-09-17 18:12:29 +08:00
// 获取上下文
2025-09-17 19:12:55 +08:00
func (l *GlobalLock) GetCtx() context.Context {
2025-09-17 18:12:29 +08:00
return l.ctx
}
2023-11-26 20:42:30 +08:00
// 获取锁
2025-09-17 19:12:55 +08:00
func (g *GlobalLock) Lock() (bool, error) {
2023-11-26 20:42:30 +08:00
script := `
2025-09-17 19:12:55 +08:00
if redis.call('set', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2]) then
return 'OK'
else
local current_val = redis.call('get', KEYS[1])
if current_val == ARGV[1] then
redis.call('expire', KEYS[1], ARGV[2])
return 'OK'
else
return 'ERROR'
end
2023-11-26 20:42:30 +08:00
end
`
2025-09-17 19:12:55 +08:00
resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, int(g.options.Expiry.Seconds())).Result()
if err != nil {
if g.options.logger != nil {
g.options.logger.Errorf(g.ctx, "global lock failed: %v, key: %s, value: %s", err, g.uniqueKey, g.value)
}
return false, err
2023-11-26 20:42:30 +08:00
}
2025-09-17 19:12:55 +08:00
2023-11-26 20:42:30 +08:00
if resp == "OK" {
2025-09-17 19:12:55 +08:00
g.startRefresh()
return true, nil
2023-11-26 20:42:30 +08:00
}
2025-09-17 19:12:55 +08:00
return false, nil
2023-11-26 20:42:30 +08:00
}
// 尝试获取锁
2025-09-17 19:12:55 +08:00
func (g *GlobalLock) Try() (bool, error) {
for i := 0; i < g.options.MaxRetryTimes; i++ {
success, err := g.Lock()
if err != nil {
return false, err
}
if success {
return true, nil
}
select {
case <-time.After(g.options.RetryInterval):
continue
case <-g.ctx.Done():
return false, g.ctx.Err()
2023-11-26 20:42:30 +08:00
}
}
2025-09-17 19:12:55 +08:00
return false, nil
2023-11-26 20:42:30 +08:00
}
// 删除锁
2025-09-17 19:12:55 +08:00
func (g *GlobalLock) Unlock() error {
g.closeLock.Lock()
defer g.closeLock.Unlock()
if g.isClosed {
return nil
}
g.isClosed = true
// 停止刷新goroutine
close(g.stopRefresh)
g.wg.Wait()
2023-11-26 20:42:30 +08:00
script := `
2025-09-17 19:12:55 +08:00
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
2023-11-26 20:42:30 +08:00
end
`
resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value).Result()
2025-09-17 19:12:55 +08:00
if err != nil {
if g.options.logger != nil {
g.options.logger.Infof(g.ctx, "global unlock may have failed: %v, key: %s, value: %s", err, g.uniqueKey, g.value)
}
// 即使删除失败也继续执行,因为锁可能会自动过期
2023-11-26 20:42:30 +08:00
}
2025-09-17 19:12:55 +08:00
2023-11-26 20:42:30 +08:00
g.cancel()
2025-09-17 19:12:55 +08:00
if delCount, ok := resp.(int64); ok && delCount == 1 {
return nil
}
return fmt.Errorf("lock was already released or owned by another client")
2023-11-26 20:42:30 +08:00
}
2025-09-17 19:12:55 +08:00
// 启动刷新goroutine
func (g *GlobalLock) startRefresh() {
g.wg.Add(1)
2023-11-26 20:42:30 +08:00
go func() {
2025-09-17 19:12:55 +08:00
defer g.wg.Done()
ticker := time.NewTicker(g.options.RefreshPeriod)
defer ticker.Stop()
2023-11-26 20:42:30 +08:00
for {
select {
2025-09-17 19:12:55 +08:00
case <-ticker.C:
if !g.refreshExec() {
return
}
case <-g.stopRefresh:
return
2023-11-26 20:42:30 +08:00
case <-g.ctx.Done():
2025-09-17 18:12:29 +08:00
g.Unlock()
2023-11-26 20:42:30 +08:00
return
}
}
}()
}
2025-09-17 19:12:55 +08:00
// 执行刷新操作
func (g *GlobalLock) refreshExec() bool {
g.closeLock.RLock()
defer g.closeLock.RUnlock()
if g.isClosed {
return false
}
2023-11-26 20:42:30 +08:00
script := `
2025-09-17 19:12:55 +08:00
if redis.call('get', KEYS[1]) == ARGV[1] then
redis.call('expire', KEYS[1], ARGV[2])
return 1
else
return 0
2023-11-26 20:42:30 +08:00
end
`
2025-09-17 19:12:55 +08:00
resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, int(g.options.Expiry.Seconds())).Result()
2025-09-17 18:12:29 +08:00
if err != nil {
2025-09-17 19:12:55 +08:00
if g.options.logger != nil {
g.options.logger.Errorf(g.ctx, "global refresh failed: %v, key: %s, value: %s", err, g.uniqueKey, g.value)
}
return false
2025-09-17 18:12:29 +08:00
}
2025-09-17 19:12:55 +08:00
if refreshed, ok := resp.(int64); !ok || refreshed != 1 {
if g.options.logger != nil {
g.options.logger.Errorf(g.ctx, "global refresh failed, lock may be lost, key: %s, value: %s", g.uniqueKey, g.value)
}
2024-06-21 00:20:54 +08:00
return false
2023-11-26 20:42:30 +08:00
}
2025-09-17 19:12:55 +08:00
2024-06-21 00:20:54 +08:00
return true
2023-11-26 20:42:30 +08:00
}
2025-09-17 19:12:55 +08:00
// 检查锁是否已关闭
func (g *GlobalLock) IsClosed() bool {
g.closeLock.RLock()
defer g.closeLock.RUnlock()
return g.isClosed
}
2025-09-17 19:30:18 +08:00
func (l *GlobalLock) GetValue() string {
return l.value
}