From ca0d5a1b9918375a8056fc674dab163eb99f4b4d Mon Sep 17 00:00:00 2001 From: Yun <995116474@qq.com> Date: Fri, 11 Oct 2024 16:17:22 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=B8=80=E4=B8=AA=E5=8D=95?= =?UTF-8?q?=E6=AC=A1=E7=9A=84BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/main.go | 34 +++++++++++++++++++------------- once.go | 56 +++++++++++++++++++++++++++++++---------------------- 2 files changed, 54 insertions(+), 36 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 6e1a243..e5b099b 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "encoding/json" "fmt" "time" @@ -39,11 +38,19 @@ func once() { one := timerx.InitOnce(ctx, client, "test", w) d := OnceData{ - Num: 1, + Num: 3, } - dy, _ := json.Marshal(d) + // dy, _ := json.Marshal(d) - err := one.Create("test", "test", 1*time.Second, dy) + err := one.Create("test", "test3", 1*time.Second, d) + if err != nil { + fmt.Println(err) + } + d = OnceData{ + Num: 4, + } + // dy, _ = json.Marshal(d) + err = one.Create("test", "test4", 1*time.Second, d) if err != nil { fmt.Println(err) } @@ -56,24 +63,25 @@ type OnceData struct { type OnceWorker struct{} -func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, attachData []byte) *timerx.OnceWorkerResp { +func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *timerx.OnceWorkerResp { fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05")) - fmt.Println(taskId, taskType) - fmt.Printf("原来的参数:%s\n", string(attachData)) + fmt.Println(taskType, taskId) - d := OnceData{} + fmt.Printf("原来的参数:%+v\n", attachData) - json.Unmarshal(attachData, &d) + // d := OnceData{} - d.Num++ + // json.Unmarshal(ab, &d) - fmt.Println(d) + // d.Num++ - dy, _ := json.Marshal(d) + // fmt.Println(d) + + // dy, _ := json.Marshal(d) return &timerx.OnceWorkerResp{ Retry: true, - AttachData: dy, + AttachData: attachData, DelayTime: 1 * time.Second, } } diff --git a/once.go b/once.go index 4fb250b..b9b5567 100644 --- a/once.go +++ b/once.go @@ -20,18 +20,19 @@ import ( // 单次的任务队列 type Once struct { - ctx context.Context - logger Logger - zsetKey string - listKey string - redis redis.UniversalClient - worker Callback + ctx context.Context + logger Logger + zsetKey string + listKey string + redis redis.UniversalClient + worker Callback + keyPrefix string } type OnceWorkerResp struct { Retry bool // 是否重试 true DelayTime time.Duration - AttachData []byte + AttachData interface{} } // 需要考虑执行失败重新放入队列的情况 @@ -42,7 +43,7 @@ type Callback interface { // @param data interface{} 任务数据 // @return WorkerCode 任务执行结果 // @return time.Duration 任务执行时间间隔 - Worker(ctx context.Context, taskType string, taskId string, attachData []byte) *OnceWorkerResp + Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *OnceWorkerResp } var wo *Once = nil @@ -50,20 +51,25 @@ var once sync.Once type extendData struct { Delay time.Duration - Data []byte + Data interface{} } // 初始化 func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, call Callback, opts ...Option) *Once { + if re == nil { + panic("redis client is nil") + } + op := newOptions(opts...) once.Do(func() { wo = &Once{ - ctx: ctx, - logger: op.logger, - zsetKey: "timer:once_zsetkey" + keyPrefix, - listKey: "timer:once_listkey" + keyPrefix, - redis: re, - worker: call, + ctx: ctx, + logger: op.logger, + zsetKey: "timer:once_zsetkey" + keyPrefix, + listKey: "timer:once_listkey" + keyPrefix, + redis: re, + worker: call, + keyPrefix: keyPrefix, } go wo.getTask() go wo.watch() @@ -78,7 +84,7 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c // @param uniTaskId string 任务唯一标识 // @param delayTime time.Duration 延迟时间 // @param attachData interface{} 附加数据 -func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, attachData []byte) error { +func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error { if delayTime.Abs() != delayTime { return fmt.Errorf("时间间隔不能为负数") } @@ -95,7 +101,7 @@ func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, att b, _ := json.Marshal(ed) // 写入附加数据 - _, err := w.redis.SetEX(w.ctx, redisKey, b, delayTime+time.Second*5).Result() + _, err := w.redis.SetEX(w.ctx, w.keyPrefix+redisKey, b, delayTime+time.Second*5).Result() if err != nil { return err } @@ -110,11 +116,11 @@ func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, att } // 添加任务(不覆盖) -func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData []byte) error { - +func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error { + redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId) // 判断有序集合Key是否存在,存在则报错,不存在则写入 - if l.redis.Exists(l.ctx, l.zsetKey).Val() == 0 { - redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId) + if l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Val() == 0 { + ed := extendData{ Delay: delayTime, Data: attachData, @@ -122,7 +128,7 @@ func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, a b, _ := json.Marshal(ed) // 写入附加数据 - _, err := l.redis.SetEX(l.ctx, redisKey, b, delayTime+time.Second*5).Result() + _, err := l.redis.SetEX(l.ctx, l.keyPrefix+redisKey, b, delayTime+time.Second*5).Result() if err != nil { return err } @@ -208,11 +214,15 @@ func (l *Once) doTask(ctx context.Context, key string) { s := strings.Split(key, "[:]") // 读取数据 - str, err := l.redis.Get(ctx, key).Result() + redisKey := l.keyPrefix + key + str, err := l.redis.Get(ctx, redisKey).Result() if err != nil { l.logger.Errorf(ctx, "获取数据失败 err:%s", err) return } + + fmt.Println("参数:", str) + ed := extendData{} json.Unmarshal([]byte(str), &ed)