Files
timerx/single.go
T

195 lines
4.4 KiB
Go
Raw Normal View History

2023-11-27 22:37:33 +08:00
package timerx
2023-08-06 01:07:29 +08:00
// 作者:黄新云
import (
"context"
2023-08-15 14:01:23 +08:00
"errors"
2023-08-06 01:07:29 +08:00
"fmt"
2023-08-15 14:01:23 +08:00
"runtime/debug"
2023-08-06 01:07:29 +08:00
"sync"
"time"
)
// 定时器
2023-12-27 17:19:52 +08:00
// 1. 这个定时器的作用范围是本机
2023-08-06 01:07:29 +08:00
2023-08-27 23:39:58 +08:00
// uuid -> timerStr
var timerMap = make(map[string]*timerStr)
2023-08-06 01:07:29 +08:00
var timerMapMux sync.Mutex
2023-08-27 23:39:58 +08:00
2023-11-13 23:49:42 +08:00
var timerCount int // 当前定时数目
var onceLimit sync.Once // 实现单例
2023-08-15 14:01:23 +08:00
2024-04-06 19:09:58 +08:00
type Single struct {
ctx context.Context
logger Logger
}
2023-08-15 14:01:23 +08:00
2023-11-13 23:49:42 +08:00
var sin *Single = nil
2023-09-23 11:17:42 +08:00
2023-08-06 01:07:29 +08:00
// 定时器类
2024-04-06 19:09:58 +08:00
func InitSingle(ctx context.Context, opts ...Option) *Single {
2023-08-06 01:07:29 +08:00
onceLimit.Do(func() {
2024-04-06 19:09:58 +08:00
op := newOptions(opts...)
sin = &Single{
ctx: ctx,
logger: op.logger,
}
2023-09-23 11:17:42 +08:00
timer := time.NewTicker(time.Millisecond * 200)
2023-08-06 01:07:29 +08:00
go func(ctx context.Context) {
Loop:
for {
select {
case t := <-timer.C:
if t.Before(nextTime) {
// 当前时间小于下次发送时间:跳过
continue
}
// 迭代定时器
2023-11-13 23:49:42 +08:00
sin.iterator(ctx, t)
2023-08-06 01:07:29 +08:00
// fmt.Println("timer: 执行")
case <-ctx.Done():
// 跳出循环
break Loop
}
}
2024-04-06 19:09:58 +08:00
sin.logger.Infof(ctx, "timer: initend")
2023-08-06 01:07:29 +08:00
}(ctx)
})
2023-09-23 11:17:42 +08:00
return sin
2023-08-06 01:07:29 +08:00
}
2023-08-15 14:01:23 +08:00
// 间隔定时器
2023-11-13 23:49:42 +08:00
// @param space 间隔时间
// @param call 回调函数
// @param extend 附加参数
// @return int 定时器索引
// @return error 错误
func (s *Single) Add(space time.Duration, call callback, extend interface{}) (int, error) {
2023-08-06 01:07:29 +08:00
timerMapMux.Lock()
defer timerMapMux.Unlock()
2023-08-27 23:39:58 +08:00
if space != space.Abs() {
2024-04-06 19:09:58 +08:00
s.logger.Errorf(s.ctx, "space must be positive")
2023-08-27 23:39:58 +08:00
return 0, errors.New("space must be positive")
2023-08-15 14:01:23 +08:00
}
2023-08-06 01:07:29 +08:00
timerCount += 1
nowTime := time.Now()
2023-08-15 14:01:23 +08:00
t := timerStr{
2023-08-06 01:07:29 +08:00
Callback: call,
BeginTime: nowTime,
2023-08-15 14:01:23 +08:00
NextTime: nowTime, // nowTime.Add(space), // 添加任务的时候就执行一次
2023-08-06 01:07:29 +08:00
SpaceTime: space,
CanRunning: make(chan struct{}, 1),
2023-11-13 23:49:42 +08:00
ExtendData: extend,
2023-08-06 01:07:29 +08:00
}
2023-08-15 14:01:23 +08:00
2023-08-27 23:39:58 +08:00
timerMap[fmt.Sprintf("%d", timerCount)] = &t
2023-08-06 01:07:29 +08:00
if t.NextTime.Before(nextTime) {
// 本条规则下次需要发送的时间小于系统下次发送时间:替换
nextTime = t.NextTime
}
2023-08-15 14:01:23 +08:00
return timerCount, nil
}
2023-11-13 23:49:42 +08:00
// 删除定时器
func (s *Single) Del(index string) {
2023-08-06 01:07:29 +08:00
timerMapMux.Lock()
defer timerMapMux.Unlock()
delete(timerMap, index)
}
// 迭代定时器列表
2023-11-13 23:49:42 +08:00
func (s *Single) iterator(ctx context.Context, nowTime time.Time) {
2023-08-06 01:07:29 +08:00
timerMapMux.Lock()
defer timerMapMux.Unlock()
// fmt.Println("nowTime:", nowTime.Format("2006-01-02 15:04:05.000"))
// 默认5秒后(如果没有值就暂停进来5秒)
newNextTime := nowTime.Add(time.Second * 5)
2023-08-15 14:01:23 +08:00
index := 0
for _, v := range timerMap {
index++
2023-08-06 01:07:29 +08:00
v := v
// 判断执行的时机
if v.NextTime.Before(nowTime) {
// fmt.Println("NextTime", v.NextTime.Format("2006-01-02 15:04:05.000"))
v.NextTime = v.NextTime.Add(v.SpaceTime)
2023-08-15 14:01:23 +08:00
// 判断下次执行时间与当前时间
if v.NextTime.Before(nowTime) {
v.NextTime = nowTime.Add(v.SpaceTime)
}
if index == 1 {
2023-08-06 01:07:29 +08:00
// 循环的第一个需要替换默认值
newNextTime = v.NextTime
}
// 获取最小的
if v.NextTime.Before(newNextTime) {
// 本规则下次发送时间小于系统下次需要执行的时间:替换
newNextTime = v.NextTime
}
// 处理中就跳过本次
2023-08-15 14:01:23 +08:00
go func(ctx context.Context, v *timerStr) {
2023-08-06 01:07:29 +08:00
select {
case v.CanRunning <- struct{}{}:
defer func() {
// fmt.Printf("timer: 执行完成 %v %v \n", k, v.Tag)
select {
case <-v.CanRunning:
return
default:
return
}
}()
// fmt.Printf("timer: 准备执行 %v %v \n", k, v.Tag)
2024-04-04 10:58:57 +08:00
s.doTask(ctx, v.Callback, v.ExtendData)
2023-08-06 01:07:29 +08:00
default:
// fmt.Printf("timer: 已在执行 %v %v \n", k, v.Tag)
return
}
}(ctx, v)
}
}
// 实际下次时间小于预期下次时间:替换
if nextTime.Before(newNextTime) {
// 判断一下避免异常
if newNextTime.Before(nowTime) {
// 比当前时间小
nextTime = nowTime
} else {
nextTime = newNextTime
}
}
// fmt.Println("timer: one finish")
}
// 定时器操作类
// 这里不应painc
2024-04-04 10:58:57 +08:00
func (s *Single) doTask(ctx context.Context, call callback, extend interface{}) error {
2023-08-06 01:07:29 +08:00
defer func() {
if err := recover(); err != nil {
2024-04-06 19:09:58 +08:00
s.logger.Errorf(ctx, "timer:定时器出错 err:%+v stack:%s", err, string(debug.Stack()))
2023-08-06 01:07:29 +08:00
}
}()
2023-11-13 23:49:42 +08:00
return call(ctx, extend)
2023-08-15 14:01:23 +08:00
}