From 037d8cf107395f88a0b30baa4a7fd6e15a2c0441 Mon Sep 17 00:00:00 2001 From: Yun Date: Thu, 24 Jul 2025 17:13:17 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E6=97=A5=E5=BF=97=E5=92=8C?= =?UTF-8?q?=E5=B0=81=E8=A3=85=E4=BC=98=E5=85=88=E7=BA=A7=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 3 +- logger.go => logger/logger.go | 2 +- once.go | 3 +- option.go | 12 ++- priority/option.go | 56 ++++++++++++++ priority/priority.go | 141 ++++++++++++++++++++++++++++++++++ priority/priority_test.go | 44 +++++++++++ single.go | 3 +- 8 files changed, 256 insertions(+), 8 deletions(-) rename logger.go => logger/logger.go (97%) create mode 100644 priority/option.go create mode 100644 priority/priority.go create mode 100644 priority/priority_test.go diff --git a/cluster.go b/cluster.go index 007f7a2..b49cd95 100644 --- a/cluster.go +++ b/cluster.go @@ -14,6 +14,7 @@ import ( uuid "github.com/satori/go.uuid" "github.com/yuninks/cachex" "github.com/yuninks/lockx" + "github.com/yuninks/timerx/logger" ) // 功能描述 @@ -34,7 +35,7 @@ type Cluster struct { redis redis.UniversalClient cache *cachex.Cache timeout time.Duration - logger Logger + logger logger.Logger keyPrefix string // key前缀 location *time.Location // 根据时区计算的时间 diff --git a/logger.go b/logger/logger.go similarity index 97% rename from logger.go rename to logger/logger.go index 9a461ad..825f247 100644 --- a/logger.go +++ b/logger/logger.go @@ -1,4 +1,4 @@ -package timerx +package logger import ( "context" diff --git a/once.go b/once.go index 49afba9..5295e78 100644 --- a/once.go +++ b/once.go @@ -11,6 +11,7 @@ import ( "github.com/go-redis/redis/v8" uuid "github.com/satori/go.uuid" + "github.com/yuninks/timerx/logger" ) // 功能描述 @@ -21,7 +22,7 @@ import ( // 单次的任务队列 type Once struct { ctx context.Context - logger Logger + logger logger.Logger zsetKey string listKey string redis redis.UniversalClient diff --git a/option.go b/option.go index bf91541..e21a8b5 100644 --- a/option.go +++ b/option.go @@ -1,9 +1,13 @@ package timerx -import "time" +import ( + "time" + + "github.com/yuninks/timerx/logger" +) type Options struct { - logger Logger + logger logger.Logger location *time.Location timeout time.Duration priority int @@ -11,7 +15,7 @@ type Options struct { func defaultOptions() Options { return Options{ - logger: NewLogger(), + logger: logger.NewLogger(), location: time.Local, timeout: time.Hour, priority: 0, @@ -29,7 +33,7 @@ func newOptions(opts ...Option) Options { } // 设置日志 -func SetLogger(log Logger) Option { +func SetLogger(log logger.Logger) Option { return func(o *Options) { o.logger = log } diff --git a/priority/option.go b/priority/option.go new file mode 100644 index 0000000..6bef502 --- /dev/null +++ b/priority/option.go @@ -0,0 +1,56 @@ +package priority + +import ( + "time" + + "github.com/yuninks/timerx/logger" +) + +type Options struct { + priority int // 优先级,数字越大越优先 + updateInterval time.Duration // 更新间隔 + expireTime time.Duration + logger logger.Logger +} + +func defaultOptions() Options { + return Options{ + priority: 0, // 默认优先级 + updateInterval: time.Second * 10, + expireTime: time.Second * 32, + logger: logger.NewLogger(), + } +} + +type Option func(*Options) + +func newOptions(opts ...Option) Options { + o := defaultOptions() + for _, opt := range opts { + opt(&o) + } + return o +} + +func SetPriority(priority int) Option { + return func(o *Options) { + o.priority = priority + } +} + +func SetLogger(log logger.Logger) Option { + return func(o *Options) { + o.logger = log + } +} + +// 有效时间是3个周期 +func SetUpdateInterval(d time.Duration) Option { + if d.Abs() < time.Second { + d = time.Second * 10 + } + return func(o *Options) { + o.updateInterval = d + o.expireTime = d*3 + time.Second + } +} diff --git a/priority/priority.go b/priority/priority.go new file mode 100644 index 0000000..3b92116 --- /dev/null +++ b/priority/priority.go @@ -0,0 +1,141 @@ +package priority + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/go-redis/redis/v8" + "github.com/yuninks/timerx/logger" +) + +// 多版本场景判断当前是否最新版本 + +type Priority struct { + ctx context.Context + priority int // 优先级 + redis redis.UniversalClient + redisKey string + logger logger.Logger + expireTime time.Duration +} + +func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, opts ...Option) *Priority { + conf := newOptions(opts...) + + pro := &Priority{ + ctx: ctx, + priority: conf.priority, + redis: re, + logger: conf.logger, + redisKey: "timer:priority_" + keyPrefix, + expireTime: conf.expireTime, + } + + // 更新间隔 + updateTnterval := time.NewTicker(conf.updateInterval) + go func(ctx context.Context) { + pro.setPriority() + Loop: + for { + select { + case <-updateTnterval.C: + pro.setPriority() + case <-ctx.Done(): + break Loop + } + } + }(ctx) + + return pro +} + +func (l *Priority) IsLatest(ctx context.Context) bool { + // 加缓存 + str, err := l.redis.Get(l.ctx, l.redisKey).Result() + + if err != nil { + if err == redis.Nil && l.priority == 0 { + return true + } + l.logger.Errorf(l.ctx, "获取全局优先级失败:%s", err.Error()) + return false + } + + strPriority, err := strconv.Atoi(str) + if err != nil { + l.logger.Errorf(l.ctx, "全局优先级转换失败:%s", err.Error()) + return false + } + if l.priority >= strPriority { + return true + } + return false +} + +func (l *Priority) setPriority() bool { + // redis lua脚本 + // 如果redis的可以不存在则设置值,如果存在且redis内的值比当前大则不处理,如果存在redis内的值比当前小或等于则更新值且更新ttl + script := ` + -- KEYS[1] 是全局优先级的key + local priorityKey = KEYS[1] + -- ARGV[1] 是新的优先级 + local priority = ARGV[1] + -- ARGV[2] 是过期时间 + local expireTime = ARGV[2] + + -- 校验参数完整性 + if not priorityKey or not priority or not expireTime then + return redis.error_reply("Missing required arguments") + end + + -- 尝试将字符串转换为数字 + local currentPriority = redis.call('get', priorityKey) + local currentPriorityNum = tonumber(currentPriority) + local newPriorityNum = tonumber(priority) + + if not currentPriority then + -- 如果当前优先级不存在,则设置新优先级并设置TTL + redis.call('set', priorityKey, priority, 'ex', expireTime) + return { "SET", expireTime } + elseif currentPriorityNum < newPriorityNum then + -- 如果当前优先级小于新优先级,则更新优先级并更新TTL + redis.call('set', priorityKey, priority, 'ex', expireTime) + return { "RESET", expireTime } + elseif currentPriorityNum == newPriorityNum then + -- 优先级相同则更新TTL + redis.call('expire', priorityKey, expireTime) + return { "UPDATE", expireTime } + else + -- 如果当前优先级大于新优先级,则不更新 + return { "NOAUCH", '0' } + end + ` + priority := fmt.Sprintf("%d", l.priority) + + res, err := l.redis.Eval(l.ctx, script, []string{l.redisKey}, priority, l.expireTime.Seconds()).Result() + if err != nil { + l.logger.Errorf(l.ctx, "设置全局优先级失败:%s", err.Error()) + return false + } + + l.logger.Infof(l.ctx, "设置全局优先级返回值:%+v", res) + + // 处理返回值,包含操作结果和 TTL + resultArray := res.([]interface{}) + if len(resultArray) < 2 { + l.logger.Errorf(l.ctx, "设置全局优先级失败: 返回值格式不正确") + return false + } + operationResult := resultArray[0].(string) + ttl := resultArray[1].(string) + + if operationResult == "SET" || operationResult == "UPDATE" { + l.logger.Infof(l.ctx, "设置全局优先级成功:%s", priority) + return true + } + _ = ttl + l.logger.Infof(l.ctx, "设置全局优先级未更新:%s", priority) + return false +} diff --git a/priority/priority_test.go b/priority/priority_test.go new file mode 100644 index 0000000..6be766f --- /dev/null +++ b/priority/priority_test.go @@ -0,0 +1,44 @@ +package priority_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/go-redis/redis/v8" + "github.com/yuninks/timerx/priority" +) + +func getRedis() *redis.Client { + client := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1" + ":" + "6379", + Password: "123456", // no password set + DB: 0, // use default DB + }) + if client == nil { + panic("redis init error") + } + return client +} + +func TestPriority(t *testing.T) { + re := getRedis() + ctx := context.Background() + + ctx,cancel := context.WithCancel(ctx) + defer cancel() + + fmt.Println("ff") + + pro := priority.InitPriority(ctx, re, "test", priority.SetUpdateInterval(time.Second*5)) + + fmt.Println(pro) + + for i := 0; i < 20; i++ { + bb := pro.IsLatest(ctx) + fmt.Println("bb:", bb) + time.Sleep(time.Second) + } + +} diff --git a/single.go b/single.go index ff82e8c..5975108 100644 --- a/single.go +++ b/single.go @@ -10,6 +10,7 @@ import ( "time" uuid "github.com/satori/go.uuid" + "github.com/yuninks/timerx/logger" ) // 简单定时器 @@ -25,7 +26,7 @@ var singleOnceLimit sync.Once // 实现单例 type Single struct { ctx context.Context - logger Logger + logger logger.Logger location *time.Location }