优化redis的使用
This commit is contained in:
+8
-8
@@ -28,7 +28,7 @@ var clusterWorkerList sync.Map
|
||||
|
||||
type Cluster struct {
|
||||
ctx context.Context
|
||||
redis *redis.Client
|
||||
redis redis.UniversalClient
|
||||
cache *cachex.Cache
|
||||
logger Logger
|
||||
keyPrefix string // key前缀
|
||||
@@ -43,7 +43,7 @@ var clu *Cluster = nil
|
||||
|
||||
// 初始化定时器
|
||||
// 全局只需要初始化一次
|
||||
func InitCluster(ctx context.Context, red *redis.Client, keyPrefix string, opts ...Option) *Cluster {
|
||||
func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix string, opts ...Option) *Cluster {
|
||||
|
||||
clusterOnceLimit.Do(func() {
|
||||
op := newOptions(opts...)
|
||||
@@ -65,7 +65,7 @@ func InitCluster(ctx context.Context, red *redis.Client, keyPrefix string, opts
|
||||
|
||||
timer := time.NewTicker(time.Millisecond * 200)
|
||||
|
||||
go func(ctx context.Context, red *redis.Client) {
|
||||
go func(ctx context.Context) {
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
@@ -76,7 +76,7 @@ func InitCluster(ctx context.Context, red *redis.Client, keyPrefix string, opts
|
||||
break Loop
|
||||
}
|
||||
}
|
||||
}(ctx, red)
|
||||
}(ctx)
|
||||
})
|
||||
return clu
|
||||
}
|
||||
@@ -341,7 +341,7 @@ func (c *Cluster) watch() {
|
||||
c.redis.SAdd(c.ctx, c.setKey, keys[1])
|
||||
continue
|
||||
}
|
||||
go c.doTask(c.ctx, c.redis, keys[1])
|
||||
go c.doTask(c.ctx, keys[1])
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -364,14 +364,14 @@ func (c *Cluster) watch() {
|
||||
c.redis.SAdd(c.ctx, c.setKey, taskId)
|
||||
continue
|
||||
}
|
||||
go c.doTask(c.ctx, c.redis, taskId)
|
||||
go c.doTask(c.ctx, taskId)
|
||||
}
|
||||
}()
|
||||
|
||||
}
|
||||
|
||||
// 执行任务
|
||||
func (c *Cluster) doTask(ctx context.Context, red *redis.Client, taskId string) {
|
||||
func (c *Cluster) doTask(ctx context.Context, taskId string) {
|
||||
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
@@ -387,7 +387,7 @@ func (c *Cluster) doTask(ctx context.Context, red *redis.Client, taskId string)
|
||||
t := val.(timerStr)
|
||||
|
||||
// 这里加一个全局锁
|
||||
lock := lockx.NewGlobalLock(ctx, red, taskId)
|
||||
lock := lockx.NewGlobalLock(ctx, c.redis, taskId)
|
||||
tB := lock.Lock()
|
||||
if !tB {
|
||||
c.logger.Errorf(ctx, "doTask timer:获取锁失败:%s", taskId)
|
||||
|
||||
Reference in New Issue
Block a user