diff --git a/internal/connector/internal/handlers/connector_admin.go b/internal/connector/internal/handlers/connector_admin.go index 7c42a62ed..6e31cc082 100644 --- a/internal/connector/internal/handlers/connector_admin.go +++ b/internal/connector/internal/handlers/connector_admin.go @@ -395,12 +395,11 @@ func (h *ConnectorAdminHandler) DeleteConnector(writer http.ResponseWriter, requ // check force flag to force deletion of connector and deployments if parseBoolParam(request.URL.Query().Get("force")) { - serviceError = h.ConnectorsService.ForceDelete(request.Context(), connectorId) + return nil, h.ConnectorsService.ForceDelete(request.Context(), connectorId) } else { ctx := request.Context() return nil, HandleConnectorDelete(ctx, h.ConnectorsService, h.NamespaceService, connectorId) } - return nil, serviceError }, } @@ -605,6 +604,7 @@ func (h *ConnectorAdminHandler) PatchConnectorDeployment(writer http.ResponseWri // Handle the fields that support being updated... var updatedDeployment dbapi.ConnectorDeployment updatedDeployment.ID = existingDeployment.ID + updatedDeployment.Version = existingDeployment.Version if len(resource.Spec.ShardMetadata) != 0 { // channel update updateRevision, err := workers.GetShardMetadataRevision(resource.Spec.ShardMetadata) diff --git a/internal/connector/internal/handlers/connector_validation.go b/internal/connector/internal/handlers/connector_validation.go index d0b5a97be..e5afa096b 100644 --- a/internal/connector/internal/handlers/connector_validation.go +++ b/internal/connector/internal/handlers/connector_validation.go @@ -2,9 +2,10 @@ package handlers import ( "context" + "strings" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/dbapi" "k8s.io/apimachinery/pkg/util/validation" - "strings" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared/utils/arrays" @@ -36,7 +37,7 @@ func connectorValidationFunction(connectorTypesService services.ConnectorTypesSe return func() *errors.ServiceError { ct, err := connectorTypesService.Get(*connectorTypeId) if err != nil { - return errors.BadRequest("YYY invalid connector type id %v : %s", connectorTypeId, err) + return errors.BadRequest("Invalid connector type id %v : %s", connectorTypeId, err) } if !arrays.Contains(ct.ChannelNames(), string(*channel)) { diff --git a/internal/connector/internal/services/connector_cluster.go b/internal/connector/internal/services/connector_cluster.go index edf2dfe6d..df921e3fe 100644 --- a/internal/connector/internal/services/connector_cluster.go +++ b/internal/connector/internal/services/connector_cluster.go @@ -396,11 +396,20 @@ func (k *connectorClusterService) GetClientID(clusterID string) (string, error) // SaveDeployment creates a connector deployment in the database func (k *connectorClusterService) SaveDeployment(ctx context.Context, resource *dbapi.ConnectorDeployment) *errors.ServiceError { dbConn := k.connectionFactory.New() + if resource.Version != 0 { + dbConn = dbConn.Where("version = ?", resource.Version) + } - if err := dbConn.Save(resource).Error; err != nil { + saveResult := dbConn.Save(resource) + if err := saveResult.Error; err != nil { return services.HandleCreateError(`Connector deployment`, err) } + if saveResult.RowsAffected == 0 { + return errors.Conflict("Optimistic locking: resource version changed from %v", resource.Version) + } + //refresh version + dbConn = k.connectionFactory.New() if err := dbConn.Where("id = ?", resource.ID).Select("version").First(&resource).Error; err != nil { return services.HandleGetError(`Connector deployment`, "id", resource.ID, err) } @@ -416,11 +425,20 @@ func (k *connectorClusterService) SaveDeployment(ctx context.Context, resource * func (k *connectorClusterService) UpdateDeployment(resource *dbapi.ConnectorDeployment) *errors.ServiceError { dbConn := k.connectionFactory.New() + + if resource.Version != 0 { + dbConn = dbConn.Where("version = ?", resource.Version) + } + updates := dbConn.Where("id = ?", resource.ID). Updates(resource) if err := updates.Error; err != nil { - return services.HandleUpdateError(`Connector namespace`, err) + return services.HandleUpdateError(`Connector deployment`, err) + } + if updates.RowsAffected == 0 { + return errors.Conflict("Optimistic locking: resource version changed from %v", resource.Version) } + return nil } @@ -494,7 +512,7 @@ func (k *connectorClusterService) ListConnectorDeployments(ctx context.Context, func (k *connectorClusterService) UpdateConnectorDeploymentStatus(ctx context.Context, deploymentStatus dbapi.ConnectorDeploymentStatus) *errors.ServiceError { dbConn := k.connectionFactory.New() - // lets get the connector id of the deployment.. + // let's get the connector id of the deployment... deployment := dbapi.ConnectorDeployment{} if err := dbConn.Unscoped().Select("connector_id", "deleted_at"). Where("id = ?", deploymentStatus.ID). @@ -505,7 +523,7 @@ func (k *connectorClusterService) UpdateConnectorDeploymentStatus(ctx context.Co return services.HandleGoneError("Connector deployment", "id", deploymentStatus.ID) } - if err := dbConn.Model(&deploymentStatus).Where("id = ? and version <= ?", deploymentStatus.ID, deploymentStatus.Version).Save(&deploymentStatus).Error; err != nil { + if err := dbConn.Where("id = ? and version <= ?", deploymentStatus.ID, deploymentStatus.Version).Save(&deploymentStatus).Error; err != nil { return errors.Conflict("failed to update deployment status: %s, probably a stale deployment status version was used: %d", err.Error(), deploymentStatus.Version) } @@ -531,8 +549,8 @@ func (k *connectorClusterService) UpdateConnectorDeploymentStatus(ctx context.Co } } - // update the connector status - if err := dbConn.Where("id = ?", deployment.ConnectorID).Updates(&connectorStatus).Error; err != nil { + // update the connector status, don't updated deleted statues + if err := dbConn.Where("deleted_at IS NULL AND id = ?", deployment.ConnectorID).Updates(&connectorStatus).Error; err != nil { return services.HandleUpdateError("Connector status", err) } @@ -564,7 +582,7 @@ func (k *connectorClusterService) FindAvailableNamespace(owner string, orgID str func (k *connectorClusterService) GetDeploymentByConnectorId(ctx context.Context, connectorID string) (resource dbapi.ConnectorDeployment, serr *errors.ServiceError) { - if err := k.connectionFactory.New().Preload(clause.Associations). + if err := k.connectionFactory.New(). Joins("Status").Joins("ConnectorShardMetadata").Joins("Connector"). Where("connector_id = ?", connectorID).First(&resource).Error; err != nil { return resource, services.HandleGetError("Connector deployment", "connector_id", connectorID, err) diff --git a/internal/connector/internal/services/connector_namespaces.go b/internal/connector/internal/services/connector_namespaces.go index 6098e14c4..22261c189 100644 --- a/internal/connector/internal/services/connector_namespaces.go +++ b/internal/connector/internal/services/connector_namespaces.go @@ -3,11 +3,12 @@ package services import ( "context" "fmt" - "gorm.io/gorm/clause" "math/rand" "strings" "time" + "gorm.io/gorm/clause" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/profiles" "reflect" @@ -214,12 +215,14 @@ func (k *connectorNamespaceService) setConnectorsDeployed(namespaces dbapi.Conne Id string Count int32 }, 0) - if err := k.connectionFactory.New().Model(&dbapi.ConnectorDeployment{}). - Select("namespace_id as id, count(*) as count"). - Group("namespace_id"). - Where("namespace_id in ?", ids). - Find(&result).Error; err != nil { - return services.HandleGetError(`Connector namespace`, `id`, ids, err) + if err := k.connectionFactory.New(). + Table("connector_namespaces"). + Joins("left join connector_deployments on connector_namespaces.id = namespace_id"). + Select("connector_namespaces.id as id, count(*) as count"). + Group("connector_namespaces.id"). + Where("namespace_id is not null and connector_deployments.deleted_at is null and connector_namespaces.id in ?", ids). + Scan(&result).Error; err != nil { + return services.HandleGetError(`Connector namespace deployment count`, `namespaces ids`, ids, err) } // set counts for non-empty ns diff --git a/internal/connector/internal/services/connectors.go b/internal/connector/internal/services/connectors.go index 18e379948..57f0af058 100644 --- a/internal/connector/internal/services/connectors.go +++ b/internal/connector/internal/services/connectors.go @@ -4,10 +4,11 @@ import ( "context" "encoding/base64" "fmt" - "gorm.io/gorm/clause" "regexp" "strings" + "gorm.io/gorm/clause" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/internal/connector/internal/api/dbapi" @@ -35,7 +36,7 @@ type ConnectorsService interface { Update(ctx context.Context, resource *dbapi.Connector) *errors.ServiceError SaveStatus(ctx context.Context, resource dbapi.ConnectorStatus) *errors.ServiceError Delete(ctx context.Context, id string) *errors.ServiceError - ForEach(f func(*dbapi.Connector) *errors.ServiceError, query string, args ...interface{}) []error + ForEach(f func(*dbapi.Connector) *errors.ServiceError, joins string, query string, args ...interface{}) []error ForceDelete(ctx context.Context, id string) *errors.ServiceError ResolveConnectorRefsWithBase64Secrets(resource *dbapi.Connector) (bool, *errors.ServiceError) @@ -351,16 +352,21 @@ func (k *connectorsService) Update(ctx context.Context, resource *dbapi.Connecto return nil } -func (k *connectorsService) SaveStatus(ctx context.Context, resource dbapi.ConnectorStatus) *errors.ServiceError { +func (k *connectorsService) SaveStatus(ctx context.Context, connectorStatus dbapi.ConnectorStatus) *errors.ServiceError { dbConn := k.connectionFactory.New() - if err := dbConn.Model(resource).Save(resource).Error; err != nil { + if err := dbConn.Where("deleted_at IS NULL").Save(&connectorStatus).Error; err != nil { return errors.GeneralError("failed to update: %s", err.Error()) } return nil } -func (k *connectorsService) ForEach(f func(*dbapi.Connector) *errors.ServiceError, query string, args ...interface{}) []error { +func (k *connectorsService) ForEach(f func(*dbapi.Connector) *errors.ServiceError, joins string, query string, args ...interface{}) []error { dbConn := k.connectionFactory.New() + + if joins != "" { + dbConn = dbConn.Joins(joins) + } + rows, err := dbConn. Model(&dbapi.Connector{}). Where(query, args...). diff --git a/internal/connector/internal/workers/connector_mgr.go b/internal/connector/internal/workers/connector_mgr.go index f2864e052..bca190673 100644 --- a/internal/connector/internal/workers/connector_mgr.go +++ b/internal/connector/internal/workers/connector_mgr.go @@ -26,7 +26,6 @@ type ConnectorManager struct { connectorClusterService services.ConnectorClusterService connectorTypesService services.ConnectorTypesService vaultService vault.VaultService - lastVersion int64 db *db.ConnectionFactory ctx context.Context } @@ -79,26 +78,25 @@ func (k *ConnectorManager) Reconcile() []error { } // reconcile assigning connectors in "ready" desired state with "assigning" phase and a valid namespace id - k.doReconcile(&errs, "assigning", k.reconcileAssigning, + k.doReconcile(&errs, "assigning", k.reconcileAssigning, "", "desired_state = ? AND phase = ? AND connectors.namespace_id IS NOT NULL", dbapi.ConnectorReady, dbapi.ConnectorStatusPhaseAssigning) // reconcile unassigned connectors in "unassigned" desired state and "deleted" phase - k.doReconcile(&errs, "unassigned", k.reconcileUnassigned, + k.doReconcile(&errs, "unassigned", k.reconcileUnassigned, "", "desired_state = ? AND phase = ?", dbapi.ConnectorUnassigned, dbapi.ConnectorStatusPhaseDeleted) // reconcile deleting connectors with no deployments - k.doReconcile(&errs, "deleting", k.reconcileDeleting, + k.doReconcile(&errs, "deleting", k.reconcileDeleting, "", "desired_state = ? AND phase = ?", dbapi.ConnectorDeleted, dbapi.ConnectorStatusPhaseDeleting) // reconcile deleted connectors with no deployments - k.doReconcile(&errs, "deleted", k.reconcileDeleted, + k.doReconcile(&errs, "deleted", k.reconcileDeleted, "", "desired_state = ? AND phase IN ?", dbapi.ConnectorDeleted, []string{string(dbapi.ConnectorStatusPhaseAssigning), string(dbapi.ConnectorStatusPhaseDeleted)}) // reconcile connector updates for assigned connectors that aren't being deleted... - k.doReconcile(&errs, "updated", k.reconcileConnectorUpdate, - "version > ? AND phase NOT IN ?", k.lastVersion, - []string{string(dbapi.ConnectorStatusPhaseAssigning), string(dbapi.ConnectorStatusPhaseDeleting), string(dbapi.ConnectorStatusPhaseDeleted)}) + k.doReconcile(&errs, "updated", k.reconcileConnectorUpdate, "LEFT JOIN connector_deployments ON connectors.id = connector_deployments.connector_id", + "connectors.version <> connector_deployments.connector_version AND phase NOT IN ?", []string{string(dbapi.ConnectorStatusPhaseAssigning), string(dbapi.ConnectorStatusPhaseDeleting), string(dbapi.ConnectorStatusPhaseDeleted)}) return errs } @@ -248,21 +246,10 @@ func (k *ConnectorManager) reconcileConnectorUpdate(ctx context.Context, connect } } - if cerr := db.AddPostCommitAction(ctx, func() { - k.lastVersion = connector.Version - }); cerr != nil { - glog.Errorf("Failed to AddPostCommitAction to save lastVersion %d: %v", connector.Version, cerr.Error()) - if err == nil { - err = cerr - } else { - err = errors.Errorf("Multiple errors in reconciling connector %s: %s; %s", connector.ID, err, cerr) - } - } - return err } -func (k *ConnectorManager) doReconcile(errs *[]error, reconcilePhase string, reconcileFunc func(ctx context.Context, connector *dbapi.Connector) error, query string, args ...interface{}) { +func (k *ConnectorManager) doReconcile(errs *[]error, reconcilePhase string, reconcileFunc func(ctx context.Context, connector *dbapi.Connector) error, joins string, query string, args ...interface{}) { var count int64 var serviceErrs []error glog.V(5).Infof("Reconciling %s connectors...", reconcilePhase) @@ -276,7 +263,7 @@ func (k *ConnectorManager) doReconcile(errs *[]error, reconcilePhase string, rec count++ return nil }) - }, query, args...); len(serviceErrs) > 0 { + }, joins, query, args...); len(serviceErrs) > 0 { *errs = append(*errs, serviceErrs...) } if count == 0 && len(serviceErrs) == 0 { diff --git a/internal/connector/test/integration/cucumber_steps.go b/internal/connector/test/integration/cucumber_steps.go index 4ec4055b8..cd6503e1e 100644 --- a/internal/connector/test/integration/cucumber_steps.go +++ b/internal/connector/test/integration/cucumber_steps.go @@ -7,6 +7,7 @@ import ( "net/url" "time" + "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/errors" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared" "github.com/bf2fc6cc711aee1a0c2a/kas-fleet-manager/pkg/shared/utils/arrays" "github.com/golang/glog" @@ -38,7 +39,7 @@ func (s *extender) iResetTheVaultCounters() error { } func (s *extender) theVaultDeleteCounterShouldBe(expected int64) error { - // we can only check the delete count on the TmpVault service impl... + // we can only check delete count on the TmpVault service impl... var service vault.VaultService if err := s.Suite.Helper.Env.ServiceContainer.Resolve(&service); err != nil { return err @@ -106,19 +107,27 @@ func (s *extender) getAndStoreAccessTokenUsingTheAddonParameterResponseAs(as str const clientIdList = "_client_id_list" -func (s *extender) deleteKeycloakClients(sc *godog.Scenario, err error) { +func (s *extender) deleteKeycloakClients() error { if clientIds, ok := s.Variables[clientIdList].([]string); ok { env := s.Suite.Helper.Env var keycloakService sso.KafkaKeycloakService env.MustResolve(&keycloakService) + var collectedErrors errors.ErrorList for _, clientID := range clientIds { if err := keycloakService.DeleteServiceAccountInternal(clientID); err != nil { glog.Errorf("Error deleting keycloak client with clientId %s: %s", clientID, err) + collectedErrors.AddErrors(err) } } + + if !collectedErrors.IsEmpty() { + return collectedErrors + } + } + return nil } func (s *extender) rememberKeycloakClientForCleanup(clientID string) error { @@ -201,8 +210,7 @@ func init() { ctx.Step(`^I delete or deprecate types not in latest connector catalog$`, e.iDeleteOrDeprecateRemovedTypes) ctx.After(func(ctx context.Context, sc *godog.Scenario, err error) (context.Context, error) { - e.deleteKeycloakClients(sc, err) - return ctx, err + return ctx, e.deleteKeycloakClients() }) }) } diff --git a/internal/connector/test/integration/feature_test.go b/internal/connector/test/integration/feature_test.go index 8cdf8b1ea..98612a651 100644 --- a/internal/connector/test/integration/feature_test.go +++ b/internal/connector/test/integration/feature_test.go @@ -116,17 +116,15 @@ func TestFeatures(t *testing.T) { testName = strings.ReplaceAll(testName, "-", "_") t.Run(testName, func(t *testing.T) { - // To preserve the current behavior, the test are market to be "safely" run in parallel, however - // we may think to introduce a new naming convention i.e. files that ends with _parallel would - // cause t.Parallel() to be invoked, other tests won't, so they won't be executed concurrently. - // - // This could help reducing/removing the need of explicit lock - t.Parallel() + // The assumption is that features run sequentially and scenario in a feature runs concurrently + // Running features in parallel brakes locking steps and test users session mechanisms + //t.Parallel() o := opts o.TestingT = t o.Paths = []string{path.Join(root, info.Name())} - //o.Randomize = -1 + o.Randomize = -1 + o.StopOnFailure = true _, exists := os.LookupEnv("GODOG_NO_COLORS") if exists { o.NoColors = true @@ -135,7 +133,7 @@ func TestFeatures(t *testing.T) { s := cucumber.NewTestSuite(helper) status := godog.TestSuite{ - Name: "connectors", + Name: "connectors-" + testName, Options: &o, ScenarioInitializer: s.InitializeScenario, }.Run() diff --git a/internal/connector/test/integration/features/connector-agent-api.feature b/internal/connector/test/integration/features/connector-agent-api.feature index b21dbb733..5dd7e984a 100644 --- a/internal/connector/test/integration/features/connector-agent-api.feature +++ b/internal/connector/test/integration/features/connector-agent-api.feature @@ -15,6 +15,8 @@ Feature: connector agent API Given an admin user named "Ricky Bobby" with roles "cos-fleet-manager-admin-full" Given an admin user named "Cal Naughton Jr." with roles "cos-fleet-manager-admin-write" Given an admin user named "Carley Bobby" with roles "cos-fleet-manager-admin-read" + Given an org admin user named "Org admin user1" + Given a user named "Shard1" Scenario: connector cluster is created and agent processes assigned a deployment. Given I am logged in as "Jimmy" @@ -218,7 +220,7 @@ Feature: connector agent API Then the response code should be 200 And the response header "Content-Type" should match "application/json;stream=watch" - Given I wait up to "2" seconds for a response event + Given I wait up to "5" seconds for a response event # yeah.. this event is kinda ugly.. upgrading openapi-generator to 5.0.1 should help us omit more fields. Then the response should match json: """ @@ -1406,18 +1408,7 @@ Feature: connector agent API # Should work for Cal, who can write Given I am logged in as "Cal Naughton Jr." Given I set the "Content-Type" header to "application/merge-patch+json" - When I PATCH path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/deployments/${upgradable_deployment_id}" with json body: - """ - { - "spec": { - "shard_metadata": { - "connector_revision": ${shard_metadata_latest_revision} - } - } - } - """ - Then the response code should be 202 - And the response should match json: + When I wait up to "30" seconds for a PATCH on path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/deployments/${upgradable_deployment_id}" with json body: "{ "spec": { "shard_metadata": { "connector_revision": ${shard_metadata_latest_revision} } } }" response code to match "202" and response to match json: """ { "href": "${response.href}", @@ -1650,16 +1641,7 @@ Feature: connector agent API # Upgrade by operator Given I set the "Content-Type" header to "application/merge-patch+json" - When I PATCH path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/deployments/${upgradable_deployment_id}" with json body: - """ - { - "spec": { - "operator_id": "${available_operator_id}" - } - } - """ - Then the response code should be 202 - And the response should match json: + When I wait up to "30" seconds for a PATCH on path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/deployments/${upgradable_deployment_id}" with json body: "{ "spec": { "operator_id": "${available_operator_id}" } }" response code to match "202" and response to match json: """ { "href": "${response.href}", @@ -1866,9 +1848,7 @@ Feature: connector agent API # api admin should be able to see new connector deployment Given I am logged in as "Ricky Bobby" - When I wait up to "10" seconds for a GET on path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/deployments" response ".total" selection to match "2" - Then I GET path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/deployments" - And the ".total" selection from the response should match "2" + When I wait up to "30" seconds for a GET on path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}/deployments" response ".total" selection to match "2" # soft delete connector using admin API When I DELETE path "/v1/admin/kafka_connectors/${forced_connector_id}" @@ -1936,9 +1916,7 @@ Feature: connector agent API When I DELETE path "/v1/admin/kafka_connector_namespaces/${forced_namespace_id}?force=true" Then the response code should be 204 # deleted namespace should get reconciled - When I wait up to "15" seconds for a GET on path "/v1/admin/kafka_connector_namespaces/${forced_namespace_id}" response code to match "410" - When I GET path "/v1/admin/kafka_connector_namespaces/${forced_namespace_id}" - Then the response code should be 410 + When I wait up to "30" seconds for a GET on path "/v1/admin/kafka_connector_namespaces/${forced_namespace_id}" response code to match "410" # Bobby should be able to delete cluster as an org admin Given I am logged in as "Bobby" @@ -1972,7 +1950,7 @@ Feature: connector agent API # Connectors that were assigning the cluster get updated to not refer to them. Given I am logged in as "Jimmy" - When I wait up to "10" seconds for a GET on path "/v1/kafka_connectors/${connector_id}" response ".namespace_id" selection to match "" + When I wait up to "30" seconds for a GET on path "/v1/kafka_connectors/${connector_id}" response ".namespace_id" selection to match "" Then I GET path "/v1/kafka_connectors/${connector_id}" And the response code should be 200 And the ".desired_state" selection from the response should match "unassigned" @@ -1992,16 +1970,13 @@ Feature: connector agent API When I DELETE path "/v1/admin/kafka_connectors/${connector_id}" Then the response code should be 204 # unassigned connector should get reconciled and deleted - When I wait up to "10" seconds for a GET on path "/v1/admin/kafka_connectors/${connector_id}" response code to match "410" - Then I GET path "/v1/admin/kafka_connectors/${connector_id}" - And the response code should be 410 + When I wait up to "30" seconds for a GET on path "/v1/admin/kafka_connectors/${connector_id}" response code to match "410" - - Scenario: Bobby can stop and start an existing connector - Given I am logged in as "Bobby" + Scenario: Org admin user1 can stop and start an existing connector + Given I am logged in as "Org admin user1" #--------------------------------------------------------------------------------------------- - # Create a target cluster, and get the shard access token, and connect it using the Shard user + # Create a target cluster, and get the shard access token, and connect it using the Shard1 user # -------------------------------------------------------------------------------------------- When I POST path "/v1/kafka_connector_clusters" with json body: """ @@ -2020,7 +1995,7 @@ Feature: connector agent API And get and store access token using the addon parameter response as ${shard_token} and clientID as ${clientID} And I remember keycloak client for cleanup with clientID: ${clientID} - Given I am logged in as "Shard" + Given I am logged in as "Shard1" Given I set the "Authorization" header to "Bearer ${shard_token}" When I PUT path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/status" with json body: """ @@ -2055,14 +2030,11 @@ Feature: connector agent API And the response should match "" # verify that initially the namespace status includes connectors_deployed=0 - Given I am logged in as "Bobby" - When I wait up to "10" seconds for a GET on path "/v1/kafka_connector_namespaces/${connector_namespace_id}" response ".status.connectors_deployed" selection to match "0" - Then I GET path "/v1/kafka_connector_namespaces/${connector_namespace_id}" - And the response code should be 200 - And the ".status.connectors_deployed" selection from the response should match "0" + Given I am logged in as "Org admin user1" + When I wait up to "30" seconds for a GET on path "/v1/kafka_connector_namespaces/${connector_namespace_id}" response ".status.connectors_deployed" selection to match "0" # agent should be able to post individual namespace status - Given I am logged in as "Shard" + Given I am logged in as "Shard1" Given I set the "Authorization" header to "Bearer ${shard_token}" When I PUT path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/namespaces/${connector_namespace_id}/status" with json body: """ @@ -2086,7 +2058,7 @@ Feature: connector agent API #--------------------------------------------------------------------------------------------- # Create a connector # -------------------------------------------------------------------------------------------- - Given I am logged in as "Bobby" + Given I am logged in as "Org admin user1" When I POST path "/v1/kafka_connectors?async=true" with json body: """ { @@ -2114,19 +2086,14 @@ Feature: connector agent API Given I store the ".id" selection from the response as ${connector_id} # verify that the namespace status includes connectors_deployed=1 - When I wait up to "10" seconds for a GET on path "/v1/kafka_connector_namespaces/${connector_namespace_id}" response ".status.connectors_deployed" selection to match "1" - Then I GET path "/v1/kafka_connector_namespaces/${connector_namespace_id}" - And the response code should be 200 - And the ".status.connectors_deployed" selection from the response should match "1" + When I wait up to "30" seconds for a GET on path "/v1/kafka_connector_namespaces/${connector_namespace_id}" response ".status.connectors_deployed" selection to match "1" #----------------------------------------------------------------------------------------------------------------- - # Shard waits for the deployment, marks it ready, Bobby waits to see ready status. + # Shard1 waits for the deployment, marks it ready, Org admin user1 waits to see ready status. #----------------------------------------------------------------------------------------------------------------- - Given I am logged in as "Shard" + Given I am logged in as "Shard1" Given I set the "Authorization" header to "Bearer ${shard_token}" - Given I wait up to "10" seconds for a GET on path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments" response ".total" selection to match "1" - When I GET path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments" - Then the ".total" selection from the response should match "1" + Given I wait up to "30" seconds for a GET on path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments" response ".total" selection to match "1" Given I store the ".items[0].id" selection from the response as ${connector_deployment_id} When I PUT path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments/${connector_deployment_id}/status" with json body: """ @@ -2137,10 +2104,8 @@ Feature: connector agent API """ Then the response code should be 204 - Given I am logged in as "Bobby" - Given I wait up to "10" seconds for a GET on path "/v1/kafka_connectors/${connector_id}" response ".status" selection to match "ready" - When I GET path "/v1/kafka_connectors/${connector_id}" - Then the ".status.state" selection from the response should match "ready" + Given I am logged in as "Org admin user1" + Given I wait up to "30" seconds for a GET on path "/v1/kafka_connectors/${connector_id}" response ".status.state" selection to match "ready" # verify user can search for connector using name, state, and ilike When I GET path "/v1/kafka_connectors/?search=id+like+${connector_id}+and+name+ilike+Example%25+and+state+ilike+Ready&orderBy=id%2Cnamespace_id%2Cdesired_state%2Cstate+asc%2Cupdated_at+desc" @@ -2149,7 +2114,7 @@ Feature: connector agent API And the ".items[0].namespace_id" selection from the response should match "${connector_namespace_id}" #----------------------------------------------------------------------------------------------------------------- - # Bobby sets desired state to stopped.. Agent sees deployment stopped, it updates status to stopped,, Bobby then see stopped status + # Org admin user1 sets desired state to stopped.. Agent sees deployment stopped, it updates status to stopped,, Org admin user1 then see stopped status #----------------------------------------------------------------------------------------------------------------- # Updating the connector config should update the deployment. Given I set the "Content-Type" header to "application/merge-patch+json" @@ -2161,11 +2126,9 @@ Feature: connector agent API """ Then the response code should be 202 - Given I am logged in as "Shard" + Given I am logged in as "Shard1" Given I set the "Authorization" header to "Bearer ${shard_token}" - Given I wait up to "10" seconds for a GET on path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments/${connector_deployment_id}" response ".spec.desired_state" selection to match "stopped" - When I GET path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments/${connector_deployment_id}" - Then the ".spec.desired_state" selection from the response should match "stopped" + Given I wait up to "30" seconds for a GET on path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments/${connector_deployment_id}" response ".spec.desired_state" selection to match "stopped" When I PUT path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments/${connector_deployment_id}/status" with json body: """ { @@ -2175,12 +2138,12 @@ Feature: connector agent API """ Then the response code should be 204 - Given I am logged in as "Bobby" + Given I am logged in as "Org admin user1" When I GET path "/v1/kafka_connectors/${connector_id}" Then the ".status.state" selection from the response should match "stopped" #----------------------------------------------------------------------------------------------------------------- - # Bobby sets desired state to ready.. Agent sees new deployment + # Org admin user1 sets desired state to ready.. Agent sees new deployment #----------------------------------------------------------------------------------------------------------------- Given I set the "Content-Type" header to "application/merge-patch+json" When I PATCH path "/v1/kafka_connectors/${connector_id}" with json body: @@ -2191,11 +2154,9 @@ Feature: connector agent API """ Then the response code should be 202 - Given I am logged in as "Shard" + Given I am logged in as "Shard1" Given I set the "Authorization" header to "Bearer ${shard_token}" - Given I wait up to "10" seconds for a GET on path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments" response ".total" selection to match "1" - When I GET path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments" - Then the ".total" selection from the response should match "1" + Given I wait up to "30" seconds for a GET on path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments" response ".total" selection to match "1" And the ".items[0].spec.desired_state" selection from the response should match "ready" When I PUT path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments/${connector_deployment_id}/status" with json body: @@ -2210,7 +2171,7 @@ Feature: connector agent API #----------------------------------------------------------------------------------------------------------------- # Agent try to update status with a stale version and get 500 #----------------------------------------------------------------------------------------------------------------- - Given I am logged in as "Shard" + Given I am logged in as "Shard1" Given I set the "Authorization" header to "Bearer ${shard_token}" When I PUT path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments/${connector_deployment_id}/status" with json body: """ @@ -2220,19 +2181,17 @@ Feature: connector agent API } """ Then the response code should be 409 - + #----------------------------------------------------------------------------------------------------------------- - # Bobby sets desired state to deleted.. Agent sees deployment deleted, it updates status to deleted, Bobby can not see the connector anymore + # Org admin user1 sets desired state to deleted.. Agent sees deployment deleted, it updates status to deleted, Org admin user1 can not see the connector anymore #----------------------------------------------------------------------------------------------------------------- - Given I am logged in as "Bobby" + Given I am logged in as "Org admin user1" When I DELETE path "/v1/kafka_connectors/${connector_id}" Then the response code should be 204 - Given I am logged in as "Shard" + Given I am logged in as "Shard1" And I set the "Authorization" header to "Bearer ${shard_token}" - And I wait up to "10" seconds for a GET on path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments/${connector_deployment_id}" response ".spec.desired_state" selection to match "deleted" - When I GET path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments/${connector_deployment_id}" - Then the ".spec.desired_state" selection from the response should match "deleted" + And I wait up to "30" seconds for a GET on path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments/${connector_deployment_id}" response ".spec.desired_state" selection to match "deleted" When I PUT path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments/${connector_deployment_id}/status" with json body: """ { @@ -2242,11 +2201,8 @@ Feature: connector agent API """ Then the response code should be 204 - Given I am logged in as "Bobby" - And I wait up to "10" seconds for a GET on path "/v1/kafka_connectors/${connector_id}" response code to match "410" - When I GET path "/v1/kafka_connectors/${connector_id}" - Then the response code should be 410 - + Given I am logged in as "Org admin user1" + And I wait up to "30" seconds for a GET on path "/v1/kafka_connectors/${connector_id}" response code to match "410" #--------------------------------------------------------------------------------------------- # Validate cluster delete completes with empty namespace removed after agent ack @@ -2254,11 +2210,8 @@ Feature: connector agent API # expire the existing namespace Given I run SQL "UPDATE connector_namespaces SET expiration='1000-01-01 10:10:10+00' WHERE id = '${connector_namespace_id}';" expect 1 row to be affected. # check that the namespace state is now deleting - Given I am logged in as "Bobby" - When I wait up to "10" seconds for a GET on path "/v1/kafka_connector_namespaces/${connector_namespace_id}" response ".status.state" selection to match "deleting" - Then I GET path "/v1/kafka_connector_namespaces/${connector_namespace_id}" - And the response code should be 200 - And the ".status.state" selection from the response should match "deleting" + Given I am logged in as "Org admin user1" + When I wait up to "30" seconds for a GET on path "/v1/kafka_connector_namespaces/${connector_namespace_id}" response ".status.state" selection to match "deleting" # delete the cluster Given I DELETE path "/v1/kafka_connector_clusters/${connector_cluster_id}" @@ -2266,13 +2219,10 @@ Feature: connector agent API And the response should match "" # check that the cluster state is now `deleting` - Given I wait up to "10" seconds for a GET on path "/v1/kafka_connector_clusters/${connector_cluster_id}" response ".status.state" selection to match "deleting" - When I GET path "/v1/kafka_connector_clusters/${connector_cluster_id}" - Then the response code should be 200 - And the ".status.state" selection from the response should match "deleting" + Given I wait up to "30" seconds for a GET on path "/v1/kafka_connector_clusters/${connector_cluster_id}" response ".status.state" selection to match "deleting" # validate namespace processing can handle namespace status update errors and removes deleting namespace - Given I am logged in as "Shard" + Given I am logged in as "Shard1" Given I set the "Authorization" header to "Bearer ${shard_token}" When I PUT path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/status" with json body: """ @@ -2317,14 +2267,12 @@ Feature: connector agent API """ # wait for cluster to be deleted - Given I am logged in as "Bobby" - Given I wait up to "10" seconds for a GET on path "/v1/kafka_connector_clusters/${connector_cluster_id}" response code to match "410" - When I GET path "/v1/kafka_connector_clusters/${connector_cluster_id}" - Then the response code should be 410 + Given I am logged in as "Org admin user1" + Given I wait up to "30" seconds for a GET on path "/v1/kafka_connector_clusters/${connector_cluster_id}" response code to match "410" And I can forget keycloak clientID: ${clientID} # agent should get 410 for deleted cluster - Given I am logged in as "Shard" + Given I am logged in as "Shard1" Given I set the "Authorization" header to "Bearer ${shard_token}" When I PUT path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/status" with json body: """ @@ -2452,10 +2400,7 @@ Feature: connector agent API # wait for the agent to see deployment Given I am logged in as "Shard3" Given I set the "Authorization" header to "Bearer ${shard_token}" - When I wait up to "10" seconds for a GET on path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments" response ".total" selection to match "1" - When I GET path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments" - Then the response code should be 200 - And the ".total" selection from the response should match "1" + When I wait up to "30" seconds for a GET on path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments" response ".total" selection to match "1" # set deleted_at to delete connector When I run SQL "UPDATE connectors SET deleted_at='1000-01-01 00:00:00.000000+00' WHERE id = '${connector_id}';" expect 1 row to be affected. @@ -2666,10 +2611,7 @@ Feature: connector agent API # wait for the agent to see deployment Given I am logged in as "Shard3" Given I set the "Authorization" header to "Bearer ${shard_token}" - When I wait up to "10" seconds for a GET on path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments" response ".total" selection to match "1" - When I GET path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments" - Then the response code should be 200 - And the ".total" selection from the response should match "1" + When I wait up to "30" seconds for a GET on path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments" response ".total" selection to match "1" Given I store the ".items[0].id" selection from the response as ${connector_deployment_id} # remember the current namespace version @@ -2687,9 +2629,7 @@ Feature: connector agent API And the ".items[0].id" selection from the response should match "${connector_namespace_id}" # agent deletes unassigning connector - When I wait up to "10" seconds for a GET on path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments/${connector_deployment_id}" response ".spec.desired_state" selection to match "unassigned" - When I GET path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments/${connector_deployment_id}" - Then the ".spec.desired_state" selection from the response should match "unassigned" + When I wait up to "30" seconds for a GET on path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments/${connector_deployment_id}" response ".spec.desired_state" selection to match "unassigned" When I PUT path "/v1/agent/kafka_connector_clusters/${connector_cluster_id}/deployments/${connector_deployment_id}/status" with json body: """ { @@ -2711,12 +2651,8 @@ Feature: connector agent API # wait for namespace to get deleted Given I am logged in as "Bobby" - When I wait up to "10" seconds for a GET on path "/v1/kafka_connector_namespaces/${connector_namespace_id}" response code to match "410" - When I GET path "/v1/kafka_connector_namespaces/${connector_namespace_id}" - Then the response code should be 410 + When I wait up to "30" seconds for a GET on path "/v1/kafka_connector_namespaces/${connector_namespace_id}" response code to match "410" # connector must be unassigned - When I wait up to "10" seconds for a GET on path "/v1/kafka_connectors/${connector_id}" response ".desired_state" selection to match "unassigned" - When I GET path "/v1/kafka_connectors/${connector_id}" - And the ".desired_state" selection from the response should match "unassigned" + When I wait up to "30" seconds for a GET on path "/v1/kafka_connectors/${connector_id}" response ".desired_state" selection to match "unassigned" And the ".namespace_id" selection from the response should match "" diff --git a/internal/connector/test/integration/features/connector-cluster-admin-api.feature b/internal/connector/test/integration/features/connector-cluster-admin-api.feature index f852bbe45..d321a9fcb 100644 --- a/internal/connector/test/integration/features/connector-cluster-admin-api.feature +++ b/internal/connector/test/integration/features/connector-cluster-admin-api.feature @@ -2,12 +2,12 @@ Feature: connector cluster admin API Background: Given the path prefix is "/api/connector_mgmt" - Given an org admin user named "Jimmy" + Given an org admin user named "UniqueID Org Admin User" Given a user named "Shard" - Given an admin user named "Ricky Bobby" with roles "cos-fleet-manager-admin-full" + Given an admin user named "UniqueID Admin User" with roles "cos-fleet-manager-admin-full" Scenario: connector cluster is created and agent update status. - Given I am logged in as "Jimmy" + Given I am logged in as "UniqueID Org Admin User" Given I store an UID as ${openshift.id.1} Given I store an UID as ${openshift.id.2} @@ -78,7 +78,7 @@ Feature: connector cluster admin API # Get connector cluster status with connector-fleet-manager-admin-full role # ----------------------------------------------------------------------------------- - Given I am logged in as "Ricky Bobby" + Given I am logged in as "UniqueID Admin User" When I GET path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}" Then the response code should be 200 And the ".status.conditions[0].type" selection from the response should match "Ready" @@ -137,7 +137,7 @@ Feature: connector cluster admin API # Get connector cluster status with connector-fleet-manager-admin-full role # ----------------------------------------------------------------------------------- - Given I am logged in as "Ricky Bobby" + Given I am logged in as "UniqueID Admin User" When I GET path "/v1/admin/kafka_connector_clusters/${connector_cluster_id}" Then the response code should be 200 And the ".status.conditions[0].type" selection from the response should match "Ready" @@ -155,7 +155,7 @@ Feature: connector cluster admin API # Get connector cluster status as org admin # ----------------------------------------------------------------------------------- - Given I am logged in as "Jimmy" + Given I am logged in as "UniqueID Org Admin User" When I GET path "/v1/kafka_connector_clusters/${connector_cluster_id}" Then the response code should be 200 And the ".status.platform" selection from the response should match "null" @@ -166,7 +166,7 @@ Feature: connector cluster admin API # ----------------------------------------------------------------------------------- # delete namespace - Given I am logged in as "Ricky Bobby" + Given I am logged in as "UniqueID Admin User" When I DELETE path "/v1/admin/kafka_connector_namespaces/${connector_namespace_id}" Then the response code should be 204 @@ -176,10 +176,8 @@ Feature: connector cluster admin API And the response should match "" # wait for cluster cleanup - Given I am logged in as "Jimmy" + Given I am logged in as "UniqueID Org Admin User" Given I wait up to "30" seconds for a GET on path "/v1/kafka_connector_namespaces/${connector_namespace_id}" response code to match "410" - When I GET path "/v1/kafka_connector_namespaces/${connector_namespace_id}" - Then the response code should be 410 And the response should match json: """ { @@ -193,16 +191,14 @@ Feature: connector cluster admin API """ # delete cluster - Given I am logged in as "Jimmy" + Given I am logged in as "UniqueID Org Admin User" When I DELETE path "/v1/kafka_connector_clusters/${connector_cluster_id}" Then the response code should be 204 And the response should match "" # wait for cluster cleanup - Given I am logged in as "Jimmy" + Given I am logged in as "UniqueID Org Admin User" Given I wait up to "30" seconds for a GET on path "/v1/kafka_connector_clusters/${connector_cluster_id}" response code to match "410" - When I GET path "/v1/kafka_connector_clusters/${connector_cluster_id}" - Then the response code should be 410 And the response should match json: """ { diff --git a/internal/connector/test/integration/features/connector-cluster-api.feature b/internal/connector/test/integration/features/connector-cluster-api.feature index 5879f275c..a10d772bd 100644 --- a/internal/connector/test/integration/features/connector-cluster-api.feature +++ b/internal/connector/test/integration/features/connector-cluster-api.feature @@ -200,9 +200,7 @@ Feature: create a connector And the response should match "" # wait for cluster cleanup - Given I wait up to "10" seconds for a GET on path "/v1/kafka_connector_clusters/${cluster_id}" response code to match "410" - When I GET path "/v1/kafka_connector_clusters/${cluster_id}" - Then the response code should be 410 + Given I wait up to "30" seconds for a GET on path "/v1/kafka_connector_clusters/${cluster_id}" response code to match "410" And the response should match json: """ { diff --git a/internal/connector/test/integration/features/connector-multitenancy-api.feature b/internal/connector/test/integration/features/connector-multitenancy-api.feature index 8b3b785fa..1ef7d0319 100644 --- a/internal/connector/test/integration/features/connector-multitenancy-api.feature +++ b/internal/connector/test/integration/features/connector-multitenancy-api.feature @@ -125,7 +125,7 @@ Feature: connector namespaces API And the response should match "" # wait for cluster cleanup - Given I wait up to "10" seconds for a GET on path "/v1/kafka_connector_clusters/${connector_cluster_id}" response code to match "410" + Given I wait up to "30" seconds for a GET on path "/v1/kafka_connector_clusters/${connector_cluster_id}" response code to match "410" And I GET path "/v1/kafka_connector_clusters/${connector_cluster_id}" Then the response code should be 410 And I can forget keycloak clientID: ${clientID} diff --git a/internal/connector/test/main/main.go b/internal/connector/test/main/main.go index 18b0ba91f..6eef6a394 100644 --- a/internal/connector/test/main/main.go +++ b/internal/connector/test/main/main.go @@ -14,9 +14,9 @@ func main() { // every log messages is prefixed by an error message stating the flags haven't been parsed. _ = flag.CommandLine.Parse([]string{}) - // Always log to stderr by default - if err := flag.Set("logtostderr", "true"); err != nil { - glog.Infof("Unable to set logtostderr to true") + // Always also log to stderr by default + if err := flag.Set("alsologtostderr", "true"); err != nil { + glog.Infof("Unable to set alsologtostderr to true") } env, err := environments.New(environments.GetEnvironmentStrFromEnv(), connector.ConfigProviders(false)) diff --git a/test/cucumber/cucumber.go b/test/cucumber/cucumber.go index a2812ac34..8399b19e8 100644 --- a/test/cucumber/cucumber.go +++ b/test/cucumber/cucumber.go @@ -94,6 +94,7 @@ type TestScenario struct { sessions map[string]*TestSession Variables map[string]interface{} hasTestCaseLock bool + hasReadLock bool } func (s *TestScenario) User() *TestUser { diff --git a/test/cucumber/http_request.go b/test/cucumber/http_request.go index 73d039c5b..901ad4847 100644 --- a/test/cucumber/http_request.go +++ b/test/cucumber/http_request.go @@ -48,8 +48,9 @@ func init() { ctx.Step(`^I (GET|POST|PUT|DELETE|PATCH|OPTION) path "([^"]*)"$`, s.sendHttpRequest) ctx.Step(`^I (GET|POST|PUT|DELETE|PATCH|OPTION) path "([^"]*)" as a json event stream$`, s.sendHttpRequestAsEventStream) ctx.Step(`^I (GET|POST|PUT|DELETE|PATCH|OPTION) path "([^"]*)" with json body:$`, s.SendHttpRequestWithJsonBody) - ctx.Step(`^I wait up to "([^"]*)" seconds for a GET on path "([^"]*)" response "([^"]*)" selection to match "([^"]*)"$`, s.iWaitUpToSecondsForAGETOnPathResponseSelectionToMatch) - ctx.Step(`^I wait up to "([^"]*)" seconds for a GET on path "([^"]*)" response code to match "([^"]*)"$`, s.iWaitUpToSecondsForAGETOnPathResponseCodeToMatch) + ctx.Step(`^I wait up to "([^"]*)" seconds for a (GET|POST|PUT|DELETE|PATCH|OPTION) on path "([^"]*)" response "([^"]*)" selection to match "([^"]*)"$`, s.iWaitUpToSecondsForResponseSelectionToMatch) + ctx.Step(`^I wait up to "([^"]*)" seconds for a (GET|POST|PUT|DELETE|PATCH|OPTION) on path "([^"]*)" response code to match "([^"]*)"$`, s.iWaitUpToSecondsForResponseCodeToMatch) + ctx.Step(`^I wait up to "([^"]*)" seconds for a (GET|POST|PUT|DELETE|PATCH|OPTION) on path "([^"]*)" with json body: "(.*?)" response code to match "([^"]*)" and response to match json:$`, s.iWaitUpToSecondsForResponseCodeToMatchAndResponseToMatch) ctx.Step(`^I wait up to "([^"]*)" seconds for a response event$`, s.iWaitUpToSecondsForAResponseJsonEvent) }) } @@ -187,22 +188,47 @@ func (s *TestScenario) iWaitUpToSecondsForAResponseJsonEvent(timeout float64) er ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout*float64(time.Second))) defer cancel() + var err error select { case event := <-session.EventStreamEvents: - session.respJson = event - var err error session.RespBytes, err = json.Marshal(event) - if err != nil { - return err - } + return err case <-ctx.Done(): + return fmt.Errorf("a Response Json event was not received in %v, last error was %w", time.Duration(timeout*float64(time.Second)), err) } +} - return nil +func (s *TestScenario) iWaitUpToSecondsForResponseCodeToMatch(timeout float64, method string, path string, expected int) error { + + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout*float64(time.Second))) + defer cancel() + session := s.Session() + session.Ctx = ctx + defer func() { + session.Ctx = nil + }() + + var errRespCodeComparison error + for { + err := s.sendHttpRequest(method, path) + if err == nil { + errRespCodeComparison = s.theResponseCodeShouldBe(expected) + if errRespCodeComparison == nil { + return nil + } + } + + select { + case <-ctx.Done(): + return fmt.Errorf("a Response with code %d from %s to path %s was not received in %v, last response error was %w", expected, method, path, time.Duration(timeout*float64(time.Second)), errRespCodeComparison) + default: + time.Sleep(time.Duration(timeout * float64(time.Second) / 10.0)) + } + } } -func (s *TestScenario) iWaitUpToSecondsForAGETOnPathResponseCodeToMatch(timeout float64, path string, expected int) error { +func (s *TestScenario) iWaitUpToSecondsForResponseSelectionToMatch(timeout float64, method string, path string, selection, expected string) error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout*float64(time.Second))) defer cancel() @@ -212,25 +238,26 @@ func (s *TestScenario) iWaitUpToSecondsForAGETOnPathResponseCodeToMatch(timeout session.Ctx = nil }() + var errRespComparison error for { - err := s.sendHttpRequest("GET", path) + err := s.sendHttpRequest(method, path) if err == nil { - err = s.theResponseCodeShouldBe(expected) - if err == nil { + errRespComparison = s.theSelectionFromTheResponseShouldMatch(selection, expected) + if errRespComparison == nil { return nil } } select { case <-ctx.Done(): - return nil + return fmt.Errorf("a Response from %s to path %s with selection %s matching %s was not received in %v, last attempt error was %w", method, path, selection, expected, time.Duration(timeout*float64(time.Second)), errRespComparison) default: time.Sleep(time.Duration(timeout * float64(time.Second) / 10.0)) } } } -func (s *TestScenario) iWaitUpToSecondsForAGETOnPathResponseSelectionToMatch(timeout float64, path string, selection, expected string) error { +func (s *TestScenario) iWaitUpToSecondsForResponseCodeToMatchAndResponseToMatch(timeout float64, method string, path string, jsonTxt string, expectedCode int, expectedResponse *godog.DocString) error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout*float64(time.Second))) defer cancel() @@ -240,18 +267,21 @@ func (s *TestScenario) iWaitUpToSecondsForAGETOnPathResponseSelectionToMatch(tim session.Ctx = nil }() + var errRespComparison error + var errRespCodeComparison error for { - err := s.sendHttpRequest("GET", path) + err := s.SendHttpRequestWithJsonBody(method, path, &godog.DocString{Content: jsonTxt}) if err == nil { - err = s.theSelectionFromTheResponseShouldMatch(selection, expected) - if err == nil { + errRespCodeComparison = s.theResponseCodeShouldBe(expectedCode) + errRespComparison = s.TheResponseShouldMatchJsonDoc(expectedResponse) + if errRespCodeComparison == nil && errRespComparison == nil { return nil } } select { case <-ctx.Done(): - return nil + return fmt.Errorf("a Response with code %d and content %s from %s to path %s was not received in %v, last response error was %w", expectedCode, expectedResponse, method, path, time.Duration(timeout*float64(time.Second)), errRespComparison) default: time.Sleep(time.Duration(timeout * float64(time.Second) / 10.0)) } diff --git a/test/cucumber/scenario.go b/test/cucumber/scenario.go index 28b0d91ba..85995da96 100644 --- a/test/cucumber/scenario.go +++ b/test/cucumber/scenario.go @@ -15,10 +15,11 @@ package cucumber import ( "context" - "github.com/rs/xid" "sync" "time" + "github.com/rs/xid" + "github.com/cucumber/godog" ) @@ -33,14 +34,17 @@ func init() { ctx.Before(func(ctx context.Context, sc *godog.Scenario) (context.Context, error) { testCaseLock.RLock() + s.hasReadLock = true return ctx, nil }) ctx.After(func(ctx context.Context, sc *godog.Scenario, err error) (context.Context, error) { if s.hasTestCaseLock { testCaseLock.Unlock() - } else { + s.hasTestCaseLock = false + } else if s.hasReadLock { testCaseLock.RUnlock() + s.hasReadLock = false } return ctx, nil @@ -52,7 +56,10 @@ func (s *TestScenario) lock() error { // User might have already locked the scenario. if !s.hasTestCaseLock { // Convert to write lock to be the only executing scenario. - testCaseLock.RUnlock() + if s.hasReadLock { + testCaseLock.RUnlock() + s.hasReadLock = false + } testCaseLock.Lock() s.hasTestCaseLock = true } @@ -62,9 +69,12 @@ func (s *TestScenario) lock() error { func (s *TestScenario) unlock() error { // User might have already unlocked the scenario. if s.hasTestCaseLock { - // Convert to read lock to to allow other scenarios to keep running. + // Convert to read lock in order to allow other scenarios to keep running. testCaseLock.Unlock() - testCaseLock.RLock() + if !s.hasReadLock { + testCaseLock.RLock() + s.hasReadLock = true + } s.hasTestCaseLock = false } return nil