From cd0273f4b2297e88c231a5431881a397016f3617 Mon Sep 17 00:00:00 2001 From: Yun <995116474@qq.com> Date: Thu, 12 Dec 2024 11:30:24 +0800 Subject: [PATCH 1/8] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=94=81=E7=9A=84?= =?UTF-8?q?=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- locks.go | 28 ++++++++++++++++++++++++++++ locks_test.go | 34 ++++++++++++++++++++++++++++++++++ lockx.go | 2 ++ readme.md | 5 +++++ 4 files changed, 69 insertions(+) create mode 100644 locks.go create mode 100644 locks_test.go create mode 100644 readme.md diff --git a/locks.go b/locks.go new file mode 100644 index 0000000..3652e90 --- /dev/null +++ b/locks.go @@ -0,0 +1,28 @@ +package lockx + +import ( + "context" + "fmt" + + "github.com/go-redis/redis/v8" +) + +// 简单使用 + +var redisConn redis.UniversalClient + +// 初始化redis连接 +func Init(ctx context.Context, redis redis.UniversalClient, opts ...Option) error { + redisConn = redis + InitOption(opts...) + return nil +} + +// 新起一个锁对象 +// 先Init后New再Lock +func New(ctx context.Context, uniqueKey string) (*globalLock, error) { + if redisConn == nil { + return nil, fmt.Errorf("redis client is nil") + } + return NewGlobalLock(ctx, redisConn, uniqueKey), nil +} diff --git a/locks_test.go b/locks_test.go new file mode 100644 index 0000000..726c340 --- /dev/null +++ b/locks_test.go @@ -0,0 +1,34 @@ +package lockx_test + +import ( + "context" + "fmt" + "testing" + + "github.com/go-redis/redis/v8" + "github.com/yuninks/lockx" +) + +func TestSimpleLock(t *testing.T) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1" + ":" + "6379", + Password: "123456", // no password set + DB: 0, // use default DB + }) + if client == nil { + fmt.Println("redis init error") + return + } + lockx.Init(ctx, client) + + l, err := lockx.New(ctx, "lockx:test") + if err != nil { + t.Log(err) + return + } + if l.Lock() { + fmt.Println("lock success") + l.Unlock() + } +} diff --git a/lockx.go b/lockx.go index 5a0b0b6..4895024 100644 --- a/lockx.go +++ b/lockx.go @@ -17,6 +17,8 @@ type globalLock struct { value string } + + func NewGlobalLock(ctx context.Context, red redis.UniversalClient, uniqueKey string) *globalLock { ctx, cancel := context.WithTimeout(ctx, opt.lockTimeout) return &globalLock{ diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..1d809af --- /dev/null +++ b/readme.md @@ -0,0 +1,5 @@ +1.这个锁是基于redis的全局锁 + +2.在同一个new下边,多次加锁是可以重入的(因为值是一致) + +3.多次加锁只要执行了一次释放,这个锁将会被释放 From f4e1e3fdd15134bfa262856847f713110f6e041d Mon Sep 17 00:00:00 2001 From: Yun <995116474@qq.com> Date: Wed, 25 Jun 2025 16:44:44 +0800 Subject: [PATCH 2/8] =?UTF-8?q?=E5=94=AF=E4=B8=80=E7=9A=84=E5=80=BC?= =?UTF-8?q?=E6=94=B9=E4=B8=BAuuid?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 5 ++++- go.sum | 2 ++ lockx.go | 9 +++++---- lockx_test.go | 27 ++++++++++++++++++--------- 4 files changed, 29 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index de94bc4..1e002f6 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,10 @@ module github.com/yuninks/lockx go 1.20 -require github.com/go-redis/redis/v8 v8.11.5 +require ( + github.com/go-redis/redis/v8 v8.11.5 + github.com/google/uuid v1.6.0 +) require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect diff --git a/go.sum b/go.sum index 7342ff8..735ea23 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= diff --git a/lockx.go b/lockx.go index 4895024..7699355 100644 --- a/lockx.go +++ b/lockx.go @@ -2,10 +2,10 @@ package lockx import ( "context" - "fmt" "time" "github.com/go-redis/redis/v8" + "github.com/google/uuid" ) // 全局锁 @@ -17,16 +17,17 @@ type globalLock struct { value string } - - func NewGlobalLock(ctx context.Context, red redis.UniversalClient, uniqueKey string) *globalLock { ctx, cancel := context.WithTimeout(ctx, opt.lockTimeout) + + u, _ := uuid.NewV7() + return &globalLock{ redis: red, ctx: ctx, cancel: cancel, uniqueKey: uniqueKey, - value: fmt.Sprintf("%d", time.Now().UnixNano()), + value: u.String(), } } diff --git a/lockx_test.go b/lockx_test.go index fde321d..a223205 100644 --- a/lockx_test.go +++ b/lockx_test.go @@ -3,10 +3,11 @@ package lockx_test import ( "context" "fmt" + "sync" "testing" - "github.com/yuninks/lockx" "github.com/go-redis/redis/v8" + "github.com/yuninks/lockx" ) var Redis *redis.Client @@ -28,8 +29,8 @@ var Redis *redis.Client func TestLockx(t *testing.T) { client := redis.NewClient(&redis.Options{ Addr: "127.0.0.1" + ":" + "6379", - Password: "", // no password set - DB: 0, // use default DB + Password: "123456", // no password set + DB: 0, // use default DB }) if client == nil { fmt.Println("redis init error") @@ -40,13 +41,21 @@ func TestLockx(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - lock := lockx.NewGlobalLock(ctx, client, "lockx:test") + wg := sync.WaitGroup{} - if !lock.Lock() { - fmt.Println("lock error") + for i := 0; i < 10000; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + lock := lockx.NewGlobalLock(ctx, client, "lockx:test") + defer lock.Unlock() + if !lock.Lock() { + fmt.Println("lock error", i) + return + } + fmt.Println("ssss", i) + }(i) } - defer lock.Unlock() - - fmt.Println("ssss") + wg.Wait() } From f11fbd17039cbf64dd220e852ecadb2d199cce86 Mon Sep 17 00:00:00 2001 From: Yun Date: Wed, 17 Sep 2025 18:12:29 +0800 Subject: [PATCH 3/8] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=85=B3=E9=97=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lockx.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/lockx.go b/lockx.go index 7699355..0acda8a 100644 --- a/lockx.go +++ b/lockx.go @@ -31,6 +31,11 @@ func NewGlobalLock(ctx context.Context, red redis.UniversalClient, uniqueKey str } } +// 获取上下文 +func (l *globalLock) GetCtx() context.Context { + return l.ctx +} + // 获取锁 func (g *globalLock) Lock() bool { @@ -96,6 +101,7 @@ func (g *globalLock) refresh() { g.refreshExec() case <-g.ctx.Done(): t.Stop() + g.Unlock() return } } @@ -114,8 +120,12 @@ func (g *globalLock) refreshExec() bool { ` resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 5).Result() - if resp != "OK" { + if err != nil { opt.logger.Errorf(g.ctx, "global refresh err resp:%+v err:%+v uniKey:%+v value:%+v", resp, err, g.uniqueKey, g.value) + } + if resp == "ERROR" { + opt.logger.Errorf(g.ctx, "global refresh err resp:%+v err:%+v uniKey:%+v value:%+v", resp, err, g.uniqueKey, g.value) + g.Unlock() return false } return true From 4a131290d61047f74ae9c9cd36ce430854374860 Mon Sep 17 00:00:00 2001 From: Yun Date: Wed, 17 Sep 2025 19:12:55 +0800 Subject: [PATCH 4/8] =?UTF-8?q?=E5=AE=9A=E6=97=B6=E5=99=A8=E6=94=B9?= =?UTF-8?q?=E7=89=88=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- locks.go | 4 +- locks_test.go | 2 +- lockx.go | 222 ++++++++++++++++++++++++++++++++++---------------- lockx_test.go | 15 ++-- options.go | 40 +++++---- 5 files changed, 189 insertions(+), 94 deletions(-) diff --git a/locks.go b/locks.go index 3652e90..f34d019 100644 --- a/locks.go +++ b/locks.go @@ -20,9 +20,9 @@ func Init(ctx context.Context, redis redis.UniversalClient, opts ...Option) erro // 新起一个锁对象 // 先Init后New再Lock -func New(ctx context.Context, uniqueKey string) (*globalLock, error) { +func New(ctx context.Context, uniqueKey string) (*GlobalLock, error) { if redisConn == nil { return nil, fmt.Errorf("redis client is nil") } - return NewGlobalLock(ctx, redisConn, uniqueKey), nil + return NewGlobalLock(ctx, redisConn, uniqueKey, globalOpts...) } diff --git a/locks_test.go b/locks_test.go index 726c340..6c30feb 100644 --- a/locks_test.go +++ b/locks_test.go @@ -27,7 +27,7 @@ func TestSimpleLock(t *testing.T) { t.Log(err) return } - if l.Lock() { + if b, _ := l.Lock(); b { fmt.Println("lock success") l.Unlock() } diff --git a/lockx.go b/lockx.go index 0acda8a..090eab1 100644 --- a/lockx.go +++ b/lockx.go @@ -2,6 +2,8 @@ package lockx import ( "context" + "fmt" + "sync" "time" "github.com/go-redis/redis/v8" @@ -9,98 +11,158 @@ import ( ) // 全局锁 -type globalLock struct { - redis redis.UniversalClient - ctx context.Context - cancel context.CancelFunc - uniqueKey string - value string +type GlobalLock struct { + redis redis.UniversalClient + ctx context.Context + cancel context.CancelFunc + uniqueKey string + value string + isClosed bool + closeLock sync.RWMutex + options *option + stopRefresh chan struct{} + wg sync.WaitGroup } -func NewGlobalLock(ctx context.Context, red redis.UniversalClient, uniqueKey string) *globalLock { - ctx, cancel := context.WithTimeout(ctx, opt.lockTimeout) - - u, _ := uuid.NewV7() - - return &globalLock{ - redis: red, - ctx: ctx, - cancel: cancel, - uniqueKey: uniqueKey, - value: u.String(), +func NewGlobalLock(ctx context.Context, red redis.UniversalClient, uniqueKey string, opts ...Option) (*GlobalLock, error) { + options := defaultOption() + for _, opt := range opts { + opt(options) } + + ctx, cancel := context.WithTimeout(ctx, options.lockTimeout) + + u, err := uuid.NewV7() + if err != nil { + cancel() + return nil, fmt.Errorf("failed to generate UUID: %w", err) + } + + return &GlobalLock{ + redis: red, + ctx: ctx, + cancel: cancel, + uniqueKey: uniqueKey, + value: u.String(), + stopRefresh: make(chan struct{}), + options: options, + }, nil } // 获取上下文 -func (l *globalLock) GetCtx() context.Context { +func (l *GlobalLock) GetCtx() context.Context { return l.ctx } // 获取锁 -func (g *globalLock) Lock() bool { - +func (g *GlobalLock) Lock() (bool, error) { script := ` - local token = redis.call('get',KEYS[1]) - if token == false - then - return redis.call('set',KEYS[1],ARGV[1],'EX',ARGV[2]) + if redis.call('set', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2]) then + return 'OK' + else + local current_val = redis.call('get', KEYS[1]) + if current_val == ARGV[1] then + redis.call('expire', KEYS[1], ARGV[2]) + return 'OK' + else + return 'ERROR' + end end - return 'ERROR' ` - resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 5).Result() - if resp != "OK" { - opt.logger.Errorf(g.ctx, "global lock err resp:%+v err:%+v uniKey:%+v value:%+v", resp, err, g.uniqueKey, g.value) + resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, int(g.options.Expiry.Seconds())).Result() + if err != nil { + if g.options.logger != nil { + g.options.logger.Errorf(g.ctx, "global lock failed: %v, key: %s, value: %s", err, g.uniqueKey, g.value) + } + return false, err } + if resp == "OK" { - g.refresh() - return true + g.startRefresh() + return true, nil } - return false + + return false, nil } // 尝试获取锁 -func (g *globalLock) Try(limitTimes int) bool { - for i := 0; i < limitTimes; i++ { - if g.Lock() { - return true +func (g *GlobalLock) Try() (bool, error) { + for i := 0; i < g.options.MaxRetryTimes; i++ { + success, err := g.Lock() + if err != nil { + return false, err + } + if success { + return true, nil + } + + select { + case <-time.After(g.options.RetryInterval): + continue + case <-g.ctx.Done(): + return false, g.ctx.Err() } - time.Sleep(time.Millisecond * 100) } - return false + return false, nil } // 删除锁 -func (g *globalLock) Unlock() bool { +func (g *GlobalLock) Unlock() error { + g.closeLock.Lock() + defer g.closeLock.Unlock() + + if g.isClosed { + return nil + } + g.isClosed = true + + // 停止刷新goroutine + close(g.stopRefresh) + g.wg.Wait() script := ` - local token = redis.call('get',KEYS[1]) - if token == ARGV[1] - then - redis.call('del',KEYS[1]) - return 'OK' + if redis.call('get', KEYS[1]) == ARGV[1] then + return redis.call('del', KEYS[1]) + else + return 0 end - return 'ERROR' ` resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value).Result() - if resp != "OK" { - opt.logger.Errorf(g.ctx, "global Unlock err resp:%+v err:%+v uniKey:%+v value:%+v", resp, err, g.uniqueKey, g.value) + if err != nil { + if g.options.logger != nil { + g.options.logger.Infof(g.ctx, "global unlock may have failed: %v, key: %s, value: %s", err, g.uniqueKey, g.value) + } + // 即使删除失败也继续执行,因为锁可能会自动过期 } + g.cancel() - return true + + if delCount, ok := resp.(int64); ok && delCount == 1 { + return nil + } + return fmt.Errorf("lock was already released or owned by another client") } -// 刷新锁 -func (g *globalLock) refresh() { +// 启动刷新goroutine +func (g *GlobalLock) startRefresh() { + g.wg.Add(1) go func() { - t := time.NewTicker(time.Second) + defer g.wg.Done() + + ticker := time.NewTicker(g.options.RefreshPeriod) + defer ticker.Stop() + for { select { - case <-t.C: - g.refreshExec() + case <-ticker.C: + if !g.refreshExec() { + return + } + case <-g.stopRefresh: + return case <-g.ctx.Done(): - t.Stop() g.Unlock() return } @@ -108,25 +170,45 @@ func (g *globalLock) refresh() { }() } -func (g *globalLock) refreshExec() bool { - script := ` - local token = redis.call('get',KEYS[1]) - if token == ARGV[1] - then - redis.call('set',KEYS[1],ARGV[1],'EX',ARGV[2]) - return 'OK' - end - return 'ERROR' - ` +// 执行刷新操作 +func (g *GlobalLock) refreshExec() bool { + g.closeLock.RLock() + defer g.closeLock.RUnlock() - resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 5).Result() - if err != nil { - opt.logger.Errorf(g.ctx, "global refresh err resp:%+v err:%+v uniKey:%+v value:%+v", resp, err, g.uniqueKey, g.value) - } - if resp == "ERROR" { - opt.logger.Errorf(g.ctx, "global refresh err resp:%+v err:%+v uniKey:%+v value:%+v", resp, err, g.uniqueKey, g.value) - g.Unlock() + if g.isClosed { return false } + + script := ` + if redis.call('get', KEYS[1]) == ARGV[1] then + redis.call('expire', KEYS[1], ARGV[2]) + return 1 + else + return 0 + end + ` + + resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, int(g.options.Expiry.Seconds())).Result() + if err != nil { + if g.options.logger != nil { + g.options.logger.Errorf(g.ctx, "global refresh failed: %v, key: %s, value: %s", err, g.uniqueKey, g.value) + } + return false + } + + if refreshed, ok := resp.(int64); !ok || refreshed != 1 { + if g.options.logger != nil { + g.options.logger.Errorf(g.ctx, "global refresh failed, lock may be lost, key: %s, value: %s", g.uniqueKey, g.value) + } + return false + } + return true } + +// 检查锁是否已关闭 +func (g *GlobalLock) IsClosed() bool { + g.closeLock.RLock() + defer g.closeLock.RUnlock() + return g.isClosed +} diff --git a/lockx_test.go b/lockx_test.go index a223205..eb9927b 100644 --- a/lockx_test.go +++ b/lockx_test.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" "testing" + "time" "github.com/go-redis/redis/v8" "github.com/yuninks/lockx" @@ -43,18 +44,22 @@ func TestLockx(t *testing.T) { wg := sync.WaitGroup{} - for i := 0; i < 10000; i++ { + for i := 0; i < 20; i++ { wg.Add(1) go func(i int) { defer wg.Done() - lock := lockx.NewGlobalLock(ctx, client, "lockx:test") - defer lock.Unlock() - if !lock.Lock() { + lock, _ := lockx.NewGlobalLock(ctx, client, "lockx:test") + if b, _ := lock.Lock(); !b { fmt.Println("lock error", i) return } - fmt.Println("ssss", i) + defer lock.Unlock() + + fmt.Println("ssss2", i) + + time.Sleep(time.Second * 2) }(i) + time.Sleep(time.Second) } wg.Wait() diff --git a/options.go b/options.go index 2bf41f9..f9eeee5 100644 --- a/options.go +++ b/options.go @@ -7,47 +7,55 @@ import ( ) type option struct { - lockTimeout time.Duration // 锁的超时时间 - logger Logger // 日志 + lockTimeout time.Duration // 锁的超时时间 + Expiry time.Duration // 单次刷新有效时间 + MaxRetryTimes int // 尝试次数 + RetryInterval time.Duration // 尝试间隔 + RefreshPeriod time.Duration // 刷新间隔 + logger Logger // 日志 } func defaultOption() *option { return &option{ - lockTimeout: time.Minute * 60, - logger: &print{}, + lockTimeout: time.Minute * 60, + Expiry: 5 * time.Second, + RefreshPeriod: 1 * time.Second, + MaxRetryTimes: 3, + RetryInterval: 100 * time.Millisecond, + logger: &print{}, } } -var opt *option - -func init() { - opt = defaultOption() -} +var globalOpts []Option // 设置 func InitOption(opts ...Option) { - for _, app := range opts { - app(opt) - } + globalOpts = opts } type Option func(*option) -func SetTimeout(t time.Duration) Option { +func WithTimeout(t time.Duration) Option { return func(o *option) { o.lockTimeout = t } } -func SetLogger(logger Logger) Option { +func WithLogger(logger Logger) Option { return func(o *option) { o.logger = logger } } +func WithExpiry(expiry time.Duration) Option { + return func(o *option) { + o.Expiry = expiry + } +} + type Logger interface { Errorf(ctx context.Context, format string, v ...any) - Printf(ctx context.Context, format string, v ...any) + Infof(ctx context.Context, format string, v ...any) } type print struct{} @@ -56,6 +64,6 @@ func (*print) Errorf(ctx context.Context, format string, v ...any) { log.Printf(format, v...) } -func (*print) Printf(ctx context.Context, format string, v ...any) { +func (*print) Infof(ctx context.Context, format string, v ...any) { log.Printf(format, v...) } From 7ceb13d438543ba5c14ed2fa4761908fa866c1f5 Mon Sep 17 00:00:00 2001 From: Yun Date: Wed, 17 Sep 2025 19:30:18 +0800 Subject: [PATCH 5/8] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=8D=95=E5=85=83?= =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 5 + go.sum | 12 ++ lockx.go | 4 + lockx_test.go | 473 ++++++++++++++++++++++++++++++++++++++++++++++++++ options.go | 20 ++- 5 files changed, 513 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 1e002f6..88ad6c5 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,14 @@ go 1.20 require ( github.com/go-redis/redis/v8 v8.11.5 github.com/google/uuid v1.6.0 + github.com/stretchr/testify v1.11.1 ) require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 735ea23..13e4e4e 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= @@ -10,8 +12,18 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/lockx.go b/lockx.go index 090eab1..d98226c 100644 --- a/lockx.go +++ b/lockx.go @@ -212,3 +212,7 @@ func (g *GlobalLock) IsClosed() bool { defer g.closeLock.RUnlock() return g.isClosed } + +func (l *GlobalLock) GetValue() string { + return l.value +} diff --git a/lockx_test.go b/lockx_test.go index eb9927b..f69109d 100644 --- a/lockx_test.go +++ b/lockx_test.go @@ -2,12 +2,16 @@ package lockx_test import ( "context" + "errors" "fmt" "sync" "testing" "time" "github.com/go-redis/redis/v8" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/yuninks/lockx" ) @@ -64,3 +68,472 @@ func TestLockx(t *testing.T) { wg.Wait() } + +// MockLogger 用于测试的模拟日志器 +type MockLogger struct { + mock.Mock +} + +func (m *MockLogger) Errorf(ctx context.Context, format string, args ...interface{}) { + m.Called(ctx, format, args) +} + +func (m *MockLogger) Infof(ctx context.Context, format string, args ...interface{}) { + m.Called(ctx, format, args) +} + +// 测试工具函数 +func setupTestRedis(t *testing.T) (redis.UniversalClient, func()) { + + client := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1" + ":" + "6379", + Password: "123456", // no password set + DB: 0, // use default DB + }) + + return client, func() { + client.Close() + } +} + +func TestNewGlobalLock(t *testing.T) { + redisClient, teardown := setupTestRedis(t) + defer teardown() + + t.Run("成功创建锁", func(t *testing.T) { + ctx := context.Background() + lock, err := lockx.NewGlobalLock(ctx, redisClient, "test-key") + + require.NoError(t, err) + assert.NotEmpty(t, lock.GetValue()) + assert.False(t, lock.IsClosed()) + }) + + t.Run("创建锁时UUID生成失败", func(t *testing.T) { + // 这个测试需要模拟uuid.NewV7()失败,在实际中较难触发 + // 通常可以跳过或者使用mock来测试 + t.Skip("很难模拟UUID生成失败的情况") + }) +} + +func TestGlobalLock_Lock(t *testing.T) { + redisClient, teardown := setupTestRedis(t) + defer teardown() + + ctx := context.Background() + + t.Run("成功获取锁", func(t *testing.T) { + lock, err := lockx.NewGlobalLock(ctx, redisClient, "test-key-success") + require.NoError(t, err) + + success, err := lock.Lock() + require.NoError(t, err) + assert.True(t, success) + + defer lock.Unlock() + }) + + t.Run("获取已存在的锁失败", func(t *testing.T) { + // 第一个客户端获取锁 + lock1, err := lockx.NewGlobalLock(ctx, redisClient, "test-key-conflict") + require.NoError(t, err) + + success1, err := lock1.Lock() + require.NoError(t, err) + assert.True(t, success1) + + defer lock1.Unlock() + + // 第二个客户端尝试获取同一个锁 + lock2, err := lockx.NewGlobalLock(ctx, redisClient, "test-key-conflict") + require.NoError(t, err) + + success2, err := lock2.Lock() + require.NoError(t, err) + assert.False(t, success2) + + defer lock2.Unlock() + }) + + t.Run("Redis连接失败", func(t *testing.T) { + // 创建无效的Redis客户端 + invalidClient := redis.NewClient(&redis.Options{ + Addr: "invalid:6379", + }) + + _, err := lockx.NewGlobalLock(ctx, invalidClient, "test-key-fail") + require.NoError(t, err) + + // 设置短超时 + ctxShort, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + defer cancel() + + lockWithCtx, err := lockx.NewGlobalLock(ctxShort, invalidClient, "test-key-fail") + require.NoError(t, err) + + success, err := lockWithCtx.Lock() + assert.Error(t, err) + assert.False(t, success) + }) +} + +func TestGlobalLock_Try(t *testing.T) { + redisClient, teardown := setupTestRedis(t) + defer teardown() + + ctx := context.Background() + + t.Run("尝试获取可用锁成功", func(t *testing.T) { + _, err := lockx.NewGlobalLock(ctx, redisClient, "test-try-success") + require.NoError(t, err) + + // 使用自定义选项 + customLock, err := lockx.NewGlobalLock(ctx, redisClient, "test-try-custom", + lockx.WithMaxRetryTimes(2), + lockx.WithRetryInterval(50*time.Millisecond), + ) + require.NoError(t, err) + + success, err := customLock.Try() + require.NoError(t, err) + assert.True(t, success) + + defer customLock.Unlock() + }) + + t.Run("尝试获取被占用的锁失败", func(t *testing.T) { + // 先获取锁 + lock1, err := lockx.NewGlobalLock(ctx, redisClient, "test-try-busy") + require.NoError(t, err) + + success1, err := lock1.Lock() + require.NoError(t, err) + assert.True(t, success1) + + // 另一个客户端尝试获取 + lock2, err := lockx.NewGlobalLock(ctx, redisClient, "test-try-busy", + lockx.WithMaxRetryTimes(2), + lockx.WithRetryInterval(10*time.Millisecond), + ) + require.NoError(t, err) + + success2, err := lock2.Try() + require.NoError(t, err) + assert.False(t, success2) + + lock1.Unlock() + lock2.Unlock() + }) + + t.Run("上下文取消", func(t *testing.T) { + ctxCancel, cancel := context.WithCancel(ctx) + lock, err := lockx.NewGlobalLock(ctxCancel, redisClient, "test-try-cancel") + require.NoError(t, err) + + cancel() // 立即取消 + + success, err := lock.Try() + assert.True(t, errors.Is(err, context.Canceled)) + assert.False(t, success) + }) +} + +func TestGlobalLock_Unlock(t *testing.T) { + redisClient, teardown := setupTestRedis(t) + defer teardown() + + ctx := context.Background() + + t.Run("成功释放锁", func(t *testing.T) { + lock, err := lockx.NewGlobalLock(ctx, redisClient, "test-unlock-success") + require.NoError(t, err) + + success, err := lock.Lock() + require.NoError(t, err) + assert.True(t, success) + + err = lock.Unlock() + assert.NoError(t, err) + assert.True(t, lock.IsClosed()) + + // 验证锁确实被释放了 + val, err := redisClient.Get(ctx, "test-unlock-success").Result() + assert.Equal(t, redis.Nil, err) + assert.Empty(t, val) + }) + + t.Run("重复释放锁", func(t *testing.T) { + lock, err := lockx.NewGlobalLock(ctx, redisClient, "test-unlock-twice") + require.NoError(t, err) + + success, err := lock.Lock() + require.NoError(t, err) + assert.True(t, success) + + err = lock.Unlock() + assert.NoError(t, err) + + // 再次释放应该不会报错 + err = lock.Unlock() + assert.NoError(t, err) + }) + + t.Run("释放其他客户端的锁", func(t *testing.T) { + lock1, err := lockx.NewGlobalLock(ctx, redisClient, "test-unlock-other") + require.NoError(t, err) + + success, err := lock1.Lock() + require.NoError(t, err) + assert.True(t, success) + + // 另一个客户端尝试释放(使用不同的value) + lock2, err := lockx.NewGlobalLock(ctx, redisClient, "test-unlock-other") + require.NoError(t, err) + + err = lock2.Unlock() + assert.Error(t, err) + assert.Contains(t, err.Error(), "lock was already released or owned by another client") + + lock1.Unlock() + lock2.Unlock() + }) +} + +func TestGlobalLock_Refresh(t *testing.T) { + redisClient, teardown := setupTestRedis(t) + defer teardown() + + ctx := context.Background() + + t.Run("自动刷新保持锁", func(t *testing.T) { + lock, err := lockx.NewGlobalLock(ctx, redisClient, "test-refresh", + lockx.WithExpiry(2*time.Second), + lockx.WithRefreshPeriod(500*time.Millisecond), + ) + require.NoError(t, err) + + success, err := lock.Lock() + require.NoError(t, err) + assert.True(t, success) + + // 等待一段时间,确保刷新机制工作 + time.Sleep(3 * time.Second) + + // 检查锁仍然存在 + val, err := redisClient.Get(ctx, "test-refresh").Result() + require.NoError(t, err) + assert.Equal(t, lock.GetValue(), val) + + // 检查TTL应该还在 + ttl, err := redisClient.TTL(ctx, "test-refresh").Result() + require.NoError(t, err) + assert.True(t, ttl > 0 && ttl <= 2*time.Second) + + lock.Unlock() + }) + + t.Run("锁丢失时停止刷新", func(t *testing.T) { + mockLogger := new(MockLogger) + mockLogger.On("Errorf", mock.Anything, mock.Anything, mock.Anything).Return() + + lock, err := lockx.NewGlobalLock(ctx, redisClient, "test-refresh-lost", + lockx.WithLogger(mockLogger), + lockx.WithExpiry(1*time.Second), + ) + require.NoError(t, err) + + success, err := lock.Lock() + require.NoError(t, err) + assert.True(t, success) + + // 手动删除锁来模拟锁丢失 + err = redisClient.Del(ctx, "test-refresh-lost").Err() + require.NoError(t, err) + + // 等待刷新周期 + time.Sleep(1500 * time.Millisecond) + + // 验证日志被调用 + mockLogger.AssertCalled(t, "Errorf", mock.Anything, mock.Anything, mock.Anything) + + lock.Unlock() + }) +} + +func TestGlobalLock_Concurrency(t *testing.T) { + redisClient, teardown := setupTestRedis(t) + defer teardown() + + ctx := context.Background() + + t.Run("多 goroutine 竞争锁", func(t *testing.T) { + const numGoroutines = 10 + var successCount int + var wg sync.WaitGroup + var mu sync.Mutex + + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer wg.Done() + + lock, err := lockx.NewGlobalLock(ctx, redisClient, "concurrent-test") + require.NoError(t, err) + + success, err := lock.Lock() + require.NoError(t, err) + + if success { + mu.Lock() + successCount++ + mu.Unlock() + + // 持有锁一段时间 + time.Sleep(100 * time.Millisecond) + lock.Unlock() + } + }(i) + } + + wg.Wait() + + // 应该只有一个goroutine成功获取锁 + assert.Equal(t, 1, successCount) + }) +} + +func TestGlobalLock_ContextCancellation(t *testing.T) { + redisClient, teardown := setupTestRedis(t) + defer teardown() + + t.Run("上下文超时", func(t *testing.T) { + ctxShort, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + lock, err := lockx.NewGlobalLock(ctxShort, redisClient, "test-context-timeout") + require.NoError(t, err) + + // 等待上下文超时 + time.Sleep(200 * time.Millisecond) + + // 尝试操作应该失败 + success, err := lock.Lock() + assert.Error(t, err) + assert.False(t, success) + }) + + t.Run("上下文取消时自动释放锁", func(t *testing.T) { + ctxCancel, cancel := context.WithCancel(context.Background()) + lock, err := lockx.NewGlobalLock(ctxCancel, redisClient, "test-context-cancel") + require.NoError(t, err) + + success, err := lock.Lock() + require.NoError(t, err) + assert.True(t, success) + + // 取消上下文 + cancel() + + // 等待自动清理 + time.Sleep(5 * time.Second) + + // 检查锁应该被释放 + val, err := redisClient.Get(context.Background(), "test-context-cancel").Result() + assert.Equal(t, redis.Nil, err) + assert.Empty(t, val) + }) +} + +func TestGlobalLock_CustomOptions(t *testing.T) { + redisClient, teardown := setupTestRedis(t) + defer teardown() + + ctx := context.Background() + + t.Run("自定义配置选项", func(t *testing.T) { + mockLogger := new(MockLogger) + + lock, err := lockx.NewGlobalLock(ctx, redisClient, "test-custom-options", + lockx.WithLockTimeout(5*time.Second), + lockx.WithExpiry(10*time.Second), + lockx.WithRefreshPeriod(2*time.Second), + lockx.WithMaxRetryTimes(5), + lockx.WithRetryInterval(200*time.Millisecond), + lockx.WithLogger(mockLogger), + ) + require.NoError(t, err) + + success, err := lock.Lock() + require.NoError(t, err) + assert.True(t, success) + + // 检查TTL是否正确设置 + ttl, err := redisClient.TTL(ctx, "test-custom-options").Result() + require.NoError(t, err) + assert.True(t, ttl > 8*time.Second && ttl <= 10*time.Second) + + lock.Unlock() + }) +} + +func TestGlobalLock_EdgeCases(t *testing.T) { + redisClient, teardown := setupTestRedis(t) + defer teardown() + + ctx := context.Background() + + t.Run("空键名", func(t *testing.T) { + lock, err := lockx.NewGlobalLock(ctx, redisClient, "") + require.NoError(t, err) + + success, err := lock.Lock() + require.NoError(t, err) + assert.True(t, success) // Redis允许空键名 + + lock.Unlock() + }) + + t.Run("非常长的键名", func(t *testing.T) { + longKey := string(make([]byte, 1000)) // 很长的键名 + lock, err := lockx.NewGlobalLock(ctx, redisClient, longKey) + require.NoError(t, err) + + success, err := lock.Lock() + require.NoError(t, err) + assert.True(t, success) + + lock.Unlock() + }) +} + +// 基准测试 +func BenchmarkGlobalLock(b *testing.B) { + t := testing.T{} + redisClient, teardown := setupTestRedis(&t) + defer teardown() + + ctx := context.Background() + + b.Run("获取释放锁", func(b *testing.B) { + for i := 0; i < b.N; i++ { + lock, err := lockx.NewGlobalLock(ctx, redisClient, fmt.Sprintf("benchmark-%d", i)) + if err != nil { + b.Fatal(err) + } + + success, err := lock.Lock() + if err != nil { + b.Fatal(err) + } + if !success { + b.Fatal("Failed to acquire lock") + } + + err = lock.Unlock() + if err != nil { + b.Fatal(err) + } + } + }) +} diff --git a/options.go b/options.go index f9eeee5..e39d43a 100644 --- a/options.go +++ b/options.go @@ -35,7 +35,7 @@ func InitOption(opts ...Option) { type Option func(*option) -func WithTimeout(t time.Duration) Option { +func WithLockTimeout(t time.Duration) Option { return func(o *option) { o.lockTimeout = t } @@ -53,6 +53,24 @@ func WithExpiry(expiry time.Duration) Option { } } +func WithRefreshPeriod(period time.Duration) Option { + return func(o *option) { + o.RefreshPeriod = period + } +} + +func WithMaxRetryTimes(times int) Option { + return func(o *option) { + o.MaxRetryTimes = times + } +} + +func WithRetryInterval(interval time.Duration) Option { + return func(o *option) { + o.RetryInterval = interval + } +} + type Logger interface { Errorf(ctx context.Context, format string, v ...any) Infof(ctx context.Context, format string, v ...any) From 0a8589d7b6b3212ffb78b166c161f0298addeee9 Mon Sep 17 00:00:00 2001 From: Yun Date: Wed, 17 Sep 2025 19:41:47 +0800 Subject: [PATCH 6/8] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=B8=80=E4=BA=9Boption?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lockx_test.go | 64 +++++++++++++++++++++++++-------------------------- options.go | 21 ++++++++++++++--- 2 files changed, 50 insertions(+), 35 deletions(-) diff --git a/lockx_test.go b/lockx_test.go index f69109d..7731d40 100644 --- a/lockx_test.go +++ b/lockx_test.go @@ -31,43 +31,43 @@ var Redis *redis.Client // Redis = client // } -func TestLockx(t *testing.T) { - client := redis.NewClient(&redis.Options{ - Addr: "127.0.0.1" + ":" + "6379", - Password: "123456", // no password set - DB: 0, // use default DB - }) - if client == nil { - fmt.Println("redis init error") - return - } - fmt.Println("begin") - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - defer cancel() +// func TestLockx(t *testing.T) { +// client := redis.NewClient(&redis.Options{ +// Addr: "127.0.0.1" + ":" + "6379", +// Password: "123456", // no password set +// DB: 0, // use default DB +// }) +// if client == nil { +// fmt.Println("redis init error") +// return +// } +// fmt.Println("begin") +// ctx := context.Background() +// ctx, cancel := context.WithCancel(ctx) +// defer cancel() - wg := sync.WaitGroup{} +// wg := sync.WaitGroup{} - for i := 0; i < 20; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - lock, _ := lockx.NewGlobalLock(ctx, client, "lockx:test") - if b, _ := lock.Lock(); !b { - fmt.Println("lock error", i) - return - } - defer lock.Unlock() +// for i := 0; i < 20; i++ { +// wg.Add(1) +// go func(i int) { +// defer wg.Done() +// lock, _ := lockx.NewGlobalLock(ctx, client, "lockx:test") +// if b, _ := lock.Lock(); !b { +// fmt.Println("lock error", i) +// return +// } +// defer lock.Unlock() - fmt.Println("ssss2", i) +// fmt.Println("ssss2", i) - time.Sleep(time.Second * 2) - }(i) - time.Sleep(time.Second) - } +// time.Sleep(time.Second * 2) +// }(i) +// time.Sleep(time.Second) +// } - wg.Wait() -} +// wg.Wait() +// } // MockLogger 用于测试的模拟日志器 type MockLogger struct { diff --git a/options.go b/options.go index e39d43a..6d71419 100644 --- a/options.go +++ b/options.go @@ -7,12 +7,15 @@ import ( ) type option struct { - lockTimeout time.Duration // 锁的超时时间 - Expiry time.Duration // 单次刷新有效时间 + lockTimeout time.Duration // 锁的超时时间 + MaxRetryTimes int // 尝试次数 RetryInterval time.Duration // 尝试间隔 + + Expiry time.Duration // 单次刷新有效时间 RefreshPeriod time.Duration // 刷新间隔 - logger Logger // 日志 + + logger Logger // 日志 } func defaultOption() *option { @@ -35,36 +38,48 @@ func InitOption(opts ...Option) { type Option func(*option) +// 最大锁定时间(默认1h) func WithLockTimeout(t time.Duration) Option { return func(o *option) { o.lockTimeout = t } } +// 日志 func WithLogger(logger Logger) Option { return func(o *option) { o.logger = logger } } +// key有效时间(会自动刷新) func WithExpiry(expiry time.Duration) Option { return func(o *option) { o.Expiry = expiry + if o.Expiry/3 < o.RefreshPeriod { + o.RefreshPeriod = o.Expiry / 3 + } } } +// 刷新间隔 func WithRefreshPeriod(period time.Duration) Option { return func(o *option) { o.RefreshPeriod = period + if o.RefreshPeriod*3 > o.Expiry { + o.Expiry = o.RefreshPeriod * 3 + } } } +// 最大尝试次数 func WithMaxRetryTimes(times int) Option { return func(o *option) { o.MaxRetryTimes = times } } +// 尝试间隔 func WithRetryInterval(interval time.Duration) Option { return func(o *option) { o.RetryInterval = interval From f34e04f2357835f84992e27b63dfecf979dd2b4f Mon Sep 17 00:00:00 2001 From: Yun Date: Thu, 18 Sep 2025 21:15:12 +0800 Subject: [PATCH 7/8] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=83=A8=E5=88=86?= =?UTF-8?q?=E4=BD=BF=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 2 +- go.sum | 9 +++++ lockx.go | 100 ++++++++++++++++++++++++++++++-------------------- lockx_test.go | 61 ++++++++++++++++++++++++++---- 4 files changed, 123 insertions(+), 49 deletions(-) diff --git a/go.mod b/go.mod index 88ad6c5..56be34e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/yuninks/lockx -go 1.20 +go 1.24 require ( github.com/go-redis/redis/v8 v8.11.5 diff --git a/go.sum b/go.sum index 13e4e4e..165699c 100644 --- a/go.sum +++ b/go.sum @@ -5,13 +5,17 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= @@ -19,11 +23,16 @@ github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/lockx.go b/lockx.go index d98226c..a55ef46 100644 --- a/lockx.go +++ b/lockx.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/go-redis/redis/v8" @@ -12,19 +13,25 @@ import ( // 全局锁 type GlobalLock struct { - redis redis.UniversalClient - ctx context.Context - cancel context.CancelFunc - uniqueKey string - value string - isClosed bool - closeLock sync.RWMutex - options *option - stopRefresh chan struct{} - wg sync.WaitGroup + redis redis.UniversalClient + ctx context.Context + cancel context.CancelFunc + uniqueKey string + value string + isClosed bool + closeLock sync.RWMutex + options *option + refreshErrCount int64 // 刷新错误次数 } func NewGlobalLock(ctx context.Context, red redis.UniversalClient, uniqueKey string, opts ...Option) (*GlobalLock, error) { + if uniqueKey == "" { + return nil, fmt.Errorf("uniqueKey is empty") + } + if red == nil { + return nil, fmt.Errorf("redis is nil") + } + options := defaultOption() for _, opt := range opts { opt(options) @@ -39,13 +46,12 @@ func NewGlobalLock(ctx context.Context, red redis.UniversalClient, uniqueKey str } return &GlobalLock{ - redis: red, - ctx: ctx, - cancel: cancel, - uniqueKey: uniqueKey, - value: u.String(), - stopRefresh: make(chan struct{}), - options: options, + redis: red, + ctx: ctx, + cancel: cancel, + uniqueKey: uniqueKey, + value: u.String(), + options: options, }, nil } @@ -109,17 +115,11 @@ func (g *GlobalLock) Try() (bool, error) { // 删除锁 func (g *GlobalLock) Unlock() error { - g.closeLock.Lock() - defer g.closeLock.Unlock() - - if g.isClosed { + // 已经关闭就不需要重复关闭 + if g.setOrGetClose() { + g.options.logger.Infof(g.ctx, "global lock already closed, key: %s, value: %s", g.uniqueKey, g.value) return nil } - g.isClosed = true - - // 停止刷新goroutine - close(g.stopRefresh) - g.wg.Wait() script := ` if redis.call('get', KEYS[1]) == ARGV[1] then @@ -129,7 +129,10 @@ func (g *GlobalLock) Unlock() error { end ` - resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value).Result() + // 避免上游已取消执行不了 + ctx := context.WithoutCancel(g.ctx) + + resp, err := g.redis.Eval(ctx, script, []string{g.uniqueKey}, g.value).Result() if err != nil { if g.options.logger != nil { g.options.logger.Infof(g.ctx, "global unlock may have failed: %v, key: %s, value: %s", err, g.uniqueKey, g.value) @@ -137,8 +140,6 @@ func (g *GlobalLock) Unlock() error { // 即使删除失败也继续执行,因为锁可能会自动过期 } - g.cancel() - if delCount, ok := resp.(int64); ok && delCount == 1 { return nil } @@ -147,9 +148,8 @@ func (g *GlobalLock) Unlock() error { // 启动刷新goroutine func (g *GlobalLock) startRefresh() { - g.wg.Add(1) + go func() { - defer g.wg.Done() ticker := time.NewTicker(g.options.RefreshPeriod) defer ticker.Stop() @@ -157,12 +157,9 @@ func (g *GlobalLock) startRefresh() { for { select { case <-ticker.C: - if !g.refreshExec() { - return - } - case <-g.stopRefresh: - return + g.refreshExec() case <-g.ctx.Done(): + g.options.logger.Infof(g.ctx, "global lock refresh canceled, key: %s, value: %s", g.uniqueKey, g.value) g.Unlock() return } @@ -170,12 +167,26 @@ func (g *GlobalLock) startRefresh() { }() } +// 返回原来的 +func (l *GlobalLock) setOrGetClose() (readyClosed bool) { + l.closeLock.Lock() + defer l.closeLock.Unlock() + + if l.isClosed { + return true + } + + l.isClosed = true + + l.cancel() + + return false +} + // 执行刷新操作 func (g *GlobalLock) refreshExec() bool { - g.closeLock.RLock() - defer g.closeLock.RUnlock() - - if g.isClosed { + if g.IsClosed() { + g.options.logger.Infof(g.ctx, "global lock already closed, key: %s, value: %s", g.uniqueKey, g.value) return false } @@ -193,6 +204,13 @@ func (g *GlobalLock) refreshExec() bool { if g.options.logger != nil { g.options.logger.Errorf(g.ctx, "global refresh failed: %v, key: %s, value: %s", err, g.uniqueKey, g.value) } + + newCount := atomic.AddInt64(&g.refreshErrCount, 1) + if newCount >= 3 { + // 关闭锁 + g.setOrGetClose() + g.options.logger.Errorf(g.ctx, "global refresh failed, lock may be lost 3 times, key: %s, value: %s", g.uniqueKey, g.value) + } return false } @@ -200,6 +218,8 @@ func (g *GlobalLock) refreshExec() bool { if g.options.logger != nil { g.options.logger.Errorf(g.ctx, "global refresh failed, lock may be lost, key: %s, value: %s", g.uniqueKey, g.value) } + // 关闭锁 + g.setOrGetClose() return false } diff --git a/lockx_test.go b/lockx_test.go index 7731d40..4c5474c 100644 --- a/lockx_test.go +++ b/lockx_test.go @@ -235,6 +235,9 @@ func TestGlobalLock_Try(t *testing.T) { success, err := lock.Try() assert.True(t, errors.Is(err, context.Canceled)) assert.False(t, success) + + err = lock.Unlock() + assert.Error(t, err) }) } @@ -335,6 +338,7 @@ func TestGlobalLock_Refresh(t *testing.T) { t.Run("锁丢失时停止刷新", func(t *testing.T) { mockLogger := new(MockLogger) mockLogger.On("Errorf", mock.Anything, mock.Anything, mock.Anything).Return() + mockLogger.On("Infof", mock.Anything, mock.Anything, mock.Anything).Return() lock, err := lockx.NewGlobalLock(ctx, redisClient, "test-refresh-lost", lockx.WithLogger(mockLogger), @@ -356,6 +360,8 @@ func TestGlobalLock_Refresh(t *testing.T) { // 验证日志被调用 mockLogger.AssertCalled(t, "Errorf", mock.Anything, mock.Anything, mock.Anything) + // <-lock.GetCtx().Done() + lock.Unlock() }) } @@ -367,7 +373,7 @@ func TestGlobalLock_Concurrency(t *testing.T) { ctx := context.Background() t.Run("多 goroutine 竞争锁", func(t *testing.T) { - const numGoroutines = 10 + const numGoroutines = 100 var successCount int var wg sync.WaitGroup var mu sync.Mutex @@ -435,6 +441,8 @@ func TestGlobalLock_ContextCancellation(t *testing.T) { // 取消上下文 cancel() + // lock.Unlock() + // 等待自动清理 time.Sleep(5 * time.Second) @@ -453,6 +461,8 @@ func TestGlobalLock_CustomOptions(t *testing.T) { t.Run("自定义配置选项", func(t *testing.T) { mockLogger := new(MockLogger) + mockLogger.On("Errorf", mock.Anything, mock.Anything, mock.Anything).Return() + mockLogger.On("Infof", mock.Anything, mock.Anything, mock.Anything).Return() lock, err := lockx.NewGlobalLock(ctx, redisClient, "test-custom-options", lockx.WithLockTimeout(5*time.Second), @@ -484,14 +494,9 @@ func TestGlobalLock_EdgeCases(t *testing.T) { ctx := context.Background() t.Run("空键名", func(t *testing.T) { - lock, err := lockx.NewGlobalLock(ctx, redisClient, "") - require.NoError(t, err) + _, err := lockx.NewGlobalLock(ctx, redisClient, "") + require.Error(t, err) - success, err := lock.Lock() - require.NoError(t, err) - assert.True(t, success) // Redis允许空键名 - - lock.Unlock() }) t.Run("非常长的键名", func(t *testing.T) { @@ -537,3 +542,43 @@ func BenchmarkGlobalLock(b *testing.B) { } }) } + +func TestTimeout(t *testing.T) { + redisClient, teardown := setupTestRedis(t) + defer teardown() + + ctx := context.Background() + + begin := time.Now() + + lock, err := lockx.NewGlobalLock(ctx, redisClient, "test-timeout", lockx.WithLockTimeout(time.Second)) + require.NoError(t, err) + success, err := lock.Lock() + require.NoError(t, err) + assert.True(t, success) + + <-lock.GetCtx().Done() + t.Log("Done", time.Since(begin)) + +} + +func TestRedisClose(t *testing.T) { + redisClient, teardown := setupTestRedis(t) + defer teardown() + + ctx := context.Background() + + begin := time.Now() + + lock, err := lockx.NewGlobalLock(ctx, redisClient, "test-timeout", lockx.WithLockTimeout(time.Hour)) + require.NoError(t, err) + success, err := lock.Lock() + require.NoError(t, err) + assert.True(t, success) + + redisClient.Close() + + <-lock.GetCtx().Done() + t.Log("Done", time.Since(begin)) + +} From 5a51122289d2ee6e3428a3d879e2afeb14e3e5ab Mon Sep 17 00:00:00 2001 From: Yun Date: Fri, 19 Sep 2025 17:49:49 +0800 Subject: [PATCH 8/8] =?UTF-8?q?=E5=8E=BB=E6=8E=89=E9=83=A8=E5=88=86?= =?UTF-8?q?=E8=B0=83=E8=AF=95=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lockx.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lockx.go b/lockx.go index a55ef46..07c15cb 100644 --- a/lockx.go +++ b/lockx.go @@ -117,7 +117,7 @@ func (g *GlobalLock) Try() (bool, error) { func (g *GlobalLock) Unlock() error { // 已经关闭就不需要重复关闭 if g.setOrGetClose() { - g.options.logger.Infof(g.ctx, "global lock already closed, key: %s, value: %s", g.uniqueKey, g.value) + // g.options.logger.Infof(g.ctx, "global lock already closed, key: %s, value: %s", g.uniqueKey, g.value) return nil } @@ -143,6 +143,7 @@ func (g *GlobalLock) Unlock() error { if delCount, ok := resp.(int64); ok && delCount == 1 { return nil } + g.options.logger.Infof(g.ctx, "global unlock may have failed: %v, key: %s, value: %s", err, g.uniqueKey, g.value) return fmt.Errorf("lock was already released or owned by another client") } @@ -159,7 +160,7 @@ func (g *GlobalLock) startRefresh() { case <-ticker.C: g.refreshExec() case <-g.ctx.Done(): - g.options.logger.Infof(g.ctx, "global lock refresh canceled, key: %s, value: %s", g.uniqueKey, g.value) + // g.options.logger.Infof(g.ctx, "global lock refresh canceled, key: %s, value: %s", g.uniqueKey, g.value) g.Unlock() return }