Skip to content

Commit

Permalink
feat: The result of the scheduled task execution is based on the task…
Browse files Browse the repository at this point in the history
… output
  • Loading branch information
ssongliu committed Dec 27, 2024
1 parent 4d548ad commit 00dc8e7
Show file tree
Hide file tree
Showing 14 changed files with 325 additions and 215 deletions.
2 changes: 1 addition & 1 deletion agent/app/api/v2/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (b *BaseApi) CreateSnapshot(c *gin.Context) {
return
}

if err := snapshotService.SnapshotCreate(req); err != nil {
if err := snapshotService.SnapshotCreate(req, false); err != nil {
helper.InternalServer(c, err)
return
}
Expand Down
1 change: 1 addition & 0 deletions agent/app/dto/cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ type SearchRecord struct {

type Record struct {
ID uint `json:"id"`
TaskID string `json:"taskID"`
StartTime string `json:"startTime"`
Records string `json:"records"`
Status string `json:"status"`
Expand Down
2 changes: 2 additions & 0 deletions agent/app/repo/cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/1Panel-dev/1Panel/agent/app/model"
"github.com/1Panel-dev/1Panel/agent/constant"
"github.com/1Panel-dev/1Panel/agent/global"
"github.com/google/uuid"
"gorm.io/gorm"
)

Expand Down Expand Up @@ -146,6 +147,7 @@ func (u *CronjobRepo) StartRecords(cronjobID uint, targetPath string) model.JobR
var record model.JobRecords
record.StartTime = time.Now()
record.CronjobID = cronjobID
record.TaskID = uuid.New().String()
record.Status = constant.StatusWaiting
if err := global.DB.Create(&record).Error; err != nil {
global.LOG.Errorf("create record status failed, err: %v", err)
Expand Down
38 changes: 28 additions & 10 deletions agent/app/service/cronjob_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/pkg/errors"
)

func (u *CronjobService) handleApp(cronjob model.Cronjob, startTime time.Time) error {
func (u *CronjobService) handleApp(cronjob model.Cronjob, startTime time.Time, taskID string) error {
var apps []model.AppInstall
if cronjob.AppID == "all" {
apps, _ = appInstallRepo.ListBy()
Expand Down Expand Up @@ -46,7 +46,7 @@ func (u *CronjobService) handleApp(cronjob model.Cronjob, startTime time.Time) e
record.DownloadAccountID, record.SourceAccountIDs = cronjob.DownloadAccountID, cronjob.SourceAccountIDs
backupDir := path.Join(global.CONF.System.TmpDir, fmt.Sprintf("app/%s/%s", app.App.Key, app.Name))
record.FileName = fmt.Sprintf("app_%s_%s.tar.gz", app.Name, startTime.Format(constant.DateTimeSlimLayout)+common.RandStrAndNum(5))
if err := handleAppBackup(&app, nil, backupDir, record.FileName, cronjob.ExclusionRules, cronjob.Secret, ""); err != nil {
if err := handleAppBackup(&app, nil, backupDir, record.FileName, cronjob.ExclusionRules, cronjob.Secret, taskID); err != nil {
return err
}
downloadPath, err := u.uploadCronjobBackFile(cronjob, accountMap, path.Join(backupDir, record.FileName))
Expand All @@ -63,7 +63,7 @@ func (u *CronjobService) handleApp(cronjob model.Cronjob, startTime time.Time) e
return nil
}

func (u *CronjobService) handleWebsite(cronjob model.Cronjob, startTime time.Time) error {
func (u *CronjobService) handleWebsite(cronjob model.Cronjob, startTime time.Time, taskID string) error {
webs := loadWebsForJob(cronjob)
if len(webs) == 0 {
return errors.New("no such website in database!")
Expand All @@ -82,7 +82,7 @@ func (u *CronjobService) handleWebsite(cronjob model.Cronjob, startTime time.Tim
record.DownloadAccountID, record.SourceAccountIDs = cronjob.DownloadAccountID, cronjob.SourceAccountIDs
backupDir := path.Join(global.CONF.System.TmpDir, fmt.Sprintf("website/%s", web.PrimaryDomain))
record.FileName = fmt.Sprintf("website_%s_%s.tar.gz", web.PrimaryDomain, startTime.Format(constant.DateTimeSlimLayout)+common.RandStrAndNum(5))
if err := handleWebsiteBackup(&web, backupDir, record.FileName, cronjob.ExclusionRules, cronjob.Secret, ""); err != nil {
if err := handleWebsiteBackup(&web, backupDir, record.FileName, cronjob.ExclusionRules, cronjob.Secret, taskID); err != nil {
return err
}
downloadPath, err := u.uploadCronjobBackFile(cronjob, accountMap, path.Join(backupDir, record.FileName))
Expand All @@ -99,7 +99,7 @@ func (u *CronjobService) handleWebsite(cronjob model.Cronjob, startTime time.Tim
return nil
}

func (u *CronjobService) handleDatabase(cronjob model.Cronjob, startTime time.Time) error {
func (u *CronjobService) handleDatabase(cronjob model.Cronjob, startTime time.Time, taskID string) error {
dbs := loadDbsForJob(cronjob)
if len(dbs) == 0 {
return errors.New("no such db in database!")
Expand All @@ -120,11 +120,11 @@ func (u *CronjobService) handleDatabase(cronjob model.Cronjob, startTime time.Ti
backupDir := path.Join(global.CONF.System.TmpDir, fmt.Sprintf("database/%s/%s/%s", dbInfo.DBType, record.Name, dbInfo.Name))
record.FileName = fmt.Sprintf("db_%s_%s.sql.gz", dbInfo.Name, startTime.Format(constant.DateTimeSlimLayout)+common.RandStrAndNum(5))
if cronjob.DBType == "mysql" || cronjob.DBType == "mariadb" {
if err := handleMysqlBackup(dbInfo, nil, backupDir, record.FileName, ""); err != nil {
if err := handleMysqlBackup(dbInfo, nil, backupDir, record.FileName, taskID); err != nil {
return err
}
} else {
if err := handlePostgresqlBackup(dbInfo, nil, backupDir, record.FileName, ""); err != nil {
if err := handlePostgresqlBackup(dbInfo, nil, backupDir, record.FileName, taskID); err != nil {
return err
}
}
Expand Down Expand Up @@ -212,11 +212,15 @@ func (u *CronjobService) handleSystemLog(cronjob model.Cronjob, startTime time.T
return nil
}

func (u *CronjobService) handleSnapshot(cronjob model.Cronjob, startTime time.Time) error {
func (u *CronjobService) handleSnapshot(cronjob model.Cronjob, startTime time.Time, taskID string) error {
accountMap, err := NewBackupClientMap(strings.Split(cronjob.SourceAccountIDs, ","))
if err != nil {
return err
}
itemData, err := NewISnapshotService().LoadSnapshotData()
if err != nil {
return err
}

var record model.BackupRecord
record.From = "cronjob"
Expand All @@ -227,14 +231,28 @@ func (u *CronjobService) handleSnapshot(cronjob model.Cronjob, startTime time.Ti
record.FileDir = "system_snapshot"

versionItem, _ := settingRepo.Get(settingRepo.WithByKey("SystemVersion"))
scope := "core"
if !global.IsMaster {
scope = "agent"
}
req := dto.SnapshotCreate{
Name: fmt.Sprintf("snapshot-1panel-%s-linux-%s-%s", versionItem.Value, loadOs(), startTime.Format(constant.DateTimeSlimLayout)+common.RandStrAndNum(5)),
Name: fmt.Sprintf("snapshot-1panel-%s-%s-linux-%s-%s", scope, versionItem.Value, loadOs(), startTime.Format(constant.DateTimeSlimLayout)+common.RandStrAndNum(5)),
Secret: cronjob.Secret,
TaskID: taskID,

SourceAccountIDs: record.SourceAccountIDs,
DownloadAccountID: cronjob.DownloadAccountID,
AppData: itemData.AppData,
PanelData: itemData.PanelData,
BackupData: itemData.BackupData,
WithMonitorData: true,
WithLoginLog: true,
WithOperationLog: true,
WithSystemLog: true,
WithTaskLog: true,
}
if err := NewISnapshotService().HandleSnapshot(req); err != nil {

if err := NewISnapshotService().SnapshotCreate(req, true); err != nil {
return err
}
record.FileName = req.Name + ".tar.gz"
Expand Down
156 changes: 92 additions & 64 deletions agent/app/service/cronjob_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/1Panel-dev/1Panel/agent/app/model"
"github.com/1Panel-dev/1Panel/agent/app/repo"
"github.com/1Panel-dev/1Panel/agent/app/task"
"github.com/1Panel-dev/1Panel/agent/buserr"
"github.com/1Panel-dev/1Panel/agent/constant"
"github.com/1Panel-dev/1Panel/agent/global"
Expand All @@ -31,36 +32,26 @@ func (u *CronjobService) HandleJob(cronjob *model.Cronjob) {
if len(cronjob.Script) == 0 {
return
}
record.Records = u.generateLogsPath(*cronjob, record.StartTime)
_ = cronjobRepo.UpdateRecords(record.ID, map[string]interface{}{"records": record.Records})
err = u.handleShell(*cronjob, record.Records)
u.removeExpiredLog(*cronjob)
err = u.handleShell(*cronjob, record.TaskID)
case "curl":
if len(cronjob.URL) == 0 {
return
}
record.Records = u.generateLogsPath(*cronjob, record.StartTime)
_ = cronjobRepo.UpdateRecords(record.ID, map[string]interface{}{"records": record.Records})
err = cmd.ExecShell(record.Records, 24*time.Hour, "bash", "-c", "curl", cronjob.URL)
u.removeExpiredLog(*cronjob)
err = u.handleCurl(*cronjob, record.TaskID)
case "ntp":
err = u.handleNtpSync()
u.removeExpiredLog(*cronjob)
err = u.handleNtpSync(*cronjob, record.TaskID)
case "cutWebsiteLog":
var messageItem []string
messageItem, record.File, err = u.handleCutWebsiteLog(cronjob, record.StartTime)
message = []byte(strings.Join(messageItem, "\n"))
case "clean":
messageItem := ""
messageItem, err = u.handleSystemClean()
message = []byte(messageItem)
u.removeExpiredLog(*cronjob)
err = u.handleSystemClean(*cronjob, record.TaskID)
case "website":
err = u.handleWebsite(*cronjob, record.StartTime)
err = u.handleWebsite(*cronjob, record.StartTime, record.TaskID)
case "app":
err = u.handleApp(*cronjob, record.StartTime)
err = u.handleApp(*cronjob, record.StartTime, record.TaskID)
case "database":
err = u.handleDatabase(*cronjob, record.StartTime)
err = u.handleDatabase(*cronjob, record.StartTime, record.TaskID)
case "directory":
if len(cronjob.SourceDir) == 0 {
return
Expand All @@ -70,7 +61,7 @@ func (u *CronjobService) HandleJob(cronjob *model.Cronjob) {
err = u.handleSystemLog(*cronjob, record.StartTime)
case "snapshot":
_ = cronjobRepo.UpdateRecords(record.ID, map[string]interface{}{"records": record.Records})
err = u.handleSnapshot(*cronjob, record.StartTime)
err = u.handleSnapshot(*cronjob, record.StartTime, record.TaskID)
}

if err != nil {
Expand All @@ -90,53 +81,95 @@ func (u *CronjobService) HandleJob(cronjob *model.Cronjob) {
}()
}

func (u *CronjobService) handleShell(cronjob model.Cronjob, logPath string) error {
if len(cronjob.ContainerName) != 0 {
command := "sh"
if len(cronjob.Command) != 0 {
command = cronjob.Command
}
scriptFile, _ := os.ReadFile(cronjob.Script)
return cmd.ExecShell(logPath, 24*time.Hour, "docker", "exec", cronjob.ContainerName, command, "-c", strings.ReplaceAll(string(scriptFile), "\"", "\\\""))
}
if len(cronjob.Executor) == 0 {
cronjob.Executor = "bash"
func (u *CronjobService) handleShell(cronjob model.Cronjob, taskID string) error {
taskItem, err := task.NewTaskWithOps(fmt.Sprintf("cronjob-%s", cronjob.Name), task.TaskHandle, task.TaskScopeCronjob, taskID, cronjob.ID)
if err != nil {
global.LOG.Errorf("new task for exec shell failed, err: %v", err)
return err
}
if cronjob.ScriptMode == "input" {
fileItem := pathUtils.Join(global.CONF.System.BaseDir, "1panel", "task", "shell", cronjob.Name, cronjob.Name+".sh")
_ = os.MkdirAll(pathUtils.Dir(fileItem), os.ModePerm)
shellFile, err := os.OpenFile(fileItem, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, constant.FilePerm)
if err != nil {
return err

taskItem.AddSubTask(i18n.GetWithName("HandleShell", cronjob.Name), func(t *task.Task) error {
if len(cronjob.ContainerName) != 0 {
command := "sh"
if len(cronjob.Command) != 0 {
command = cronjob.Command
}
scriptFile, _ := os.ReadFile(cronjob.Script)
return cmd.ExecShellWithTask(taskItem, 24*time.Hour, "docker", "exec", cronjob.ContainerName, command, "-c", strings.ReplaceAll(string(scriptFile), "\"", "\\\""))
}
defer shellFile.Close()
if _, err := shellFile.WriteString(cronjob.Script); err != nil {
return err
if len(cronjob.Executor) == 0 {
cronjob.Executor = "bash"
}
if cronjob.ScriptMode == "input" {
fileItem := pathUtils.Join(global.CONF.System.BaseDir, "1panel", "task", "shell", cronjob.Name, cronjob.Name+".sh")
_ = os.MkdirAll(pathUtils.Dir(fileItem), os.ModePerm)
shellFile, err := os.OpenFile(fileItem, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, constant.FilePerm)
if err != nil {
return err
}
defer shellFile.Close()
if _, err := shellFile.WriteString(cronjob.Script); err != nil {
return err
}
if len(cronjob.User) == 0 {
return cmd.ExecShellWithTask(taskItem, 24*time.Hour, cronjob.Executor, fileItem)
}
return cmd.ExecShellWithTask(taskItem, 24*time.Hour, "sudo", "-u", cronjob.User, cronjob.Executor, fileItem)
}
if len(cronjob.User) == 0 {
return cmd.ExecShell(logPath, 24*time.Hour, cronjob.Executor, fileItem)
return cmd.ExecShellWithTask(taskItem, 24*time.Hour, cronjob.Executor, cronjob.Script)
}
return cmd.ExecShell(logPath, 24*time.Hour, "sudo", "-u", cronjob.User, cronjob.Executor, fileItem)
}
if len(cronjob.User) == 0 {
return cmd.ExecShell(logPath, 24*time.Hour, cronjob.Executor, cronjob.Script)
}
return cmd.ExecShell(logPath, 24*time.Hour, "sudo", "-u", cronjob.User, cronjob.Executor, cronjob.Script)
if err := cmd.ExecShellWithTask(taskItem, 24*time.Hour, "sudo", "-u", cronjob.User, cronjob.Executor, cronjob.Script); err != nil {
return err
}
return nil
},
nil,
)
return taskItem.Execute()
}

func (u *CronjobService) handleNtpSync() error {
ntpServer, err := settingRepo.Get(settingRepo.WithByKey("NtpSite"))
func (u *CronjobService) handleCurl(cronjob model.Cronjob, taskID string) error {
taskItem, err := task.NewTaskWithOps(fmt.Sprintf("cronjob-%s", cronjob.Name), task.TaskHandle, task.TaskScopeCronjob, taskID, cronjob.ID)
if err != nil {
global.LOG.Errorf("new task for exec shell failed, err: %v", err)
return err
}
ntime, err := ntp.GetRemoteTime(ntpServer.Value)

taskItem.AddSubTask(i18n.GetWithName("HandleShell", cronjob.Name), func(t *task.Task) error {
if err := cmd.ExecShellWithTask(taskItem, 24*time.Hour, "bash", "-c", "curl", cronjob.URL); err != nil {
return err
}
return nil
},
nil,
)
return taskItem.Execute()
}

func (u *CronjobService) handleNtpSync(cronjob model.Cronjob, taskID string) error {
taskItem, err := task.NewTaskWithOps(fmt.Sprintf("cronjob-%s", cronjob.Name), task.TaskHandle, task.TaskScopeCronjob, taskID, cronjob.ID)
if err != nil {
global.LOG.Errorf("new task for exec shell failed, err: %v", err)
return err
}
if err := ntp.UpdateSystemTime(ntime.Format(constant.DateTimeLayout)); err != nil {
return err
}
return nil

taskItem.AddSubTask(i18n.GetMsgByKey("HandleNtpSync"), func(t *task.Task) error {
ntpServer, err := settingRepo.Get(settingRepo.WithByKey("NtpSite"))
if err != nil {
return err
}
taskItem.Logf("ntp server: %s", ntpServer.Value)
ntime, err := ntp.GetRemoteTime(ntpServer.Value)
if err != nil {
return err
}
if err := ntp.UpdateSystemTime(ntime.Format(constant.DateTimeLayout)); err != nil {
return err
}
return nil
}, nil)
return taskItem.Execute()
}

func (u *CronjobService) handleCutWebsiteLog(cronjob *model.Cronjob, startTime time.Time) ([]string, string, error) {
Expand Down Expand Up @@ -201,8 +234,13 @@ func backupLogFile(dstFilePath, websiteLogDir string, fileOp files.FileOp) error
return nil
}

func (u *CronjobService) handleSystemClean() (string, error) {
return NewIDeviceService().CleanForCronjob()
func (u *CronjobService) handleSystemClean(cronjob model.Cronjob, taskID string) error {
taskItem, err := task.NewTaskWithOps(fmt.Sprintf("cronjob-%s", cronjob.Name), task.TaskHandle, task.TaskScopeCronjob, taskID, cronjob.ID)
if err != nil {
global.LOG.Errorf("new task for system clean failed, err: %v", err)
return err
}
return systemClean(taskItem)
}

func (u *CronjobService) uploadCronjobBackFile(cronjob model.Cronjob, accountMap map[string]backupClientHelper, file string) (string, error) {
Expand Down Expand Up @@ -274,16 +312,6 @@ func (u *CronjobService) removeExpiredLog(cronjob model.Cronjob) {
}
}

func (u *CronjobService) generateLogsPath(cronjob model.Cronjob, startTime time.Time) string {
dir := fmt.Sprintf("%s/task/%s/%s", constant.DataDir, cronjob.Type, cronjob.Name)
if _, err := os.Stat(dir); err != nil && os.IsNotExist(err) {
_ = os.MkdirAll(dir, os.ModePerm)
}

path := fmt.Sprintf("%s/%s.log", dir, startTime.Format(constant.DateTimeSlimLayout))
return path
}

func hasBackup(cronjobType string) bool {
return cronjobType == "app" || cronjobType == "database" || cronjobType == "website" || cronjobType == "directory" || cronjobType == "snapshot" || cronjobType == "log"
}
1 change: 0 additions & 1 deletion agent/app/service/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type IDeviceService interface {

Scan() dto.CleanData
Clean(req []dto.Clean)
CleanForCronjob() (string, error)
}

func NewIDeviceService() IDeviceService {
Expand Down
Loading

0 comments on commit 00dc8e7

Please sign in to comment.