From cf3e751afecbcd4ad59b5ce66f64de25880178a7 Mon Sep 17 00:00:00 2001 From: Yun Date: Tue, 2 Jul 2024 21:18:16 +0800 Subject: [PATCH] =?UTF-8?q?=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81=E5=B0=9A?= =?UTF-8?q?=E6=9C=AA=E5=AE=8C=E5=85=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/main.go | 10 +++++----- once.go | 53 +++++++++++++++++++++++++++++++---------------------- 2 files changed, 36 insertions(+), 27 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 13dd98e..05918cc 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -47,19 +47,19 @@ func cluster() { func worker() { client := getRedis() w := timerx.InitOnce(context.Background(), client, "test", &Worker{}) - w.Add("test", "test", 1*time.Second, map[string]interface{}{ + w.Save("test", "test", 1*time.Second, map[string]interface{}{ "test": "test", }) - w.Add("test2", "test", 1*time.Second, map[string]interface{}{ + w.Save("test2", "test", 1*time.Second, map[string]interface{}{ "test": "test", }) - w.Add("test3", "test", 1*time.Second, map[string]interface{}{ + w.Save("test3", "test", 1*time.Second, map[string]interface{}{ "test": "test", }) - w.Add("test4", "test", 1*time.Second, map[string]interface{}{ + w.Save("test4", "test", 1*time.Second, map[string]interface{}{ "test": "test", }) - w.Add("test5", "test", 1*time.Second, map[string]interface{}{ + w.Save("test5", "test", 1*time.Second, map[string]interface{}{ "test": "test", }) diff --git a/once.go b/once.go index c0bbad6..e309629 100644 --- a/once.go +++ b/once.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "log" "runtime/debug" "strings" "sync" @@ -44,7 +43,7 @@ type Callback interface { // @param data interface{} 任务数据 // @return WorkerCode 任务执行结果 // @return time.Duration 任务执行时间间隔 - Worker(ctx context.Context, jobType string, uniTaskId string, attachData interface{}) (WorkerCode, time.Duration) + Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) (WorkerCode, time.Duration) } var wo *Once = nil @@ -74,13 +73,13 @@ func InitOnce(ctx context.Context, re *redis.Client, keyPrefix string, call Call return wo } -// 添加任务 +// 添加任务(覆盖) // 重复插入就代表覆盖 // @param jobType string 任务类型 // @param uniTaskId string 任务唯一标识 // @param delayTime time.Duration 延迟时间 // @param attachData interface{} 附加数据 -func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, attachData interface{}) error { +func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error { if delayTime.Abs() != delayTime { return fmt.Errorf("时间间隔不能为负数") } @@ -88,7 +87,7 @@ func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, at return fmt.Errorf("时间间隔不能为0") } - redisKey := fmt.Sprintf("%s[:]%s", jobType, uniTaskId) + redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId) ed := extendData{ Delay: delayTime, @@ -102,7 +101,7 @@ func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, at return err } - // 吸入执行时间 + // 写入执行时间 _, err = w.redis.ZAdd(w.ctx, w.zsetKey, &redis.Z{ Score: float64(time.Now().Add(delayTime).UnixMilli()), Member: redisKey, @@ -111,9 +110,17 @@ func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, at return err } +// 添加任务(不覆盖) +func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error { + + // 判断有序集合Key是否存在,存在则报错,不存在则写入 + + return nil +} + // 删除任务 -func (w *Once) Del(jobType string, uniTaskId string) error { - redisKey := fmt.Sprintf("%s[:]%s", jobType, uniTaskId) +func (w *Once) Delete(taskType string, taskId string) error { + redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId) w.redis.Del(w.ctx, redisKey).Result() @@ -122,6 +129,11 @@ func (w *Once) Del(jobType string, uniTaskId string) error { return nil } +// 获取任务 +func (l *Once) Get(taskType string, taskId string) { + // +} + // 获取任务 func (w *Once) getTask() { timer := time.NewTicker(time.Millisecond * 200) @@ -153,47 +165,44 @@ func (w *Once) watch() { for { keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result() if err != nil { - fmt.Println("watch err:", err) + // fmt.Println("watch err:", err) continue } - go w.doTask(keys[1]) + ctx := context.WithValue(w.ctx, "trace_id", uuid.NewV4().String()) + + go w.doTask(ctx, keys[1]) } } // 执行任务 -func (w *Once) doTask(key string) { +func (l *Once) doTask(ctx context.Context, key string) { defer func() { if err := recover(); err != nil { - fmt.Println("timer:回调任务panic", err) - log.Println("errStack", string(debug.Stack())) + l.logger.Errorf(ctx, "timer:回调任务panic:%s stack:%s", err, string(debug.Stack())) } }() s := strings.Split(key, "[:]") // 读取数据 - str, err := w.redis.Get(w.ctx, key).Result() + str, err := l.redis.Get(ctx, key).Result() if err != nil { - fmt.Println("execJob err:", err) + l.logger.Errorf(ctx, "获取数据失败 err:%s", err) return } ed := extendData{} json.Unmarshal([]byte(str), &ed) - fmt.Println("开始时间:", time.Now().Format("2006-01-02 15:04:05")) - - ctx := context.WithValue(context.Background(), "trace_id", uuid.NewV4().String) - - code, t := w.worker.Worker(ctx, s[0], s[1], ed.Data) + code, t := l.worker.Worker(ctx, s[0], s[1], ed.Data) if code == WorkerCodeAgain { // 重新放入队列 - fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05")) if t != 0 && t == t.Abs() { ed.Delay = t } - w.Add(s[0], s[1], ed.Delay, ed.Data) + l.logger.Infof(ctx, "任务重新放入队列:%s", key) + l.Create(s[0], s[1], ed.Delay, ed.Data) } }