2025-07-24 17:13:17 +08:00
|
|
|
package priority
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
2025-09-24 14:50:30 +08:00
|
|
|
"errors"
|
2025-07-24 17:13:17 +08:00
|
|
|
"fmt"
|
|
|
|
|
"strconv"
|
2025-09-18 15:34:01 +08:00
|
|
|
"sync"
|
2025-07-24 17:13:17 +08:00
|
|
|
"time"
|
|
|
|
|
|
2025-10-04 22:00:08 +08:00
|
|
|
"github.com/redis/go-redis/v9"
|
2025-07-24 17:13:17 +08:00
|
|
|
"github.com/yuninks/timerx/logger"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Priority decides, when several versions of a service run side by side,
// whether the current instance is the latest one. It does so by comparing its
// own priority with the value stored under a shared Redis key.
type Priority struct {
	ctx         context.Context       // context used for Redis calls and watched by the background loops
	cancel      context.CancelFunc    // cancel function for ctx
	priority    int64                 // priority advertised by this instance
	redis       redis.UniversalClient // Redis client
	redisKey    string                // Redis key holding the shared priority value
	logger      logger.Logger         // logger
	expireTime  time.Duration         // expiry applied to redisKey
	setInterval time.Duration         // interval between attempts to write the priority
	getInterval time.Duration         // interval between reads of the stored priority
	wg          sync.WaitGroup        // counts the running background loops

	isLatest  bool         // whether this instance is the latest version
	latestMux sync.RWMutex // guards isLatest

	instanceId string // instance ID
}
|
|
|
|
|
|
2025-09-24 14:50:30 +08:00
|
|
|
func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int64, opts ...Option) (*Priority, error) {
|
|
|
|
|
|
2025-09-24 15:06:33 +08:00
|
|
|
if re == nil {
|
2025-09-24 14:50:30 +08:00
|
|
|
return nil, errors.New("redis is nil")
|
|
|
|
|
}
|
|
|
|
|
|
2025-07-24 17:13:17 +08:00
|
|
|
conf := newOptions(opts...)
|
|
|
|
|
|
2025-09-18 15:34:01 +08:00
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
|
|
2025-07-24 17:13:17 +08:00
|
|
|
pro := &Priority{
|
2025-09-18 15:34:01 +08:00
|
|
|
ctx: ctx,
|
|
|
|
|
cancel: cancel,
|
|
|
|
|
priority: priority,
|
|
|
|
|
redis: re,
|
|
|
|
|
logger: conf.logger,
|
2025-10-04 20:44:16 +08:00
|
|
|
redisKey: "timer:priority_" + conf.source + keyPrefix,
|
2025-09-18 15:34:01 +08:00
|
|
|
expireTime: conf.expireTime,
|
|
|
|
|
setInterval: conf.updateInterval,
|
|
|
|
|
getInterval: conf.getInterval,
|
2025-10-04 20:44:16 +08:00
|
|
|
instanceId: conf.instanceId,
|
2025-07-24 17:13:17 +08:00
|
|
|
}
|
|
|
|
|
|
2025-09-18 15:34:01 +08:00
|
|
|
pro.startDaemon()
|
2025-07-24 17:13:17 +08:00
|
|
|
|
2025-09-24 14:50:30 +08:00
|
|
|
return pro, nil
|
2025-07-24 17:13:17 +08:00
|
|
|
}
|
|
|
|
|
|
2025-09-18 15:34:01 +08:00
|
|
|
func (p *Priority) Close() {
|
|
|
|
|
if p.cancel != nil {
|
|
|
|
|
p.cancel()
|
|
|
|
|
}
|
|
|
|
|
p.wg.Wait()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 守护进程
|
|
|
|
|
func (l *Priority) startDaemon() {
|
|
|
|
|
// 启动更新缓存
|
|
|
|
|
l.wg.Add(1)
|
|
|
|
|
go l.runUpdateLoop()
|
2025-08-28 17:45:17 +08:00
|
|
|
|
2025-09-18 15:34:01 +08:00
|
|
|
l.wg.Add(1)
|
|
|
|
|
go l.getLatestLoop()
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Priority) runUpdateLoop() {
|
|
|
|
|
defer p.wg.Done()
|
|
|
|
|
|
|
|
|
|
// 立即尝试设置一次优先级
|
|
|
|
|
if _, err := p.setPriority(); err != nil {
|
2025-09-19 18:52:33 +08:00
|
|
|
p.logger.Errorf(p.ctx, "Initial priority set failed: %v", err)
|
2025-07-25 22:58:45 +08:00
|
|
|
}
|
|
|
|
|
|
2025-09-18 15:34:01 +08:00
|
|
|
ticker := time.NewTicker(p.setInterval)
|
|
|
|
|
defer ticker.Stop()
|
2025-07-24 17:13:17 +08:00
|
|
|
|
2025-09-18 15:34:01 +08:00
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
if _, err := p.setPriority(); err != nil {
|
2025-09-19 18:52:33 +08:00
|
|
|
p.logger.Errorf(p.ctx, "Priority update failed: %v", err)
|
2025-09-18 15:34:01 +08:00
|
|
|
}
|
|
|
|
|
case <-p.ctx.Done():
|
|
|
|
|
return
|
2025-07-24 17:13:17 +08:00
|
|
|
}
|
|
|
|
|
}
|
2025-09-18 15:34:01 +08:00
|
|
|
}
|
2025-07-24 17:13:17 +08:00
|
|
|
|
2025-09-18 15:34:01 +08:00
|
|
|
func (l *Priority) getLatestLoop() {
|
|
|
|
|
|
|
|
|
|
defer l.wg.Done()
|
|
|
|
|
|
|
|
|
|
if err := l.getLatest(); err != nil {
|
|
|
|
|
l.logger.Errorf(l.ctx, "Priority update failed: %v", err)
|
2025-07-24 17:13:17 +08:00
|
|
|
}
|
2025-09-18 15:34:01 +08:00
|
|
|
|
|
|
|
|
ticker := time.NewTicker(l.getInterval)
|
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
if err := l.getLatest(); err != nil {
|
|
|
|
|
l.logger.Errorf(l.ctx, "Priority update failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
case <-l.ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
}
|
2025-07-24 17:13:17 +08:00
|
|
|
}
|
2025-09-18 15:34:01 +08:00
|
|
|
|
2025-07-24 17:13:17 +08:00
|
|
|
}
|
|
|
|
|
|
2025-09-18 15:34:01 +08:00
|
|
|
func (p *Priority) IsLatest(ctx context.Context) bool {
|
|
|
|
|
p.latestMux.RLock()
|
|
|
|
|
defer p.latestMux.RUnlock()
|
|
|
|
|
|
|
|
|
|
return p.isLatest
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Priority) setPriority() (string, error) {
|
2025-07-24 17:13:17 +08:00
|
|
|
script := `
|
|
|
|
|
-- KEYS[1] 是全局优先级的key
|
|
|
|
|
local priorityKey = KEYS[1]
|
|
|
|
|
-- ARGV[1] 是新的优先级
|
|
|
|
|
local priority = ARGV[1]
|
|
|
|
|
-- ARGV[2] 是过期时间
|
|
|
|
|
local expireTime = ARGV[2]
|
|
|
|
|
|
|
|
|
|
-- 校验参数完整性
|
|
|
|
|
if not priorityKey or not priority or not expireTime then
|
|
|
|
|
return redis.error_reply("Missing required arguments")
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
-- 尝试将字符串转换为数字
|
|
|
|
|
local currentPriority = redis.call('get', priorityKey)
|
|
|
|
|
local currentPriorityNum = tonumber(currentPriority)
|
|
|
|
|
local newPriorityNum = tonumber(priority)
|
|
|
|
|
|
|
|
|
|
if not currentPriority then
|
|
|
|
|
-- 如果当前优先级不存在,则设置新优先级并设置TTL
|
|
|
|
|
redis.call('set', priorityKey, priority, 'ex', expireTime)
|
2025-09-18 15:34:01 +08:00
|
|
|
return { "SET" }
|
2025-07-24 17:13:17 +08:00
|
|
|
elseif currentPriorityNum < newPriorityNum then
|
|
|
|
|
-- 如果当前优先级小于新优先级,则更新优先级并更新TTL
|
|
|
|
|
redis.call('set', priorityKey, priority, 'ex', expireTime)
|
2025-09-18 15:34:01 +08:00
|
|
|
return { "RESET" }
|
2025-07-24 17:13:17 +08:00
|
|
|
elseif currentPriorityNum == newPriorityNum then
|
|
|
|
|
-- 优先级相同则更新TTL
|
|
|
|
|
redis.call('expire', priorityKey, expireTime)
|
2025-09-18 15:34:01 +08:00
|
|
|
return { "UPDATE" }
|
2025-07-24 17:13:17 +08:00
|
|
|
else
|
|
|
|
|
-- 如果当前优先级大于新优先级,则不更新
|
2025-09-18 15:34:01 +08:00
|
|
|
return { "NOAUCH" }
|
2025-07-24 17:13:17 +08:00
|
|
|
end
|
|
|
|
|
`
|
|
|
|
|
|
2025-09-18 15:34:01 +08:00
|
|
|
newPriorityStr := strconv.FormatInt(p.priority, 10)
|
|
|
|
|
|
|
|
|
|
result, err := p.redis.Eval(p.ctx, script, []string{p.redisKey}, newPriorityStr, p.expireTime.Seconds()).Result()
|
|
|
|
|
// p.logger.Infof(p.ctx, "Priority update result:%+v err:%+v", result, err)
|
2025-07-24 17:13:17 +08:00
|
|
|
if err != nil {
|
2025-09-18 15:34:01 +08:00
|
|
|
p.logger.Errorf(p.ctx, "Priority update err:%s", err.Error())
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 解析结果
|
|
|
|
|
if resultMap, ok := result.([]interface{}); ok && len(resultMap) == 1 {
|
|
|
|
|
resultStr := resultMap[0].(string)
|
|
|
|
|
return resultStr, nil
|
2025-07-24 17:13:17 +08:00
|
|
|
}
|
|
|
|
|
|
2025-09-18 15:34:01 +08:00
|
|
|
return "", fmt.Errorf("script error: %v", result)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (l *Priority) getLatest() error {
|
|
|
|
|
// 查询Redis获取当前最高优先级
|
|
|
|
|
currentPriority, err := l.getCurrentPriority()
|
|
|
|
|
|
|
|
|
|
l.logger.Infof(l.ctx, "Priority getLatest currentPriority:%d l.priority:%d err:%+v", currentPriority, l.priority, err)
|
2025-07-24 17:13:17 +08:00
|
|
|
|
2025-09-18 15:34:01 +08:00
|
|
|
if err != nil {
|
|
|
|
|
l.logger.Errorf(l.ctx, "Priority getLatest getCurrentPriority err:%s", err.Error())
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
if currentPriority > l.priority {
|
|
|
|
|
// 当前不是最新的
|
|
|
|
|
l.latestMux.Lock()
|
|
|
|
|
l.isLatest = false
|
|
|
|
|
l.latestMux.Unlock()
|
|
|
|
|
|
|
|
|
|
return nil
|
2025-07-24 17:13:17 +08:00
|
|
|
}
|
|
|
|
|
|
2025-09-18 15:34:01 +08:00
|
|
|
l.latestMux.Lock()
|
|
|
|
|
l.isLatest = true
|
|
|
|
|
l.latestMux.Unlock()
|
2025-07-25 22:58:45 +08:00
|
|
|
|
2025-09-18 15:34:01 +08:00
|
|
|
return nil
|
|
|
|
|
}
|
2025-07-25 22:58:45 +08:00
|
|
|
|
2025-09-18 15:34:01 +08:00
|
|
|
func (p *Priority) getCurrentPriority() (int64, error) {
|
|
|
|
|
result, err := p.redis.Get(p.ctx, p.redisKey).Result()
|
|
|
|
|
if err != nil {
|
|
|
|
|
if err == redis.Nil {
|
|
|
|
|
// Key不存在,返回0作为默认值
|
|
|
|
|
return 0, nil
|
|
|
|
|
}
|
|
|
|
|
return 0, err
|
2025-07-24 17:13:17 +08:00
|
|
|
}
|
2025-09-18 15:34:01 +08:00
|
|
|
|
|
|
|
|
priority, err := strconv.ParseInt(result, 10, 64)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return 0, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return priority, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ForceRefresh 强制刷新优先级,用于紧急情况
|
|
|
|
|
func (p *Priority) ForceRefresh() error {
|
|
|
|
|
|
|
|
|
|
_, err := p.setPriority()
|
|
|
|
|
if err != nil {
|
|
|
|
|
p.logger.Errorf(p.ctx, "Priority ForceRefresh setPriority err:%s", err.Error())
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
err = p.getLatest()
|
|
|
|
|
if err != nil {
|
|
|
|
|
p.logger.Errorf(p.ctx, "Priority ForceRefresh getLatest err:%s", err.Error())
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// GetCurrentMaxPriority 获取当前系统中的最大优先级
|
|
|
|
|
func (p *Priority) GetCurrentMaxPriority(ctx context.Context) (int64, error) {
|
|
|
|
|
return p.getCurrentPriority()
|
2025-07-24 17:13:17 +08:00
|
|
|
}
|