From a9f37bc4e7a3bd2910a2b52035e7adb503b146e6 Mon Sep 17 00:00:00 2001 From: Yun Date: Tue, 14 Oct 2025 15:07:53 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E7=9B=B4=E6=8E=A5option?= =?UTF-8?q?=E6=B3=A8=E6=98=8E=E7=89=88=E6=9C=AC=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 15 +++++++++++++-- once.go | 16 ++++++++++++++-- option.go | 54 +++++++++++++++++++++++++++++++++++------------------- 3 files changed, 62 insertions(+), 23 deletions(-) diff --git a/cluster.go b/cluster.go index aa52b20..1e81c6a 100644 --- a/cluster.go +++ b/cluster.go @@ -87,7 +87,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key executeInfoKey: "timer:cluster_executeInfoKey" + keyPrefix, // 执行情况的key 有序集合 - usePriority: op.usePriority, + usePriority: false, stopChan: make(chan struct{}), instanceId: U.String(), cronParser: op.cronParser, @@ -98,7 +98,18 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin // 初始化优先级 - if clu.usePriority { + if op.priorityType != priorityTypeNone { + + clu.usePriority = true + + if op.priorityType == priorityTypeVersion { + pVal, err := priority.PriorityByVersion(op.priorityVersion) + if err != nil { + clu.logger.Errorf(ctx, "PriorityByVersion version:%s err:%v", op.priorityVersion, err) + return nil, err + } + op.priorityVal = pVal + } pri, err := priority.InitPriority( ctx, diff --git a/once.go b/once.go index 493e638..4a2279b 100644 --- a/once.go +++ b/once.go @@ -111,7 +111,7 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c listKey: "timer:once_listkey" + keyPrefix, executeInfoKey: "timer:once_executeInfoKey" + keyPrefix, globalLockPrefix: "timer:once_globalLockPrefix" + keyPrefix, - usePriority: op.usePriority, + usePriority: false, redis: re, worker: call, keyPrefix: keyPrefix, @@ -126,7 +126,19 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c } // 初始化优先级 - if wo.usePriority { + if op.priorityType != priorityTypeNone { + + wo.usePriority = true + + if op.priorityType == priorityTypeVersion { + pVal, err := priority.PriorityByVersion(op.priorityVersion) + if err != nil { + wo.logger.Errorf(ctx, "PriorityByVersion version:%s err:%v", op.priorityVersion, err) + return nil, err + } + op.priorityVal = pVal + } + pri, err := priority.InitPriority( ctx, re, diff --git a/option.go b/option.go index 6ea190b..c96373c 100644 --- a/option.go +++ b/option.go @@ -8,32 +8,41 @@ import ( ) type Options struct { - logger logger.Logger - location *time.Location - timeout time.Duration // 任务最长执行时间 - usePriority bool - priorityVal int64 - batchSize int - maxRunCount int // 单个任务最大运行次数 0代表不限 - maxWorkers int // 最大工作协程数 - cronParser *cron.Parser // cron表达式解析器 + logger logger.Logger + location *time.Location + timeout time.Duration // 任务最长执行时间 + priorityType priorityType // 策略类型 0.不使用 1.优先级 2.版本 + priorityVal int64 // 策略优先级 + priorityVersion string // 策略版本的集 + batchSize int + maxRunCount int // 单个任务最大运行次数 0代表不限 + maxWorkers int // 最大工作协程数 + cronParser *cron.Parser // cron表达式解析器 } +type priorityType int8 + +const ( + priorityTypeNone priorityType = 0 // 不使用优先级 + priorityTypePriority priorityType = 1 + priorityTypeVersion priorityType = 2 // 版本 +) + func defaultOptions() Options { // 默认使用Linux的定时任务兼容 parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) return Options{ - logger: logger.NewLogger(), - location: time.Local, - timeout: time.Hour, // - usePriority: false, - priorityVal: 0, - batchSize: 100, - maxRunCount: 0, - maxWorkers: 100, - cronParser: &parser, + logger: logger.NewLogger(), + location: time.Local, + timeout: time.Hour, // + priorityType: priorityTypeNone, + priorityVal: 0, + batchSize: 100, + maxRunCount: 0, + maxWorkers: 100, + cronParser: &parser, } } @@ -81,11 +90,18 @@ func WithTimeout(d time.Duration) Option { // 设置优先级 func WithPriority(priority int64) Option { return func(o *Options) { - o.usePriority = true + o.priorityType = priorityTypePriority o.priorityVal = priority } } +func WithPriorityByVersion(version string) Option { + return func(o *Options) { + o.priorityType = priorityTypeVersion + o.priorityVersion = version + } +} + func WithBatchSize(size int) Option { return func(o *Options) { if size <= 1 {