限制并发的goroutine数量

This commit is contained in:
Yun
2025-10-14 11:13:12 +08:00
parent 1b9e9b757b
commit de3568de42
4 changed files with 38 additions and 11 deletions
+10 -1
View File
@@ -54,6 +54,8 @@ type Cluster struct {
cache *cachex.Cache // 本地缓存 cache *cachex.Cache // 本地缓存
cronParser *cron.Parser // cron表达式解析器 cronParser *cron.Parser // cron表达式解析器
batchSize int // 批量获取任务的数量 batchSize int // 批量获取任务的数量
workerChan chan struct{} // worker
maxWorkers int // 最大worker数量
} }
// 初始化定时器 // 初始化定时器
@@ -90,6 +92,8 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
instanceId: U.String(), instanceId: U.String(),
cronParser: op.cronParser, cronParser: op.cronParser,
batchSize: op.batchSize, batchSize: op.batchSize,
workerChan: make(chan struct{}, op.maxRunCount),
maxWorkers: op.maxRunCount,
} }
// 初始化优先级 // 初始化优先级
@@ -519,12 +523,13 @@ func (c *Cluster) executeTasks() {
defer c.wg.Done() defer c.wg.Done()
for { for {
select { select {
case <-c.stopChan: case <-c.stopChan:
return return
case <-c.ctx.Done(): case <-c.ctx.Done():
return return
default: case <-c.workerChan:
if c.usePriority && !c.priority.IsLatest(c.ctx) { if c.usePriority && !c.priority.IsLatest(c.ctx) {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
continue continue
@@ -559,6 +564,10 @@ type ReJobData struct {
// 执行任务 // 执行任务
func (l *Cluster) processTask(taskId string) { func (l *Cluster) processTask(taskId string) {
defer func() {
<-l.workerChan
}()
begin := time.Now() begin := time.Now()
ctx, cancel := context.WithTimeout(l.ctx, l.timeout) ctx, cancel := context.WithTimeout(l.ctx, l.timeout)
+10 -3
View File
@@ -52,7 +52,9 @@ type Once struct {
keySeparator string // 分割符 keySeparator string // 分割符
timeout time.Duration // 任务执行超时时间 timeout time.Duration // 任务执行超时时间
maxRunCount int // 最大重试次数 0代表不限 maxRunCount int // 最大重试次数 0代表不限
workerChan chan struct{} // worker
maxWorkers int // 最大worker数量
} }
type OnceWorkerResp struct { type OnceWorkerResp struct {
@@ -119,6 +121,8 @@ func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, c
keySeparator: "[:]", keySeparator: "[:]",
timeout: op.timeout, timeout: op.timeout,
maxRunCount: op.maxRunCount, maxRunCount: op.maxRunCount,
workerChan: make(chan struct{}, op.maxRunCount),
maxWorkers: op.maxRunCount,
} }
// 初始化优先级 // 初始化优先级
@@ -285,8 +289,7 @@ func (l *Once) executeTasks() {
return return
case <-l.ctx.Done(): case <-l.ctx.Done():
return return
default: case <-l.workerChan:
keys, err := l.redis.BLPop(l.ctx, time.Second*10, l.listKey).Result() keys, err := l.redis.BLPop(l.ctx, time.Second*10, l.listKey).Result()
if err != nil { if err != nil {
if err != redis.Nil { if err != redis.Nil {
@@ -502,6 +505,10 @@ func (l *Once) batchGetTasks() {
// 执行任务 // 执行任务
func (l *Once) processTask(key string) { func (l *Once) processTask(key string) {
defer func() {
<-l.workerChan
}()
begin := time.Now() begin := time.Now()
ctx, cancel := context.WithTimeout(l.ctx, l.timeout) ctx, cancel := context.WithTimeout(l.ctx, l.timeout)
+12 -1
View File
@@ -14,7 +14,8 @@ type Options struct {
usePriority bool usePriority bool
priorityVal int64 priorityVal int64
batchSize int batchSize int
maxRunCount int // 单个任务最大运行次数 0 代表不限 maxRunCount int // 单个任务最大运行次数 0代表不限
maxWorkers int // 最大工作协程数
cronParser *cron.Parser // cron表达式解析器 cronParser *cron.Parser // cron表达式解析器
} }
@@ -31,6 +32,7 @@ func defaultOptions() Options {
priorityVal: 0, priorityVal: 0,
batchSize: 100, batchSize: 100,
maxRunCount: 0, maxRunCount: 0,
maxWorkers: 100,
cronParser: &parser, cronParser: &parser,
} }
} }
@@ -102,6 +104,15 @@ func WithMaxRetryCount(count int) Option {
} }
} }
func WithMaxWorkers(count int) Option {
return func(o *Options) {
if count < 0 {
count = 10
}
o.maxWorkers = count
}
}
// 添加cron表达式解析器 // 添加cron表达式解析器
func WithCronParser(parser cron.Parser) Option { func WithCronParser(parser cron.Parser) Option {
return func(o *Options) { return func(o *Options) {
+6 -6
View File
@@ -27,12 +27,12 @@ type Single struct {
nextTime time.Time nextTime time.Time
nextTimeMux sync.RWMutex nextTimeMux sync.RWMutex
wg sync.WaitGroup wg sync.WaitGroup
workerList sync.Map workerList sync.Map // 任务列表,key为taskIdvalue为worker
timerIndex int64 timerIndex int64 // 任务索引,用于生成taskId
stopChan chan struct{} stopChan chan struct{} // 停止信号
hasRun sync.Map hasRun sync.Map // 记录已经执行的任务,key为taskId,value为执行时间
timeout time.Duration timeout time.Duration // 单次任务超时时间
cronParser *cron.Parser // cron表达式解析器 cronParser *cron.Parser // cron表达式解析器
} }
// 定时器类 // 定时器类