diff --git a/contrib/binancefeeder/binancefeeder.go b/contrib/binancefeeder/binancefeeder.go index 067eb5eb9..8010bef0e 100755 --- a/contrib/binancefeeder/binancefeeder.go +++ b/contrib/binancefeeder/binancefeeder.go @@ -4,12 +4,14 @@ import ( "context" "encoding/json" "fmt" + "math" "regexp" "strconv" "time" binance "github.com/adshao/go-binance" "github.com/alpacahq/marketstore/executor" + "github.com/alpacahq/marketstore/planner" "github.com/alpacahq/marketstore/plugins/bgworker" "github.com/alpacahq/marketstore/utils" "github.com/alpacahq/marketstore/utils/io" @@ -115,6 +117,28 @@ func GetAllSymbols() []string { return validSymbols } +func findLastTimestamp(symbol string, tbk *io.TimeBucketKey) time.Time { + cDir := executor.ThisInstance.CatalogDir + query := planner.NewQuery(cDir) + query.AddTargetKey(tbk) + start := time.Unix(0, 0).In(utils.InstanceConfig.Timezone) + end := time.Unix(math.MaxInt64, 0).In(utils.InstanceConfig.Timezone) + query.SetRange(start.Unix(), end.Unix()) + query.SetRowLimit(io.LAST, 1) + parsed, err := query.Parse() + if err != nil { + return time.Time{} + } + reader, err := executor.NewReader(parsed) + csm, _, err := reader.Read() + cs := csm[*tbk] + if cs == nil || cs.Len() == 0 { + return time.Time{} + } + ts := cs.GetTime() + return ts[0] +} + //Register new background worker func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) { config := recast(conf) @@ -179,6 +203,15 @@ func (bn *BinanceFetcher) Run() { //Replace interval string with correct one with API call timeInterval := timeIntervalNumsOnly + correctIntervalSymbol + for _, symbol := range symbols { + tbk := io.NewTimeBucketKey(symbol + "/" + bn.baseTimeframe.String + "/OHLCV") + lastTimestamp := findLastTimestamp(symbol, tbk) + glog.Infof("lastTimestamp for %s = %v", symbol, lastTimestamp) + if timeStart.IsZero() || (!lastTimestamp.IsZero() && lastTimestamp.Before(timeStart)) { + timeStart = lastTimestamp + } + } + for { if timeStart.IsZero() { if !bn.queryStart.IsZero() {