create cluster

This commit is contained in:
Yun
2023-08-27 23:39:58 +08:00
parent 60e0478a62
commit 3ccce0bba2
11 changed files with 571 additions and 121 deletions
+22
View File
@@ -0,0 +1,22 @@
{
// 使用 IntelliSense 了解相关属性。
// 悬停以查看现有属性的描述。
// 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch file",
"type": "go",
"request": "launch",
"mode": "debug",
"program": "${file}"
},
{
"name": "Attach to Process",
"type": "go",
"request": "attach",
"mode": "local",
"processId": 0
}
]
}
+248
View File
@@ -0,0 +1,248 @@
package timer
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"runtime/debug"
"sync"
"time"
"code.yun.ink/open/timer/lockx"
"github.com/go-redis/redis/v8"
)
// 单例模式
var clusterOnceLimit sync.Once
// 已注册的任务列表
var clusterWorkerList sync.Map
type cluster struct {
ctx context.Context
redis *redis.Client
lockKey string // 全局计算的key
nextKey string // 下一次执行的key
zsetKey string // 有序集合的key
}
var clu *cluster = nil
func InitCluster(ctx context.Context, red *redis.Client) *cluster {
clusterOnceLimit.Do(func() {
clu = &cluster{
ctx: ctx,
redis: red,
lockKey: "timer:globalLockKey",
nextKey: "timer:NextKey",
zsetKey: "timer:zsetKey",
}
timer := time.NewTicker(time.Millisecond*100)
go func(ctx context.Context, red *redis.Client) {
Loop:
for {
select {
case <-timer.C:
go clu.computeTime()
go clu.getTask()
case <-ctx.Done():
break Loop
}
}
}(ctx, red)
})
return clu
}
func (c *cluster) AddTimer(ctx context.Context, uniqueKey string, spaceTime time.Duration, callback callback, extend ExtendParams) error {
_, ok := clusterWorkerList.Load(uniqueKey)
if ok {
return errors.New("key已存在")
}
if spaceTime != spaceTime.Abs() {
return errors.New("时间间隔不能为负数")
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
lock := lockx.NewGlobalLock(ctx, c.redis, uniqueKey)
tB := lock.Try(10)
if !tB {
return errors.New("添加失败")
}
defer lock.Unlock()
lock.Refresh()
nowTime := time.Now()
t := timerStr{
BeginTime: nowTime,
NextTime: nowTime,
SpaceTime: spaceTime,
Callback: callback,
Extend: extend,
UniqueKey: uniqueKey,
}
clusterWorkerList.Store(uniqueKey, t)
cacheStr, _ := c.redis.Get(ctx, c.nextKey).Result()
execTime := make(map[string]time.Time)
json.Unmarshal([]byte(cacheStr), &execTime)
p := c.redis.Pipeline()
p.ZAdd(ctx, c.zsetKey, &redis.Z{
Score: float64(nextTime.UnixMilli()),
Member: uniqueKey,
})
execTime[uniqueKey] = nextTime
n, _ := json.Marshal(execTime)
fmt.Println("execTime:", execTime, string(n))
p.Set(ctx, c.nextKey, string(n), 0)
_, err := p.Exec(ctx)
fmt.Println("添加", err)
return err
}
// 计算下一次执行的时间
func (c *cluster) computeTime() {
log.Println("begin computer")
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
lock := lockx.NewGlobalLock(ctx, c.redis, c.lockKey)
// 获取锁
lockBool := lock.Lock()
if !lockBool {
log.Println("timer:获取锁失败")
return
}
defer lock.Unlock()
// 更新锁
lock.Refresh()
// 计算下一次时间
// 读取执行的缓存
cacheStr, _ := c.redis.Get(ctx, c.nextKey).Result()
execTime := make(map[string]time.Time)
json.Unmarshal([]byte(cacheStr), &execTime)
// log.Println("cacheStr:", cacheStr, execTime)
// return
p := c.redis.Pipeline()
nowTime := time.Now()
clusterWorkerList.Range(func(key, value interface{}) bool {
// log.Println("range:", key, value)
val := value.(timerStr)
beforeTime := execTime[val.UniqueKey]
if beforeTime.After(nowTime) {
// log.Println("sssss")
return true
}
nextTime := getNextExecTime(beforeTime, val.SpaceTime)
execTime[val.UniqueKey] = nextTime
p.ZAdd(ctx, c.zsetKey, &redis.Z{
Score: float64(nextTime.UnixMilli()),
Member: val.UniqueKey,
})
// log.Println("ffffffffffff")
return true
})
// log.Println("ssssssddddd")
// 更新缓存
b, _ := json.Marshal(execTime)
p.Set(ctx, c.nextKey, string(b), 0)
// log.Println("B", string(b))
_, err := p.Exec(ctx)
fmt.Println(err)
}
// 递归遍历获取执行时间
func getNextExecTime(beforeTime time.Time, spaceTime time.Duration) time.Time {
nowTime := time.Now()
if beforeTime.After(nowTime) {
return beforeTime
}
nextTime := beforeTime.Add(spaceTime)
// fmt.Println(nextTime.Format(time.RFC3339))
// fmt.Println(nowTime.Format(time.RFC3339))
// fmt.Println(beforeTime.Before(nowTime))
if nextTime.Before(nowTime) {
nextTime = getNextExecTime(nextTime, spaceTime)
}
return nextTime
}
// 获取任务
func (c *cluster) getTask() {
// 定时去Redis获取任务
zb := redis.ZRangeBy{
Min: "0",
Max: fmt.Sprintf("%+v", time.Now().UnixMilli()),
}
taskList, _ := c.redis.ZRangeByScore(c.ctx, c.zsetKey, &zb).Result()
// 删除粉丝
inter := []interface{}{}
for _, val := range taskList {
inter = append(inter, val)
}
c.redis.ZRem(c.ctx, c.zsetKey, inter...)
for _, val := range taskList {
go doTask(c.ctx, c.redis, val)
}
}
// 执行任务
func doTask(ctx context.Context, red *redis.Client, taskId string) {
defer func() {
if err := recover(); err != nil {
fmt.Println("timer:定时器出错", err)
log.Println("errStack", string(debug.Stack()))
}
}()
val, ok := clusterWorkerList.Load(taskId)
if !ok {
return
}
t := val.(timerStr)
// 这里加一个全局锁
lock := lockx.NewGlobalLock(ctx, red, taskId)
tB := lock.Lock()
if !tB {
return
}
defer lock.Unlock()
lock.Refresh()
ctx = context.WithValue(ctx, extendParamKey, t.Extend)
// 执行任务
t.Callback(ctx)
}
+97
View File
@@ -0,0 +1,97 @@
package main
import (
"context"
"fmt"
"time"
"code.yun.ink/open/timer"
"github.com/go-redis/redis/v8"
)
func main() {
// m := make(map[string]time.Time)
// m["sss"] = time.Now()
// b, _ := json.Marshal(m)
// fmt.Println(string(b))
// mm := make(map[string]time.Time)
// json.Unmarshal(b, &mm)
// fmt.Println(mm)
re()
}
func re() {
client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1" + ":" + "6379",
Password: "", // no password set
DB: 0, // use default DB
})
if client == nil {
fmt.Println("redis init error")
return
}
ctx := context.Background()
cl := timer.InitCluster(ctx, client)
cl.AddTimer(ctx, "test1", 1*time.Second, aa, timer.ExtendParams{
Params: map[string]interface{}{
"test": "text1",
},
})
cl.AddTimer(ctx, "test2", 1*time.Second, aa, timer.ExtendParams{
Params: map[string]interface{}{
"test": "text2",
},
})
cl.AddTimer(ctx, "test3", 1*time.Second, aa, timer.ExtendParams{
Params: map[string]interface{}{
"test": "text3",
},
})
cl.AddTimer(ctx, "test4", 1*time.Second, aa, timer.ExtendParams{
Params: map[string]interface{}{
"test": "text4",
},
})
cl.AddTimer(ctx, "test5", 1*time.Second, aa, timer.ExtendParams{
Params: map[string]interface{}{
"test": "text5",
},
})
cl.AddTimer(ctx, "test6", 1*time.Second, aa, timer.ExtendParams{
Params: map[string]interface{}{
"test": "text6",
},
})
cl.AddTimer(ctx, "test7", 1*time.Second, aa, timer.ExtendParams{
Params: map[string]interface{}{
"test": "text7",
},
})
cl.AddTimer(ctx, "test8", 1*time.Second, aa, timer.ExtendParams{
Params: map[string]interface{}{
"test": "text8",
},
})
cl.AddTimer(ctx, "test9", 1*time.Second, aa, timer.ExtendParams{
Params: map[string]interface{}{
"test": "text9",
},
})
select {}
}
func aa(ctx context.Context) bool {
fmt.Println(time.Now().Format(time.RFC3339))
fmt.Println("gggggggggggggggggggggggggggg")
a, err := timer.GetExtendParams(ctx)
fmt.Printf("%+v %+v", a, err)
return true
}
+37 -11
View File
@@ -1,21 +1,47 @@
package timer_test package timer
import ( import (
"context"
"fmt" "fmt"
"code.yun.ink/open/timer" "testing"
"github.com/go-redis/redis/v8"
) )
// 示例测试 // 示例测试
func exampleDemo(ctx context.Context) bool { // func exampleDemo(ctx context.Context) bool {
fmt.Println("fff") // fmt.Println("fff")
return false // return false
// }
// func ExampleB() {
// ctx := context.Background()
// timer.InitSingle(ctx)
// timer.AddToTimer(1, exampleDemo)
// // OutPut:
// }
func TestMain(m *testing.M) {
client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1" + ":" + "6379",
Password: "", // no password set
DB: 0, // use default DB
})
if client == nil {
fmt.Println("redis init error")
return
}
Redis = client
} }
func ExampleB() { func TestRedis(t *testing.T) {
ctx := context.Background() fmt.Println("6666")
timer.InitSingle(ctx) // t.Fail()
timer.AddToTimer(1, exampleDemo) // t.Error("ffff")
// OutPut: // Redis.Set(context.Background(), "dddd", "dddd", 0)
// str, err := Redis.Get(context.Background(), "dddd").Result()
// fmt.Println("ssss", str, err)
// t.Log(str, err)
// t.Fail()
} }
+9 -1
View File
@@ -2,4 +2,12 @@ module code.yun.ink/open/timer
go 1.19 go 1.19
require github.com/gomodule/redigo v1.8.9 require (
github.com/go-redis/redis/v8 v8.11.5
github.com/gomodule/redigo v1.8.9
)
require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
)
+15
View File
@@ -1,12 +1,27 @@
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws= github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws=
github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE= github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+96
View File
@@ -0,0 +1,96 @@
package lockx
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v8"
)
// 全局锁
type globalLock struct {
redis *redis.Client
ctx context.Context
uniqueKey string
value string
}
func NewGlobalLock(ctx context.Context, red *redis.Client, uniqueKey string) *globalLock {
return &globalLock{
redis: red,
ctx: ctx,
uniqueKey: uniqueKey,
value: fmt.Sprintf("%d", time.Now().UnixNano()),
}
}
// 获取锁
func (g *globalLock) Lock() bool {
script := `
return redis.call('set',KEYS[1],ARGV[1],'EX',ARGV[2])
`
resp, _ := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 10).Result()
return resp == "OK"
}
// 尝试获取锁
func (g *globalLock) Try(limitTimes int) bool {
for i := 0; i < limitTimes; i++ {
if g.Lock() {
return true
}
time.Sleep(time.Millisecond * 100)
}
return false
}
// 删除锁
func (g *globalLock) Unlock() bool {
script := `
local token = redis.call('get',KEYS[1])
if token == ARGV[1]
then
return redis.call('del',KEYS[1])
end
return 'ERROR'
`
resp, _ := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value).Result()
return resp == "OK"
}
// 刷新锁
func (g *globalLock) Refresh() {
go func() {
ctx = context.WithTimeout(g.ctx, time.Second*30)
t := time.NewTicker(time.Second)
for {
select {
case <-t.C:
g.refresh()
case <-ctx.Done():
t.Stop()
return
}
}
}()
}
func (g *globalLock) refresh() bool {
script := `
local token = redis.call('get',KEYS[1])
if token == ARGV[1]
then
return redis.call('set',KEYS[1],ARGV[1],'EX',ARGV[2])
end
return 'ERROR'
`
resp, _ := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 5).Result()
return resp == "OK"
}
+14
View File
@@ -0,0 +1,14 @@
开发目标
1. 支持单机定时
2. 支持集群定时
3. 支持间隔定时
4. 支持固定时间
5. 支持全局唯一
设计思想
1. 不再单独区分单机还是集群,统一按集群处理,单机只是集群里面只有一个节点
2. 计算和执行分离,计算只负责计算,执行只负责执行,计算和执行之间通过消息队列进行通信
-8
View File
@@ -1,8 +0,0 @@
开发目标
1. 支持单机定时
2. 支持集群定时
3. 支持间隔定时
4. 支持固定时间
5. 支持全局唯一
+11 -97
View File
@@ -15,25 +15,13 @@ import (
// 定时器 // 定时器
// 原理:每毫秒的时间触发 // 原理:每毫秒的时间触发
type timerStr struct { // uuid -> timerStr
Callback callback // 需要回调的方法 var timerMap = make(map[string]*timerStr)
CanRunning chan (struct{})
BeginTime time.Time // 初始化任务的时间
NextTime time.Time // 下一次执行的时间
SpaceTime time.Duration // 间隔时间
// Params []int
// UniqueLimitFunc UniqueLimitFunc
UniqueKey string
TimerType string // 普通类型(default) + 全局唯一(unique)
Extend ExtendParams // 附加参数
}
var timerMap = make(map[int]*timerStr)
var timerMapMux sync.Mutex var timerMapMux sync.Mutex
var timerCount int // 当前定时数目 var timerCount int // 当前定时数目
var onceLimit sync.Once // 实现单例 var onceLimit sync.Once // 实现单例
var nextTime = time.Now() // 下一次执行的时间 var nextTime = time.Now() // 下一次执行的时间
var timerUnique UniqueLimitFunc
type ContextValueKey string // 定义context 传递的Key类型 type ContextValueKey string // 定义context 传递的Key类型
@@ -41,23 +29,9 @@ const (
extendParamKey ContextValueKey = "extend_param" extendParamKey ContextValueKey = "extend_param"
) )
// 外部唯一限制接口
type UniqueLimitFunc interface {
SetLimit(key, value string) error
DeleteLimit(key, value string) error
RefreshLimit(key, value string) error
}
// 扩展参数
type ExtendParams struct {
UniqueKey string // 唯一键,如果填写了就会全局唯一
Params map[string]interface{} // 带出去的参数
}
// 定时器类 // 定时器类
func InitTimer(ctx context.Context, uni UniqueLimitFunc) { func InitSingle(ctx context.Context) {
onceLimit.Do(func() { onceLimit.Do(func() {
timerUnique = uni
timer := time.NewTicker(1 * time.Millisecond) timer := time.NewTicker(1 * time.Millisecond)
go func(ctx context.Context) { go func(ctx context.Context) {
Loop: Loop:
@@ -86,20 +60,8 @@ func AddTimer(space time.Duration, call callback, extend ExtendParams) (int, err
timerMapMux.Lock() timerMapMux.Lock()
defer timerMapMux.Unlock() defer timerMapMux.Unlock()
timerType := "default" if space != space.Abs() {
if extend.UniqueKey != "" { return 0, errors.New("space must be positive")
// 判断唯一限制
if timerUnique == nil {
return 0, errors.New("唯一限制查询不到")
}
// uniqueKey只可以添加一次
for _, val := range timerMap {
if val.UniqueKey == extend.UniqueKey {
return 0, errors.New("uniqueKey重复")
}
}
timerType = "unique"
} }
timerCount += 1 timerCount += 1
@@ -112,12 +74,11 @@ func AddTimer(space time.Duration, call callback, extend ExtendParams) (int, err
NextTime: nowTime, // nowTime.Add(space), // 添加任务的时候就执行一次 NextTime: nowTime, // nowTime.Add(space), // 添加任务的时候就执行一次
SpaceTime: space, SpaceTime: space,
CanRunning: make(chan struct{}, 1), CanRunning: make(chan struct{}, 1),
TimerType: timerType, UniqueKey: "",
UniqueKey: extend.UniqueKey,
Extend: extend, Extend: extend,
} }
timerMap[timerCount] = &t timerMap[fmt.Sprintf("%d", timerCount)] = &t
if t.NextTime.Before(nextTime) { if t.NextTime.Before(nextTime) {
// 本条规则下次需要发送的时间小于系统下次发送时间:替换 // 本条规则下次需要发送的时间小于系统下次发送时间:替换
@@ -134,16 +95,7 @@ func AddToTimer(space time.Duration, call callback) int {
return count return count
} }
// 添加互斥执行的任务 func DelToTimer(index string) {
func AddUniqueTimer(space time.Duration, call callback, uniqueKey string) (int, error) {
extend := ExtendParams{
UniqueKey: uniqueKey,
}
count, err := AddTimer(space, call, extend)
return count, err
}
func DelToTimer(index int) {
timerMapMux.Lock() timerMapMux.Lock()
defer timerMapMux.Unlock() defer timerMapMux.Unlock()
delete(timerMap, index) delete(timerMap, index)
@@ -189,7 +141,6 @@ func iteratorTimer(ctx context.Context, nowTime time.Time) {
go func(ctx context.Context, v *timerStr) { go func(ctx context.Context, v *timerStr) {
select { select {
case v.CanRunning <- struct{}{}: case v.CanRunning <- struct{}{}:
// TODO: 需要考虑分布式锁
defer func() { defer func() {
// fmt.Printf("timer: 执行完成 %v %v \n", k, v.Tag) // fmt.Printf("timer: 执行完成 %v %v \n", k, v.Tag)
select { select {
@@ -200,7 +151,7 @@ func iteratorTimer(ctx context.Context, nowTime time.Time) {
} }
}() }()
// fmt.Printf("timer: 准备执行 %v %v \n", k, v.Tag) // fmt.Printf("timer: 准备执行 %v %v \n", k, v.Tag)
timerAction(ctx, v.Callback, v.TimerType, v.UniqueKey, v.Extend) timerAction(ctx, v.Callback, v.UniqueKey, v.Extend)
default: default:
// fmt.Printf("timer: 已在执行 %v %v \n", k, v.Tag) // fmt.Printf("timer: 已在执行 %v %v \n", k, v.Tag)
return return
@@ -228,7 +179,7 @@ type callback func(ctx context.Context) bool
// 定时器操作类 // 定时器操作类
// 这里不应painc // 这里不应painc
func timerAction(ctx context.Context, call callback, timerType string, uniqueKey string, extend ExtendParams) bool { func timerAction(ctx context.Context, call callback, uniqueKey string, extend ExtendParams) bool {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
fmt.Println("timer:定时器出错", err) fmt.Println("timer:定时器出错", err)
@@ -238,43 +189,6 @@ func timerAction(ctx context.Context, call callback, timerType string, uniqueKey
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
// 唯一的需要设置唯一键&需要刷新唯一键
if timerType == "unique" {
NowUnixNano := time.Now().UnixNano()
redisValue := fmt.Sprintf("%v", NowUnixNano)
err := timerUnique.SetLimit(uniqueKey, redisValue)
if err != nil {
fmt.Println("unique跳过")
return false
}
fmt.Println("unique开始执行")
go func() {
// 5秒刷新一次
timer := time.NewTicker(time.Second * 1)
Loop2:
for {
select {
case t := <-timer.C:
// 更新
err = timerUnique.RefreshLimit(uniqueKey, redisValue)
if err != nil {
fmt.Printf("unique更新:%+v %+v\n", err, t.Unix())
}
case <-ctx.Done():
// 取消
err = timerUnique.DeleteLimit(uniqueKey, redisValue)
if err != nil {
fmt.Printf("unique删除:%+v\n", err)
}
break Loop2
}
}
timer.Stop()
fmt.Println("unique执行结束")
}()
}
// 附加数据 // 附加数据
ctx = context.WithValue(ctx, extendParamKey, extend) ctx = context.WithValue(ctx, extendParamKey, extend)
+18
View File
@@ -0,0 +1,18 @@
package timer
import "time"
type timerStr struct {
Callback callback // 需要回调的方法
CanRunning chan (struct{}) // 是否允许执行
BeginTime time.Time // 初始化任务的时间
NextTime time.Time // [删]下一次执行的时间
SpaceTime time.Duration // 任务间隔时间
UniqueKey string // 全局唯一键
Extend ExtendParams // 附加参数
}
// 扩展参数
type ExtendParams struct {
Params map[string]interface{} // 带出去的参数
}