diff --git a/cluster.go b/cluster.go index 622d42d..6f61114 100644 --- a/cluster.go +++ b/cluster.go @@ -28,7 +28,7 @@ var clusterWorkerList sync.Map type Cluster struct { ctx context.Context - redis *redis.Client + redis redis.UniversalClient cache *cachex.Cache logger Logger keyPrefix string // key前缀 @@ -43,7 +43,7 @@ var clu *Cluster = nil // 初始化定时器 // 全局只需要初始化一次 -func InitCluster(ctx context.Context, red *redis.Client, keyPrefix string, opts ...Option) *Cluster { +func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix string, opts ...Option) *Cluster { clusterOnceLimit.Do(func() { op := newOptions(opts...) @@ -65,7 +65,7 @@ func InitCluster(ctx context.Context, red *redis.Client, keyPrefix string, opts timer := time.NewTicker(time.Millisecond * 200) - go func(ctx context.Context, red *redis.Client) { + go func(ctx context.Context) { Loop: for { select { @@ -76,7 +76,7 @@ func InitCluster(ctx context.Context, red *redis.Client, keyPrefix string, opts break Loop } } - }(ctx, red) + }(ctx) }) return clu } @@ -341,7 +341,7 @@ func (c *Cluster) watch() { c.redis.SAdd(c.ctx, c.setKey, keys[1]) continue } - go c.doTask(c.ctx, c.redis, keys[1]) + go c.doTask(c.ctx, keys[1]) } }() @@ -364,14 +364,14 @@ func (c *Cluster) watch() { c.redis.SAdd(c.ctx, c.setKey, taskId) continue } - go c.doTask(c.ctx, c.redis, taskId) + go c.doTask(c.ctx, taskId) } }() } // 执行任务 -func (c *Cluster) doTask(ctx context.Context, red *redis.Client, taskId string) { +func (c *Cluster) doTask(ctx context.Context, taskId string) { defer func() { if err := recover(); err != nil { @@ -387,7 +387,7 @@ func (c *Cluster) doTask(ctx context.Context, red *redis.Client, taskId string) t := val.(timerStr) // 这里加一个全局锁 - lock := lockx.NewGlobalLock(ctx, red, taskId) + lock := lockx.NewGlobalLock(ctx, c.redis, taskId) tB := lock.Lock() if !tB { c.logger.Errorf(ctx, "doTask timer:获取锁失败:%s", taskId) diff --git a/go.mod b/go.mod index 211d877..bd6a1c1 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.19 require ( github.com/go-redis/redis/v8 v8.11.5 github.com/yuninks/cachex v1.0.5 - github.com/yuninks/lockx v1.0.1 + github.com/yuninks/lockx v1.0.2 ) require ( diff --git a/go.sum b/go.sum index 5ec3fe7..91e4db7 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,3 @@ -code.yun.ink/open/timer v1.0.1 h1:ZWecU5K0rFB15p8DZubozTEwo1vrO4mUCRwEoD1tbEQ= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= @@ -11,8 +10,8 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/yuninks/cachex v1.0.5 h1:Y2NmTsuEgwEVYb7FVFh5tUN67kmrUioeksQqLbOAwsM= github.com/yuninks/cachex v1.0.5/go.mod h1:5357qz18UvHTJSgZzkMamUzZoFzGeKG9+4tIUBXRSVM= -github.com/yuninks/lockx v1.0.1 h1:rBUCFZIR6ABR6QLZFp27O9jpGv/TKjmjCKkk1H2lBjI= -github.com/yuninks/lockx v1.0.1/go.mod h1:AHTVVhM7nZ/p+LVmLQbLneOjM9cn/C5zZ+EpXPi0MMs= +github.com/yuninks/lockx v1.0.2 h1:p0n791WmsU8D7YF2tQaNLwPE75jdd774unlJZRTNfaw= +github.com/yuninks/lockx v1.0.2/go.mod h1:J6wvuUELLcMn6FCmiZFt7K5w1QQAh1myL7h3JrZaQiQ= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=