From 9622d9f5b970dbceb5d8da3b4ba8d83eb091f98d Mon Sep 17 00:00:00 2001 From: Yun Date: Sat, 2 Sep 2023 12:19:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster.go | 60 ++++++++++++++++++++-------------- cmd/main.go | 80 ++++++++++++++++++++++----------------------- lockx/lockx.go | 22 ++++++++----- lockx/lockx_test.go | 27 ++++++++++----- 4 files changed, 108 insertions(+), 81 deletions(-) diff --git a/cluster.go b/cluster.go index 1af028f..3b11c25 100644 --- a/cluster.go +++ b/cluster.go @@ -26,6 +26,7 @@ type cluster struct { lockKey string // 全局计算的key nextKey string // 下一次执行的key zsetKey string // 有序集合的key + listKey string // 可执行的任务列表的key } var clu *cluster = nil @@ -38,8 +39,12 @@ func InitCluster(ctx context.Context, red *redis.Client) *cluster { lockKey: "timer:globalLockKey", nextKey: "timer:NextKey", zsetKey: "timer:zsetKey", + listKey: "timer:listKey", } + // 监听任务 + go clu.watch() + timer := time.NewTicker(time.Millisecond * 100) go func(ctx context.Context, red *redis.Client) { @@ -47,8 +52,8 @@ func InitCluster(ctx context.Context, red *redis.Client) *cluster { for { select { case <-timer.C: - go clu.computeTime() - go clu.getTask() + clu.getTask() + clu.getNextTime() case <-ctx.Done(): break Loop } @@ -115,7 +120,8 @@ func (c *cluster) AddTimer(ctx context.Context, uniqueKey string, spaceTime time } // 计算下一次执行的时间 -func (c *cluster) computeTime() { +func (c *cluster) getNextTime() { + // log.Println("begin computer") ctx, cancel := context.WithCancel(c.ctx) defer cancel() @@ -124,7 +130,7 @@ func (c *cluster) computeTime() { // 获取锁 lockBool := lock.Lock() if !lockBool { - log.Println("timer:获取锁失败") + // log.Println("timer:获取锁失败") return } defer lock.Unlock() @@ -139,19 +145,14 @@ func (c *cluster) computeTime() { execTime := make(map[string]time.Time) json.Unmarshal([]byte(cacheStr), &execTime) - // log.Println("cacheStr:", cacheStr, execTime) - // return - p := c.redis.Pipeline() nowTime := time.Now() clusterWorkerList.Range(func(key, value interface{}) bool { - // log.Println("range:", key, value) val := value.(timerStr) beforeTime := execTime[val.UniqueKey] if beforeTime.After(nowTime) { - // log.Println("sssss") return true } nextTime := getNextExecTime(beforeTime, val.SpaceTime) @@ -161,21 +162,16 @@ func (c *cluster) computeTime() { Score: float64(nextTime.UnixMilli()), Member: val.UniqueKey, }) - // log.Println("ffffffffffff") + // log.Println("computeTime add", c.zsetKey, val.UniqueKey, nextTime.UnixMilli()) return true }) - // log.Println("ssssssddddd") - // 更新缓存 b, _ := json.Marshal(execTime) p.Set(ctx, c.nextKey, string(b), 0) - // log.Println("B", string(b)) - _, err := p.Exec(ctx) _ = err - // fmt.Println(err) } // 递归遍历获取执行时间 @@ -185,9 +181,6 @@ func getNextExecTime(beforeTime time.Time, spaceTime time.Duration) time.Time { return beforeTime } nextTime := beforeTime.Add(spaceTime) - // fmt.Println(nextTime.Format(time.RFC3339)) - // fmt.Println(nowTime.Format(time.RFC3339)) - // fmt.Println(beforeTime.Before(nowTime)) if nextTime.Before(nowTime) { nextTime = getNextExecTime(nextTime, spaceTime) } @@ -204,19 +197,35 @@ func (c *cluster) getTask() { taskList, _ := c.redis.ZRangeByScore(c.ctx, c.zsetKey, &zb).Result() - // 删除粉丝 - inter := []interface{}{} - for _, val := range taskList { - inter = append(inter, val) - } - c.redis.ZRem(c.ctx, c.zsetKey, inter...) + p := c.redis.Pipeline() for _, val := range taskList { - go doTask(c.ctx, c.redis, val) + // 添加到可执行队列 + p.LPush(c.ctx, c.listKey, val) + // 删除有序集合 + p.ZRem(c.ctx, c.zsetKey, val) } + _, err := p.Exec(c.ctx) + // fmt.Println(err) + _ = err } +// 监听任务 +func (c *cluster) watch() { + // 执行任务 + for { + keys, err := c.redis.BLPop(c.ctx, time.Second*10, c.listKey).Result() + if err != nil { + fmt.Println("watch err:", err) + continue + } + for _, val := range keys { + go doTask(c.ctx, c.redis, val) + } + } +} + // 执行任务 func doTask(ctx context.Context, red *redis.Client, taskId string) { ctx, cancel := context.WithCancel(ctx) @@ -239,6 +248,7 @@ func doTask(ctx context.Context, red *redis.Client, taskId string) { lock := lockx.NewGlobalLock(ctx, red, taskId) tB := lock.Lock() if !tB { + fmt.Println("doTask timer:获取锁失败") return } defer lock.Unlock() diff --git a/cmd/main.go b/cmd/main.go index 4caa9d1..5ea6d88 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -44,46 +44,46 @@ func re() { "test": "text1", }, }) - cl.AddTimer(ctx, "test2", 1*time.Second, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text2", - }, - }) - cl.AddTimer(ctx, "test3", 1*time.Second, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text3", - }, - }) - cl.AddTimer(ctx, "test4", 1*time.Second, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text4", - }, - }) - cl.AddTimer(ctx, "test5", 1*time.Second, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text5", - }, - }) - cl.AddTimer(ctx, "test6", 1*time.Second, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text6", - }, - }) - cl.AddTimer(ctx, "test7", 1*time.Second, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text7", - }, - }) - cl.AddTimer(ctx, "test8", 1*time.Second, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text8", - }, - }) - cl.AddTimer(ctx, "test9", 1*time.Second, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text9", - }, - }) + // cl.AddTimer(ctx, "test2", 1*time.Second, aa, timer.ExtendParams{ + // Params: map[string]interface{}{ + // "test": "text2", + // }, + // }) + // cl.AddTimer(ctx, "test3", 1*time.Second, aa, timer.ExtendParams{ + // Params: map[string]interface{}{ + // "test": "text3", + // }, + // }) + // cl.AddTimer(ctx, "test4", 1*time.Second, aa, timer.ExtendParams{ + // Params: map[string]interface{}{ + // "test": "text4", + // }, + // }) + // cl.AddTimer(ctx, "test5", 1*time.Second, aa, timer.ExtendParams{ + // Params: map[string]interface{}{ + // "test": "text5", + // }, + // }) + // cl.AddTimer(ctx, "test6", 1*time.Second, aa, timer.ExtendParams{ + // Params: map[string]interface{}{ + // "test": "text6", + // }, + // }) + // cl.AddTimer(ctx, "test7", 1*time.Second, aa, timer.ExtendParams{ + // Params: map[string]interface{}{ + // "test": "text7", + // }, + // }) + // cl.AddTimer(ctx, "test8", 1*time.Second, aa, timer.ExtendParams{ + // Params: map[string]interface{}{ + // "test": "text8", + // }, + // }) + // cl.AddTimer(ctx, "test9", 1*time.Second, aa, timer.ExtendParams{ + // Params: map[string]interface{}{ + // "test": "text9", + // }, + // }) select {} } diff --git a/lockx/lockx.go b/lockx/lockx.go index 6dc3d84..eea5432 100644 --- a/lockx/lockx.go +++ b/lockx/lockx.go @@ -30,12 +30,18 @@ func NewGlobalLock(ctx context.Context, red *redis.Client, uniqueKey string) *gl func (g *globalLock) Lock() bool { script := ` - return redis.call('set',KEYS[1],ARGV[1],'EX',ARGV[2]) + local token = redis.call('get',KEYS[1]) + if token == false + then + return redis.call('set',KEYS[1],ARGV[1],'EX',ARGV[2]) + end + return 'ERROR' ` resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 10).Result() if resp != "OK" { - log.Println("globalLock Lock", resp, err) + _ = err + // log.Println("globalLock Lock", resp, err, g.uniqueKey, g.value) } return resp == "OK" } @@ -58,15 +64,15 @@ func (g *globalLock) Unlock() bool { local token = redis.call('get',KEYS[1]) if token == ARGV[1] then - redis.call('del',KEYS[1]) - return 'OK' + redis.call('del',KEYS[1]) + return 'OK' end - return 'ERROR' + return 'ERROR' ` resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value).Result() if resp != "OK" { - log.Println("globalLock Unlock", resp, err) + log.Println("globalLock Unlock", resp, err, g.uniqueKey, g.value) } return resp == "OK" } @@ -98,12 +104,12 @@ func (g *globalLock) refresh() bool { redis.call('set',KEYS[1],ARGV[1],'EX',ARGV[2]) return 'OK' end - return 'ERROR' + return 'ERROR' ` resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 5).Result() if resp != "OK" { - log.Println("globalLock refresh", resp, err) + log.Println("globalLock refresh", resp, err, g.uniqueKey, g.value) } return resp == "OK" } diff --git a/lockx/lockx_test.go b/lockx/lockx_test.go index bc3c1f2..25d1606 100644 --- a/lockx/lockx_test.go +++ b/lockx/lockx_test.go @@ -11,7 +11,21 @@ import ( var Redis *redis.Client -func TestMain(m *testing.M) { +// func TestMain(m *testing.M) { +// client := redis.NewClient(&redis.Options{ +// Addr: "127.0.0.1" + ":" + "6379", +// Password: "", // no password set +// DB: 0, // use default DB +// }) +// if client == nil { +// fmt.Println("redis init error") +// return +// } +// // fmt.Println("ffff") +// Redis = client +// } + +func TestLockx(t *testing.T) { client := redis.NewClient(&redis.Options{ Addr: "127.0.0.1" + ":" + "6379", Password: "", // no password set @@ -21,22 +35,19 @@ func TestMain(m *testing.M) { fmt.Println("redis init error") return } - Redis = client -} - -func TestLockx(t *testing.T) { + fmt.Println("begin") ctx := context.Background() ctx, cancel := context.WithCancel(ctx) defer cancel() - lock := lockx.NewGlobalLock(ctx, Redis, "lockx:test") + lock := lockx.NewGlobalLock(ctx, client, "lockx:test") if !lock.Lock() { - t.Log("lock error") + fmt.Println("lock error") } defer lock.Unlock() lock.Refresh() - t.Log("doing") + fmt.Println("ssss") }