优化调整单次任务的执行
This commit is contained in:
+20
-4
@@ -7,6 +7,7 @@ import (
|
|||||||
|
|
||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
"github.com/yuninks/timerx"
|
"github.com/yuninks/timerx"
|
||||||
|
"github.com/yuninks/timerx/priority"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@@ -35,14 +36,24 @@ func once() {
|
|||||||
client := getRedis()
|
client := getRedis()
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
w := OnceWorker{}
|
w := OnceWorker{}
|
||||||
one := timerx.InitOnce(ctx, client, "test", w)
|
|
||||||
|
ver, err := priority.PriorityByVersion("v2.2.3.4.5")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ops := []timerx.Option{
|
||||||
|
timerx.SetPriority(ver),
|
||||||
|
}
|
||||||
|
|
||||||
|
one := timerx.InitOnce(ctx, client, "test", w, ops...)
|
||||||
|
|
||||||
d := OnceData{
|
d := OnceData{
|
||||||
Num: 3,
|
Num: 3,
|
||||||
}
|
}
|
||||||
// dy, _ := json.Marshal(d)
|
// dy, _ := json.Marshal(d)
|
||||||
|
|
||||||
err := one.Create("test", "test3", 1*time.Second, d)
|
err = one.Create("test", "test3", 1*time.Second, d)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
}
|
}
|
||||||
@@ -51,7 +62,12 @@ func once() {
|
|||||||
// }
|
// }
|
||||||
dd := 123
|
dd := 123
|
||||||
// dy, _ = json.Marshal(d)
|
// dy, _ = json.Marshal(d)
|
||||||
err = one.Save("test", "test4", 1*time.Second, dd)
|
err = one.Save("test", "test4", 2*time.Second, dd)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = one.Save("test", "test5", 5*time.Second, dd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
}
|
}
|
||||||
@@ -87,7 +103,7 @@ func (l OnceWorker) Worker(ctx context.Context, taskType timerx.OnceTaskType, ta
|
|||||||
return &timerx.OnceWorkerResp{
|
return &timerx.OnceWorkerResp{
|
||||||
Retry: true,
|
Retry: true,
|
||||||
AttachData: attachData,
|
AttachData: attachData,
|
||||||
DelayTime: 1 * time.Second,
|
DelayTime: 10 * time.Second,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -114,8 +114,9 @@ func (w *Once) Save(taskType OnceTaskType, taskId string, delayTime time.Duratio
|
|||||||
b, _ := json.Marshal(ed)
|
b, _ := json.Marshal(ed)
|
||||||
|
|
||||||
// 写入附加数据
|
// 写入附加数据
|
||||||
_, err := w.redis.SetEX(w.ctx, w.keyPrefix+redisKey, b, delayTime+time.Second*5).Result()
|
_, err := w.redis.SetEX(w.ctx, w.keyPrefix+redisKey, b, delayTime+time.Minute*30).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
w.logger.Errorf(w.ctx, "写入附加数据失败:%s", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -130,6 +131,13 @@ func (w *Once) Save(taskType OnceTaskType, taskId string, delayTime time.Duratio
|
|||||||
|
|
||||||
// 添加任务(不覆盖)
|
// 添加任务(不覆盖)
|
||||||
func (l *Once) Create(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error {
|
func (l *Once) Create(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error {
|
||||||
|
if delayTime.Abs() != delayTime {
|
||||||
|
return fmt.Errorf("时间间隔不能为负数")
|
||||||
|
}
|
||||||
|
if delayTime == 0 {
|
||||||
|
return fmt.Errorf("时间间隔不能为0")
|
||||||
|
}
|
||||||
|
|
||||||
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
|
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
|
||||||
// 判断有序集合Key是否存在,存在则报错,不存在则写入
|
// 判断有序集合Key是否存在,存在则报错,不存在则写入
|
||||||
if l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Val() == 0 {
|
if l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Val() == 0 {
|
||||||
@@ -141,8 +149,9 @@ func (l *Once) Create(taskType OnceTaskType, taskId string, delayTime time.Durat
|
|||||||
b, _ := json.Marshal(ed)
|
b, _ := json.Marshal(ed)
|
||||||
|
|
||||||
// 写入附加数据
|
// 写入附加数据
|
||||||
_, err := l.redis.SetEX(l.ctx, l.keyPrefix+redisKey, b, delayTime+time.Second*5).Result()
|
_, err := l.redis.SetEX(l.ctx, l.keyPrefix+redisKey, b, delayTime+time.Minute*30).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
l.logger.Errorf(l.ctx, "写入附加数据失败:%s", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = l.redis.ZAdd(l.ctx, l.zsetKey, &redis.Z{
|
_, err = l.redis.ZAdd(l.ctx, l.zsetKey, &redis.Z{
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ type Options struct {
|
|||||||
location *time.Location
|
location *time.Location
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
usePriority bool
|
usePriority bool
|
||||||
priorityVal int
|
priorityVal int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultOptions() Options {
|
func defaultOptions() Options {
|
||||||
@@ -56,7 +56,7 @@ func SetTimeout(d time.Duration) Option {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 设置优先级
|
// 设置优先级
|
||||||
func SetPriority(priority int) Option {
|
func SetPriority(priority int64) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.usePriority = true
|
o.usePriority = true
|
||||||
o.priorityVal = priority
|
o.priorityVal = priority
|
||||||
|
|||||||
+13
-7
@@ -15,16 +15,16 @@ import (
|
|||||||
|
|
||||||
type Priority struct {
|
type Priority struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
priority int // 优先级
|
priority int64 // 优先级
|
||||||
redis redis.UniversalClient
|
redis redis.UniversalClient
|
||||||
redisKey string
|
redisKey string
|
||||||
logger logger.Logger
|
logger logger.Logger
|
||||||
expireTime time.Duration
|
expireTime time.Duration
|
||||||
updateInterval time.Duration // 更新间隔
|
updateInterval time.Duration // 更新间隔
|
||||||
deadTime int64 // 缓存时间戳,单位秒
|
deadTime *int64 // 缓存时间戳,单位秒
|
||||||
}
|
}
|
||||||
|
|
||||||
func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int, opts ...Option) *Priority {
|
func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int64, opts ...Option) *Priority {
|
||||||
conf := newOptions(opts...)
|
conf := newOptions(opts...)
|
||||||
|
|
||||||
pro := &Priority{
|
pro := &Priority{
|
||||||
@@ -35,6 +35,7 @@ func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix strin
|
|||||||
redisKey: "timer:priority_" + keyPrefix,
|
redisKey: "timer:priority_" + keyPrefix,
|
||||||
expireTime: conf.expireTime,
|
expireTime: conf.expireTime,
|
||||||
updateInterval: conf.updateInterval,
|
updateInterval: conf.updateInterval,
|
||||||
|
deadTime: new(int64),
|
||||||
}
|
}
|
||||||
|
|
||||||
// 更新间隔
|
// 更新间隔
|
||||||
@@ -55,9 +56,13 @@ func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix strin
|
|||||||
return pro
|
return pro
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Priority) IsLatest(ctx context.Context) bool {
|
func (l *Priority) IsLatest(ctx context.Context) (b bool) {
|
||||||
|
// defer func() {
|
||||||
|
// l.logger.Infof(l.ctx, "当前优先级:%d bool:%v", l.priority, b)
|
||||||
|
// }()
|
||||||
|
|
||||||
// 加缓存
|
// 加缓存
|
||||||
if atomic.LoadInt64(&l.deadTime) > time.Now().Unix() {
|
if atomic.LoadInt64(l.deadTime) > time.Now().Unix() {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -71,7 +76,7 @@ func (l *Priority) IsLatest(ctx context.Context) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
strPriority, err := strconv.Atoi(str)
|
strPriority, err := strconv.ParseInt(str, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.logger.Errorf(l.ctx, "全局优先级转换失败:%s", err.Error())
|
l.logger.Errorf(l.ctx, "全局优先级转换失败:%s", err.Error())
|
||||||
return false
|
return false
|
||||||
@@ -83,6 +88,7 @@ func (l *Priority) IsLatest(ctx context.Context) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *Priority) setPriority() bool {
|
func (l *Priority) setPriority() bool {
|
||||||
|
|
||||||
// redis lua脚本
|
// redis lua脚本
|
||||||
// 如果redis的可以不存在则设置值,如果存在且redis内的值比当前大则不处理,如果存在redis内的值比当前小或等于则更新值且更新ttl
|
// 如果redis的可以不存在则设置值,如果存在且redis内的值比当前大则不处理,如果存在redis内的值比当前小或等于则更新值且更新ttl
|
||||||
script := `
|
script := `
|
||||||
@@ -142,7 +148,7 @@ func (l *Priority) setPriority() bool {
|
|||||||
if operationResult == "SET" || operationResult == "UPDATE" {
|
if operationResult == "SET" || operationResult == "UPDATE" {
|
||||||
l.logger.Infof(l.ctx, "设置全局优先级成功:%s", priority)
|
l.logger.Infof(l.ctx, "设置全局优先级成功:%s", priority)
|
||||||
|
|
||||||
atomic.StoreInt64(&l.deadTime, time.Now().Add(l.updateInterval).Unix())
|
atomic.StoreInt64(l.deadTime, time.Now().Add(l.updateInterval).Unix())
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|||||||
+1
-1
@@ -37,7 +37,7 @@ func PriorityByVersion(version string) (priority int64, err error) {
|
|||||||
if val == "" {
|
if val == "" {
|
||||||
return 0, ErrVersionFormat
|
return 0, ErrVersionFormat
|
||||||
}
|
}
|
||||||
i, err := strconv.ParseInt(val, 10, 32)
|
i, err := strconv.ParseInt(val, 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, ErrVersionFormat
|
return 0, ErrVersionFormat
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user