Skip to content

Commit

Permalink
fix: random lint issues (#608)
Browse files Browse the repository at this point in the history
  • Loading branch information
dakimura authored Jul 5, 2022
1 parent 0862e7a commit 735e64f
Show file tree
Hide file tree
Showing 10 changed files with 310 additions and 248 deletions.
14 changes: 8 additions & 6 deletions catalog/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/alpacahq/marketstore/v4/utils/test"
)

const test1MinBucket = "TEST/1Min/OHLCV"

func setup(t *testing.T) (rootDir string, catalogDir *catalog.Directory) {
t.Helper()

Expand Down Expand Up @@ -138,8 +140,8 @@ func TestAddFile(t *testing.T) {
func TestAddAndRemoveDataItem(t *testing.T) {
rootDir, catalogDir := setup(t)

catKey := "Symbol/Timeframe/AttributeGroup"
dataItemKey := "TEST/1Min/OHLCV"
catKey := io.DefaultTimeBucketSchema
dataItemKey := test1MinBucket
dataItemPath := filepath.Join(rootDir, dataItemKey)
dsv := io.NewDataShapeVector(
[]string{"Open", "High", "Low", "Close", "Volume"},
Expand Down Expand Up @@ -196,8 +198,8 @@ func TestAddAndRemoveDataItemFromEmptyDirectory(t *testing.T) {
return
}

catKey := "Symbol/Timeframe/AttributeGroup"
dataItemKey := "TEST/1Min/OHLCV"
catKey := io.DefaultTimeBucketSchema
dataItemKey := test1MinBucket
tbk := io.NewTimeBucketKey(dataItemKey, catKey)

dataItemPath := filepath.Join(rootDir, dataItemKey)
Expand Down Expand Up @@ -283,8 +285,8 @@ func TestAddAndRemoveDataItemFromEmptyDirectory(t *testing.T) {
func TestCreateNewDirectory(t *testing.T) {
rootDir, catalogDir := setup(t)

catKey := "Symbol/Timeframe/AttributeGroup"
dataItemKey := "TEST/1Min/OHLCV"
catKey := io.DefaultTimeBucketSchema
dataItemKey := test1MinBucket
dataItemPath := filepath.Join(rootDir, dataItemKey)
dsv := io.NewDataShapeVector(
[]string{"Open", "High", "Low", "Close", "Volume"},
Expand Down
2 changes: 1 addition & 1 deletion cmd/start/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func executeStart(cmd *cobra.Command, _ []string) error {

// Set rpc handler.
log.Info("launching rpc data server...")
http.Handle("/rpc", c.GetHttpServer())
http.Handle("/rpc", c.GetHTTPServer())

// Set websocket handler.
log.Info("initializing websocket...")
Expand Down
92 changes: 46 additions & 46 deletions contrib/binancefeeder/binancefeeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,11 @@ func getStartOfCurrentTimeframe(originalInterval string) time.Time {
return t
}

const getRatesChunkSize = 300

// Run grabs data in intervals from starting time to ending time.
// If query_end is not set, it will run forever.
func (bn *BinanceFetcher) Run() {
baseCurrencies := bn.baseCurrencies
slowDown := false

// Get correct Time Interval for Binance
Expand All @@ -420,23 +421,16 @@ func (bn *BinanceFetcher) Run() {

// For loop for collecting candlestick data forever
// Note that the max amount is 1000 candlesticks which is no problem
var timeStartM int64
var timeEndM int64
var timeEnd time.Time
var originalTimeStart time.Time
var originalTimeEnd time.Time
var originalTimeEndZero time.Time
var waitTill time.Time
var timeStartM, timeEndM int64
var timeEnd, originalTimeStart, originalTimeEnd, originalTimeEndZero, waitTill time.Time
firstLoop := true

for {
// finalTime = time.Now().UTC()
originalTimeStart = timeStart
originalTimeEnd = timeEnd

// Check if it's finished backfilling. If not, just do 300 * Timeframe.duration
// only do beyond 1st loop
const getRatesChunkSize = 300
if !slowDown {
if !firstLoop {
timeStart = timeStart.Add(bn.baseTimeframe.Duration * getRatesChunkSize)
Expand All @@ -449,16 +443,16 @@ func (bn *BinanceFetcher) Run() {
if timeEnd.After(time.Now().UTC()) {
slowDown = true
}
} else {
// Set to the :00 of previous TimeEnd to ensure that the complete candle that was not formed before is written
originalTimeEnd = originalTimeEndZero
}

// Sleep for the timeframe
// Otherwise continue to call every second to backfill the data
// Slow Down for 1 Duration period
// Make sure last candle is formed
if slowDown {
// Set to the :00 of previous TimeEnd to ensure that the complete candle that was not formed before is written
originalTimeEnd = originalTimeEndZero

timeEnd = getStartOfCurrentTimeframe(originalInterval)
// To prevent gaps (ex: querying between 1:31 PM and 2:32 PM (hourly)would not be ideal)
// But we still want to wait 1 candle afterwards (ex: 1:01 PM (hourly))
Expand All @@ -477,7 +471,7 @@ func (bn *BinanceFetcher) Run() {
gotCandle := false
for !gotCandle {
rates, err := bn.client.NewKlinesService().
Symbol(bn.symbols[0] + baseCurrencies[0]).
Symbol(bn.symbols[0] + bn.baseCurrencies[0]).
Interval(timeInterval).
StartTime(timeStartM2).
Do(context.Background())
Expand All @@ -500,38 +494,7 @@ func (bn *BinanceFetcher) Run() {
timeStartM = timeStart.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))
timeEndM = timeEnd.UnixNano() / (int64(time.Millisecond) / int64(time.Nanosecond))

for _, symbol := range bn.symbols {
for _, baseCurrency := range baseCurrencies {
log.Info("Requesting %s %v - %v", symbol, timeStart, timeEnd)
rates, err := bn.client.NewKlinesService().Symbol(symbol + baseCurrency).
Interval(timeInterval).
StartTime(timeStartM).
EndTime(timeEndM).
Do(context.Background())
if err != nil {
log.Info("Response error: %v", err)
log.Info("Problematic symbol %s", symbol)
time.Sleep(time.Minute)
// Go back to last time
timeStart = originalTimeStart
continue
}

openTime, open, high, low, clos, volume := convertRateToRecords(rates)
if len(openTime) == 0 || len(open) == 0 || len(high) == 0 || len(low) == 0 || len(clos) == 0 || len(volume) == 0 {
// if data is nil, do not write to csm
continue
}

symbolDir := fmt.Sprintf("binance_%s-%s", symbol, baseCurrency)
tbk := io.NewTimeBucketKey(symbolDir + "/" + bn.baseTimeframe.String + "/OHLCV")
csm := makeCSM(tbk, slowDown, openTime, open, high, low, clos, volume)
err = executor.WriteCSM(csm, false)
if err != nil {
log.Error(fmt.Sprintf("[binancefeeder]failed to write CSM"), zap.Error(err))
}
}
}
bn.writeSymbols(timeStart, timeEnd, originalTimeStart, timeInterval, timeStartM, timeEndM, slowDown)

if slowDown {
// Sleep till next :00 time
Expand All @@ -543,6 +506,43 @@ func (bn *BinanceFetcher) Run() {
}
}

func (bn *BinanceFetcher) writeSymbols(timeStart, timeEnd, originalTimeStart time.Time,
timeInterval string, timeStartM, timeEndM int64, slowDown bool,
) {
for _, symbol := range bn.symbols {
for _, baseCurrency := range bn.baseCurrencies {
log.Info("Requesting %s %v - %v", symbol, timeStart, timeEnd)
rates, err := bn.client.NewKlinesService().Symbol(symbol + baseCurrency).
Interval(timeInterval).
StartTime(timeStartM).
EndTime(timeEndM).
Do(context.Background())
if err != nil {
log.Info("Response error: %v", err)
log.Info("Problematic symbol %s", symbol)
time.Sleep(time.Minute)
// Go back to last time
timeStart = originalTimeStart
continue
}

openTime, open, high, low, clos, volume := convertRateToRecords(rates)
if len(openTime) == 0 || len(open) == 0 || len(high) == 0 || len(low) == 0 || len(clos) == 0 || len(volume) == 0 {
// if data is nil, do not write to csm
continue
}

symbolDir := fmt.Sprintf("binance_%s-%s", symbol, baseCurrency)
tbk := io.NewTimeBucketKey(symbolDir + "/" + bn.baseTimeframe.String + "/OHLCV")
csm := makeCSM(tbk, slowDown, openTime, open, high, low, clos, volume)
err = executor.WriteCSM(csm, false)
if err != nil {
log.Error(fmt.Sprintf("[binancefeeder]failed to write CSM"), zap.Error(err))
}
}
}
}

func main() {
symbol := "BTC"
interval := "1m"
Expand Down
Loading

0 comments on commit 735e64f

Please sign in to comment.