From 630db5234c442db195576ee7b16f0d7853840fcc Mon Sep 17 00:00:00 2001 From: Christopher Ryan Date: Mon, 25 Jun 2018 08:11:14 -0700 Subject: [PATCH] aggtrigger optimization v2 --- Gopkg.lock | 14 +- contrib/ondiskagg/aggtrigger/aggtrigger.go | 258 +++++++++++++++------ contrib/ondiskagg/aggtrigger/util.go | 45 ++++ utils/io/all_test.go | 78 +++++++ utils/io/columnseries.go | 42 +++- 5 files changed, 354 insertions(+), 83 deletions(-) create mode 100644 contrib/ondiskagg/aggtrigger/util.go diff --git a/Gopkg.lock b/Gopkg.lock index 3ad719646..fa1b982ae 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -19,6 +19,12 @@ packages = ["."] revision = "8828253bfc54f2f91aaef5f32b44a244b0a98e4f" +[[projects]] + name = "github.com/adshao/go-binance" + packages = ["."] + revision = "6cf0e802b22a56ff09523536cf27fbb666180a98" + version = "1.1.2" + [[projects]] name = "github.com/alpacahq/slait" packages = [ @@ -50,6 +56,12 @@ ] revision = "b565731e1464263de0bda75f2e45d97b54b60110" +[[projects]] + name = "github.com/bitly/go-simplejson" + packages = ["."] + revision = "aabad6e819789e569bd6aabf444c935aa9ba1e44" + version = "v0.5.0" + [[projects]] branch = "master" name = "github.com/buger/jsonparser" @@ -397,6 +409,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "0b1792f8f5e6028c98a7ea6872131af234eca422988af00d0ac303aeb53addba" + inputs-digest = "1a17f8ee8522fb718c2435215b88c237ab4ff6c9649b14f84625bd4179173057" solver-name = "gps-cdcl" solver-version = 1 diff --git a/contrib/ondiskagg/aggtrigger/aggtrigger.go b/contrib/ondiskagg/aggtrigger/aggtrigger.go index 218de7bcc..dc4a5b1ba 100644 --- a/contrib/ondiskagg/aggtrigger/aggtrigger.go +++ b/contrib/ondiskagg/aggtrigger/aggtrigger.go @@ -26,8 +26,10 @@ package aggtrigger import ( "encoding/json" "errors" + "fmt" "strconv" "strings" + "sync" "time" "github.com/alpacahq/marketstore/contrib/calendar" @@ -49,9 +51,10 @@ type AggTriggerConfig struct { // OnDiskAggTrigger is the main trigger. type OnDiskAggTrigger struct { config map[string]interface{} - destinations []string + destinations timeframes // filter by market hours if this is "nasdaq" - filter string + filter string + aggCache *sync.Map } var _ trigger.Trigger = &OnDiskAggTrigger{} @@ -68,24 +71,37 @@ func recast(config map[string]interface{}) *AggTriggerConfig { // NewTrigger returns a new on-disk aggregate trigger based on the configuration. func NewTrigger(conf map[string]interface{}) (trigger.Trigger, error) { config := recast(conf) + if len(config.Destinations) == 0 { glog.Errorf("no destinations are configured") - return nil, loadError + return nil, fmt.Errorf("plugin load error") } glog.Infof("%d destination(s) configured", len(config.Destinations)) + filter := config.Filter if filter != "" && filter != "nasdaq" { - glog.Infof("filter value \"%s\" is not recognized", filter) + glog.Warningf("filter value \"%s\" is not recognized", filter) filter = "" } + + var tfs timeframes + + for _, dest := range config.Destinations { + tf := utils.TimeframeFromString(dest) + if tf == nil { + glog.Fatalf("invalid destination: %s", dest) + } + tfs = append(tfs, *tf) + } + return &OnDiskAggTrigger{ config: conf, - destinations: config.Destinations, + destinations: tfs, filter: filter, + aggCache: &sync.Map{}, }, nil } - func minInt64(values []int64) int64 { min := values[0] for _, v := range values[1:] { @@ -108,97 +124,150 @@ func maxInt64(values []int64) int64 { // Fire implements trigger interface. 
 func (s *OnDiskAggTrigger) Fire(keyPath string, records []trigger.Record) {
-	indexes := make([]int64, len(records))
-	for i, record := range records {
-		indexes[i] = record.Index()
+	elements := strings.Split(keyPath, "/")
+	tf := utils.NewTimeframe(elements[1])
+	fileName := elements[len(elements)-1]
+	year, _ := strconv.Atoi(strings.Replace(fileName, ".bin", "", 1))
+	tbk := io.NewTimeBucketKey(strings.Join(elements[:len(elements)-1], "/"))
+
+	head := io.IndexToTime(
+		records[0].Index(),
+		tf.Duration,
+		int16(year))
+
+	tail := io.IndexToTime(
+		records[len(records)-1].Index(),
+		tf.Duration,
+		int16(year))
+
+	// query using the largest destination timeframe, since its window contains the most candles
+	window := utils.CandleDurationFromString(s.destinations.UpperBound().String)
+
+	// check for a valid cache entry; if it is stale, delete it and re-query
+	if v, ok := s.aggCache.Load(tbk.String()); ok {
+		c := v.(*cachedAgg)
+
+		if !c.Valid(tail, head) {
+			s.aggCache.Delete(tbk.String())
+
+			goto Query
+		}
+
+		cs := trigger.RecordsToColumnSeries(
+			*tbk, c.cs.GetDataShapes(),
+			c.cs.GetCandleAttributes(),
+			tf.Duration, int16(year), records)
+
+		cs = io.ColumnSeriesUnion(cs, &c.cs)
+
+		s.write(tbk, cs, tail, head, elements)
+
+		return
+	}
+
+Query:
+	csm, err := s.query(tbk, window, head, tail)
+	if err != nil || csm == nil {
+		glog.Errorf("query error for %v (%v)", tbk.String(), err)
+		return
 	}
-	headIndex := minInt64(indexes)
-	tailIndex := maxInt64(indexes)
 
-	for _, timeframe := range s.destinations {
-		s.processFor(timeframe, keyPath, headIndex, tailIndex)
+	cs := (*csm)[*tbk]
+
+	if cs != nil {
+		s.write(tbk, cs, tail, head, elements)
 	}
+
+	return
 }
 
-func (s *OnDiskAggTrigger) processFor(timeframe, keyPath string, headIndex, tailIndex int64) {
-	theInstance := executor.ThisInstance
-	catalogDir := theInstance.CatalogDir
-	elements := strings.Split(keyPath, "/")
-	tbkString := strings.Join(elements[:len(elements)-1], "/")
-	tf := utils.NewTimeframe(elements[1])
-	fileName := elements[len(elements)-1]
-	year, _ := strconv.Atoi(strings.Replace(fileName, ".bin", "", 1))
-	tbk := io.NewTimeBucketKey(tbkString)
-	headTs := io.IndexToTime(headIndex, tf.Duration, int16(year))
-	tailTs := io.IndexToTime(tailIndex, tf.Duration, int16(year))
-	timeWindow := utils.CandleDurationFromString(timeframe)
-	start := timeWindow.Truncate(headTs)
-	end := timeWindow.Ceil(tailTs)
-	// TODO: this is not needed once we support "<" operator
-	end = end.Add(-time.Second)
-
-	targetTbkString := elements[0] + "/" + timeframe + "/" + elements[2]
-	targetTbk := io.NewTimeBucketKey(targetTbkString)
+func (s *OnDiskAggTrigger) write(
+	tbk *io.TimeBucketKey,
+	cs *io.ColumnSeries,
+	tail, head time.Time,
+	elements []string) {
 
-	// Scan
-	q := planner.NewQuery(catalogDir)
-	q.AddTargetKey(tbk)
-	q.SetRange(start.Unix(), end.Unix())
+	for _, dest := range s.destinations {
+		aggTbk := io.NewTimeBucketKeyFromString(elements[0] + "/" + dest.String + "/" + elements[2])
+
+		if err := s.writeAggregates(aggTbk, tbk, *cs, dest, head, tail); err != nil {
+			glog.Errorf(
+				"failed to write %v aggregates (%v)",
+				tbk.String(),
+				err)
+			return
+		}
+	}
+}
+
+type cachedAgg struct {
+	cs         io.ColumnSeries
+	tail, head time.Time
+}
+
+func (c *cachedAgg) Valid(tail, head time.Time) bool {
+	return tail.Equal(c.tail) && head.Equal(c.head)
+}
+
+func (s *OnDiskAggTrigger) writeAggregates(
+	aggTbk, baseTbk *io.TimeBucketKey,
+	cs io.ColumnSeries,
+	dest utils.Timeframe,
+	head, tail time.Time) error {
+
+	csm := io.NewColumnSeriesMap()
+
+	window := utils.CandleDurationFromString(dest.String)
+	start := window.Truncate(head).Unix()
+	end := window.Ceil(tail).Add(-time.Second).Unix()
+
+	slc, err := io.SliceColumnSeriesByEpoch(cs, &start, &end)
+	if err != nil {
+		return err
+	}
 
 	// decide whether to apply market-hour filter
 	applyingFilter := false
-	if s.filter == "nasdaq" && timeWindow.Duration() >= utils.Day {
+	if s.filter == "nasdaq" && window.Duration() >= utils.Day {
 		calendarTz := calendar.Nasdaq.Tz()
 		if utils.InstanceConfig.Timezone.String() != calendarTz.String() {
 			glog.Errorf("misconfiguration... system must be configured in %s", calendarTz)
 		} else {
-			q.AddTimeQual(calendar.Nasdaq.EpochIsMarketOpen)
 			applyingFilter = true
 		}
 	}
-	parsed, err := q.Parse()
-	if err != nil {
-		glog.Errorf("%v", err)
-		return
-	}
-	scanner, err := executor.NewReader(parsed)
-	if err != nil {
-		glog.Errorf("%v", err)
-		return
-	}
-	csm, _, err := scanner.Read()
-	if err != nil {
-		glog.Errorf("%v", err)
-		return
+
+	// cache the aggregated slice when writing the upper-bound (largest) timeframe
+	if dest.Duration == s.destinations.UpperBound().Duration {
+		defer func() {
+			t := window.Truncate(tail)
+			tEpoch := t.Unix()
+			h := time.Unix(end, 0)
+
+			cacheSlc, _ := io.SliceColumnSeriesByEpoch(cs, &tEpoch, &end)
+
+			s.aggCache.Store(baseTbk.String(), &cachedAgg{
+				cs:   cacheSlc,
+				tail: t,
+				head: h,
+			})
+		}()
 	}
-	cs := csm[*tbk]
-	if cs == nil || cs.Len() == 0 {
-		if !applyingFilter {
-			// Nothing in there... really?
-			glog.Errorf(
-				"result is empty for %s -> %s - query: %v - %v",
-				tbk,
-				targetTbk,
-				q.Range.Start,
-				q.Range.End,
-			)
+
+	// apply the market-hour filter
+	if applyingFilter {
+		tqSlc := slc.ApplyTimeQual(calendar.Nasdaq.EpochIsMarketOpen)
+
+		// the filtered slice is normally non-empty, but stray bars on a
+		// weekend can leave it empty, so check to avoid a panic
+		if len(tqSlc.GetEpoch()) > 0 {
+			csm.AddColumnSeries(*aggTbk, aggregate(tqSlc, aggTbk))
 		}
-		return
-	}
-	// calculate aggregated values
-	outCs := aggregate(cs, targetTbk)
-	outCsm := io.NewColumnSeriesMap()
-	outCsm.AddColumnSeries(*targetTbk, outCs)
-	epoch := outCs.GetEpoch()
-	if err := executor.WriteCSM(outCsm, false); err != nil {
-		glog.Errorf(
-			"failed to write %v CSM from: %v to %v - Error: %v",
-			targetTbk.String(),
-			time.Unix(epoch[0], 0),
-			time.Unix(epoch[len(epoch)-1], 0),
-			err)
+	} else {
+		csm.AddColumnSeries(*aggTbk, aggregate(&slc, aggTbk))
 	}
+
+	return executor.WriteCSM(csm, false)
 }
 
 func aggregate(cs *io.ColumnSeries, tbk *io.TimeBucketKey) *io.ColumnSeries {
@@ -241,3 +310,38 @@ func aggregate(cs *io.ColumnSeries, tbk *io.TimeBucketKey) *io.ColumnSeries {
 	accumGroup.addColumns(outCs)
 	return outCs
 }
+
+func (s *OnDiskAggTrigger) query(
+	tbk *io.TimeBucketKey,
+	window *utils.CandleDuration,
+	head, tail time.Time) (*io.ColumnSeriesMap, error) {
+
+	cDir := executor.ThisInstance.CatalogDir
+
+	start := window.Truncate(head)
+
+	// TODO: subtracting 1 second is not needed once we support the "<" operator
+	end := window.Ceil(tail).Add(-time.Second)
+
+	// Scan
+	q := planner.NewQuery(cDir)
+	q.AddTargetKey(tbk)
+	q.SetRange(start.Unix(), end.Unix())
+
+	parsed, err := q.Parse()
+	if err != nil {
+		return nil, err
+	}
+
+	scanner, err := executor.NewReader(parsed)
+	if err != nil {
+		return nil, err
+	}
+
+	csm, _, err := scanner.Read()
+	if err != nil {
+		return nil, err
+	}
+
+	return &csm, nil
+}
diff --git a/contrib/ondiskagg/aggtrigger/util.go b/contrib/ondiskagg/aggtrigger/util.go
new file mode 100644
index 000000000..bf58fd45b
--- /dev/null
+++ b/contrib/ondiskagg/aggtrigger/util.go
@@ -0,0 +1,45 @@
+package aggtrigger

+import (
+	"github.com/alpacahq/marketstore/utils"
+)
+
+type timeframes []utils.Timeframe
+
+func (tfs *timeframes) UpperBound() (tf *utils.Timeframe) {
+	if tfs == nil {
+		return nil
+	}
+
+	for i, t := range *tfs {
+		if tf == nil {
+			tf = &(*tfs)[i]
+			continue
+		}
+
+		if t.Duration > tf.Duration {
+			tf = &(*tfs)[i]
+		}
+	}
+
+	return
+}
+
+func (tfs *timeframes) LowerBound() (tf *utils.Timeframe) {
+	if tfs == nil {
+		return nil
+	}
+
+	for i, t := range *tfs {
+		if tf == nil {
+			tf = &(*tfs)[i]
+			continue
+		}
+
+		if t.Duration < tf.Duration {
+			tf = &(*tfs)[i]
+		}
+	}
+
+	return
+}
diff --git a/utils/io/all_test.go b/utils/io/all_test.go
index 37454a6fd..f0cc8d3b1 100644
--- a/utils/io/all_test.go
+++ b/utils/io/all_test.go
@@ -483,3 +483,81 @@ func (s *TestSuite) TestUnion(c *C) {
 	c.Assert(cs.GetEpoch()[4], Equals, csC.GetEpoch()[1])
 	c.Assert(cs.GetEpoch()[5], Equals, csC.GetEpoch()[2])
 }
+
+func (s *TestSuite) TestSliceByEpoch(c *C) {
+	cs := makeTestCS()
+
+	// just start
+	start := int64(2)
+	slc, err := SliceColumnSeriesByEpoch(*cs, &start, nil)
+	c.Assert(err, IsNil)
+	c.Assert(slc, NotNil)
+	c.Assert(slc.Len(), Equals, 2)
+	c.Assert(slc.GetEpoch()[0], Equals, cs.GetEpoch()[1])
+
+	// no slice
+	start = int64(0)
+	slc, err = SliceColumnSeriesByEpoch(*cs, &start, nil)
+	c.Assert(err, IsNil)
+	c.Assert(slc, NotNil)
+	c.Assert(slc.Len(), Equals, 3)
+	c.Assert(slc.GetEpoch()[0], Equals, cs.GetEpoch()[0])
+
+	// just end
+	end := int64(3)
+	slc, err = SliceColumnSeriesByEpoch(*cs, nil, &end)
+	c.Assert(err, IsNil)
+	c.Assert(slc, NotNil)
+	c.Assert(slc.Len(), Equals, 2)
+	c.Assert(slc.GetEpoch()[1], Equals, cs.GetEpoch()[1])
+
+	// no slice
+	end = int64(4)
+	slc, err = SliceColumnSeriesByEpoch(*cs, nil, &end)
+	c.Assert(err, IsNil)
+	c.Assert(slc, NotNil)
+	c.Assert(slc.Len(), Equals, 3)
+	c.Assert(slc.GetEpoch()[2], Equals, cs.GetEpoch()[2])
+
+	// start and end
+	start = int64(2)
+	end = int64(3)
+	slc, err = SliceColumnSeriesByEpoch(*cs, &start, &end)
+	c.Assert(err, IsNil)
+	c.Assert(slc, NotNil)
+	c.Assert(slc.Len(), Equals, 1)
+	c.Assert(slc.GetEpoch()[0], Equals, cs.GetEpoch()[1])
+
+	// no slice
+	start = int64(0)
+	end = int64(4)
+	slc, err = SliceColumnSeriesByEpoch(*cs, &start, &end)
+	c.Assert(err, IsNil)
+	c.Assert(slc, NotNil)
+	c.Assert(slc.Len(), Equals, 3)
+	c.Assert(slc.GetEpoch()[0], Equals, cs.GetEpoch()[0])
+	c.Assert(slc.GetEpoch()[2], Equals, cs.GetEpoch()[2])
+}
+
+func (s *TestSuite) TestApplyTimeQual(c *C) {
+	cs := makeTestCS()
+
+	tq := func(epoch int64) bool {
+		if epoch == int64(2) {
+			return true
+		}
+		return false
+	}
+
+	tqCS := cs.ApplyTimeQual(tq)
+
+	c.Assert(tqCS.Len(), Equals, 1)
+	c.Assert(tqCS.GetEpoch()[0], Equals, cs.GetEpoch()[1])
+	c.Assert(tqCS.GetByName("One").([]float32)[0], Equals, cs.GetByName("One").([]float32)[1])
+
+	tq = func(epoch int64) bool {
+		return false
+	}
+
+	c.Assert(cs.ApplyTimeQual(tq).Len(), Equals, 0)
+}
diff --git a/utils/io/columnseries.go b/utils/io/columnseries.go
index fd73553af..1756b74e2 100644
--- a/utils/io/columnseries.go
+++ b/utils/io/columnseries.go
@@ -238,6 +238,39 @@ func (cs *ColumnSeries) AddNullColumn(ds DataShape) {
 	cs.AddColumn(ds.Name, ds.Type.SliceOf(cs.Len()))
 }
 
+// ApplyTimeQual takes a function that determines whether or
+// not a given epoch time is valid, and applies that function
+// to the ColumnSeries, returning a new series with invalid entries removed.
+func (cs *ColumnSeries) ApplyTimeQual(tq func(epoch int64) bool) *ColumnSeries { + indexes := []int{} + + out := &ColumnSeries{ + orderedNames: cs.orderedNames, + candleAttributes: cs.candleAttributes, + nameIncrement: cs.nameIncrement, + columns: map[string]interface{}{}, + } + + for i, epoch := range cs.GetEpoch() { + if tq(epoch) { + indexes = append(indexes, i) + } + } + + for name, col := range cs.columns { + iv := reflect.ValueOf(col) + slc := reflect.MakeSlice(reflect.TypeOf(col), 0, 0) + + for _, index := range indexes { + slc = reflect.Append(slc, iv.Index(index)) + } + + out.columns[name] = slc.Interface() + } + + return out +} + // SliceColumnSeriesByEpoch slices the column series by the provided epochs, // returning a new column series with only records occurring // between the two provided epoch times. If only one is provided, @@ -262,7 +295,7 @@ func SliceColumnSeriesByEpoch(cs ColumnSeries, start, end *int64) (slc ColumnSer if start != nil { for ; index < len(epochs); index++ { if epochs[index] >= *start { - if err = slc.RestrictLength(len(epochs)-index, FIRST); err != nil { + if err = slc.RestrictLength(len(epochs)-index, LAST); err != nil { return } break @@ -272,10 +305,9 @@ func SliceColumnSeriesByEpoch(cs ColumnSeries, start, end *int64) (slc ColumnSer if end != nil { epochs = slc.GetEpoch() - - for index = len(epochs) - 1; index > 0; index-- { - if epochs[index] <= *end { - if err = slc.RestrictLength(index, LAST); err != nil { + for index = len(epochs) - 1; index >= 0; index-- { + if epochs[index] < *end { + if err = slc.RestrictLength(index+1, FIRST); err != nil { return } break
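
Reviewer note: the heart of this patch is the per-bucket aggregate cache —
writeAggregates stores the slice for the upper-bound timeframe in a sync.Map,
and Fire reuses it (via ColumnSeriesUnion) instead of re-scanning disk when
the incoming write window matches the cached one. The sketch below isolates
just that cache-validity pattern; it is illustrative only, and the names
(entry, Valid, bucketCache) and sample values are invented, not part of this
patch.

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	// entry mirrors cachedAgg: the cached data plus the window it covers.
	type entry struct {
		data       []float64
		head, tail time.Time
	}

	// Valid reports whether the cached window still matches the incoming
	// one; any mismatch means the entry is stale.
	func (e *entry) Valid(head, tail time.Time) bool {
		return head.Equal(e.head) && tail.Equal(e.tail)
	}

	func main() {
		var bucketCache sync.Map

		head := time.Unix(1514764800, 0)
		tail := time.Unix(1514765100, 0)

		// populate the cache, keyed by bucket name
		bucketCache.Store("TSLA/1Min/OHLCV", &entry{
			data: []float64{310.1, 311.4, 309.8},
			head: head,
			tail: tail,
		})

		if v, ok := bucketCache.Load("TSLA/1Min/OHLCV"); ok {
			e := v.(*entry)
			if e.Valid(head, tail) {
				// hit: merge incoming records into the cached aggregate
				fmt.Println("cache hit:", e.data)
			} else {
				// stale: drop the entry and fall back to a disk query
				bucketCache.Delete("TSLA/1Min/OHLCV")
			}
		}
	}

A related detail worth noting when reviewing: the TestSliceByEpoch cases
above pin down the slicing bounds as inclusive of start and exclusive of
end, which is why writeAggregates subtracts one second from the Ceil of the
window before slicing.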