7 Commits

Author SHA1 Message Date
Yun 5ca5b31efb 创建任务不需要验证全局锁 2024-06-19 16:31:51 +08:00
Yun 25b5008af9 修改测试脚本 2024-05-31 13:55:05 +08:00
Yun 3719c417fb 优化传参 2024-05-31 13:05:51 +08:00
Yun ec41fd80a8 修改方法名 2024-05-31 09:52:10 +08:00
Yun c61b82587b 优化关于时区的问题 2024-05-30 11:02:44 +08:00
Yun 6df89da568 修改一些日志提示 2024-05-28 17:36:18 +08:00
Yun 2cc97438b4 回调的ctx添加trace_id 2024-05-28 17:28:20 +08:00
11 changed files with 126 additions and 94 deletions
+31 -26
View File
@@ -10,6 +10,7 @@ import (
"time"
"github.com/go-redis/redis/v8"
uuid "github.com/satori/go.uuid"
"github.com/yuninks/cachex"
"github.com/yuninks/lockx"
)
@@ -32,7 +33,8 @@ type Cluster struct {
redis redis.UniversalClient
cache *cachex.Cache
logger Logger
keyPrefix string // key前缀
keyPrefix string // key前缀
location *time.Location // 根据时区计算的时间
lockKey string // 全局计算的key
zsetKey string // 有序集合的key
@@ -55,6 +57,7 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
cache: cachex.NewCache(),
logger: op.logger,
keyPrefix: keyPrefix,
location: op.location,
lockKey: "timer:cluster_globalLockKey" + keyPrefix, // 定时器的全局锁
zsetKey: "timer:cluster_zsetKey" + keyPrefix, // 有序集合
listKey: "timer:cluster_listKey" + keyPrefix, // 列表
@@ -92,8 +95,8 @@ func InitCluster(ctx context.Context, red redis.UniversalClient, keyPrefix strin
// @param callback 回调函数
// @param extendData 扩展数据
// @return error
func (c *Cluster) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback callback, extendData interface{}) error {
nowTime := time.Now()
func (c *Cluster) EveryMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
nowTime := time.Now().In(c.location)
jobData := JobData{
JobType: JobTypeEveryMonth,
@@ -114,8 +117,8 @@ func (c *Cluster) AddMonth(ctx context.Context, taskId string, day int, hour int
// @param hour int 小时
// @param minute int 分钟
// @param second int 秒
func (c *Cluster) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback callback, extendData interface{}) error {
nowTime := time.Now()
func (c *Cluster) EveryWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
nowTime := time.Now().In(c.location)
jobData := JobData{
JobType: JobTypeEveryWeek,
@@ -130,8 +133,8 @@ func (c *Cluster) AddWeek(ctx context.Context, taskId string, week time.Weekday,
}
// 每天执行一次
func (c *Cluster) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback callback, extendData interface{}) error {
nowTime := time.Now()
func (c *Cluster) EveryDay(ctx context.Context, taskId string, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
nowTime := time.Now().In(c.location)
jobData := JobData{
JobType: JobTypeEveryDay,
@@ -145,8 +148,8 @@ func (c *Cluster) AddDay(ctx context.Context, taskId string, hour int, minute in
}
// 每小时执行一次
func (c *Cluster) AddHour(ctx context.Context, taskId string, minute int, second int, callback callback, extendData interface{}) error {
nowTime := time.Now()
func (c *Cluster) EveryHour(ctx context.Context, taskId string, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
nowTime := time.Now().In(c.location)
jobData := JobData{
JobType: JobTypeEveryHour,
@@ -159,8 +162,8 @@ func (c *Cluster) AddHour(ctx context.Context, taskId string, minute int, second
}
// 每分钟执行一次
func (c *Cluster) AddMinute(ctx context.Context, taskId string, second int, callback callback, extendData interface{}) error {
nowTime := time.Now()
func (c *Cluster) EveryMinute(ctx context.Context, taskId string, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
nowTime := time.Now().In(c.location)
jobData := JobData{
JobType: JobTypeEveryMinute,
@@ -172,8 +175,8 @@ func (c *Cluster) AddMinute(ctx context.Context, taskId string, second int, call
}
// 特定时间间隔
func (c *Cluster) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback callback, extendData interface{}) error {
nowTime := time.Now()
func (c *Cluster) EverySpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
nowTime := time.Now().In(c.location)
if spaceTime < 0 {
c.logger.Errorf(ctx, "间隔时间不能小于0")
@@ -200,29 +203,29 @@ func (c *Cluster) AddSpace(ctx context.Context, taskId string, spaceTime time.Du
// @param callback callback 回调函数
// @param extendData interface{} 扩展数据
// @return error
func (c *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, callback callback, extendData interface{}) error {
func (c *Cluster) addJob(ctx context.Context, taskId string, jobData JobData, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) error {
_, ok := clusterWorkerList.Load(taskId)
if ok {
c.logger.Errorf(ctx, "key已存在:%s", taskId)
return errors.New("key已存在")
}
_, err := GetNextTime(time.Now(), time.Local, jobData)
_, err := GetNextTime(time.Now().In(c.location), jobData)
if err != nil {
c.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error())
return err
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// ctx, cancel := context.WithCancel(ctx)
// defer cancel()
lock := lockx.NewGlobalLock(ctx, c.redis, taskId)
tB := lock.Try(2)
if !tB {
c.logger.Errorf(ctx, "添加失败:%s", taskId)
return errors.New("添加失败")
}
defer lock.Unlock()
// lock := lockx.NewGlobalLock(ctx, c.redis, taskId)
// tB := lock.Try(2)
// if !tB {
// c.logger.Errorf(ctx, "添加失败:%s", taskId)
// return errors.New("添加失败")
// }
// defer lock.Unlock()
t := timerStr{
Callback: callback,
@@ -258,7 +261,7 @@ func (c *Cluster) getNextTime() {
clusterWorkerList.Range(func(key, value interface{}) bool {
val := value.(timerStr)
nextTime, _ := GetNextTime(time.Now(), time.Local, *val.JobData)
nextTime, _ := GetNextTime(time.Now().In(c.location), *val.JobData)
// fmt.Println(val.ExtendData, val.JobData, nextTime)
@@ -436,10 +439,12 @@ func (c *Cluster) doTask(ctx context.Context, taskId string) {
defer func() {
if err := recover(); err != nil {
c.logger.Errorf(ctx, "timer:定时器出错 err:%+v stack:%s", err, string(debug.Stack()))
c.logger.Errorf(ctx, "timer:回调任务panic err:%+v stack:%s", err, string(debug.Stack()))
}
}()
ctx = context.WithValue(ctx, "trace_id", uuid.NewV4().String)
// 执行任务
t.Callback(ctx, t.ExtendData)
}
+6 -6
View File
@@ -30,7 +30,7 @@ func TestCluster_AddEveryMonth(t *testing.T) {
}
extendData := "testData"
err := cluster.AddMonth(ctx, taskId, 1, hour, minute, second, callback, extendData)
err := cluster.EveryMonth(ctx, taskId, 1, hour, minute, second, callback, extendData)
if err != nil {
t.Errorf("AddEveryMonth failed, err: %v", err)
}
@@ -59,7 +59,7 @@ func TestCluster_AddEveryWeek(t *testing.T) {
}
extendData := "testData"
err := cluster.AddWeek(ctx, taskId, week, hour, minute, second, callback, extendData)
err := cluster.EveryWeek(ctx, taskId, week, hour, minute, second, callback, extendData)
if err != nil {
t.Errorf("AddEveryWeek failed, err: %v", err)
}
@@ -87,7 +87,7 @@ func TestCluster_AddEveryDay(t *testing.T) {
}
extendData := "testData"
err := cluster.AddDay(ctx, taskId, hour, minute, second, callback, extendData)
err := cluster.EveryDay(ctx, taskId, hour, minute, second, callback, extendData)
if err != nil {
t.Errorf("AddEveryDay failed, err: %v", err)
}
@@ -114,7 +114,7 @@ func TestCluster_AddEveryHour(t *testing.T) {
}
extendData := "testData"
err := cluster.AddHour(ctx, taskId, minute, second, callback, extendData)
err := cluster.EveryHour(ctx, taskId, minute, second, callback, extendData)
if err != nil {
t.Errorf("AddEveryHour failed, err: %v", err)
}
@@ -140,7 +140,7 @@ func TestCluster_AddEveryMinute(t *testing.T) {
}
extendData := "testData"
err := cluster.AddMinute(ctx, taskId, second, callback, extendData)
err := cluster.EveryMinute(ctx, taskId, second, callback, extendData)
if err != nil {
t.Errorf("AddEveryMinute failed, err: %v", err)
}
@@ -170,7 +170,7 @@ func TestCluster_Add(t *testing.T) {
}
extendData := "testData"
err := cluster.AddSpace(ctx, taskId, dur, callback, extendData)
err := cluster.EverySpace(ctx, taskId, dur, callback, extendData)
if err != nil {
t.Errorf("Add failed, err: %v", err)
}
+11 -11
View File
@@ -34,13 +34,13 @@ func cluster() {
client := getRedis()
ctx := context.Background()
cluster := timerx.InitCluster(ctx, client, "test")
err := cluster.AddSpace(ctx, "test_space", 1*time.Second, aa, "这是秒任务")
err := cluster.EverySpace(ctx, "test_space", 1*time.Second, aa, "这是秒任务")
fmt.Println(err)
err = cluster.AddMinute(ctx, "test_min", 15, aa, "这是分钟任务")
err = cluster.EveryMinute(ctx, "test_min", 15, aa, "这是分钟任务")
fmt.Println(err)
err = cluster.AddHour(ctx, "test_hour", 30, 0, aa, "这是小时任务")
err = cluster.EveryHour(ctx, "test_hour", 30, 0, aa, "这是小时任务")
fmt.Println(err)
err = cluster.AddDay(ctx, "test_day", 11, 0, 0, aa, "这是天任务")
err = cluster.EveryDay(ctx, "test_day", 11, 0, 0, aa, "这是天任务")
fmt.Println(err)
}
@@ -68,7 +68,7 @@ func worker() {
type Worker struct{}
func (w *Worker) Worker(jobType string, uniqueKey string, data interface{}) (timerx.WorkerCode, time.Duration) {
func (w *Worker) Worker(ctx context.Context, jobType string, uniqueKey string, data interface{}) (timerx.WorkerCode, time.Duration) {
fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05"))
fmt.Println(uniqueKey, jobType)
fmt.Println(data)
@@ -93,12 +93,12 @@ func re() {
ctx := context.Background()
cl := timerx.InitCluster(ctx, client, "kkkk")
cl.AddSpace(ctx, "test1", 1*time.Millisecond, aa, "data")
cl.AddSpace(ctx, "test2", 1*time.Millisecond, aa, "data")
cl.AddSpace(ctx, "test3", 1*time.Millisecond, aa, "data")
cl.AddSpace(ctx, "test4", 1*time.Millisecond, aa, "data")
cl.AddSpace(ctx, "test5", 1*time.Millisecond, aa, "data")
cl.AddSpace(ctx, "test6", 1*time.Millisecond, aa, "data")
cl.EverySpace(ctx, "test1", 1*time.Millisecond, aa, "data")
cl.EverySpace(ctx, "test2", 1*time.Millisecond, aa, "data")
cl.EverySpace(ctx, "test3", 1*time.Millisecond, aa, "data")
cl.EverySpace(ctx, "test4", 1*time.Millisecond, aa, "data")
cl.EverySpace(ctx, "test5", 1*time.Millisecond, aa, "data")
cl.EverySpace(ctx, "test6", 1*time.Millisecond, aa, "data")
select {}
}
+2
View File
@@ -4,6 +4,7 @@ go 1.19
require (
github.com/go-redis/redis/v8 v8.11.5
github.com/satori/go.uuid v1.2.0
github.com/yuninks/cachex v1.0.5
github.com/yuninks/lockx v1.0.2
)
@@ -11,4 +12,5 @@ require (
require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)
+9
View File
@@ -5,9 +5,16 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/yuninks/cachex v1.0.5 h1:Y2NmTsuEgwEVYb7FVFh5tUN67kmrUioeksQqLbOAwsM=
github.com/yuninks/cachex v1.0.5/go.mod h1:5357qz18UvHTJSgZzkMamUzZoFzGeKG9+4tIUBXRSVM=
github.com/yuninks/lockx v1.0.2 h1:p0n791WmsU8D7YF2tQaNLwPE75jdd774unlJZRTNfaw=
@@ -15,5 +22,7 @@ github.com/yuninks/lockx v1.0.2/go.mod h1:J6wvuUELLcMn6FCmiZFt7K5w1QQAh1myL7h3Jr
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+22 -21
View File
@@ -8,21 +8,21 @@ import (
// 计算该任务下次执行时间
// @param job *JobData 任务数据
// @return time.Time 下次执行时间
func GetNextTime(t time.Time, loc *time.Location, job JobData) (*time.Time, error) {
func GetNextTime(t time.Time, job JobData) (*time.Time, error) {
var next time.Time
switch job.JobType {
case JobTypeEveryMonth:
next = calculateNextMonthTime(t, job, loc)
next = calculateNextMonthTime(t, job)
case JobTypeEveryWeek:
next = calculateNextWeekTime(t, job, loc)
next = calculateNextWeekTime(t, job)
case JobTypeEveryDay:
next = calculateNextDayTime(t, job, loc)
next = calculateNextDayTime(t, job)
case JobTypeEveryHour:
next = calculateNextHourTime(t, job, loc)
next = calculateNextHourTime(t, job)
case JobTypeEveryMinute:
next = calculateNextMinuteTime(t, job, loc)
next = calculateNextMinuteTime(t, job)
case JobTypeInterval:
next = calculateNextInterval(t, job)
default:
@@ -38,16 +38,17 @@ func calculateNextInterval(t time.Time, job JobData) time.Time {
return job.BaseTime.Add(job.IntervalTime * time.Duration(cycle+1))
}
func calculateNextMonthTime(t time.Time, job JobData, loc *time.Location) time.Time {
func calculateNextMonthTime(t time.Time, job JobData) time.Time {
// 判断是否可执行并返回下一个执行时间
if canRun(t, job) {
return time.Date(t.Year(), t.Month(), job.Day, job.Hour, job.Minute, job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), job.Day, job.Hour, job.Minute, job.Second, 0, t.Location())
}
// 下一个周期(下个月)
return time.Date(t.Year(), t.Month()+1, job.Day, job.Hour, job.Minute, job.Second, 0, loc)
return time.Date(t.Year(), t.Month()+1, job.Day, job.Hour, job.Minute, job.Second, 0, t.Location())
}
func calculateNextWeekTime(t time.Time, job JobData, loc *time.Location) time.Time {
func calculateNextWeekTime(t time.Time, job JobData) time.Time {
weekday := t.Weekday()
days := int(job.Weekday - weekday)
if days < 0 {
@@ -55,37 +56,37 @@ func calculateNextWeekTime(t time.Time, job JobData, loc *time.Location) time.Ti
}
// 判断是否可执行并返回下一个执行时间
if canRun(t, job) {
return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, t.Location())
}
// 下一个周期(下周)
return time.Date(t.Year(), t.Month(), t.Day()+days+7, job.Hour, job.Minute, job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), t.Day()+days+7, job.Hour, job.Minute, job.Second, 0, t.Location())
}
func calculateNextDayTime(t time.Time, job JobData, loc *time.Location) time.Time {
func calculateNextDayTime(t time.Time, job JobData) time.Time {
// 判断是否可执行并返回下一个执行时间
if canRun(t, job) {
return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), t.Day(), job.Hour, job.Minute, job.Second, 0, t.Location())
}
// 下一个周期(明天)
return time.Date(t.Year(), t.Month(), t.Day()+1, job.Hour, job.Minute, job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), t.Day()+1, job.Hour, job.Minute, job.Second, 0, t.Location())
}
func calculateNextHourTime(t time.Time, job JobData, loc *time.Location) time.Time {
func calculateNextHourTime(t time.Time, job JobData) time.Time {
// 判断是否可执行并返回下一个执行时间
if canRun(t, job) {
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), job.Minute, job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), job.Minute, job.Second, 0, t.Location())
}
// 下一个周期(下个小时)
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour()+1, job.Minute, job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour()+1, job.Minute, job.Second, 0, t.Location())
}
func calculateNextMinuteTime(t time.Time, job JobData, loc *time.Location) time.Time {
func calculateNextMinuteTime(t time.Time, job JobData) time.Time {
// 判断是否可执行并返回下一个执行时间
if canRun(t, job) {
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), job.Second, 0, t.Location())
}
// 下一个周期(下分钟)
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute()+1, job.Second, 0, loc)
return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute()+1, job.Second, 0, t.Location())
}
// 检查是否本周期可以运行
+2 -2
View File
@@ -92,8 +92,8 @@ func TestGetNextTime(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
now := time.Now()
loc := time.FixedZone("CST", 8*3600)
nextTime, err := timerx.GetNextTime(now,loc,test.job)
// loc := time.FixedZone("CST", 8*3600)
nextTime, err := timerx.GetNextTime(now, test.job)
if err != nil {
if test.expectedError == nil || err.Error() != test.expectedError.Error() {
t.Errorf("Expected error: %v, Got error: %v", test.expectedError, err)
+7 -3
View File
@@ -11,6 +11,7 @@ import (
"time"
"github.com/go-redis/redis/v8"
uuid "github.com/satori/go.uuid"
)
// 功能描述
@@ -43,7 +44,7 @@ type Callback interface {
// @param data interface{} 任务数据
// @return WorkerCode 任务执行结果
// @return time.Duration 任务执行时间间隔
Worker(jobType string, uniTaskId string, attachData interface{}) (WorkerCode, time.Duration)
Worker(ctx context.Context, jobType string, uniTaskId string, attachData interface{}) (WorkerCode, time.Duration)
}
var wo *Once = nil
@@ -165,7 +166,7 @@ func (w *Once) watch() {
func (w *Once) doTask(key string) {
defer func() {
if err := recover(); err != nil {
fmt.Println("timer:定时器出错", err)
fmt.Println("timer:回调任务panic", err)
log.Println("errStack", string(debug.Stack()))
}
}()
@@ -182,7 +183,10 @@ func (w *Once) doTask(key string) {
json.Unmarshal([]byte(str), &ed)
fmt.Println("开始时间:", time.Now().Format("2006-01-02 15:04:05"))
code, t := w.worker.Worker(s[0], s[1], ed.Data)
ctx := context.WithValue(context.Background(), "trace_id", uuid.NewV4().String)
code, t := w.worker.Worker(ctx, s[0], s[1], ed.Data)
if code == WorkerCodeAgain {
// 重新放入队列
+8 -4
View File
@@ -1,12 +1,16 @@
package timerx
import "time"
type Options struct {
logger Logger
logger Logger
location *time.Location
}
func defaultOptions() Options {
return Options{
logger: NewLogger(),
logger: NewLogger(),
location: time.Local,
}
}
@@ -28,8 +32,8 @@ func SetLogger(log Logger) Option {
}
// 设定时区
func SetTimeZone(zone string) Option {
func SetTimeZone(zone *time.Location) Option {
return func(o *Options) {
// todo
o.location = zone
}
}
+26 -19
View File
@@ -8,6 +8,8 @@ import (
"runtime/debug"
"sync"
"time"
uuid "github.com/satori/go.uuid"
)
// 简单定时器
@@ -22,8 +24,9 @@ var singleTimerIndex int // 当前定时数目
var singleOnceLimit sync.Once // 实现单例
type Single struct {
ctx context.Context
logger Logger
ctx context.Context
logger Logger
location *time.Location
}
var sin *Single = nil
@@ -38,8 +41,9 @@ func InitSingle(ctx context.Context, opts ...Option) *Single {
op := newOptions(opts...)
sin = &Single{
ctx: ctx,
logger: op.logger,
ctx: ctx,
logger: op.logger,
location: op.location,
}
timer := time.NewTicker(time.Millisecond * 200)
@@ -53,7 +57,7 @@ func InitSingle(ctx context.Context, opts ...Option) *Single {
continue
}
// 迭代定时器
sin.iterator(ctx, t)
sin.iterator(ctx)
// fmt.Println("timer: 执行")
case <-ctx.Done():
// 跳出循环
@@ -77,7 +81,7 @@ func InitSingle(ctx context.Context, opts ...Option) *Single {
// @param callback 回调函数
// @param extendData 扩展数据
// @return error
func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback callback, extendData interface{}) (int, error) {
func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
jobData := JobData{
@@ -99,7 +103,7 @@ func (c *Single) AddMonth(ctx context.Context, taskId string, day int, hour int,
// @param hour int 小时
// @param minute int 分钟
// @param second int 秒
func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback callback, extendData interface{}) (int, error) {
func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
jobData := JobData{
@@ -115,7 +119,7 @@ func (c *Single) AddWeek(ctx context.Context, taskId string, week time.Weekday,
}
// 每天执行一次
func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback callback, extendData interface{}) (int, error) {
func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
jobData := JobData{
@@ -130,7 +134,7 @@ func (c *Single) AddDay(ctx context.Context, taskId string, hour int, minute int
}
// 每小时执行一次
func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second int, callback callback, extendData interface{}) (int, error) {
func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
jobData := JobData{
@@ -144,7 +148,7 @@ func (c *Single) AddHour(ctx context.Context, taskId string, minute int, second
}
// 每分钟执行一次
func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callback callback, extendData interface{}) (int, error) {
func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
jobData := JobData{
@@ -157,7 +161,7 @@ func (c *Single) AddMinute(ctx context.Context, taskId string, second int, callb
}
// 特定时间间隔
func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback callback, extendData interface{}) (int, error) {
func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Duration, callback func(ctx context.Context, extendData interface{}) error, extendData interface{}) (int, error) {
nowTime := time.Now()
if spaceTime < 0 {
@@ -180,12 +184,10 @@ func (c *Single) AddSpace(ctx context.Context, taskId string, spaceTime time.Dur
// @param extend 附加参数
// @return int 定时器索引
// @return error 错误
func (l *Single) addJob(ctx context.Context, jobData JobData, call callback, extend interface{}) (int, error) {
func (l *Single) addJob(ctx context.Context, jobData JobData, call func(ctx context.Context, extendData interface{}) error, extend interface{}) (int, error) {
singleTimerIndex += 1
nowTime := time.Now()
_, err := GetNextTime(nowTime, time.Local, jobData)
_, err := GetNextTime(time.Now().In(l.location), jobData)
if err != nil {
l.logger.Errorf(ctx, "获取下次执行时间失败:%s", err.Error())
return 0, err
@@ -209,7 +211,9 @@ func (s *Single) Del(index int) {
}
// 迭代定时器列表
func (s *Single) iterator(ctx context.Context, nowTime time.Time) {
func (s *Single) iterator(ctx context.Context) {
nowTime := time.Now().In(s.location)
// 默认5秒后(如果没有值就暂停进来5秒)
newNextTime := nowTime.Add(time.Second * 5)
@@ -221,7 +225,7 @@ func (s *Single) iterator(ctx context.Context, nowTime time.Time) {
if timeStr.JobData.NextTime.Before(nowTime) || timeStr.JobData.NextTime.Equal(nowTime) {
// 可执行
nextTime, _ := GetNextTime(nowTime, time.Local, *timeStr.JobData)
nextTime, _ := GetNextTime(nowTime, *timeStr.JobData)
timeStr.JobData.NextTime = *nextTime
if index == 1 {
@@ -277,11 +281,14 @@ func (s *Single) iterator(ctx context.Context, nowTime time.Time) {
// 定时器操作类
// 这里不应painc
func (s *Single) doTask(ctx context.Context, call callback, extend interface{}) error {
func (s *Single) doTask(ctx context.Context, call func(ctx context.Context, extendData interface{}) error, extend interface{}) error {
defer func() {
if err := recover(); err != nil {
s.logger.Errorf(ctx, "timer:定时器出错 err:%+v stack:%s", err, string(debug.Stack()))
s.logger.Errorf(ctx, "timer:回调任务panic err:%+v stack:%s", err, string(debug.Stack()))
}
}()
ctx = context.WithValue(ctx, "trace_id", uuid.NewV4().String)
return call(ctx, extend)
}
+2 -2
View File
@@ -6,7 +6,7 @@ import (
)
type timerStr struct {
Callback callback // 需要回调的方法
Callback func(ctx context.Context, extendData interface{}) error // 需要回调的方法
CanRunning chan (struct{}) // 是否允许执行(only single)
TaskId string // 任务ID 全局唯一键(only cluster)
ExtendData interface{} // 附加参数
@@ -40,4 +40,4 @@ type JobData struct {
}
// 定义各个回调函数
type callback func(ctx context.Context, extendData interface{}) error
// type callback func(ctx context.Context, extendData interface{}) error