定时器改版优化

This commit is contained in:
Yun
2025-09-17 19:12:55 +08:00
parent f11fbd1703
commit 4a131290d6
5 changed files with 189 additions and 94 deletions
+2 -2
View File
@@ -20,9 +20,9 @@ func Init(ctx context.Context, redis redis.UniversalClient, opts ...Option) erro
// 新起一个锁对象 // 新起一个锁对象
// 先Init后New再Lock // 先Init后New再Lock
func New(ctx context.Context, uniqueKey string) (*globalLock, error) { func New(ctx context.Context, uniqueKey string) (*GlobalLock, error) {
if redisConn == nil { if redisConn == nil {
return nil, fmt.Errorf("redis client is nil") return nil, fmt.Errorf("redis client is nil")
} }
return NewGlobalLock(ctx, redisConn, uniqueKey), nil return NewGlobalLock(ctx, redisConn, uniqueKey, globalOpts...)
} }
+1 -1
View File
@@ -27,7 +27,7 @@ func TestSimpleLock(t *testing.T) {
t.Log(err) t.Log(err)
return return
} }
if l.Lock() { if b, _ := l.Lock(); b {
fmt.Println("lock success") fmt.Println("lock success")
l.Unlock() l.Unlock()
} }
+141 -59
View File
@@ -2,6 +2,8 @@ package lockx
import ( import (
"context" "context"
"fmt"
"sync"
"time" "time"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
@@ -9,98 +11,158 @@ import (
) )
// 全局锁 // 全局锁
type globalLock struct { type GlobalLock struct {
redis redis.UniversalClient redis redis.UniversalClient
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
uniqueKey string uniqueKey string
value string value string
isClosed bool
closeLock sync.RWMutex
options *option
stopRefresh chan struct{}
wg sync.WaitGroup
} }
func NewGlobalLock(ctx context.Context, red redis.UniversalClient, uniqueKey string) *globalLock { func NewGlobalLock(ctx context.Context, red redis.UniversalClient, uniqueKey string, opts ...Option) (*GlobalLock, error) {
ctx, cancel := context.WithTimeout(ctx, opt.lockTimeout) options := defaultOption()
for _, opt := range opts {
opt(options)
}
u, _ := uuid.NewV7() ctx, cancel := context.WithTimeout(ctx, options.lockTimeout)
return &globalLock{ u, err := uuid.NewV7()
if err != nil {
cancel()
return nil, fmt.Errorf("failed to generate UUID: %w", err)
}
return &GlobalLock{
redis: red, redis: red,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
uniqueKey: uniqueKey, uniqueKey: uniqueKey,
value: u.String(), value: u.String(),
} stopRefresh: make(chan struct{}),
options: options,
}, nil
} }
// 获取上下文 // 获取上下文
func (l *globalLock) GetCtx() context.Context { func (l *GlobalLock) GetCtx() context.Context {
return l.ctx return l.ctx
} }
// 获取锁 // 获取锁
func (g *globalLock) Lock() bool { func (g *GlobalLock) Lock() (bool, error) {
script := ` script := `
local token = redis.call('get',KEYS[1]) if redis.call('set', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2]) then
if token == false return 'OK'
then else
return redis.call('set',KEYS[1],ARGV[1],'EX',ARGV[2]) local current_val = redis.call('get', KEYS[1])
end if current_val == ARGV[1] then
redis.call('expire', KEYS[1], ARGV[2])
return 'OK'
else
return 'ERROR' return 'ERROR'
end
end
` `
resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 5).Result() resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, int(g.options.Expiry.Seconds())).Result()
if resp != "OK" { if err != nil {
opt.logger.Errorf(g.ctx, "global lock err resp:%+v err:%+v uniKey:%+v value:%+v", resp, err, g.uniqueKey, g.value) if g.options.logger != nil {
g.options.logger.Errorf(g.ctx, "global lock failed: %v, key: %s, value: %s", err, g.uniqueKey, g.value)
} }
return false, err
}
if resp == "OK" { if resp == "OK" {
g.refresh() g.startRefresh()
return true return true, nil
} }
return false
return false, nil
} }
// 尝试获取锁 // 尝试获取锁
func (g *globalLock) Try(limitTimes int) bool { func (g *GlobalLock) Try() (bool, error) {
for i := 0; i < limitTimes; i++ { for i := 0; i < g.options.MaxRetryTimes; i++ {
if g.Lock() { success, err := g.Lock()
return true if err != nil {
return false, err
} }
time.Sleep(time.Millisecond * 100) if success {
return true, nil
} }
return false
select {
case <-time.After(g.options.RetryInterval):
continue
case <-g.ctx.Done():
return false, g.ctx.Err()
}
}
return false, nil
} }
// 删除锁 // 删除锁
func (g *globalLock) Unlock() bool { func (g *GlobalLock) Unlock() error {
g.closeLock.Lock()
defer g.closeLock.Unlock()
if g.isClosed {
return nil
}
g.isClosed = true
// 停止刷新goroutine
close(g.stopRefresh)
g.wg.Wait()
script := ` script := `
local token = redis.call('get',KEYS[1]) if redis.call('get', KEYS[1]) == ARGV[1] then
if token == ARGV[1] return redis.call('del', KEYS[1])
then else
redis.call('del',KEYS[1]) return 0
return 'OK'
end end
return 'ERROR'
` `
resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value).Result() resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value).Result()
if resp != "OK" { if err != nil {
opt.logger.Errorf(g.ctx, "global Unlock err resp:%+v err:%+v uniKey:%+v value:%+v", resp, err, g.uniqueKey, g.value) if g.options.logger != nil {
g.options.logger.Infof(g.ctx, "global unlock may have failed: %v, key: %s, value: %s", err, g.uniqueKey, g.value)
} }
// 即使删除失败也继续执行,因为锁可能会自动过期
}
g.cancel() g.cancel()
return true
if delCount, ok := resp.(int64); ok && delCount == 1 {
return nil
}
return fmt.Errorf("lock was already released or owned by another client")
} }
// 刷新锁 // 启动刷新goroutine
func (g *globalLock) refresh() { func (g *GlobalLock) startRefresh() {
g.wg.Add(1)
go func() { go func() {
t := time.NewTicker(time.Second) defer g.wg.Done()
ticker := time.NewTicker(g.options.RefreshPeriod)
defer ticker.Stop()
for { for {
select { select {
case <-t.C: case <-ticker.C:
g.refreshExec() if !g.refreshExec() {
return
}
case <-g.stopRefresh:
return
case <-g.ctx.Done(): case <-g.ctx.Done():
t.Stop()
g.Unlock() g.Unlock()
return return
} }
@@ -108,25 +170,45 @@ func (g *globalLock) refresh() {
}() }()
} }
func (g *globalLock) refreshExec() bool { // 执行刷新操作
script := ` func (g *GlobalLock) refreshExec() bool {
local token = redis.call('get',KEYS[1]) g.closeLock.RLock()
if token == ARGV[1] defer g.closeLock.RUnlock()
then
redis.call('set',KEYS[1],ARGV[1],'EX',ARGV[2])
return 'OK'
end
return 'ERROR'
`
resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 5).Result() if g.isClosed {
if err != nil {
opt.logger.Errorf(g.ctx, "global refresh err resp:%+v err:%+v uniKey:%+v value:%+v", resp, err, g.uniqueKey, g.value)
}
if resp == "ERROR" {
opt.logger.Errorf(g.ctx, "global refresh err resp:%+v err:%+v uniKey:%+v value:%+v", resp, err, g.uniqueKey, g.value)
g.Unlock()
return false return false
} }
script := `
if redis.call('get', KEYS[1]) == ARGV[1] then
redis.call('expire', KEYS[1], ARGV[2])
return 1
else
return 0
end
`
resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, int(g.options.Expiry.Seconds())).Result()
if err != nil {
if g.options.logger != nil {
g.options.logger.Errorf(g.ctx, "global refresh failed: %v, key: %s, value: %s", err, g.uniqueKey, g.value)
}
return false
}
if refreshed, ok := resp.(int64); !ok || refreshed != 1 {
if g.options.logger != nil {
g.options.logger.Errorf(g.ctx, "global refresh failed, lock may be lost, key: %s, value: %s", g.uniqueKey, g.value)
}
return false
}
return true return true
} }
// 检查锁是否已关闭
func (g *GlobalLock) IsClosed() bool {
g.closeLock.RLock()
defer g.closeLock.RUnlock()
return g.isClosed
}
+10 -5
View File
@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"sync" "sync"
"testing" "testing"
"time"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"github.com/yuninks/lockx" "github.com/yuninks/lockx"
@@ -43,18 +44,22 @@ func TestLockx(t *testing.T) {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for i := 0; i < 10000; i++ { for i := 0; i < 20; i++ {
wg.Add(1) wg.Add(1)
go func(i int) { go func(i int) {
defer wg.Done() defer wg.Done()
lock := lockx.NewGlobalLock(ctx, client, "lockx:test") lock, _ := lockx.NewGlobalLock(ctx, client, "lockx:test")
defer lock.Unlock() if b, _ := lock.Lock(); !b {
if !lock.Lock() {
fmt.Println("lock error", i) fmt.Println("lock error", i)
return return
} }
fmt.Println("ssss", i) defer lock.Unlock()
fmt.Println("ssss2", i)
time.Sleep(time.Second * 2)
}(i) }(i)
time.Sleep(time.Second)
} }
wg.Wait() wg.Wait()
+20 -12
View File
@@ -8,46 +8,54 @@ import (
type option struct { type option struct {
lockTimeout time.Duration // 锁的超时时间 lockTimeout time.Duration // 锁的超时时间
Expiry time.Duration // 单次刷新有效时间
MaxRetryTimes int // 尝试次数
RetryInterval time.Duration // 尝试间隔
RefreshPeriod time.Duration // 刷新间隔
logger Logger // 日志 logger Logger // 日志
} }
func defaultOption() *option { func defaultOption() *option {
return &option{ return &option{
lockTimeout: time.Minute * 60, lockTimeout: time.Minute * 60,
Expiry: 5 * time.Second,
RefreshPeriod: 1 * time.Second,
MaxRetryTimes: 3,
RetryInterval: 100 * time.Millisecond,
logger: &print{}, logger: &print{},
} }
} }
var opt *option var globalOpts []Option
func init() {
opt = defaultOption()
}
// 设置 // 设置
func InitOption(opts ...Option) { func InitOption(opts ...Option) {
for _, app := range opts { globalOpts = opts
app(opt)
}
} }
type Option func(*option) type Option func(*option)
func SetTimeout(t time.Duration) Option { func WithTimeout(t time.Duration) Option {
return func(o *option) { return func(o *option) {
o.lockTimeout = t o.lockTimeout = t
} }
} }
func SetLogger(logger Logger) Option { func WithLogger(logger Logger) Option {
return func(o *option) { return func(o *option) {
o.logger = logger o.logger = logger
} }
} }
func WithExpiry(expiry time.Duration) Option {
return func(o *option) {
o.Expiry = expiry
}
}
type Logger interface { type Logger interface {
Errorf(ctx context.Context, format string, v ...any) Errorf(ctx context.Context, format string, v ...any)
Printf(ctx context.Context, format string, v ...any) Infof(ctx context.Context, format string, v ...any)
} }
type print struct{} type print struct{}
@@ -56,6 +64,6 @@ func (*print) Errorf(ctx context.Context, format string, v ...any) {
log.Printf(format, v...) log.Printf(format, v...)
} }
func (*print) Printf(ctx context.Context, format string, v ...any) { func (*print) Infof(ctx context.Context, format string, v ...any) {
log.Printf(format, v...) log.Printf(format, v...)
} }