diff --git a/cluster.go b/cluster.go index 8874609..7ee7b9c 100644 --- a/cluster.go +++ b/cluster.go @@ -36,10 +36,10 @@ func InitCluster(ctx context.Context, red *redis.Client) *cluster { clu = &cluster{ ctx: ctx, redis: red, - lockKey: "timer:globalLockKey", - nextKey: "timer:NextKey", - zsetKey: "timer:zsetKey", - listKey: "timer:listKey", + lockKey: "timer:cluster_globalLockKey", + nextKey: "timer:cluster_nextKey", + zsetKey: "timer:cluster_zsetKey", + listKey: "timer:cluster_listKey", } // 监听任务 diff --git a/cmd/main.go b/cmd/main.go index 1f79574..1d44a1d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -22,21 +22,58 @@ func main() { // fmt.Println(mm) - re() + // re() // d() + worker() } -func re() { +func worker() { + client := getRedis() + w := timer.InitWorker(context.Background(), client, &Worker{}) + w.AddJob("test", "test", 1*time.Second, map[string]interface{}{ + "test": "test", + }) + w.AddJob("test2", "test", 1*time.Second, map[string]interface{}{ + "test": "test", + }) + w.AddJob("test3", "test", 1*time.Second, map[string]interface{}{ + "test": "test", + }) + w.AddJob("test4", "test", 1*time.Second, map[string]interface{}{ + "test": "test", + }) + w.AddJob("test5", "test", 1*time.Second, map[string]interface{}{ + "test": "test", + }) + + select {} +} + +type Worker struct{} + +func (w *Worker) Worker(uniqueKey string, jobType string,data map[string]interface{}) timer.WorkerCode { + fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05")) + fmt.Println(uniqueKey, jobType) + fmt.Println(data) + return timer.WorkerCodeAgain +} + +func getRedis() *redis.Client { client := redis.NewClient(&redis.Options{ Addr: "127.0.0.1" + ":" + "6379", Password: "", // no password set DB: 0, // use default DB }) if client == nil { - fmt.Println("redis init error") - return + panic("redis init error") } + return client +} + +func re() { + + client := getRedis() ctx := context.Background() cl := timer.InitCluster(ctx, client) @@ -94,7 +131,7 @@ func aa(ctx context.Context) bool { // fmt.Println("gggggggggggggggggggggggggggg") a, err := timer.GetExtendParams(ctx) fmt.Printf("%+v %+v \n\n", a, err) - time.Sleep(time.Second*5) + time.Sleep(time.Second * 5) return true } diff --git a/worker.go b/worker.go new file mode 100644 index 0000000..8a75b79 --- /dev/null +++ b/worker.go @@ -0,0 +1,160 @@ +package timer + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "sync" + "time" + + "github.com/go-redis/redis/v8" +) + +// 单次的任务队列 +type worker struct { + ctx context.Context + zsetKey string + listKey string + redis *redis.Client + worker WorkerInterface +} + +type WorkerCode int + +const ( + WorkerCodeSuccess WorkerCode = 0 + WorkerCodeAgain WorkerCode = -1 +) + +// 需要考虑执行失败重新放入队列的情况 +type WorkerInterface interface { + Worker(uniqueKey string, jobType string, data map[string]interface{}) WorkerCode +} + +var wo *worker = nil +var once sync.Once + +type extendData struct { + Delay time.Duration + Data map[string]interface{} +} + +func InitWorker(ctx context.Context, re *redis.Client, w WorkerInterface) *worker { + + once.Do(func() { + wo = &worker{ + ctx: ctx, + zsetKey: "timer:job_zsetkey", + listKey: "timer:job_listkey", + redis: re, + worker: w, + } + go wo.getJob() + go wo.execJob() + }) + + return wo +} + +// 添加任务 +// 重复插入就代表覆盖 +func (w *worker) AddJob(uniqueKey string, jobType string, delayTime time.Duration, data map[string]interface{}) error { + if delayTime.Abs() != delayTime { + return fmt.Errorf("时间间隔不能为负数") + } + if delayTime == 0 { + return fmt.Errorf("时间间隔不能为0") + } + + redisKey := fmt.Sprintf("%s[:]%s", uniqueKey, jobType) + + ed := extendData{ + Delay: delayTime, + Data: data, + } + b, _ := json.Marshal(ed) + + _, err := w.redis.SetEX(w.ctx, redisKey, b, delayTime+time.Second*5).Result() + if err != nil { + return err + } + + _, err = w.redis.ZAdd(w.ctx, w.zsetKey, &redis.Z{ + Score: float64(time.Now().Add(delayTime).UnixMilli()), + Member: redisKey, + }).Result() + + return err +} + +// 删除任务 +func (w *worker) DelJob(uniqueKey string, jobType string) error { + redisKey := fmt.Sprintf("%s[:]%s", uniqueKey, jobType) + + w.redis.Del(w.ctx, redisKey).Result() + + w.redis.ZRem(w.ctx, w.zsetKey, redisKey).Result() + + return nil +} + +// 获取任务 +func (w *worker) getJob() { + timer := time.NewTicker(time.Millisecond * 100) + defer timer.Stop() + +Loop: + for { + select { + case <-timer.C: + script := ` + local token = redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2]) + for i,v in ipairs(token) do + redis.call('zrem',KEYS[1],v) + redis.call('lpush',KEYS[2],v) + end + return "OK" + ` + w.redis.Eval(w.ctx, script, []string{w.zsetKey, w.listKey}, 0, time.Now().UnixMilli()).Result() + // fmt.Println(i, err) + + case <-w.ctx.Done(): + break Loop + } + } +} + +// 执行任务 +func (w *worker) execJob() { + for { + keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result() + if err != nil { + fmt.Println("watch err:", err) + continue + } + + go func() { + s := strings.Split(keys[1], "[:]") + + // 读取数据 + str, err := w.redis.Get(w.ctx, keys[1]).Result() + if err != nil { + fmt.Println("execJob err:", err) + return + } + ed := extendData{} + json.Unmarshal([]byte(str), &ed) + + fmt.Println("开始时间:", time.Now().Format("2006-01-02 15:04:05")) + code := w.worker.Worker(s[0], s[1], ed.Data) + + if code == WorkerCodeAgain { + // 重新放入队列 + fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05")) + w.AddJob(s[0], s[1], ed.Delay, ed.Data) + } + }() + + } +}