1 Commits

Author SHA1 Message Date
Yun 1ac53f7688 修改附加内容的字段为字节 2024-10-10 10:05:28 +08:00
2 changed files with 30 additions and 28 deletions
+25 -23
View File
@@ -41,8 +41,9 @@ func once() {
d := OnceData{ d := OnceData{
Num: 1, Num: 1,
} }
dy, _ := json.Marshal(d)
err := one.Create("test", "test", 1*time.Second, d) err := one.Create("test", "test", 1*time.Second, dy)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }
@@ -55,23 +56,24 @@ type OnceData struct {
type OnceWorker struct{} type OnceWorker struct{}
func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *timerx.OnceWorkerResp { func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, attachData []byte) *timerx.OnceWorkerResp {
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05")) fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println(taskId, taskType) fmt.Println(taskId, taskType)
fmt.Println(attachData) fmt.Printf("原来的参数:%s\n", string(attachData))
d := OnceData{} d := OnceData{}
by, _ := json.Marshal(attachData) json.Unmarshal(attachData, &d)
json.Unmarshal(by, &d)
d.Num++ d.Num++
fmt.Println(d) fmt.Println(d)
dy, _ := json.Marshal(d)
return &timerx.OnceWorkerResp{ return &timerx.OnceWorkerResp{
Retry: true, Retry: true,
AttachData: d, AttachData: dy,
DelayTime: 1 * time.Second, DelayTime: 1 * time.Second,
} }
} }
@@ -91,23 +93,23 @@ func cluster() {
} }
func worker() { func worker() {
client := getRedis() // client := getRedis()
w := timerx.InitOnce(context.Background(), client, "test", &OnceWorker{}) // w := timerx.InitOnce(context.Background(), client, "test", &OnceWorker{})
w.Save("test", "test", 1*time.Second, map[string]interface{}{ // w.Save("test", "test", 1*time.Second, map[string]interface{}{
"test": "test", // "test": "test",
}) // })
w.Save("test2", "test", 1*time.Second, map[string]interface{}{ // w.Save("test2", "test", 1*time.Second, map[string]interface{}{
"test": "test", // "test": "test",
}) // })
w.Save("test3", "test", 1*time.Second, map[string]interface{}{ // w.Save("test3", "test", 1*time.Second, map[string]interface{}{
"test": "test", // "test": "test",
}) // })
w.Save("test4", "test", 1*time.Second, map[string]interface{}{ // w.Save("test4", "test", 1*time.Second, map[string]interface{}{
"test": "test", // "test": "test",
}) // })
w.Save("test5", "test", 1*time.Second, map[string]interface{}{ // w.Save("test5", "test", 1*time.Second, map[string]interface{}{
"test": "test", // "test": "test",
}) // })
select {} select {}
} }
+5 -5
View File
@@ -31,7 +31,7 @@ type Once struct {
type OnceWorkerResp struct { type OnceWorkerResp struct {
Retry bool // 是否重试 true Retry bool // 是否重试 true
DelayTime time.Duration DelayTime time.Duration
AttachData interface{} AttachData []byte
} }
// 需要考虑执行失败重新放入队列的情况 // 需要考虑执行失败重新放入队列的情况
@@ -42,7 +42,7 @@ type Callback interface {
// @param data interface{} 任务数据 // @param data interface{} 任务数据
// @return WorkerCode 任务执行结果 // @return WorkerCode 任务执行结果
// @return time.Duration 任务执行时间间隔 // @return time.Duration 任务执行时间间隔
Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *OnceWorkerResp Worker(ctx context.Context, taskType string, taskId string, attachData []byte) *OnceWorkerResp
} }
var wo *Once = nil var wo *Once = nil
@@ -50,7 +50,7 @@ var once sync.Once
type extendData struct { type extendData struct {
Delay time.Duration Delay time.Duration
Data interface{} Data []byte
} }
// 初始化 // 初始化
@@ -78,7 +78,7 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
// @param uniTaskId string 任务唯一标识 // @param uniTaskId string 任务唯一标识
// @param delayTime time.Duration 延迟时间 // @param delayTime time.Duration 延迟时间
// @param attachData interface{} 附加数据 // @param attachData interface{} 附加数据
func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error { func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, attachData []byte) error {
if delayTime.Abs() != delayTime { if delayTime.Abs() != delayTime {
return fmt.Errorf("时间间隔不能为负数") return fmt.Errorf("时间间隔不能为负数")
} }
@@ -110,7 +110,7 @@ func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, att
} }
// 添加任务(不覆盖) // 添加任务(不覆盖)
func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error { func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData []byte) error {
// 判断有序集合Key是否存在,存在则报错,不存在则写入 // 判断有序集合Key是否存在,存在则报错,不存在则写入
if l.redis.Exists(l.ctx, l.zsetKey).Val() == 0 { if l.redis.Exists(l.ctx, l.zsetKey).Val() == 0 {