From 5f3193d95e9c68b0ce8f56750dfb11402f1134a3 Mon Sep 17 00:00:00 2001 From: Yun Date: Sat, 2 Sep 2023 13:32:04 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 46 ++++++++++++------- cmd/main.go | 118 ++++++++++++++++++++++++++++++++----------------- lockx/lockx.go | 4 +- 3 files changed, 109 insertions(+), 59 deletions(-) diff --git a/cluster.go b/cluster.go index 3b11c25..9fe2b5e 100644 --- a/cluster.go +++ b/cluster.go @@ -45,7 +45,7 @@ func InitCluster(ctx context.Context, red *redis.Client) *cluster { // 监听任务 go clu.watch() - timer := time.NewTicker(time.Millisecond * 100) + timer := time.NewTicker(time.Millisecond*100) go func(ctx context.Context, red *redis.Client) { Loop: @@ -190,24 +190,38 @@ func getNextExecTime(beforeTime time.Time, spaceTime time.Duration) time.Time { // 获取任务 func (c *cluster) getTask() { // 定时去Redis获取任务 - zb := redis.ZRangeBy{ - Min: "0", - Max: fmt.Sprintf("%+v", time.Now().UnixMilli()), - } + // zb := redis.ZRangeBy{ + // Min: "0", + // Max: fmt.Sprintf("%+v", time.Now().UnixMilli()), + // } - taskList, _ := c.redis.ZRangeByScore(c.ctx, c.zsetKey, &zb).Result() + // taskList, _ := c.redis.ZRangeByScore(c.ctx, c.zsetKey, &zb).Result() - p := c.redis.Pipeline() + // if len(taskList) == 0 { + // return + // } - for _, val := range taskList { - // 添加到可执行队列 - p.LPush(c.ctx, c.listKey, val) - // 删除有序集合 - p.ZRem(c.ctx, c.zsetKey, val) - } - _, err := p.Exec(c.ctx) - // fmt.Println(err) - _ = err + // p := c.redis.Pipeline() + + // for _, val := range taskList { + // // 添加到可执行队列 + // p.LPush(c.ctx, c.listKey, val) + // // 删除有序集合 + // p.ZRem(c.ctx, c.zsetKey, val) + // } + // _, err := p.Exec(c.ctx) + // // fmt.Println(err) + // _ = err + + script := ` + local token = redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2]) + for i,v in ipairs(token) do + redis.call('zrem',KEYS[1],v) + redis.call('lpush',KEYS[2],v) + end + return "OK" + ` + c.redis.Eval(c.ctx, script, []string{c.zsetKey, c.listKey}, 0, time.Now().UnixMilli()).Result() } diff --git a/cmd/main.go b/cmd/main.go index 5ea6d88..84a51de 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -23,6 +23,7 @@ func main() { // fmt.Println(mm) re() + // d() } @@ -39,51 +40,51 @@ func re() { ctx := context.Background() cl := timer.InitCluster(ctx, client) - cl.AddTimer(ctx, "test1", 1*time.Second, aa, timer.ExtendParams{ + cl.AddTimer(ctx, "test1", 1*time.Millisecond, aa, timer.ExtendParams{ Params: map[string]interface{}{ "test": "text1", }, }) - // cl.AddTimer(ctx, "test2", 1*time.Second, aa, timer.ExtendParams{ - // Params: map[string]interface{}{ - // "test": "text2", - // }, - // }) - // cl.AddTimer(ctx, "test3", 1*time.Second, aa, timer.ExtendParams{ - // Params: map[string]interface{}{ - // "test": "text3", - // }, - // }) - // cl.AddTimer(ctx, "test4", 1*time.Second, aa, timer.ExtendParams{ - // Params: map[string]interface{}{ - // "test": "text4", - // }, - // }) - // cl.AddTimer(ctx, "test5", 1*time.Second, aa, timer.ExtendParams{ - // Params: map[string]interface{}{ - // "test": "text5", - // }, - // }) - // cl.AddTimer(ctx, "test6", 1*time.Second, aa, timer.ExtendParams{ - // Params: map[string]interface{}{ - // "test": "text6", - // }, - // }) - // cl.AddTimer(ctx, "test7", 1*time.Second, aa, timer.ExtendParams{ - // Params: map[string]interface{}{ - // "test": "text7", - // }, - // }) - // cl.AddTimer(ctx, "test8", 1*time.Second, aa, timer.ExtendParams{ - // Params: map[string]interface{}{ - // "test": "text8", - // }, - // }) - // cl.AddTimer(ctx, "test9", 1*time.Second, aa, timer.ExtendParams{ - // Params: map[string]interface{}{ - // "test": "text9", - // }, - // }) + cl.AddTimer(ctx, "test2", 1*time.Millisecond, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text2", + }, + }) + cl.AddTimer(ctx, "test3", 1*time.Millisecond, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text3", + }, + }) + cl.AddTimer(ctx, "test4", 1*time.Millisecond, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text4", + }, + }) + cl.AddTimer(ctx, "test5", 1*time.Millisecond, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text5", + }, + }) + cl.AddTimer(ctx, "test6", 1*time.Millisecond, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text6", + }, + }) + cl.AddTimer(ctx, "test7", 1*time.Millisecond, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text7", + }, + }) + cl.AddTimer(ctx, "test8", 1*time.Millisecond, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text8", + }, + }) + cl.AddTimer(ctx, "test9", 1*time.Millisecond, aa, timer.ExtendParams{ + Params: map[string]interface{}{ + "test": "text9", + }, + }) select {} } @@ -95,3 +96,38 @@ func aa(ctx context.Context) bool { fmt.Printf("%+v %+v \n\n", a, err) return true } + +func d() { + + client := redis.NewClient(&redis.Options{ + Addr: "127.0.0.1" + ":" + "6379", + Password: "", // no password set + DB: 0, // use default DB + }) + if client == nil { + fmt.Println("redis init error") + return + } + + client.ZAdd(context.Background(), "lockx:test2", &redis.Z{ + Score: 50, + Member: "test", + }) + + script := ` + local token = redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2]) + for i,v in ipairs(token) do + redis.call('zrem',KEYS[1],v) + redis.call('lpush',KEYS[2],v) + end + return "OK" + ` + res, err := client.Eval(context.Background(), script, []string{"lockx:test2", "lockx:push"}, 0, 100).Result() + fmt.Println(res, err) + + for i := 0; i < 10; i++ { + l, e := client.RPop(context.Background(), "lockx:push").Result() + fmt.Println(l, e) + } + +} diff --git a/lockx/lockx.go b/lockx/lockx.go index eea5432..21c7e12 100644 --- a/lockx/lockx.go +++ b/lockx/lockx.go @@ -38,10 +38,10 @@ func (g *globalLock) Lock() bool { return 'ERROR' ` - resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 10).Result() + resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 5).Result() if resp != "OK" { _ = err - // log.Println("globalLock Lock", resp, err, g.uniqueKey, g.value) + log.Println("globalLock Lock", resp, err, g.uniqueKey, g.value) } return resp == "OK" }