From 8c37366fb20aa8247eecb361d7577cae66d17e2e Mon Sep 17 00:00:00 2001 From: andefined Date: Fri, 14 Jan 2022 14:25:00 +0200 Subject: [PATCH] fix multi channel errors --- .gitignore | 3 +- Makefile | 8 ++- README.md | 80 ++++++++++++++++++++++-- errors.go | 4 +- examples/followers/main.go | 28 +++++++-- go.mod | 3 + go.sum | 8 ++- queue.go | 46 ++++++++------ requests.go | 3 + responses.go | 122 ++++++++++++++++++++++--------------- search.go | 8 --- stream.go | 94 +++++++++++++++++++++++++++- tweets.go | 16 ----- twitter.go | 26 ++++++-- twitter_test.go | 93 ++++++++++++++++++++++++++-- users.go | 25 +------- 16 files changed, 429 insertions(+), 138 deletions(-) diff --git a/.gitignore b/.gitignore index acd7911..737ff07 100644 --- a/.gitignore +++ b/.gitignore @@ -43,4 +43,5 @@ dist/* targets/ bot.yaml *.jsondist/ -.env \ No newline at end of file +.env +examples/the-follower \ No newline at end of file diff --git a/Makefile b/Makefile index 781bc21..0528815 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,10 @@ test: godotenv -f ./.env go test -v test-cover: - godotenv -f ./.env go test -v -cover \ No newline at end of file + godotenv -f ./.env go test -v -cover + +test-channels: + godotenv -f ./.env go test -timeout 120s -run Test_GetUserFollowers_Error + +test-stream: + godotenv -f ./.env go test -timeout 120s -run Test_PostFilterStreamRules \ No newline at end of file diff --git a/README.md b/README.md index 279e664..a217a65 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,7 @@ +[![Language](https://img.shields.io/badge/Language-Go-blue.svg)](https://golang.org/) +[![GoDoc](https://pkg.go.dev/badge/github.com/cvcio/twitter)](https://pkg.go.dev/github.com/cvcio/twitter) +[![Go Report Card](https://goreportcard.com/badge/github.com/cvcio/twitter)](https://goreportcard.com/report/github.com/cvcio/twitter) + # Twitter API v2 Client for Go **twitter** is a Go package for the [Twitter v2 API](https://developer.twitter.com/en/docs/twitter-api/early-access), inspired by [ChimeraCoder/anaconda](https://github.com/ChimeraCoder/anaconda). This library uses channels for both, to retrieve data from Twitter API and return the results, with a built-in throttle to avoid rate-limit errors return from Twitter. You can bypass the throttling by usgin `twitter.WithRate(time.Duration)` option. The library will auto paginate results unless you use the `twitter.WithAuto(false)` option. @@ -56,11 +60,76 @@ for { ... } ``` + +Implement with error channel + +```go +v := url.Values{} +v.Add("max_results", "1000") +res, errs := api.GetUserFollowing(*id, v, twitter.WithRate(time.Minute/6), twitter.WithAuto(true)) + +for { + select { + case r, ok := <-res: + if !ok { + res = nil + break + } + + var d []*twitter.User + b, err := json.Marshal(r.Data) + if err != nil { + t.Fatalf("json Marshar Error: %v", err) + } + + json.Unmarshal(b, &d) + + case e, ok := <-errs: + if !ok { + errs = nil + break + } + t.Errorf("Twitter API Error: %v", e) + } + + if res == nil && errs == nil { + break + } + +} +``` + #### Options -- `twitter.WithDealy(time.Duration)` -- `twitter.WithRate(time.Duration)` -- `twitter.WithAuth(bool)` +[cvcio/twitter](https://github.com/cvcio/twitter) supports the following options for all methods. You can pass any option during the method contrstruction. + +```go +followers, _ := api.GetUserFollowers(*id, url.Values{}, twitter.WithDelay(1*time.Minute), twitter.WithRate(1*time.Minute) ...) +``` + +##### WithDealy + +Adjust the the duration between each errored requests due to rate limit errors from Twitter API by using the `WithDelay` option. + +```go +twitter.WithDelay(time.Duration) +``` + +##### WithRate + +Throttle requests (distinct for each method) to avoid rate limit errors from Twitter API. + +```go +twitter.WithRate(time.Duration) +``` + +##### WithAuto + +Auto paginate results (if available) when `pagination_token` is present in the response object. + +```go +twitter.WithAuto(Bool) +``` ### Examples @@ -134,8 +203,9 @@ If you're new to contributing to Open Source on Github, [this guide](https://ope ## Contributors -- Dimitris Papaevagelou ([@andefined](https://github.com/andefined)) -- Ilias Dimos ([@dosko64](https://github.com/dosko64)) + + + ### License diff --git a/errors.go b/errors.go index dc7972f..098e4be 100644 --- a/errors.go +++ b/errors.go @@ -5,17 +5,19 @@ import ( "strings" ) +// APIError implements the APIError struct type APIError struct { Code int Message string } +// NewAPIError returns a new APIError. It will look +// for error code (int) as returned from twitter. func NewAPIError(err error) *APIError { if err == nil { return &APIError{0, ""} } apiError := &APIError{0, err.Error()} - if errorParts := strings.Split(err.Error(), " - "); len(errorParts) > 0 { code, e := strconv.Atoi(errorParts[len(errorParts)-1]) if e != nil { diff --git a/examples/followers/main.go b/examples/followers/main.go index 87f538f..266eac5 100644 --- a/examples/followers/main.go +++ b/examples/followers/main.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "net/url" + "strconv" "time" "github.com/cvcio/twitter" @@ -27,9 +28,17 @@ func main() { panic(err) } + fmt.Print("ID,CreatedAt,Name,UserName,Protected,Verified,Followers,Following,Tweets,Listed,ProfileImageURL\n") + v := url.Values{} + // set size of response ids to 1000 v.Add("max_results", "1000") - followers, _ := api.GetUserFollowers(*id, v) + // set user fields to return + v.Add("user.fields", "created_at,description,id,location,name,pinned_tweet_id,profile_image_url,protected,public_metrics,url,username,verified") + // set tweet fields to return + v.Add("tweet.fields", "created_at,id,lang,source,public_metrics") + + followers, _ := api.GetUserFollowers(*id, v, twitter.WithRate(15*time.Minute/15), twitter.WithAuto(true)) for { r, ok := <-followers @@ -45,15 +54,26 @@ func main() { var data []*twitter.User json.Unmarshal(b, &data) + for _, v := range data { - fmt.Printf("%s,%s,%s\n", v.ID, v.UserName, v.Name) + fmt.Printf("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n", + v.ID, + v.CreatedAt, + v.Name, + v.UserName, + strconv.FormatBool(v.Protected), + strconv.FormatBool(v.Verified), + strconv.Itoa(v.PublicMetrics.Followers), + strconv.Itoa(v.PublicMetrics.Following), + strconv.Itoa(v.PublicMetrics.Tweets), + strconv.Itoa(v.PublicMetrics.Listed), + v.ProfileImageURL, + ) } - fmt.Println() fmt.Printf("Result Count: %d Next Token: %s\n", r.Meta.ResultCount, r.Meta.NextToken) } end := time.Now() - fmt.Printf("Done in %s", end.Sub(start)) } diff --git a/go.mod b/go.mod index fad89c7..5caf7ab 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,9 @@ module github.com/cvcio/twitter go 1.15 require ( + github.com/dustin/go-jsonpointer v0.0.0-20160814072949-ba0abeacc3dc + github.com/dustin/gojson v0.0.0-20160307161227-2e71ec9dd5ad // indirect + github.com/joho/godotenv v1.4.0 // indirect github.com/mrjones/oauth v0.0.0-20190623134757-126b35219450 golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5 ) diff --git a/go.sum b/go.sum index 9f3bd7d..ee02c66 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,10 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-jsonpointer v0.0.0-20160814072949-ba0abeacc3dc h1:tP7tkU+vIsEOKiK+l/NSLN4uUtkyuxc6hgYpQeCWAeI= +github.com/dustin/go-jsonpointer v0.0.0-20160814072949-ba0abeacc3dc/go.mod h1:ORH5Qp2bskd9NzSfKqAF7tKfONsEkCarTE5ESr/RVBw= +github.com/dustin/gojson v0.0.0-20160307161227-2e71ec9dd5ad h1:Qk76DOWdOp+GlyDKBAG3Klr9cn7N+LcYc82AZ2S7+cA= +github.com/dustin/gojson v0.0.0-20160307161227-2e71ec9dd5ad/go.mod h1:mPKfmRa823oBIgl2r20LeMSpTAteW5j7FLkc0vjmzyQ= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -82,7 +86,6 @@ github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k= github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= @@ -98,6 +101,8 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg= +github.com/joho/godotenv v1.4.0/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= @@ -228,7 +233,6 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/queue.go b/queue.go index 19f7759..48bfe77 100644 --- a/queue.go +++ b/queue.go @@ -1,7 +1,6 @@ package twitter import ( - "fmt" "time" ) @@ -14,32 +13,41 @@ type Queue struct { rate time.Duration delay time.Duration auto bool + closeChannels bool requestsChannel chan *Request responseChannel chan *Response } +// QueueOption queue options type QueueOption func(*Queue) +// WithRate (default: according to endpoint) adjusts the the duration between each request to avoid +// rate limits from Twitter API func WithRate(rate time.Duration) QueueOption { return func(q *Queue) { q.rate = rate } } +// WithDelay (default:15 minutes) adjusts the the duration between each errored requests +// due to rate limit errors from Twitter API func WithDelay(delay time.Duration) QueueOption { return func(q *Queue) { q.delay = delay } } +// WithAuto (default:true) will auto continue to the next page if +// pagination_token exists in the response object func WithAuto(auto bool) QueueOption { return func(q *Queue) { q.auto = auto } } +// NewQueue creates a new queue func NewQueue(rate, delay time.Duration, auto bool, in chan *Request, out chan *Response, options ...QueueOption) *Queue { - queue := &Queue{rate, delay, auto, in, out} + queue := &Queue{rate, delay, auto, true, in, out} for _, o := range options { o(queue) @@ -51,35 +59,32 @@ func NewQueue(rate, delay time.Duration, auto bool, in chan *Request, out chan * // processRequests, processes the incoming requests and with a build-in rate-limiter // to avoid any rate-limit errors from Twitter API func (q *Queue) processRequests(api *Twitter) { - // get queue throttle - throttle := time.Tick(q.rate) + // close queue's channels + defer q.Close() + // listen channel for { // capture input request and channel state - req, ok := <-q.requestsChannel + req := <-q.requestsChannel + // break the loop if the channel is closed - if !ok { - break - } + // if !ok { + // break + // } // send the request on twitter api err := api.apiDo(req) // capture request errors - if err != nil { - // fmt.Println(err) - if err.Code == 429 { + if err != nil && q.auto { + if err.Code == 420 || err.Code == 429 || err.Code >= 500 { // if err == rate limit then add req to channel again and continue go func(c *Queue, r *Request) { c.requestsChannel <- r }(q, req) - // get the delay - delay := time.Tick(q.delay) - - fmt.Printf("Error %s. Delay request for %s\n", err.Message, q.delay.String()) // delay next request for q.delay duration - <-delay + <-time.After(q.delay) // go to start continue @@ -88,10 +93,17 @@ func (q *Queue) processRequests(api *Twitter) { break } } + // add response to channel q.responseChannel <- &Response{req.Results, err} // throttle requests to avoid rate-limit errors - <-throttle + <-time.After(q.rate) } } + +// Close closes requests and response channels +func (q *Queue) Close() { + close(q.requestsChannel) + close(q.responseChannel) +} diff --git a/requests.go b/requests.go index 80197cf..b5b2e15 100644 --- a/requests.go +++ b/requests.go @@ -12,6 +12,7 @@ type Request struct { Results Data } +// NewRquest returns a new Request struct func NewRquest(method, url string, v url.Values) (*Request, error) { request, err := http.NewRequest(method, url, nil) query := request.URL.Query() @@ -22,6 +23,7 @@ func NewRquest(method, url string, v url.Values) (*Request, error) { return &Request{request, Data{}}, err } +// UpdateURLValues updates request's query values func (r *Request) UpdateURLValues(v url.Values) { query := r.Req.URL.Query() for key, value := range v { @@ -30,6 +32,7 @@ func (r *Request) UpdateURLValues(v url.Values) { r.Req.URL.RawQuery = query.Encode() } +// ResetResults resets request's results func (r *Request) ResetResults() { r.Results = Data{} } diff --git a/responses.go b/responses.go index 899bc54..3688f61 100644 --- a/responses.go +++ b/responses.go @@ -98,11 +98,11 @@ type EntityMention struct { // Entities response object. type Entities struct { - Annotations []EntityAnnotation `json:"annotations,omitempty"` - URLs []EntityURL `json:"urls,omitempty"` - HashTags []EntityTag `json:"hashtags,omitempty"` - Mentions []EntityMention `json:"mentions,omitempty"` - CashTags []EntityTag `json:"cashtags,omitempty"` + Annotations []EntityAnnotation `json:"annotations"` + URLs []EntityURL `json:"urls"` + HashTags []EntityTag `json:"hashtags"` + Mentions []EntityMention `json:"mentions"` + CashTags []EntityTag `json:"cashtags"` } // Withheld response object. @@ -114,13 +114,13 @@ type Withheld struct { // TweetMetrics response object. type TweetMetrics struct { - Retweets int `json:"retweet_count,omitempty"` - Replies int `json:"reply_count,omitempty"` - Likes int `json:"like_count,omitempty"` - Quotes int `json:"quote_count,omitempty"` - Impressions int `json:"impression_count,omitempty"` - URLLinkClicks int `json:"url_link_clicks,omitempty"` - UserProfileClicks int `json:"user_profile_clicks,omitempty"` + Retweets int `json:"retweet_count"` + Replies int `json:"reply_count"` + Likes int `json:"like_count"` + Quotes int `json:"quote_count"` + Impressions int `json:"impression_count"` + URLLinkClicks int `json:"url_link_clicks"` + UserProfileClicks int `json:"user_profile_clicks"` } // UserMetrics response object. @@ -133,38 +133,41 @@ type UserMetrics struct { // Includes response object. type Includes struct { - Tweets []Tweet `json:"tweets,omitempty"` - Users []User `json:"users,omitempty"` + Tweets []Tweet `json:"tweets"` + Users []User `json:"users"` } // Error response object. type Error struct{} +type StreamData struct { + Data Tweet `json:"data"` +} // Tweet response object as returned from /2/tweets endpoint. For detailed information // refer to https://developer.twitter.com/en/docs/twitter-api/tweets/lookup/api-reference/get-tweets. type Tweet struct { - ID string `json:"id"` - Text string `json:"text"` - CreatedAt string `json:"created_at,omitempty"` - AuthorID string `json:"author_id,omitempty"` - ConversationID string `json:"converstation_id,omitempty"` - InReplyToUserID string `json:"in_reply_to_user_id,omitempty"` - ReferencedTweets []*ReferencedTweet `json:"referenced_tweets,omitempty"` - Attachments *Attachment `json:"attachments,omitempty"` - Geo *Geo `json:"geo,omitempty"` - ContextAnnotations []*ContextAnnotation `json:"context_annotations,omitempty"` - Entities *Entities `json:"entities,omitempty"` - Withheld *TweetMetrics `json:"withheld,omitempty"` - PublicMetrics *TweetMetrics `json:"public_metrics,omitempty"` - NonPublicMetrics *TweetMetrics `json:"non_public_metrics,omitempty"` - OrganicMetrics *TweetMetrics `json:"organic_metrics,omitempty"` - PromotedMetrics *TweetMetrics `json:"promoted_metrics,omitempty"` - PossibySensitive bool `json:"possiby_sensitive,omitempty"` - Lang string `json:"lang,omitempty"` - ReplySettings string `json:"reply_settings,omitempty"` - Source string `json:"source,omitempty"` - Includes *Includes `json:"includes,omitempty"` - Errors *Error `json:"errors,omitempty"` + ID string `json:"id"` + Text string `json:"text"` + CreatedAt string `json:"created_at"` + AuthorID string `json:"author_id"` + ConversationID string `json:"converstation_id"` + InReplyToUserID string `json:"in_reply_to_user_id"` + ReferencedTweets []ReferencedTweet `json:"referenced_tweets"` + Attachments Attachment `json:"attachments"` + Geo Geo `json:"geo"` + ContextAnnotations []ContextAnnotation `json:"context_annotations"` + Entities Entities `json:"entities"` + Withheld TweetMetrics `json:"withheld"` + PublicMetrics TweetMetrics `json:"public_metrics"` + NonPublicMetrics TweetMetrics `json:"non_public_metrics"` + OrganicMetrics TweetMetrics `json:"organic_metrics"` + PromotedMetrics TweetMetrics `json:"promoted_metrics"` + PossibySensitive bool `json:"possiby_sensitive"` + Lang string `json:"lang"` + ReplySettings string `json:"reply_settings"` + Source string `json:"source"` + Includes Includes `json:"includes"` + Errors Error `json:"errors"` } // CreatedAtTime is a convenience wrapper that returns the Created_at time, parsed as a time.Time struct @@ -178,22 +181,43 @@ type User struct { ID string `json:"id"` Name string `json:"name"` UserName string `json:"username"` - CreatedAt string `json:"created_at,omitempty"` - Protected bool `json:"protected,omitempty"` - Withheld *Withheld `json:"withheld,omitempty"` - Location string `json:"location,omitempty"` - URL string `json:"url,omitempty"` - Description string `json:"description,omitempty"` - Verified bool `json:"verified,omitempty"` - Entities *Entities `json:"entities,omitempty"` - ProfileImageURL string `json:"profile_image_url,omitempty"` - PublicMetrics *UserMetrics `json:"public_metrics,omitempty"` - PinnedTweetID string `json:"pinned_tweet_id,omitempty"` - Includes *Includes `json:"includes,omitempty"` - Errors *Error `json:"errors,omitempty"` + CreatedAt string `json:"created_at"` + Protected bool `json:"protected"` + Withheld *Withheld `json:"withheld"` + Location string `json:"location"` + URL string `json:"url"` + Description string `json:"description"` + Verified bool `json:"verified"` + Entities *Entities `json:"entities"` + ProfileImageURL string `json:"profile_image_url"` + PublicMetrics *UserMetrics `json:"public_metrics"` + PinnedTweetID string `json:"pinned_tweet_id"` + Includes *Includes `json:"includes"` + Errors *Error `json:"errors"` } // CreatedAtTime is a convenience wrapper that returns the Created_at time, parsed as a time.Time struct func (u User) CreatedAtTime() (time.Time, error) { return time.Parse(time.RubyDate, u.CreatedAt) } + +type RulesData struct { + Value string `json:"value,omitempty"` + Tag string `json:"tag,omitempty"` + ID string `json:"id,omitempty"` +} +type RulesSummary struct { + Created int `json:"created,omitempty"` + NotCreated int `json:"not_created,omitempty"` + Deleted int `json:"deleted,omitempty"` + NotDeleted int `json:"not_deleted,omitempty"` +} +type RulesMeta struct { + Sent time.Time `json:"sent,omitempty"` + Summary *RulesSummary `json:"summary,omitempty"` +} + +type Rules struct { + Data []RulesData `json:"data,omitempty"` + Meta *RulesMeta `json:"meta,omitempty"` +} diff --git a/search.go b/search.go index 2096fa8..95fef39 100644 --- a/search.go +++ b/search.go @@ -27,8 +27,6 @@ func (api *Twitter) GetTweetsSearchRecent(v url.Values, options ...QueueOption) // async process the response channel go (func(q *Queue, d chan *Data, e chan *APIError, req *Request) { // on done close channels - // close response channel - defer close(q.responseChannel) // close data channel defer close(d) // close error channel @@ -53,8 +51,6 @@ func (api *Twitter) GetTweetsSearchRecent(v url.Values, options ...QueueOption) // we are done! break the loop and close the channels break } - // make sure to close the requestsChannel - close(queue.requestsChannel) })(queue, data, errors, request) // return the data channel @@ -85,8 +81,6 @@ func (api *Twitter) GetTweetsSearchAll(v url.Values, options ...QueueOption) (ch // async process the response channel go (func(q *Queue, d chan *Data, e chan *APIError, req *Request) { // on done close channels - // close response channel - defer close(q.responseChannel) // close data channel defer close(d) // close error channel @@ -111,8 +105,6 @@ func (api *Twitter) GetTweetsSearchAll(v url.Values, options ...QueueOption) (ch // we are done! break the loop and close the channels break } - // make sure to close the requestsChannel - close(queue.requestsChannel) })(queue, data, errors, request) // return the data channel diff --git a/stream.go b/stream.go index 6e8784a..09972b9 100644 --- a/stream.go +++ b/stream.go @@ -1,11 +1,98 @@ package twitter +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "net/url" + + "github.com/dustin/go-jsonpointer" +) + +type Stream struct { + api *Twitter + C chan interface{} + run bool +} + +func (stream *Stream) Stop() { + stream.run = false +} + +func (stream *Stream) start(urlStr string, v url.Values) { + stream.run = true + go stream.loop(urlStr, v) +} +func (stream *Stream) loop(urlStr string, v url.Values) { + defer close(stream.C) + for stream.run { + request, _ := NewRquest("GET", urlStr, v) + r, _ := stream.api.client.Do(request.Req) + stream.listen(*r) + } +} + +func jsonToKnownType(j []byte) interface{} { + // TODO: DRY + var tweet StreamData + json.Unmarshal(j, &tweet) + return tweet +} +func (stream *Stream) listen(response http.Response) { + if response.Body != nil { + defer response.Body.Close() + } + + b, err := io.ReadAll(response.Body) + // b, err := ioutil.ReadAll(resp.Body) Go.1.15 and earlier + if err != nil { + log.Fatalln(err) + } + + log.Print(string(b)) + + scanner := bufio.NewScanner(response.Body) + + for scanner.Scan() && stream.run { + j := scanner.Bytes() + if len(j) == 0 { + break + } + + stream.C <- jsonToKnownType(j) + } +} +func jsonAsStruct(j []byte, path string, obj interface{}) (res bool) { + if v, _ := jsonpointer.Find(j, path); v == nil { + return false + } + err := json.Unmarshal(j, obj) + return err == nil +} + +func (api Twitter) newStream(urlStr string, v url.Values) *Stream { + stream := Stream{ + api: &api, + C: make(chan interface{}), + } + + stream.start(urlStr, v) + return &stream +} + // GetFilterStream streams tweets in real-time based on a specific set of filter rules. // Endpoint URL: https://api.twitter.com/2/tweets/search/stream // Official Documentation: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream // Authentication Methods: OAuth 2.0 Bearer Token // Rate Limit: 50/15m (app) -func (api *Twitter) GetFilterStream() {} +func (api *Twitter) GetFilterStream(v url.Values) *Stream { + return api.newStream( + fmt.Sprintf("%s/tweets/search/stream", api.baseURL), v, + ) +} // GetFilterStreamRules returns a list of rules currently active on the streaming endpoint, either as a list or individually. // Endpoint URL: https://api.twitter.com/2/tweets/search/stream/rules @@ -19,7 +106,10 @@ func (api *Twitter) GetFilterStreamRules() {} // Official Documentation: https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/post-tweets-search-stream-rules // Authentication Methods: OAuth 2.0 Bearer Token // Rate Limit: 450/15m (app) -func (api *Twitter) PostFilterStreamRules() {} +func (api *Twitter) PostFilterStreamRules(v url.Values) (*http.Response, *APIError) { + request, _ := NewRquest("POST", fmt.Sprintf("%s/tweets/search/stream/rules", api.baseURL), v) + return api.apiDoRest(request) +} // GetSampleStream streams about 1% of all Tweets in real-time. // Endpoint URL: https://api.twitter.com/2/tweets/sample/stream diff --git a/tweets.go b/tweets.go index 8e6f66e..92f1cda 100644 --- a/tweets.go +++ b/tweets.go @@ -27,8 +27,6 @@ func (api *Twitter) GetUserMentions(id string, v url.Values, options ...QueueOpt // async process the response channel go (func(q *Queue, d chan *Data, e chan *APIError, req *Request) { // on done close channels - // close response channel - defer close(q.responseChannel) // close data channel defer close(d) // close error channel @@ -70,8 +68,6 @@ func (api *Twitter) GetUserMentions(id string, v url.Values, options ...QueueOpt // we are done! break the loop and close the channels break } - // make sure to close the requestsChannel - close(queue.requestsChannel) })(queue, data, errors, request) // return the data channel @@ -99,8 +95,6 @@ func (api *Twitter) GetUserTweets(id string, v url.Values, options ...QueueOptio // async process the response channel go (func(q *Queue, d chan *Data, e chan *APIError, req *Request) { // on done close channels - // close response channel - defer close(q.responseChannel) // close data channel defer close(d) // close error channel @@ -142,8 +136,6 @@ func (api *Twitter) GetUserTweets(id string, v url.Values, options ...QueueOptio // we are done! break the loop and close the channels break } - // make sure to close the requestsChannel - close(queue.requestsChannel) })(queue, data, errors, request) // return the data channel @@ -171,8 +163,6 @@ func (api *Twitter) GetTweets(v url.Values, options ...QueueOption) (chan *Data, // async process the response channel go (func(q *Queue, d chan *Data, e chan *APIError, req *Request) { // on done close channels - // close response channel - defer close(q.responseChannel) // close data channel defer close(d) // close error channel @@ -214,8 +204,6 @@ func (api *Twitter) GetTweets(v url.Values, options ...QueueOption) (chan *Data, // we are done! break the loop and close the channels break } - // make sure to close the requestsChannel - close(queue.requestsChannel) })(queue, data, errors, request) // return the data channel @@ -243,8 +231,6 @@ func (api *Twitter) GetTweetByID(id string, v url.Values, options ...QueueOption // async process the response channel go (func(q *Queue, d chan *Data, e chan *APIError, req *Request) { // on done close channels - // close response channel - defer close(q.responseChannel) // close data channel defer close(d) // close error channel @@ -269,8 +255,6 @@ func (api *Twitter) GetTweetByID(id string, v url.Values, options ...QueueOption // we are done! break the loop and close the channels break } - // make sure to close the requestsChannel - close(queue.requestsChannel) })(queue, data, errors, request) // return the data channel diff --git a/twitter.go b/twitter.go index 5746746..d71b50c 100644 --- a/twitter.go +++ b/twitter.go @@ -20,7 +20,6 @@ const ( AccessTokenURL = "https://api.twitter.com/oauth/access_token" TokenURL = "https://api.twitter.com/oauth2/token" RateLimitStatusURL = "https://api.twitter.com/1.1/application/rate_limit_status.json" - VerifyCredentials = "https://api.twitter.com/1.1/account/verify_credentials.json" ) // Twitter API Client @@ -30,7 +29,7 @@ type Twitter struct { queue *Queue } -// NewTwitter return a new Twitter API v2 Client using OAuth 2.0 based authentication. +// NewTwitter returns a new Twitter API v2 Client using OAuth 2.0 based authentication. // This method is usufull when you only need to make Application-Only requests. // Official Documentation: https://developer.twitter.com/en/docs/authentication/oauth-2-0 func NewTwitter(consumerKey, consumerSecret string) (*Twitter, *APIError) { @@ -97,9 +96,9 @@ func (api *Twitter) GetClient() *http.Client { } // VerifyCredentials returns bool upon successful request. This method will make a request -// on the account/verify_credentials endpoint. +// on the rate-limit endpoint since there is no official token validation method. func (api *Twitter) VerifyCredentials() (bool, *APIError) { - response, err := api.client.Get(VerifyCredentials) + response, err := api.client.Get(RateLimitStatusURL) if err != nil { return false, &APIError{0, err.Error()} } @@ -141,3 +140,22 @@ func (api *Twitter) apiDo(req *Request) *APIError { return parseResponse(resp, &req.Results) } + +// apiDoRest send's the request to Twitter API and returns an error. +// The results are processed by `parseResponse` and written to the temporary +// `req.Results` interaface. +func (api *Twitter) apiDoRest(req *Request) (*http.Response, *APIError) { + // fmt.Println(req.Req.URL.String()) + resp, err := api.client.Do(req.Req) + if err != nil { + return nil, &APIError{0, err.Error()} + } + + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return nil, &APIError{resp.StatusCode, resp.Status} + } + + return resp, nil +} diff --git a/twitter_test.go b/twitter_test.go index 3e66776..4233b2b 100644 --- a/twitter_test.go +++ b/twitter_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - twitter "github.com/cvcio/twitter" + "github.com/cvcio/twitter" ) var ( @@ -72,6 +72,9 @@ func Test_GetUserFollowers(t *testing.T) { v := url.Values{} v.Add("max_results", "50") + v.Add("user.fields", "created_at,description,id,location,name,pinned_tweet_id,profile_image_url,protected,public_metrics,url,username,verified") + v.Add("expansions", "pinned_tweet_id") + v.Add("tweet.fields", "created_at,id,lang,source,public_metrics") res, errs := api.GetUserFollowers("44142397", v, twitter.WithRate(15*time.Minute/15), twitter.WithAuto(false)) // @andefined for { @@ -113,7 +116,7 @@ func Test_GetUserFollowers_Error(t *testing.T) { v := url.Values{} v.Add("max_results", "5000") - res, errs := api.GetUserFollowers("44142397", v, twitter.WithRate(15*time.Minute/15), twitter.WithAuto(false)) // @andefined + res, errs := api.GetUserFollowers("44142397", v, twitter.WithRate(15*time.Minute/15), twitter.WithAuto(true)) // @andefined for { _, rok := <-res @@ -133,6 +136,53 @@ func Test_GetUserFollowers_Error(t *testing.T) { } } +func Test_Channels(t *testing.T) { + api, err := twitter.NewTwitter(consumerKey, consumerSecret) + if err != nil { + t.Fatalf("Twitter API VerifyCredentials Error: %s", err.Message) + } + + size := 0 + v := url.Values{} + v.Add("max_results", "1000") + res, errs := api.GetUserFollowing("44142397", v, twitter.WithRate(time.Minute/6), twitter.WithAuto(true)) // @andefined + + for { + select { + case r, ok := <-res: + if !ok { + res = nil + break + } + + var d []*twitter.User + b, err := json.Marshal(r.Data) + if err != nil { + t.Fatalf("json Marshar Error: %v", err) + } + + json.Unmarshal(b, &d) + size += len(d) + + case e, ok := <-errs: + if !ok { + errs = nil + break + } + t.Errorf("Twitter API Error: %v", e) + } + + if res == nil && errs == nil { + break + } + + } + + if size < 1000 { + t.Fatalf("Twitter API GetUserFollowing Error. Should have returned more than 1000, got %d", size) + } +} + func Test_GetUserFollowing(t *testing.T) { api, err := twitter.NewTwitter(consumerKey, consumerSecret) if err != nil { @@ -305,8 +355,8 @@ func Test_GetUserTweets(t *testing.T) { json.Unmarshal(b, &data) } - if len(data) != 50 { - t.Fatalf("Twitter API GetUserTweets Error. Should have returned 50, got %d", len(data)) + if len(data) < 1 { + t.Fatalf("Twitter API GetUserTweets Error. Should have returned >= 1, got %d", len(data)) } } @@ -433,3 +483,38 @@ func Test_GetTweetsSearchRecent(t *testing.T) { t.Fatalf("Twitter API GetTweetsSearchRecent Error. Should have returned 100, got %d", len(data)) } } + +// func Test_PostFilterStreamRules(t *testing.T) { +// api, err := twitter.NewTwitter(consumerKey, consumerSecret) +// if err != nil { +// t.Fatalf("Twitter API VerifyCredentials Error: %s", err.Message) +// } + +// v := url.Values{} +// // v.Add("backfill_minutes", "10") +// res, err := api.PostFilterStreamRules(v) +// if err != nil { +// t.Fatalf("json Marshar Error: %v", err) +// } +// defer res.Body.Close() +// body, _ := ioutil.ReadAll(res.Body) + +// fmt.Printf("%v %v\n", res, body) +// } + +// func Test_GetFilterStream(t *testing.T) { +// api, err := twitter.NewTwitter(consumerKey, consumerSecret) +// if err != nil { +// t.Fatalf("Twitter API VerifyCredentials Error: %s", err.Message) +// } + +// v := url.Values{} +// // v.Add("backfill_minutes", "10") +// res, err := api.PostFilterStreamRules(v) +// fmt.Printf("%v %v\n", res, err) + +// s := api.GetFilterStream(v) +// for t := range s.C { +// fmt.Printf("%v", t) +// } +// } diff --git a/users.go b/users.go index 6061cb1..960f02f 100644 --- a/users.go +++ b/users.go @@ -27,8 +27,6 @@ func (api *Twitter) GetUserFollowers(id string, v url.Values, options ...QueueOp // async process the response channel go (func(q *Queue, d chan *Data, e chan *APIError, req *Request) { // on done close channels - // close response channel - defer close(q.responseChannel) // close data channel defer close(d) // close error channel @@ -71,8 +69,6 @@ func (api *Twitter) GetUserFollowers(id string, v url.Values, options ...QueueOp // we are done! break the loop and close the channels break } - // make sure to close the requestsChannel - close(queue.requestsChannel) })(queue, data, errors, request) // return the data channel @@ -100,8 +96,6 @@ func (api *Twitter) GetUserFollowing(id string, v url.Values, options ...QueueOp // async process the response channel go (func(q *Queue, d chan *Data, e chan *APIError, req *Request) { // on done close channels - // close response channel - defer close(q.responseChannel) // close data channel defer close(d) // close error channel @@ -122,6 +116,7 @@ func (api *Twitter) GetUserFollowing(id string, v url.Values, options ...QueueOp if res.Error != nil { e <- res.Error } + // if there is a next page, transform the original request object // by setting the `pagination_token` parameter to get the next page if res.Results.Meta.NextToken != "" && q.auto { @@ -143,8 +138,6 @@ func (api *Twitter) GetUserFollowing(id string, v url.Values, options ...QueueOp // we are done! break the loop and close the channels break } - // make sure to close the requestsChannel - close(queue.requestsChannel) })(queue, data, errors, request) // return the data channel @@ -172,8 +165,6 @@ func (api *Twitter) GetUsers(v url.Values, options ...QueueOption) (chan *Data, // async process the response channel go (func(q *Queue, d chan *Data, e chan *APIError, req *Request) { // on done close channels - // close response channel - defer close(q.responseChannel) // close data channel defer close(d) // close error channel @@ -197,8 +188,6 @@ func (api *Twitter) GetUsers(v url.Values, options ...QueueOption) (chan *Data, // we are done! break the loop and close the channels break } - // make sure to close the requestsChannel - close(queue.requestsChannel) })(queue, data, errors, request) // return the data channel @@ -226,8 +215,6 @@ func (api *Twitter) GetUsersBy(v url.Values, options ...QueueOption) (chan *Data // async process the response channel go (func(q *Queue, d chan *Data, e chan *APIError, req *Request) { // on done close channels - // close response channel - defer close(q.responseChannel) // close data channel defer close(d) // close error channel @@ -251,8 +238,6 @@ func (api *Twitter) GetUsersBy(v url.Values, options ...QueueOption) (chan *Data // we are done! break the loop and close the channels break } - // make sure to close the requestsChannel - close(queue.requestsChannel) })(queue, data, errors, request) // return the data channel @@ -280,8 +265,6 @@ func (api *Twitter) GetUserByID(id string, v url.Values, options ...QueueOption) // async process the response channel go (func(q *Queue, d chan *Data, e chan *APIError, req *Request) { // on done close channels - // close response channel - defer close(q.responseChannel) // close data channel defer close(d) // close error channel @@ -306,8 +289,6 @@ func (api *Twitter) GetUserByID(id string, v url.Values, options ...QueueOption) // we are done! break the loop and close the channels break } - // make sure to close the requestsChannel - close(queue.requestsChannel) })(queue, data, errors, request) // return the data channel @@ -335,8 +316,6 @@ func (api *Twitter) GetUsersByUserName(username string, v url.Values, options .. // async process the response channel go (func(q *Queue, d chan *Data, e chan *APIError, req *Request) { // on done close channels - // close response channel - defer close(q.responseChannel) // close data channel defer close(d) // close error channel @@ -361,8 +340,6 @@ func (api *Twitter) GetUsersByUserName(username string, v url.Values, options .. // we are done! break the loop and close the channels break } - // make sure to close the requestsChannel - close(queue.requestsChannel) })(queue, data, errors, request) // return the data channel