优化添加的逻辑
This commit is contained in:
@@ -30,7 +30,7 @@ func main() {
|
||||
ch := make(chan ChanStatus, 1000)
|
||||
|
||||
go func() {
|
||||
for a := 0; a < 100; a++ {
|
||||
for a := 0; a < 10; a++ {
|
||||
go func(a int) {
|
||||
for status := range ch {
|
||||
// fmt.Println("协程", a, "处理任务", status)
|
||||
@@ -47,9 +47,9 @@ func main() {
|
||||
go func() {
|
||||
// 一千万任务,每个任务间隔1秒
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
for i := 0; i < 10; i++ {
|
||||
runTime := t.Add(time.Duration(i) * time.Second)
|
||||
for j := 0; j < 100; j++ {
|
||||
for j := 0; j < 10; j++ {
|
||||
ch <- ChanStatus{
|
||||
I: i,
|
||||
J: j,
|
||||
|
||||
@@ -88,6 +88,13 @@ const (
|
||||
jobTypeList = "list"
|
||||
)
|
||||
|
||||
type createSource string
|
||||
|
||||
const (
|
||||
createSourceDefault = "default"
|
||||
createSourceRetry = "retry"
|
||||
)
|
||||
|
||||
// 初始化
|
||||
func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, call Callback, opts ...Option) (*Once, error) {
|
||||
op := newOptions(opts...)
|
||||
@@ -411,20 +418,20 @@ func (l *Once) Create(ctx context.Context, taskType OnceTaskType, taskId string,
|
||||
return ErrDelayTime
|
||||
}
|
||||
execTime := time.Now().Add(delayTime)
|
||||
return l.create(ctx, jobTypeOnce, taskType, taskId, []time.Time{execTime}, attachData, 0)
|
||||
return l.create(ctx, createSourceDefault, jobTypeOnce, taskType, taskId, []time.Time{execTime}, attachData, 0)
|
||||
}
|
||||
|
||||
// 指定时间执行(不覆盖)
|
||||
func (l *Once) CreateByTime(ctx context.Context, taskType OnceTaskType, taskId string, executeTime time.Time, attachData any) error {
|
||||
return l.create(ctx, jobTypeOnce, taskType, taskId, []time.Time{executeTime}, attachData, 0)
|
||||
return l.create(ctx, createSourceDefault, jobTypeOnce, taskType, taskId, []time.Time{executeTime}, attachData, 0)
|
||||
}
|
||||
|
||||
// 指定多个时间执行(不覆盖)
|
||||
func (l *Once) CreateByList(ctx context.Context, taskType OnceTaskType, taskId string, taskTimes []time.Time, attachData any) error {
|
||||
return l.create(ctx, jobTypeList, taskType, taskId, taskTimes, attachData, 0)
|
||||
return l.create(ctx, createSourceDefault, jobTypeList, taskType, taskId, taskTimes, attachData, 0)
|
||||
}
|
||||
|
||||
func (l *Once) create(ctx context.Context, jobType jobType, taskType OnceTaskType, taskId string, taskTimes []time.Time, attachData any, runCount int) error {
|
||||
func (l *Once) create(ctx context.Context, source createSource, jobType jobType, taskType OnceTaskType, taskId string, taskTimes []time.Time, attachData any, runCount int) error {
|
||||
if len(taskTimes) <= 0 {
|
||||
l.logger.Errorf(ctx, "delay time must be positive taskType:%v taskId:%v attachData:%v runCount:%v", taskType, taskId, attachData, runCount)
|
||||
return ErrExecuteTime
|
||||
@@ -432,6 +439,20 @@ func (l *Once) create(ctx context.Context, jobType jobType, taskType OnceTaskTyp
|
||||
|
||||
redisKey := l.buildRedisKey(taskType, taskId)
|
||||
|
||||
if source != createSourceRetry {
|
||||
// 这里加一个全局锁 从Retry来的不需要 因为已经加锁了
|
||||
lock, err := lockx.NewGlobalLock(ctx, l.redis, l.globalLockPrefix+redisKey)
|
||||
if err != nil {
|
||||
l.logger.Errorf(ctx, "processTask timer:获取锁失败:%s", taskId)
|
||||
return err
|
||||
}
|
||||
if b, err := lock.Lock(); !b {
|
||||
l.logger.Errorf(ctx, "processTask timer:获取锁失败:%s %+v", taskId, err)
|
||||
return err
|
||||
}
|
||||
defer lock.Unlock()
|
||||
}
|
||||
|
||||
score, err := l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Result()
|
||||
if err != nil {
|
||||
if errors.Is(err, redis.Nil) {
|
||||
@@ -596,5 +617,5 @@ func (l *Once) handleRetry(ctx context.Context, taskType OnceTaskType, taskId st
|
||||
taskType, taskId, ed.RunCount)
|
||||
|
||||
// 不覆盖的新建
|
||||
return l.create(ctx, ed.JobType, taskType, taskId, ed.TaskTimes, ed.Data, ed.RunCount)
|
||||
return l.create(ctx, createSourceRetry, ed.JobType, taskType, taskId, ed.TaskTimes, ed.Data, ed.RunCount)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user