Files
timerx/single.go
T
2024-05-31 13:05:51 +08:00

295 lines
7.5 KiB
Go

package timerx
// 作者:黄新云
import (
"context"
"errors"
"runtime/debug"
"sync"
"time"
uuid "github.com/satori/go.uuid"
)
// 简单定时器
// 1. 这个定时器的作用范围是本机
// 2. 适用简单的时间间隔定时任务
// 定时器结构体
var singleWorkerList sync.Map
var singleTimerIndex int // 当前定时数目
var singleOnceLimit sync.Once // 实现单例
type Single struct {
ctx context.Context
logger Logger
location *time.Location
}
var sin *Single = nil
var singleNextTime = time.Now() // 下一次执行的时间
// 定时器类
// @param ctx context.Context 上下文
// @param opts ...Option 配置项
func InitSingle(ctx context.Context, opts ...Option) *Single {
singleOnceLimit.Do(func() {
op := newOptions(opts...)
sin = &Single{
ctx: ctx,
logger: op.logger,
location: op.location,
}
timer := time.NewTicker(time.Millisecond * 200)
go func(ctx context.Context) {
Loop:
for {
select {
case t := <-timer.C:
if t.Before(singleNextTime) {
// 当前时间小于下次发送时间:跳过
continue
}
// 迭代定时器
sin.iterator(ctx)
// fmt.Println("timer: 执行")
case <-ctx.Done():
// 跳出循环
break Loop
}
}
sin.logger.Infof(ctx, "timer: initend")
}(ctx)
})
return sin
}
// 每月执行一次
// @param ctx 上下文
// @param taskId 任务ID
// @param day 每月的几号
// @param hour 小时
// @param minute 分钟
// @param second 秒
// @param callback 回调函数
// @param extendData 扩展数据
// @return error
func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
jobData := JobData{
JobType: JobTypeEveryMonth,
CreateTime: nowTime,
Day: day,
Hour: hour,
Minute: minute,
Second: second,
}
return c.addJob(ctx, jobData, callback, extendData)
}
// 每周执行一次
// @param ctx context.Context 上下文
// @param taskId string 任务ID
// @param week time.Weekday 周
// @param hour int 小时
// @param minute int 分钟
// @param second int 秒
func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
jobData := JobData{
JobType: JobTypeEveryWeek,
CreateTime: nowTime,
Weekday: week,
Hour: hour,
Minute: minute,
Second: second,
}
return c.addJob(ctx, jobData, callback, extendData)
}
// 每天执行一次
func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
jobData := JobData{
JobType: JobTypeEveryDay,
CreateTime: nowTime,
Hour: hour,
Minute: minute,
Second: second,
}
return c.addJob(ctx, jobData, callback, extendData)
}
// 每小时执行一次
func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
jobData := JobData{
JobType: JobTypeEveryHour,
CreateTime: nowTime,
Minute: minute,
Second: second,
}
return c.addJob(ctx, jobData, callback, extendData)
}
// 每分钟执行一次
func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
jobData := JobData{
JobType: JobTypeEveryMinute,
CreateTime: nowTime,
Second: second,
}
return c.addJob(ctx, jobData, callback, extendData)
}
// 特定时间间隔
func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
if spaceTime < 0 {
c.logger.Errorf(ctx, "间隔时间不能小于0")
return 0, errors.New("间隔时间不能小于0")
}
jobData := JobData{
JobType: JobTypeInterval,
CreateTime: nowTime,
IntervalTime: spaceTime,
}
return c.addJob(ctx, jobData, callback, extendData)
}
// 间隔定时器
// @param space 间隔时间
// @param call 回调函数
// @param extend 附加参数
// @return int 定时器索引
// @return error 错误
func (l *Single) addJob(ctx context.Context, jobData JobData, call func(ctx context.Context, extendData interface{}) error, extend interface{}) (int, error) {
singleTimerIndex += 1
_, err := GetNextTime(time.Now().In(l.location), jobData)
if err != nil {
l.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error())
return 0, err
}
t := timerStr{
Callback: call,
CanRunning: make(chan struct{}, 1),
ExtendData: extend,
JobData: &jobData,
}
singleWorkerList.Store(singleTimerIndex, t)
return singleTimerIndex, nil
}
// 删除定时器
func (s *Single) Del(index int) {
singleWorkerList.Delete(index)
}
// 迭代定时器列表
func (s *Single) iterator(ctx context.Context) {
nowTime := time.Now().In(s.location)
// 默认5秒后(如果没有值就暂停进来5秒)
newNextTime := nowTime.Add(time.Second * 5)
index := 0
singleWorkerList.Range(func(k, v interface{}) bool {
index++
timeStr := v.(timerStr)
if timeStr.JobData.NextTime.Before(nowTime) || timeStr.JobData.NextTime.Equal(nowTime) {
// 可执行
nextTime, _ := GetNextTime(nowTime, *timeStr.JobData)
timeStr.JobData.NextTime = *nextTime
if index == 1 {
// 循环的第一个需要替换默认值
newNextTime = timeStr.JobData.NextTime
}
if nextTime.Before(newNextTime) {
// 本规则下次发送时间小于系统下次需要执行的时间:替换
newNextTime = *nextTime
}
// 处理中就跳过本次
go func(ctx context.Context, v timerStr) {
select {
case v.CanRunning <- struct{}{}:
defer func() {
// fmt.Printf("timer: 执行完成 %v %v \n", k, v.Tag)
select {
case <-v.CanRunning:
return
default:
return
}
}()
// fmt.Printf("timer: 准备执行 %v %v \n", k, v.Tag)
s.doTask(ctx, v.Callback, v.ExtendData)
default:
// fmt.Printf("timer: 已在执行 %v %v \n", k, v.Tag)
return
}
}(ctx, timeStr)
}
return true
})
// 实际下次时间小于预期下次时间:替换
if singleNextTime.Before(newNextTime) {
// 判断一下避免异常
if newNextTime.Before(nowTime) {
// 比当前时间小
singleNextTime = nowTime
} else {
singleNextTime = newNextTime
}
}
// fmt.Println("timer: one finish")
}
// 定时器操作类
// 这里不应painc
func (s *Single) doTask(ctx context.Context, call func(ctx context.Context, extendData interface{}) error, extend interface{}) error {
defer func() {
if err := recover(); err != nil {
s.logger.Errorf(ctx, "timer:回调任务panic err:%+v stack:%s", err, string(debug.Stack()))
}
}()
ctx = context.WithValue(ctx, "trace_id", uuid.NewV4().String)
return call(ctx, extend)
}