Merge pull request #90 from alpacahq/feature/agg-opt-v2
aggtrigger optimization v2
rocketbitz authored Jun 25, 2018
2 parents 5a9579b + 630db52 commit 34f5a07
Showing 5 changed files with 354 additions and 83 deletions.
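
For context: the trigger consumes a list of destination timeframes and an optional market-hours filter from its config. Below is a minimal sketch of constructing it directly, mirroring what the plugin loader does after parsing the MarketStore YAML config; the destination and filter values are illustrative, and the config keys assume AggTriggerConfig's json tags are "destinations" and "filter".

package main

import (
	"fmt"

	"github.com/alpacahq/marketstore/contrib/ondiskagg/aggtrigger"
)

func main() {
	// Hypothetical config values; the keys follow AggTriggerConfig in the diff below.
	trig, err := aggtrigger.NewTrigger(map[string]interface{}{
		"destinations": []string{"5Min", "15Min", "1D"}, // timeframes to aggregate into
		"filter":       "nasdaq",                        // optional market-hours filter
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("loaded trigger: %T\n", trig)
}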
Gopkg.lock: 14 changes (13 additions & 1 deletion)

Some generated files are not rendered by default.

contrib/ondiskagg/aggtrigger/aggtrigger.go: 258 changes (181 additions & 77 deletions)
@@ -26,8 +26,10 @@ package aggtrigger
 import (
 	"encoding/json"
 	"errors"
+	"fmt"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/alpacahq/marketstore/contrib/calendar"
@@ -49,9 +51,10 @@ type AggTriggerConfig struct {
 // OnDiskAggTrigger is the main trigger.
 type OnDiskAggTrigger struct {
 	config       map[string]interface{}
-	destinations []string
+	destinations timeframes
 	// filter by market hours if this is "nasdaq"
-	filter string
+	filter   string
+	aggCache *sync.Map
 }
 
 var _ trigger.Trigger = &OnDiskAggTrigger{}
@@ -68,24 +71,37 @@ func recast(config map[string]interface{}) *AggTriggerConfig {
 // NewTrigger returns a new on-disk aggregate trigger based on the configuration.
 func NewTrigger(conf map[string]interface{}) (trigger.Trigger, error) {
 	config := recast(conf)
+
 	if len(config.Destinations) == 0 {
 		glog.Errorf("no destinations are configured")
-		return nil, loadError
+		return nil, fmt.Errorf("plugin load error")
 	}
 
 	glog.Infof("%d destination(s) configured", len(config.Destinations))
 
 	filter := config.Filter
 	if filter != "" && filter != "nasdaq" {
-		glog.Infof("filter value \"%s\" is not recognized", filter)
+		glog.Warningf("filter value \"%s\" is not recognized", filter)
 		filter = ""
 	}
+
+	var tfs timeframes
+
+	for _, dest := range config.Destinations {
+		tf := utils.TimeframeFromString(dest)
+		if tf == nil {
+			glog.Fatalf("invalid destination: %s", dest)
+		}
+		tfs = append(tfs, *tf)
+	}
+
 	return &OnDiskAggTrigger{
 		config:       conf,
-		destinations: config.Destinations,
+		destinations: tfs,
 		filter:       filter,
+		aggCache:     &sync.Map{},
 	}, nil
 }
 
 func minInt64(values []int64) int64 {
 	min := values[0]
 	for _, v := range values[1:] {
@@ -108,97 +124,150 @@ func maxInt64(values []int64) int64 {
 
 // Fire implements trigger interface.
 func (s *OnDiskAggTrigger) Fire(keyPath string, records []trigger.Record) {
-	indexes := make([]int64, len(records))
-	for i, record := range records {
-		indexes[i] = record.Index()
-	}
-
-	headIndex := minInt64(indexes)
-	tailIndex := maxInt64(indexes)
-
-	for _, timeframe := range s.destinations {
-		s.processFor(timeframe, keyPath, headIndex, tailIndex)
-	}
-}
-
-func (s *OnDiskAggTrigger) processFor(timeframe, keyPath string, headIndex, tailIndex int64) {
-	theInstance := executor.ThisInstance
-	catalogDir := theInstance.CatalogDir
-	elements := strings.Split(keyPath, "/")
-	tbkString := strings.Join(elements[:len(elements)-1], "/")
-	tf := utils.NewTimeframe(elements[1])
-	fileName := elements[len(elements)-1]
-	year, _ := strconv.Atoi(strings.Replace(fileName, ".bin", "", 1))
-	tbk := io.NewTimeBucketKey(tbkString)
-	headTs := io.IndexToTime(headIndex, tf.Duration, int16(year))
-	tailTs := io.IndexToTime(tailIndex, tf.Duration, int16(year))
-	timeWindow := utils.CandleDurationFromString(timeframe)
-	start := timeWindow.Truncate(headTs)
-	end := timeWindow.Ceil(tailTs)
-	// TODO: this is not needed once we support "<" operator
-	end = end.Add(-time.Second)
-
-	targetTbkString := elements[0] + "/" + timeframe + "/" + elements[2]
-	targetTbk := io.NewTimeBucketKey(targetTbkString)
-
-	// Scan
-	q := planner.NewQuery(catalogDir)
-	q.AddTargetKey(tbk)
-	q.SetRange(start.Unix(), end.Unix())
+	elements := strings.Split(keyPath, "/")
+	tf := utils.NewTimeframe(elements[1])
+	fileName := elements[len(elements)-1]
+	year, _ := strconv.Atoi(strings.Replace(fileName, ".bin", "", 1))
+	tbk := io.NewTimeBucketKey(strings.Join(elements[:len(elements)-1], "/"))
+
+	head := io.IndexToTime(
+		records[0].Index(),
+		tf.Duration,
+		int16(year))
+
+	tail := io.IndexToTime(
+		records[len(records)-1].Index(),
+		tf.Duration,
+		int16(year))
+
+	// query the upper bound since it will contain the most candles
+	window := utils.CandleDurationFromString(s.destinations.UpperBound().String)
+
+	// check if we have a valid cache, if not, re-query
+	if v, ok := s.aggCache.Load(tbk.String()); ok {
+		c := v.(*cachedAgg)
+
+		if !c.Valid(tail, head) {
+			s.aggCache.Delete(tbk.String())
+
+			goto Query
+		}
+
+		cs := trigger.RecordsToColumnSeries(
+			*tbk, c.cs.GetDataShapes(),
+			c.cs.GetCandleAttributes(),
+			tf.Duration, int16(year), records)
+
+		cs = io.ColumnSeriesUnion(cs, &c.cs)
+
+		s.write(tbk, cs, tail, head, elements)
+
+		return
+	}
+
+Query:
+	csm, err := s.query(tbk, window, head, tail)
+	if err != nil || csm == nil {
+		glog.Errorf("query error for %v (%v)", tbk.String(), err)
+		return
+	}
+
+	cs := (*csm)[*tbk]
+
+	if cs != nil {
+		s.write(tbk, cs, tail, head, elements)
+	}
+
+	return
+}
+
+func (s *OnDiskAggTrigger) write(
+	tbk *io.TimeBucketKey,
+	cs *io.ColumnSeries,
+	tail, head time.Time,
+	elements []string) {
+
+	for _, dest := range s.destinations {
+		aggTbk := io.NewTimeBucketKeyFromString(elements[0] + "/" + dest.String + "/" + elements[2])
+
+		if err := s.writeAggregates(aggTbk, tbk, *cs, dest, head, tail); err != nil {
+			glog.Errorf(
+				"failed to write %v aggregates (%v)",
+				tbk.String(),
+				err)
+			return
+		}
+	}
+}
+
+type cachedAgg struct {
+	cs         io.ColumnSeries
+	tail, head time.Time
+}
+
+func (c *cachedAgg) Valid(tail, head time.Time) bool {
+	return tail.Equal(c.tail) && head.Equal(c.head)
+}
+
+func (s *OnDiskAggTrigger) writeAggregates(
+	aggTbk, baseTbk *io.TimeBucketKey,
+	cs io.ColumnSeries,
+	dest utils.Timeframe,
+	head, tail time.Time) error {
+
+	csm := io.NewColumnSeriesMap()
+
+	window := utils.CandleDurationFromString(dest.String)
+	start := window.Truncate(head).Unix()
+	end := window.Ceil(tail).Add(-time.Second).Unix()
+
+	slc, err := io.SliceColumnSeriesByEpoch(cs, &start, &end)
+	if err != nil {
+		return err
+	}
 
 	// decide whether to apply market-hour filter
 	applyingFilter := false
-	if s.filter == "nasdaq" && timeWindow.Duration() >= utils.Day {
+	if s.filter == "nasdaq" && window.Duration() >= utils.Day {
 		calendarTz := calendar.Nasdaq.Tz()
 		if utils.InstanceConfig.Timezone.String() != calendarTz.String() {
 			glog.Errorf("misconfiguration... system must be configure in %s", calendarTz)
 		} else {
-			q.AddTimeQual(calendar.Nasdaq.EpochIsMarketOpen)
 			applyingFilter = true
 		}
 	}
-	parsed, err := q.Parse()
-	if err != nil {
-		glog.Errorf("%v", err)
-		return
-	}
-	scanner, err := executor.NewReader(parsed)
-	if err != nil {
-		glog.Errorf("%v", err)
-		return
-	}
-	csm, _, err := scanner.Read()
-	if err != nil {
-		glog.Errorf("%v", err)
-		return
-	}
-	cs := csm[*tbk]
-	if cs == nil || cs.Len() == 0 {
-		if !applyingFilter {
-			// Nothing in there... really?
-			glog.Errorf(
-				"result is empty for %s -> %s - query: %v - %v",
-				tbk,
-				targetTbk,
-				q.Range.Start,
-				q.Range.End,
-			)
-		}
-		return
-	}
-	// calculate aggregated values
-	outCs := aggregate(cs, targetTbk)
-	outCsm := io.NewColumnSeriesMap()
-	outCsm.AddColumnSeries(*targetTbk, outCs)
-	epoch := outCs.GetEpoch()
-	if err := executor.WriteCSM(outCsm, false); err != nil {
-		glog.Errorf(
-			"failed to write %v CSM from: %v to %v - Error: %v",
-			targetTbk.String(),
-			time.Unix(epoch[0], 0),
-			time.Unix(epoch[len(epoch)-1], 0),
-			err)
-	}
+
+	// store when writing for upper bound
+	if dest.Duration == s.destinations.UpperBound().Duration {
+		defer func() {
+			t := window.Truncate(tail)
+			tEpoch := t.Unix()
+			h := time.Unix(end, 0)
+
+			cacheSlc, _ := io.SliceColumnSeriesByEpoch(cs, &tEpoch, &end)
+
+			s.aggCache.Store(baseTbk.String(), &cachedAgg{
+				cs:   cacheSlc,
+				tail: t,
+				head: h,
+			})
+		}()
+	}
+
+	// apply the filter
+	if applyingFilter {
+		tqSlc := slc.ApplyTimeQual(calendar.Nasdaq.EpochIsMarketOpen)
+
+		// normally this will always be true, but when there are random bars
+		// on the weekend, it won't be, so checking to avoid panic
+		if len(tqSlc.GetEpoch()) > 0 {
+			csm.AddColumnSeries(*aggTbk, aggregate(tqSlc, aggTbk))
+		}
+	} else {
+		csm.AddColumnSeries(*aggTbk, aggregate(&slc, aggTbk))
+	}
+
+	return executor.WriteCSM(csm, false)
 }
 
 func aggregate(cs *io.ColumnSeries, tbk *io.TimeBucketKey) *io.ColumnSeries {
@@ -241,3 +310,38 @@ func aggregate(cs *io.ColumnSeries, tbk *io.TimeBucketKey) *io.ColumnSeries {
 	accumGroup.addColumns(outCs)
 	return outCs
 }
+
+func (s *OnDiskAggTrigger) query(
+	tbk *io.TimeBucketKey,
+	window *utils.CandleDuration,
+	head, tail time.Time) (*io.ColumnSeriesMap, error) {
+
+	cDir := executor.ThisInstance.CatalogDir
+
+	start := window.Truncate(head)
+
+	// TODO: adding 1 second is not needed once we support "<" operator
+	end := window.Ceil(tail).Add(-time.Second)
+
+	// Scan
+	q := planner.NewQuery(cDir)
+	q.AddTargetKey(tbk)
+	q.SetRange(start.Unix(), end.Unix())
+
+	parsed, err := q.Parse()
+	if err != nil {
+		return nil, err
+	}
+
+	scanner, err := executor.NewReader(parsed)
+	if err != nil {
+		return nil, err
+	}
+
+	csm, _, err := scanner.Read()
+	if err != nil {
+		return nil, err
+	}
+
+	return &csm, nil
+}
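
The heart of the optimization is visible in Fire above: the aggregate for the upper-bound timeframe is cached per time bucket, and the disk query is skipped whenever the cached head/tail window still matches the incoming records. Below is a self-contained sketch of that hit/miss flow; cachedAgg and Valid mirror the diff, while the bucket key, timestamps, and omitted ColumnSeries payload are illustrative.

package main

import (
	"fmt"
	"sync"
	"time"
)

// cachedAgg and Valid mirror the diff above; the io.ColumnSeries
// payload is omitted to keep the sketch self-contained.
type cachedAgg struct {
	tail, head time.Time
}

func (c *cachedAgg) Valid(tail, head time.Time) bool {
	return tail.Equal(c.tail) && head.Equal(c.head)
}

func main() {
	cache := &sync.Map{} // keyed by TimeBucketKey string, as in the trigger

	key := "AAPL/1Min/OHLCV" // illustrative bucket key
	head := time.Date(2018, 6, 25, 9, 30, 0, 0, time.UTC)
	tail := head.Add(5 * time.Minute)

	// writeAggregates stores an entry when it writes the upper-bound timeframe.
	cache.Store(key, &cachedAgg{tail: tail, head: head})

	// Fire's hot path: a later write covering the same head/tail window
	// reuses the cached series instead of re-querying disk.
	if v, ok := cache.Load(key); ok {
		c := v.(*cachedAgg)
		if c.Valid(tail, head) {
			fmt.Println("cache hit: merge incoming records into cached series")
		} else {
			cache.Delete(key) // stale entry: drop it and re-query (the goto Query branch)
			fmt.Println("cache miss: re-query")
		}
	}
}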
The remaining 3 changed files in this commit are not rendered here.
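
The timeframes type and its UpperBound method live in one of those un-rendered files. The sketch below is a hypothetical reconstruction, assuming UpperBound returns the coarsest (largest-duration) destination, which would explain why Fire queries at the upper bound: its window necessarily covers every finer timeframe. Only the names timeframes, UpperBound, String, and Duration come from the diff; everything else is an assumption.

package main

import (
	"fmt"
	"time"
)

// Stub of utils.Timeframe carrying only the two fields the diff uses
// (dest.String and dest.Duration).
type Timeframe struct {
	String   string
	Duration time.Duration
}

// Hypothetical reconstruction: UpperBound is assumed to pick the
// destination with the largest duration.
type timeframes []Timeframe

func (tfs timeframes) UpperBound() *Timeframe {
	if len(tfs) == 0 {
		return nil
	}
	upper := &tfs[0]
	for i := 1; i < len(tfs); i++ {
		if tfs[i].Duration > upper.Duration {
			upper = &tfs[i]
		}
	}
	return upper
}

func main() {
	tfs := timeframes{
		{String: "5Min", Duration: 5 * time.Minute},
		{String: "1D", Duration: 24 * time.Hour},
		{String: "15Min", Duration: 15 * time.Minute},
	}
	fmt.Println(tfs.UpperBound().String) // prints: 1D
}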
