-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathconfig.go
170 lines (138 loc) · 4.95 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// Copyright (c) 2016 Twitch Interactive
package kinsumer
import (
"time"
)
//TODO: Update documentation to include the defaults
//TODO: Update the 'with' methods' comments to be less ridiculous
// Config holds all configuration values for a single Kinsumer instance.
// All fields are unexported, so code outside this package sets them through
// NewConfig and the With* methods.
type Config struct {
// Destination for stats; NewConfig sets it to a NoopStatReceiver.
stats StatReceiver
// Logger implementation; NewConfig sets it to a DefaultLogger.
logger Logger
// ---------- [ Per Shard Worker ] ----------
// Time to sleep if no records are found. NewConfig default: 250ms;
// validateConfig rejects values below 200ms.
throttleDelay time.Duration
// Delay between commits to the checkpoint database. NewConfig default: 1s.
commitFrequency time.Duration
// Delay between tests for the client or shard numbers changing.
// NewConfig default: 1 minute. validateConfig rejects a value greater than
// leaderActionFrequency.
shardCheckFrequency time.Duration
// ---------- [ For the leader (first client alphabetically) ] ----------
// Time between leader actions. NewConfig default: 1 minute.
leaderActionFrequency time.Duration
// ---------- [ For the entire Kinsumer ] ----------
// Size of the buffer for the combined records channel. When the channel fills up
// the workers will stop adding new elements to the queue, so a slow client will
// potentially fall behind the kinesis stream. NewConfig default: 100.
bufferSize int
// ---------- [ For the Dynamo DB tables ] ----------
// Read and write capacity for the Dynamo DB tables when created
// with CreateRequiredTables() call. If tables already exist because they were
// created on a previous run or created manually, these parameters will not be used.
// NewConfig default: 10 for each.
dynamoReadCapacity int64
dynamoWriteCapacity int64
// Time to wait between attempts to verify tables were created/deleted completely.
// NewConfig default: 3s.
dynamoWaiterDelay time.Duration
// Use ListShards to avoid LimitExceedException from DescribeStream.
// NewConfig leaves this at its zero value (false).
useListShardsForKinesisStreamReady bool
}
// NewConfig returns a Config populated with the package defaults:
//
//	throttleDelay          250ms
//	commitFrequency        1s
//	shardCheckFrequency    1m
//	leaderActionFrequency  1m
//	bufferSize             100
//	dynamoReadCapacity     10
//	dynamoWriteCapacity    10
//	dynamoWaiterDelay      3s
//	stats                  &NoopStatReceiver{}
//	logger                 &DefaultLogger{}
//
// useListShardsForKinesisStreamReady is left at its zero value (false).
func NewConfig() Config {
	var cfg Config

	// Collaborators.
	cfg.stats = &NoopStatReceiver{}
	cfg.logger = &DefaultLogger{}

	// Per-shard worker and leader timing.
	cfg.throttleDelay = 250 * time.Millisecond
	cfg.commitFrequency = time.Second
	cfg.shardCheckFrequency = time.Minute
	cfg.leaderActionFrequency = time.Minute

	// Combined records channel.
	cfg.bufferSize = 100

	// Dynamo DB table settings.
	cfg.dynamoReadCapacity = 10
	cfg.dynamoWriteCapacity = 10
	cfg.dynamoWaiterDelay = 3 * time.Second

	return cfg
}
// WithThrottleDelay returns a copy of c whose throttle delay is set to delay.
// The receiver is passed by value, so the Config it is called on is unchanged.
func (c Config) WithThrottleDelay(delay time.Duration) Config {
	updated := c
	updated.throttleDelay = delay
	return updated
}
// WithCommitFrequency returns a copy of c whose commit frequency is set to
// commitFrequency. The receiver is passed by value, so the Config it is
// called on is unchanged.
func (c Config) WithCommitFrequency(commitFrequency time.Duration) Config {
	updated := c
	updated.commitFrequency = commitFrequency
	return updated
}
// WithShardCheckFrequency returns a copy of c whose shard check frequency is
// set to shardCheckFrequency. The receiver is passed by value, so the Config
// it is called on is unchanged.
func (c Config) WithShardCheckFrequency(shardCheckFrequency time.Duration) Config {
	updated := c
	updated.shardCheckFrequency = shardCheckFrequency
	return updated
}
// WithLeaderActionFrequency returns a copy of c whose leader action frequency
// is set to leaderActionFrequency. The receiver is passed by value, so the
// Config it is called on is unchanged.
func (c Config) WithLeaderActionFrequency(leaderActionFrequency time.Duration) Config {
	updated := c
	updated.leaderActionFrequency = leaderActionFrequency
	return updated
}
// WithBufferSize returns a copy of c whose buffer size is set to bufferSize.
// The receiver is passed by value, so the Config it is called on is unchanged.
func (c Config) WithBufferSize(bufferSize int) Config {
	updated := c
	updated.bufferSize = bufferSize
	return updated
}
// WithStats returns a copy of c that uses stats as its StatReceiver.
// The receiver is passed by value, so the Config it is called on is unchanged.
func (c Config) WithStats(stats StatReceiver) Config {
	updated := c
	updated.stats = stats
	return updated
}
// WithDynamoReadCapacity returns a copy of c whose dynamo read capacity is set
// to readCapacity. The receiver is passed by value, so the Config it is called
// on is unchanged.
func (c Config) WithDynamoReadCapacity(readCapacity int64) Config {
	updated := c
	updated.dynamoReadCapacity = readCapacity
	return updated
}
// WithDynamoWriteCapacity returns a copy of c whose dynamo write capacity is
// set to writeCapacity. The receiver is passed by value, so the Config it is
// called on is unchanged.
func (c Config) WithDynamoWriteCapacity(writeCapacity int64) Config {
	updated := c
	updated.dynamoWriteCapacity = writeCapacity
	return updated
}
// WithDynamoWaiterDelay returns a copy of c whose dynamo waiter delay is set
// to delay. The receiver is passed by value, so the Config it is called on is
// unchanged.
func (c Config) WithDynamoWaiterDelay(delay time.Duration) Config {
	updated := c
	updated.dynamoWaiterDelay = delay
	return updated
}
// WithLogger returns a copy of c that uses logger as its Logger.
// The receiver is passed by value, so the Config it is called on is unchanged.
func (c Config) WithLogger(logger Logger) Config {
	updated := c
	updated.logger = logger
	return updated
}
// WithUseListShardsForKinesisStreamReady returns a copy of c whose
// useListShardsForKinesisStreamReady toggle is set to shouldUse. The receiver
// is passed by value, so the Config it is called on is unchanged.
func (c Config) WithUseListShardsForKinesisStreamReady(shouldUse bool) Config {
	updated := c
	updated.useListShardsForKinesisStreamReady = shouldUse
	return updated
}
// validateConfig verifies that a Config has sane and valid values. The checks
// run in a fixed order and the error for the first failing one is returned;
// nil means every check passed.
//
//   - throttleDelay must be at least 200ms.
//   - commitFrequency, shardCheckFrequency and leaderActionFrequency must be
//     positive.
//   - shardCheckFrequency must not exceed leaderActionFrequency (reported as
//     ErrConfigInvalidLeaderActionFrequency).
//   - bufferSize must be positive.
//   - stats and logger must be non-nil.
//   - dynamoReadCapacity and dynamoWriteCapacity must both be positive.
//
// Negative values are rejected along with zero: a negative interval, channel
// buffer size, or provisioned capacity is never meaningful, and letting one
// through only defers the failure (for example, time.NewTicker panics on a
// non-positive interval and make panics on a negative channel size).
// NOTE(review): how the frequencies are consumed is not visible here — confirm
// against the callers.
//
// c must not be nil.
func validateConfig(c *Config) error {
	switch {
	case c.throttleDelay < 200*time.Millisecond:
		return ErrConfigInvalidThrottleDelay
	case c.commitFrequency <= 0:
		return ErrConfigInvalidCommitFrequency
	case c.shardCheckFrequency <= 0:
		return ErrConfigInvalidShardCheckFrequency
	case c.leaderActionFrequency <= 0:
		return ErrConfigInvalidLeaderActionFrequency
	case c.shardCheckFrequency > c.leaderActionFrequency:
		// Same error as an invalid leader frequency, as in the original check.
		return ErrConfigInvalidLeaderActionFrequency
	case c.bufferSize <= 0:
		return ErrConfigInvalidBufferSize
	case c.stats == nil:
		return ErrConfigInvalidStats
	case c.dynamoReadCapacity <= 0 || c.dynamoWriteCapacity <= 0:
		return ErrConfigInvalidDynamoCapacity
	case c.logger == nil:
		return ErrConfigInvalidLogger
	}
	return nil
}