Skip to content

Commit

Permalink
Merge pull request #93 from ethanchewy/binanceplugin
Browse files Browse the repository at this point in the history
Fix Time to Correct Unix Format and Check empty QueryStart and QueryEnd
  • Loading branch information
rocketbitz authored Jun 25, 2018
2 parents 34f5a07 + 77adee9 commit 006ead1
Showing 1 changed file with 47 additions and 4 deletions.
51 changes: 47 additions & 4 deletions contrib/binancefeeder/binancefeeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"regexp"
"strconv"
"time"

binance "github.com/adshao/go-binance"
"github.com/alpacahq/marketstore/executor"
"github.com/alpacahq/marketstore/planner"
"github.com/alpacahq/marketstore/plugins/bgworker"
"github.com/alpacahq/marketstore/utils"
"github.com/alpacahq/marketstore/utils/io"
Expand Down Expand Up @@ -79,6 +81,12 @@ func QueryTime(query string) time.Time {
return time.Time{}
}

//Convert time from milliseconds to Unix
func ConvertMillToTime(originalTime int64) time.Time {
i := time.Unix(0, originalTime*int64(time.Millisecond))
return i
}

//Gets all symbols from binance
func GetAllSymbols() []string {
client := binance.NewClient("", "")
Expand All @@ -88,7 +96,6 @@ func GetAllSymbols() []string {
validSymbols := make([]string, 0)

if err != nil {
fmt.Println(err)
symbols := []string{"BTC", "EOS", "ETH", "BNB", "TRX", "ONT", "XRP", "ADA",
"LTC", "BCC", "TUSD", "IOTA", "ETC", "ICX", "NEO", "XLM", "QTUM", "BCH"}
return symbols
Expand All @@ -109,6 +116,28 @@ func GetAllSymbols() []string {
return validSymbols
}

func findLastTimestamp(symbol string, tbk *io.TimeBucketKey) time.Time {
cDir := executor.ThisInstance.CatalogDir
query := planner.NewQuery(cDir)
query.AddTargetKey(tbk)
start := time.Unix(0, 0).In(utils.InstanceConfig.Timezone)
end := time.Unix(math.MaxInt64, 0).In(utils.InstanceConfig.Timezone)
query.SetRange(start.Unix(), end.Unix())
query.SetRowLimit(io.LAST, 1)
parsed, err := query.Parse()
if err != nil {
return time.Time{}
}
reader, err := executor.NewReader(parsed)
csm, _, err := reader.Read()
cs := csm[*tbk]
if cs == nil || cs.Len() == 0 {
return time.Time{}
}
ts := cs.GetTime()
return ts[0]
}

//Register new background worker
func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
config := recast(conf)
Expand All @@ -128,8 +157,13 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
timeframeStr = config.BaseTimeframe
}

queryStart = QueryTime(config.QueryStart)
queryEnd = QueryTime(config.QueryEnd)
if config.QueryStart != "" {
queryStart = QueryTime(config.QueryStart)
}

if config.QueryEnd != "" {
queryEnd = QueryTime(config.QueryEnd)
}

if config.BaseTimeframe != "" {
timeframeStr = config.BaseTimeframe
Expand Down Expand Up @@ -168,6 +202,15 @@ func (bn *BinanceFetcher) Run() {
//Replace interval string with correct one with API call
timeInterval := timeIntervalNumsOnly + correctIntervalSymbol

for _, symbol := range symbols {
tbk := io.NewTimeBucketKey(symbol + "/" + bn.baseTimeframe.String + "/OHLCV")
lastTimestamp := findLastTimestamp(symbol, tbk)
glog.Infof("lastTimestamp for %s = %v", symbol, lastTimestamp)
if timeStart.IsZero() || (!lastTimestamp.IsZero() && lastTimestamp.Before(timeStart)) {
timeStart = lastTimestamp
}
}

for {
if timeStart.IsZero() {
if !bn.queryStart.IsZero() {
Expand Down Expand Up @@ -218,7 +261,7 @@ func (bn *BinanceFetcher) Run() {

for _, rate := range rates {
errorsConversion = errorsConversion[:0]
openTime = append(openTime, rate.OpenTime)
openTime = append(openTime, ConvertMillToTime(rate.OpenTime).Unix())
open = append(open, ConvertStringToFloat(rate.Open))
high = append(high, ConvertStringToFloat(rate.High))
low = append(low, ConvertStringToFloat(rate.Low))
Expand Down

0 comments on commit 006ead1

Please sign in to comment.