Skip to content

Commit

Permalink
Merge pull request #24 from m3ngyang/autoscaler-test
Browse files Browse the repository at this point in the history
implement trainingjob controller with autoscaler
  • Loading branch information
typhoonzero authored May 24, 2018
2 parents 72f1b9c + 1a57118 commit a236b71
Show file tree
Hide file tree
Showing 22 changed files with 1,411 additions and 1,536 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
matrix:
include:
- language: go
go: 1.8.x
go: 1.9.x
sudo: required
install:
- go get -u github.com/golang/lint/golint
Expand Down
100 changes: 78 additions & 22 deletions cmd/edl/edl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,106 @@ package main

import (
"flag"
"os"
"time"

log "github.com/inconshreveable/log15"
"github.com/wangkuiyi/candy"

"k8s.io/api/core/v1"
extcli "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"

paddleclientset "github.com/paddlepaddle/edl/pkg/client/clientset/versioned"
"github.com/paddlepaddle/edl/pkg/client/clientset/versioned/scheme"
paddleinformers "github.com/paddlepaddle/edl/pkg/client/informers/externalversions"
paddlecontroller "github.com/paddlepaddle/edl/pkg/controller"
"github.com/paddlepaddle/edl/pkg/signals"
)

"github.com/paddlepaddle/edl/pkg"
edlresource "github.com/paddlepaddle/edl/pkg/resource"
// Timing parameters for leader election; main passes them to
// leaderelection.LeaderElectionConfig.
var (
// leaseDuration is used as LeaderElectionConfig.LeaseDuration.
leaseDuration = 15 * time.Second
// renewDuration is used as LeaderElectionConfig.RenewDeadline.
renewDuration = 5 * time.Second
// retryPeriod is used as LeaderElectionConfig.RetryPeriod.
retryPeriod = 3 * time.Second
)

func main() {
kubeconfig := flag.String("kubeconfig", "", "Path to a kube config. Only required if out-of-cluster.")
logLevel := flag.String("log_level", "info", "Log level can be debug, info, warn, error, crit.")
masterURL := flag.String("master", "", "Address of a kube master.")
kubeConfig := flag.String("kubeconfig", "", "Path to a kube config. Only required if out-of-cluster.")
maxLoadDesired := flag.Float64("max_load_desired", 0.97, `Keep the cluster max resource usage around
this value, jobs will scale down if total request is over this level.`)
flag.Parse()

lvl, err := log.LvlFromString(*logLevel)
candy.Must(err)

log.Root().SetHandler(
log.LvlFilterHandler(lvl, log.CallerFileHandler(log.StderrHandler)),
)
stopCh := signals.SetupSignalHandler()

// Create the client config. Use kubeconfig if given, otherwise assume in-cluster.
var cfg *rest.Config
if *kubeconfig != "" {
cfg, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
} else {
cfg, err = rest.InClusterConfig()
}
cfg, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeConfig)
candy.Must(err)

edlresource.RegisterResource(cfg, &edlresource.TrainingJob{}, &edlresource.TrainingJobList{})
kubeClient, err := kubernetes.NewForConfig(cfg)
candy.Must(err)

clientset, err := kubernetes.NewForConfig(cfg)
extapiClient, err := extcli.NewForConfig(cfg)
candy.Must(err)

client, err := rest.RESTClientFor(cfg)
paddleClient, err := paddleclientset.NewForConfig(cfg)
candy.Must(err)

controller, err := edl.New(client, clientset, *maxLoadDesired)
hostname, err := os.Hostname()
candy.Must(err)

controller.Run()
// run is the OnStartedLeading callback. It builds the shared informer
// factory and the trainingjob controller, starts the informers in the
// background and then runs the controller with a single worker.
//
// NOTE(review): the stop argument supplied by the leader elector is unused;
// both the informers and the controller are stopped through stopCh (the
// signal handler channel) instead. Confirm this is intended.
run := func(stop <-chan struct{}) {
	log.Info("I won the leader election", "hostname", hostname)
	paddleInformer := paddleinformers.NewSharedInformerFactory(paddleClient, time.Second*10)
	controller := paddlecontroller.New(kubeClient, extapiClient, paddleClient, paddleInformer)
	go paddleInformer.Start(stopCh)

	// Check the error returned by Run itself. Testing the enclosing
	// function's err here would always see nil, so a controller failure
	// would never be reported.
	if err := controller.Run(1, *maxLoadDesired, stopCh); err != nil {
		log.Error("Error running paddle trainingjob controller", "error", err.Error())
	}
}

// stop is the OnStoppedLeading callback; it only records that this
// instance is no longer the leader.
stop := func() {
	log.Error("I lost the leader election", "hostname", hostname)
}

leaderElectionClient, err := kubernetes.NewForConfig(rest.AddUserAgent(cfg, "leader-election"))
if err != nil {
log.Error("Error building leader election clientset", "error", err.Error())
return
}

// Prepare event clients.
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "trainingjob-controller"})

lock := &resourcelock.EndpointsLock{
EndpointsMeta: metav1.ObjectMeta{
Namespace: "kube-system",
Name: "trainingjob-controller",
},
Client: leaderElectionClient.CoreV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: hostname,
EventRecorder: recorder,
},
}

leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: leaseDuration,
RenewDeadline: renewDuration,
RetryPeriod: retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: stop,
},
})
}
41 changes: 33 additions & 8 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions glide.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package: github.com/paddlepaddle/edl
import:
- package: github.com/wangkuiyi/candy
- package: k8s.io/client-go
version: kubernetes-1.8.0
version: kubernetes-1.8.6
- package: k8s.io/kubernetes
version: release-1.8
- package: k8s.io/api
version: release-1.8
- package: github.com/inconshreveable/log15
version: v2.13
- package: github.com/wangkuiyi/candy
- package: k8s.io/code-generator
version: kubernetes-1.8.6
- package: k8s.io/apiextensions-apiserver
version: kubernetes-1.8.6
- package: github.com/inconshreveable/log15
version: v2.13
- package: github.com/golang/groupcache
subpackages:
- lru

34 changes: 34 additions & 0 deletions pkg/apis/paddlepaddle/v1/training_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package v1
import (
"encoding/json"
"fmt"

"k8s.io/apimachinery/pkg/api/resource"
)

// Elastic returns true if the job can scale to more workers.
Expand All @@ -26,7 +28,39 @@ func (s *TrainingJob) NeedGPU() bool {
return s.GPU() > 0
}

// String renders the TrainingJob as indented JSON.
// The marshal error is discarded, so a job that cannot be encoded yields an
// empty string.
func (s *TrainingJob) String() string {
	encoded, _ := json.MarshalIndent(s, "", " ")
	return fmt.Sprintf("%s", encoded)
}

// TrainerGPULimit reports the NVIDIA GPU quantity from the trainer's
// resource limits, converted to an int.
func (s *TrainingJob) TrainerGPULimit() int {
	return int(s.Spec.Trainer.Resources.Limits.NvidiaGPU().Value())
}

// TrainerCPURequestMilli reports the CPU quantity from the trainer's
// resource requests, scaled to milli units.
func (s *TrainingJob) TrainerCPURequestMilli() int64 {
	return s.Spec.Trainer.Resources.Requests.Cpu().ScaledValue(resource.Milli)
}

// TrainerMemRequestMega reports the memory quantity from the trainer's
// resource requests, scaled to mega units.
func (s *TrainingJob) TrainerMemRequestMega() int64 {
	return s.Spec.Trainer.Resources.Requests.Memory().ScaledValue(resource.Mega)
}

// Fulfillment reports how far the trainer has been scaled between its
// configured bounds, computed as
//
//	(current - MinInstance) / (MaxInstance - MinInstance)
//
// where current is the trainer job's Parallelism. It returns 1 when
// MinInstance equals MaxInstance, since there is no range to scale within.
//
// NOTE(review): ReplicaSpec itself is not checked here; if it can be a nil
// pointer the caller must guarantee it is set. Confirm against types.go.
func (s *TrainingJob) Fulfillment() float64 {
	minInstance := s.Spec.Trainer.MinInstance
	maxInstance := s.Spec.Trainer.MaxInstance

	if minInstance == maxInstance {
		return 1
	}

	// Parallelism is a pointer and may be unset; dereferencing it blindly
	// would panic. Kubernetes treats an unset Job parallelism as 1, so use
	// the same default here.
	curInstance := 1
	if parallelism := s.Spec.Trainer.ReplicaSpec.Spec.Parallelism; parallelism != nil {
		curInstance = int(*parallelism)
	}
	return float64(curInstance-minInstance) / float64(maxInstance-minInstance)
}
34 changes: 31 additions & 3 deletions pkg/apis/paddlepaddle/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,42 @@ const (
TrainingJobPhaseCreating = "creating"
// TrainingJobPhaseRunning is running TrainingJobPhase.
TrainingJobPhaseRunning = "running"
// TrainingJobPhaseScaling is scaling TrainingJobPhase.
TrainingJobPhaseScaling = "scaling"
// TrainingJobPhaseSucceeded is succeeded TrainingJobPhase.
TrainingJobPhaseSucceeded = "succeeded"
// TrainingJobPhaseFailed is failed TrainingJobPhase.
TrainingJobPhaseFailed = "failed"
)

// TrainerJobScaleStatus is status of trainer jobs.
type TrainerJobScaleStatus struct {
// ScaleResults is the outcome of a scale operation, stored as a string.
type ScaleResults string

const (
// ScaleTrue means the scale succeeded.
ScaleTrue ScaleResults = "True"
// ScaleFalse means the scale failed.
ScaleFalse ScaleResults = "False"
// ScaleUnknown means it could not be determined whether the scale
// succeeded or not.
ScaleUnknown ScaleResults = "Unknown"
)

// TrainerJobScaleRecord is the record of a single scale operation on a
// trainer job.
type TrainerJobScaleRecord struct {
// ScaleTimestamp is the time at which the TrainingJob was scaled.
ScaleTimestamp metav1.Time `json:"scaleTimestamp"`
// Additional is the instance delta applied by this scale.
// NOTE(review): presumably negative when scaling down - confirm against
// the autoscaler.
Additional int32 `json:"additional"`
// Status is the result of the scale.
Status ScaleResults `json:"status"`
// Reason is the reason the scale failed.
// +optional
Reason string `json:"reason,omitempty"`
}

// TrainerJobScaleRecords is a collection of scale records for trainer jobs.
type TrainerJobScaleRecords struct {
// ScaleRecords holds pointers to the individual scale records.
// NOTE(review): this field has no json tag, so encoding/json will use the
// Go field name "ScaleRecords" as the key - confirm this is intended.
ScaleRecords []*TrainerJobScaleRecord
}

// TrainingResourceType the type of TrainingJob resource, include MASTER PSERVER and TRAINER
Expand Down Expand Up @@ -154,7 +182,7 @@ type TrainingJobStatus struct {
Reason string `json:"reason"`
// ScaleStatus is autoscale status of trainer jobs
// TODO(ZhengQi): this will used in autoscale mode in future.
ScaleStatus TrainerJobScaleStatus `json:"scale_status"`
ScaleRecords TrainerJobScaleRecords `json:"scale_records"`
// ReplicaStatuses is detail status of resources
// TODO(ZhengQi): should we only considered trainer job now?
ReplicaStatuses []*TrainingResourceStatus `json:"replica_statuses"`
Expand Down
Loading

0 comments on commit a236b71

Please sign in to comment.