Commit

Merge pull request #88 from alpacahq/fix/aggtrigger-filter
revert aggtrigger performance refactor
rocketbitz authored Jun 21, 2018
2 parents 81d47f4 + eb5f18c commit e941c16
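
In short: the revert removes the aggregate cache introduced by the performance refactor (the cachedAgg type, the sync.Map field, and the write/writeAggregates/query helpers) along with the typed timeframes destinations, restoring the simpler flow in which Fire derives head and tail indexes via minInt64/maxInt64 and processFor re-queries the base timeframe and rewrites each destination aggregate on every fire. It also downgrades the unrecognized-filter log from Warningf to Infof and replaces the inline fmt.Errorf with a package-level loadError value.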
Showing 3 changed files with 117 additions and 229 deletions.
286 changes: 103 additions & 183 deletions contrib/ondiskagg/aggtrigger/aggtrigger.go
@@ -25,10 +25,9 @@ package aggtrigger
 
 import (
 	"encoding/json"
-	"fmt"
+	"errors"
 	"strconv"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/alpacahq/marketstore/contrib/calendar"
@@ -50,14 +49,15 @@ type AggTriggerConfig struct {
 // OnDiskAggTrigger is the main trigger.
 type OnDiskAggTrigger struct {
 	config       map[string]interface{}
-	destinations timeframes
+	destinations []string
 	// filter by market hours if this is "nasdaq"
-	filter   string
-	aggCache *sync.Map
+	filter string
 }
 
 var _ trigger.Trigger = &OnDiskAggTrigger{}
 
+var loadError = errors.New("plugin load error")
+
 func recast(config map[string]interface{}) *AggTriggerConfig {
 	data, _ := json.Marshal(config)
 	ret := AggTriggerConfig{}
@@ -68,156 +68,137 @@ func recast(config map[string]interface{}) *AggTriggerConfig {
 // NewTrigger returns a new on-disk aggregate trigger based on the configuration.
 func NewTrigger(conf map[string]interface{}) (trigger.Trigger, error) {
 	config := recast(conf)
-
 	if len(config.Destinations) == 0 {
 		glog.Errorf("no destinations are configured")
-		return nil, fmt.Errorf("plugin load error")
+		return nil, loadError
 	}
 
 	glog.Infof("%d destination(s) configured", len(config.Destinations))
-
 	filter := config.Filter
 	if filter != "" && filter != "nasdaq" {
-		glog.Warningf("filter value \"%s\" is not recognized", filter)
+		glog.Infof("filter value \"%s\" is not recognized", filter)
 		filter = ""
 	}
-
-	var tfs timeframes
-
-	for _, dest := range config.Destinations {
-		tf := utils.TimeframeFromString(dest)
-		if tf == nil {
-			glog.Fatalf("invalid destination: %s", dest)
-		}
-		tfs = append(tfs, *tf)
-	}
-
 	return &OnDiskAggTrigger{
 		config:       conf,
-		destinations: tfs,
+		destinations: config.Destinations,
 		filter:       filter,
-		aggCache:     &sync.Map{},
 	}, nil
 }
 
+func minInt64(values []int64) int64 {
+	min := values[0]
+	for _, v := range values[1:] {
+		if v < min {
+			min = v
+		}
+	}
+	return min
+}
+
+func maxInt64(values []int64) int64 {
+	max := values[0]
+	for _, v := range values[1:] {
+		if v > max {
+			max = v
+		}
+	}
+	return max
+}
+
 // Fire implements trigger interface.
 func (s *OnDiskAggTrigger) Fire(keyPath string, records []trigger.Record) {
+	indexes := make([]int64, len(records))
+	for i, record := range records {
+		indexes[i] = record.Index()
+	}
+
+	headIndex := minInt64(indexes)
+	tailIndex := maxInt64(indexes)
+
+	for _, timeframe := range s.destinations {
+		s.processFor(timeframe, keyPath, headIndex, tailIndex)
+	}
+}
+
+func (s *OnDiskAggTrigger) processFor(timeframe, keyPath string, headIndex, tailIndex int64) {
+	theInstance := executor.ThisInstance
+	catalogDir := theInstance.CatalogDir
 	elements := strings.Split(keyPath, "/")
+	tbkString := strings.Join(elements[:len(elements)-1], "/")
 	tf := utils.NewTimeframe(elements[1])
 	fileName := elements[len(elements)-1]
 	year, _ := strconv.Atoi(strings.Replace(fileName, ".bin", "", 1))
-	tbk := io.NewTimeBucketKey(strings.Join(elements[:len(elements)-1], "/"))
-
-	head := io.IndexToTime(
-		records[0].Index(),
-		tf.Duration,
-		int16(year))
-
-	tail := io.IndexToTime(
-		records[len(records)-1].Index(),
-		tf.Duration,
-		int16(year))
+	tbk := io.NewTimeBucketKey(tbkString)
+	headTs := io.IndexToTime(headIndex, tf.Duration, int16(year))
+	tailTs := io.IndexToTime(tailIndex, tf.Duration, int16(year))
+	timeWindow := utils.CandleDurationFromString(timeframe)
+	start := timeWindow.Truncate(headTs)
+	end := timeWindow.Ceil(tailTs)
+	// TODO: this is not needed once we support "<" operator
+	end = end.Add(-time.Second)
+
+	targetTbkString := elements[0] + "/" + timeframe + "/" + elements[2]
+	targetTbk := io.NewTimeBucketKey(targetTbkString)
 
-	// query the upper bound since it will contain the most candles
-	window := utils.CandleDurationFromString(s.destinations.UpperBound().String)
-
-	// check if we have a valid cache, if not, re-query
-	if v, ok := s.aggCache.Load(tbk.String()); ok {
-		c := v.(*cachedAgg)
-
-		if !c.Valid(tail, head) {
-			s.aggCache.Delete(tbk.String())
+	// Scan
+	q := planner.NewQuery(catalogDir)
+	q.AddTargetKey(tbk)
+	q.SetRange(start.Unix(), end.Unix())
 
-			goto Query
+	// decide whether to apply market-hour filter
+	applyingFilter := false
+	if s.filter == "nasdaq" && timeWindow.Duration() >= utils.Day {
+		calendarTz := calendar.Nasdaq.Tz()
+		if utils.InstanceConfig.Timezone.String() != calendarTz.String() {
+			glog.Errorf("misconfiguration... system must be configure in %s", calendarTz)
+		} else {
+			q.AddTimeQual(calendar.Nasdaq.EpochIsMarketOpen)
+			applyingFilter = true
 		}
-
-		cs := trigger.RecordsToColumnSeries(
-			*tbk, c.cs.GetDataShapes(),
-			c.cs.GetCandleAttributes(),
-			tf.Duration, int16(year), records)
-
-		cs = io.ColumnSeriesUnion(cs, &c.cs)
-
-		s.write(tbk, cs, tail, head, elements)
-
+	}
+	parsed, err := q.Parse()
+	if err != nil {
+		glog.Errorf("%v", err)
 		return
 	}
-
-Query:
-	csm, err := s.query(tbk, window, head, tail)
-	if err != nil || csm == nil {
-		glog.Errorf("query error for %v (%v)", tbk.String(), err)
+	scanner, err := executor.NewReader(parsed)
+	if err != nil {
+		glog.Errorf("%v", err)
 		return
 	}
-
-	cs := (*csm)[*tbk]
-
-	if cs != nil {
-		s.write(tbk, cs, tail, head, elements)
+	csm, _, err := scanner.Read()
+	if err != nil {
+		glog.Errorf("%v", err)
+		return
 	}
-
-	return
-}
-
-func (s *OnDiskAggTrigger) write(
-	tbk *io.TimeBucketKey,
-	cs *io.ColumnSeries,
-	tail, head time.Time,
-	elements []string) {
-
-	for _, dest := range s.destinations {
-		aggTbk := io.NewTimeBucketKeyFromString(elements[0] + "/" + dest.String + "/" + elements[2])
-
-		if err := s.writeAggregates(aggTbk, tbk, *cs, dest, head, tail); err != nil {
+	cs := csm[*tbk]
+	if cs == nil || cs.Len() == 0 {
+		if !applyingFilter {
+			// Nothing in there... really?
 			glog.Errorf(
-				"failed to write %v aggregates (%v)",
-				tbk.String(),
-				err)
-			return
+				"result is empty for %s -> %s - query: %v - %v",
+				tbk,
+				targetTbk,
+				q.Range.Start,
+				q.Range.End,
+			)
 		}
+		return
 	}
-}
-
-type cachedAgg struct {
-	cs         io.ColumnSeries
-	tail, head time.Time
-}
-
-func (c *cachedAgg) Valid(tail, head time.Time) bool {
-	return tail.Equal(tail) && head.Equal(head)
-}
-
-func (s *OnDiskAggTrigger) writeAggregates(
-	aggTbk, baseTbk *io.TimeBucketKey,
-	cs io.ColumnSeries,
-	dest utils.Timeframe,
-	head, tail time.Time) error {
-
-	csm := io.NewColumnSeriesMap()
-
-	window := utils.CandleDurationFromString(dest.String)
-	start := window.Truncate(head).Unix()
-	end := window.Ceil(tail).Add(-time.Second).Unix()
-
-	slc, err := io.SliceColumnSeriesByEpoch(cs, &start, &end)
-	if err != nil {
-		return err
-	}
-
-	// store when writing for upper bound
-	if dest.Duration == s.destinations.UpperBound().Duration {
-		defer func() {
-			s.aggCache.Store(baseTbk.String(), &cachedAgg{
-				cs:   slc,
-				tail: time.Unix(start, 0),
-				head: time.Unix(end, 0),
-			})
-		}()
+	// calculate aggregated values
+	outCs := aggregate(cs, targetTbk)
+	outCsm := io.NewColumnSeriesMap()
+	outCsm.AddColumnSeries(*targetTbk, outCs)
+	epoch := outCs.GetEpoch()
+	if err := executor.WriteCSM(outCsm, false); err != nil {
+		glog.Errorf(
+			"failed to write %v CSM from: %v to %v - Error: %v",
+			targetTbk.String(),
+			time.Unix(epoch[0], 0),
+			time.Unix(epoch[len(epoch)-1], 0),
+			err)
 	}
-
-	csm.AddColumnSeries(*aggTbk, aggregate(&slc, aggTbk))
-
-	return executor.WriteCSM(csm, false)
 }
 
 func aggregate(cs *io.ColumnSeries, tbk *io.TimeBucketKey) *io.ColumnSeries {
@@ -260,64 +241,3 @@ func aggregate(cs *io.ColumnSeries, tbk *io.TimeBucketKey) *io.ColumnSeries {
 	accumGroup.addColumns(outCs)
 	return outCs
 }
-
-func (s *OnDiskAggTrigger) query(
-	tbk *io.TimeBucketKey,
-	window *utils.CandleDuration,
-	head, tail time.Time) (*io.ColumnSeriesMap, error) {
-
-	cDir := executor.ThisInstance.CatalogDir
-
-	start := window.Truncate(head)
-
-	// TODO: adding 1 second is not needed once we support "<" operator
-	end := window.Ceil(tail).Add(-time.Second)
-
-	// Scan
-	q := planner.NewQuery(cDir)
-	q.AddTargetKey(tbk)
-	q.SetRange(start.Unix(), end.Unix())
-
-	// decide whether to apply market-hour filter
-	applyingFilter := false
-	if s.filter == "nasdaq" && window.Duration() >= utils.Day {
-		calendarTz := calendar.Nasdaq.Tz()
-		if utils.InstanceConfig.Timezone.String() != calendarTz.String() {
-			glog.Errorf("misconfiguration... system must be configure in %s", calendarTz)
-		} else {
-			q.AddTimeQual(calendar.Nasdaq.EpochIsMarketOpen)
-			applyingFilter = true
-		}
-	}
-
-	parsed, err := q.Parse()
-	if err != nil {
-		return nil, err
-	}
-
-	scanner, err := executor.NewReader(parsed)
-	if err != nil {
-		return nil, err
-	}
-
-	csm, _, err := scanner.Read()
-	if err != nil {
-		return nil, err
-	}
-
-	cs := csm[*tbk]
-	if cs == nil || cs.Len() == 0 {
-		if !applyingFilter {
-			// Nothing in there... really?
-			glog.Errorf(
-				"result is empty for %s - query: %v - %v",
-				tbk,
-				q.Range.Start,
-				q.Range.End,
-			)
-		}
-		return nil, err
-	}
-
-	return &csm, nil
-}
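
For orientation, a minimal sketch of the config handling this commit restores. The recast helper and the filter check are taken from the diff above; the AggTriggerConfig json tags, the sample settings map, and the main scaffolding are assumptions for illustration, not code from the repository.

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the trigger config used in the diff; the json tags are assumed.
type AggTriggerConfig struct {
	Destinations []string `json:"destinations"`
	Filter       string   `json:"filter"`
}

// recast is copied from the diff above: round-trip the generic map
// through JSON to obtain a typed config.
func recast(config map[string]interface{}) *AggTriggerConfig {
	data, _ := json.Marshal(config)
	ret := AggTriggerConfig{}
	json.Unmarshal(data, &ret)
	return &ret
}

func main() {
	// Hypothetical trigger settings, as they might appear under the
	// trigger's "config" key in a marketstore configuration.
	conf := map[string]interface{}{
		"destinations": []string{"5Min", "15Min", "1D"},
		"filter":       "nyse",
	}
	config := recast(conf)

	// The reverted NewTrigger keeps destinations as plain strings and
	// clears any filter value other than "nasdaq".
	filter := config.Filter
	if filter != "" && filter != "nasdaq" {
		fmt.Printf("filter value %q is not recognized\n", filter)
		filter = ""
	}
	fmt.Println(config.Destinations, filter) // [5Min 15Min 1D]
}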
45 changes: 0 additions & 45 deletions contrib/ondiskagg/aggtrigger/util.go

This file was deleted.
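
Likewise, a small sketch of the window arithmetic restored in Fire/processFor above: minInt64 and maxInt64 are copied from the diff, while the 1Min record spacing, the year base time, and the use of time.Truncate to approximate CandleDuration.Truncate/Ceil are illustrative assumptions.

package main

import (
	"fmt"
	"time"
)

// minInt64 and maxInt64 are copied from the diff above.
func minInt64(values []int64) int64 {
	min := values[0]
	for _, v := range values[1:] {
		if v < min {
			min = v
		}
	}
	return min
}

func maxInt64(values []int64) int64 {
	max := values[0]
	for _, v := range values[1:] {
		if v > max {
			max = v
		}
	}
	return max
}

func main() {
	// Hypothetical record indexes (1Min offsets into the year file) for
	// the records that fired the trigger; arrival order is not guaranteed.
	indexes := []int64{70, 74, 72}
	head, tail := minInt64(indexes), maxInt64(indexes)

	// Truncate the head down and round the tail up to the 5Min destination
	// boundary, then back off one second, mirroring the
	// "TODO: not needed once we support '<'" adjustment in processFor.
	window := 5 * time.Minute
	year := time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)
	headTs := year.Add(time.Duration(head) * time.Minute)
	tailTs := year.Add(time.Duration(tail) * time.Minute)

	start := headTs.Truncate(window)
	end := tailTs.Truncate(window).Add(window).Add(-time.Second)
	fmt.Println(start.Format("15:04:05"), "->", end.Format("15:04:05"))
	// Output: 01:10:00 -> 01:14:59
}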
