优化优先级策略在各集群中的使用
This commit is contained in:
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/go-redis/redis/v8"
|
||||
uuid "github.com/satori/go.uuid"
|
||||
"github.com/yuninks/timerx/logger"
|
||||
"github.com/yuninks/timerx/priority"
|
||||
)
|
||||
|
||||
// 功能描述
|
||||
@@ -21,13 +22,15 @@ import (
|
||||
|
||||
// 单次的任务队列
|
||||
type Once struct {
|
||||
ctx context.Context
|
||||
logger logger.Logger
|
||||
zsetKey string
|
||||
listKey string
|
||||
redis redis.UniversalClient
|
||||
worker Callback
|
||||
keyPrefix string
|
||||
ctx context.Context
|
||||
logger logger.Logger
|
||||
zsetKey string
|
||||
listKey string
|
||||
redis redis.UniversalClient
|
||||
worker Callback
|
||||
keyPrefix string
|
||||
priority *priority.Priority // 全局优先级
|
||||
priorityKey string // 全局优先级的key
|
||||
}
|
||||
|
||||
type OnceWorkerResp struct {
|
||||
@@ -64,14 +67,18 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
|
||||
op := newOptions(opts...)
|
||||
once.Do(func() {
|
||||
wo = &Once{
|
||||
ctx: ctx,
|
||||
logger: op.logger,
|
||||
zsetKey: "timer:once_zsetkey" + keyPrefix,
|
||||
listKey: "timer:once_listkey" + keyPrefix,
|
||||
redis: re,
|
||||
worker: call,
|
||||
keyPrefix: keyPrefix,
|
||||
ctx: ctx,
|
||||
logger: op.logger,
|
||||
zsetKey: "timer:once_zsetkey" + keyPrefix,
|
||||
listKey: "timer:once_listkey" + keyPrefix,
|
||||
priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key
|
||||
redis: re,
|
||||
worker: call,
|
||||
keyPrefix: keyPrefix,
|
||||
}
|
||||
// 初始化优先级
|
||||
wo.priority = priority.InitPriority(ctx, re, wo.priorityKey, op.priority, priority.SetLogger(wo.logger))
|
||||
|
||||
go wo.getTask()
|
||||
go wo.watch()
|
||||
})
|
||||
@@ -170,6 +177,10 @@ Loop:
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
if !w.priority.IsLatest(w.ctx) {
|
||||
continue
|
||||
}
|
||||
|
||||
script := `
|
||||
local token = redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2])
|
||||
for i,v in ipairs(token) do
|
||||
@@ -190,6 +201,11 @@ Loop:
|
||||
// 监听任务
|
||||
func (w *Once) watch() {
|
||||
for {
|
||||
if !w.priority.IsLatest(w.ctx) {
|
||||
time.Sleep(time.Second * 5)
|
||||
continue
|
||||
}
|
||||
|
||||
keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result()
|
||||
if err != nil {
|
||||
// fmt.Println("watch err:", err)
|
||||
|
||||
Reference in New Issue
Block a user