13 Commits

Author SHA1 Message Date
Yun ca0d5a1b99 修复一个单次的BUG 2024-10-11 16:17:22 +08:00
Yun 1ac53f7688 修改附加内容的字段为字节 2024-10-10 10:05:28 +08:00
Yun 4ee3c5e1c2 添加只执行一次的 2024-10-09 19:37:13 +08:00
Yun fa8e3737fa 执行一次的调试 2024-10-09 17:03:54 +08:00
yun cf3e751afe 部分代码尚未完全 2024-07-02 21:18:16 +08:00
Yun 79fda8c78c 添加超时时间的限制 2024-06-22 15:34:49 +08:00
Yun 5ca5b31efb 创建任务不需要验证全局锁 2024-06-19 16:31:51 +08:00
Yun 25b5008af9 修改测试脚本 2024-05-31 13:55:05 +08:00
Yun 3719c417fb 优化传参 2024-05-31 13:05:51 +08:00
Yun ec41fd80a8 修改方法名 2024-05-31 09:52:10 +08:00
Yun c61b82587b 优化关于时区的问题 2024-05-30 11:02:44 +08:00
Yun 6df89da568 修改一些日志提示 2024-05-28 17:36:18 +08:00
Yun 2cc97438b4 回调的ctx添加trace_id 2024-05-28 17:28:20 +08:00
11 changed files with 298 additions and 160 deletions
+36 -26
View File
@@ -10,6 +10,7 @@ import (
"time"
"github.com/go-redis/redis/v8"
uuid "github.com/satori/go.uuid"
"github.com/yuninks/cachex"
"github.com/yuninks/lockx"
)
@@ -31,8 +32,10 @@ type Cluster struct {
ctx context.Context
redis redis.UniversalClient
cache *cachex.Cache
timeout time.Duration
logger Logger
keyPrefix string // key前缀
location *time.Location // 根据时区计算的时间
lockKey string // 全局计算的key
zsetKey string // 有序集合的key
@@ -53,14 +56,19 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
ctx: ctx,
redis: red,
cache: cachex.NewCache(),
timeout: op.timeout,
logger: op.logger,
keyPrefix: keyPrefix,
location: op.location,
lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁
zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合
listKey: "timer:cluster_listKey" + keyPrefix, // 列表
setKey: "timer:cluster_setKey" + keyPrefix, // 重入集合
}
// 设置锁的超时时间
lockx.InitOption(lockx.SetTimeout(op.timeout))
// 监听任务
go clu.watch()
@@ -92,8 +100,8 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
// @param callback 回调函数
// @param extendData 扩展数据
// @return error
func (c *Cluster) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback callback, extendData interface{}) error {
nowTime := time.Now()
func (c *Cluster) EveryMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
nowTime := time.Now().In(c.location)
jobData := JobData{
JobType: JobTypeEveryMonth,
@@ -114,8 +122,8 @@ func (c *Cluster) AddMonth(ctx context.Context, taskId string, day int, hour int
// @param hour int 小时
// @param minute int 分钟
// @param second int 秒
func (c *Cluster) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback callback, extendData interface{}) error {
nowTime := time.Now()
func (c *Cluster) EveryWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
nowTime := time.Now().In(c.location)
jobData := JobData{
JobType: JobTypeEveryWeek,
@@ -130,8 +138,8 @@ func (c *Cluster) AddWeek(ctx context.Context, taskId string, week time.Weekday,
}
// 每天执行一次
func (c *Cluster) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback callback, extendData interface{}) error {
nowTime := time.Now()
func (c *Cluster) EveryDay(ctx context.Context, taskId string, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
nowTime := time.Now().In(c.location)
jobData := JobData{
JobType: JobTypeEveryDay,
@@ -145,8 +153,8 @@ func (c *Cluster) AddDay(ctx context.Context, taskId string, hour int, minute in
}
// 每小时执行一次
func (c *Cluster) AddHour(ctx context.Context, taskId string, minute int, second int, callback callback, extendData interface{}) error {
nowTime := time.Now()
func (c *Cluster) EveryHour(ctx context.Context, taskId string, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
nowTime := time.Now().In(c.location)
jobData := JobData{
JobType: JobTypeEveryHour,
@@ -159,8 +167,8 @@ func (c *Cluster) AddHour(ctx context.Context, taskId string, minute int, second
}
// 每分钟执行一次
func (c *Cluster) AddMinute(ctx context.Context, taskId string, second int, callback callback, extendData interface{}) error {
nowTime := time.Now()
func (c *Cluster) EveryMinute(ctx context.Context, taskId string, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
nowTime := time.Now().In(c.location)
jobData := JobData{
JobType: JobTypeEveryMinute,
@@ -172,8 +180,8 @@ func (c *Cluster) AddMinute(ctx context.Context, taskId string, second int, call
}
// 特定时间间隔
func (c *Cluster) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback callback, extendData interface{}) error {
nowTime := time.Now()
func (c *Cluster) EverySpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
nowTime := time.Now().In(c.location)
if spaceTime < 0 {
c.logger.Errorf(ctx, "间隔时间不能小于0")
@@ -200,29 +208,29 @@ func (c *Cluster) AddSpace(ctx context.Context, taskId string, spaceTime time.Du
// @param callback callback 回调函数
// @param extendData interface{} 扩展数据
// @return error
func (c *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, callback callback, extendData interface{}) error {
func (c *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
_, ok := clusterWorkerList.Load(taskId)
if ok {
c.logger.Errorf(ctx, "key已存在:%s", taskId)
return errors.New("key已存在")
}
_, err := GetNextTime(time.Now(), time.Local, jobData)
_, err := GetNextTime(time.Now().In(c.location), jobData)
if err != nil {
c.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error())
return err
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// ctx, cancel := context.WithCancel(ctx)
// defer cancel()
lock := lockx.NewGlobalLock(ctx, c.redis, taskId)
tB := lock.Try(2)
if !tB {
c.logger.Errorf(ctx, "添加失败:%s", taskId)
return errors.New("添加失败")
}
defer lock.Unlock()
// lock := lockx.NewGlobalLock(ctx, c.redis, taskId)
// tB := lock.Try(2)
// if !tB {
// c.logger.Errorf(ctx, "添加失败:%s", taskId)
// return errors.New("添加失败")
// }
// defer lock.Unlock()
t := timerStr{
Callback: callback,
@@ -258,7 +266,7 @@ func (c *Cluster) getNextTime() {
clusterWorkerList.Range(func(key, value interface{}) bool {
val := value.(timerStr)
nextTime, _ := GetNextTime(time.Now(), time.Local, *val.JobData)
nextTime, _ := GetNextTime(time.Now().In(c.location), *val.JobData)
// fmt.Println(val.ExtendData, val.JobData, nextTime)
@@ -415,7 +423,7 @@ type ReJobData struct {
// 执行任务
func (c *Cluster) doTask(ctx context.Context, taskId string) {
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
val, ok := clusterWorkerList.Load(taskId)
@@ -436,10 +444,12 @@ func (c *Cluster) doTask(ctx context.Context, taskId string) {
defer func() {
if err := recover(); err != nil {
c.logger.Errorf(ctx, "timer:定时器出错 err:%+v stack:%s", err, string(debug.Stack()))
c.logger.Errorf(ctx, "timer:回调任务panic err:%+v stack:%s", err, string(debug.Stack()))
}
}()
ctx = context.WithValue(ctx, "trace_id", uuid.NewV4().String)
// 执行任务
t.Callback(ctx, t.ExtendData)
}
+6 -6
View File
@@ -30,7 +30,7 @@ func TestCluster_AddEveryMonth(t *testing.T) {
}
extendData := "testData"
err := cluster.AddMonth(ctx, taskId, 1, hour, minute, second, callback, extendData)
err := cluster.EveryMonth(ctx, taskId, 1, hour, minute, second, callback, extendData)
if err != nil {
t.Errorf("AddEveryMonth failed, err: %v", err)
}
@@ -59,7 +59,7 @@ func TestCluster_AddEveryWeek(t *testing.T) {
}
extendData := "testData"
err := cluster.AddWeek(ctx, taskId, week, hour, minute, second, callback, extendData)
err := cluster.EveryWeek(ctx, taskId, week, hour, minute, second, callback, extendData)
if err != nil {
t.Errorf("AddEveryWeek failed, err: %v", err)
}
@@ -87,7 +87,7 @@ func TestCluster_AddEveryDay(t *testing.T) {
}
extendData := "testData"
err := cluster.AddDay(ctx, taskId, hour, minute, second, callback, extendData)
err := cluster.EveryDay(ctx, taskId, hour, minute, second, callback, extendData)
if err != nil {
t.Errorf("AddEveryDay failed, err: %v", err)
}
@@ -114,7 +114,7 @@ func TestCluster_AddEveryHour(t *testing.T) {
}
extendData := "testData"
err := cluster.AddHour(ctx, taskId, minute, second, callback, extendData)
err := cluster.EveryHour(ctx, taskId, minute, second, callback, extendData)
if err != nil {
t.Errorf("AddEveryHour failed, err: %v", err)
}
@@ -140,7 +140,7 @@ func TestCluster_AddEveryMinute(t *testing.T) {
}
extendData := "testData"
err := cluster.AddMinute(ctx, taskId, second, callback, extendData)
err := cluster.EveryMinute(ctx, taskId, second, callback, extendData)
if err != nil {
t.Errorf("AddEveryMinute failed, err: %v", err)
}
@@ -170,7 +170,7 @@ func TestCluster_Add(t *testing.T) {
}
extendData := "testData"
err := cluster.AddSpace(ctx, taskId, dur, callback, extendData)
err := cluster.EverySpace(ctx, taskId, dur, callback, extendData)
if err != nil {
t.Errorf("Add failed, err: %v", err)
}
+84 -37
View File
@@ -24,57 +24,104 @@ func main() {
// re()
// d()
cluster()
// cluster()
once()
select {}
}
func once() {
client := getRedis()
ctx := context.Background()
w := OnceWorker{}
one := timerx.InitOnce(ctx, client, "test", w)
d := OnceData{
Num: 3,
}
// dy, _ := json.Marshal(d)
err := one.Create("test", "test3", 1*time.Second, d)
if err != nil {
fmt.Println(err)
}
d = OnceData{
Num: 4,
}
// dy, _ = json.Marshal(d)
err = one.Create("test", "test4", 1*time.Second, d)
if err != nil {
fmt.Println(err)
}
}
type OnceData struct {
Num int
}
type OnceWorker struct{}
func (l OnceWorker) Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *timerx.OnceWorkerResp {
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println(taskType, taskId)
fmt.Printf("原来的参数:%+v\n", attachData)
// d := OnceData{}
// json.Unmarshal(ab, &d)
// d.Num++
// fmt.Println(d)
// dy, _ := json.Marshal(d)
return &timerx.OnceWorkerResp{
Retry: true,
AttachData: attachData,
DelayTime: 1 * time.Second,
}
}
func cluster() {
client := getRedis()
ctx := context.Background()
cluster := timerx.InitCluster(ctx, client, "test")
err := cluster.AddSpace(ctx, "test_space", 1*time.Second, aa, "这是秒任务")
err := cluster.EverySpace(ctx, "test_space", 1*time.Second, aa, "这是秒任务")
fmt.Println(err)
err = cluster.AddMinute(ctx, "test_min", 15, aa, "这是分钟任务")
err = cluster.EveryMinute(ctx, "test_min", 15, aa, "这是分钟任务")
fmt.Println(err)
err = cluster.AddHour(ctx, "test_hour", 30, 0, aa, "这是小时任务")
err = cluster.EveryHour(ctx, "test_hour", 30, 0, aa, "这是小时任务")
fmt.Println(err)
err = cluster.AddDay(ctx, "test_day", 11, 0, 0, aa, "这是天任务")
err = cluster.EveryDay(ctx, "test_day", 11, 0, 0, aa, "这是天任务")
fmt.Println(err)
}
func worker() {
client := getRedis()
w := timerx.InitOnce(context.Background(), client, "test", &Worker{})
w.Add("test", "test", 1*time.Second, map[string]interface{}{
"test": "test",
})
w.Add("test2", "test", 1*time.Second, map[string]interface{}{
"test": "test",
})
w.Add("test3", "test", 1*time.Second, map[string]interface{}{
"test": "test",
})
w.Add("test4", "test", 1*time.Second, map[string]interface{}{
"test": "test",
})
w.Add("test5", "test", 1*time.Second, map[string]interface{}{
"test": "test",
})
// client := getRedis()
// w := timerx.InitOnce(context.Background(), client, "test", &OnceWorker{})
// w.Save("test", "test", 1*time.Second, map[string]interface{}{
// "test": "test",
// })
// w.Save("test2", "test", 1*time.Second, map[string]interface{}{
// "test": "test",
// })
// w.Save("test3", "test", 1*time.Second, map[string]interface{}{
// "test": "test",
// })
// w.Save("test4", "test", 1*time.Second, map[string]interface{}{
// "test": "test",
// })
// w.Save("test5", "test", 1*time.Second, map[string]interface{}{
// "test": "test",
// })
select {}
}
type Worker struct{}
func (w *Worker) Worker(jobType string, uniqueKey string, data interface{}) (timerx.WorkerCode, time.Duration) {
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println(uniqueKey, jobType)
fmt.Println(data)
return timerx.WorkerCodeAgain, time.Second
}
func getRedis() *redis.Client {
client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1" + ":" + "6379",
@@ -93,12 +140,12 @@ func re() {
ctx := context.Background()
cl := timerx.InitCluster(ctx, client, "kkkk")
cl.AddSpace(ctx, "test1", 1*time.Millisecond, aa, "data")
cl.AddSpace(ctx, "test2", 1*time.Millisecond, aa, "data")
cl.AddSpace(ctx, "test3", 1*time.Millisecond, aa, "data")
cl.AddSpace(ctx, "test4", 1*time.Millisecond, aa, "data")
cl.AddSpace(ctx, "test5", 1*time.Millisecond, aa, "data")
cl.AddSpace(ctx, "test6", 1*time.Millisecond, aa, "data")
cl.EverySpace(ctx, "test1", 1*time.Millisecond, aa, "data")
cl.EverySpace(ctx, "test2", 1*time.Millisecond, aa, "data")
cl.EverySpace(ctx, "test3", 1*time.Millisecond, aa, "data")
cl.EverySpace(ctx, "test4", 1*time.Millisecond, aa, "data")
cl.EverySpace(ctx, "test5", 1*time.Millisecond, aa, "data")
cl.EverySpace(ctx, "test6", 1*time.Millisecond, aa, "data")
select {}
}
+2
View File
@@ -4,6 +4,7 @@ go 1.19
require (
github.com/go-redis/redis/v8 v8.11.5
github.com/satori/go.uuid v1.2.0
github.com/yuninks/cachex v1.0.5
github.com/yuninks/lockx v1.0.2
)
@@ -11,4 +12,5 @@ require (
require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)
+9
View File
@@ -5,9 +5,16 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/yuninks/cachex v1.0.5 h1:Y2NmTsuEgwEVYb7FVFh5tUN67kmrUioeksQqLbOAwsM=
github.com/yuninks/cachex v1.0.5/go.mod h1:5357qz18UvHTJSgZzkMamUzZoFzGeKG9+4tIUBXRSVM=
github.com/yuninks/lockx v1.0.2 h1:p0n791WmsU8D7YF2tQaNLwPE75jdd774unlJZRTNfaw=
@@ -15,5 +22,7 @@ github.com/yuninks/lockx v1.0.2/go.mod h1:J6wvuUELLcMn6FCmiZFt7K5w1QQAh1myL7h3Jr
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+22 -21
View File
@@ -8,21 +8,21 @@ import (
// 计算该任务下次执行时间
// @param job *JobData 任务数据
// @return time.Time 下次执行时间
func GetNextTime(t time.Time, loc *time.Location, job JobData) (*time.Time, error) {
func GetNextTime(t time.Time, job JobData) (*time.Time, error) {
var next time.Time
switch job.JobType {
case JobTypeEveryMonth:
next = calculateNextMonthTime(t, job, loc)
next = calculateNextMonthTime(t, job)
case JobTypeEveryWeek:
next = calculateNextWeekTime(t, job, loc)
next = calculateNextWeekTime(t, job)
case JobTypeEveryDay:
next = calculateNextDayTime(t, job, loc)
next = calculateNextDayTime(t, job)
case JobTypeEveryHour:
next = calculateNextHourTime(t, job, loc)
next = calculateNextHourTime(t, job)
case JobTypeEveryMinute:
next = calculateNextMinuteTime(t, job, loc)
next = calculateNextMinuteTime(t, job)
case JobTypeInterval:
next = calculateNextInterval(t, job)
default:
@@ -38,16 +38,17 @@ func calculateNextInterval(t time.Time, job JobData) time.Time {
return job.BaseTime.Add(job.IntervalTime * time.Duration(cycle+1))
}
func calculateNextMonthTime(t time.Time, job JobData, loc *time.Location) time.Time {
func calculateNextMonthTime(t time.Time, job JobData) time.Time {
// 判断是否可执行并返回下一个执行时间
if canRun(t, job) {
return time.Date(t.Year(), t.Month(), job.Day, job.Hour, job.Minute, job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), job.Day, job.Hour, job.Minute, job.Second, 0, t.Location())
}
// 下一个周期(下个月)
return time.Date(t.Year(), t.Month()+1, job.Day, job.Hour, job.Minute, job.Second, 0, loc)
return time.Date(t.Year(), t.Month()+1, job.Day, job.Hour, job.Minute, job.Second, 0, t.Location())
}
func calculateNextWeekTime(t time.Time, job JobData, loc *time.Location) time.Time {
func calculateNextWeekTime(t time.Time, job JobData) time.Time {
weekday := t.Weekday()
days := int(job.Weekday - weekday)
if days < 0 {
@@ -55,37 +56,37 @@ func calculateNextWeekTime(t time.Time, job JobData, loc *time.Location) time.Ti
}
// 判断是否可执行并返回下一个执行时间
if canRun(t, job) {
return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, t.Location())
}
// 下一个周期(下周)
return time.Date(t.Year(), t.Month(), t.Day()+days+7, job.Hour, job.Minute, job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), t.Day()+days+7, job.Hour, job.Minute, job.Second, 0, t.Location())
}
func calculateNextDayTime(t time.Time, job JobData, loc *time.Location) time.Time {
func calculateNextDayTime(t time.Time, job JobData) time.Time {
// 判断是否可执行并返回下一个执行时间
if canRun(t, job) {
return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, t.Location())
}
// 下一个周期(明天)
return time.Date(t.Year(), t.Month(), t.Day()+1, job.Hour, job.Minute, job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), t.Day()+1, job.Hour, job.Minute, job.Second, 0, t.Location())
}
func calculateNextHourTime(t time.Time, job JobData, loc *time.Location) time.Time {
func calculateNextHourTime(t time.Time, job JobData) time.Time {
// 判断是否可执行并返回下一个执行时间
if canRun(t, job) {
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), job.Minute, job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), job.Minute, job.Second, 0, t.Location())
}
// 下一个周期(下个小时)
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour()+1, job.Minute, job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour()+1, job.Minute, job.Second, 0, t.Location())
}
func calculateNextMinuteTime(t time.Time, job JobData, loc *time.Location) time.Time {
func calculateNextMinuteTime(t time.Time, job JobData) time.Time {
// 判断是否可执行并返回下一个执行时间
if canRun(t, job) {
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), job.Second, 0, t.Location())
}
// 下一个周期(下分钟)
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute()+1, job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute()+1, job.Second, 0, t.Location())
}
// 检查是否本周期可以运行
+2 -2
View File
@@ -92,8 +92,8 @@ func TestGetNextTime(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
now := time.Now()
loc := time.FixedZone("CST", 8*3600)
nextTime, err := timerx.GetNextTime(now,loc,test.job)
// loc := time.FixedZone("CST", 8*3600)
nextTime, err := timerx.GetNextTime(now, test.job)
if err != nil {
if test.expectedError == nil || err.Error() != test.expectedError.Error() {
t.Errorf("Expected error: %v, Got error: %v", test.expectedError, err)
+79 -30
View File
@@ -4,13 +4,13 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"runtime/debug"
"strings"
"sync"
"time"
"github.com/go-redis/redis/v8"
uuid "github.com/satori/go.uuid"
)
// 功能描述
@@ -26,14 +26,14 @@ type Once struct {
listKey string
redis redis.UniversalClient
worker Callback
keyPrefix string
}
type WorkerCode int
const (
WorkerCodeSuccess WorkerCode = 0 // 处理完成(不需要重入)
WorkerCodeAgain WorkerCode = -1 // 需要继续定时,默认原来的时间
)
type OnceWorkerResp struct {
Retry bool // 是否重试 true
DelayTime time.Duration
AttachData interface{}
}
// 需要考虑执行失败重新放入队列的情况
type Callback interface {
@@ -43,7 +43,7 @@ type Callback interface {
// @param data interface{} 任务数据
// @return WorkerCode 任务执行结果
// @return time.Duration 任务执行时间间隔
Worker(jobType string, uniTaskId string, attachData interface{}) (WorkerCode, time.Duration)
Worker(ctx context.Context, taskType string, taskId string, attachData interface{}) *OnceWorkerResp
}
var wo *Once = nil
@@ -55,7 +55,11 @@ type extendData struct {
}
// 初始化
func InitOnce(ctx context.Context, re *redis.Client, keyPrefix string, call Callback, opts ...Option) *Once {
func InitOnce(ctx context.Context, re redis.UniversalClient, keyPrefix string, call Callback, opts ...Option) *Once {
if re == nil {
panic("redis client is nil")
}
op := newOptions(opts...)
once.Do(func() {
wo = &Once{
@@ -65,6 +69,7 @@ func InitOnce(ctx context.Context, re *redis.Client, keyPrefix string, call Call
listKey: "timer:once_listkey" + keyPrefix,
redis: re,
worker: call,
keyPrefix: keyPrefix,
}
go wo.getTask()
go wo.watch()
@@ -73,13 +78,13 @@ func InitOnce(ctx context.Context, re *redis.Client, keyPrefix string, call Call
return wo
}
// 添加任务
// 添加任务(覆盖)
// 重复插入就代表覆盖
// @param jobType string 任务类型
// @param uniTaskId string 任务唯一标识
// @param delayTime time.Duration 延迟时间
// @param attachData interface{} 附加数据
func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, attachData interface{}) error {
func (w *Once) Save(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error {
if delayTime.Abs() != delayTime {
return fmt.Errorf("时间间隔不能为负数")
}
@@ -87,7 +92,7 @@ func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, at
return fmt.Errorf("时间间隔不能为0")
}
redisKey := fmt.Sprintf("%s[:]%s", jobType, uniTaskId)
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
ed := extendData{
Delay: delayTime,
@@ -96,12 +101,12 @@ func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, at
b, _ := json.Marshal(ed)
// 写入附加数据
_, err := w.redis.SetEX(w.ctx, redisKey, b, delayTime+time.Second*5).Result()
_, err := w.redis.SetEX(w.ctx, w.keyPrefix+redisKey, b, delayTime+time.Second*5).Result()
if err != nil {
return err
}
// 入执行时间
// 入执行时间
_, err = w.redis.ZAdd(w.ctx, w.zsetKey, &redis.Z{
Score: float64(time.Now().Add(delayTime).UnixMilli()),
Member: redisKey,
@@ -110,9 +115,38 @@ func (w *Once) Add(jobType string, uniTaskId string, delayTime time.Duration, at
return err
}
// 添加任务(不覆盖)
func (l *Once) Create(taskType string, taskId string, delayTime time.Duration, attachData interface{}) error {
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
// 判断有序集合Key是否存在,存在则报错,不存在则写入
if l.redis.ZScore(l.ctx, l.zsetKey, redisKey).Val() == 0 {
ed := extendData{
Delay: delayTime,
Data: attachData,
}
b, _ := json.Marshal(ed)
// 写入附加数据
_, err := l.redis.SetEX(l.ctx, l.keyPrefix+redisKey, b, delayTime+time.Second*5).Result()
if err != nil {
return err
}
_, err = l.redis.ZAdd(l.ctx, l.zsetKey, &redis.Z{
Score: float64(time.Now().Add(delayTime).UnixMilli()),
Member: redisKey,
}).Result()
if err != nil {
return err
}
}
return nil
}
// 删除任务
func (w *Once) Del(jobType string, uniTaskId string) error {
redisKey := fmt.Sprintf("%s[:]%s", jobType, uniTaskId)
func (w *Once) Delete(taskType string, taskId string) error {
redisKey := fmt.Sprintf("%s[:]%s", taskType, taskId)
w.redis.Del(w.ctx, redisKey).Result()
@@ -121,6 +155,11 @@ func (w *Once) Del(jobType string, uniTaskId string) error {
return nil
}
// 获取任务
func (l *Once) Get(taskType string, taskId string) {
//
}
// 获取任务
func (w *Once) getTask() {
timer := time.NewTicker(time.Millisecond * 200)
@@ -152,44 +191,54 @@ func (w *Once) watch() {
for {
keys, err := w.redis.BLPop(w.ctx, time.Second*10, w.listKey).Result()
if err != nil {
fmt.Println("watch err:", err)
// fmt.Println("watch err:", err)
continue
}
go w.doTask(keys[1])
ctx := context.WithValue(w.ctx, "trace_id", uuid.NewV4().String())
go w.doTask(ctx, keys[1])
}
}
// 执行任务
func (w *Once) doTask(key string) {
func (l *Once) doTask(ctx context.Context, key string) {
fmt.Println("任务时间:", time.Now().Format("2006-01-02 15:04:05"))
defer func() {
if err := recover(); err != nil {
fmt.Println("timer:定时器出错", err)
log.Println("errStack", string(debug.Stack()))
l.logger.Errorf(ctx, "timer:回调任务panic:%s stack:%s", err, string(debug.Stack()))
}
}()
s := strings.Split(key, "[:]")
// 读取数据
str, err := w.redis.Get(w.ctx, key).Result()
redisKey := l.keyPrefix + key
str, err := l.redis.Get(ctx, redisKey).Result()
if err != nil {
fmt.Println("execJob err:", err)
l.logger.Errorf(ctx, "获取数据失败 err:%s", err)
return
}
fmt.Println("参数:", str)
ed := extendData{}
json.Unmarshal([]byte(str), &ed)
fmt.Println("开始时间:", time.Now().Format("2006-01-02 15:04:05"))
code, t := w.worker.Worker(s[0], s[1], ed.Data)
resp := l.worker.Worker(ctx, s[0], s[1], ed.Data)
if resp == nil {
return
}
if code == WorkerCodeAgain {
if resp.Retry {
// 重新放入队列
fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05"))
if t != 0 && t == t.Abs() {
ed.Delay = t
if resp.DelayTime != 0 && resp.DelayTime == resp.DelayTime.Abs() {
ed.Delay = resp.DelayTime
}
w.Add(s[0], s[1], ed.Delay, ed.Data)
ed.Data = resp.AttachData
l.logger.Infof(ctx, "任务重新放入队列:%s", key)
fmt.Println("重入时间:", time.Now().Format("2006-01-02 15:04:05"))
l.Create(s[0], s[1], ed.Delay, ed.Data)
}
}
+15 -2
View File
@@ -1,12 +1,18 @@
package timerx
import "time"
type Options struct {
logger Logger
location *time.Location
timeout time.Duration
}
func defaultOptions() Options {
return Options{
logger: NewLogger(),
location: time.Local,
timeout: time.Hour,
}
}
@@ -28,8 +34,15 @@ func SetLogger(log Logger) Option {
}
// 设定时区
func SetTimeZone(zone string) Option {
func SetTimeZone(zone *time.Location) Option {
return func(o *Options) {
// todo
o.location = zone
}
}
// 设置任务最长执行时间
func SetTimeout(d time.Duration) Option {
return func(o *Options) {
o.timeout = d
}
}
+22 -15
View File
@@ -8,6 +8,8 @@ import (
"runtime/debug"
"sync"
"time"
uuid "github.com/satori/go.uuid"
)
// 简单定时器
@@ -24,6 +26,7 @@ var singleOnceLimit sync.Once // 实现单例
type Single struct {
ctx context.Context
logger Logger
location *time.Location
}
var sin *Single = nil
@@ -40,6 +43,7 @@ func InitSingle(ctx context.Context, opts ...Option) *Single {
sin = &Single{
ctx: ctx,
logger: op.logger,
location: op.location,
}
timer := time.NewTicker(time.Millisecond * 200)
@@ -53,7 +57,7 @@ func InitSingle(ctx context.Context, opts ...Option) *Single {
continue
}
// 迭代定时器
sin.iterator(ctx, t)
sin.iterator(ctx)
// fmt.Println("timer: 执行")
case <-ctx.Done():
// 跳出循环
@@ -77,7 +81,7 @@ func InitSingle(ctx context.Context, opts ...Option) *Single {
// @param callback 回调函数
// @param extendData 扩展数据
// @return error
func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback callback, extendData interface{}) (int, error) {
func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
jobData := JobData{
@@ -99,7 +103,7 @@ func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int,
// @param hour int 小时
// @param minute int 分钟
// @param second int 秒
func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback callback, extendData interface{}) (int, error) {
func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
jobData := JobData{
@@ -115,7 +119,7 @@ func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday,
}
// 每天执行一次
func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback callback, extendData interface{}) (int, error) {
func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
jobData := JobData{
@@ -130,7 +134,7 @@ func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int
}
// 每小时执行一次
func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second int, callback callback, extendData interface{}) (int, error) {
func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
jobData := JobData{
@@ -144,7 +148,7 @@ func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second
}
// 每分钟执行一次
func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callback callback, extendData interface{}) (int, error) {
func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
jobData := JobData{
@@ -157,7 +161,7 @@ func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callb
}
// 特定时间间隔
func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback callback, extendData interface{}) (int, error) {
func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
if spaceTime < 0 {
@@ -180,12 +184,10 @@ func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Dur
// @param extend 附加参数
// @return int 定时器索引
// @return error 错误
func (l *Single) addJob(ctx context.Context, jobData JobData, call callback, extend interface{}) (int, error) {
func (l *Single) addJob(ctx context.Context, jobData JobData, call func(ctx context.Context, extendData interface{}) error, extend interface{}) (int, error) {
singleTimerIndex += 1
nowTime := time.Now()
_, err := GetNextTime(nowTime, time.Local, jobData)
_, err := GetNextTime(time.Now().In(l.location), jobData)
if err != nil {
l.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error())
return 0, err
@@ -209,7 +211,9 @@ func (s *Single) Del(index int) {
}
// 迭代定时器列表
func (s *Single) iterator(ctx context.Context, nowTime time.Time) {
func (s *Single) iterator(ctx context.Context) {
nowTime := time.Now().In(s.location)
// 默认5秒后(如果没有值就暂停进来5秒)
newNextTime := nowTime.Add(time.Second * 5)
@@ -221,7 +225,7 @@ func (s *Single) iterator(ctx context.Context, nowTime time.Time) {
if timeStr.JobData.NextTime.Before(nowTime) || timeStr.JobData.NextTime.Equal(nowTime) {
// 可执行
nextTime, _ := GetNextTime(nowTime, time.Local, *timeStr.JobData)
nextTime, _ := GetNextTime(nowTime, *timeStr.JobData)
timeStr.JobData.NextTime = *nextTime
if index == 1 {
@@ -277,11 +281,14 @@ func (s *Single) iterator(ctx context.Context, nowTime time.Time) {
// 定时器操作类
// 这里不应painc
func (s *Single) doTask(ctx context.Context, call callback, extend interface{}) error {
func (s *Single) doTask(ctx context.Context, call func(ctx context.Context, extendData interface{}) error, extend interface{}) error {
defer func() {
if err := recover(); err != nil {
s.logger.Errorf(ctx, "timer:定时器出错 err:%+v stack:%s", err, string(debug.Stack()))
s.logger.Errorf(ctx, "timer:回调任务panic err:%+v stack:%s", err, string(debug.Stack()))
}
}()
ctx = context.WithValue(ctx, "trace_id", uuid.NewV4().String)
return call(ctx, extend)
}
+2 -2
View File
@@ -6,7 +6,7 @@ import (
)
type timerStr struct {
Callback callback // 需要回调的方法
Callback func(ctx context.Context, extendData interface{}) error // 需要回调的方法
CanRunning chan (struct{}) // 是否允许执行(only single)
TaskId string // 任务ID 全局唯一键(only cluster)
ExtendData interface{} // 附加参数
@@ -40,4 +40,4 @@ type JobData struct {
}
// 定义各个回调函数
type callback func(ctx context.Context, extendData interface{}) error
// type callback func(ctx context.Context, extendData interface{}) error