This commit is contained in:
Yun
2023-09-09 23:32:37 +08:00
parent a9570cb743
commit fdc303e618
5 changed files with 16 additions and 20 deletions
+1 -1
View File
@@ -45,7 +45,7 @@ func InitCluster(ctx context.Context, red *redis.Client) *cluster {
// 监听任务 // 监听任务
go clu.watch() go clu.watch()
timer := time.NewTicker(time.Millisecond * 100) timer := time.NewTicker(time.Millisecond * 200)
go func(ctx context.Context, red *redis.Client) { go func(ctx context.Context, red *redis.Client) {
Loop: Loop:
+5 -5
View File
@@ -31,19 +31,19 @@ func main() {
func worker() { func worker() {
client := getRedis() client := getRedis()
w := timer.InitWorker(context.Background(), client, &Worker{}) w := timer.InitWorker(context.Background(), client, &Worker{})
w.AddJob("test", "test", 1*time.Second, map[string]interface{}{ w.Add("test", "test", 1*time.Second, map[string]interface{}{
"test": "test", "test": "test",
}) })
w.AddJob("test2", "test", 1*time.Second, map[string]interface{}{ w.Add("test2", "test", 1*time.Second, map[string]interface{}{
"test": "test", "test": "test",
}) })
w.AddJob("test3", "test", 1*time.Second, map[string]interface{}{ w.Add("test3", "test", 1*time.Second, map[string]interface{}{
"test": "test", "test": "test",
}) })
w.AddJob("test4", "test", 1*time.Second, map[string]interface{}{ w.Add("test4", "test", 1*time.Second, map[string]interface{}{
"test": "test", "test": "test",
}) })
w.AddJob("test5", "test", 1*time.Second, map[string]interface{}{ w.Add("test5", "test", 1*time.Second, map[string]interface{}{
"test": "test", "test": "test",
}) })
+1 -4
View File
@@ -81,10 +81,7 @@ func (g *globalLock) Unlock() bool {
if resp != "OK" { if resp != "OK" {
log.Println("globalLock Unlock", resp, err, g.uniqueKey, g.value) log.Println("globalLock Unlock", resp, err, g.uniqueKey, g.value)
} }
if resp == "OK" { g.cancel()
g.cancel()
return true
}
return false return false
} }
+1 -1
View File
@@ -32,7 +32,7 @@ const (
// 定时器类 // 定时器类
func InitSingle(ctx context.Context) { func InitSingle(ctx context.Context) {
onceLimit.Do(func() { onceLimit.Do(func() {
timer := time.NewTicker(1 * time.Millisecond) timer := time.NewTicker( time.Millisecond*200)
go func(ctx context.Context) { go func(ctx context.Context) {
Loop: Loop:
for { for {
+8 -9
View File
@@ -50,8 +50,8 @@ func InitWorker(ctx context.Context, re *redis.Client, w WorkerInterface) *worke
redis: re, redis: re,
worker: w, worker: w,
} }
go wo.getJob() go wo.getTask()
go wo.execJob() go wo.execTask()
}) })
return wo return wo
@@ -59,7 +59,7 @@ func InitWorker(ctx context.Context, re *redis.Client, w WorkerInterface) *worke
// 添加任务 // 添加任务
// 重复插入就代表覆盖 // 重复插入就代表覆盖
func (w *worker) AddJob(uniqueKey string, jobType string, delayTime time.Duration, data map[string]interface{}) error { func (w *worker) Add(uniqueKey string, jobType string, delayTime time.Duration, data map[string]interface{}) error {
if delayTime.Abs() != delayTime { if delayTime.Abs() != delayTime {
return fmt.Errorf("时间间隔不能为负数") return fmt.Errorf("时间间隔不能为负数")
} }
@@ -89,7 +89,7 @@ func (w *worker) AddJob(uniqueKey string, jobType string, delayTime time.Duratio
} }
// 删除任务 // 删除任务
func (w *worker) DelJob(uniqueKey string, jobType string) error { func (w *worker) Del(uniqueKey string, jobType string) error {
redisKey := fmt.Sprintf("%s[:]%s", uniqueKey, jobType) redisKey := fmt.Sprintf("%s[:]%s", uniqueKey, jobType)
w.redis.Del(w.ctx, redisKey).Result() w.redis.Del(w.ctx, redisKey).Result()
@@ -100,8 +100,8 @@ func (w *worker) DelJob(uniqueKey string, jobType string) error {
} }
// 获取任务 // 获取任务
func (w *worker) getJob() { func (w *worker) getTask() {
timer := time.NewTicker(time.Millisecond * 100) timer := time.NewTicker(time.Millisecond * 200)
defer timer.Stop() defer timer.Stop()
Loop: Loop:
@@ -126,7 +126,7 @@ Loop:
} }
// 执行任务 // 执行任务
func (w *worker) execJob() { func (w *worker) execTask() {
for { for {
keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result() keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result()
if err != nil { if err != nil {
@@ -152,9 +152,8 @@ func (w *worker) execJob() {
if code == WorkerCodeAgain { if code == WorkerCodeAgain {
// 重新放入队列 // 重新放入队列
fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05")) fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05"))
w.AddJob(s[0], s[1], ed.Delay, ed.Data) w.Add(s[0], s[1], ed.Delay, ed.Data)
} }
}() }()
} }
} }