每次执行任务需要上报&调整任务日志打印

This commit is contained in:
Yun
2025-09-19 18:43:21 +08:00
parent f79be3955f
commit 4db3cf81b7
3 changed files with 28 additions and 9 deletions
+25 -6
View File
@@ -38,6 +38,7 @@ type Cluster struct {
setKey string // 重入集合的key
heartbeatKey string // 心跳的Key
leaderKey string // 上报当前的Leader
executeInfoKey string // 执行情况的key
priority *priority.Priority // 全局优先级
priorityKey string // 全局优先级的key
@@ -80,6 +81,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
leaderUniLockKey: "timer:cluster_leaderUniLockKey" + keyPrefix, // 领导唯一锁
leaderKey: "timer:cluster_leaderKey" + keyPrefix, // 上报当前Leader
heartbeatKey: "timer:cluster_heartbeatKey" + keyPrefix, // 心跳 有序集合
executeInfoKey: "timer:cluster_executeInfoKey" + keyPrefix, // 执行情况的key 有序集合
usePriority: op.usePriority,
stopChan: make(chan struct{}),
instanceId: U.String(),
@@ -198,7 +200,13 @@ func (l *Cluster) cleanHeartbeat(cleanSelf bool) error {
if cleanSelf {
return l.redis.ZRem(l.ctx, l.heartbeatKey, l.instanceId).Err()
}
return l.redis.ZRemRangeByScore(l.ctx, l.heartbeatKey, "0", strconv.FormatInt(time.Now().Add(-15*time.Second).UnixMilli(), 10)).Err()
// 移除心跳
l.redis.ZRemRangeByScore(l.ctx, l.heartbeatKey, "0", strconv.FormatInt(time.Now().Add(-15*time.Second).UnixMilli(), 10)).Err()
// 移除执行信息
l.redis.ZRemRangeByScore(l.ctx, l.executeInfoKey, "0", strconv.FormatInt(time.Now().Add(-15*time.Minute).UnixMilli(), 10)).Err()
return nil
}
// 领导选举
@@ -595,10 +603,24 @@ type ReJobData struct {
// 执行任务
func (l *Cluster) processTask(taskId string) {
begin := time.Now()
ctx, cancel := context.WithTimeout(l.ctx, l.timeout)
defer cancel()
u, _ := uuid.NewV7()
ctx = context.WithValue(ctx, "trace_id", u.String())
l.logger.Infof(ctx, "doTask timer begin taskId:%s", taskId)
// 上报执行情况
executeVal := fmt.Sprintf("%s|%s|%s",taskId,l.instanceId,begin.Format(time.RFC3339Nano))
l.redis.ZAdd(ctx,l.executeInfoKey,&redis.Z{
Score: float64(begin.UnixMilli()),
Member: executeVal,
})
val, ok := l.workerList.Load(taskId)
if !ok {
l.logger.Errorf(ctx, "doTask timer:任务不存在:%s", taskId)
@@ -622,18 +644,16 @@ func (l *Cluster) processTask(taskId string) {
}
defer lock.Unlock()
begin := time.Now()
defer func() {
if err := recover(); err != nil {
l.logger.Errorf(ctx, "timer:回调任务panic err:%+v stack:%s", err, string(debug.Stack()))
l.logger.Errorf(ctx, "doTask timer:回调任务panic err:%+v stack:%s", err, string(debug.Stack()))
}
l.logger.Infof(ctx, "doTask timer:执行任务耗时:%s %dms", taskId, time.Since(begin).Milliseconds())
}()
u, _ := uuid.NewV7()
ctx = context.WithValue(ctx, "trace_id", u.String())
// 执行任务
if err := t.Callback(ctx, t.ExtendData); err != nil {
@@ -641,5 +661,4 @@ func (l *Cluster) processTask(taskId string) {
return
}
l.logger.Infof(ctx, "doTask timer:执行任务成功:%s", taskId)
}
+1 -1
View File
@@ -8,7 +8,7 @@ require (
github.com/satori/go.uuid v1.2.0
github.com/stretchr/testify v1.11.1
github.com/yuninks/cachex v1.0.5
github.com/yuninks/lockx v1.1.1
github.com/yuninks/lockx v1.1.2
)
require (
+2 -2
View File
@@ -31,8 +31,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/yuninks/cachex v1.0.5 h1:Y2NmTsuEgwEVYb7FVFh5tUN67kmrUioeksQqLbOAwsM=
github.com/yuninks/cachex v1.0.5/go.mod h1:5357qz18UvHTJSgZzkMamUzZoFzGeKG9+4tIUBXRSVM=
github.com/yuninks/lockx v1.1.1 h1:b687xJrBblvquP9czP3iiW62VPr/RerJbQMIi0zQizs=
github.com/yuninks/lockx v1.1.1/go.mod h1:tM46x/fp9046YnTIeddmUhkxFjJ/f0g4J+1zdaGKfd8=
github.com/yuninks/lockx v1.1.2 h1:QEI68IHMHgekBu1QtPza9AS70QptMn2ShhMz7Sam35w=
github.com/yuninks/lockx v1.1.2/go.mod h1:tM46x/fp9046YnTIeddmUhkxFjJ/f0g4J+1zdaGKfd8=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=