去掉单例模式,改为每次初始化均生成一个新的实例
This commit is contained in:
+54
-54
@@ -25,7 +25,7 @@ import (
|
|||||||
// 暂不支持删除定时器,因为这个定时器的设计是基于全局的,如果删除了,那么其他服务就不知道了
|
// 暂不支持删除定时器,因为这个定时器的设计是基于全局的,如果删除了,那么其他服务就不知道了
|
||||||
|
|
||||||
// 单例模式
|
// 单例模式
|
||||||
var clusterOnceLimit sync.Once
|
// var clusterOnceLimit sync.Once
|
||||||
|
|
||||||
// 已注册的任务列表
|
// 已注册的任务列表
|
||||||
var clusterWorkerList sync.Map
|
var clusterWorkerList sync.Map
|
||||||
@@ -48,69 +48,69 @@ type Cluster struct {
|
|||||||
priorityKey string // 全局优先级的key
|
priorityKey string // 全局优先级的key
|
||||||
}
|
}
|
||||||
|
|
||||||
var clu *Cluster = nil
|
// var clu *Cluster = nil
|
||||||
|
|
||||||
// 初始化定时器
|
// 初始化定时器
|
||||||
// 全局只需要初始化一次
|
// 全局只需要初始化一次
|
||||||
func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix string, opts ...Option) *Cluster {
|
func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix string, opts ...Option) *Cluster {
|
||||||
|
|
||||||
clusterOnceLimit.Do(func() {
|
// clusterOnceLimit.Do(func() {
|
||||||
op := newOptions(opts...)
|
op := newOptions(opts...)
|
||||||
|
|
||||||
clu = &Cluster{
|
clu := &Cluster{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
redis: red,
|
redis: red,
|
||||||
cache: cachex.NewCache(),
|
cache: cachex.NewCache(),
|
||||||
timeout: op.timeout,
|
timeout: op.timeout,
|
||||||
logger: op.logger,
|
logger: op.logger,
|
||||||
keyPrefix: keyPrefix,
|
keyPrefix: keyPrefix,
|
||||||
location: op.location,
|
location: op.location,
|
||||||
priority: op.priority,
|
priority: op.priority,
|
||||||
lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁
|
lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁
|
||||||
zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合
|
zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合
|
||||||
listKey: "timer:cluster_listKey" + keyPrefix, // 列表
|
listKey: "timer:cluster_listKey" + keyPrefix, // 列表
|
||||||
setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合
|
setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合
|
||||||
priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key
|
priorityKey: "timer:cluster_priorityKey" + keyPrefix, // 全局优先级的key
|
||||||
|
}
|
||||||
|
|
||||||
|
// 设置锁的超时时间
|
||||||
|
lockx.InitOption(lockx.SetTimeout(op.timeout))
|
||||||
|
|
||||||
|
// 监听任务
|
||||||
|
go clu.watch()
|
||||||
|
|
||||||
|
priorityTime := time.NewTicker(time.Second * 10)
|
||||||
|
go func(ctx context.Context) {
|
||||||
|
clu.setPriority()
|
||||||
|
Loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-priorityTime.C:
|
||||||
|
clu.setPriority()
|
||||||
|
case <-ctx.Done():
|
||||||
|
break Loop
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
}(ctx)
|
||||||
|
|
||||||
// 设置锁的超时时间
|
timer := time.NewTicker(time.Millisecond * 200)
|
||||||
lockx.InitOption(lockx.SetTimeout(op.timeout))
|
|
||||||
|
|
||||||
// 监听任务
|
go func(ctx context.Context) {
|
||||||
go clu.watch()
|
Loop:
|
||||||
|
for {
|
||||||
priorityTime := time.NewTicker(time.Second * 10)
|
select {
|
||||||
go func(ctx context.Context) {
|
case <-timer.C:
|
||||||
clu.setPriority()
|
if !clu.canRun() {
|
||||||
Loop:
|
continue
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-priorityTime.C:
|
|
||||||
clu.setPriority()
|
|
||||||
case <-ctx.Done():
|
|
||||||
break Loop
|
|
||||||
}
|
}
|
||||||
|
clu.getTask()
|
||||||
|
clu.getNextTime()
|
||||||
|
case <-ctx.Done():
|
||||||
|
break Loop
|
||||||
}
|
}
|
||||||
}(ctx)
|
}
|
||||||
|
}(ctx)
|
||||||
timer := time.NewTicker(time.Millisecond * 200)
|
// })
|
||||||
|
|
||||||
go func(ctx context.Context) {
|
|
||||||
Loop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-timer.C:
|
|
||||||
if !clu.canRun() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
clu.getTask()
|
|
||||||
clu.getNextTime()
|
|
||||||
case <-ctx.Done():
|
|
||||||
break Loop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(ctx)
|
|
||||||
})
|
|
||||||
return clu
|
return clu
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -181,7 +181,7 @@ func (l *Cluster) setPriority() bool {
|
|||||||
`
|
`
|
||||||
priority := fmt.Sprintf("%d", l.priority)
|
priority := fmt.Sprintf("%d", l.priority)
|
||||||
|
|
||||||
expireTime := (time.Second*30).Seconds() // 设置过期时间为1分钟
|
expireTime := (time.Second * 30).Seconds() // 设置过期时间为1分钟
|
||||||
res, err := l.redis.Eval(l.ctx, script, []string{l.priorityKey}, priority, expireTime).Result()
|
res, err := l.redis.Eval(l.ctx, script, []string{l.priorityKey}, priority, expireTime).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.logger.Errorf(l.ctx, "设置全局优先级失败:%s", err.Error())
|
l.logger.Errorf(l.ctx, "设置全局优先级失败:%s", err.Error())
|
||||||
|
|||||||
+13
-8
@@ -24,8 +24,8 @@ func main() {
|
|||||||
|
|
||||||
// re()
|
// re()
|
||||||
// d()
|
// d()
|
||||||
cluster()
|
// cluster()
|
||||||
// once()
|
once()
|
||||||
|
|
||||||
select {}
|
select {}
|
||||||
|
|
||||||
@@ -46,11 +46,12 @@ func once() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
}
|
}
|
||||||
d = OnceData{
|
// d = OnceData{
|
||||||
Num: 4,
|
// Num: 4,
|
||||||
}
|
// }
|
||||||
|
dd := 123
|
||||||
// dy, _ = json.Marshal(d)
|
// dy, _ = json.Marshal(d)
|
||||||
err = one.Create("test", "test4", 1*time.Second, d)
|
err = one.Save("test", "test4", 1*time.Second, dd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
}
|
}
|
||||||
@@ -67,7 +68,11 @@ func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string,
|
|||||||
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
|
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
|
||||||
fmt.Println(taskType, taskId)
|
fmt.Println(taskType, taskId)
|
||||||
|
|
||||||
fmt.Printf("原来的参数:%+v\n", attachData)
|
fmt.Printf("原来的参数:%+v %T\n", attachData, attachData)
|
||||||
|
|
||||||
|
v, ok := attachData.(int64)
|
||||||
|
fmt.Println("vvvvvvv", v, ok)
|
||||||
|
// fmt.Printf()
|
||||||
|
|
||||||
// d := OnceData{}
|
// d := OnceData{}
|
||||||
|
|
||||||
@@ -89,7 +94,7 @@ func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string,
|
|||||||
func cluster() {
|
func cluster() {
|
||||||
client := getRedis()
|
client := getRedis()
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
cluster := timerx.InitCluster(ctx, client, "test",timerx.SetPriority(101))
|
cluster := timerx.InitCluster(ctx, client, "test", timerx.SetPriority(101))
|
||||||
err := cluster.EverySpace(ctx, "test_space", 1*time.Second, aa, "这是秒任务")
|
err := cluster.EverySpace(ctx, "test_space", 1*time.Second, aa, "这是秒任务")
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
err = cluster.EveryMinute(ctx, "test_min", 15, aa, "这是分钟任务")
|
err = cluster.EveryMinute(ctx, "test_min", 15, aa, "这是分钟任务")
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
@@ -47,8 +46,8 @@ type Callback interface {
|
|||||||
Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *OnceWorkerResp
|
Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *OnceWorkerResp
|
||||||
}
|
}
|
||||||
|
|
||||||
var wo *Once = nil
|
// var wo *Once = nil
|
||||||
var once sync.Once
|
// var once sync.Once
|
||||||
|
|
||||||
type extendData struct {
|
type extendData struct {
|
||||||
Delay time.Duration
|
Delay time.Duration
|
||||||
@@ -62,19 +61,19 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
|
|||||||
}
|
}
|
||||||
|
|
||||||
op := newOptions(opts...)
|
op := newOptions(opts...)
|
||||||
once.Do(func() {
|
// once.Do(func() {
|
||||||
wo = &Once{
|
wo := &Once{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
logger: op.logger,
|
logger: op.logger,
|
||||||
zsetKey: "timer:once_zsetkey" + keyPrefix,
|
zsetKey: "timer:once_zsetkey" + keyPrefix,
|
||||||
listKey: "timer:once_listkey" + keyPrefix,
|
listKey: "timer:once_listkey" + keyPrefix,
|
||||||
redis: re,
|
redis: re,
|
||||||
worker: call,
|
worker: call,
|
||||||
keyPrefix: keyPrefix,
|
keyPrefix: keyPrefix,
|
||||||
}
|
}
|
||||||
go wo.getTask()
|
go wo.getTask()
|
||||||
go wo.watch()
|
go wo.watch()
|
||||||
})
|
// })
|
||||||
|
|
||||||
return wo
|
return wo
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ var singleWorkerList sync.Map
|
|||||||
|
|
||||||
var singleTimerIndex int // 当前定时数目
|
var singleTimerIndex int // 当前定时数目
|
||||||
|
|
||||||
var singleOnceLimit sync.Once // 实现单例
|
// var singleOnceLimit sync.Once // 实现单例
|
||||||
|
|
||||||
type Single struct {
|
type Single struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
@@ -30,7 +30,7 @@ type Single struct {
|
|||||||
location *time.Location
|
location *time.Location
|
||||||
}
|
}
|
||||||
|
|
||||||
var sin *Single = nil
|
// var sin *Single = nil
|
||||||
|
|
||||||
var singleNextTime = time.Now() // 下一次执行的时间
|
var singleNextTime = time.Now() // 下一次执行的时间
|
||||||
|
|
||||||
@@ -38,36 +38,36 @@ var singleNextTime = time.Now() // 下一次执行的时间
|
|||||||
// @param ctx context.Context 上下文
|
// @param ctx context.Context 上下文
|
||||||
// @param opts ...Option 配置项
|
// @param opts ...Option 配置项
|
||||||
func InitSingle(ctx context.Context, opts ...Option) *Single {
|
func InitSingle(ctx context.Context, opts ...Option) *Single {
|
||||||
singleOnceLimit.Do(func() {
|
// singleOnceLimit.Do(func() {
|
||||||
op := newOptions(opts...)
|
op := newOptions(opts...)
|
||||||
|
|
||||||
sin = &Single{
|
sin := &Single{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
logger: op.logger,
|
logger: op.logger,
|
||||||
location: op.location,
|
location: op.location,
|
||||||
}
|
}
|
||||||
|
|
||||||
timer := time.NewTicker(time.Millisecond * 200)
|
timer := time.NewTicker(time.Millisecond * 200)
|
||||||
go func(ctx context.Context) {
|
go func(ctx context.Context) {
|
||||||
Loop:
|
Loop:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case t := <-timer.C:
|
case t := <-timer.C:
|
||||||
if t.Before(singleNextTime) {
|
if t.Before(singleNextTime) {
|
||||||
// 当前时间小于下次发送时间:跳过
|
// 当前时间小于下次发送时间:跳过
|
||||||
continue
|
continue
|
||||||
}
|
|
||||||
// 迭代定时器
|
|
||||||
sin.iterator(ctx)
|
|
||||||
// fmt.Println("timer: 执行")
|
|
||||||
case <-ctx.Done():
|
|
||||||
// 跳出循环
|
|
||||||
break Loop
|
|
||||||
}
|
}
|
||||||
|
// 迭代定时器
|
||||||
|
sin.iterator(ctx)
|
||||||
|
// fmt.Println("timer: 执行")
|
||||||
|
case <-ctx.Done():
|
||||||
|
// 跳出循环
|
||||||
|
break Loop
|
||||||
}
|
}
|
||||||
sin.logger.Infof(ctx, "timer: initend")
|
}
|
||||||
}(ctx)
|
sin.logger.Infof(ctx, "timer: initend")
|
||||||
})
|
}(ctx)
|
||||||
|
// })
|
||||||
|
|
||||||
return sin
|
return sin
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user