调整部分options

This commit is contained in:
Yun
2025-09-24 14:50:30 +08:00
parent 062a39b209
commit 0be8fd0cdc
7 changed files with 96 additions and 58 deletions
+18 -4
View File
@@ -55,9 +55,12 @@ type Cluster struct {
// 初始化定时器
// 全局只需要初始化一次
func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix string, opts ...Option) *Cluster {
func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix string, opts ...Option) (*Cluster, error) {
if red == nil {
return nil, errors.New("redis is nil")
}
// clusterOnceLimit.Do(func() {
op := newOptions(opts...)
ctx, cancel := context.WithCancel(ctx)
@@ -90,7 +93,13 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
// 初始化优先级
if clu.usePriority {
clu.priority = priority.InitPriority(ctx, red, clu.priorityKey, op.priorityVal, priority.SetLogger(clu.logger))
pri, err := priority.InitPriority(ctx, red, clu.priorityKey, op.priorityVal, priority.WithLogger(clu.logger))
if err != nil {
clu.logger.Errorf(ctx, "InitPriority err:%v", err)
return nil, err
}
clu.priority = pri
}
// 启动守护进程
@@ -98,7 +107,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
clu.logger.Infof(ctx, "InitCluster success keyPrefix:%s instanceId:%s", clu.keyPrefix, clu.instanceId)
return clu
return clu, nil
}
// Stop 停止集群定时器
@@ -270,6 +279,10 @@ func (l *Cluster) getLeaderLock() error {
l.logger.Infof(l.ctx, "getLeaderLock Instance %s became leader", lock.GetValue())
go func() {
if !l.usePriority || l.priority == nil {
return
}
for {
select {
case <-ctx.Done():
@@ -279,6 +292,7 @@ func (l *Cluster) getLeaderLock() error {
default:
if !l.priority.IsLatest(l.ctx) {
cancel()
return
}
time.Sleep(100 * time.Millisecond)
}
+12 -14
View File
@@ -13,13 +13,13 @@ import (
func TestCluster_AddEveryMonth(t *testing.T) {
ctx := context.Background()
redis := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Addr: "localhost:6379",
Password: "123456",
DB: 0,
DB: 0,
})
defer redis.Close()
cluster := timerx.InitCluster(ctx, redis, "test")
cluster, _ := timerx.InitCluster(ctx, redis, "test")
taskId := "testTask"
hour := 2
@@ -49,7 +49,7 @@ func TestCluster_AddEveryWeek(t *testing.T) {
})
defer redis.Close()
cluster := timerx.InitCluster(ctx, redis, "test")
cluster,_ := timerx.InitCluster(ctx, redis, "test")
taskId := "testTask"
week := time.Sunday
@@ -78,7 +78,7 @@ func TestCluster_AddEveryDay(t *testing.T) {
})
defer redis.Close()
cluster := timerx.InitCluster(ctx, redis, "test")
cluster,_ := timerx.InitCluster(ctx, redis, "test")
taskId := "testTask"
hour := 2
@@ -106,12 +106,12 @@ func TestCluster_AddEveryHour(t *testing.T) {
})
defer redis.Close()
cluster := timerx.InitCluster(ctx, redis, "test")
cluster,_ := timerx.InitCluster(ctx, redis, "test")
taskId := "testTask"
minute := 3
second := 4
callback := func(ctx context.Context, data interface{}) error{
callback := func(ctx context.Context, data interface{}) error {
// do something
fmt.Println("Task executed:", data)
return nil
@@ -133,11 +133,11 @@ func TestCluster_AddEveryMinute(t *testing.T) {
})
defer redis.Close()
cluster := timerx.InitCluster(ctx, redis, "test")
cluster,_ := timerx.InitCluster(ctx, redis, "test")
taskId := "testTask"
second := 4
callback := func(ctx context.Context, data interface{}) error{
callback := func(ctx context.Context, data interface{}) error {
// do something
fmt.Println("Task executed:", data)
return nil
@@ -157,15 +157,15 @@ func TestCluster_Add(t *testing.T) {
ctx := context.Background()
fmt.Println("66666")
redis := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Addr: "localhost:6379",
Password: "123456",
DB: 0,
DB: 0,
})
defer redis.Close()
t.Log("6666")
cluster := timerx.InitCluster(ctx, redis, "test")
cluster,_ := timerx.InitCluster(ctx, redis, "test")
taskId := "testTask"
dur := time.Second
@@ -181,10 +181,8 @@ func TestCluster_Add(t *testing.T) {
t.Errorf("Add failed,1 err: %v", err)
}
time.Sleep(time.Second * 20)
// TODO: verify the job is added to the cluster and can be executed after the specified duration
}
+36 -14
View File
@@ -7,20 +7,24 @@ import (
)
type Options struct {
logger logger.Logger
location *time.Location
timeout time.Duration
usePriority bool
priorityVal int64
logger logger.Logger
location *time.Location
timeout time.Duration
usePriority bool
priorityVal int64
batchSize int
maxRetryCount int
}
func defaultOptions() Options {
return Options{
logger: logger.NewLogger(),
location: time.Local,
timeout: time.Hour,
usePriority: false,
priorityVal: 0,
logger: logger.NewLogger(),
location: time.Local,
timeout: time.Hour,
usePriority: false,
priorityVal: 0,
batchSize: 100,
maxRetryCount: 100,
}
}
@@ -35,30 +39,48 @@ func newOptions(opts ...Option) Options {
}
// 设置日志
func SetLogger(log logger.Logger) Option {
func WithLogger(log logger.Logger) Option {
return func(o *Options) {
o.logger = log
}
}
// 设定时区
func SetTimeZone(zone *time.Location) Option {
func WithLocation(zone *time.Location) Option {
return func(o *Options) {
o.location = zone
}
}
// 设置任务最长执行时间
func SetTimeout(d time.Duration) Option {
func WithTimeout(d time.Duration) Option {
return func(o *Options) {
o.timeout = d
}
}
// 设置优先级
func SetPriority(priority int64) Option {
func WithPriority(priority int64) Option {
return func(o *Options) {
o.usePriority = true
o.priorityVal = priority
}
}
func WithBatchSize(size int) Option {
return func(o *Options) {
if size <= 1 {
size = 1
}
o.batchSize = size
}
}
func WithMaxRetryCount(count int) Option {
return func(o *Options) {
if count <= 0 {
count = 1
}
o.maxRetryCount = count
}
}
+2 -2
View File
@@ -32,14 +32,14 @@ func newOptions(opts ...Option) Options {
return o
}
func SetLogger(log logger.Logger) Option {
func WithLogger(log logger.Logger) Option {
return func(o *Options) {
o.logger = log
}
}
// 更新周期
func SetUpdateInterval(d time.Duration) Option {
func WithUpdateInterval(d time.Duration) Option {
if d.Abs() < time.Second {
d = time.Second * 5
}
+8 -4
View File
@@ -2,6 +2,7 @@ package priority
import (
"context"
"errors"
"fmt"
"strconv"
"sync"
@@ -31,7 +32,12 @@ type Priority struct {
latestMux sync.RWMutex
}
func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int64, opts ...Option) *Priority {
func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int64, opts ...Option) (*Priority, error) {
if re != nil {
return nil, errors.New("redis is nil")
}
conf := newOptions(opts...)
ctx, cancel := context.WithCancel(ctx)
@@ -50,7 +56,7 @@ func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix strin
pro.startDaemon()
return pro
return pro, nil
}
func (p *Priority) Close() {
@@ -118,8 +124,6 @@ func (l *Priority) getLatestLoop() {
}
func (p *Priority) IsLatest(ctx context.Context) bool {
p.latestMux.RLock()
defer p.latestMux.RUnlock()
+7 -7
View File
@@ -39,7 +39,7 @@ func TestPriority(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
pro := InitPriority(ctx, re, "test", 10, SetUpdateInterval(time.Second*1))
pro, _ := InitPriority(ctx, re, "test", 10, WithUpdateInterval(time.Second*1))
for i := 0; i < 10; i++ {
bb := pro.IsLatest(ctx)
@@ -50,7 +50,7 @@ func TestPriority(t *testing.T) {
cancel()
}()
pro := InitPriority(ctx, re, "test", 0, SetUpdateInterval(time.Second*1))
pro, _ := InitPriority(ctx, re, "test", 0, WithUpdateInterval(time.Second*1))
for i := 0; i < 25; i++ {
bb := pro.IsLatest(ctx)
@@ -85,7 +85,7 @@ func TestInitPriority(t *testing.T) {
ctx := context.Background()
// 测试正常初始化
priority := InitPriority(ctx, getRedis(), "test", 100)
priority, _ := InitPriority(ctx, getRedis(), "test", 100)
assert.NotNil(t, priority)
assert.Equal(t, int64(100), priority.priority)
}
@@ -137,7 +137,7 @@ func TestSetPriorityScenarios(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
priority := InitPriority(ctx, redisConn, "test22", tc.newPriority)
priority, _ := InitPriority(ctx, redisConn, "test22", tc.newPriority)
defer priority.Close()
time.Sleep(time.Second * 1)
@@ -154,7 +154,7 @@ func TestSetPriorityScenarios(t *testing.T) {
func TestConcurrentAccess(t *testing.T) {
ctx := context.Background()
priority := InitPriority(ctx, getRedis(), "testacc", 100)
priority, _ := InitPriority(ctx, getRedis(), "testacc", 100)
time.Sleep(time.Second * 1)
@@ -186,8 +186,8 @@ func TestErrorScenarios(t *testing.T) {
t.Run("Redis连接失败", func(t *testing.T) {
ctx := context.Background()
priority := InitPriority(ctx, getRedis(), "test", 100)
_,err := priority.setPriority()
priority, _ := InitPriority(ctx, getRedis(), "test", 100)
_, err := priority.setPriority()
assert.Error(t, err)
})
+9 -9
View File
@@ -52,8 +52,8 @@ func TestSingleTimer_Basic(t *testing.T) {
mockLogger := &MockLogger{}
timer := timerx.InitSingle(ctx,
timerx.SetLogger(mockLogger),
timerx.SetTimeZone(time.UTC))
timerx.WithLogger(mockLogger),
timerx.WithLocation(time.UTC))
defer timer.Stop()
// 测试任务计数
@@ -108,7 +108,7 @@ func TestSingleTimer_Deduplication(t *testing.T) {
ctx := context.Background()
mockLogger := &MockLogger{}
timer := timerx.InitSingle(ctx, timerx.SetLogger(mockLogger))
timer := timerx.InitSingle(ctx, timerx.WithLogger(mockLogger))
defer timer.Stop()
var executionCount int32
@@ -190,7 +190,7 @@ func TestSingleTimer_Timeout(t *testing.T) {
ctx := context.Background()
mockLogger := &MockLogger{}
timer := timerx.InitSingle(ctx, timerx.SetLogger(mockLogger))
timer := timerx.InitSingle(ctx, timerx.WithLogger(mockLogger))
defer timer.Stop()
// 长时间运行的任务
@@ -217,7 +217,7 @@ func TestSingleTimer_PanicRecovery(t *testing.T) {
ctx := context.Background()
mockLogger := &MockLogger{}
timer := timerx.InitSingle(ctx, timerx.SetLogger(mockLogger))
timer := timerx.InitSingle(ctx, timerx.WithLogger(mockLogger))
defer timer.Stop()
panicTask := func(ctx context.Context, data interface{}) error {
@@ -236,7 +236,7 @@ func TestSingleTimer_PanicRecovery(t *testing.T) {
// 测试不同时间类型的任务
func TestSingleTimer_DifferentJobTypes(t *testing.T) {
ctx := context.Background()
timer := timerx.InitSingle(ctx, timerx.SetTimeZone(time.UTC))
timer := timerx.InitSingle(ctx, timerx.WithLocation(time.UTC))
defer timer.Stop()
var counts struct {
@@ -287,7 +287,7 @@ func TestSingleTimer_ContextCancellation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
mockLogger := &MockLogger{}
timer := timerx.InitSingle(ctx, timerx.SetLogger(mockLogger))
timer := timerx.InitSingle(ctx, timerx.WithLogger(mockLogger))
var executionCount int32
_, err := timer.AddSpace(ctx, "cancel-test", 100*time.Millisecond,
@@ -419,7 +419,7 @@ func TestSingleTimer_Logging(t *testing.T) {
ctx := context.Background()
mockLogger := &MockLogger{}
timer := timerx.InitSingle(ctx, timerx.SetLogger(mockLogger))
timer := timerx.InitSingle(ctx, timerx.WithLogger(mockLogger))
defer timer.Stop()
// 添加会panic的任务
@@ -448,7 +448,7 @@ func TestSingleTimer_Timezone(t *testing.T) {
for _, loc := range locations {
t.Run(loc.String(), func(t *testing.T) {
ctx := context.Background()
timer := timerx.InitSingle(ctx, timerx.SetTimeZone(loc))
timer := timerx.InitSingle(ctx, timerx.WithLocation(loc))
defer timer.Stop()
var executed bool