优化部分错误
This commit is contained in:
@@ -3,6 +3,7 @@ package timerx
|
|||||||
import "errors"
|
import "errors"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
// 定时器不存在
|
||||||
ErrTimerNotFound = errors.New("timer not found")
|
ErrTimerNotFound = errors.New("timer not found")
|
||||||
// 任务ID不能为空
|
// 任务ID不能为空
|
||||||
ErrTaskIdEmpty = errors.New("taskId can not be empty")
|
ErrTaskIdEmpty = errors.New("taskId can not be empty")
|
||||||
@@ -24,4 +25,6 @@ var (
|
|||||||
ErrIntervalTime = errors.New("interval time must be greater than 0")
|
ErrIntervalTime = errors.New("interval time must be greater than 0")
|
||||||
// 任务Id已存在
|
// 任务Id已存在
|
||||||
ErrTaskIdExists = errors.New("taskId already exists")
|
ErrTaskIdExists = errors.New("taskId already exists")
|
||||||
|
// 任务已执行
|
||||||
|
ErrTaskExecuted = errors.New("task already executed")
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -330,7 +330,7 @@ func (w *Once) save(taskType OnceTaskType, taskId string, delayTime time.Duratio
|
|||||||
})
|
})
|
||||||
_, err := pipe.Exec(w.ctx)
|
_, err := pipe.Exec(w.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.logger.Errorf(w.ctx, "save task failed:%w", err)
|
w.logger.Errorf(w.ctx, "save task failed:%v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -374,7 +374,7 @@ func (w *Once) Delete(taskType OnceTaskType, taskId string) error {
|
|||||||
|
|
||||||
_, err := pipe.Exec(w.ctx)
|
_, err := pipe.Exec(w.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.logger.Errorf(w.ctx, "delete task failed:%w", err)
|
w.logger.Errorf(w.ctx, "delete task failed:%v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -431,7 +431,7 @@ func (l *Once) processTask(key string) {
|
|||||||
|
|
||||||
taskType, taskId, err := l.parseRedisKey(key)
|
taskType, taskId, err := l.parseRedisKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.logger.Errorf(ctx, "processTask parseRedisKey:%w key:%s", err, key)
|
l.logger.Errorf(ctx, "processTask parseRedisKey:%v key:%s", err, key)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -479,14 +479,14 @@ func (l *Once) processTask(key string) {
|
|||||||
// 删除任务
|
// 删除任务
|
||||||
l.logger.Infof(ctx, "processTask delete key:%s", key)
|
l.logger.Infof(ctx, "processTask delete key:%s", key)
|
||||||
if err := l.Delete(taskType, taskId); err != nil {
|
if err := l.Delete(taskType, taskId); err != nil {
|
||||||
l.logger.Errorf(ctx, "processTask delete errprocessTask delete err:%w", err)
|
l.logger.Errorf(ctx, "processTask delete errprocessTask delete err:%v", err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// 重新放入队列
|
// 重新放入队列
|
||||||
if err := l.handleRetry(ctx, taskType, taskId, &ed, resp); err != nil {
|
if err := l.handleRetry(ctx, taskType, taskId, &ed, resp); err != nil {
|
||||||
l.logger.Errorf(ctx, "processTask handleRetry err:%w", err)
|
l.logger.Errorf(ctx, "processTask handleRetry err:%v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -19,9 +19,6 @@ import (
|
|||||||
// 1. 这个定时器的作用范围是本机
|
// 1. 这个定时器的作用范围是本机
|
||||||
// 2. 适用简单的时间间隔定时任务
|
// 2. 适用简单的时间间隔定时任务
|
||||||
|
|
||||||
// 避免执行重复
|
|
||||||
var singleHasRun sync.Map
|
|
||||||
|
|
||||||
type Single struct {
|
type Single struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
@@ -54,15 +51,21 @@ func InitSingle(ctx context.Context, opts ...Option) *Single {
|
|||||||
timeout: op.timeout,
|
timeout: op.timeout,
|
||||||
}
|
}
|
||||||
|
|
||||||
sin.wg.Add(1)
|
sin.startDaemon()
|
||||||
go sin.timerLoop(ctx)
|
|
||||||
|
|
||||||
sin.wg.Add(1)
|
|
||||||
go sin.cleanupLoop(ctx)
|
|
||||||
|
|
||||||
return sin
|
return sin
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (l *Single) startDaemon() {
|
||||||
|
|
||||||
|
l.wg.Add(1)
|
||||||
|
go l.timerLoop()
|
||||||
|
|
||||||
|
l.wg.Add(1)
|
||||||
|
go l.cleanupLoop()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// 停止所有定时任务
|
// 停止所有定时任务
|
||||||
func (s *Single) Stop() {
|
func (s *Single) Stop() {
|
||||||
if s.cancel != nil {
|
if s.cancel != nil {
|
||||||
@@ -96,7 +99,7 @@ func (l *Single) MaxIndex() int64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 定时器主循环
|
// 定时器主循环
|
||||||
func (s *Single) timerLoop(ctx context.Context) {
|
func (s *Single) timerLoop() {
|
||||||
defer s.wg.Done()
|
defer s.wg.Done()
|
||||||
|
|
||||||
ticker := time.NewTicker(100 * time.Millisecond) // 提高精度到100ms
|
ticker := time.NewTicker(100 * time.Millisecond) // 提高精度到100ms
|
||||||
@@ -113,20 +116,20 @@ func (s *Single) timerLoop(ctx context.Context) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
s.iterator(ctx)
|
s.iterator(s.ctx)
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-s.ctx.Done():
|
||||||
s.logger.Infof(ctx, "timer: context cancelled, stopping timer loop")
|
s.logger.Infof(s.ctx, "timer: context cancelled, stopping timer loop")
|
||||||
return
|
return
|
||||||
case <-s.stopChan:
|
case <-s.stopChan:
|
||||||
s.logger.Infof(ctx, "timer: received stop signal, stopping timer loop")
|
s.logger.Infof(s.ctx, "timer: received stop signal, stopping timer loop")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 清理循环
|
// 清理循环
|
||||||
func (s *Single) cleanupLoop(ctx context.Context) {
|
func (s *Single) cleanupLoop() {
|
||||||
defer s.wg.Done()
|
defer s.wg.Done()
|
||||||
|
|
||||||
ticker := time.NewTicker(time.Minute)
|
ticker := time.NewTicker(time.Minute)
|
||||||
@@ -138,7 +141,7 @@ func (s *Single) cleanupLoop(ctx context.Context) {
|
|||||||
now := time.Now()
|
now := time.Now()
|
||||||
cleanupTime := now.Add(-2 * time.Minute) // 清理2分钟前的记录
|
cleanupTime := now.Add(-2 * time.Minute) // 清理2分钟前的记录
|
||||||
|
|
||||||
s.hasRun.Range(func(k, v interface{}) bool {
|
s.hasRun.Range(func(k, v any) bool {
|
||||||
t, ok := v.(time.Time)
|
t, ok := v.(time.Time)
|
||||||
if !ok || t.Before(cleanupTime) {
|
if !ok || t.Before(cleanupTime) {
|
||||||
s.hasRun.Delete(k)
|
s.hasRun.Delete(k)
|
||||||
@@ -146,11 +149,11 @@ func (s *Single) cleanupLoop(ctx context.Context) {
|
|||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-s.ctx.Done():
|
||||||
s.logger.Infof(ctx, "timer: context cancelled, stopping cleanup loop")
|
s.logger.Infof(s.ctx, "timer: context cancelled, stopping cleanup loop")
|
||||||
return
|
return
|
||||||
case <-s.stopChan:
|
case <-s.stopChan:
|
||||||
s.logger.Infof(ctx, "timer: received stop signal, stopping cleanup loop")
|
s.logger.Infof(s.ctx, "timer: received stop signal, stopping cleanup loop")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -166,7 +169,7 @@ func (s *Single) cleanupLoop(ctx context.Context) {
|
|||||||
// @param callback 回调函数
|
// @param callback 回调函数
|
||||||
// @param extendData 扩展数据
|
// @param extendData 扩展数据
|
||||||
// @return error
|
// @return error
|
||||||
func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
|
func (c *Single) EveryMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
|
||||||
|
|
||||||
nowTime := time.Now().In(c.location)
|
nowTime := time.Now().In(c.location)
|
||||||
|
|
||||||
@@ -190,7 +193,7 @@ func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int,
|
|||||||
// @param hour int 小时
|
// @param hour int 小时
|
||||||
// @param minute int 分钟
|
// @param minute int 分钟
|
||||||
// @param second int 秒
|
// @param second int 秒
|
||||||
func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
|
func (c *Single) EveryWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
|
||||||
nowTime := time.Now().In(c.location)
|
nowTime := time.Now().In(c.location)
|
||||||
|
|
||||||
jobData := JobData{
|
jobData := JobData{
|
||||||
@@ -207,7 +210,7 @@ func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 每天执行一次
|
// 每天执行一次
|
||||||
func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
|
func (c *Single) EveryDay(ctx context.Context, taskId string, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
|
||||||
nowTime := time.Now().In(c.location)
|
nowTime := time.Now().In(c.location)
|
||||||
|
|
||||||
jobData := JobData{
|
jobData := JobData{
|
||||||
@@ -223,7 +226,7 @@ func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 每小时执行一次
|
// 每小时执行一次
|
||||||
func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
|
func (c *Single) EveryHour(ctx context.Context, taskId string, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
|
||||||
nowTime := time.Now().In(c.location)
|
nowTime := time.Now().In(c.location)
|
||||||
|
|
||||||
jobData := JobData{
|
jobData := JobData{
|
||||||
@@ -238,7 +241,7 @@ func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 每分钟执行一次
|
// 每分钟执行一次
|
||||||
func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
|
func (c *Single) EveryMinute(ctx context.Context, taskId string, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
|
||||||
nowTime := time.Now().In(c.location)
|
nowTime := time.Now().In(c.location)
|
||||||
|
|
||||||
jobData := JobData{
|
jobData := JobData{
|
||||||
@@ -252,7 +255,7 @@ func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callb
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 特定时间间隔
|
// 特定时间间隔
|
||||||
func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
|
func (c *Single) EverySpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
|
||||||
nowTime := time.Now().In(c.location)
|
nowTime := time.Now().In(c.location)
|
||||||
|
|
||||||
if spaceTime < 0 {
|
if spaceTime < 0 {
|
||||||
@@ -397,7 +400,7 @@ func (l *Single) iterator(ctx context.Context) {
|
|||||||
func (s *Single) executeTask(ctx context.Context, timer timerStr, originTime time.Time) {
|
func (s *Single) executeTask(ctx context.Context, timer timerStr, originTime time.Time) {
|
||||||
// 创建带追踪ID的上下文
|
// 创建带追踪ID的上下文
|
||||||
traceCtx := context.WithValue(ctx, "trace_id", uuid.NewV4().String())
|
traceCtx := context.WithValue(ctx, "trace_id", uuid.NewV4().String())
|
||||||
s.logger.Infof(traceCtx, "timer: 开始执行任务 %s", timer.TaskId)
|
s.logger.Infof(traceCtx, "timer Single begin taskId:%s originTime:%d", timer.TaskId, originTime.UnixMilli())
|
||||||
traceCtx, cancel := context.WithTimeout(traceCtx, s.timeout) // 设置执行超时
|
traceCtx, cancel := context.WithTimeout(traceCtx, s.timeout) // 设置执行超时
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
@@ -410,13 +413,6 @@ func (s *Single) executeTask(ctx context.Context, timer timerStr, originTime tim
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// 检查任务是否已执行
|
|
||||||
taskKey := fmt.Sprintf("%s:%d", timer.TaskId, originTime.UnixNano())
|
|
||||||
if _, loaded := s.hasRun.LoadOrStore(taskKey, time.Now()); loaded {
|
|
||||||
s.logger.Errorf(traceCtx, "timer: 任务已执行,跳过本次执行 %s", timer.TaskId)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// 执行回调
|
// 执行回调
|
||||||
if err := s.doTask(traceCtx, timer, originTime); err != nil {
|
if err := s.doTask(traceCtx, timer, originTime); err != nil {
|
||||||
s.logger.Errorf(traceCtx, "timer: 任务执行失败: %s", err.Error())
|
s.logger.Errorf(traceCtx, "timer: 任务执行失败: %s", err.Error())
|
||||||
@@ -431,26 +427,26 @@ func (s *Single) executeTask(ctx context.Context, timer timerStr, originTime tim
|
|||||||
// 定时器操作类
|
// 定时器操作类
|
||||||
// 这里不应painc
|
// 这里不应painc
|
||||||
func (l *Single) doTask(ctx context.Context, timeStr timerStr, originTime time.Time) error {
|
func (l *Single) doTask(ctx context.Context, timeStr timerStr, originTime time.Time) error {
|
||||||
|
// 检查任务是否已执行
|
||||||
|
taskKey := fmt.Sprintf("%s:%d", timeStr.TaskId, originTime.UnixMilli())
|
||||||
|
if _, loaded := l.hasRun.LoadOrStore(taskKey, time.Now()); loaded {
|
||||||
|
l.logger.Errorf(ctx, "timer: 任务已执行,跳过本次执行 %s", timeStr.TaskId)
|
||||||
|
return ErrTaskExecuted
|
||||||
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
l.logger.Errorf(ctx, "timer:回调任务panic err:%+v stack:%s", err, string(debug.Stack()))
|
l.logger.Errorf(ctx, "timer Single call panic err:%+v stack:%s", err, string(debug.Stack()))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
nuiKey := timeStr.TaskId + originTime.String()
|
err := timeStr.Callback(ctx, timeStr.ExtendData)
|
||||||
|
if err != nil {
|
||||||
// timeStr.TaskId
|
l.logger.Errorf(ctx, "timer Single call back %s, err: %v", timeStr.TaskId, err)
|
||||||
|
return err
|
||||||
_, loaded := singleHasRun.LoadOrStore(nuiKey, time.Now())
|
|
||||||
if loaded {
|
|
||||||
// 已经存在,说明已经执行过了
|
|
||||||
l.logger.Errorf(ctx, "timer: 任务已执行,跳过本次执行 %s", nuiKey)
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx = context.WithValue(ctx, "trace_id", uuid.NewV4().String())
|
return nil
|
||||||
|
|
||||||
return timeStr.Callback(ctx, timeStr.ExtendData)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 更新下次执行时间
|
// 更新下次执行时间
|
||||||
|
|||||||
Reference in New Issue
Block a user