优化策略的使用

This commit is contained in:
Yun
2025-08-27 15:52:09 +08:00
parent d39a8b14ee
commit 503cabdcbf
6 changed files with 214 additions and 29 deletions
+13 -3
View File
@@ -46,6 +46,7 @@ type Cluster struct {
priority *priority.Priority // 全局优先级
priorityKey string // 全局优先级的key
usePriority bool
}
// var clu *Cluster = nil
@@ -70,10 +71,14 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
listKey: "timer:cluster_listKey" + keyPrefix, // 列表
setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合
priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key
usePriority: op.usePriority,
}
// 初始化优先级
clu.priority = priority.InitPriority(ctx, red, clu.priorityKey, op.priority, priority.SetLogger(clu.logger))
if clu.usePriority {
clu.priority = priority.InitPriority(ctx, red, clu.priorityKey, op.priorityVal, priority.SetLogger(clu.logger))
}
// 监听任务
go clu.watch()
@@ -85,9 +90,12 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
for {
select {
case <-timer.C:
if clu.usePriority {
if !clu.priority.IsLatest(ctx) {
continue
}
}
clu.getTask()
clu.getNextTime()
case <-ctx.Done():
@@ -358,12 +366,13 @@ func (c *Cluster) watch() {
// 执行任务
go func() {
for {
if c.usePriority {
if !c.priority.IsLatest(c.ctx) {
// 如果全局优先级不满足就不执行
time.Sleep(time.Second * 5)
continue
}
}
keys, err := c.redis.BLPop(c.ctx, time.Second*10, c.listKey).Result()
if err != nil {
@@ -392,12 +401,13 @@ func (c *Cluster) watch() {
// 处理重入任务
go func() {
for {
if c.usePriority {
if !c.priority.IsLatest(c.ctx) {
// 如果全局优先级不满足就不执行
time.Sleep(time.Second * 5)
continue
}
}
res, err := c.redis.SPop(c.ctx, c.setKey).Result()
if err != nil {
+1 -1
View File
@@ -64,7 +64,7 @@ type OnceData struct {
type OnceWorker struct{}
func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *timerx.OnceWorkerResp {
func (l OnceWorker) Worker(ctx context.Context, taskType timerx.OnceTaskType, taskId string, attachData interface{}) *timerx.OnceWorkerResp {
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println(taskType, taskId)
+9 -1
View File
@@ -30,6 +30,7 @@ type Once struct {
keyPrefix string
priority *priority.Priority // 全局优先级
priorityKey string // 全局优先级的key
usePriority bool
}
type OnceWorkerResp struct {
@@ -73,12 +74,15 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
zsetKey: "timer:once_zsetkey" + keyPrefix,
listKey: "timer:once_listkey" + keyPrefix,
priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key
usePriority: op.usePriority,
redis: re,
worker: call,
keyPrefix: keyPrefix,
}
// 初始化优先级
wo.priority = priority.InitPriority(ctx, re, wo.priorityKey, op.priority, priority.SetLogger(wo.logger))
if wo.usePriority {
wo.priority = priority.InitPriority(ctx, re, wo.priorityKey, op.priorityVal, priority.SetLogger(wo.logger))
}
go wo.getTask()
go wo.watch()
@@ -178,9 +182,11 @@ Loop:
for {
select {
case <-timer.C:
if w.usePriority {
if !w.priority.IsLatest(w.ctx) {
continue
}
}
script := `
local token = redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2])
@@ -202,10 +208,12 @@ Loop:
// 监听任务
func (w *Once) watch() {
for {
if w.usePriority {
if !w.priority.IsLatest(w.ctx) {
time.Sleep(time.Second * 5)
continue
}
}
keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result()
if err != nil {
+6 -3
View File
@@ -10,7 +10,8 @@ type Options struct {
logger logger.Logger
location *time.Location
timeout time.Duration
priority int
usePriority bool
priorityVal int
}
func defaultOptions() Options {
@@ -18,7 +19,8 @@ func defaultOptions() Options {
logger: logger.NewLogger(),
location: time.Local,
timeout: time.Hour,
priority: 0,
usePriority: false,
priorityVal: 0,
}
}
@@ -56,6 +58,7 @@ func SetTimeout(d time.Duration) Option {
// 设置优先级
func SetPriority(priority int) Option {
return func(o *Options) {
o.priority = priority
o.usePriority = true
o.priorityVal = priority
}
}
+55
View File
@@ -0,0 +1,55 @@
package priority
import (
"errors"
"math"
"strconv"
"strings"
)
var (
ErrVersionFormat = errors.New("version format error")
)
// 版本号转策略等级
func PriorityByVersion(version string) (priority int64, err error) {
// 版本不能为空
if version == "" {
return 0, ErrVersionFormat
}
// 除掉版本号中的v或V
if version[0] == 'v' || version[0] == 'V' {
version = version[1:]
}
// 用点号切割
vs := strings.Split(version, ".")
// 最多只支持5位
if len(vs) > 5 {
return 0, ErrVersionFormat
}
// base 16位
var baseNum float64 = 0
// 每一位转成数字&每一位不能大于999
for key, val := range vs {
if val == "" {
return 0, ErrVersionFormat
}
i, err := strconv.ParseInt(val, 10, 32)
if err != nil {
return 0, ErrVersionFormat
}
if i <= 0 || i > 999 {
return 0, ErrVersionFormat
}
p := (4 - key) * 3
num := math.Pow10(p) * float64(i)
baseNum += num
}
return int64(baseNum), nil
}
+109
View File
@@ -0,0 +1,109 @@
package priority_test
import (
"testing"
"github.com/yuninks/timerx/priority"
)
func TestVersionToPriority(t *testing.T) {
tests := []struct {
name string
version string
want int64
wantErr bool
}{
{
name: "standard version",
version: "1.2.3",
want: 1002003000000,
wantErr: false,
},
{
name: "version with v prefix",
version: "v1.2.3",
want: 1002003000000,
wantErr: false,
},
{
name: "version with V prefix",
version: "V1.2.3",
want: 1002003000000,
wantErr: false,
},
{
name: "single digit version",
version: "5",
want: 5000000000000,
wantErr: false,
},
{
name: "max digits version",
version: "999.999.999.999.999",
want: 999999999999999,
wantErr: false,
},
{
name: "empty version",
version: "",
want: 0,
wantErr: true,
},
{
name: "invalid character",
version: "1.a.3",
want: 0,
wantErr: true,
},
{
name: "zero version part",
version: "1.0.3",
want: 0,
wantErr: true,
},
{
name: "zero version part 2",
version: "1.0.3.",
want: 0,
wantErr: true,
},
{
name: "negative version part",
version: "1.-2.3",
want: 0,
wantErr: true,
},
{
name: "version part too large",
version: "1.1000.3",
want: 0,
wantErr: true,
},
{
name: "too many parts",
version: "1.2.3.4.5.6",
want: 0,
wantErr: true,
},
{
name: "empty part",
version: "1..3",
want: 0,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := priority.PriorityByVersion(tt.version)
if (err != nil) != tt.wantErr {
t.Errorf("VersionToPriority() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("VersionToPriority() = %v, want %v", got, tt.want)
}
})
}
}