support e2e test
Signed-off-by: husharp <[email protected]>
HuSharp committed Mar 4, 2024
1 parent 25ba435 commit 9c5b606
Showing 8 changed files with 165 additions and 91 deletions.
22 changes: 11 additions & 11 deletions pkg/controller/tidbcluster/tidb_cluster_control.go
@@ -188,17 +188,6 @@ func (c *defaultTidbClusterControl) updateTidbCluster(tc *v1alpha1.TidbCluster)
}
}

// works that should be done to make the pd microservice current state match the desired state:
// - create or update the pdms service
// - create or update the pdms headless service
// - create the pdms statefulset
// - sync pdms cluster status from pdms to TidbCluster object
// - upgrade the pdms cluster
// - scale out/in the pdms cluster
if err := c.pdMSMemberManager.Sync(tc); err != nil {
return err
}

// works that should be done to make the pd cluster current state match the desired state:
// - create or update the pd service
// - create or update the pd headless service
@@ -215,6 +204,17 @@ func (c *defaultTidbClusterControl) updateTidbCluster(tc *v1alpha1.TidbCluster)
return err
}

// works that should be done to make the pd microservice current state match the desired state:
// - create or update the pdms service
// - create or update the pdms headless service
// - create the pdms statefulset
// - sync pdms cluster status from pdms to TidbCluster object
// - upgrade the pdms cluster
// - scale out/in the pdms cluster
if err := c.pdMSMemberManager.Sync(tc); err != nil {
return err
}

// works that should be done to make the tiproxy cluster current state match the desired state:
// - create or update the tiproxy service
// - create or update the tiproxy headless service
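The net effect of the two hunks above is an ordering change: the PDMS sync now runs after the PD sync instead of before it. A minimal sketch of the resulting call order, assuming the PD member manager field is named pdMemberManager (only pdMSMemberManager appears in this diff):

```go
// Sketch: sync the PD cluster first so its API is reachable, then the PD
// microservice components, which depend on that API.
func (c *defaultTidbClusterControl) syncPDThenPDMS(tc *v1alpha1.TidbCluster) error {
	if err := c.pdMemberManager.Sync(tc); err != nil { // assumed field name
		return err
	}
	return c.pdMSMemberManager.Sync(tc) // field name from the diff
}
```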
23 changes: 15 additions & 8 deletions pkg/manager/member/pd_ms_member_manager.go
@@ -59,30 +59,33 @@ func NewPDMSMemberManager(dependencies *controller.Dependencies, pdMSScaler Scal

// Sync syncs all PD Micro Service components.
func (m *pdMSMemberManager) Sync(tc *v1alpha1.TidbCluster) error {
if tc.Spec.PDMS == nil {
return nil
}

// Need to start PD API
if tc.Spec.PD == nil {
if tc.Spec.PDMS != nil && tc.Spec.PD == nil {
klog.Infof("PD Micro Service is enabled, but PD is not enabled, skip syncing PD Micro Service")
return nil
}
if tc.Spec.PD.Mode != "ms" {
// remove all micro service components
for _, comp := range tc.Spec.PDMS {
// remove all micro service components if PDMS is not enabled
if tc.Spec.PD != nil && tc.Spec.PD.Mode != "ms" {
for _, comp := range tc.Status.PDMS {
ns := tc.GetNamespace()
tcName := tc.GetName()
curService := comp.Name

oldPDMSSetTmp, err := m.deps.StatefulSetLister.StatefulSets(ns).Get(controller.PDMSMemberName(tcName, curService))
if err != nil && !errors.IsNotFound(err) {
if errors.IsNotFound(err) {
continue
}
return fmt.Errorf("syncPDMSStatefulSet: fail to get sts %s PDMS component %s for cluster [%s/%s], error: %s",
controller.PDMSMemberName(tcName, curService), curService, ns, tcName, err)
}

oldPDMSSet := oldPDMSSetTmp.DeepCopy()
newPDMSSet := oldPDMSSetTmp.DeepCopy()
if oldPDMSSet.Status.Replicas == 0 {
continue
}
tc.Status.PDMS[curService].Synced = true
*newPDMSSet.Spec.Replicas = 0
if err := m.scaler.Scale(tc, oldPDMSSet, newPDMSSet); err != nil {
return err
@@ -93,6 +96,10 @@ func (m *pdMSMemberManager) Sync(tc *v1alpha1.TidbCluster) error {
return nil
}

if tc.Spec.PDMS == nil {
return nil
}

// init PD Micro Service status
if tc.Status.PDMS == nil {
tc.Status.PDMS = make(map[string]*v1alpha1.PDMSStatus)
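Taken together, the reordered checks let Sync tear PDMS down even when tc.Spec.PDMS is already nil, because the scale-down loop now iterates tc.Status.PDMS. A hedged sketch of the resulting guard order, with the scale-down loop passed in as a function since its real body lives in the diff above:

```go
// Guard order established by this commit, condensed: skip when PDMS is
// requested without PD, tear PDMS down when PD is not in "ms" mode, and only
// then bail out if no PDMS components are configured.
func syncPDMSGuards(tc *v1alpha1.TidbCluster, scaleDownAll func(*v1alpha1.TidbCluster) error) (bool, error) {
	if tc.Spec.PDMS != nil && tc.Spec.PD == nil {
		return false, nil // PDMS needs the PD API; nothing to do without PD
	}
	if tc.Spec.PD != nil && tc.Spec.PD.Mode != "ms" {
		return false, scaleDownAll(tc) // remove leftover PDMS stateful sets
	}
	if tc.Spec.PDMS == nil {
		return false, nil // no PDMS configured at all
	}
	return true, nil // proceed with the per-component sync
}
```

Running the guards before the nil check is what the new transfer-mode e2e test below relies on: clearing tc.Spec.PDMS no longer skips the cleanup of components still recorded in the status.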
14 changes: 8 additions & 6 deletions pkg/manager/member/tikv_member_manager.go
@@ -116,12 +116,14 @@ func (m *tikvMemberManager) Sync(tc *v1alpha1.TidbCluster) error {
return controller.RequeueErrorf("TidbCluster: [%s/%s], please make sure pdms is not nil, "+
"then now waiting for PD's micro service running", ns, tcName)
}
// Check if all PD Micro Services are available
for _, pdms := range tc.Spec.PDMS {
_, err = controller.GetPDMSClient(m.deps.PDControl, tc, pdms.Name)
if err != nil {
return controller.RequeueErrorf("PDMS component %s for TidbCluster: [%s/%s], "+
"waiting for PD micro service cluster running, error: %v", pdms.Name, ns, tcName, err)
if tc.Spec.PD != nil && tc.Spec.PD.Mode == "ms" {
// Check if all PD Micro Services are available
for _, pdms := range tc.Spec.PDMS {
_, err = controller.GetPDMSClient(m.deps.PDControl, tc, pdms.Name)
if err != nil {
return controller.RequeueErrorf("PDMS component %s for TidbCluster: [%s/%s], "+
"waiting for PD micro service cluster running, error: %v", pdms.Name, ns, tcName, err)
}
}
}

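The gate added here repeats the tc.Spec.PD != nil && tc.Spec.PD.Mode == "ms" test that pd_ms_member_manager.go now also performs. A hypothetical helper, not part of this commit, could centralize that predicate:

```go
// pdMSEnabled reports whether the cluster is meant to run PD in microservice
// mode; a hypothetical convenience wrapper over the checks used in this diff.
func pdMSEnabled(tc *v1alpha1.TidbCluster) bool {
	return tc.Spec.PD != nil && tc.Spec.PD.Mode == "ms" && len(tc.Spec.PDMS) > 0
}
```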
61 changes: 10 additions & 51 deletions tests/actions.go
@@ -684,18 +684,19 @@ func (oa *OperatorActions) memberCheckContextForTC(tc *v1alpha1.TidbCluster, com
services = []string{controller.PDMemberName(name), controller.PDPeerMemberName(name)}
checkComponent = oa.isPDMembersReady
case v1alpha1.PDMSTSOMemberType, v1alpha1.PDMSSchedulingMemberType:
curService := component.String()
skip = true
for _, service := range tc.Spec.PDMS {
log.Logf("check pdms service ready, curService: %s, service.Name: %s", curService, service.Name)
if curService == service.Name {
skip = false
break
if tc.Spec.PD != nil && tc.Spec.PD.Mode == "ms" {
curService := component.String()
for _, service := range tc.Spec.PDMS {
if curService == service.Name {
skip = false
break
}
}
expectedImage = tc.PDImage()
services = []string{controller.PDMSMemberName(name, curService), controller.PDMSPeerMemberName(name, curService)}
checkComponent = oa.isPDMSMembersReady
}
expectedImage = tc.PDImage()
services = []string{controller.PDMSMemberName(name, curService), controller.PDMSPeerMemberName(name, curService)}
checkComponent = oa.isPDMSMembersReady
case v1alpha1.TiDBMemberType:
skip = tc.Spec.TiDB == nil
expectedImage = tc.TiDBImage()
@@ -1350,47 +1351,6 @@ func (oa *OperatorActions) eventWorker() {
// FIXME: this duplicates with WaitForTidbClusterReady in crd_test_utils.go, and all functions in it
// TODO: sync with e2e doc
func (oa *OperatorActions) WaitForTidbClusterReady(tc *v1alpha1.TidbCluster, timeout, pollInterval time.Duration) error {
if tc == nil {
return fmt.Errorf("tidbcluster is nil, cannot call WaitForTidbClusterReady")
}
var checkErr, err error
var local *v1alpha1.TidbCluster
tcID := fmt.Sprintf("%s/%s", tc.Namespace, tc.Name)
err = wait.PollImmediate(pollInterval, timeout, func() (bool, error) {
if local, err = oa.cli.PingcapV1alpha1().TidbClusters(tc.Namespace).Get(context.TODO(), tc.Name, metav1.GetOptions{}); err != nil {
checkErr = fmt.Errorf("failed to get TidbCluster: %q, %v", tcID, err)
return false, nil
}

components := []v1alpha1.MemberType{
v1alpha1.PDMemberType,
v1alpha1.TiKVMemberType,
v1alpha1.TiDBMemberType,
v1alpha1.TiFlashMemberType,
v1alpha1.PumpMemberType,
v1alpha1.TiCDCMemberType,
}

for _, component := range components {
if err := oa.IsMembersReady(local, component); err != nil {
checkErr = fmt.Errorf("%s members for tc %q are not ready: %v", component, tcID, err)
return false, nil
}
}

log.Logf("TidbCluster %q is ready", tcID)
return true, nil
})

if err == wait.ErrWaitTimeout {
err = checkErr
}

return err
}

func (oa *OperatorActions) WaitForPDMSClusterReady(tc *v1alpha1.TidbCluster, timeout, pollInterval time.Duration) error {
if tc == nil {
return fmt.Errorf("tidbcluster is nil, cannot call WaitForTidbClusterReady")
}
@@ -1417,7 +1377,6 @@ func (oa *OperatorActions) WaitForPDMSClusterReady(tc *v1alpha1.TidbCluster, tim

for _, component := range components {
if err := oa.IsMembersReady(local, component); err != nil {
log.Logf("%s members for tc %q are not ready: %v", component, tcID, err)
checkErr = fmt.Errorf("%s members for tc %q are not ready: %v", component, tcID, err)
return false, nil
}
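WaitForPDMSClusterReady above and the checkAllPDMSStatus helper added later in this commit share the same poll-and-capture idiom: poll until the check passes, remember the most recent check error, and surface that error if the wait times out. A minimal generic sketch of the idiom, using the same k8s.io/apimachinery wait package as the surrounding helpers (the function and parameter names are illustrative, not from the commit):

```go
// waitForCondition polls check until it succeeds or the timeout elapses; on
// timeout it returns the last error the check produced instead of the bare
// wait.ErrWaitTimeout, which keeps test failures descriptive.
func waitForCondition(check func() error, timeout, interval time.Duration) error {
	var checkErr error
	err := wait.PollImmediate(interval, timeout, func() (bool, error) {
		if cerr := check(); cerr != nil {
			checkErr = cerr
			return false, nil // remember the failure and keep polling
		}
		return true, nil
	})
	if err == wait.ErrWaitTimeout && checkErr != nil {
		err = checkErr
	}
	return err
}
```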
12 changes: 6 additions & 6 deletions tests/e2e/e2e.go
@@ -285,12 +285,12 @@ var _ = ginkgo.SynchronizedBeforeSuite(func() []byte {
// only deploy MySQL and TiDB for DM if CRDs and TiDB Operator installed.
// setup upstream MySQL instances and the downstream TiDB cluster for DM testing.
// if we can only setup these resource for DM tests with something like `--focus` or `--skip`, that should be better.
if e2econfig.TestConfig.InstallDMMysql {
oa.DeployDMMySQLOrDie(tests.DMMySQLNamespace)
oa.DeployDMTiDBOrDie()
} else {
ginkgo.By("Skip installing MySQL and TiDB for DM tests")
}
// if e2econfig.TestConfig.InstallDMMysql {
// oa.DeployDMMySQLOrDie(tests.DMMySQLNamespace)
// oa.DeployDMTiDBOrDie()
// } else {
// ginkgo.By("Skip installing MySQL and TiDB for DM tests")
// }
} else {
ginkgo.By("Skip installing tidb-operator")
}
102 changes: 99 additions & 3 deletions tests/e2e/tidbcluster/tidbcluster.go
@@ -25,6 +25,7 @@ import (
"github.com/onsi/gomega"
astsHelper "github.com/pingcap/advanced-statefulset/client/apis/apps/v1/helper"
asclientset "github.com/pingcap/advanced-statefulset/client/client/clientset/versioned"
"github.com/pingcap/errors"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
@@ -138,7 +139,7 @@ var _ = ginkgo.Describe("TiDBCluster", func() {

// basic deploy, scale out, scale in, change configuration tests
ginkgo.Context("[TiDBCluster: Basic]", func() {
versions := []string{utilimage.TiDBLatest}
versions := []string{utilimage.TiDBLatest, utilimage.PDMSImage}
versions = append(versions, utilimage.TiDBPreviousVersions...)
for _, version := range versions {
version := version
Expand Down Expand Up @@ -3250,7 +3251,7 @@ var _ = ginkgo.Describe("TiDBCluster", func() {
})

ginkgo.It("should scale in tc successfully", func() {
ginkgo.By("Deploy a basic tc")
ginkgo.By("Deploy a basic tc with pdms")
tc := fixture.GetTidbCluster(ns, fmt.Sprintf("basic-%s", versionDashed), version)
tc = fixture.AddPDMSForTidbCluster(tc)
tc.Spec.TiKV.Replicas = 4
@@ -3280,7 +3281,7 @@ var _ = ginkgo.Describe("TiDBCluster", func() {
})

ginkgo.It("should change configurations successfully", func() {
ginkgo.By("Deploy a basic tc")
ginkgo.By("Deploy a basic tc with pdms")
tc := fixture.GetTidbCluster(ns, fmt.Sprintf("basic-%s", versionDashed), version)
tc = fixture.AddPDMSForTidbCluster(tc)
tc.Spec.TiDB.Replicas = 1
@@ -3328,11 +3329,106 @@ var _ = ginkgo.Describe("TiDBCluster", func() {
ginkgo.By("Check custom labels and annotations changed")
framework.ExpectNoError(checkCustomLabelAndAnn(tc, c, newValue, 10*time.Minute, 10*time.Second), "failed to check labels and annotations")
})

// move PD between its original mode and pdms mode
ginkgo.It("should transfer pd mode successfully", func() {
ginkgo.By("Deploy a basic tc")
tc := fixture.GetTidbCluster(ns, fmt.Sprintf("basic-%s", versionDashed), version)
tc.Spec.PD.Mode = ""
tc.Spec.PDMS = nil
_, err := cli.PingcapV1alpha1().TidbClusters(tc.Namespace).Create(context.TODO(), tc, metav1.CreateOptions{})
framework.ExpectNoError(err, "failed to create TidbCluster: %q", tc.Name)
err = oa.WaitForTidbClusterReady(tc, 30*time.Minute, 30*time.Second)
framework.ExpectNoError(err, "failed to wait for TidbCluster ready: %q", tc.Name)
err = crdUtil.CheckDisasterTolerance(tc)
framework.ExpectNoError(err, "failed to check disaster tolerance for TidbCluster: %q", tc.Name)

ginkgo.By("transfer to pdms")
err = controller.GuaranteedUpdate(genericCli, tc, func() error {
tc = fixture.AddPDMSForTidbCluster(tc)
return nil
})
framework.ExpectNoError(err, "failed to change pd mode of TidbCluster: %q", tc.Name)
err = oa.WaitForTidbClusterReady(tc, 10*time.Minute, 5*time.Second)
framework.ExpectNoError(err, "failed to wait for TidbCluster ready: %q", tc.Name)

ginkgo.By("transfer to original pd mode but not delete pdms")
err = controller.GuaranteedUpdate(genericCli, tc, func() error {
tc.Spec.PD.Mode = ""
return nil
})
framework.ExpectNoError(err, "failed to change pd mode of TidbCluster: %q", tc.Name)
ginkgo.By("check pdms deleted successfully")
// wait for all pdms stateful sets to be scaled down and removed
err = checkAllPDMSStatus(stsGetter, ns, tc.Name, 0)
framework.ExpectNoError(err, "failed to wait for pdms members of TidbCluster %q to be deleted", tc.Name)
err = oa.WaitForTidbClusterReady(tc, 10*time.Minute, 5*time.Second)
framework.ExpectNoError(err, "failed to wait for TidbCluster ready: %q", tc.Name)

ginkgo.By("transfer to pdms")
err = controller.GuaranteedUpdate(genericCli, tc, func() error {
tc = fixture.AddPDMSForTidbCluster(tc)
return nil
})
framework.ExpectNoError(err, "failed to change pd mode of TidbCluster: %q", tc.Name)
err = oa.WaitForTidbClusterReady(tc, 10*time.Minute, 5*time.Second)
framework.ExpectNoError(err, "failed to wait for TidbCluster ready: %q", tc.Name)

ginkgo.By("transfer to original pd mode and delete pdms")
err = controller.GuaranteedUpdate(genericCli, tc, func() error {
tc.Spec.PD.Mode = ""
tc.Spec.PDMS = nil
return nil
})
framework.ExpectNoError(err, "failed to change pd mode of TidbCluster: %q", tc.Name)
ginkgo.By("check pdms deleted successfully")
// wait for all pdms stateful sets to be scaled down and removed
err = checkAllPDMSStatus(stsGetter, ns, tc.Name, 0)
framework.ExpectNoError(err, "failed to wait for pdms members of TidbCluster %q to be deleted", tc.Name)
err = oa.WaitForTidbClusterReady(tc, 30*time.Minute, 5*time.Second)
framework.ExpectNoError(err, "failed to wait for TidbCluster ready: %q", tc.Name)
})
})
}
})
})
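The transfer test above flips modes via fixture.AddPDMSForTidbCluster and by clearing tc.Spec.PD.Mode. A hedged sketch of what that fixture is assumed to configure, based only on the fields this diff reads (PD.Mode, and the PDMS component names "tso" and "scheduling"); the slice element type and any PDMSSpec fields beyond Name are assumptions:

```go
// Assumed shape of the PDMS fixture: switch PD into microservice mode and
// declare one spec per component. Anything beyond Name is an assumption.
func addPDMSForTest(tc *v1alpha1.TidbCluster) *v1alpha1.TidbCluster {
	tc.Spec.PD.Mode = "ms"
	tc.Spec.PDMS = []*v1alpha1.PDMSSpec{
		{Name: "tso"},
		{Name: "scheduling"},
	}
	return tc
}
```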

// checkAllPDMSStatus checks that there are onlineNum online pdms instances running now.
func checkAllPDMSStatus(stsGetter typedappsv1.StatefulSetsGetter, ns string, tcName string, onlineNum int32) error {
var checkErr error
err := wait.PollImmediate(5*time.Second, 10*time.Minute, func() (bool, error) {

var onlines int32
for _, component := range []string{"tso", "scheduling"} {
	pdmsSts, err := stsGetter.StatefulSets(ns).Get(context.TODO(), controller.PDMSMemberName(tcName, component), metav1.GetOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			continue // this component's stateful set is already gone
		}
		return false, err
	}
	if pdmsSts.Status.Replicas > 0 {
		log.Logf("pdms component %s still has %d replicas while expecting %d online pdms components", component, pdmsSts.Status.Replicas, onlineNum)
		onlines++
	}
}

if onlines == onlineNum {
return true, nil
}

checkErr = fmt.Errorf("failed to check %d online pdms", onlineNum)
return false, nil
})

if err == wait.ErrWaitTimeout {
err = checkErr
}

return err
}

// checkPumpStatus checks that there are onlineNum online pump instances running now.
func checkPumpStatus(pcli versioned.Interface, ns string, name string, onlineNum int32) error {
var checkErr error
Expand Down
5 changes: 4 additions & 1 deletion tests/e2e/util/image/image.go
@@ -55,7 +55,10 @@ const (
DMV2 = TiDBLatest
TiDBNGMonitoringLatest = TiDBLatest
HelperImage = "alpine:3.16.0"
PDMSImage = "nightly"
// TODO: replace this after we have `8.0.0` release
// PDMSImage = "release-7.6"
// FOR local test
PDMSImage = "v7.6.0"
)

func ListImages() []string {