优化&测试

This commit is contained in:
Yun
2024-05-22 15:02:39 +08:00
parent 7912bbc56c
commit 4d769cf997
7 changed files with 107 additions and 43 deletions
+73 -29
View File
@@ -3,12 +3,14 @@ package timerx
import (
"context"
"errors"
"fmt"
"runtime/debug"
"sync"
"time"
"code.yun.ink/pkg/lockx"
"github.com/go-redis/redis/v8"
"github.com/yuninks/cachex"
"github.com/yuninks/lockx"
)
// 功能描述
@@ -25,9 +27,11 @@ var clusterOnceLimit sync.Once
var clusterWorkerList sync.Map
type Cluster struct {
ctx context.Context
redis *redis.Client
logger Logger
ctx context.Context
redis *redis.Client
cache *cachex.Cache
logger Logger
keyPrefix string // key前缀
lockKey string // 全局计算的key
zsetKey string // 有序集合的key
@@ -45,13 +49,15 @@ func InitCluster(ctx context.Context, red *redis.Client, keyPrefix string, opts
op := newOptions(opts...)
clu = &Cluster{
ctx: ctx,
redis: red,
logger: op.logger,
lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁
zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合
listKey: "timer:cluster_listKey" + keyPrefix, // 列表
setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合
ctx: ctx,
redis: red,
cache: cachex.NewCache(),
logger: op.logger,
keyPrefix: keyPrefix,
lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁
zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合
listKey: "timer:cluster_listKey" + keyPrefix, // 列表
setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合
}
// 监听任务
@@ -111,7 +117,7 @@ func (c *Cluster) AddWeek(ctx context.Context, taskId string, week time.Weekday,
nowTime := time.Now()
jobData := JobData{
JobType: JobTypeEveryMonth,
JobType: JobTypeEveryWeek,
CreateTime: nowTime,
Weekday: week,
Hour: hour,
@@ -127,7 +133,7 @@ func (c *Cluster) AddDay(ctx context.Context, taskId string, hour int, minute in
nowTime := time.Now()
jobData := JobData{
JobType: JobTypeEveryMonth,
JobType: JobTypeEveryDay,
CreateTime: nowTime,
Hour: hour,
Minute: minute,
@@ -142,7 +148,7 @@ func (c *Cluster) AddHour(ctx context.Context, taskId string, minute int, second
nowTime := time.Now()
jobData := JobData{
JobType: JobTypeEveryMonth,
JobType: JobTypeEveryHour,
CreateTime: nowTime,
Minute: minute,
Second: second,
@@ -156,7 +162,7 @@ func (c *Cluster) AddMinute(ctx context.Context, taskId string, second int, call
nowTime := time.Now()
jobData := JobData{
JobType: JobTypeEveryMonth,
JobType: JobTypeEveryMinute,
CreateTime: nowTime,
Second: second,
}
@@ -173,8 +179,12 @@ func (c *Cluster) AddSpace(ctx context.Context, taskId string, spaceTime time.Du
return errors.New("间隔时间不能小于0")
}
// 获取当天的零点时间
zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location())
jobData := JobData{
JobType: JobTypeEveryMonth,
JobType: JobTypeInterval,
BaseTime: zeroTime, // 默认当天的零点
CreateTime: nowTime,
IntervalTime: spaceTime,
}
@@ -206,7 +216,7 @@ func (c *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, ca
defer cancel()
lock := lockx.NewGlobalLock(ctx, c.redis, taskId)
tB := lock.Try(10)
tB := lock.Try(2)
if !tB {
c.logger.Errorf(ctx, "添加失败:%s", taskId)
return errors.New("添加失败")
@@ -244,7 +254,7 @@ func (c *Cluster) getNextTime() {
// 计算下一次时间
p := c.redis.Pipeline()
// p := c.redis.Pipeline()
// 根据内部注册的任务列表计算下一次执行的时间
clusterWorkerList.Range(func(key, value interface{}) bool {
@@ -252,16 +262,50 @@ func (c *Cluster) getNextTime() {
nextTime, _ := GetNextTime(time.Now(), time.Local, *val.JobData)
p.ZAdd(ctx, c.zsetKey, &redis.Z{
Score: float64(nextTime.UnixMilli()),
Member: val.TaskId,
})
// log.Println("computeTime add", c.zsetKey, val.taskId, nextTime.UnixMilli())
// fmt.Println(val.ExtendData, val.JobData, nextTime)
// 内部判定是否重复
cacheKey := fmt.Sprintf("%s_%s_%d", c.keyPrefix, val.TaskId, nextTime.Unix())
_, err := c.cache.Get(cacheKey)
if err == nil {
// 缓存已有值
return true
}
// redis lua脚本,尝试设置nx锁时间为一分钟,如果能设置进去则添加到有序集合zsetKey
script := `
local zsetKey = KEYS[1]
local cacheKey = ARGV[1]
local expireTime = ARGV[2]
local score = ARGV[3]
local member = ARGV[4]
local res = redis.call('set', cacheKey, '', 'nx', 'ex', expireTime)
if res then
redis.call('zadd', zsetKey, score, member)
return "SUCCESS"
end
return "ERROR"
`
// TODO:
expireTime := time.Minute
res, err := c.redis.Eval(ctx, script, []string{c.zsetKey}, cacheKey, expireTime.Seconds(), nextTime.UnixMilli(), val.TaskId).Result()
if err == nil && res.(string) == "SUCCESS" {
// 设置成功
c.cache.Set(cacheKey, "", expireTime)
}
return true
})
_, err := p.Exec(ctx)
_ = err
// _, err := p.Exec(ctx)
// _ = err
}
// 获取任务
@@ -293,7 +337,7 @@ func (c *Cluster) watch() {
}
_, ok := clusterWorkerList.Load(keys[1])
if !ok {
c.logger.Errorf(c.ctx, "watch timer:任务不存在", keys[1])
c.logger.Errorf(c.ctx, "watch timer:任务不存在%+v", keys[1])
c.redis.SAdd(c.ctx, c.setKey, keys[1])
continue
}
@@ -316,7 +360,7 @@ func (c *Cluster) watch() {
}
_, ok := clusterWorkerList.Load(taskId)
if !ok {
c.logger.Errorf(c.ctx, "watch timer:任务不存在", taskId)
c.logger.Errorf(c.ctx, "watch timer:任务不存在%+v", taskId)
c.redis.SAdd(c.ctx, c.setKey, taskId)
continue
}
@@ -337,7 +381,7 @@ func (c *Cluster) doTask(ctx context.Context, red *redis.Client, taskId string)
val, ok := clusterWorkerList.Load(taskId)
if !ok {
c.logger.Errorf(ctx, "doTask timer:任务不存在", taskId)
c.logger.Errorf(ctx, "doTask timer:任务不存在:%s", taskId)
return
}
t := val.(timerStr)
@@ -346,7 +390,7 @@ func (c *Cluster) doTask(ctx context.Context, red *redis.Client, taskId string)
lock := lockx.NewGlobalLock(ctx, red, taskId)
tB := lock.Lock()
if !tB {
c.logger.Errorf(ctx, "doTask timer:获取锁失败", taskId)
c.logger.Errorf(ctx, "doTask timer:获取锁失败:%s", taskId)
return
}
defer lock.Unlock()