Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NextRaw funciton #66

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions kinsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package kinsumer

import (
"fmt"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -118,6 +119,35 @@ func NewWithInterfaces(kinesis kinesisiface.KinesisAPI, dynamodb dynamodbiface.D
return consumer, nil
}

func (k *Kinsumer) ResetAllToLatest() error {

shardIDs, err := loadShardIDsFromKinesis(k.kinesis, k.streamName)
if err != nil {
return err
}

attrVals, err := dynamodbattribute.MarshalMap(map[string]interface{}{
":sn": aws.String("LATEST"),
})

for _, shardId := range shardIDs {
_, err := k.dynamodb.UpdateItem(
&dynamodb.UpdateItemInput{
TableName: aws.String(k.checkpointTableName),
UpdateExpression: aws.String("SET SequenceNumber = :sn"),
Key: map[string]*dynamodb.AttributeValue{
"Shard": {S: aws.String(shardId)},
},
ExpressionAttributeValues: attrVals,
})
if err != nil {
return err
}
}

return nil
}

// refreshShards registers our client, refreshes the lists of clients and shards, checks if we
// have become/unbecome the leader, and returns whether the shards/clients changed.
//TODO: Write unit test - needs dynamo _and_ kinesis mocking
Expand Down Expand Up @@ -465,6 +495,27 @@ func (k *Kinsumer) Next() (data []byte, err error) {
return data, err
}

// NextRaw is a blocking function used to get the next record from the kinesis queue, or errors that
// occurred during the processing of kinesis. It's up to the caller to stop processing by calling 'Stop()'
//
// It is different from Next in that it returns the Kinesis record object directly
//
// if err is non nil an error occurred in the system.
// if err is nil and data is nil then kinsumer has been stopped
func (k *Kinsumer) NextRaw() (data *kinesis.Record, err error) {
select {
case err = <-k.errors:
return nil, err
case record, ok := <-k.output:
if ok {
k.config.stats.EventToClient(*record.record.ApproximateArrivalTimestamp, record.retrievedAt)
data = record.record
}
}

return data, err
}

// CreateRequiredTables will create the required dynamodb tables
// based on the applicationName
func (k *Kinsumer) CreateRequiredTables() error {
Expand Down