diff --git a/cluster.go b/cluster.go index 9fe2b5e..c46f67b 100644 --- a/cluster.go +++ b/cluster.go @@ -82,7 +82,6 @@ func (c *cluster) AddTimer(ctx context.Context, uniqueKey string, spaceTime time return errors.New("添加失败") } defer lock.Unlock() - lock.Refresh() nowTime := time.Now() @@ -135,9 +134,6 @@ func (c *cluster) getNextTime() { } defer lock.Unlock() - // 更新锁 - lock.Refresh() - // 计算下一次时间 // 读取执行的缓存 @@ -254,6 +250,7 @@ func doTask(ctx context.Context, red *redis.Client, taskId string) { val, ok := clusterWorkerList.Load(taskId) if !ok { + fmt.Println("doTask timer:任务不存在") return } t := val.(timerStr) @@ -266,7 +263,6 @@ func doTask(ctx context.Context, red *redis.Client, taskId string) { return } defer lock.Unlock() - lock.Refresh() ctx = context.WithValue(ctx, extendParamKey, t.Extend) diff --git a/lockx/lockx.go b/lockx/lockx.go index 21c7e12..ffe612e 100644 --- a/lockx/lockx.go +++ b/lockx/lockx.go @@ -13,14 +13,17 @@ import ( type globalLock struct { redis *redis.Client ctx context.Context + cancel context.CancelFunc uniqueKey string value string } func NewGlobalLock(ctx context.Context, red *redis.Client, uniqueKey string) *globalLock { + ctx, cancel := context.WithTimeout(ctx, time.Second*30) return &globalLock{ redis: red, ctx: ctx, + cancel: cancel, uniqueKey: uniqueKey, value: fmt.Sprintf("%d", time.Now().UnixNano()), } @@ -43,7 +46,11 @@ func (g *globalLock) Lock() bool { _ = err log.Println("globalLock Lock", resp, err, g.uniqueKey, g.value) } - return resp == "OK" + if resp == "OK" { + g.refresh() + return true + } + return false } // 尝试获取锁 @@ -74,21 +81,22 @@ func (g *globalLock) Unlock() bool { if resp != "OK" { log.Println("globalLock Unlock", resp, err, g.uniqueKey, g.value) } - return resp == "OK" + if resp == "OK" { + g.cancel() + return true + } + return false } // 刷新锁 -func (g *globalLock) Refresh() { +func (g *globalLock) refresh() { go func() { - ctx, cancel := context.WithTimeout(g.ctx, time.Second*30) - defer cancel() - t := time.NewTicker(time.Second) for { select { case <-t.C: - g.refresh() - case <-ctx.Done(): + g.refreshExec() + case <-g.ctx.Done(): t.Stop() return } @@ -96,7 +104,7 @@ func (g *globalLock) Refresh() { }() } -func (g *globalLock) refresh() bool { +func (g *globalLock) refreshExec() bool { script := ` local token = redis.call('get',KEYS[1]) if token == ARGV[1] diff --git a/lockx/lockx_test.go b/lockx/lockx_test.go index 25d1606..9b36e9b 100644 --- a/lockx/lockx_test.go +++ b/lockx/lockx_test.go @@ -46,7 +46,6 @@ func TestLockx(t *testing.T) { fmt.Println("lock error") } defer lock.Unlock() - lock.Refresh() fmt.Println("ssss")