部分代码尚未完全
This commit is contained in:
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -44,7 +43,7 @@ type Callback interface {
|
||||
// @param data interface{} 任务数据
|
||||
// @return WorkerCode 任务执行结果
|
||||
// @return time.Duration 任务执行时间间隔
|
||||
Worker(ctx context.Context, jobType string, uniTaskId string, attachData interface{}) (WorkerCode, time.Duration)
|
||||
Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) (WorkerCode, time.Duration)
|
||||
}
|
||||
|
||||
var wo *Once = nil
|
||||
@@ -74,13 +73,13 @@ func InitOnce(ctx context.Context, re *redis.Client, keyPrefix string, call Call
|
||||
return wo
|
||||
}
|
||||
|
||||
// 添加任务
|
||||
// 添加任务(覆盖)
|
||||
// 重复插入就代表覆盖
|
||||
// @param jobType string 任务类型
|
||||
// @param uniTaskId string 任务唯一标识
|
||||
// @param delayTime time.Duration 延迟时间
|
||||
// @param attachData interface{} 附加数据
|
||||
func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, attachData interface{}) error {
|
||||
func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error {
|
||||
if delayTime.Abs() != delayTime {
|
||||
return fmt.Errorf("时间间隔不能为负数")
|
||||
}
|
||||
@@ -88,7 +87,7 @@ func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, at
|
||||
return fmt.Errorf("时间间隔不能为0")
|
||||
}
|
||||
|
||||
redisKey := fmt.Sprintf("%s[:]%s", jobType, uniTaskId)
|
||||
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
|
||||
|
||||
ed := extendData{
|
||||
Delay: delayTime,
|
||||
@@ -102,7 +101,7 @@ func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, at
|
||||
return err
|
||||
}
|
||||
|
||||
// 吸入执行时间
|
||||
// 写入执行时间
|
||||
_, err = w.redis.ZAdd(w.ctx, w.zsetKey, &redis.Z{
|
||||
Score: float64(time.Now().Add(delayTime).UnixMilli()),
|
||||
Member: redisKey,
|
||||
@@ -111,9 +110,17 @@ func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, at
|
||||
return err
|
||||
}
|
||||
|
||||
// 添加任务(不覆盖)
|
||||
func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error {
|
||||
|
||||
// 判断有序集合Key是否存在,存在则报错,不存在则写入
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// 删除任务
|
||||
func (w *Once) Del(jobType string, uniTaskId string) error {
|
||||
redisKey := fmt.Sprintf("%s[:]%s", jobType, uniTaskId)
|
||||
func (w *Once) Delete(taskType string, taskId string) error {
|
||||
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
|
||||
|
||||
w.redis.Del(w.ctx, redisKey).Result()
|
||||
|
||||
@@ -122,6 +129,11 @@ func (w *Once) Del(jobType string, uniTaskId string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 获取任务
|
||||
func (l *Once) Get(taskType string, taskId string) {
|
||||
//
|
||||
}
|
||||
|
||||
// 获取任务
|
||||
func (w *Once) getTask() {
|
||||
timer := time.NewTicker(time.Millisecond * 200)
|
||||
@@ -153,47 +165,44 @@ func (w *Once) watch() {
|
||||
for {
|
||||
keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result()
|
||||
if err != nil {
|
||||
fmt.Println("watch err:", err)
|
||||
// fmt.Println("watch err:", err)
|
||||
continue
|
||||
}
|
||||
|
||||
go w.doTask(keys[1])
|
||||
ctx := context.WithValue(w.ctx, "trace_id", uuid.NewV4().String())
|
||||
|
||||
go w.doTask(ctx, keys[1])
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// 执行任务
|
||||
func (w *Once) doTask(key string) {
|
||||
func (l *Once) doTask(ctx context.Context, key string) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
fmt.Println("timer:回调任务panic", err)
|
||||
log.Println("errStack", string(debug.Stack()))
|
||||
l.logger.Errorf(ctx, "timer:回调任务panic:%s stack:%s", err, string(debug.Stack()))
|
||||
}
|
||||
}()
|
||||
|
||||
s := strings.Split(key, "[:]")
|
||||
|
||||
// 读取数据
|
||||
str, err := w.redis.Get(w.ctx, key).Result()
|
||||
str, err := l.redis.Get(ctx, key).Result()
|
||||
if err != nil {
|
||||
fmt.Println("execJob err:", err)
|
||||
l.logger.Errorf(ctx, "获取数据失败 err:%s", err)
|
||||
return
|
||||
}
|
||||
ed := extendData{}
|
||||
json.Unmarshal([]byte(str), &ed)
|
||||
|
||||
fmt.Println("开始时间:", time.Now().Format("2006-01-02 15:04:05"))
|
||||
|
||||
ctx := context.WithValue(context.Background(), "trace_id", uuid.NewV4().String)
|
||||
|
||||
code, t := w.worker.Worker(ctx, s[0], s[1], ed.Data)
|
||||
code, t := l.worker.Worker(ctx, s[0], s[1], ed.Data)
|
||||
|
||||
if code == WorkerCodeAgain {
|
||||
// 重新放入队列
|
||||
fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05"))
|
||||
if t != 0 && t == t.Abs() {
|
||||
ed.Delay = t
|
||||
}
|
||||
w.Add(s[0], s[1], ed.Delay, ed.Data)
|
||||
l.logger.Infof(ctx, "任务重新放入队列:%s", key)
|
||||
l.Create(s[0], s[1], ed.Delay, ed.Data)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user