Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix scaling when using ingress-addressed nodes #692

Merged
merged 8 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/v1beta1/solrcloud_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,13 @@ func (sc *SolrCloud) GetAllSolrPodNames() []string {
if sc.Spec.Replicas != nil {
replicas = int(*sc.Spec.Replicas)
}
if int(sc.Status.Replicas) > replicas {
replicas = int(sc.Status.Replicas)
}
return sc.GetSolrPodNames(replicas)
}

func (sc *SolrCloud) GetSolrPodNames(replicas int) []string {
podNames := make([]string, replicas)
statefulSetName := sc.StatefulSetName()
for i := range podNames {
Expand Down
22 changes: 19 additions & 3 deletions controllers/solr_cluster_ops_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func retryNextQueuedClusterOpWithQueue(statefulSet *appsv1.StatefulSet, clusterO
return hasOp, err
}

func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleDownOpIsQueued bool, podList []corev1.Pod, logger logr.Logger) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleDownOpIsQueued bool, podList []corev1.Pod, blockReconciliationOfStatefulSet bool, logger logr.Logger) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
desiredPods := int(*instance.Spec.Replicas)
configuredPods := int(*statefulSet.Spec.Replicas)
if desiredPods != configuredPods {
Expand All @@ -170,11 +170,26 @@ func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudRec
Metadata: strconv.Itoa(configuredPods - 1),
}
} else if desiredPods > configuredPods && (instance.Spec.Scaling.PopulatePodsOnScaleUp == nil || *instance.Spec.Scaling.PopulatePodsOnScaleUp) {
// We need to wait for all pods to become healthy to scale up in a managed fashion, otherwise
// the balancing will skip some pods
if len(podList) < configuredPods {
// There are not enough pods, the statefulSet controller has yet to create the previously desired pods.
// Do not start the scale up until these missing pods are created.
return nil, time.Second * 5, nil
}
// If Solr nodes are advertised by their individual node services (through an ingress)
// then make sure that the host aliases are set for all desired pods before starting a scale-up.
// If the host aliases do not already include the soon-to-be created pods, then Solr might not be able to balance
// replicas onto the new hosts.
// We need to make sure that the StatefulSet is updated with these new hostAliases before the scale up occurs.
if instance.UsesIndividualNodeServices() && instance.Spec.SolrAddressability.External.UseExternalAddress {
for _, pod := range podList {
if len(pod.Spec.HostAliases) < desiredPods {
return nil, time.Second * 5, nil
}
}
}

clusterOp = &SolrClusterOp{
Operation: ScaleUpLock,
Metadata: strconv.Itoa(desiredPods),
Expand Down Expand Up @@ -349,7 +364,8 @@ func handleManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler
}
operationComplete = true
// Only do a re-balancing for rolling restarts that migrated replicas
if updateMetadata.RequiresReplicaMigration {
// If a scale-up will occur afterwards, skip the re-balancing, because it will occur after the scale-up anyway
if updateMetadata.RequiresReplicaMigration && *instance.Spec.Replicas <= *statefulSet.Spec.Replicas {
nextClusterOp = &SolrClusterOp{
Operation: BalanceReplicasLock,
Metadata: "RollingUpdateComplete",
Expand All @@ -371,7 +387,7 @@ func handleManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler
// We won't kill pods that we need the cluster state for, but we can kill the pods that are already not running.
// This is important for scenarios where there is a bad pod config and nothing is running, but we need to do
// a restart to get a working pod config.
state, retryLater, apiError := util.GetNodeReplicaState(ctx, instance, hasReadyPod, logger)
state, retryLater, apiError := util.GetNodeReplicaState(ctx, instance, statefulSet, hasReadyPod, logger)
if apiError != nil {
return false, true, 0, nil, apiError
} else if !retryLater {
Expand Down
71 changes: 40 additions & 31 deletions controllers/solrcloud_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
hostNameIpMap := make(map[string]string)
// Generate a service for every Node
if instance.UsesIndividualNodeServices() {
// When updating the statefulSet below, the hostNameIpMap is just used to add new IPs or modify existing ones.
// When scaling down, the hostAliases that are no longer found here will not be removed from the hostAliases in the statefulSet pod spec.
// Therefore, it should be ok that we are not reconciling the node services that will be scaled down in the future.
// This is unfortunately the reality since we don't have the statefulSet yet to determine how many Solr pods are still running,
// we just have Spec.replicas which is the requested pod count.
for _, nodeName := range solrNodeNames {
err, ip := r.reconcileNodeService(ctx, logger, instance, nodeName)
if err != nil {
Expand All @@ -161,6 +166,7 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if instance.Spec.SolrAddressability.External.UseExternalAddress {
if ip == "" {
// If we are using this IP in the hostAliases of the statefulSet, it needs to be set for every service before trying to update the statefulSet
// TODO: Make an event here
blockReconciliationOfStatefulSet = true
} else {
hostNameIpMap[instance.AdvertisedNodeHost(nodeName)] = ip
Expand Down Expand Up @@ -319,36 +325,6 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}
}

extAddressabilityOpts := instance.Spec.SolrAddressability.External
if extAddressabilityOpts != nil && extAddressabilityOpts.Method == solrv1beta1.Ingress {
// Generate Ingress
ingress := util.GenerateIngress(instance, solrNodeNames)

// Check if the Ingress already exists
ingressLogger := logger.WithValues("ingress", ingress.Name)
foundIngress := &netv1.Ingress{}
err = r.Get(ctx, types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace}, foundIngress)
if err != nil && errors.IsNotFound(err) {
ingressLogger.Info("Creating Ingress")
if err = controllerutil.SetControllerReference(instance, ingress, r.Scheme); err == nil {
err = r.Create(ctx, ingress)
}
} else if err == nil {
var needsUpdate bool
needsUpdate, err = util.OvertakeControllerRef(instance, foundIngress, r.Scheme)
needsUpdate = util.CopyIngressFields(ingress, foundIngress, ingressLogger) || needsUpdate

// Update the found Ingress and write the result back if there are any changes
if needsUpdate && err == nil {
ingressLogger.Info("Updating Ingress")
err = r.Update(ctx, foundIngress)
}
}
if err != nil {
return requeueOrNot, err
}
}

var statefulSet *appsv1.StatefulSet

if !blockReconciliationOfStatefulSet {
Expand Down Expand Up @@ -416,6 +392,39 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if err != nil {
return requeueOrNot, err
}
if statefulSet != nil && statefulSet.Spec.Replicas != nil {
solrNodeNames = instance.GetSolrPodNames(int(*statefulSet.Spec.Replicas))
}

extAddressabilityOpts := instance.Spec.SolrAddressability.External
if extAddressabilityOpts != nil && extAddressabilityOpts.Method == solrv1beta1.Ingress {
// Generate Ingress
ingress := util.GenerateIngress(instance, solrNodeNames)

// Check if the Ingress already exists
ingressLogger := logger.WithValues("ingress", ingress.Name)
foundIngress := &netv1.Ingress{}
err = r.Get(ctx, types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace}, foundIngress)
if err != nil && errors.IsNotFound(err) {
ingressLogger.Info("Creating Ingress")
if err = controllerutil.SetControllerReference(instance, ingress, r.Scheme); err == nil {
err = r.Create(ctx, ingress)
}
} else if err == nil {
var needsUpdate bool
needsUpdate, err = util.OvertakeControllerRef(instance, foundIngress, r.Scheme)
needsUpdate = util.CopyIngressFields(ingress, foundIngress, ingressLogger) || needsUpdate

// Update the found Ingress and write the result back if there are any changes
if needsUpdate && err == nil {
ingressLogger.Info("Updating Ingress")
err = r.Update(ctx, foundIngress)
}
}
if err != nil {
return requeueOrNot, err
}
}

// *********************************************************
// The operations after this require a statefulSet to exist,
Expand Down Expand Up @@ -546,7 +555,7 @@ func (r *SolrCloudReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// a "locked" cluster operation
if clusterOp == nil {
_, scaleDownOpIsQueued := queuedRetryOps[ScaleDownLock]
clusterOp, retryLaterDuration, err = determineScaleClusterOpLockIfNecessary(ctx, r, instance, statefulSet, scaleDownOpIsQueued, podList, logger)
clusterOp, retryLaterDuration, err = determineScaleClusterOpLockIfNecessary(ctx, r, instance, statefulSet, scaleDownOpIsQueued, podList, blockReconciliationOfStatefulSet, logger)

// If the new clusterOperation is an update to a queued clusterOp, just change the operation that is already queued
if clusterOp != nil {
Expand Down
24 changes: 23 additions & 1 deletion controllers/util/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,29 @@ func CopyPodTemplates(from, to *corev1.PodTemplateSpec, basePath string, logger

if !DeepEqualWithNils(to.Spec.HostAliases, from.Spec.HostAliases) {
requireUpdate = true
to.Spec.HostAliases = from.Spec.HostAliases
if to.Spec.HostAliases == nil {
to.Spec.HostAliases = from.Spec.HostAliases
} else {
// Do not remove aliases that are no longer used.
// This is in case Solr is scaling down and we want to keep the old addresses for future use.
for _, fromAlias := range from.Spec.HostAliases {
found := false
for i, toAlias := range to.Spec.HostAliases {
if fromAlias.Hostnames[0] == toAlias.Hostnames[0] {
found = true
if !DeepEqualWithNils(toAlias, fromAlias) {
requireUpdate = true
to.Spec.HostAliases[i] = fromAlias
break
}
}
}
if !found {
requireUpdate = true
to.Spec.HostAliases = append(to.Spec.HostAliases, fromAlias)
}
}
}
logger.Info("Update required because field changed", "field", basePath+"Spec.HostAliases", "from", to.Spec.HostAliases, "to", from.Spec.HostAliases)
}

Expand Down
14 changes: 12 additions & 2 deletions controllers/util/solr_update_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/apache/solr-operator/controllers/util/solr_api"
"github.com/go-logr/logr"
"github.com/robfig/cron/v3"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"net/url"
Expand Down Expand Up @@ -119,7 +120,7 @@ func (state NodeReplicaState) PodHasReplicas(cloud *solr.SolrCloud, podName stri
return isInClusterState && contents.replicas > 0
}

func GetNodeReplicaState(ctx context.Context, cloud *solr.SolrCloud, hasReadyPod bool, logger logr.Logger) (state NodeReplicaState, retryLater bool, err error) {
func GetNodeReplicaState(ctx context.Context, cloud *solr.SolrCloud, statefulSet *appsv1.StatefulSet, hasReadyPod bool, logger logr.Logger) (state NodeReplicaState, retryLater bool, err error) {
clusterResp := &solr_api.SolrClusterStatusResponse{}
overseerResp := &solr_api.SolrOverseerStatusResponse{}

Expand All @@ -138,7 +139,7 @@ func GetNodeReplicaState(ctx context.Context, cloud *solr.SolrCloud, hasReadyPod
}
}
if err == nil {
state = findSolrNodeContents(clusterResp.ClusterStatus, overseerResp.Leader, GetAllManagedSolrNodeNames(cloud))
state = findSolrNodeContents(clusterResp.ClusterStatus, overseerResp.Leader, GetManagedSolrNodeNames(cloud, int(*statefulSet.Spec.Replicas)))
} else {
logger.Error(err, "Could not fetch cluster state information for cloud")
}
Expand Down Expand Up @@ -535,6 +536,15 @@ func GetAllManagedSolrNodeNames(solrCloud *solr.SolrCloud) map[string]bool {
return allNodeNames
}

func GetManagedSolrNodeNames(solrCloud *solr.SolrCloud, currentlyConfiguredPodCount int) map[string]bool {
podNames := solrCloud.GetSolrPodNames(currentlyConfiguredPodCount)
allNodeNames := make(map[string]bool, len(podNames))
for _, podName := range podNames {
allNodeNames[SolrNodeName(solrCloud, podName)] = true
}
return allNodeNames
}

// EvictReplicasForPodIfNecessary takes a solr Pod and migrates all replicas off of that Pod.
// For updates this will only be called for pods using ephemeral data.
// For scale-down operations, this can be called for pods using ephemeral or persistent data.
Expand Down
3 changes: 3 additions & 0 deletions controllers/util/solr_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,9 @@ func GenerateStatefulSet(solrCloud *solr.SolrCloud, solrCloudStatus *solr.SolrCl
VolumeClaimTemplates: pvcs,
},
}
if solrCloud.UsesHeadlessService() {
stateful.Spec.Template.Spec.Subdomain = solrCloud.HeadlessServiceName()
}

var imagePullSecrets []corev1.LocalObjectReference

Expand Down
8 changes: 7 additions & 1 deletion helm/solr-operator/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ annotations:
url: https://github.com/apache/solr-operator/issues/624
- name: Github PR
url: https://github.com/apache/solr-operator/pull/648
- kind: fixed
description: Avoid reset of security.json if get request fails
links:
- name: Github Issue
Expand Down Expand Up @@ -89,6 +88,13 @@ annotations:
url: https://issues.apache.org/jira/browse/SOLR-17216
- name: Github PR
url: https://github.com/apache/solr-operator/pull/698
- kind: fixed
description: SolrClouds addressed via an Ingress now scale up and down safely.
links:
- name: Github Issue
url: https://github.com/apache/solr-operator/issues/682
- name: Github PR
url: https://github.com/apache/solr-operator/pull/692
artifacthub.io/images: |
- name: solr-operator
image: apache/solr-operator:v0.9.0-prerelease
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/solrcloud_rolling_upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ var _ = FDescribe("E2E - SolrCloud - Rolling Upgrades", func() {
}

By("waiting for the balanceReplicas to finish")
expectStatefulSetWithChecks(ctx, solrCloud, solrCloud.StatefulSetName(), func(g Gomega, found *appsv1.StatefulSet) {
expectStatefulSetWithChecksAndTimeout(ctx, solrCloud, solrCloud.StatefulSetName(), time.Second*30, time.Second, func(g Gomega, found *appsv1.StatefulSet) {
clusterOp, err := controllers.GetCurrentClusterOp(found)
g.Expect(err).ToNot(HaveOccurred(), "Error occurred while finding clusterLock for SolrCloud")
g.Expect(clusterOp).To(BeNil(), "StatefulSet should not have a balanceReplicas lock after balancing is complete.")
Expand Down
Loading
Loading