支持直接option注明版本号

This commit is contained in:
Yun
2025-10-14 15:07:53 +08:00
parent 459911a579
commit a9f37bc4e7
3 changed files with 62 additions and 23 deletions
+13 -2
View File
@@ -87,7 +87,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合
priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key
executeInfoKey: "timer:cluster_executeInfoKey" + keyPrefix, // 执行情况的key 有序集合 executeInfoKey: "timer:cluster_executeInfoKey" + keyPrefix, // 执行情况的key 有序集合
usePriority: op.usePriority, usePriority: false,
stopChan: make(chan struct{}), stopChan: make(chan struct{}),
instanceId: U.String(), instanceId: U.String(),
cronParser: op.cronParser, cronParser: op.cronParser,
@@ -98,7 +98,18 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
// 初始化优先级 // 初始化优先级
if clu.usePriority { if op.priorityType != priorityTypeNone {
clu.usePriority = true
if op.priorityType == priorityTypeVersion {
pVal, err := priority.PriorityByVersion(op.priorityVersion)
if err != nil {
clu.logger.Errorf(ctx, "PriorityByVersion version:%s err:%v", op.priorityVersion, err)
return nil, err
}
op.priorityVal = pVal
}
pri, err := priority.InitPriority( pri, err := priority.InitPriority(
ctx, ctx,
+14 -2
View File
@@ -111,7 +111,7 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
listKey: "timer:once_listkey" + keyPrefix, listKey: "timer:once_listkey" + keyPrefix,
executeInfoKey: "timer:once_executeInfoKey" + keyPrefix, executeInfoKey: "timer:once_executeInfoKey" + keyPrefix,
globalLockPrefix: "timer:once_globalLockPrefix" + keyPrefix, globalLockPrefix: "timer:once_globalLockPrefix" + keyPrefix,
usePriority: op.usePriority, usePriority: false,
redis: re, redis: re,
worker: call, worker: call,
keyPrefix: keyPrefix, keyPrefix: keyPrefix,
@@ -126,7 +126,19 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
} }
// 初始化优先级 // 初始化优先级
if wo.usePriority { if op.priorityType != priorityTypeNone {
wo.usePriority = true
if op.priorityType == priorityTypeVersion {
pVal, err := priority.PriorityByVersion(op.priorityVersion)
if err != nil {
wo.logger.Errorf(ctx, "PriorityByVersion version:%s err:%v", op.priorityVersion, err)
return nil, err
}
op.priorityVal = pVal
}
pri, err := priority.InitPriority( pri, err := priority.InitPriority(
ctx, ctx,
re, re,
+35 -19
View File
@@ -8,32 +8,41 @@ import (
) )
type Options struct { type Options struct {
logger logger.Logger logger logger.Logger
location *time.Location location *time.Location
timeout time.Duration // 任务最长执行时间 timeout time.Duration // 任务最长执行时间
usePriority bool priorityType priorityType // 策略类型 0.不使用 1.优先级 2.版本
priorityVal int64 priorityVal int64 // 策略优先级
batchSize int priorityVersion string // 策略版本的集
maxRunCount int // 单个任务最大运行次数 0代表不限 batchSize int
maxWorkers int // 最大工作协程数 maxRunCount int // 单个任务最大运行次数 0代表不限
cronParser *cron.Parser // cron表达式解析器 maxWorkers int // 最大工作协程数
cronParser *cron.Parser // cron表达式解析器
} }
type priorityType int8
const (
priorityTypeNone priorityType = 0 // 不使用优先级
priorityTypePriority priorityType = 1
priorityTypeVersion priorityType = 2 // 版本
)
func defaultOptions() Options { func defaultOptions() Options {
// 默认使用Linux的定时任务兼容 // 默认使用Linux的定时任务兼容
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
return Options{ return Options{
logger: logger.NewLogger(), logger: logger.NewLogger(),
location: time.Local, location: time.Local,
timeout: time.Hour, // timeout: time.Hour, //
usePriority: false, priorityType: priorityTypeNone,
priorityVal: 0, priorityVal: 0,
batchSize: 100, batchSize: 100,
maxRunCount: 0, maxRunCount: 0,
maxWorkers: 100, maxWorkers: 100,
cronParser: &parser, cronParser: &parser,
} }
} }
@@ -81,11 +90,18 @@ func WithTimeout(d time.Duration) Option {
// 设置优先级 // 设置优先级
func WithPriority(priority int64) Option { func WithPriority(priority int64) Option {
return func(o *Options) { return func(o *Options) {
o.usePriority = true o.priorityType = priorityTypePriority
o.priorityVal = priority o.priorityVal = priority
} }
} }
func WithPriorityByVersion(version string) Option {
return func(o *Options) {
o.priorityType = priorityTypeVersion
o.priorityVersion = version
}
}
func WithBatchSize(size int) Option { func WithBatchSize(size int) Option {
return func(o *Options) { return func(o *Options) {
if size <= 1 { if size <= 1 {