diff --git a/pkg/source/kinesis/kinesis_source.go b/pkg/source/kinesis/kinesis_source.go index ff813781..f50705d0 100644 --- a/pkg/source/kinesis/kinesis_source.go +++ b/pkg/source/kinesis/kinesis_source.go @@ -174,6 +174,12 @@ func newKinesisSourceWithInterfaces(kinesisClient kinesisiface.KinesisAPI, dynam // TODO: See if the client name can be reused to survive same node reboots name := uuid.NewV4().String() + // test the connection to kinesis by trying to make an API call + _, err := kinesisClient.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &streamName}) + if err != nil { + return nil, err + } + k, err := kinsumer.NewWithInterfaces(kinesisClient, dynamodbClient, streamName, appName, name, config) if err != nil { return nil, errors.Wrap(err, "Failed to create Kinsumer client") diff --git a/pkg/source/kinesis/kinesis_source_test.go b/pkg/source/kinesis/kinesis_source_test.go index e7fe07b3..c1e28b89 100644 --- a/pkg/source/kinesis/kinesis_source_test.go +++ b/pkg/source/kinesis/kinesis_source_test.go @@ -66,14 +66,12 @@ func TestNewKinesisSourceWithInterfaces_Success(t *testing.T) { source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 10, 10, nil) - fmt.Println(source.clientRecordMaxAge) assert.IsType(&kinesisSource{}, source) assert.Nil(err) } -// newKinesisSourceWithInterfaces should fail if we can't reach Kinesis and DDB, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151 -/* -func TestNewKinesisSourceWithInterfaces_Failure(t *testing.T) { +// TestNewKinesisSourceWithInterfaces_ConnectionCheck tests that the Kinesis source fails on start-up if the connection to Kinesis fails +func TestNewKinesisSourceWithInterfaces_ConnectionCheck(t *testing.T) { // Unlike the success test, we don't require anything to exist for this one assert := assert.New(t) @@ -81,34 +79,10 @@ func TestNewKinesisSourceWithInterfaces_Failure(t *testing.T) { kinesisClient := testutil.GetAWSLocalstackKinesisClient() dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient() - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "nonexistent-stream", "test", nil,10, 10, &fifty) + _, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "nonexistent-stream", "test", nil, 10, 10, nil) - assert.Nil(&kinesisSource{}, source) - assert.NotNil(err) - -} -*/ - -// TODO: When we address https://github.com/snowplow-devops/stream-replicator/issues/151, this test will need to change. -func TestKinesisSource_ReadFailure_NoResources(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test") - } - - assert := assert.New(t) - - kinesisClient := testutil.GetAWSLocalstackKinesisClient() - dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient() - - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists", "fake-name", nil, 10, 10, &fifty) - assert.Nil(err) - assert.NotNil(source) - assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/not-exists", source.GetID()) - - err = source.Read(nil) - assert.NotNil(err) if err != nil { - assert.Equal("Failed to start Kinsumer client: error describing table fake-name_checkpoints: ResourceNotFoundException: Cannot do operations on a non-existent table", err.Error()) + assert.Equal("ResourceNotFoundException: Stream nonexistent-stream under account 000000000000 not found.", err.Error()) } } @@ -228,6 +202,10 @@ func putNRecordsIntoKinesis(kinesisClient kinesisiface.KinesisAPI, n int, stream } func TestGetSource_WithKinesisSource(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + assert := assert.New(t) // Set up localstack resources @@ -281,10 +259,34 @@ func TestGetSource_ConfigErrorLeaderAction(t *testing.T) { assert := assert.New(t) + // Set up localstack resources kinesisClient := testutil.GetAWSLocalstackKinesisClient() dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient() - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "something", "test", nil, 0, 10, &one) + streamName := "kinesis-source-integration-2" + createErr := testutil.CreateAWSLocalstackKinesisStream(kinesisClient, streamName) + if createErr != nil { + t.Fatal(createErr) + } + defer testutil.DeleteAWSLocalstackKinesisStream(kinesisClient, streamName) + + appName := "integration" + ddbErr := testutil.CreateAWSLocalstackDynamoDBTables(dynamodbClient, appName) + if ddbErr != nil { + t.Fatal(ddbErr) + } + defer testutil.DeleteAWSLocalstackDynamoDBTables(dynamodbClient, appName) + + // Put ten records into kinesis stream + putErr := putNRecordsIntoKinesis(kinesisClient, 10, streamName, "Test") + if putErr != nil { + t.Fatal(putErr) + } + + time.Sleep(1 * time.Second) + + // Create the source and assert that it's there + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 0, 10, &one) assert.Nil(source) assert.EqualError(err, `Failed to create Kinsumer client: leaderActionFrequency config value is mandatory and must be at least as long as ShardCheckFrequency`) @@ -297,10 +299,33 @@ func TestGetSource_ConfigErrorMaxAge(t *testing.T) { assert := assert.New(t) + // Set up localstack resources kinesisClient := testutil.GetAWSLocalstackKinesisClient() dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient() - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "something", "test", nil, 10, 10, &one) + streamName := "kinesis-source-integration-10" + createErr := testutil.CreateAWSLocalstackKinesisStream(kinesisClient, streamName) + if createErr != nil { + t.Fatal(createErr) + } + defer testutil.DeleteAWSLocalstackKinesisStream(kinesisClient, streamName) + + appName := "integration" + ddbErr := testutil.CreateAWSLocalstackDynamoDBTables(dynamodbClient, appName) + if ddbErr != nil { + t.Fatal(ddbErr) + } + defer testutil.DeleteAWSLocalstackDynamoDBTables(dynamodbClient, appName) + + // Put ten records into kinesis stream + putErr := putNRecordsIntoKinesis(kinesisClient, 10, streamName, "Test") + if putErr != nil { + t.Fatal(putErr) + } + + time.Sleep(1 * time.Second) + + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 10, 10, &one) assert.Nil(source) assert.EqualError(err, `Failed to create Kinsumer client: clientRecordMaxAge value must be at least as long as shardCheckFrequency`) diff --git a/pkg/source/pubsub/pubsub_source.go b/pkg/source/pubsub/pubsub_source.go index acb10626..6b2f493b 100644 --- a/pkg/source/pubsub/pubsub_source.go +++ b/pkg/source/pubsub/pubsub_source.go @@ -96,6 +96,15 @@ func newPubSubSource(concurrentWrites int, projectID string, subscriptionID stri return nil, errors.Wrap(err, "Failed to create PubSub client") } + sub := client.SubscriptionInProject(subscriptionID, projectID) + exists, err := sub.Exists(ctx) + if err != nil { + return nil, errors.Wrap(err, "Connection to PubSub failed") + } + if !exists { + return nil, errors.New("Connection to PubSub failed, subscription does not exist") + } + return &pubSubSource{ projectID: projectID, client: client, diff --git a/pkg/source/pubsub/pubsub_source_test.go b/pkg/source/pubsub/pubsub_source_test.go index 8de38102..4cd020db 100644 --- a/pkg/source/pubsub/pubsub_source_test.go +++ b/pkg/source/pubsub/pubsub_source_test.go @@ -64,20 +64,22 @@ func TestPubSubSource_ReadAndReturnSuccessIntegration(t *testing.T) { } } -// newPubSubSource_Failure should fail if we can't reach PubSub, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151 -/* +// newPubSubSource_Failure should fail if we can't reach PubSub func TestNewPubSubSource_Failure(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") } assert := assert.New(t) + srv, _ := testutil.InitMockPubsubServer(8010, nil, t) + defer srv.Close() + pubsubSource, err := newPubSubSource(10, "nonexistent-project", "nonexistent-subscription") assert.NotNil(err) assert.Nil(pubsubSource) // This should return an error when we can't connect, rather than proceeding to the Write() function before we hit a problem. + assert.EqualError(err, `Connection to PubSub failed, subscription does not exist`) } -*/ // TestNewPubSubSource_Success tests the typical case of creating a new pubsub source. func TestNewPubSubSource_Success(t *testing.T) { @@ -86,7 +88,8 @@ func TestNewPubSubSource_Success(t *testing.T) { } assert := assert.New(t) - testutil.InitMockPubsubServer(8010, nil, t) + srv, _ := testutil.InitMockPubsubServer(8010, nil, t) + defer srv.Close() pubsubSource, err := newPubSubSource(10, "project-test", "test-sub") assert.Nil(err) @@ -132,6 +135,30 @@ func TestPubSubSource_ReadAndReturnSuccessWithMock(t *testing.T) { assert.Equal(expected, msgDatas) } +// TestPubSubSource_ConnCheckErrWithMocks unit tests PubSub source with connection error +func TestPubSubSource_ConnCheckErrWithMocks(t *testing.T) { + assert := assert.New(t) + srv, conn := testutil.InitMockPubsubServer(8563, nil, t) + srv.Close() + conn.Close() + + pubsubSource, err := newPubSubSource(10, "project-test", "test-sub-wrong") + assert.Nil(pubsubSource) + assert.EqualError(err, `Connection to PubSub failed: context deadline exceeded`) +} + +// TestPubSubSource_SubFailWithMocks unit tests PubSub source initiation with wrong sub name +func TestPubSubSource_SubFailWithMocks(t *testing.T) { + assert := assert.New(t) + srv, conn := testutil.InitMockPubsubServer(8563, nil, t) + defer srv.Close() + defer conn.Close() + + pubsubSource, err := newPubSubSource(10, "project-test", "test-sub-wrong") + assert.Nil(pubsubSource) + assert.EqualError(err, `Connection to PubSub failed, subscription does not exist`) +} + // TestPubSubSource_ReadAndReturnSuccessWithMock_DelayedAcks tests the behaviour of pubsub source when some messages take longer to ack than others func TestPubSubSource_ReadAndReturnSuccessWithMock_DelayedAcks(t *testing.T) { assert := assert.New(t) diff --git a/pkg/source/sqs/sqs_source.go b/pkg/source/sqs/sqs_source.go index 28039e41..de333789 100644 --- a/pkg/source/sqs/sqs_source.go +++ b/pkg/source/sqs/sqs_source.go @@ -115,6 +115,11 @@ var ConfigPair = sourceconfig.ConfigPair{ // newSQSSourceWithInterfaces allows you to provide an SQS client directly to allow // for mocking and localstack usage func newSQSSourceWithInterfaces(client sqsiface.SQSAPI, awsAccountID string, concurrentWrites int, region string, queueName string) (*sqsSource, error) { + _, err := client.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: &queueName}) + if err != nil { + return nil, errors.Wrap(err, `Could not connect to SQS`) + } + return &sqsSource{ client: client, queueName: queueName, diff --git a/pkg/source/sqs/sqs_source_test.go b/pkg/source/sqs/sqs_source_test.go index 84aa4b77..24744dd3 100644 --- a/pkg/source/sqs/sqs_source_test.go +++ b/pkg/source/sqs/sqs_source_test.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "reflect" + "strings" "testing" "time" @@ -30,6 +31,20 @@ func TestMain(m *testing.M) { os.Exit(exitVal) } +// TestNewSqsSource_AWSConnectionCheck tests that the SQS source fails on start-up if the connection to AWS fails +func TestNewSqsSource_AWSConnectionCheck(t *testing.T) { + assert := assert.New(t) + + target, err := configFunction(&configuration{ + QueueName: "not-exists", + Region: testutil.AWSLocalstackRegion, + RoleARN: `arn:aws:sqs:us-east-1:00000000000:not-exists`, + ConcurrentWrites: 15, + }) + assert.Nil(target) + assert.EqualError(err, "NoCredentialProviders: no valid providers in chain. Deprecated.\n\tFor verbose messaging see aws.Config.CredentialsChainVerboseErrors") +} + // func newSQSSourceWithInterfaces(client sqsiface.SQSAPI, awsAccountID string, concurrentWrites int, region string, queueName string) (*sqsSource, error) { func TestNewSQSSourceWithInterfaces_Success(t *testing.T) { if testing.Short() { @@ -51,9 +66,8 @@ func TestNewSQSSourceWithInterfaces_Success(t *testing.T) { assert.Nil(err) } -// newSQSSourceWithInterfaces should fail if we can't reach SQS, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151 -/* -func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) { +// newSQSSourceWithInterfaces should fail if we can't reach SQS +func TestNewSQSSourceWithInterfaces_SQSConnectionFailure(t *testing.T) { // Unlike the success test, we don't require anything to exist for this one assert := assert.New(t) @@ -63,28 +77,8 @@ func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) { assert.Nil(source) assert.NotNil(err) -} -*/ - -// TODO: When we address https://github.com/snowplow-devops/stream-replicator/issues/151, this test will need to change. -func TestSQSSource_ReadFailure(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test") - } - - assert := assert.New(t) - - client := testutil.GetAWSLocalstackSQSClient() - - source, err := newSQSSourceWithInterfaces(client, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists") - assert.Nil(err) - assert.NotNil(source) - assert.Equal("arn:aws:sqs:us-east-1:00000000000:not-exists", source.GetID()) - - err = source.Read(nil) - assert.NotNil(err) if err != nil { - assert.Equal("Failed to get SQS queue URL: AWS.SimpleQueueService.NonExistentQueue: AWS.SimpleQueueService.NonExistentQueue; see the SQS docs.\n\tstatus code: 400, request id: 00000000-0000-0000-0000-000000000000", err.Error()) + assert.True(strings.HasPrefix(err.Error(), `Could not connect to SQS`)) } } diff --git a/pkg/target/eventhub.go b/pkg/target/eventhub.go index caa3b481..eab1f5d3 100644 --- a/pkg/target/eventhub.go +++ b/pkg/target/eventhub.go @@ -96,6 +96,12 @@ func newEventHubTarget(cfg *EventHubConfig) (*EventHubTarget, error) { // If none is specified, it will retry indefinitely until the context times out, which hides the actual error message // To avoid obscuring errors, contextTimeoutInSeconds should be configured to ensure all retries may be completed before its expiry + // get the runtime information of the event hub in order to check the connection + _, err = hub.GetRuntimeInformation(context.Background()) + if err != nil { + return nil, errors.Errorf("Error initialising EventHub client: could not reach Event Hub: %v", err) + } + return newEventHubTargetWithInterfaces(hub, cfg), err } diff --git a/pkg/target/eventhub_test.go b/pkg/target/eventhub_test.go index 069b0875..a5313ccc 100644 --- a/pkg/target/eventhub_test.go +++ b/pkg/target/eventhub_test.go @@ -370,56 +370,43 @@ func TestWriteFailure(t *testing.T) { assert.Nil(twres.Invalid) } -// TestNewEventHubTarget_KeyValue tests that we can initialise a client with key value credentials. -func TestNewEventHubTarget_KeyValue(t *testing.T) { +// TestNewEventHubTarget_CredentialsNotFound tests that we fail on startup when we're not provided with appropriate credential values. +func TestNewEventHubTarget_CredentialsNotFound(t *testing.T) { assert := assert.New(t) - // Test that we can initialise a client with Key and Value - t.Setenv("EVENTHUB_KEY_NAME", "fake") - t.Setenv("EVENTHUB_KEY_VALUE", "fake") - tgt, err := newEventHubTarget(&cfg) - assert.Nil(err) - assert.NotNil(tgt) + assert.NotNil(err) + if err != nil { + assert.Equal("Error initialising EventHub client: No valid combination of authentication Env vars found. https://pkg.go.dev/github.com/Azure/azure-event-hubs-go#NewHubWithNamespaceNameAndEnvironment", err.Error()) + } + assert.Nil(tgt) } -// TestNewEventHubTarget_ConnString tests that we can initialise a client with connection string credentials. -func TestNewEventHubTarget_ConnString(t *testing.T) { +func TestNewEventHubTarget_ConnStringErr(t *testing.T) { assert := assert.New(t) - - // Test that we can initialise a client with Connection String - t.Setenv("EVENTHUB_CONNECTION_STRING", "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=fake;SharedAccessKey=fake") tgt, err := newEventHubTarget(&cfg) - assert.Nil(err) - assert.NotNil(tgt) + assert.EqualError(err, `Error initialising EventHub client: could not reach Event Hub: dial tcp: lookup test.servicebus.windows.net: no such host`) + assert.Nil(tgt) } -// TestNewEventHubTarget_CredentialsNotFound tests that we fail on startup when we're not provided with appropriate credential values. -func TestNewEventHubTarget_CredentialsNotFound(t *testing.T) { +func TestNewEventHubTarget_ConnVarsErr(t *testing.T) { assert := assert.New(t) + t.Setenv("EVENTHUB_KEY_NAME", "fake") + t.Setenv("EVENTHUB_KEY_VALUE", "fake") + tgt, err := newEventHubTarget(&cfg) - assert.NotNil(err) - if err != nil { - assert.Equal("Error initialising EventHub client: No valid combination of authentication Env vars found. https://pkg.go.dev/github.com/Azure/azure-event-hubs-go#NewHubWithNamespaceNameAndEnvironment", err.Error()) - } + assert.EqualError(err, `Error initialising EventHub client: could not reach Event Hub: dial tcp: lookup test.servicebus.windows.net: no such host`) assert.Nil(tgt) } -// NewEventHubTarget should fail if we can't reach EventHub, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151 -// Note that when we do so, the above tests will need to be changed to use some kind of mock -/* -func TestNewEventHubTarget_Failure(t *testing.T) { +// NewEventHubTarget should fail if we can't reach EventHub +func TestNewEventHubTarget_NoVars(t *testing.T) { assert := assert.New(t) - // Test that we can initialise a client with Key and Value - t.Setenv("EVENTHUB_KEY_NAME", "fake") - t.Setenv("EVENTHUB_KEY_VALUE", "fake") - tgt, err := newEventHubTarget(&cfg) assert.Equal("Error initialising EventHub client: No valid combination of authentication Env vars found. https://pkg.go.dev/github.com/Azure/azure-event-hubs-go#NewHubWithNamespaceNameAndEnvironment", err.Error()) assert.Nil(tgt) } -*/ diff --git a/pkg/target/http.go b/pkg/target/http.go index 28f8d9a3..96cf1b7d 100644 --- a/pkg/target/http.go +++ b/pkg/target/http.go @@ -108,11 +108,24 @@ func newHTTPTarget(httpURL string, requestTimeout int, byteLimit int, contentTyp transport.TLSClientConfig = tlsConfig } + client := &http.Client{ + Transport: transport, + Timeout: time.Duration(requestTimeout) * time.Second, + } + + // send a HEAD request to the URL to check the connection + request, err := http.NewRequest("HEAD", httpURL, nil) + if err != nil { + return nil, errors.Wrap(err, `Error creating HEAD request`) + } + resp, err := client.Do(request) + if err != nil { + return nil, errors.Wrap(err, `Connection to host error`) + } + defer resp.Body.Close() + return &HTTPTarget{ - client: &http.Client{ - Transport: transport, - Timeout: time.Duration(requestTimeout) * time.Second, - }, + client: client, httpURL: httpURL, byteLimit: byteLimit, contentType: contentType, diff --git a/pkg/target/http_test.go b/pkg/target/http_test.go index 7ab3eb86..a15e7d3f 100644 --- a/pkg/target/http_test.go +++ b/pkg/target/http_test.go @@ -32,6 +32,13 @@ func createTestServer(results *[][]byte, waitgroup *sync.WaitGroup) *httptest.Se panic(err) } mutex.Lock() + if req.URL.Path == `/error` { + if req.Method == http.MethodHead { + *results = append(*results, data) + } else { + w.WriteHeader(500) + } + } *results = append(*results, data) mutex.Unlock() defer waitgroup.Done() @@ -119,10 +126,10 @@ func TestAddHeadersToRequest(t *testing.T) { func TestNewHTTPTarget(t *testing.T) { assert := assert.New(t) - httpTarget, err := newHTTPTarget("http://something", 5, 1048576, "application/json", "", "", "", "", "", "", true) + httpTarget, err := newHTTPTarget("http://wrong-url-12345678.com", 5, 1048576, "application/json", "", "", "", "", "", "", true) - assert.Nil(err) - assert.NotNil(httpTarget) + assert.EqualError(err, `Connection to host error: Head "http://wrong-url-12345678.com": dial tcp: lookup wrong-url-12345678.com: no such host`) + assert.Nil(httpTarget) failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1048576, "application/json", "", "", "", "", "", "", true) @@ -145,6 +152,8 @@ func TestHttpWrite_Simple(t *testing.T) { var results [][]byte wg := sync.WaitGroup{} + // add to waitGroup for connection check HEAD request + wg.Add(1) server := createTestServer(&results, &wg) defer server.Close() @@ -166,8 +175,8 @@ func TestHttpWrite_Simple(t *testing.T) { assert.Nil(err1) assert.Equal(501, len(writeResult.Sent)) - assert.Equal(501, len(results)) - for _, result := range results { + assert.Equal(502, len(results)) + for _, result := range results[1:] { assert.Equal("Hello Server!!", string(result)) } @@ -179,6 +188,7 @@ func TestHttpWrite_Concurrent(t *testing.T) { var results [][]byte wg := sync.WaitGroup{} + wg.Add(1) server := createTestServer(&results, &wg) defer server.Close() @@ -209,23 +219,25 @@ func TestHttpWrite_Concurrent(t *testing.T) { wg.Wait() - assert.Equal(10, len(results)) - for _, result := range results { + assert.Equal(11, len(results)) + for _, result := range results[1:] { assert.Equal("Hello Server!!", string(result)) } assert.Equal(int64(10), ackOps) } -func TestHttpWrite_Failure(t *testing.T) { +func TestHttpWrite_RequestFailure(t *testing.T) { assert := assert.New(t) var results [][]byte wg := sync.WaitGroup{} + // add to waitGroup for connection check HEAD request + wg.Add(1) server := createTestServer(&results, &wg) defer server.Close() - target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1048576, "application/json", "", "", "", "", "", "", true) + target, err := newHTTPTarget(server.URL+`/error`, 5, 1048576, "application/json", "", "", "", "", "", "", true) if err != nil { t.Fatal(err) } @@ -235,18 +247,15 @@ func TestHttpWrite_Failure(t *testing.T) { atomic.AddInt64(&ackOps, 1) } - messages := testutil.GetTestMessages(10, "Hello Server!!", ackFunc) + messages := testutil.GetTestMessages(1, "Hello Server!!", ackFunc) + wg.Add(1) + _, err1 := target.Write(messages) - writeResult, err1 := target.Write(messages) + wg.Wait() - assert.NotNil(err1) - if err1 != nil { - assert.Regexp("Error sending http request: 10 errors occurred:.*", err1.Error()) - } + assert.EqualError(err1, "Error sending http request: 1 error occurred:\n\t* 500 Internal Server Error: \n\n") - assert.Equal(10, len(writeResult.Failed)) - assert.Nil(writeResult.Sent) - assert.Nil(writeResult.Oversized) + assert.Equal(int64(0), ackOps) } func TestHttpWrite_Oversized(t *testing.T) { @@ -254,6 +263,7 @@ func TestHttpWrite_Oversized(t *testing.T) { var results [][]byte wg := sync.WaitGroup{} + wg.Add(1) server := createTestServer(&results, &wg) defer server.Close() @@ -278,8 +288,8 @@ func TestHttpWrite_Oversized(t *testing.T) { assert.Nil(err1) assert.Equal(10, len(writeResult.Sent)) assert.Equal(1, len(writeResult.Oversized)) - assert.Equal(10, len(results)) - for _, result := range results { + assert.Equal(11, len(results)) + for _, result := range results[1:] { assert.Equal("Hello Server!!", string(result)) } diff --git a/pkg/target/kafka_test.go b/pkg/target/kafka_test.go index 9de02a2b..1b7a452c 100644 --- a/pkg/target/kafka_test.go +++ b/pkg/target/kafka_test.go @@ -7,6 +7,7 @@ package target import ( + "strings" "sync/atomic" "testing" @@ -62,6 +63,23 @@ func SetUpMockSyncProducer(t *testing.T) (*mocks.SyncProducer, *KafkaTarget) { } } +// TestNewKafkaTarget_Failure tests that the kafka target fails on start-up if the connection to Kafka fails +func TestNewKafkaTarget_Failure(t *testing.T) { + assert := assert.New(t) + + k, err := NewKafkaTarget(&KafkaConfig{ + Brokers: "test:8080", + TopicName: "test", + ByteLimit: 1000, + }) + + assert.Nil(k) + assert.NotNil(err) + if err != nil { + assert.True(strings.HasPrefix(err.Error(), `kafka: client has run out of available brokers to talk to: dial tcp`)) + } +} + func TestKafkaTarget_AsyncWriteFailure(t *testing.T) { assert := assert.New(t) diff --git a/pkg/target/kinesis.go b/pkg/target/kinesis.go index 2dc898cb..d63e339b 100644 --- a/pkg/target/kinesis.go +++ b/pkg/target/kinesis.go @@ -62,6 +62,12 @@ func newKinesisTarget(region string, streamName string, roleARN string) (*Kinesi // newKinesisTargetWithInterfaces allows you to provide a Kinesis client directly to allow // for mocking and localstack usage func newKinesisTargetWithInterfaces(client kinesisiface.KinesisAPI, awsAccountID string, region string, streamName string) (*KinesisTarget, error) { + // test the connection to kinesis by trying to make an API call + _, err := client.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &streamName}) + if err != nil { + return nil, err + } + return &KinesisTarget{ client: client, streamName: streamName, diff --git a/pkg/target/kinesis_test.go b/pkg/target/kinesis_test.go index 5d7aee8f..78a45c03 100644 --- a/pkg/target/kinesis_test.go +++ b/pkg/target/kinesis_test.go @@ -7,6 +7,12 @@ package target import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + "strings" "sync/atomic" "testing" @@ -15,35 +21,40 @@ import ( "github.com/snowplow-devops/stream-replicator/pkg/testutil" ) -func TestKinesisTarget_WriteFailure(t *testing.T) { +// TestNewKinesisTarget_ConnectionCheck tests that the Kinesis target fails on start-up if the connection to Kinesis fails +func TestNewKinesisTarget_ConnectionCheck(t *testing.T) { + assert := assert.New(t) + + target, err := newKinesisTarget(testutil.AWSLocalstackRegion, "00000000000", "arn:aws:kinesis:us-east-1:00000000000:stream/not-exists") + assert.Nil(target) + assert.NotNil(err) + // check that there is an error, meaning that a connection attempt to AWS was made + if err != nil { + assert.Equal("NoCredentialProviders: no valid providers in chain. Deprecated.\n\tFor verbose messaging see aws.Config.CredentialsChainVerboseErrors", err.Error()) + } +} + +func TestKinesisTarget_KinesisConnectionFailure(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") } assert := assert.New(t) - client := testutil.GetAWSLocalstackKinesisClient() + client := kinesis.New(session.Must(session.NewSession(&aws.Config{ + Credentials: credentials.NewStaticCredentials("foo", "var", ""), + S3ForcePathStyle: aws.Bool(true), + Region: aws.String(`wrong-region`), + Endpoint: aws.String(`wrong-endpoint`), + }))) target, err := newKinesisTargetWithInterfaces(client, "00000000000", testutil.AWSLocalstackRegion, "not-exists") - assert.Nil(err) - assert.NotNil(target) - assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/not-exists", target.GetID()) - - defer target.Close() - target.Open() - - messages := testutil.GetTestMessages(1, "Hello Kinesis!!", nil) - - writeRes, err := target.Write(messages) assert.NotNil(err) + assert.Nil(target) + fmt.Println(err.Error()) if err != nil { - assert.Equal("Error writing messages to Kinesis stream: 1 error occurred:\n\t* Failed to send message batch to Kinesis stream: ResourceNotFoundException: Stream not-exists under account 000000000000 not found.\n\n", err.Error()) + assert.True(strings.HasPrefix(err.Error(), `RequestError`)) } - assert.NotNil(writeRes) - - // Check results - assert.Equal(int64(0), writeRes.SentCount) - assert.Equal(int64(1), writeRes.FailedCount) } func TestKinesisTarget_WriteSuccess(t *testing.T) { diff --git a/pkg/target/pubsub.go b/pkg/target/pubsub.go index 2af180ce..13ebdefd 100644 --- a/pkg/target/pubsub.go +++ b/pkg/target/pubsub.go @@ -57,6 +57,15 @@ func newPubSubTarget(projectID string, topicName string) (*PubSubTarget, error) return nil, errors.Wrap(err, "Failed to create PubSub client") } + sub := client.TopicInProject(topicName, projectID) + exists, err := sub.Exists(ctx) + if err != nil { + return nil, errors.Wrap(err, "Connection to PubSub failed") + } + if !exists { + return nil, errors.New("Connection to PubSub failed, topic does not exist") + } + return &PubSubTarget{ projectID: projectID, client: client, diff --git a/pkg/target/pubsub_test.go b/pkg/target/pubsub_test.go index 74ffeeaf..460d255b 100644 --- a/pkg/target/pubsub_test.go +++ b/pkg/target/pubsub_test.go @@ -150,6 +150,30 @@ func TestPubSubTarget_WriteSuccessWithMocks(t *testing.T) { assert.Equal(int64(10), ackOps) } +// TestPubSubTarget_ConnCheckErrWithMocks unit tests the Pubsub target initialization with connection error +func TestPubSubTarget_ConnCheckErrWithMocks(t *testing.T) { + assert := assert.New(t) + srv, conn := testutil.InitMockPubsubServer(8563, nil, t) + srv.Close() + conn.Close() + + pubsubTarget, err := newPubSubTarget(`project-test`, `test-topic-wrong`) + assert.Nil(pubsubTarget) + assert.EqualError(err, `Connection to PubSub failed: context deadline exceeded`) +} + +// TestPubSubTarget_TopicFailWithMocks unit tests the Pubsub target initialization with wrong topic name +func TestPubSubTarget_TopicFailWithMocks(t *testing.T) { + assert := assert.New(t) + srv, conn := testutil.InitMockPubsubServer(8563, nil, t) + defer srv.Close() + defer conn.Close() + + pubsubTarget, err := newPubSubTarget(`project-test`, `test-topic-wrong`) + assert.Nil(pubsubTarget) + assert.EqualError(err, `Connection to PubSub failed, topic does not exist`) +} + // TestPubSubTarget_WriteFailureWithMocks unit tests the unhappy path for PubSub target func TestPubSubTarget_WriteFailureWithMocks(t *testing.T) { assert := assert.New(t) @@ -262,17 +286,16 @@ func TestNewPubSubTarget_Success(t *testing.T) { assert.IsType(PubSubTarget{}, *pubsubTarget) } -// TestnewPubSubTarget_Failure tests that we fail early when we cannot reach pubsub -// Commented out as this behaviour is not currently instrumented. -// This test serves to illustrate the desired behaviour for this issue: https://github.com/snowplow-devops/stream-replicator/issues/151 -/* -func TestnewPubSubTarget_Failure(t *testing.T) { +// TestNewPubSubTarget_Failure tests that we fail early when we cannot reach pubsub +func TestNewPubSubTarget_Failure(t *testing.T) { assert := assert.New(t) - pubsubTarget, err := newPubSubTarget(`nonexistent-project`, `nonexistent-topic`) + srv, conn := testutil.InitMockPubsubServer(8563, nil, t) + defer srv.Close() + defer conn.Close() - // TODO: Test for the actual error we expect, when we have instrumented failing fast - assert.NotNil(err) + pubsubTarget, err := newPubSubTarget(`project-test`, `test-topic-wrong`) + + assert.EqualError(err, `Connection to PubSub failed, topic does not exist`) assert.Nil(pubsubTarget) } -*/ diff --git a/pkg/target/sqs.go b/pkg/target/sqs.go index 0cd365c0..87d6d24b 100644 --- a/pkg/target/sqs.go +++ b/pkg/target/sqs.go @@ -64,6 +64,11 @@ func newSQSTarget(region string, queueName string, roleARN string) (*SQSTarget, // newSQSTargetWithInterfaces allows you to provide an SQS client directly to allow // for mocking and localstack usage func newSQSTargetWithInterfaces(client sqsiface.SQSAPI, awsAccountID string, region string, queueName string) (*SQSTarget, error) { + _, err := client.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: &queueName}) + if err != nil { + return nil, errors.Wrap(err, `Could not connect to SQS`) + } + return &SQSTarget{ client: client, queueName: queueName, diff --git a/pkg/target/sqs_test.go b/pkg/target/sqs_test.go index 159c81d9..1caade75 100644 --- a/pkg/target/sqs_test.go +++ b/pkg/target/sqs_test.go @@ -7,6 +7,7 @@ package target import ( + "strings" "sync/atomic" "testing" @@ -15,7 +16,16 @@ import ( "github.com/snowplow-devops/stream-replicator/pkg/testutil" ) -func TestSQSTarget_WriteFailure(t *testing.T) { +// TestNewSqsTarget_AWSConnectionCheck tests that the SQS target fails on start-up if the connection to AWS fails +func TestNewSqsTarget_AWSConnectionCheck(t *testing.T) { + assert := assert.New(t) + + target, err := newSQSTarget(testutil.AWSLocalstackRegion, "not-exists", `arn:aws:sqs:us-east-1:00000000000:not-exists`) + assert.Nil(target) + assert.EqualError(err, "NoCredentialProviders: no valid providers in chain. Deprecated.\n\tFor verbose messaging see aws.Config.CredentialsChainVerboseErrors") +} + +func TestSQSTarget_SQSConnectionFailure(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") } @@ -25,13 +35,11 @@ func TestSQSTarget_WriteFailure(t *testing.T) { client := testutil.GetAWSLocalstackSQSClient() target, err := newSQSTargetWithInterfaces(client, "00000000000", testutil.AWSLocalstackRegion, "not-exists") - assert.Nil(err) - assert.NotNil(target) - assert.Equal("arn:aws:sqs:us-east-1:00000000000:not-exists", target.GetID()) - - res, err := target.Write(nil) - assert.Nil(err) - assert.NotNil(res) + assert.Nil(target) + assert.NotNil(err) + if err != nil { + assert.True(strings.HasPrefix(err.Error(), `Could not connect to SQS`)) + } } func TestSQSTarget_WriteSuccess(t *testing.T) {