Commit

Merge pull request #39 from m3ngyang/refactory
refactor the process of controlling a training job
tizhou86 authored Sep 18, 2018
2 parents a236b71 + 6a438c4 commit d141736
Showing 9 changed files with 822 additions and 725 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -8,7 +8,7 @@ matrix:
- curl https://glide.sh/get | bash
- sudo pip install pre-commit
script:
- - rm -f .copyright.hook && wget https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/.copyright.hook
+ - rm -f .copyright.hook && wget https://raw.githubusercontent.com/PaddlePaddle/Paddle/develop/tools/codestyle/copyright.hook
- bash -x .tools/check_style.sh
- ln -s $GOPATH/src/github.com/PaddlePaddle $GOPATH/src/github.com/paddlepaddle
- cd $GOPATH/src/github.com/paddlepaddle/edl
3 changes: 2 additions & 1 deletion cmd/edl/edl.go
@@ -34,6 +34,7 @@ var (
func main() {
masterURL := flag.String("master", "", "Address of a kube master.")
kubeConfig := flag.String("kubeconfig", "", "Path to a kube config. Only required if out-of-cluster.")
+ autoClean := flag.Bool("autoclean", false, "Auto clean pods after terminating job, default false")
maxLoadDesired := flag.Float64("max_load_desired", 0.97, `Keep the cluster max resource usage around
this value, jobs will scale down if total request is over this level.`)
flag.Parse()
@@ -58,7 +59,7 @@ func main() {
run := func(stop <-chan struct{}) {
log.Info("I won the leader election", "hostname", hostname)
paddleInformer := paddleinformers.NewSharedInformerFactory(paddleClient, time.Second*10)
- controller := paddlecontroller.New(kubeClient, extapiClient, paddleClient, paddleInformer)
+ controller := paddlecontroller.New(kubeClient, extapiClient, paddleClient, paddleInformer, *autoClean)
go paddleInformer.Start(stopCh)

if controller.Run(1, *maxLoadDesired, stopCh); err != nil {
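The edl.go change adds an autoclean flag and passes its value into the controller constructor. Below is a minimal, self-contained sketch of that pattern; the Controller type and the onJobFinished helper are hypothetical stand-ins for illustration only, not the real paddlecontroller API.

package main

import (
    "flag"
    "fmt"
)

// Controller is a hypothetical stand-in for the real TrainingJob controller.
type Controller struct {
    autoClean bool
}

// New mirrors the pattern of fixing the clean-up policy at construction time.
func New(autoClean bool) *Controller {
    return &Controller{autoClean: autoClean}
}

// onJobFinished decides whether to garbage-collect the pods of a finished job.
func (c *Controller) onJobFinished(job string) {
    if c.autoClean {
        fmt.Printf("cleaning up pods of finished job %s\n", job)
        return
    }
    fmt.Printf("keeping pods of finished job %s for inspection\n", job)
}

func main() {
    autoClean := flag.Bool("autoclean", false, "Auto clean pods after terminating job, default false")
    flag.Parse()

    c := New(*autoClean)
    c.onJobFinished("default/mnist-train")
}

Reading the flag once in main and handing the value to the constructor keeps the clean-up policy fixed for the controller's lifetime.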
12 changes: 6 additions & 6 deletions pkg/apis/paddlepaddle/v1/types.go
@@ -140,12 +140,12 @@ type TrainerJobScaleRecords struct {
type TrainingResourceType string

const (
- // Master is the master name of TrainingResourceType.
- Master TrainingResourceType = "MASTER"
- // Pserver is the pserver name of TrainingResourceType.
- Pserver TrainingResourceType = "PSERVER"
- // Trainer is the trainer name of TrainingResourceType.
- Trainer TrainingResourceType = "TRAINER"
+ // MASTER is the master name of TrainingResourceType.
+ MASTER TrainingResourceType = "master"
+ // PSERVER is the pserver name of TrainingResourceType.
+ PSERVER TrainingResourceType = "pserver"
+ // TRAINER is the trainer name of TrainingResourceType.
+ TRAINER TrainingResourceType = "trainer"
)

// ResourceState is the state of a type of resource
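The types.go hunk renames the exported TrainingResourceType constants and lowercases their string values. A plausible use of lowercase values, sketched below, is composing Kubernetes-style resource names directly from the type; the resourceName helper is hypothetical, and only the type and constants are restated from the diff.

package main

import "fmt"

// TrainingResourceType restates the type and constants from pkg/apis/paddlepaddle/v1/types.go.
type TrainingResourceType string

const (
    // MASTER is the master name of TrainingResourceType.
    MASTER TrainingResourceType = "master"
    // PSERVER is the pserver name of TrainingResourceType.
    PSERVER TrainingResourceType = "pserver"
    // TRAINER is the trainer name of TrainingResourceType.
    TRAINER TrainingResourceType = "trainer"
)

// resourceName is a hypothetical helper: a lowercase value can be appended to a
// job name to form a valid, Kubernetes-style resource name.
func resourceName(job string, t TrainingResourceType) string {
    return fmt.Sprintf("%s-%s", job, t)
}

func main() {
    for _, t := range []TrainingResourceType{MASTER, PSERVER, TRAINER} {
        fmt.Println(resourceName("mnist-train", t))
    }
}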
26 changes: 8 additions & 18 deletions pkg/autoscaler/autoscaler.go
@@ -154,7 +154,7 @@ func (a *Autoscaler) totalRunningJob(jobName string) bool {
if !ok {
return false
}
- up, ok := v.(*updater.TrainingJobUpdater)
+ up, ok := v.(*updater.JobUpdater)
if !ok {
return false
}
@@ -201,7 +201,7 @@ func scaleDryRun(r *ClusterResource, j *padv1.TrainingJob, curDiff int32, maxLoa
nodeName := ""
// Adjust resource upon return.
defer func() {
log.Debug("scaleDryRun", "scaledown", scaleDown, "jobns", j.Namespace, "jobname", j.Name, "additional", additional)
log.Debug("scaleDryRun", "scaledown", scaleDown, "namespace", j.Namespace, "jobname", j.Name, "additional", additional)
r.GPULimit += gpuLimit * additional
r.CPURequestMilli += cpuRequestMilli * int64(additional)
r.MemoryRequestMega += memRequestMega * int64(additional)
@@ -219,6 +219,7 @@ func scaleDryRun(r *ClusterResource, j *padv1.TrainingJob, curDiff int32, maxLoa
plannedInstance := int(*j.Spec.Trainer.ReplicaSpec.Spec.Parallelism) + int(curDiff)
instanceMax := j.Spec.Trainer.MaxInstance
instanceMin := j.Spec.Trainer.MinInstance
log.Debug("scaleDryRun instance num", "min", instanceMin, "max", instanceMax, "planned", plannedInstance)

// TODO(typhoonzero): refine below code to remove direction
// ======================= scaleDown ======================
@@ -236,6 +237,7 @@ func scaleDryRun(r *ClusterResource, j *padv1.TrainingJob, curDiff int32, maxLoa
log.Debug("scaleDryRun", "gpuRequest", r.GPULimit, "threshold", gpuThreshold)
log.Debug("scaleDryRun", "cpuRequest", r.CPURequestMilli, "threshold", cpuThreshold)
log.Debug("scaleDryRun", "memRequest", r.MemoryRequestMega, "threshold", memThreshold)
log.Debug("scaleDryRun conditions", "gpuCondition", gpuCondition, "cpuCondition", cpuCondition, "memCondition", memCondition)
if gpuCondition || cpuCondition || memCondition {
if plannedInstance > instanceMin {
additional = -1
@@ -297,7 +299,7 @@ func scaleDryRun(r *ClusterResource, j *padv1.TrainingJob, curDiff int32, maxLoa
func (a *Autoscaler) setAdditional(diff map[string]int32) {
a.jobUpdater.Range(func(k, v interface{}) bool {
key := k.(string)
- up := v.(*updater.TrainingJobUpdater)
+ up := v.(*updater.JobUpdater)
var additional int32
if val, ok := diff[key]; ok {
additional = val
@@ -349,17 +351,6 @@ func scaleAllJobsDryRun(jobs []*padv1.TrainingJob, r ClusterResource, maxLoadDes
return diff
}

- func (a *Autoscaler) scaleAllJobs() {
- a.jobUpdater.Range(func(k, v interface{}) bool {
- up := v.(*updater.TrainingJobUpdater)
- if up.Additional != 0 {
- log.Info("additional of trainingjob", "jobname", k, "scalenum", up.Additional)
- up.Scale()
- }
- return true
- })
- }
-
// Run monitors the cluster resources and training jobs in a loop,
// scales the training jobs according to the cluster resource.
func (a *Autoscaler) Run() {
@@ -381,9 +372,8 @@ func (a *Autoscaler) Run() {
a.findTrainingJobsMightBeRescheduled(havePending),
r,
a.maxLoadDesired)
log.Info("Calculated info", "diff:", diff)
log.Info("Calculated info", "diff", diff)
a.setAdditional(diff)
- a.scaleAllJobs()
}
}

@@ -395,7 +385,7 @@ func (a *Autoscaler) findPendingJob() bool {
log.Debug("findPendingJob check", "jobname", k)
total := 0
pending := 0
- up, ok := v.(*updater.TrainingJobUpdater)
+ up, ok := v.(*updater.JobUpdater)
if !ok {
log.Debug("findPendingJob conversion error", "jobname", k)
}
@@ -439,7 +429,7 @@ func (a *Autoscaler) findTrainingJobsMightBeRescheduled(havePending bool) (js tr
jn := k.(string)
log.Debug("findTrainingJobsMightBeRescheduled", "jobname", jn)

- up, ok := v.(*updater.TrainingJobUpdater)
+ up, ok := v.(*updater.JobUpdater)
if !ok {
return false
}
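Across autoscaler.go the commit replaces *updater.TrainingJobUpdater with *updater.JobUpdater and removes scaleAllJobs, so the autoscaler now only records each job's planned delta via setAdditional and leaves the actual scaling to the updater. The sketch below shows that hand-off under those assumptions; JobUpdater here is a hypothetical stand-in, not the real updater package.

package main

import (
    "fmt"
    "sync"
)

// JobUpdater is a hypothetical stand-in for updater.JobUpdater: it stores the
// planned trainer delta and is assumed to apply it in its own reconcile loop.
type JobUpdater struct {
    Additional int32
}

// setAdditional mirrors the pattern in autoscaler.go: walk the sync.Map of job
// updaters and hand each one the delta computed by the scaling dry run.
func setAdditional(jobUpdaters *sync.Map, diff map[string]int32) {
    jobUpdaters.Range(func(k, v interface{}) bool {
        key := k.(string)
        up, ok := v.(*JobUpdater)
        if !ok {
            return true // skip entries of an unexpected type
        }
        up.Additional = diff[key] // a missing key yields 0, i.e. "no change"
        fmt.Printf("job %s additional=%d\n", key, up.Additional)
        return true
    })
}

func main() {
    var jobs sync.Map
    jobs.Store("default/mnist-train", &JobUpdater{})
    setAdditional(&jobs, map[string]int32{"default/mnist-train": -1})
}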
2 changes: 1 addition & 1 deletion pkg/autoscaler/resource.go
@@ -62,7 +62,7 @@ func getPodsTotalRequestsAndLimits(podList *v1.PodList) (reqs v1.ResourceList, l
func updateNodesIdleResource(podList *v1.PodList, nodesCPUIdleMilli map[string]int64, nodesMemoryFreeMega map[string]int64) (err error) {
for _, pod := range podList.Items {
podname := pod.Namespace + "/" + pod.Name
log.Debug("updateNodesIdleResource", "podName", podname)
log.Debug("updateNodesIdleResource", "podName", podname, "phase", pod.Status.Phase)
nodeName := pod.Spec.NodeName
if nodeName == "" {
continue
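The resource.go change only adds the pod phase to the updateNodesIdleResource debug log. As a rough illustration of what that function's signature implies, the sketch below starts from per-node capacity and subtracts the requests of pods scheduled on each node; the simplified pod struct and the rule of skipping Succeeded/Failed pods are assumptions, not the repository's exact logic.

package main

import "fmt"

// pod is a simplified stand-in for a Kubernetes pod: the node it is bound to,
// its phase, and its aggregate CPU/memory requests.
type pod struct {
    node        string
    phase       string
    cpuReqMilli int64
    memReqMega  int64
}

// updateNodesIdleResource sketches the bookkeeping: the maps start at each
// node's allocatable capacity and are decremented by the requests of pods
// still holding resources on that node.
func updateNodesIdleResource(pods []pod, cpuIdleMilli, memFreeMega map[string]int64) {
    for _, p := range pods {
        if p.node == "" {
            continue // unscheduled pods hold no node resources
        }
        if p.phase == "Succeeded" || p.phase == "Failed" {
            continue // terminated pods have released their requests
        }
        cpuIdleMilli[p.node] -= p.cpuReqMilli
        memFreeMega[p.node] -= p.memReqMega
    }
}

func main() {
    cpuIdle := map[string]int64{"node-1": 4000}
    memFree := map[string]int64{"node-1": 8192}
    updateNodesIdleResource([]pod{
        {node: "node-1", phase: "Running", cpuReqMilli: 1500, memReqMega: 2048},
        {node: "node-1", phase: "Succeeded", cpuReqMilli: 500, memReqMega: 512},
    }, cpuIdle, memFree)
    fmt.Println("node-1 idle CPU (milli):", cpuIdle["node-1"], "free memory (MiB):", memFree["node-1"])
}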
(The diffs for the remaining 4 changed files did not load and are not shown here.)
