Files
2025-10-04 22:00:08 +08:00

257 lines
5.8 KiB
Go

package priority
import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"
"github.com/redis/go-redis/v9"
"github.com/yuninks/timerx/logger"
)
// 多版本场景判断当前是否最新版本
type Priority struct {
ctx context.Context // 上下文
cancel context.CancelFunc // 取消函数
priority int64 // 优先级
redis redis.UniversalClient // redis
redisKey string // redis key
logger logger.Logger // 日志
expireTime time.Duration // 过期时间
setInterval time.Duration // 尝试set的间隔
getInterval time.Duration // 尝试get的间隔
wg sync.WaitGroup
isLatest bool // 是否是最新版本
latestMux sync.RWMutex // 最新版本锁
instanceId string // 实例ID
}
func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int64, opts ...Option) (*Priority, error) {
if re == nil {
return nil, errors.New("redis is nil")
}
conf := newOptions(opts...)
ctx, cancel := context.WithCancel(ctx)
pro := &Priority{
ctx: ctx,
cancel: cancel,
priority: priority,
redis: re,
logger: conf.logger,
redisKey: "timer:priority_" + conf.source + keyPrefix,
expireTime: conf.expireTime,
setInterval: conf.updateInterval,
getInterval: conf.getInterval,
instanceId: conf.instanceId,
}
pro.startDaemon()
return pro, nil
}
func (p *Priority) Close() {
if p.cancel != nil {
p.cancel()
}
p.wg.Wait()
}
// 守护进程
func (l *Priority) startDaemon() {
// 启动更新缓存
l.wg.Add(1)
go l.runUpdateLoop()
l.wg.Add(1)
go l.getLatestLoop()
}
func (p *Priority) runUpdateLoop() {
defer p.wg.Done()
// 立即尝试设置一次优先级
if _, err := p.setPriority(); err != nil {
p.logger.Errorf(p.ctx, "Initial priority set failed: %v", err)
}
ticker := time.NewTicker(p.setInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if _, err := p.setPriority(); err != nil {
p.logger.Errorf(p.ctx, "Priority update failed: %v", err)
}
case <-p.ctx.Done():
return
}
}
}
func (l *Priority) getLatestLoop() {
defer l.wg.Done()
if err := l.getLatest(); err != nil {
l.logger.Errorf(l.ctx, "Priority update failed: %v", err)
}
ticker := time.NewTicker(l.getInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := l.getLatest(); err != nil {
l.logger.Errorf(l.ctx, "Priority update failed: %v", err)
}
case <-l.ctx.Done():
return
}
}
}
func (p *Priority) IsLatest(ctx context.Context) bool {
p.latestMux.RLock()
defer p.latestMux.RUnlock()
return p.isLatest
}
func (p *Priority) setPriority() (string, error) {
script := `
-- KEYS[1] 是全局优先级的key
local priorityKey = KEYS[1]
-- ARGV[1] 是新的优先级
local priority = ARGV[1]
-- ARGV[2] 是过期时间
local expireTime = ARGV[2]
-- 校验参数完整性
if not priorityKey or not priority or not expireTime then
return redis.error_reply("Missing required arguments")
end
-- 尝试将字符串转换为数字
local currentPriority = redis.call('get', priorityKey)
local currentPriorityNum = tonumber(currentPriority)
local newPriorityNum = tonumber(priority)
if not currentPriority then
-- 如果当前优先级不存在,则设置新优先级并设置TTL
redis.call('set', priorityKey, priority, 'ex', expireTime)
return { "SET" }
elseif currentPriorityNum < newPriorityNum then
-- 如果当前优先级小于新优先级,则更新优先级并更新TTL
redis.call('set', priorityKey, priority, 'ex', expireTime)
return { "RESET" }
elseif currentPriorityNum == newPriorityNum then
-- 优先级相同则更新TTL
redis.call('expire', priorityKey, expireTime)
return { "UPDATE" }
else
-- 如果当前优先级大于新优先级,则不更新
return { "NOAUCH" }
end
`
newPriorityStr := strconv.FormatInt(p.priority, 10)
result, err := p.redis.Eval(p.ctx, script, []string{p.redisKey}, newPriorityStr, p.expireTime.Seconds()).Result()
// p.logger.Infof(p.ctx, "Priority update result:%+v err:%+v", result, err)
if err != nil {
p.logger.Errorf(p.ctx, "Priority update err:%s", err.Error())
return "", err
}
// 解析结果
if resultMap, ok := result.([]interface{}); ok && len(resultMap) == 1 {
resultStr := resultMap[0].(string)
return resultStr, nil
}
return "", fmt.Errorf("script error: %v", result)
}
func (l *Priority) getLatest() error {
// 查询Redis获取当前最高优先级
currentPriority, err := l.getCurrentPriority()
l.logger.Infof(l.ctx, "Priority getLatest currentPriority:%d l.priority:%d err:%+v", currentPriority, l.priority, err)
if err != nil {
l.logger.Errorf(l.ctx, "Priority getLatest getCurrentPriority err:%s", err.Error())
return err
}
if currentPriority > l.priority {
// 当前不是最新的
l.latestMux.Lock()
l.isLatest = false
l.latestMux.Unlock()
return nil
}
l.latestMux.Lock()
l.isLatest = true
l.latestMux.Unlock()
return nil
}
func (p *Priority) getCurrentPriority() (int64, error) {
result, err := p.redis.Get(p.ctx, p.redisKey).Result()
if err != nil {
if err == redis.Nil {
// Key不存在,返回0作为默认值
return 0, nil
}
return 0, err
}
priority, err := strconv.ParseInt(result, 10, 64)
if err != nil {
return 0, err
}
return priority, nil
}
// ForceRefresh 强制刷新优先级,用于紧急情况
func (p *Priority) ForceRefresh() error {
_, err := p.setPriority()
if err != nil {
p.logger.Errorf(p.ctx, "Priority ForceRefresh setPriority err:%s", err.Error())
return err
}
err = p.getLatest()
if err != nil {
p.logger.Errorf(p.ctx, "Priority ForceRefresh getLatest err:%s", err.Error())
return err
}
return nil
}
// GetCurrentMaxPriority 获取当前系统中的最大优先级
func (p *Priority) GetCurrentMaxPriority(ctx context.Context) (int64, error) {
return p.getCurrentPriority()
}