From f9c86cb16a6cd6d9caddbd058600064e24b63e24 Mon Sep 17 00:00:00 2001 From: Yun Date: Fri, 24 May 2024 09:55:34 +0800 Subject: [PATCH] =?UTF-8?q?=E5=85=B3=E4=BA=8EContext=E7=9A=84=E4=BC=98?= =?UTF-8?q?=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/cluster.go b/cluster.go index 6f61114..d5c3d7b 100644 --- a/cluster.go +++ b/cluster.go @@ -240,10 +240,7 @@ func (c *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, ca // TODO:考虑不同实例系统时间不一样,可能计算的下次时间不一致,会有重复执行的可能 func (c *Cluster) getNextTime() { - ctx, cancel := context.WithCancel(c.ctx) - defer cancel() - - lock := lockx.NewGlobalLock(ctx, c.redis, c.lockKey) + lock := lockx.NewGlobalLock(c.ctx, c.redis, c.lockKey) // 获取锁 lockBool := lock.Lock() if !lockBool { @@ -294,7 +291,7 @@ func (c *Cluster) getNextTime() { // TODO: expireTime := time.Minute - res, err := c.redis.Eval(ctx, script, []string{c.zsetKey}, cacheKey, expireTime.Seconds(), nextTime.UnixMilli(), val.TaskId).Result() + res, err := c.redis.Eval(c.ctx, script, []string{c.zsetKey}, cacheKey, expireTime.Seconds(), nextTime.UnixMilli(), val.TaskId).Result() if err == nil && res.(string) == "SUCCESS" { // 设置成功 @@ -373,11 +370,8 @@ func (c *Cluster) watch() { // 执行任务 func (c *Cluster) doTask(ctx context.Context, taskId string) { - defer func() { - if err := recover(); err != nil { - c.logger.Errorf(ctx, "timer:定时器出错 err:%+v stack:%s", err, string(debug.Stack())) - } - }() + ctx, cancel := context.WithCancel(ctx) + defer cancel() val, ok := clusterWorkerList.Load(taskId) if !ok { @@ -395,6 +389,12 @@ func (c *Cluster) doTask(ctx context.Context, taskId string) { } defer lock.Unlock() + defer func() { + if err := recover(); err != nil { + c.logger.Errorf(ctx, "timer:定时器出错 err:%+v stack:%s", err, string(debug.Stack())) + } + }() + // 执行任务 t.Callback(ctx, t.ExtendData) }