Skip to content

Commit

Permalink
Develop to Master (#92)
Browse files Browse the repository at this point in the history
* chore: use `redis.UniversalClient` instead of `*redis.Client` (#88)

* make dcron running locally, update cron test, add GetJobs / GetJob function for dcron. (#89)

* 增加code coverage, 修复GetJob bug,增加devcontainer方便开发者 (#91)

* add example app to readme

* add NodeID function into dcron

* Deps: updated github.com/go-redis/redis/v8 to github.com/redis/go-redis/v9 (#73)

* add cron lib

* fix warning

* logger removal : phares1

* update

* update

* update

* add robfig/cron to dcron (#75)

* add NodeID function into dcron

* add cron lib

* fix warning

* logger removal : phares1

* update

* update

* update

* update

* update

* update

* update

* update test workflow

* revert 1.22

* update etcd driver

* update

* fix get nodes

* update

* update for fix TAT

* Revert "update etcd driver"

This reverts commit a21ebf7.

* Revert "deps: updated go-redis to v9 (#79)"

This reverts commit 0b85b24.

* update

* update

* refact etcd

* Revert "refact etcd"

This reverts commit 049bed1.

* Revert "update"

This reverts commit 9c71fd6.

* update

* refactor etcddriver

* fix error

* update

* Revert "update"

This reverts commit 6cfcfe6.

* Revert "fix error"

This reverts commit 99b2d82.

* Revert "refactor etcddriver"

This reverts commit a576ac3.

* update

* update

* remove comments, and fix

* add comments

* Add comments, add split the E2E test cases and other test cases. (#80)

* add example app to readme

* add NodeID function into dcron

* add cron lib

* fix warning

* logger removal : phares1

* update

* update

* update

* update

* update

* update

* update

* update test workflow

* revert 1.22

* update etcd driver

* update

* fix get nodes

* update

* update for fix TAT

* Revert "update etcd driver"

This reverts commit a21ebf7.

* Revert "deps: updated go-redis to v9 (#79)"

This reverts commit 0b85b24.

* update

* update

* refact etcd

* Revert "refact etcd"

This reverts commit 049bed1.

* Revert "update"

This reverts commit 9c71fd6.

* update

* refactor etcddriver

* fix error

* update

* Revert "update"

This reverts commit 6cfcfe6.

* Revert "fix error"

This reverts commit 99b2d82.

* Revert "refactor etcddriver"

This reverts commit a576ac3.

* update

* update

* remove comments, and fix

* add comments

* merge e2e test and normal test

* move e2etest to origin path

* add timeout to avoid pipeline timeout

* add getjobs related function

* update

* remove redis config in test workflow

* update

* update chain test in windows

* update action version for node 16 -> node 20

* add test for dcron locally

* update readme

* update

* fix code bug

* Update devcontainer and etcd driver test

---------

Co-authored-by: Ava Ackerman <[email protected]>
Co-authored-by: libi <[email protected]>

---------

Co-authored-by: AHdark <[email protected]>
Co-authored-by: Ava Ackerman <[email protected]>
Co-authored-by: libi <[email protected]>
  • Loading branch information
4 people authored Mar 13, 2024
1 parent 68982eb commit f85cbd3
Show file tree
Hide file tree
Showing 16 changed files with 479 additions and 95 deletions.
11 changes: 11 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"image": "mcr.microsoft.com/devcontainers/go:1-1.22-bookworm",
"customizations": {
"vscode": {
"extensions": [
"golang.go",
"eamodio.gitlens"
]
}
}
}
8 changes: 3 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,18 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Redis Server in GitHub Actions
uses: supercharge/[email protected]
- name: Set up Go
uses: actions/setup-go@v3
uses: actions/setup-go@v5
with:
go-version: ${{ matrix.go_version }}

- name: Test
run: go test -v -timeout 30m -coverprofile=coverage.txt -covermode=atomic ./...

- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}

4 changes: 3 additions & 1 deletion cron/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# cron

fork from [robfig/cron](github.com/robfig/cron)
fork from [robfig/cron](github.com/robfig/cron)

maintain by Dcron opensource team.
40 changes: 26 additions & 14 deletions cron/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,37 +116,45 @@ func TestChainDelayIfStillRunning(t *testing.T) {
t.Run("second run immediate if first done", func(t *testing.T) {
var j countJob
wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
go wrappedJob.Run()
time.Sleep(time.Millisecond)
<-time.After(time.Millisecond)
go wrappedJob.Run()
wg.Done()
}()
time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
wg.Wait()
<-time.After(3 * time.Millisecond)
if c := j.Done(); c != 2 {
t.Errorf("expected job run twice, immediately, got %d", c)
}
})

t.Run("second run delayed if first not done", func(t *testing.T) {
var j countJob
j.delay = 10 * time.Millisecond
j.delay = 100 * time.Millisecond
wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
go wrappedJob.Run()
time.Sleep(time.Millisecond)
<-time.After(10 * time.Millisecond)
go wrappedJob.Run()
wg.Done()
}()

// After 5ms, the first job is still in progress, and the second job was
wg.Wait()
// After 50ms, the first job is still in progress, and the second job was
// run but should be waiting for it to finish.
time.Sleep(5 * time.Millisecond)
<-time.After(50 * time.Millisecond)
started, done := j.Started(), j.Done()
if started != 1 || done != 0 {
t.Error("expected first job started, but not finished, got", started, done)
}

// Verify that the second job completes.
time.Sleep(25 * time.Millisecond)
<-time.After(220 * time.Millisecond)
started, done = j.Started(), j.Done()
if started != 2 || done != 2 {
t.Error("expected both jobs done, got", started, done)
Expand All @@ -169,37 +177,41 @@ func TestChainSkipIfStillRunning(t *testing.T) {
t.Run("second run immediate if first done", func(t *testing.T) {
var j countJob
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
go wrappedJob.Run()
time.Sleep(time.Millisecond)
<-time.After(time.Millisecond)
go wrappedJob.Run()
wg.Done()
}()
time.Sleep(3 * time.Millisecond) // Give both jobs 3ms to complete.
wg.Wait()
<-time.After(3 * time.Millisecond) // Give both jobs 3ms to complete.
if c := j.Done(); c != 2 {
t.Errorf("expected job run twice, immediately, got %d", c)
}
})

t.Run("second run skipped if first not done", func(t *testing.T) {
var j countJob
j.delay = 10 * time.Millisecond
j.delay = 100 * time.Millisecond
wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
go func() {
go wrappedJob.Run()
time.Sleep(time.Millisecond)
<-time.After(10 * time.Millisecond)
go wrappedJob.Run()
}()

// After 5ms, the first job is still in progress, and the second job was
// After 50ms, the first job is still in progress, and the second job was
// aleady skipped.
time.Sleep(5 * time.Millisecond)
<-time.After(50 * time.Millisecond)
started, done := j.Started(), j.Done()
if started != 1 || done != 0 {
t.Error("expected first job started, but not finished, got", started, done)
}

// Verify that the first job completes and second does not run.
time.Sleep(25 * time.Millisecond)
<-time.After(220 * time.Millisecond)
started, done = j.Started(), j.Done()
if started != 1 || done != 1 {
t.Error("expected second job skipped, got", started, done)
Expand Down
110 changes: 95 additions & 15 deletions dcron.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,18 @@ const (
dcronStateUpgrade = "dcronStateUpgrade"
)

var (
ErrJobExist = errors.New("jobName already exist")
ErrJobNotExist = errors.New("jobName not exist")
ErrJobWrongNode = errors.New("job is not running in this node")
)

type RecoverFuncType func(d *Dcron)

// Dcron is main struct
type Dcron struct {
jobs map[string]*JobWarpper
jobsRWMut sync.Mutex
jobsRWMut sync.RWMutex

ServerName string
nodePool INodePool
Expand All @@ -48,6 +54,8 @@ type Dcron struct {

recentJobs IRecentJobPacker
state atomic.Value

runningLocally bool
}

// NewDcron create a Dcron
Expand All @@ -68,7 +76,9 @@ func NewDcronWithOption(serverName string, driver driver.DriverV2, dcronOpts ...
}

dcron.cr = cron.New(dcron.crOptions...)
dcron.nodePool = NewNodePool(serverName, driver, dcron.nodeUpdateDuration, dcron.hashReplicas, dcron.logger)
if !dcron.runningLocally {
dcron.nodePool = NewNodePool(serverName, driver, dcron.nodeUpdateDuration, dcron.hashReplicas, dcron.logger)
}
return dcron
}

Expand Down Expand Up @@ -110,9 +120,9 @@ func (d *Dcron) addJob(jobName, cronStr string, job Job) (err error) {
d.jobsRWMut.Lock()
defer d.jobsRWMut.Unlock()
if _, ok := d.jobs[jobName]; ok {
return errors.New("jobName already exist")
return ErrJobExist
}
innerJob := JobWarpper{
innerJob := &JobWarpper{
Name: jobName,
CronStr: cronStr,
Job: job,
Expand All @@ -123,11 +133,11 @@ func (d *Dcron) addJob(jobName, cronStr string, job Job) (err error) {
return err
}
innerJob.ID = entryID
d.jobs[jobName] = &innerJob
d.jobs[jobName] = innerJob
return nil
}

// Remove Job
// Remove Job by jobName
func (d *Dcron) Remove(jobName string) {
d.jobsRWMut.Lock()
defer d.jobsRWMut.Unlock()
Expand All @@ -138,7 +148,71 @@ func (d *Dcron) Remove(jobName string) {
}
}

// Get job by jobName
// if this jobName not exist, will return error.
//
// if `thisNodeOnly` is true
// if this job is not available in this node, will return error.
// otherwise return the struct of JobWarpper whose name is jobName.
func (d *Dcron) GetJob(jobName string, thisNodeOnly bool) (*JobWarpper, error) {
d.jobsRWMut.RLock()
defer d.jobsRWMut.RUnlock()

job, ok := d.jobs[jobName]
if !ok {
d.logger.Warnf("job: %s, not exist", jobName)
return nil, ErrJobNotExist
}
if !thisNodeOnly {
return job, nil
}
isRunningHere, err := d.nodePool.CheckJobAvailable(jobName)
if err != nil {
return nil, err
}
if !isRunningHere {
return nil, ErrJobWrongNode
}
return job, nil
}

// Get job list.
//
// if `thisNodeOnly` is true
// return all jobs available in this node.
// otherwise return all jobs added to dcron.
//
// we never return nil. If there is no job.
// this func will return an empty slice.
func (d *Dcron) GetJobs(thisNodeOnly bool) []*JobWarpper {
d.jobsRWMut.RLock()
defer d.jobsRWMut.RUnlock()

ret := make([]*JobWarpper, 0)
for _, v := range d.jobs {
var (
isRunningHere bool
ok bool = true
err error
)
if thisNodeOnly {
isRunningHere, err = d.nodePool.CheckJobAvailable(v.Name)
if err != nil {
continue
}
ok = isRunningHere
}
if ok {
ret = append(ret, v)
}
}
return ret
}

func (d *Dcron) allowThisNodeRun(jobName string) (ok bool) {
if d.runningLocally {
return true
}
ok, err := d.nodePool.CheckJobAvailable(jobName)
if err != nil {
d.logger.Errorf("allow this node run error, err=%v", err)
Expand All @@ -165,12 +239,14 @@ func (d *Dcron) Start() {
d.RecoverFunc(d)
}
if atomic.CompareAndSwapInt32(&d.running, dcronStopped, dcronRunning) {
if err := d.startNodePool(); err != nil {
atomic.StoreInt32(&d.running, dcronStopped)
return
if !d.runningLocally {
if err := d.startNodePool(); err != nil {
atomic.StoreInt32(&d.running, dcronStopped)
return
}
d.logger.Infof("dcron started, nodeID is %s", d.nodePool.GetNodeID())
}
d.cr.Start()
d.logger.Infof("dcron started, nodeID is %s", d.nodePool.GetNodeID())
} else {
d.logger.Infof("dcron have started")
}
Expand All @@ -183,11 +259,13 @@ func (d *Dcron) Run() {
d.RecoverFunc(d)
}
if atomic.CompareAndSwapInt32(&d.running, dcronStopped, dcronRunning) {
if err := d.startNodePool(); err != nil {
atomic.StoreInt32(&d.running, dcronStopped)
return
if !d.runningLocally {
if err := d.startNodePool(); err != nil {
atomic.StoreInt32(&d.running, dcronStopped)
return
}
d.logger.Infof("dcron running, nodeID is %s", d.nodePool.GetNodeID())
}
d.logger.Infof("dcron running, nodeID is %s", d.nodePool.GetNodeID())
d.cr.Run()
} else {
d.logger.Infof("dcron already running")
Expand All @@ -205,7 +283,9 @@ func (d *Dcron) startNodePool() error {
// Stop job
func (d *Dcron) Stop() {
tick := time.NewTicker(time.Millisecond)
d.nodePool.Stop(context.Background())
if !d.runningLocally {
d.nodePool.Stop(context.Background())
}
for range tick.C {
if atomic.CompareAndSwapInt32(&d.running, dcronRunning, dcronStopped) {
d.cr.Stop()
Expand Down
51 changes: 51 additions & 0 deletions dcron_locally_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package dcron_test

import (
"testing"
"time"

"github.com/libi/dcron"
"github.com/libi/dcron/cron"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

type DcronLocallyTestSuite struct {
suite.Suite
}

func (s *DcronLocallyTestSuite) TestNormal() {
dcr := dcron.NewDcronWithOption(
"not a necessary servername",
nil,
dcron.RunningLocally(),
dcron.CronOptionSeconds(),
dcron.CronOptionChain(
cron.Recover(
cron.DefaultLogger,
)))

s.Assert().NotNil(dcr)
runningTime := 60 * time.Second
t := s.T()
var err error
err = dcr.AddFunc("job1", "*/5 * * * * *", func() {
t.Log(time.Now())
})
require.Nil(t, err)
err = dcr.AddFunc("job2", "*/8 * * * * *", func() {
panic("test panic")
})
require.Nil(t, err)
err = dcr.AddFunc("job3", "*/2 * * * * *", func() {
t.Log("job3:", time.Now())
})
require.Nil(t, err)
dcr.Start()
<-time.After(runningTime)
dcr.Stop()
}

func TestDcronLocallyTestSuite(t *testing.T) {
suite.Run(t, &DcronLocallyTestSuite{})
}
Loading

0 comments on commit f85cbd3

Please sign in to comment.