Skip to content

Commit

Permalink
fix(lint): revive lint issues (#572)
Browse files Browse the repository at this point in the history
  • Loading branch information
dakimura authored Mar 14, 2022
1 parent cde80d7 commit d382167
Show file tree
Hide file tree
Showing 39 changed files with 253 additions and 294 deletions.
70 changes: 33 additions & 37 deletions catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,16 @@ func writeCategoryNameFile(catName, dirName string) error {
return nil
}

func (dRoot *Directory) AddTimeBucket(tbk *io.TimeBucketKey, f *io.TimeBucketInfo) (err error) {
/*
Adds a (possibly) new data item to a rootpath. Takes an existing catalog directory and
adds the new data item to that data directory.
*/
dRoot.Lock()
defer dRoot.Unlock()
// AddTimeBucket adds a (possibly) new data item to a rootpath. Takes an existing catalog directory and
// adds the new data item to that data directory. This is used only for a root category directory.
func (d *Directory) AddTimeBucket(tbk *io.TimeBucketKey, f *io.TimeBucketInfo) (err error) {
d.Lock()
defer d.Unlock()

catkeySplit := tbk.GetCategories()
datakeySplit := tbk.GetItems()

dirname := dRoot.GetPath()
dirname := d.GetPath()
for i, dataDirName := range datakeySplit {
subdirname := filepath.Join(dirname, dataDirName)
if !fileExists(subdirname) {
Expand All @@ -189,36 +187,34 @@ func (dRoot *Directory) AddTimeBucket(tbk *io.TimeBucketKey, f *io.TimeBucketInf
Check to see if this is an empty top level directory, if so - we need to set
the top level category in the catalog entry
*/
if dRoot.category == "" {
dRoot.category = catkeySplit[0]
if d.category == "" {
d.category = catkeySplit[0]
}

/*
Add this child directory tree to the parent top node's tree
*/
childNodeName := datakeySplit[0]
childNodePath := filepath.Join(dRoot.GetPath(), childNodeName)
childNodePath := filepath.Join(d.GetPath(), childNodeName)
childDirectory, err := NewDirectory(childNodePath)
if err != nil {
return err
}
dRoot.addSubdir(childDirectory, childNodeName)
d.addSubdir(childDirectory, childNodeName)
return nil
}

func (dRoot *Directory) RemoveTimeBucket(tbk *io.TimeBucketKey) (err error) {
/*
Deletes the item at the last level specified in the dataItemKey
Also removes empty directories at the higher levels after the delete
*/
if dRoot == nil {
// RemoveTimeBucket deletes the item at the last level specified in the dataItemKey
// Also removes empty directories at the higher levels after the delete. This is used for a root catalog directory.
func (d *Directory) RemoveTimeBucket(tbk *io.TimeBucketKey) (err error) {
if d == nil {
return fmt.Errorf(io.GetCallerFileContext(0) + ": Directory called from is nil")
}

datakeySplit := tbk.GetItems()

tree := make([]*Directory, len(datakeySplit))
current := dRoot
current := d
for i := 0; i < len(datakeySplit); i++ {
itemName := datakeySplit[i]
// Descend from the current directory to find the first directory with the item name
Expand All @@ -234,7 +230,7 @@ func (dRoot *Directory) RemoveTimeBucket(tbk *io.TimeBucketKey) (err error) {
removeDirFiles(tree[i])
deleteMap[i] = true // This dir was deleted, we'll remove it from the parent's subdir list later
} else if deleteMap[i+1] {
tree[i].removeSubDir(tree[i+1].itemName, dRoot.directMap)
tree[i].removeSubDir(tree[i+1].itemName, d.directMap)
}
if !tree[i].DirHasSubDirs() {
removeDirFiles(tree[i])
Expand All @@ -243,7 +239,7 @@ func (dRoot *Directory) RemoveTimeBucket(tbk *io.TimeBucketKey) (err error) {
}
if deleteMap[0] {
removeDirFiles(tree[0])
dRoot.removeSubDir(tree[0].itemName, dRoot.directMap)
d.removeSubDir(tree[0].itemName, d.directMap)
}
return nil
}
Expand Down Expand Up @@ -331,7 +327,7 @@ func (d *Directory) GetDataShapes(key *io.TimeBucketKey) (dsv []io.DataShape, er
return fi.GetDataShapes(), nil
}

func (subDir *Directory) AddFile(newYear int16) (finfo_p *io.TimeBucketInfo, err error) {
func (d *Directory) AddFile(newYear int16) (finfo_p *io.TimeBucketInfo, err error) { //d should be a subdirectory
// Must be thread-safe for WRITE access
/*
Adds a new primary storage file for the provided year to this directory
Expand All @@ -343,25 +339,25 @@ func (subDir *Directory) AddFile(newYear int16) (finfo_p *io.TimeBucketInfo, err
!!! NOTE !!! This should be called from the subdirectory that "owns" the file
*/
subDir.RLock()
if subDir.datafile == nil {
subDir.RUnlock()
return nil, SubdirectoryDoesNotContainFiles(subDir.pathToItemName)
d.RLock()
if d.datafile == nil {
d.RUnlock()
return nil, SubdirectoryDoesNotContainFiles(d.pathToItemName)
}

var finfoTemplate *io.TimeBucketInfo
for _, fi := range subDir.datafile {
for _, fi := range d.datafile {
finfoTemplate = fi
break
}
subDir.RUnlock()
d.RUnlock()

newFileInfo := finfoTemplate.GetDeepCopy()
newFileInfo.Year = newYear
// Create a new filename for the new file
subDir.RLock()
newFileInfo.Path = path.Join(subDir.pathToItemName, strconv.Itoa(int(newYear))+".bin")
subDir.RUnlock()
d.RLock()
newFileInfo.Path = path.Join(d.pathToItemName, strconv.Itoa(int(newYear))+".bin")
d.RUnlock()
if err = newTimeBucketInfoFromTemplate(newFileInfo); err != nil {
var targetErr FileAlreadyExists
if ok := errors.As(err, &targetErr); ok {
Expand All @@ -370,9 +366,9 @@ func (subDir *Directory) AddFile(newYear int16) (finfo_p *io.TimeBucketInfo, err
return nil, err
}
// Locate the directory in the catalog
subDir.Lock()
subDir.datafile[newFileInfo.Path] = newFileInfo
subDir.Unlock()
d.Lock()
d.datafile[newFileInfo.Path] = newFileInfo
d.Unlock()

return newFileInfo, nil
}
Expand Down Expand Up @@ -568,11 +564,11 @@ func (d *Directory) GatherDirectories() []string {

func (d *Directory) GatherFilePaths() []string {
// Must be thread-safe for READ access
filePathListFunc := func(d *Directory, i_list interface{}) {
p_list := i_list.(*[]string)
filePathListFunc := func(d *Directory, iList interface{}) {
pList := iList.(*[]string)
if d.datafile != nil {
for _, dfile := range d.datafile {
*p_list = append(*p_list, dfile.Path)
*pList = append(*pList, dfile.Path)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/connect/loader/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func columnSeriesMapFromCSVData(csmInit io.ColumnSeriesMap, key io.TimeBucketKey
}
csm.AddColumn(key, shape.Name, col)
default:
return nil, fmt.Errorf("unknown column type.rror obtaining column \"%s\" from csv data\n",
return nil, fmt.Errorf("unknown column type.rror obtaining column \"%s\" from csv data",
shape.Name,
)
}
Expand Down
4 changes: 0 additions & 4 deletions cmd/connect/session/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ func (c *Client) getinfo(line string) {
*/

tbkP := io.NewTimeBucketKey(args[0])
if tbkP == nil {
log.Error("Failed to convert argument to key: %s\n", args[0])
return
}
resp, err := c.GetBucketInfo(tbkP)
if err != nil {
log.Error("Failed with error: %s\n", err.Error())
Expand Down
4 changes: 2 additions & 2 deletions contrib/binancefeeder/binancefeeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

const (
defaultHttpTimeout = 10 * time.Second
defaultHTTPTimeout = 10 * time.Second
oneMinTimeframeStr = "1Min"
)

Expand All @@ -44,7 +44,7 @@ type ExchangeInfo struct {

// getJSON via http request and decodes it using NewDecoder. Sets target interface to decoded json.
func getJSON(url string, target interface{}) error {
myClient := &http.Client{Timeout: defaultHttpTimeout}
myClient := &http.Client{Timeout: defaultHTTPTimeout}
req, err := http.NewRequestWithContext(context.Background(), "GET", url, nil)
if err != nil {
return fmt.Errorf("create http req for %s: %w", url, err)
Expand Down
6 changes: 3 additions & 3 deletions contrib/candler/candlecandler/candlecandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ type CandleCandler struct {
*candler.Candler
}

func (c CandleCandler) New(argMap *functions.ArgumentMap, args ...interface{}) (ica uda.AggInterface, err error) {
func (ca CandleCandler) New(argMap *functions.ArgumentMap, args ...interface{}) (ica uda.AggInterface, err error) {
cl := candler.Candler{}
ca, err := cl.New(argMap, args...)
return &CandleCandler{ca}, err
ca2, err := cl.New(argMap, args...)
return &CandleCandler{ca2}, err
}

func (c *CandleCandler) GetRequiredArgs() []io.DataShape {
Expand Down
4 changes: 2 additions & 2 deletions contrib/ice/lib/date/date.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

type Date civil.Date

func DateOf(t time.Time) Date {
func Of(t time.Time) Date {
return Date(civil.DateOf(t))
}

Expand All @@ -22,7 +22,7 @@ func Parse(layout, value string) (Date, error) {
if err != nil {
return Date{}, err
}
return DateOf(t), nil
return Of(t), nil
}

func (d Date) c() civil.Date {
Expand Down
2 changes: 1 addition & 1 deletion contrib/iex/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func initWriter() error {
return fmt.Errorf("failed to create a new aggtrigger: %w", err)
}

triggerMatchers := []*trigger.TriggerMatcher{
triggerMatchers := []*trigger.Matcher{
trigger.NewMatcher(trig, "*/1Min/OHLCV"),
}

Expand Down
3 changes: 1 addition & 2 deletions contrib/iex/iex.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,8 @@ func onceDaily(lastDailyRunDate *int, runHour, runMinute int) bool {
if *lastDailyRunDate == 0 || (*lastDailyRunDate != now.Day() && runHour == now.Hour() && runMinute <= now.Minute()) {
*lastDailyRunDate = now.Day()
return true
} else {
return false
}
return false
}

func main() {
Expand Down
2 changes: 1 addition & 1 deletion contrib/polygon/backfill/backfiller/backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func initConfig() (rootDir string, triggers []*utils.TriggerSetting, walRotateIn
func initWriter(rootDir string, triggers []*utils.TriggerSetting, walRotateInterval int,
) (instanceConfig *executor.InstanceMetadata, shutdownPending *bool, walWG *sync.WaitGroup, err error) {
// if configured, also load the ondiskagg triggers
var tm []*trigger.TriggerMatcher
var tm []*trigger.Matcher
for _, triggerSetting := range triggers {
if triggerSetting.Module == "ondiskagg.so" {
tmatcher := trigger.NewTriggerMatcher(triggerSetting)
Expand Down
6 changes: 3 additions & 3 deletions contrib/polygon/polygon.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"github.com/alpacahq/marketstore/v4/contrib/polygon/api"
"github.com/alpacahq/marketstore/v4/contrib/polygon/backfill"
"github.com/alpacahq/marketstore/v4/contrib/polygon/handlers"
"github.com/alpacahq/marketstore/v4/contrib/polygon/polygon_config"
"github.com/alpacahq/marketstore/v4/contrib/polygon/polygonconfig"
"github.com/alpacahq/marketstore/v4/plugins/bgworker"
)

const defaultBatchSize = 50000

type PolygonFetcher struct {
config polygon_config.FetcherConfig
config polygonconfig.FetcherConfig
types map[string]struct{} // Bars, Quotes, Trades
}

Expand All @@ -24,7 +24,7 @@ type PolygonFetcher struct {
// nolint:deadcode // plugin interface
func NewBgWorker(conf map[string]interface{}) (w bgworker.BgWorker, err error) {
data, _ := json.Marshal(conf)
config := polygon_config.FetcherConfig{}
config := polygonconfig.FetcherConfig{}
err = json.Unmarshal(data, &config)
if err != nil {
return
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package polygon_config
package polygonconfig

type FetcherConfig struct {
// AddTickCountToBars controls if TickCnt is added to the schema for Bars or not
Expand Down
16 changes: 8 additions & 8 deletions contrib/stream/shelf/shelf.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,24 @@ import (
"github.com/alpacahq/marketstore/v4/utils/io"
)

// ShelfHandler gets executed by a shelf on its packages.
type ShelfHandler *func(tbk io.TimeBucketKey, data interface{}) error
// Handler gets executed by a shelf on its packages.
type Handler *func(tbk io.TimeBucketKey, data interface{}) error

// NewShelfHandler creates a new ShelfHandler from a supplied function.
func NewShelfHandler(f func(tbk io.TimeBucketKey, data interface{}) error) ShelfHandler {
return ShelfHandler(&f)
// NewShelfHandler creates a new Handler from a supplied function.
func NewShelfHandler(f func(tbk io.TimeBucketKey, data interface{}) error) Handler {
return &f
}

// Shelf stores packages, which have shelf lives (^^) and are
// meant to have the shelf's handler executed after some deadline.
type Shelf struct {
sync.Mutex
m map[string]*Package
handler ShelfHandler
handler Handler
}

// NewShelf initializes a new shelf with the provided handler function.
func NewShelf(h ShelfHandler) *Shelf {
func NewShelf(h Handler) *Shelf {
return &Shelf{
m: map[string]*Package{},
handler: h,
Expand Down Expand Up @@ -98,7 +98,7 @@ func (p *Package) Stop() {
// Start causes the package to begin listening to it's context's
// done channel which is set by the deadline passed to the context.
// This is done in a separate goroutine.
func (p *Package) Start(tbk *io.TimeBucketKey, h ShelfHandler) {
func (p *Package) Start(tbk *io.TimeBucketKey, h Handler) {
p.stopped.Store(false)

go func() {
Expand Down
6 changes: 3 additions & 3 deletions contrib/stream/streamtrigger/streamtrigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ import (
"github.com/alpacahq/marketstore/v4/utils/log"
)

type StreamTriggerConfig struct {
type Config struct {
Filter string `json:"filter"`
}

var _ trigger.Trigger = &StreamTrigger{}

func recast(config map[string]interface{}) *StreamTriggerConfig {
func recast(config map[string]interface{}) *Config {
data, _ := json.Marshal(config)
ret := StreamTriggerConfig{}
ret := Config{}
json.Unmarshal(data, &ret)
return &ret
}
Expand Down
Loading

0 comments on commit d382167

Please sign in to comment.