From 1ed6d6d87338c1042a0ccde539dc40b2293e862d Mon Sep 17 00:00:00 2001 From: Patrick Zhao Date: Fri, 20 Dec 2024 10:35:13 +0800 Subject: [PATCH] fix job not stopped after execution finished Signed-off-by: Patrick Zhao --- .../mongodb/update_workflow_task_log.go | 71 +++++++++++++++++++ .../repository/mongodb/wait_pod_finish_log.go | 70 ++++++++++++++++++ .../jobcontroller/kubernetes.go | 18 +++-- .../service/workflowcontroller/workflow.go | 11 +++ 4 files changed, 165 insertions(+), 5 deletions(-) create mode 100644 pkg/microservice/aslan/core/common/repository/mongodb/update_workflow_task_log.go create mode 100644 pkg/microservice/aslan/core/common/repository/mongodb/wait_pod_finish_log.go diff --git a/pkg/microservice/aslan/core/common/repository/mongodb/update_workflow_task_log.go b/pkg/microservice/aslan/core/common/repository/mongodb/update_workflow_task_log.go new file mode 100644 index 0000000000..a7c1fab40a --- /dev/null +++ b/pkg/microservice/aslan/core/common/repository/mongodb/update_workflow_task_log.go @@ -0,0 +1,71 @@ +/* + * Copyright 2023 The KodeRover Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package mongodb
+
+import (
+	"context"
+
+	"github.com/pkg/errors"
+	"go.mongodb.org/mongo-driver/mongo"
+
+	"github.com/koderover/zadig/v2/pkg/microservice/aslan/config"
+	mongotool "github.com/koderover/zadig/v2/pkg/tool/mongo"
+)
+
+// UpdateWorkflowTaskLogColl is the repository handle for UpdateWorkflowTaskLog
+// documents. It embeds the MongoDB collection, so the driver's collection
+// methods are available directly on it, and remembers the collection name.
+type UpdateWorkflowTaskLogColl struct {
+	*mongo.Collection
+
+	// coll is the collection name, as returned by GetCollectionName.
+	coll string
+}
+
+// UpdateWorkflowTaskLog is one record describing a workflow task at the
+// moment it was written. Each field is stored under the bson key given in
+// its tag.
+type UpdateWorkflowTaskLog struct {
+	WorkflowName string      `bson:"workflow_name" json:"workflow_name"`
+	TaskID       int64       `bson:"task_id" json:"task_id"`
+	StartTime    int64       `bson:"start_time" json:"start_time"`
+	EndTime      int64       `bson:"end_time" json:"end_time"`
+	Status       string      `bson:"status" json:"status"`
+	// Data is an untyped payload; the workflow.go hunk of this patch stores
+	// the whole workflow task in it.
+	Data interface{} `bson:"data" json:"data"`
+}
+
+// TableName returns the name of the collection that holds these records.
+func (c UpdateWorkflowTaskLog) TableName() string {
+	return "update_workflow_task_log"
+}
+
+// NewUpdateWorkflowTaskLogColl returns a handle on the
+// "update_workflow_task_log" collection of the database named by
+// config.MongoDatabase().
+func NewUpdateWorkflowTaskLogColl() *UpdateWorkflowTaskLogColl {
+	name := UpdateWorkflowTaskLog{}.TableName()
+	return &UpdateWorkflowTaskLogColl{
+		Collection: mongotool.Database(config.MongoDatabase()).Collection(name),
+		coll:       name,
+	}
+}
+
+// GetCollectionName returns the name of the underlying collection.
+func (c *UpdateWorkflowTaskLogColl) GetCollectionName() string {
+	return c.coll
+}
+
+// EnsureIndex creates no indexes for this collection: it ignores ctx and
+// always returns nil.
+func (c *UpdateWorkflowTaskLogColl) EnsureIndex(ctx context.Context) error {
+	return nil
+}
+
+// Create inserts args as a new document. It returns an error when args is
+// nil, otherwise whatever error InsertOne reports.
+func (c *UpdateWorkflowTaskLogColl) Create(args *UpdateWorkflowTaskLog) error {
+	if args == nil {
+		return errors.New("nil UpdateWorkflowTaskLog")
+	}
+
+	// context.Background() is used, so the insert carries no caller-supplied
+	// deadline or cancellation.
+	_, err := c.InsertOne(context.Background(), args)
+	return err
+}
diff --git a/pkg/microservice/aslan/core/common/repository/mongodb/wait_pod_finish_log.go b/pkg/microservice/aslan/core/common/repository/mongodb/wait_pod_finish_log.go
new file mode 100644
index 0000000000..19d377ecdc
--- /dev/null
+++ b/pkg/microservice/aslan/core/common/repository/mongodb/wait_pod_finish_log.go
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2023 The KodeRover Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mongodb
+
+import (
+	"context"
+
+	"github.com/pkg/errors"
+	"go.mongodb.org/mongo-driver/mongo"
+
+	"github.com/koderover/zadig/v2/pkg/microservice/aslan/config"
+	mongotool "github.com/koderover/zadig/v2/pkg/tool/mongo"
+)
+
+// WaitPodFinishLogColl is the repository handle for WaitPodFinishLog
+// documents. It embeds the MongoDB collection, so the driver's collection
+// methods are available directly on it, and remembers the collection name.
+type WaitPodFinishLogColl struct {
+	*mongo.Collection
+
+	// coll is the collection name, as returned by GetCollectionName.
+	coll string
+}
+
+// WaitPodFinishLog is one record of a job and one of its pods. Each field is
+// stored under the bson key given in its tag. In the kubernetes.go hunk of
+// this patch, JobStatus is filled from job.Status.String() and Status from
+// the pod's phase.
+type WaitPodFinishLog struct {
+	JobName   string `bson:"job_name" json:"job_name"`
+	JobStatus string `bson:"job_status" json:"job_status"`
+	Namespace string `bson:"namespace" json:"namespace"`
+	PodName   string `bson:"pod_name" json:"pod_name"`
+	Status    string `bson:"status" json:"status"`
+}
+
+// TableName returns the name of the collection that holds these records.
+func (c WaitPodFinishLog) TableName() string {
+	return "wait_pod_finish_log"
+}
+
+// NewWaitPodFinishLogColl returns a handle on the "wait_pod_finish_log"
+// collection of the database named by config.MongoDatabase().
+func NewWaitPodFinishLogColl() *WaitPodFinishLogColl {
+	name := WaitPodFinishLog{}.TableName()
+	return &WaitPodFinishLogColl{
+		Collection: mongotool.Database(config.MongoDatabase()).Collection(name),
+		coll:       name,
+	}
+}
+
+// GetCollectionName returns the name of the underlying collection.
+func (c *WaitPodFinishLogColl) GetCollectionName() string {
+	return c.coll
+}
+
+// EnsureIndex creates no indexes for this collection: it ignores ctx and
+// always returns nil.
+func (c *WaitPodFinishLogColl) EnsureIndex(ctx context.Context) error {
+	return nil
+}
+
+// Create inserts args as a new document. It returns an error when args is
+// nil, otherwise whatever error InsertOne reports.
+func (c *WaitPodFinishLogColl) Create(args *WaitPodFinishLog) error {
+	if args == nil {
+		return errors.New("nil WaitPodFinishLog")
+	}
+
+	// context.Background() is used, so the insert carries no caller-supplied
+	// deadline or cancellation.
+	_, err := c.InsertOne(context.Background(), args)
+	return err
+}
diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go
index 70f5407968..092ee4494e 100644
---
a/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/jobcontroller/kubernetes.go @@ -829,18 +829,16 @@ func isPodFailed(podName, namespace string, apiReader client.Reader, xl *zap.Sug func waitJobEndByCheckingConfigMap(ctx context.Context, taskTimeout <-chan time.Time, namespace, jobName string, checkFile bool, kubeClient crClient.Client, clientset kubernetes.Interface, restConfig *rest.Config, informer informers.SharedInformerFactory, jobTask *commonmodels.JobTask, ack func(), xl *zap.SugaredLogger) (status config.Status, errMsg string) { xl.Infof("wait job to end: %s %s", namespace, jobName) - podLister := informer.Core().V1().Pods().Lister().Pods(namespace) - jobLister := informer.Batch().V1().Jobs().Lister().Jobs(namespace) - cmLister := informer.Core().V1().ConfigMaps().Lister().ConfigMaps(namespace) for { select { case <-ctx.Done(): return config.StatusCancelled, "" - case <-taskTimeout: return config.StatusTimeout, "" - default: + jobLister := informer.Batch().V1().Jobs().Lister().Jobs(namespace) + cmLister := informer.Core().V1().ConfigMaps().Lister().ConfigMaps(namespace) + job, err := jobLister.Get(jobName) if err != nil { errMsg := fmt.Sprintf("failed to get job pod job-name=%s %v", jobName, err) @@ -857,6 +855,7 @@ func waitJobEndByCheckingConfigMap(ctx context.Context, taskTimeout <-chan time. // pod is still running switch { case job.Status.Active != 0: + podLister := informer.Core().V1().Pods().Lister().Pods(namespace) pods, err := podLister.List(labels.Set{"job-name": jobName}.AsSelector()) if err != nil { errMsg := fmt.Sprintf("failed to find pod with label job-name=%s %v", jobName, err) @@ -864,6 +863,15 @@ func waitJobEndByCheckingConfigMap(ctx context.Context, taskTimeout <-chan time. 
return config.StatusFailed, errMsg } for _, pod := range pods { + commonrepo.NewWaitPodFinishLogColl().Create( + &commonrepo.WaitPodFinishLog{ + Namespace: namespace, + PodName: pod.Name, + JobName: jobName, + JobStatus: job.Status.String(), + Status: string(pod.Status.Phase), + }) + ipod := wrapper.Pod(pod) if ipod.Pending() { continue diff --git a/pkg/microservice/aslan/core/common/service/workflowcontroller/workflow.go b/pkg/microservice/aslan/core/common/service/workflowcontroller/workflow.go index 557f676624..e8a69fc619 100644 --- a/pkg/microservice/aslan/core/common/service/workflowcontroller/workflow.go +++ b/pkg/microservice/aslan/core/common/service/workflowcontroller/workflow.go @@ -549,6 +549,17 @@ func updateworkflowStatus(workflow *commonmodels.WorkflowTask) { } func (c *workflowCtl) updateWorkflowTask() { + c.workflowTaskMutex.Lock() + commonrepo.NewUpdateWorkflowTaskLogColl().Create(&commonrepo.UpdateWorkflowTaskLog{ + WorkflowName: c.workflowTask.WorkflowName, + TaskID: c.workflowTask.TaskID, + StartTime: c.workflowTask.StartTime, + EndTime: c.workflowTask.EndTime, + Status: string(c.workflowTask.Status), + Data: c.workflowTask, + }) + c.workflowTaskMutex.Unlock() + taskInColl, err := commonrepo.NewworkflowTaskv4Coll().Find(c.workflowTask.WorkflowName, c.workflowTask.TaskID) if err != nil { c.logger.Errorf("find workflow task v4 %s failed,error: %v", c.workflowTask.WorkflowName, err)