diff --git a/config/examples/sources/kinesis-extended.hcl b/config/examples/sources/kinesis-extended.hcl index aaacacfd..5bf9ba82 100644 --- a/config/examples/sources/kinesis-extended.hcl +++ b/config/examples/sources/kinesis-extended.hcl @@ -21,5 +21,14 @@ source { # Number of events to process concurrently (default: 50) concurrent_writes = 15 + + # Delay between tests for the client or shard numbers changing (in seconds) + shard_check_frequency_seconds = 60 + + # Time between leader actions (in seconds) + leader_action_frequency_seconds = 60 + + # Max age for client record before we consider it stale (in seconds) + client_record_max_age_seconds = 120 } } diff --git a/config/test-fixtures/source-kinesis-extended.hcl b/config/test-fixtures/source-kinesis-extended.hcl index cbb85ada..08e42fae 100644 --- a/config/test-fixtures/source-kinesis-extended.hcl +++ b/config/test-fixtures/source-kinesis-extended.hcl @@ -2,11 +2,14 @@ source { use "kinesis" { - stream_name = "testStream" - region = "us-test-1" - role_arn = "xxx-test-role-arn" - app_name = "testApp" - start_timestamp = "2022-03-15 07:52:53" - concurrent_writes = 51 + stream_name = "testStream" + region = "us-test-1" + role_arn = "xxx-test-role-arn" + app_name = "testApp" + start_timestamp = "2022-03-15 07:52:53" + concurrent_writes = 51 + shard_check_frequency_seconds = 20 + client_record_max_age_seconds = 30 + leader_action_frequency_seconds = 25 } } diff --git a/pkg/source/kinesis/kinesis_source.go b/pkg/source/kinesis/kinesis_source.go index 70a76490..ff813781 100644 --- a/pkg/source/kinesis/kinesis_source.go +++ b/pkg/source/kinesis/kinesis_source.go @@ -29,23 +29,29 @@ import ( // configuration configures the source for records pulled type configuration struct { - StreamName string `hcl:"stream_name" env:"SOURCE_KINESIS_STREAM_NAME"` - Region string `hcl:"region" env:"SOURCE_KINESIS_REGION"` - AppName string `hcl:"app_name" env:"SOURCE_KINESIS_APP_NAME"` - RoleARN string `hcl:"role_arn,optional" env:"SOURCE_KINESIS_ROLE_ARN"` - StartTimestamp string `hcl:"start_timestamp,optional" env:"SOURCE_KINESIS_START_TIMESTAMP"` // Timestamp for the kinesis shard iterator to begin processing. Format YYYY-MM-DD HH:MM:SS.MS (miliseconds optional) - ConcurrentWrites int `hcl:"concurrent_writes,optional" env:"SOURCE_CONCURRENT_WRITES"` + StreamName string `hcl:"stream_name" env:"SOURCE_KINESIS_STREAM_NAME"` + Region string `hcl:"region" env:"SOURCE_KINESIS_REGION"` + AppName string `hcl:"app_name" env:"SOURCE_KINESIS_APP_NAME"` + RoleARN string `hcl:"role_arn,optional" env:"SOURCE_KINESIS_ROLE_ARN"` + StartTimestamp string `hcl:"start_timestamp,optional" env:"SOURCE_KINESIS_START_TIMESTAMP"` // Timestamp for the kinesis shard iterator to begin processing. Format YYYY-MM-DD HH:MM:SS.MS (miliseconds optional) + ConcurrentWrites int `hcl:"concurrent_writes,optional" env:"SOURCE_CONCURRENT_WRITES"` + ClientRecordMaxAge *int `hcl:"client_record_max_age_seconds,optional" env:"SOURCE_KINESIS_CLIENT_MAX_AGE_SECONDS"` + ShardCheckFrequency int `hcl:"shard_check_frequency_seconds,optional" env:"SOURCE_KINESIS_SHARD_FREQUENCY_SECONDS"` + LeaderActionFrequency int `hcl:"leader_action_frequency_seconds,optional" env:"SOURCE_KINESIS_LEADER_FREQUENCY_SECONDS"` } // --- Kinesis source // kinesisSource holds a new client for reading messages from kinesis type kinesisSource struct { - client *kinsumer.Kinsumer - streamName string - concurrentWrites int - region string - accountID string + client *kinsumer.Kinsumer + streamName string + concurrentWrites int + region string + accountID string + clientRecordMaxAge *int + shardCheckFrequency int + leaderActionFrequency int log *log.Entry } @@ -75,7 +81,10 @@ func configFunctionGeneratorWithInterfaces(kinesisClient kinesisiface.KinesisAPI c.Region, c.StreamName, c.AppName, - &iteratorTstamp) + &iteratorTstamp, + c.LeaderActionFrequency, + c.ShardCheckFrequency, + c.ClientRecordMaxAge) } } @@ -109,7 +118,9 @@ func (f adapter) Create(i interface{}) (interface{}, error) { func (f adapter) ProvideDefault() (interface{}, error) { // Provide defaults cfg := &configuration{ - ConcurrentWrites: 50, + ConcurrentWrites: 50, + ShardCheckFrequency: 10, + LeaderActionFrequency: 10, } return cfg, nil @@ -145,15 +156,21 @@ func (kl *KinsumerLogrus) Log(format string, v ...interface{}) { // newKinesisSourceWithInterfaces allows you to provide a Kinesis + DynamoDB client directly to allow // for mocking and localstack usage -func newKinesisSourceWithInterfaces(kinesisClient kinesisiface.KinesisAPI, dynamodbClient dynamodbiface.DynamoDBAPI, awsAccountID string, concurrentWrites int, region string, streamName string, appName string, startTimestamp *time.Time) (*kinesisSource, error) { +func newKinesisSourceWithInterfaces(kinesisClient kinesisiface.KinesisAPI, dynamodbClient dynamodbiface.DynamoDBAPI, awsAccountID string, concurrentWrites int, region string, streamName string, appName string, startTimestamp *time.Time, leaderActionFrequency, shardCheckFrequency int, clientRecordMaxAge *int) (*kinesisSource, error) { // TODO: Add statistics monitoring to be able to report on consumer latency + config := kinsumer.NewConfig(). - WithShardCheckFrequency(10 * time.Second). - WithLeaderActionFrequency(10 * time.Second). + WithShardCheckFrequency(time.Duration(shardCheckFrequency) * time.Second). + WithLeaderActionFrequency(time.Duration(leaderActionFrequency) * time.Second). WithManualCheckpointing(true). WithLogger(&KinsumerLogrus{}). WithIteratorStartTimestamp(startTimestamp) + if clientRecordMaxAge != nil { + maxAge := time.Duration(*clientRecordMaxAge) * time.Second + config = config.WithClientRecordMaxAge(&maxAge) + } + // TODO: See if the client name can be reused to survive same node reboots name := uuid.NewV4().String() @@ -163,12 +180,15 @@ func newKinesisSourceWithInterfaces(kinesisClient kinesisiface.KinesisAPI, dynam } return &kinesisSource{ - client: k, - streamName: streamName, - concurrentWrites: concurrentWrites, - region: region, - accountID: awsAccountID, - log: log.WithFields(log.Fields{"source": "kinesis", "cloud": "AWS", "region": region, "stream": streamName}), + client: k, + streamName: streamName, + concurrentWrites: concurrentWrites, + region: region, + accountID: awsAccountID, + log: log.WithFields(log.Fields{"source": "kinesis", "cloud": "AWS", "region": region, "stream": streamName}), + clientRecordMaxAge: clientRecordMaxAge, + shardCheckFrequency: shardCheckFrequency, + leaderActionFrequency: leaderActionFrequency, }, nil } diff --git a/pkg/source/kinesis/kinesis_source_test.go b/pkg/source/kinesis/kinesis_source_test.go index be6e08da..e7fe07b3 100644 --- a/pkg/source/kinesis/kinesis_source_test.go +++ b/pkg/source/kinesis/kinesis_source_test.go @@ -32,6 +32,11 @@ func TestMain(m *testing.M) { os.Exit(exitVal) } +var ( + fifty = 50 + one = 1 +) + func TestNewKinesisSourceWithInterfaces_Success(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") @@ -59,8 +64,9 @@ func TestNewKinesisSourceWithInterfaces_Success(t *testing.T) { defer testutil.DeleteAWSLocalstackDynamoDBTables(dynamodbClient, appName) - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil) + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 10, 10, nil) + fmt.Println(source.clientRecordMaxAge) assert.IsType(&kinesisSource{}, source) assert.Nil(err) } @@ -75,7 +81,7 @@ func TestNewKinesisSourceWithInterfaces_Failure(t *testing.T) { kinesisClient := testutil.GetAWSLocalstackKinesisClient() dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient() - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "nonexistent-stream", "test", nil) + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "nonexistent-stream", "test", nil,10, 10, &fifty) assert.Nil(&kinesisSource{}, source) assert.NotNil(err) @@ -94,7 +100,7 @@ func TestKinesisSource_ReadFailure_NoResources(t *testing.T) { kinesisClient := testutil.GetAWSLocalstackKinesisClient() dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient() - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists", "fake-name", nil) + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists", "fake-name", nil, 10, 10, &fifty) assert.Nil(err) assert.NotNil(source) assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/not-exists", source.GetID()) @@ -140,7 +146,7 @@ func TestKinesisSource_ReadMessages(t *testing.T) { time.Sleep(1 * time.Second) // Create the source and assert that it's there - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil) + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 10, 10, &fifty) assert.Nil(err) assert.NotNil(source) assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/kinesis-source-integration-2", source.GetID()) @@ -193,7 +199,7 @@ func TestKinesisSource_StartTimestamp(t *testing.T) { } // Create the source (with start timestamp) and assert that it's there - source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, &timeToStart) + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, &timeToStart, 10, 10, &fifty) assert.Nil(err) assert.NotNil(source) assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/kinesis-source-integration-3", source.GetID()) @@ -222,10 +228,6 @@ func putNRecordsIntoKinesis(kinesisClient kinesisiface.KinesisAPI, n int, stream } func TestGetSource_WithKinesisSource(t *testing.T) { - if testing.Short() { - t.Skip("skipping integration test") - } - assert := assert.New(t) // Set up localstack resources @@ -272,7 +274,40 @@ func TestGetSource_WithKinesisSource(t *testing.T) { assert.IsType(&kinesisSource{}, source) } +func TestGetSource_ConfigErrorLeaderAction(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + assert := assert.New(t) + + kinesisClient := testutil.GetAWSLocalstackKinesisClient() + dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient() + + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "something", "test", nil, 0, 10, &one) + + assert.Nil(source) + assert.EqualError(err, `Failed to create Kinsumer client: leaderActionFrequency config value is mandatory and must be at least as long as ShardCheckFrequency`) +} + +func TestGetSource_ConfigErrorMaxAge(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + assert := assert.New(t) + + kinesisClient := testutil.GetAWSLocalstackKinesisClient() + dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient() + + source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "something", "test", nil, 10, 10, &one) + + assert.Nil(source) + assert.EqualError(err, `Failed to create Kinsumer client: clientRecordMaxAge value must be at least as long as shardCheckFrequency`) +} + func TestKinesisSourceHCL(t *testing.T) { + var thirty = 30 testFixPath := "../../../config/test-fixtures" testCases := []struct { File string @@ -283,24 +318,30 @@ func TestKinesisSourceHCL(t *testing.T) { File: "source-kinesis-simple.hcl", Plug: testKinesisSourceAdapter(testKinesisSourceFunc), Expected: &configuration{ - StreamName: "testStream", - Region: "us-test-1", - AppName: "testApp", - RoleARN: "", - StartTimestamp: "", - ConcurrentWrites: 50, + StreamName: "testStream", + Region: "us-test-1", + AppName: "testApp", + RoleARN: "", + StartTimestamp: "", + ConcurrentWrites: 50, + ClientRecordMaxAge: nil, + ShardCheckFrequency: 10, + LeaderActionFrequency: 10, }, }, { File: "source-kinesis-extended.hcl", Plug: testKinesisSourceAdapter(testKinesisSourceFunc), Expected: &configuration{ - StreamName: "testStream", - Region: "us-test-1", - AppName: "testApp", - RoleARN: "xxx-test-role-arn", - StartTimestamp: "2022-03-15 07:52:53", - ConcurrentWrites: 51, + StreamName: "testStream", + Region: "us-test-1", + AppName: "testApp", + RoleARN: "xxx-test-role-arn", + StartTimestamp: "2022-03-15 07:52:53", + ConcurrentWrites: 51, + ClientRecordMaxAge: &thirty, + ShardCheckFrequency: 20, + LeaderActionFrequency: 25, }, }, }