Skip to content

Commit

Permalink
Fail Stream Replicator on startup if source or target isn't reachable (
Browse files Browse the repository at this point in the history
…closes #182)
  • Loading branch information
TiganeteaRobert committed Aug 4, 2022
1 parent 4f474cf commit adb452b
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 66 deletions.
9 changes: 9 additions & 0 deletions pkg/source/pubsub/pubsub_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ func newPubSubSource(concurrentWrites int, projectID string, subscriptionID stri
return nil, errors.Wrap(err, "Failed to create PubSub client")
}

sub := client.SubscriptionInProject(subscriptionID, projectID)
exists, err := sub.Exists(ctx)
if err != nil {
return nil, errors.Wrap(err, "Connection to PubSub failed")
}
if !exists {
return nil, errors.New("Connection to PubSub failed, subscription does not exist")
}

return &pubSubSource{
projectID: projectID,
client: client,
Expand Down
24 changes: 24 additions & 0 deletions pkg/source/pubsub/pubsub_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,30 @@ func TestPubSubSource_ReadAndReturnSuccessWithMock(t *testing.T) {
assert.Equal(expected, msgDatas)
}

// TestPubSubSource_ConnCheckErrWithMocks unit tests PubSub source with connection error
func TestPubSubSource_ConnCheckErrWithMocks(t *testing.T) {
assert := assert.New(t)
srv, conn := testutil.InitMockPubsubServer(8563, nil, t)
srv.Close()
conn.Close()

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub-wrong")
assert.Nil(pubsubSource)
assert.EqualError(err, `Connection to PubSub failed: context deadline exceeded`)
}

// TestPubSubSource_SubFailWithMocks unit tests PubSub source initiation with wrong sub name
func TestPubSubSource_SubFailWithMocks(t *testing.T) {
assert := assert.New(t)
srv, conn := testutil.InitMockPubsubServer(8563, nil, t)
defer srv.Close()
defer conn.Close()

pubsubSource, err := newPubSubSource(10, "project-test", "test-sub-wrong")
assert.Nil(pubsubSource)
assert.EqualError(err, `Connection to PubSub failed, subscription does not exist`)
}

// TestPubSubSource_ReadAndReturnSuccessWithMock_DelayedAcks tests the behaviour of pubsub source when some messages take longer to ack than others
func TestPubSubSource_ReadAndReturnSuccessWithMock_DelayedAcks(t *testing.T) {
assert := assert.New(t)
Expand Down
6 changes: 6 additions & 0 deletions pkg/target/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func newEventHubTarget(cfg *EventHubConfig) (*EventHubTarget, error) {
// If none is specified, it will retry indefinitely until the context times out, which hides the actual error message
// To avoid obscuring errors, contextTimeoutInSeconds should be configured to ensure all retries may be completed before its expiry

// get the runtime information of the event hub in order to check the connection
_, err = hub.GetRuntimeInformation(context.Background())
if err != nil {
return nil, errors.Errorf("Error initialising EventHub client: could not reach Event Hub: %v", err)
}

return newEventHubTargetWithInterfaces(hub, cfg), err
}

Expand Down
36 changes: 15 additions & 21 deletions pkg/target/eventhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,41 +364,35 @@ func TestWriteFailure(t *testing.T) {
assert.Nil(twres.Invalid)
}

// TestNewEventHubTarget_KeyValue tests that we can initialise a client with key value credentials.
func TestNewEventHubTarget_KeyValue(t *testing.T) {
// TestNewEventHubTarget_CredentialsNotFound tests that we fail on startup when we're not provided with appropriate credential values.
func TestNewEventHubTarget_CredentialsNotFound(t *testing.T) {
assert := assert.New(t)

// Test that we can initialise a client with Key and Value
t.Setenv("EVENTHUB_KEY_NAME", "fake")
t.Setenv("EVENTHUB_KEY_VALUE", "fake")

tgt, err := newEventHubTarget(&cfg)
assert.Nil(err)
assert.NotNil(tgt)
assert.NotNil(err)
if err != nil {
assert.Equal("Error initialising EventHub client: No valid combination of authentication Env vars found. https://pkg.go.dev/github.com/Azure/azure-event-hubs-go#NewHubWithNamespaceNameAndEnvironment", err.Error())
}
assert.Nil(tgt)
}

// TestNewEventHubTarget_ConnString tests that we can initialise a client with connection string credentials.
func TestNewEventHubTarget_ConnString(t *testing.T) {
func TestNewEventHubTarget_ConnStringErr(t *testing.T) {
assert := assert.New(t)

// Test that we can initialise a client with Connection String

t.Setenv("EVENTHUB_CONNECTION_STRING", "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=fake;SharedAccessKey=fake")

tgt, err := newEventHubTarget(&cfg)
assert.Nil(err)
assert.NotNil(tgt)
assert.EqualError(err, `Error initialising EventHub client: could not reach Event Hub: dial tcp: lookup test.servicebus.windows.net: no such host`)
assert.Nil(tgt)
}

// TestNewEventHubTarget_CredentialsNotFound tests that we fail on startup when we're not provided with appropriate credential values.
func TestNewEventHubTarget_CredentialsNotFound(t *testing.T) {
func TestNewEventHubTarget_ConnVarsErr(t *testing.T) {
assert := assert.New(t)

t.Setenv("EVENTHUB_KEY_NAME", "fake")
t.Setenv("EVENTHUB_KEY_VALUE", "fake")

tgt, err := newEventHubTarget(&cfg)
assert.NotNil(err)
if err != nil {
assert.Equal("Error initialising EventHub client: No valid combination of authentication Env vars found. https://pkg.go.dev/github.com/Azure/azure-event-hubs-go#NewHubWithNamespaceNameAndEnvironment", err.Error())
}
assert.EqualError(err, `Error initialising EventHub client: could not reach Event Hub: dial tcp: lookup test.servicebus.windows.net: no such host`)
assert.Nil(tgt)
}

Expand Down
21 changes: 17 additions & 4 deletions pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,24 @@ func newHTTPTarget(httpURL string, requestTimeout int, byteLimit int, contentTyp
transport.TLSClientConfig = tlsConfig
}

client := &http.Client{
Transport: transport,
Timeout: time.Duration(requestTimeout) * time.Second,
}

// send a HEAD request to the URL to check the connection
request, err := http.NewRequest("HEAD", httpURL, nil)
if err != nil {
return nil, errors.Wrap(err, `Error creating HEAD request`)
}
resp, err := client.Do(request)
if err != nil {
return nil, errors.Wrap(err, `Connection to host error`)
}
defer resp.Body.Close()

return &HTTPTarget{
client: &http.Client{
Transport: transport,
Timeout: time.Duration(requestTimeout) * time.Second,
},
client: client,
httpURL: httpURL,
byteLimit: byteLimit,
contentType: contentType,
Expand Down
54 changes: 13 additions & 41 deletions pkg/target/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@ func TestAddHeadersToRequest(t *testing.T) {
func TestNewHTTPTarget(t *testing.T) {
assert := assert.New(t)

httpTarget, err := newHTTPTarget("http://something", 5, 1048576, "application/json", "", "", "", "", "", "", true)
httpTarget, err := newHTTPTarget("http://wrong-url-12345678.com", 5, 1048576, "application/json", "", "", "", "", "", "", true)

assert.Nil(err)
assert.NotNil(httpTarget)
assert.EqualError(err, `Connection to host error: Head "http://wrong-url-12345678.com": dial tcp: lookup wrong-url-12345678.com: no such host`)
assert.Nil(httpTarget)

failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1048576, "application/json", "", "", "", "", "", "", true)

Expand All @@ -145,6 +145,8 @@ func TestHttpWrite_Simple(t *testing.T) {

var results [][]byte
wg := sync.WaitGroup{}
// add to waitGroup for connection check HEAD request
wg.Add(1)
server := createTestServer(&results, &wg)
defer server.Close()

Expand All @@ -166,8 +168,8 @@ func TestHttpWrite_Simple(t *testing.T) {

assert.Nil(err1)
assert.Equal(501, len(writeResult.Sent))
assert.Equal(501, len(results))
for _, result := range results {
assert.Equal(502, len(results))
for _, result := range results[1:] {
assert.Equal("Hello Server!!", string(result))
}

Expand All @@ -179,6 +181,7 @@ func TestHttpWrite_Concurrent(t *testing.T) {

var results [][]byte
wg := sync.WaitGroup{}
wg.Add(1)
server := createTestServer(&results, &wg)
defer server.Close()

Expand Down Expand Up @@ -209,51 +212,20 @@ func TestHttpWrite_Concurrent(t *testing.T) {

wg.Wait()

assert.Equal(10, len(results))
for _, result := range results {
assert.Equal(11, len(results))
for _, result := range results[1:] {
assert.Equal("Hello Server!!", string(result))
}

assert.Equal(int64(10), ackOps)
}

func TestHttpWrite_Failure(t *testing.T) {
assert := assert.New(t)

var results [][]byte
wg := sync.WaitGroup{}
server := createTestServer(&results, &wg)
defer server.Close()

target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1048576, "application/json", "", "", "", "", "", "", true)
if err != nil {
t.Fatal(err)
}

var ackOps int64
ackFunc := func() {
atomic.AddInt64(&ackOps, 1)
}

messages := testutil.GetTestMessages(10, "Hello Server!!", ackFunc)

writeResult, err1 := target.Write(messages)

assert.NotNil(err1)
if err1 != nil {
assert.Regexp("Error sending http request: 10 errors occurred:.*", err1.Error())
}

assert.Equal(10, len(writeResult.Failed))
assert.Nil(writeResult.Sent)
assert.Nil(writeResult.Oversized)
}

func TestHttpWrite_Oversized(t *testing.T) {
assert := assert.New(t)

var results [][]byte
wg := sync.WaitGroup{}
wg.Add(1)
server := createTestServer(&results, &wg)
defer server.Close()

Expand All @@ -278,8 +250,8 @@ func TestHttpWrite_Oversized(t *testing.T) {
assert.Nil(err1)
assert.Equal(10, len(writeResult.Sent))
assert.Equal(1, len(writeResult.Oversized))
assert.Equal(10, len(results))
for _, result := range results {
assert.Equal(11, len(results))
for _, result := range results[1:] {
assert.Equal("Hello Server!!", string(result))
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/target/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ func newPubSubTarget(projectID string, topicName string) (*PubSubTarget, error)
return nil, errors.Wrap(err, "Failed to create PubSub client")
}

sub := client.TopicInProject(topicName, projectID)
exists, err := sub.Exists(ctx)
if err != nil {
return nil, errors.Wrap(err, "Connection to PubSub failed")
}
if !exists {
return nil, errors.New("Connection to PubSub failed, topic does not exist")
}

return &PubSubTarget{
projectID: projectID,
client: client,
Expand Down
24 changes: 24 additions & 0 deletions pkg/target/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,30 @@ func TestPubSubTarget_WriteSuccessWithMocks(t *testing.T) {
assert.Equal(int64(10), ackOps)
}

// TestPubSubTarget_ConnCheckErrWithMocks unit tests the Pubsub target initialization with connection error
func TestPubSubTarget_ConnCheckErrWithMocks(t *testing.T) {
assert := assert.New(t)
srv, conn := testutil.InitMockPubsubServer(8563, nil, t)
srv.Close()
conn.Close()

pubsubTarget, err := newPubSubTarget(`project-test`, `test-topic-wrong`)
assert.Nil(pubsubTarget)
assert.EqualError(err, `Connection to PubSub failed: context deadline exceeded`)
}

// TestPubSubTarget_TopicFailWithMocks unit tests the Pubsub target initialization with wrong topic name
func TestPubSubTarget_TopicFailWithMocks(t *testing.T) {
assert := assert.New(t)
srv, conn := testutil.InitMockPubsubServer(8563, nil, t)
defer srv.Close()
defer conn.Close()

pubsubTarget, err := newPubSubTarget(`project-test`, `test-topic-wrong`)
assert.Nil(pubsubTarget)
assert.EqualError(err, `Connection to PubSub failed, topic does not exist`)
}

// TestPubSubTarget_WriteFailureWithMocks unit tests the unhappy path for PubSub target
func TestPubSubTarget_WriteFailureWithMocks(t *testing.T) {
assert := assert.New(t)
Expand Down

0 comments on commit adb452b

Please sign in to comment.