From f85cbd3c6d4fdedf6423a1346fadde6a8958b55a Mon Sep 17 00:00:00 2001 From: Dxyinme <32793868+dxyinme@users.noreply.github.com> Date: Wed, 13 Mar 2024 12:00:26 +0800 Subject: [PATCH] Develop to Master (#92) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore: use `redis.UniversalClient` instead of `*redis.Client` (#88) * make dcron running locally, update cron test, add GetJobs / GetJob function for dcron. (#89) * 增加code coverage, 修复GetJob bug,增加devcontainer方便开发者 (#91) * add example app to readme * add NodeID function into dcron * Deps: updated github.com/go-redis/redis/v8 to github.com/redis/go-redis/v9 (#73) * add cron lib * fix warning * logger removal : phares1 * update * update * update * add robfig/cron to dcron (#75) * add NodeID function into dcron * add cron lib * fix warning * logger removal : phares1 * update * update * update * update * update * update * update * update test workflow * revert 1.22 * update etcd driver * update * fix get nodes * update * update for fix TAT * Revert "update etcd driver" This reverts commit a21ebf7e92a2620bff2ca2f715fb83e6077b1c07. * Revert "deps: updated go-redis to v9 (#79)" This reverts commit 0b85b24bde855b165a47b9d29166b5ba852c451e. * update * update * refact etcd * Revert "refact etcd" This reverts commit 049bed1587c4ea28783e8e32f7c3fbb301574540. * Revert "update" This reverts commit 9c71fd6ff26f10fd28ca8cfd08b2e3a573b78b4b. * update * refactor etcddriver * fix error * update * Revert "update" This reverts commit 6cfcfe6df53ff321e6d1bdc5bcbc1bb0978ceaff. * Revert "fix error" This reverts commit 99b2d82586f0d269f9280d4d0382c47812bfa7c6. * Revert "refactor etcddriver" This reverts commit a576ac357b030772011c9daaacf13fd4925a4272. * update * update * remove comments, and fix * add comments * Add comments, add split the E2E test cases and other test cases. (#80) * add example app to readme * add NodeID function into dcron * add cron lib * fix warning * logger removal : phares1 * update * update * update * update * update * update * update * update test workflow * revert 1.22 * update etcd driver * update * fix get nodes * update * update for fix TAT * Revert "update etcd driver" This reverts commit a21ebf7e92a2620bff2ca2f715fb83e6077b1c07. * Revert "deps: updated go-redis to v9 (#79)" This reverts commit 0b85b24bde855b165a47b9d29166b5ba852c451e. * update * update * refact etcd * Revert "refact etcd" This reverts commit 049bed1587c4ea28783e8e32f7c3fbb301574540. * Revert "update" This reverts commit 9c71fd6ff26f10fd28ca8cfd08b2e3a573b78b4b. * update * refactor etcddriver * fix error * update * Revert "update" This reverts commit 6cfcfe6df53ff321e6d1bdc5bcbc1bb0978ceaff. * Revert "fix error" This reverts commit 99b2d82586f0d269f9280d4d0382c47812bfa7c6. * Revert "refactor etcddriver" This reverts commit a576ac357b030772011c9daaacf13fd4925a4272. 
* update * update * remove comments, and fix * add comments * merge e2e test and normal test * move e2etest to origin path * add timeout to avoid pipeline timeout * add getjobs related function * update * remove redis config in test workflow * update * update chain test in windows * update action version for node 16 -> node 20 * add test for dcron locally * update readme * update * fix code bug * Update devcontainer and etcd driver test --------- Co-authored-by: Ava Ackerman Co-authored-by: libi <7769922+libi@users.noreply.github.com> --------- Co-authored-by: AHdark Co-authored-by: Ava Ackerman Co-authored-by: libi <7769922+libi@users.noreply.github.com> --- .devcontainer/devcontainer.json | 11 ++ .github/workflows/test.yml | 8 +- cron/README.md | 4 +- cron/chain_test.go | 40 +++-- dcron.go | 110 ++++++++++-- dcron_locally_test.go | 51 ++++++ dcron_test.go | 296 ++++++++++++++++++++++++++++---- driver/driver.go | 4 +- driver/etcddriver.go | 3 + driver/redisdriver.go | 4 +- driver/rediszsetdriver.go | 4 +- driver/util.go | 8 - go.mod | 2 +- go.sum | 4 +- nodepool.go | 19 +- option.go | 6 + 16 files changed, 479 insertions(+), 95 deletions(-) create mode 100644 .devcontainer/devcontainer.json create mode 100644 dcron_locally_test.go diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..0084021 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,11 @@ +{ + "image": "mcr.microsoft.com/devcontainers/go:1-1.22-bookworm", + "customizations": { + "vscode": { + "extensions": [ + "golang.go", + "eamodio.gitlens" + ] + } + } +} \ No newline at end of file diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b77ec4d..9639b39 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -20,12 +20,10 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - - name: Redis Server in GitHub Actions - uses: supercharge/redis-github-action@1.7.0 - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 with: go-version: ${{ matrix.go_version }} @@ -33,7 +31,7 @@ jobs: run: go test -v -timeout 30m -coverprofile=coverage.txt -covermode=atomic ./... - name: Upload coverage reports to Codecov - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/cron/README.md b/cron/README.md index f3ddd13..6d2c439 100644 --- a/cron/README.md +++ b/cron/README.md @@ -1,3 +1,5 @@ # cron -fork from [robfig/cron](github.com/robfig/cron) \ No newline at end of file +fork from [robfig/cron](github.com/robfig/cron) + +maintain by Dcron opensource team. \ No newline at end of file diff --git a/cron/chain_test.go b/cron/chain_test.go index 240deb7..8071263 100644 --- a/cron/chain_test.go +++ b/cron/chain_test.go @@ -116,12 +116,16 @@ func TestChainDelayIfStillRunning(t *testing.T) { t.Run("second run immediate if first done", func(t *testing.T) { var j countJob wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j) + wg := &sync.WaitGroup{} + wg.Add(1) go func() { go wrappedJob.Run() - time.Sleep(time.Millisecond) + <-time.After(time.Millisecond) go wrappedJob.Run() + wg.Done() }() - time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete. 
+ wg.Wait() + <-time.After(3 * time.Millisecond) if c := j.Done(); c != 2 { t.Errorf("expected job run twice, immediately, got %d", c) } @@ -129,24 +133,28 @@ func TestChainDelayIfStillRunning(t *testing.T) { t.Run("second run delayed if first not done", func(t *testing.T) { var j countJob - j.delay = 10 * time.Millisecond + j.delay = 100 * time.Millisecond wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j) + wg := &sync.WaitGroup{} + wg.Add(1) go func() { go wrappedJob.Run() - time.Sleep(time.Millisecond) + <-time.After(10 * time.Millisecond) go wrappedJob.Run() + wg.Done() }() - // After 5ms, the first job is still in progress, and the second job was + wg.Wait() + // After 50ms, the first job is still in progress, and the second job was // run but should be waiting for it to finish. - time.Sleep(5 * time.Millisecond) + <-time.After(50 * time.Millisecond) started, done := j.Started(), j.Done() if started != 1 || done != 0 { t.Error("expected first job started, but not finished, got", started, done) } // Verify that the second job completes. - time.Sleep(25 * time.Millisecond) + <-time.After(220 * time.Millisecond) started, done = j.Started(), j.Done() if started != 2 || done != 2 { t.Error("expected both jobs done, got", started, done) @@ -169,12 +177,16 @@ func TestChainSkipIfStillRunning(t *testing.T) { t.Run("second run immediate if first done", func(t *testing.T) { var j countJob wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j) + wg := &sync.WaitGroup{} + wg.Add(1) go func() { go wrappedJob.Run() - time.Sleep(time.Millisecond) + <-time.After(time.Millisecond) go wrappedJob.Run() + wg.Done() }() - time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete. + wg.Wait() + <-time.After(3 * time.Millisecond) // Give both jobs 3ms to complete. if c := j.Done(); c != 2 { t.Errorf("expected job run twice, immediately, got %d", c) } @@ -182,24 +194,24 @@ func TestChainSkipIfStillRunning(t *testing.T) { t.Run("second run skipped if first not done", func(t *testing.T) { var j countJob - j.delay = 10 * time.Millisecond + j.delay = 100 * time.Millisecond wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j) go func() { go wrappedJob.Run() - time.Sleep(time.Millisecond) + <-time.After(10 * time.Millisecond) go wrappedJob.Run() }() - // After 5ms, the first job is still in progress, and the second job was + // After 50ms, the first job is still in progress, and the second job was // aleady skipped. - time.Sleep(5 * time.Millisecond) + <-time.After(50 * time.Millisecond) started, done := j.Started(), j.Done() if started != 1 || done != 0 { t.Error("expected first job started, but not finished, got", started, done) } // Verify that the first job completes and second does not run. 
- time.Sleep(25 * time.Millisecond) + <-time.After(220 * time.Millisecond) started, done = j.Started(), j.Done() if started != 1 || done != 1 { t.Error("expected second job skipped, got", started, done) diff --git a/dcron.go b/dcron.go index cd839ef..9126a8d 100644 --- a/dcron.go +++ b/dcron.go @@ -25,12 +25,18 @@ const ( dcronStateUpgrade = "dcronStateUpgrade" ) +var ( + ErrJobExist = errors.New("jobName already exist") + ErrJobNotExist = errors.New("jobName not exist") + ErrJobWrongNode = errors.New("job is not running in this node") +) + type RecoverFuncType func(d *Dcron) // Dcron is main struct type Dcron struct { jobs map[string]*JobWarpper - jobsRWMut sync.Mutex + jobsRWMut sync.RWMutex ServerName string nodePool INodePool @@ -48,6 +54,8 @@ type Dcron struct { recentJobs IRecentJobPacker state atomic.Value + + runningLocally bool } // NewDcron create a Dcron @@ -68,7 +76,9 @@ func NewDcronWithOption(serverName string, driver driver.DriverV2, dcronOpts ... } dcron.cr = cron.New(dcron.crOptions...) - dcron.nodePool = NewNodePool(serverName, driver, dcron.nodeUpdateDuration, dcron.hashReplicas, dcron.logger) + if !dcron.runningLocally { + dcron.nodePool = NewNodePool(serverName, driver, dcron.nodeUpdateDuration, dcron.hashReplicas, dcron.logger) + } return dcron } @@ -110,9 +120,9 @@ func (d *Dcron) addJob(jobName, cronStr string, job Job) (err error) { d.jobsRWMut.Lock() defer d.jobsRWMut.Unlock() if _, ok := d.jobs[jobName]; ok { - return errors.New("jobName already exist") + return ErrJobExist } - innerJob := JobWarpper{ + innerJob := &JobWarpper{ Name: jobName, CronStr: cronStr, Job: job, @@ -123,11 +133,11 @@ func (d *Dcron) addJob(jobName, cronStr string, job Job) (err error) { return err } innerJob.ID = entryID - d.jobs[jobName] = &innerJob + d.jobs[jobName] = innerJob return nil } -// Remove Job +// Remove Job by jobName func (d *Dcron) Remove(jobName string) { d.jobsRWMut.Lock() defer d.jobsRWMut.Unlock() @@ -138,7 +148,71 @@ func (d *Dcron) Remove(jobName string) { } } +// Get job by jobName +// if this jobName not exist, will return error. +// +// if `thisNodeOnly` is true +// if this job is not available in this node, will return error. +// otherwise return the struct of JobWarpper whose name is jobName. +func (d *Dcron) GetJob(jobName string, thisNodeOnly bool) (*JobWarpper, error) { + d.jobsRWMut.RLock() + defer d.jobsRWMut.RUnlock() + + job, ok := d.jobs[jobName] + if !ok { + d.logger.Warnf("job: %s, not exist", jobName) + return nil, ErrJobNotExist + } + if !thisNodeOnly { + return job, nil + } + isRunningHere, err := d.nodePool.CheckJobAvailable(jobName) + if err != nil { + return nil, err + } + if !isRunningHere { + return nil, ErrJobWrongNode + } + return job, nil +} + +// Get job list. +// +// if `thisNodeOnly` is true +// return all jobs available in this node. +// otherwise return all jobs added to dcron. +// +// we never return nil. If there is no job. +// this func will return an empty slice. 
+func (d *Dcron) GetJobs(thisNodeOnly bool) []*JobWarpper { + d.jobsRWMut.RLock() + defer d.jobsRWMut.RUnlock() + + ret := make([]*JobWarpper, 0) + for _, v := range d.jobs { + var ( + isRunningHere bool + ok bool = true + err error + ) + if thisNodeOnly { + isRunningHere, err = d.nodePool.CheckJobAvailable(v.Name) + if err != nil { + continue + } + ok = isRunningHere + } + if ok { + ret = append(ret, v) + } + } + return ret +} + func (d *Dcron) allowThisNodeRun(jobName string) (ok bool) { + if d.runningLocally { + return true + } ok, err := d.nodePool.CheckJobAvailable(jobName) if err != nil { d.logger.Errorf("allow this node run error, err=%v", err) @@ -165,12 +239,14 @@ func (d *Dcron) Start() { d.RecoverFunc(d) } if atomic.CompareAndSwapInt32(&d.running, dcronStopped, dcronRunning) { - if err := d.startNodePool(); err != nil { - atomic.StoreInt32(&d.running, dcronStopped) - return + if !d.runningLocally { + if err := d.startNodePool(); err != nil { + atomic.StoreInt32(&d.running, dcronStopped) + return + } + d.logger.Infof("dcron started, nodeID is %s", d.nodePool.GetNodeID()) } d.cr.Start() - d.logger.Infof("dcron started, nodeID is %s", d.nodePool.GetNodeID()) } else { d.logger.Infof("dcron have started") } @@ -183,11 +259,13 @@ func (d *Dcron) Run() { d.RecoverFunc(d) } if atomic.CompareAndSwapInt32(&d.running, dcronStopped, dcronRunning) { - if err := d.startNodePool(); err != nil { - atomic.StoreInt32(&d.running, dcronStopped) - return + if !d.runningLocally { + if err := d.startNodePool(); err != nil { + atomic.StoreInt32(&d.running, dcronStopped) + return + } + d.logger.Infof("dcron running, nodeID is %s", d.nodePool.GetNodeID()) } - d.logger.Infof("dcron running, nodeID is %s", d.nodePool.GetNodeID()) d.cr.Run() } else { d.logger.Infof("dcron already running") @@ -205,7 +283,9 @@ func (d *Dcron) startNodePool() error { // Stop job func (d *Dcron) Stop() { tick := time.NewTicker(time.Millisecond) - d.nodePool.Stop(context.Background()) + if !d.runningLocally { + d.nodePool.Stop(context.Background()) + } for range tick.C { if atomic.CompareAndSwapInt32(&d.running, dcronRunning, dcronStopped) { d.cr.Stop() diff --git a/dcron_locally_test.go b/dcron_locally_test.go new file mode 100644 index 0000000..3b42ca4 --- /dev/null +++ b/dcron_locally_test.go @@ -0,0 +1,51 @@ +package dcron_test + +import ( + "testing" + "time" + + "github.com/libi/dcron" + "github.com/libi/dcron/cron" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type DcronLocallyTestSuite struct { + suite.Suite +} + +func (s *DcronLocallyTestSuite) TestNormal() { + dcr := dcron.NewDcronWithOption( + "not a necessary servername", + nil, + dcron.RunningLocally(), + dcron.CronOptionSeconds(), + dcron.CronOptionChain( + cron.Recover( + cron.DefaultLogger, + ))) + + s.Assert().NotNil(dcr) + runningTime := 60 * time.Second + t := s.T() + var err error + err = dcr.AddFunc("job1", "*/5 * * * * *", func() { + t.Log(time.Now()) + }) + require.Nil(t, err) + err = dcr.AddFunc("job2", "*/8 * * * * *", func() { + panic("test panic") + }) + require.Nil(t, err) + err = dcr.AddFunc("job3", "*/2 * * * * *", func() { + t.Log("job3:", time.Now()) + }) + require.Nil(t, err) + dcr.Start() + <-time.After(runningTime) + dcr.Stop() +} + +func TestDcronLocallyTestSuite(t *testing.T) { + suite.Run(t, &DcronLocallyTestSuite{}) +} diff --git a/dcron_test.go b/dcron_test.go index 6d37119..b5c5333 100644 --- a/dcron_test.go +++ b/dcron_test.go @@ -1,6 +1,7 @@ package dcron_test import ( + "fmt" "log" "os" "sync" @@ 
-8,17 +9,18 @@ import ( "testing" "time" + "github.com/alicebob/miniredis/v2" "github.com/libi/dcron" "github.com/libi/dcron/cron" "github.com/libi/dcron/dlog" "github.com/libi/dcron/driver" - "github.com/redis/go-redis/v9" + redis "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" ) -const ( - DefaultRedisAddr = "127.0.0.1:6379" -) +type testDcronTestSuite struct{ suite.Suite } type TestJobWithWG struct { Name string @@ -43,7 +45,8 @@ func (job *TestJobWithWG) Run() { } } -func TestMultiNodes(t *testing.T) { +func (s *testDcronTestSuite) TestMultiNodes() { + t := s.T() wg := &sync.WaitGroup{} wg.Add(3) testJobWGs := make([]*sync.WaitGroup, 0) @@ -83,11 +86,13 @@ func TestMultiNodes(t *testing.T) { nodeCancel[2] = make(chan int, 1) // 间隔1秒启动测试节点刷新逻辑 - go runNode(t, wg, testJobs, nodeCancel[0]) + rds := miniredis.RunT(t) + defer rds.Close() + go runNode(t, wg, testJobs, nodeCancel[0], rds.Addr()) <-time.After(time.Second) - go runNode(t, wg, testJobs, nodeCancel[1]) + go runNode(t, wg, testJobs, nodeCancel[1], rds.Addr()) <-time.After(time.Second) - go runNode(t, wg, testJobs, nodeCancel[2]) + go runNode(t, wg, testJobs, nodeCancel[2], rds.Addr()) testJobWGs[0].Wait() testJobWGs[1].Wait() @@ -99,9 +104,15 @@ func TestMultiNodes(t *testing.T) { wg.Wait() } -func runNode(t *testing.T, wg *sync.WaitGroup, testJobs []*TestJobWithWG, cancel chan int) { +func runNode( + t *testing.T, + wg *sync.WaitGroup, + testJobs []*TestJobWithWG, + cancel chan int, + redisAddr string, +) { redisCli := redis.NewClient(&redis.Options{ - Addr: DefaultRedisAddr, + Addr: redisAddr, }) drv := driver.NewRedisDriver(redisCli) dcron := dcron.NewDcronWithOption( @@ -127,9 +138,12 @@ func runNode(t *testing.T, wg *sync.WaitGroup, testJobs []*TestJobWithWG, cancel wg.Done() } -func Test_SecondsJob(t *testing.T) { +func (s *testDcronTestSuite) Test_SecondsJob() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() redisCli := redis.NewClient(&redis.Options{ - Addr: DefaultRedisAddr, + Addr: rds.Addr(), }) drv := driver.NewRedisDriver(redisCli) dcr := dcron.NewDcronWithOption(t.Name(), drv, dcron.CronOptionSeconds()) @@ -144,9 +158,9 @@ func Test_SecondsJob(t *testing.T) { dcr.Stop() } -func runSecondNode(id string, wg *sync.WaitGroup, runningTime time.Duration, t *testing.T) { +func runSecondNode(id string, wg *sync.WaitGroup, runningTime time.Duration, t *testing.T, redisAddr string) { redisCli := redis.NewClient(&redis.Options{ - Addr: DefaultRedisAddr, + Addr: redisAddr, }) drv := driver.NewRedisDriver(redisCli) dcr := dcron.NewDcronWithOption(t.Name(), drv, @@ -177,9 +191,9 @@ func runSecondNode(id string, wg *sync.WaitGroup, runningTime time.Duration, t * wg.Done() } -func runSecondNodeWithLogger(id string, wg *sync.WaitGroup, runningTime time.Duration, t *testing.T) { +func runSecondNodeWithLogger(id string, wg *sync.WaitGroup, runningTime time.Duration, t *testing.T, redisAddr string) { redisCli := redis.NewClient(&redis.Options{ - Addr: DefaultRedisAddr, + Addr: redisAddr, }) drv := driver.NewRedisDriver(redisCli) dcr := dcron.NewDcronWithOption( @@ -212,33 +226,41 @@ func runSecondNodeWithLogger(id string, wg *sync.WaitGroup, runningTime time.Dur wg.Done() } -func Test_SecondJobWithPanicAndMultiNodes(t *testing.T) { +func (s *testDcronTestSuite) Test_SecondJobWithPanicAndMultiNodes() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() wg := &sync.WaitGroup{} wg.Add(5) - go runSecondNode("1", wg, 
45*time.Second, t) - go runSecondNode("2", wg, 45*time.Second, t) - go runSecondNode("3", wg, 45*time.Second, t) - go runSecondNode("4", wg, 45*time.Second, t) - go runSecondNode("5", wg, 45*time.Second, t) + go runSecondNode("1", wg, 45*time.Second, t, rds.Addr()) + go runSecondNode("2", wg, 45*time.Second, t, rds.Addr()) + go runSecondNode("3", wg, 45*time.Second, t, rds.Addr()) + go runSecondNode("4", wg, 45*time.Second, t, rds.Addr()) + go runSecondNode("5", wg, 45*time.Second, t, rds.Addr()) wg.Wait() } -func Test_SecondJobWithStopAndSwapNode(t *testing.T) { +func (s *testDcronTestSuite) Test_SecondJobWithStopAndSwapNode() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() wg := &sync.WaitGroup{} wg.Add(2) - go runSecondNode("1", wg, 60*time.Second, t) - go runSecondNode("2", wg, 20*time.Second, t) + go runSecondNode("1", wg, 60*time.Second, t, rds.Addr()) + go runSecondNode("2", wg, 20*time.Second, t, rds.Addr()) wg.Wait() } -func Test_WithClusterStableNodes(t *testing.T) { +func (s *testDcronTestSuite) Test_WithClusterStableNodes() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() wg := &sync.WaitGroup{} wg.Add(5) - runningTime := 60 * time.Second startFunc := func(id string, timeWindow time.Duration, t *testing.T) { redisCli := redis.NewClient(&redis.Options{ - Addr: DefaultRedisAddr, + Addr: rds.Addr(), }) drv := driver.NewRedisDriver(redisCli) dcr := dcron.NewDcronWithOption(t.Name(), drv, @@ -273,15 +295,221 @@ func Test_WithClusterStableNodes(t *testing.T) { go startFunc("3", time.Second*6, t) go startFunc("4", time.Second*6, t) go startFunc("5", time.Second*6, t) + wg.Wait() } -func Test_SecondJobLog_Issue68(t *testing.T) { +func (s *testDcronTestSuite) Test_SecondJobLog_Issue68() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() wg := &sync.WaitGroup{} wg.Add(5) - go runSecondNodeWithLogger("1", wg, 45*time.Second, t) - go runSecondNodeWithLogger("2", wg, 45*time.Second, t) - go runSecondNodeWithLogger("3", wg, 45*time.Second, t) - go runSecondNodeWithLogger("4", wg, 45*time.Second, t) - go runSecondNodeWithLogger("5", wg, 45*time.Second, t) + go runSecondNodeWithLogger("1", wg, 45*time.Second, t, rds.Addr()) + go runSecondNodeWithLogger("2", wg, 45*time.Second, t, rds.Addr()) + go runSecondNodeWithLogger("3", wg, 45*time.Second, t, rds.Addr()) + go runSecondNodeWithLogger("4", wg, 45*time.Second, t, rds.Addr()) + go runSecondNodeWithLogger("5", wg, 45*time.Second, t, rds.Addr()) wg.Wait() } + +type testGetJob struct { + Called int + Reached int + Name string +} + +func (job *testGetJob) Run() { + job.Called++ +} + +func (job *testGetJob) Reach() { + job.Reached++ +} + +func (s *testDcronTestSuite) Test_GetJobs_ThisNodeOnlyFalse() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() + redisCli := redis.NewClient(&redis.Options{ + Addr: rds.Addr(), + }) + drv := driver.NewRedisDriver(redisCli) + dcr := dcron.NewDcronWithOption( + t.Name(), + drv, + dcron.CronOptionSeconds(), + dcron.WithLogger(dlog.VerbosePrintfLogger( + log.Default(), + )), + ) + n := 10 + for i := 0; i < n; i++ { + assert.Nil( + t, + dcr.AddJob(fmt.Sprintf("job_%d", i), "* * * * * *", &testGetJob{ + Name: fmt.Sprintf("job_%d", i), + }), + ) + } + + jobs := dcr.GetJobs(false) + assert.Len(t, jobs, n) + for _, job := range jobs { + job.Job.Run() + } + for _, job := range jobs { + assert.Equal(t, job.Name, job.Job.(*testGetJob).Name) + assert.True(t, job.Job.(*testGetJob).Called > 0) + } +} + +func (s *testDcronTestSuite) Test_GetJob_ThisNodeOnlyFalse() { + t := s.T() 
+ rds := miniredis.RunT(t) + defer rds.Close() + redisCli := redis.NewClient(&redis.Options{ + Addr: rds.Addr(), + }) + drv := driver.NewRedisDriver(redisCli) + dcr := dcron.NewDcronWithOption( + t.Name(), + drv, + dcron.CronOptionSeconds(), + dcron.WithLogger(dlog.VerbosePrintfLogger( + log.Default(), + )), + ) + n := 10 + for i := 0; i < n; i++ { + assert.Nil( + t, + dcr.AddJob(fmt.Sprintf("job_%d", i), "* * * * * *", &testGetJob{ + Name: fmt.Sprintf("job_%d", i), + }), + ) + } + + for i := 0; i < n; i++ { + job, err := dcr.GetJob(fmt.Sprintf("job_%d", i), false) + s.Assert().Nil(err) + job.Job.Run() + } + + for i := 0; i < n; i++ { + job, err := dcr.GetJob(fmt.Sprintf("job_%d", i), false) + s.Assert().Nil(err) + s.Assert().Equal(1, job.Job.(*testGetJob).Called) + } + + _, err := dcr.GetJob("xxx", false) + s.Assert().NotNil(err) +} + +func (s *testDcronTestSuite) Test_GetJobs_ThisNodeOnlyTrue() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() + redisCli := redis.NewClient(&redis.Options{ + Addr: rds.Addr(), + }) + drv := driver.NewRedisDriver(redisCli) + dcr := dcron.NewDcronWithOption( + t.Name(), + drv, + dcron.CronOptionSeconds(), + dcron.WithLogger(dlog.VerbosePrintfLogger( + log.Default(), + )), + ) + n := 10 + for i := 0; i < n; i++ { + assert.Nil( + t, + dcr.AddJob(fmt.Sprintf("job_%d", i), "* * * * * *", &testGetJob{ + Name: fmt.Sprintf("job_%d", i), + }), + ) + } + result := make(chan bool, 1) + dcr.Start() + go func() { + // check function + <-time.After(5 * time.Second) + jobs := dcr.GetJobs(true) + s.Assert().Len(jobs, n) + result <- true + }() + s.Assert().True(<-result) +} + +func (s *testDcronTestSuite) Test_GetJob_ThisNodeOnlyTrue() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() + redisCli := redis.NewClient(&redis.Options{ + Addr: rds.Addr(), + }) + drv := driver.NewRedisDriver(redisCli) + dcr := dcron.NewDcronWithOption( + t.Name(), + drv, + dcron.CronOptionSeconds(), + dcron.WithLogger(dlog.VerbosePrintfLogger( + log.Default(), + )), + ) + n := 10 + for i := 0; i < n; i++ { + assert.Nil( + t, + dcr.AddJob(fmt.Sprintf("job_%d", i), "* * * * * *", &testGetJob{ + Name: fmt.Sprintf("job_%d", i), + }), + ) + } + result := make(chan bool, 1) + dcr.Start() + go func() { + // check function + <-time.After(5 * time.Second) + for i := 0; i < n; i++ { + job, err := dcr.GetJob(fmt.Sprintf("job_%d", i), true) + t.Log(job) + s.Assert().Nil(err) + s.Assert().NotNil(job.Job) + job.Job.(*testGetJob).Reach() + } + + for i := 0; i < n; i++ { + job, err := dcr.GetJob(fmt.Sprintf("job_%d", i), true) + s.Assert().Nil(err) + s.Assert().Equal(1, job.Job.(*testGetJob).Reached) + } + result <- true + }() + s.Assert().True(<-result) +} + +func (s *testDcronTestSuite) Test_AddJob_JobName_Duplicate() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() + redisCli := redis.NewClient(&redis.Options{ + Addr: rds.Addr(), + }) + drv := driver.NewRedisDriver(redisCli) + dcr := dcron.NewDcronWithOption( + t.Name(), + drv, + dcron.CronOptionSeconds(), + dcron.WithLogger(dlog.VerbosePrintfLogger( + log.Default(), + )), + ) + s.Assert().Nil(dcr.AddJob("test1", "* * * * * *", &testGetJob{})) + s.Assert().Equal(dcron.ErrJobExist, dcr.AddJob("test1", "* * * * * *", &testGetJob{})) +} + +func TestDcronTestMain(t *testing.T) { + suite.Run(t, new(testDcronTestSuite)) +} diff --git a/driver/driver.go b/driver/driver.go index 4706b81..1e62672 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -30,7 +30,7 @@ type DriverV2 interface { withOption(opt Option) (err error) } 
-func NewRedisDriver(redisClient *redis.Client) DriverV2 { +func NewRedisDriver(redisClient redis.UniversalClient) DriverV2 { return newRedisDriver(redisClient) } @@ -38,6 +38,6 @@ func NewEtcdDriver(etcdCli *clientv3.Client) DriverV2 { return newEtcdDriver(etcdCli) } -func NewRedisZSetDriver(redisClient *redis.Client) DriverV2 { +func NewRedisZSetDriver(redisClient redis.UniversalClient) DriverV2 { return newRedisZSetDriver(redisClient) } diff --git a/driver/etcddriver.go b/driver/etcddriver.go index 7d17de3..9e0b6f7 100644 --- a/driver/etcddriver.go +++ b/driver/etcddriver.go @@ -173,6 +173,9 @@ func (e *EtcdDriver) keepHeartBeat() { func (e *EtcdDriver) Init(serverName string, opts ...Option) { e.serviceName = serverName e.nodeID = GetNodeId(serverName) + for _, opt := range opts { + e.withOption(opt) + } } func (e *EtcdDriver) NodeID() string { diff --git a/driver/redisdriver.go b/driver/redisdriver.go index b7d193a..44d58b2 100644 --- a/driver/redisdriver.go +++ b/driver/redisdriver.go @@ -17,7 +17,7 @@ const ( ) type RedisDriver struct { - c *redis.Client + c redis.UniversalClient serviceName string nodeID string timeout time.Duration @@ -32,7 +32,7 @@ type RedisDriver struct { sync.Mutex } -func newRedisDriver(redisClient *redis.Client) *RedisDriver { +func newRedisDriver(redisClient redis.UniversalClient) *RedisDriver { rd := &RedisDriver{ c: redisClient, logger: &dlog.StdLogger{ diff --git a/driver/rediszsetdriver.go b/driver/rediszsetdriver.go index 756a23c..38ac266 100644 --- a/driver/rediszsetdriver.go +++ b/driver/rediszsetdriver.go @@ -13,7 +13,7 @@ import ( ) type RedisZSetDriver struct { - c *redis.Client + c redis.UniversalClient serviceName string nodeID string timeout time.Duration @@ -28,7 +28,7 @@ type RedisZSetDriver struct { sync.Mutex } -func newRedisZSetDriver(redisClient *redis.Client) *RedisZSetDriver { +func newRedisZSetDriver(redisClient redis.UniversalClient) *RedisZSetDriver { rd := &RedisZSetDriver{ c: redisClient, logger: &dlog.StdLogger{ diff --git a/driver/util.go b/driver/util.go index 0a3d01f..46e156b 100644 --- a/driver/util.go +++ b/driver/util.go @@ -17,14 +17,6 @@ func GetNodeId(serviceName string) string { return GetKeyPre(serviceName) + uuid.New().String() } -func GetStableJobStore(serviceName string) string { - return GetKeyPre(serviceName) + "stable-jobs" -} - -func GetStableJobStoreTxKey(serviceName string) string { - return GetKeyPre(serviceName) + "TX:stable-jobs" -} - func TimePre(t time.Time, preDuration time.Duration) int64 { return t.Add(-preDuration).Unix() } diff --git a/go.mod b/go.mod index e0cfe1f..1b0f7fd 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/alicebob/miniredis/v2 v2.31.0 - github.com/google/uuid v1.4.0 + github.com/google/uuid v1.5.0 github.com/redis/go-redis/v9 v9.3.1 github.com/stretchr/testify v1.8.4 go.etcd.io/etcd/api/v3 v3.5.11 diff --git a/go.sum b/go.sum index 7febefa..fb08fac 100644 --- a/go.sum +++ b/go.sum @@ -77,8 +77,8 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= -github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.5.0 
h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= diff --git a/nodepool.go b/nodepool.go index 5c6a17b..43be2b7 100644 --- a/nodepool.go +++ b/nodepool.go @@ -21,14 +21,14 @@ const ( // NodePool // For cluster steable. // NodePool has 2 states: -// 1. Steady -// If this nodePoolLists is the same as the last update, -// we will mark this node's state to Steady. In this state, -// this node can run jobs. -// 2. Upgrade -// If this nodePoolLists is different to the last update, -// we will mark this node's state to Upgrade. In this state, -// this node can not run jobs. +// 1. Steady +// If this nodePoolLists is the same as the last update, +// we will mark this node's state to Steady. In this state, +// this node can run jobs. +// 2. Upgrade +// If this nodePoolLists is different to the last update, +// we will mark this node's state to Upgrade. In this state, +// this node can not run jobs. type NodePool struct { serviceName string nodeID string @@ -115,8 +115,9 @@ func (np *NodePool) CheckJobAvailable(jobName string) (bool, error) { } targetNode := np.nodes.Get(jobName) if np.nodeID == targetNode { - np.logger.Infof("job %s, running in node: %s", jobName, targetNode) + np.logger.Infof("job %s, running in node: %s, nodeID is %s", jobName, targetNode, np.nodeID) } + return np.nodeID == targetNode, nil } diff --git a/option.go b/option.go index af47004..4dec584 100644 --- a/option.go +++ b/option.go @@ -81,3 +81,9 @@ func WithClusterStable(timeWindow time.Duration) Option { d.recentJobs = NewRecentJobPacker(timeWindow) } } + +func RunningLocally() Option { + return func(d *Dcron) { + d.runningLocally = true + } +}
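
Usage notes for reviewers follow; the snippets are illustrative sketches, not code from this patch.

With the `redis.UniversalClient` change (#88), `driver.NewRedisDriver` and `driver.NewRedisZSetDriver` accept any go-redis v9 universal client, so cluster and failover clients work as well as a plain `*redis.Client`. A minimal sketch; the service name and address are placeholders.

```go
package main

import (
	"log"
	"time"

	"github.com/libi/dcron"
	"github.com/libi/dcron/driver"
	redis "github.com/redis/go-redis/v9"
)

func main() {
	// NewUniversalClient returns a cluster, failover, or single-node client
	// depending on the options; all of them satisfy redis.UniversalClient.
	rdb := redis.NewUniversalClient(&redis.UniversalOptions{
		Addrs: []string{"127.0.0.1:6379"}, // placeholder address
	})
	drv := driver.NewRedisDriver(rdb) // NewRedisZSetDriver takes the same interface
	dcr := dcron.NewDcronWithOption("example-service", drv, dcron.CronOptionSeconds())
	if err := dcr.AddFunc("tick", "*/5 * * * * *", func() {
		log.Println("tick")
	}); err != nil {
		log.Fatal(err)
	}
	dcr.Start()
	<-time.After(time.Minute)
	dcr.Stop()
}
```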
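The new `RunningLocally()` option (option.go) skips the node pool entirely: no driver is required, `Start`/`Run` never start the pool, and every job is allowed to run on this process. A minimal sketch modeled on `dcron_locally_test.go`; the job name and schedule are illustrative.

```go
package main

import (
	"log"
	"time"

	"github.com/libi/dcron"
	"github.com/libi/dcron/cron"
)

func main() {
	dcr := dcron.NewDcronWithOption(
		"any-name", // the server name is not used when running locally
		nil,        // no driver needed: the node pool is never created
		dcron.RunningLocally(),
		dcron.CronOptionSeconds(),
		dcron.CronOptionChain(cron.Recover(cron.DefaultLogger)),
	)
	if err := dcr.AddFunc("local-job", "*/2 * * * * *", func() {
		log.Println("local-job fired at", time.Now())
	}); err != nil {
		log.Fatal(err)
	}
	dcr.Start()
	<-time.After(30 * time.Second)
	dcr.Stop()
}
```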
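The lookup API added in dcron.go exposes the registered `JobWarpper`s and the new sentinel errors. With `thisNodeOnly=false` the answer comes from the local job map; with `thisNodeOnly=true` the node pool is consulted, which requires a driver-backed, started instance. A self-contained sketch using the driver-less local mode; job names are illustrative, and the assumption that `AddFunc` surfaces `ErrJobExist` mirrors `addJob` in this patch.

```go
package main

import (
	"errors"
	"log"

	"github.com/libi/dcron"
)

func main() {
	// Driver-less setup keeps the sketch self-contained; with a real driver,
	// GetJob/GetJobs can also be called with thisNodeOnly=true.
	dcr := dcron.NewDcronWithOption("any-name", nil,
		dcron.RunningLocally(), dcron.CronOptionSeconds())
	if err := dcr.AddFunc("job1", "*/5 * * * * *", func() { log.Println("job1") }); err != nil {
		log.Fatal(err)
	}

	// Duplicate names are rejected with the new sentinel error (assumed to be
	// returned by AddFunc via addJob, as the AddJob test in this patch shows).
	if err := dcr.AddFunc("job1", "* * * * * *", func() {}); errors.Is(err, dcron.ErrJobExist) {
		log.Println("duplicate job name rejected")
	}

	// Look up a single job; thisNodeOnly=false answers from the local job map.
	if job, err := dcr.GetJob("job1", false); err == nil {
		log.Println("found", job.Name, "with schedule", job.CronStr)
	}
	if _, err := dcr.GetJob("missing", false); errors.Is(err, dcron.ErrJobNotExist) {
		log.Println("unknown job name")
	}

	// GetJobs never returns nil; false lists every registered job.
	for _, j := range dcr.GetJobs(false) {
		log.Println("registered:", j.Name)
	}
}
```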
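The tests now start an in-process Redis with miniredis instead of relying on a Redis service in CI, which is why the `supercharge/redis-github-action` step was dropped from test.yml. A sketch of the pattern the updated tests use; the test and job names are illustrative.

```go
package dcron_test

import (
	"testing"

	"github.com/alicebob/miniredis/v2"
	"github.com/libi/dcron"
	"github.com/libi/dcron/driver"
	redis "github.com/redis/go-redis/v9"
)

func TestWithEmbeddedRedis(t *testing.T) {
	// miniredis.RunT starts an in-process server and ties its lifetime to the test.
	rds := miniredis.RunT(t)
	defer rds.Close()

	redisCli := redis.NewClient(&redis.Options{Addr: rds.Addr()})
	drv := driver.NewRedisDriver(redisCli)
	dcr := dcron.NewDcronWithOption(t.Name(), drv, dcron.CronOptionSeconds())
	if err := dcr.AddFunc("demo", "* * * * * *", func() { t.Log("tick") }); err != nil {
		t.Fatal(err)
	}
	dcr.Start()
	defer dcr.Stop()
}
```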
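The cron/chain_test.go changes only retune timings and add WaitGroup synchronization; for context, a sketch of how the wrappers under test are applied through `CronOptionChain`. The slow job body is illustrative.

```go
package main

import (
	"log"
	"time"

	"github.com/libi/dcron"
	"github.com/libi/dcron/cron"
)

func main() {
	dcr := dcron.NewDcronWithOption("any-name", nil,
		dcron.RunningLocally(),
		dcron.CronOptionSeconds(),
		// SkipIfStillRunning drops a trigger while the previous run is still active;
		// DelayIfStillRunning would queue it until the previous run finishes.
		dcron.CronOptionChain(cron.SkipIfStillRunning(cron.DefaultLogger)),
	)
	if err := dcr.AddFunc("slow", "* * * * * *", func() {
		time.Sleep(3 * time.Second) // longer than the schedule interval
		log.Println("slow job finished")
	}); err != nil {
		log.Fatal(err)
	}
	dcr.Start()
	<-time.After(10 * time.Second)
	dcr.Stop()
}
```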