diff --git a/once.go b/once.go index 9a83c50..5acb887 100644 --- a/once.go +++ b/once.go @@ -38,6 +38,8 @@ type OnceWorkerResp struct { AttachData interface{} } +type OnceTaskType string + // 需要考虑执行失败重新放入队列的情况 type Callback interface { // 任务执行 @@ -91,7 +93,7 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c // @param uniTaskId string 任务唯一标识 // @param delayTime time.Duration 延迟时间 // @param attachData interface{} 附加数据 -func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error { +func (w *Once) Save(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error { if delayTime.Abs() != delayTime { return fmt.Errorf("时间间隔不能为负数") } @@ -123,7 +125,7 @@ func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, att } // 添加任务(不覆盖) -func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error { +func (l *Once) Create(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error { redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId) // 判断有序集合Key是否存在,存在则报错,不存在则写入 if l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Val() == 0 { @@ -152,7 +154,7 @@ func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, a } // 删除任务 -func (w *Once) Delete(taskType string, taskId string) error { +func (w *Once) Delete(taskType OnceTaskType, taskId string) error { redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId) w.redis.Del(w.ctx, redisKey).Result() @@ -163,7 +165,7 @@ func (w *Once) Delete(taskType string, taskId string) error { } // 获取任务 -func (l *Once) Get(taskType string, taskId string) { +func (l *Once) Get(taskType OnceTaskType, taskId string) { // } @@ -253,6 +255,6 @@ func (l *Once) doTask(ctx context.Context, key string) { } ed.Data = resp.AttachData l.logger.Infof(ctx, "任务重新放入队列:%s", key) - l.Create(s[0], s[1], ed.Delay, ed.Data) + l.Create(OnceTaskType(s[0]), s[1], ed.Delay, ed.Data) } }