Skip to content

Commit

Permalink
V1.0.69 (#110)
Browse files Browse the repository at this point in the history
* fix snowflake GetTables

* add dry run mode

* add clickhouse http_url option

* update clickhouse driver version

* add SLING_LOGGING logic

* set SLING_LOGGING from env payload

* update fs concurrency logic

* set TargetFileOptionsDefault Concurrency

* update default settingMppBulkImportFlow

* Fix bigquery checksum, update fs_google for default creds

* add file partitioning notation parsing

* Allow sling to create empty tables with SLING_ALLOW_EMPTY_TABLES
  • Loading branch information
flarco authored Jan 12, 2024
1 parent 2b87350 commit 5af9f29
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 113 deletions.
9 changes: 9 additions & 0 deletions cmd/sling/sling_logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,18 @@ func runTask(cfg *sling.Config, replication *sling.ReplicationConfig) (err error
setProjectID(cfg.Env["SLING_CONFIG_PATH"])
cfg.Env["SLING_PROJECT_ID"] = projectID

// set logging
if val := cfg.Env["SLING_LOGGING"]; val != "" {
os.Setenv("SLING_LOGGING", val)
}

task = sling.NewTask(0, cfg)
task.Replication = replication

if cast.ToBool(cfg.Env["SLING_DRY_RUN"]) || cast.ToBool(os.Getenv("SLING_DRY_RUN")) {
return nil
}

// insert into store for history keeping
sling.StoreInsert(task)

Expand Down
4 changes: 2 additions & 2 deletions core/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func InitLogger() {
return fmt.Sprintf("%s", i)
}

if os.Getenv("G_LOGGING") == "TASK" {
if os.Getenv("SLING_LOGGING") == "NO_COLOR" {
outputOut.NoColor = true
outputErr.NoColor = true
g.ZLogOut = zerolog.New(outputOut).With().Timestamp().Logger()
g.ZLogErr = zerolog.New(outputErr).With().Timestamp().Logger()
} else if os.Getenv("G_LOGGING") == "MASTER" || os.Getenv("G_LOGGING") == "WORKER" {
} else if os.Getenv("SLING_LOGGING") == "JSON" {
zerolog.LevelFieldName = "lvl"
zerolog.MessageFieldName = "msg"
g.ZLogOut = zerolog.New(os.Stdout).With().Timestamp().Logger()
Expand Down
3 changes: 1 addition & 2 deletions core/sling/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"io"
"os"
"regexp"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -854,7 +853,7 @@ var TargetFileOptionsDefault = TargetOptions{
Concurrency: lo.Ternary(
os.Getenv("CONCURRENCY") != "",
cast.ToInt(os.Getenv("CONCURRENCY")),
runtime.NumCPU(),
7,
),
FileMaxRows: lo.Ternary(
os.Getenv("FILE_MAX_ROWS") != "",
Expand Down
36 changes: 19 additions & 17 deletions core/sling/task_run_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,7 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
defer t.PBar.Finish()

// detect empty
if len(df.Buffer) == 0 {
g.Warn("No data found in stream. Nothing to do.")
return
} else if len(df.Columns) == 0 {
if len(df.Columns) == 0 {
err = g.Error("no stream columns detected")
return
}
Expand Down Expand Up @@ -292,21 +289,26 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
}
}

// FIXME: find root cause of why columns don't synch while streaming
df.SyncColumns()
if cnt == 0 && !cast.ToBool(os.Getenv("SLING_ALLOW_EMPTY_TABLES")) {
g.Warn("No data or records found in stream. Nothing to do. To allow Sling to create empty tables, set SLING_ALLOW_EMPTY_TABLES=TRUE")
return
} else if cnt > 0 {
// FIXME: find root cause of why columns don't synch while streaming
df.SyncColumns()

// aggregate stats from stream processors
df.Inferred = !cfg.sourceIsFile() // re-infer is source is file
df.SyncStats()
// aggregate stats from stream processors
df.Inferred = !cfg.sourceIsFile() // re-infer is source is file
df.SyncStats()

// Checksum Comparison, data quality. Limit to 10k, cause sums get too high
if df.Count() <= 10000 {
err = tgtConn.CompareChecksums(cfg.Target.Options.TableTmp, df.Columns)
if err != nil {
if os.Getenv("ERROR_ON_CHECKSUM_FAILURE") != "" {
return
// Checksum Comparison, data quality. Limit to 10k, cause sums get too high
if df.Count() <= 10000 {
err = tgtConn.CompareChecksums(cfg.Target.Options.TableTmp, df.Columns)
if err != nil {
if os.Getenv("ERROR_ON_CHECKSUM_FAILURE") != "" {
return
}
g.DebugLow(g.ErrMsgSimple(err))
}
g.DebugLow(g.ErrMsgSimple(err))
}
}

Expand All @@ -326,7 +328,7 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas

defer tgtConn.Rollback() // rollback in case of error

if cnt > 0 {
{
if cfg.Mode == FullRefreshMode {
// drop, (create if not exists) and insert directly
err = tgtConn.DropTable(targetTable)
Expand Down
53 changes: 29 additions & 24 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ require (
github.com/denisbrodbeck/machineid v1.0.1
github.com/dustin/go-humanize v1.0.1
github.com/fatih/color v1.13.0
github.com/flarco/dbio v0.4.62
github.com/flarco/dbio v0.4.71
github.com/flarco/g v0.1.67
github.com/getsentry/sentry-go v0.11.0
github.com/google/uuid v1.4.0
github.com/google/uuid v1.5.0
github.com/integrii/flaggy v1.5.2
github.com/jmoiron/sqlx v1.2.0
github.com/json-iterator/go v1.1.12
Expand All @@ -21,7 +21,7 @@ require (
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/spf13/cast v1.5.0
github.com/stretchr/testify v1.8.4
golang.org/x/text v0.13.0
golang.org/x/text v0.14.0
gopkg.in/cheggaaa/pb.v2 v2.0.7
gopkg.in/yaml.v2 v2.4.0
gorm.io/gorm v1.20.7
Expand Down Expand Up @@ -52,11 +52,12 @@ require (
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/ClickHouse/clickhouse-go/v2 v2.2.0 // indirect
github.com/ClickHouse/ch-go v0.58.2 // indirect
github.com/ClickHouse/clickhouse-go/v2 v2.17.1 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/PuerkitoBio/goquery v1.6.0 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/andybalholm/brotli v1.0.6 // indirect
github.com/andybalholm/cascadia v1.1.0 // indirect
github.com/apache/arrow/go/v12 v12.0.1 // indirect
github.com/apache/thrift v0.16.0 // indirect
Expand All @@ -83,17 +84,19 @@ require (
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/denisenkom/go-mssqldb v0.12.3 // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/docker v20.10.17+incompatible // indirect
github.com/docker/distribution v2.8.2+incompatible // indirect
github.com/docker/docker v24.0.7+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dvsekhvalnov/jose2go v1.5.0 // indirect
github.com/envoyproxy/go-control-plane v0.11.1 // indirect
github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect
github.com/flarco/bigquery v0.0.9 // indirect
github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.6.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/errors v0.20.2 // indirect
Expand Down Expand Up @@ -152,9 +155,9 @@ require (
github.com/oklog/ulid v1.3.1 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/opencontainers/image-spec v1.1.0-rc5 // indirect
github.com/parquet-go/parquet-go v0.20.0 // indirect
github.com/paulmach/orb v0.7.1 // indirect
github.com/paulmach/orb v0.10.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -166,35 +169,37 @@ require (
github.com/psanford/sqlite3vfshttp v0.0.0-20220827153928-a19f096e6eb4 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/satori/go.uuid v1.2.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/segmentio/backo-go v0.0.0-20200129164019-23eae7c10bd3 // indirect
github.com/segmentio/encoding v0.3.6 // indirect
github.com/shirou/gopsutil/v3 v3.22.7 // indirect
github.com/shirou/gopsutil/v3 v3.23.9 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/snowflakedb/gosnowflake v1.6.25 // indirect
github.com/tidwall/gjson v1.9.3 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/viant/xunsafe v0.8.0 // indirect
github.com/xo/dburl v0.3.0 // indirect
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.mongodb.org/mongo-driver v1.10.0 // indirect
go.mongodb.org/mongo-driver v1.11.4 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/oauth2 v0.13.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/term v0.15.0 // indirect
golang.org/x/tools v0.10.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.149.0 // indirect
Expand Down
Loading

0 comments on commit 5af9f29

Please sign in to comment.