Files

173 lines
4.0 KiB
Go
Raw Permalink Normal View History

2023-09-16 20:14:20 +08:00
package timer
// 作者:黄新云
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// 定时器
// 原理:每毫秒的时间触发
// TODO:不是最新的
type timerStr struct {
Callback callback // 需要回调的方法
CanRunning chan (struct{})
BeginTime time.Time // 初始化任务的时间
NextTime time.Time // 下一次执行的时间
SpaceTime time.Duration // 间隔时间
Params []int
}
var timerMap = make(map[int]*timerStr)
var timerMapMux sync.Mutex
var timerCount int // 当前定时数目
var onceLimit sync.Once // 实现单例
var nextTime = time.Now() // 下一次执行的时间
// 定时器类
func InitTimer(ctx context.Context) {
onceLimit.Do(func() {
timer := time.NewTicker(1 * time.Millisecond)
go func(ctx context.Context) {
Loop:
for {
select {
case t := <-timer.C:
if t.Before(nextTime) {
// 当前时间小于下次发送时间:跳过
continue
}
// 迭代定时器
iteratorTimer(ctx, t)
// fmt.Println("timer: 执行")
case <-ctx.Done():
// 跳出循环
break Loop
}
}
log.Println("timer: initend")
}(ctx)
})
}
// 添加需要定时的规则
func AddToTimer(space time.Duration, call callback) int {
timerMapMux.Lock()
defer timerMapMux.Unlock()
timerCount += 1
// 计算出首次开始时间和时间间隔,保存在map里面
nowTime := time.Now()
t := timerStr{
Callback: call,
BeginTime: nowTime,
NextTime: nowTime.Add(space),
SpaceTime: space,
CanRunning: make(chan struct{}, 1),
}
timerMap[timerCount] = &t
if t.NextTime.Before(nextTime) {
// 本条规则下次需要发送的时间小于系统下次发送时间:替换
nextTime = t.NextTime
}
return timerCount
}
func DelToTimer(index int) {
timerMapMux.Lock()
defer timerMapMux.Unlock()
delete(timerMap, index)
}
// 迭代定时器列表
func iteratorTimer(ctx context.Context, nowTime time.Time) {
timerMapMux.Lock()
defer timerMapMux.Unlock()
// fmt.Println("nowTime:", nowTime.Format("2006-01-02 15:04:05.000"))
// 默认5秒后(如果没有值就暂停进来5秒)
newNextTime := nowTime.Add(time.Second * 5)
for k, v := range timerMap {
v := v
// 判断执行的时机
if v.NextTime.Before(nowTime) {
// fmt.Println("NextTime", v.NextTime.Format("2006-01-02 15:04:05.000"))
// TODO:这个有问题:假如加上一个时间段还是比当前时间小,会导致连续多次执行
v.NextTime = v.NextTime.Add(v.SpaceTime)
if k == 0 {
// 循环的第一个需要替换默认值
newNextTime = v.NextTime
}
// 获取最小的
if v.NextTime.Before(newNextTime) {
// 本规则下次发送时间小于系统下次需要执行的时间:替换
newNextTime = v.NextTime
}
// 处理中就跳过本次
go func(ctx context.Context, v *timerStr) {
select {
case v.CanRunning <- struct{}{}:
// TODO: 需要考虑分布式锁
defer func() {
// fmt.Printf("timer: 执行完成 %v %v \n", k, v.Tag)
select {
case <-v.CanRunning:
return
default:
return
}
}()
// fmt.Printf("timer: 准备执行 %v %v \n", k, v.Tag)
timerAction(ctx, v.Callback)
default:
// fmt.Printf("timer: 已在执行 %v %v \n", k, v.Tag)
return
}
}(ctx, v)
}
}
// 实际下次时间小于预期下次时间:替换
if nextTime.Before(newNextTime) {
// 判断一下避免异常
if newNextTime.Before(nowTime) {
// 比当前时间小
nextTime = nowTime
} else {
nextTime = newNextTime
}
}
// fmt.Println("timer: one finish")
}
// 定义各个回调函数
type callback func(context.Context) bool
// 定时器操作类
// 这里不应painc
func timerAction(ctx context.Context, call callback) bool {
defer func() {
if err := recover(); err != nil {
fmt.Println("timer:定时器出错", err)
}
}()
return call(ctx)
}