Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor updatequeuejob thread logic in informer #624

Merged
merged 11 commits into from
Sep 9, 2023
Merged
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 105 additions & 78 deletions pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -1512,7 +1512,7 @@ func (cc *XController) Run(stopCh <-chan struct{}) {
go wait.Until(cc.PreemptQueueJobs, 60*time.Second, stopCh)

// This thread is used to update AW that has completionstatus set to Complete or RunningHoldCompletion
go wait.Until(cc.UpdateQueueJobs, 5*time.Second, stopCh)
//go wait.Until(cc.UpdateQueueJobs, 5*time.Second, stopCh)
asm582 marked this conversation as resolved.
Show resolved Hide resolved

if cc.isDispatcher {
go wait.Until(cc.UpdateAgent, 2*time.Second, stopCh) // In the Agent?
Expand All @@ -1536,90 +1536,79 @@ func (qjm *XController) UpdateAgent() {
// Move AW from Running to Completed or RunningHoldCompletion
// Do not use event queues! Running AWs move to Completed, from which it will never transition to any other state.
// State transition: Running->RunningHoldCompletion->Completed
func (qjm *XController) UpdateQueueJobs() {
queueJobs, err := qjm.appWrapperLister.AppWrappers("").List(labels.Everything())
if err != nil {
klog.Errorf("[UpdateQueueJobs] Failed to get a list of active appwrappers, err=%+v", err)
return
}
containsCompletionStatus := false
for _, newjob := range queueJobs {
for _, item := range newjob.Spec.AggrResources.GenericItems {
if len(item.CompletionStatus) > 0 {
containsCompletionStatus = true
}
func (qjm *XController) UpdateQueueJobs(newjob *arbv1.AppWrapper) {

if newjob.Status.State == arbv1.AppWrapperStateActive || newjob.Status.State == arbv1.AppWrapperStateRunningHoldCompletion {
err := qjm.UpdateQueueJobStatus(newjob)
if err != nil {
klog.Errorf("[UpdateQueueJobs] Error updating pod status counts for AppWrapper job: %s, err=%+v", newjob.Name, err)
//TODO: should we really return?
return
}
if (newjob.Status.State == arbv1.AppWrapperStateActive || newjob.Status.State == arbv1.AppWrapperStateRunningHoldCompletion) && containsCompletionStatus {
err := qjm.UpdateQueueJobStatus(newjob)
if err != nil {
klog.Errorf("[UpdateQueueJobs] Error updating pod status counts for AppWrapper job: %s, err=%+v", newjob.Name, err)
continue
}
klog.V(6).Infof("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status)
// set appwrapper status to Complete or RunningHoldCompletion
derivedAwStatus := qjm.getAppWrapperCompletionStatus(newjob)
klog.V(6).Infof("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status)
// set appwrapper status to Complete or RunningHoldCompletion
derivedAwStatus := qjm.getAppWrapperCompletionStatus(newjob)

klog.Infof("[UpdateQueueJobs] Got completion status '%s' for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", derivedAwStatus, newjob.Namespace, newjob.Name, newjob.ResourceVersion,
newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed)
klog.Infof("[UpdateQueueJobs] Got completion status '%s' for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", derivedAwStatus, newjob.Namespace, newjob.Name, newjob.ResourceVersion,
newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed)

// Set Appwrapper state to complete if all items in Appwrapper
// are completed
if derivedAwStatus == arbv1.AppWrapperStateRunningHoldCompletion {
newjob.Status.State = derivedAwStatus
var updateQj *arbv1.AppWrapper
index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondRunningHoldCompletion, "SomeItemsCompleted")
if index < 0 {
newjob.Status.QueueJobState = arbv1.AppWrapperCondRunningHoldCompletion
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "")
newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
newjob.Status.FilterIgnore = true // Update AppWrapperCondRunningHoldCompletion
updateQj = newjob.DeepCopy()
} else {
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "")
newjob.Status.Conditions[index] = *cond.DeepCopy()
updateQj = newjob.DeepCopy()
}
err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setRunningHoldCompletion")
if err != nil {
// TODO: implement retry
klog.Errorf("[UpdateQueueJobs] Error updating status 'setRunningHoldCompletion' for AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err)
}
// Set Appwrapper state to complete if all items in Appwrapper
// are completed
if derivedAwStatus == arbv1.AppWrapperStateRunningHoldCompletion {
newjob.Status.State = derivedAwStatus
var updateQj *arbv1.AppWrapper
index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondRunningHoldCompletion, "SomeItemsCompleted")
if index < 0 {
newjob.Status.QueueJobState = arbv1.AppWrapperCondRunningHoldCompletion
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "")
newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
newjob.Status.FilterIgnore = true // Update AppWrapperCondRunningHoldCompletion
updateQj = newjob.DeepCopy()
} else {
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "")
newjob.Status.Conditions[index] = *cond.DeepCopy()
updateQj = newjob.DeepCopy()
}
// Set appwrapper status to complete
if derivedAwStatus == arbv1.AppWrapperStateCompleted {
newjob.Status.State = derivedAwStatus
newjob.Status.CanRun = false
var updateQj *arbv1.AppWrapper
index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondCompleted, "PodsCompleted")
if index < 0 {
newjob.Status.QueueJobState = arbv1.AppWrapperCondCompleted
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "")
newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
newjob.Status.FilterIgnore = true // Update AppWrapperCondCompleted
updateQj = newjob.DeepCopy()
} else {
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "")
newjob.Status.Conditions[index] = *cond.DeepCopy()
updateQj = newjob.DeepCopy()
}
err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setCompleted")
if err != nil {
if qjm.quotaManager != nil {
qjm.quotaManager.Release(updateQj)
}
// TODO: Implement retry
klog.Errorf("[UpdateQueueJobs] Error updating status 'setCompleted' AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err)
}
err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setRunningHoldCompletion")
if err != nil {
// TODO: implement retry
klog.Errorf("[UpdateQueueJobs] Error updating status 'setRunningHoldCompletion' for AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err)
}
}
// Set appwrapper status to complete
if derivedAwStatus == arbv1.AppWrapperStateCompleted {
newjob.Status.State = derivedAwStatus
newjob.Status.CanRun = false
var updateQj *arbv1.AppWrapper
index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondCompleted, "PodsCompleted")
if index < 0 {
newjob.Status.QueueJobState = arbv1.AppWrapperCondCompleted
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "")
newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
newjob.Status.FilterIgnore = true // Update AppWrapperCondCompleted
updateQj = newjob.DeepCopy()
} else {
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "")
newjob.Status.Conditions[index] = *cond.DeepCopy()
updateQj = newjob.DeepCopy()
}
err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setCompleted")
if err != nil {
if qjm.quotaManager != nil {
qjm.quotaManager.Release(updateQj)
}
// Delete AW from both queue's
qjm.eventQueue.Delete(updateQj)
qjm.qjqueue.Delete(updateQj)
// TODO: Implement retry
klog.Errorf("[UpdateQueueJobs] Error updating status 'setCompleted' AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err)
}
klog.Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", newjob.Namespace, newjob.Name, newjob.ResourceVersion,
newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed)
if qjm.quotaManager != nil {
qjm.quotaManager.Release(updateQj)
}
// Delete AW from both queue's
qjm.eventQueue.Delete(updateQj)
qjm.qjqueue.Delete(updateQj)
}
klog.Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", newjob.Namespace, newjob.Name, newjob.ResourceVersion,
newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed)
}
}

Expand Down Expand Up @@ -1653,6 +1642,40 @@ func (cc *XController) addQueueJob(obj interface{}) {

klog.V(6).Infof("[Informer-addQJ] enqueue %s &qj=%p Version=%s Status=%+v", qj.Name, qj, qj.ResourceVersion, qj.Status)
cc.enqueue(qj)
// Requeue the item to be processed again in 30 seconds.
//TODO: tune the frequency of reprocessing an AW
hasCompletionStatus := false
for _, genericItem := range qj.Spec.AggrResources.GenericItems {
if len(genericItem.CompletionStatus) > 0 {
hasCompletionStatus = true
}
}
//When an AW entrs a system with completionstatus keep checking the AW until completed
//updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate
//on stale AWs. This has potential to improve performance at scale.
//if qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed && qj.Status.State != "" {
asm582 marked this conversation as resolved.
Show resolved Hide resolved
requeueInterval := 30 * time.Second
key, err := cache.MetaNamespaceKeyFunc(qj)
if err == nil {
asm582 marked this conversation as resolved.
Show resolved Hide resolved
go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time.AfterFunc seems a better construct to use here rather than go func() + time.Sleep.

On a side note, it seems it would be valuable to move to using a delaying queue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to wake the thread up every so often; from what I understand, time.AfterFunc would wake up only once.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, maybe requeuing within the function passed to time.AfterFunc (possibly calling time.AfterFunc)?

for {
time.Sleep(requeueInterval)
latestAw, exists, err := cc.appwrapperInformer.Informer().GetStore().GetByKey(key)
if latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateActive && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateEnqueued && latestAw.(*arbv1.AppWrapper).Status.State != arbv1.AppWrapperStateRunningHoldCompletion {
klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.(*arbv1.AppWrapper).Name, latestAw.(*arbv1.AppWrapper).Status.State)
break //Exit the loop
}
if err == nil && exists {
asm582 marked this conversation as resolved.
Show resolved Hide resolved
// Enqueue the latest copy of the AW.
if (qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed) && hasCompletionStatus {
cc.UpdateQueueJobs(latestAw.(*arbv1.AppWrapper))
klog.V(2).Infof("[Informer-addQJ] Finished requeing AW to determine completion status")
}
}
}
}()
}
//}
}

func (cc *XController) updateQueueJob(oldObj, newObj interface{}) {
Expand All @@ -1678,6 +1701,7 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) {
}

klog.V(6).Infof("[Informer-updateQJ] '%s/%s' *Delay=%.6f seconds normal enqueue Version=%s Status=%v", newQJ.Namespace, newQJ.Name, time.Now().Sub(newQJ.Status.ControllerFirstTimestamp.Time).Seconds(), newQJ.ResourceVersion, newQJ.Status)
notBackedoff := true
for _, cond := range newQJ.Status.Conditions {
if cond.Type == arbv1.AppWrapperCondBackoff {
//AWs that have backoff conditions have a delay of 10 seconds before getting added to enqueue.
Expand All @@ -1688,12 +1712,15 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) {
}
cc.enqueue(newQJ)
})
return
notBackedoff = false
}
}

// cc.eventQueue.Delete(oldObj)
cc.enqueue(newQJ)
if notBackedoff {
cc.enqueue(newQJ)
}

}

// a, b arbitrary length numerical string. returns true if a larger than b
Expand Down