From 3ccce0bba29f6f18990c1b559294a2693337eca7 Mon Sep 17 00:00:00 2001 From: Yun Date: Sun, 27 Aug 2023 23:39:58 +0800 Subject: [PATCH] create cluster --- .vscode/launch.json | 22 ++++ cluster.go | 248 ++++++++++++++++++++++++++++++++++++++++++ cmd/main.go | 97 +++++++++++++++++ example_test.go | 48 ++++++-- go.mod | 10 +- go.sum | 15 +++ lockx/lockx.go | 96 ++++++++++++++++ readme.md | 14 +++ rreadme.md | 8 -- timer.go => single.go | 116 +++----------------- types.go | 18 +++ 11 files changed, 571 insertions(+), 121 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 cluster.go create mode 100644 cmd/main.go create mode 100644 lockx/lockx.go create mode 100644 readme.md delete mode 100644 rreadme.md rename timer.go => single.go (55%) create mode 100644 types.go diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..5cc56c5 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,22 @@ +{ + // 使用 IntelliSense 了解相关属性。 + // 悬停以查看现有属性的描述。 + // 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Launch file", + "type": "go", + "request": "launch", + "mode": "debug", + "program": "${file}" + }, + { + "name": "Attach to Process", + "type": "go", + "request": "attach", + "mode": "local", + "processId": 0 + } + ] +} \ No newline at end of file diff --git a/cluster.go b/cluster.go new file mode 100644 index 0000000..f4de751 --- /dev/null +++ b/cluster.go @@ -0,0 +1,248 @@ +package timer + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "runtime/debug" + "sync" + "time" + + "code.yun.ink/open/timer/lockx" + "github.com/go-redis/redis/v8" +) + +// 单例模式 +var clusterOnceLimit sync.Once + +// 已注册的任务列表 +var clusterWorkerList sync.Map + +type cluster struct { + ctx context.Context + redis *redis.Client + lockKey string // 全局计算的key + nextKey string // 下一次执行的key + zsetKey string // 有序集合的key +} + +var clu *cluster = nil + +func InitCluster(ctx context.Context, red *redis.Client) *cluster { + clusterOnceLimit.Do(func() { + clu = &cluster{ + ctx: ctx, + redis: red, + lockKey: "timer:globalLockKey", + nextKey: "timer:NextKey", + zsetKey: "timer:zsetKey", + } + + timer := time.NewTicker(time.Millisecond*100) + + go func(ctx context.Context, red *redis.Client) { + Loop: + for { + select { + case <-timer.C: + go clu.computeTime() + go clu.getTask() + case <-ctx.Done(): + break Loop + } + } + }(ctx, red) + }) + return clu +} + +func (c *cluster) AddTimer(ctx context.Context, uniqueKey string, spaceTime time.Duration, callback callback, extend ExtendParams) error { + _, ok := clusterWorkerList.Load(uniqueKey) + if ok { + return errors.New("key已存在") + } + + if spaceTime != spaceTime.Abs() { + return errors.New("时间间隔不能为负数") + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + lock := lockx.NewGlobalLock(ctx, c.redis, uniqueKey) + tB := lock.Try(10) + if !tB { + return errors.New("添加失败") + } + defer lock.Unlock() + lock.Refresh() + + nowTime := time.Now() + + t := timerStr{ + BeginTime: nowTime, + NextTime: nowTime, + SpaceTime: spaceTime, + Callback: callback, + Extend: extend, + UniqueKey: uniqueKey, + } + + clusterWorkerList.Store(uniqueKey, t) + + cacheStr, _ := c.redis.Get(ctx, c.nextKey).Result() + execTime := make(map[string]time.Time) + json.Unmarshal([]byte(cacheStr), &execTime) + + p := c.redis.Pipeline() + + p.ZAdd(ctx, c.zsetKey, &redis.Z{ + Score: float64(nextTime.UnixMilli()), + Member: uniqueKey, + }) + execTime[uniqueKey] = nextTime + n, _ := json.Marshal(execTime) + fmt.Println("execTime:", execTime, string(n)) + p.Set(ctx, c.nextKey, string(n), 0) + + _, err := p.Exec(ctx) + + fmt.Println("添加", err) + + return err +} + +// 计算下一次执行的时间 +func (c *cluster) computeTime() { + log.Println("begin computer") + ctx, cancel := context.WithCancel(c.ctx) + defer cancel() + + lock := lockx.NewGlobalLock(ctx, c.redis, c.lockKey) + // 获取锁 + lockBool := lock.Lock() + if !lockBool { + log.Println("timer:获取锁失败") + return + } + defer lock.Unlock() + + // 更新锁 + lock.Refresh() + + // 计算下一次时间 + + // 读取执行的缓存 + cacheStr, _ := c.redis.Get(ctx, c.nextKey).Result() + execTime := make(map[string]time.Time) + json.Unmarshal([]byte(cacheStr), &execTime) + + // log.Println("cacheStr:", cacheStr, execTime) + // return + + p := c.redis.Pipeline() + + nowTime := time.Now() + + clusterWorkerList.Range(func(key, value interface{}) bool { + // log.Println("range:", key, value) + val := value.(timerStr) + beforeTime := execTime[val.UniqueKey] + if beforeTime.After(nowTime) { + // log.Println("sssss") + return true + } + nextTime := getNextExecTime(beforeTime, val.SpaceTime) + execTime[val.UniqueKey] = nextTime + + p.ZAdd(ctx, c.zsetKey, &redis.Z{ + Score: float64(nextTime.UnixMilli()), + Member: val.UniqueKey, + }) + // log.Println("ffffffffffff") + return true + }) + + // log.Println("ssssssddddd") + + // 更新缓存 + b, _ := json.Marshal(execTime) + p.Set(ctx, c.nextKey, string(b), 0) + + // log.Println("B", string(b)) + + _, err := p.Exec(ctx) + fmt.Println(err) +} + +// 递归遍历获取执行时间 +func getNextExecTime(beforeTime time.Time, spaceTime time.Duration) time.Time { + nowTime := time.Now() + if beforeTime.After(nowTime) { + return beforeTime + } + nextTime := beforeTime.Add(spaceTime) + // fmt.Println(nextTime.Format(time.RFC3339)) + // fmt.Println(nowTime.Format(time.RFC3339)) + // fmt.Println(beforeTime.Before(nowTime)) + if nextTime.Before(nowTime) { + nextTime = getNextExecTime(nextTime, spaceTime) + } + return nextTime +} + +// 获取任务 +func (c *cluster) getTask() { + // 定时去Redis获取任务 + zb := redis.ZRangeBy{ + Min: "0", + Max: fmt.Sprintf("%+v", time.Now().UnixMilli()), + } + + taskList, _ := c.redis.ZRangeByScore(c.ctx, c.zsetKey, &zb).Result() + + // 删除粉丝 + inter := []interface{}{} + for _, val := range taskList { + inter = append(inter, val) + } + c.redis.ZRem(c.ctx, c.zsetKey, inter...) + + for _, val := range taskList { + go doTask(c.ctx, c.redis, val) + } + +} + +// 执行任务 +func doTask(ctx context.Context, red *redis.Client, taskId string) { + + defer func() { + if err := recover(); err != nil { + fmt.Println("timer:定时器出错", err) + log.Println("errStack", string(debug.Stack())) + } + }() + + val, ok := clusterWorkerList.Load(taskId) + if !ok { + return + } + t := val.(timerStr) + + // 这里加一个全局锁 + lock := lockx.NewGlobalLock(ctx, red, taskId) + tB := lock.Lock() + if !tB { + return + } + defer lock.Unlock() + lock.Refresh() + + ctx = context.WithValue(ctx, extendParamKey, t.Extend) + + // 执行任务 + t.Callback(ctx) +} diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..3f685ba --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,97 @@ +package main + +import ( + "context" + "fmt" + "time" + + "code.yun.ink/open/timer" + "github.com/go-redis/redis/v8" +) + +func main() { + // m := make(map[string]time.Time) + // m["sss"] = time.Now() + + // b, _ := json.Marshal(m) + + // fmt.Println(string(b)) + + // mm := make(map[string]time.Time) + // json.Unmarshal(b, &mm) + + // fmt.Println(mm) + + re() + +} + +func re() { + client := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1" + ":" + "6379", + Password: "", // no password set + DB: 0, // use default DB + }) + if client == nil { + fmt.Println("redis init error") + return + } + + ctx := context.Background() + cl := timer.InitCluster(ctx, client) + cl.AddTimer(ctx, "test1", 1*time.Second, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text1", + }, + }) + cl.AddTimer(ctx, "test2", 1*time.Second, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text2", + }, + }) + cl.AddTimer(ctx, "test3", 1*time.Second, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text3", + }, + }) + cl.AddTimer(ctx, "test4", 1*time.Second, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text4", + }, + }) + cl.AddTimer(ctx, "test5", 1*time.Second, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text5", + }, + }) + cl.AddTimer(ctx, "test6", 1*time.Second, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text6", + }, + }) + cl.AddTimer(ctx, "test7", 1*time.Second, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text7", + }, + }) + cl.AddTimer(ctx, "test8", 1*time.Second, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text8", + }, + }) + cl.AddTimer(ctx, "test9", 1*time.Second, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text9", + }, + }) + + select {} +} + +func aa(ctx context.Context) bool { + fmt.Println(time.Now().Format(time.RFC3339)) + fmt.Println("gggggggggggggggggggggggggggg") + a, err := timer.GetExtendParams(ctx) + fmt.Printf("%+v %+v", a, err) + return true +} diff --git a/example_test.go b/example_test.go index a7a952f..dc54c3b 100644 --- a/example_test.go +++ b/example_test.go @@ -1,21 +1,47 @@ -package timer_test +package timer import ( - "context" "fmt" - "code.yun.ink/open/timer" + "testing" + + "github.com/go-redis/redis/v8" ) // 示例测试 -func exampleDemo(ctx context.Context) bool { - fmt.Println("fff") - return false +// func exampleDemo(ctx context.Context) bool { +// fmt.Println("fff") +// return false +// } + +// func ExampleB() { +// ctx := context.Background() +// timer.InitSingle(ctx) +// timer.AddToTimer(1, exampleDemo) +// // OutPut: +// } + +func TestMain(m *testing.M) { + client := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1" + ":" + "6379", + Password: "", // no password set + DB: 0, // use default DB + }) + if client == nil { + fmt.Println("redis init error") + return + } + Redis = client + } -func ExampleB() { - ctx := context.Background() - timer.InitSingle(ctx) - timer.AddToTimer(1, exampleDemo) - // OutPut: +func TestRedis(t *testing.T) { + fmt.Println("6666") + // t.Fail() + // t.Error("ffff") + // Redis.Set(context.Background(), "dddd", "dddd", 0) + // str, err := Redis.Get(context.Background(), "dddd").Result() + // fmt.Println("ssss", str, err) + // t.Log(str, err) + // t.Fail() } diff --git a/go.mod b/go.mod index 92b2a3b..dbbf4e2 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,12 @@ module code.yun.ink/open/timer go 1.19 -require github.com/gomodule/redigo v1.8.9 +require ( + github.com/go-redis/redis/v8 v8.11.5 + github.com/gomodule/redigo v1.8.9 +) + +require ( + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect +) diff --git a/go.sum b/go.sum index 6c8a5ca..8d83d53 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,27 @@ +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws= github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/lockx/lockx.go b/lockx/lockx.go new file mode 100644 index 0000000..5c38ab4 --- /dev/null +++ b/lockx/lockx.go @@ -0,0 +1,96 @@ +package lockx + +import ( + "context" + "fmt" + "time" + + "github.com/go-redis/redis/v8" +) + +// 全局锁 +type globalLock struct { + redis *redis.Client + ctx context.Context + uniqueKey string + value string +} + +func NewGlobalLock(ctx context.Context, red *redis.Client, uniqueKey string) *globalLock { + return &globalLock{ + redis: red, + ctx: ctx, + uniqueKey: uniqueKey, + value: fmt.Sprintf("%d", time.Now().UnixNano()), + } +} + +// 获取锁 +func (g *globalLock) Lock() bool { + + script := ` + return redis.call('set',KEYS[1],ARGV[1],'EX',ARGV[2]) + ` + + resp, _ := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 10).Result() + + return resp == "OK" +} + +// 尝试获取锁 +func (g *globalLock) Try(limitTimes int) bool { + for i := 0; i < limitTimes; i++ { + if g.Lock() { + return true + } + time.Sleep(time.Millisecond * 100) + } + return false +} + +// 删除锁 +func (g *globalLock) Unlock() bool { + + script := ` + local token = redis.call('get',KEYS[1]) + if token == ARGV[1] + then + return redis.call('del',KEYS[1]) + end + return 'ERROR' + ` + + resp, _ := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value).Result() + return resp == "OK" +} + +// 刷新锁 +func (g *globalLock) Refresh() { + go func() { + ctx = context.WithTimeout(g.ctx, time.Second*30) + t := time.NewTicker(time.Second) + for { + select { + case <-t.C: + g.refresh() + case <-ctx.Done(): + t.Stop() + return + } + } + }() +} + +func (g *globalLock) refresh() bool { + script := ` + local token = redis.call('get',KEYS[1]) + if token == ARGV[1] + then + return redis.call('set',KEYS[1],ARGV[1],'EX',ARGV[2]) + end + return 'ERROR' + ` + + resp, _ := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 5).Result() + return resp == "OK" +} diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..8a25d15 --- /dev/null +++ b/readme.md @@ -0,0 +1,14 @@ +开发目标 + +1. 支持单机定时 +2. 支持集群定时 +3. 支持间隔定时 +4. 支持固定时间 +5. 支持全局唯一 + + +设计思想 +1. 不再单独区分单机还是集群,统一按集群处理,单机只是集群里面只有一个节点 +2. 计算和执行分离,计算只负责计算,执行只负责执行,计算和执行之间通过消息队列进行通信 + + diff --git a/rreadme.md b/rreadme.md deleted file mode 100644 index 587b6ee..0000000 --- a/rreadme.md +++ /dev/null @@ -1,8 +0,0 @@ -开发目标 - -1. 支持单机定时 -2. 支持集群定时 -3. 支持间隔定时 -4. 支持固定时间 -5. 支持全局唯一 - diff --git a/timer.go b/single.go similarity index 55% rename from timer.go rename to single.go index 5a90f20..891c495 100644 --- a/timer.go +++ b/single.go @@ -15,25 +15,13 @@ import ( // 定时器 // 原理:每毫秒的时间触发 -type timerStr struct { - Callback callback // 需要回调的方法 - CanRunning chan (struct{}) - BeginTime time.Time // 初始化任务的时间 - NextTime time.Time // 下一次执行的时间 - SpaceTime time.Duration // 间隔时间 - // Params []int - // UniqueLimitFunc UniqueLimitFunc - UniqueKey string - TimerType string // 普通类型(default) + 全局唯一(unique) - Extend ExtendParams // 附加参数 -} - -var timerMap = make(map[int]*timerStr) +// uuid -> timerStr +var timerMap = make(map[string]*timerStr) var timerMapMux sync.Mutex + var timerCount int // 当前定时数目 var onceLimit sync.Once // 实现单例 var nextTime = time.Now() // 下一次执行的时间 -var timerUnique UniqueLimitFunc type ContextValueKey string // 定义context 传递的Key类型 @@ -41,23 +29,9 @@ const ( extendParamKey ContextValueKey = "extend_param" ) -// 外部唯一限制接口 -type UniqueLimitFunc interface { - SetLimit(key, value string) error - DeleteLimit(key, value string) error - RefreshLimit(key, value string) error -} - -// 扩展参数 -type ExtendParams struct { - UniqueKey string // 唯一键,如果填写了就会全局唯一 - Params map[string]interface{} // 带出去的参数 -} - // 定时器类 -func InitTimer(ctx context.Context, uni UniqueLimitFunc) { +func InitSingle(ctx context.Context) { onceLimit.Do(func() { - timerUnique = uni timer := time.NewTicker(1 * time.Millisecond) go func(ctx context.Context) { Loop: @@ -86,20 +60,8 @@ func AddTimer(space time.Duration, call callback, extend ExtendParams) (int, err timerMapMux.Lock() defer timerMapMux.Unlock() - timerType := "default" - if extend.UniqueKey != "" { - // 判断唯一限制 - if timerUnique == nil { - return 0, errors.New("唯一限制查询不到") - } - - // uniqueKey只可以添加一次 - for _, val := range timerMap { - if val.UniqueKey == extend.UniqueKey { - return 0, errors.New("uniqueKey重复") - } - } - timerType = "unique" + if space != space.Abs() { + return 0, errors.New("space must be positive") } timerCount += 1 @@ -112,12 +74,11 @@ func AddTimer(space time.Duration, call callback, extend ExtendParams) (int, err NextTime: nowTime, // nowTime.Add(space), // 添加任务的时候就执行一次 SpaceTime: space, CanRunning: make(chan struct{}, 1), - TimerType: timerType, - UniqueKey: extend.UniqueKey, + UniqueKey: "", Extend: extend, } - timerMap[timerCount] = &t + timerMap[fmt.Sprintf("%d", timerCount)] = &t if t.NextTime.Before(nextTime) { // 本条规则下次需要发送的时间小于系统下次发送时间:替换 @@ -134,16 +95,7 @@ func AddToTimer(space time.Duration, call callback) int { return count } -// 添加互斥执行的任务 -func AddUniqueTimer(space time.Duration, call callback, uniqueKey string) (int, error) { - extend := ExtendParams{ - UniqueKey: uniqueKey, - } - count, err := AddTimer(space, call, extend) - return count, err -} - -func DelToTimer(index int) { +func DelToTimer(index string) { timerMapMux.Lock() defer timerMapMux.Unlock() delete(timerMap, index) @@ -189,7 +141,6 @@ func iteratorTimer(ctx context.Context, nowTime time.Time) { go func(ctx context.Context, v *timerStr) { select { case v.CanRunning <- struct{}{}: - // TODO: 需要考虑分布式锁 defer func() { // fmt.Printf("timer: 执行完成 %v %v \n", k, v.Tag) select { @@ -200,7 +151,7 @@ func iteratorTimer(ctx context.Context, nowTime time.Time) { } }() // fmt.Printf("timer: 准备执行 %v %v \n", k, v.Tag) - timerAction(ctx, v.Callback, v.TimerType, v.UniqueKey, v.Extend) + timerAction(ctx, v.Callback, v.UniqueKey, v.Extend) default: // fmt.Printf("timer: 已在执行 %v %v \n", k, v.Tag) return @@ -228,7 +179,7 @@ type callback func(ctx context.Context) bool // 定时器操作类 // 这里不应painc -func timerAction(ctx context.Context, call callback, timerType string, uniqueKey string, extend ExtendParams) bool { +func timerAction(ctx context.Context, call callback, uniqueKey string, extend ExtendParams) bool { defer func() { if err := recover(); err != nil { fmt.Println("timer:定时器出错", err) @@ -238,43 +189,6 @@ func timerAction(ctx context.Context, call callback, timerType string, uniqueKey ctx, cancel := context.WithCancel(ctx) defer cancel() - // 唯一的需要设置唯一键&需要刷新唯一键 - if timerType == "unique" { - NowUnixNano := time.Now().UnixNano() - redisValue := fmt.Sprintf("%v", NowUnixNano) - err := timerUnique.SetLimit(uniqueKey, redisValue) - if err != nil { - fmt.Println("unique跳过") - return false - } - fmt.Println("unique开始执行") - - go func() { - // 5秒刷新一次 - timer := time.NewTicker(time.Second * 1) - Loop2: - for { - select { - case t := <-timer.C: - // 更新 - err = timerUnique.RefreshLimit(uniqueKey, redisValue) - if err != nil { - fmt.Printf("unique更新:%+v %+v\n", err, t.Unix()) - } - case <-ctx.Done(): - // 取消 - err = timerUnique.DeleteLimit(uniqueKey, redisValue) - if err != nil { - fmt.Printf("unique删除:%+v\n", err) - } - break Loop2 - } - } - timer.Stop() - fmt.Println("unique执行结束") - }() - } - // 附加数据 ctx = context.WithValue(ctx, extendParamKey, extend) @@ -282,11 +196,11 @@ func timerAction(ctx context.Context, call callback, timerType string, uniqueKey } // 快捷方法 -func GetExtendParams(ctx context.Context) (*ExtendParams,error) { +func GetExtendParams(ctx context.Context) (*ExtendParams, error) { val := ctx.Value(extendParamKey) - params,ok := val.(ExtendParams) + params, ok := val.(ExtendParams) if !ok { - return nil,errors.New("没找到参数") + return nil, errors.New("没找到参数") } - return ¶ms,nil + return ¶ms, nil } diff --git a/types.go b/types.go new file mode 100644 index 0000000..2941402 --- /dev/null +++ b/types.go @@ -0,0 +1,18 @@ +package timer + +import "time" + +type timerStr struct { + Callback callback // 需要回调的方法 + CanRunning chan (struct{}) // 是否允许执行 + BeginTime time.Time // 初始化任务的时间 + NextTime time.Time // [删]下一次执行的时间 + SpaceTime time.Duration // 任务间隔时间 + UniqueKey string // 全局唯一键 + Extend ExtendParams // 附加参数 +} + +// 扩展参数 +type ExtendParams struct { + Params map[string]interface{} // 带出去的参数 +}