Skip to content

Commit

Permalink
Add cluster id to create kafka (#1804)
Browse files Browse the repository at this point in the history
* feat: addition of descriptions and examples to dedicated

* feat: Allows the CLI to install stage env addons when terraforming customer-cloud dataplane clusters

* refactor: the default api url is now set when creating a connection instead of in the registercluster cmd file

* refactor: update some func and var names

* refactor: add cluster-id flag to create kafka cmd to allow kafka creation on CC cluster

* refactor: doc and small refactor
  • Loading branch information
dimakis authored Feb 8, 2023
1 parent 81cd99c commit 17cbcbc
Show file tree
Hide file tree
Showing 74 changed files with 1,316 additions and 351 deletions.
2 changes: 1 addition & 1 deletion docs/commands/rhoas.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions docs/commands/rhoas_dedicated.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions docs/commands/rhoas_dedicated_register-cluster.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions docs/commands/rhoas_kafka_create.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/redhat-developer/app-services-sdk-go/accountmgmt v0.3.0
github.com/redhat-developer/app-services-sdk-go/connectormgmt v0.10.0
github.com/redhat-developer/app-services-sdk-go/kafkainstance v0.11.0
github.com/redhat-developer/app-services-sdk-go/kafkamgmt v0.17.0
github.com/redhat-developer/app-services-sdk-go/kafkamgmt v0.19.0
github.com/redhat-developer/app-services-sdk-go/registryinstance v0.8.2
github.com/redhat-developer/app-services-sdk-go/registrymgmt v0.11.1
github.com/redhat-developer/app-services-sdk-go/serviceaccountmgmt v0.9.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -819,8 +819,8 @@ github.com/redhat-developer/app-services-sdk-go/connectormgmt v0.10.0 h1:CURbTHI
github.com/redhat-developer/app-services-sdk-go/connectormgmt v0.10.0/go.mod h1:t3IV0eKUPgCQjoInv2l8B/NMm2OVemCxGFO/z91wsCU=
github.com/redhat-developer/app-services-sdk-go/kafkainstance v0.11.0 h1:WdwVjneugUC898RSHuc2vLwlcNgPh3oF7/fuxEEGGPg=
github.com/redhat-developer/app-services-sdk-go/kafkainstance v0.11.0/go.mod h1:yazwUm4IHuIWrQ0CCsqN0h7rHZx51nlFbYWKnUn7B84=
github.com/redhat-developer/app-services-sdk-go/kafkamgmt v0.17.0 h1:Lp0D3pF2A1VYsbFk+KE1QQ+PfdW3HySTnIuUHrvb1iQ=
github.com/redhat-developer/app-services-sdk-go/kafkamgmt v0.17.0/go.mod h1:ILvcakLEXMLZyRdO//WJZNk9fdFbnU+cM3XrBvubE64=
github.com/redhat-developer/app-services-sdk-go/kafkamgmt v0.19.0 h1:RVDEeUfBgMzAK+BCnlhfHGHp2YYW6GH6jgYOv2jwYVY=
github.com/redhat-developer/app-services-sdk-go/kafkamgmt v0.19.0/go.mod h1:ILvcakLEXMLZyRdO//WJZNk9fdFbnU+cM3XrBvubE64=
github.com/redhat-developer/app-services-sdk-go/registryinstance v0.8.2 h1:U2je87d/DIeOaQIycg2Y7TLiESmGu0/0rQC5n64Od0Y=
github.com/redhat-developer/app-services-sdk-go/registryinstance v0.8.2/go.mod h1:HkNzOWHTW/SomobQ4343+yR4oTmiyvm85BIWlsh0qbA=
github.com/redhat-developer/app-services-sdk-go/registrymgmt v0.11.1 h1:VOv3wcodQ6EpKp2RRntMMTMuQSnNv1sqLezdbv18mjs=
Expand Down
12 changes: 4 additions & 8 deletions pkg/cmd/dedicated/dedicated.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,12 @@ import (
"github.com/spf13/cobra"
)

// TO-DO add localizer and descriptions
func NewDedicatedCmd(f *factory.Factory) *cobra.Command {
cmd := &cobra.Command{
Use: "dedicated",
// Short: f.Localizer.MustLocalize("kafka.topic.cmd.shortDescription"),
Short: "shortDescription",
// Long: f.Localizer.MustLocalize("kafka.topic.cmd.longDescription"),
Long: "longDescription",
// Example: f.Localizer.MustLocalize("kafka.topic.cmd.example"),
Example: "example",
Use: "dedicated",
Short: f.Localizer.MustLocalize("dedicated.cmd.shortDescription"),
Long: f.Localizer.MustLocalize("dedicated.cmd.longDescription"),
Example: f.Localizer.MustLocalize("dedicated.cmd.example"),
}

cmd.AddCommand(
Expand Down
79 changes: 43 additions & 36 deletions pkg/cmd/dedicated/register/registercluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package register
import (
"context"
"fmt"
"github.com/redhat-developer/app-services-cli/internal/build"
"github.com/redhat-developer/app-services-cli/pkg/core/config"
"strings"

"github.com/AlecAivazis/survey/v2"
Expand All @@ -15,12 +17,11 @@ import (
)

type options struct {
selectedClusterId string
// clusterManagementApiUrl string
// accessToken string
clusterList []clustersmgmtv1.Cluster
selectedCluster clustersmgmtv1.Cluster
// clusterMachinePoolList clustersmgmtv1.MachinePoolList
selectedClusterId string
clusterManagementApiUrl string
accessToken string
clusterList []clustersmgmtv1.Cluster
selectedCluster clustersmgmtv1.Cluster
existingMachinePoolList []clustersmgmtv1.MachinePool
selectedClusterMachinePool clustersmgmtv1.MachinePool
requestedMachinePoolNodeCount int
Expand All @@ -32,18 +33,18 @@ type options struct {

// list of consts should come from KFM
const (
machinePoolId = "kafka-standard"
machinePoolTaintKey = "bf2.org/kafkaInstanceProfileType"
machinePoolTaintEffect = "NoExecute"
machinePoolTaintValue = "standard"
// machinePoolInstanceType = "m5.2xlarge"
machinePoolId = "kafka-standard"
machinePoolTaintKey = "bf2.org/kafkaInstanceProfileType"
machinePoolTaintEffect = "NoExecute"
machinePoolTaintValue = "standard"
machinePoolInstanceType = "r5.xlarge"
machinePoolLabelKey = "bf2.org/kafkaInstanceProfileType"
machinePoolLabelValue = "standard"
clusterReadyState = "ready"
fleetshardAddonId = "kas-fleetshard-operator"
strimziAddonId = "managed-kafka"
// clusterManagementAPIURL = "https://api.openshift.com"
fleetshardAddonIdQE = "kas-fleetshard-operator-qe"
strimziAddonIdQE = "managed-kafka-qe"
)

func NewRegisterClusterCommand(f *factory.Factory) *cobra.Command {
Expand All @@ -62,19 +63,16 @@ func NewRegisterClusterCommand(f *factory.Factory) *cobra.Command {
},
}

// TO-DO add flags
// add a flag for clustermgmt url, i.e --cluster-management-api-url, make the flag hidden, default to api.openshift.com
// supply customer mgmt access token via a flag, i.e --access-token, make the flag hidden, default to ""
flags := kafkaFlagutil.NewFlagSet(cmd, f.Localizer)
// flags.StringVar(&opts.clusterManagementApiUrl, "cluster-management-api-url", clusterManagementAPIURL, "cluster management api url")
// flags.StringVar(&opts.accessToken, "access-token", "", "access token")
// this flag will allow the user to pass the cluster id as a flag
flags.StringVar(&opts.clusterManagementApiUrl, "cluster-mgmt-api-url", "", f.Localizer.MustLocalize("dedicated.registerCluster.flag.clusterMgmtApiUrl.description"))
flags.StringVar(&opts.accessToken, "access-token", "", f.Localizer.MustLocalize("dedicated.registercluster.flag.accessToken.description"))
flags.StringVar(&opts.selectedClusterId, "cluster-id", "", f.Localizer.MustLocalize("dedicated.registerCluster.flag.clusterId.description"))

return cmd
}

func runRegisterClusterCmd(opts *options) (err error) {
// Set the base URL for the cluster management API
err = setListClusters(opts)
if err != nil {
return err
Expand Down Expand Up @@ -114,12 +112,11 @@ func runRegisterClusterCmd(opts *options) (err error) {
}

func getClusterList(opts *options) (*clustersmgmtv1.ClusterList, error) {
// ocm client connection
conn, err := opts.f.Connection()
if err != nil {
return nil, err
}
client, cc, err := conn.API().OCMClustermgmt()
client, cc, err := conn.API().OCMClustermgmt(opts.clusterManagementApiUrl, opts.accessToken)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -147,9 +144,7 @@ func setListClusters(opts *options) error {

func validateClusters(clusters *clustersmgmtv1.ClusterList, cls []clustersmgmtv1.Cluster) []clustersmgmtv1.Cluster {
for _, cluster := range clusters.Slice() {
// TO-DO the cluster must be multiAZ
if cluster.State() == clusterReadyState && cluster.MultiAZ() == true {
// if cluster.State() == clusterReadyState {
cls = append(cls, *cluster)
}
}
Expand Down Expand Up @@ -223,7 +218,7 @@ func getMachinePoolList(opts *options) (*clustersmgmtv1.MachinePoolsListResponse
if err != nil {
return nil, err
}
client, cc, err := conn.API().OCMClustermgmt()
client, cc, err := conn.API().OCMClustermgmt(opts.clusterManagementApiUrl, opts.accessToken)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -295,12 +290,11 @@ func createMachinePoolRequestForDedicated(machinePoolNodeCount int) (*clustersmg

// TO-DO this function should be moved to an ocm client / provider area
func createMachinePool(opts *options, mprequest *clustersmgmtv1.MachinePool) error {
// create a new machine pool via ocm
conn, err := opts.f.Connection()
if err != nil {
return err
}
client, cc, err := conn.API().OCMClustermgmt()
client, cc, err := conn.API().OCMClustermgmt(opts.clusterManagementApiUrl, opts.accessToken)
if err != nil {
return err
}
Expand Down Expand Up @@ -384,17 +378,12 @@ func selectAccessPrivateNetworkInteractivePrompt(opts *options) error {
Help: opts.f.Localizer.MustLocalize("dedicated.registerCluster.prompt.selectPublicNetworkAccess.help"),
Default: false,
}
accessKafkasViaPublicNetwork := false
err := survey.AskOne(prompt, &accessKafkasViaPublicNetwork)
accessFromPublicNetwork := true
err := survey.AskOne(prompt, &accessFromPublicNetwork)
if err != nil {
return err
}
if accessKafkasViaPublicNetwork {
opts.accessKafkasViaPrivateNetwork = false
} else {
opts.accessKafkasViaPrivateNetwork = true
}

opts.accessKafkasViaPrivateNetwork = !accessFromPublicNetwork
return nil
}

Expand All @@ -416,7 +405,7 @@ func createAddonWithParams(opts *options, addonId string, params *[]kafkamgmtcli
if err != nil {
return err
}
client, cc, err := conn.API().OCMClustermgmt()
client, cc, err := conn.API().OCMClustermgmt(opts.clusterManagementApiUrl, opts.accessToken)
if err != nil {
return err
}
Expand All @@ -439,6 +428,20 @@ func createAddonWithParams(opts *options, addonId string, params *[]kafkamgmtcli
return nil
}

// getStrimziAddonIdByEnv returns the Strimzi addon ID appropriate for the
// environment the CLI is pointed at: the production addon ID when the
// configured API URL is the production URL, otherwise the QE addon ID.
func getStrimziAddonIdByEnv(con *config.Config) string {
	if con.APIUrl != build.ProductionAPIURL {
		return strimziAddonIdQE
	}
	return strimziAddonId
}

// getKafkaFleetShardAddonIdByEnv returns the kas-fleetshard addon ID for the
// current environment: the production addon ID when the configured API URL is
// the production URL, otherwise the QE addon ID.
func getKafkaFleetShardAddonIdByEnv(con *config.Config) string {
	if con.APIUrl != build.ProductionAPIURL {
		return fleetshardAddonIdQE
	}
	return fleetshardAddonId
}

// TO-DO go through errs and make them more user friendly with actual error messages.
func registerClusterWithKasFleetManager(opts *options) error {
clusterIngressDNSName, err := parseDNSURL(opts)
Expand All @@ -465,11 +468,15 @@ func registerClusterWithKasFleetManager(opts *options) error {
if err != nil {
return err
}
err = createAddonWithParams(opts, strimziAddonId, nil)
con, err := opts.f.Config.Load()
if err != nil {
return err
}
err = createAddonWithParams(opts, getStrimziAddonIdByEnv(con), nil)
if err != nil {
return err
}
err = createAddonWithParams(opts, fleetshardAddonId, response.FleetshardParameters)
err = createAddonWithParams(opts, getKafkaFleetShardAddonIdByEnv(con), response.FleetshardParameters)
if err != nil {
return err
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/cmd/kafka/create/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ const (
)

type options struct {
name string
provider string
region string
size string
name string
provider string
region string
size string
clusterId string

marketplaceAcctId string
marketplace string
Expand Down Expand Up @@ -143,6 +144,7 @@ func NewCreateCommand(f *factory.Factory) *cobra.Command {
flags.BoolVarP(&opts.dryRun, "dry-run", "", false, f.Localizer.MustLocalize("kafka.create.flag.dryrun.description"))
flags.StringVar(&opts.billingModel, FlagBillingModel, "", f.Localizer.MustLocalize("kafka.create.flag.billingModel.description"))
flags.AddBypassTermsCheck(&opts.bypassChecks)
flags.StringVar(&opts.clusterId, "cluster-id", "", f.Localizer.MustLocalize("kafka.create.flag.clusterId.description"))

_ = cmd.RegisterFlagCompletionFunc(FlagProvider, func(cmd *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) {
return GetCloudProviderCompletionValues(f)
Expand Down Expand Up @@ -237,6 +239,7 @@ func runCreate(opts *options) error {
Name: opts.name,
Region: &opts.region,
CloudProvider: &opts.provider,
ClusterId: *kafkamgmtclient.NewNullableString(&opts.clusterId),
}

if !opts.bypassChecks {
Expand Down
34 changes: 32 additions & 2 deletions pkg/core/localize/locales/en/cmd/dedicated.en.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ one = 'The ID of the OpenShift cluster to register:'
one = 'Select the ready cluster to register'

[dedicated.registerCluster.prompt.selectPublicNetworkAccess.message]
one = 'Would you like your Kakfas to be accessible via a public network?'
one = 'Would you like your Kafkas to be accessible via a public network?'

[dedicated.registerCluster.prompt.selectPublicNetworkAccess.help]
one = 'If you select yes, your Kafka will be accessible via a public network'
Expand All @@ -42,4 +42,34 @@ There will be N/3 streaming units in your Kafka cluster, where N is the machine
'''

[dedicated.registerCluster.info.foundValidMachinePool]
one = 'Using the valid machine pool:'
one = 'Using the valid machine pool:'

[dedicated.cmd.shortDescription]
one = 'Manage your OpenShift clusters which host your Kafka instances.'

[dedicated.cmd.longDescription]
one = '''
Red Hat OpenShift Streams for Apache Kafka allows you to use your own OpenShift clusters to provision your
Kafka instances. These Kafka instances will be managed by Red Hat OpenShift Streams for Apache Kafka.
'''

[dedicated.cmd.example]
one = '''
# Register an OpenShift cluster with Red Hat OpenShift Streams for Apache Kafka.
rhoas dedicated register-cluster
'''

[dedicated.registerCluster.kfmResponse.status.clusterAccepted]
one = '''
The cluster has been accepted. Red Hat OpenShift Streams for Apache Kafka control plane is now
terraforming your cluster for use with your Kafkas.
'''

[dedicated.registerCluster.kfmResponse.status.conflict]
one = 'The cluster has already been registered with Red Hat OpenShift Streams for Apache Kafka.'

[dedicated.registerCluster.flag.clusterMgmtApiUrl.description]
one = 'The API URL of the OpenShift Cluster Management API.'

[dedicated.registercluster.flag.accessToken.description]
one = 'The access token to use to authenticate with the OpenShift Cluster Management API.'
4 changes: 4 additions & 0 deletions pkg/core/localize/locales/en/cmd/kafka.en.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1144,3 +1144,7 @@ one = 'Kafka instance "{{.Name}}" is not ready yet'
[kafka.update.flag.owner]
description = 'Description for the --owner flag'
one = 'ID of the Kafka instance owner'

[kafka.create.flag.clusterId.description]
description = 'Description for the --cluster-id flag'
one = 'ID of the Customer-Cloud data plane cluster to create the Kafka instance on.'
4 changes: 2 additions & 2 deletions pkg/shared/connection/api/api.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package api

import (
ocmclustersmgmtv1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1"
"net/http"
"net/url"

ocmclustersmgmtv1 "github.com/openshift-online/ocm-sdk-go/clustersmgmt/v1"
"github.com/redhat-developer/app-services-cli/pkg/api/generic"
"github.com/redhat-developer/app-services-cli/pkg/api/rbac"
"github.com/redhat-developer/app-services-cli/pkg/core/logging"
Expand All @@ -29,7 +29,7 @@ type API interface {
RBAC() rbac.RbacAPI
GenericAPI() generic.GenericAPI
GetConfig() Config
OCMClustermgmt() (*ocmclustersmgmtv1.Client, func(), error)
OCMClustermgmt(apiGateway, accessToken string) (*ocmclustersmgmtv1.Client, func(), error)
}

type Config struct {
Expand Down
Loading

0 comments on commit 17cbcbc

Please sign in to comment.