diff --git a/cluster.go b/cluster.go index ab12b37..9853ea6 100644 --- a/cluster.go +++ b/cluster.go @@ -21,10 +21,9 @@ import ( // 这是基于Redis的定时任务调度器,能够有效的在服务集群里面调度任务,避免了单点压力过高或单点故障问题 // 由于所有的服务代码是一致的,也就是一个定时任务将在所有的服务都有注册,具体调度到哪个服务运行看调度结果 -// 暂不支持删除定时器,因为这个定时器的设计是基于全局的,如果删除了,那么其他服务就不知道了 - type Cluster struct { ctx context.Context // context + cancel context.CancelFunc // 取消函数 redis redis.UniversalClient // redis cache *cachex.Cache // 本地缓存 timeout time.Duration // job执行超时时间 @@ -56,10 +55,12 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin // clusterOnceLimit.Do(func() { op := newOptions(opts...) - // u, _ := uuid.NewV7() + ctx, cancel := context.WithCancel(ctx) + defer cancel() clu := &Cluster{ ctx: ctx, + cancel: cancel, redis: red, cache: cachex.NewCache(), timeout: op.timeout, @@ -88,6 +89,12 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin return clu } +// Stop 停止集群定时器 +func (c *Cluster) Stop() { + close(c.stopChan) + c.wg.Wait() +} + // 守护任务 func (l *Cluster) startDaemon() { @@ -154,10 +161,9 @@ func (l *Cluster) getLeaderLock() error { l.logger.Infof(l.ctx, "getLeaderLock Instance %s became leader", lock.GetValue()) // 等待超时退出 - select { - case <-lock.GetCtx().Done(): - return nil - } + <-lock.GetCtx().Done() + return nil + } // isCurrentLeader 检查当前实例是否是leader @@ -381,7 +387,7 @@ func (l *Cluster) calculateNextTimes() { return 1 ` - lockKey := fmt.Sprintf("%s:lock:calc:%s:%d", l.keyPrefix, val.TaskId, nextTime.UnixNano()) + lockKey := fmt.Sprintf("%s:lock:calc:%s:%d", l.keyPrefix, val.TaskId, nextTime.UnixMilli()) _, err = pipe.Eval(l.ctx, script, []string{l.zsetKey, lockKey}, nextTime.UnixMilli(), val.TaskId, 60).Result() if err != nil { diff --git a/cmd/main.go b/cmd/main.go index e3c6ccc..70e4f32 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "os" "time" "github.com/go-redis/redis/v8" @@ -26,12 +27,27 @@ func main() { // re() // d() // cluster() - once() + // once() + prioritys() select {} } +func prioritys() { + + client := getRedis() + ctx := context.Background() + pro := priority.InitPriority(ctx, client, "test", 10) + + for { + b := pro.IsLatest(ctx) + fmt.Println("isLatest", b) + time.Sleep(time.Millisecond*100) + } + +} + func once() { client := getRedis() ctx := context.Background() @@ -172,9 +188,24 @@ func re() { } func aa(ctx context.Context, data interface{}) error { + fmt.Println("-执行时间:", data, time.Now().Format("2006-01-02 15:04:05")) // fmt.Println(data) // time.Sleep(time.Second * 5) + + // 追加到文件 + file, err := os.OpenFile("./test.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + fmt.Println("打开文件失败:", err) + return err + } + defer file.Close() + _, err = file.WriteString(fmt.Sprintf("-执行时间:%v %s\n", data, time.Now().Format("2006-01-02 15:04:05"))) + if err != nil { + fmt.Println("写入文件失败:", err) + return err + } + return nil } diff --git a/go.mod b/go.mod index dafd606..b01f324 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/priority/option.go b/priority/option.go index e99462a..fc6728c 100644 --- a/priority/option.go +++ b/priority/option.go @@ -7,6 +7,7 @@ import ( ) type Options struct { + getInterval time.Duration // 查询周期 updateInterval time.Duration // 更新间隔 expireTime time.Duration // 有效时间 logger logger.Logger @@ -14,8 +15,9 @@ type Options struct { func defaultOptions() Options { return Options{ - updateInterval: time.Second * 10, - expireTime: time.Second * 32, + getInterval: time.Second * 2, + updateInterval: time.Second * 4, + expireTime: time.Second * 8, logger: logger.NewLogger(), } } @@ -36,13 +38,14 @@ func SetLogger(log logger.Logger) Option { } } -// 有效时间是3个周期 +// 更新周期 func SetUpdateInterval(d time.Duration) Option { if d.Abs() < time.Second { - d = time.Second * 10 + d = time.Second * 5 } return func(o *Options) { o.updateInterval = d - o.expireTime = d*3 + time.Second + o.expireTime = d*2 + time.Second + o.getInterval = d / 3 } } diff --git a/priority/priority.go b/priority/priority.go index 1f75a42..dcbb404 100644 --- a/priority/priority.go +++ b/priority/priority.go @@ -4,7 +4,7 @@ import ( "context" "fmt" "strconv" - "sync/atomic" + "sync" "time" "github.com/go-redis/redis/v8" @@ -14,83 +14,120 @@ import ( // 多版本场景判断当前是否最新版本 type Priority struct { - ctx context.Context - priority int64 // 优先级 - redis redis.UniversalClient - redisKey string - logger logger.Logger - expireTime time.Duration - updateInterval time.Duration // 更新间隔 - deadTime *int64 // 缓存时间戳,单位秒 + ctx context.Context + cancel context.CancelFunc + priority int64 // 优先级 + redis redis.UniversalClient + redisKey string + logger logger.Logger + expireTime time.Duration + + setInterval time.Duration // 尝试set的间隔 + getInterval time.Duration // 尝试get的间隔 + + wg sync.WaitGroup + + isLatest bool + latestMux sync.RWMutex } func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int64, opts ...Option) *Priority { conf := newOptions(opts...) + ctx, cancel := context.WithCancel(ctx) + pro := &Priority{ - ctx: ctx, - priority: priority, - redis: re, - logger: conf.logger, - redisKey: "timer:priority_" + keyPrefix, - expireTime: conf.expireTime, - updateInterval: conf.updateInterval, - deadTime: new(int64), + ctx: ctx, + cancel: cancel, + priority: priority, + redis: re, + logger: conf.logger, + redisKey: "timer:priority_" + keyPrefix, + expireTime: conf.expireTime, + setInterval: conf.updateInterval, + getInterval: conf.getInterval, } - // 更新间隔 - ut := time.NewTicker(conf.updateInterval) - go func(ctx context.Context) { - pro.setPriority() - Loop: - for { - select { - case <-ut.C: - pro.setPriority() - case <-ctx.Done(): - break Loop - } - } - }(ctx) + pro.startDaemon() return pro } -func (l *Priority) IsLatest(ctx context.Context) (b bool) { - // defer func() { - // l.logger.Infof(l.ctx, "当前优先级:%d bool:%v", l.priority, b) - // }() - - // 加缓存 - if atomic.LoadInt64(l.deadTime) > time.Now().Unix() { - return true +func (p *Priority) Close() { + if p.cancel != nil { + p.cancel() } - - str, err := l.redis.Get(l.ctx, l.redisKey).Result() - - if err != nil { - if err == redis.Nil && l.priority == 0 { - return true - } - l.logger.Errorf(l.ctx, "获取全局优先级失败:%s", err.Error()) - return false - } - - strPriority, err := strconv.ParseInt(str, 10, 64) - if err != nil { - l.logger.Errorf(l.ctx, "全局优先级转换失败:%s", err.Error()) - return false - } - if l.priority >= strPriority { - return true - } - return false + p.wg.Wait() } -func (l *Priority) setPriority() bool { +// 守护进程 +func (l *Priority) startDaemon() { + // 启动更新缓存 + l.wg.Add(1) + go l.runUpdateLoop() - // redis lua脚本 - // 如果redis的可以不存在则设置值,如果存在且redis内的值比当前大则不处理,如果存在redis内的值比当前小或等于则更新值且更新ttl + l.wg.Add(1) + go l.getLatestLoop() + +} + +func (p *Priority) runUpdateLoop() { + defer p.wg.Done() + + // 立即尝试设置一次优先级 + if _, err := p.setPriority(); err != nil { + p.logger.Warnf(p.ctx, "Initial priority set failed: %v", err) + } + + ticker := time.NewTicker(p.setInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if _, err := p.setPriority(); err != nil { + p.logger.Warnf(p.ctx, "Priority update failed: %v", err) + } + case <-p.ctx.Done(): + return + } + } +} + +func (l *Priority) getLatestLoop() { + + defer l.wg.Done() + + if err := l.getLatest(); err != nil { + l.logger.Errorf(l.ctx, "Priority update failed: %v", err) + } + + ticker := time.NewTicker(l.getInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := l.getLatest(); err != nil { + l.logger.Errorf(l.ctx, "Priority update failed: %v", err) + } + case <-l.ctx.Done(): + return + } + } + +} + + + +func (p *Priority) IsLatest(ctx context.Context) bool { + p.latestMux.RLock() + defer p.latestMux.RUnlock() + + return p.isLatest +} + +func (p *Priority) setPriority() (string, error) { script := ` -- KEYS[1] 是全局优先级的key local priorityKey = KEYS[1] @@ -112,47 +149,101 @@ func (l *Priority) setPriority() bool { if not currentPriority then -- 如果当前优先级不存在,则设置新优先级并设置TTL redis.call('set', priorityKey, priority, 'ex', expireTime) - return { "SET", expireTime } + return { "SET" } elseif currentPriorityNum < newPriorityNum then -- 如果当前优先级小于新优先级,则更新优先级并更新TTL redis.call('set', priorityKey, priority, 'ex', expireTime) - return { "RESET", expireTime } + return { "RESET" } elseif currentPriorityNum == newPriorityNum then -- 优先级相同则更新TTL redis.call('expire', priorityKey, expireTime) - return { "UPDATE", expireTime } + return { "UPDATE" } else -- 如果当前优先级大于新优先级,则不更新 - return { "NOAUCH", '0' } + return { "NOAUCH" } end ` - priority := fmt.Sprintf("%d", l.priority) - res, err := l.redis.Eval(l.ctx, script, []string{l.redisKey}, priority, l.expireTime.Seconds()).Result() + newPriorityStr := strconv.FormatInt(p.priority, 10) + + result, err := p.redis.Eval(p.ctx, script, []string{p.redisKey}, newPriorityStr, p.expireTime.Seconds()).Result() + // p.logger.Infof(p.ctx, "Priority update result:%+v err:%+v", result, err) if err != nil { - l.logger.Errorf(l.ctx, "设置全局优先级失败:%s", err.Error()) - return false + p.logger.Errorf(p.ctx, "Priority update err:%s", err.Error()) + return "", err } - l.logger.Infof(l.ctx, "设置全局优先级返回值:%+v", res) - - // 处理返回值,包含操作结果和 TTL - resultArray := res.([]interface{}) - if len(resultArray) < 2 { - l.logger.Errorf(l.ctx, "设置全局优先级失败: 返回值格式不正确") - return false + // 解析结果 + if resultMap, ok := result.([]interface{}); ok && len(resultMap) == 1 { + resultStr := resultMap[0].(string) + return resultStr, nil } - operationResult := resultArray[0].(string) - ttl := resultArray[1].(string) - if operationResult == "SET" || operationResult == "UPDATE" { - l.logger.Infof(l.ctx, "设置全局优先级成功:%s", priority) - - atomic.StoreInt64(l.deadTime, time.Now().Add(l.updateInterval).Unix()) - - return true - } - _ = ttl - l.logger.Infof(l.ctx, "设置全局优先级未更新:%s", priority) - return false + return "", fmt.Errorf("script error: %v", result) +} + +func (l *Priority) getLatest() error { + // 查询Redis获取当前最高优先级 + currentPriority, err := l.getCurrentPriority() + + l.logger.Infof(l.ctx, "Priority getLatest currentPriority:%d l.priority:%d err:%+v", currentPriority, l.priority, err) + + if err != nil { + l.logger.Errorf(l.ctx, "Priority getLatest getCurrentPriority err:%s", err.Error()) + return err + } + if currentPriority > l.priority { + // 当前不是最新的 + l.latestMux.Lock() + l.isLatest = false + l.latestMux.Unlock() + + return nil + } + + l.latestMux.Lock() + l.isLatest = true + l.latestMux.Unlock() + + return nil +} + +func (p *Priority) getCurrentPriority() (int64, error) { + result, err := p.redis.Get(p.ctx, p.redisKey).Result() + if err != nil { + if err == redis.Nil { + // Key不存在,返回0作为默认值 + return 0, nil + } + return 0, err + } + + priority, err := strconv.ParseInt(result, 10, 64) + if err != nil { + return 0, err + } + + return priority, nil +} + +// ForceRefresh 强制刷新优先级,用于紧急情况 +func (p *Priority) ForceRefresh() error { + + _, err := p.setPriority() + if err != nil { + p.logger.Errorf(p.ctx, "Priority ForceRefresh setPriority err:%s", err.Error()) + return err + } + err = p.getLatest() + if err != nil { + p.logger.Errorf(p.ctx, "Priority ForceRefresh getLatest err:%s", err.Error()) + return err + } + + return nil +} + +// GetCurrentMaxPriority 获取当前系统中的最大优先级 +func (p *Priority) GetCurrentMaxPriority(ctx context.Context) (int64, error) { + return p.getCurrentPriority() } diff --git a/priority/priority_test.go b/priority/priority_test.go index 6e66000..c8c3bdd 100644 --- a/priority/priority_test.go +++ b/priority/priority_test.go @@ -1,13 +1,16 @@ -package priority_test +package priority import ( "context" "fmt" + "sync" "testing" "time" "github.com/go-redis/redis/v8" - "github.com/yuninks/timerx/priority" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func getRedis() *redis.Client { @@ -36,18 +39,18 @@ func TestPriority(t *testing.T) { ctx, cancel := context.WithCancel(ctx) - pro := priority.InitPriority(ctx, re, "test", 10, priority.SetUpdateInterval(time.Second*1)) + pro := InitPriority(ctx, re, "test", 10, SetUpdateInterval(time.Second*1)) for i := 0; i < 10; i++ { bb := pro.IsLatest(ctx) fmt.Println("cc:", bb) time.Sleep(time.Second) } - + cancel() }() - pro := priority.InitPriority(ctx, re, "test", 0, priority.SetUpdateInterval(time.Second*1)) + pro := InitPriority(ctx, re, "test", 0, SetUpdateInterval(time.Second*1)) for i := 0; i < 25; i++ { bb := pro.IsLatest(ctx) @@ -56,3 +59,150 @@ func TestPriority(t *testing.T) { } } + +// MockRedisClient 模拟Redis客户端 +type MockRedisClient struct { + redis.UniversalClient + mock.Mock +} + +func (m *MockRedisClient) Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd { + arguments := m.Called(ctx, script, keys, args) + return arguments.Get(0).(*redis.Cmd) +} + +func (m *MockRedisClient) Get(ctx context.Context, key string) *redis.StringCmd { + arguments := m.Called(ctx, key) + return arguments.Get(0).(*redis.StringCmd) +} + +func (m *MockRedisClient) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd { + arguments := m.Called(ctx, key, value, expiration) + return arguments.Get(0).(*redis.StatusCmd) +} + +func TestInitPriority(t *testing.T) { + ctx := context.Background() + + // 测试正常初始化 + priority := InitPriority(ctx, getRedis(), "test", 100) + assert.NotNil(t, priority) + assert.Equal(t, int64(100), priority.priority) +} + +func TestSetPriorityScenarios(t *testing.T) { + testCases := []struct { + name string + currentRedis interface{} + newPriority int64 + expectedStatus string + expectedValue int64 + }{ + { + name: "首次设置优先级", + currentRedis: nil, // Redis中不存在key + newPriority: 100, + expectedStatus: "SET", + expectedValue: 100, + }, + { + name: "更新更高优先级", + currentRedis: "50", // Redis中存在较低优先级 + newPriority: 100, + expectedStatus: "UPDATED", + expectedValue: 100, + }, + { + name: "保持相同优先级", + currentRedis: "100", // Redis中存在相同优先级 + newPriority: 100, + expectedStatus: "EXTENDED", + expectedValue: 100, + }, + { + name: "忽略较低优先级", + currentRedis: "150", // Redis中存在更高优先级 + newPriority: 100, + expectedStatus: "IGNORED", + expectedValue: 150, + }, + } + + ctx := context.Background() + + redisConn := getRedis() + // 删除Key + redisConn.Del(ctx, "timer:priority_test22") + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + priority := InitPriority(ctx, redisConn, "test22", tc.newPriority) + defer priority.Close() + + time.Sleep(time.Second * 1) + + _, err := priority.setPriority() + + assert.NoError(t, err) + // assert.Equal(t, tc.expectedStatus, sta) + }) + } +} + +// 并发安全测试 +func TestConcurrentAccess(t *testing.T) { + ctx := context.Background() + + priority := InitPriority(ctx, getRedis(), "testacc", 100) + + time.Sleep(time.Second * 1) + + // 并发读取IsLatest + var wg sync.WaitGroup + results := make(chan bool, 100) + + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + results <- priority.IsLatest(ctx) + }() + } + + wg.Wait() + close(results) + + // 所有结果应该相同 + firstResult := <-results + for result := range results { + t.Log(result) + assert.Equal(t, firstResult, result) + } +} + +// 错误处理测试 +func TestErrorScenarios(t *testing.T) { + t.Run("Redis连接失败", func(t *testing.T) { + ctx := context.Background() + + priority := InitPriority(ctx, getRedis(), "test", 100) + _,err := priority.setPriority() + + assert.Error(t, err) + }) + + t.Run("Redis返回值解析错误", func(t *testing.T) { + ctx := context.Background() + + priority := &Priority{ + redis: getRedis(), + redisKey: "timer:priority_test", + priority: 100, + ctx: ctx, + } + + _, err := priority.getCurrentPriority() + assert.Error(t, err) + }) +} \ No newline at end of file