More retrying changes #377

Closed
wants to merge 8 commits into from
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
@@ -30,6 +30,9 @@ jobs:
check-latest: true
cache: true

- name: Check if go mod tidy should be run
uses: katexochen/go-tidy-check@v2

- name: Extract project version from file
id: version
run: |
@@ -79,4 +82,4 @@ jobs:
run: make e2e-up

- name: Run e2e tests
run: make e2e-test
run: make e2e-test
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
2.4.2
3.0.0-rc5
11 changes: 11 additions & 0 deletions assets/docs/configuration/overview-full-example.hcl
@@ -93,6 +93,17 @@ stats_receiver {
// log level configuration (default: "info")
log_level = "info"

// Specifies how failed writes to the target should be retried, depending on the error type
retry {
transient {
delay_ms = 1000
max_attempts = 5
}
setup {
delay_ms = 20000
}
}

license {
accept = true
}
9 changes: 9 additions & 0 deletions assets/docs/configuration/retry-example.hcl
@@ -0,0 +1,9 @@
retry {
transient {
delay_ms = 5000
max_attempts = 10
}
setup {
delay_ms = 30000
}
}
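
For reference, a rough sketch of how this retry block could be represented on the Go side. This is hypothetical: the field names are inferred from how cli.go reads the settings (cfg.Data.Retry.Setup.Delay, cfg.Data.Retry.Transient.Delay, cfg.Data.Retry.Transient.MaxAttempts), and the real structs, package layout, and HCL tags in Snowbridge's config package may differ.

```go
// Hypothetical sketch only: field names inferred from how cli.go reads the
// retry settings; the actual config package may look different.
package config

type RetryConfig struct {
	Transient TransientRetryConfig // the `transient` block: delay_ms, max_attempts
	Setup     SetupRetryConfig     // the `setup` block: delay_ms
}

type TransientRetryConfig struct {
	Delay       int // delay_ms: pause between attempts, in milliseconds
	MaxAttempts int // max_attempts: upper bound on attempts for transient errors
}

type SetupRetryConfig struct {
	Delay int // delay_ms: pause between attempts for setup errors, in milliseconds
}
```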
3 changes: 3 additions & 0 deletions assets/docs/configuration/sources/kafka-full-example.hcl
@@ -34,6 +34,9 @@ source {
# The SASL Algorithm to use: "plaintext", "sha512" or "sha256" (default: "sha512")
sasl_algorithm = "sha256"

# Whether to enable TLS
enable_tls = true

# The optional certificate file for client authentication
cert_file = "myLocalhost.crt"

25 changes: 25 additions & 0 deletions assets/docs/configuration/targets/http-full-example.hcl
@@ -32,6 +32,9 @@ target {
# you could also reference an environment variable.
basic_auth_password = env.MY_AUTH_PASSWORD

# Whether to enable TLS
enable_tls = true

# The optional certificate file for client authentication
cert_file = "myLocalhost.crt"

@@ -62,5 +65,27 @@ target {

# Optional path to the file containing template which is used to build HTTP request based on a batch of input data
template_file = "myTemplate.file"

# 2 invalid + 1 setup error rules
response_rules {
# This one is a match when...
invalid {
# ...HTTP statuses match...
http_codes = [400]
# AND this string exists in a response body
body = "Invalid value for 'purchase' field"
}
# If no match yet, we can check the next one...
invalid {
# again 400 status...
http_codes = [400]
# BUT we expect a different error message in the response body
body = "Invalid value for 'attributes' field"
}
# The same applies to 'setup' rules...
setup {
http_codes = [401, 403]
}
}
}
}
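
As an illustration of how these rules read: a rule appears to match when the response status is one of http_codes and, when body is set, that string occurs in the response body; any rule matching classifies the write as invalid or setup, and everything else presumably stays a regular (transient) failure. The sketch below is a hypothetical illustration of that matching logic only; ResponseRule and matches are not names from the codebase, and this is not the HTTP target's actual implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// ResponseRule is a hypothetical stand-in for one invalid/setup rule above.
type ResponseRule struct {
	HTTPCodes []int
	Body      string
}

// matches reports whether any rule matches: the status code must be listed
// AND, when Body is set, the response body must contain that substring.
func matches(rules []ResponseRule, status int, body string) bool {
	for _, r := range rules {
		for _, code := range r.HTTPCodes {
			if code == status && (r.Body == "" || strings.Contains(body, r.Body)) {
				return true
			}
		}
	}
	return false
}

func main() {
	invalid := []ResponseRule{
		{HTTPCodes: []int{400}, Body: "Invalid value for 'purchase' field"},
		{HTTPCodes: []int{400}, Body: "Invalid value for 'attributes' field"},
	}
	setup := []ResponseRule{{HTTPCodes: []int{401, 403}}}

	fmt.Println(matches(invalid, 400, `{"error":"Invalid value for 'purchase' field"}`)) // true
	fmt.Println(matches(setup, 403, "Forbidden"))                                        // true
	fmt.Println(matches(invalid, 500, "oops"))                                           // false -> treated as a regular failure
}
```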
3 changes: 3 additions & 0 deletions assets/docs/configuration/targets/kafka-full-example.hcl
@@ -38,6 +38,9 @@ target {
# The SASL Algorithm to use: "plaintext", "sha512" or "sha256" (default: "sha512")
sasl_algorithm = "sha256"

# Whether to enable TLS
enable_tls = true

# The optional certificate file for client authentication
cert_file = "myLocalhost.crt"

@@ -0,0 +1,7 @@
transform {
use "jqFilter" {
jq_command = "has(\"app_id\")"
timeout_ms = 800
snowplow_mode = true
}
}
@@ -0,0 +1,5 @@
transform {
use "jqFilter" {
jq_command = "has(\"app_id\")"
}
}

This file was deleted.

@@ -0,0 +1,6 @@
transform {
use "spGtmssPreview" {
# Message expiry time in seconds (comparing the current time to the message's collector timestamp). If the message is expired, it is sent to the failure target.
expiry_seconds = 600
}
}
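
The expiry check described in the comment boils down to comparing the message's collector timestamp against the current time. A minimal sketch of that comparison follows; the function name and signature are illustrative only and not the transformation's actual code.

```go
package main

import (
	"fmt"
	"time"
)

// isExpired reports whether a message is older than expirySeconds,
// measured against its collector timestamp.
func isExpired(collectorTstamp time.Time, expirySeconds int) bool {
	return time.Since(collectorTstamp) > time.Duration(expirySeconds)*time.Second
}

func main() {
	collectorTstamp := time.Now().Add(-15 * time.Minute)
	// With expiry_seconds = 600 this message is expired and would be routed to the failure target.
	fmt.Println(isExpired(collectorTstamp, 600)) // true
}
```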
132 changes: 95 additions & 37 deletions cmd/cli/cli.go
@@ -19,13 +19,13 @@ import (

"github.com/getsentry/sentry-go"
log "github.com/sirupsen/logrus"
retry "github.com/snowplow-devops/go-retry"
"github.com/urfave/cli"

"net/http"
// pprof imported for the side effect of registering its HTTP handlers
_ "net/http/pprof"

retry "github.com/avast/retry-go/v4"
"github.com/snowplow/snowbridge/cmd"
"github.com/snowplow/snowbridge/config"
"github.com/snowplow/snowbridge/pkg/failure/failureiface"
@@ -171,7 +171,7 @@ func RunApp(cfg *config.Config, supportedSources []config.ConfigurationPair, sup

// Callback functions for the source to leverage when writing data
sf := sourceiface.SourceFunctions{
WriteToTarget: sourceWriteFunc(t, ft, tr, o),
WriteToTarget: sourceWriteFunc(t, ft, tr, o, cfg),
}

// Read is a long running process and will only return when the source
@@ -195,7 +195,7 @@
// 4. Observing these results
//
// All with retry logic baked in to remove any of this handling from the implementations
func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform.TransformationApplyFunction, o *observer.Observer) func(messages []*models.Message) error {
func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform.TransformationApplyFunction, o *observer.Observer, cfg *config.Config) func(messages []*models.Message) error {
return func(messages []*models.Message) error {

// Apply transformations
@@ -215,65 +215,123 @@ func sourceWriteFunc(t targetiface.Target, ft failureiface.Failure, tr transform

// Send message buffer
messagesToSend := transformed.Result
invalid := transformed.Invalid
var oversized []*models.Message

res, err := retry.ExponentialWithInterface(5, time.Second, "target.Write", func() (interface{}, error) {
res, err := t.Write(messagesToSend)
write := func() error {
result, err := t.Write(messagesToSend)

o.TargetWrite(result)
messagesToSend = result.Failed
oversized = append(oversized, result.Oversized...)
invalid = append(invalid, result.Invalid...)
return err
}

err := handleWrite(cfg, write)

o.TargetWrite(res)
messagesToSend = res.Failed
return res, err
})
if err != nil {
return err
}
resCast := res.(*models.TargetWriteResult)

// Send oversized message buffer
messagesToSend = resCast.Oversized
if len(messagesToSend) > 0 {
err2 := retry.Exponential(5, time.Second, "failureTarget.WriteOversized", func() error {
res, err := ft.WriteOversized(t.MaximumAllowedMessageSizeBytes(), messagesToSend)
if err != nil {
return err
}
if len(res.Oversized) != 0 || len(res.Invalid) != 0 {
if len(oversized) > 0 {
messagesToSend = oversized
writeOversized := func() error {
result, err := ft.WriteOversized(t.MaximumAllowedMessageSizeBytes(), messagesToSend)
if len(result.Oversized) != 0 || len(result.Invalid) != 0 {
log.Fatal("Oversized message transformation resulted in new oversized / invalid messages")
}

o.TargetWriteOversized(res)
messagesToSend = res.Failed
o.TargetWriteOversized(result)
messagesToSend = result.Failed
return err
}

err := handleWrite(cfg, writeOversized)

if err != nil {
return err
})
if err2 != nil {
return err2
}
}

// Send invalid message buffer
messagesToSend = append(resCast.Invalid, transformed.Invalid...)
if len(messagesToSend) > 0 {
err3 := retry.Exponential(5, time.Second, "failureTarget.WriteInvalid", func() error {
res, err := ft.WriteInvalid(messagesToSend)
if err != nil {
return err
}
if len(res.Oversized) != 0 || len(res.Invalid) != 0 {
if len(invalid) > 0 {
messagesToSend = invalid
writeInvalid := func() error {
result, err := ft.WriteInvalid(messagesToSend)
if len(result.Oversized) != 0 || len(result.Invalid) != 0 {
log.Fatal("Invalid message transformation resulted in new invalid / oversized messages")
}

o.TargetWriteInvalid(res)
messagesToSend = res.Failed
o.TargetWriteInvalid(result)
messagesToSend = result.Failed
return err
})
if err3 != nil {
return err3
}
}

err := handleWrite(cfg, writeInvalid)

if err != nil {
return err
}
}
return nil
}
}

// Wrap each target write operation with 2 kinds of retries:
// - setup errors: long delay, unlimited attempts, unhealthy state + alerts
// - transient errors: short delay, limited attempts
// Whether it's a setup or a transient error is decided based on the response returned by the target.
func handleWrite(cfg *config.Config, write func() error) error {
retryOnlySetupErrors := retry.RetryIf(func(err error) bool {
_, isSetup := err.(models.SetupWriteError)
return isSetup
})

onSetupError := retry.OnRetry(func(attempt uint, err error) {
log.Warnf("Setup target write error. Attempt: %d, error: %s\n", attempt+1, err)
// In the future we can set an unhealthy status and send monitoring alerts here; nothing happens here for now.
})

// First, try to handle the error as a setup error...
err := retry.Do(
write,
retryOnlySetupErrors,
onSetupError,
retry.Delay(time.Duration(cfg.Data.Retry.Setup.Delay) * time.Millisecond),
// For now, limit attempts to 5 for setup errors, because we don't yet have a health check that would allow the app to be killed externally. Unlimited attempts don't make sense right now.
retry.Attempts(5),
retry.LastErrorOnly(true),
// Enable when health check + monitoring are implemented:
// retry.Attempts(0), // unlimited
)

if err == nil {
return err
}

// If it's not a setup error, handle it as transient.
log.Warnf("Transient target write error. Starting retrying. error: %s\n", err)

// We already had at least 1 attempt in the 'setup' retrying section above, so before starting transient retrying we need to add a 'manual' initial delay.
time.Sleep(time.Duration(cfg.Data.Retry.Transient.Delay) * time.Millisecond)

onTransientError := retry.OnRetry(func(attempt uint, err error) {
log.Warnf("Retry failed with transient error. Retry counter: %d, error: %s\n", attempt+1, err)
})

err = retry.Do(
write,
onTransientError,
// * 2 because we have initial sleep above
retry.Delay(time.Duration(cfg.Data.Retry.Transient.Delay*2) * time.Millisecond),
retry.Attempts(uint(cfg.Data.Retry.Transient.MaxAttempts)),
retry.LastErrorOnly(true),
)
return err
}

// exitWithError will ensure we log the error and leave time for Sentry to flush
func exitWithError(err error, flushSentry bool) {
log.WithFields(log.Fields{"error": err}).Error(err)
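
As a footnote to the handleWrite change: the sketch below is a standalone illustration of the same two-phase pattern with github.com/avast/retry-go/v4, not the Snowbridge code itself. setupError stands in for models.SetupWriteError, whose definition is outside this diff, and the delays and attempt counts are made up.

```go
package main

import (
	"errors"
	"fmt"
	"time"

	retry "github.com/avast/retry-go/v4"
)

// setupError is a stand-in for models.SetupWriteError.
type setupError struct{ msg string }

func (e setupError) Error() string { return e.msg }

func main() {
	attempts := 0
	write := func() error {
		attempts++
		switch {
		case attempts == 1:
			return setupError{msg: "permission denied"} // retried on the long "setup" schedule
		case attempts < 4:
			return errors.New("timeout") // not a setup error: handed over to the transient phase
		default:
			return nil // eventually succeeds
		}
	}

	// Phase 1: retry only setup errors, with a long delay.
	err := retry.Do(
		write,
		retry.RetryIf(func(err error) bool {
			var se setupError
			return errors.As(err, &se)
		}),
		retry.Delay(500*time.Millisecond),
		retry.Attempts(5),
		retry.LastErrorOnly(true),
	)
	if err == nil {
		return
	}

	// Phase 2: whatever error is left is treated as transient, with a short delay and capped attempts.
	err = retry.Do(
		write,
		retry.Delay(100*time.Millisecond),
		retry.Attempts(3),
		retry.LastErrorOnly(true),
	)
	fmt.Println("final error:", err) // <nil> in this toy run
}
```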