优化
This commit is contained in:
+1
-5
@@ -82,7 +82,6 @@ func (c *cluster) AddTimer(ctx context.Context, uniqueKey string, spaceTime time
|
||||
return errors.New("添加失败")
|
||||
}
|
||||
defer lock.Unlock()
|
||||
lock.Refresh()
|
||||
|
||||
nowTime := time.Now()
|
||||
|
||||
@@ -135,9 +134,6 @@ func (c *cluster) getNextTime() {
|
||||
}
|
||||
defer lock.Unlock()
|
||||
|
||||
// 更新锁
|
||||
lock.Refresh()
|
||||
|
||||
// 计算下一次时间
|
||||
|
||||
// 读取执行的缓存
|
||||
@@ -254,6 +250,7 @@ func doTask(ctx context.Context, red *redis.Client, taskId string) {
|
||||
|
||||
val, ok := clusterWorkerList.Load(taskId)
|
||||
if !ok {
|
||||
fmt.Println("doTask timer:任务不存在")
|
||||
return
|
||||
}
|
||||
t := val.(timerStr)
|
||||
@@ -266,7 +263,6 @@ func doTask(ctx context.Context, red *redis.Client, taskId string) {
|
||||
return
|
||||
}
|
||||
defer lock.Unlock()
|
||||
lock.Refresh()
|
||||
|
||||
ctx = context.WithValue(ctx, extendParamKey, t.Extend)
|
||||
|
||||
|
||||
+17
-9
@@ -13,14 +13,17 @@ import (
|
||||
type globalLock struct {
|
||||
redis *redis.Client
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
uniqueKey string
|
||||
value string
|
||||
}
|
||||
|
||||
func NewGlobalLock(ctx context.Context, red *redis.Client, uniqueKey string) *globalLock {
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
|
||||
return &globalLock{
|
||||
redis: red,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
uniqueKey: uniqueKey,
|
||||
value: fmt.Sprintf("%d", time.Now().UnixNano()),
|
||||
}
|
||||
@@ -43,7 +46,11 @@ func (g *globalLock) Lock() bool {
|
||||
_ = err
|
||||
log.Println("globalLock Lock", resp, err, g.uniqueKey, g.value)
|
||||
}
|
||||
return resp == "OK"
|
||||
if resp == "OK" {
|
||||
g.refresh()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// 尝试获取锁
|
||||
@@ -74,21 +81,22 @@ func (g *globalLock) Unlock() bool {
|
||||
if resp != "OK" {
|
||||
log.Println("globalLock Unlock", resp, err, g.uniqueKey, g.value)
|
||||
}
|
||||
return resp == "OK"
|
||||
if resp == "OK" {
|
||||
g.cancel()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// 刷新锁
|
||||
func (g *globalLock) Refresh() {
|
||||
func (g *globalLock) refresh() {
|
||||
go func() {
|
||||
ctx, cancel := context.WithTimeout(g.ctx, time.Second*30)
|
||||
defer cancel()
|
||||
|
||||
t := time.NewTicker(time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
g.refresh()
|
||||
case <-ctx.Done():
|
||||
g.refreshExec()
|
||||
case <-g.ctx.Done():
|
||||
t.Stop()
|
||||
return
|
||||
}
|
||||
@@ -96,7 +104,7 @@ func (g *globalLock) Refresh() {
|
||||
}()
|
||||
}
|
||||
|
||||
func (g *globalLock) refresh() bool {
|
||||
func (g *globalLock) refreshExec() bool {
|
||||
script := `
|
||||
local token = redis.call('get',KEYS[1])
|
||||
if token == ARGV[1]
|
||||
|
||||
@@ -46,7 +46,6 @@ func TestLockx(t *testing.T) {
|
||||
fmt.Println("lock error")
|
||||
}
|
||||
defer lock.Unlock()
|
||||
lock.Refresh()
|
||||
|
||||
fmt.Println("ssss")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user