集群定时器添加优先级
This commit is contained in:
+142
-11
@@ -6,6 +6,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -41,6 +42,9 @@ type Cluster struct {
|
|||||||
zsetKey string // 有序集合的key
|
zsetKey string // 有序集合的key
|
||||||
listKey string // 可执行的任务列表的key
|
listKey string // 可执行的任务列表的key
|
||||||
setKey string // 重入集合的key
|
setKey string // 重入集合的key
|
||||||
|
|
||||||
|
priority int // 全局优先级
|
||||||
|
priorityKey string // 全局优先级的key
|
||||||
}
|
}
|
||||||
|
|
||||||
var clu *Cluster = nil
|
var clu *Cluster = nil
|
||||||
@@ -53,17 +57,19 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
|
|||||||
op := newOptions(opts...)
|
op := newOptions(opts...)
|
||||||
|
|
||||||
clu = &Cluster{
|
clu = &Cluster{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
redis: red,
|
redis: red,
|
||||||
cache: cachex.NewCache(),
|
cache: cachex.NewCache(),
|
||||||
timeout: op.timeout,
|
timeout: op.timeout,
|
||||||
logger: op.logger,
|
logger: op.logger,
|
||||||
keyPrefix: keyPrefix,
|
keyPrefix: keyPrefix,
|
||||||
location: op.location,
|
location: op.location,
|
||||||
lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁
|
priority: op.priority,
|
||||||
zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合
|
lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁
|
||||||
listKey: "timer:cluster_listKey" + keyPrefix, // 列表
|
zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合
|
||||||
setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合
|
listKey: "timer:cluster_listKey" + keyPrefix, // 列表
|
||||||
|
setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合
|
||||||
|
priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key
|
||||||
}
|
}
|
||||||
|
|
||||||
// 设置锁的超时时间
|
// 设置锁的超时时间
|
||||||
@@ -72,6 +78,20 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
|
|||||||
// 监听任务
|
// 监听任务
|
||||||
go clu.watch()
|
go clu.watch()
|
||||||
|
|
||||||
|
priorityTime := time.NewTicker(time.Second * 10)
|
||||||
|
go func(ctx context.Context) {
|
||||||
|
clu.setPriority()
|
||||||
|
Loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-priorityTime.C:
|
||||||
|
clu.setPriority()
|
||||||
|
case <-ctx.Done():
|
||||||
|
break Loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(ctx)
|
||||||
|
|
||||||
timer := time.NewTicker(time.Millisecond * 200)
|
timer := time.NewTicker(time.Millisecond * 200)
|
||||||
|
|
||||||
go func(ctx context.Context) {
|
go func(ctx context.Context) {
|
||||||
@@ -79,6 +99,9 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
|
if !clu.canRun() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
clu.getTask()
|
clu.getTask()
|
||||||
clu.getNextTime()
|
clu.getNextTime()
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@@ -90,6 +113,100 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
|
|||||||
return clu
|
return clu
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 判断是否可执行
|
||||||
|
func (l *Cluster) canRun() bool {
|
||||||
|
// 加缓存
|
||||||
|
str, err := l.redis.Get(l.ctx, l.priorityKey).Result()
|
||||||
|
|
||||||
|
fmt.Println(str, err)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if err == redis.Nil && l.priority == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
l.logger.Errorf(l.ctx, "获取全局优先级失败:%s", err.Error())
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
strPriority, err := strconv.Atoi(str)
|
||||||
|
if err != nil {
|
||||||
|
l.logger.Errorf(l.ctx, "全局优先级转换失败:%s", err.Error())
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if l.priority >= strPriority {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// 设置全局优先级
|
||||||
|
func (l *Cluster) setPriority() bool {
|
||||||
|
// redis lua脚本
|
||||||
|
// 如果redis的可以不存在则设置值,如果存在且redis内的值比当前大则不处理,如果存在redis内的值比当前小或等于则更新值且更新ttl
|
||||||
|
script := `
|
||||||
|
-- KEYS[1] 是全局优先级的key
|
||||||
|
local priorityKey = KEYS[1]
|
||||||
|
-- ARGV[1] 是新的优先级
|
||||||
|
local priority = ARGV[1]
|
||||||
|
-- ARGV[2] 是过期时间
|
||||||
|
local expireTime = ARGV[2]
|
||||||
|
|
||||||
|
-- 校验参数完整性
|
||||||
|
if not priorityKey or not priority or not expireTime then
|
||||||
|
return redis.error_reply("Missing required arguments")
|
||||||
|
end
|
||||||
|
|
||||||
|
-- 尝试将字符串转换为数字
|
||||||
|
local currentPriority = redis.call('get', priorityKey)
|
||||||
|
local currentPriorityNum = tonumber(currentPriority)
|
||||||
|
local newPriorityNum = tonumber(priority)
|
||||||
|
|
||||||
|
if not currentPriority then
|
||||||
|
-- 如果当前优先级不存在,则设置新优先级并设置TTL
|
||||||
|
redis.call('set', priorityKey, priority, 'ex', expireTime)
|
||||||
|
return { "SET", expireTime }
|
||||||
|
elseif currentPriorityNum < newPriorityNum then
|
||||||
|
-- 如果当前优先级小于新优先级,则更新优先级并更新TTL
|
||||||
|
redis.call('set', priorityKey, priority, 'ex', expireTime)
|
||||||
|
return { "RESET", expireTime }
|
||||||
|
elseif currentPriorityNum == newPriorityNum then
|
||||||
|
-- 优先级相同则更新TTL
|
||||||
|
redis.call('expire', priorityKey, expireTime)
|
||||||
|
return { "UPDATE", expireTime }
|
||||||
|
else
|
||||||
|
-- 如果当前优先级大于新优先级,则不更新
|
||||||
|
return { "NOAUCH", '0' }
|
||||||
|
end
|
||||||
|
`
|
||||||
|
priority := fmt.Sprintf("%d", l.priority)
|
||||||
|
|
||||||
|
expireTime := (time.Second*30).Seconds() // 设置过期时间为1分钟
|
||||||
|
res, err := l.redis.Eval(l.ctx, script, []string{l.priorityKey}, priority, expireTime).Result()
|
||||||
|
if err != nil {
|
||||||
|
l.logger.Errorf(l.ctx, "设置全局优先级失败:%s", err.Error())
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("设置全局优先级返回值:%+v", res)
|
||||||
|
|
||||||
|
// 处理返回值,包含操作结果和 TTL
|
||||||
|
resultArray := res.([]interface{})
|
||||||
|
if len(resultArray) < 2 {
|
||||||
|
l.logger.Errorf(l.ctx, "设置全局优先级失败: 返回值格式不正确")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
operationResult := resultArray[0].(string)
|
||||||
|
ttl := resultArray[1].(string)
|
||||||
|
|
||||||
|
if operationResult == "SET" || operationResult == "UPDATE" {
|
||||||
|
l.logger.Infof(l.ctx, "设置全局优先级成功:%s", priority)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
_ = ttl
|
||||||
|
l.logger.Infof(l.ctx, "设置全局优先级未更新:%s", priority)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// 每月执行一次
|
// 每月执行一次
|
||||||
// @param ctx 上下文
|
// @param ctx 上下文
|
||||||
// @param taskId 任务ID
|
// @param taskId 任务ID
|
||||||
@@ -349,6 +466,13 @@ func (c *Cluster) watch() {
|
|||||||
// 执行任务
|
// 执行任务
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
|
|
||||||
|
if !c.canRun() {
|
||||||
|
// 如果全局优先级不满足就不执行
|
||||||
|
time.Sleep(time.Second * 5)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
keys, err := c.redis.BLPop(c.ctx, time.Second*10, c.listKey).Result()
|
keys, err := c.redis.BLPop(c.ctx, time.Second*10, c.listKey).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err != redis.Nil {
|
if err != redis.Nil {
|
||||||
@@ -376,6 +500,13 @@ func (c *Cluster) watch() {
|
|||||||
// 处理重入任务
|
// 处理重入任务
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
|
|
||||||
|
if !c.canRun() {
|
||||||
|
// 如果全局优先级不满足就不执行
|
||||||
|
time.Sleep(time.Second * 5)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
res, err := c.redis.SPop(c.ctx, c.setKey).Result()
|
res, err := c.redis.SPop(c.ctx, c.setKey).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == redis.Nil {
|
if err == redis.Nil {
|
||||||
|
|||||||
+3
-3
@@ -24,8 +24,8 @@ func main() {
|
|||||||
|
|
||||||
// re()
|
// re()
|
||||||
// d()
|
// d()
|
||||||
// cluster()
|
cluster()
|
||||||
once()
|
// once()
|
||||||
|
|
||||||
select {}
|
select {}
|
||||||
|
|
||||||
@@ -89,7 +89,7 @@ func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string,
|
|||||||
func cluster() {
|
func cluster() {
|
||||||
client := getRedis()
|
client := getRedis()
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
cluster := timerx.InitCluster(ctx, client, "test")
|
cluster := timerx.InitCluster(ctx, client, "test",timerx.SetPriority(101))
|
||||||
err := cluster.EverySpace(ctx, "test_space", 1*time.Second, aa, "这是秒任务")
|
err := cluster.EverySpace(ctx, "test_space", 1*time.Second, aa, "这是秒任务")
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
err = cluster.EveryMinute(ctx, "test_min", 15, aa, "这是分钟任务")
|
err = cluster.EveryMinute(ctx, "test_min", 15, aa, "这是分钟任务")
|
||||||
|
|||||||
@@ -3,16 +3,18 @@ package timerx
|
|||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
type Options struct {
|
type Options struct {
|
||||||
logger Logger
|
logger Logger
|
||||||
location *time.Location
|
location *time.Location
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
|
priority int
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultOptions() Options {
|
func defaultOptions() Options {
|
||||||
return Options{
|
return Options{
|
||||||
logger: NewLogger(),
|
logger: NewLogger(),
|
||||||
location: time.Local,
|
location: time.Local,
|
||||||
timeout: time.Hour,
|
timeout: time.Hour,
|
||||||
|
priority: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -46,3 +48,10 @@ func SetTimeout(d time.Duration) Option {
|
|||||||
o.timeout = d
|
o.timeout = d
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 设置优先级
|
||||||
|
func SetPriority(priority int) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.priority = priority
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user