Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Fail Stream Replicator on startup if source or target isn't reachable #205

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
6 changes: 6 additions & 0 deletions cmd/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform
if len(messagesToSend) > 0 {
err2 := retry.Exponential(5, time.Second, "failureTarget.WriteOversized", func() error {
res, err := ft.WriteOversized(t.MaximumAllowedMessageSizeBytes(), messagesToSend)
if err != nil {
return err
}
if len(res.Oversized) != 0 || len(res.Invalid) != 0 {
log.Fatal("Oversized message transformation resulted in new oversized / invalid messages")
}
Expand All @@ -238,6 +241,9 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform
if len(messagesToSend) > 0 {
err3 := retry.Exponential(5, time.Second, "failureTarget.WriteInvalid", func() error {
res, err := ft.WriteInvalid(messagesToSend)
if err != nil {
return err
}
if len(res.Oversized) != 0 || len(res.Invalid) != 0 {
log.Fatal("Invalid message transformation resulted in new invalid / oversized messages")
}
Expand Down
9 changes: 9 additions & 0 deletions config/examples/sources/kinesis-extended.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,14 @@ source {

# Number of events to process concurrently (default: 50)
concurrent_writes = 15

# Delay between tests for the client or shard numbers changing (in seconds)
shard_check_frequency_seconds = 60

# Time between leader actions (in seconds)
leader_action_frequency_seconds = 60

# Max age for client record before we consider it stale (in seconds)
client_record_max_age_seconds = 120
}
}
15 changes: 9 additions & 6 deletions config/test-fixtures/source-kinesis-extended.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

source {
use "kinesis" {
stream_name = "testStream"
region = "us-test-1"
role_arn = "xxx-test-role-arn"
app_name = "testApp"
start_timestamp = "2022-03-15 07:52:53"
concurrent_writes = 51
stream_name = "testStream"
region = "us-test-1"
role_arn = "xxx-test-role-arn"
app_name = "testApp"
start_timestamp = "2022-03-15 07:52:53"
concurrent_writes = 51
shard_check_frequency_seconds = 20
client_record_max_age_seconds = 30
leader_action_frequency_seconds = 25
}
}
3 changes: 3 additions & 0 deletions pkg/models/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Message struct {
// any cleanup process for the source is actioned
AckFunc func()

// Metadata holds the message's metadata
Metadata map[string]interface{}

// If the message is invalid it can be decorated with an error
// message for logging and reporting
err error
Expand Down
72 changes: 49 additions & 23 deletions pkg/source/kinesis/kinesis_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,29 @@ import (

// configuration configures the source for records pulled
type configuration struct {
StreamName string `hcl:"stream_name" env:"SOURCE_KINESIS_STREAM_NAME"`
Region string `hcl:"region" env:"SOURCE_KINESIS_REGION"`
AppName string `hcl:"app_name" env:"SOURCE_KINESIS_APP_NAME"`
RoleARN string `hcl:"role_arn,optional" env:"SOURCE_KINESIS_ROLE_ARN"`
StartTimestamp string `hcl:"start_timestamp,optional" env:"SOURCE_KINESIS_START_TIMESTAMP"` // Timestamp for the kinesis shard iterator to begin processing. Format YYYY-MM-DD HH:MM:SS.MS (milliseconds optional)
ConcurrentWrites int `hcl:"concurrent_writes,optional" env:"SOURCE_CONCURRENT_WRITES"`
StreamName string `hcl:"stream_name" env:"SOURCE_KINESIS_STREAM_NAME"`
Region string `hcl:"region" env:"SOURCE_KINESIS_REGION"`
AppName string `hcl:"app_name" env:"SOURCE_KINESIS_APP_NAME"`
RoleARN string `hcl:"role_arn,optional" env:"SOURCE_KINESIS_ROLE_ARN"`
StartTimestamp string `hcl:"start_timestamp,optional" env:"SOURCE_KINESIS_START_TIMESTAMP"` // Timestamp for the kinesis shard iterator to begin processing. Format YYYY-MM-DD HH:MM:SS.MS (milliseconds optional)
ConcurrentWrites int `hcl:"concurrent_writes,optional" env:"SOURCE_CONCURRENT_WRITES"`
ClientRecordMaxAge *int `hcl:"client_record_max_age_seconds,optional" env:"SOURCE_KINESIS_CLIENT_MAX_AGE_SECONDS"`
ShardCheckFrequency int `hcl:"shard_check_frequency_seconds,optional" env:"SOURCE_KINESIS_SHARD_FREQUENCY_SECONDS"`
LeaderActionFrequency int `hcl:"leader_action_frequency_seconds,optional" env:"SOURCE_KINESIS_LEADER_FREQUENCY_SECONDS"`
}

// --- Kinesis source

// kinesisSource holds a new client for reading messages from kinesis
type kinesisSource struct {
client *kinsumer.Kinsumer
streamName string
concurrentWrites int
region string
accountID string
client *kinsumer.Kinsumer
streamName string
concurrentWrites int
region string
accountID string
clientRecordMaxAge *int
shardCheckFrequency int
leaderActionFrequency int

log *log.Entry
}
Expand Down Expand Up @@ -75,7 +81,10 @@ func configFunctionGeneratorWithInterfaces(kinesisClient kinesisiface.KinesisAPI
c.Region,
c.StreamName,
c.AppName,
&iteratorTstamp)
&iteratorTstamp,
c.LeaderActionFrequency,
c.ShardCheckFrequency,
c.ClientRecordMaxAge)
}
}

Expand Down Expand Up @@ -109,7 +118,9 @@ func (f adapter) Create(i interface{}) (interface{}, error) {
func (f adapter) ProvideDefault() (interface{}, error) {
// Provide defaults
cfg := &configuration{
ConcurrentWrites: 50,
ConcurrentWrites: 50,
ShardCheckFrequency: 10,
LeaderActionFrequency: 10,
}

return cfg, nil
Expand Down Expand Up @@ -145,30 +156,45 @@ func (kl *KinsumerLogrus) Log(format string, v ...interface{}) {

// newKinesisSourceWithInterfaces allows you to provide a Kinesis + DynamoDB client directly to allow
// for mocking and localstack usage
func newKinesisSourceWithInterfaces(kinesisClient kinesisiface.KinesisAPI, dynamodbClient dynamodbiface.DynamoDBAPI, awsAccountID string, concurrentWrites int, region string, streamName string, appName string, startTimestamp *time.Time) (*kinesisSource, error) {
func newKinesisSourceWithInterfaces(kinesisClient kinesisiface.KinesisAPI, dynamodbClient dynamodbiface.DynamoDBAPI, awsAccountID string, concurrentWrites int, region string, streamName string, appName string, startTimestamp *time.Time, leaderActionFrequency, shardCheckFrequency int, clientRecordMaxAge *int) (*kinesisSource, error) {
// TODO: Add statistics monitoring to be able to report on consumer latency

config := kinsumer.NewConfig().
WithShardCheckFrequency(10 * time.Second).
WithLeaderActionFrequency(10 * time.Second).
WithShardCheckFrequency(time.Duration(shardCheckFrequency) * time.Second).
WithLeaderActionFrequency(time.Duration(leaderActionFrequency) * time.Second).
WithManualCheckpointing(true).
WithLogger(&KinsumerLogrus{}).
WithIteratorStartTimestamp(startTimestamp)

if clientRecordMaxAge != nil {
maxAge := time.Duration(*clientRecordMaxAge) * time.Second
config = config.WithClientRecordMaxAge(&maxAge)
}

// TODO: See if the client name can be reused to survive same node reboots
name := uuid.NewV4().String()

// test the connection to kinesis by trying to make an API call
_, err := kinesisClient.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &streamName})
if err != nil {
return nil, err
}

k, err := kinsumer.NewWithInterfaces(kinesisClient, dynamodbClient, streamName, appName, name, config)
if err != nil {
return nil, errors.Wrap(err, "Failed to create Kinsumer client")
}

return &kinesisSource{
client: k,
streamName: streamName,
concurrentWrites: concurrentWrites,
region: region,
accountID: awsAccountID,
log: log.WithFields(log.Fields{"source": "kinesis", "cloud": "AWS", "region": region, "stream": streamName}),
client: k,
streamName: streamName,
concurrentWrites: concurrentWrites,
region: region,
accountID: awsAccountID,
log: log.WithFields(log.Fields{"source": "kinesis", "cloud": "AWS", "region": region, "stream": streamName}),
clientRecordMaxAge: clientRecordMaxAge,
shardCheckFrequency: shardCheckFrequency,
leaderActionFrequency: leaderActionFrequency,
}, nil
}

Expand Down Expand Up @@ -243,7 +269,7 @@ func (ks *kinesisSource) Read(sf *sourceiface.SourceFunctions) error {
case <-time.After(10 * time.Second):
// Append errors and crash
multierror.Append(kinesisPullErr, errors.Errorf("wg.Wait() took too long, forcing app close."))
ks.log.WithFields(log.Fields{"error": err}).Fatal(err)
ks.log.WithFields(log.Fields{"error": kinesisPullErr}).Fatal(kinesisPullErr)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noticed randomly that the error we logged here was the wrong one, fixed it.

Copy link
Collaborator

@colmsnowplow colmsnowplow Aug 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good spot - Can we pull this into its own issue for auditability - it's a distinct bug which might be important to some debugging investigation in future - so I'd prefer it to have its own audit trail.

(Doesn't need a separate PR though, just its own commit and issue within this PR is fine)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created issue here: #209 and a separate commit here for it.

}

// Return kinesisPullErr if we have one
Expand Down
Loading