修改Option与once的实现

This commit is contained in:
yun
2025-10-10 21:03:00 +08:00
parent bae669a1d9
commit 2c762dee2a
5 changed files with 148 additions and 59 deletions
+3 -4
View File
@@ -370,10 +370,7 @@ func (l *Cluster) Cron(ctx context.Context, taskId string, cronExpression string
// 获取当天的零点时间 // 获取当天的零点时间
zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location()) zeroTime := time.Date(nowTime.Year(), nowTime.Month(), nowTime.Day(), 0, 0, 0, 0, nowTime.Location())
options := Options{} options := newEmptyOptions(opt...)
for _, o := range opt {
o(&options)
}
cronParser := l.cronParser cronParser := l.cronParser
if options.cronParser != nil { if options.cronParser != nil {
cronParser = options.cronParser cronParser = options.cronParser
@@ -544,6 +541,8 @@ func (c *Cluster) executeTasks() {
} }
if len(taskID) < 2 { if len(taskID) < 2 {
c.logger.Errorf(c.ctx, "Invalid BLPop result: %v", taskID)
// 数据异常,继续下一个
continue continue
} }
+8
View File
@@ -33,4 +33,12 @@ var (
ErrCronExpression = errors.New("cron expression error") ErrCronExpression = errors.New("cron expression error")
// ErrCronParser 错误 // ErrCronParser 错误
ErrCronParser = errors.New("cron parser error") ErrCronParser = errors.New("cron parser error")
// ExecuteTime 错误
ErrExecuteTime = errors.New("execute time error")
// RunCount 错误
ErrRunCount = errors.New("run count error")
// DelayTime 错误
ErrDelayTime = errors.New("delay time error")
// 任务已存在
ErrTaskExists = errors.New("task already exists")
) )
+30 -3
View File
@@ -27,13 +27,34 @@ func main() {
panic(err) panic(err)
} }
ch := make(chan ChanStatus, 1000)
go func() {
for a := 0; a < 100; a++ {
go func(a int) {
for status := range ch {
// fmt.Println("协程", a, "处理任务", status)
// time.Sleep(10 * time.Millisecond) // 模拟处理时间
err = once.SaveByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("task_%d_%d", status.I, status.J), status.T, fmt.Sprintf("任务数据_%d_%d 预期时间%s", status.I, status.J, status.T.Format("2006-01-02 15:04:05")))
if err != nil {
fmt.Println("保存任务失败:", err)
}
}
}(a)
}
}()
go func() { go func() {
// 一千万任务,每个任务间隔1秒 // 一千万任务,每个任务间隔1秒
for i := 0; i < 10000000; i++ { for i := 0; i < 100; i++ {
runTime := t.Add(time.Duration(i) * time.Second) runTime := t.Add(time.Duration(i) * time.Second)
for j := 0; j < 50000; j++ { for j := 0; j < 100; j++ {
once.CreateByTime(ctx, OnceTaskTypeNormal, fmt.Sprintf("task_%d_%d", i, j), runTime, fmt.Sprintf("任务数据_%d_%d 预期时间%s", i, j, runTime.Format("2006-01-02 15:04:05"))) ch <- ChanStatus{
I: i,
J: j,
T: runTime,
}
} }
} }
}() }()
@@ -42,6 +63,12 @@ func main() {
} }
type ChanStatus struct {
I int
J int
T time.Time
}
func getRedis() *redis.Client { func getRedis() *redis.Client {
client := redis.NewClient(&redis.Options{ client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1" + ":" + "6379", Addr: "127.0.0.1" + ":" + "6379",
+80 -34
View File
@@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"runtime/debug" "runtime/debug"
"sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@@ -51,7 +52,7 @@ type Once struct {
keySeparator string // 分割符 keySeparator string // 分割符
timeout time.Duration // 任务执行超时时间 timeout time.Duration // 任务执行超时时间
maxRetryCount int // 最大重试次数 0代表不限 maxRunCount int // 最大重试次数 0代表不限
} }
type OnceWorkerResp struct { type OnceWorkerResp struct {
@@ -74,11 +75,19 @@ type Callback interface {
} }
type extendData struct { type extendData struct {
Delay time.Duration TaskTimes []time.Time
Data any Data any
RetryCount int // 重试次数 RunCount int // 运行次数
JobType jobType
} }
type jobType string
const (
jobTypeOnce = "once"
jobTypeList = "list"
)
// 初始化 // 初始化
func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, call Callback, opts ...Option) (*Once, error) { func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, call Callback, opts ...Option) (*Once, error) {
op := newOptions(opts...) op := newOptions(opts...)
@@ -109,7 +118,7 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
instanceId: u.String(), instanceId: u.String(),
keySeparator: "[:]", keySeparator: "[:]",
timeout: op.timeout, timeout: op.timeout,
maxRetryCount: op.maxRetryCount, maxRunCount: op.maxRunCount,
} }
// 初始化优先级 // 初始化优先级
@@ -288,6 +297,12 @@ func (l *Once) executeTasks() {
continue continue
} }
if len(keys) < 2 {
l.logger.Errorf(l.ctx, "Invalid task data: %v", keys)
// 数据异常,继续下一个
continue
}
// 处理任务
go l.processTask(keys[1]) go l.processTask(keys[1])
} }
@@ -316,45 +331,69 @@ func (l *Once) parseRedisKey(key string) (OnceTaskType, string, error) {
// @param delayTime time.Duration 延迟时间 // @param delayTime time.Duration 延迟时间
// @param attachData interface{} 附加数据 // @param attachData interface{} 附加数据
func (l *Once) Save(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error { func (l *Once) Save(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error {
return l.save(ctx, taskType, taskId, delayTime, attachData, 0) execTime := time.Now().Add(delayTime)
return l.save(ctx, jobTypeOnce, taskType, taskId, []time.Time{execTime}, attachData, 0)
} }
// 指定时间添加任务(覆盖) // 指定时间添加任务(覆盖)
func (l *Once) SaveByTime(ctx context.Context, taskType OnceTaskType, taskId string, executeTime time.Time, attachData interface{}) error { func (l *Once) SaveByTime(ctx context.Context, taskType OnceTaskType, taskId string, executeTime time.Time, attachData interface{}) error {
return l.save(ctx, taskType, taskId, time.Until(executeTime), attachData, 0) return l.save(ctx, jobTypeOnce, taskType, taskId, []time.Time{executeTime}, attachData, 0)
} }
// 添加任务(覆盖) // 添加任务(覆盖)
// 重复插入就代表覆盖 // 重复插入就代表覆盖
func (w *Once) save(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}, retryCount int) error { func (w *Once) save(ctx context.Context, jobType jobType, taskType OnceTaskType, taskId string, taskTimes []time.Time, attachData interface{}, runCount int) error {
if delayTime <= 0 { if len(taskTimes) == 0 {
w.logger.Errorf(ctx, "delay time must be positive delayTime:%v taskType:%v taskId:%v attachData:%v retryCount:%v", delayTime, taskType, taskId, attachData, retryCount) w.logger.Errorf(ctx, "delay time must be positive taskType:%v taskId:%v attachData:%v runCount:%v", taskType, taskId, attachData, runCount)
return fmt.Errorf("delay time must be positive") return ErrExecuteTime
}
if jobType == jobTypeList && len(taskTimes) <= runCount {
w.logger.Errorf(ctx, "delay time must be positive taskType:%v taskId:%v attachData:%v runCount:%v", taskType, taskId, attachData, runCount)
return ErrRunCount
}
// 根据时间从小到大排序
sort.Slice(taskTimes, func(i, j int) bool {
return taskTimes[i].Before(taskTimes[j])
})
latestTime := taskTimes[len(taskTimes)-1]
if latestTime.Before(time.Now()) {
w.logger.Errorf(ctx, "delay time must be positive taskType:%v taskId:%v attachData:%v runCount:%v", taskType, taskId, attachData, runCount)
return ErrDelayTime
}
nextTime := taskTimes[0]
if jobType == jobTypeList {
nextTime = taskTimes[runCount]
} }
redisKey := w.buildRedisKey(taskType, taskId) redisKey := w.buildRedisKey(taskType, taskId)
executeTime := time.Now().Add(delayTime)
ed := extendData{ ed := extendData{
Delay: delayTime, TaskTimes: taskTimes,
Data: attachData, Data: attachData,
RetryCount: retryCount, RunCount: runCount,
JobType: jobType,
} }
b, _ := json.Marshal(ed) b, _ := json.Marshal(ed)
// 使用事务确保原子性 // 使用事务确保原子性
pipe := w.redis.TxPipeline() pipe := w.redis.TxPipeline()
dataExpire := delayTime + time.Minute*30 expiresTime := latestTime.Add(time.Minute * 30)
dataExpire := time.Until(expiresTime)
pipe.SetEx(w.ctx, w.keyPrefix+redisKey, b, dataExpire) pipe.SetEx(w.ctx, w.keyPrefix+redisKey, b, dataExpire)
pipe.ZAdd(w.ctx, w.zsetKey, redis.Z{ pipe.ZAdd(w.ctx, w.zsetKey, redis.Z{
Score: float64(executeTime.UnixMilli()), Score: float64(nextTime.UnixMilli()),
Member: redisKey, Member: redisKey,
}) })
_, err := pipe.Exec(w.ctx) _, err := pipe.Exec(w.ctx)
if err != nil { if err != nil {
w.logger.Errorf(w.ctx, "save task failed:%v taskType:%v taskId:%v attachData:%v retryCount:%v", err, taskType, taskId, attachData, retryCount) w.logger.Errorf(w.ctx, "save task failed:%v taskType:%v taskId:%v attachData:%v retryCount:%v", err, taskType, taskId, attachData, runCount)
return err return err
} }
@@ -363,18 +402,23 @@ func (w *Once) save(ctx context.Context, taskType OnceTaskType, taskId string, d
// 添加任务(不覆盖) // 添加任务(不覆盖)
func (l *Once) Create(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any) error { func (l *Once) Create(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any) error {
return l.create(ctx, taskType, taskId, delayTime, attachData, 0) if delayTime <= 0 {
l.logger.Errorf(ctx, "delay time must be positive taskType:%v taskId:%v attachData:%v", taskType, taskId, attachData)
return ErrDelayTime
}
execTime := time.Now().Add(delayTime)
return l.create(ctx, jobTypeOnce, taskType, taskId, []time.Time{execTime}, attachData, 0)
} }
// 指定时间执行(不覆盖) // 指定时间执行(不覆盖)
func (l *Once) CreateByTime(ctx context.Context, taskType OnceTaskType, taskId string, executeTime time.Time, attachData any) error { func (l *Once) CreateByTime(ctx context.Context, taskType OnceTaskType, taskId string, executeTime time.Time, attachData any) error {
delay := time.Until(executeTime) return l.create(ctx, jobTypeOnce, taskType, taskId, []time.Time{executeTime}, attachData, 0)
return l.create(ctx, taskType, taskId, delay, attachData, 0)
} }
func (l *Once) create(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any, retryCount int) error { func (l *Once) create(ctx context.Context, jobType jobType, taskType OnceTaskType, taskId string, taskTimes []time.Time, attachData any, runCount int) error {
if delayTime <= 0 { if len(taskTimes) <= 0 {
return fmt.Errorf("delay time must be positive") l.logger.Errorf(ctx, "delay time must be positive taskType:%v taskId:%v attachData:%v runCount:%v", taskType, taskId, attachData, runCount)
return ErrExecuteTime
} }
redisKey := l.buildRedisKey(taskType, taskId) redisKey := l.buildRedisKey(taskType, taskId)
@@ -382,16 +426,17 @@ func (l *Once) create(ctx context.Context, taskType OnceTaskType, taskId string,
score, err := l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Result() score, err := l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Result()
if err != nil { if err != nil {
if errors.Is(err, redis.Nil) { if errors.Is(err, redis.Nil) {
return l.Save(ctx, taskType, taskId, delayTime, attachData) return l.save(ctx, jobTypeOnce, taskType, taskId, taskTimes, attachData, runCount)
} }
l.logger.Errorf(l.ctx, "redis.ZScore err:%v", err) l.logger.Errorf(l.ctx, "redis.ZScore err:%v", err)
return err return err
} }
if score > 0 { if score > 0 {
return fmt.Errorf("task already exists") l.logger.Errorf(l.ctx, "task exists taskType:%v taskId:%v attachData:%v runCount:%v", taskType, taskId, attachData, runCount)
return ErrTaskExists
} }
return l.save(ctx, taskType, taskId, delayTime, attachData, retryCount) return l.save(ctx, jobTypeOnce, taskType, taskId, taskTimes, attachData, runCount)
} }
// 删除任务 // 删除任务
@@ -523,23 +568,24 @@ func (l *Once) processTask(key string) {
func (l *Once) handleRetry(ctx context.Context, taskType OnceTaskType, taskId string, func (l *Once) handleRetry(ctx context.Context, taskType OnceTaskType, taskId string,
ed *extendData, resp *OnceWorkerResp) error { ed *extendData, resp *OnceWorkerResp) error {
// 限制重试次数 // 限制重试次数
ed.RetryCount++ ed.RunCount++
if l.maxRetryCount > 0 && ed.RetryCount > l.maxRetryCount { if l.maxRunCount > 0 && ed.RunCount > l.maxRunCount {
l.logger.Infof(ctx, "handleRetry task exceeded retry limit: %s %s %d", taskType, taskId, l.maxRetryCount) l.logger.Infof(ctx, "handleRetry task exceeded retry limit: %s %s %d", taskType, taskId, l.maxRunCount)
return nil return nil
} }
// 更新延迟时间 // 更新延迟时间
if resp.DelayTime > 0 { if ed.JobType == jobTypeOnce {
ed.Delay = resp.DelayTime ed.TaskTimes = []time.Time{time.Now().Add(resp.DelayTime)}
} }
if resp.AttachData != nil { if resp.AttachData != nil {
ed.Data = resp.AttachData ed.Data = resp.AttachData
} }
l.logger.Infof(ctx, "handleRetry retrying task: %s:%s, retry count: %d", l.logger.Infof(ctx, "handleRetry retrying task: %s:%s, retry count: %d",
taskType, taskId, ed.RetryCount) taskType, taskId, ed.RunCount)
// 不覆盖的新建 // 不覆盖的新建
return l.create(ctx, taskType, taskId, ed.Delay, ed.Data, ed.RetryCount) return l.create(ctx, ed.JobType, taskType, taskId, ed.TaskTimes, ed.Data, ed.RunCount)
} }
+27 -18
View File
@@ -8,14 +8,14 @@ import (
) )
type Options struct { type Options struct {
logger logger.Logger logger logger.Logger
location *time.Location location *time.Location
timeout time.Duration // 任务最长执行时间 timeout time.Duration // 任务最长执行时间
usePriority bool usePriority bool
priorityVal int64 priorityVal int64
batchSize int batchSize int
maxRetryCount int maxRunCount int // 单个任务最大运行次数 0 代表不限
cronParser *cron.Parser // cron表达式解析器 cronParser *cron.Parser // cron表达式解析器
} }
func defaultOptions() Options { func defaultOptions() Options {
@@ -24,19 +24,20 @@ func defaultOptions() Options {
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
return Options{ return Options{
logger: logger.NewLogger(), logger: logger.NewLogger(),
location: time.Local, location: time.Local,
timeout: time.Hour, // timeout: time.Hour, //
usePriority: false, usePriority: false,
priorityVal: 0, priorityVal: 0,
batchSize: 100, batchSize: 100,
maxRetryCount: 0, maxRunCount: 0,
cronParser: &parser, cronParser: &parser,
} }
} }
type Option func(*Options) type Option func(*Options)
// 返回带默认值的配置
func newOptions(opts ...Option) Options { func newOptions(opts ...Option) Options {
o := defaultOptions() o := defaultOptions()
for _, opt := range opts { for _, opt := range opts {
@@ -45,6 +46,15 @@ func newOptions(opts ...Option) Options {
return o return o
} }
// 返回空的配置
func newEmptyOptions(opts ...Option) Options {
o := Options{}
for _, opt := range opts {
opt(&o)
}
return o
}
// 设置日志 // 设置日志
func WithLogger(log logger.Logger) Option { func WithLogger(log logger.Logger) Option {
return func(o *Options) { return func(o *Options) {
@@ -88,7 +98,7 @@ func WithMaxRetryCount(count int) Option {
if count < 0 { if count < 0 {
count = 0 count = 0
} }
o.maxRetryCount = count o.maxRunCount = count
} }
} }
@@ -144,4 +154,3 @@ func WithCronParserDescriptor() Option {
o.cronParser = &parser o.cronParser = &parser
} }
} }