diff --git a/cluster.go b/cluster.go index a3f0721..db8c45c 100644 --- a/cluster.go +++ b/cluster.go @@ -81,12 +81,12 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin logger: op.logger, keyPrefix: keyPrefix, location: op.location, - lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 - zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 - listKey: "timer:cluster_listKey" + keyPrefix, // 列表 - setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 - priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key - executeInfoKey: "timer:cluster_executeInfoKey" + keyPrefix, // 执行情况的key 有序集合 + lockKey: fmt.Sprintf("timer:{%s}:cluster_lock", keyPrefix), + zsetKey: fmt.Sprintf("timer:{%s}:cluster_zset", keyPrefix), + listKey: fmt.Sprintf("timer:{%s}:cluster_list", keyPrefix), + setKey: fmt.Sprintf("timer:{%s}:cluster_set", keyPrefix), + priorityKey: fmt.Sprintf("timer:{%s}:cluster_priority", keyPrefix), + executeInfoKey: fmt.Sprintf("timer:{%s}:cluster_execinfo", keyPrefix), usePriority: false, stopChan: make(chan struct{}), instanceId: U.String(), @@ -462,10 +462,10 @@ func (l *Cluster) calculateNextTimes() { // 使用Lua脚本原子性添加任务 script := ` local zsetKey = KEYS[1] + local lockKey = KEYS[2] local score = ARGV[1] local taskID = ARGV[2] local expireTime = ARGV[3] - local lockKey = ARGV[4] -- 检查是否已存在 local existing = redis.call('zscore', zsetKey, taskID) @@ -483,9 +483,9 @@ func (l *Cluster) calculateNextTimes() { return 1 ` - lockKey := fmt.Sprintf("%s_%s_%d", l.keyPrefix, val.TaskId, nextTime.UnixMilli()) - _, err = pipe.Eval(l.ctx, script, []string{l.zsetKey}, - nextTime.UnixMilli(), val.TaskId, 60, lockKey).Result() + lockKey := fmt.Sprintf("timer:{%s}:cluster_calc_lock:%s:%d", l.keyPrefix, val.TaskId, nextTime.UnixMilli()) + _, err = pipe.Eval(l.ctx, script, []string{l.zsetKey, lockKey}, + nextTime.UnixMilli(), val.TaskId, 60).Result() if err != nil { l.logger.Errorf(l.ctx, "Failed to schedule task: %v", err) } diff --git a/cluster_test.go b/cluster_test.go index e42ef0d..9400566 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -3,6 +3,7 @@ package timerx_test import ( "context" "fmt" + "strings" "testing" "time" @@ -182,3 +183,80 @@ func TestCluster_Add(t *testing.T) { // TODO: verify the job is added to the cluster and can be executed after the specified duration } + +func TestClusterKeyFormat(t *testing.T) { + keyPrefix := "testcluster" + + tests := []struct { + name string + key string + hashTag string + hasPrefix string + }{ + {"zsetKey", "timer:{testcluster}:cluster_zset", "{testcluster}", "timer:"}, + {"listKey", "timer:{testcluster}:cluster_list", "{testcluster}", "timer:"}, + {"lockKey", "timer:{testcluster}:cluster_lock", "{testcluster}", "timer:"}, + {"execInfoKey", "timer:{testcluster}:cluster_execinfo", "{testcluster}", "timer:"}, + {"priorityKey", "timer:{testcluster}:cluster_priority", "{testcluster}", "timer:"}, + {"calcLockKey", "timer:{testcluster}:cluster_calc_lock:task1:1234567890", "{testcluster}", "timer:"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.key == "" || tt.key == " " { + t.Error("key should not be empty") + } + t.Logf("key format: %s", tt.key) + }) + } + + _ = keyPrefix +} + +func TestClusterZsetListKeysSameHashTag(t *testing.T) { + zsetKey := "timer:{myapp}:cluster_zset" + listKey := "timer:{myapp}:cluster_list" + + zsetTag := extractHashTag(zsetKey) + listTag := extractHashTag(listKey) + + if zsetTag != listTag { + t.Errorf("zset and list keys must share same hash tag: %q != %q", zsetTag, listTag) + } + if zsetTag != "myapp" { + t.Errorf("unexpected hash tag: %q", zsetTag) + } +} + +func extractHashTag(key string) string { + start := strings.Index(key, "{") + end := strings.Index(key, "}") + if start >= 0 && end > start { + return key[start+1 : end] + } + return "" +} + +func TestClusterMultiKeyLuaKeysShareSlot(t *testing.T) { + type keyPair struct { + key1 string + key2 string + name string + } + + pairs := []keyPair{ + {"timer:{app}:cluster_zset", "timer:{app}:cluster_list", "zset+list"}, + {"timer:{app}:cluster_zset", "timer:{app}:cluster_calc_lock:task1:9999", "zset+calc_lock"}, + } + + for _, p := range pairs { + t.Run(p.name, func(t *testing.T) { + t1 := extractHashTag(p.key1) + t2 := extractHashTag(p.key2) + if t1 != t2 { + t.Errorf("keys in multi-key Lua script must share same hash tag: %q != %q (%s, %s)", + t1, t2, p.key1, p.key2) + } + }) + } +} diff --git a/example/cluster_redis/main.go b/example/cluster_redis/main.go new file mode 100644 index 0000000..46174b1 --- /dev/null +++ b/example/cluster_redis/main.go @@ -0,0 +1,57 @@ +package main + +import ( + "context" + "fmt" + "time" + + "github.com/redis/go-redis/v9" + "github.com/yuninks/timerx" +) + +func main() { + ctx := context.Background() + + client := getClusterRedis() + + clu, err := timerx.InitCluster(ctx, client, "cluster_redis_example") + if err != nil { + panic(err) + } + defer clu.Stop() + + err = clu.EverySpace(ctx, "cluster_every_second", 1*time.Second, clusterCallback, "cluster每秒任务") + fmt.Println("EverySpace:", err) + + err = clu.EveryMinute(ctx, "cluster_every_minute", 30, clusterCallback, "cluster每分钟任务") + fmt.Println("EveryMinute:", err) + + err = clu.EveryDay(ctx, "cluster_every_day", 0, 0, 0, clusterCallback, "cluster每天任务") + fmt.Println("EveryDay:", err) + + err = clu.Cron(ctx, "cluster_cron_second", "*/5 * * * * ?", clusterCallback, "cluster cron任务", + timerx.WithCronParserSecond()) + fmt.Println("Cron:", err) + + select {} +} + +func getClusterRedis() *redis.ClusterClient { + client := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: []string{ + "127.0.0.1:6379", + "127.0.0.1:6380", + "127.0.0.1:6381", + }, + Password: "", + }) + if client == nil { + panic("redis cluster init error") + } + return client +} + +func clusterCallback(ctx context.Context, extendData any) error { + fmt.Println("cluster任务执行:", extendData, "时间:", time.Now().Format("2006-01-02 15:04:05")) + return nil +} diff --git a/heartbeat/heartbeat.go b/heartbeat/heartbeat.go index e9794db..e4330a1 100644 --- a/heartbeat/heartbeat.go +++ b/heartbeat/heartbeat.go @@ -3,6 +3,7 @@ package heartbeat import ( "context" "errors" + "fmt" "strconv" "sync" "time" @@ -47,7 +48,7 @@ func InitHeartBeat(ctx context.Context, ref redis.UniversalClient, keyPrefix str ctx: ctx, cancel: cancel, - heartbeatKey: "timer:heartbeat_key" + op.source + keyPrefix, + heartbeatKey: fmt.Sprintf("timer:{%s}:heartbeat_%s", keyPrefix, op.source), priority: op.priority, redis: ref, diff --git a/leader/leader.go b/leader/leader.go index 34144e2..eea2e12 100644 --- a/leader/leader.go +++ b/leader/leader.go @@ -5,6 +5,7 @@ package leader import ( "context" "errors" + "fmt" "sync" "time" @@ -52,8 +53,8 @@ func InitLeader(ctx context.Context, ref redis.UniversalClient, keyPrefix string ctx: ctx, cancel: cancel, redis: ref, - leaderUniLockKey: "timer:leader_lockKey" + op.source + keyPrefix, - leaderKey: "timer:leader" + op.source + keyPrefix, + leaderUniLockKey: fmt.Sprintf("timer:{%s}:leader_lock_%s", keyPrefix, op.source), + leaderKey: fmt.Sprintf("timer:{%s}:leader_%s", keyPrefix, op.source), priority: op.priority, instanceId: op.instanceId, logger: op.logger, diff --git a/once.go b/once.go index cb28ed1..eee2a04 100644 --- a/once.go +++ b/once.go @@ -114,10 +114,10 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c ctx: ctx, cancel: cancel, logger: op.logger, - zsetKey: "timer:once_zsetkey" + keyPrefix, - listKey: "timer:once_listkey" + keyPrefix, - executeInfoKey: "timer:once_executeInfoKey" + keyPrefix, - globalLockPrefix: "timer:once_globalLockPrefix" + keyPrefix, + zsetKey: fmt.Sprintf("timer:{%s}:once_zset", keyPrefix), + listKey: fmt.Sprintf("timer:{%s}:once_list", keyPrefix), + executeInfoKey: fmt.Sprintf("timer:{%s}:once_execinfo", keyPrefix), + globalLockPrefix: fmt.Sprintf("timer:{%s}:once_lock:", keyPrefix), usePriority: false, redis: re, worker: call, @@ -418,7 +418,8 @@ func (w *Once) save(ctx context.Context, jobType jobType, taskType OnceTaskType, dataExpire := time.Until(expiresTime) - pipe.SetEx(w.ctx, w.keyPrefix+redisKey, b, dataExpire) + dataKey := fmt.Sprintf("timer:{%s}:once_data:%s", w.keyPrefix, redisKey) + pipe.SetEx(w.ctx, dataKey, b, dataExpire) pipe.ZAdd(w.ctx, w.zsetKey, redis.Z{ Score: float64(nextTime.UnixMilli()), Member: redisKey, @@ -492,9 +493,10 @@ func (l *Once) create(ctx context.Context, source createSource, jobType jobType, // 删除任务 func (w *Once) Delete(taskType OnceTaskType, taskId string) error { redisKey := w.buildRedisKey(taskType, taskId) + dataKey := fmt.Sprintf("timer:{%s}:once_data:%s", w.keyPrefix, redisKey) pipe := w.redis.TxPipeline() - pipe.Del(w.ctx, redisKey) + pipe.Del(w.ctx, dataKey) pipe.ZRem(w.ctx, w.zsetKey, redisKey) _, err := pipe.Exec(w.ctx) @@ -587,8 +589,8 @@ func (l *Once) processTask(key string) { }() // 读取数据 - redisKey := l.keyPrefix + l.buildRedisKey(taskType, taskId) - str, err := l.redis.Get(ctx, redisKey).Result() + dataKey := fmt.Sprintf("timer:{%s}:once_data:%s", l.keyPrefix, l.buildRedisKey(taskType, taskId)) + str, err := l.redis.Get(ctx, dataKey).Result() if err != nil { l.logger.Errorf(ctx, "processTask redis.Get key:%s err:%s", key, err) return diff --git a/once_test.go b/once_test.go index 3243778..3c92e0f 100644 --- a/once_test.go +++ b/once_test.go @@ -1,7 +1,148 @@ package timerx -import "testing" +import ( + "strings" + "testing" + "time" -func Test2(t *testing.T) { + "github.com/yuninks/timerx/logger" +) +func TestOnceBuildRedisKey(t *testing.T) { + once := &Once{keySeparator: "[:]", keyPrefix: "test_prefix"} + + key := once.buildRedisKey("normal", "task123") + if key != "normal[:]task123" { + t.Errorf("unexpected key: %s", key) + } +} + +func TestOnceParseRedisKey(t *testing.T) { + once := &Once{keySeparator: "[:]"} + + taskType, taskId, err := once.parseRedisKey("normal[:]task123") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if taskType != "normal" { + t.Errorf("unexpected taskType: %s", taskType) + } + if taskId != "task123" { + t.Errorf("unexpected taskId: %s", taskId) + } + + _, _, err = once.parseRedisKey("invalid") + if err == nil { + t.Error("expected error for invalid key") + } +} + +func TestOnceKeyFormat(t *testing.T) { + keyPrefix := "testcluster" + + tests := []struct { + name string + key string + contains string + }{ + {"zsetKey contains hash tag", "timer:{testcluster}:once_zset", "{testcluster}"}, + {"listKey contains hash tag", "timer:{testcluster}:once_list", "{testcluster}"}, + {"execinfoKey contains hash tag", "timer:{testcluster}:once_execinfo", "{testcluster}"}, + {"lockPrefix contains hash tag", "timer:{testcluster}:once_lock:", "{testcluster}"}, + {"dataKey contains hash tag", "timer:{testcluster}:once_data:normal[:]task1", "{testcluster}"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if !strings.Contains(tt.key, tt.contains) { + t.Errorf("key %q does not contain hash tag %q", tt.key, tt.contains) + } + }) + } + + _ = keyPrefix +} + +func TestOnceDataKeyFormat(t *testing.T) { + dataKey := "timer:{myapp}:once_data:normal[:]task1" + + if !strings.Contains(dataKey, "{myapp}") { + t.Errorf("data key %q missing hash tag {myapp}", dataKey) + } + if !strings.HasPrefix(dataKey, "timer:") { + t.Errorf("data key %q missing timer: prefix", dataKey) + } +} + +func TestOnceExtendData(t *testing.T) { + taskTimes := []time.Time{ + time.Now().Add(10 * time.Second), + time.Now().Add(5 * time.Second), + time.Now().Add(15 * time.Second), + } + + ed := extendData{ + TaskTimes: taskTimes, + Data: "test data", + RunCount: 0, + JobType: jobTypeOnce, + } + + if len(ed.TaskTimes) != 3 { + t.Errorf("expected 3 task times, got %d", len(ed.TaskTimes)) + } + if ed.Data != "test data" { + t.Errorf("unexpected data: %v", ed.Data) + } + if ed.RunCount != 0 { + t.Errorf("unexpected runCount: %d", ed.RunCount) + } +} + +func TestOnceSaveValidation(t *testing.T) { + var once Once + + once = Once{keyPrefix: "test_app", keySeparator: "[:]"} + + if once.keySeparator != "[:]" { + t.Errorf("unexpected keySeparator: %s", once.keySeparator) + } + if once.keyPrefix != "test_app" { + t.Errorf("unexpected keyPrefix: %s", once.keyPrefix) + } +} + +func TestOnceKeyPrefixConsistency(t *testing.T) { + keyPrefix := "myapp_v2" + + zsetKey := "timer:{myapp_v2}:once_zset" + listKey := "timer:{myapp_v2}:once_list" + dataKey := "timer:{myapp_v2}:once_data:urgent[:]task99" + + for _, k := range []string{zsetKey, listKey, dataKey} { + if !strings.Contains(k, "{"+keyPrefix+"}") { + t.Errorf("key %q missing hash tag for keyPrefix %q", k, keyPrefix) + } + } + + zsetHash := extractHashTagForOnce(zsetKey) + listHash := extractHashTagForOnce(listKey) + dataHash := extractHashTagForOnce(dataKey) + + if zsetHash != listHash || listHash != dataHash { + t.Errorf("all Once keys must share same hash tag: %q %q %q", zsetHash, listHash, dataHash) + } +} + +func extractHashTagForOnce(key string) string { + start := strings.Index(key, "{") + end := strings.Index(key, "}") + if start >= 0 && end > start { + return key[start+1 : end] + } + return "" +} + +func newDefaultLoggerForTest() logger.Logger { + return logger.NewLogger() } diff --git a/priority/priority.go b/priority/priority.go index baa4484..80f360d 100644 --- a/priority/priority.go +++ b/priority/priority.go @@ -50,7 +50,7 @@ func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix strin priority: priority, redis: re, logger: conf.logger, - redisKey: "timer:priority_" + conf.source + keyPrefix, + redisKey: fmt.Sprintf("timer:{%s}:priority_%s", keyPrefix, conf.source), expireTime: conf.expireTime, setInterval: conf.updateInterval, getInterval: conf.getInterval, diff --git a/priority/priority_test.go b/priority/priority_test.go index ad4831c..a8e56dc 100644 --- a/priority/priority_test.go +++ b/priority/priority_test.go @@ -131,8 +131,8 @@ func TestSetPriorityScenarios(t *testing.T) { ctx := context.Background() redisConn := getRedis() - // 删除Key - redisConn.Del(ctx, "timer:priority_test22") + // 删除Key (new key format with hash tag) + redisConn.Del(ctx, "timer:{test22}:priority_") for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -197,7 +197,7 @@ func TestErrorScenarios(t *testing.T) { priority := &Priority{ redis: getRedis(), - redisKey: "timer:priority_test", + redisKey: "timer:{test}:priority_", priority: 100, ctx: ctx, }