Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1ac53f7688 | |||
| 4ee3c5e1c2 | |||
| fa8e3737fa | |||
| cf3e751afe |
+66
-27
@@ -2,6 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -24,12 +25,59 @@ func main() {
|
|||||||
|
|
||||||
// re()
|
// re()
|
||||||
// d()
|
// d()
|
||||||
cluster()
|
// cluster()
|
||||||
|
once()
|
||||||
|
|
||||||
select {}
|
select {}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func once() {
|
||||||
|
client := getRedis()
|
||||||
|
ctx := context.Background()
|
||||||
|
w := OnceWorker{}
|
||||||
|
one := timerx.InitOnce(ctx, client, "test", w)
|
||||||
|
|
||||||
|
d := OnceData{
|
||||||
|
Num: 1,
|
||||||
|
}
|
||||||
|
dy, _ := json.Marshal(d)
|
||||||
|
|
||||||
|
err := one.Create("test", "test", 1*time.Second, dy)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
type OnceData struct {
|
||||||
|
Num int
|
||||||
|
}
|
||||||
|
|
||||||
|
type OnceWorker struct{}
|
||||||
|
|
||||||
|
func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, attachData []byte) *timerx.OnceWorkerResp {
|
||||||
|
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
|
||||||
|
fmt.Println(taskId, taskType)
|
||||||
|
fmt.Printf("原来的参数:%s\n", string(attachData))
|
||||||
|
|
||||||
|
d := OnceData{}
|
||||||
|
|
||||||
|
json.Unmarshal(attachData, &d)
|
||||||
|
|
||||||
|
d.Num++
|
||||||
|
|
||||||
|
fmt.Println(d)
|
||||||
|
|
||||||
|
dy, _ := json.Marshal(d)
|
||||||
|
|
||||||
|
return &timerx.OnceWorkerResp{
|
||||||
|
Retry: true,
|
||||||
|
AttachData: dy,
|
||||||
|
DelayTime: 1 * time.Second,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func cluster() {
|
func cluster() {
|
||||||
client := getRedis()
|
client := getRedis()
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
@@ -45,36 +93,27 @@ func cluster() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func worker() {
|
func worker() {
|
||||||
client := getRedis()
|
// client := getRedis()
|
||||||
w := timerx.InitOnce(context.Background(), client, "test", &Worker{})
|
// w := timerx.InitOnce(context.Background(), client, "test", &OnceWorker{})
|
||||||
w.Add("test", "test", 1*time.Second, map[string]interface{}{
|
// w.Save("test", "test", 1*time.Second, map[string]interface{}{
|
||||||
"test": "test",
|
// "test": "test",
|
||||||
})
|
// })
|
||||||
w.Add("test2", "test", 1*time.Second, map[string]interface{}{
|
// w.Save("test2", "test", 1*time.Second, map[string]interface{}{
|
||||||
"test": "test",
|
// "test": "test",
|
||||||
})
|
// })
|
||||||
w.Add("test3", "test", 1*time.Second, map[string]interface{}{
|
// w.Save("test3", "test", 1*time.Second, map[string]interface{}{
|
||||||
"test": "test",
|
// "test": "test",
|
||||||
})
|
// })
|
||||||
w.Add("test4", "test", 1*time.Second, map[string]interface{}{
|
// w.Save("test4", "test", 1*time.Second, map[string]interface{}{
|
||||||
"test": "test",
|
// "test": "test",
|
||||||
})
|
// })
|
||||||
w.Add("test5", "test", 1*time.Second, map[string]interface{}{
|
// w.Save("test5", "test", 1*time.Second, map[string]interface{}{
|
||||||
"test": "test",
|
// "test": "test",
|
||||||
})
|
// })
|
||||||
|
|
||||||
select {}
|
select {}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Worker struct{}
|
|
||||||
|
|
||||||
func (w *Worker) Worker(ctx context.Context, jobType string, uniqueKey string, data interface{}) (timerx.WorkerCode, time.Duration) {
|
|
||||||
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
|
|
||||||
fmt.Println(uniqueKey, jobType)
|
|
||||||
fmt.Println(data)
|
|
||||||
return timerx.WorkerCodeAgain, time.Second
|
|
||||||
}
|
|
||||||
|
|
||||||
func getRedis() *redis.Client {
|
func getRedis() *redis.Client {
|
||||||
client := redis.NewClient(&redis.Options{
|
client := redis.NewClient(&redis.Options{
|
||||||
Addr: "127.0.0.1" + ":" + "6379",
|
Addr: "127.0.0.1" + ":" + "6379",
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -29,12 +28,11 @@ type Once struct {
|
|||||||
worker Callback
|
worker Callback
|
||||||
}
|
}
|
||||||
|
|
||||||
type WorkerCode int
|
type OnceWorkerResp struct {
|
||||||
|
Retry bool // 是否重试 true
|
||||||
const (
|
DelayTime time.Duration
|
||||||
WorkerCodeSuccess WorkerCode = 0 // 处理完成(不需要重入)
|
AttachData []byte
|
||||||
WorkerCodeAgain WorkerCode = -1 // 需要继续定时,默认原来的时间
|
}
|
||||||
)
|
|
||||||
|
|
||||||
// 需要考虑执行失败重新放入队列的情况
|
// 需要考虑执行失败重新放入队列的情况
|
||||||
type Callback interface {
|
type Callback interface {
|
||||||
@@ -44,7 +42,7 @@ type Callback interface {
|
|||||||
// @param data interface{} 任务数据
|
// @param data interface{} 任务数据
|
||||||
// @return WorkerCode 任务执行结果
|
// @return WorkerCode 任务执行结果
|
||||||
// @return time.Duration 任务执行时间间隔
|
// @return time.Duration 任务执行时间间隔
|
||||||
Worker(ctx context.Context, jobType string, uniTaskId string, attachData interface{}) (WorkerCode, time.Duration)
|
Worker(ctx context.Context, taskType string, taskId string, attachData []byte) *OnceWorkerResp
|
||||||
}
|
}
|
||||||
|
|
||||||
var wo *Once = nil
|
var wo *Once = nil
|
||||||
@@ -52,11 +50,11 @@ var once sync.Once
|
|||||||
|
|
||||||
type extendData struct {
|
type extendData struct {
|
||||||
Delay time.Duration
|
Delay time.Duration
|
||||||
Data interface{}
|
Data []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// 初始化
|
// 初始化
|
||||||
func InitOnce(ctx context.Context, re *redis.Client, keyPrefix string, call Callback, opts ...Option) *Once {
|
func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, call Callback, opts ...Option) *Once {
|
||||||
op := newOptions(opts...)
|
op := newOptions(opts...)
|
||||||
once.Do(func() {
|
once.Do(func() {
|
||||||
wo = &Once{
|
wo = &Once{
|
||||||
@@ -74,13 +72,13 @@ func InitOnce(ctx context.Context, re *redis.Client, keyPrefix string, call Call
|
|||||||
return wo
|
return wo
|
||||||
}
|
}
|
||||||
|
|
||||||
// 添加任务
|
// 添加任务(覆盖)
|
||||||
// 重复插入就代表覆盖
|
// 重复插入就代表覆盖
|
||||||
// @param jobType string 任务类型
|
// @param jobType string 任务类型
|
||||||
// @param uniTaskId string 任务唯一标识
|
// @param uniTaskId string 任务唯一标识
|
||||||
// @param delayTime time.Duration 延迟时间
|
// @param delayTime time.Duration 延迟时间
|
||||||
// @param attachData interface{} 附加数据
|
// @param attachData interface{} 附加数据
|
||||||
func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, attachData interface{}) error {
|
func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, attachData []byte) error {
|
||||||
if delayTime.Abs() != delayTime {
|
if delayTime.Abs() != delayTime {
|
||||||
return fmt.Errorf("时间间隔不能为负数")
|
return fmt.Errorf("时间间隔不能为负数")
|
||||||
}
|
}
|
||||||
@@ -88,7 +86,7 @@ func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, at
|
|||||||
return fmt.Errorf("时间间隔不能为0")
|
return fmt.Errorf("时间间隔不能为0")
|
||||||
}
|
}
|
||||||
|
|
||||||
redisKey := fmt.Sprintf("%s[:]%s", jobType, uniTaskId)
|
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
|
||||||
|
|
||||||
ed := extendData{
|
ed := extendData{
|
||||||
Delay: delayTime,
|
Delay: delayTime,
|
||||||
@@ -102,7 +100,7 @@ func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, at
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// 吸入执行时间
|
// 写入执行时间
|
||||||
_, err = w.redis.ZAdd(w.ctx, w.zsetKey, &redis.Z{
|
_, err = w.redis.ZAdd(w.ctx, w.zsetKey, &redis.Z{
|
||||||
Score: float64(time.Now().Add(delayTime).UnixMilli()),
|
Score: float64(time.Now().Add(delayTime).UnixMilli()),
|
||||||
Member: redisKey,
|
Member: redisKey,
|
||||||
@@ -111,9 +109,38 @@ func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, at
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 添加任务(不覆盖)
|
||||||
|
func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData []byte) error {
|
||||||
|
|
||||||
|
// 判断有序集合Key是否存在,存在则报错,不存在则写入
|
||||||
|
if l.redis.Exists(l.ctx, l.zsetKey).Val() == 0 {
|
||||||
|
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
|
||||||
|
ed := extendData{
|
||||||
|
Delay: delayTime,
|
||||||
|
Data: attachData,
|
||||||
|
}
|
||||||
|
b, _ := json.Marshal(ed)
|
||||||
|
|
||||||
|
// 写入附加数据
|
||||||
|
_, err := l.redis.SetEX(l.ctx, redisKey, b, delayTime+time.Second*5).Result()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = l.redis.ZAdd(l.ctx, l.zsetKey, &redis.Z{
|
||||||
|
Score: float64(time.Now().Add(delayTime).UnixMilli()),
|
||||||
|
Member: redisKey,
|
||||||
|
}).Result()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// 删除任务
|
// 删除任务
|
||||||
func (w *Once) Del(jobType string, uniTaskId string) error {
|
func (w *Once) Delete(taskType string, taskId string) error {
|
||||||
redisKey := fmt.Sprintf("%s[:]%s", jobType, uniTaskId)
|
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
|
||||||
|
|
||||||
w.redis.Del(w.ctx, redisKey).Result()
|
w.redis.Del(w.ctx, redisKey).Result()
|
||||||
|
|
||||||
@@ -122,6 +149,11 @@ func (w *Once) Del(jobType string, uniTaskId string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 获取任务
|
||||||
|
func (l *Once) Get(taskType string, taskId string) {
|
||||||
|
//
|
||||||
|
}
|
||||||
|
|
||||||
// 获取任务
|
// 获取任务
|
||||||
func (w *Once) getTask() {
|
func (w *Once) getTask() {
|
||||||
timer := time.NewTicker(time.Millisecond * 200)
|
timer := time.NewTicker(time.Millisecond * 200)
|
||||||
@@ -153,47 +185,50 @@ func (w *Once) watch() {
|
|||||||
for {
|
for {
|
||||||
keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result()
|
keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("watch err:", err)
|
// fmt.Println("watch err:", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
go w.doTask(keys[1])
|
ctx := context.WithValue(w.ctx, "trace_id", uuid.NewV4().String())
|
||||||
|
|
||||||
|
go w.doTask(ctx, keys[1])
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 执行任务
|
// 执行任务
|
||||||
func (w *Once) doTask(key string) {
|
func (l *Once) doTask(ctx context.Context, key string) {
|
||||||
|
fmt.Println("任务时间:", time.Now().Format("2006-01-02 15:04:05"))
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := recover(); err != nil {
|
if err := recover(); err != nil {
|
||||||
fmt.Println("timer:回调任务panic", err)
|
l.logger.Errorf(ctx, "timer:回调任务panic:%s stack:%s", err, string(debug.Stack()))
|
||||||
log.Println("errStack", string(debug.Stack()))
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
s := strings.Split(key, "[:]")
|
s := strings.Split(key, "[:]")
|
||||||
|
|
||||||
// 读取数据
|
// 读取数据
|
||||||
str, err := w.redis.Get(w.ctx, key).Result()
|
str, err := l.redis.Get(ctx, key).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("execJob err:", err)
|
l.logger.Errorf(ctx, "获取数据失败 err:%s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ed := extendData{}
|
ed := extendData{}
|
||||||
json.Unmarshal([]byte(str), &ed)
|
json.Unmarshal([]byte(str), &ed)
|
||||||
|
|
||||||
fmt.Println("开始时间:", time.Now().Format("2006-01-02 15:04:05"))
|
resp := l.worker.Worker(ctx, s[0], s[1], ed.Data)
|
||||||
|
if resp == nil {
|
||||||
ctx := context.WithValue(context.Background(), "trace_id", uuid.NewV4().String)
|
return
|
||||||
|
|
||||||
code, t := w.worker.Worker(ctx, s[0], s[1], ed.Data)
|
|
||||||
|
|
||||||
if code == WorkerCodeAgain {
|
|
||||||
// 重新放入队列
|
|
||||||
fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05"))
|
|
||||||
if t != 0 && t == t.Abs() {
|
|
||||||
ed.Delay = t
|
|
||||||
}
|
}
|
||||||
w.Add(s[0], s[1], ed.Delay, ed.Data)
|
|
||||||
|
if resp.Retry {
|
||||||
|
// 重新放入队列
|
||||||
|
if resp.DelayTime != 0 && resp.DelayTime == resp.DelayTime.Abs() {
|
||||||
|
ed.Delay = resp.DelayTime
|
||||||
|
}
|
||||||
|
ed.Data = resp.AttachData
|
||||||
|
l.logger.Infof(ctx, "任务重新放入队列:%s", key)
|
||||||
|
fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05"))
|
||||||
|
l.Create(s[0], s[1], ed.Delay, ed.Data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user