调整日志和封装优先级代码
This commit is contained in:
+2
-1
@@ -14,6 +14,7 @@ import (
|
||||
uuid "github.com/satori/go.uuid"
|
||||
"github.com/yuninks/cachex"
|
||||
"github.com/yuninks/lockx"
|
||||
"github.com/yuninks/timerx/logger"
|
||||
)
|
||||
|
||||
// 功能描述
|
||||
@@ -34,7 +35,7 @@ type Cluster struct {
|
||||
redis redis.UniversalClient
|
||||
cache *cachex.Cache
|
||||
timeout time.Duration
|
||||
logger Logger
|
||||
logger logger.Logger
|
||||
keyPrefix string // key前缀
|
||||
location *time.Location // 根据时区计算的时间
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package timerx
|
||||
package logger
|
||||
|
||||
import (
|
||||
"context"
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
uuid "github.com/satori/go.uuid"
|
||||
"github.com/yuninks/timerx/logger"
|
||||
)
|
||||
|
||||
// 功能描述
|
||||
@@ -21,7 +22,7 @@ import (
|
||||
// 单次的任务队列
|
||||
type Once struct {
|
||||
ctx context.Context
|
||||
logger Logger
|
||||
logger logger.Logger
|
||||
zsetKey string
|
||||
listKey string
|
||||
redis redis.UniversalClient
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
package timerx
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/yuninks/timerx/logger"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
logger Logger
|
||||
logger logger.Logger
|
||||
location *time.Location
|
||||
timeout time.Duration
|
||||
priority int
|
||||
@@ -11,7 +15,7 @@ type Options struct {
|
||||
|
||||
func defaultOptions() Options {
|
||||
return Options{
|
||||
logger: NewLogger(),
|
||||
logger: logger.NewLogger(),
|
||||
location: time.Local,
|
||||
timeout: time.Hour,
|
||||
priority: 0,
|
||||
@@ -29,7 +33,7 @@ func newOptions(opts ...Option) Options {
|
||||
}
|
||||
|
||||
// 设置日志
|
||||
func SetLogger(log Logger) Option {
|
||||
func SetLogger(log logger.Logger) Option {
|
||||
return func(o *Options) {
|
||||
o.logger = log
|
||||
}
|
||||
|
||||
@@ -0,0 +1,56 @@
|
||||
package priority
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/yuninks/timerx/logger"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
priority int // 优先级,数字越大越优先
|
||||
updateInterval time.Duration // 更新间隔
|
||||
expireTime time.Duration
|
||||
logger logger.Logger
|
||||
}
|
||||
|
||||
func defaultOptions() Options {
|
||||
return Options{
|
||||
priority: 0, // 默认优先级
|
||||
updateInterval: time.Second * 10,
|
||||
expireTime: time.Second * 32,
|
||||
logger: logger.NewLogger(),
|
||||
}
|
||||
}
|
||||
|
||||
type Option func(*Options)
|
||||
|
||||
func newOptions(opts ...Option) Options {
|
||||
o := defaultOptions()
|
||||
for _, opt := range opts {
|
||||
opt(&o)
|
||||
}
|
||||
return o
|
||||
}
|
||||
|
||||
func SetPriority(priority int) Option {
|
||||
return func(o *Options) {
|
||||
o.priority = priority
|
||||
}
|
||||
}
|
||||
|
||||
func SetLogger(log logger.Logger) Option {
|
||||
return func(o *Options) {
|
||||
o.logger = log
|
||||
}
|
||||
}
|
||||
|
||||
// 有效时间是3个周期
|
||||
func SetUpdateInterval(d time.Duration) Option {
|
||||
if d.Abs() < time.Second {
|
||||
d = time.Second * 10
|
||||
}
|
||||
return func(o *Options) {
|
||||
o.updateInterval = d
|
||||
o.expireTime = d*3 + time.Second
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,141 @@
|
||||
package priority
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/yuninks/timerx/logger"
|
||||
)
|
||||
|
||||
// 多版本场景判断当前是否最新版本
|
||||
|
||||
type Priority struct {
|
||||
ctx context.Context
|
||||
priority int // 优先级
|
||||
redis redis.UniversalClient
|
||||
redisKey string
|
||||
logger logger.Logger
|
||||
expireTime time.Duration
|
||||
}
|
||||
|
||||
func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, opts ...Option) *Priority {
|
||||
conf := newOptions(opts...)
|
||||
|
||||
pro := &Priority{
|
||||
ctx: ctx,
|
||||
priority: conf.priority,
|
||||
redis: re,
|
||||
logger: conf.logger,
|
||||
redisKey: "timer:priority_" + keyPrefix,
|
||||
expireTime: conf.expireTime,
|
||||
}
|
||||
|
||||
// 更新间隔
|
||||
updateTnterval := time.NewTicker(conf.updateInterval)
|
||||
go func(ctx context.Context) {
|
||||
pro.setPriority()
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
case <-updateTnterval.C:
|
||||
pro.setPriority()
|
||||
case <-ctx.Done():
|
||||
break Loop
|
||||
}
|
||||
}
|
||||
}(ctx)
|
||||
|
||||
return pro
|
||||
}
|
||||
|
||||
func (l *Priority) IsLatest(ctx context.Context) bool {
|
||||
// 加缓存
|
||||
str, err := l.redis.Get(l.ctx, l.redisKey).Result()
|
||||
|
||||
if err != nil {
|
||||
if err == redis.Nil && l.priority == 0 {
|
||||
return true
|
||||
}
|
||||
l.logger.Errorf(l.ctx, "获取全局优先级失败:%s", err.Error())
|
||||
return false
|
||||
}
|
||||
|
||||
strPriority, err := strconv.Atoi(str)
|
||||
if err != nil {
|
||||
l.logger.Errorf(l.ctx, "全局优先级转换失败:%s", err.Error())
|
||||
return false
|
||||
}
|
||||
if l.priority >= strPriority {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (l *Priority) setPriority() bool {
|
||||
// redis lua脚本
|
||||
// 如果redis的可以不存在则设置值,如果存在且redis内的值比当前大则不处理,如果存在redis内的值比当前小或等于则更新值且更新ttl
|
||||
script := `
|
||||
-- KEYS[1] 是全局优先级的key
|
||||
local priorityKey = KEYS[1]
|
||||
-- ARGV[1] 是新的优先级
|
||||
local priority = ARGV[1]
|
||||
-- ARGV[2] 是过期时间
|
||||
local expireTime = ARGV[2]
|
||||
|
||||
-- 校验参数完整性
|
||||
if not priorityKey or not priority or not expireTime then
|
||||
return redis.error_reply("Missing required arguments")
|
||||
end
|
||||
|
||||
-- 尝试将字符串转换为数字
|
||||
local currentPriority = redis.call('get', priorityKey)
|
||||
local currentPriorityNum = tonumber(currentPriority)
|
||||
local newPriorityNum = tonumber(priority)
|
||||
|
||||
if not currentPriority then
|
||||
-- 如果当前优先级不存在,则设置新优先级并设置TTL
|
||||
redis.call('set', priorityKey, priority, 'ex', expireTime)
|
||||
return { "SET", expireTime }
|
||||
elseif currentPriorityNum < newPriorityNum then
|
||||
-- 如果当前优先级小于新优先级,则更新优先级并更新TTL
|
||||
redis.call('set', priorityKey, priority, 'ex', expireTime)
|
||||
return { "RESET", expireTime }
|
||||
elseif currentPriorityNum == newPriorityNum then
|
||||
-- 优先级相同则更新TTL
|
||||
redis.call('expire', priorityKey, expireTime)
|
||||
return { "UPDATE", expireTime }
|
||||
else
|
||||
-- 如果当前优先级大于新优先级,则不更新
|
||||
return { "NOAUCH", '0' }
|
||||
end
|
||||
`
|
||||
priority := fmt.Sprintf("%d", l.priority)
|
||||
|
||||
res, err := l.redis.Eval(l.ctx, script, []string{l.redisKey}, priority, l.expireTime.Seconds()).Result()
|
||||
if err != nil {
|
||||
l.logger.Errorf(l.ctx, "设置全局优先级失败:%s", err.Error())
|
||||
return false
|
||||
}
|
||||
|
||||
l.logger.Infof(l.ctx, "设置全局优先级返回值:%+v", res)
|
||||
|
||||
// 处理返回值,包含操作结果和 TTL
|
||||
resultArray := res.([]interface{})
|
||||
if len(resultArray) < 2 {
|
||||
l.logger.Errorf(l.ctx, "设置全局优先级失败: 返回值格式不正确")
|
||||
return false
|
||||
}
|
||||
operationResult := resultArray[0].(string)
|
||||
ttl := resultArray[1].(string)
|
||||
|
||||
if operationResult == "SET" || operationResult == "UPDATE" {
|
||||
l.logger.Infof(l.ctx, "设置全局优先级成功:%s", priority)
|
||||
return true
|
||||
}
|
||||
_ = ttl
|
||||
l.logger.Infof(l.ctx, "设置全局优先级未更新:%s", priority)
|
||||
return false
|
||||
}
|
||||
@@ -0,0 +1,44 @@
|
||||
package priority_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/yuninks/timerx/priority"
|
||||
)
|
||||
|
||||
func getRedis() *redis.Client {
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: "127.0.0.1" + ":" + "6379",
|
||||
Password: "123456", // no password set
|
||||
DB: 0, // use default DB
|
||||
})
|
||||
if client == nil {
|
||||
panic("redis init error")
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
func TestPriority(t *testing.T) {
|
||||
re := getRedis()
|
||||
ctx := context.Background()
|
||||
|
||||
ctx,cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
fmt.Println("ff")
|
||||
|
||||
pro := priority.InitPriority(ctx, re, "test", priority.SetUpdateInterval(time.Second*5))
|
||||
|
||||
fmt.Println(pro)
|
||||
|
||||
for i := 0; i < 20; i++ {
|
||||
bb := pro.IsLatest(ctx)
|
||||
fmt.Println("bb:", bb)
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user