From c9eca56c5e789588064dcdb19898d476aed08c49 Mon Sep 17 00:00:00 2001 From: Dxyinme <32793868+dxyinme@users.noreply.github.com> Date: Tue, 6 Feb 2024 11:16:04 +0800 Subject: [PATCH] make dcron running locally, update cron test, add GetJobs / GetJob function for dcron. (#89) --- .github/workflows/test.yml | 8 +- cron/README.md | 4 +- cron/chain_test.go | 40 ++++-- dcron.go | 106 ++++++++++++-- dcron_locally_test.go | 51 +++++++ dcron_test.go | 274 ++++++++++++++++++++++++++++++++----- go.mod | 2 +- go.sum | 4 +- option.go | 6 + 9 files changed, 425 insertions(+), 70 deletions(-) create mode 100644 dcron_locally_test.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b77ec4d..9639b39 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -20,12 +20,10 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - - name: Redis Server in GitHub Actions - uses: supercharge/redis-github-action@1.7.0 - name: Set up Go - uses: actions/setup-go@v3 + uses: actions/setup-go@v5 with: go-version: ${{ matrix.go_version }} @@ -33,7 +31,7 @@ jobs: run: go test -v -timeout 30m -coverprofile=coverage.txt -covermode=atomic ./... - name: Upload coverage reports to Codecov - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/cron/README.md b/cron/README.md index f3ddd13..6d2c439 100644 --- a/cron/README.md +++ b/cron/README.md @@ -1,3 +1,5 @@ # cron -fork from [robfig/cron](github.com/robfig/cron) \ No newline at end of file +fork from [robfig/cron](github.com/robfig/cron) + +maintain by Dcron opensource team. \ No newline at end of file diff --git a/cron/chain_test.go b/cron/chain_test.go index 240deb7..8071263 100644 --- a/cron/chain_test.go +++ b/cron/chain_test.go @@ -116,12 +116,16 @@ func TestChainDelayIfStillRunning(t *testing.T) { t.Run("second run immediate if first done", func(t *testing.T) { var j countJob wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j) + wg := &sync.WaitGroup{} + wg.Add(1) go func() { go wrappedJob.Run() - time.Sleep(time.Millisecond) + <-time.After(time.Millisecond) go wrappedJob.Run() + wg.Done() }() - time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete. + wg.Wait() + <-time.After(3 * time.Millisecond) if c := j.Done(); c != 2 { t.Errorf("expected job run twice, immediately, got %d", c) } @@ -129,24 +133,28 @@ func TestChainDelayIfStillRunning(t *testing.T) { t.Run("second run delayed if first not done", func(t *testing.T) { var j countJob - j.delay = 10 * time.Millisecond + j.delay = 100 * time.Millisecond wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j) + wg := &sync.WaitGroup{} + wg.Add(1) go func() { go wrappedJob.Run() - time.Sleep(time.Millisecond) + <-time.After(10 * time.Millisecond) go wrappedJob.Run() + wg.Done() }() - // After 5ms, the first job is still in progress, and the second job was + wg.Wait() + // After 50ms, the first job is still in progress, and the second job was // run but should be waiting for it to finish. - time.Sleep(5 * time.Millisecond) + <-time.After(50 * time.Millisecond) started, done := j.Started(), j.Done() if started != 1 || done != 0 { t.Error("expected first job started, but not finished, got", started, done) } // Verify that the second job completes. - time.Sleep(25 * time.Millisecond) + <-time.After(220 * time.Millisecond) started, done = j.Started(), j.Done() if started != 2 || done != 2 { t.Error("expected both jobs done, got", started, done) @@ -169,12 +177,16 @@ func TestChainSkipIfStillRunning(t *testing.T) { t.Run("second run immediate if first done", func(t *testing.T) { var j countJob wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j) + wg := &sync.WaitGroup{} + wg.Add(1) go func() { go wrappedJob.Run() - time.Sleep(time.Millisecond) + <-time.After(time.Millisecond) go wrappedJob.Run() + wg.Done() }() - time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete. + wg.Wait() + <-time.After(3 * time.Millisecond) // Give both jobs 3ms to complete. if c := j.Done(); c != 2 { t.Errorf("expected job run twice, immediately, got %d", c) } @@ -182,24 +194,24 @@ func TestChainSkipIfStillRunning(t *testing.T) { t.Run("second run skipped if first not done", func(t *testing.T) { var j countJob - j.delay = 10 * time.Millisecond + j.delay = 100 * time.Millisecond wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j) go func() { go wrappedJob.Run() - time.Sleep(time.Millisecond) + <-time.After(10 * time.Millisecond) go wrappedJob.Run() }() - // After 5ms, the first job is still in progress, and the second job was + // After 50ms, the first job is still in progress, and the second job was // aleady skipped. - time.Sleep(5 * time.Millisecond) + <-time.After(50 * time.Millisecond) started, done := j.Started(), j.Done() if started != 1 || done != 0 { t.Error("expected first job started, but not finished, got", started, done) } // Verify that the first job completes and second does not run. - time.Sleep(25 * time.Millisecond) + <-time.After(220 * time.Millisecond) started, done = j.Started(), j.Done() if started != 1 || done != 1 { t.Error("expected second job skipped, got", started, done) diff --git a/dcron.go b/dcron.go index cd839ef..c0d2a5b 100644 --- a/dcron.go +++ b/dcron.go @@ -25,12 +25,18 @@ const ( dcronStateUpgrade = "dcronStateUpgrade" ) +var ( + ErrJobExist = errors.New("jobName already exist") + ErrJobNotExist = errors.New("jobName not exist") + ErrJobWrongNode = errors.New("job is not running in this node") +) + type RecoverFuncType func(d *Dcron) // Dcron is main struct type Dcron struct { jobs map[string]*JobWarpper - jobsRWMut sync.Mutex + jobsRWMut sync.RWMutex ServerName string nodePool INodePool @@ -48,6 +54,8 @@ type Dcron struct { recentJobs IRecentJobPacker state atomic.Value + + runningLocally bool } // NewDcron create a Dcron @@ -68,7 +76,9 @@ func NewDcronWithOption(serverName string, driver driver.DriverV2, dcronOpts ... } dcron.cr = cron.New(dcron.crOptions...) - dcron.nodePool = NewNodePool(serverName, driver, dcron.nodeUpdateDuration, dcron.hashReplicas, dcron.logger) + if !dcron.runningLocally { + dcron.nodePool = NewNodePool(serverName, driver, dcron.nodeUpdateDuration, dcron.hashReplicas, dcron.logger) + } return dcron } @@ -110,7 +120,7 @@ func (d *Dcron) addJob(jobName, cronStr string, job Job) (err error) { d.jobsRWMut.Lock() defer d.jobsRWMut.Unlock() if _, ok := d.jobs[jobName]; ok { - return errors.New("jobName already exist") + return ErrJobExist } innerJob := JobWarpper{ Name: jobName, @@ -127,7 +137,7 @@ func (d *Dcron) addJob(jobName, cronStr string, job Job) (err error) { return nil } -// Remove Job +// Remove Job by jobName func (d *Dcron) Remove(jobName string) { d.jobsRWMut.Lock() defer d.jobsRWMut.Unlock() @@ -138,7 +148,71 @@ func (d *Dcron) Remove(jobName string) { } } +// Get job by jobName +// if this jobName not exist, will return error. +// +// if `thisNodeOnly` is true +// if this job is not available in this node, will return error. +// otherwise return the struct of JobWarpper whose name is jobName. +func (d *Dcron) GetJob(jobName string, thisNodeOnly bool) (*JobWarpper, error) { + d.jobsRWMut.RLock() + defer d.jobsRWMut.RUnlock() + + job, ok := d.jobs[jobName] + if !ok { + d.logger.Warnf("job: %s, not exist", jobName) + return nil, ErrJobNotExist + } + if !thisNodeOnly { + return job, nil + } + isRunningHere, err := d.nodePool.CheckJobAvailable(jobName) + if err != nil { + return nil, err + } + if isRunningHere { + return nil, ErrJobWrongNode + } + return job, nil +} + +// Get job list. +// +// if `thisNodeOnly` is true +// return all jobs available in this node. +// otherwise return all jobs added to dcron. +// +// we never return nil. If there is no job. +// this func will return an empty slice. +func (d *Dcron) GetJobs(thisNodeOnly bool) []*JobWarpper { + d.jobsRWMut.RLock() + defer d.jobsRWMut.RUnlock() + + ret := make([]*JobWarpper, 0) + for _, v := range d.jobs { + var ( + isRunningHere bool + ok bool = true + err error + ) + if thisNodeOnly { + isRunningHere, err = d.nodePool.CheckJobAvailable(v.Name) + if err != nil { + continue + } + ok = isRunningHere + } + if ok { + ret = append(ret, v) + } + } + return ret +} + func (d *Dcron) allowThisNodeRun(jobName string) (ok bool) { + if d.runningLocally { + return true + } ok, err := d.nodePool.CheckJobAvailable(jobName) if err != nil { d.logger.Errorf("allow this node run error, err=%v", err) @@ -165,12 +239,14 @@ func (d *Dcron) Start() { d.RecoverFunc(d) } if atomic.CompareAndSwapInt32(&d.running, dcronStopped, dcronRunning) { - if err := d.startNodePool(); err != nil { - atomic.StoreInt32(&d.running, dcronStopped) - return + if !d.runningLocally { + if err := d.startNodePool(); err != nil { + atomic.StoreInt32(&d.running, dcronStopped) + return + } + d.logger.Infof("dcron started, nodeID is %s", d.nodePool.GetNodeID()) } d.cr.Start() - d.logger.Infof("dcron started, nodeID is %s", d.nodePool.GetNodeID()) } else { d.logger.Infof("dcron have started") } @@ -183,11 +259,13 @@ func (d *Dcron) Run() { d.RecoverFunc(d) } if atomic.CompareAndSwapInt32(&d.running, dcronStopped, dcronRunning) { - if err := d.startNodePool(); err != nil { - atomic.StoreInt32(&d.running, dcronStopped) - return + if !d.runningLocally { + if err := d.startNodePool(); err != nil { + atomic.StoreInt32(&d.running, dcronStopped) + return + } + d.logger.Infof("dcron running, nodeID is %s", d.nodePool.GetNodeID()) } - d.logger.Infof("dcron running, nodeID is %s", d.nodePool.GetNodeID()) d.cr.Run() } else { d.logger.Infof("dcron already running") @@ -205,7 +283,9 @@ func (d *Dcron) startNodePool() error { // Stop job func (d *Dcron) Stop() { tick := time.NewTicker(time.Millisecond) - d.nodePool.Stop(context.Background()) + if !d.runningLocally { + d.nodePool.Stop(context.Background()) + } for range tick.C { if atomic.CompareAndSwapInt32(&d.running, dcronRunning, dcronStopped) { d.cr.Stop() diff --git a/dcron_locally_test.go b/dcron_locally_test.go new file mode 100644 index 0000000..3b42ca4 --- /dev/null +++ b/dcron_locally_test.go @@ -0,0 +1,51 @@ +package dcron_test + +import ( + "testing" + "time" + + "github.com/libi/dcron" + "github.com/libi/dcron/cron" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type DcronLocallyTestSuite struct { + suite.Suite +} + +func (s *DcronLocallyTestSuite) TestNormal() { + dcr := dcron.NewDcronWithOption( + "not a necessary servername", + nil, + dcron.RunningLocally(), + dcron.CronOptionSeconds(), + dcron.CronOptionChain( + cron.Recover( + cron.DefaultLogger, + ))) + + s.Assert().NotNil(dcr) + runningTime := 60 * time.Second + t := s.T() + var err error + err = dcr.AddFunc("job1", "*/5 * * * * *", func() { + t.Log(time.Now()) + }) + require.Nil(t, err) + err = dcr.AddFunc("job2", "*/8 * * * * *", func() { + panic("test panic") + }) + require.Nil(t, err) + err = dcr.AddFunc("job3", "*/2 * * * * *", func() { + t.Log("job3:", time.Now()) + }) + require.Nil(t, err) + dcr.Start() + <-time.After(runningTime) + dcr.Stop() +} + +func TestDcronLocallyTestSuite(t *testing.T) { + suite.Run(t, &DcronLocallyTestSuite{}) +} diff --git a/dcron_test.go b/dcron_test.go index 6d37119..3c709b8 100644 --- a/dcron_test.go +++ b/dcron_test.go @@ -1,6 +1,7 @@ package dcron_test import ( + "fmt" "log" "os" "sync" @@ -8,17 +9,18 @@ import ( "testing" "time" + "github.com/alicebob/miniredis/v2" "github.com/libi/dcron" "github.com/libi/dcron/cron" "github.com/libi/dcron/dlog" "github.com/libi/dcron/driver" - "github.com/redis/go-redis/v9" + redis "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" ) -const ( - DefaultRedisAddr = "127.0.0.1:6379" -) +type testDcronTestSuite struct{ suite.Suite } type TestJobWithWG struct { Name string @@ -43,7 +45,8 @@ func (job *TestJobWithWG) Run() { } } -func TestMultiNodes(t *testing.T) { +func (s *testDcronTestSuite) TestMultiNodes() { + t := s.T() wg := &sync.WaitGroup{} wg.Add(3) testJobWGs := make([]*sync.WaitGroup, 0) @@ -83,11 +86,13 @@ func TestMultiNodes(t *testing.T) { nodeCancel[2] = make(chan int, 1) // 间隔1秒启动测试节点刷新逻辑 - go runNode(t, wg, testJobs, nodeCancel[0]) + rds := miniredis.RunT(t) + defer rds.Close() + go runNode(t, wg, testJobs, nodeCancel[0], rds.Addr()) <-time.After(time.Second) - go runNode(t, wg, testJobs, nodeCancel[1]) + go runNode(t, wg, testJobs, nodeCancel[1], rds.Addr()) <-time.After(time.Second) - go runNode(t, wg, testJobs, nodeCancel[2]) + go runNode(t, wg, testJobs, nodeCancel[2], rds.Addr()) testJobWGs[0].Wait() testJobWGs[1].Wait() @@ -99,9 +104,15 @@ func TestMultiNodes(t *testing.T) { wg.Wait() } -func runNode(t *testing.T, wg *sync.WaitGroup, testJobs []*TestJobWithWG, cancel chan int) { +func runNode( + t *testing.T, + wg *sync.WaitGroup, + testJobs []*TestJobWithWG, + cancel chan int, + redisAddr string, +) { redisCli := redis.NewClient(&redis.Options{ - Addr: DefaultRedisAddr, + Addr: redisAddr, }) drv := driver.NewRedisDriver(redisCli) dcron := dcron.NewDcronWithOption( @@ -127,9 +138,12 @@ func runNode(t *testing.T, wg *sync.WaitGroup, testJobs []*TestJobWithWG, cancel wg.Done() } -func Test_SecondsJob(t *testing.T) { +func (s *testDcronTestSuite) Test_SecondsJob() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() redisCli := redis.NewClient(&redis.Options{ - Addr: DefaultRedisAddr, + Addr: rds.Addr(), }) drv := driver.NewRedisDriver(redisCli) dcr := dcron.NewDcronWithOption(t.Name(), drv, dcron.CronOptionSeconds()) @@ -144,9 +158,9 @@ func Test_SecondsJob(t *testing.T) { dcr.Stop() } -func runSecondNode(id string, wg *sync.WaitGroup, runningTime time.Duration, t *testing.T) { +func runSecondNode(id string, wg *sync.WaitGroup, runningTime time.Duration, t *testing.T, redisAddr string) { redisCli := redis.NewClient(&redis.Options{ - Addr: DefaultRedisAddr, + Addr: redisAddr, }) drv := driver.NewRedisDriver(redisCli) dcr := dcron.NewDcronWithOption(t.Name(), drv, @@ -177,9 +191,9 @@ func runSecondNode(id string, wg *sync.WaitGroup, runningTime time.Duration, t * wg.Done() } -func runSecondNodeWithLogger(id string, wg *sync.WaitGroup, runningTime time.Duration, t *testing.T) { +func runSecondNodeWithLogger(id string, wg *sync.WaitGroup, runningTime time.Duration, t *testing.T, redisAddr string) { redisCli := redis.NewClient(&redis.Options{ - Addr: DefaultRedisAddr, + Addr: redisAddr, }) drv := driver.NewRedisDriver(redisCli) dcr := dcron.NewDcronWithOption( @@ -212,33 +226,41 @@ func runSecondNodeWithLogger(id string, wg *sync.WaitGroup, runningTime time.Dur wg.Done() } -func Test_SecondJobWithPanicAndMultiNodes(t *testing.T) { +func (s *testDcronTestSuite) Test_SecondJobWithPanicAndMultiNodes() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() wg := &sync.WaitGroup{} wg.Add(5) - go runSecondNode("1", wg, 45*time.Second, t) - go runSecondNode("2", wg, 45*time.Second, t) - go runSecondNode("3", wg, 45*time.Second, t) - go runSecondNode("4", wg, 45*time.Second, t) - go runSecondNode("5", wg, 45*time.Second, t) + go runSecondNode("1", wg, 45*time.Second, t, rds.Addr()) + go runSecondNode("2", wg, 45*time.Second, t, rds.Addr()) + go runSecondNode("3", wg, 45*time.Second, t, rds.Addr()) + go runSecondNode("4", wg, 45*time.Second, t, rds.Addr()) + go runSecondNode("5", wg, 45*time.Second, t, rds.Addr()) wg.Wait() } -func Test_SecondJobWithStopAndSwapNode(t *testing.T) { +func (s *testDcronTestSuite) Test_SecondJobWithStopAndSwapNode() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() wg := &sync.WaitGroup{} wg.Add(2) - go runSecondNode("1", wg, 60*time.Second, t) - go runSecondNode("2", wg, 20*time.Second, t) + go runSecondNode("1", wg, 60*time.Second, t, rds.Addr()) + go runSecondNode("2", wg, 20*time.Second, t, rds.Addr()) wg.Wait() } -func Test_WithClusterStableNodes(t *testing.T) { +func (s *testDcronTestSuite) Test_WithClusterStableNodes() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() wg := &sync.WaitGroup{} wg.Add(5) - runningTime := 60 * time.Second startFunc := func(id string, timeWindow time.Duration, t *testing.T) { redisCli := redis.NewClient(&redis.Options{ - Addr: DefaultRedisAddr, + Addr: rds.Addr(), }) drv := driver.NewRedisDriver(redisCli) dcr := dcron.NewDcronWithOption(t.Name(), drv, @@ -273,15 +295,199 @@ func Test_WithClusterStableNodes(t *testing.T) { go startFunc("3", time.Second*6, t) go startFunc("4", time.Second*6, t) go startFunc("5", time.Second*6, t) + wg.Wait() } -func Test_SecondJobLog_Issue68(t *testing.T) { +func (s *testDcronTestSuite) Test_SecondJobLog_Issue68() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() wg := &sync.WaitGroup{} wg.Add(5) - go runSecondNodeWithLogger("1", wg, 45*time.Second, t) - go runSecondNodeWithLogger("2", wg, 45*time.Second, t) - go runSecondNodeWithLogger("3", wg, 45*time.Second, t) - go runSecondNodeWithLogger("4", wg, 45*time.Second, t) - go runSecondNodeWithLogger("5", wg, 45*time.Second, t) + go runSecondNodeWithLogger("1", wg, 45*time.Second, t, rds.Addr()) + go runSecondNodeWithLogger("2", wg, 45*time.Second, t, rds.Addr()) + go runSecondNodeWithLogger("3", wg, 45*time.Second, t, rds.Addr()) + go runSecondNodeWithLogger("4", wg, 45*time.Second, t, rds.Addr()) + go runSecondNodeWithLogger("5", wg, 45*time.Second, t, rds.Addr()) wg.Wait() } + +type testGetJob struct { + Called int + Reached int + Name string +} + +func (job *testGetJob) Run() { + job.Called++ +} + +func (job *testGetJob) Reach() { + job.Reached++ +} + +func (s *testDcronTestSuite) Test_GetJobs_ThisNodeOnlyFalse() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() + redisCli := redis.NewClient(&redis.Options{ + Addr: rds.Addr(), + }) + drv := driver.NewRedisDriver(redisCli) + dcr := dcron.NewDcronWithOption( + t.Name(), + drv, + dcron.CronOptionSeconds(), + dcron.WithLogger(dlog.VerbosePrintfLogger( + log.Default(), + )), + ) + n := 10 + for i := 0; i < n; i++ { + assert.Nil( + t, + dcr.AddJob(fmt.Sprintf("job_%d", i), "* * * * * *", &testGetJob{ + Name: fmt.Sprintf("job_%d", i), + }), + ) + } + + jobs := dcr.GetJobs(false) + assert.Len(t, jobs, n) + for _, job := range jobs { + job.Job.Run() + } + for _, job := range jobs { + assert.Equal(t, job.Name, job.Job.(*testGetJob).Name) + assert.True(t, job.Job.(*testGetJob).Called > 0) + } +} + +func (s *testDcronTestSuite) Test_GetJob_ThisNodeOnlyFalse() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() + redisCli := redis.NewClient(&redis.Options{ + Addr: rds.Addr(), + }) + drv := driver.NewRedisDriver(redisCli) + dcr := dcron.NewDcronWithOption( + t.Name(), + drv, + dcron.CronOptionSeconds(), + dcron.WithLogger(dlog.VerbosePrintfLogger( + log.Default(), + )), + ) + n := 10 + for i := 0; i < n; i++ { + assert.Nil( + t, + dcr.AddJob(fmt.Sprintf("job_%d", i), "* * * * * *", &testGetJob{ + Name: fmt.Sprintf("job_%d", i), + }), + ) + } + + for i := 0; i < n; i++ { + job, err := dcr.GetJob(fmt.Sprintf("job_%d", i), false) + s.Assert().Nil(err) + job.Job.Run() + } + + for i := 0; i < n; i++ { + job, err := dcr.GetJob(fmt.Sprintf("job_%d", i), false) + s.Assert().Nil(err) + s.Assert().Equal(1, job.Job.(*testGetJob).Called) + } + + _, err := dcr.GetJob("xxx", false) + s.Assert().NotNil(err) +} + +func (s *testDcronTestSuite) Test_GetJobs_ThisNodeOnlyTrue() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() + redisCli := redis.NewClient(&redis.Options{ + Addr: rds.Addr(), + }) + drv := driver.NewRedisDriver(redisCli) + dcr := dcron.NewDcronWithOption( + t.Name(), + drv, + dcron.CronOptionSeconds(), + dcron.WithLogger(dlog.VerbosePrintfLogger( + log.Default(), + )), + ) + n := 10 + for i := 0; i < n; i++ { + assert.Nil( + t, + dcr.AddJob(fmt.Sprintf("job_%d", i), "* * * * * *", &testGetJob{ + Name: fmt.Sprintf("job_%d", i), + }), + ) + } + result := make(chan bool, 1) + dcr.Start() + go func() { + // check function + <-time.After(5 * time.Second) + jobs := dcr.GetJobs(true) + s.Assert().Len(jobs, n) + result <- true + }() + s.Assert().True(<-result) +} + +func (s *testDcronTestSuite) Test_GetJob_ThisNodeOnlyTrue() { + t := s.T() + rds := miniredis.RunT(t) + defer rds.Close() + redisCli := redis.NewClient(&redis.Options{ + Addr: rds.Addr(), + }) + drv := driver.NewRedisDriver(redisCli) + dcr := dcron.NewDcronWithOption( + t.Name(), + drv, + dcron.CronOptionSeconds(), + dcron.WithLogger(dlog.VerbosePrintfLogger( + log.Default(), + )), + ) + n := 10 + for i := 0; i < n; i++ { + assert.Nil( + t, + dcr.AddJob(fmt.Sprintf("job_%d", i), "* * * * * *", &testGetJob{ + Name: fmt.Sprintf("job_%d", i), + }), + ) + } + result := make(chan bool, 1) + dcr.Start() + go func() { + // check function + <-time.After(5 * time.Second) + for i := 0; i < n; i++ { + job, err := dcr.GetJob(fmt.Sprintf("job_%d", i), false) + s.Assert().Nil(err) + job.Job.(*testGetJob).Reach() + } + + for i := 0; i < n; i++ { + job, err := dcr.GetJob(fmt.Sprintf("job_%d", i), false) + s.Assert().Nil(err) + s.Assert().Equal(1, job.Job.(*testGetJob).Reached) + } + result <- true + }() + s.Assert().True(<-result) +} + +func TestDcronTestMain(t *testing.T) { + suite.Run(t, new(testDcronTestSuite)) +} diff --git a/go.mod b/go.mod index e0cfe1f..1b0f7fd 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/alicebob/miniredis/v2 v2.31.0 - github.com/google/uuid v1.4.0 + github.com/google/uuid v1.5.0 github.com/redis/go-redis/v9 v9.3.1 github.com/stretchr/testify v1.8.4 go.etcd.io/etcd/api/v3 v3.5.11 diff --git a/go.sum b/go.sum index 7febefa..fb08fac 100644 --- a/go.sum +++ b/go.sum @@ -77,8 +77,8 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= -github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= diff --git a/option.go b/option.go index af47004..4dec584 100644 --- a/option.go +++ b/option.go @@ -81,3 +81,9 @@ func WithClusterStable(timeWindow time.Duration) Option { d.recentJobs = NewRecentJobPacker(timeWindow) } } + +func RunningLocally() Option { + return func(d *Dcron) { + d.runningLocally = true + } +}