diff --git a/option.go b/option.go index 5135297..9882233 100644 --- a/option.go +++ b/option.go @@ -9,7 +9,7 @@ import ( type Options struct { logger logger.Logger location *time.Location - timeout time.Duration + timeout time.Duration // 任务最长执行时间 usePriority bool priorityVal int64 batchSize int @@ -20,7 +20,7 @@ func defaultOptions() Options { return Options{ logger: logger.NewLogger(), location: time.Local, - timeout: time.Hour, + timeout: time.Hour, // usePriority: false, priorityVal: 0, batchSize: 100, diff --git a/single.go b/single.go index 646bcc0..5cce885 100644 --- a/single.go +++ b/single.go @@ -34,6 +34,7 @@ type Single struct { timerIndex int64 stopChan chan struct{} hasRun sync.Map + timeout time.Duration } // 定时器类 @@ -50,6 +51,7 @@ func InitSingle(ctx context.Context, opts ...Option) *Single { location: op.location, nextTime: time.Now(), stopChan: make(chan struct{}), + timeout: op.timeout, } sin.wg.Add(1) @@ -393,6 +395,12 @@ func (l *Single) iterator(ctx context.Context) { // 执行任务 func (s *Single) executeTask(ctx context.Context, timer timerStr, originTime time.Time) { + // 创建带追踪ID的上下文 + traceCtx := context.WithValue(ctx, "trace_id", uuid.NewV4().String()) + s.logger.Infof(traceCtx, "timer: 开始执行任务 %s", timer.TaskId) + traceCtx, cancel := context.WithTimeout(traceCtx, s.timeout) // 设置执行超时 + defer cancel() + select { case timer.CanRunning <- struct{}{}: defer func() { @@ -405,15 +413,10 @@ func (s *Single) executeTask(ctx context.Context, timer timerStr, originTime tim // 检查任务是否已执行 taskKey := fmt.Sprintf("%s:%d", timer.TaskId, originTime.UnixNano()) if _, loaded := s.hasRun.LoadOrStore(taskKey, time.Now()); loaded { - s.logger.Errorf(ctx, "timer: 任务已执行,跳过本次执行 %s", timer.TaskId) + s.logger.Errorf(traceCtx, "timer: 任务已执行,跳过本次执行 %s", timer.TaskId) return } - // 创建带追踪ID的上下文 - traceCtx := context.WithValue(ctx, "trace_id", uuid.NewV4().String()) - traceCtx, cancel := context.WithTimeout(traceCtx, 30*time.Second) // 设置执行超时 - defer cancel() - // 执行回调 if err := s.doTask(traceCtx, timer, originTime); err != nil { s.logger.Errorf(traceCtx, "timer: 任务执行失败: %s", err.Error()) @@ -421,6 +424,7 @@ func (s *Single) executeTask(ctx context.Context, timer timerStr, originTime tim default: // 任务正在执行中,跳过本次 + s.logger.Infof(traceCtx, "timer: 任务正在执行中,跳过本次 %s", timer.TaskId) } }