diff --git a/.gitignore b/.gitignore index 6797c00..cc46b82 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ log cache +test.txt diff --git a/cluster.go b/cluster.go index 39af518..66de7fd 100644 --- a/cluster.go +++ b/cluster.go @@ -151,12 +151,18 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin // Stop 停止集群定时器 func (l *Cluster) Stop() { close(l.stopChan) - if l.cancel != nil { - l.cancel() - } if l.usePriority && l.priority != nil { l.priority.Close() } + if l.leader != nil { + l.leader.Close() + } + if l.heartbeat != nil { + l.heartbeat.Close() + } + if l.cancel != nil { + l.cancel() + } l.wg.Wait() } @@ -481,6 +487,8 @@ func (c *Cluster) executeTasks() { if err != nil { if err != redis.Nil { c.logger.Errorf(c.ctx, "Failed to pop task: %v", err) + // Redis 异常,休眠一会儿 + time.Sleep(5 * time.Second) } continue } diff --git a/cluster_test.go b/cluster_test.go index 946a43a..1d4ff0f 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -10,16 +10,25 @@ import ( "github.com/yuninks/timerx" ) -func TestCluster_AddEveryMonth(t *testing.T) { - ctx := context.Background() - redis := redis.NewClient(&redis.Options{ +func redisInit() *redis.Client { + return redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "123456", DB: 0, }) +} + +func TestCluster_AddEveryMonth(t *testing.T) { + ctx := context.Background() + redis := redisInit() defer redis.Close() - cluster, _ := timerx.InitCluster(ctx, redis, "test") + cluster, err := timerx.InitCluster(ctx, redis, "test") + if err != nil { + t.Errorf("InitCluster failed, err: %v", err) + return + } + defer cluster.Stop() taskId := "testTask" hour := 2 @@ -32,7 +41,7 @@ func TestCluster_AddEveryMonth(t *testing.T) { } extendData := "testData" - err := cluster.EveryMonth(ctx, taskId, 1, hour, minute, second, callback, extendData) + err = cluster.EveryMonth(ctx, taskId, 1, hour, minute, second, callback, extendData) if err != nil { t.Errorf("AddEveryMonth failed, err: %v", err) } @@ -44,12 +53,10 @@ func TestCluster_AddEveryMonth(t *testing.T) { func TestCluster_AddEveryWeek(t *testing.T) { ctx := context.Background() - redis := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - }) + redis := redisInit() defer redis.Close() - cluster,_ := timerx.InitCluster(ctx, redis, "test") + cluster, _ := timerx.InitCluster(ctx, redis, "test") taskId := "testTask" week := time.Sunday @@ -73,12 +80,10 @@ func TestCluster_AddEveryWeek(t *testing.T) { func TestCluster_AddEveryDay(t *testing.T) { ctx := context.Background() - redis := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - }) + redis := redisInit() defer redis.Close() - cluster,_ := timerx.InitCluster(ctx, redis, "test") + cluster, _ := timerx.InitCluster(ctx, redis, "test") taskId := "testTask" hour := 2 @@ -101,12 +106,10 @@ func TestCluster_AddEveryDay(t *testing.T) { func TestCluster_AddEveryHour(t *testing.T) { ctx := context.Background() - redis := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - }) + redis := redisInit() defer redis.Close() - cluster,_ := timerx.InitCluster(ctx, redis, "test") + cluster, _ := timerx.InitCluster(ctx, redis, "test") taskId := "testTask" minute := 3 @@ -128,12 +131,10 @@ func TestCluster_AddEveryHour(t *testing.T) { func TestCluster_AddEveryMinute(t *testing.T) { ctx := context.Background() - redis := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - }) + redis := redisInit() defer redis.Close() - cluster,_ := timerx.InitCluster(ctx, redis, "test") + cluster, _ := timerx.InitCluster(ctx, redis, "test") taskId := "testTask" second := 4 @@ -156,16 +157,12 @@ func TestCluster_Add(t *testing.T) { fmt.Println("66666") ctx := context.Background() fmt.Println("66666") - redis := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Password: "123456", - DB: 0, - }) + redis := redisInit() defer redis.Close() t.Log("6666") - cluster,_ := timerx.InitCluster(ctx, redis, "test") + cluster, _ := timerx.InitCluster(ctx, redis, "test") taskId := "testTask" dur := time.Second @@ -185,29 +182,3 @@ func TestCluster_Add(t *testing.T) { // TODO: verify the job is added to the cluster and can be executed after the specified duration } - -// func TestMain(m *testing.M) { -// client := redis.NewClient(&redis.Options{ -// Addr: "127.0.0.1" + ":" + "6379", -// Password: "", // no password set -// DB: 0, // use default DB -// }) -// if client == nil { -// fmt.Println("redis init error") -// return -// } -// // Redis = client - -// } - -// func TestRedis(t *testing.T) { -// fmt.Println("6666") -// t.Log("fffff") -// // t.Fail() -// // t.Error("ffff") -// // Redis.Set(context.Background(), "dddd", "dddd", 0) -// // str, err := Redis.Get(context.Background(), "dddd").Result() -// // fmt.Println("ssss", str, err) -// // t.Log(str, err) -// // t.Fail() -// } diff --git a/cmd/main.go b/cmd/main.go index 0b1c30b..0d6a905 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -26,8 +26,8 @@ func main() { // re() // d() - // cluster() - once() + cluster() + // once() // prioritys() select {} diff --git a/heartbeat/heartbeat.go b/heartbeat/heartbeat.go index 4c86df7..3a8ebc0 100644 --- a/heartbeat/heartbeat.go +++ b/heartbeat/heartbeat.go @@ -64,8 +64,8 @@ func InitHeartBeat(ctx context.Context, ref redis.UniversalClient, keyPrefix str } func (l *HeartBeat) Close() { - l.cancel() l.cleanHeartbeat(true) + l.cancel() l.wg.Wait() } diff --git a/once.go b/once.go index a1ec45d..e310dba 100644 --- a/once.go +++ b/once.go @@ -174,7 +174,9 @@ func (l *Once) Close() { if l.leader != nil { l.leader.Close() } - l.heartbeat.Close() + if l.heartbeat != nil { + l.heartbeat.Close() + } l.cancel() l.wg.Wait() } @@ -275,6 +277,11 @@ func (l *Once) executeTasks() { keys, err := l.redis.BLPop(l.ctx, time.Second*10, l.listKey).Result() if err != nil { + if err != redis.Nil { + l.logger.Errorf(l.ctx, "Failed to pop task: %v", err) + // Redis 异常,休眠一会儿再重试 + time.Sleep(time.Second * 5) + } continue }