Files
timerx/once.go
T

244 lines
5.4 KiB
Go
Raw Normal View History

2023-11-27 22:37:33 +08:00
package timerx
2023-09-09 23:24:11 +08:00
import (
"context"
"encoding/json"
"fmt"
2023-11-13 23:49:42 +08:00
"runtime/debug"
2023-09-09 23:24:11 +08:00
"strings"
"sync"
"time"
"github.com/go-redis/redis/v8"
2024-05-28 17:28:20 +08:00
uuid "github.com/satori/go.uuid"
2025-07-24 17:13:17 +08:00
"github.com/yuninks/timerx/logger"
2023-09-09 23:24:11 +08:00
)
2023-11-13 23:49:42 +08:00
// 功能描述
2023-12-27 17:19:52 +08:00
// 1. 任务可以多节点发布
// 2. 每个任务的执行在全局仅会执行一次
// 3. 任务执行失败支持快捷重新加入队列
2023-11-13 23:49:42 +08:00
2023-09-09 23:24:11 +08:00
// 单次的任务队列
2024-05-08 13:01:55 +08:00
// Once is a distributed one-shot task queue backed by Redis.
// Tasks may be published from multiple nodes; a Lua script in getTask
// moves each due task exactly once from the schedule to the work list,
// so every task is executed at most once globally.
type Once struct {
	ctx       context.Context
	logger    logger.Logger
	zsetKey   string // sorted set of pending task keys, scored by due time (Unix millis)
	listKey   string // list that due task keys are pushed into for consumption
	redis     redis.UniversalClient
	worker    Callback // user-supplied task executor
	keyPrefix string   // prefix for the per-task payload keys written by Save/Create
}
2024-10-09 19:37:13 +08:00
// OnceWorkerResp is returned by Callback.Worker and tells the queue what
// to do with the task after it has run.
type OnceWorkerResp struct {
	Retry      bool          // re-enqueue the task when true
	DelayTime  time.Duration // delay before the retry; <=0 keeps the task's previous delay (see doTask)
	AttachData interface{}   // payload stored for the next execution
}
2023-09-09 23:24:11 +08:00
// Callback executes queued tasks. Implementations should account for a
// task being re-enqueued after a failed run (via OnceWorkerResp.Retry).
type Callback interface {
	// Worker executes a single task.
	// @param ctx context.Context carries a "trace_id" value per execution
	// @param taskType string task type (first part of the task key)
	// @param taskId string unique task identifier
	// @param attachData interface{} payload stored with the task
	// @return *OnceWorkerResp retry directive; nil means done, no retry
	Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *OnceWorkerResp
}
2024-05-08 13:01:55 +08:00
// wo is the process-wide singleton created by InitOnce.
var wo *Once = nil

// once guards the one-time initialization performed in InitOnce.
var once sync.Once

// extendData is the JSON payload stored in Redis alongside each task key.
type extendData struct {
	Delay time.Duration // original scheduling delay; reused as the default retry delay
	Data  interface{}   // caller-supplied attachment passed to the worker
}
2023-11-13 23:49:42 +08:00
// 初始化
2024-10-09 17:03:54 +08:00
func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, call Callback, opts ...Option) *Once {
2024-10-11 16:17:22 +08:00
if re == nil {
panic("redis client is nil")
}
2024-05-08 13:01:55 +08:00
op := newOptions(opts...)
2023-09-09 23:24:11 +08:00
once.Do(func() {
2024-05-08 13:01:55 +08:00
wo = &Once{
2024-10-11 16:17:22 +08:00
ctx: ctx,
logger: op.logger,
zsetKey: "timer:once_zsetkey" + keyPrefix,
listKey: "timer:once_listkey" + keyPrefix,
redis: re,
worker: call,
keyPrefix: keyPrefix,
2023-09-09 23:24:11 +08:00
}
2023-09-09 23:32:37 +08:00
go wo.getTask()
2023-11-13 23:49:42 +08:00
go wo.watch()
2023-09-09 23:24:11 +08:00
})
return wo
}
2024-07-02 21:18:16 +08:00
// 添加任务(覆盖)
2023-09-09 23:24:11 +08:00
// 重复插入就代表覆盖
2024-05-25 20:42:32 +08:00
// @param jobType string 任务类型
// @param uniTaskId string 任务唯一标识
// @param delayTime time.Duration 延迟时间
// @param attachData interface{} 附加数据
2024-10-11 16:17:22 +08:00
func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error {
2023-09-09 23:24:11 +08:00
if delayTime.Abs() != delayTime {
return fmt.Errorf("时间间隔不能为负数")
}
if delayTime == 0 {
return fmt.Errorf("时间间隔不能为0")
}
2024-07-02 21:18:16 +08:00
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
2023-09-09 23:24:11 +08:00
ed := extendData{
Delay: delayTime,
2024-05-25 20:42:32 +08:00
Data: attachData,
2023-09-09 23:24:11 +08:00
}
b, _ := json.Marshal(ed)
2024-05-25 20:42:32 +08:00
// 写入附加数据
2024-10-11 16:17:22 +08:00
_, err := w.redis.SetEX(w.ctx, w.keyPrefix+redisKey, b, delayTime+time.Second*5).Result()
2023-09-09 23:24:11 +08:00
if err != nil {
return err
}
2024-07-02 21:18:16 +08:00
// 写入执行时间
2023-09-09 23:24:11 +08:00
_, err = w.redis.ZAdd(w.ctx, w.zsetKey, &redis.Z{
Score: float64(time.Now().Add(delayTime).UnixMilli()),
Member: redisKey,
}).Result()
return err
}
2024-07-02 21:18:16 +08:00
// 添加任务(不覆盖)
2024-10-11 16:17:22 +08:00
func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error {
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
2024-07-02 21:18:16 +08:00
// 判断有序集合Key是否存在,存在则报错,不存在则写入
2024-10-11 16:17:22 +08:00
if l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Val() == 0 {
2024-10-09 17:03:54 +08:00
ed := extendData{
Delay: delayTime,
Data: attachData,
}
b, _ := json.Marshal(ed)
// 写入附加数据
2024-10-11 16:17:22 +08:00
_, err := l.redis.SetEX(l.ctx, l.keyPrefix+redisKey, b, delayTime+time.Second*5).Result()
2024-10-09 17:03:54 +08:00
if err != nil {
return err
}
_, err = l.redis.ZAdd(l.ctx, l.zsetKey, &redis.Z{
Score: float64(time.Now().Add(delayTime).UnixMilli()),
Member: redisKey,
}).Result()
if err != nil {
return err
}
}
2024-07-02 21:18:16 +08:00
return nil
}
2023-09-09 23:24:11 +08:00
// 删除任务
2024-07-02 21:18:16 +08:00
func (w *Once) Delete(taskType string, taskId string) error {
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
2023-09-09 23:24:11 +08:00
w.redis.Del(w.ctx, redisKey).Result()
w.redis.ZRem(w.ctx, w.zsetKey, redisKey).Result()
return nil
}
2024-07-02 21:18:16 +08:00
// Get looks up a task by type and id.
// NOTE(review): not yet implemented — the body is intentionally empty and
// the method returns nothing.
func (l *Once) Get(taskType string, taskId string) {
	//
}
2023-09-09 23:24:11 +08:00
// 获取任务
2024-05-08 13:01:55 +08:00
func (w *Once) getTask() {
2023-09-09 23:32:37 +08:00
timer := time.NewTicker(time.Millisecond * 200)
2023-09-09 23:24:11 +08:00
defer timer.Stop()
Loop:
for {
select {
case <-timer.C:
script := `
local token = redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2])
for i,v in ipairs(token) do
redis.call('zrem',KEYS[1],v)
redis.call('lpush',KEYS[2],v)
end
return "OK"
`
w.redis.Eval(w.ctx, script, []string{w.zsetKey, w.listKey}, 0, time.Now().UnixMilli()).Result()
// fmt.Println(i, err)
case <-w.ctx.Done():
break Loop
}
}
}
2023-11-13 23:49:42 +08:00
// 监听任务
2024-05-08 13:01:55 +08:00
func (w *Once) watch() {
2023-09-09 23:24:11 +08:00
for {
keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result()
if err != nil {
2024-07-02 21:18:16 +08:00
// fmt.Println("watch err:", err)
2023-09-09 23:24:11 +08:00
continue
}
2024-07-02 21:18:16 +08:00
ctx := context.WithValue(w.ctx, "trace_id", uuid.NewV4().String())
go w.doTask(ctx, keys[1])
2023-11-13 23:49:42 +08:00
}
}
2024-05-08 13:01:55 +08:00
// 执行任务
2024-07-02 21:18:16 +08:00
func (l *Once) doTask(ctx context.Context, key string) {
2023-11-13 23:49:42 +08:00
defer func() {
if err := recover(); err != nil {
2024-07-02 21:18:16 +08:00
l.logger.Errorf(ctx, "timer:回调任务panic:%s stack:%s", err, string(debug.Stack()))
2023-11-13 23:49:42 +08:00
}
}()
s := strings.Split(key, "[:]")
// 读取数据
2024-10-11 16:17:22 +08:00
redisKey := l.keyPrefix + key
str, err := l.redis.Get(ctx, redisKey).Result()
2023-11-13 23:49:42 +08:00
if err != nil {
2024-10-15 18:15:09 +08:00
l.logger.Errorf(ctx, "获取数据失败 key:%s err:%s", key, err)
2023-11-13 23:49:42 +08:00
return
}
2024-10-11 16:17:22 +08:00
2024-10-15 18:15:09 +08:00
l.logger.Infof(ctx, "任务执行:%s 参数:%s", key, str)
2024-10-11 16:17:22 +08:00
2023-11-13 23:49:42 +08:00
ed := extendData{}
json.Unmarshal([]byte(str), &ed)
2024-10-09 19:37:13 +08:00
resp := l.worker.Worker(ctx, s[0], s[1], ed.Data)
if resp == nil {
return
}
2023-11-13 23:49:42 +08:00
2024-10-09 19:37:13 +08:00
if resp.Retry {
2023-11-13 23:49:42 +08:00
// 重新放入队列
2024-10-09 19:37:13 +08:00
if resp.DelayTime != 0 && resp.DelayTime == resp.DelayTime.Abs() {
ed.Delay = resp.DelayTime
2023-11-13 23:49:42 +08:00
}
2024-10-09 19:37:13 +08:00
ed.Data = resp.AttachData
2024-07-02 21:18:16 +08:00
l.logger.Infof(ctx, "任务重新放入队列:%s", key)
l.Create(s[0], s[1], ed.Delay, ed.Data)
2023-09-09 23:24:11 +08:00
}
}