-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathqueue.go
122 lines (100 loc) · 2.87 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package twitter
import (
"strconv"
"strings"
"time"
)
// Queue couples a request channel and a response channel with the pacing
// settings that processRequests applies while draining the requests.
type Queue struct {
	// rate is the pause taken after each delivered response.
	// delay is the pause taken before a re-queued request is attempted again.
	rate, delay time.Duration
	// auto enables the error inspection and retry logic in processRequests.
	// closeChannels is set to true by NewQueue; the code visible here never reads it.
	auto, closeChannels bool
	// requestsChannel is the incoming side: requests waiting to be sent.
	requestsChannel chan *Request
	// responseChannel is the outgoing side: one Response per finished request.
	responseChannel chan *Response
}
// QueueOption is a functional option: a function that adjusts a Queue in
// place. NewQueue applies every option it is given, in order.
type QueueOption func(q *Queue)
// WithRate returns an option that sets the duration the queue waits between
// requests, which is used to stay under the Twitter API rate limits.
func WithRate(rate time.Duration) QueueOption {
	setRate := func(q *Queue) { q.rate = rate }
	return setRate
}
// WithDelay returns an option that sets the duration the queue waits before
// retrying a request that failed with a rate-limit error from the Twitter API.
func WithDelay(delay time.Duration) QueueOption {
	setDelay := func(q *Queue) { q.delay = delay }
	return setDelay
}
// WithAuto returns an option that sets the queue's auto flag. In
// processRequests the flag decides whether failed requests are inspected and
// possibly retried.
// NOTE(review): the original comment says auto also continues to the next
// page when a pagination_token is present; that logic is not visible here.
func WithAuto(auto bool) QueueOption {
	setAuto := func(q *Queue) { q.auto = auto }
	return setAuto
}
// NewQueue builds a Queue from the given rate, delay, auto flag and channel
// pair, with closeChannels set to true. The options are applied afterwards,
// in order, so they override the positional arguments.
func NewQueue(rate, delay time.Duration, auto bool, in chan *Request, out chan *Response, options ...QueueOption) *Queue {
	q := &Queue{
		rate:            rate,
		delay:           delay,
		auto:            auto,
		closeChannels:   true,
		requestsChannel: in,
		responseChannel: out,
	}
	for i := range options {
		options[i](q)
	}
	return q
}
// processRequests drains the requests channel: each request is sent through
// api.apiDo and its outcome is published on the response channel, followed by
// a pause of q.rate to throttle the next call.
//
// When q.auto is set and the call fails, the error code decides what happens:
// 420, 429 and 5xx put the request back on the requests channel and pause for
// q.delay; any other code publishes the error and ends the loop. Both
// channels are closed (via Close) when the method returns.
func (q *Queue) processRequests(api *Twitter) {
	// Close the queue's channels whenever the loop is left.
	defer q.Close()

	for {
		// Block until the next request arrives.
		// NOTE(review): a receive on a closed channel would yield a nil
		// request here — confirm producers never close requestsChannel.
		request := <-q.requestsChannel

		// Send the request to the Twitter API.
		apiErr := api.apiDo(request)

		if apiErr == nil || !q.auto {
			// Success, or automatic error handling is off: publish the
			// outcome as-is, then throttle before the next request.
			q.responseChannel <- &Response{request.Results, apiErr}
			time.Sleep(q.rate)
			continue
		}

		status := parseErrorCode(apiErr)
		retryable := status == 420 || status == 429 || status >= 500
		if !retryable {
			// Not a rate-limit or server error: report it and stop.
			q.responseChannel <- &Response{request.Results, apiErr}
			return
		}

		// Re-queue from a separate goroutine so this loop is free to
		// receive the request again once the delay has elapsed.
		go func(queue *Queue, pending *Request) {
			queue.requestsChannel <- pending
		}(q, request)

		// Back off for q.delay before attempting anything else.
		time.Sleep(q.delay)
	}
}
// parseErrorCode extracts the numeric code that prefixes an error message of
// the form "<code> - <text>".
//
// It returns the parsed code, or 0 when err is nil or when the text before
// the first " - " separator is not a valid integer.
func parseErrorCode(err error) int {
	if err == nil {
		return 0
	}
	// Only the text before the first separator can hold the code.
	prefix := strings.SplitN(err.Error(), " - ", 2)[0]
	// Use a distinct name for the conversion error so neither the result nor
	// the err parameter is shadowed.
	code, convErr := strconv.Atoi(strings.TrimSpace(prefix))
	if convErr != nil {
		return 0
	}
	return code
}
// Close closes the queue's requests channel and then its response channel.
func (q *Queue) Close() {
	in, out := q.requestsChannel, q.responseChannel
	close(in)
	close(out)
}