优化
This commit is contained in:
+30
-16
@@ -45,7 +45,7 @@ func InitCluster(ctx context.Context, red *redis.Client) *cluster {
|
|||||||
// 监听任务
|
// 监听任务
|
||||||
go clu.watch()
|
go clu.watch()
|
||||||
|
|
||||||
timer := time.NewTicker(time.Millisecond * 100)
|
timer := time.NewTicker(time.Millisecond*100)
|
||||||
|
|
||||||
go func(ctx context.Context, red *redis.Client) {
|
go func(ctx context.Context, red *redis.Client) {
|
||||||
Loop:
|
Loop:
|
||||||
@@ -190,24 +190,38 @@ func getNextExecTime(beforeTime time.Time, spaceTime time.Duration) time.Time {
|
|||||||
// 获取任务
|
// 获取任务
|
||||||
func (c *cluster) getTask() {
|
func (c *cluster) getTask() {
|
||||||
// 定时去Redis获取任务
|
// 定时去Redis获取任务
|
||||||
zb := redis.ZRangeBy{
|
// zb := redis.ZRangeBy{
|
||||||
Min: "0",
|
// Min: "0",
|
||||||
Max: fmt.Sprintf("%+v", time.Now().UnixMilli()),
|
// Max: fmt.Sprintf("%+v", time.Now().UnixMilli()),
|
||||||
}
|
// }
|
||||||
|
|
||||||
taskList, _ := c.redis.ZRangeByScore(c.ctx, c.zsetKey, &zb).Result()
|
// taskList, _ := c.redis.ZRangeByScore(c.ctx, c.zsetKey, &zb).Result()
|
||||||
|
|
||||||
p := c.redis.Pipeline()
|
// if len(taskList) == 0 {
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
|
||||||
for _, val := range taskList {
|
// p := c.redis.Pipeline()
|
||||||
// 添加到可执行队列
|
|
||||||
p.LPush(c.ctx, c.listKey, val)
|
// for _, val := range taskList {
|
||||||
// 删除有序集合
|
// // 添加到可执行队列
|
||||||
p.ZRem(c.ctx, c.zsetKey, val)
|
// p.LPush(c.ctx, c.listKey, val)
|
||||||
}
|
// // 删除有序集合
|
||||||
_, err := p.Exec(c.ctx)
|
// p.ZRem(c.ctx, c.zsetKey, val)
|
||||||
// fmt.Println(err)
|
// }
|
||||||
_ = err
|
// _, err := p.Exec(c.ctx)
|
||||||
|
// // fmt.Println(err)
|
||||||
|
// _ = err
|
||||||
|
|
||||||
|
script := `
|
||||||
|
local token = redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2])
|
||||||
|
for i,v in ipairs(token) do
|
||||||
|
redis.call('zrem',KEYS[1],v)
|
||||||
|
redis.call('lpush',KEYS[2],v)
|
||||||
|
end
|
||||||
|
return "OK"
|
||||||
|
`
|
||||||
|
c.redis.Eval(c.ctx, script, []string{c.zsetKey, c.listKey}, 0, time.Now().UnixMilli()).Result()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+77
-41
@@ -23,6 +23,7 @@ func main() {
|
|||||||
// fmt.Println(mm)
|
// fmt.Println(mm)
|
||||||
|
|
||||||
re()
|
re()
|
||||||
|
// d()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -39,51 +40,51 @@ func re() {
|
|||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
cl := timer.InitCluster(ctx, client)
|
cl := timer.InitCluster(ctx, client)
|
||||||
cl.AddTimer(ctx, "test1", 1*time.Second, aa, timer.ExtendParams{
|
cl.AddTimer(ctx, "test1", 1*time.Millisecond, aa, timer.ExtendParams{
|
||||||
Params: map[string]interface{}{
|
Params: map[string]interface{}{
|
||||||
"test": "text1",
|
"test": "text1",
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
// cl.AddTimer(ctx, "test2", 1*time.Second, aa, timer.ExtendParams{
|
cl.AddTimer(ctx, "test2", 1*time.Millisecond, aa, timer.ExtendParams{
|
||||||
// Params: map[string]interface{}{
|
Params: map[string]interface{}{
|
||||||
// "test": "text2",
|
"test": "text2",
|
||||||
// },
|
},
|
||||||
// })
|
})
|
||||||
// cl.AddTimer(ctx, "test3", 1*time.Second, aa, timer.ExtendParams{
|
cl.AddTimer(ctx, "test3", 1*time.Millisecond, aa, timer.ExtendParams{
|
||||||
// Params: map[string]interface{}{
|
Params: map[string]interface{}{
|
||||||
// "test": "text3",
|
"test": "text3",
|
||||||
// },
|
},
|
||||||
// })
|
})
|
||||||
// cl.AddTimer(ctx, "test4", 1*time.Second, aa, timer.ExtendParams{
|
cl.AddTimer(ctx, "test4", 1*time.Millisecond, aa, timer.ExtendParams{
|
||||||
// Params: map[string]interface{}{
|
Params: map[string]interface{}{
|
||||||
// "test": "text4",
|
"test": "text4",
|
||||||
// },
|
},
|
||||||
// })
|
})
|
||||||
// cl.AddTimer(ctx, "test5", 1*time.Second, aa, timer.ExtendParams{
|
cl.AddTimer(ctx, "test5", 1*time.Millisecond, aa, timer.ExtendParams{
|
||||||
// Params: map[string]interface{}{
|
Params: map[string]interface{}{
|
||||||
// "test": "text5",
|
"test": "text5",
|
||||||
// },
|
},
|
||||||
// })
|
})
|
||||||
// cl.AddTimer(ctx, "test6", 1*time.Second, aa, timer.ExtendParams{
|
cl.AddTimer(ctx, "test6", 1*time.Millisecond, aa, timer.ExtendParams{
|
||||||
// Params: map[string]interface{}{
|
Params: map[string]interface{}{
|
||||||
// "test": "text6",
|
"test": "text6",
|
||||||
// },
|
},
|
||||||
// })
|
})
|
||||||
// cl.AddTimer(ctx, "test7", 1*time.Second, aa, timer.ExtendParams{
|
cl.AddTimer(ctx, "test7", 1*time.Millisecond, aa, timer.ExtendParams{
|
||||||
// Params: map[string]interface{}{
|
Params: map[string]interface{}{
|
||||||
// "test": "text7",
|
"test": "text7",
|
||||||
// },
|
},
|
||||||
// })
|
})
|
||||||
// cl.AddTimer(ctx, "test8", 1*time.Second, aa, timer.ExtendParams{
|
cl.AddTimer(ctx, "test8", 1*time.Millisecond, aa, timer.ExtendParams{
|
||||||
// Params: map[string]interface{}{
|
Params: map[string]interface{}{
|
||||||
// "test": "text8",
|
"test": "text8",
|
||||||
// },
|
},
|
||||||
// })
|
})
|
||||||
// cl.AddTimer(ctx, "test9", 1*time.Second, aa, timer.ExtendParams{
|
cl.AddTimer(ctx, "test9", 1*time.Millisecond, aa, timer.ExtendParams{
|
||||||
// Params: map[string]interface{}{
|
Params: map[string]interface{}{
|
||||||
// "test": "text9",
|
"test": "text9",
|
||||||
// },
|
},
|
||||||
// })
|
})
|
||||||
|
|
||||||
select {}
|
select {}
|
||||||
}
|
}
|
||||||
@@ -95,3 +96,38 @@ func aa(ctx context.Context) bool {
|
|||||||
fmt.Printf("%+v %+v \n\n", a, err)
|
fmt.Printf("%+v %+v \n\n", a, err)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func d() {
|
||||||
|
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "127.0.0.1" + ":" + "6379",
|
||||||
|
Password: "", // no password set
|
||||||
|
DB: 0, // use default DB
|
||||||
|
})
|
||||||
|
if client == nil {
|
||||||
|
fmt.Println("redis init error")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
client.ZAdd(context.Background(), "lockx:test2", &redis.Z{
|
||||||
|
Score: 50,
|
||||||
|
Member: "test",
|
||||||
|
})
|
||||||
|
|
||||||
|
script := `
|
||||||
|
local token = redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2])
|
||||||
|
for i,v in ipairs(token) do
|
||||||
|
redis.call('zrem',KEYS[1],v)
|
||||||
|
redis.call('lpush',KEYS[2],v)
|
||||||
|
end
|
||||||
|
return "OK"
|
||||||
|
`
|
||||||
|
res, err := client.Eval(context.Background(), script, []string{"lockx:test2", "lockx:push"}, 0, 100).Result()
|
||||||
|
fmt.Println(res, err)
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
l, e := client.RPop(context.Background(), "lockx:push").Result()
|
||||||
|
fmt.Println(l, e)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|||||||
+2
-2
@@ -38,10 +38,10 @@ func (g *globalLock) Lock() bool {
|
|||||||
return 'ERROR'
|
return 'ERROR'
|
||||||
`
|
`
|
||||||
|
|
||||||
resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 10).Result()
|
resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 5).Result()
|
||||||
if resp != "OK" {
|
if resp != "OK" {
|
||||||
_ = err
|
_ = err
|
||||||
// log.Println("globalLock Lock", resp, err, g.uniqueKey, g.value)
|
log.Println("globalLock Lock", resp, err, g.uniqueKey, g.value)
|
||||||
}
|
}
|
||||||
return resp == "OK"
|
return resp == "OK"
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user