From 1ac53f768865de09e0eba34c1e6c58e9adedefa1 Mon Sep 17 00:00:00 2001 From: Yun <995116474@qq.com> Date: Thu, 10 Oct 2024 10:05:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=99=84=E5=8A=A0=E5=86=85?= =?UTF-8?q?=E5=AE=B9=E7=9A=84=E5=AD=97=E6=AE=B5=E4=B8=BA=E5=AD=97=E8=8A=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/main.go | 48 +++++++++++++++++++++++++----------------------- once.go | 10 +++++----- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 342ca50..6e1a243 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -41,8 +41,9 @@ func once() { d := OnceData{ Num: 1, } + dy, _ := json.Marshal(d) - err := one.Create("test", "test", 1*time.Second, d) + err := one.Create("test", "test", 1*time.Second, dy) if err != nil { fmt.Println(err) } @@ -55,23 +56,24 @@ type OnceData struct { type OnceWorker struct{} -func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *timerx.OnceWorkerResp { +func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, attachData []byte) *timerx.OnceWorkerResp { fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05")) fmt.Println(taskId, taskType) - fmt.Println(attachData) + fmt.Printf("原来的参数:%s\n", string(attachData)) d := OnceData{} - by, _ := json.Marshal(attachData) - json.Unmarshal(by, &d) + json.Unmarshal(attachData, &d) d.Num++ fmt.Println(d) + dy, _ := json.Marshal(d) + return &timerx.OnceWorkerResp{ Retry: true, - AttachData: d, + AttachData: dy, DelayTime: 1 * time.Second, } } @@ -91,23 +93,23 @@ func cluster() { } func worker() { - client := getRedis() - w := timerx.InitOnce(context.Background(), client, "test", &OnceWorker{}) - w.Save("test", "test", 1*time.Second, map[string]interface{}{ - "test": "test", - }) - w.Save("test2", "test", 1*time.Second, map[string]interface{}{ - "test": "test", - }) - w.Save("test3", "test", 1*time.Second, map[string]interface{}{ - "test": "test", - }) - w.Save("test4", "test", 1*time.Second, map[string]interface{}{ - "test": "test", - }) - w.Save("test5", "test", 1*time.Second, map[string]interface{}{ - "test": "test", - }) + // client := getRedis() + // w := timerx.InitOnce(context.Background(), client, "test", &OnceWorker{}) + // w.Save("test", "test", 1*time.Second, map[string]interface{}{ + // "test": "test", + // }) + // w.Save("test2", "test", 1*time.Second, map[string]interface{}{ + // "test": "test", + // }) + // w.Save("test3", "test", 1*time.Second, map[string]interface{}{ + // "test": "test", + // }) + // w.Save("test4", "test", 1*time.Second, map[string]interface{}{ + // "test": "test", + // }) + // w.Save("test5", "test", 1*time.Second, map[string]interface{}{ + // "test": "test", + // }) select {} } diff --git a/once.go b/once.go index 222d4bb..4fb250b 100644 --- a/once.go +++ b/once.go @@ -31,7 +31,7 @@ type Once struct { type OnceWorkerResp struct { Retry bool // 是否重试 true DelayTime time.Duration - AttachData interface{} + AttachData []byte } // 需要考虑执行失败重新放入队列的情况 @@ -42,7 +42,7 @@ type Callback interface { // @param data interface{} 任务数据 // @return WorkerCode 任务执行结果 // @return time.Duration 任务执行时间间隔 - Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *OnceWorkerResp + Worker(ctx context.Context, taskType string, taskId string, attachData []byte) *OnceWorkerResp } var wo *Once = nil @@ -50,7 +50,7 @@ var once sync.Once type extendData struct { Delay time.Duration - Data interface{} + Data []byte } // 初始化 @@ -78,7 +78,7 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c // @param uniTaskId string 任务唯一标识 // @param delayTime time.Duration 延迟时间 // @param attachData interface{} 附加数据 -func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error { +func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, attachData []byte) error { if delayTime.Abs() != delayTime { return fmt.Errorf("时间间隔不能为负数") } @@ -110,7 +110,7 @@ func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, att } // 添加任务(不覆盖) -func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error { +func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData []byte) error { // 判断有序集合Key是否存在,存在则报错,不存在则写入 if l.redis.Exists(l.ctx, l.zsetKey).Val() == 0 {