初始化项目

This commit is contained in:
Yun
2023-08-15 14:01:23 +08:00
parent a36216f3fd
commit 60e0478a62
4 changed files with 305 additions and 24 deletions
+2
View File
@@ -1,3 +1,5 @@
module code.yun.ink/open/timer
go 1.19
require github.com/gomodule/redigo v1.8.9
+12
View File
@@ -0,0 +1,12 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws=
github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+137 -24
View File
@@ -4,8 +4,10 @@ package timer
import (
"context"
"errors"
"fmt"
"log"
"runtime/debug"
"sync"
"time"
)
@@ -13,30 +15,49 @@ import (
// 定时器
// 原理:每毫秒的时间触发
type singleTimer struct {
type timerStr struct {
Callback callback // 需要回调的方法
CanRunning chan (struct{})
BeginTime time.Time // 初始化任务的时间
NextTime time.Time // 下一次执行的时间
SpaceTime time.Duration // 间隔时间
Params []int
// Params []int
// UniqueLimitFunc UniqueLimitFunc
UniqueKey string
TimerType string // 普通类型(default) + 全局唯一(unique)
Extend ExtendParams // 附加参数
}
type clusterTimer struct {
}
var timerMap = make(map[int]*singleTimer)
var timerMap = make(map[int]*timerStr)
var timerMapMux sync.Mutex
var timerCount int // 当前定时数目
var onceLimit sync.Once // 实现单例
var nextTime = time.Now() // 下一次执行的时间
var timerUnique UniqueLimitFunc
type ContextValueKey string // 定义context 传递的Key类型
const (
extendParamKey ContextValueKey = "extend_param"
)
// 外部唯一限制接口
type UniqueLimitFunc interface {
SetLimit(key, value string) error
DeleteLimit(key, value string) error
RefreshLimit(key, value string) error
}
// 扩展参数
type ExtendParams struct {
UniqueKey string // 唯一键,如果填写了就会全局唯一
Params map[string]interface{} // 带出去的参数
}
// 定时器类
func InitSingle(ctx context.Context) {
func InitTimer(ctx context.Context, uni UniqueLimitFunc) {
onceLimit.Do(func() {
timerUnique = uni
timer := time.NewTicker(1 * time.Millisecond)
go func(ctx context.Context) {
Loop:
@@ -58,28 +79,44 @@ func InitSingle(ctx context.Context) {
log.Println("timer: initend")
}(ctx)
})
}
func InitCluster(ctx context.Context) {}
// 添加需要定时的规则
func AddToTimer(space time.Duration, call callback) int {
// 间隔定时器
func AddTimer(space time.Duration, call callback, extend ExtendParams) (int, error) {
timerMapMux.Lock()
defer timerMapMux.Unlock()
timerType := "default"
if extend.UniqueKey != "" {
// 判断唯一限制
if timerUnique == nil {
return 0, errors.New("唯一限制查询不到")
}
// uniqueKey只可以添加一次
for _, val := range timerMap {
if val.UniqueKey == extend.UniqueKey {
return 0, errors.New("uniqueKey重复")
}
}
timerType = "unique"
}
timerCount += 1
// 计算出首次开始时间和时间间隔,保存在map里面
nowTime := time.Now()
t := singleTimer{
t := timerStr{
Callback: call,
BeginTime: nowTime,
NextTime: nowTime.Add(space),
NextTime: nowTime, // nowTime.Add(space), // 添加任务的时候就执行一次
SpaceTime: space,
CanRunning: make(chan struct{}, 1),
TimerType: timerType,
UniqueKey: extend.UniqueKey,
Extend: extend,
}
timerMap[timerCount] = &t
if t.NextTime.Before(nextTime) {
@@ -87,7 +124,23 @@ func AddToTimer(space time.Duration, call callback) int {
nextTime = t.NextTime
}
return timerCount
return timerCount, nil
}
// 添加需要定时的规则
func AddToTimer(space time.Duration, call callback) int {
extend := ExtendParams{}
count, _ := AddTimer(space, call, extend)
return count
}
// 添加互斥执行的任务
func AddUniqueTimer(space time.Duration, call callback, uniqueKey string) (int, error) {
extend := ExtendParams{
UniqueKey: uniqueKey,
}
count, err := AddTimer(space, call, extend)
return count, err
}
func DelToTimer(index int) {
@@ -106,16 +159,22 @@ func iteratorTimer(ctx context.Context, nowTime time.Time) {
// 默认5秒后(如果没有值就暂停进来5秒)
newNextTime := nowTime.Add(time.Second * 5)
for k, v := range timerMap {
index := 0
for _, v := range timerMap {
index++
v := v
// 判断执行的时机
if v.NextTime.Before(nowTime) {
// fmt.Println("NextTime", v.NextTime.Format("2006-01-02 15:04:05.000"))
// TODO:这个有问题:假如加上一个时间段还是比当前时间小,会导致连续多次执行
v.NextTime = v.NextTime.Add(v.SpaceTime)
if k == 0 {
// 判断下次执行时间与当前时间
if v.NextTime.Before(nowTime) {
v.NextTime = nowTime.Add(v.SpaceTime)
}
if index == 1 {
// 循环的第一个需要替换默认值
newNextTime = v.NextTime
}
@@ -127,7 +186,7 @@ func iteratorTimer(ctx context.Context, nowTime time.Time) {
}
// 处理中就跳过本次
go func(ctx context.Context, v *singleTimer) {
go func(ctx context.Context, v *timerStr) {
select {
case v.CanRunning <- struct{}{}:
// TODO: 需要考虑分布式锁
@@ -141,7 +200,7 @@ func iteratorTimer(ctx context.Context, nowTime time.Time) {
}
}()
// fmt.Printf("timer: 准备执行 %v %v \n", k, v.Tag)
timerAction(ctx, v.Callback)
timerAction(ctx, v.Callback, v.TimerType, v.UniqueKey, v.Extend)
default:
// fmt.Printf("timer: 已在执行 %v %v \n", k, v.Tag)
return
@@ -165,15 +224,69 @@ func iteratorTimer(ctx context.Context, nowTime time.Time) {
}
// 定义各个回调函数
type callback func(context.Context) bool
type callback func(ctx context.Context) bool
// 定时器操作类
// 这里不应painc
func timerAction(ctx context.Context, call callback) bool {
func timerAction(ctx context.Context, call callback, timerType string, uniqueKey string, extend ExtendParams) bool {
defer func() {
if err := recover(); err != nil {
fmt.Println("timer:定时器出错", err)
log.Println("errStack", string(debug.Stack()))
}
}()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// 唯一的需要设置唯一键&需要刷新唯一键
if timerType == "unique" {
NowUnixNano := time.Now().UnixNano()
redisValue := fmt.Sprintf("%v", NowUnixNano)
err := timerUnique.SetLimit(uniqueKey, redisValue)
if err != nil {
fmt.Println("unique跳过")
return false
}
fmt.Println("unique开始执行")
go func() {
// 5秒刷新一次
timer := time.NewTicker(time.Second * 1)
Loop2:
for {
select {
case t := <-timer.C:
// 更新
err = timerUnique.RefreshLimit(uniqueKey, redisValue)
if err != nil {
fmt.Printf("unique更新:%+v %+v\n", err, t.Unix())
}
case <-ctx.Done():
// 取消
err = timerUnique.DeleteLimit(uniqueKey, redisValue)
if err != nil {
fmt.Printf("unique删除:%+v\n", err)
}
break Loop2
}
}
timer.Stop()
fmt.Println("unique执行结束")
}()
}
// 附加数据
ctx = context.WithValue(ctx, extendParamKey, extend)
return call(ctx)
}
// 快捷方法
func GetExtendParams(ctx context.Context) (*ExtendParams,error) {
val := ctx.Value(extendParamKey)
params,ok := val.(ExtendParams)
if !ok {
return nil,errors.New("没找到参数")
}
return &params,nil
}
+154
View File
@@ -0,0 +1,154 @@
package uniquer
import (
"errors"
"fmt"
"github.com/gomodule/redigo/redis"
)
type uniqueRedisgo struct {
Redis *redis.Pool
}
var exporeSecond int64 = 15
func NewUniqueRedisGo(redis *redis.Pool) *uniqueRedisgo {
return &uniqueRedisgo{redis}
}
func (u *uniqueRedisgo) SetLimit(key, value string) error {
conn := u.Redis.Get()
defer conn.Close()
if err := conn.Err(); err != nil {
fmt.Println("get redis connect fail, err: ", err)
return err
}
response, err := conn.Do("SET", key, value, "NX", "EX", exporeSecond)
// fmt.Println("SetLimit:", response, err)
if err != nil {
fmt.Println("redis setex fail, err: ", err)
return err
}
if response != "OK" {
return errors.New("response not ok")
}
return nil
}
func (u *uniqueRedisgo) DeleteLimit(key, value string) error {
conn := u.Redis.Get()
defer conn.Close()
if err := conn.Err(); err != nil {
fmt.Println("get redis connect fail, err: ", err)
return err
}
_, err := conn.Do("WATCH", key)
if err != nil {
return err
}
defer conn.Do("UNWATCH")
val, err := redis.String(conn.Do("GET", key))
// fmt.Println("DeleteLimit Do:", val, err)
if err != nil {
return err
}
if val != value {
return errors.New("值不一致")
}
// 处理
err = conn.Send("MULTI")
if err != nil {
return err
}
err = conn.Send("DEL", key)
if err != nil {
return err
}
reply, err := conn.Do("EXEC")
// fmt.Printf("DeleteLimit EXEC:%T val:%+v err:%+v\n", reply, reply, err)
if err != nil {
return err
}
if reply == nil {
return errors.New("删除失败")
}
return nil
}
func (u *uniqueRedisgo) RefreshLimit(key, value string) error {
conn := u.Redis.Get()
defer conn.Close()
if err := conn.Err(); err != nil {
fmt.Println("get redis connect fail, err: ", err)
return err
}
_, err := conn.Do("WATCH", key)
if err != nil {
return err
}
defer conn.Do("UNWATCH")
val, err := redis.String(conn.Do("GET", key))
// fmt.Println("RefreshLimit GET:", val, err, value)
if err != nil {
return err
}
if val != value {
return errors.New("值不一致")
}
// time.Sleep(time.Second * 5)
// 处理
err = conn.Send("MULTI")
// fmt.Println("RefreshLimit MULTI:", err)
if err != nil {
return err
}
err = conn.Send("EXPIRE", key, exporeSecond)
// fmt.Println("RefreshLimit EXPIRE:", err)
if err != nil {
return err
}
// 管道
// reply 执行失败将会返回nil 执行成功就是返回的值
reply, err := conn.Do("EXEC")
// fmt.Printf("RefreshLimit EXEC:%T val:%+v err:%+v\n", reply, reply, err)
// i, ok := reply.([]interface{})
// if ok {
// fmt.Printf("reply i %T\n", i[0])
// uu, ok := i[0].([]uint8)
// if ok {
// fmt.Printf("reply %+v\n", string(uu))
// }
// }
if err != nil {
return err
}
if reply == nil {
return errors.New("更新失败")
}
return nil
}