Skip to content

Commit

Permalink
[Rebased] fix races (#633)
Browse files Browse the repository at this point in the history
* get rid of dead code
* have LeakRoutined started in a tomb
* fix race and multiple small issues in the way we handle tombs
* yet another race fix
* another race
* get rid of leaky.KillSwitch for proper tomb use
* fix deadlock
* empty overflow before exiting
* fix an obvious typo
* proper use of waitgroup
* have a smart signalisation for allowing LeakRoutine being killed
* ugly workaround
* fix lint error
* fix compilation
* fix panic
* shorten lock
* up lock both copy
* wait for crowdsec to die
* fix coding style and lint issue
* go mod tidy

Co-authored-by: bui <[email protected]>
  • Loading branch information
registergoofy and buixor authored Feb 25, 2021
1 parent 8b504e9 commit 5b7ac4a
Show file tree
Hide file tree
Showing 17 changed files with 424 additions and 259 deletions.
3 changes: 2 additions & 1 deletion cmd/crowdsec/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"fmt"

"github.com/crowdsecurity/crowdsec/pkg/apiserver"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/types"
log "github.com/sirupsen/logrus"
)

func initAPIServer() (*apiserver.APIServer, error) {
func initAPIServer(cConfig *csconfig.GlobalConfig) (*apiserver.APIServer, error) {
apiServer, err := apiserver.NewServer(cConfig.API.Server)
if err != nil {
return nil, fmt.Errorf("unable to run local API: %s", err)
Expand Down
131 changes: 68 additions & 63 deletions cmd/crowdsec/crowdsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package main

import (
"fmt"
"time"
"sync"

"github.com/crowdsecurity/crowdsec/pkg/acquisition"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
"github.com/crowdsecurity/crowdsec/pkg/exprhelpers"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
Expand All @@ -13,7 +14,7 @@ import (
log "github.com/sirupsen/logrus"
)

func initCrowdsec() (*parser.Parsers, error) {
func initCrowdsec(cConfig *csconfig.GlobalConfig) (*parser.Parsers, error) {
err := exprhelpers.Init()
if err != nil {
return &parser.Parsers{}, fmt.Errorf("Failed to init expr helpers : %s", err)
Expand All @@ -40,46 +41,72 @@ func initCrowdsec() (*parser.Parsers, error) {
return csParsers, nil
}

func runCrowdsec(parsers *parser.Parsers) error {
func runCrowdsec(cConfig *csconfig.GlobalConfig, parsers *parser.Parsers) error {
inputLineChan := make(chan types.Event)
inputEventChan := make(chan types.Event)

//start go-routines for parsing, buckets pour and ouputs.
for i := 0; i < cConfig.Crowdsec.ParserRoutinesCount; i++ {
parsersTomb.Go(func() error {
defer types.CatchPanic("crowdsec/runParse")
err := runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes)
if err != nil {
log.Fatalf("starting parse error : %s", err)
return err
parserWg := &sync.WaitGroup{}
parsersTomb.Go(func() error {
parserWg.Add(1)
for i := 0; i < cConfig.Crowdsec.ParserRoutinesCount; i++ {
parsersTomb.Go(func() error {
defer types.CatchPanic("crowdsec/runParse")
if err := runParse(inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes); err != nil { //this error will never happen as parser.Parse is not able to return errors
log.Fatalf("starting parse error : %s", err)
return err
}
return nil
})
}
parserWg.Done()
return nil
})
parserWg.Wait()

bucketWg := &sync.WaitGroup{}
bucketsTomb.Go(func() error {
bucketWg.Add(1)
/*restore as well previous state if present*/
if cConfig.Crowdsec.BucketStateFile != "" {
log.Warningf("Restoring buckets state from %s", cConfig.Crowdsec.BucketStateFile)
if err := leaky.LoadBucketsState(cConfig.Crowdsec.BucketStateFile, buckets, holders); err != nil {
return fmt.Errorf("unable to restore buckets : %s", err)
}
return nil
})
}
}

for i := 0; i < cConfig.Crowdsec.BucketsRoutinesCount; i++ {
bucketsTomb.Go(func() error {
defer types.CatchPanic("crowdsec/runPour")
err := runPour(inputEventChan, holders, buckets)
if err != nil {
log.Fatalf("starting pour error : %s", err)
return err
}
return nil
})
}
for i := 0; i < cConfig.Crowdsec.OutputRoutinesCount; i++ {

outputsTomb.Go(func() error {
defer types.CatchPanic("crowdsec/runOutput")
err := runOutput(inputEventChan, outputEventChan, buckets, *parsers.Povfwctx, parsers.Povfwnodes, *cConfig.API.Client.Credentials)
if err != nil {
log.Fatalf("starting outputs error : %s", err)
return err
}
return nil
})
}
for i := 0; i < cConfig.Crowdsec.BucketsRoutinesCount; i++ {
bucketsTomb.Go(func() error {
defer types.CatchPanic("crowdsec/runPour")
if err := runPour(inputEventChan, holders, buckets, cConfig); err != nil {
log.Fatalf("starting pour error : %s", err)
return err
}
return nil
})
}
bucketWg.Done()
return nil
})
bucketWg.Wait()

outputWg := &sync.WaitGroup{}
outputsTomb.Go(func() error {
outputWg.Add(1)
for i := 0; i < cConfig.Crowdsec.OutputRoutinesCount; i++ {
outputsTomb.Go(func() error {
defer types.CatchPanic("crowdsec/runOutput")
if err := runOutput(inputEventChan, outputEventChan, buckets, *parsers.Povfwctx, parsers.Povfwnodes, *cConfig.API.Client.Credentials); err != nil {
log.Fatalf("starting outputs error : %s", err)
return err
}
return nil
})
}
outputWg.Done()
return nil
})
outputWg.Wait()
log.Warningf("Starting processing data")

if err := acquisition.StartAcquisition(dataSources, inputLineChan, &acquisTomb); err != nil {
Expand All @@ -90,12 +117,14 @@ func runCrowdsec(parsers *parser.Parsers) error {
return nil
}

func serveCrowdsec(parsers *parser.Parsers) {
func serveCrowdsec(parsers *parser.Parsers, cConfig *csconfig.GlobalConfig) {
crowdsecTomb.Go(func() error {
defer types.CatchPanic("crowdsec/serveCrowdsec")
go func() {
defer types.CatchPanic("crowdsec/runCrowdsec")
runCrowdsec(parsers)
if err := runCrowdsec(cConfig, parsers); err != nil {
log.Fatalf("unable to start crowdsec routines: %s", err)
}
}()

/*we should stop in two cases :
Expand All @@ -119,9 +148,6 @@ func waitOnTomb() {
/*if it's acquisition dying it means that we were in "cat" mode.
while shutting down, we need to give time for all buckets to process in flight data*/
log.Warningf("Acquisition is finished, shutting down")
bucketCount := leaky.LeakyRoutineCount
rounds := 0
successiveStillRounds := 0
/*
While it might make sense to want to shut-down parser/buckets/etc. as soon as acquisition is finished,
we might have some pending buckets : buckets that overflowed, but which LeakRoutine are still alive because they
Expand All @@ -134,30 +160,9 @@ func waitOnTomb() {
So : we are waiting for the number of buckets to stop decreasing before returning. "how long" we should wait is a bit of the trick question,
as some operations (ie. reverse dns or such in post-overflow) can take some time :)
*/
for {
currBucketCount := leaky.LeakyRoutineCount

if currBucketCount == 0 {
/*no bucket to wait on*/
break
}
if currBucketCount != bucketCount {
if rounds == 0 || rounds%2 == 0 {
log.Printf("Still %d live LeakRoutines, waiting (was %d)", currBucketCount, bucketCount)
}
bucketCount = currBucketCount
successiveStillRounds = 0
} else {
if successiveStillRounds > 1 {
log.Printf("LeakRoutines commit over.")
break
}
successiveStillRounds++
}
rounds++
time.Sleep(5 * time.Second)
}
return

case <-crowdsecTomb.Dying():
log.Infof("Crowdsec engine shutting down")
return
Expand Down
40 changes: 16 additions & 24 deletions cmd/crowdsec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ var (

flags *Flags

/*global crowdsec config*/
cConfig *csconfig.GlobalConfig
/*the state of acquisition*/
dataSources []acquisition.DataSource
/*the state of the buckets*/
Expand Down Expand Up @@ -125,22 +123,15 @@ func LoadBuckets(cConfig *csconfig.GlobalConfig) error {
files = append(files, hubScenarioItem.LocalPath)
}
}
buckets = leaky.NewBuckets()

log.Infof("Loading %d scenario files", len(files))
holders, outputEventChan, err = leaky.LoadBuckets(cConfig.Crowdsec, files)
holders, outputEventChan, err = leaky.LoadBuckets(cConfig.Crowdsec, files, &bucketsTomb, buckets)

if err != nil {
return fmt.Errorf("Scenario loading failed : %v", err)
}
buckets = leaky.NewBuckets()

/*restore as well previous state if present*/
if cConfig.Crowdsec.BucketStateFile != "" {
log.Warningf("Restoring buckets state from %s", cConfig.Crowdsec.BucketStateFile)
if err := leaky.LoadBucketsState(cConfig.Crowdsec.BucketStateFile, buckets, holders); err != nil {
return fmt.Errorf("unable to restore buckets : %s", err)
}
}
if cConfig.Prometheus != nil && cConfig.Prometheus.Enabled {
for holderIndex := range holders {
holders[holderIndex].Profiling = true
Expand Down Expand Up @@ -200,11 +191,11 @@ func (f *Flags) Parse() {
}

// LoadConfig return configuration parsed from configuration file
func LoadConfig(config *csconfig.GlobalConfig) error {
func LoadConfig(cConfig *csconfig.GlobalConfig) error {
disableAPI = flags.DisableAPI
disableAgent = flags.DisableAgent
if flags.ConfigFile != "" {
if err := config.LoadConfigurationFile(flags.ConfigFile, disableAPI, disableAgent); err != nil {
if err := cConfig.LoadConfigurationFile(flags.ConfigFile, disableAPI, disableAgent); err != nil {
return fmt.Errorf("while loading configuration : %s", err)
}
} else {
Expand Down Expand Up @@ -242,37 +233,38 @@ func LoadConfig(config *csconfig.GlobalConfig) error {

if flags.DebugLevel {
logLevel := log.DebugLevel
config.Common.LogLevel = &logLevel
cConfig.Common.LogLevel = &logLevel
}
if flags.InfoLevel || config.Common.LogLevel == nil {
if flags.InfoLevel || cConfig.Common.LogLevel == nil {
logLevel := log.InfoLevel
config.Common.LogLevel = &logLevel
cConfig.Common.LogLevel = &logLevel
}
if flags.TraceLevel {
logLevel := log.TraceLevel
config.Common.LogLevel = &logLevel
cConfig.Common.LogLevel = &logLevel
}

if flags.TestMode && !disableAgent {
config.Crowdsec.LintOnly = true
cConfig.Crowdsec.LintOnly = true
}

if flags.SingleFilePath != "" || flags.SingleJournalctlFilter != "" {
config.API.Server.OnlineClient = nil
cConfig.API.Server.OnlineClient = nil
/*if the api is disabled as well, just read file and exit, don't daemonize*/
if disableAPI {
config.Common.Daemonize = false
cConfig.Common.Daemonize = false
}
config.Common.LogMedia = "stdout"
log.Infof("single file mode : log_media=%s daemonize=%t", config.Common.LogMedia, config.Common.Daemonize)
cConfig.Common.LogMedia = "stdout"
log.Infof("single file mode : log_media=%s daemonize=%t", cConfig.Common.LogMedia, cConfig.Common.Daemonize)
}

return nil
}

func main() {
var (
err error
cConfig *csconfig.GlobalConfig
err error
)

defer types.CatchPanic("crowdsec/main")
Expand Down Expand Up @@ -314,7 +306,7 @@ func main() {
go registerPrometheus(cConfig.Prometheus)
}

if err := Serve(); err != nil {
if err := Serve(cConfig); err != nil {
log.Fatalf(err.Error())
}

Expand Down
14 changes: 0 additions & 14 deletions cmd/crowdsec/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"fmt"
"time"

"github.com/crowdsecurity/crowdsec/pkg/acquisition"
v1 "github.com/crowdsecurity/crowdsec/pkg/apiserver/controllers/v1"
Expand Down Expand Up @@ -64,19 +63,6 @@ var globalCsInfo = prometheus.NewGauge(
},
)

func dumpMetrics() {
var tmpFile string
var err error

if cConfig.Crowdsec.BucketStateDumpDir != "" {
log.Infof("!! Dumping buckets state")
if tmpFile, err = leaky.DumpBucketsStateAt(time.Now(), cConfig.Crowdsec.BucketStateDumpDir, buckets); err != nil {
log.Fatalf("Failed dumping bucket state : %s", err)
}
log.Infof("Buckets state dumped to %s", tmpFile)
}
}

func registerPrometheus(config *csconfig.PrometheusCfg) {
if !config.Enabled {
return
Expand Down
3 changes: 2 additions & 1 deletion cmd/crowdsec/pour.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"fmt"
"time"

"github.com/crowdsecurity/crowdsec/pkg/csconfig"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
log "github.com/sirupsen/logrus"
)

func runPour(input chan types.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets) error {
func runPour(input chan types.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.GlobalConfig) error {
var (
count int
)
Expand Down
Loading

0 comments on commit 5b7ac4a

Please sign in to comment.