diff --git a/cluster.go b/cluster.go index c105d2b..e698320 100644 --- a/cluster.go +++ b/cluster.go @@ -32,6 +32,7 @@ type Cluster struct { ctx context.Context redis redis.UniversalClient cache *cachex.Cache + timeout time.Duration logger Logger keyPrefix string // key前缀 location *time.Location // 根据时区计算的时间 @@ -55,6 +56,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin ctx: ctx, redis: red, cache: cachex.NewCache(), + timeout: op.timeout, logger: op.logger, keyPrefix: keyPrefix, location: op.location, @@ -64,6 +66,9 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合 } + // 设置锁的超时时间 + lockx.InitOption(lockx.SetTimeout(op.timeout)) + // 监听任务 go clu.watch() @@ -418,7 +423,7 @@ type ReJobData struct { // 执行任务 func (c *Cluster) doTask(ctx context.Context, taskId string) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() val, ok := clusterWorkerList.Load(taskId) diff --git a/option.go b/option.go index e7cc476..cc08bcd 100644 --- a/option.go +++ b/option.go @@ -3,14 +3,16 @@ package timerx import "time" type Options struct { - logger Logger - location *time.Location + logger Logger + location *time.Location + timeout time.Duration } func defaultOptions() Options { return Options{ - logger: NewLogger(), - location: time.Local, + logger: NewLogger(), + location: time.Local, + timeout: time.Hour, } } @@ -37,3 +39,10 @@ func SetTimeZone(zone *time.Location) Option { o.location = zone } } + +// 设置任务最长执行时间 +func SetTimeout(d time.Duration) Option { + return func(o *Options) { + o.timeout = d + } +}