Files
timerx/single.go
T

507 lines
13 KiB
Go
Raw Normal View History

2023-11-27 22:37:33 +08:00
package timerx
2023-08-06 01:07:29 +08:00
// Author: 黄新云
import (
"context"
2025-09-14 19:05:10 +08:00
"fmt"
2023-08-15 14:01:23 +08:00
"runtime/debug"
2023-08-06 01:07:29 +08:00
"sync"
2025-09-14 19:05:10 +08:00
"sync/atomic"
2023-08-06 01:07:29 +08:00
"time"
2024-05-28 17:28:20 +08:00
2025-10-02 17:54:38 +08:00
"github.com/google/uuid"
2025-10-05 19:21:39 +08:00
"github.com/robfig/cron/v3"
2025-07-24 17:13:17 +08:00
"github.com/yuninks/timerx/logger"
2023-08-06 01:07:29 +08:00
)
2024-05-08 13:01:55 +08:00
// 简单定时器
2023-12-27 17:19:52 +08:00
// 1. 这个定时器的作用范围是本机
2024-05-08 13:01:55 +08:00
// 2. 适用简单的时间间隔定时任务
2023-08-06 01:07:29 +08:00
2024-04-06 19:09:58 +08:00
type Single struct {
2025-09-14 19:05:10 +08:00
ctx context.Context
cancel context.CancelFunc
logger logger.Logger
location *time.Location
nextTime time.Time
nextTimeMux sync.RWMutex
wg sync.WaitGroup
2025-10-14 11:13:12 +08:00
workerList sync.Map // 任务列表,key为taskIdvalue为worker
timerIndex int64 // 任务索引,用于生成taskId
stopChan chan struct{} // 停止信号
hasRun sync.Map // 记录已经执行的任务,key为taskId,value为执行时间
timeout time.Duration // 单次任务超时时间
cronParser *cron.Parser // cron表达式解析器
2024-04-06 19:09:58 +08:00
}
2023-08-15 14:01:23 +08:00
2023-08-06 01:07:29 +08:00
// 定时器类
2024-05-20 09:35:12 +08:00
// @param ctx context.Context 上下文
// @param opts ...Option 配置项
2024-04-06 19:09:58 +08:00
func InitSingle(ctx context.Context, opts ...Option) *Single {
op := newOptions(opts...)
2025-09-14 19:05:10 +08:00
ctx, cancel := context.WithCancel(ctx)
2024-04-06 19:09:58 +08:00
sin := &Single{
2025-10-05 19:21:39 +08:00
ctx: ctx,
cancel: cancel,
logger: op.logger,
location: op.location,
nextTime: time.Now(),
stopChan: make(chan struct{}),
timeout: op.timeout,
cronParser: op.cronParser,
}
2023-09-23 11:17:42 +08:00
2025-09-28 22:57:15 +08:00
sin.startDaemon()
2025-09-14 19:05:10 +08:00
return sin
}
2025-09-28 22:57:15 +08:00
// startDaemon launches the background goroutines (the timer loop and the
// cleanup loop), registering each one with the wait group before it starts.
func (l *Single) startDaemon() {
	for _, loop := range []func(){l.timerLoop, l.cleanupLoop} {
		l.wg.Add(1)
		go loop()
	}
}
2025-09-14 19:05:10 +08:00
// 停止所有定时任务
2025-10-04 18:51:22 +08:00
func (l *Single) Stop() {
close(l.stopChan)
2025-10-02 17:54:38 +08:00
2025-10-04 18:51:22 +08:00
if l.cancel != nil {
l.cancel()
2025-09-14 19:05:10 +08:00
}
2025-10-04 18:51:22 +08:00
l.wg.Wait()
l.logger.Infof(l.ctx, "timer single: stopped")
2025-09-14 19:05:10 +08:00
}
// 获取任务数量
2025-10-04 18:51:22 +08:00
func (l *Single) TaskCount() int {
2025-09-14 19:05:10 +08:00
count := 0
2025-10-04 18:51:22 +08:00
l.workerList.Range(func(k, v interface{}) bool {
2025-09-14 19:05:10 +08:00
count++
return true
})
return count
}
// MaxIndex returns the current value of the job index counter plus one.
func (l *Single) MaxIndex() int64 {
	current := atomic.LoadInt64(&l.timerIndex)
	return current + 1
}
// 定时器主循环
2025-10-02 17:54:38 +08:00
func (l *Single) timerLoop() {
defer l.wg.Done()
2025-09-14 19:05:10 +08:00
ticker := time.NewTicker(100 * time.Millisecond) // 提高精度到100ms
defer ticker.Stop()
for {
select {
case t := <-ticker.C:
2025-10-02 17:54:38 +08:00
l.nextTimeMux.RLock()
nextTime := l.nextTime
l.nextTimeMux.RUnlock()
2025-09-14 19:05:10 +08:00
if t.Before(nextTime) {
continue
2023-08-06 01:07:29 +08:00
}
2025-09-14 19:05:10 +08:00
2025-10-02 17:54:38 +08:00
l.iterator(l.ctx)
2025-09-14 19:05:10 +08:00
2025-10-02 17:54:38 +08:00
case <-l.ctx.Done():
l.logger.Infof(l.ctx, "timer: context cancelled, stopping timer loop")
2025-09-14 19:05:10 +08:00
return
2025-10-02 17:54:38 +08:00
case <-l.stopChan:
l.logger.Infof(l.ctx, "timer: received stop signal, stopping timer loop")
2025-09-14 19:05:10 +08:00
return
}
2025-09-14 19:05:10 +08:00
}
}
2023-09-23 11:17:42 +08:00
2025-09-14 19:05:10 +08:00
// 清理循环
2025-09-28 22:57:15 +08:00
func (s *Single) cleanupLoop() {
2025-09-14 19:05:10 +08:00
defer s.wg.Done()
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
now := time.Now()
cleanupTime := now.Add(-2 * time.Minute) // 清理2分钟前的记录
2025-09-28 22:57:15 +08:00
s.hasRun.Range(func(k, v any) bool {
2025-09-14 19:05:10 +08:00
t, ok := v.(time.Time)
if !ok || t.Before(cleanupTime) {
s.hasRun.Delete(k)
}
return true
})
2025-09-28 22:57:15 +08:00
case <-s.ctx.Done():
s.logger.Infof(s.ctx, "timer: context cancelled, stopping cleanup loop")
2025-09-14 19:05:10 +08:00
return
case <-s.stopChan:
2025-09-28 22:57:15 +08:00
s.logger.Infof(s.ctx, "timer: received stop signal, stopping cleanup loop")
2025-09-14 19:05:10 +08:00
return
}
}
2023-08-06 01:07:29 +08:00
}
2024-05-20 09:35:12 +08:00
// 每月执行一次
// @param ctx 上下文
// @param taskId 任务ID
// @param day 每月的几号
// @param hour 小时
// @param minute 分钟
// @param second 秒
// @param callback 回调函数
// @param extendData 扩展数据
// @return error
2025-09-28 22:57:15 +08:00
func (c *Single) EveryMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
2025-09-14 19:05:10 +08:00
2025-10-04 18:51:22 +08:00
// nowTime := time.Now().In(c.location)
2024-05-20 09:35:12 +08:00
jobData := JobData{
2025-10-04 18:51:22 +08:00
JobType: JobTypeEveryMonth,
TaskId: taskId,
// CreateTime: nowTime,
Day: day,
Hour: hour,
Minute: minute,
Second: second,
2024-05-20 09:35:12 +08:00
}
return c.addJob(ctx, jobData, callback, extendData)
}
// 每周执行一次
// @param ctx context.Context 上下文
// @param taskId string 任务ID
// @param week time.Weekday 周
// @param hour int 小时
// @param minute int 分钟
// @param second int 秒
2025-09-28 22:57:15 +08:00
func (c *Single) EveryWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
2025-10-04 18:51:22 +08:00
// nowTime := time.Now().In(c.location)
2024-05-20 09:35:12 +08:00
jobData := JobData{
2025-10-04 18:51:22 +08:00
JobType: JobTypeEveryWeek,
TaskId: taskId,
// CreateTime: nowTime,
Weekday: week,
Hour: hour,
Minute: minute,
Second: second,
2024-05-20 09:35:12 +08:00
}
return c.addJob(ctx, jobData, callback, extendData)
}
// 每天执行一次
2025-09-28 22:57:15 +08:00
func (c *Single) EveryDay(ctx context.Context, taskId string, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
2025-10-04 18:51:22 +08:00
// nowTime := time.Now().In(c.location)
2024-05-20 09:35:12 +08:00
jobData := JobData{
2025-10-04 18:51:22 +08:00
JobType: JobTypeEveryDay,
TaskId: taskId,
// CreateTime: nowTime,
Hour: hour,
Minute: minute,
Second: second,
2024-05-20 09:35:12 +08:00
}
return c.addJob(ctx, jobData, callback, extendData)
}
// 每小时执行一次
2025-09-28 22:57:15 +08:00
func (c *Single) EveryHour(ctx context.Context, taskId string, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
2025-10-04 18:51:22 +08:00
// nowTime := time.Now().In(c.location)
2024-05-20 09:35:12 +08:00
jobData := JobData{
2025-10-04 18:51:22 +08:00
JobType: JobTypeEveryHour,
TaskId: taskId,
// CreateTime: nowTime,
Minute: minute,
Second: second,
2024-05-20 09:35:12 +08:00
}
return c.addJob(ctx, jobData, callback, extendData)
}
// 每分钟执行一次
2025-09-28 22:57:15 +08:00
func (c *Single) EveryMinute(ctx context.Context, taskId string, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
2025-10-04 18:51:22 +08:00
// nowTime := time.Now().In(c.location)
2024-05-20 09:35:12 +08:00
jobData := JobData{
2025-10-04 18:51:22 +08:00
JobType: JobTypeEveryMinute,
TaskId: taskId,
// CreateTime: nowTime,
Second: second,
2024-05-20 09:35:12 +08:00
}
return c.addJob(ctx, jobData, callback, extendData)
}
// 特定时间间隔
2025-09-28 22:57:15 +08:00
func (c *Single) EverySpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
2025-09-14 19:05:10 +08:00
nowTime := time.Now().In(c.location)
2024-05-20 09:35:12 +08:00
if spaceTime < 0 {
c.logger.Errorf(ctx, "间隔时间不能小于0")
2025-10-04 18:51:22 +08:00
return 0, ErrIntervalTime
2024-05-20 09:35:12 +08:00
}
2025-10-04 18:51:22 +08:00
// 获取当天的零点时间
zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location())
2024-05-20 09:35:12 +08:00
jobData := JobData{
2025-10-04 18:51:22 +08:00
JobType: JobTypeInterval,
TaskId: taskId,
// CreateTime: nowTime,
BaseTime: zeroTime,
2024-05-20 09:35:12 +08:00
IntervalTime: spaceTime,
}
return c.addJob(ctx, jobData, callback, extendData)
}
2025-10-05 19:21:39 +08:00
// Cron registers a job driven by a cron expression. The expression is parsed
// with the timer's default parser unless one of the supplied options sets its
// own parser. A parse failure is logged and returned.
func (l *Single) Cron(ctx context.Context, taskId string, cronExpression string, callback func(ctx context.Context, extendData any) error, extendData any, opt ...Option) (int64, error) {
	current := time.Now().In(l.location)

	// Midnight (00:00:00) of the current day, used as the default base time.
	year, month, day := current.Date()
	midnight := time.Date(year, month, day, 0, 0, 0, 0, current.Location())

	// Apply per-call options onto an empty Options value.
	var overrides Options
	for _, apply := range opt {
		apply(&overrides)
	}

	parser := l.cronParser
	if overrides.cronParser != nil {
		parser = overrides.cronParser
	}

	schedule, err := GetCronSche(cronExpression, parser)
	if err != nil {
		l.logger.Errorf(ctx, "timer Single Cron cronExpression error:%s", err.Error())
		return 0, err
	}

	return l.addJob(ctx, JobData{
		JobType:        JobTypeCron,
		TaskId:         taskId,
		BaseTime:       midnight,
		CronExpression: cronExpression,
		CronSchedule:   schedule,
	}, callback, extendData)
}
2023-08-15 14:01:23 +08:00
// 间隔定时器
2023-11-13 23:49:42 +08:00
// @param space 间隔时间
// @param call 回调函数
// @param extend 附加参数
// @return int 定时器索引
// @return error 错误
2025-09-14 19:05:10 +08:00
func (l *Single) addJob(ctx context.Context, jobData JobData, call func(ctx context.Context, extendData interface{}) error, extend interface{}) (int64, error) {
if jobData.TaskId == "" {
l.logger.Errorf(ctx, "任务ID不能为空")
return 0, ErrTaskIdEmpty
}
if jobData.Day < 0 || jobData.Day > 31 {
l.logger.Errorf(ctx, "每月的天数必须在0-31之间")
return 0, ErrMonthDay
}
if jobData.Hour < 0 || jobData.Hour > 23 {
l.logger.Errorf(ctx, "小时必须在0-23之间")
return 0, ErrHour
}
if jobData.Minute < 0 || jobData.Minute > 59 {
l.logger.Errorf(ctx, "分钟必须在0-59之间")
return 0, ErrMinute
}
if jobData.Second < 0 || jobData.Second > 59 {
l.logger.Errorf(ctx, "秒必须在0-59之间")
return 0, ErrSecond
}
if call == nil {
l.logger.Errorf(ctx, "回调函数不能为空")
return 0, ErrCallbackEmpty
}
2023-08-06 01:07:29 +08:00
2025-09-14 19:05:10 +08:00
nextTime, err := GetNextTime(time.Now().In(l.location), jobData)
2024-05-20 09:35:12 +08:00
if err != nil {
l.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error())
return 0, err
}
2025-09-14 19:05:10 +08:00
jobData.NextTime = *nextTime
// 生成唯一索引
index := atomic.AddInt64(&l.timerIndex, 1)
2024-05-20 09:35:12 +08:00
2023-08-15 14:01:23 +08:00
t := timerStr{
2023-08-06 01:07:29 +08:00
Callback: call,
CanRunning: make(chan struct{}, 1),
2023-11-13 23:49:42 +08:00
ExtendData: extend,
2025-09-14 19:05:10 +08:00
TaskId: jobData.TaskId,
2024-05-20 09:35:12 +08:00
JobData: &jobData,
2023-08-06 01:07:29 +08:00
}
2023-08-15 14:01:23 +08:00
2025-09-14 19:05:10 +08:00
l.workerList.Store(index, t)
// 计算下次执行时间(全局)
l.updateNextTimeIfEarlier(*nextTime)
2023-08-06 01:07:29 +08:00
2025-09-14 19:05:10 +08:00
return index, nil
}
// 如果更早则更新下次执行时间(全局)
func (s *Single) updateNextTimeIfEarlier(candidate time.Time) {
s.nextTimeMux.Lock()
defer s.nextTimeMux.Unlock()
if candidate.Before(s.nextTime) {
s.nextTime = candidate
}
2023-08-15 14:01:23 +08:00
}
2023-11-13 23:49:42 +08:00
// 删除定时器
2025-09-14 19:05:10 +08:00
func (l *Single) Del(index int64) {
2025-10-02 17:54:38 +08:00
if _, ok := l.workerList.Load(index); ok {
2025-09-14 19:05:10 +08:00
l.workerList.Delete(index)
}
2023-08-06 01:07:29 +08:00
}
2025-10-02 17:54:38 +08:00
// DelByTaskId removes every job in the worker list whose TaskId equals taskId.
func (l *Single) DelByTaskId(taskId string) {
	l.workerList.Range(func(key, val any) bool {
		job, isJob := val.(timerStr)
		if !isJob || job.TaskId != taskId {
			return true
		}
		l.workerList.Delete(key)
		return true
	})
}
2023-08-06 01:07:29 +08:00
// 迭代定时器列表
2025-09-14 19:05:10 +08:00
func (l *Single) iterator(ctx context.Context) {
// 当前时间
nowTime := time.Now().In(l.location)
2023-08-06 01:07:29 +08:00
// 默认5秒后(如果没有值就暂停进来5秒)
newNextTime := nowTime.Add(time.Second * 5)
2025-09-14 19:05:10 +08:00
l.workerList.Range(func(k, v interface{}) bool {
timeStr, ok := v.(timerStr)
if !ok {
l.logger.Errorf(ctx, "timer: 类型断言失败,跳过该任务")
l.workerList.Delete(k)
return true
}
2023-08-06 01:07:29 +08:00
2024-05-20 09:35:12 +08:00
if timeStr.JobData.NextTime.Before(nowTime) || timeStr.JobData.NextTime.Equal(nowTime) {
2023-08-15 14:01:23 +08:00
2025-09-14 19:05:10 +08:00
originTime := timeStr.JobData.NextTime
// 计算下次执行时间
nextTime, err := GetNextTime(nowTime, *timeStr.JobData)
if err != nil {
l.logger.Errorf(ctx, "timer: 计算下次执行时间失败:%s", err.Error())
return true
2023-08-06 01:07:29 +08:00
}
2025-09-14 19:05:10 +08:00
// 更新下次执行时间
timeStr.JobData.NextTime = *nextTime
2023-08-06 01:07:29 +08:00
2024-05-20 09:35:12 +08:00
if nextTime.Before(newNextTime) {
2023-08-06 01:07:29 +08:00
// 本规则下次发送时间小于系统下次需要执行的时间:替换
2024-05-20 09:35:12 +08:00
newNextTime = *nextTime
2023-08-06 01:07:29 +08:00
}
2025-09-14 19:05:10 +08:00
go l.executeTask(ctx, timeStr, originTime)
2024-05-20 09:35:12 +08:00
2023-08-06 01:07:29 +08:00
}
2024-05-20 09:35:12 +08:00
return true
})
2023-08-06 01:07:29 +08:00
2025-09-14 19:05:10 +08:00
l.updateNextTime(newNextTime)
}
// 执行任务
func (s *Single) executeTask(ctx context.Context, timer timerStr, originTime time.Time) {
// 创建带追踪ID的上下文
2025-10-02 17:54:38 +08:00
u, _ := uuid.NewV7()
traceCtx := context.WithValue(ctx, "trace_id", u.String())
2025-09-28 22:57:15 +08:00
s.logger.Infof(traceCtx, "timer Single begin taskId:%s originTime:%d", timer.TaskId, originTime.UnixMilli())
traceCtx, cancel := context.WithTimeout(traceCtx, s.timeout) // 设置执行超时
defer cancel()
2025-09-14 19:05:10 +08:00
select {
case timer.CanRunning <- struct{}{}:
defer func() {
select {
case <-timer.CanRunning:
default:
}
}()
// 执行回调
2025-10-04 18:51:22 +08:00
begin := time.Now()
2025-09-14 19:05:10 +08:00
if err := s.doTask(traceCtx, timer, originTime); err != nil {
s.logger.Errorf(traceCtx, "timer: 任务执行失败: %s", err.Error())
2023-08-06 01:07:29 +08:00
}
2025-10-04 18:51:22 +08:00
s.logger.Infof(traceCtx, "timer Single end taskId:%s originTime:%d cost:%dms", timer.TaskId, originTime.UnixMilli(), time.Since(begin).Milliseconds())
case <-traceCtx.Done():
s.logger.Errorf(traceCtx, "timer: 任务执行超时: %s", timer.TaskId)
2025-09-14 19:05:10 +08:00
default:
// 任务正在执行中,跳过本次
s.logger.Infof(traceCtx, "timer: 任务正在执行中,跳过本次 %s", timer.TaskId)
2025-09-14 19:05:10 +08:00
}
2023-08-06 01:07:29 +08:00
}
// 定时器操作类
// 这里不应painc
2025-09-14 19:05:10 +08:00
func (l *Single) doTask(ctx context.Context, timeStr timerStr, originTime time.Time) error {
2025-09-28 22:57:15 +08:00
// 检查任务是否已执行
taskKey := fmt.Sprintf("%s:%d", timeStr.TaskId, originTime.UnixMilli())
if _, loaded := l.hasRun.LoadOrStore(taskKey, time.Now()); loaded {
l.logger.Errorf(ctx, "timer: 任务已执行,跳过本次执行 %s", timeStr.TaskId)
return ErrTaskExecuted
}
2023-08-06 01:07:29 +08:00
defer func() {
if err := recover(); err != nil {
2025-09-28 22:57:15 +08:00
l.logger.Errorf(ctx, "timer Single call panic err:%+v stack:%s", err, string(debug.Stack()))
2023-08-06 01:07:29 +08:00
}
}()
2024-05-28 17:28:20 +08:00
2025-09-28 22:57:15 +08:00
err := timeStr.Callback(ctx, timeStr.ExtendData)
if err != nil {
l.logger.Errorf(ctx, "timer Single call back %s, err: %v", timeStr.TaskId, err)
return err
2025-09-14 19:05:10 +08:00
}
2025-09-28 22:57:15 +08:00
return nil
2025-09-14 19:05:10 +08:00
}
// 更新下次执行时间
func (s *Single) updateNextTime(newTime time.Time) {
s.nextTimeMux.Lock()
defer s.nextTimeMux.Unlock()
now := time.Now()
if newTime.Before(now) {
s.nextTime = now
} else {
s.nextTime = newTime
}
2023-08-15 14:01:23 +08:00
}