diff --git a/go.mod b/go.mod index 88ad6c5..56be34e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/yuninks/lockx -go 1.20 +go 1.24 require ( github.com/go-redis/redis/v8 v8.11.5 diff --git a/go.sum b/go.sum index 13e4e4e..165699c 100644 --- a/go.sum +++ b/go.sum @@ -5,13 +5,17 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= @@ -19,11 +23,16 @@ github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/lockx.go b/lockx.go index d98226c..a55ef46 100644 --- a/lockx.go +++ b/lockx.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/go-redis/redis/v8" @@ -12,19 +13,25 @@ import ( // 全局锁 type GlobalLock struct { - redis redis.UniversalClient - ctx context.Context - cancel context.CancelFunc - uniqueKey string - value string - isClosed bool - closeLock sync.RWMutex - options *option - stopRefresh chan struct{} - wg sync.WaitGroup + redis redis.UniversalClient + ctx context.Context + cancel context.CancelFunc + uniqueKey string + value string + isClosed bool + closeLock sync.RWMutex + options *option + refreshErrCount int64 // 刷新错误次数 } func NewGlobalLock(ctx context.Context, red redis.UniversalClient, uniqueKey string, opts ...Option) (*GlobalLock, error) { + if uniqueKey == "" { + return nil, fmt.Errorf("uniqueKey is empty") + } + if red == nil { + return nil, fmt.Errorf("redis is nil") + } + options := defaultOption() for _, opt := range opts { opt(options) @@ -39,13 +46,12 @@ func NewGlobalLock(ctx context.Context, red redis.UniversalClient, uniqueKey str } return &GlobalLock{ - redis: red, - ctx: ctx, - cancel: cancel, - uniqueKey: uniqueKey, - value: u.String(), - stopRefresh: make(chan struct{}), - options: options, + redis: red, + ctx: ctx, + cancel: cancel, + uniqueKey: uniqueKey, + value: u.String(), + options: options, }, nil } @@ -109,17 +115,11 @@ func (g *GlobalLock) Try() (bool, error) { // 删除锁 func (g *GlobalLock) Unlock() error { - g.closeLock.Lock() - defer g.closeLock.Unlock() - - if g.isClosed { + // 已经关闭就不需要重复关闭 + if g.setOrGetClose() { + g.options.logger.Infof(g.ctx, "global lock already closed, key: %s, value: %s", g.uniqueKey, g.value) return nil } - g.isClosed = true - - // 停止刷新goroutine - close(g.stopRefresh) - g.wg.Wait() script := ` if redis.call('get', KEYS[1]) == ARGV[1] then @@ -129,7 +129,10 @@ func (g *GlobalLock) Unlock() error { end ` - resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value).Result() + // 避免上游已取消执行不了 + ctx := context.WithoutCancel(g.ctx) + + resp, err := g.redis.Eval(ctx, script, []string{g.uniqueKey}, g.value).Result() if err != nil { if g.options.logger != nil { g.options.logger.Infof(g.ctx, "global unlock may have failed: %v, key: %s, value: %s", err, g.uniqueKey, g.value) @@ -137,8 +140,6 @@ func (g *GlobalLock) Unlock() error { // 即使删除失败也继续执行,因为锁可能会自动过期 } - g.cancel() - if delCount, ok := resp.(int64); ok && delCount == 1 { return nil } @@ -147,9 +148,8 @@ func (g *GlobalLock) Unlock() error { // 启动刷新goroutine func (g *GlobalLock) startRefresh() { - g.wg.Add(1) + go func() { - defer g.wg.Done() ticker := time.NewTicker(g.options.RefreshPeriod) defer ticker.Stop() @@ -157,12 +157,9 @@ func (g *GlobalLock) startRefresh() { for { select { case <-ticker.C: - if !g.refreshExec() { - return - } - case <-g.stopRefresh: - return + g.refreshExec() case <-g.ctx.Done(): + g.options.logger.Infof(g.ctx, "global lock refresh canceled, key: %s, value: %s", g.uniqueKey, g.value) g.Unlock() return } @@ -170,12 +167,26 @@ func (g *GlobalLock) startRefresh() { }() } +// 返回原来的 +func (l *GlobalLock) setOrGetClose() (readyClosed bool) { + l.closeLock.Lock() + defer l.closeLock.Unlock() + + if l.isClosed { + return true + } + + l.isClosed = true + + l.cancel() + + return false +} + // 执行刷新操作 func (g *GlobalLock) refreshExec() bool { - g.closeLock.RLock() - defer g.closeLock.RUnlock() - - if g.isClosed { + if g.IsClosed() { + g.options.logger.Infof(g.ctx, "global lock already closed, key: %s, value: %s", g.uniqueKey, g.value) return false } @@ -193,6 +204,13 @@ func (g *GlobalLock) refreshExec() bool { if g.options.logger != nil { g.options.logger.Errorf(g.ctx, "global refresh failed: %v, key: %s, value: %s", err, g.uniqueKey, g.value) } + + newCount := atomic.AddInt64(&g.refreshErrCount, 1) + if newCount >= 3 { + // 关闭锁 + g.setOrGetClose() + g.options.logger.Errorf(g.ctx, "global refresh failed, lock may be lost 3 times, key: %s, value: %s", g.uniqueKey, g.value) + } return false } @@ -200,6 +218,8 @@ func (g *GlobalLock) refreshExec() bool { if g.options.logger != nil { g.options.logger.Errorf(g.ctx, "global refresh failed, lock may be lost, key: %s, value: %s", g.uniqueKey, g.value) } + // 关闭锁 + g.setOrGetClose() return false } diff --git a/lockx_test.go b/lockx_test.go index 7731d40..4c5474c 100644 --- a/lockx_test.go +++ b/lockx_test.go @@ -235,6 +235,9 @@ func TestGlobalLock_Try(t *testing.T) { success, err := lock.Try() assert.True(t, errors.Is(err, context.Canceled)) assert.False(t, success) + + err = lock.Unlock() + assert.Error(t, err) }) } @@ -335,6 +338,7 @@ func TestGlobalLock_Refresh(t *testing.T) { t.Run("锁丢失时停止刷新", func(t *testing.T) { mockLogger := new(MockLogger) mockLogger.On("Errorf", mock.Anything, mock.Anything, mock.Anything).Return() + mockLogger.On("Infof", mock.Anything, mock.Anything, mock.Anything).Return() lock, err := lockx.NewGlobalLock(ctx, redisClient, "test-refresh-lost", lockx.WithLogger(mockLogger), @@ -356,6 +360,8 @@ func TestGlobalLock_Refresh(t *testing.T) { // 验证日志被调用 mockLogger.AssertCalled(t, "Errorf", mock.Anything, mock.Anything, mock.Anything) + // <-lock.GetCtx().Done() + lock.Unlock() }) } @@ -367,7 +373,7 @@ func TestGlobalLock_Concurrency(t *testing.T) { ctx := context.Background() t.Run("多 goroutine 竞争锁", func(t *testing.T) { - const numGoroutines = 10 + const numGoroutines = 100 var successCount int var wg sync.WaitGroup var mu sync.Mutex @@ -435,6 +441,8 @@ func TestGlobalLock_ContextCancellation(t *testing.T) { // 取消上下文 cancel() + // lock.Unlock() + // 等待自动清理 time.Sleep(5 * time.Second) @@ -453,6 +461,8 @@ func TestGlobalLock_CustomOptions(t *testing.T) { t.Run("自定义配置选项", func(t *testing.T) { mockLogger := new(MockLogger) + mockLogger.On("Errorf", mock.Anything, mock.Anything, mock.Anything).Return() + mockLogger.On("Infof", mock.Anything, mock.Anything, mock.Anything).Return() lock, err := lockx.NewGlobalLock(ctx, redisClient, "test-custom-options", lockx.WithLockTimeout(5*time.Second), @@ -484,14 +494,9 @@ func TestGlobalLock_EdgeCases(t *testing.T) { ctx := context.Background() t.Run("空键名", func(t *testing.T) { - lock, err := lockx.NewGlobalLock(ctx, redisClient, "") - require.NoError(t, err) + _, err := lockx.NewGlobalLock(ctx, redisClient, "") + require.Error(t, err) - success, err := lock.Lock() - require.NoError(t, err) - assert.True(t, success) // Redis允许空键名 - - lock.Unlock() }) t.Run("非常长的键名", func(t *testing.T) { @@ -537,3 +542,43 @@ func BenchmarkGlobalLock(b *testing.B) { } }) } + +func TestTimeout(t *testing.T) { + redisClient, teardown := setupTestRedis(t) + defer teardown() + + ctx := context.Background() + + begin := time.Now() + + lock, err := lockx.NewGlobalLock(ctx, redisClient, "test-timeout", lockx.WithLockTimeout(time.Second)) + require.NoError(t, err) + success, err := lock.Lock() + require.NoError(t, err) + assert.True(t, success) + + <-lock.GetCtx().Done() + t.Log("Done", time.Since(begin)) + +} + +func TestRedisClose(t *testing.T) { + redisClient, teardown := setupTestRedis(t) + defer teardown() + + ctx := context.Background() + + begin := time.Now() + + lock, err := lockx.NewGlobalLock(ctx, redisClient, "test-timeout", lockx.WithLockTimeout(time.Hour)) + require.NoError(t, err) + success, err := lock.Lock() + require.NoError(t, err) + assert.True(t, success) + + redisClient.Close() + + <-lock.GetCtx().Done() + t.Log("Done", time.Since(begin)) + +}