Authors:
- @ScorpioCPH - Penghao Cen <[email protected]>
The Kubeflow community currently has tf-operator (v1alpha1) for running TensorFlow jobs on Kubernetes, and we have received several requests to refactor its API.
This document summarizes the design details and moves the API version to v1alpha2.
- Define the structure of API v1alpha2.
  - Cover most of the refactoring requests we have discussed.
  - Simplify the API definition.
- Define an event-driven mechanism for TFJob life-cycle management.
  - And use a reconciler mechanism as a double check.
- Clarify the error handling logic.
- Provide a test mechanism to verify the design and implementation.
- Note: Because API v1alpha2 is a major change, compatibility with v1alpha1 is NOT taken into consideration in this proposal.
The TFJob API v1alpha2 object will have the following structure:
TFJob:
// TFJob represents the configuration of a single TFJob
type TFJob struct {
metav1.TypeMeta `json:",inline"`
// Standard object's metadata.
metav1.ObjectMeta `json:"metadata,omitempty"`
// Specification of the desired behavior of the TFJob.
Spec TFJobSpec `json:"spec,omitempty"`
// Most recently observed status of the TFJob.
// This data may not be up to date.
// Populated by the system.
// Read-only.
Status TFJobStatus `json:"status,omitempty"`
}
TFJobSpec:
// TFJobSpec is a desired state description of the TFJob.
type TFJobSpec struct {
// TFReplicaSpecs is a map from TFReplicaType to TFReplicaSpec that
// specifies the TF replicas to run.
// For example,
// {
// "PS": TFReplicaSpec,
// "Worker": TFReplicaSpec,
// }
TFReplicaSpecs map[TFReplicaType]*TFReplicaSpec `json:"tfReplicaSpecs"`
}
TFReplicaSpec:
// TFReplicaSpec is a description of the TFReplica
type TFReplicaSpec struct {
// Replicas is the desired number of replicas of the given template.
// If unspecified, defaults to 1.
Replicas *int32 `json:"replicas,omitempty"`
// Template is the object that describes the pod that
// will be created for this TFReplica.
// We use RestartPolicy in PodTemplateSpec
// to describe how the containers within the pod should be restarted.
// Please set this restart policy carefully according to your code.
Template *v1.PodTemplateSpec `json:"template,omitempty"`
// Restart policy for all TFReplicas within the TFJob.
// One of Always, OnFailure, Never and ExitCode.
// Defaults to Always.
RestartPolicy RestartPolicy `json:"restartPolicy,omitempty"`
}
// RestartPolicy describes how the TFReplicas should be restarted.
// Only one of the following restart policies may be specified.
// If none of the following policies is specified, the default one
// is RestartPolicyAlways.
type RestartPolicy string
const (
RestartPolicyAlways RestartPolicy = "Always"
RestartPolicyOnFailure RestartPolicy = "OnFailure"
RestartPolicyNever RestartPolicy = "Never"
RestartPolicyExitCode RestartPolicy = "ExitCode"
)
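The defaults documented above (one replica, RestartPolicyAlways) could be applied by a small defaulting step. The following is only a minimal sketch under stated assumptions: setDefaults is a hypothetical helper that does not appear in the proposal, and the types are simplified local stand-ins with the pod template omitted so that the example needs no Kubernetes dependencies.

```go
package main

import "fmt"

// RestartPolicy mirrors the string type from the proposal.
type RestartPolicy string

const RestartPolicyAlways RestartPolicy = "Always"

// TFReplicaSpec is a simplified stand-in for the proposal's struct;
// the Template field is left out on purpose.
type TFReplicaSpec struct {
	Replicas      *int32
	RestartPolicy RestartPolicy
}

// setDefaults is a hypothetical helper (an assumption of this sketch).
// It fills in the documented defaults for fields the user left unset.
func setDefaults(spec *TFReplicaSpec) {
	if spec.Replicas == nil {
		one := int32(1)
		spec.Replicas = &one
	}
	if spec.RestartPolicy == "" {
		spec.RestartPolicy = RestartPolicyAlways
	}
}

func main() {
	spec := &TFReplicaSpec{}
	setDefaults(spec)
	fmt.Println(*spec.Replicas, spec.RestartPolicy)
}
```

Using a pointer for Replicas lets the defaulting step tell "unset" apart from an explicit value, which is why the proposal's *int32 is kept here.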
TFReplicaType:
// TFReplicaType is the type for TFReplica.
type TFReplicaType string
const (
// TFReplicaTypePS is the type for parameter servers of distributed TensorFlow.
TFReplicaTypePS TFReplicaType = "PS"
// TFReplicaTypeWorker is the type for workers of distributed TensorFlow.
TFReplicaTypeWorker TFReplicaType = "Worker"
// TFReplicaTypeChief is the type for chief worker of distributed TensorFlow.
// If there is "chief" replica type, it's the "chief worker".
// Else, worker:0 is the chief worker.
TFReplicaTypeChief TFReplicaType = "Chief"
// TFReplicaTypeEval is the type for evaluation replica in TensorFlow.
TFReplicaTypeEval TFReplicaType = "Eval"
)
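The chief-worker rule in the comments above (an explicit Chief replica wins, otherwise worker:0 is the chief) can be sketched as follows. This is an illustration only: chiefReplica is a hypothetical helper, and it takes a plain map of replica counts instead of the full TFReplicaSpecs map to stay self-contained.

```go
package main

import "fmt"

// TFReplicaType mirrors the string type from the proposal.
type TFReplicaType string

const (
	TFReplicaTypeWorker TFReplicaType = "Worker"
	TFReplicaTypeChief  TFReplicaType = "Chief"
)

// chiefReplica is a hypothetical helper (an assumption of this sketch).
// replicas maps each replica type to its replica count. It returns the
// type and index of the chief worker, or ok=false if there is none.
func chiefReplica(replicas map[TFReplicaType]int32) (typ TFReplicaType, index int32, ok bool) {
	// An explicit "Chief" replica type takes precedence.
	if n := replicas[TFReplicaTypeChief]; n > 0 {
		return TFReplicaTypeChief, 0, true
	}
	// Otherwise worker:0 acts as the chief worker.
	if n := replicas[TFReplicaTypeWorker]; n > 0 {
		return TFReplicaTypeWorker, 0, true
	}
	return "", 0, false
}

func main() {
	typ, idx, ok := chiefReplica(map[TFReplicaType]int32{"PS": 2, "Worker": 3})
	fmt.Println(typ, idx, ok)
}
```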
TFJobStatus:
// TFJobStatus represents the current observed state of the TFJob.
type TFJobStatus struct {
// TFReplicaStatuses is a map from TFReplicaType to TFReplicaStatus that
// specifies the status of each TFReplica.
TFReplicaStatuses map[TFReplicaType]*TFReplicaStatus `json:"tfReplicaStatuses"`
// Represents time when the TFJob was acknowledged by the TFJob controller.
// It is not guaranteed to be set in happens-before order across separate operations.
// It is represented in RFC3339 form and is in UTC.
StartTime *metav1.Time `json:"startTime,omitempty"`
// Represents time when the TFJob was completed. It is not guaranteed to
// be set in happens-before order across separate operations.
// It is represented in RFC3339 form and is in UTC.
CompletionTime *metav1.Time `json:"completionTime,omitempty"`
// Represents last time when the TFJob was reconciled. It is not guaranteed to
// be set in happens-before order across separate operations.
// It is represented in RFC3339 form and is in UTC.
LastReconcileTime *metav1.Time `json:"lastReconcileTime,omitempty"`
// Conditions is an array of currently observed TFJob conditions.
Conditions []TFJobCondition `json:"conditions"`
}
TFReplicaStatus:
// TFReplicaStatus represents the current observed state of the TFReplica.
type TFReplicaStatus struct {
// The number of actively running pods.
Active int32 `json:"active,omitempty"`
// The number of pods which reached phase Succeeded.
Succeeded int32 `json:"succeeded,omitempty"`
// The number of pods which reached phase Failed.
Failed int32 `json:"failed,omitempty"`
}
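As an illustration of how these counters might be derived, the sketch below tallies pod phases into a TFReplicaStatus. It is not the controller's actual logic: summarizePhases is a hypothetical helper, PodPhase is a local stand-in for the Kubernetes v1.PodPhase type, and counting only Running pods as Active is an assumption based on the "actively running pods" comment.

```go
package main

import "fmt"

// PodPhase is a local stand-in for the Kubernetes v1.PodPhase type.
type PodPhase string

const (
	PodPending   PodPhase = "Pending"
	PodRunning   PodPhase = "Running"
	PodSucceeded PodPhase = "Succeeded"
	PodFailed    PodPhase = "Failed"
)

// TFReplicaStatus mirrors the struct from the proposal (JSON tags omitted).
type TFReplicaStatus struct {
	Active    int32
	Succeeded int32
	Failed    int32
}

// summarizePhases is a hypothetical helper (an assumption of this sketch).
// Pending pods are not counted in any of the three fields.
func summarizePhases(phases []PodPhase) TFReplicaStatus {
	var s TFReplicaStatus
	for _, p := range phases {
		switch p {
		case PodRunning:
			s.Active++
		case PodSucceeded:
			s.Succeeded++
		case PodFailed:
			s.Failed++
		}
	}
	return s
}

func main() {
	phases := []PodPhase{PodRunning, PodRunning, PodFailed, PodPending}
	fmt.Printf("%+v\n", summarizePhases(phases))
}
```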
TFJobCondition:
// TFJobCondition describes the state of the TFJob at a certain point.
type TFJobCondition struct {
// Type of TFJob condition.
Type TFJobConditionType `json:"type"`
// Status of the condition, one of True, False, Unknown.
Status v1.ConditionStatus `json:"status"`
// The reason for the condition's last transition.
Reason string `json:"reason,omitempty"`
// A human readable message indicating details about the transition.
Message string `json:"message,omitempty"`
// The last time this condition was updated.
LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"`
// Last time the condition transitioned from one status to another.
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
}
TFJobConditionType:
// TFJobConditionType defines all kinds of types of TFJobStatus.
type TFJobConditionType string
const (
// TFJobCreated means all sub-resources (e.g. services/pods) of this TFJob
// have been successfully created.
// But they are waiting to be scheduled and launched.
TFJobCreated TFJobConditionType = "Created"
// TFJobRunning means all sub-resources (e.g. services/pods) of this TFJob
// have been successfully scheduled and launched.
// The training is running without error.
TFJobRunning TFJobConditionType = "Running"
// TFJobRestarting means one or more sub-resources (e.g. services/pods) of this TFJob
// reached phase Failed but may be restarted according to its restart policy,
// which is specified by the user in v1.PodTemplateSpec.
// The training is frozen/pending.
TFJobRestarting TFJobConditionType = "Restarting"
// TFJobSucceeded means all sub-resources (e.g. services/pods) of this TFJob
// have terminated successfully.
// The training completed without error.
TFJobSucceeded TFJobConditionType = "Succeeded"
// TFJobFailed means one or more sub-resources (e.g. services/pods) of this TFJob
// reached phase Failed and will not be restarted.
// The training has failed its execution.
TFJobFailed TFJobConditionType = "Failed"
)
The TFJob controller, tf-operator, will process TFJobs and create, read, update, and delete (CRUD) services/pods according to the TFJob spec. It is responsible for synchronizing TFJob objects stored in the system with the services and pods that are actually running, continuously striving to make the observed state match the desired state.
Here is the definition of TFController:
type TFController struct {
// kubeClientset is a standard kubernetes clientset
kubeClientset kubernetes.Interface
// tfJobClientset is a clientset for CRD TFJob
tfJobClientset tfjobclient.Interface
tfJobLister listers.TFJobLister
tfJobSynced cache.InformerSynced
// for pod/service CRUD
podLister kubelisters.PodLister
podControl controller.PodControlInterface
serviceLister kubelisters.ServiceLister
// workQueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
workQueue workqueue.RateLimitingInterface
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
// A TTLCache of pod creates/deletes each TFReplica expects to see
expectations controller.ControllerExpectationsInterface
}
Auto-Generated TF_CONFIG:
To make distributed TensorFlow work, the user's code needs the distributed TensorFlow configuration TF_CONFIG, which is generated by tf-operator.
The config looks like this:
{
"cluster": {
"ps": ["ps1:2222", "ps2:2222"],
"worker": ["worker1:2222", "worker2:2222", "worker3:2222"]
},
"task": {
"type": "ps",
"index": 1
}
}
tf-operator will append these auto-generated environment variables to the Env field.
Check more details from here.
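A TF_CONFIG value of the shape shown above can be produced with the standard encoding/json package. The following is a minimal sketch, not tf-operator's actual code: genTFConfig and the two struct types are hypothetical names introduced for illustration, and the host names are the placeholder values from the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// taskSpec and tfConfig are hypothetical types that model the TF_CONFIG
// layout shown in this document.
type taskSpec struct {
	Type  string `json:"type"`
	Index int    `json:"index"`
}

type tfConfig struct {
	Cluster map[string][]string `json:"cluster"`
	Task    taskSpec            `json:"task"`
}

// genTFConfig is a hypothetical helper (an assumption of this sketch).
// It renders the TF_CONFIG JSON for one task of the cluster.
func genTFConfig(cluster map[string][]string, taskType string, index int) (string, error) {
	b, err := json.Marshal(tfConfig{
		Cluster: cluster,
		Task:    taskSpec{Type: taskType, Index: index},
	})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	cluster := map[string][]string{
		"ps":     {"ps1:2222", "ps2:2222"},
		"worker": {"worker1:2222", "worker2:2222", "worker3:2222"},
	}
	cfg, err := genTFConfig(cluster, "ps", 1)
	if err != nil {
		panic(err)
	}
	// The resulting string would be the value of the TF_CONFIG variable.
	fmt.Println(cfg)
}
```

Each pod would receive the same cluster section but its own task section, so the helper is called once per replica.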
User-Defined Arguments:
Other user-defined arguments can also be passed to the container through the Args field of the Container struct.
First, we should follow the event-driven pattern used by other resource controllers in Kubernetes (e.g. Deployment/Job):
- Start tfJobInformer to listen for CRUD events of TFJob.
  - tfJobInformer is automatically generated from the API definition by the informer-gen script.
- Create one pod/service pair for each specified TFReplicaType + replica index in the TFJob CreateHandler.
  - For example, given this TFReplicaSpec:
    { "PS": { Replicas: 2, }, "Worker": { Replicas: 3, }, }
    We will create:
    - two pod/service pairs for PSs:
      - tf-job-name-ps-1-uid
      - tf-job-name-ps-2-uid
    - three pod/service pairs for Workers:
      - tf-job-name-worker-1-uid
      - tf-job-name-worker-2-uid
      - tf-job-name-worker-3-uid
  - We use a postfix uid to make each object name unique.
  - Then set these objects' OwnerReferences to this TFJob object.
- Listen on pods/services via podInformer and serviceInformer.
  - On pod created/updated/deleted, get the TFJob object by parsing OwnerReferences, and set TFJob.Status as defined above according to the state of the whole TF cluster.
  - Update TFJob.Status.Conditions if needed.
- Terminate/delete the TFJob object if every pod is completed (or leave the pod phase as Succeeded).
  - This may make logs and model checkpoint files unreachable.
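The naming scheme in the example above can be sketched as a small function. This is an illustration under assumptions: replicaNames is a hypothetical helper, the name format is inferred from the listed examples (job name, lower-cased replica type, index, uid), and the index starts at 1 only because the examples in this document do.

```go
package main

import (
	"fmt"
	"strings"
)

// replicaNames is a hypothetical helper (an assumption of this sketch).
// It returns one shared pod/service name per replica index, following
// the "<job>-<type>-<index>-<uid>" pattern used in this document's example.
func replicaNames(jobName, replicaType string, replicas int, uid string) []string {
	names := make([]string, 0, replicas)
	for i := 1; i <= replicas; i++ {
		names = append(names, fmt.Sprintf("%s-%s-%d-%s",
			jobName, strings.ToLower(replicaType), i, uid))
	}
	return names
}

func main() {
	for _, n := range replicaNames("tf-job-name", "PS", 2, "uid") {
		fmt.Println(n)
	}
	for _, n := range replicaNames("tf-job-name", "Worker", 3, "uid") {
		fmt.Println(n)
	}
}
```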
Beyond that, we should provide a reconciler mechanism that reconciles observed and desired states and repairs discrepancies, as a double check for the event-driven mechanism.
Here is the configuration of tf-operator:
// TFControllerConfiguration contains configuration of tf-operator.
type TFControllerConfiguration struct {
// ReconcilerSyncLoopPeriod is the amount of time the reconciler sync loop
// waits between two reconciler syncs.
// It is set to 15 sec by default.
// TODO(cph): maybe we can let it grow by multiples in the future,
// up to 5 minutes, to reduce idle loops.
// e.g. 15s, 30s, 60s, 120s...
ReconcilerSyncLoopPeriod metav1.Duration
}
// DefaultTFControllerConfiguration is the suggested tf-operator configuration for production.
var DefaultTFControllerConfiguration = TFControllerConfiguration{
// metav1.Duration wraps time.Duration, so the value is wrapped explicitly.
ReconcilerSyncLoopPeriod: metav1.Duration{Duration: 15 * time.Second},
}
The reconciler uses ReconcilerSyncLoopPeriod to determine whether a reconcile call should run or be skipped. To support this, we record LastReconcileTime in the TFJob object:
// Represents last time when the TFJob was reconciled. It is not guaranteed to
// be set in happens-before order across separate operations.
// It is represented in RFC3339 form and is in UTC.
LastReconcileTime *metav1.Time `json:"lastReconcileTime,omitempty"`
Because tfJobInformer provides a forced resync mechanism by periodically calling UpdateFunc, which is defined in ResourceEventHandlerFuncs, we can call the reconciler in this function:
- UpdateFunc returns a TFJob object periodically.
- Check LastReconcileTime to determine whether we should trigger a reconciler call.
- tf-operator will list all pods/services related to this TFJob.
  - Compare the current state to the spec of this TFJob.
  - Try to recover the failed pod/service to make the training healthy.
    - Error handling is described below.
- Update the status of this TFJob.
- TODO: we should call this reconciler with an exponential back-off delay (15s, 30s, 60s …) capped at 5 minutes.
To make the system robust, the tf-operator should be able to locally and automatically recover from errors.
We extend the Kubernetes built-in RestartPolicy by adding a new policy, ExitCode:
RestartPolicyAlways RestartPolicy = "Always"
RestartPolicyOnFailure RestartPolicy = "OnFailure"
RestartPolicyNever RestartPolicy = "Never"
RestartPolicyExitCode RestartPolicy = "ExitCode"
We let users set this field according to their model code.
- If RestartPolicy is set to OnFailure/Always, users should add checkpoint-reloading code themselves.
- Otherwise restarting will have no effect.
The ExitCode policy means that users should set exit codes themselves; tf-operator will check these exit codes to determine the behavior when an error occurs:
- 1-127: permanent error, do not restart.
- 128-255: retryable error, will restart the pod.
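The exit code ranges above map directly onto a small classification function. The sketch below is illustrative only: classifyExitCode, shouldRestart, and the exitClass names are hypothetical, and treating exit code 0 as success and codes outside 0-255 as unknown are assumptions not stated in this document.

```go
package main

import "fmt"

// exitClass is a hypothetical type naming the outcome of a container exit.
type exitClass string

const (
	exitSuccess   exitClass = "success"   // assumption: exit code 0
	exitPermanent exitClass = "permanent" // 1-127: do not restart
	exitRetryable exitClass = "retryable" // 128-255: restart the pod
	exitUnknown   exitClass = "unknown"   // assumption: anything else
)

// classifyExitCode is a hypothetical helper (an assumption of this sketch)
// that applies the ranges listed for the ExitCode restart policy.
func classifyExitCode(code int) exitClass {
	switch {
	case code == 0:
		return exitSuccess
	case code >= 1 && code <= 127:
		return exitPermanent
	case code >= 128 && code <= 255:
		return exitRetryable
	default:
		return exitUnknown
	}
}

// shouldRestart reports whether the pod should be restarted under ExitCode.
func shouldRestart(code int) bool {
	return classifyExitCode(code) == exitRetryable
}

func main() {
	for _, code := range []int{0, 1, 127, 128, 255} {
		fmt.Println(code, classifyExitCode(code), shouldRestart(code))
	}
}
```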
Unit Test
TBD
E2E Test
We can use this model from the TensorFlow repo for the e2e test.
Apart from the above, we should add these abilities in the future:
- Provide a proper mechanism to store training logs and checkpoint files.