From 164c1a2cc23f61d00cefe01ed346139638206779 Mon Sep 17 00:00:00 2001 From: Mark Robinson Date: Tue, 5 Apr 2022 12:34:05 -0700 Subject: [PATCH 1/5] Add NextRaw funciton --- kinsumer.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/kinsumer.go b/kinsumer.go index b1dcace..bec0fc6 100644 --- a/kinsumer.go +++ b/kinsumer.go @@ -465,6 +465,27 @@ func (k *Kinsumer) Next() (data []byte, err error) { return data, err } +// NextRaw is a blocking function used to get the next record from the kinesis queue, or errors that +// occurred during the processing of kinesis. It's up to the caller to stop processing by calling 'Stop()' +// +// It is different from Next in that it returns the Kinesis record object directly +// +// if err is non nil an error occurred in the system. +// if err is nil and data is nil then kinsumer has been stopped +func (k *Kinsumer) NextRaw() (data *kinesis.Record, err error) { + select { + case err = <-k.errors: + return nil, err + case record, ok := <-k.output: + if ok { + k.config.stats.EventToClient(*record.record.ApproximateArrivalTimestamp, record.retrievedAt) + data = record.record + } + } + + return data, err +} + // CreateRequiredTables will create the required dynamodb tables // based on the applicationName func (k *Kinsumer) CreateRequiredTables() error { From 5a344f7ce36f7823879c72af717495d95a923766 Mon Sep 17 00:00:00 2001 From: Mark Robinson Date: Sun, 17 Apr 2022 20:24:35 -0700 Subject: [PATCH 2/5] Reset to present --- kinsumer.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/kinsumer.go b/kinsumer.go index bec0fc6..3cd7bbf 100644 --- a/kinsumer.go +++ b/kinsumer.go @@ -118,6 +118,30 @@ func NewWithInterfaces(kinesis kinesisiface.KinesisAPI, dynamodb dynamodbiface.D return consumer, nil } +func (k *Kinsumer) ResetToPresent() error { + + shardIDs, err := loadShardIDsFromKinesis(k.kinesis, k.streamName) + if err != nil { + return err + } + + for _, shardId := range shardIDs { + _, err := k.dynamodb.UpdateItem( + &dynamodb.UpdateItemInput{ + TableName: aws.String(k.checkpointTableName), + UpdateExpression: aws.String("SET SequenceNumber = \"LATEST\""), + Key: map[string]*dynamodb.AttributeValue{ + "Shard": {S: aws.String(shardId)}, + }, + }) + if err != nil { + return err + } + } + + return nil +} + // refreshShards registers our client, refreshes the lists of clients and shards, checks if we // have become/unbecome the leader, and returns whether the shards/clients changed. //TODO: Write unit test - needs dynamo _and_ kinesis mocking From f4f019fb7435a87b79ce123a4e8555d5c37fa53a Mon Sep 17 00:00:00 2001 From: Mark Robinson Date: Sun, 17 Apr 2022 20:24:40 -0700 Subject: [PATCH 3/5] Reset to latest --- kinsumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kinsumer.go b/kinsumer.go index 3cd7bbf..d5ac47c 100644 --- a/kinsumer.go +++ b/kinsumer.go @@ -118,7 +118,7 @@ func NewWithInterfaces(kinesis kinesisiface.KinesisAPI, dynamodb dynamodbiface.D return consumer, nil } -func (k *Kinsumer) ResetToPresent() error { +func (k *Kinsumer) ResetToLatest() error { shardIDs, err := loadShardIDsFromKinesis(k.kinesis, k.streamName) if err != nil { From c909ce30dc098d28cf090017e420593d2d71f979 Mon Sep 17 00:00:00 2001 From: Mark Robinson Date: Sun, 17 Apr 2022 20:24:54 -0700 Subject: [PATCH 4/5] Reset all to latest --- kinsumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kinsumer.go b/kinsumer.go index d5ac47c..1da3711 100644 --- a/kinsumer.go +++ b/kinsumer.go @@ -118,7 +118,7 @@ func NewWithInterfaces(kinesis kinesisiface.KinesisAPI, dynamodb dynamodbiface.D return consumer, nil } -func (k *Kinsumer) ResetToLatest() error { +func (k *Kinsumer) ResetAllToLatest() error { shardIDs, err := loadShardIDsFromKinesis(k.kinesis, k.streamName) if err != nil { From 01d42e20b8a32c2d3c538b30402f3bd1c46c9f76 Mon Sep 17 00:00:00 2001 From: Mark Robinson Date: Mon, 18 Apr 2022 10:16:01 -0700 Subject: [PATCH 5/5] Reset all to latest --- kinsumer.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/kinsumer.go b/kinsumer.go index 1da3711..cda3fea 100644 --- a/kinsumer.go +++ b/kinsumer.go @@ -4,6 +4,7 @@ package kinsumer import ( "fmt" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" "sync" "sync/atomic" "time" @@ -125,14 +126,19 @@ func (k *Kinsumer) ResetAllToLatest() error { return err } + attrVals, err := dynamodbattribute.MarshalMap(map[string]interface{}{ + ":sn": aws.String("LATEST"), + }) + for _, shardId := range shardIDs { _, err := k.dynamodb.UpdateItem( &dynamodb.UpdateItemInput{ TableName: aws.String(k.checkpointTableName), - UpdateExpression: aws.String("SET SequenceNumber = \"LATEST\""), + UpdateExpression: aws.String("SET SequenceNumber = :sn"), Key: map[string]*dynamodb.AttributeValue{ "Shard": {S: aws.String(shardId)}, }, + ExpressionAttributeValues: attrVals, }) if err != nil { return err