优化策略

This commit is contained in:
Yun
2025-09-18 15:34:01 +08:00
parent 44eeb8468d
commit a0ec69b7a2
6 changed files with 390 additions and 108 deletions
+13 -7
View File
@@ -21,10 +21,9 @@ import (
// 这是基于Redis的定时任务调度器,能够有效的在服务集群里面调度任务,避免了单点压力过高或单点故障问题
// 由于所有的服务代码是一致的,也就是一个定时任务将在所有的服务都有注册,具体调度到哪个服务运行看调度结果
// 暂不支持删除定时器,因为这个定时器的设计是基于全局的,如果删除了,那么其他服务就不知道了
type Cluster struct {
ctx context.Context // context
cancel context.CancelFunc // 取消函数
redis redis.UniversalClient // redis
cache *cachex.Cache // 本地缓存
timeout time.Duration // job执行超时时间
@@ -56,10 +55,12 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
// clusterOnceLimit.Do(func() {
op := newOptions(opts...)
// u, _ := uuid.NewV7()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
clu := &Cluster{
ctx: ctx,
cancel: cancel,
redis: red,
cache: cachex.NewCache(),
timeout: op.timeout,
@@ -88,6 +89,12 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
return clu
}
// Stop 停止集群定时器
func (c *Cluster) Stop() {
close(c.stopChan)
c.wg.Wait()
}
// 守护任务
func (l *Cluster) startDaemon() {
@@ -154,10 +161,9 @@ func (l *Cluster) getLeaderLock() error {
l.logger.Infof(l.ctx, "getLeaderLock Instance %s became leader", lock.GetValue())
// 等待超时退出
select {
case <-lock.GetCtx().Done():
<-lock.GetCtx().Done()
return nil
}
}
// isCurrentLeader 检查当前实例是否是leader
@@ -381,7 +387,7 @@ func (l *Cluster) calculateNextTimes() {
return 1
`
lockKey := fmt.Sprintf("%s:lock:calc:%s:%d", l.keyPrefix, val.TaskId, nextTime.UnixNano())
lockKey := fmt.Sprintf("%s:lock:calc:%s:%d", l.keyPrefix, val.TaskId, nextTime.UnixMilli())
_, err = pipe.Eval(l.ctx, script, []string{l.zsetKey, lockKey},
nextTime.UnixMilli(), val.TaskId, 60).Result()
if err != nil {
+32 -1
View File
@@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"os"
"time"
"github.com/go-redis/redis/v8"
@@ -26,12 +27,27 @@ func main() {
// re()
// d()
// cluster()
once()
// once()
prioritys()
select {}
}
func prioritys() {
client := getRedis()
ctx := context.Background()
pro := priority.InitPriority(ctx, client, "test", 10)
for {
b := pro.IsLatest(ctx)
fmt.Println("isLatest", b)
time.Sleep(time.Millisecond*100)
}
}
func once() {
client := getRedis()
ctx := context.Background()
@@ -172,9 +188,24 @@ func re() {
}
func aa(ctx context.Context, data interface{}) error {
fmt.Println("-执行时间:", data, time.Now().Format("2006-01-02 15:04:05"))
// fmt.Println(data)
// time.Sleep(time.Second * 5)
// 追加到文件
file, err := os.OpenFile("./test.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Println("打开文件失败:", err)
return err
}
defer file.Close()
_, err = file.WriteString(fmt.Sprintf("-执行时间:%v %s\n", data, time.Now().Format("2006-01-02 15:04:05")))
if err != nil {
fmt.Println("写入文件失败:", err)
return err
}
return nil
}
+1
View File
@@ -16,6 +16,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
+8 -5
View File
@@ -7,6 +7,7 @@ import (
)
type Options struct {
getInterval time.Duration // 查询周期
updateInterval time.Duration // 更新间隔
expireTime time.Duration // 有效时间
logger logger.Logger
@@ -14,8 +15,9 @@ type Options struct {
func defaultOptions() Options {
return Options{
updateInterval: time.Second * 10,
expireTime: time.Second * 32,
getInterval: time.Second * 2,
updateInterval: time.Second * 4,
expireTime: time.Second * 8,
logger: logger.NewLogger(),
}
}
@@ -36,13 +38,14 @@ func SetLogger(log logger.Logger) Option {
}
}
// 有效时间是3个周期
// 更新周期
func SetUpdateInterval(d time.Duration) Option {
if d.Abs() < time.Second {
d = time.Second * 10
d = time.Second * 5
}
return func(o *Options) {
o.updateInterval = d
o.expireTime = d*3 + time.Second
o.expireTime = d*2 + time.Second
o.getInterval = d / 3
}
}
+163 -72
View File
@@ -4,7 +4,7 @@ import (
"context"
"fmt"
"strconv"
"sync/atomic"
"sync"
"time"
"github.com/go-redis/redis/v8"
@@ -15,82 +15,119 @@ import (
type Priority struct {
ctx context.Context
cancel context.CancelFunc
priority int64 // 优先级
redis redis.UniversalClient
redisKey string
logger logger.Logger
expireTime time.Duration
updateInterval time.Duration // 更新间隔
deadTime *int64 // 缓存时间戳,单位秒
setInterval time.Duration // 尝试set的间隔
getInterval time.Duration // 尝试get的间隔
wg sync.WaitGroup
isLatest bool
latestMux sync.RWMutex
}
func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int64, opts ...Option) *Priority {
conf := newOptions(opts...)
ctx, cancel := context.WithCancel(ctx)
pro := &Priority{
ctx: ctx,
cancel: cancel,
priority: priority,
redis: re,
logger: conf.logger,
redisKey: "timer:priority_" + keyPrefix,
expireTime: conf.expireTime,
updateInterval: conf.updateInterval,
deadTime: new(int64),
setInterval: conf.updateInterval,
getInterval: conf.getInterval,
}
// 更新间隔
ut := time.NewTicker(conf.updateInterval)
go func(ctx context.Context) {
pro.setPriority()
Loop:
for {
select {
case <-ut.C:
pro.setPriority()
case <-ctx.Done():
break Loop
}
}
}(ctx)
pro.startDaemon()
return pro
}
func (l *Priority) IsLatest(ctx context.Context) (b bool) {
// defer func() {
// l.logger.Infof(l.ctx, "当前优先级:%d bool:%v", l.priority, b)
// }()
// 加缓存
if atomic.LoadInt64(l.deadTime) > time.Now().Unix() {
return true
func (p *Priority) Close() {
if p.cancel != nil {
p.cancel()
}
p.wg.Wait()
}
str, err := l.redis.Get(l.ctx, l.redisKey).Result()
// 守护进程
func (l *Priority) startDaemon() {
// 启动更新缓存
l.wg.Add(1)
go l.runUpdateLoop()
l.wg.Add(1)
go l.getLatestLoop()
if err != nil {
if err == redis.Nil && l.priority == 0 {
return true
}
l.logger.Errorf(l.ctx, "获取全局优先级失败:%s", err.Error())
return false
}
strPriority, err := strconv.ParseInt(str, 10, 64)
if err != nil {
l.logger.Errorf(l.ctx, "全局优先级转换失败:%s", err.Error())
return false
}
if l.priority >= strPriority {
return true
}
return false
func (p *Priority) runUpdateLoop() {
defer p.wg.Done()
// 立即尝试设置一次优先级
if _, err := p.setPriority(); err != nil {
p.logger.Warnf(p.ctx, "Initial priority set failed: %v", err)
}
func (l *Priority) setPriority() bool {
ticker := time.NewTicker(p.setInterval)
defer ticker.Stop()
// redis lua脚本
// 如果redis的可以不存在则设置值,如果存在且redis内的值比当前大则不处理,如果存在redis内的值比当前小或等于则更新值且更新ttl
for {
select {
case <-ticker.C:
if _, err := p.setPriority(); err != nil {
p.logger.Warnf(p.ctx, "Priority update failed: %v", err)
}
case <-p.ctx.Done():
return
}
}
}
func (l *Priority) getLatestLoop() {
defer l.wg.Done()
if err := l.getLatest(); err != nil {
l.logger.Errorf(l.ctx, "Priority update failed: %v", err)
}
ticker := time.NewTicker(l.getInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := l.getLatest(); err != nil {
l.logger.Errorf(l.ctx, "Priority update failed: %v", err)
}
case <-l.ctx.Done():
return
}
}
}
func (p *Priority) IsLatest(ctx context.Context) bool {
p.latestMux.RLock()
defer p.latestMux.RUnlock()
return p.isLatest
}
func (p *Priority) setPriority() (string, error) {
script := `
-- KEYS[1] 是全局优先级的key
local priorityKey = KEYS[1]
@@ -112,47 +149,101 @@ func (l *Priority) setPriority() bool {
if not currentPriority then
-- 如果当前优先级不存在,则设置新优先级并设置TTL
redis.call('set', priorityKey, priority, 'ex', expireTime)
return { "SET", expireTime }
return { "SET" }
elseif currentPriorityNum < newPriorityNum then
-- 如果当前优先级小于新优先级,则更新优先级并更新TTL
redis.call('set', priorityKey, priority, 'ex', expireTime)
return { "RESET", expireTime }
return { "RESET" }
elseif currentPriorityNum == newPriorityNum then
-- 优先级相同则更新TTL
redis.call('expire', priorityKey, expireTime)
return { "UPDATE", expireTime }
return { "UPDATE" }
else
-- 如果当前优先级大于新优先级,则不更新
return { "NOAUCH", '0' }
return { "NOAUCH" }
end
`
priority := fmt.Sprintf("%d", l.priority)
res, err := l.redis.Eval(l.ctx, script, []string{l.redisKey}, priority, l.expireTime.Seconds()).Result()
newPriorityStr := strconv.FormatInt(p.priority, 10)
result, err := p.redis.Eval(p.ctx, script, []string{p.redisKey}, newPriorityStr, p.expireTime.Seconds()).Result()
// p.logger.Infof(p.ctx, "Priority update result:%+v err:%+v", result, err)
if err != nil {
l.logger.Errorf(l.ctx, "设置全局优先级失败:%s", err.Error())
return false
p.logger.Errorf(p.ctx, "Priority update err:%s", err.Error())
return "", err
}
l.logger.Infof(l.ctx, "设置全局优先级返回值:%+v", res)
// 处理返回值,包含操作结果和 TTL
resultArray := res.([]interface{})
if len(resultArray) < 2 {
l.logger.Errorf(l.ctx, "设置全局优先级失败: 返回值格式不正确")
return false
// 解析结果
if resultMap, ok := result.([]interface{}); ok && len(resultMap) == 1 {
resultStr := resultMap[0].(string)
return resultStr, nil
}
operationResult := resultArray[0].(string)
ttl := resultArray[1].(string)
if operationResult == "SET" || operationResult == "UPDATE" {
l.logger.Infof(l.ctx, "设置全局优先级成功:%s", priority)
atomic.StoreInt64(l.deadTime, time.Now().Add(l.updateInterval).Unix())
return true
return "", fmt.Errorf("script error: %v", result)
}
_ = ttl
l.logger.Infof(l.ctx, "设置全局优先级未更新:%s", priority)
return false
func (l *Priority) getLatest() error {
// 查询Redis获取当前最高优先级
currentPriority, err := l.getCurrentPriority()
l.logger.Infof(l.ctx, "Priority getLatest currentPriority:%d l.priority:%d err:%+v", currentPriority, l.priority, err)
if err != nil {
l.logger.Errorf(l.ctx, "Priority getLatest getCurrentPriority err:%s", err.Error())
return err
}
if currentPriority > l.priority {
// 当前不是最新的
l.latestMux.Lock()
l.isLatest = false
l.latestMux.Unlock()
return nil
}
l.latestMux.Lock()
l.isLatest = true
l.latestMux.Unlock()
return nil
}
func (p *Priority) getCurrentPriority() (int64, error) {
result, err := p.redis.Get(p.ctx, p.redisKey).Result()
if err != nil {
if err == redis.Nil {
// Key不存在,返回0作为默认值
return 0, nil
}
return 0, err
}
priority, err := strconv.ParseInt(result, 10, 64)
if err != nil {
return 0, err
}
return priority, nil
}
// ForceRefresh 强制刷新优先级,用于紧急情况
func (p *Priority) ForceRefresh() error {
_, err := p.setPriority()
if err != nil {
p.logger.Errorf(p.ctx, "Priority ForceRefresh setPriority err:%s", err.Error())
return err
}
err = p.getLatest()
if err != nil {
p.logger.Errorf(p.ctx, "Priority ForceRefresh getLatest err:%s", err.Error())
return err
}
return nil
}
// GetCurrentMaxPriority 获取当前系统中的最大优先级
func (p *Priority) GetCurrentMaxPriority(ctx context.Context) (int64, error) {
return p.getCurrentPriority()
}
+154 -4
View File
@@ -1,13 +1,16 @@
package priority_test
package priority
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/go-redis/redis/v8"
"github.com/yuninks/timerx/priority"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func getRedis() *redis.Client {
@@ -36,7 +39,7 @@ func TestPriority(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
pro := priority.InitPriority(ctx, re, "test", 10, priority.SetUpdateInterval(time.Second*1))
pro := InitPriority(ctx, re, "test", 10, SetUpdateInterval(time.Second*1))
for i := 0; i < 10; i++ {
bb := pro.IsLatest(ctx)
@@ -47,7 +50,7 @@ func TestPriority(t *testing.T) {
cancel()
}()
pro := priority.InitPriority(ctx, re, "test", 0, priority.SetUpdateInterval(time.Second*1))
pro := InitPriority(ctx, re, "test", 0, SetUpdateInterval(time.Second*1))
for i := 0; i < 25; i++ {
bb := pro.IsLatest(ctx)
@@ -56,3 +59,150 @@ func TestPriority(t *testing.T) {
}
}
// MockRedisClient 模拟Redis客户端
type MockRedisClient struct {
redis.UniversalClient
mock.Mock
}
func (m *MockRedisClient) Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd {
arguments := m.Called(ctx, script, keys, args)
return arguments.Get(0).(*redis.Cmd)
}
func (m *MockRedisClient) Get(ctx context.Context, key string) *redis.StringCmd {
arguments := m.Called(ctx, key)
return arguments.Get(0).(*redis.StringCmd)
}
func (m *MockRedisClient) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd {
arguments := m.Called(ctx, key, value, expiration)
return arguments.Get(0).(*redis.StatusCmd)
}
func TestInitPriority(t *testing.T) {
ctx := context.Background()
// 测试正常初始化
priority := InitPriority(ctx, getRedis(), "test", 100)
assert.NotNil(t, priority)
assert.Equal(t, int64(100), priority.priority)
}
func TestSetPriorityScenarios(t *testing.T) {
testCases := []struct {
name string
currentRedis interface{}
newPriority int64
expectedStatus string
expectedValue int64
}{
{
name: "首次设置优先级",
currentRedis: nil, // Redis中不存在key
newPriority: 100,
expectedStatus: "SET",
expectedValue: 100,
},
{
name: "更新更高优先级",
currentRedis: "50", // Redis中存在较低优先级
newPriority: 100,
expectedStatus: "UPDATED",
expectedValue: 100,
},
{
name: "保持相同优先级",
currentRedis: "100", // Redis中存在相同优先级
newPriority: 100,
expectedStatus: "EXTENDED",
expectedValue: 100,
},
{
name: "忽略较低优先级",
currentRedis: "150", // Redis中存在更高优先级
newPriority: 100,
expectedStatus: "IGNORED",
expectedValue: 150,
},
}
ctx := context.Background()
redisConn := getRedis()
// 删除Key
redisConn.Del(ctx, "timer:priority_test22")
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
priority := InitPriority(ctx, redisConn, "test22", tc.newPriority)
defer priority.Close()
time.Sleep(time.Second * 1)
_, err := priority.setPriority()
assert.NoError(t, err)
// assert.Equal(t, tc.expectedStatus, sta)
})
}
}
// 并发安全测试
func TestConcurrentAccess(t *testing.T) {
ctx := context.Background()
priority := InitPriority(ctx, getRedis(), "testacc", 100)
time.Sleep(time.Second * 1)
// 并发读取IsLatest
var wg sync.WaitGroup
results := make(chan bool, 100)
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
results <- priority.IsLatest(ctx)
}()
}
wg.Wait()
close(results)
// 所有结果应该相同
firstResult := <-results
for result := range results {
t.Log(result)
assert.Equal(t, firstResult, result)
}
}
// 错误处理测试
func TestErrorScenarios(t *testing.T) {
t.Run("Redis连接失败", func(t *testing.T) {
ctx := context.Background()
priority := InitPriority(ctx, getRedis(), "test", 100)
_,err := priority.setPriority()
assert.Error(t, err)
})
t.Run("Redis返回值解析错误", func(t *testing.T) {
ctx := context.Background()
priority := &Priority{
redis: getRedis(),
redisKey: "timer:priority_test",
priority: 100,
ctx: ctx,
}
_, err := priority.getCurrentPriority()
assert.Error(t, err)
})
}