From 0be8fd0cdc4dfb6e498e3c4b657fcfe193da4b11 Mon Sep 17 00:00:00 2001 From: Yun Date: Wed, 24 Sep 2025 14:50:30 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E9=83=A8=E5=88=86options?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 22 +++++++++++++---- cluster_test.go | 26 ++++++++++---------- option.go | 50 ++++++++++++++++++++++++++++----------- priority/option.go | 4 ++-- priority/priority.go | 12 ++++++---- priority/priority_test.go | 22 ++++++++--------- single_test.go | 18 +++++++------- 7 files changed, 96 insertions(+), 58 deletions(-) diff --git a/cluster.go b/cluster.go index 0c3795b..0d209d0 100644 --- a/cluster.go +++ b/cluster.go @@ -55,9 +55,12 @@ type Cluster struct { // 初始化定时器 // 全局只需要初始化一次 -func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix string, opts ...Option) *Cluster { +func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix string, opts ...Option) (*Cluster, error) { + + if red == nil { + return nil, errors.New("redis is nil") + } - // clusterOnceLimit.Do(func() { op := newOptions(opts...) ctx, cancel := context.WithCancel(ctx) @@ -90,7 +93,13 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin // 初始化优先级 if clu.usePriority { - clu.priority = priority.InitPriority(ctx, red, clu.priorityKey, op.priorityVal, priority.SetLogger(clu.logger)) + + pri, err := priority.InitPriority(ctx, red, clu.priorityKey, op.priorityVal, priority.WithLogger(clu.logger)) + if err != nil { + clu.logger.Errorf(ctx, "InitPriority err:%v", err) + return nil, err + } + clu.priority = pri } // 启动守护进程 @@ -98,7 +107,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin clu.logger.Infof(ctx, "InitCluster success keyPrefix:%s instanceId:%s", clu.keyPrefix, clu.instanceId) - return clu + return clu, nil } // Stop 停止集群定时器 @@ -270,6 +279,10 @@ func (l *Cluster) getLeaderLock() error { l.logger.Infof(l.ctx, "getLeaderLock Instance %s became leader", lock.GetValue()) go func() { + if !l.usePriority || l.priority == nil { + return + } + for { select { case <-ctx.Done(): @@ -279,6 +292,7 @@ func (l *Cluster) getLeaderLock() error { default: if !l.priority.IsLatest(l.ctx) { cancel() + return } time.Sleep(100 * time.Millisecond) } diff --git a/cluster_test.go b/cluster_test.go index fd99c15..946a43a 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -13,13 +13,13 @@ import ( func TestCluster_AddEveryMonth(t *testing.T) { ctx := context.Background() redis := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", + Addr: "localhost:6379", Password: "123456", - DB: 0, + DB: 0, }) defer redis.Close() - cluster := timerx.InitCluster(ctx, redis, "test") + cluster, _ := timerx.InitCluster(ctx, redis, "test") taskId := "testTask" hour := 2 @@ -49,7 +49,7 @@ func TestCluster_AddEveryWeek(t *testing.T) { }) defer redis.Close() - cluster := timerx.InitCluster(ctx, redis, "test") + cluster,_ := timerx.InitCluster(ctx, redis, "test") taskId := "testTask" week := time.Sunday @@ -78,7 +78,7 @@ func TestCluster_AddEveryDay(t *testing.T) { }) defer redis.Close() - cluster := timerx.InitCluster(ctx, redis, "test") + cluster,_ := timerx.InitCluster(ctx, redis, "test") taskId := "testTask" hour := 2 @@ -106,12 +106,12 @@ func TestCluster_AddEveryHour(t *testing.T) { }) defer redis.Close() - cluster := timerx.InitCluster(ctx, redis, "test") + cluster,_ := timerx.InitCluster(ctx, redis, "test") taskId := "testTask" minute := 3 second := 4 - callback := func(ctx context.Context, data interface{}) error{ + callback := func(ctx context.Context, data interface{}) error { // do something fmt.Println("Task executed:", data) return nil @@ -133,11 +133,11 @@ func TestCluster_AddEveryMinute(t *testing.T) { }) defer redis.Close() - cluster := timerx.InitCluster(ctx, redis, "test") + cluster,_ := timerx.InitCluster(ctx, redis, "test") taskId := "testTask" second := 4 - callback := func(ctx context.Context, data interface{}) error{ + callback := func(ctx context.Context, data interface{}) error { // do something fmt.Println("Task executed:", data) return nil @@ -157,15 +157,15 @@ func TestCluster_Add(t *testing.T) { ctx := context.Background() fmt.Println("66666") redis := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", + Addr: "localhost:6379", Password: "123456", - DB: 0, + DB: 0, }) defer redis.Close() t.Log("6666") - cluster := timerx.InitCluster(ctx, redis, "test") + cluster,_ := timerx.InitCluster(ctx, redis, "test") taskId := "testTask" dur := time.Second @@ -181,10 +181,8 @@ func TestCluster_Add(t *testing.T) { t.Errorf("Add failed,1 err: %v", err) } - time.Sleep(time.Second * 20) - // TODO: verify the job is added to the cluster and can be executed after the specified duration } diff --git a/option.go b/option.go index b0945d4..5fd17a2 100644 --- a/option.go +++ b/option.go @@ -7,20 +7,24 @@ import ( ) type Options struct { - logger logger.Logger - location *time.Location - timeout time.Duration - usePriority bool - priorityVal int64 + logger logger.Logger + location *time.Location + timeout time.Duration + usePriority bool + priorityVal int64 + batchSize int + maxRetryCount int } func defaultOptions() Options { return Options{ - logger: logger.NewLogger(), - location: time.Local, - timeout: time.Hour, - usePriority: false, - priorityVal: 0, + logger: logger.NewLogger(), + location: time.Local, + timeout: time.Hour, + usePriority: false, + priorityVal: 0, + batchSize: 100, + maxRetryCount: 100, } } @@ -35,30 +39,48 @@ func newOptions(opts ...Option) Options { } // 设置日志 -func SetLogger(log logger.Logger) Option { +func WithLogger(log logger.Logger) Option { return func(o *Options) { o.logger = log } } // 设定时区 -func SetTimeZone(zone *time.Location) Option { +func WithLocation(zone *time.Location) Option { return func(o *Options) { o.location = zone } } // 设置任务最长执行时间 -func SetTimeout(d time.Duration) Option { +func WithTimeout(d time.Duration) Option { return func(o *Options) { o.timeout = d } } // 设置优先级 -func SetPriority(priority int64) Option { +func WithPriority(priority int64) Option { return func(o *Options) { o.usePriority = true o.priorityVal = priority } } + +func WithBatchSize(size int) Option { + return func(o *Options) { + if size <= 1 { + size = 1 + } + o.batchSize = size + } +} + +func WithMaxRetryCount(count int) Option { + return func(o *Options) { + if count <= 0 { + count = 1 + } + o.maxRetryCount = count + } +} diff --git a/priority/option.go b/priority/option.go index fc6728c..dffad0b 100644 --- a/priority/option.go +++ b/priority/option.go @@ -32,14 +32,14 @@ func newOptions(opts ...Option) Options { return o } -func SetLogger(log logger.Logger) Option { +func WithLogger(log logger.Logger) Option { return func(o *Options) { o.logger = log } } // 更新周期 -func SetUpdateInterval(d time.Duration) Option { +func WithUpdateInterval(d time.Duration) Option { if d.Abs() < time.Second { d = time.Second * 5 } diff --git a/priority/priority.go b/priority/priority.go index 0d1cb7e..5dc37be 100644 --- a/priority/priority.go +++ b/priority/priority.go @@ -2,6 +2,7 @@ package priority import ( "context" + "errors" "fmt" "strconv" "sync" @@ -31,7 +32,12 @@ type Priority struct { latestMux sync.RWMutex } -func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int64, opts ...Option) *Priority { +func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int64, opts ...Option) (*Priority, error) { + + if re != nil { + return nil, errors.New("redis is nil") + } + conf := newOptions(opts...) ctx, cancel := context.WithCancel(ctx) @@ -50,7 +56,7 @@ func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix strin pro.startDaemon() - return pro + return pro, nil } func (p *Priority) Close() { @@ -118,8 +124,6 @@ func (l *Priority) getLatestLoop() { } - - func (p *Priority) IsLatest(ctx context.Context) bool { p.latestMux.RLock() defer p.latestMux.RUnlock() diff --git a/priority/priority_test.go b/priority/priority_test.go index c8c3bdd..2ad39fb 100644 --- a/priority/priority_test.go +++ b/priority/priority_test.go @@ -39,7 +39,7 @@ func TestPriority(t *testing.T) { ctx, cancel := context.WithCancel(ctx) - pro := InitPriority(ctx, re, "test", 10, SetUpdateInterval(time.Second*1)) + pro, _ := InitPriority(ctx, re, "test", 10, WithUpdateInterval(time.Second*1)) for i := 0; i < 10; i++ { bb := pro.IsLatest(ctx) @@ -50,7 +50,7 @@ func TestPriority(t *testing.T) { cancel() }() - pro := InitPriority(ctx, re, "test", 0, SetUpdateInterval(time.Second*1)) + pro, _ := InitPriority(ctx, re, "test", 0, WithUpdateInterval(time.Second*1)) for i := 0; i < 25; i++ { bb := pro.IsLatest(ctx) @@ -85,7 +85,7 @@ func TestInitPriority(t *testing.T) { ctx := context.Background() // 测试正常初始化 - priority := InitPriority(ctx, getRedis(), "test", 100) + priority, _ := InitPriority(ctx, getRedis(), "test", 100) assert.NotNil(t, priority) assert.Equal(t, int64(100), priority.priority) } @@ -137,7 +137,7 @@ func TestSetPriorityScenarios(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - priority := InitPriority(ctx, redisConn, "test22", tc.newPriority) + priority, _ := InitPriority(ctx, redisConn, "test22", tc.newPriority) defer priority.Close() time.Sleep(time.Second * 1) @@ -154,7 +154,7 @@ func TestSetPriorityScenarios(t *testing.T) { func TestConcurrentAccess(t *testing.T) { ctx := context.Background() - priority := InitPriority(ctx, getRedis(), "testacc", 100) + priority, _ := InitPriority(ctx, getRedis(), "testacc", 100) time.Sleep(time.Second * 1) @@ -186,12 +186,12 @@ func TestErrorScenarios(t *testing.T) { t.Run("Redis连接失败", func(t *testing.T) { ctx := context.Background() - priority := InitPriority(ctx, getRedis(), "test", 100) - _,err := priority.setPriority() - + priority, _ := InitPriority(ctx, getRedis(), "test", 100) + _, err := priority.setPriority() + assert.Error(t, err) }) - + t.Run("Redis返回值解析错误", func(t *testing.T) { ctx := context.Background() @@ -201,8 +201,8 @@ func TestErrorScenarios(t *testing.T) { priority: 100, ctx: ctx, } - + _, err := priority.getCurrentPriority() assert.Error(t, err) }) -} \ No newline at end of file +} diff --git a/single_test.go b/single_test.go index b18fb89..bf8e008 100644 --- a/single_test.go +++ b/single_test.go @@ -52,8 +52,8 @@ func TestSingleTimer_Basic(t *testing.T) { mockLogger := &MockLogger{} timer := timerx.InitSingle(ctx, - timerx.SetLogger(mockLogger), - timerx.SetTimeZone(time.UTC)) + timerx.WithLogger(mockLogger), + timerx.WithLocation(time.UTC)) defer timer.Stop() // 测试任务计数 @@ -108,7 +108,7 @@ func TestSingleTimer_Deduplication(t *testing.T) { ctx := context.Background() mockLogger := &MockLogger{} - timer := timerx.InitSingle(ctx, timerx.SetLogger(mockLogger)) + timer := timerx.InitSingle(ctx, timerx.WithLogger(mockLogger)) defer timer.Stop() var executionCount int32 @@ -190,7 +190,7 @@ func TestSingleTimer_Timeout(t *testing.T) { ctx := context.Background() mockLogger := &MockLogger{} - timer := timerx.InitSingle(ctx, timerx.SetLogger(mockLogger)) + timer := timerx.InitSingle(ctx, timerx.WithLogger(mockLogger)) defer timer.Stop() // 长时间运行的任务 @@ -217,7 +217,7 @@ func TestSingleTimer_PanicRecovery(t *testing.T) { ctx := context.Background() mockLogger := &MockLogger{} - timer := timerx.InitSingle(ctx, timerx.SetLogger(mockLogger)) + timer := timerx.InitSingle(ctx, timerx.WithLogger(mockLogger)) defer timer.Stop() panicTask := func(ctx context.Context, data interface{}) error { @@ -236,7 +236,7 @@ func TestSingleTimer_PanicRecovery(t *testing.T) { // 测试不同时间类型的任务 func TestSingleTimer_DifferentJobTypes(t *testing.T) { ctx := context.Background() - timer := timerx.InitSingle(ctx, timerx.SetTimeZone(time.UTC)) + timer := timerx.InitSingle(ctx, timerx.WithLocation(time.UTC)) defer timer.Stop() var counts struct { @@ -287,7 +287,7 @@ func TestSingleTimer_ContextCancellation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) mockLogger := &MockLogger{} - timer := timerx.InitSingle(ctx, timerx.SetLogger(mockLogger)) + timer := timerx.InitSingle(ctx, timerx.WithLogger(mockLogger)) var executionCount int32 _, err := timer.AddSpace(ctx, "cancel-test", 100*time.Millisecond, @@ -419,7 +419,7 @@ func TestSingleTimer_Logging(t *testing.T) { ctx := context.Background() mockLogger := &MockLogger{} - timer := timerx.InitSingle(ctx, timerx.SetLogger(mockLogger)) + timer := timerx.InitSingle(ctx, timerx.WithLogger(mockLogger)) defer timer.Stop() // 添加会panic的任务 @@ -448,7 +448,7 @@ func TestSingleTimer_Timezone(t *testing.T) { for _, loc := range locations { t.Run(loc.String(), func(t *testing.T) { ctx := context.Background() - timer := timerx.InitSingle(ctx, timerx.SetTimeZone(loc)) + timer := timerx.InitSingle(ctx, timerx.WithLocation(loc)) defer timer.Stop() var executed bool