diff --git a/cluster.go b/cluster.go index d003926..e33c33b 100644 --- a/cluster.go +++ b/cluster.go @@ -46,6 +46,7 @@ type Cluster struct { priority *priority.Priority // 全局优先级 priorityKey string // 全局优先级的key + usePriority bool } // var clu *Cluster = nil @@ -70,10 +71,14 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin listKey: "timer:cluster_listKey" + keyPrefix, // 列表 setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key + usePriority: op.usePriority, } // 初始化优先级 - clu.priority = priority.InitPriority(ctx, red, clu.priorityKey, op.priority, priority.SetLogger(clu.logger)) + + if clu.usePriority { + clu.priority = priority.InitPriority(ctx, red, clu.priorityKey, op.priorityVal, priority.SetLogger(clu.logger)) + } // 监听任务 go clu.watch() @@ -85,9 +90,12 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin for { select { case <-timer.C: - if !clu.priority.IsLatest(ctx) { - continue + if clu.usePriority { + if !clu.priority.IsLatest(ctx) { + continue + } } + clu.getTask() clu.getNextTime() case <-ctx.Done(): @@ -358,11 +366,12 @@ func (c *Cluster) watch() { // 执行任务 go func() { for { - - if !c.priority.IsLatest(c.ctx) { - // 如果全局优先级不满足就不执行 - time.Sleep(time.Second * 5) - continue + if c.usePriority { + if !c.priority.IsLatest(c.ctx) { + // 如果全局优先级不满足就不执行 + time.Sleep(time.Second * 5) + continue + } } keys, err := c.redis.BLPop(c.ctx, time.Second*10, c.listKey).Result() @@ -392,11 +401,12 @@ func (c *Cluster) watch() { // 处理重入任务 go func() { for { - - if !c.priority.IsLatest(c.ctx) { - // 如果全局优先级不满足就不执行 - time.Sleep(time.Second * 5) - continue + if c.usePriority { + if !c.priority.IsLatest(c.ctx) { + // 如果全局优先级不满足就不执行 + time.Sleep(time.Second * 5) + continue + } } res, err := c.redis.SPop(c.ctx, c.setKey).Result() diff --git a/cmd/main.go b/cmd/main.go index f4c6796..b5b4cac 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -64,7 +64,7 @@ type OnceData struct { type OnceWorker struct{} -func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *timerx.OnceWorkerResp { +func (l OnceWorker) Worker(ctx context.Context, taskType timerx.OnceTaskType, taskId string, attachData interface{}) *timerx.OnceWorkerResp { fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05")) fmt.Println(taskType, taskId) diff --git a/once.go b/once.go index e30f1cb..fee6129 100644 --- a/once.go +++ b/once.go @@ -30,6 +30,7 @@ type Once struct { keyPrefix string priority *priority.Priority // 全局优先级 priorityKey string // 全局优先级的key + usePriority bool } type OnceWorkerResp struct { @@ -73,12 +74,15 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c zsetKey: "timer:once_zsetkey" + keyPrefix, listKey: "timer:once_listkey" + keyPrefix, priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key + usePriority: op.usePriority, redis: re, worker: call, keyPrefix: keyPrefix, } // 初始化优先级 - wo.priority = priority.InitPriority(ctx, re, wo.priorityKey, op.priority, priority.SetLogger(wo.logger)) + if wo.usePriority { + wo.priority = priority.InitPriority(ctx, re, wo.priorityKey, op.priorityVal, priority.SetLogger(wo.logger)) + } go wo.getTask() go wo.watch() @@ -178,8 +182,10 @@ Loop: for { select { case <-timer.C: - if !w.priority.IsLatest(w.ctx) { - continue + if w.usePriority { + if !w.priority.IsLatest(w.ctx) { + continue + } } script := ` @@ -202,9 +208,11 @@ Loop: // 监听任务 func (w *Once) watch() { for { - if !w.priority.IsLatest(w.ctx) { - time.Sleep(time.Second * 5) - continue + if w.usePriority { + if !w.priority.IsLatest(w.ctx) { + time.Sleep(time.Second * 5) + continue + } } keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result() diff --git a/option.go b/option.go index e21a8b5..d369849 100644 --- a/option.go +++ b/option.go @@ -7,18 +7,20 @@ import ( ) type Options struct { - logger logger.Logger - location *time.Location - timeout time.Duration - priority int + logger logger.Logger + location *time.Location + timeout time.Duration + usePriority bool + priorityVal int } func defaultOptions() Options { return Options{ - logger: logger.NewLogger(), - location: time.Local, - timeout: time.Hour, - priority: 0, + logger: logger.NewLogger(), + location: time.Local, + timeout: time.Hour, + usePriority: false, + priorityVal: 0, } } @@ -56,6 +58,7 @@ func SetTimeout(d time.Duration) Option { // 设置优先级 func SetPriority(priority int) Option { return func(o *Options) { - o.priority = priority + o.usePriority = true + o.priorityVal = priority } } diff --git a/priority/version.go b/priority/version.go new file mode 100644 index 0000000..a327120 --- /dev/null +++ b/priority/version.go @@ -0,0 +1,55 @@ +package priority + +import ( + "errors" + "math" + "strconv" + "strings" +) + +var ( + ErrVersionFormat = errors.New("version format error") +) + +// 版本号转策略等级 +func PriorityByVersion(version string) (priority int64, err error) { + // 版本不能为空 + if version == "" { + return 0, ErrVersionFormat + } + + // 除掉版本号中的v或V + if version[0] == 'v' || version[0] == 'V' { + version = version[1:] + } + // 用点号切割 + vs := strings.Split(version, ".") + // 最多只支持5位 + if len(vs) > 5 { + return 0, ErrVersionFormat + } + + // base 16位 + var baseNum float64 = 0 + + // 每一位转成数字&每一位不能大于999 + for key, val := range vs { + if val == "" { + return 0, ErrVersionFormat + } + i, err := strconv.ParseInt(val, 10, 32) + if err != nil { + return 0, ErrVersionFormat + } + if i <= 0 || i > 999 { + return 0, ErrVersionFormat + } + p := (4 - key) * 3 + num := math.Pow10(p) * float64(i) + + baseNum += num + + } + + return int64(baseNum), nil +} diff --git a/priority/version_test.go b/priority/version_test.go new file mode 100644 index 0000000..b3028c2 --- /dev/null +++ b/priority/version_test.go @@ -0,0 +1,109 @@ +package priority_test + +import ( + "testing" + + "github.com/yuninks/timerx/priority" +) + + +func TestVersionToPriority(t *testing.T) { + tests := []struct { + name string + version string + want int64 + wantErr bool + }{ + { + name: "standard version", + version: "1.2.3", + want: 1002003000000, + wantErr: false, + }, + { + name: "version with v prefix", + version: "v1.2.3", + want: 1002003000000, + wantErr: false, + }, + { + name: "version with V prefix", + version: "V1.2.3", + want: 1002003000000, + wantErr: false, + }, + { + name: "single digit version", + version: "5", + want: 5000000000000, + wantErr: false, + }, + { + name: "max digits version", + version: "999.999.999.999.999", + want: 999999999999999, + wantErr: false, + }, + { + name: "empty version", + version: "", + want: 0, + wantErr: true, + }, + { + name: "invalid character", + version: "1.a.3", + want: 0, + wantErr: true, + }, + { + name: "zero version part", + version: "1.0.3", + want: 0, + wantErr: true, + }, + { + name: "zero version part 2", + version: "1.0.3.", + want: 0, + wantErr: true, + }, + { + name: "negative version part", + version: "1.-2.3", + want: 0, + wantErr: true, + }, + { + name: "version part too large", + version: "1.1000.3", + want: 0, + wantErr: true, + }, + { + name: "too many parts", + version: "1.2.3.4.5.6", + want: 0, + wantErr: true, + }, + { + name: "empty part", + version: "1..3", + want: 0, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := priority.PriorityByVersion(tt.version) + if (err != nil) != tt.wantErr { + t.Errorf("VersionToPriority() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("VersionToPriority() = %v, want %v", got, tt.want) + } + }) + } +}