优化部分测试用例

This commit is contained in:
Yun
2025-10-06 00:13:42 +08:00
parent 9caa984846
commit bae669a1d9
4 changed files with 371 additions and 17 deletions
+23 -12
View File
@@ -11,8 +11,8 @@ import (
"sync"
"time"
"github.com/redis/go-redis/v9"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"github.com/yuninks/lockx"
"github.com/yuninks/timerx/heartbeat"
"github.com/yuninks/timerx/leader"
@@ -122,7 +122,6 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
priority.WithLogger(wo.logger),
priority.WithInstanceId(wo.instanceId),
priority.WithSource("once"),
)
if err != nil {
wo.logger.Errorf(ctx, "InitPriority err:%v", err)
@@ -316,14 +315,20 @@ func (l *Once) parseRedisKey(key string) (OnceTaskType, string, error) {
// @param uniTaskId string 任务唯一标识
// @param delayTime time.Duration 延迟时间
// @param attachData interface{} 附加数据
func (l *Once) Save(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error {
return l.save(taskType, taskId, delayTime, attachData, 0)
func (l *Once) Save(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error {
return l.save(ctx, taskType, taskId, delayTime, attachData, 0)
}
// 指定时间添加任务(覆盖)
func (l *Once) SaveByTime(ctx context.Context, taskType OnceTaskType, taskId string, executeTime time.Time, attachData interface{}) error {
return l.save(ctx, taskType, taskId, time.Until(executeTime), attachData, 0)
}
// 添加任务(覆盖)
// 重复插入就代表覆盖
func (w *Once) save(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}, retryCount int) error {
func (w *Once) save(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}, retryCount int) error {
if delayTime <= 0 {
w.logger.Errorf(ctx, "delay time must be positive delayTime:%v taskType:%v taskId:%v attachData:%v retryCount:%v", delayTime, taskType, taskId, attachData, retryCount)
return fmt.Errorf("delay time must be positive")
}
@@ -349,7 +354,7 @@ func (w *Once) save(taskType OnceTaskType, taskId string, delayTime time.Duratio
})
_, err := pipe.Exec(w.ctx)
if err != nil {
w.logger.Errorf(w.ctx, "save task failed:%v", err)
w.logger.Errorf(w.ctx, "save task failed:%v taskType:%v taskId:%v attachData:%v retryCount:%v", err, taskType, taskId, attachData, retryCount)
return err
}
@@ -357,11 +362,17 @@ func (w *Once) save(taskType OnceTaskType, taskId string, delayTime time.Duratio
}
// 添加任务(不覆盖)
func (l *Once) Create(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any) error {
return l.create(taskType, taskId, delayTime, attachData, 0)
func (l *Once) Create(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any) error {
return l.create(ctx, taskType, taskId, delayTime, attachData, 0)
}
func (l *Once) create(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any, retryCount int) error {
// 指定时间执行(不覆盖)
func (l *Once) CreateByTime(ctx context.Context, taskType OnceTaskType, taskId string, executeTime time.Time, attachData any) error {
delay := time.Until(executeTime)
return l.create(ctx, taskType, taskId, delay, attachData, 0)
}
func (l *Once) create(ctx context.Context, taskType OnceTaskType, taskId string, delayTime time.Duration, attachData any, retryCount int) error {
if delayTime <= 0 {
return fmt.Errorf("delay time must be positive")
}
@@ -371,7 +382,7 @@ func (l *Once) create(taskType OnceTaskType, taskId string, delayTime time.Durat
score, err := l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
return l.Save(taskType, taskId, delayTime, attachData)
return l.Save(ctx, taskType, taskId, delayTime, attachData)
}
l.logger.Errorf(l.ctx, "redis.ZScore err:%v", err)
return err
@@ -380,7 +391,7 @@ func (l *Once) create(taskType OnceTaskType, taskId string, delayTime time.Durat
return fmt.Errorf("task already exists")
}
return l.save(taskType, taskId, delayTime, attachData, retryCount)
return l.save(ctx, taskType, taskId, delayTime, attachData, retryCount)
}
// 删除任务
@@ -530,5 +541,5 @@ func (l *Once) handleRetry(ctx context.Context, taskType OnceTaskType, taskId st
taskType, taskId, ed.RetryCount)
// 不覆盖的新建
return l.create(taskType, taskId, ed.Delay, ed.Data, ed.RetryCount)
return l.create(ctx, taskType, taskId, ed.Delay, ed.Data, ed.RetryCount)
}