Skip to content

Commit

Permalink
Fail Stream Replicator on startup if source or target isn't reachable (
Browse files Browse the repository at this point in the history
…closes #182)
  • Loading branch information
TiganeteaRobert committed Aug 29, 2022
1 parent d832b17 commit 35cc351
Show file tree
Hide file tree
Showing 17 changed files with 311 additions and 149 deletions.
6 changes: 6 additions & 0 deletions pkg/source/kinesis/kinesis_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ func newKinesisSourceWithInterfaces(kinesisClient kinesisiface.KinesisAPI, dynam
// TODO: See if the client name can be reused to survive same node reboots
name := uuid.NewV4().String()

// test the connection to kinesis by trying to make an API call
_, err := kinesisClient.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &streamName})
if err != nil {
return nil, err
}

k, err := kinsumer.NewWithInterfaces(kinesisClient, dynamodbClient, streamName, appName, name, config)
if err != nil {
return nil, errors.Wrap(err, "Failed to create Kinsumer client")
Expand Down
89 changes: 57 additions & 32 deletions pkg/source/kinesis/kinesis_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,49 +66,23 @@ func TestNewKinesisSourceWithInterfaces_Success(t *testing.T) {

source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 10, 10, nil)

fmt.Println(source.clientRecordMaxAge)
assert.IsType(&kinesisSource{}, source)
assert.Nil(err)
}

// newKinesisSourceWithInterfaces should fail if we can't reach Kinesis and DDB, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151
/*
func TestNewKinesisSourceWithInterfaces_Failure(t *testing.T) {
// TestNewKinesisSourceWithInterfaces_ConnectionCheck tests that the Kinesis source fails on start-up if the connection to Kinesis fails
func TestNewKinesisSourceWithInterfaces_ConnectionCheck(t *testing.T) {
// Unlike the success test, we don't require anything to exist for this one
assert := assert.New(t)

// Set up localstack resources
kinesisClient := testutil.GetAWSLocalstackKinesisClient()
dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient()

source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "nonexistent-stream", "test", nil,10, 10, &fifty)
_, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "nonexistent-stream", "test", nil, 10, 10, nil)

assert.Nil(&kinesisSource{}, source)
assert.NotNil(err)
}
*/

// TODO: When we address https://github.com/snowplow-devops/stream-replicator/issues/151, this test will need to change.
func TestKinesisSource_ReadFailure_NoResources(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

assert := assert.New(t)

kinesisClient := testutil.GetAWSLocalstackKinesisClient()
dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient()

source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists", "fake-name", nil, 10, 10, &fifty)
assert.Nil(err)
assert.NotNil(source)
assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/not-exists", source.GetID())

err = source.Read(nil)
assert.NotNil(err)
if err != nil {
assert.Equal("Failed to start Kinsumer client: error describing table fake-name_checkpoints: ResourceNotFoundException: Cannot do operations on a non-existent table", err.Error())
assert.Equal("ResourceNotFoundException: Stream nonexistent-stream under account 000000000000 not found.", err.Error())
}
}

Expand Down Expand Up @@ -228,6 +202,10 @@ func putNRecordsIntoKinesis(kinesisClient kinesisiface.KinesisAPI, n int, stream
}

func TestGetSource_WithKinesisSource(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

assert := assert.New(t)

// Set up localstack resources
Expand Down Expand Up @@ -281,10 +259,34 @@ func TestGetSource_ConfigErrorLeaderAction(t *testing.T) {

assert := assert.New(t)

// Set up localstack resources
kinesisClient := testutil.GetAWSLocalstackKinesisClient()
dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient()

source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "something", "test", nil, 0, 10, &one)
streamName := "kinesis-source-integration-2"
createErr := testutil.CreateAWSLocalstackKinesisStream(kinesisClient, streamName)
if createErr != nil {
t.Fatal(createErr)
}
defer testutil.DeleteAWSLocalstackKinesisStream(kinesisClient, streamName)

appName := "integration"
ddbErr := testutil.CreateAWSLocalstackDynamoDBTables(dynamodbClient, appName)
if ddbErr != nil {
t.Fatal(ddbErr)
}
defer testutil.DeleteAWSLocalstackDynamoDBTables(dynamodbClient, appName)

// Put ten records into kinesis stream
putErr := putNRecordsIntoKinesis(kinesisClient, 10, streamName, "Test")
if putErr != nil {
t.Fatal(putErr)
}

time.Sleep(1 * time.Second)

// Create the source and assert that it's there
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 0, 10, &one)

assert.Nil(source)
assert.EqualError(err, `Failed to create Kinsumer client: leaderActionFrequency config value is mandatory and must be at least as long as ShardCheckFrequency`)
Expand All @@ -297,10 +299,33 @@ func TestGetSource_ConfigErrorMaxAge(t *testing.T) {

assert := assert.New(t)

// Set up localstack resources
kinesisClient := testutil.GetAWSLocalstackKinesisClient()
dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient()

source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "something", "test", nil, 10, 10, &one)
streamName := "kinesis-source-integration-10"
createErr := testutil.CreateAWSLocalstackKinesisStream(kinesisClient, streamName)
if createErr != nil {
t.Fatal(createErr)
}
defer testutil.DeleteAWSLocalstackKinesisStream(kinesisClient, streamName)

appName := "integration"
ddbErr := testutil.CreateAWSLocalstackDynamoDBTables(dynamodbClient, appName)
if ddbErr != nil {
t.Fatal(ddbErr)
}
defer testutil.DeleteAWSLocalstackDynamoDBTables(dynamodbClient, appName)

// Put ten records into kinesis stream
putErr := putNRecordsIntoKinesis(kinesisClient, 10, streamName, "Test")
if putErr != nil {
t.Fatal(putErr)
}

time.Sleep(1 * time.Second)

source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 10, 10, &one)

assert.Nil(source)
assert.EqualError(err, `Failed to create Kinsumer client: clientRecordMaxAge value must be at least as long as shardCheckFrequency`)
Expand Down
9 changes: 9 additions & 0 deletions pkg/source/pubsub/pubsub_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ func newPubSubSource(concurrentWrites int, projectID string, subscriptionID stri
return nil, errors.Wrap(err, "Failed to create PubSub client")
}

sub := client.SubscriptionInProject(subscriptionID, projectID)
exists, err := sub.Exists(ctx)
if err != nil {
return nil, errors.Wrap(err, "Connection to PubSub failed")
}
if !exists {
return nil, errors.New("Connection to PubSub failed, subscription does not exist")
}

return &pubSubSource{
projectID: projectID,
client: client,
Expand Down
35 changes: 31 additions & 4 deletions pkg/source/pubsub/pubsub_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,22 @@ func TestPubSubSource_ReadAndReturnSuccessIntegration(t *testing.T) {
}
}

// newPubSubSource_Failure should fail if we can't reach PubSub, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151
/*
// newPubSubSource_Failure should fail if we can't reach PubSub
func TestNewPubSubSource_Failure(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
assert := assert.New(t)

srv, _ := testutil.InitMockPubsubServer(8010, nil, t)
defer srv.Close()

pubsubSource, err := newPubSubSource(10, "nonexistent-project", "nonexistent-subscription")
assert.NotNil(err)
assert.Nil(pubsubSource)
// This should return an error when we can't connect, rather than proceeding to the Write() function before we hit a problem.
assert.EqualError(err, `Connection to PubSub failed, subscription does not exist`)
}
*/

// TestNewPubSubSource_Success tests the typical case of creating a new pubsub source.
func TestNewPubSubSource_Success(t *testing.T) {
Expand All @@ -86,7 +88,8 @@ func TestNewPubSubSource_Success(t *testing.T) {
}
assert := assert.New(t)

testutil.InitMockPubsubServer(8010, nil, t)
srv, _ := testutil.InitMockPubsubServer(8010, nil, t)
defer srv.Close()

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub")
assert.Nil(err)
Expand Down Expand Up @@ -132,6 +135,30 @@ func TestPubSubSource_ReadAndReturnSuccessWithMock(t *testing.T) {
assert.Equal(expected, msgDatas)
}

// TestPubSubSource_ConnCheckErrWithMocks unit tests PubSub source with connection error
func TestPubSubSource_ConnCheckErrWithMocks(t *testing.T) {
assert := assert.New(t)
srv, conn := testutil.InitMockPubsubServer(8563, nil, t)
srv.Close()
conn.Close()

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub-wrong")
assert.Nil(pubsubSource)
assert.EqualError(err, `Connection to PubSub failed: context deadline exceeded`)
}

// TestPubSubSource_SubFailWithMocks unit tests PubSub source initiation with wrong sub name
func TestPubSubSource_SubFailWithMocks(t *testing.T) {
assert := assert.New(t)
srv, conn := testutil.InitMockPubsubServer(8563, nil, t)
defer srv.Close()
defer conn.Close()

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub-wrong")
assert.Nil(pubsubSource)
assert.EqualError(err, `Connection to PubSub failed, subscription does not exist`)
}

// TestPubSubSource_ReadAndReturnSuccessWithMock_DelayedAcks tests the behaviour of pubsub source when some messages take longer to ack than others
func TestPubSubSource_ReadAndReturnSuccessWithMock_DelayedAcks(t *testing.T) {
assert := assert.New(t)
Expand Down
5 changes: 5 additions & 0 deletions pkg/source/sqs/sqs_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ var ConfigPair = sourceconfig.ConfigPair{
// newSQSSourceWithInterfaces allows you to provide an SQS client directly to allow
// for mocking and localstack usage
func newSQSSourceWithInterfaces(client sqsiface.SQSAPI, awsAccountID string, concurrentWrites int, region string, queueName string) (*sqsSource, error) {
_, err := client.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: &queueName})
if err != nil {
return nil, errors.Wrap(err, `Could not connect to SQS`)
}

return &sqsSource{
client: client,
queueName: queueName,
Expand Down
42 changes: 18 additions & 24 deletions pkg/source/sqs/sqs_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"reflect"
"strings"
"testing"
"time"

Expand All @@ -30,6 +31,20 @@ func TestMain(m *testing.M) {
os.Exit(exitVal)
}

// TestNewSqsSource_AWSConnectionCheck tests that the SQS source fails on start-up if the connection to AWS fails
func TestNewSqsSource_AWSConnectionCheck(t *testing.T) {
assert := assert.New(t)

target, err := configFunction(&configuration{
QueueName: "not-exists",
Region: testutil.AWSLocalstackRegion,
RoleARN: `arn:aws:sqs:us-east-1:00000000000:not-exists`,
ConcurrentWrites: 15,
})
assert.Nil(target)
assert.EqualError(err, "NoCredentialProviders: no valid providers in chain. Deprecated.\n\tFor verbose messaging see aws.Config.CredentialsChainVerboseErrors")
}

// func newSQSSourceWithInterfaces(client sqsiface.SQSAPI, awsAccountID string, concurrentWrites int, region string, queueName string) (*sqsSource, error) {
func TestNewSQSSourceWithInterfaces_Success(t *testing.T) {
if testing.Short() {
Expand All @@ -51,9 +66,8 @@ func TestNewSQSSourceWithInterfaces_Success(t *testing.T) {
assert.Nil(err)
}

// newSQSSourceWithInterfaces should fail if we can't reach SQS, commented out this test until we look into https://github.com/snowplow-devops/stream-replicator/issues/151
/*
func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) {
// newSQSSourceWithInterfaces should fail if we can't reach SQS
func TestNewSQSSourceWithInterfaces_SQSConnectionFailure(t *testing.T) {
// Unlike the success test, we don't require anything to exist for this one
assert := assert.New(t)

Expand All @@ -63,28 +77,8 @@ func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) {

assert.Nil(source)
assert.NotNil(err)
}
*/

// TODO: When we address https://github.com/snowplow-devops/stream-replicator/issues/151, this test will need to change.
func TestSQSSource_ReadFailure(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

assert := assert.New(t)

client := testutil.GetAWSLocalstackSQSClient()

source, err := newSQSSourceWithInterfaces(client, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists")
assert.Nil(err)
assert.NotNil(source)
assert.Equal("arn:aws:sqs:us-east-1:00000000000:not-exists", source.GetID())

err = source.Read(nil)
assert.NotNil(err)
if err != nil {
assert.Equal("Failed to get SQS queue URL: AWS.SimpleQueueService.NonExistentQueue: AWS.SimpleQueueService.NonExistentQueue; see the SQS docs.\n\tstatus code: 400, request id: 00000000-0000-0000-0000-000000000000", err.Error())
assert.True(strings.HasPrefix(err.Error(), `Could not connect to SQS`))
}
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/target/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func newEventHubTarget(cfg *EventHubConfig) (*EventHubTarget, error) {
// If none is specified, it will retry indefinitely until the context times out, which hides the actual error message
// To avoid obscuring errors, contextTimeoutInSeconds should be configured to ensure all retries may be completed before its expiry

// get the runtime information of the event hub in order to check the connection
_, err = hub.GetRuntimeInformation(context.Background())
if err != nil {
return nil, errors.Errorf("Error initialising EventHub client: could not reach Event Hub: %v", err)
}

return newEventHubTargetWithInterfaces(hub, cfg), err
}

Expand Down
Loading

0 comments on commit 35cc351

Please sign in to comment.