forked from twitchscience/kinsumer
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclients.go
185 lines (151 loc) · 4.64 KB
/
clients.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
// Copyright (c) 2016 Twitch Interactive
package kinsumer
//TODO: The filename is bad
import (
"sort"
"strconv"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
)
const clientReapAge = 48 * time.Hour
type clientRecord struct {
ID string
LastUpdate int64
// Columns added to the table that are never used for decision making in the
// library, rather they are useful for manual troubleshooting
Name string
LastUpdateRFC string
}
type sortableClients []clientRecord
func (sc sortableClients) Len() int {
return len(sc)
}
func (sc sortableClients) Less(left, right int) bool {
return sc[left].ID < sc[right].ID
}
func (sc sortableClients) Swap(left, right int) {
sc[left], sc[right] = sc[right], sc[left]
}
// registerWithClientsTable adds or updates our client with a current LastUpdate in dynamo
func registerWithClientsTable(db dynamodbiface.DynamoDBAPI, id, name, tableName string) error {
now := time.Now()
item, err := dynamodbattribute.MarshalMap(clientRecord{
ID: id,
Name: name,
LastUpdate: now.UnixNano(),
LastUpdateRFC: now.UTC().Format(time.RFC1123Z),
})
if err != nil {
return err
}
if _, err = db.PutItem(&dynamodb.PutItemInput{
TableName: aws.String(tableName),
Item: item,
}); err != nil {
return err
}
return nil
}
// deregisterWithClientsTable deletes our client from dynamo
func deregisterFromClientsTable(db dynamodbiface.DynamoDBAPI, id, tableName string) error {
idStruct := struct{ ID string }{ID: id}
item, err := dynamodbattribute.MarshalMap(idStruct)
if err != nil {
return err
}
if _, err = db.DeleteItem(&dynamodb.DeleteItemInput{
TableName: aws.String(tableName),
Key: item,
}); err != nil {
return err
}
return nil
}
// getClients returns a sorted list of all recently-updated clients in dynamo
func getClients(db dynamodbiface.DynamoDBAPI, name string, tableName string, maxAgeForClientRecord time.Duration, referenceTime time.Time, shardCheckFrequency time.Duration) (clients []clientRecord, err error) {
filterExpression := "LastUpdate > :cutoff"
// shardCheckFrequency added to cutoff to avoid race condition caused by slow clients
subtime := maxAgeForClientRecord + shardCheckFrequency
cutoff := strconv.FormatInt(referenceTime.Add(-subtime).UnixNano(), 10)
params := &dynamodb.ScanInput{
TableName: aws.String(tableName),
ConsistentRead: aws.Bool(true),
FilterExpression: aws.String(filterExpression),
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":cutoff": {N: &cutoff},
},
}
var innerError error
err = db.ScanPages(params, func(p *dynamodb.ScanOutput, lastPage bool) (shouldContinue bool) {
for _, item := range p.Items {
var record clientRecord
innerError = dynamodbattribute.UnmarshalMap(item, &record)
if innerError != nil {
return false
}
clients = append(clients, record)
}
return !lastPage
})
if innerError != nil {
return nil, innerError
}
if err != nil {
return nil, err
}
sort.Sort(sortableClients(clients))
return clients, nil
}
// reapClients deletes any sufficiently old clients from dynamo
func reapClients(db dynamodbiface.DynamoDBAPI, tableName string) error {
filterExpression := "LastUpdate < :cutoff"
cutoff := strconv.FormatInt(time.Now().Add(-clientReapAge).UnixNano(), 10)
params := &dynamodb.ScanInput{
TableName: aws.String(tableName),
ConsistentRead: aws.Bool(true),
FilterExpression: aws.String(filterExpression),
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":cutoff": {N: &cutoff},
},
}
var clients []clientRecord
var innerError error
err := db.ScanPages(params, func(p *dynamodb.ScanOutput, lastPage bool) (shouldContinue bool) {
for _, item := range p.Items {
var record clientRecord
innerError = dynamodbattribute.UnmarshalMap(item, &record)
if innerError != nil {
return false
}
clients = append(clients, record)
}
return !lastPage
})
if innerError != nil {
return innerError
}
if err != nil {
return err
}
for _, client := range clients {
idStruct := struct{ ID string }{ID: client.ID}
item, err := dynamodbattribute.MarshalMap(idStruct)
if err != nil {
return err
}
if _, err = db.DeleteItem(&dynamodb.DeleteItemInput{
TableName: aws.String(tableName),
Key: item,
ConditionExpression: aws.String(filterExpression),
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":cutoff": {N: &cutoff},
},
}); err != nil {
return err
}
}
return nil
}