From eb5f18c44cdfd173df5cde64c163a5e75804bada Mon Sep 17 00:00:00 2001 From: Christopher Ryan Date: Thu, 21 Jun 2018 16:25:09 -0700 Subject: [PATCH] revert aggtrigger caching The refactored aggtrigger caching has a bug in it that causes it to aggregate incorrectly for large batches. For now I am going to revert the changes, and file an issue to improve performance in the future. --- contrib/ondiskagg/aggtrigger/aggtrigger.go | 286 ++++++++------------- contrib/ondiskagg/aggtrigger/util.go | 45 ---- contrib/polygon/api/api.go | 15 +- 3 files changed, 117 insertions(+), 229 deletions(-) delete mode 100644 contrib/ondiskagg/aggtrigger/util.go diff --git a/contrib/ondiskagg/aggtrigger/aggtrigger.go b/contrib/ondiskagg/aggtrigger/aggtrigger.go index dc121bdbe..218de7bcc 100644 --- a/contrib/ondiskagg/aggtrigger/aggtrigger.go +++ b/contrib/ondiskagg/aggtrigger/aggtrigger.go @@ -25,10 +25,9 @@ package aggtrigger import ( "encoding/json" - "fmt" + "errors" "strconv" "strings" - "sync" "time" "github.com/alpacahq/marketstore/contrib/calendar" @@ -50,14 +49,15 @@ type AggTriggerConfig struct { // OnDiskAggTrigger is the main trigger. type OnDiskAggTrigger struct { config map[string]interface{} - destinations timeframes + destinations []string // filter by market hours if this is "nasdaq" - filter string - aggCache *sync.Map + filter string } var _ trigger.Trigger = &OnDiskAggTrigger{} +var loadError = errors.New("plugin load error") + func recast(config map[string]interface{}) *AggTriggerConfig { data, _ := json.Marshal(config) ret := AggTriggerConfig{} @@ -68,156 +68,137 @@ func recast(config map[string]interface{}) *AggTriggerConfig { // NewTrigger returns a new on-disk aggregate trigger based on the configuration. func NewTrigger(conf map[string]interface{}) (trigger.Trigger, error) { config := recast(conf) - if len(config.Destinations) == 0 { glog.Errorf("no destinations are configured") - return nil, fmt.Errorf("plugin load error") + return nil, loadError } glog.Infof("%d destination(s) configured", len(config.Destinations)) - filter := config.Filter if filter != "" && filter != "nasdaq" { - glog.Warningf("filter value \"%s\" is not recognized", filter) + glog.Infof("filter value \"%s\" is not recognized", filter) filter = "" } - - var tfs timeframes - - for _, dest := range config.Destinations { - tf := utils.TimeframeFromString(dest) - if tf == nil { - glog.Fatalf("invalid destination: %s", dest) - } - tfs = append(tfs, *tf) - } - return &OnDiskAggTrigger{ config: conf, - destinations: tfs, + destinations: config.Destinations, filter: filter, - aggCache: &sync.Map{}, }, nil } +func minInt64(values []int64) int64 { + min := values[0] + for _, v := range values[1:] { + if v < min { + min = v + } + } + return min +} + +func maxInt64(values []int64) int64 { + max := values[0] + for _, v := range values[1:] { + if v > max { + max = v + } + } + return max +} + // Fire implements trigger interface. 
func (s *OnDiskAggTrigger) Fire(keyPath string, records []trigger.Record) { + indexes := make([]int64, len(records)) + for i, record := range records { + indexes[i] = record.Index() + } + + headIndex := minInt64(indexes) + tailIndex := maxInt64(indexes) + + for _, timeframe := range s.destinations { + s.processFor(timeframe, keyPath, headIndex, tailIndex) + } +} + +func (s *OnDiskAggTrigger) processFor(timeframe, keyPath string, headIndex, tailIndex int64) { + theInstance := executor.ThisInstance + catalogDir := theInstance.CatalogDir elements := strings.Split(keyPath, "/") + tbkString := strings.Join(elements[:len(elements)-1], "/") tf := utils.NewTimeframe(elements[1]) fileName := elements[len(elements)-1] year, _ := strconv.Atoi(strings.Replace(fileName, ".bin", "", 1)) - tbk := io.NewTimeBucketKey(strings.Join(elements[:len(elements)-1], "/")) - - head := io.IndexToTime( - records[0].Index(), - tf.Duration, - int16(year)) - - tail := io.IndexToTime( - records[len(records)-1].Index(), - tf.Duration, - int16(year)) + tbk := io.NewTimeBucketKey(tbkString) + headTs := io.IndexToTime(headIndex, tf.Duration, int16(year)) + tailTs := io.IndexToTime(tailIndex, tf.Duration, int16(year)) + timeWindow := utils.CandleDurationFromString(timeframe) + start := timeWindow.Truncate(headTs) + end := timeWindow.Ceil(tailTs) + // TODO: this is not needed once we support "<" operator + end = end.Add(-time.Second) + + targetTbkString := elements[0] + "/" + timeframe + "/" + elements[2] + targetTbk := io.NewTimeBucketKey(targetTbkString) - // query the upper bound since it will contain the most candles - window := utils.CandleDurationFromString(s.destinations.UpperBound().String) - - // check if we have a valid cache, if not, re-query - if v, ok := s.aggCache.Load(tbk.String()); ok { - c := v.(*cachedAgg) - - if !c.Valid(tail, head) { - s.aggCache.Delete(tbk.String()) + // Scan + q := planner.NewQuery(catalogDir) + q.AddTargetKey(tbk) + q.SetRange(start.Unix(), end.Unix()) - goto Query + // decide whether to apply market-hour filter + applyingFilter := false + if s.filter == "nasdaq" && timeWindow.Duration() >= utils.Day { + calendarTz := calendar.Nasdaq.Tz() + if utils.InstanceConfig.Timezone.String() != calendarTz.String() { + glog.Errorf("misconfiguration... 
system must be configure in %s", calendarTz) + } else { + q.AddTimeQual(calendar.Nasdaq.EpochIsMarketOpen) + applyingFilter = true } - - cs := trigger.RecordsToColumnSeries( - *tbk, c.cs.GetDataShapes(), - c.cs.GetCandleAttributes(), - tf.Duration, int16(year), records) - - cs = io.ColumnSeriesUnion(cs, &c.cs) - - s.write(tbk, cs, tail, head, elements) - + } + parsed, err := q.Parse() + if err != nil { + glog.Errorf("%v", err) return } - -Query: - csm, err := s.query(tbk, window, head, tail) - if err != nil || csm == nil { - glog.Errorf("query error for %v (%v)", tbk.String(), err) + scanner, err := executor.NewReader(parsed) + if err != nil { + glog.Errorf("%v", err) return } - - cs := (*csm)[*tbk] - - if cs != nil { - s.write(tbk, cs, tail, head, elements) + csm, _, err := scanner.Read() + if err != nil { + glog.Errorf("%v", err) + return } - - return -} - -func (s *OnDiskAggTrigger) write( - tbk *io.TimeBucketKey, - cs *io.ColumnSeries, - tail, head time.Time, - elements []string) { - - for _, dest := range s.destinations { - aggTbk := io.NewTimeBucketKeyFromString(elements[0] + "/" + dest.String + "/" + elements[2]) - - if err := s.writeAggregates(aggTbk, tbk, *cs, dest, head, tail); err != nil { + cs := csm[*tbk] + if cs == nil || cs.Len() == 0 { + if !applyingFilter { + // Nothing in there... really? glog.Errorf( - "failed to write %v aggregates (%v)", - tbk.String(), - err) - return + "result is empty for %s -> %s - query: %v - %v", + tbk, + targetTbk, + q.Range.Start, + q.Range.End, + ) } + return } -} - -type cachedAgg struct { - cs io.ColumnSeries - tail, head time.Time -} - -func (c *cachedAgg) Valid(tail, head time.Time) bool { - return tail.Equal(tail) && head.Equal(head) -} - -func (s *OnDiskAggTrigger) writeAggregates( - aggTbk, baseTbk *io.TimeBucketKey, - cs io.ColumnSeries, - dest utils.Timeframe, - head, tail time.Time) error { - - csm := io.NewColumnSeriesMap() - - window := utils.CandleDurationFromString(dest.String) - start := window.Truncate(head).Unix() - end := window.Ceil(tail).Add(-time.Second).Unix() - - slc, err := io.SliceColumnSeriesByEpoch(cs, &start, &end) - if err != nil { - return err - } - - // store when writing for upper bound - if dest.Duration == s.destinations.UpperBound().Duration { - defer func() { - s.aggCache.Store(baseTbk.String(), &cachedAgg{ - cs: slc, - tail: time.Unix(start, 0), - head: time.Unix(end, 0), - }) - }() + // calculate aggregated values + outCs := aggregate(cs, targetTbk) + outCsm := io.NewColumnSeriesMap() + outCsm.AddColumnSeries(*targetTbk, outCs) + epoch := outCs.GetEpoch() + if err := executor.WriteCSM(outCsm, false); err != nil { + glog.Errorf( + "failed to write %v CSM from: %v to %v - Error: %v", + targetTbk.String(), + time.Unix(epoch[0], 0), + time.Unix(epoch[len(epoch)-1], 0), + err) } - - csm.AddColumnSeries(*aggTbk, aggregate(&slc, aggTbk)) - - return executor.WriteCSM(csm, false) } func aggregate(cs *io.ColumnSeries, tbk *io.TimeBucketKey) *io.ColumnSeries { @@ -260,64 +241,3 @@ func aggregate(cs *io.ColumnSeries, tbk *io.TimeBucketKey) *io.ColumnSeries { accumGroup.addColumns(outCs) return outCs } - -func (s *OnDiskAggTrigger) query( - tbk *io.TimeBucketKey, - window *utils.CandleDuration, - head, tail time.Time) (*io.ColumnSeriesMap, error) { - - cDir := executor.ThisInstance.CatalogDir - - start := window.Truncate(head) - - // TODO: adding 1 second is not needed once we support "<" operator - end := window.Ceil(tail).Add(-time.Second) - - // Scan - q := planner.NewQuery(cDir) - q.AddTargetKey(tbk) - 
q.SetRange(start.Unix(), end.Unix()) - - // decide whether to apply market-hour filter - applyingFilter := false - if s.filter == "nasdaq" && window.Duration() >= utils.Day { - calendarTz := calendar.Nasdaq.Tz() - if utils.InstanceConfig.Timezone.String() != calendarTz.String() { - glog.Errorf("misconfiguration... system must be configure in %s", calendarTz) - } else { - q.AddTimeQual(calendar.Nasdaq.EpochIsMarketOpen) - applyingFilter = true - } - } - - parsed, err := q.Parse() - if err != nil { - return nil, err - } - - scanner, err := executor.NewReader(parsed) - if err != nil { - return nil, err - } - - csm, _, err := scanner.Read() - if err != nil { - return nil, err - } - - cs := csm[*tbk] - if cs == nil || cs.Len() == 0 { - if !applyingFilter { - // Nothing in there... really? - glog.Errorf( - "result is empty for %s - query: %v - %v", - tbk, - q.Range.Start, - q.Range.End, - ) - } - return nil, err - } - - return &csm, nil -} diff --git a/contrib/ondiskagg/aggtrigger/util.go b/contrib/ondiskagg/aggtrigger/util.go deleted file mode 100644 index bf58fd45b..000000000 --- a/contrib/ondiskagg/aggtrigger/util.go +++ /dev/null @@ -1,45 +0,0 @@ -package aggtrigger - -import ( - "github.com/alpacahq/marketstore/utils" -) - -type timeframes []utils.Timeframe - -func (tfs *timeframes) UpperBound() (tf *utils.Timeframe) { - if tfs == nil { - return nil - } - - for _, t := range *tfs { - if tf == nil { - tf = &t - continue - } - - if t.Duration > tf.Duration { - tf = &t - } - } - - return -} - -func (tfs *timeframes) LowerBound() (tf *utils.Timeframe) { - if tfs == nil { - return nil - } - - for _, t := range *tfs { - if tf == nil { - tf = &t - continue - } - - if t.Duration < tf.Duration { - tf = &t - } - } - - return -} diff --git a/contrib/polygon/api/api.go b/contrib/polygon/api/api.go index 1629844cf..b1ebf068e 100644 --- a/contrib/polygon/api/api.go +++ b/contrib/polygon/api/api.go @@ -51,6 +51,8 @@ func GetAggregates(symbol string, from time.Time) (*GetAggregatesResponse, error from = from.In(NY) to := from.Add(7 * 24 * time.Hour) + retry := 0 + for { url := fmt.Sprintf("%s/v1/historic/agg/%s/%s?apiKey=%s&from=%s&to=%s", baseURL, "minute", symbol, @@ -82,8 +84,19 @@ func GetAggregates(symbol string, from time.Time) (*GetAggregatesResponse, error return nil, err } + // Sometimes polygon returns empty data set even though the data + // is there. Here we retry up to 5 times to ensure the data + // is really empty. This does add overhead, but since it is only + // called for the beginning backfill, it is worth it to not miss + // any data. Usually the data is returned within 3 retries. if len(r.Ticks) == 0 { - break + if retry <= 5 && from.Before(time.Now()) { + retry++ + continue + } else { + retry = 0 + break + } } resp.Ticks = append(resp.Ticks, r.Ticks...)
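
Note on the underlying bug: the commit message attributes the incorrect aggregation to the caching refactor, and one likely contributor is visible in the removed cachedAgg.Valid method above, which compares each argument with itself:

    return tail.Equal(tail) && head.Equal(head)

Both comparisons are tautologies, so the check always reports the cached ColumnSeries as valid, the cache is never invalidated, and the cached series is always unioned with incoming records. The sketch below is illustration only and not part of this patch (the patch removes the cache outright rather than repairing it); it assumes a trimmed-down cachedAgg that keeps only the tail/head bounds (the removed production struct also carried the cached io.ColumnSeries) and shows what a bounds check that compares the cached window against the incoming one might look like:

    package main

    import (
        "fmt"
        "time"
    )

    // cachedAgg here keeps only the time bounds; the removed production struct
    // also stored the aggregated io.ColumnSeries alongside them.
    type cachedAgg struct {
        tail, head time.Time
    }

    // valid compares the *cached* bounds against the incoming ones instead of
    // comparing each argument with itself.
    func (c *cachedAgg) valid(tail, head time.Time) bool {
        return c.tail.Equal(tail) && c.head.Equal(head)
    }

    func main() {
        t0 := time.Unix(0, 0)
        t1 := time.Unix(60, 0)
        c := &cachedAgg{tail: t0, head: t1}
        fmt.Println(c.valid(t0, t1))                  // true: same window, cache could be reused
        fmt.Println(c.valid(t0, t1.Add(time.Minute))) // false: window moved, cache must be rebuilt
    }

Separately, the restored Fire derives headIndex/tailIndex with minInt64/maxInt64 over every record index rather than taking records[0] and records[len(records)-1], so it no longer assumes the incoming batch is sorted.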