From bb5b01071a53b2109b6bd0c85221042d271c068b Mon Sep 17 00:00:00 2001 From: Yun Date: Sat, 23 Aug 2025 18:55:59 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=BB=E6=8E=89=E5=8D=95=E4=BE=8B=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=EF=BC=8C=E6=94=B9=E4=B8=BA=E6=AF=8F=E6=AC=A1=E5=88=9D?= =?UTF-8?q?=E5=A7=8B=E5=8C=96=E5=9D=87=E7=94=9F=E6=88=90=E4=B8=80=E4=B8=AA?= =?UTF-8?q?=E6=96=B0=E7=9A=84=E5=AE=9E=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 108 ++++++++++++++++++++++++++-------------------------- cmd/main.go | 21 ++++++---- once.go | 31 ++++++++------- single.go | 56 +++++++++++++-------------- 4 files changed, 110 insertions(+), 106 deletions(-) diff --git a/cluster.go b/cluster.go index b49cd95..336f675 100644 --- a/cluster.go +++ b/cluster.go @@ -25,7 +25,7 @@ import ( // 暂不支持删除定时器,因为这个定时器的设计是基于全局的,如果删除了,那么其他服务就不知道了 // 单例模式 -var clusterOnceLimit sync.Once +// var clusterOnceLimit sync.Once // 已注册的任务列表 var clusterWorkerList sync.Map @@ -48,69 +48,69 @@ type Cluster struct { priorityKey string // 全局优先级的key } -var clu *Cluster = nil +// var clu *Cluster = nil // 初始化定时器 // 全局只需要初始化一次 func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix string, opts ...Option) *Cluster { - clusterOnceLimit.Do(func() { - op := newOptions(opts...) + // clusterOnceLimit.Do(func() { + op := newOptions(opts...) - clu = &Cluster{ - ctx: ctx, - redis: red, - cache: cachex.NewCache(), - timeout: op.timeout, - logger: op.logger, - keyPrefix: keyPrefix, - location: op.location, - priority: op.priority, - lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 - zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 - listKey: "timer:cluster_listKey" + keyPrefix, // 列表 - setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 - priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key + clu := &Cluster{ + ctx: ctx, + redis: red, + cache: cachex.NewCache(), + timeout: op.timeout, + logger: op.logger, + keyPrefix: keyPrefix, + location: op.location, + priority: op.priority, + lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁 + zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合 + listKey: "timer:cluster_listKey" + keyPrefix, // 列表 + setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 + priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key + } + + // 设置锁的超时时间 + lockx.InitOption(lockx.SetTimeout(op.timeout)) + + // 监听任务 + go clu.watch() + + priorityTime := time.NewTicker(time.Second * 10) + go func(ctx context.Context) { + clu.setPriority() + Loop: + for { + select { + case <-priorityTime.C: + clu.setPriority() + case <-ctx.Done(): + break Loop + } } + }(ctx) - // 设置锁的超时时间 - lockx.InitOption(lockx.SetTimeout(op.timeout)) + timer := time.NewTicker(time.Millisecond * 200) - // 监听任务 - go clu.watch() - - priorityTime := time.NewTicker(time.Second * 10) - go func(ctx context.Context) { - clu.setPriority() - Loop: - for { - select { - case <-priorityTime.C: - clu.setPriority() - case <-ctx.Done(): - break Loop + go func(ctx context.Context) { + Loop: + for { + select { + case <-timer.C: + if !clu.canRun() { + continue } + clu.getTask() + clu.getNextTime() + case <-ctx.Done(): + break Loop } - }(ctx) - - timer := time.NewTicker(time.Millisecond * 200) - - go func(ctx context.Context) { - Loop: - for { - select { - case <-timer.C: - if !clu.canRun() { - continue - } - clu.getTask() - clu.getNextTime() - case <-ctx.Done(): - break Loop - } - } - }(ctx) - }) + } + }(ctx) + // }) return clu } @@ -181,7 +181,7 @@ func (l *Cluster) setPriority() bool { ` priority := fmt.Sprintf("%d", l.priority) - expireTime := (time.Second*30).Seconds() // 设置过期时间为1分钟 + expireTime := (time.Second * 30).Seconds() // 设置过期时间为1分钟 res, err := l.redis.Eval(l.ctx, script, []string{l.priorityKey}, priority, expireTime).Result() if err != nil { l.logger.Errorf(l.ctx, "设置全局优先级失败:%s", err.Error()) diff --git a/cmd/main.go b/cmd/main.go index 53e219a..f4c6796 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -24,8 +24,8 @@ func main() { // re() // d() - cluster() - // once() + // cluster() + once() select {} @@ -46,11 +46,12 @@ func once() { if err != nil { fmt.Println(err) } - d = OnceData{ - Num: 4, - } + // d = OnceData{ + // Num: 4, + // } + dd := 123 // dy, _ = json.Marshal(d) - err = one.Create("test", "test4", 1*time.Second, d) + err = one.Save("test", "test4", 1*time.Second, dd) if err != nil { fmt.Println(err) } @@ -67,7 +68,11 @@ func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05")) fmt.Println(taskType, taskId) - fmt.Printf("原来的参数:%+v\n", attachData) + fmt.Printf("原来的参数:%+v %T\n", attachData, attachData) + + v, ok := attachData.(int64) + fmt.Println("vvvvvvv", v, ok) + // fmt.Printf() // d := OnceData{} @@ -89,7 +94,7 @@ func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, func cluster() { client := getRedis() ctx := context.Background() - cluster := timerx.InitCluster(ctx, client, "test",timerx.SetPriority(101)) + cluster := timerx.InitCluster(ctx, client, "test", timerx.SetPriority(101)) err := cluster.EverySpace(ctx, "test_space", 1*time.Second, aa, "这是秒任务") fmt.Println(err) err = cluster.EveryMinute(ctx, "test_min", 15, aa, "这是分钟任务") diff --git a/once.go b/once.go index 5295e78..caae376 100644 --- a/once.go +++ b/once.go @@ -6,7 +6,6 @@ import ( "fmt" "runtime/debug" "strings" - "sync" "time" "github.com/go-redis/redis/v8" @@ -47,8 +46,8 @@ type Callback interface { Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *OnceWorkerResp } -var wo *Once = nil -var once sync.Once +// var wo *Once = nil +// var once sync.Once type extendData struct { Delay time.Duration @@ -62,19 +61,19 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c } op := newOptions(opts...) - once.Do(func() { - wo = &Once{ - ctx: ctx, - logger: op.logger, - zsetKey: "timer:once_zsetkey" + keyPrefix, - listKey: "timer:once_listkey" + keyPrefix, - redis: re, - worker: call, - keyPrefix: keyPrefix, - } - go wo.getTask() - go wo.watch() - }) + // once.Do(func() { + wo := &Once{ + ctx: ctx, + logger: op.logger, + zsetKey: "timer:once_zsetkey" + keyPrefix, + listKey: "timer:once_listkey" + keyPrefix, + redis: re, + worker: call, + keyPrefix: keyPrefix, + } + go wo.getTask() + go wo.watch() + // }) return wo } diff --git a/single.go b/single.go index 5975108..2496305 100644 --- a/single.go +++ b/single.go @@ -22,7 +22,7 @@ var singleWorkerList sync.Map var singleTimerIndex int // 当前定时数目 -var singleOnceLimit sync.Once // 实现单例 +// var singleOnceLimit sync.Once // 实现单例 type Single struct { ctx context.Context @@ -30,7 +30,7 @@ type Single struct { location *time.Location } -var sin *Single = nil +// var sin *Single = nil var singleNextTime = time.Now() // 下一次执行的时间 @@ -38,36 +38,36 @@ var singleNextTime = time.Now() // 下一次执行的时间 // @param ctx context.Context 上下文 // @param opts ...Option 配置项 func InitSingle(ctx context.Context, opts ...Option) *Single { - singleOnceLimit.Do(func() { - op := newOptions(opts...) + // singleOnceLimit.Do(func() { + op := newOptions(opts...) - sin = &Single{ - ctx: ctx, - logger: op.logger, - location: op.location, - } + sin := &Single{ + ctx: ctx, + logger: op.logger, + location: op.location, + } - timer := time.NewTicker(time.Millisecond * 200) - go func(ctx context.Context) { - Loop: - for { - select { - case t := <-timer.C: - if t.Before(singleNextTime) { - // 当前时间小于下次发送时间:跳过 - continue - } - // 迭代定时器 - sin.iterator(ctx) - // fmt.Println("timer: 执行") - case <-ctx.Done(): - // 跳出循环 - break Loop + timer := time.NewTicker(time.Millisecond * 200) + go func(ctx context.Context) { + Loop: + for { + select { + case t := <-timer.C: + if t.Before(singleNextTime) { + // 当前时间小于下次发送时间:跳过 + continue } + // 迭代定时器 + sin.iterator(ctx) + // fmt.Println("timer: 执行") + case <-ctx.Done(): + // 跳出循环 + break Loop } - sin.logger.Infof(ctx, "timer: initend") - }(ctx) - }) + } + sin.logger.Infof(ctx, "timer: initend") + }(ctx) + // }) return sin }