From adb452ba5f917aa0fc01e8f02a2cc25661c33ef5 Mon Sep 17 00:00:00 2001 From: TiganeteaRobert Date: Wed, 3 Aug 2022 16:10:57 +0300 Subject: [PATCH] Fail Stream Replicator on startup if source or target isn't reachable (closes #182) --- pkg/source/pubsub/pubsub_source.go | 9 +++++ pkg/source/pubsub/pubsub_source_test.go | 24 +++++++++++ pkg/target/eventhub.go | 6 +++ pkg/target/eventhub_test.go | 36 +++++++---------- pkg/target/http.go | 21 ++++++++-- pkg/target/http_test.go | 54 ++++++------------------- pkg/target/pubsub.go | 9 +++++ pkg/target/pubsub_test.go | 24 +++++++++++ 8 files changed, 117 insertions(+), 66 deletions(-) diff --git a/pkg/source/pubsub/pubsub_source.go b/pkg/source/pubsub/pubsub_source.go index acb10626..6b2f493b 100644 --- a/pkg/source/pubsub/pubsub_source.go +++ b/pkg/source/pubsub/pubsub_source.go @@ -96,6 +96,15 @@ func newPubSubSource(concurrentWrites int, projectID string, subscriptionID stri return nil, errors.Wrap(err, "Failed to create PubSub client") } + sub := client.SubscriptionInProject(subscriptionID, projectID) + exists, err := sub.Exists(ctx) + if err != nil { + return nil, errors.Wrap(err, "Connection to PubSub failed") + } + if !exists { + return nil, errors.New("Connection to PubSub failed, subscription does not exist") + } + return &pubSubSource{ projectID: projectID, client: client, diff --git a/pkg/source/pubsub/pubsub_source_test.go b/pkg/source/pubsub/pubsub_source_test.go index 8de38102..91a8ca09 100644 --- a/pkg/source/pubsub/pubsub_source_test.go +++ b/pkg/source/pubsub/pubsub_source_test.go @@ -132,6 +132,30 @@ func TestPubSubSource_ReadAndReturnSuccessWithMock(t *testing.T) { assert.Equal(expected, msgDatas) } +// TestPubSubSource_ConnCheckErrWithMocks unit tests PubSub source with connection error +func TestPubSubSource_ConnCheckErrWithMocks(t *testing.T) { + assert := assert.New(t) + srv, conn := testutil.InitMockPubsubServer(8563, nil, t) + srv.Close() + conn.Close() + + pubsubSource, err := newPubSubSource(10, "project-test", "test-sub-wrong") + assert.Nil(pubsubSource) + assert.EqualError(err, `Connection to PubSub failed: context deadline exceeded`) +} + +// TestPubSubSource_SubFailWithMocks unit tests PubSub source initiation with wrong sub name +func TestPubSubSource_SubFailWithMocks(t *testing.T) { + assert := assert.New(t) + srv, conn := testutil.InitMockPubsubServer(8563, nil, t) + defer srv.Close() + defer conn.Close() + + pubsubSource, err := newPubSubSource(10, "project-test", "test-sub-wrong") + assert.Nil(pubsubSource) + assert.EqualError(err, `Connection to PubSub failed, subscription does not exist`) +} + // TestPubSubSource_ReadAndReturnSuccessWithMock_DelayedAcks tests the behaviour of pubsub source when some messages take longer to ack than others func TestPubSubSource_ReadAndReturnSuccessWithMock_DelayedAcks(t *testing.T) { assert := assert.New(t) diff --git a/pkg/target/eventhub.go b/pkg/target/eventhub.go index 62a31abf..cd576cf2 100644 --- a/pkg/target/eventhub.go +++ b/pkg/target/eventhub.go @@ -96,6 +96,12 @@ func newEventHubTarget(cfg *EventHubConfig) (*EventHubTarget, error) { // If none is specified, it will retry indefinitely until the context times out, which hides the actual error message // To avoid obscuring errors, contextTimeoutInSeconds should be configured to ensure all retries may be completed before its expiry + // get the runtime information of the event hub in order to check the connection + _, err = hub.GetRuntimeInformation(context.Background()) + if err != nil { + return nil, errors.Errorf("Error initialising EventHub client: could not reach Event Hub: %v", err) + } + return newEventHubTargetWithInterfaces(hub, cfg), err } diff --git a/pkg/target/eventhub_test.go b/pkg/target/eventhub_test.go index 611c5c18..ccd73209 100644 --- a/pkg/target/eventhub_test.go +++ b/pkg/target/eventhub_test.go @@ -364,41 +364,35 @@ func TestWriteFailure(t *testing.T) { assert.Nil(twres.Invalid) } -// TestNewEventHubTarget_KeyValue tests that we can initialise a client with key value credentials. -func TestNewEventHubTarget_KeyValue(t *testing.T) { +// TestNewEventHubTarget_CredentialsNotFound tests that we fail on startup when we're not provided with appropriate credential values. +func TestNewEventHubTarget_CredentialsNotFound(t *testing.T) { assert := assert.New(t) - // Test that we can initialise a client with Key and Value - t.Setenv("EVENTHUB_KEY_NAME", "fake") - t.Setenv("EVENTHUB_KEY_VALUE", "fake") - tgt, err := newEventHubTarget(&cfg) - assert.Nil(err) - assert.NotNil(tgt) + assert.NotNil(err) + if err != nil { + assert.Equal("Error initialising EventHub client: No valid combination of authentication Env vars found. https://pkg.go.dev/github.com/Azure/azure-event-hubs-go#NewHubWithNamespaceNameAndEnvironment", err.Error()) + } + assert.Nil(tgt) } -// TestNewEventHubTarget_ConnString tests that we can initialise a client with connection string credentials. -func TestNewEventHubTarget_ConnString(t *testing.T) { +func TestNewEventHubTarget_ConnStringErr(t *testing.T) { assert := assert.New(t) - - // Test that we can initialise a client with Connection String - t.Setenv("EVENTHUB_CONNECTION_STRING", "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=fake;SharedAccessKey=fake") tgt, err := newEventHubTarget(&cfg) - assert.Nil(err) - assert.NotNil(tgt) + assert.EqualError(err, `Error initialising EventHub client: could not reach Event Hub: dial tcp: lookup test.servicebus.windows.net: no such host`) + assert.Nil(tgt) } -// TestNewEventHubTarget_CredentialsNotFound tests that we fail on startup when we're not provided with appropriate credential values. -func TestNewEventHubTarget_CredentialsNotFound(t *testing.T) { +func TestNewEventHubTarget_ConnVarsErr(t *testing.T) { assert := assert.New(t) + t.Setenv("EVENTHUB_KEY_NAME", "fake") + t.Setenv("EVENTHUB_KEY_VALUE", "fake") + tgt, err := newEventHubTarget(&cfg) - assert.NotNil(err) - if err != nil { - assert.Equal("Error initialising EventHub client: No valid combination of authentication Env vars found. https://pkg.go.dev/github.com/Azure/azure-event-hubs-go#NewHubWithNamespaceNameAndEnvironment", err.Error()) - } + assert.EqualError(err, `Error initialising EventHub client: could not reach Event Hub: dial tcp: lookup test.servicebus.windows.net: no such host`) assert.Nil(tgt) } diff --git a/pkg/target/http.go b/pkg/target/http.go index ec0af45c..efe2eb0a 100644 --- a/pkg/target/http.go +++ b/pkg/target/http.go @@ -108,11 +108,24 @@ func newHTTPTarget(httpURL string, requestTimeout int, byteLimit int, contentTyp transport.TLSClientConfig = tlsConfig } + client := &http.Client{ + Transport: transport, + Timeout: time.Duration(requestTimeout) * time.Second, + } + + // send a HEAD request to the URL to check the connection + request, err := http.NewRequest("HEAD", httpURL, nil) + if err != nil { + return nil, errors.Wrap(err, `Error creating HEAD request`) + } + resp, err := client.Do(request) + if err != nil { + return nil, errors.Wrap(err, `Connection to host error`) + } + defer resp.Body.Close() + return &HTTPTarget{ - client: &http.Client{ - Transport: transport, - Timeout: time.Duration(requestTimeout) * time.Second, - }, + client: client, httpURL: httpURL, byteLimit: byteLimit, contentType: contentType, diff --git a/pkg/target/http_test.go b/pkg/target/http_test.go index 7ab3eb86..1d1b09a6 100644 --- a/pkg/target/http_test.go +++ b/pkg/target/http_test.go @@ -119,10 +119,10 @@ func TestAddHeadersToRequest(t *testing.T) { func TestNewHTTPTarget(t *testing.T) { assert := assert.New(t) - httpTarget, err := newHTTPTarget("http://something", 5, 1048576, "application/json", "", "", "", "", "", "", true) + httpTarget, err := newHTTPTarget("http://wrong-url-12345678.com", 5, 1048576, "application/json", "", "", "", "", "", "", true) - assert.Nil(err) - assert.NotNil(httpTarget) + assert.EqualError(err, `Connection to host error: Head "http://wrong-url-12345678.com": dial tcp: lookup wrong-url-12345678.com: no such host`) + assert.Nil(httpTarget) failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1048576, "application/json", "", "", "", "", "", "", true) @@ -145,6 +145,8 @@ func TestHttpWrite_Simple(t *testing.T) { var results [][]byte wg := sync.WaitGroup{} + // add to waitGroup for connection check HEAD request + wg.Add(1) server := createTestServer(&results, &wg) defer server.Close() @@ -166,8 +168,8 @@ func TestHttpWrite_Simple(t *testing.T) { assert.Nil(err1) assert.Equal(501, len(writeResult.Sent)) - assert.Equal(501, len(results)) - for _, result := range results { + assert.Equal(502, len(results)) + for _, result := range results[1:] { assert.Equal("Hello Server!!", string(result)) } @@ -179,6 +181,7 @@ func TestHttpWrite_Concurrent(t *testing.T) { var results [][]byte wg := sync.WaitGroup{} + wg.Add(1) server := createTestServer(&results, &wg) defer server.Close() @@ -209,51 +212,20 @@ func TestHttpWrite_Concurrent(t *testing.T) { wg.Wait() - assert.Equal(10, len(results)) - for _, result := range results { + assert.Equal(11, len(results)) + for _, result := range results[1:] { assert.Equal("Hello Server!!", string(result)) } assert.Equal(int64(10), ackOps) } -func TestHttpWrite_Failure(t *testing.T) { - assert := assert.New(t) - - var results [][]byte - wg := sync.WaitGroup{} - server := createTestServer(&results, &wg) - defer server.Close() - - target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1048576, "application/json", "", "", "", "", "", "", true) - if err != nil { - t.Fatal(err) - } - - var ackOps int64 - ackFunc := func() { - atomic.AddInt64(&ackOps, 1) - } - - messages := testutil.GetTestMessages(10, "Hello Server!!", ackFunc) - - writeResult, err1 := target.Write(messages) - - assert.NotNil(err1) - if err1 != nil { - assert.Regexp("Error sending http request: 10 errors occurred:.*", err1.Error()) - } - - assert.Equal(10, len(writeResult.Failed)) - assert.Nil(writeResult.Sent) - assert.Nil(writeResult.Oversized) -} - func TestHttpWrite_Oversized(t *testing.T) { assert := assert.New(t) var results [][]byte wg := sync.WaitGroup{} + wg.Add(1) server := createTestServer(&results, &wg) defer server.Close() @@ -278,8 +250,8 @@ func TestHttpWrite_Oversized(t *testing.T) { assert.Nil(err1) assert.Equal(10, len(writeResult.Sent)) assert.Equal(1, len(writeResult.Oversized)) - assert.Equal(10, len(results)) - for _, result := range results { + assert.Equal(11, len(results)) + for _, result := range results[1:] { assert.Equal("Hello Server!!", string(result)) } diff --git a/pkg/target/pubsub.go b/pkg/target/pubsub.go index 2af180ce..13ebdefd 100644 --- a/pkg/target/pubsub.go +++ b/pkg/target/pubsub.go @@ -57,6 +57,15 @@ func newPubSubTarget(projectID string, topicName string) (*PubSubTarget, error) return nil, errors.Wrap(err, "Failed to create PubSub client") } + sub := client.TopicInProject(topicName, projectID) + exists, err := sub.Exists(ctx) + if err != nil { + return nil, errors.Wrap(err, "Connection to PubSub failed") + } + if !exists { + return nil, errors.New("Connection to PubSub failed, topic does not exist") + } + return &PubSubTarget{ projectID: projectID, client: client, diff --git a/pkg/target/pubsub_test.go b/pkg/target/pubsub_test.go index 74ffeeaf..99297fe7 100644 --- a/pkg/target/pubsub_test.go +++ b/pkg/target/pubsub_test.go @@ -150,6 +150,30 @@ func TestPubSubTarget_WriteSuccessWithMocks(t *testing.T) { assert.Equal(int64(10), ackOps) } +// TestPubSubTarget_ConnCheckErrWithMocks unit tests the Pubsub target initialization with connection error +func TestPubSubTarget_ConnCheckErrWithMocks(t *testing.T) { + assert := assert.New(t) + srv, conn := testutil.InitMockPubsubServer(8563, nil, t) + srv.Close() + conn.Close() + + pubsubTarget, err := newPubSubTarget(`project-test`, `test-topic-wrong`) + assert.Nil(pubsubTarget) + assert.EqualError(err, `Connection to PubSub failed: context deadline exceeded`) +} + +// TestPubSubTarget_TopicFailWithMocks unit tests the Pubsub target initialization with wrong topic name +func TestPubSubTarget_TopicFailWithMocks(t *testing.T) { + assert := assert.New(t) + srv, conn := testutil.InitMockPubsubServer(8563, nil, t) + defer srv.Close() + defer conn.Close() + + pubsubTarget, err := newPubSubTarget(`project-test`, `test-topic-wrong`) + assert.Nil(pubsubTarget) + assert.EqualError(err, `Connection to PubSub failed, topic does not exist`) +} + // TestPubSubTarget_WriteFailureWithMocks unit tests the unhappy path for PubSub target func TestPubSubTarget_WriteFailureWithMocks(t *testing.T) { assert := assert.New(t)