3 Commits

Author SHA1 Message Date
Yun 4ee3c5e1c2 添加只执行一次的 2024-10-09 19:37:13 +08:00
Yun fa8e3737fa 执行一次的调试 2024-10-09 17:03:54 +08:00
yun cf3e751afe 部分代码尚未完全 2024-07-02 21:18:16 +08:00
2 changed files with 120 additions and 48 deletions
+53 -16
View File
@@ -2,6 +2,7 @@ package main
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"time" "time"
@@ -24,12 +25,57 @@ func main() {
// re() // re()
// d() // d()
cluster() // cluster()
once()
select {} select {}
} }
func once() {
client := getRedis()
ctx := context.Background()
w := OnceWorker{}
one := timerx.InitOnce(ctx, client, "test", w)
d := OnceData{
Num: 1,
}
err := one.Create("test", "test", 1*time.Second, d)
if err != nil {
fmt.Println(err)
}
}
type OnceData struct {
Num int
}
type OnceWorker struct{}
func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *timerx.OnceWorkerResp {
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println(taskId, taskType)
fmt.Println(attachData)
d := OnceData{}
by, _ := json.Marshal(attachData)
json.Unmarshal(by, &d)
d.Num++
fmt.Println(d)
return &timerx.OnceWorkerResp{
Retry: true,
AttachData: d,
DelayTime: 1 * time.Second,
}
}
func cluster() { func cluster() {
client := getRedis() client := getRedis()
ctx := context.Background() ctx := context.Background()
@@ -46,35 +92,26 @@ func cluster() {
func worker() { func worker() {
client := getRedis() client := getRedis()
w := timerx.InitOnce(context.Background(), client, "test", &Worker{}) w := timerx.InitOnce(context.Background(), client, "test", &OnceWorker{})
w.Add("test", "test", 1*time.Second, map[string]interface{}{ w.Save("test", "test", 1*time.Second, map[string]interface{}{
"test": "test", "test": "test",
}) })
w.Add("test2", "test", 1*time.Second, map[string]interface{}{ w.Save("test2", "test", 1*time.Second, map[string]interface{}{
"test": "test", "test": "test",
}) })
w.Add("test3", "test", 1*time.Second, map[string]interface{}{ w.Save("test3", "test", 1*time.Second, map[string]interface{}{
"test": "test", "test": "test",
}) })
w.Add("test4", "test", 1*time.Second, map[string]interface{}{ w.Save("test4", "test", 1*time.Second, map[string]interface{}{
"test": "test", "test": "test",
}) })
w.Add("test5", "test", 1*time.Second, map[string]interface{}{ w.Save("test5", "test", 1*time.Second, map[string]interface{}{
"test": "test", "test": "test",
}) })
select {} select {}
} }
type Worker struct{}
func (w *Worker) Worker(ctx context.Context, jobType string, uniqueKey string, data interface{}) (timerx.WorkerCode, time.Duration) {
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println(uniqueKey, jobType)
fmt.Println(data)
return timerx.WorkerCodeAgain, time.Second
}
func getRedis() *redis.Client { func getRedis() *redis.Client {
client := redis.NewClient(&redis.Options{ client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1" + ":" + "6379", Addr: "127.0.0.1" + ":" + "6379",
+67 -32
View File
@@ -4,7 +4,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"runtime/debug" "runtime/debug"
"strings" "strings"
"sync" "sync"
@@ -29,12 +28,11 @@ type Once struct {
worker Callback worker Callback
} }
type WorkerCode int type OnceWorkerResp struct {
Retry bool // 是否重试 true
const ( DelayTime time.Duration
WorkerCodeSuccess WorkerCode = 0 // 处理完成(不需要重入) AttachData interface{}
WorkerCodeAgain WorkerCode = -1 // 需要继续定时,默认原来的时间 }
)
// 需要考虑执行失败重新放入队列的情况 // 需要考虑执行失败重新放入队列的情况
type Callback interface { type Callback interface {
@@ -44,7 +42,7 @@ type Callback interface {
// @param data interface{} 任务数据 // @param data interface{} 任务数据
// @return WorkerCode 任务执行结果 // @return WorkerCode 任务执行结果
// @return time.Duration 任务执行时间间隔 // @return time.Duration 任务执行时间间隔
Worker(ctx context.Context, jobType string, uniTaskId string, attachData interface{}) (WorkerCode, time.Duration) Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *OnceWorkerResp
} }
var wo *Once = nil var wo *Once = nil
@@ -56,7 +54,7 @@ type extendData struct {
} }
// 初始化 // 初始化
func InitOnce(ctx context.Context, re *redis.Client, keyPrefix string, call Callback, opts ...Option) *Once { func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, call Callback, opts ...Option) *Once {
op := newOptions(opts...) op := newOptions(opts...)
once.Do(func() { once.Do(func() {
wo = &Once{ wo = &Once{
@@ -74,13 +72,13 @@ func InitOnce(ctx context.Context, re *redis.Client, keyPrefix string, call Call
return wo return wo
} }
// 添加任务 // 添加任务(覆盖)
// 重复插入就代表覆盖 // 重复插入就代表覆盖
// @param jobType string 任务类型 // @param jobType string 任务类型
// @param uniTaskId string 任务唯一标识 // @param uniTaskId string 任务唯一标识
// @param delayTime time.Duration 延迟时间 // @param delayTime time.Duration 延迟时间
// @param attachData interface{} 附加数据 // @param attachData interface{} 附加数据
func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, attachData interface{}) error { func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error {
if delayTime.Abs() != delayTime { if delayTime.Abs() != delayTime {
return fmt.Errorf("时间间隔不能为负数") return fmt.Errorf("时间间隔不能为负数")
} }
@@ -88,7 +86,7 @@ func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, at
return fmt.Errorf("时间间隔不能为0") return fmt.Errorf("时间间隔不能为0")
} }
redisKey := fmt.Sprintf("%s[:]%s", jobType, uniTaskId) redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
ed := extendData{ ed := extendData{
Delay: delayTime, Delay: delayTime,
@@ -102,7 +100,7 @@ func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, at
return err return err
} }
// 入执行时间 // 入执行时间
_, err = w.redis.ZAdd(w.ctx, w.zsetKey, &redis.Z{ _, err = w.redis.ZAdd(w.ctx, w.zsetKey, &redis.Z{
Score: float64(time.Now().Add(delayTime).UnixMilli()), Score: float64(time.Now().Add(delayTime).UnixMilli()),
Member: redisKey, Member: redisKey,
@@ -111,9 +109,38 @@ func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, at
return err return err
} }
// 添加任务(不覆盖)
func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error {
// 判断有序集合Key是否存在,存在则报错,不存在则写入
if l.redis.Exists(l.ctx, l.zsetKey).Val() == 0 {
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
ed := extendData{
Delay: delayTime,
Data: attachData,
}
b, _ := json.Marshal(ed)
// 写入附加数据
_, err := l.redis.SetEX(l.ctx, redisKey, b, delayTime+time.Second*5).Result()
if err != nil {
return err
}
_, err = l.redis.ZAdd(l.ctx, l.zsetKey, &redis.Z{
Score: float64(time.Now().Add(delayTime).UnixMilli()),
Member: redisKey,
}).Result()
if err != nil {
return err
}
}
return nil
}
// 删除任务 // 删除任务
func (w *Once) Del(jobType string, uniTaskId string) error { func (w *Once) Delete(taskType string, taskId string) error {
redisKey := fmt.Sprintf("%s[:]%s", jobType, uniTaskId) redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
w.redis.Del(w.ctx, redisKey).Result() w.redis.Del(w.ctx, redisKey).Result()
@@ -122,6 +149,11 @@ func (w *Once) Del(jobType string, uniTaskId string) error {
return nil return nil
} }
// 获取任务
func (l *Once) Get(taskType string, taskId string) {
//
}
// 获取任务 // 获取任务
func (w *Once) getTask() { func (w *Once) getTask() {
timer := time.NewTicker(time.Millisecond * 200) timer := time.NewTicker(time.Millisecond * 200)
@@ -153,47 +185,50 @@ func (w *Once) watch() {
for { for {
keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result() keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result()
if err != nil { if err != nil {
fmt.Println("watch err:", err) // fmt.Println("watch err:", err)
continue continue
} }
go w.doTask(keys[1]) ctx := context.WithValue(w.ctx, "trace_id", uuid.NewV4().String())
go w.doTask(ctx, keys[1])
} }
} }
// 执行任务 // 执行任务
func (w *Once) doTask(key string) { func (l *Once) doTask(ctx context.Context, key string) {
fmt.Println("任务时间:", time.Now().Format("2006-01-02 15:04:05"))
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
fmt.Println("timer:回调任务panic", err) l.logger.Errorf(ctx, "timer:回调任务panic:%s stack:%s", err, string(debug.Stack()))
log.Println("errStack", string(debug.Stack()))
} }
}() }()
s := strings.Split(key, "[:]") s := strings.Split(key, "[:]")
// 读取数据 // 读取数据
str, err := w.redis.Get(w.ctx, key).Result() str, err := l.redis.Get(ctx, key).Result()
if err != nil { if err != nil {
fmt.Println("execJob err:", err) l.logger.Errorf(ctx, "获取数据失败 err:%s", err)
return return
} }
ed := extendData{} ed := extendData{}
json.Unmarshal([]byte(str), &ed) json.Unmarshal([]byte(str), &ed)
fmt.Println("开始时间:", time.Now().Format("2006-01-02 15:04:05")) resp := l.worker.Worker(ctx, s[0], s[1], ed.Data)
if resp == nil {
return
}
ctx := context.WithValue(context.Background(), "trace_id", uuid.NewV4().String) if resp.Retry {
code, t := w.worker.Worker(ctx, s[0], s[1], ed.Data)
if code == WorkerCodeAgain {
// 重新放入队列 // 重新放入队列
fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05")) if resp.DelayTime != 0 && resp.DelayTime == resp.DelayTime.Abs() {
if t != 0 && t == t.Abs() { ed.Delay = resp.DelayTime
ed.Delay = t
} }
w.Add(s[0], s[1], ed.Delay, ed.Data) ed.Data = resp.AttachData
l.logger.Infof(ctx, "任务重新放入队列:%s", key)
fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05"))
l.Create(s[0], s[1], ed.Delay, ed.Data)
} }
} }