优化once
This commit is contained in:
@@ -24,7 +24,7 @@ type Once struct {
|
|||||||
logger Logger
|
logger Logger
|
||||||
zsetKey string
|
zsetKey string
|
||||||
listKey string
|
listKey string
|
||||||
redis *redis.Client
|
redis redis.UniversalClient
|
||||||
worker Callback
|
worker Callback
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -38,10 +38,12 @@ const (
|
|||||||
// 需要考虑执行失败重新放入队列的情况
|
// 需要考虑执行失败重新放入队列的情况
|
||||||
type Callback interface {
|
type Callback interface {
|
||||||
// 任务执行
|
// 任务执行
|
||||||
// uniqueKey: 任务唯一标识
|
// @param jobType string 任务类型
|
||||||
// jobType: 任务类型,用于区分任务
|
// @param uniTaskId string 任务唯一标识
|
||||||
// data: 任务数据
|
// @param data interface{} 任务数据
|
||||||
Worker(jobType string, uniqueKey string, data interface{}) (WorkerCode, time.Duration)
|
// @return WorkerCode 任务执行结果
|
||||||
|
// @return time.Duration 任务执行时间间隔
|
||||||
|
Worker(jobType string, uniTaskId string, attachData interface{}) (WorkerCode, time.Duration)
|
||||||
}
|
}
|
||||||
|
|
||||||
var wo *Once = nil
|
var wo *Once = nil
|
||||||
@@ -73,7 +75,11 @@ func InitOnce(ctx context.Context, re *redis.Client, keyPrefix string, call Call
|
|||||||
|
|
||||||
// 添加任务
|
// 添加任务
|
||||||
// 重复插入就代表覆盖
|
// 重复插入就代表覆盖
|
||||||
func (w *Once) Add(jobType string, uniqueKey string, delayTime time.Duration, data interface{}) error {
|
// @param jobType string 任务类型
|
||||||
|
// @param uniTaskId string 任务唯一标识
|
||||||
|
// @param delayTime time.Duration 延迟时间
|
||||||
|
// @param attachData interface{} 附加数据
|
||||||
|
func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, attachData interface{}) error {
|
||||||
if delayTime.Abs() != delayTime {
|
if delayTime.Abs() != delayTime {
|
||||||
return fmt.Errorf("时间间隔不能为负数")
|
return fmt.Errorf("时间间隔不能为负数")
|
||||||
}
|
}
|
||||||
@@ -81,19 +87,21 @@ func (w *Once) Add(jobType string, uniqueKey string, delayTime time.Duration, da
|
|||||||
return fmt.Errorf("时间间隔不能为0")
|
return fmt.Errorf("时间间隔不能为0")
|
||||||
}
|
}
|
||||||
|
|
||||||
redisKey := fmt.Sprintf("%s[:]%s", jobType, uniqueKey)
|
redisKey := fmt.Sprintf("%s[:]%s", jobType, uniTaskId)
|
||||||
|
|
||||||
ed := extendData{
|
ed := extendData{
|
||||||
Delay: delayTime,
|
Delay: delayTime,
|
||||||
Data: data,
|
Data: attachData,
|
||||||
}
|
}
|
||||||
b, _ := json.Marshal(ed)
|
b, _ := json.Marshal(ed)
|
||||||
|
|
||||||
|
// 写入附加数据
|
||||||
_, err := w.redis.SetEX(w.ctx, redisKey, b, delayTime+time.Second*5).Result()
|
_, err := w.redis.SetEX(w.ctx, redisKey, b, delayTime+time.Second*5).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 吸入执行时间
|
||||||
_, err = w.redis.ZAdd(w.ctx, w.zsetKey, &redis.Z{
|
_, err = w.redis.ZAdd(w.ctx, w.zsetKey, &redis.Z{
|
||||||
Score: float64(time.Now().Add(delayTime).UnixMilli()),
|
Score: float64(time.Now().Add(delayTime).UnixMilli()),
|
||||||
Member: redisKey,
|
Member: redisKey,
|
||||||
@@ -103,8 +111,8 @@ func (w *Once) Add(jobType string, uniqueKey string, delayTime time.Duration, da
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 删除任务
|
// 删除任务
|
||||||
func (w *Once) Del(jobType string, uniqueKey string) error {
|
func (w *Once) Del(jobType string, uniTaskId string) error {
|
||||||
redisKey := fmt.Sprintf("%s[:]%s", jobType, uniqueKey)
|
redisKey := fmt.Sprintf("%s[:]%s", jobType, uniTaskId)
|
||||||
|
|
||||||
w.redis.Del(w.ctx, redisKey).Result()
|
w.redis.Del(w.ctx, redisKey).Result()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user