From fa8e3737fa995a7f8357894136fb4a38b476fac9 Mon Sep 17 00:00:00 2001 From: Yun <995116474@qq.com> Date: Wed, 9 Oct 2024 17:03:54 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=A7=E8=A1=8C=E4=B8=80=E6=AC=A1=E7=9A=84?= =?UTF-8?q?=E8=B0=83=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/main.go | 25 ++++++++++++++++++++++++- once.go | 25 ++++++++++++++++++++++++- 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 05918cc..1f6e3e3 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -24,12 +24,35 @@ func main() { // re() // d() - cluster() + // cluster() + once() select {} } +func once() { + client := getRedis() + ctx := context.Background() + w := OnceWorker{} + one := timerx.InitOnce(ctx, client, "test", w) + + err := one.Save("test", "test", 1*time.Second, map[string]interface{}{}) + if err != nil { + fmt.Println(err) + } + +} + +type OnceWorker struct{} + +func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) (timerx.WorkerCode, time.Duration) { + fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05")) + fmt.Println(taskId, taskType) + fmt.Println(attachData) + return timerx.WorkerCodeAgain, time.Millisecond +} + func cluster() { client := getRedis() ctx := context.Background() diff --git a/once.go b/once.go index e309629..edd94d3 100644 --- a/once.go +++ b/once.go @@ -55,7 +55,7 @@ type extendData struct { } // 初始化 -func InitOnce(ctx context.Context, re *redis.Client, keyPrefix string, call Callback, opts ...Option) *Once { +func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, call Callback, opts ...Option) *Once { op := newOptions(opts...) once.Do(func() { wo = &Once{ @@ -114,6 +114,27 @@ func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, att func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error { // 判断有序集合Key是否存在,存在则报错,不存在则写入 + if l.redis.Exists(l.ctx, l.zsetKey).Val() == 0 { + redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId) + ed := extendData{ + Delay: delayTime, + Data: attachData, + } + b, _ := json.Marshal(ed) + + // 写入附加数据 + _, err := l.redis.SetEX(l.ctx, redisKey, b, delayTime+time.Second*5).Result() + if err != nil { + return err + } + _, err = l.redis.ZAdd(l.ctx, l.zsetKey, &redis.Z{ + Score: float64(time.Now().Add(delayTime).UnixMilli()), + Member: redisKey, + }).Result() + if err != nil { + return err + } + } return nil } @@ -178,6 +199,7 @@ func (w *Once) watch() { // 执行任务 func (l *Once) doTask(ctx context.Context, key string) { + fmt.Println("任务时间:", time.Now().Format("2006-01-02 15:04:05")) defer func() { if err := recover(); err != nil { l.logger.Errorf(ctx, "timer:回调任务panic:%s stack:%s", err, string(debug.Stack())) @@ -203,6 +225,7 @@ func (l *Once) doTask(ctx context.Context, key string) { ed.Delay = t } l.logger.Infof(ctx, "任务重新放入队列:%s", key) + fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05")) l.Create(s[0], s[1], ed.Delay, ed.Data) } }