From c351cb084fd191d67d72b3b2c7319013a9f8f076 Mon Sep 17 00:00:00 2001 From: Yun Date: Thu, 28 Aug 2025 17:45:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=B0=83=E6=95=B4=E5=8D=95?= =?UTF-8?q?=E6=AC=A1=E4=BB=BB=E5=8A=A1=E7=9A=84=E6=89=A7=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/main.go | 24 ++++++++++++++++++++---- once.go | 13 +++++++++++-- option.go | 4 ++-- priority/priority.go | 20 +++++++++++++------- priority/version.go | 2 +- 5 files changed, 47 insertions(+), 16 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index b5b4cac..e3c6ccc 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -7,6 +7,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/yuninks/timerx" + "github.com/yuninks/timerx/priority" ) func main() { @@ -35,14 +36,24 @@ func once() { client := getRedis() ctx := context.Background() w := OnceWorker{} - one := timerx.InitOnce(ctx, client, "test", w) + + ver, err := priority.PriorityByVersion("v2.2.3.4.5") + if err != nil { + panic(err) + } + + ops := []timerx.Option{ + timerx.SetPriority(ver), + } + + one := timerx.InitOnce(ctx, client, "test", w, ops...) d := OnceData{ Num: 3, } // dy, _ := json.Marshal(d) - err := one.Create("test", "test3", 1*time.Second, d) + err = one.Create("test", "test3", 1*time.Second, d) if err != nil { fmt.Println(err) } @@ -51,7 +62,12 @@ func once() { // } dd := 123 // dy, _ = json.Marshal(d) - err = one.Save("test", "test4", 1*time.Second, dd) + err = one.Save("test", "test4", 2*time.Second, dd) + if err != nil { + fmt.Println(err) + } + + err = one.Save("test", "test5", 5*time.Second, dd) if err != nil { fmt.Println(err) } @@ -87,7 +103,7 @@ func (l OnceWorker) Worker(ctx context.Context, taskType timerx.OnceTaskType, ta return &timerx.OnceWorkerResp{ Retry: true, AttachData: attachData, - DelayTime: 1 * time.Second, + DelayTime: 10 * time.Second, } } diff --git a/once.go b/once.go index fee6129..722a0be 100644 --- a/once.go +++ b/once.go @@ -114,8 +114,9 @@ func (w *Once) Save(taskType OnceTaskType, taskId string, delayTime time.Duratio b, _ := json.Marshal(ed) // 写入附加数据 - _, err := w.redis.SetEX(w.ctx, w.keyPrefix+redisKey, b, delayTime+time.Second*5).Result() + _, err := w.redis.SetEX(w.ctx, w.keyPrefix+redisKey, b, delayTime+time.Minute*30).Result() if err != nil { + w.logger.Errorf(w.ctx, "写入附加数据失败:%s", err.Error()) return err } @@ -130,6 +131,13 @@ func (w *Once) Save(taskType OnceTaskType, taskId string, delayTime time.Duratio // 添加任务(不覆盖) func (l *Once) Create(taskType OnceTaskType, taskId string, delayTime time.Duration, attachData interface{}) error { + if delayTime.Abs() != delayTime { + return fmt.Errorf("时间间隔不能为负数") + } + if delayTime == 0 { + return fmt.Errorf("时间间隔不能为0") + } + redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId) // 判断有序集合Key是否存在,存在则报错,不存在则写入 if l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Val() == 0 { @@ -141,8 +149,9 @@ func (l *Once) Create(taskType OnceTaskType, taskId string, delayTime time.Durat b, _ := json.Marshal(ed) // 写入附加数据 - _, err := l.redis.SetEX(l.ctx, l.keyPrefix+redisKey, b, delayTime+time.Second*5).Result() + _, err := l.redis.SetEX(l.ctx, l.keyPrefix+redisKey, b, delayTime+time.Minute*30).Result() if err != nil { + l.logger.Errorf(l.ctx, "写入附加数据失败:%s", err.Error()) return err } _, err = l.redis.ZAdd(l.ctx, l.zsetKey, &redis.Z{ diff --git a/option.go b/option.go index d369849..b0945d4 100644 --- a/option.go +++ b/option.go @@ -11,7 +11,7 @@ type Options struct { location *time.Location timeout time.Duration usePriority bool - priorityVal int + priorityVal int64 } func defaultOptions() Options { @@ -56,7 +56,7 @@ func SetTimeout(d time.Duration) Option { } // 设置优先级 -func SetPriority(priority int) Option { +func SetPriority(priority int64) Option { return func(o *Options) { o.usePriority = true o.priorityVal = priority diff --git a/priority/priority.go b/priority/priority.go index aa14222..1f75a42 100644 --- a/priority/priority.go +++ b/priority/priority.go @@ -15,16 +15,16 @@ import ( type Priority struct { ctx context.Context - priority int // 优先级 + priority int64 // 优先级 redis redis.UniversalClient redisKey string logger logger.Logger expireTime time.Duration updateInterval time.Duration // 更新间隔 - deadTime int64 // 缓存时间戳,单位秒 + deadTime *int64 // 缓存时间戳,单位秒 } -func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int, opts ...Option) *Priority { +func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix string, priority int64, opts ...Option) *Priority { conf := newOptions(opts...) pro := &Priority{ @@ -35,6 +35,7 @@ func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix strin redisKey: "timer:priority_" + keyPrefix, expireTime: conf.expireTime, updateInterval: conf.updateInterval, + deadTime: new(int64), } // 更新间隔 @@ -55,9 +56,13 @@ func InitPriority(ctx context.Context, re redis.UniversalClient, keyPrefix strin return pro } -func (l *Priority) IsLatest(ctx context.Context) bool { +func (l *Priority) IsLatest(ctx context.Context) (b bool) { + // defer func() { + // l.logger.Infof(l.ctx, "当前优先级:%d bool:%v", l.priority, b) + // }() + // 加缓存 - if atomic.LoadInt64(&l.deadTime) > time.Now().Unix() { + if atomic.LoadInt64(l.deadTime) > time.Now().Unix() { return true } @@ -71,7 +76,7 @@ func (l *Priority) IsLatest(ctx context.Context) bool { return false } - strPriority, err := strconv.Atoi(str) + strPriority, err := strconv.ParseInt(str, 10, 64) if err != nil { l.logger.Errorf(l.ctx, "全局优先级转换失败:%s", err.Error()) return false @@ -83,6 +88,7 @@ func (l *Priority) IsLatest(ctx context.Context) bool { } func (l *Priority) setPriority() bool { + // redis lua脚本 // 如果redis的可以不存在则设置值,如果存在且redis内的值比当前大则不处理,如果存在redis内的值比当前小或等于则更新值且更新ttl script := ` @@ -142,7 +148,7 @@ func (l *Priority) setPriority() bool { if operationResult == "SET" || operationResult == "UPDATE" { l.logger.Infof(l.ctx, "设置全局优先级成功:%s", priority) - atomic.StoreInt64(&l.deadTime, time.Now().Add(l.updateInterval).Unix()) + atomic.StoreInt64(l.deadTime, time.Now().Add(l.updateInterval).Unix()) return true } diff --git a/priority/version.go b/priority/version.go index a327120..441726f 100644 --- a/priority/version.go +++ b/priority/version.go @@ -37,7 +37,7 @@ func PriorityByVersion(version string) (priority int64, err error) { if val == "" { return 0, ErrVersionFormat } - i, err := strconv.ParseInt(val, 10, 32) + i, err := strconv.ParseInt(val, 10, 64) if err != nil { return 0, ErrVersionFormat }