Files
timerx/single.go
T

468 lines
12 KiB
Go
Raw Normal View History

2023-11-27 22:37:33 +08:00
package timerx
2023-08-06 01:07:29 +08:00
// 作者:黄新云
import (
"context"
2023-08-15 14:01:23 +08:00
"errors"
2025-09-14 19:05:10 +08:00
"fmt"
2023-08-15 14:01:23 +08:00
"runtime/debug"
2023-08-06 01:07:29 +08:00
"sync"
2025-09-14 19:05:10 +08:00
"sync/atomic"
2023-08-06 01:07:29 +08:00
"time"
2024-05-28 17:28:20 +08:00
uuid "github.com/satori/go.uuid"
2025-07-24 17:13:17 +08:00
"github.com/yuninks/timerx/logger"
2023-08-06 01:07:29 +08:00
)
2024-05-08 13:01:55 +08:00
// 简单定时器
2023-12-27 17:19:52 +08:00
// 1. 这个定时器的作用范围是本机
2024-05-08 13:01:55 +08:00
// 2. 适用简单的时间间隔定时任务
2023-08-06 01:07:29 +08:00
2025-09-14 19:05:10 +08:00
// 避免执行重复
var singleHasRun sync.Map
2023-08-15 14:01:23 +08:00
2024-04-06 19:09:58 +08:00
type Single struct {
2025-09-14 19:05:10 +08:00
ctx context.Context
cancel context.CancelFunc
logger logger.Logger
location *time.Location
nextTime time.Time
nextTimeMux sync.RWMutex
wg sync.WaitGroup
workerList sync.Map
timerIndex int64
stopChan chan struct{}
hasRun sync.Map
timeout time.Duration
2024-04-06 19:09:58 +08:00
}
2023-08-15 14:01:23 +08:00
2023-08-06 01:07:29 +08:00
// 定时器类
2024-05-20 09:35:12 +08:00
// @param ctx context.Context 上下文
// @param opts ...Option 配置项
2024-04-06 19:09:58 +08:00
func InitSingle(ctx context.Context, opts ...Option) *Single {
op := newOptions(opts...)
2025-09-14 19:05:10 +08:00
ctx, cancel := context.WithCancel(ctx)
2024-04-06 19:09:58 +08:00
sin := &Single{
ctx: ctx,
2025-09-14 19:05:10 +08:00
cancel: cancel,
logger: op.logger,
location: op.location,
2025-09-14 19:05:10 +08:00
nextTime: time.Now(),
stopChan: make(chan struct{}),
timeout: op.timeout,
}
2023-09-23 11:17:42 +08:00
2025-09-14 19:05:10 +08:00
sin.wg.Add(1)
go sin.timerLoop(ctx)
sin.wg.Add(1)
go sin.cleanupLoop(ctx)
return sin
}
// 停止所有定时任务
func (s *Single) Stop() {
if s.cancel != nil {
s.cancel()
}
close(s.stopChan)
s.wg.Wait()
// 清理所有资源
s.workerList.Range(func(k, v interface{}) bool {
if timer, ok := v.(timerStr); ok {
close(timer.CanRunning)
}
s.workerList.Delete(k)
return true
})
}
// 获取任务数量
func (s *Single) TaskCount() int {
count := 0
s.workerList.Range(func(k, v interface{}) bool {
count++
return true
})
return count
}
func (l *Single) MaxIndex() int64 {
return atomic.LoadInt64(&l.timerIndex) + 1
}
// 定时器主循环
func (s *Single) timerLoop(ctx context.Context) {
defer s.wg.Done()
ticker := time.NewTicker(100 * time.Millisecond) // 提高精度到100ms
defer ticker.Stop()
for {
select {
case t := <-ticker.C:
s.nextTimeMux.RLock()
nextTime := s.nextTime
s.nextTimeMux.RUnlock()
if t.Before(nextTime) {
continue
2023-08-06 01:07:29 +08:00
}
2025-09-14 19:05:10 +08:00
s.iterator(ctx)
case <-ctx.Done():
s.logger.Infof(ctx, "timer: context cancelled, stopping timer loop")
return
case <-s.stopChan:
s.logger.Infof(ctx, "timer: received stop signal, stopping timer loop")
return
}
2025-09-14 19:05:10 +08:00
}
}
2023-09-23 11:17:42 +08:00
2025-09-14 19:05:10 +08:00
// 清理循环
func (s *Single) cleanupLoop(ctx context.Context) {
defer s.wg.Done()
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
now := time.Now()
cleanupTime := now.Add(-2 * time.Minute) // 清理2分钟前的记录
s.hasRun.Range(func(k, v interface{}) bool {
t, ok := v.(time.Time)
if !ok || t.Before(cleanupTime) {
s.hasRun.Delete(k)
}
return true
})
case <-ctx.Done():
s.logger.Infof(ctx, "timer: context cancelled, stopping cleanup loop")
return
case <-s.stopChan:
s.logger.Infof(ctx, "timer: received stop signal, stopping cleanup loop")
return
}
}
2023-08-06 01:07:29 +08:00
}
2024-05-20 09:35:12 +08:00
// 每月执行一次
// @param ctx 上下文
// @param taskId 任务ID
// @param day 每月的几号
// @param hour 小时
// @param minute 分钟
// @param second 秒
// @param callback 回调函数
// @param extendData 扩展数据
// @return error
2025-09-14 19:05:10 +08:00
func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
nowTime := time.Now().In(c.location)
2024-05-20 09:35:12 +08:00
jobData := JobData{
JobType: JobTypeEveryMonth,
2025-09-14 19:05:10 +08:00
TaskId: taskId,
2024-05-20 09:35:12 +08:00
CreateTime: nowTime,
Day: day,
Hour: hour,
Minute: minute,
Second: second,
}
return c.addJob(ctx, jobData, callback, extendData)
}
// 每周执行一次
// @param ctx context.Context 上下文
// @param taskId string 任务ID
// @param week time.Weekday 周
// @param hour int 小时
// @param minute int 分钟
// @param second int 秒
2025-09-14 19:05:10 +08:00
func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
nowTime := time.Now().In(c.location)
2024-05-20 09:35:12 +08:00
jobData := JobData{
2024-05-22 15:02:39 +08:00
JobType: JobTypeEveryWeek,
2025-09-14 19:05:10 +08:00
TaskId: taskId,
2024-05-20 09:35:12 +08:00
CreateTime: nowTime,
Weekday: week,
Hour: hour,
Minute: minute,
Second: second,
}
return c.addJob(ctx, jobData, callback, extendData)
}
// 每天执行一次
2025-09-14 19:05:10 +08:00
func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
nowTime := time.Now().In(c.location)
2024-05-20 09:35:12 +08:00
jobData := JobData{
2024-05-22 15:02:39 +08:00
JobType: JobTypeEveryDay,
2025-09-14 19:05:10 +08:00
TaskId: taskId,
2024-05-20 09:35:12 +08:00
CreateTime: nowTime,
Hour: hour,
Minute: minute,
Second: second,
}
return c.addJob(ctx, jobData, callback, extendData)
}
// 每小时执行一次
2025-09-14 19:05:10 +08:00
func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
nowTime := time.Now().In(c.location)
2024-05-20 09:35:12 +08:00
jobData := JobData{
2024-05-22 15:02:39 +08:00
JobType: JobTypeEveryHour,
2025-09-14 19:05:10 +08:00
TaskId: taskId,
2024-05-20 09:35:12 +08:00
CreateTime: nowTime,
Minute: minute,
Second: second,
}
return c.addJob(ctx, jobData, callback, extendData)
}
// 每分钟执行一次
2025-09-14 19:05:10 +08:00
func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
nowTime := time.Now().In(c.location)
2024-05-20 09:35:12 +08:00
jobData := JobData{
2024-05-22 15:02:39 +08:00
JobType: JobTypeEveryMinute,
2025-09-14 19:05:10 +08:00
TaskId: taskId,
2024-05-20 09:35:12 +08:00
CreateTime: nowTime,
Second: second,
}
return c.addJob(ctx, jobData, callback, extendData)
}
// 特定时间间隔
2025-09-14 19:05:10 +08:00
func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int64, error) {
nowTime := time.Now().In(c.location)
2024-05-20 09:35:12 +08:00
if spaceTime < 0 {
c.logger.Errorf(ctx, "间隔时间不能小于0")
return 0, errors.New("间隔时间不能小于0")
}
jobData := JobData{
2024-05-22 15:02:39 +08:00
JobType: JobTypeInterval,
2025-09-14 19:05:10 +08:00
TaskId: taskId,
2024-05-20 09:35:12 +08:00
CreateTime: nowTime,
IntervalTime: spaceTime,
}
return c.addJob(ctx, jobData, callback, extendData)
}
2023-08-15 14:01:23 +08:00
// 间隔定时器
2023-11-13 23:49:42 +08:00
// @param space 间隔时间
// @param call 回调函数
// @param extend 附加参数
// @return int 定时器索引
// @return error 错误
2025-09-14 19:05:10 +08:00
func (l *Single) addJob(ctx context.Context, jobData JobData, call func(ctx context.Context, extendData interface{}) error, extend interface{}) (int64, error) {
if jobData.TaskId == "" {
l.logger.Errorf(ctx, "任务ID不能为空")
return 0, ErrTaskIdEmpty
}
if jobData.Day < 0 || jobData.Day > 31 {
l.logger.Errorf(ctx, "每月的天数必须在0-31之间")
return 0, ErrMonthDay
}
if jobData.Hour < 0 || jobData.Hour > 23 {
l.logger.Errorf(ctx, "小时必须在0-23之间")
return 0, ErrHour
}
if jobData.Minute < 0 || jobData.Minute > 59 {
l.logger.Errorf(ctx, "分钟必须在0-59之间")
return 0, ErrMinute
}
if jobData.Second < 0 || jobData.Second > 59 {
l.logger.Errorf(ctx, "秒必须在0-59之间")
return 0, ErrSecond
}
if call == nil {
l.logger.Errorf(ctx, "回调函数不能为空")
return 0, ErrCallbackEmpty
}
2023-08-06 01:07:29 +08:00
2025-09-14 19:05:10 +08:00
nextTime, err := GetNextTime(time.Now().In(l.location), jobData)
2024-05-20 09:35:12 +08:00
if err != nil {
l.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error())
return 0, err
}
2025-09-14 19:05:10 +08:00
jobData.NextTime = *nextTime
// 生成唯一索引
index := atomic.AddInt64(&l.timerIndex, 1)
2024-05-20 09:35:12 +08:00
2023-08-15 14:01:23 +08:00
t := timerStr{
2023-08-06 01:07:29 +08:00
Callback: call,
CanRunning: make(chan struct{}, 1),
2023-11-13 23:49:42 +08:00
ExtendData: extend,
2025-09-14 19:05:10 +08:00
TaskId: jobData.TaskId,
2024-05-20 09:35:12 +08:00
JobData: &jobData,
2023-08-06 01:07:29 +08:00
}
2023-08-15 14:01:23 +08:00
2025-09-14 19:05:10 +08:00
l.workerList.Store(index, t)
// 计算下次执行时间(全局)
l.updateNextTimeIfEarlier(*nextTime)
2023-08-06 01:07:29 +08:00
2025-09-14 19:05:10 +08:00
return index, nil
}
// 如果更早则更新下次执行时间(全局)
func (s *Single) updateNextTimeIfEarlier(candidate time.Time) {
s.nextTimeMux.Lock()
defer s.nextTimeMux.Unlock()
if candidate.Before(s.nextTime) {
s.nextTime = candidate
}
2023-08-15 14:01:23 +08:00
}
2023-11-13 23:49:42 +08:00
// 删除定时器
2025-09-14 19:05:10 +08:00
func (l *Single) Del(index int64) {
if val, ok := l.workerList.Load(index); ok {
if timer, ok := val.(timerStr); ok {
close(timer.CanRunning)
}
l.workerList.Delete(index)
}
2023-08-06 01:07:29 +08:00
}
// 迭代定时器列表
2025-09-14 19:05:10 +08:00
func (l *Single) iterator(ctx context.Context) {
// 当前时间
nowTime := time.Now().In(l.location)
2023-08-06 01:07:29 +08:00
// 默认5秒后(如果没有值就暂停进来5秒)
newNextTime := nowTime.Add(time.Second * 5)
2025-09-14 19:05:10 +08:00
l.workerList.Range(func(k, v interface{}) bool {
timeStr, ok := v.(timerStr)
if !ok {
l.logger.Errorf(ctx, "timer: 类型断言失败,跳过该任务")
l.workerList.Delete(k)
return true
}
2023-08-06 01:07:29 +08:00
2024-05-20 09:35:12 +08:00
if timeStr.JobData.NextTime.Before(nowTime) || timeStr.JobData.NextTime.Equal(nowTime) {
2023-08-15 14:01:23 +08:00
2025-09-14 19:05:10 +08:00
originTime := timeStr.JobData.NextTime
// 计算下次执行时间
nextTime, err := GetNextTime(nowTime, *timeStr.JobData)
if err != nil {
l.logger.Errorf(ctx, "timer: 计算下次执行时间失败:%s", err.Error())
return true
2023-08-06 01:07:29 +08:00
}
2025-09-14 19:05:10 +08:00
// 更新下次执行时间
timeStr.JobData.NextTime = *nextTime
2023-08-06 01:07:29 +08:00
2024-05-20 09:35:12 +08:00
if nextTime.Before(newNextTime) {
2023-08-06 01:07:29 +08:00
// 本规则下次发送时间小于系统下次需要执行的时间:替换
2024-05-20 09:35:12 +08:00
newNextTime = *nextTime
2023-08-06 01:07:29 +08:00
}
2025-09-14 19:05:10 +08:00
go l.executeTask(ctx, timeStr, originTime)
2024-05-20 09:35:12 +08:00
2023-08-06 01:07:29 +08:00
}
2024-05-20 09:35:12 +08:00
return true
})
2023-08-06 01:07:29 +08:00
2025-09-14 19:05:10 +08:00
l.updateNextTime(newNextTime)
}
// 执行任务
func (s *Single) executeTask(ctx context.Context, timer timerStr, originTime time.Time) {
// 创建带追踪ID的上下文
traceCtx := context.WithValue(ctx, "trace_id", uuid.NewV4().String())
s.logger.Infof(traceCtx, "timer: 开始执行任务 %s", timer.TaskId)
traceCtx, cancel := context.WithTimeout(traceCtx, s.timeout) // 设置执行超时
defer cancel()
2025-09-14 19:05:10 +08:00
select {
case timer.CanRunning <- struct{}{}:
defer func() {
select {
case <-timer.CanRunning:
default:
}
}()
// 检查任务是否已执行
taskKey := fmt.Sprintf("%s:%d", timer.TaskId, originTime.UnixNano())
if _, loaded := s.hasRun.LoadOrStore(taskKey, time.Now()); loaded {
s.logger.Errorf(traceCtx, "timer: 任务已执行,跳过本次执行 %s", timer.TaskId)
2025-09-14 19:05:10 +08:00
return
}
// 执行回调
if err := s.doTask(traceCtx, timer, originTime); err != nil {
s.logger.Errorf(traceCtx, "timer: 任务执行失败: %s", err.Error())
2023-08-06 01:07:29 +08:00
}
2025-09-14 19:05:10 +08:00
default:
// 任务正在执行中,跳过本次
s.logger.Infof(traceCtx, "timer: 任务正在执行中,跳过本次 %s", timer.TaskId)
2025-09-14 19:05:10 +08:00
}
2023-08-06 01:07:29 +08:00
}
// 定时器操作类
// 这里不应painc
2025-09-14 19:05:10 +08:00
func (l *Single) doTask(ctx context.Context, timeStr timerStr, originTime time.Time) error {
2023-08-06 01:07:29 +08:00
defer func() {
if err := recover(); err != nil {
2025-09-14 19:05:10 +08:00
l.logger.Errorf(ctx, "timer:回调任务panic err:%+v stack:%s", err, string(debug.Stack()))
2023-08-06 01:07:29 +08:00
}
}()
2024-05-28 17:28:20 +08:00
2025-09-14 19:05:10 +08:00
nuiKey := timeStr.TaskId + originTime.String()
// timeStr.TaskId
_, loaded := singleHasRun.LoadOrStore(nuiKey, time.Now())
if loaded {
// 已经存在,说明已经执行过了
l.logger.Errorf(ctx, "timer: 任务已执行,跳过本次执行 %s", nuiKey)
return nil
}
2025-04-02 20:26:27 +08:00
ctx = context.WithValue(ctx, "trace_id", uuid.NewV4().String())
2024-05-28 17:28:20 +08:00
2025-09-14 19:05:10 +08:00
return timeStr.Callback(ctx, timeStr.ExtendData)
}
// 更新下次执行时间
func (s *Single) updateNextTime(newTime time.Time) {
s.nextTimeMux.Lock()
defer s.nextTimeMux.Unlock()
now := time.Now()
if newTime.Before(now) {
s.nextTime = now
} else {
s.nextTime = newTime
}
2023-08-15 14:01:23 +08:00
}