-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.go
123 lines (97 loc) · 2.23 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package main
import (
"fmt"
"os"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
)
func listStreams(svc *kinesis.Kinesis) error {
rsp, err := svc.ListStreams(&kinesis.ListStreamsInput{})
if err != nil {
return err
}
for _, s := range rsp.StreamNames {
fmt.Println(*s)
}
return nil
}
func followShard(svc *kinesis.Kinesis, streamName string, shardId string, stream chan string) error {
rsp, err := svc.GetShardIterator(&kinesis.GetShardIteratorInput{
ShardId: aws.String(shardId),
ShardIteratorType: aws.String("LATEST"),
StreamName: aws.String(streamName),
})
if err != nil {
return err
}
shardIterator := rsp.ShardIterator
for {
rrsp, err := svc.GetRecords(&kinesis.GetRecordsInput{
ShardIterator: aws.String(*shardIterator),
})
if err != nil {
return err
}
for _, record := range rrsp.Records {
stream <- string(record.Data[:])
}
shardIterator = rrsp.NextShardIterator
if len(rrsp.Records) == 0 {
time.Sleep(1000 * time.Millisecond)
}
}
}
func followStream(svc *kinesis.Kinesis, streamName *string, shardId *string) error {
rsp, err := svc.DescribeStream(&kinesis.DescribeStreamInput{
StreamName: aws.String(*streamName),
})
if err != nil {
return err
}
shardIds := make([]string, 0)
stream := make(chan string, 5)
for _, s := range rsp.StreamDescription.Shards {
if shardId != nil {
if *s.ShardId == *shardId {
shardIds = append(shardIds, *s.ShardId)
}
} else {
shardIds = append(shardIds, *s.ShardId)
}
}
if len(shardIds) == 0 {
return fmt.Errorf("Shards empty or shard id %v not found", *shardId)
}
for _, sid := range shardIds {
go followShard(svc, *streamName, sid, stream)
}
for {
fmt.Println(<-stream)
}
return nil
}
func main() {
svc := kinesis.New(session.New())
if len(os.Args) < 2 {
err := listStreams(svc)
if err != nil {
fmt.Println(os.Stderr, err.Error())
os.Exit(1)
}
os.Exit(0)
}
var shardId *string
streamName := os.Args[1]
if len(os.Args) > 2 {
shardId = &os.Args[2]
} else {
shardId = nil
}
err := followStream(svc, &streamName, shardId)
if err != nil {
fmt.Println(os.Stderr, err.Error())
os.Exit(1)
}
}