From 60e0478a62fbc649e9c2dd0f181fd1ab524c46f7 Mon Sep 17 00:00:00 2001 From: Yun Date: Tue, 15 Aug 2023 14:01:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96=E9=A1=B9=E7=9B=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 2 + go.sum | 12 ++++ timer.go | 161 ++++++++++++++++++++++++++++++++++++++------- uniquer/redisgo.go | 154 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 305 insertions(+), 24 deletions(-) create mode 100644 go.sum create mode 100644 uniquer/redisgo.go diff --git a/go.mod b/go.mod index 2cef1aa..92b2a3b 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ module code.yun.ink/open/timer go 1.19 + +require github.com/gomodule/redigo v1.8.9 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..6c8a5ca --- /dev/null +++ b/go.sum @@ -0,0 +1,12 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws= +github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/timer.go b/timer.go index 8cd6668..5a90f20 100644 --- a/timer.go +++ b/timer.go @@ -4,8 +4,10 @@ package timer import ( "context" + "errors" "fmt" "log" + "runtime/debug" "sync" "time" ) @@ -13,30 +15,49 @@ import ( // 定时器 // 原理:每毫秒的时间触发 -type singleTimer struct { +type timerStr struct { Callback callback // 需要回调的方法 CanRunning chan (struct{}) BeginTime time.Time // 初始化任务的时间 NextTime time.Time // 下一次执行的时间 SpaceTime time.Duration // 间隔时间 - Params []int + // Params []int + // UniqueLimitFunc UniqueLimitFunc + UniqueKey string + TimerType string // 普通类型(default) + 全局唯一(unique) + Extend ExtendParams // 附加参数 } -type clusterTimer struct { - -} - -var timerMap = make(map[int]*singleTimer) +var timerMap = make(map[int]*timerStr) var timerMapMux sync.Mutex var timerCount int // 当前定时数目 var onceLimit sync.Once // 实现单例 var nextTime = time.Now() // 下一次执行的时间 +var timerUnique UniqueLimitFunc +type ContextValueKey string // 定义context 传递的Key类型 +const ( + extendParamKey ContextValueKey = "extend_param" +) + +// 外部唯一限制接口 +type UniqueLimitFunc interface { + SetLimit(key, value string) error + DeleteLimit(key, value string) error + RefreshLimit(key, value string) error +} + +// 扩展参数 +type ExtendParams struct { + UniqueKey string // 唯一键,如果填写了就会全局唯一 + Params map[string]interface{} // 带出去的参数 +} // 定时器类 -func InitSingle(ctx context.Context) { +func InitTimer(ctx context.Context, uni UniqueLimitFunc) { onceLimit.Do(func() { + timerUnique = uni timer := time.NewTicker(1 * time.Millisecond) go func(ctx context.Context) { Loop: @@ -58,28 +79,44 @@ func InitSingle(ctx context.Context) { log.Println("timer: initend") }(ctx) }) - } -func InitCluster(ctx context.Context) {} - -// 添加需要定时的规则 -func AddToTimer(space time.Duration, call callback) int { +// 间隔定时器 +func AddTimer(space time.Duration, call callback, extend ExtendParams) (int, error) { timerMapMux.Lock() defer timerMapMux.Unlock() + timerType := "default" + if extend.UniqueKey != "" { + // 判断唯一限制 + if timerUnique == nil { + return 0, errors.New("唯一限制查询不到") + } + + // uniqueKey只可以添加一次 + for _, val := range timerMap { + if val.UniqueKey == extend.UniqueKey { + return 0, errors.New("uniqueKey重复") + } + } + timerType = "unique" + } + timerCount += 1 - // 计算出首次开始时间和时间间隔,保存在map里面 nowTime := time.Now() - t := singleTimer{ + t := timerStr{ Callback: call, BeginTime: nowTime, - NextTime: nowTime.Add(space), + NextTime: nowTime, // nowTime.Add(space), // 添加任务的时候就执行一次 SpaceTime: space, CanRunning: make(chan struct{}, 1), + TimerType: timerType, + UniqueKey: extend.UniqueKey, + Extend: extend, } + timerMap[timerCount] = &t if t.NextTime.Before(nextTime) { @@ -87,7 +124,23 @@ func AddToTimer(space time.Duration, call callback) int { nextTime = t.NextTime } - return timerCount + return timerCount, nil +} + +// 添加需要定时的规则 +func AddToTimer(space time.Duration, call callback) int { + extend := ExtendParams{} + count, _ := AddTimer(space, call, extend) + return count +} + +// 添加互斥执行的任务 +func AddUniqueTimer(space time.Duration, call callback, uniqueKey string) (int, error) { + extend := ExtendParams{ + UniqueKey: uniqueKey, + } + count, err := AddTimer(space, call, extend) + return count, err } func DelToTimer(index int) { @@ -106,16 +159,22 @@ func iteratorTimer(ctx context.Context, nowTime time.Time) { // 默认5秒后(如果没有值就暂停进来5秒) newNextTime := nowTime.Add(time.Second * 5) - for k, v := range timerMap { + index := 0 + for _, v := range timerMap { + index++ v := v // 判断执行的时机 if v.NextTime.Before(nowTime) { // fmt.Println("NextTime", v.NextTime.Format("2006-01-02 15:04:05.000")) - // TODO:这个有问题:假如加上一个时间段还是比当前时间小,会导致连续多次执行 v.NextTime = v.NextTime.Add(v.SpaceTime) - if k == 0 { + // 判断下次执行时间与当前时间 + if v.NextTime.Before(nowTime) { + v.NextTime = nowTime.Add(v.SpaceTime) + } + + if index == 1 { // 循环的第一个需要替换默认值 newNextTime = v.NextTime } @@ -127,7 +186,7 @@ func iteratorTimer(ctx context.Context, nowTime time.Time) { } // 处理中就跳过本次 - go func(ctx context.Context, v *singleTimer) { + go func(ctx context.Context, v *timerStr) { select { case v.CanRunning <- struct{}{}: // TODO: 需要考虑分布式锁 @@ -141,7 +200,7 @@ func iteratorTimer(ctx context.Context, nowTime time.Time) { } }() // fmt.Printf("timer: 准备执行 %v %v \n", k, v.Tag) - timerAction(ctx, v.Callback) + timerAction(ctx, v.Callback, v.TimerType, v.UniqueKey, v.Extend) default: // fmt.Printf("timer: 已在执行 %v %v \n", k, v.Tag) return @@ -165,15 +224,69 @@ func iteratorTimer(ctx context.Context, nowTime time.Time) { } // 定义各个回调函数 -type callback func(context.Context) bool +type callback func(ctx context.Context) bool // 定时器操作类 // 这里不应painc -func timerAction(ctx context.Context, call callback) bool { +func timerAction(ctx context.Context, call callback, timerType string, uniqueKey string, extend ExtendParams) bool { defer func() { if err := recover(); err != nil { fmt.Println("timer:定时器出错", err) + log.Println("errStack", string(debug.Stack())) } }() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // 唯一的需要设置唯一键&需要刷新唯一键 + if timerType == "unique" { + NowUnixNano := time.Now().UnixNano() + redisValue := fmt.Sprintf("%v", NowUnixNano) + err := timerUnique.SetLimit(uniqueKey, redisValue) + if err != nil { + fmt.Println("unique跳过") + return false + } + fmt.Println("unique开始执行") + + go func() { + // 5秒刷新一次 + timer := time.NewTicker(time.Second * 1) + Loop2: + for { + select { + case t := <-timer.C: + // 更新 + err = timerUnique.RefreshLimit(uniqueKey, redisValue) + if err != nil { + fmt.Printf("unique更新:%+v %+v\n", err, t.Unix()) + } + case <-ctx.Done(): + // 取消 + err = timerUnique.DeleteLimit(uniqueKey, redisValue) + if err != nil { + fmt.Printf("unique删除:%+v\n", err) + } + break Loop2 + } + } + timer.Stop() + fmt.Println("unique执行结束") + }() + } + + // 附加数据 + ctx = context.WithValue(ctx, extendParamKey, extend) + return call(ctx) } + +// 快捷方法 +func GetExtendParams(ctx context.Context) (*ExtendParams,error) { + val := ctx.Value(extendParamKey) + params,ok := val.(ExtendParams) + if !ok { + return nil,errors.New("没找到参数") + } + return ¶ms,nil +} diff --git a/uniquer/redisgo.go b/uniquer/redisgo.go new file mode 100644 index 0000000..1412ac8 --- /dev/null +++ b/uniquer/redisgo.go @@ -0,0 +1,154 @@ +package uniquer + +import ( + "errors" + "fmt" + + "github.com/gomodule/redigo/redis" +) + +type uniqueRedisgo struct { + Redis *redis.Pool +} + +var exporeSecond int64 = 15 + +func NewUniqueRedisGo(redis *redis.Pool) *uniqueRedisgo { + return &uniqueRedisgo{redis} +} + +func (u *uniqueRedisgo) SetLimit(key, value string) error { + conn := u.Redis.Get() + defer conn.Close() + + if err := conn.Err(); err != nil { + fmt.Println("get redis connect fail, err: ", err) + return err + } + + response, err := conn.Do("SET", key, value, "NX", "EX", exporeSecond) + + // fmt.Println("SetLimit:", response, err) + + if err != nil { + fmt.Println("redis setex fail, err: ", err) + return err + } + + if response != "OK" { + return errors.New("response not ok") + } + + return nil + +} + +func (u *uniqueRedisgo) DeleteLimit(key, value string) error { + conn := u.Redis.Get() + defer conn.Close() + + if err := conn.Err(); err != nil { + fmt.Println("get redis connect fail, err: ", err) + return err + } + + _, err := conn.Do("WATCH", key) + if err != nil { + return err + } + defer conn.Do("UNWATCH") + + val, err := redis.String(conn.Do("GET", key)) + // fmt.Println("DeleteLimit Do:", val, err) + if err != nil { + return err + } + if val != value { + return errors.New("值不一致") + } + // 处理 + err = conn.Send("MULTI") + if err != nil { + return err + } + err = conn.Send("DEL", key) + if err != nil { + return err + } + reply, err := conn.Do("EXEC") + // fmt.Printf("DeleteLimit EXEC:%T val:%+v err:%+v\n", reply, reply, err) + if err != nil { + return err + } + + if reply == nil { + return errors.New("删除失败") + } + + return nil +} + +func (u *uniqueRedisgo) RefreshLimit(key, value string) error { + conn := u.Redis.Get() + defer conn.Close() + + if err := conn.Err(); err != nil { + fmt.Println("get redis connect fail, err: ", err) + return err + } + + _, err := conn.Do("WATCH", key) + if err != nil { + return err + } + defer conn.Do("UNWATCH") + + val, err := redis.String(conn.Do("GET", key)) + + // fmt.Println("RefreshLimit GET:", val, err, value) + if err != nil { + return err + } + if val != value { + return errors.New("值不一致") + } + + // time.Sleep(time.Second * 5) + + // 处理 + err = conn.Send("MULTI") + // fmt.Println("RefreshLimit MULTI:", err) + if err != nil { + return err + } + + err = conn.Send("EXPIRE", key, exporeSecond) + // fmt.Println("RefreshLimit EXPIRE:", err) + if err != nil { + return err + } + + // 管道 + // reply 执行失败将会返回nil 执行成功就是返回的值 + reply, err := conn.Do("EXEC") + + // fmt.Printf("RefreshLimit EXEC:%T val:%+v err:%+v\n", reply, reply, err) + // i, ok := reply.([]interface{}) + // if ok { + // fmt.Printf("reply i %T\n", i[0]) + // uu, ok := i[0].([]uint8) + // if ok { + // fmt.Printf("reply %+v\n", string(uu)) + // } + // } + + if err != nil { + return err + } + + if reply == nil { + return errors.New("更新失败") + } + + return nil +}