Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 25b5008af9 | |||
| 3719c417fb | |||
| ec41fd80a8 | |||
| c61b82587b | |||
| 6df89da568 | |||
| 2cc97438b4 |
+22
-17
@@ -10,6 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
uuid "github.com/satori/go.uuid"
|
||||
"github.com/yuninks/cachex"
|
||||
"github.com/yuninks/lockx"
|
||||
)
|
||||
@@ -32,7 +33,8 @@ type Cluster struct {
|
||||
redis redis.UniversalClient
|
||||
cache *cachex.Cache
|
||||
logger Logger
|
||||
keyPrefix string // key前缀
|
||||
keyPrefix string // key前缀
|
||||
location *time.Location // 根据时区计算的时间
|
||||
|
||||
lockKey string // 全局计算的key
|
||||
zsetKey string // 有序集合的key
|
||||
@@ -55,6 +57,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
|
||||
cache: cachex.NewCache(),
|
||||
logger: op.logger,
|
||||
keyPrefix: keyPrefix,
|
||||
location: op.location,
|
||||
lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁
|
||||
zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合
|
||||
listKey: "timer:cluster_listKey" + keyPrefix, // 列表
|
||||
@@ -92,8 +95,8 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
|
||||
// @param callback 回调函数
|
||||
// @param extendData 扩展数据
|
||||
// @return error
|
||||
func (c *Cluster) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback callback, extendData interface{}) error {
|
||||
nowTime := time.Now()
|
||||
func (c *Cluster) EveryMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
|
||||
nowTime := time.Now().In(c.location)
|
||||
|
||||
jobData := JobData{
|
||||
JobType: JobTypeEveryMonth,
|
||||
@@ -114,8 +117,8 @@ func (c *Cluster) AddMonth(ctx context.Context, taskId string, day int, hour int
|
||||
// @param hour int 小时
|
||||
// @param minute int 分钟
|
||||
// @param second int 秒
|
||||
func (c *Cluster) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback callback, extendData interface{}) error {
|
||||
nowTime := time.Now()
|
||||
func (c *Cluster) EveryWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
|
||||
nowTime := time.Now().In(c.location)
|
||||
|
||||
jobData := JobData{
|
||||
JobType: JobTypeEveryWeek,
|
||||
@@ -130,8 +133,8 @@ func (c *Cluster) AddWeek(ctx context.Context, taskId string, week time.Weekday,
|
||||
}
|
||||
|
||||
// 每天执行一次
|
||||
func (c *Cluster) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback callback, extendData interface{}) error {
|
||||
nowTime := time.Now()
|
||||
func (c *Cluster) EveryDay(ctx context.Context, taskId string, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
|
||||
nowTime := time.Now().In(c.location)
|
||||
|
||||
jobData := JobData{
|
||||
JobType: JobTypeEveryDay,
|
||||
@@ -145,8 +148,8 @@ func (c *Cluster) AddDay(ctx context.Context, taskId string, hour int, minute in
|
||||
}
|
||||
|
||||
// 每小时执行一次
|
||||
func (c *Cluster) AddHour(ctx context.Context, taskId string, minute int, second int, callback callback, extendData interface{}) error {
|
||||
nowTime := time.Now()
|
||||
func (c *Cluster) EveryHour(ctx context.Context, taskId string, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
|
||||
nowTime := time.Now().In(c.location)
|
||||
|
||||
jobData := JobData{
|
||||
JobType: JobTypeEveryHour,
|
||||
@@ -159,8 +162,8 @@ func (c *Cluster) AddHour(ctx context.Context, taskId string, minute int, second
|
||||
}
|
||||
|
||||
// 每分钟执行一次
|
||||
func (c *Cluster) AddMinute(ctx context.Context, taskId string, second int, callback callback, extendData interface{}) error {
|
||||
nowTime := time.Now()
|
||||
func (c *Cluster) EveryMinute(ctx context.Context, taskId string, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
|
||||
nowTime := time.Now().In(c.location)
|
||||
|
||||
jobData := JobData{
|
||||
JobType: JobTypeEveryMinute,
|
||||
@@ -172,8 +175,8 @@ func (c *Cluster) AddMinute(ctx context.Context, taskId string, second int, call
|
||||
}
|
||||
|
||||
// 特定时间间隔
|
||||
func (c *Cluster) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback callback, extendData interface{}) error {
|
||||
nowTime := time.Now()
|
||||
func (c *Cluster) EverySpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
|
||||
nowTime := time.Now().In(c.location)
|
||||
|
||||
if spaceTime < 0 {
|
||||
c.logger.Errorf(ctx, "间隔时间不能小于0")
|
||||
@@ -200,14 +203,14 @@ func (c *Cluster) AddSpace(ctx context.Context, taskId string, spaceTime time.Du
|
||||
// @param callback callback 回调函数
|
||||
// @param extendData interface{} 扩展数据
|
||||
// @return error
|
||||
func (c *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, callback callback, extendData interface{}) error {
|
||||
func (c *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
|
||||
_, ok := clusterWorkerList.Load(taskId)
|
||||
if ok {
|
||||
c.logger.Errorf(ctx, "key已存在:%s", taskId)
|
||||
return errors.New("key已存在")
|
||||
}
|
||||
|
||||
_, err := GetNextTime(time.Now(), time.Local, jobData)
|
||||
_, err := GetNextTime(time.Now().In(c.location), jobData)
|
||||
if err != nil {
|
||||
c.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error())
|
||||
return err
|
||||
@@ -258,7 +261,7 @@ func (c *Cluster) getNextTime() {
|
||||
clusterWorkerList.Range(func(key, value interface{}) bool {
|
||||
val := value.(timerStr)
|
||||
|
||||
nextTime, _ := GetNextTime(time.Now(), time.Local, *val.JobData)
|
||||
nextTime, _ := GetNextTime(time.Now().In(c.location), *val.JobData)
|
||||
|
||||
// fmt.Println(val.ExtendData, val.JobData, nextTime)
|
||||
|
||||
@@ -436,10 +439,12 @@ func (c *Cluster) doTask(ctx context.Context, taskId string) {
|
||||
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
c.logger.Errorf(ctx, "timer:定时器出错 err:%+v stack:%s", err, string(debug.Stack()))
|
||||
c.logger.Errorf(ctx, "timer:回调任务panic err:%+v stack:%s", err, string(debug.Stack()))
|
||||
}
|
||||
}()
|
||||
|
||||
ctx = context.WithValue(ctx, "trace_id", uuid.NewV4().String)
|
||||
|
||||
// 执行任务
|
||||
t.Callback(ctx, t.ExtendData)
|
||||
}
|
||||
|
||||
+6
-6
@@ -30,7 +30,7 @@ func TestCluster_AddEveryMonth(t *testing.T) {
|
||||
}
|
||||
extendData := "testData"
|
||||
|
||||
err := cluster.AddMonth(ctx, taskId, 1, hour, minute, second, callback, extendData)
|
||||
err := cluster.EveryMonth(ctx, taskId, 1, hour, minute, second, callback, extendData)
|
||||
if err != nil {
|
||||
t.Errorf("AddEveryMonth failed, err: %v", err)
|
||||
}
|
||||
@@ -59,7 +59,7 @@ func TestCluster_AddEveryWeek(t *testing.T) {
|
||||
}
|
||||
extendData := "testData"
|
||||
|
||||
err := cluster.AddWeek(ctx, taskId, week, hour, minute, second, callback, extendData)
|
||||
err := cluster.EveryWeek(ctx, taskId, week, hour, minute, second, callback, extendData)
|
||||
if err != nil {
|
||||
t.Errorf("AddEveryWeek failed, err: %v", err)
|
||||
}
|
||||
@@ -87,7 +87,7 @@ func TestCluster_AddEveryDay(t *testing.T) {
|
||||
}
|
||||
extendData := "testData"
|
||||
|
||||
err := cluster.AddDay(ctx, taskId, hour, minute, second, callback, extendData)
|
||||
err := cluster.EveryDay(ctx, taskId, hour, minute, second, callback, extendData)
|
||||
if err != nil {
|
||||
t.Errorf("AddEveryDay failed, err: %v", err)
|
||||
}
|
||||
@@ -114,7 +114,7 @@ func TestCluster_AddEveryHour(t *testing.T) {
|
||||
}
|
||||
extendData := "testData"
|
||||
|
||||
err := cluster.AddHour(ctx, taskId, minute, second, callback, extendData)
|
||||
err := cluster.EveryHour(ctx, taskId, minute, second, callback, extendData)
|
||||
if err != nil {
|
||||
t.Errorf("AddEveryHour failed, err: %v", err)
|
||||
}
|
||||
@@ -140,7 +140,7 @@ func TestCluster_AddEveryMinute(t *testing.T) {
|
||||
}
|
||||
extendData := "testData"
|
||||
|
||||
err := cluster.AddMinute(ctx, taskId, second, callback, extendData)
|
||||
err := cluster.EveryMinute(ctx, taskId, second, callback, extendData)
|
||||
if err != nil {
|
||||
t.Errorf("AddEveryMinute failed, err: %v", err)
|
||||
}
|
||||
@@ -170,7 +170,7 @@ func TestCluster_Add(t *testing.T) {
|
||||
}
|
||||
extendData := "testData"
|
||||
|
||||
err := cluster.AddSpace(ctx, taskId, dur, callback, extendData)
|
||||
err := cluster.EverySpace(ctx, taskId, dur, callback, extendData)
|
||||
if err != nil {
|
||||
t.Errorf("Add failed, err: %v", err)
|
||||
}
|
||||
|
||||
+11
-11
@@ -34,13 +34,13 @@ func cluster() {
|
||||
client := getRedis()
|
||||
ctx := context.Background()
|
||||
cluster := timerx.InitCluster(ctx, client, "test")
|
||||
err := cluster.AddSpace(ctx, "test_space", 1*time.Second, aa, "这是秒任务")
|
||||
err := cluster.EverySpace(ctx, "test_space", 1*time.Second, aa, "这是秒任务")
|
||||
fmt.Println(err)
|
||||
err = cluster.AddMinute(ctx, "test_min", 15, aa, "这是分钟任务")
|
||||
err = cluster.EveryMinute(ctx, "test_min", 15, aa, "这是分钟任务")
|
||||
fmt.Println(err)
|
||||
err = cluster.AddHour(ctx, "test_hour", 30, 0, aa, "这是小时任务")
|
||||
err = cluster.EveryHour(ctx, "test_hour", 30, 0, aa, "这是小时任务")
|
||||
fmt.Println(err)
|
||||
err = cluster.AddDay(ctx, "test_day", 11, 0, 0, aa, "这是天任务")
|
||||
err = cluster.EveryDay(ctx, "test_day", 11, 0, 0, aa, "这是天任务")
|
||||
fmt.Println(err)
|
||||
}
|
||||
|
||||
@@ -68,7 +68,7 @@ func worker() {
|
||||
|
||||
type Worker struct{}
|
||||
|
||||
func (w *Worker) Worker(jobType string, uniqueKey string, data interface{}) (timerx.WorkerCode, time.Duration) {
|
||||
func (w *Worker) Worker(ctx context.Context, jobType string, uniqueKey string, data interface{}) (timerx.WorkerCode, time.Duration) {
|
||||
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
|
||||
fmt.Println(uniqueKey, jobType)
|
||||
fmt.Println(data)
|
||||
@@ -93,12 +93,12 @@ func re() {
|
||||
|
||||
ctx := context.Background()
|
||||
cl := timerx.InitCluster(ctx, client, "kkkk")
|
||||
cl.AddSpace(ctx, "test1", 1*time.Millisecond, aa, "data")
|
||||
cl.AddSpace(ctx, "test2", 1*time.Millisecond, aa, "data")
|
||||
cl.AddSpace(ctx, "test3", 1*time.Millisecond, aa, "data")
|
||||
cl.AddSpace(ctx, "test4", 1*time.Millisecond, aa, "data")
|
||||
cl.AddSpace(ctx, "test5", 1*time.Millisecond, aa, "data")
|
||||
cl.AddSpace(ctx, "test6", 1*time.Millisecond, aa, "data")
|
||||
cl.EverySpace(ctx, "test1", 1*time.Millisecond, aa, "data")
|
||||
cl.EverySpace(ctx, "test2", 1*time.Millisecond, aa, "data")
|
||||
cl.EverySpace(ctx, "test3", 1*time.Millisecond, aa, "data")
|
||||
cl.EverySpace(ctx, "test4", 1*time.Millisecond, aa, "data")
|
||||
cl.EverySpace(ctx, "test5", 1*time.Millisecond, aa, "data")
|
||||
cl.EverySpace(ctx, "test6", 1*time.Millisecond, aa, "data")
|
||||
|
||||
select {}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ go 1.19
|
||||
|
||||
require (
|
||||
github.com/go-redis/redis/v8 v8.11.5
|
||||
github.com/satori/go.uuid v1.2.0
|
||||
github.com/yuninks/cachex v1.0.5
|
||||
github.com/yuninks/lockx v1.0.2
|
||||
)
|
||||
@@ -11,4 +12,5 @@ require (
|
||||
require (
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
|
||||
)
|
||||
|
||||
@@ -5,9 +5,16 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu
|
||||
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
|
||||
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
|
||||
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
|
||||
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
|
||||
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
|
||||
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
|
||||
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
|
||||
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
|
||||
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
|
||||
github.com/yuninks/cachex v1.0.5 h1:Y2NmTsuEgwEVYb7FVFh5tUN67kmrUioeksQqLbOAwsM=
|
||||
github.com/yuninks/cachex v1.0.5/go.mod h1:5357qz18UvHTJSgZzkMamUzZoFzGeKG9+4tIUBXRSVM=
|
||||
github.com/yuninks/lockx v1.0.2 h1:p0n791WmsU8D7YF2tQaNLwPE75jdd774unlJZRTNfaw=
|
||||
@@ -15,5 +22,7 @@ github.com/yuninks/lockx v1.0.2/go.mod h1:J6wvuUELLcMn6FCmiZFt7K5w1QQAh1myL7h3Jr
|
||||
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
|
||||
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
|
||||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||
|
||||
+22
-21
@@ -8,21 +8,21 @@ import (
|
||||
// 计算该任务下次执行时间
|
||||
// @param job *JobData 任务数据
|
||||
// @return time.Time 下次执行时间
|
||||
func GetNextTime(t time.Time, loc *time.Location, job JobData) (*time.Time, error) {
|
||||
func GetNextTime(t time.Time, job JobData) (*time.Time, error) {
|
||||
|
||||
var next time.Time
|
||||
|
||||
switch job.JobType {
|
||||
case JobTypeEveryMonth:
|
||||
next = calculateNextMonthTime(t, job, loc)
|
||||
next = calculateNextMonthTime(t, job)
|
||||
case JobTypeEveryWeek:
|
||||
next = calculateNextWeekTime(t, job, loc)
|
||||
next = calculateNextWeekTime(t, job)
|
||||
case JobTypeEveryDay:
|
||||
next = calculateNextDayTime(t, job, loc)
|
||||
next = calculateNextDayTime(t, job)
|
||||
case JobTypeEveryHour:
|
||||
next = calculateNextHourTime(t, job, loc)
|
||||
next = calculateNextHourTime(t, job)
|
||||
case JobTypeEveryMinute:
|
||||
next = calculateNextMinuteTime(t, job, loc)
|
||||
next = calculateNextMinuteTime(t, job)
|
||||
case JobTypeInterval:
|
||||
next = calculateNextInterval(t, job)
|
||||
default:
|
||||
@@ -38,16 +38,17 @@ func calculateNextInterval(t time.Time, job JobData) time.Time {
|
||||
return job.BaseTime.Add(job.IntervalTime * time.Duration(cycle+1))
|
||||
}
|
||||
|
||||
func calculateNextMonthTime(t time.Time, job JobData, loc *time.Location) time.Time {
|
||||
func calculateNextMonthTime(t time.Time, job JobData) time.Time {
|
||||
// 判断是否可执行并返回下一个执行时间
|
||||
|
||||
if canRun(t, job) {
|
||||
return time.Date(t.Year(), t.Month(), job.Day, job.Hour, job.Minute, job.Second, 0, loc)
|
||||
return time.Date(t.Year(), t.Month(), job.Day, job.Hour, job.Minute, job.Second, 0, t.Location())
|
||||
}
|
||||
// 下一个周期(下个月)
|
||||
return time.Date(t.Year(), t.Month()+1, job.Day, job.Hour, job.Minute, job.Second, 0, loc)
|
||||
return time.Date(t.Year(), t.Month()+1, job.Day, job.Hour, job.Minute, job.Second, 0, t.Location())
|
||||
}
|
||||
|
||||
func calculateNextWeekTime(t time.Time, job JobData, loc *time.Location) time.Time {
|
||||
func calculateNextWeekTime(t time.Time, job JobData) time.Time {
|
||||
weekday := t.Weekday()
|
||||
days := int(job.Weekday - weekday)
|
||||
if days < 0 {
|
||||
@@ -55,37 +56,37 @@ func calculateNextWeekTime(t time.Time, job JobData, loc *time.Location) time.Ti
|
||||
}
|
||||
// 判断是否可执行并返回下一个执行时间
|
||||
if canRun(t, job) {
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, loc)
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, t.Location())
|
||||
}
|
||||
// 下一个周期(下周)
|
||||
return time.Date(t.Year(), t.Month(), t.Day()+days+7, job.Hour, job.Minute, job.Second, 0, loc)
|
||||
return time.Date(t.Year(), t.Month(), t.Day()+days+7, job.Hour, job.Minute, job.Second, 0, t.Location())
|
||||
}
|
||||
|
||||
func calculateNextDayTime(t time.Time, job JobData, loc *time.Location) time.Time {
|
||||
func calculateNextDayTime(t time.Time, job JobData) time.Time {
|
||||
// 判断是否可执行并返回下一个执行时间
|
||||
if canRun(t, job) {
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, loc)
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, t.Location())
|
||||
}
|
||||
// 下一个周期(明天)
|
||||
return time.Date(t.Year(), t.Month(), t.Day()+1, job.Hour, job.Minute, job.Second, 0, loc)
|
||||
return time.Date(t.Year(), t.Month(), t.Day()+1, job.Hour, job.Minute, job.Second, 0, t.Location())
|
||||
}
|
||||
|
||||
func calculateNextHourTime(t time.Time, job JobData, loc *time.Location) time.Time {
|
||||
func calculateNextHourTime(t time.Time, job JobData) time.Time {
|
||||
// 判断是否可执行并返回下一个执行时间
|
||||
if canRun(t, job) {
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), job.Minute, job.Second, 0, loc)
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), job.Minute, job.Second, 0, t.Location())
|
||||
}
|
||||
// 下一个周期(下个小时)
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour()+1, job.Minute, job.Second, 0, loc)
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour()+1, job.Minute, job.Second, 0, t.Location())
|
||||
}
|
||||
|
||||
func calculateNextMinuteTime(t time.Time, job JobData, loc *time.Location) time.Time {
|
||||
func calculateNextMinuteTime(t time.Time, job JobData) time.Time {
|
||||
// 判断是否可执行并返回下一个执行时间
|
||||
if canRun(t, job) {
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), job.Second, 0, loc)
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), job.Second, 0, t.Location())
|
||||
}
|
||||
// 下一个周期(下分钟)
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute()+1, job.Second, 0, loc)
|
||||
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute()+1, job.Second, 0, t.Location())
|
||||
}
|
||||
|
||||
// 检查是否本周期可以运行
|
||||
|
||||
+2
-2
@@ -92,8 +92,8 @@ func TestGetNextTime(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
now := time.Now()
|
||||
loc := time.FixedZone("CST", 8*3600)
|
||||
nextTime, err := timerx.GetNextTime(now,loc,test.job)
|
||||
// loc := time.FixedZone("CST", 8*3600)
|
||||
nextTime, err := timerx.GetNextTime(now, test.job)
|
||||
if err != nil {
|
||||
if test.expectedError == nil || err.Error() != test.expectedError.Error() {
|
||||
t.Errorf("Expected error: %v, Got error: %v", test.expectedError, err)
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
uuid "github.com/satori/go.uuid"
|
||||
)
|
||||
|
||||
// 功能描述
|
||||
@@ -43,7 +44,7 @@ type Callback interface {
|
||||
// @param data interface{} 任务数据
|
||||
// @return WorkerCode 任务执行结果
|
||||
// @return time.Duration 任务执行时间间隔
|
||||
Worker(jobType string, uniTaskId string, attachData interface{}) (WorkerCode, time.Duration)
|
||||
Worker(ctx context.Context, jobType string, uniTaskId string, attachData interface{}) (WorkerCode, time.Duration)
|
||||
}
|
||||
|
||||
var wo *Once = nil
|
||||
@@ -165,7 +166,7 @@ func (w *Once) watch() {
|
||||
func (w *Once) doTask(key string) {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
fmt.Println("timer:定时器出错", err)
|
||||
fmt.Println("timer:回调任务panic", err)
|
||||
log.Println("errStack", string(debug.Stack()))
|
||||
}
|
||||
}()
|
||||
@@ -182,7 +183,10 @@ func (w *Once) doTask(key string) {
|
||||
json.Unmarshal([]byte(str), &ed)
|
||||
|
||||
fmt.Println("开始时间:", time.Now().Format("2006-01-02 15:04:05"))
|
||||
code, t := w.worker.Worker(s[0], s[1], ed.Data)
|
||||
|
||||
ctx := context.WithValue(context.Background(), "trace_id", uuid.NewV4().String)
|
||||
|
||||
code, t := w.worker.Worker(ctx, s[0], s[1], ed.Data)
|
||||
|
||||
if code == WorkerCodeAgain {
|
||||
// 重新放入队列
|
||||
|
||||
@@ -1,12 +1,16 @@
|
||||
package timerx
|
||||
|
||||
import "time"
|
||||
|
||||
type Options struct {
|
||||
logger Logger
|
||||
logger Logger
|
||||
location *time.Location
|
||||
}
|
||||
|
||||
func defaultOptions() Options {
|
||||
return Options{
|
||||
logger: NewLogger(),
|
||||
logger: NewLogger(),
|
||||
location: time.Local,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,8 +32,8 @@ func SetLogger(log Logger) Option {
|
||||
}
|
||||
|
||||
// 设定时区
|
||||
func SetTimeZone(zone string) Option {
|
||||
func SetTimeZone(zone *time.Location) Option {
|
||||
return func(o *Options) {
|
||||
// todo
|
||||
o.location = zone
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
uuid "github.com/satori/go.uuid"
|
||||
)
|
||||
|
||||
// 简单定时器
|
||||
@@ -22,8 +24,9 @@ var singleTimerIndex int // 当前定时数目
|
||||
var singleOnceLimit sync.Once // 实现单例
|
||||
|
||||
type Single struct {
|
||||
ctx context.Context
|
||||
logger Logger
|
||||
ctx context.Context
|
||||
logger Logger
|
||||
location *time.Location
|
||||
}
|
||||
|
||||
var sin *Single = nil
|
||||
@@ -38,8 +41,9 @@ func InitSingle(ctx context.Context, opts ...Option) *Single {
|
||||
op := newOptions(opts...)
|
||||
|
||||
sin = &Single{
|
||||
ctx: ctx,
|
||||
logger: op.logger,
|
||||
ctx: ctx,
|
||||
logger: op.logger,
|
||||
location: op.location,
|
||||
}
|
||||
|
||||
timer := time.NewTicker(time.Millisecond * 200)
|
||||
@@ -53,7 +57,7 @@ func InitSingle(ctx context.Context, opts ...Option) *Single {
|
||||
continue
|
||||
}
|
||||
// 迭代定时器
|
||||
sin.iterator(ctx, t)
|
||||
sin.iterator(ctx)
|
||||
// fmt.Println("timer: 执行")
|
||||
case <-ctx.Done():
|
||||
// 跳出循环
|
||||
@@ -77,7 +81,7 @@ func InitSingle(ctx context.Context, opts ...Option) *Single {
|
||||
// @param callback 回调函数
|
||||
// @param extendData 扩展数据
|
||||
// @return error
|
||||
func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback callback, extendData interface{}) (int, error) {
|
||||
func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
|
||||
nowTime := time.Now()
|
||||
|
||||
jobData := JobData{
|
||||
@@ -99,7 +103,7 @@ func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int,
|
||||
// @param hour int 小时
|
||||
// @param minute int 分钟
|
||||
// @param second int 秒
|
||||
func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback callback, extendData interface{}) (int, error) {
|
||||
func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
|
||||
nowTime := time.Now()
|
||||
|
||||
jobData := JobData{
|
||||
@@ -115,7 +119,7 @@ func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday,
|
||||
}
|
||||
|
||||
// 每天执行一次
|
||||
func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback callback, extendData interface{}) (int, error) {
|
||||
func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
|
||||
nowTime := time.Now()
|
||||
|
||||
jobData := JobData{
|
||||
@@ -130,7 +134,7 @@ func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int
|
||||
}
|
||||
|
||||
// 每小时执行一次
|
||||
func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second int, callback callback, extendData interface{}) (int, error) {
|
||||
func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
|
||||
nowTime := time.Now()
|
||||
|
||||
jobData := JobData{
|
||||
@@ -144,7 +148,7 @@ func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second
|
||||
}
|
||||
|
||||
// 每分钟执行一次
|
||||
func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callback callback, extendData interface{}) (int, error) {
|
||||
func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
|
||||
nowTime := time.Now()
|
||||
|
||||
jobData := JobData{
|
||||
@@ -157,7 +161,7 @@ func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callb
|
||||
}
|
||||
|
||||
// 特定时间间隔
|
||||
func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback callback, extendData interface{}) (int, error) {
|
||||
func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
|
||||
nowTime := time.Now()
|
||||
|
||||
if spaceTime < 0 {
|
||||
@@ -180,12 +184,10 @@ func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Dur
|
||||
// @param extend 附加参数
|
||||
// @return int 定时器索引
|
||||
// @return error 错误
|
||||
func (l *Single) addJob(ctx context.Context, jobData JobData, call callback, extend interface{}) (int, error) {
|
||||
func (l *Single) addJob(ctx context.Context, jobData JobData, call func(ctx context.Context, extendData interface{}) error, extend interface{}) (int, error) {
|
||||
singleTimerIndex += 1
|
||||
|
||||
nowTime := time.Now()
|
||||
|
||||
_, err := GetNextTime(nowTime, time.Local, jobData)
|
||||
_, err := GetNextTime(time.Now().In(l.location), jobData)
|
||||
if err != nil {
|
||||
l.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error())
|
||||
return 0, err
|
||||
@@ -209,7 +211,9 @@ func (s *Single) Del(index int) {
|
||||
}
|
||||
|
||||
// 迭代定时器列表
|
||||
func (s *Single) iterator(ctx context.Context, nowTime time.Time) {
|
||||
func (s *Single) iterator(ctx context.Context) {
|
||||
|
||||
nowTime := time.Now().In(s.location)
|
||||
|
||||
// 默认5秒后(如果没有值就暂停进来5秒)
|
||||
newNextTime := nowTime.Add(time.Second * 5)
|
||||
@@ -221,7 +225,7 @@ func (s *Single) iterator(ctx context.Context, nowTime time.Time) {
|
||||
|
||||
if timeStr.JobData.NextTime.Before(nowTime) || timeStr.JobData.NextTime.Equal(nowTime) {
|
||||
// 可执行
|
||||
nextTime, _ := GetNextTime(nowTime, time.Local, *timeStr.JobData)
|
||||
nextTime, _ := GetNextTime(nowTime, *timeStr.JobData)
|
||||
timeStr.JobData.NextTime = *nextTime
|
||||
|
||||
if index == 1 {
|
||||
@@ -277,11 +281,14 @@ func (s *Single) iterator(ctx context.Context, nowTime time.Time) {
|
||||
|
||||
// 定时器操作类
|
||||
// 这里不应painc
|
||||
func (s *Single) doTask(ctx context.Context, call callback, extend interface{}) error {
|
||||
func (s *Single) doTask(ctx context.Context, call func(ctx context.Context, extendData interface{}) error, extend interface{}) error {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
s.logger.Errorf(ctx, "timer:定时器出错 err:%+v stack:%s", err, string(debug.Stack()))
|
||||
s.logger.Errorf(ctx, "timer:回调任务panic err:%+v stack:%s", err, string(debug.Stack()))
|
||||
}
|
||||
}()
|
||||
|
||||
ctx = context.WithValue(ctx, "trace_id", uuid.NewV4().String)
|
||||
|
||||
return call(ctx, extend)
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import (
|
||||
)
|
||||
|
||||
type timerStr struct {
|
||||
Callback callback // 需要回调的方法
|
||||
Callback func(ctx context.Context, extendData interface{}) error // 需要回调的方法
|
||||
CanRunning chan (struct{}) // 是否允许执行(only single)
|
||||
TaskId string // 任务ID 全局唯一键(only cluster)
|
||||
ExtendData interface{} // 附加参数
|
||||
@@ -40,4 +40,4 @@ type JobData struct {
|
||||
}
|
||||
|
||||
// 定义各个回调函数
|
||||
type callback func(ctx context.Context, extendData interface{}) error
|
||||
// type callback func(ctx context.Context, extendData interface{}) error
|
||||
|
||||
Reference in New Issue
Block a user