diff --git a/cluster.go b/cluster.go index be93567..0be4d1f 100644 --- a/cluster.go +++ b/cluster.go @@ -1,4 +1,4 @@ -package timer +package timerx import ( "context" @@ -10,7 +10,7 @@ import ( "sync" "time" - "code.yun.ink/open/timer/lockx" + "code.yun.ink/pkg/lockx" "github.com/go-redis/redis/v8" ) diff --git a/cluster_test.go b/cluster_test.go index 1672e00..ad7e218 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1,4 +1,4 @@ -package timer +package timerx import ( "fmt" diff --git a/cmd/main.go b/cmd/main.go index 005a6b5..ca70092 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "code.yun.ink/open/timer" + "code.yun.ink/pkg/timerx" "github.com/go-redis/redis/v8" ) @@ -30,7 +30,7 @@ func main() { func worker() { client := getRedis() - w := timer.InitWorker(context.Background(), client, &Worker{}) + w := timerx.InitOnce(context.Background(), client, &Worker{}) w.Add("test", "test", 1*time.Second, map[string]interface{}{ "test": "test", }) @@ -52,11 +52,11 @@ func worker() { type Worker struct{} -func (w *Worker) Worker(uniqueKey string, jobType string,data map[string]interface{}) timer.WorkerCode { +func (w *Worker) Worker(uniqueKey string, jobType string, data interface{}) (timerx.WorkerCode, time.Duration) { fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05")) fmt.Println(uniqueKey, jobType) fmt.Println(data) - return timer.WorkerCodeAgain + return timerx.WorkerCodeAgain,time.Second } func getRedis() *redis.Client { @@ -76,63 +76,22 @@ func re() { client := getRedis() ctx := context.Background() - cl := timer.InitCluster(ctx, client) - cl.AddTimer(ctx, "test1", 1*time.Millisecond, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text1", - }, - }) - cl.AddTimer(ctx, "test2", 1*time.Millisecond, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text2", - }, - }) - cl.AddTimer(ctx, "test3", 1*time.Millisecond, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text3", - }, - }) - cl.AddTimer(ctx, "test4", 1*time.Millisecond, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text4", - }, - }) - cl.AddTimer(ctx, "test5", 1*time.Millisecond, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text5", - }, - }) - cl.AddTimer(ctx, "test6", 1*time.Millisecond, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text6", - }, - }) - cl.AddTimer(ctx, "test7", 1*time.Millisecond, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text7", - }, - }) - cl.AddTimer(ctx, "test8", 1*time.Millisecond, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text8", - }, - }) - cl.AddTimer(ctx, "test9", 1*time.Millisecond, aa, timer.ExtendParams{ - Params: map[string]interface{}{ - "test": "text9", - }, - }) + cl := timerx.InitCluster(ctx, client) + cl.Add(ctx, "test1", 1*time.Millisecond, aa, "data") + cl.Add(ctx, "test2", 1*time.Millisecond, aa, "data") + cl.Add(ctx, "test3", 1*time.Millisecond, aa, "data") + cl.Add(ctx, "test4", 1*time.Millisecond, aa, "data") + cl.Add(ctx, "test5", 1*time.Millisecond, aa, "data") + cl.Add(ctx, "test6", 1*time.Millisecond, aa, "data") select {} } -func aa(ctx context.Context) bool { - // fmt.Println(time.Now().Format(time.RFC3339)) - // fmt.Println("gggggggggggggggggggggggggggg") - a, err := timer.GetExtendParams(ctx) - fmt.Printf("%+v %+v \n\n", a, err) +func aa(ctx context.Context, data interface{}) error { + fmt.Println("执行时间:", time.Now().Format("2006-01-02 15:04:05")) + fmt.Println(data) time.Sleep(time.Second * 5) - return true + return nil } func d() { diff --git a/go.mod b/go.mod index e9a5a75..084ef26 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,11 @@ -module code.yun.ink/open/timer +module code.yun.ink/pkg/timerx go 1.19 require github.com/go-redis/redis/v8 v8.11.5 require ( + code.yun.ink/pkg/lockx v1.0.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect ) diff --git a/go.sum b/go.sum index 7342ff8..c46c1da 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +code.yun.ink/pkg/lockx v1.0.0 h1:xoLyf05PrOAhLID2LbJsEXA8YYURJTK/7spEk/hu/Rs= +code.yun.ink/pkg/lockx v1.0.0/go.mod h1:0xUU5xD8fui0Kf7g4TnFmaxUDo59CH2WM+sitko2SLc= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= diff --git a/lockx/lockx.go b/lockx/lockx.go deleted file mode 100644 index 23f1ede..0000000 --- a/lockx/lockx.go +++ /dev/null @@ -1,120 +0,0 @@ -package lockx - -import ( - "context" - "fmt" - "log" - "time" - - "github.com/go-redis/redis/v8" -) - -// 全局锁 -type globalLock struct { - redis *redis.Client - ctx context.Context - cancel context.CancelFunc - uniqueKey string - value string -} - -func NewGlobalLock(ctx context.Context, red *redis.Client, uniqueKey string) *globalLock { - ctx, cancel := context.WithTimeout(ctx, time.Second*30) - return &globalLock{ - redis: red, - ctx: ctx, - cancel: cancel, - uniqueKey: uniqueKey, - value: fmt.Sprintf("%d", time.Now().UnixNano()), - } -} - -// 获取锁 -func (g *globalLock) Lock() bool { - - script := ` - local token = redis.call('get',KEYS[1]) - if token == false - then - return redis.call('set',KEYS[1],ARGV[1],'EX',ARGV[2]) - end - return 'ERROR' - ` - - resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 5).Result() - if resp != "OK" { - _ = err - log.Println("globalLock Lock", resp, err, g.uniqueKey, g.value) - } - if resp == "OK" { - g.refresh() - return true - } - return false -} - -// 尝试获取锁 -func (g *globalLock) Try(limitTimes int) bool { - for i := 0; i < limitTimes; i++ { - if g.Lock() { - return true - } - time.Sleep(time.Millisecond * 100) - } - return false -} - -// 删除锁 -func (g *globalLock) Unlock() bool { - - script := ` - local token = redis.call('get',KEYS[1]) - if token == ARGV[1] - then - redis.call('del',KEYS[1]) - return 'OK' - end - return 'ERROR' - ` - - resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value).Result() - if resp != "OK" { - log.Println("globalLock Unlock", resp, err, g.uniqueKey, g.value) - } - g.cancel() - return false -} - -// 刷新锁 -func (g *globalLock) refresh() { - go func() { - t := time.NewTicker(time.Second) - for { - select { - case <-t.C: - g.refreshExec() - case <-g.ctx.Done(): - t.Stop() - return - } - } - }() -} - -func (g *globalLock) refreshExec() bool { - script := ` - local token = redis.call('get',KEYS[1]) - if token == ARGV[1] - then - redis.call('set',KEYS[1],ARGV[1],'EX',ARGV[2]) - return 'OK' - end - return 'ERROR' - ` - - resp, err := g.redis.Eval(g.ctx, script, []string{g.uniqueKey}, g.value, 5).Result() - if resp != "OK" { - log.Println("globalLock refresh", resp, err, g.uniqueKey, g.value) - } - return resp == "OK" -} diff --git a/lockx/lockx_test.go b/lockx/lockx_test.go deleted file mode 100644 index 9b36e9b..0000000 --- a/lockx/lockx_test.go +++ /dev/null @@ -1,52 +0,0 @@ -package lockx_test - -import ( - "context" - "fmt" - "testing" - - "code.yun.ink/open/timer/lockx" - "github.com/go-redis/redis/v8" -) - -var Redis *redis.Client - -// func TestMain(m *testing.M) { -// client := redis.NewClient(&redis.Options{ -// Addr: "127.0.0.1" + ":" + "6379", -// Password: "", // no password set -// DB: 0, // use default DB -// }) -// if client == nil { -// fmt.Println("redis init error") -// return -// } -// // fmt.Println("ffff") -// Redis = client -// } - -func TestLockx(t *testing.T) { - client := redis.NewClient(&redis.Options{ - Addr: "127.0.0.1" + ":" + "6379", - Password: "", // no password set - DB: 0, // use default DB - }) - if client == nil { - fmt.Println("redis init error") - return - } - fmt.Println("begin") - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - lock := lockx.NewGlobalLock(ctx, client, "lockx:test") - - if !lock.Lock() { - fmt.Println("lock error") - } - defer lock.Unlock() - - fmt.Println("ssss") - -} diff --git a/once.go b/once.go index afdbe79..c0e1245 100644 --- a/once.go +++ b/once.go @@ -1,4 +1,4 @@ -package timer +package timerx import ( "context" diff --git a/once_test.go b/once_test.go index 2b97dd1..14e5b7c 100644 --- a/once_test.go +++ b/once_test.go @@ -1,2 +1,2 @@ -package timer +package timerx diff --git a/single.go b/single.go index 827a8a9..e73fa2e 100644 --- a/single.go +++ b/single.go @@ -1,4 +1,4 @@ -package timer +package timerx // 作者:黄新云 diff --git a/single_test.go b/single_test.go index 6fb1621..e19ce1b 100644 --- a/single_test.go +++ b/single_test.go @@ -1,4 +1,4 @@ -package timer_test +package timerx_test import "testing" diff --git a/types.go b/types.go index 4fc5600..2515938 100644 --- a/types.go +++ b/types.go @@ -1,4 +1,4 @@ -package timer +package timerx import ( "context" @@ -15,17 +15,9 @@ type timerStr struct { ExtendData interface{} // 附加参数 } -// 扩展参数 -// -// type ExtendParams struct { -// Params map[string]interface{} // 带出去的参数 -// } + var nextTime = time.Now() // 下一次执行的时间 -type ContextValueKey string // 定义context 传递的Key类型 -const ( - extendParamKey ContextValueKey = "extend_param" -) // 定义各个回调函数 type callback func(ctx context.Context, extendData interface{}) error