Skip to content

Commit

Permalink
Merge pull request #17 from qor/process-signal-when-running
Browse files Browse the repository at this point in the history
Set worker status to JobStatusKilled when receive SIGINT(^C (Control-…
  • Loading branch information
sunfmin authored Apr 9, 2019
2 parents fc5e868 + 8d4e854 commit 5c93817
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 14 deletions.
23 changes: 23 additions & 0 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import (
"fmt"
"os"
"os/exec"
"os/signal"
"path/filepath"
"reflect"
"strings"
"sync"
"syscall"
"time"
)

Expand Down Expand Up @@ -127,6 +130,26 @@ func (cron *Cron) Run(qorJob QorJobInterface) error {
job := qorJob.GetJob()

if job.Handler != nil {
go func() {
sigint := make(chan os.Signal, 1)

// interrupt signal sent from terminal
signal.Notify(sigint, syscall.SIGINT)
// sigterm signal sent from kubernetes
signal.Notify(sigint, syscall.SIGTERM)

i := <-sigint

qorJob.SetProgressText(fmt.Sprintf("Worker killed by signal %s", i.String()))
qorJob.SetStatus(JobStatusKilled)

qorJob.StopReferesh()
os.Exit(int(reflect.ValueOf(i).Int()))
}()

qorJob.StartReferesh()
defer qorJob.StopReferesh()

err := job.Handler(qorJob.GetSerializableArgument(qorJob), qorJob)
if err == nil {
cron.parseJobs()
Expand Down
100 changes: 86 additions & 14 deletions qor_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"strings"
"sync"
"time"

"github.com/jinzhu/gorm"
"github.com/qor/admin"
Expand All @@ -32,6 +34,9 @@ type QorJobInterface interface {
GetResultsTable() ResultsTable
AddResultsRow(...TableCell) error

StartReferesh()
StopReferesh()

GetArgument() interface{}
serializable_meta.SerializableMetaInterface
}
Expand Down Expand Up @@ -77,6 +82,9 @@ type QorJob struct {

mutex sync.Mutex `sql:"-"`

stopReferesh bool `sql:"-"`
inReferesh bool `sql:"-"`

// Add `valid:"-"`` to make the QorJob work well with qor/validations
// When the qor/validations auto exec the validate struct callback we get error
// runtime: goroutine stack exceeds 1000000000-byte limit
Expand Down Expand Up @@ -107,12 +115,67 @@ func (job *QorJob) SetStatus(status string) error {
job.mutex.Lock()
defer job.mutex.Unlock()

worker := job.GetJob().Worker
context := worker.Admin.NewContext(nil, nil).Context
job.Status = status
if status == JobStatusDone {
job.Progress = 100
}

if job.shouldCallSave() {
return job.callSave()
}

return nil
}

func (job *QorJob) shouldCallSave() bool {
return !job.inReferesh || job.stopReferesh
}

func (job *QorJob) StartReferesh() {
job.mutex.Lock()
defer job.mutex.Unlock()
if !job.inReferesh {
job.inReferesh = true
job.stopReferesh = false

go func() {
job.referesh()
}()
}
}

func (job *QorJob) StopReferesh() {
job.mutex.Lock()
defer job.mutex.Unlock()

err := job.callSave()
if err != nil {
log.Println(err)
}

job.stopReferesh = true
}

func (job *QorJob) referesh() {
job.mutex.Lock()
defer job.mutex.Unlock()

err := job.callSave()
if err != nil {
log.Println(err)
}

if job.stopReferesh {
job.inReferesh = false
job.stopReferesh = false
} else {
time.AfterFunc(5*time.Second, job.referesh)
}
}

func (job *QorJob) callSave() error {
worker := job.GetJob().Worker
context := worker.Admin.NewContext(nil, nil).Context
return worker.JobResource.CallSave(job, context)
}

Expand Down Expand Up @@ -153,13 +216,16 @@ func (job *QorJob) SetProgress(progress uint) error {
job.mutex.Lock()
defer job.mutex.Unlock()

worker := job.GetJob().Worker
context := worker.Admin.NewContext(nil, nil).Context
if progress > 100 {
progress = 100
}
job.Progress = progress
return worker.JobResource.CallSave(job, context)

if job.shouldCallSave() {
return job.callSave()
}

return nil
}

// GetProgressText get qor job's progress text
Expand All @@ -172,10 +238,12 @@ func (job *QorJob) SetProgressText(str string) error {
job.mutex.Lock()
defer job.mutex.Unlock()

worker := job.GetJob().Worker
context := worker.Admin.NewContext(nil, nil).Context
job.ProgressText = str
return worker.JobResource.CallSave(job, context)
if job.shouldCallSave() {
return job.callSave()
}

return nil
}

// GetLogs get qor job's logs
Expand All @@ -188,11 +256,13 @@ func (job *QorJob) AddLog(log string) error {
job.mutex.Lock()
defer job.mutex.Unlock()

worker := job.GetJob().Worker
context := worker.Admin.NewContext(nil, nil).Context
fmt.Println(log)
job.Log += "\n" + log
return worker.JobResource.CallSave(job, context)
if job.shouldCallSave() {
return job.callSave()
}

return nil
}

// GetResultsTable get the job's process logs
Expand All @@ -205,8 +275,10 @@ func (job *QorJob) AddResultsRow(cells ...TableCell) error {
job.mutex.Lock()
defer job.mutex.Unlock()

worker := job.GetJob().Worker
context := worker.Admin.NewContext(nil, nil).Context
job.ResultsTable.TableCells = append(job.ResultsTable.TableCells, cells)
return worker.JobResource.CallSave(job, context)
if job.shouldCallSave() {
return job.callSave()
}

return nil
}

0 comments on commit 5c93817

Please sign in to comment.