2 Commits

Author SHA1 Message Date
Yun 79fda8c78c 添加超时时间的限制 2024-06-22 15:34:49 +08:00
Yun 5ca5b31efb 创建任务不需要验证全局锁 2024-06-19 16:31:51 +08:00
2 changed files with 28 additions and 14 deletions
+15 -10
View File
@@ -32,6 +32,7 @@ type Cluster struct {
ctx context.Context
redis redis.UniversalClient
cache *cachex.Cache
timeout time.Duration
logger Logger
keyPrefix string // key前缀
location *time.Location // 根据时区计算的时间
@@ -55,6 +56,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
ctx: ctx,
redis: red,
cache: cachex.NewCache(),
timeout: op.timeout,
logger: op.logger,
keyPrefix: keyPrefix,
location: op.location,
@@ -64,6 +66,9 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合
}
// 设置锁的超时时间
lockx.InitOption(lockx.SetTimeout(op.timeout))
// 监听任务
go clu.watch()
@@ -216,16 +221,16 @@ func (c *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, ca
return err
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// ctx, cancel := context.WithCancel(ctx)
// defer cancel()
lock := lockx.NewGlobalLock(ctx, c.redis, taskId)
tB := lock.Try(2)
if !tB {
c.logger.Errorf(ctx, "添加失败:%s", taskId)
return errors.New("添加失败")
}
defer lock.Unlock()
// lock := lockx.NewGlobalLock(ctx, c.redis, taskId)
// tB := lock.Try(2)
// if !tB {
// c.logger.Errorf(ctx, "添加失败:%s", taskId)
// return errors.New("添加失败")
// }
// defer lock.Unlock()
t := timerStr{
Callback: callback,
@@ -418,7 +423,7 @@ type ReJobData struct {
// 执行任务
func (c *Cluster) doTask(ctx context.Context, taskId string) {
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
val, ok := clusterWorkerList.Load(taskId)
+13 -4
View File
@@ -3,14 +3,16 @@ package timerx
import "time"
type Options struct {
logger Logger
location *time.Location
logger Logger
location *time.Location
timeout time.Duration
}
func defaultOptions() Options {
return Options{
logger: NewLogger(),
location: time.Local,
logger: NewLogger(),
location: time.Local,
timeout: time.Hour,
}
}
@@ -37,3 +39,10 @@ func SetTimeZone(zone *time.Location) Option {
o.location = zone
}
}
// 设置任务最长执行时间
func SetTimeout(d time.Duration) Option {
return func(o *Options) {
o.timeout = d
}
}