From 4ee3c5e1c2d97f179ceb0513ce5835e96654b90a Mon Sep 17 00:00:00 2001 From: Yun <995116474@qq.com> Date: Wed, 9 Oct 2024 19:37:13 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=8F=AA=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E4=B8=80=E6=AC=A1=E7=9A=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/main.go | 40 +++++++++++++++++++++++++++------------- once.go | 25 ++++++++++++++----------- 2 files changed, 41 insertions(+), 24 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 1f6e3e3..342ca50 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "encoding/json" "fmt" "time" @@ -37,20 +38,42 @@ func once() { w := OnceWorker{} one := timerx.InitOnce(ctx, client, "test", w) - err := one.Save("test", "test", 1*time.Second, map[string]interface{}{}) + d := OnceData{ + Num: 1, + } + + err := one.Create("test", "test", 1*time.Second, d) if err != nil { fmt.Println(err) } } +type OnceData struct { + Num int +} + type OnceWorker struct{} -func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) (timerx.WorkerCode, time.Duration) { +func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *timerx.OnceWorkerResp { fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05")) fmt.Println(taskId, taskType) fmt.Println(attachData) - return timerx.WorkerCodeAgain, time.Millisecond + + d := OnceData{} + + by, _ := json.Marshal(attachData) + json.Unmarshal(by, &d) + + d.Num++ + + fmt.Println(d) + + return &timerx.OnceWorkerResp{ + Retry: true, + AttachData: d, + DelayTime: 1 * time.Second, + } } func cluster() { @@ -69,7 +92,7 @@ func cluster() { func worker() { client := getRedis() - w := timerx.InitOnce(context.Background(), client, "test", &Worker{}) + w := timerx.InitOnce(context.Background(), client, "test", &OnceWorker{}) w.Save("test", "test", 1*time.Second, map[string]interface{}{ "test": "test", }) @@ -89,15 +112,6 @@ func worker() { select {} } -type Worker struct{} - -func (w *Worker) Worker(ctx context.Context, jobType string, uniqueKey string, data interface{}) (timerx.WorkerCode, time.Duration) { - fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05")) - fmt.Println(uniqueKey, jobType) - fmt.Println(data) - return timerx.WorkerCodeAgain, time.Second -} - func getRedis() *redis.Client { client := redis.NewClient(&redis.Options{ Addr: "127.0.0.1" + ":" + "6379", diff --git a/once.go b/once.go index edd94d3..222d4bb 100644 --- a/once.go +++ b/once.go @@ -28,12 +28,11 @@ type Once struct { worker Callback } -type WorkerCode int - -const ( - WorkerCodeSuccess WorkerCode = 0 // 处理完成(不需要重入) - WorkerCodeAgain WorkerCode = -1 // 需要继续定时,默认原来的时间 -) +type OnceWorkerResp struct { + Retry bool // 是否重试 true + DelayTime time.Duration + AttachData interface{} +} // 需要考虑执行失败重新放入队列的情况 type Callback interface { @@ -43,7 +42,7 @@ type Callback interface { // @param data interface{} 任务数据 // @return WorkerCode 任务执行结果 // @return time.Duration 任务执行时间间隔 - Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) (WorkerCode, time.Duration) + Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *OnceWorkerResp } var wo *Once = nil @@ -217,13 +216,17 @@ func (l *Once) doTask(ctx context.Context, key string) { ed := extendData{} json.Unmarshal([]byte(str), &ed) - code, t := l.worker.Worker(ctx, s[0], s[1], ed.Data) + resp := l.worker.Worker(ctx, s[0], s[1], ed.Data) + if resp == nil { + return + } - if code == WorkerCodeAgain { + if resp.Retry { // 重新放入队列 - if t != 0 && t == t.Abs() { - ed.Delay = t + if resp.DelayTime != 0 && resp.DelayTime == resp.DelayTime.Abs() { + ed.Delay = resp.DelayTime } + ed.Data = resp.AttachData l.logger.Infof(ctx, "任务重新放入队列:%s", key) fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05")) l.Create(s[0], s[1], ed.Delay, ed.Data)