添加单次的任务队列

This commit is contained in:
Yun
2023-09-09 23:24:11 +08:00
parent 3200f19d63
commit a9570cb743
3 changed files with 206 additions and 9 deletions
+4 -4
View File
@@ -36,10 +36,10 @@ func InitCluster(ctx context.Context, red *redis.Client) *cluster {
clu = &cluster{
ctx: ctx,
redis: red,
lockKey: "timer:globalLockKey",
nextKey: "timer:NextKey",
zsetKey: "timer:zsetKey",
listKey: "timer:listKey",
lockKey: "timer:cluster_globalLockKey",
nextKey: "timer:cluster_nextKey",
zsetKey: "timer:cluster_zsetKey",
listKey: "timer:cluster_listKey",
}
// 监听任务
+41 -4
View File
@@ -22,21 +22,58 @@ func main() {
// fmt.Println(mm)
re()
// re()
// d()
worker()
}
func re() {
func worker() {
client := getRedis()
w := timer.InitWorker(context.Background(), client, &Worker{})
w.AddJob("test", "test", 1*time.Second, map[string]interface{}{
"test": "test",
})
w.AddJob("test2", "test", 1*time.Second, map[string]interface{}{
"test": "test",
})
w.AddJob("test3", "test", 1*time.Second, map[string]interface{}{
"test": "test",
})
w.AddJob("test4", "test", 1*time.Second, map[string]interface{}{
"test": "test",
})
w.AddJob("test5", "test", 1*time.Second, map[string]interface{}{
"test": "test",
})
select {}
}
type Worker struct{}
func (w *Worker) Worker(uniqueKey string, jobType string,data map[string]interface{}) timer.WorkerCode {
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println(uniqueKey, jobType)
fmt.Println(data)
return timer.WorkerCodeAgain
}
func getRedis() *redis.Client {
client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1" + ":" + "6379",
Password: "", // no password set
DB: 0, // use default DB
})
if client == nil {
fmt.Println("redis init error")
return
panic("redis init error")
}
return client
}
func re() {
client := getRedis()
ctx := context.Background()
cl := timer.InitCluster(ctx, client)
+160
View File
@@ -0,0 +1,160 @@
package timer
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
// 单次的任务队列
type worker struct {
ctx context.Context
zsetKey string
listKey string
redis *redis.Client
worker WorkerInterface
}
type WorkerCode int
const (
WorkerCodeSuccess WorkerCode = 0
WorkerCodeAgain WorkerCode = -1
)
// 需要考虑执行失败重新放入队列的情况
type WorkerInterface interface {
Worker(uniqueKey string, jobType string, data map[string]interface{}) WorkerCode
}
var wo *worker = nil
var once sync.Once
type extendData struct {
Delay time.Duration
Data map[string]interface{}
}
func InitWorker(ctx context.Context, re *redis.Client, w WorkerInterface) *worker {
once.Do(func() {
wo = &worker{
ctx: ctx,
zsetKey: "timer:job_zsetkey",
listKey: "timer:job_listkey",
redis: re,
worker: w,
}
go wo.getJob()
go wo.execJob()
})
return wo
}
// 添加任务
// 重复插入就代表覆盖
func (w *worker) AddJob(uniqueKey string, jobType string, delayTime time.Duration, data map[string]interface{}) error {
if delayTime.Abs() != delayTime {
return fmt.Errorf("时间间隔不能为负数")
}
if delayTime == 0 {
return fmt.Errorf("时间间隔不能为0")
}
redisKey := fmt.Sprintf("%s[:]%s", uniqueKey, jobType)
ed := extendData{
Delay: delayTime,
Data: data,
}
b, _ := json.Marshal(ed)
_, err := w.redis.SetEX(w.ctx, redisKey, b, delayTime+time.Second*5).Result()
if err != nil {
return err
}
_, err = w.redis.ZAdd(w.ctx, w.zsetKey, &redis.Z{
Score: float64(time.Now().Add(delayTime).UnixMilli()),
Member: redisKey,
}).Result()
return err
}
// 删除任务
func (w *worker) DelJob(uniqueKey string, jobType string) error {
redisKey := fmt.Sprintf("%s[:]%s", uniqueKey, jobType)
w.redis.Del(w.ctx, redisKey).Result()
w.redis.ZRem(w.ctx, w.zsetKey, redisKey).Result()
return nil
}
// 获取任务
func (w *worker) getJob() {
timer := time.NewTicker(time.Millisecond * 100)
defer timer.Stop()
Loop:
for {
select {
case <-timer.C:
script := `
local token = redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2])
for i,v in ipairs(token) do
redis.call('zrem',KEYS[1],v)
redis.call('lpush',KEYS[2],v)
end
return "OK"
`
w.redis.Eval(w.ctx, script, []string{w.zsetKey, w.listKey}, 0, time.Now().UnixMilli()).Result()
// fmt.Println(i, err)
case <-w.ctx.Done():
break Loop
}
}
}
// 执行任务
func (w *worker) execJob() {
for {
keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result()
if err != nil {
fmt.Println("watch err:", err)
continue
}
go func() {
s := strings.Split(keys[1], "[:]")
// 读取数据
str, err := w.redis.Get(w.ctx, keys[1]).Result()
if err != nil {
fmt.Println("execJob err:", err)
return
}
ed := extendData{}
json.Unmarshal([]byte(str), &ed)
fmt.Println("开始时间:", time.Now().Format("2006-01-02 15:04:05"))
code := w.worker.Worker(s[0], s[1], ed.Data)
if code == WorkerCodeAgain {
// 重新放入队列
fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05"))
w.AddJob(s[0], s[1], ed.Delay, ed.Data)
}
}()
}
}