Merge branch 'dev' of github.com:yuninks/timerx into dev
This commit is contained in:
+8
-117
@@ -6,7 +6,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
"strconv"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -15,6 +14,7 @@ import (
|
|||||||
"github.com/yuninks/cachex"
|
"github.com/yuninks/cachex"
|
||||||
"github.com/yuninks/lockx"
|
"github.com/yuninks/lockx"
|
||||||
"github.com/yuninks/timerx/logger"
|
"github.com/yuninks/timerx/logger"
|
||||||
|
"github.com/yuninks/timerx/priority"
|
||||||
)
|
)
|
||||||
|
|
||||||
// 功能描述
|
// 功能描述
|
||||||
@@ -44,7 +44,7 @@ type Cluster struct {
|
|||||||
listKey string // 可执行的任务列表的key
|
listKey string // 可执行的任务列表的key
|
||||||
setKey string // 重入集合的key
|
setKey string // 重入集合的key
|
||||||
|
|
||||||
priority int // 全局优先级
|
priority *priority.Priority // 全局优先级
|
||||||
priorityKey string // 全局优先级的key
|
priorityKey string // 全局优先级的key
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -65,7 +65,6 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
|
|||||||
logger: op.logger,
|
logger: op.logger,
|
||||||
keyPrefix: keyPrefix,
|
keyPrefix: keyPrefix,
|
||||||
location: op.location,
|
location: op.location,
|
||||||
priority: op.priority,
|
|
||||||
lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁
|
lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁
|
||||||
zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合
|
zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合
|
||||||
listKey: "timer:cluster_listKey" + keyPrefix, // 列表
|
listKey: "timer:cluster_listKey" + keyPrefix, // 列表
|
||||||
@@ -73,26 +72,12 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
|
|||||||
priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key
|
priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key
|
||||||
}
|
}
|
||||||
|
|
||||||
// 设置锁的超时时间
|
// 初始化优先级
|
||||||
lockx.InitOption(lockx.SetTimeout(op.timeout))
|
clu.priority = priority.InitPriority(ctx, red, clu.priorityKey, op.priority, priority.SetLogger(clu.logger))
|
||||||
|
|
||||||
// 监听任务
|
// 监听任务
|
||||||
go clu.watch()
|
go clu.watch()
|
||||||
|
|
||||||
priorityTime := time.NewTicker(time.Second * 10)
|
|
||||||
go func(ctx context.Context) {
|
|
||||||
clu.setPriority()
|
|
||||||
Loop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-priorityTime.C:
|
|
||||||
clu.setPriority()
|
|
||||||
case <-ctx.Done():
|
|
||||||
break Loop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(ctx)
|
|
||||||
|
|
||||||
timer := time.NewTicker(time.Millisecond * 200)
|
timer := time.NewTicker(time.Millisecond * 200)
|
||||||
|
|
||||||
go func(ctx context.Context) {
|
go func(ctx context.Context) {
|
||||||
@@ -100,7 +85,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
if !clu.canRun() {
|
if !clu.priority.IsLatest(ctx) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
clu.getTask()
|
clu.getTask()
|
||||||
@@ -110,104 +95,10 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(ctx)
|
}(ctx)
|
||||||
// })
|
|
||||||
return clu
|
return clu
|
||||||
}
|
}
|
||||||
|
|
||||||
// 判断是否可执行
|
|
||||||
func (l *Cluster) canRun() bool {
|
|
||||||
// 加缓存
|
|
||||||
str, err := l.redis.Get(l.ctx, l.priorityKey).Result()
|
|
||||||
|
|
||||||
fmt.Println(str, err)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
if err == redis.Nil && l.priority == 0 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
l.logger.Errorf(l.ctx, "获取全局优先级失败:%s", err.Error())
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
strPriority, err := strconv.Atoi(str)
|
|
||||||
if err != nil {
|
|
||||||
l.logger.Errorf(l.ctx, "全局优先级转换失败:%s", err.Error())
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if l.priority >= strPriority {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// 设置全局优先级
|
|
||||||
func (l *Cluster) setPriority() bool {
|
|
||||||
// redis lua脚本
|
|
||||||
// 如果redis的可以不存在则设置值,如果存在且redis内的值比当前大则不处理,如果存在redis内的值比当前小或等于则更新值且更新ttl
|
|
||||||
script := `
|
|
||||||
-- KEYS[1] 是全局优先级的key
|
|
||||||
local priorityKey = KEYS[1]
|
|
||||||
-- ARGV[1] 是新的优先级
|
|
||||||
local priority = ARGV[1]
|
|
||||||
-- ARGV[2] 是过期时间
|
|
||||||
local expireTime = ARGV[2]
|
|
||||||
|
|
||||||
-- 校验参数完整性
|
|
||||||
if not priorityKey or not priority or not expireTime then
|
|
||||||
return redis.error_reply("Missing required arguments")
|
|
||||||
end
|
|
||||||
|
|
||||||
-- 尝试将字符串转换为数字
|
|
||||||
local currentPriority = redis.call('get', priorityKey)
|
|
||||||
local currentPriorityNum = tonumber(currentPriority)
|
|
||||||
local newPriorityNum = tonumber(priority)
|
|
||||||
|
|
||||||
if not currentPriority then
|
|
||||||
-- 如果当前优先级不存在,则设置新优先级并设置TTL
|
|
||||||
redis.call('set', priorityKey, priority, 'ex', expireTime)
|
|
||||||
return { "SET", expireTime }
|
|
||||||
elseif currentPriorityNum < newPriorityNum then
|
|
||||||
-- 如果当前优先级小于新优先级,则更新优先级并更新TTL
|
|
||||||
redis.call('set', priorityKey, priority, 'ex', expireTime)
|
|
||||||
return { "RESET", expireTime }
|
|
||||||
elseif currentPriorityNum == newPriorityNum then
|
|
||||||
-- 优先级相同则更新TTL
|
|
||||||
redis.call('expire', priorityKey, expireTime)
|
|
||||||
return { "UPDATE", expireTime }
|
|
||||||
else
|
|
||||||
-- 如果当前优先级大于新优先级,则不更新
|
|
||||||
return { "NOAUCH", '0' }
|
|
||||||
end
|
|
||||||
`
|
|
||||||
priority := fmt.Sprintf("%d", l.priority)
|
|
||||||
|
|
||||||
expireTime := (time.Second * 30).Seconds() // 设置过期时间为1分钟
|
|
||||||
res, err := l.redis.Eval(l.ctx, script, []string{l.priorityKey}, priority, expireTime).Result()
|
|
||||||
if err != nil {
|
|
||||||
l.logger.Errorf(l.ctx, "设置全局优先级失败:%s", err.Error())
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("设置全局优先级返回值:%+v", res)
|
|
||||||
|
|
||||||
// 处理返回值,包含操作结果和 TTL
|
|
||||||
resultArray := res.([]interface{})
|
|
||||||
if len(resultArray) < 2 {
|
|
||||||
l.logger.Errorf(l.ctx, "设置全局优先级失败: 返回值格式不正确")
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
operationResult := resultArray[0].(string)
|
|
||||||
ttl := resultArray[1].(string)
|
|
||||||
|
|
||||||
if operationResult == "SET" || operationResult == "UPDATE" {
|
|
||||||
l.logger.Infof(l.ctx, "设置全局优先级成功:%s", priority)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
_ = ttl
|
|
||||||
l.logger.Infof(l.ctx, "设置全局优先级未更新:%s", priority)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// 每月执行一次
|
// 每月执行一次
|
||||||
// @param ctx 上下文
|
// @param ctx 上下文
|
||||||
// @param taskId 任务ID
|
// @param taskId 任务ID
|
||||||
@@ -468,7 +359,7 @@ func (c *Cluster) watch() {
|
|||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
|
|
||||||
if !c.canRun() {
|
if !c.priority.IsLatest(c.ctx) {
|
||||||
// 如果全局优先级不满足就不执行
|
// 如果全局优先级不满足就不执行
|
||||||
time.Sleep(time.Second * 5)
|
time.Sleep(time.Second * 5)
|
||||||
continue
|
continue
|
||||||
@@ -502,7 +393,7 @@ func (c *Cluster) watch() {
|
|||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
|
|
||||||
if !c.canRun() {
|
if !c.priority.IsLatest(c.ctx) {
|
||||||
// 如果全局优先级不满足就不执行
|
// 如果全局优先级不满足就不执行
|
||||||
time.Sleep(time.Second * 5)
|
time.Sleep(time.Second * 5)
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
uuid "github.com/satori/go.uuid"
|
uuid "github.com/satori/go.uuid"
|
||||||
"github.com/yuninks/timerx/logger"
|
"github.com/yuninks/timerx/logger"
|
||||||
|
"github.com/yuninks/timerx/priority"
|
||||||
)
|
)
|
||||||
|
|
||||||
// 功能描述
|
// 功能描述
|
||||||
@@ -27,6 +28,8 @@ type Once struct {
|
|||||||
redis redis.UniversalClient
|
redis redis.UniversalClient
|
||||||
worker Callback
|
worker Callback
|
||||||
keyPrefix string
|
keyPrefix string
|
||||||
|
priority *priority.Priority // 全局优先级
|
||||||
|
priorityKey string // 全局优先级的key
|
||||||
}
|
}
|
||||||
|
|
||||||
type OnceWorkerResp struct {
|
type OnceWorkerResp struct {
|
||||||
@@ -67,10 +70,14 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
|
|||||||
logger: op.logger,
|
logger: op.logger,
|
||||||
zsetKey: "timer:once_zsetkey" + keyPrefix,
|
zsetKey: "timer:once_zsetkey" + keyPrefix,
|
||||||
listKey: "timer:once_listkey" + keyPrefix,
|
listKey: "timer:once_listkey" + keyPrefix,
|
||||||
|
priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key
|
||||||
redis: re,
|
redis: re,
|
||||||
worker: call,
|
worker: call,
|
||||||
keyPrefix: keyPrefix,
|
keyPrefix: keyPrefix,
|
||||||
}
|
}
|
||||||
|
// 初始化优先级
|
||||||
|
wo.priority = priority.InitPriority(ctx, re, wo.priorityKey, op.priority, priority.SetLogger(wo.logger))
|
||||||
|
|
||||||
go wo.getTask()
|
go wo.getTask()
|
||||||
go wo.watch()
|
go wo.watch()
|
||||||
// })
|
// })
|
||||||
@@ -169,6 +176,10 @@ Loop:
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
|
if !w.priority.IsLatest(w.ctx) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
script := `
|
script := `
|
||||||
local token = redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2])
|
local token = redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2])
|
||||||
for i,v in ipairs(token) do
|
for i,v in ipairs(token) do
|
||||||
@@ -189,6 +200,11 @@ Loop:
|
|||||||
// 监听任务
|
// 监听任务
|
||||||
func (w *Once) watch() {
|
func (w *Once) watch() {
|
||||||
for {
|
for {
|
||||||
|
if !w.priority.IsLatest(w.ctx) {
|
||||||
|
time.Sleep(time.Second * 5)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result()
|
keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// fmt.Println("watch err:", err)
|
// fmt.Println("watch err:", err)
|
||||||
|
|||||||
+1
-1
@@ -8,7 +8,7 @@ import (
|
|||||||
|
|
||||||
type Options struct {
|
type Options struct {
|
||||||
updateInterval time.Duration // 更新间隔
|
updateInterval time.Duration // 更新间隔
|
||||||
expireTime time.Duration
|
expireTime time.Duration // 有效时间
|
||||||
logger logger.Logger
|
logger logger.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+13
-2
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
@@ -19,6 +20,8 @@ type Priority struct {
|
|||||||
redisKey string
|
redisKey string
|
||||||
logger logger.Logger
|
logger logger.Logger
|
||||||
expireTime time.Duration
|
expireTime time.Duration
|
||||||
|
updateInterval time.Duration // 更新间隔
|
||||||
|
deadTime int64 // 缓存时间戳,单位秒
|
||||||
}
|
}
|
||||||
|
|
||||||
func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int, opts ...Option) *Priority {
|
func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int, opts ...Option) *Priority {
|
||||||
@@ -31,16 +34,17 @@ func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix strin
|
|||||||
logger: conf.logger,
|
logger: conf.logger,
|
||||||
redisKey: "timer:priority_" + keyPrefix,
|
redisKey: "timer:priority_" + keyPrefix,
|
||||||
expireTime: conf.expireTime,
|
expireTime: conf.expireTime,
|
||||||
|
updateInterval: conf.updateInterval,
|
||||||
}
|
}
|
||||||
|
|
||||||
// 更新间隔
|
// 更新间隔
|
||||||
updateTnterval := time.NewTicker(conf.updateInterval)
|
ut := time.NewTicker(conf.updateInterval)
|
||||||
go func(ctx context.Context) {
|
go func(ctx context.Context) {
|
||||||
pro.setPriority()
|
pro.setPriority()
|
||||||
Loop:
|
Loop:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-updateTnterval.C:
|
case <-ut.C:
|
||||||
pro.setPriority()
|
pro.setPriority()
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
break Loop
|
break Loop
|
||||||
@@ -53,6 +57,10 @@ func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix strin
|
|||||||
|
|
||||||
func (l *Priority) IsLatest(ctx context.Context) bool {
|
func (l *Priority) IsLatest(ctx context.Context) bool {
|
||||||
// 加缓存
|
// 加缓存
|
||||||
|
if atomic.LoadInt64(&l.deadTime) > time.Now().Unix() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
str, err := l.redis.Get(l.ctx, l.redisKey).Result()
|
str, err := l.redis.Get(l.ctx, l.redisKey).Result()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -133,6 +141,9 @@ func (l *Priority) setPriority() bool {
|
|||||||
|
|
||||||
if operationResult == "SET" || operationResult == "UPDATE" {
|
if operationResult == "SET" || operationResult == "UPDATE" {
|
||||||
l.logger.Infof(l.ctx, "设置全局优先级成功:%s", priority)
|
l.logger.Infof(l.ctx, "设置全局优先级成功:%s", priority)
|
||||||
|
|
||||||
|
atomic.StoreInt64(&l.deadTime, time.Now().Add(l.updateInterval).Unix())
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
_ = ttl
|
_ = ttl
|
||||||
|
|||||||
Reference in New Issue
Block a user