修复一个单次的BUG

This commit is contained in:
Yun
2024-10-11 16:17:22 +08:00
parent 1ac53f7688
commit ca0d5a1b99
2 changed files with 54 additions and 36 deletions
+33 -23
View File
@@ -20,18 +20,19 @@ import (
// 单次的任务队列
type Once struct {
ctx context.Context
logger Logger
zsetKey string
listKey string
redis redis.UniversalClient
worker Callback
ctx context.Context
logger Logger
zsetKey string
listKey string
redis redis.UniversalClient
worker Callback
keyPrefix string
}
type OnceWorkerResp struct {
Retry bool // 是否重试 true
DelayTime time.Duration
AttachData []byte
AttachData interface{}
}
// 需要考虑执行失败重新放入队列的情况
@@ -42,7 +43,7 @@ type Callback interface {
// @param data interface{} 任务数据
// @return WorkerCode 任务执行结果
// @return time.Duration 任务执行时间间隔
Worker(ctx context.Context, taskType string, taskId string, attachData []byte) *OnceWorkerResp
Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *OnceWorkerResp
}
var wo *Once = nil
@@ -50,20 +51,25 @@ var once sync.Once
type extendData struct {
Delay time.Duration
Data []byte
Data interface{}
}
// 初始化
func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, call Callback, opts ...Option) *Once {
if re == nil {
panic("redis client is nil")
}
op := newOptions(opts...)
once.Do(func() {
wo = &Once{
ctx: ctx,
logger: op.logger,
zsetKey: "timer:once_zsetkey" + keyPrefix,
listKey: "timer:once_listkey" + keyPrefix,
redis: re,
worker: call,
ctx: ctx,
logger: op.logger,
zsetKey: "timer:once_zsetkey" + keyPrefix,
listKey: "timer:once_listkey" + keyPrefix,
redis: re,
worker: call,
keyPrefix: keyPrefix,
}
go wo.getTask()
go wo.watch()
@@ -78,7 +84,7 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
// @param uniTaskId string 任务唯一标识
// @param delayTime time.Duration 延迟时间
// @param attachData interface{} 附加数据
func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, attachData []byte) error {
func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error {
if delayTime.Abs() != delayTime {
return fmt.Errorf("时间间隔不能为负数")
}
@@ -95,7 +101,7 @@ func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, att
b, _ := json.Marshal(ed)
// 写入附加数据
_, err := w.redis.SetEX(w.ctx, redisKey, b, delayTime+time.Second*5).Result()
_, err := w.redis.SetEX(w.ctx, w.keyPrefix+redisKey, b, delayTime+time.Second*5).Result()
if err != nil {
return err
}
@@ -110,11 +116,11 @@ func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, att
}
// 添加任务(不覆盖)
func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData []byte) error {
func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error {
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
// 判断有序集合Key是否存在,存在则报错,不存在则写入
if l.redis.Exists(l.ctx, l.zsetKey).Val() == 0 {
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
if l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Val() == 0 {
ed := extendData{
Delay: delayTime,
Data: attachData,
@@ -122,7 +128,7 @@ func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, a
b, _ := json.Marshal(ed)
// 写入附加数据
_, err := l.redis.SetEX(l.ctx, redisKey, b, delayTime+time.Second*5).Result()
_, err := l.redis.SetEX(l.ctx, l.keyPrefix+redisKey, b, delayTime+time.Second*5).Result()
if err != nil {
return err
}
@@ -208,11 +214,15 @@ func (l *Once) doTask(ctx context.Context, key string) {
s := strings.Split(key, "[:]")
// 读取数据
str, err := l.redis.Get(ctx, key).Result()
redisKey := l.keyPrefix + key
str, err := l.redis.Get(ctx, redisKey).Result()
if err != nil {
l.logger.Errorf(ctx, "获取数据失败 err:%s", err)
return
}
fmt.Println("参数:", str)
ed := extendData{}
json.Unmarshal([]byte(str), &ed)