refactor the process of controlling a training job #39

Merged: 5 commits, Sep 18, 2018
2 changes: 1 addition & 1 deletion .travis.yml
@@ -8,7 +8,7 @@ matrix:
- curl https://glide.sh/get | bash
- sudo pip install pre-commit
script:
-- rm -f .copyright.hook && wget https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/.copyright.hook
+- rm -f .copyright.hook && wget https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/tools/codestyle/copyright.hook
- bash -x .tools/check_style.sh
- ln -s $GOPATH/src/github.com/PaddlePaddle $GOPATH/src/github.com/paddlepaddle
- cd $GOPATH/src/github.com/paddlepaddle/edl
3 changes: 2 additions & 1 deletion cmd/edl/edl.go
@@ -34,6 +34,7 @@ var (
func main() {
masterURL := flag.String("master", "", "Address of a kube master.")
kubeConfig := flag.String("kubeconfig", "", "Path to a kube config. Only required if out-of-cluster.")
+autoClean := flag.Bool("autoclean", false, "Auto clean pods after terminating job, default false")
Collaborator: So making this default false means the user may need to get the logs?

Collaborator (author): Yes, if the autoclean flag is false, the controller keeps the pods after the job succeeds or fails; otherwise all pods are deleted automatically. This is useful for debugging and collecting logs (see the sketch after this file's diff).

maxLoadDesired := flag.Float64("max_load_desired", 0.97, `Keep the cluster max resource usage around
this value, jobs will scale down if total request is over this level.`)
flag.Parse()
@@ -58,7 +59,7 @@ func main() {
run := func(stop <-chan struct{}) {
log.Info("I won the leader election", "hostname", hostname)
paddleInformer := paddleinformers.NewSharedInformerFactory(paddleClient, time.Second*10)
-controller := paddlecontroller.New(kubeClient, extapiClient, paddleClient, paddleInformer)
+controller := paddlecontroller.New(kubeClient, extapiClient, paddleClient, paddleInformer, *autoClean)
go paddleInformer.Start(stopCh)

if controller.Run(1, *maxLoadDesired, stopCh); err != nil {
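For context, here is a minimal, self-contained Go sketch (not EDL's actual controller code) of the behaviour the new autoclean flag selects, as discussed in the comment thread above. The deletePods helper is a hypothetical stand-in for a real Kubernetes client call.

```go
package main

import "fmt"

// deletePods is a hypothetical stand-in for a real Kubernetes client call;
// it is an assumption, not an EDL API.
func deletePods(jobName string) error {
	fmt.Printf("deleting pods of job %s\n", jobName)
	return nil
}

// maybeCleanup mirrors the behaviour described above: keep pods when
// autoClean is false so their logs stay available, delete them otherwise.
func maybeCleanup(autoClean bool, jobName string, finished bool) {
	if !finished {
		return
	}
	if !autoClean {
		fmt.Printf("job %s finished; pods kept for log inspection (autoclean=false)\n", jobName)
		return
	}
	if err := deletePods(jobName); err != nil {
		fmt.Printf("cleanup of job %s failed: %v\n", jobName, err)
	}
}

func main() {
	maybeCleanup(false, "mnist-train", true) // pods retained for debugging
	maybeCleanup(true, "mnist-train", true)  // pods deleted automatically
}
```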
12 changes: 6 additions & 6 deletions pkg/apis/paddlepaddle/v1/types.go
@@ -140,12 +140,12 @@ type TrainerJobScaleRecords struct {
type TrainingResourceType string

const (
-// Master is the master name of TrainingResourceType.
-Master TrainingResourceType = "MASTER"
-// Pserver is the pserver name of TrainingResourceType.
-Pserver TrainingResourceType = "PSERVER"
-// Trainer is the trainer name of TrainingResourceType.
-Trainer TrainingResourceType = "TRAINER"
+// MASTER is the master name of TrainingResourceType.
+MASTER TrainingResourceType = "master"
+// PSERVER is the pserver name of TrainingResourceType.
+PSERVER TrainingResourceType = "pserver"
+// TRAINER is the trainer name of TrainingResourceType.
+TRAINER TrainingResourceType = "trainer"
)

// ResourceState is the state of a type of resource
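One likely reason for switching the values to lowercase (an inference, not stated in the PR) is that Kubernetes object names must be lowercase DNS-1123 labels, so the constants can be concatenated directly into resource names. A hedged sketch; the resourceName helper and naming scheme are illustrative assumptions, not EDL's actual code:

```go
package main

import "fmt"

// TrainingResourceType mirrors the constants from types.go above.
type TrainingResourceType string

const (
	MASTER  TrainingResourceType = "master"
	PSERVER TrainingResourceType = "pserver"
	TRAINER TrainingResourceType = "trainer"
)

// resourceName builds a per-job name such as "demo-job-trainer"; the lowercase
// constant values make the result a valid Kubernetes object name as-is.
func resourceName(job string, rt TrainingResourceType) string {
	return fmt.Sprintf("%s-%s", job, rt)
}

func main() {
	for _, rt := range []TrainingResourceType{MASTER, PSERVER, TRAINER} {
		fmt.Println(resourceName("demo-job", rt))
	}
}
```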
26 changes: 8 additions & 18 deletions pkg/autoscaler/autoscaler.go
@@ -154,7 +154,7 @@ func (a *Autoscaler) totalRunningJob(jobName string) bool {
if !ok {
return false
}
-up, ok := v.(*updater.TrainingJobUpdater)
+up, ok := v.(*updater.JobUpdater)
if !ok {
return false
}
@@ -201,7 +201,7 @@ func scaleDryRun(r *ClusterResource, j *padv1.TrainingJob, curDiff int32, maxLoa
nodeName := ""
// Adjust resource upon return.
defer func() {
-log.Debug("scaleDryRun", "scaledown", scaleDown, "jobns", j.Namespace, "jobname", j.Name, "additional", additional)
+log.Debug("scaleDryRun", "scaledown", scaleDown, "namespace", j.Namespace, "jobname", j.Name, "additional", additional)
r.GPULimit += gpuLimit * additional
r.CPURequestMilli += cpuRequestMilli * int64(additional)
r.MemoryRequestMega += memRequestMega * int64(additional)
@@ -219,6 +219,7 @@ func scaleDryRun(r *ClusterResource, j *padv1.TrainingJob, curDiff int32, maxLoa
plannedInstance := int(*j.Spec.Trainer.ReplicaSpec.Spec.Parallelism) + int(curDiff)
instanceMax := j.Spec.Trainer.MaxInstance
instanceMin := j.Spec.Trainer.MinInstance
+log.Debug("scaleDryRun instance num", "min", instanceMin, "max", instanceMax, "planned", plannedInstance)

// TODO(typhoonzero): refine below code to remove direction
// ======================= scaleDown ======================
@@ -236,6 +237,7 @@ func scaleDryRun(r *ClusterResource, j *padv1.TrainingJob, curDiff int32, maxLoa
log.Debug("scaleDryRun", "gpuRequest", r.GPULimit, "threshold", gpuThreshold)
log.Debug("scaleDryRun", "cpuRequest", r.CPURequestMilli, "threshold", cpuThreshold)
log.Debug("scaleDryRun", "memRequest", r.MemoryRequestMega, "threshold", memThreshold)
+log.Debug("scaleDryRun conditions", "gpuCondition", gpuCondition, "cpuCondition", cpuCondition, "memCondition", memCondition)
if gpuCondition || cpuCondition || memCondition {
if plannedInstance > instanceMin {
additional = -1
@@ -297,7 +299,7 @@ func scaleDryRun(r *ClusterResource, j *padv1.TrainingJob, curDiff int32, maxLoa
func (a *Autoscaler) setAdditional(diff map[string]int32) {
a.jobUpdater.Range(func(k, v interface{}) bool {
key := k.(string)
-up := v.(*updater.TrainingJobUpdater)
+up := v.(*updater.JobUpdater)
var additional int32
if val, ok := diff[key]; ok {
additional = val
@@ -349,17 +351,6 @@ func scaleAllJobsDryRun(jobs []*padv1.TrainingJob, r ClusterResource, maxLoadDes
return diff
}

-func (a *Autoscaler) scaleAllJobs() {
-a.jobUpdater.Range(func(k, v interface{}) bool {
-up := v.(*updater.TrainingJobUpdater)
-if up.Additional != 0 {
-log.Info("additional of trainingjob", "jobname", k, "scalenum", up.Additional)
-up.Scale()
-}
-return true
-})
-}

// Run monitors the cluster resources and training jobs in a loop,
// scales the training jobs according to the cluster resource.
func (a *Autoscaler) Run() {
@@ -381,9 +372,8 @@ func (a *Autoscaler) Run() {
a.findTrainingJobsMightBeRescheduled(havePending),
r,
a.maxLoadDesired)
-log.Info("Calculated info", "diff:", diff)
+log.Info("Calculated info", "diff", diff)
a.setAdditional(diff)
-a.scaleAllJobs()
}
}

@@ -395,7 +385,7 @@ func (a *Autoscaler) findPendingJob() bool {
log.Debug("findPendingJob check", "jobname", k)
total := 0
pending := 0
-up, ok := v.(*updater.TrainingJobUpdater)
+up, ok := v.(*updater.JobUpdater)
if !ok {
log.Debug("findPendingJob conversion error", "jobname", k)
}
@@ -439,7 +429,7 @@ func (a *Autoscaler) findTrainingJobsMightBeRescheduled(havePending bool) (js tr
jn := k.(string)
log.Debug("findTrainingJobsMightBeRescheduled", "jobname", jn)

-up, ok := v.(*updater.TrainingJobUpdater)
+up, ok := v.(*updater.JobUpdater)
if !ok {
return false
}
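The hunks above add debug logging around the dry-run scale-down decision. As a rough, simplified sketch of that rule (using assumed field names rather than EDL's real ClusterResource): scale down by one trainer when any resource usage exceeds the maxLoadDesired threshold and the job still has more instances than its configured minimum.

```go
package main

import "fmt"

// clusterResource is a simplified stand-in for the autoscaler's ClusterResource.
type clusterResource struct {
	GPULimit, GPUTotal                 int64
	CPURequestMilli, CPUTotalMilli     int64
	MemoryRequestMega, MemoryTotalMega int64
}

// scaleDownDryRun returns -1 when any resource usage exceeds the desired load
// threshold and the job still has more trainers than its minimum, else 0.
func scaleDownDryRun(r clusterResource, planned, min int, maxLoadDesired float64) int32 {
	gpuOver := float64(r.GPULimit) > maxLoadDesired*float64(r.GPUTotal)
	cpuOver := float64(r.CPURequestMilli) > maxLoadDesired*float64(r.CPUTotalMilli)
	memOver := float64(r.MemoryRequestMega) > maxLoadDesired*float64(r.MemoryTotalMega)
	if (gpuOver || cpuOver || memOver) && planned > min {
		return -1 // plan to remove one trainer instance
	}
	return 0
}

func main() {
	r := clusterResource{
		GPULimit: 8, GPUTotal: 8,
		CPURequestMilli: 900, CPUTotalMilli: 1000,
		MemoryRequestMega: 500, MemoryTotalMega: 1000,
	}
	// GPU usage is above 97% of capacity, so one trainer would be scaled down.
	fmt.Println(scaleDownDryRun(r, 4, 2, 0.97)) // prints -1
}
```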
2 changes: 1 addition & 1 deletion pkg/autoscaler/resource.go
@@ -62,7 +62,7 @@ func getPodsTotalRequestsAndLimits(podList *v1.PodList) (reqs v1.ResourceList, l
func updateNodesIdleResource(podList *v1.PodList, nodesCPUIdleMilli map[string]int64, nodesMemoryFreeMega map[string]int64) (err error) {
for _, pod := range podList.Items {
podname := pod.Namespace + "/" + pod.Name
-log.Debug("updateNodesIdleResource", "podName", podname)
+log.Debug("updateNodesIdleResource", "podName", podname, "phase", pod.Status.Phase)
nodeName := pod.Spec.NodeName
if nodeName == "" {
continue
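For context, a rough sketch of the bookkeeping updateNodesIdleResource performs: each scheduled pod's requests are subtracted from its node's idle CPU and memory. The simplified pod struct below is an assumption; the real code iterates Kubernetes API objects.

```go
package main

import "fmt"

// pod is a simplified stand-in for a Kubernetes pod's scheduling and request data.
type pod struct {
	Name, Node           string
	CPUMilli, MemoryMega int64
}

// updateNodesIdle subtracts each scheduled pod's requests from its node's idle
// CPU and memory, in the spirit of updateNodesIdleResource.
func updateNodesIdle(pods []pod, cpuIdleMilli, memFreeMega map[string]int64) {
	for _, p := range pods {
		if p.Node == "" { // pod not scheduled to a node yet
			continue
		}
		cpuIdleMilli[p.Node] -= p.CPUMilli
		memFreeMega[p.Node] -= p.MemoryMega
	}
}

func main() {
	cpu := map[string]int64{"node-1": 4000}
	mem := map[string]int64{"node-1": 8192}
	updateNodesIdle([]pod{{Name: "trainer-0", Node: "node-1", CPUMilli: 500, MemoryMega: 1024}}, cpu, mem)
	fmt.Println(cpu["node-1"], mem["node-1"]) // 3500 7168
}
```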