From 12db65de84d9378849ff306586ea517fc7f4a06d Mon Sep 17 00:00:00 2001 From: ethanchewy <17chiue@gmail.com> Date: Mon, 25 Jun 2018 11:12:06 -0700 Subject: [PATCH 1/3] Fix timestamp error --- contrib/binancefeeder/binancefeeder.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/contrib/binancefeeder/binancefeeder.go b/contrib/binancefeeder/binancefeeder.go index f5783f9be..067eb5eb9 100755 --- a/contrib/binancefeeder/binancefeeder.go +++ b/contrib/binancefeeder/binancefeeder.go @@ -79,6 +79,12 @@ func QueryTime(query string) time.Time { return time.Time{} } +//Convert time from milliseconds to Unix +func ConvertMillToTime(originalTime int64) time.Time { + i := time.Unix(0, originalTime*int64(time.Millisecond)) + return i +} + //Gets all symbols from binance func GetAllSymbols() []string { client := binance.NewClient("", "") @@ -128,8 +134,13 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) { timeframeStr = config.BaseTimeframe } - queryStart = QueryTime(config.QueryStart) - queryEnd = QueryTime(config.QueryEnd) + if config.QueryStart != "" { + queryStart = QueryTime(config.QueryStart) + } + + if config.QueryEnd != "" { + queryEnd = QueryTime(config.QueryEnd) + } if config.BaseTimeframe != "" { timeframeStr = config.BaseTimeframe @@ -218,7 +229,7 @@ func (bn *BinanceFetcher) Run() { for _, rate := range rates { errorsConversion = errorsConversion[:0] - openTime = append(openTime, rate.OpenTime) + openTime = append(openTime, ConvertMillToTime(rate.OpenTime).Unix()) open = append(open, ConvertStringToFloat(rate.Open)) high = append(high, ConvertStringToFloat(rate.High)) low = append(low, ConvertStringToFloat(rate.Low)) From 1a7b3ab5b9fa10b18b80dc4ba2472856b599b646 Mon Sep 17 00:00:00 2001 From: ethanchewy <17chiue@gmail.com> Date: Mon, 25 Jun 2018 11:39:07 -0700 Subject: [PATCH 2/3] Add findLastTimestamp functionality --- contrib/binancefeeder/binancefeeder.go | 33 ++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/contrib/binancefeeder/binancefeeder.go b/contrib/binancefeeder/binancefeeder.go index 067eb5eb9..8010bef0e 100755 --- a/contrib/binancefeeder/binancefeeder.go +++ b/contrib/binancefeeder/binancefeeder.go @@ -4,12 +4,14 @@ import ( "context" "encoding/json" "fmt" + "math" "regexp" "strconv" "time" binance "github.com/adshao/go-binance" "github.com/alpacahq/marketstore/executor" + "github.com/alpacahq/marketstore/planner" "github.com/alpacahq/marketstore/plugins/bgworker" "github.com/alpacahq/marketstore/utils" "github.com/alpacahq/marketstore/utils/io" @@ -115,6 +117,28 @@ func GetAllSymbols() []string { return validSymbols } +func findLastTimestamp(symbol string, tbk *io.TimeBucketKey) time.Time { + cDir := executor.ThisInstance.CatalogDir + query := planner.NewQuery(cDir) + query.AddTargetKey(tbk) + start := time.Unix(0, 0).In(utils.InstanceConfig.Timezone) + end := time.Unix(math.MaxInt64, 0).In(utils.InstanceConfig.Timezone) + query.SetRange(start.Unix(), end.Unix()) + query.SetRowLimit(io.LAST, 1) + parsed, err := query.Parse() + if err != nil { + return time.Time{} + } + reader, err := executor.NewReader(parsed) + csm, _, err := reader.Read() + cs := csm[*tbk] + if cs == nil || cs.Len() == 0 { + return time.Time{} + } + ts := cs.GetTime() + return ts[0] +} + //Register new background worker func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) { config := recast(conf) @@ -179,6 +203,15 @@ func (bn *BinanceFetcher) Run() { //Replace interval string with correct one with API call timeInterval := timeIntervalNumsOnly + correctIntervalSymbol + for _, symbol := range symbols { + tbk := io.NewTimeBucketKey(symbol + "/" + bn.baseTimeframe.String + "/OHLCV") + lastTimestamp := findLastTimestamp(symbol, tbk) + glog.Infof("lastTimestamp for %s = %v", symbol, lastTimestamp) + if timeStart.IsZero() || (!lastTimestamp.IsZero() && lastTimestamp.Before(timeStart)) { + timeStart = lastTimestamp + } + } + for { if timeStart.IsZero() { if !bn.queryStart.IsZero() { From 77adee977077e83d9fadb908710422dc166623d1 Mon Sep 17 00:00:00 2001 From: ethanchewy <17chiue@gmail.com> Date: Mon, 25 Jun 2018 13:01:53 -0700 Subject: [PATCH 3/3] Remove print line statement if Symbol error --- contrib/binancefeeder/binancefeeder.go | 1 - 1 file changed, 1 deletion(-) diff --git a/contrib/binancefeeder/binancefeeder.go b/contrib/binancefeeder/binancefeeder.go index 8010bef0e..65d5cfe5c 100755 --- a/contrib/binancefeeder/binancefeeder.go +++ b/contrib/binancefeeder/binancefeeder.go @@ -96,7 +96,6 @@ func GetAllSymbols() []string { validSymbols := make([]string, 0) if err != nil { - fmt.Println(err) symbols := []string{"BTC", "EOS", "ETH", "BNB", "TRX", "ONT", "XRP", "ADA", "LTC", "BCC", "TUSD", "IOTA", "ETC", "ICX", "NEO", "XLM", "QTUM", "BCH"} return symbols