Skip to content

Commit

Permalink
Add findLastTimestamp functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
ethanchewy committed Jun 25, 2018
1 parent 12db65d commit 1a7b3ab
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions contrib/binancefeeder/binancefeeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"regexp"
"strconv"
"time"

binance "github.com/adshao/go-binance"
"github.com/alpacahq/marketstore/executor"
"github.com/alpacahq/marketstore/planner"
"github.com/alpacahq/marketstore/plugins/bgworker"
"github.com/alpacahq/marketstore/utils"
"github.com/alpacahq/marketstore/utils/io"
Expand Down Expand Up @@ -115,6 +117,28 @@ func GetAllSymbols() []string {
return validSymbols
}

func findLastTimestamp(symbol string, tbk *io.TimeBucketKey) time.Time {
cDir := executor.ThisInstance.CatalogDir
query := planner.NewQuery(cDir)
query.AddTargetKey(tbk)
start := time.Unix(0, 0).In(utils.InstanceConfig.Timezone)
end := time.Unix(math.MaxInt64, 0).In(utils.InstanceConfig.Timezone)
query.SetRange(start.Unix(), end.Unix())
query.SetRowLimit(io.LAST, 1)
parsed, err := query.Parse()
if err != nil {
return time.Time{}
}
reader, err := executor.NewReader(parsed)
csm, _, err := reader.Read()
cs := csm[*tbk]
if cs == nil || cs.Len() == 0 {
return time.Time{}
}
ts := cs.GetTime()
return ts[0]
}

//Register new background worker
func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
config := recast(conf)
Expand Down Expand Up @@ -179,6 +203,15 @@ func (bn *BinanceFetcher) Run() {
//Replace interval string with correct one with API call
timeInterval := timeIntervalNumsOnly + correctIntervalSymbol

for _, symbol := range symbols {
tbk := io.NewTimeBucketKey(symbol + "/" + bn.baseTimeframe.String + "/OHLCV")
lastTimestamp := findLastTimestamp(symbol, tbk)
glog.Infof("lastTimestamp for %s = %v", symbol, lastTimestamp)
if timeStart.IsZero() || (!lastTimestamp.IsZero() && lastTimestamp.Before(timeStart)) {
timeStart = lastTimestamp
}
}

for {
if timeStart.IsZero() {
if !bn.queryStart.IsZero() {
Expand Down

0 comments on commit 1a7b3ab

Please sign in to comment.