diff --git a/cluster.go b/cluster.go index e106326..8d4fc1d 100644 --- a/cluster.go +++ b/cluster.go @@ -54,6 +54,8 @@ type Cluster struct { cache *cachex.Cache // 本地缓存 cronParser *cron.Parser // cron表达式解析器 batchSize int // 批量获取任务的数量 + workerChan chan struct{} // worker + maxWorkers int // 最大worker数量 } // 初始化定时器 @@ -90,6 +92,8 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin instanceId: U.String(), cronParser: op.cronParser, batchSize: op.batchSize, + workerChan: make(chan struct{}, op.maxRunCount), + maxWorkers: op.maxRunCount, } // 初始化优先级 @@ -519,12 +523,13 @@ func (c *Cluster) executeTasks() { defer c.wg.Done() for { + select { case <-c.stopChan: return case <-c.ctx.Done(): return - default: + case <-c.workerChan: if c.usePriority && !c.priority.IsLatest(c.ctx) { time.Sleep(5 * time.Second) continue @@ -559,6 +564,10 @@ type ReJobData struct { // 执行任务 func (l *Cluster) processTask(taskId string) { + defer func() { + <-l.workerChan + }() + begin := time.Now() ctx, cancel := context.WithTimeout(l.ctx, l.timeout) diff --git a/once.go b/once.go index c04eb04..eab1134 100644 --- a/once.go +++ b/once.go @@ -52,7 +52,9 @@ type Once struct { keySeparator string // 分割符 timeout time.Duration // 任务执行超时时间 - maxRunCount int // 最大重试次数 0代表不限 + maxRunCount int // 最大重试次数 0代表不限 + workerChan chan struct{} // worker + maxWorkers int // 最大worker数量 } type OnceWorkerResp struct { @@ -119,6 +121,8 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c keySeparator: "[:]", timeout: op.timeout, maxRunCount: op.maxRunCount, + workerChan: make(chan struct{}, op.maxRunCount), + maxWorkers: op.maxRunCount, } // 初始化优先级 @@ -285,8 +289,7 @@ func (l *Once) executeTasks() { return case <-l.ctx.Done(): return - default: - + case <-l.workerChan: keys, err := l.redis.BLPop(l.ctx, time.Second*10, l.listKey).Result() if err != nil { if err != redis.Nil { @@ -502,6 +505,10 @@ func (l *Once) batchGetTasks() { // 执行任务 func (l *Once) processTask(key string) { + defer func() { + <-l.workerChan + }() + begin := time.Now() ctx, cancel := context.WithTimeout(l.ctx, l.timeout) diff --git a/option.go b/option.go index 33a6116..6ea190b 100644 --- a/option.go +++ b/option.go @@ -14,7 +14,8 @@ type Options struct { usePriority bool priorityVal int64 batchSize int - maxRunCount int // 单个任务最大运行次数 0 代表不限 + maxRunCount int // 单个任务最大运行次数 0代表不限 + maxWorkers int // 最大工作协程数 cronParser *cron.Parser // cron表达式解析器 } @@ -31,6 +32,7 @@ func defaultOptions() Options { priorityVal: 0, batchSize: 100, maxRunCount: 0, + maxWorkers: 100, cronParser: &parser, } } @@ -102,6 +104,15 @@ func WithMaxRetryCount(count int) Option { } } +func WithMaxWorkers(count int) Option { + return func(o *Options) { + if count < 0 { + count = 10 + } + o.maxWorkers = count + } +} + // 添加cron表达式解析器 func WithCronParser(parser cron.Parser) Option { return func(o *Options) { diff --git a/single.go b/single.go index 4a80ba0..51b82ee 100644 --- a/single.go +++ b/single.go @@ -27,12 +27,12 @@ type Single struct { nextTime time.Time nextTimeMux sync.RWMutex wg sync.WaitGroup - workerList sync.Map - timerIndex int64 - stopChan chan struct{} - hasRun sync.Map - timeout time.Duration - cronParser *cron.Parser // cron表达式解析器 + workerList sync.Map // 任务列表,key为taskId,value为worker + timerIndex int64 // 任务索引,用于生成taskId + stopChan chan struct{} // 停止信号 + hasRun sync.Map // 记录已经执行的任务,key为taskId,value为执行时间 + timeout time.Duration // 单次任务超时时间 + cronParser *cron.Parser // cron表达式解析器 } // 定时器类