单次执行的任务类型定义方法
This commit is contained in:
@@ -38,6 +38,8 @@ type OnceWorkerResp struct {
|
|||||||
AttachData interface{}
|
AttachData interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type OnceTaskType string
|
||||||
|
|
||||||
// 需要考虑执行失败重新放入队列的情况
|
// 需要考虑执行失败重新放入队列的情况
|
||||||
type Callback interface {
|
type Callback interface {
|
||||||
// 任务执行
|
// 任务执行
|
||||||
@@ -91,7 +93,7 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
|
|||||||
// @param uniTaskId string 任务唯一标识
|
// @param uniTaskId string 任务唯一标识
|
||||||
// @param delayTime time.Duration 延迟时间
|
// @param delayTime time.Duration 延迟时间
|
||||||
// @param attachData interface{} 附加数据
|
// @param attachData interface{} 附加数据
|
||||||
func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error {
|
func (w *Once) Save(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error {
|
||||||
if delayTime.Abs() != delayTime {
|
if delayTime.Abs() != delayTime {
|
||||||
return fmt.Errorf("时间间隔不能为负数")
|
return fmt.Errorf("时间间隔不能为负数")
|
||||||
}
|
}
|
||||||
@@ -123,7 +125,7 @@ func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, att
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 添加任务(不覆盖)
|
// 添加任务(不覆盖)
|
||||||
func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error {
|
func (l *Once) Create(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error {
|
||||||
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
|
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
|
||||||
// 判断有序集合Key是否存在,存在则报错,不存在则写入
|
// 判断有序集合Key是否存在,存在则报错,不存在则写入
|
||||||
if l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Val() == 0 {
|
if l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Val() == 0 {
|
||||||
@@ -152,7 +154,7 @@ func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, a
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 删除任务
|
// 删除任务
|
||||||
func (w *Once) Delete(taskType string, taskId string) error {
|
func (w *Once) Delete(taskType OnceTaskType, taskId string) error {
|
||||||
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
|
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
|
||||||
|
|
||||||
w.redis.Del(w.ctx, redisKey).Result()
|
w.redis.Del(w.ctx, redisKey).Result()
|
||||||
@@ -163,7 +165,7 @@ func (w *Once) Delete(taskType string, taskId string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 获取任务
|
// 获取任务
|
||||||
func (l *Once) Get(taskType string, taskId string) {
|
func (l *Once) Get(taskType OnceTaskType, taskId string) {
|
||||||
//
|
//
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -253,6 +255,6 @@ func (l *Once) doTask(ctx context.Context, key string) {
|
|||||||
}
|
}
|
||||||
ed.Data = resp.AttachData
|
ed.Data = resp.AttachData
|
||||||
l.logger.Infof(ctx, "任务重新放入队列:%s", key)
|
l.logger.Infof(ctx, "任务重新放入队列:%s", key)
|
||||||
l.Create(s[0], s[1], ed.Delay, ed.Data)
|
l.Create(OnceTaskType(s[0]), s[1], ed.Delay, ed.Data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user