Skip to content

Commit

Permalink
Add clientRecordMaxAge() to kinesis source (closes #117)
Browse files Browse the repository at this point in the history
  • Loading branch information
TiganeteaRobert committed Aug 23, 2022
1 parent 9b219a0 commit 54c45ba
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 49 deletions.
9 changes: 9 additions & 0 deletions config/examples/sources/kinesis-extended.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,14 @@ source {

# Number of events to process concurrently (default: 50)
concurrent_writes = 15

# Delay between tests for the client or shard numbers changing (in seconds)
shard_check_frequency_seconds = 60

# Time between leader actions (in seconds)
leader_action_frequency_seconds = 60

# Max age for client record before we consider it stale (in seconds)
client_record_max_age_seconds = 120
}
}
15 changes: 9 additions & 6 deletions config/test-fixtures/source-kinesis-extended.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

source {
use "kinesis" {
stream_name = "testStream"
region = "us-test-1"
role_arn = "xxx-test-role-arn"
app_name = "testApp"
start_timestamp = "2022-03-15 07:52:53"
concurrent_writes = 51
stream_name = "testStream"
region = "us-test-1"
role_arn = "xxx-test-role-arn"
app_name = "testApp"
start_timestamp = "2022-03-15 07:52:53"
concurrent_writes = 51
shard_check_frequency_seconds = 20
client_record_max_age_seconds = 30
leader_action_frequency_seconds = 25
}
}
64 changes: 42 additions & 22 deletions pkg/source/kinesis/kinesis_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,29 @@ import (

// configuration configures the source for records pulled
type configuration struct {
StreamName string `hcl:"stream_name" env:"SOURCE_KINESIS_STREAM_NAME"`
Region string `hcl:"region" env:"SOURCE_KINESIS_REGION"`
AppName string `hcl:"app_name" env:"SOURCE_KINESIS_APP_NAME"`
RoleARN string `hcl:"role_arn,optional" env:"SOURCE_KINESIS_ROLE_ARN"`
StartTimestamp string `hcl:"start_timestamp,optional" env:"SOURCE_KINESIS_START_TIMESTAMP"` // Timestamp for the kinesis shard iterator to begin processing. Format YYYY-MM-DD HH:MM:SS.MS (miliseconds optional)
ConcurrentWrites int `hcl:"concurrent_writes,optional" env:"SOURCE_CONCURRENT_WRITES"`
StreamName string `hcl:"stream_name" env:"SOURCE_KINESIS_STREAM_NAME"`
Region string `hcl:"region" env:"SOURCE_KINESIS_REGION"`
AppName string `hcl:"app_name" env:"SOURCE_KINESIS_APP_NAME"`
RoleARN string `hcl:"role_arn,optional" env:"SOURCE_KINESIS_ROLE_ARN"`
StartTimestamp string `hcl:"start_timestamp,optional" env:"SOURCE_KINESIS_START_TIMESTAMP"` // Timestamp for the kinesis shard iterator to begin processing. Format YYYY-MM-DD HH:MM:SS.MS (miliseconds optional)
ConcurrentWrites int `hcl:"concurrent_writes,optional" env:"SOURCE_CONCURRENT_WRITES"`
ClientRecordMaxAge *int `hcl:"client_record_max_age_seconds,optional" env:"SOURCE_KINESIS_CLIENT_MAX_AGE_SECONDS"`
ShardCheckFrequency int `hcl:"shard_check_frequency_seconds,optional" env:"SOURCE_KINESIS_SHARD_FREQUENCY_SECONDS"`
LeaderActionFrequency int `hcl:"leader_action_frequency_seconds,optional" env:"SOURCE_KINESIS_LEADER_FREQUENCY_SECONDS"`
}

// --- Kinesis source

// kinesisSource holds a new client for reading messages from kinesis
type kinesisSource struct {
client *kinsumer.Kinsumer
streamName string
concurrentWrites int
region string
accountID string
client *kinsumer.Kinsumer
streamName string
concurrentWrites int
region string
accountID string
clientRecordMaxAge *int
shardCheckFrequency int
leaderActionFrequency int

log *log.Entry
}
Expand Down Expand Up @@ -75,7 +81,10 @@ func configFunctionGeneratorWithInterfaces(kinesisClient kinesisiface.KinesisAPI
c.Region,
c.StreamName,
c.AppName,
&iteratorTstamp)
&iteratorTstamp,
c.LeaderActionFrequency,
c.ShardCheckFrequency,
c.ClientRecordMaxAge)
}
}

Expand Down Expand Up @@ -109,7 +118,9 @@ func (f adapter) Create(i interface{}) (interface{}, error) {
func (f adapter) ProvideDefault() (interface{}, error) {
// Provide defaults
cfg := &configuration{
ConcurrentWrites: 50,
ConcurrentWrites: 50,
ShardCheckFrequency: 10,
LeaderActionFrequency: 10,
}

return cfg, nil
Expand Down Expand Up @@ -145,15 +156,21 @@ func (kl *KinsumerLogrus) Log(format string, v ...interface{}) {

// newKinesisSourceWithInterfaces allows you to provide a Kinesis + DynamoDB client directly to allow
// for mocking and localstack usage
func newKinesisSourceWithInterfaces(kinesisClient kinesisiface.KinesisAPI, dynamodbClient dynamodbiface.DynamoDBAPI, awsAccountID string, concurrentWrites int, region string, streamName string, appName string, startTimestamp *time.Time) (*kinesisSource, error) {
func newKinesisSourceWithInterfaces(kinesisClient kinesisiface.KinesisAPI, dynamodbClient dynamodbiface.DynamoDBAPI, awsAccountID string, concurrentWrites int, region string, streamName string, appName string, startTimestamp *time.Time, leaderActionFrequency, shardCheckFrequency int, clientRecordMaxAge *int) (*kinesisSource, error) {
// TODO: Add statistics monitoring to be able to report on consumer latency

config := kinsumer.NewConfig().
WithShardCheckFrequency(10 * time.Second).
WithLeaderActionFrequency(10 * time.Second).
WithShardCheckFrequency(time.Duration(shardCheckFrequency) * time.Second).
WithLeaderActionFrequency(time.Duration(leaderActionFrequency) * time.Second).
WithManualCheckpointing(true).
WithLogger(&KinsumerLogrus{}).
WithIteratorStartTimestamp(startTimestamp)

if clientRecordMaxAge != nil {
maxAge := time.Duration(*clientRecordMaxAge) * time.Second
config = config.WithClientRecordMaxAge(&maxAge)
}

// TODO: See if the client name can be reused to survive same node reboots
name := uuid.NewV4().String()

Expand All @@ -163,12 +180,15 @@ func newKinesisSourceWithInterfaces(kinesisClient kinesisiface.KinesisAPI, dynam
}

return &kinesisSource{
client: k,
streamName: streamName,
concurrentWrites: concurrentWrites,
region: region,
accountID: awsAccountID,
log: log.WithFields(log.Fields{"source": "kinesis", "cloud": "AWS", "region": region, "stream": streamName}),
client: k,
streamName: streamName,
concurrentWrites: concurrentWrites,
region: region,
accountID: awsAccountID,
log: log.WithFields(log.Fields{"source": "kinesis", "cloud": "AWS", "region": region, "stream": streamName}),
clientRecordMaxAge: clientRecordMaxAge,
shardCheckFrequency: shardCheckFrequency,
leaderActionFrequency: leaderActionFrequency,
}, nil
}

Expand Down
83 changes: 62 additions & 21 deletions pkg/source/kinesis/kinesis_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func TestMain(m *testing.M) {
os.Exit(exitVal)
}

var (
fifty = 50
one = 1
)

func TestNewKinesisSourceWithInterfaces_Success(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
Expand Down Expand Up @@ -59,8 +64,9 @@ func TestNewKinesisSourceWithInterfaces_Success(t *testing.T) {

defer testutil.DeleteAWSLocalstackDynamoDBTables(dynamodbClient, appName)

source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil)
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 10, 10, nil)

fmt.Println(source.clientRecordMaxAge)
assert.IsType(&kinesisSource{}, source)
assert.Nil(err)
}
Expand All @@ -75,7 +81,7 @@ func TestNewKinesisSourceWithInterfaces_Failure(t *testing.T) {
kinesisClient := testutil.GetAWSLocalstackKinesisClient()
dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient()
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "nonexistent-stream", "test", nil)
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "nonexistent-stream", "test", nil,10, 10, &fifty)
assert.Nil(&kinesisSource{}, source)
assert.NotNil(err)
Expand All @@ -94,7 +100,7 @@ func TestKinesisSource_ReadFailure_NoResources(t *testing.T) {
kinesisClient := testutil.GetAWSLocalstackKinesisClient()
dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient()

source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists", "fake-name", nil)
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 1, testutil.AWSLocalstackRegion, "not-exists", "fake-name", nil, 10, 10, &fifty)
assert.Nil(err)
assert.NotNil(source)
assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/not-exists", source.GetID())
Expand Down Expand Up @@ -140,7 +146,7 @@ func TestKinesisSource_ReadMessages(t *testing.T) {
time.Sleep(1 * time.Second)

// Create the source and assert that it's there
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil)
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, nil, 10, 10, &fifty)
assert.Nil(err)
assert.NotNil(source)
assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/kinesis-source-integration-2", source.GetID())
Expand Down Expand Up @@ -193,7 +199,7 @@ func TestKinesisSource_StartTimestamp(t *testing.T) {
}

// Create the source (with start timestamp) and assert that it's there
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, &timeToStart)
source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, streamName, appName, &timeToStart, 10, 10, &fifty)
assert.Nil(err)
assert.NotNil(source)
assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/kinesis-source-integration-3", source.GetID())
Expand Down Expand Up @@ -222,10 +228,6 @@ func putNRecordsIntoKinesis(kinesisClient kinesisiface.KinesisAPI, n int, stream
}

func TestGetSource_WithKinesisSource(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

assert := assert.New(t)

// Set up localstack resources
Expand Down Expand Up @@ -272,7 +274,40 @@ func TestGetSource_WithKinesisSource(t *testing.T) {
assert.IsType(&kinesisSource{}, source)
}

func TestGetSource_ConfigErrorLeaderAction(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

assert := assert.New(t)

kinesisClient := testutil.GetAWSLocalstackKinesisClient()
dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient()

source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "something", "test", nil, 0, 10, &one)

assert.Nil(source)
assert.EqualError(err, `Failed to create Kinsumer client: leaderActionFrequency config value is mandatory and must be at least as long as ShardCheckFrequency`)
}

func TestGetSource_ConfigErrorMaxAge(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

assert := assert.New(t)

kinesisClient := testutil.GetAWSLocalstackKinesisClient()
dynamodbClient := testutil.GetAWSLocalstackDynamoDBClient()

source, err := newKinesisSourceWithInterfaces(kinesisClient, dynamodbClient, "00000000000", 15, testutil.AWSLocalstackRegion, "something", "test", nil, 10, 10, &one)

assert.Nil(source)
assert.EqualError(err, `Failed to create Kinsumer client: clientRecordMaxAge value must be at least as long as shardCheckFrequency`)
}

func TestKinesisSourceHCL(t *testing.T) {
var thirty = 30
testFixPath := "../../../config/test-fixtures"
testCases := []struct {
File string
Expand All @@ -283,24 +318,30 @@ func TestKinesisSourceHCL(t *testing.T) {
File: "source-kinesis-simple.hcl",
Plug: testKinesisSourceAdapter(testKinesisSourceFunc),
Expected: &configuration{
StreamName: "testStream",
Region: "us-test-1",
AppName: "testApp",
RoleARN: "",
StartTimestamp: "",
ConcurrentWrites: 50,
StreamName: "testStream",
Region: "us-test-1",
AppName: "testApp",
RoleARN: "",
StartTimestamp: "",
ConcurrentWrites: 50,
ClientRecordMaxAge: nil,
ShardCheckFrequency: 10,
LeaderActionFrequency: 10,
},
},
{
File: "source-kinesis-extended.hcl",
Plug: testKinesisSourceAdapter(testKinesisSourceFunc),
Expected: &configuration{
StreamName: "testStream",
Region: "us-test-1",
AppName: "testApp",
RoleARN: "xxx-test-role-arn",
StartTimestamp: "2022-03-15 07:52:53",
ConcurrentWrites: 51,
StreamName: "testStream",
Region: "us-test-1",
AppName: "testApp",
RoleARN: "xxx-test-role-arn",
StartTimestamp: "2022-03-15 07:52:53",
ConcurrentWrites: 51,
ClientRecordMaxAge: &thirty,
ShardCheckFrequency: 20,
LeaderActionFrequency: 25,
},
},
}
Expand Down

0 comments on commit 54c45ba

Please sign in to comment.