添加只执行一次的
This commit is contained in:
+27
-13
@@ -2,6 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -37,20 +38,42 @@ func once() {
|
|||||||
w := OnceWorker{}
|
w := OnceWorker{}
|
||||||
one := timerx.InitOnce(ctx, client, "test", w)
|
one := timerx.InitOnce(ctx, client, "test", w)
|
||||||
|
|
||||||
err := one.Save("test", "test", 1*time.Second, map[string]interface{}{})
|
d := OnceData{
|
||||||
|
Num: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
err := one.Create("test", "test", 1*time.Second, d)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type OnceData struct {
|
||||||
|
Num int
|
||||||
|
}
|
||||||
|
|
||||||
type OnceWorker struct{}
|
type OnceWorker struct{}
|
||||||
|
|
||||||
func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) (timerx.WorkerCode, time.Duration) {
|
func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *timerx.OnceWorkerResp {
|
||||||
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
|
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
|
||||||
fmt.Println(taskId, taskType)
|
fmt.Println(taskId, taskType)
|
||||||
fmt.Println(attachData)
|
fmt.Println(attachData)
|
||||||
return timerx.WorkerCodeAgain, time.Millisecond
|
|
||||||
|
d := OnceData{}
|
||||||
|
|
||||||
|
by, _ := json.Marshal(attachData)
|
||||||
|
json.Unmarshal(by, &d)
|
||||||
|
|
||||||
|
d.Num++
|
||||||
|
|
||||||
|
fmt.Println(d)
|
||||||
|
|
||||||
|
return &timerx.OnceWorkerResp{
|
||||||
|
Retry: true,
|
||||||
|
AttachData: d,
|
||||||
|
DelayTime: 1 * time.Second,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func cluster() {
|
func cluster() {
|
||||||
@@ -69,7 +92,7 @@ func cluster() {
|
|||||||
|
|
||||||
func worker() {
|
func worker() {
|
||||||
client := getRedis()
|
client := getRedis()
|
||||||
w := timerx.InitOnce(context.Background(), client, "test", &Worker{})
|
w := timerx.InitOnce(context.Background(), client, "test", &OnceWorker{})
|
||||||
w.Save("test", "test", 1*time.Second, map[string]interface{}{
|
w.Save("test", "test", 1*time.Second, map[string]interface{}{
|
||||||
"test": "test",
|
"test": "test",
|
||||||
})
|
})
|
||||||
@@ -89,15 +112,6 @@ func worker() {
|
|||||||
select {}
|
select {}
|
||||||
}
|
}
|
||||||
|
|
||||||
type Worker struct{}
|
|
||||||
|
|
||||||
func (w *Worker) Worker(ctx context.Context, jobType string, uniqueKey string, data interface{}) (timerx.WorkerCode, time.Duration) {
|
|
||||||
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
|
|
||||||
fmt.Println(uniqueKey, jobType)
|
|
||||||
fmt.Println(data)
|
|
||||||
return timerx.WorkerCodeAgain, time.Second
|
|
||||||
}
|
|
||||||
|
|
||||||
func getRedis() *redis.Client {
|
func getRedis() *redis.Client {
|
||||||
client := redis.NewClient(&redis.Options{
|
client := redis.NewClient(&redis.Options{
|
||||||
Addr: "127.0.0.1" + ":" + "6379",
|
Addr: "127.0.0.1" + ":" + "6379",
|
||||||
|
|||||||
@@ -28,12 +28,11 @@ type Once struct {
|
|||||||
worker Callback
|
worker Callback
|
||||||
}
|
}
|
||||||
|
|
||||||
type WorkerCode int
|
type OnceWorkerResp struct {
|
||||||
|
Retry bool // 是否重试 true
|
||||||
const (
|
DelayTime time.Duration
|
||||||
WorkerCodeSuccess WorkerCode = 0 // 处理完成(不需要重入)
|
AttachData interface{}
|
||||||
WorkerCodeAgain WorkerCode = -1 // 需要继续定时,默认原来的时间
|
}
|
||||||
)
|
|
||||||
|
|
||||||
// 需要考虑执行失败重新放入队列的情况
|
// 需要考虑执行失败重新放入队列的情况
|
||||||
type Callback interface {
|
type Callback interface {
|
||||||
@@ -43,7 +42,7 @@ type Callback interface {
|
|||||||
// @param data interface{} 任务数据
|
// @param data interface{} 任务数据
|
||||||
// @return WorkerCode 任务执行结果
|
// @return WorkerCode 任务执行结果
|
||||||
// @return time.Duration 任务执行时间间隔
|
// @return time.Duration 任务执行时间间隔
|
||||||
Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) (WorkerCode, time.Duration)
|
Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *OnceWorkerResp
|
||||||
}
|
}
|
||||||
|
|
||||||
var wo *Once = nil
|
var wo *Once = nil
|
||||||
@@ -217,13 +216,17 @@ func (l *Once) doTask(ctx context.Context, key string) {
|
|||||||
ed := extendData{}
|
ed := extendData{}
|
||||||
json.Unmarshal([]byte(str), &ed)
|
json.Unmarshal([]byte(str), &ed)
|
||||||
|
|
||||||
code, t := l.worker.Worker(ctx, s[0], s[1], ed.Data)
|
resp := l.worker.Worker(ctx, s[0], s[1], ed.Data)
|
||||||
|
if resp == nil {
|
||||||
if code == WorkerCodeAgain {
|
return
|
||||||
// 重新放入队列
|
|
||||||
if t != 0 && t == t.Abs() {
|
|
||||||
ed.Delay = t
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if resp.Retry {
|
||||||
|
// 重新放入队列
|
||||||
|
if resp.DelayTime != 0 && resp.DelayTime == resp.DelayTime.Abs() {
|
||||||
|
ed.Delay = resp.DelayTime
|
||||||
|
}
|
||||||
|
ed.Data = resp.AttachData
|
||||||
l.logger.Infof(ctx, "任务重新放入队列:%s", key)
|
l.logger.Infof(ctx, "任务重新放入队列:%s", key)
|
||||||
fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05"))
|
fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05"))
|
||||||
l.Create(s[0], s[1], ed.Delay, ed.Data)
|
l.Create(s[0], s[1], ed.Delay, ed.Data)
|
||||||
|
|||||||
Reference in New Issue
Block a user