diff --git a/cluster.go b/cluster.go index 7ee7b9c..5f7b5fb 100644 --- a/cluster.go +++ b/cluster.go @@ -45,7 +45,7 @@ func InitCluster(ctx context.Context, red *redis.Client) *cluster { // 监听任务 go clu.watch() - timer := time.NewTicker(time.Millisecond * 100) + timer := time.NewTicker(time.Millisecond * 200) go func(ctx context.Context, red *redis.Client) { Loop: diff --git a/cmd/main.go b/cmd/main.go index 1d44a1d..005a6b5 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -31,19 +31,19 @@ func main() { func worker() { client := getRedis() w := timer.InitWorker(context.Background(), client, &Worker{}) - w.AddJob("test", "test", 1*time.Second, map[string]interface{}{ + w.Add("test", "test", 1*time.Second, map[string]interface{}{ "test": "test", }) - w.AddJob("test2", "test", 1*time.Second, map[string]interface{}{ + w.Add("test2", "test", 1*time.Second, map[string]interface{}{ "test": "test", }) - w.AddJob("test3", "test", 1*time.Second, map[string]interface{}{ + w.Add("test3", "test", 1*time.Second, map[string]interface{}{ "test": "test", }) - w.AddJob("test4", "test", 1*time.Second, map[string]interface{}{ + w.Add("test4", "test", 1*time.Second, map[string]interface{}{ "test": "test", }) - w.AddJob("test5", "test", 1*time.Second, map[string]interface{}{ + w.Add("test5", "test", 1*time.Second, map[string]interface{}{ "test": "test", }) diff --git a/lockx/lockx.go b/lockx/lockx.go index ffe612e..23f1ede 100644 --- a/lockx/lockx.go +++ b/lockx/lockx.go @@ -81,10 +81,7 @@ func (g *globalLock) Unlock() bool { if resp != "OK" { log.Println("globalLock Unlock", resp, err, g.uniqueKey, g.value) } - if resp == "OK" { - g.cancel() - return true - } + g.cancel() return false } diff --git a/single.go b/single.go index 891c495..5d8c364 100644 --- a/single.go +++ b/single.go @@ -32,7 +32,7 @@ const ( // 定时器类 func InitSingle(ctx context.Context) { onceLimit.Do(func() { - timer := time.NewTicker(1 * time.Millisecond) + timer := time.NewTicker( time.Millisecond*200) go func(ctx context.Context) { Loop: for { diff --git a/worker.go b/worker.go index 8a75b79..e23ffea 100644 --- a/worker.go +++ b/worker.go @@ -50,8 +50,8 @@ func InitWorker(ctx context.Context, re *redis.Client, w WorkerInterface) *worke redis: re, worker: w, } - go wo.getJob() - go wo.execJob() + go wo.getTask() + go wo.execTask() }) return wo @@ -59,7 +59,7 @@ func InitWorker(ctx context.Context, re *redis.Client, w WorkerInterface) *worke // 添加任务 // 重复插入就代表覆盖 -func (w *worker) AddJob(uniqueKey string, jobType string, delayTime time.Duration, data map[string]interface{}) error { +func (w *worker) Add(uniqueKey string, jobType string, delayTime time.Duration, data map[string]interface{}) error { if delayTime.Abs() != delayTime { return fmt.Errorf("时间间隔不能为负数") } @@ -89,7 +89,7 @@ func (w *worker) AddJob(uniqueKey string, jobType string, delayTime time.Duratio } // 删除任务 -func (w *worker) DelJob(uniqueKey string, jobType string) error { +func (w *worker) Del(uniqueKey string, jobType string) error { redisKey := fmt.Sprintf("%s[:]%s", uniqueKey, jobType) w.redis.Del(w.ctx, redisKey).Result() @@ -100,8 +100,8 @@ func (w *worker) DelJob(uniqueKey string, jobType string) error { } // 获取任务 -func (w *worker) getJob() { - timer := time.NewTicker(time.Millisecond * 100) +func (w *worker) getTask() { + timer := time.NewTicker(time.Millisecond * 200) defer timer.Stop() Loop: @@ -126,7 +126,7 @@ Loop: } // 执行任务 -func (w *worker) execJob() { +func (w *worker) execTask() { for { keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result() if err != nil { @@ -152,9 +152,8 @@ func (w *worker) execJob() { if code == WorkerCodeAgain { // 重新放入队列 fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05")) - w.AddJob(s[0], s[1], ed.Delay, ed.Data) + w.Add(s[0], s[1], ed.Delay, ed.Data) } }() - } }