Skip to content

Commit

Permalink
Merge pull request #454 from slingdata-io/v1.3
Browse files Browse the repository at this point in the history
V1.3
  • Loading branch information
flarco authored Dec 1, 2024
2 parents 2ace22f + 9fdb5ff commit c5fa293
Show file tree
Hide file tree
Showing 29 changed files with 568 additions and 129 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ cmd/sling/build/
cmd/sling/frontend
.cursorignore
.sling.json
.python-version
pyproject.toml
uv.lock
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ You can target specific tests or suites using environment variables:
3. CLI Suite:
```sh
cd cmd/sling
export SLING_BIN=../../sling
export SLING_BIN=./sling
go test -v -run TestCLI # run all CLI tests
TESTS="31+" go test -v -run TestCLI # run CLI tests 31 and all subsequent tests
```
Expand Down
4 changes: 2 additions & 2 deletions cmd/sling/sling_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,8 @@ func runReplication(cfgPath string, cfgOverwrite *sling.Config, selectStreams ..
break
}

env.LogSink = nil // clear log sink

if cfg.ReplicationStream.Disabled {
println()
g.Debug("skipping stream %s since it is disabled", cfg.StreamName)
Expand All @@ -472,8 +474,6 @@ func runReplication(cfgPath string, cfgOverwrite *sling.Config, selectStreams ..
g.Info("[%d / %d] running stream %s", counter, streamCnt, cfg.StreamName)
}

env.LogSink = nil // clear log sink

env.TelMap = g.M("begin_time", time.Now().UnixMicro(), "run_mode", "replication") // reset map
env.SetTelVal("replication_md5", replication.MD5())
err = runTask(cfg, &replication)
Expand Down
4 changes: 0 additions & 4 deletions cmd/sling/sling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,6 @@ defaults:
primary_key: [col1, col2]
update_key: col3
source_options:
trim_space: false
delimiter: ","
target_options:
file_max_rows: 500000
Expand All @@ -985,7 +984,6 @@ streams:
update_key: col2
source_options:
columns: { pro: 'decimal(10,4)', pro2: 'string' }
trim_space: true
delimiter: "|"
transforms: [trim_space]
target_options:
Expand Down Expand Up @@ -1040,7 +1038,6 @@ streams:
assert.Equal(t, []string{"col1", "col2", "col3"}, config.Source.Select)
assert.Equal(t, []string{"col1", "col2"}, config.Source.PrimaryKey())
assert.Equal(t, "col3", config.Source.UpdateKey)
assert.Equal(t, g.Bool(false), config.Source.Options.TrimSpace)
assert.Equal(t, ",", config.Source.Options.Delimiter)

assert.Equal(t, "postgres", config.Target.Conn)
Expand All @@ -1058,7 +1055,6 @@ streams:
assert.Equal(t, []string{"col1"}, config.Source.Select)
assert.Equal(t, []string{"col3"}, config.Source.PrimaryKey())
assert.Equal(t, "col2", config.Source.UpdateKey)
assert.Equal(t, g.Bool(true), config.Source.Options.TrimSpace)
assert.Equal(t, "|", config.Source.Options.Delimiter)
assert.Equal(t, "[{\"name\":\"pro\",\"type\":\"decimal(10,4)\"},{\"name\":\"pro2\",\"type\":\"string\"}]", g.Marshal(config.Target.Columns))
assert.Equal(t, `["trim_space"]`, g.Marshal(config.Transforms))
Expand Down
1 change: 1 addition & 0 deletions cmd/sling/tests/suite.cli.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,4 @@ n test_name rows bytes streams fails output_contains command
55 Run sling with CSV source and $symbol quote 3 "sling run --src-conn LOCAL --src-stream file://cmd/sling/tests/files/test8.csv --src-options '{ delimiter: ""|"", quote: ""$"", escape: ""\\"" }' --stdout > /dev/null"
56 Run sling with direct insert full-refresh >10 >1 streaming data (direct insert) SLING_DIRECT_INSERT=true sling run -r cmd/sling/tests/replications/r.09.yaml
57 Run sling with direct insert incremental 0 Nothing to insert|streaming data (direct insert) SLING_DIRECT_INSERT=true sling run --src-conn postgres --src-stream public.test1k --tgt-conn snowflake --tgt-object 'public.{stream_schema}_{stream_table}' --mode incremental --update-key create_date
58 Run sling writing to partitioned parquet 1000 partition_by ( sling run --src-stream file://cmd/sling/tests/files/test1.csv --tgt-object 'file:///tmp/sling/output8/{part_year}/{part_month}' -d --tgt-options '{ format: parquet }' --update-key create_dt
62 changes: 54 additions & 8 deletions core/dbio/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -2957,17 +2957,63 @@ func (conn *BaseConn) credsProvided(provider string) bool {
return false
}

func (conn *BaseConn) getTlsConfig() (tlsConfig *tls.Config, err error) {
func (conn *BaseConn) makeTlsConfig() (tlsConfig *tls.Config, err error) {
if cast.ToBool(conn.GetProp("tls")) {
tlsConfig = &tls.Config{}
}

cert := conn.GetProp("cert_file")
key := conn.GetProp("cert_key_file")
caCert := conn.GetProp("cert_ca_file")
// legacy only handles files
certificate := conn.GetProp("cert_file")
certificateKey := conn.GetProp("cert_key_file")
certificateCA := conn.GetProp("cert_ca_file")

if key != "" && cert != "" {
cert, err := tls.LoadX509KeyPair(cert, key)
// processValue handles files or values
processValue := func(val string) (string, error) {
// check if file exists
if _, err := os.Stat(val); err == nil {
bytes, err := os.ReadFile(val)
if err != nil {
return "", g.Error(err, "Failed to load file: %s", val)
}
val = string(bytes)
}

// check if it is a pem encoded certificate
val = strings.TrimSpace(val)
if strings.HasPrefix(val, "-----BEGIN") {
return val, nil
}

// not a file or pem encoded certificate
return "", g.Error("invalid certificate value or file path")
}

if val := conn.GetProp("certificate"); val != "" {
certificate, err = processValue(val)
if err != nil {
return nil, g.Error(err, "Failed to load client certificate")
}
conn.SetProp("certificate", certificate) // overwrite
}

if val := conn.GetProp("certificate_key"); val != "" {
certificateKey, err = processValue(val)
if err != nil {
return nil, g.Error(err, "Failed to load client certificate key")
}
conn.SetProp("certificate_key", certificateKey) // overwrite
}

if val := conn.GetProp("certificate_ca"); val != "" {
certificateCA, err = processValue(val)
if err != nil {
return nil, g.Error(err, "Failed to load CA certificate")
}
conn.SetProp("certificate_ca", certificateCA) // overwrite
}

if certificateKey != "" && certificate != "" {
cert, err := tls.LoadX509KeyPair(certificate, certificateKey)
if err != nil {
return nil, g.Error(err, "Failed to load client certificate")
}
Expand All @@ -2976,8 +3022,8 @@ func (conn *BaseConn) getTlsConfig() (tlsConfig *tls.Config, err error) {
Certificates: []tls.Certificate{cert},
}

if caCert != "" {
caCert, err := os.ReadFile(caCert)
if certificateCA != "" {
caCert, err := os.ReadFile(certificateCA)
if err != nil {
return nil, g.Error(err, "Failed to load CA certificate")
}
Expand Down
9 changes: 7 additions & 2 deletions core/dbio/database/database_duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ func (conn *DuckDbConn) GetURL(newURL ...string) string {
return URL
}

// DuckDb returns the DuckDb instance
func (conn *DuckDbConn) DuckDb() *iop.DuckDb {
return conn.duck
}

func (conn *DuckDbConn) dbPath() (string, error) {
dbPathU, err := net.NewURL(conn.GetURL())
if err != nil {
Expand All @@ -89,7 +94,7 @@ func (conn *DuckDbConn) Connect(timeOut ...int) (err error) {
if err != nil {
return g.Error(err, "could not get db path")
} else if conn.GetType() != dbio.TypeDbMotherDuck && !g.PathExists(dbPath) {
g.Warn("The file %s does not exist, however it will be created if needed.", dbPath)
g.Debug("The file %s does not exist, however it will be created if needed.", dbPath)
}

connPool.Mux.Lock()
Expand Down Expand Up @@ -182,7 +187,7 @@ func (conn *DuckDbConn) importViaTempCSVs(tableFName string, df *iop.Dataflow) (
}

config := iop.DefaultStreamConfig()
config.BatchLimit = 250000
config.FileMaxRows = 250000
config.Header = true
config.Delimiter = ","
config.Escape = `"`
Expand Down
2 changes: 1 addition & 1 deletion core/dbio/database/database_mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (conn *MongoDBConn) getNewClient(timeOut ...int) (client *mongo.Client, err
options.Client().SetCompressors([]string{"zstd", "snappy", "zlib"}),
}

tlsConfig, err := conn.getTlsConfig()
tlsConfig, err := conn.makeTlsConfig()
if err != nil {
return nil, g.Error(err)
} else if tlsConfig != nil {
Expand Down
13 changes: 10 additions & 3 deletions core/dbio/database/database_oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,16 @@ func (conn *OracleConn) SubmitTemplate(level string, templateMap map[string]stri
return conn.BaseConn.SubmitTemplate(level, templateMap, name, values)
}

func (conn *OracleConn) sqlldrPath() string {
if val := conn.GetProp("sqlldr_path"); val != "" {
return val
}
return "sqlldr"
}

// BulkImportStream bulk import stream
func (conn *OracleConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error) {
_, err = exec.LookPath("sqlldr")
_, err = exec.LookPath(conn.sqlldrPath())
if err != nil {
g.Debug("sqlldr not found in path. Using cursor...")
return conn.BaseConn.InsertBatchStream(tableFName, ds)
Expand Down Expand Up @@ -295,7 +302,7 @@ func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count ui
return
}

g.Debug("sqldr ctl file content (%s):\n%s", ctlPath, ctlStr)
g.Debug("sqlldr ctl file content (%s):\n%s", ctlPath, ctlStr)

password, _ := url.User.Password()
hostPort := url.Host
Expand Down Expand Up @@ -333,7 +340,7 @@ func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count ui
}

proc := exec.Command(
"sqlldr",
conn.sqlldrPath(),
"'"+credHost+"'",
"control="+ctlPath,
"discardmax=0",
Expand Down
2 changes: 1 addition & 1 deletion core/dbio/database/database_prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (conn *PrometheusConn) getNewClient(timeOut ...int) (client v1.API, err err
rt := api.DefaultRoundTripper

// get tls
tlsConfig, err := conn.getTlsConfig()
tlsConfig, err := conn.makeTlsConfig()
if err != nil {
return nil, g.Error(err)
} else if tlsConfig != nil {
Expand Down
15 changes: 11 additions & 4 deletions core/dbio/database/database_sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (conn *MsSQLServerConn) BulkImportStream(tableFName string, ds *iop.Datastr
conn.Commit() // cannot have transaction lock table

// return conn.BaseConn.InsertBatchStream(tableFName, ds)
_, err = exec.LookPath("bcp")
_, err = exec.LookPath(conn.bcpPath())
if err != nil {
g.Debug("bcp not found in path. Using cursor...")
return conn.BaseConn.InsertBatchStream(tableFName, ds)
Expand Down Expand Up @@ -407,6 +407,13 @@ func (conn *MsSQLServerConn) BcpImportFileParrallel(tableFName string, ds *iop.D
return ds.Count, ds.Err()
}

func (conn *MsSQLServerConn) bcpPath() string {
if val := conn.GetProp("bcp_path"); val != "" {
return val
}
return "bcp"
}

// BcpImportFile Import using bcp tool
// https://docs.microsoft.com/en-us/sql/tools/bcp-utility?view=sql-server-ver15
// bcp dbo.test1 in '/tmp/LargeDataset.csv' -S tcp:sqlserver.host,51433 -d master -U sa -P 'password' -c -t ',' -b 5000
Expand All @@ -427,7 +434,7 @@ func (conn *MsSQLServerConn) BcpImportFile(tableFName, filePath string) (count u

// get version
version := 14
versionOut, err := exec.Command("bcp", "-v").Output()
versionOut, err := exec.Command(conn.bcpPath(), "-v").Output()
if err != nil {
return 0, g.Error(err, "could not get bcp version")
}
Expand Down Expand Up @@ -504,7 +511,7 @@ func (conn *MsSQLServerConn) BcpImportFile(tableFName, filePath string) (count u
bcpArgs = append(bcpArgs, bcpExtraParts...)
}

proc := exec.Command("bcp", bcpArgs...)
proc := exec.Command(conn.bcpPath(), bcpArgs...)
proc.Stderr = &stderr
proc.Stdout = &stdout

Expand All @@ -517,7 +524,7 @@ func (conn *MsSQLServerConn) BcpImportFile(tableFName, filePath string) (count u

// build cmdStr
args := lo.Map(proc.Args, func(v string, i int) string {
if !g.In(v, "in", "-S", "-d", "-U", "-P", "-t", "-u", "-m", "-c", "-q", "-b", "-F", "-e", "bcp") {
if !g.In(v, "in", "-S", "-d", "-U", "-P", "-t", "-u", "-m", "-c", "-q", "-b", "-F", "-e", conn.bcpPath()) {
v = strings.ReplaceAll(v, hostPort, "****")
if password != "" {
v = strings.ReplaceAll(v, password, "****")
Expand Down
8 changes: 5 additions & 3 deletions core/dbio/database/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,9 +480,11 @@ func Upsert(conn Connection, tx Transaction, sourceTable, targetTable string, pk
return
}

count, err = result.RowsAffected()
if err != nil || (count == 0 && conn.GetType() == dbio.TypeDbClickhouse) {
count = -1
if result != nil {
count, err = result.RowsAffected()
if err != nil || (count == 0 && conn.GetType() == dbio.TypeDbClickhouse) {
count = -1
}
}

return
Expand Down
Loading

0 comments on commit c5fa293

Please sign in to comment.