diff --git a/contrib/binancefeeder/binancefeeder.go b/contrib/binancefeeder/binancefeeder.go index f5783f9be..65d5cfe5c 100755 --- a/contrib/binancefeeder/binancefeeder.go +++ b/contrib/binancefeeder/binancefeeder.go @@ -4,12 +4,14 @@ import ( "context" "encoding/json" "fmt" + "math" "regexp" "strconv" "time" binance "github.com/adshao/go-binance" "github.com/alpacahq/marketstore/executor" + "github.com/alpacahq/marketstore/planner" "github.com/alpacahq/marketstore/plugins/bgworker" "github.com/alpacahq/marketstore/utils" "github.com/alpacahq/marketstore/utils/io" @@ -79,6 +81,12 @@ func QueryTime(query string) time.Time { return time.Time{} } +//Convert time from milliseconds to Unix +func ConvertMillToTime(originalTime int64) time.Time { + i := time.Unix(0, originalTime*int64(time.Millisecond)) + return i +} + //Gets all symbols from binance func GetAllSymbols() []string { client := binance.NewClient("", "") @@ -88,7 +96,6 @@ func GetAllSymbols() []string { validSymbols := make([]string, 0) if err != nil { - fmt.Println(err) symbols := []string{"BTC", "EOS", "ETH", "BNB", "TRX", "ONT", "XRP", "ADA", "LTC", "BCC", "TUSD", "IOTA", "ETC", "ICX", "NEO", "XLM", "QTUM", "BCH"} return symbols @@ -109,6 +116,28 @@ func GetAllSymbols() []string { return validSymbols } +func findLastTimestamp(symbol string, tbk *io.TimeBucketKey) time.Time { + cDir := executor.ThisInstance.CatalogDir + query := planner.NewQuery(cDir) + query.AddTargetKey(tbk) + start := time.Unix(0, 0).In(utils.InstanceConfig.Timezone) + end := time.Unix(math.MaxInt64, 0).In(utils.InstanceConfig.Timezone) + query.SetRange(start.Unix(), end.Unix()) + query.SetRowLimit(io.LAST, 1) + parsed, err := query.Parse() + if err != nil { + return time.Time{} + } + reader, err := executor.NewReader(parsed) + csm, _, err := reader.Read() + cs := csm[*tbk] + if cs == nil || cs.Len() == 0 { + return time.Time{} + } + ts := cs.GetTime() + return ts[0] +} + //Register new background worker func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) { config := recast(conf) @@ -128,8 +157,13 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) { timeframeStr = config.BaseTimeframe } - queryStart = QueryTime(config.QueryStart) - queryEnd = QueryTime(config.QueryEnd) + if config.QueryStart != "" { + queryStart = QueryTime(config.QueryStart) + } + + if config.QueryEnd != "" { + queryEnd = QueryTime(config.QueryEnd) + } if config.BaseTimeframe != "" { timeframeStr = config.BaseTimeframe @@ -168,6 +202,15 @@ func (bn *BinanceFetcher) Run() { //Replace interval string with correct one with API call timeInterval := timeIntervalNumsOnly + correctIntervalSymbol + for _, symbol := range symbols { + tbk := io.NewTimeBucketKey(symbol + "/" + bn.baseTimeframe.String + "/OHLCV") + lastTimestamp := findLastTimestamp(symbol, tbk) + glog.Infof("lastTimestamp for %s = %v", symbol, lastTimestamp) + if timeStart.IsZero() || (!lastTimestamp.IsZero() && lastTimestamp.Before(timeStart)) { + timeStart = lastTimestamp + } + } + for { if timeStart.IsZero() { if !bn.queryStart.IsZero() { @@ -218,7 +261,7 @@ func (bn *BinanceFetcher) Run() { for _, rate := range rates { errorsConversion = errorsConversion[:0] - openTime = append(openTime, rate.OpenTime) + openTime = append(openTime, ConvertMillToTime(rate.OpenTime).Unix()) open = append(open, ConvertStringToFloat(rate.Open)) high = append(high, ConvertStringToFloat(rate.High)) low = append(low, ConvertStringToFloat(rate.Low))