Skip to content

Commit

Permalink
fix multi channel errors
Browse files Browse the repository at this point in the history
  • Loading branch information
andefined committed Jan 14, 2022
1 parent 11067c3 commit 8c37366
Show file tree
Hide file tree
Showing 16 changed files with 429 additions and 138 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ dist/*
targets/
bot.yaml
*.jsondist/
.env
.env
examples/the-follower
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
test:
godotenv -f ./.env go test -v
test-cover:
godotenv -f ./.env go test -v -cover
godotenv -f ./.env go test -v -cover

test-channels:
godotenv -f ./.env go test -timeout 120s -run Test_GetUserFollowers_Error

test-stream:
godotenv -f ./.env go test -timeout 120s -run Test_PostFilterStreamRules
80 changes: 75 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
[![Language](https://img.shields.io/badge/Language-Go-blue.svg)](https://golang.org/)
[![GoDoc](https://pkg.go.dev/badge/github.com/cvcio/twitter)](https://pkg.go.dev/github.com/cvcio/twitter)
[![Go Report Card](https://goreportcard.com/badge/github.com/cvcio/twitter)](https://goreportcard.com/report/github.com/cvcio/twitter)

# Twitter API v2 Client for Go

**twitter** is a Go package for the [Twitter v2 API](https://developer.twitter.com/en/docs/twitter-api/early-access), inspired by [ChimeraCoder/anaconda](https://github.com/ChimeraCoder/anaconda). This library uses channels for both, to retrieve data from Twitter API and return the results, with a built-in throttle to avoid rate-limit errors return from Twitter. You can bypass the throttling by usgin `twitter.WithRate(time.Duration)` option. The library will auto paginate results unless you use the `twitter.WithAuto(false)` option.
Expand Down Expand Up @@ -56,11 +60,76 @@ for {
...
}
```

Implement with error channel

```go
v := url.Values{}
v.Add("max_results", "1000")
res, errs := api.GetUserFollowing(*id, v, twitter.WithRate(time.Minute/6), twitter.WithAuto(true))

for {
select {
case r, ok := <-res:
if !ok {
res = nil
break
}

var d []*twitter.User
b, err := json.Marshal(r.Data)
if err != nil {
t.Fatalf("json Marshar Error: %v", err)
}

json.Unmarshal(b, &d)

case e, ok := <-errs:
if !ok {
errs = nil
break
}
t.Errorf("Twitter API Error: %v", e)
}

if res == nil && errs == nil {
break
}

}
```

#### Options

- `twitter.WithDealy(time.Duration)`
- `twitter.WithRate(time.Duration)`
- `twitter.WithAuth(bool)`
[cvcio/twitter](https://github.com/cvcio/twitter) supports the following options for all methods. You can pass any option during the method contrstruction.

```go
followers, _ := api.GetUserFollowers(*id, url.Values{}, twitter.WithDelay(1*time.Minute), twitter.WithRate(1*time.Minute) ...)
```

##### WithDealy

Adjust the the duration between each errored requests due to rate limit errors from Twitter API by using the `WithDelay` option.

```go
twitter.WithDelay(time.Duration)
```

##### WithRate

Throttle requests (distinct for each method) to avoid rate limit errors from Twitter API.

```go
twitter.WithRate(time.Duration)
```

##### WithAuto

Auto paginate results (if available) when `pagination_token` is present in the response object.

```go
twitter.WithAuto(Bool)
```

### Examples

Expand Down Expand Up @@ -134,8 +203,9 @@ If you're new to contributing to Open Source on Github, [this guide](https://ope

## Contributors

- Dimitris Papaevagelou ([@andefined](https://github.com/andefined))
- Ilias Dimos ([@dosko64](https://github.com/dosko64))
<a href="https://github.com/cvcio/twitter/graphs/contributors">
<img src="https://contrib.rocks/image?repo=cvcio/twitter" />
</a>

### License

Expand Down
4 changes: 3 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ import (
"strings"
)

// APIError implements the APIError struct
type APIError struct {
Code int
Message string
}

// NewAPIError returns a new APIError. It will look
// for error code (int) as returned from twitter.
func NewAPIError(err error) *APIError {
if err == nil {
return &APIError{0, ""}
}
apiError := &APIError{0, err.Error()}

if errorParts := strings.Split(err.Error(), " - "); len(errorParts) > 0 {
code, e := strconv.Atoi(errorParts[len(errorParts)-1])
if e != nil {
Expand Down
28 changes: 24 additions & 4 deletions examples/followers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"net/url"
"strconv"
"time"

"github.com/cvcio/twitter"
Expand All @@ -27,9 +28,17 @@ func main() {
panic(err)
}

fmt.Print("ID,CreatedAt,Name,UserName,Protected,Verified,Followers,Following,Tweets,Listed,ProfileImageURL\n")

v := url.Values{}
// set size of response ids to 1000
v.Add("max_results", "1000")
followers, _ := api.GetUserFollowers(*id, v)
// set user fields to return
v.Add("user.fields", "created_at,description,id,location,name,pinned_tweet_id,profile_image_url,protected,public_metrics,url,username,verified")
// set tweet fields to return
v.Add("tweet.fields", "created_at,id,lang,source,public_metrics")

followers, _ := api.GetUserFollowers(*id, v, twitter.WithRate(15*time.Minute/15), twitter.WithAuto(true))

for {
r, ok := <-followers
Expand All @@ -45,15 +54,26 @@ func main() {

var data []*twitter.User
json.Unmarshal(b, &data)

for _, v := range data {
fmt.Printf("%s,%s,%s\n", v.ID, v.UserName, v.Name)
fmt.Printf("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n",
v.ID,
v.CreatedAt,
v.Name,
v.UserName,
strconv.FormatBool(v.Protected),
strconv.FormatBool(v.Verified),
strconv.Itoa(v.PublicMetrics.Followers),
strconv.Itoa(v.PublicMetrics.Following),
strconv.Itoa(v.PublicMetrics.Tweets),
strconv.Itoa(v.PublicMetrics.Listed),
v.ProfileImageURL,
)
}

fmt.Println()
fmt.Printf("Result Count: %d Next Token: %s\n", r.Meta.ResultCount, r.Meta.NextToken)
}

end := time.Now()

fmt.Printf("Done in %s", end.Sub(start))
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ module github.com/cvcio/twitter
go 1.15

require (
github.com/dustin/go-jsonpointer v0.0.0-20160814072949-ba0abeacc3dc
github.com/dustin/gojson v0.0.0-20160307161227-2e71ec9dd5ad // indirect
github.com/joho/godotenv v1.4.0 // indirect
github.com/mrjones/oauth v0.0.0-20190623134757-126b35219450
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5
)
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-jsonpointer v0.0.0-20160814072949-ba0abeacc3dc h1:tP7tkU+vIsEOKiK+l/NSLN4uUtkyuxc6hgYpQeCWAeI=
github.com/dustin/go-jsonpointer v0.0.0-20160814072949-ba0abeacc3dc/go.mod h1:ORH5Qp2bskd9NzSfKqAF7tKfONsEkCarTE5ESr/RVBw=
github.com/dustin/gojson v0.0.0-20160307161227-2e71ec9dd5ad h1:Qk76DOWdOp+GlyDKBAG3Klr9cn7N+LcYc82AZ2S7+cA=
github.com/dustin/gojson v0.0.0-20160307161227-2e71ec9dd5ad/go.mod h1:mPKfmRa823oBIgl2r20LeMSpTAteW5j7FLkc0vjmzyQ=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down Expand Up @@ -82,7 +86,6 @@ github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
Expand All @@ -98,6 +101,8 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg=
github.com/joho/godotenv v1.4.0/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
Expand Down Expand Up @@ -228,7 +233,6 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand Down
46 changes: 29 additions & 17 deletions queue.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package twitter

import (
"fmt"
"time"
)

Expand All @@ -14,32 +13,41 @@ type Queue struct {
rate time.Duration
delay time.Duration
auto bool
closeChannels bool
requestsChannel chan *Request
responseChannel chan *Response
}

// QueueOption queue options
type QueueOption func(*Queue)

// WithRate (default: according to endpoint) adjusts the the duration between each request to avoid
// rate limits from Twitter API
func WithRate(rate time.Duration) QueueOption {
return func(q *Queue) {
q.rate = rate
}
}

// WithDelay (default:15 minutes) adjusts the the duration between each errored requests
// due to rate limit errors from Twitter API
func WithDelay(delay time.Duration) QueueOption {
return func(q *Queue) {
q.delay = delay
}
}

// WithAuto (default:true) will auto continue to the next page if
// pagination_token exists in the response object
func WithAuto(auto bool) QueueOption {
return func(q *Queue) {
q.auto = auto
}
}

// NewQueue creates a new queue
func NewQueue(rate, delay time.Duration, auto bool, in chan *Request, out chan *Response, options ...QueueOption) *Queue {
queue := &Queue{rate, delay, auto, in, out}
queue := &Queue{rate, delay, auto, true, in, out}

for _, o := range options {
o(queue)
Expand All @@ -51,35 +59,32 @@ func NewQueue(rate, delay time.Duration, auto bool, in chan *Request, out chan *
// processRequests, processes the incoming requests and with a build-in rate-limiter
// to avoid any rate-limit errors from Twitter API
func (q *Queue) processRequests(api *Twitter) {
// get queue throttle
throttle := time.Tick(q.rate)
// close queue's channels
defer q.Close()

// listen channel
for {
// capture input request and channel state
req, ok := <-q.requestsChannel
req := <-q.requestsChannel

// break the loop if the channel is closed
if !ok {
break
}
// if !ok {
// break
// }

// send the request on twitter api
err := api.apiDo(req)

// capture request errors
if err != nil {
// fmt.Println(err)
if err.Code == 429 {
if err != nil && q.auto {
if err.Code == 420 || err.Code == 429 || err.Code >= 500 {
// if err == rate limit then add req to channel again and continue
go func(c *Queue, r *Request) {
c.requestsChannel <- r
}(q, req)

// get the delay
delay := time.Tick(q.delay)

fmt.Printf("Error %s. Delay request for %s\n", err.Message, q.delay.String())
// delay next request for q.delay duration
<-delay
<-time.After(q.delay)

// go to start
continue
Expand All @@ -88,10 +93,17 @@ func (q *Queue) processRequests(api *Twitter) {
break
}
}

// add response to channel
q.responseChannel <- &Response{req.Results, err}

// throttle requests to avoid rate-limit errors
<-throttle
<-time.After(q.rate)
}
}

// Close closes requests and response channels
func (q *Queue) Close() {
close(q.requestsChannel)
close(q.responseChannel)
}
3 changes: 3 additions & 0 deletions requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Request struct {
Results Data
}

// NewRquest returns a new Request struct
func NewRquest(method, url string, v url.Values) (*Request, error) {
request, err := http.NewRequest(method, url, nil)
query := request.URL.Query()
Expand All @@ -22,6 +23,7 @@ func NewRquest(method, url string, v url.Values) (*Request, error) {
return &Request{request, Data{}}, err
}

// UpdateURLValues updates request's query values
func (r *Request) UpdateURLValues(v url.Values) {
query := r.Req.URL.Query()
for key, value := range v {
Expand All @@ -30,6 +32,7 @@ func (r *Request) UpdateURLValues(v url.Values) {
r.Req.URL.RawQuery = query.Encode()
}

// ResetResults resets request's results
func (r *Request) ResetResults() {
r.Results = Data{}
}
Loading

0 comments on commit 8c37366

Please sign in to comment.