From 4db3cf81b751f7af00ebee1483468cd428b60091 Mon Sep 17 00:00:00 2001 From: Yun Date: Fri, 19 Sep 2025 18:43:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=AF=8F=E6=AC=A1=E6=89=A7=E8=A1=8C=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E9=9C=80=E8=A6=81=E4=B8=8A=E6=8A=A5&=E8=B0=83?= =?UTF-8?q?=E6=95=B4=E4=BB=BB=E5=8A=A1=E6=97=A5=E5=BF=97=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 31 +++++++++++++++++++++++++------ go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/cluster.go b/cluster.go index 6329aa7..e6c1f98 100644 --- a/cluster.go +++ b/cluster.go @@ -38,6 +38,7 @@ type Cluster struct { setKey string // 重入集合的key heartbeatKey string // 心跳的Key leaderKey string // 上报当前的Leader + executeInfoKey string // 执行情况的key priority *priority.Priority // 全局优先级 priorityKey string // 全局优先级的key @@ -80,6 +81,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin leaderUniLockKey: "timer:cluster_leaderUniLockKey" + keyPrefix, // 领导唯一锁 leaderKey: "timer:cluster_leaderKey" + keyPrefix, // 上报当前Leader heartbeatKey: "timer:cluster_heartbeatKey" + keyPrefix, // 心跳 有序集合 + executeInfoKey: "timer:cluster_executeInfoKey" + keyPrefix, // 执行情况的key 有序集合 usePriority: op.usePriority, stopChan: make(chan struct{}), instanceId: U.String(), @@ -198,7 +200,13 @@ func (l *Cluster) cleanHeartbeat(cleanSelf bool) error { if cleanSelf { return l.redis.ZRem(l.ctx, l.heartbeatKey, l.instanceId).Err() } - return l.redis.ZRemRangeByScore(l.ctx, l.heartbeatKey, "0", strconv.FormatInt(time.Now().Add(-15*time.Second).UnixMilli(), 10)).Err() + + // 移除心跳 + l.redis.ZRemRangeByScore(l.ctx, l.heartbeatKey, "0", strconv.FormatInt(time.Now().Add(-15*time.Second).UnixMilli(), 10)).Err() + + // 移除执行信息 + l.redis.ZRemRangeByScore(l.ctx, l.executeInfoKey, "0", strconv.FormatInt(time.Now().Add(-15*time.Minute).UnixMilli(), 10)).Err() + return nil } // 领导选举 @@ -595,10 +603,24 @@ type ReJobData struct { // 执行任务 func (l *Cluster) processTask(taskId string) { + begin := time.Now() ctx, cancel := context.WithTimeout(l.ctx, l.timeout) defer cancel() + u, _ := uuid.NewV7() + + ctx = context.WithValue(ctx, "trace_id", u.String()) + + l.logger.Infof(ctx, "doTask timer begin taskId:%s", taskId) + + // 上报执行情况 + executeVal := fmt.Sprintf("%s|%s|%s",taskId,l.instanceId,begin.Format(time.RFC3339Nano)) + l.redis.ZAdd(ctx,l.executeInfoKey,&redis.Z{ + Score: float64(begin.UnixMilli()), + Member: executeVal, + }) + val, ok := l.workerList.Load(taskId) if !ok { l.logger.Errorf(ctx, "doTask timer:任务不存在:%s", taskId) @@ -622,18 +644,16 @@ func (l *Cluster) processTask(taskId string) { } defer lock.Unlock() - begin := time.Now() + defer func() { if err := recover(); err != nil { - l.logger.Errorf(ctx, "timer:回调任务panic err:%+v stack:%s", err, string(debug.Stack())) + l.logger.Errorf(ctx, "doTask timer:回调任务panic err:%+v stack:%s", err, string(debug.Stack())) } l.logger.Infof(ctx, "doTask timer:执行任务耗时:%s %dms", taskId, time.Since(begin).Milliseconds()) }() - u, _ := uuid.NewV7() - ctx = context.WithValue(ctx, "trace_id", u.String()) // 执行任务 if err := t.Callback(ctx, t.ExtendData); err != nil { @@ -641,5 +661,4 @@ func (l *Cluster) processTask(taskId string) { return } - l.logger.Infof(ctx, "doTask timer:执行任务成功:%s", taskId) } diff --git a/go.mod b/go.mod index 8c8a030..153a7a1 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/satori/go.uuid v1.2.0 github.com/stretchr/testify v1.11.1 github.com/yuninks/cachex v1.0.5 - github.com/yuninks/lockx v1.1.1 + github.com/yuninks/lockx v1.1.2 ) require ( diff --git a/go.sum b/go.sum index 8a6e23f..4d0069b 100644 --- a/go.sum +++ b/go.sum @@ -31,8 +31,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/yuninks/cachex v1.0.5 h1:Y2NmTsuEgwEVYb7FVFh5tUN67kmrUioeksQqLbOAwsM= github.com/yuninks/cachex v1.0.5/go.mod h1:5357qz18UvHTJSgZzkMamUzZoFzGeKG9+4tIUBXRSVM= -github.com/yuninks/lockx v1.1.1 h1:b687xJrBblvquP9czP3iiW62VPr/RerJbQMIi0zQizs= -github.com/yuninks/lockx v1.1.1/go.mod h1:tM46x/fp9046YnTIeddmUhkxFjJ/f0g4J+1zdaGKfd8= +github.com/yuninks/lockx v1.1.2 h1:QEI68IHMHgekBu1QtPza9AS70QptMn2ShhMz7Sam35w= +github.com/yuninks/lockx v1.1.2/go.mod h1:tM46x/fp9046YnTIeddmUhkxFjJ/f0g4J+1zdaGKfd8= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=