From 3b065acc623caab71305790cf6e8ff58cd1ad1bf Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Thu, 28 Nov 2024 07:30:52 -0300 Subject: [PATCH 01/15] use SLING_SAMPLE_SIZE --- core/dbio/iop/datatype.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/dbio/iop/datatype.go b/core/dbio/iop/datatype.go index b22e0e2b..076b4e46 100755 --- a/core/dbio/iop/datatype.go +++ b/core/dbio/iop/datatype.go @@ -138,9 +138,14 @@ func (cs *ColumnStats) DuplicatePercent() float64 { } func init() { - if os.Getenv("SAMPLE_SIZE") != "" { - SampleSize = cast.ToInt(os.Getenv("SAMPLE_SIZE")) + if val := os.Getenv("SAMPLE_SIZE"); val != "" { + SampleSize = cast.ToInt(val) // legacy } + + if val := os.Getenv("SLING_SAMPLE_SIZE"); val != "" { + SampleSize = cast.ToInt(val) + } + if os.Getenv("REMOVE_TRAILING_ZEROS") != "" { RemoveTrailingDecZeros = cast.ToBool(os.Getenv("REMOVE_TRAILING_ZEROS")) } From 2314f157342bd0577dcca93af4d95c43d719ff92 Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Thu, 28 Nov 2024 07:34:36 -0300 Subject: [PATCH 02/15] add error message for non-writable targets --- core/sling/config.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/sling/config.go b/core/sling/config.go index 82db5caa..e5214b6c 100644 --- a/core/sling/config.go +++ b/core/sling/config.go @@ -590,6 +590,12 @@ func (cfg *Config) Prepare() (err error) { cfg.Source.Type = cfg.SrcConn.Type cfg.Target.Type = cfg.TgtConn.Type + // validate capability to write + switch cfg.Target.Type { + case dbio.TypeDbPrometheus, dbio.TypeDbMongoDB, dbio.TypeDbBigTable: + return g.Error("sling cannot currently write to %s", cfg.Target.Type) + } + // validate table keys if tkMap := cfg.Target.Options.TableKeys; tkMap != nil { for _, kt := range lo.Keys(tkMap) { From f027a2e71e1651e5dfe2f0d129d7eb0873631745 Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Thu, 28 Nov 2024 07:35:40 -0300 Subject: [PATCH 03/15] add runtime vars injection into DDL --- core/sling/task_run_write.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/core/sling/task_run_write.go b/core/sling/task_run_write.go index 1f208b98..e823036f 100644 --- a/core/sling/task_run_write.go +++ b/core/sling/task_run_write.go @@ -508,7 +508,15 @@ func initializeTargetTable(cfg *Config, tgtConn database.Connection) (database.T if cfg.Target.Options.TableDDL != nil { targetTable.DDL = *cfg.Target.Options.TableDDL } - targetTable.DDL = g.R(targetTable.DDL, "object_name", targetTable.Raw, "table", targetTable.Raw) + + // inject variables + fm, err := cfg.GetFormatMap() + if err != nil { + return database.Table{}, err + } + fm["table"] = targetTable.Raw + targetTable.DDL = g.Rm(targetTable.DDL, fm) + targetTable.SetKeys(cfg.Source.PrimaryKey(), cfg.Source.UpdateKey, cfg.Target.Options.TableKeys) // check table ddl From 61ae2876322a60823621332b785c5c228851ddfb Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Thu, 28 Nov 2024 07:51:27 -0300 Subject: [PATCH 04/15] accept sqlldr_path & bcp_path --- core/dbio/database/database_oracle.go | 13 ++++++++++--- core/dbio/database/database_sqlserver.go | 15 +++++++++++---- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/core/dbio/database/database_oracle.go b/core/dbio/database/database_oracle.go index b060fbd9..ea2f2770 100755 --- a/core/dbio/database/database_oracle.go +++ b/core/dbio/database/database_oracle.go @@ -228,9 +228,16 @@ func (conn *OracleConn) SubmitTemplate(level string, templateMap map[string]stri return conn.BaseConn.SubmitTemplate(level, templateMap, 
name, values) } +func (conn *OracleConn) sqlldrPath() string { + if val := conn.GetProp("sqlldr_path"); val != "" { + return val + } + return "sqlldr" +} + // BulkImportStream bulk import stream func (conn *OracleConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error) { - _, err = exec.LookPath("sqlldr") + _, err = exec.LookPath(conn.sqlldrPath()) if err != nil { g.Debug("sqlldr not found in path. Using cursor...") return conn.BaseConn.InsertBatchStream(tableFName, ds) @@ -295,7 +302,7 @@ func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count ui return } - g.Debug("sqldr ctl file content (%s):\n%s", ctlPath, ctlStr) + g.Debug("sqlldr ctl file content (%s):\n%s", ctlPath, ctlStr) password, _ := url.User.Password() hostPort := url.Host @@ -333,7 +340,7 @@ func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count ui } proc := exec.Command( - "sqlldr", + conn.sqlldrPath(), "'"+credHost+"'", "control="+ctlPath, "discardmax=0", diff --git a/core/dbio/database/database_sqlserver.go b/core/dbio/database/database_sqlserver.go index 6f1b95cd..e351431c 100755 --- a/core/dbio/database/database_sqlserver.go +++ b/core/dbio/database/database_sqlserver.go @@ -229,7 +229,7 @@ func (conn *MsSQLServerConn) BulkImportStream(tableFName string, ds *iop.Datastr conn.Commit() // cannot have transaction lock table // return conn.BaseConn.InsertBatchStream(tableFName, ds) - _, err = exec.LookPath("bcp") + _, err = exec.LookPath(conn.bcpPath()) if err != nil { g.Debug("bcp not found in path. Using cursor...") return conn.BaseConn.InsertBatchStream(tableFName, ds) @@ -407,6 +407,13 @@ func (conn *MsSQLServerConn) BcpImportFileParrallel(tableFName string, ds *iop.D return ds.Count, ds.Err() } +func (conn *MsSQLServerConn) bcpPath() string { + if val := conn.GetProp("bcp_path"); val != "" { + return val + } + return "bcp" +} + // BcpImportFile Import using bcp tool // https://docs.microsoft.com/en-us/sql/tools/bcp-utility?view=sql-server-ver15 // bcp dbo.test1 in '/tmp/LargeDataset.csv' -S tcp:sqlserver.host,51433 -d master -U sa -P 'password' -c -t ',' -b 5000 @@ -427,7 +434,7 @@ func (conn *MsSQLServerConn) BcpImportFile(tableFName, filePath string) (count u // get version version := 14 - versionOut, err := exec.Command("bcp", "-v").Output() + versionOut, err := exec.Command(conn.bcpPath(), "-v").Output() if err != nil { return 0, g.Error(err, "could not get bcp version") } @@ -504,7 +511,7 @@ func (conn *MsSQLServerConn) BcpImportFile(tableFName, filePath string) (count u bcpArgs = append(bcpArgs, bcpExtraParts...) } - proc := exec.Command("bcp", bcpArgs...) + proc := exec.Command(conn.bcpPath(), bcpArgs...) 
proc.Stderr = &stderr proc.Stdout = &stdout @@ -517,7 +524,7 @@ func (conn *MsSQLServerConn) BcpImportFile(tableFName, filePath string) (count u // build cmdStr args := lo.Map(proc.Args, func(v string, i int) string { - if !g.In(v, "in", "-S", "-d", "-U", "-P", "-t", "-u", "-m", "-c", "-q", "-b", "-F", "-e", "bcp") { + if !g.In(v, "in", "-S", "-d", "-U", "-P", "-t", "-u", "-m", "-c", "-q", "-b", "-F", "-e", conn.bcpPath()) { v = strings.ReplaceAll(v, hostPort, "****") if password != "" { v = strings.ReplaceAll(v, password, "****") From b8c9132ff476d27caa8eae206d0563e430c0d9ea Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Fri, 29 Nov 2024 19:18:20 -0300 Subject: [PATCH 05/15] =?UTF-8?q?=F0=9F=94=A7=20chore(gitignore):=20add=20?= =?UTF-8?q?python=20project=20files=20to=20gitignore?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - added .python-version, pyproject.toml, and uv.lock to ignore files --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index c85721f0..4c7880c2 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,6 @@ cmd/sling/build/ cmd/sling/frontend .cursorignore .sling.json +.python-version +pyproject.toml +uv.lock From 7e36cd39325efb6f08c73397ce6f30fc71c12c82 Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Sat, 30 Nov 2024 17:58:53 -0300 Subject: [PATCH 06/15] =?UTF-8?q?=F0=9F=90=9B=20fix(dbio):=20improve=20con?= =?UTF-8?q?straint=20handling=20and=20error=20reporting?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/dbio/iop/datastream.go | 6 ++++++ core/dbio/iop/datatype.go | 9 ++++----- core/dbio/iop/stream_processor.go | 10 +++++++++- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/core/dbio/iop/datastream.go b/core/dbio/iop/datastream.go index 7ff47bbf..e9115d69 100644 --- a/core/dbio/iop/datastream.go +++ b/core/dbio/iop/datastream.go @@ -902,6 +902,12 @@ loop: if ds.config.SkipBlankLines && ds.Sp.rowBlankValCnt == len(row) { goto loop } + + if ds.Sp.skipCurrent { + ds.Sp.skipCurrent = false // reset + goto loop + } + if ds.Limited() { break loop } diff --git a/core/dbio/iop/datatype.go b/core/dbio/iop/datatype.go index 076b4e46..72390ee0 100755 --- a/core/dbio/iop/datatype.go +++ b/core/dbio/iop/datatype.go @@ -875,18 +875,17 @@ func (col *Column) SetLengthPrecisionScale() { } // EvaluateConstraint evaluates a value against the constraint function -func (col *Column) EvaluateConstraint(value any, sp *StreamProcessor) { +func (col *Column) EvaluateConstraint(value any, sp *StreamProcessor) (err error) { if c := col.Constraint; c.EvalFunc != nil && !c.EvalFunc(value) { c.FailCnt++ - if c.FailCnt <= 10 { + if c.FailCnt <= 20 { errMsg := g.F("constraint failure for column '%s', at row number %d, for value: %s", col.Name, sp.N, cast.ToString(value)) g.Warn(errMsg) c.Errors = append(c.Errors, errMsg) - if os.Getenv("SLING_ON_CONSTRAINT_FAILURE") == "abort" { - sp.ds.Context.CaptureErr(g.Error(errMsg)) - } + return g.Error(errMsg) } } + return } func (col *Column) SetMetadata(key string, value string) { diff --git a/core/dbio/iop/stream_processor.go b/core/dbio/iop/stream_processor.go index 8f436a85..f8c819a9 100644 --- a/core/dbio/iop/stream_processor.go +++ b/core/dbio/iop/stream_processor.go @@ -33,6 +33,7 @@ type StreamProcessor struct { rowChecksum []uint64 unrecognizedDate string warn bool + skipCurrent bool // whether to skip current row (for constraints) parseFuncs map[string]func(s string) (interface{}, error) 
decReplRegex *regexp.Regexp ds *Datastream @@ -1238,7 +1239,14 @@ func (sp *StreamProcessor) CastRow(row []interface{}, columns Columns) []interfa // evaluate constraint if col.Constraint != nil { - col.EvaluateConstraint(row[i], sp) + if err := col.EvaluateConstraint(row[i], sp); err != nil { + switch os.Getenv("SLING_ON_CONSTRAINT_FAILURE") { + case "abort": + sp.ds.Context.CaptureErr(err) + case "skip": + sp.skipCurrent = true + } + } } } From ddcd981c7ec70cbe4b6e553f42ddc599cd0db665 Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Sat, 30 Nov 2024 18:00:44 -0300 Subject: [PATCH 07/15] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor(sling):=20s?= =?UTF-8?q?implify=20hook=20logic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/sling/hooks.go | 29 +++++++---------------------- core/sling/replication.go | 6 ++++-- core/sling/task.go | 1 + core/sling/task_run.go | 20 ++++++-------------- 4 files changed, 18 insertions(+), 38 deletions(-) diff --git a/core/sling/hooks.go b/core/sling/hooks.go index 6d93d4ee..8db520b0 100644 --- a/core/sling/hooks.go +++ b/core/sling/hooks.go @@ -11,30 +11,15 @@ type Hook interface { ExecuteOnDone(error, *TaskExecution) error } -type Hooks struct { - Pre []any `json:"pre" yaml:"pre"` - Post []any `json:"post" yaml:"post"` -} - -func (h *Hooks) PreHooks(te *TaskExecution) (hooks []Hook, err error) { - for i, hook := range h.Pre { - preHook, err := ParseHook(hook, te, g.F("pre-%02d", i+1)) - if err != nil { - return nil, g.Error(err, "error making pre-hook") - } else if preHook != nil { - hooks = append(hooks, preHook) - } - } - return hooks, nil -} +type Hooks []any -func (h *Hooks) PostHooks(te *TaskExecution) (hooks []Hook, err error) { - for i, hook := range h.Post { - postHook, err := ParseHook(hook, te, g.F("post-%02d", i+1)) +func (hks Hooks) Parse(stage string, te *TaskExecution) (hooks []Hook, err error) { + for i, hook := range hks { + parsedHook, err := ParseHook(hook, te, g.F("%s-%02d", stage, i+1)) if err != nil { - return nil, g.Error(err, "error making pre-hook") - } else if postHook != nil { - hooks = append(hooks, postHook) + return nil, g.Error(err, "error making %s-hook", stage) + } else if parsedHook != nil { + hooks = append(hooks, parsedHook) } } return hooks, nil diff --git a/core/sling/replication.go b/core/sling/replication.go index 3470dddb..11514ac6 100644 --- a/core/sling/replication.go +++ b/core/sling/replication.go @@ -586,7 +586,8 @@ type ReplicationStreamConfig struct { Single *bool `json:"single,omitempty" yaml:"single,omitempty"` Transforms any `json:"transforms,omitempty" yaml:"transforms,omitempty"` Columns any `json:"columns,omitempty" yaml:"columns,omitempty"` - Hooks Hooks `json:"hooks,omitempty" yaml:"hooks,omitempty"` + PreHooks Hooks `json:"pre_hooks,omitempty" yaml:"pre_hooks,omitempty"` + PostHooks Hooks `json:"post_hooks,omitempty" yaml:"post_hooks,omitempty"` } func (s *ReplicationStreamConfig) PrimaryKey() []string { @@ -614,7 +615,8 @@ func SetStreamDefaults(name string, stream *ReplicationStreamConfig, replication "single": func() { stream.Single = g.Ptr(g.PtrVal(replicationCfg.Defaults.Single)) }, "transforms": func() { stream.Transforms = replicationCfg.Defaults.Transforms }, "columns": func() { stream.Columns = replicationCfg.Defaults.Columns }, - "hooks": func() { stream.Hooks = replicationCfg.Defaults.Hooks }, + "pre_hooks": func() { stream.PreHooks = replicationCfg.Defaults.PreHooks }, + "post_hooks": func() { stream.PostHooks = replicationCfg.Defaults.PostHooks 
}, } for key, setFunc := range defaultSet { diff --git a/core/sling/task.go b/core/sling/task.go index 32877ede..3e2a40f0 100644 --- a/core/sling/task.go +++ b/core/sling/task.go @@ -36,6 +36,7 @@ type TaskExecution struct { data *iop.Dataset `json:"-"` prevRowCount uint64 prevByteCount uint64 + skipStream bool `json:"skip_stream"` lastIncrement time.Time // the time of last row increment (to determine stalling) Output strings.Builder `json:"-"` OutputLines chan *g.LogLine diff --git a/core/sling/task_run.go b/core/sling/task_run.go index 9a858fcf..94e341b8 100644 --- a/core/sling/task_run.go +++ b/core/sling/task_run.go @@ -112,6 +112,10 @@ func (t *TaskExecution) Execute() error { // pre-hooks if t.Err = t.ExecuteHooks("pre"); t.Err != nil { return + } else if t.skipStream { + t.SetProgress("skipping stream") + t.Status = ExecStatusSkipped + return } switch t.Type { @@ -199,9 +203,9 @@ func (t *TaskExecution) ExecuteHooks(stage string) (err error) { var hooks []Hook if stage == "pre" { - hooks, err = t.Config.ReplicationStream.Hooks.PreHooks(t) + hooks, err = t.Config.ReplicationStream.PreHooks.Parse(stage, t) } else if stage == "post" { - hooks, err = t.Config.ReplicationStream.Hooks.PostHooks(t) + hooks, err = t.Config.ReplicationStream.PostHooks.Parse(stage, t) } else { return g.Error("invalid hook stage") } @@ -413,18 +417,6 @@ func (t *TaskExecution) runDbToFile() (err error) { } -func (t *TaskExecution) runFolderToDB() (err error) { - /* - This will take a URL as a folder path - 1. list the files/folders in it (not recursive) - 2a. run runFileToDB for each of the files, naming the target table respectively - 2b. OR run runFileToDB for each of the files, to the same target able, assume each file has same structure - 3. keep list of file inserted in Job.Settings (view handleExecutionHeartbeat in server_ws.go). - - */ - return -} - func (t *TaskExecution) runFileToDB() (err error) { start = time.Now() From 592752182d2f504ec387ff71fe1a83aea63d86e7 Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Sat, 30 Nov 2024 18:01:52 -0300 Subject: [PATCH 08/15] =?UTF-8?q?=F0=9F=90=9B=20fix(dbio):=20handle=20nil?= =?UTF-8?q?=20result=20in=20Upsert=20function?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/dbio/database/transaction.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/dbio/database/transaction.go b/core/dbio/database/transaction.go index cf5686d2..cb19c9ec 100644 --- a/core/dbio/database/transaction.go +++ b/core/dbio/database/transaction.go @@ -480,9 +480,11 @@ func Upsert(conn Connection, tx Transaction, sourceTable, targetTable string, pk return } - count, err = result.RowsAffected() - if err != nil || (count == 0 && conn.GetType() == dbio.TypeDbClickhouse) { - count = -1 + if result != nil { + count, err = result.RowsAffected() + if err != nil || (count == 0 && conn.GetType() == dbio.TypeDbClickhouse) { + count = -1 + } } return From 9fa83cac98ea4137c43af1c8d0fc8049054ffce5 Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Sat, 30 Nov 2024 18:03:22 -0300 Subject: [PATCH 09/15] =?UTF-8?q?=F0=9F=90=9B=20fix(dbio):=20handle=20inva?= =?UTF-8?q?lid=20certificate=20values=20and=20file=20paths?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Improved error handling for invalid certificate values and file paths in the `makeTlsConfig` function. - Added support for PEM-encoded certificates passed directly as values, not just file paths. 
- Updated the `processValue` function to handle both files and PEM-encoded certificates. - Enhanced error messages to provide more specific information about invalid input. - Added checks to ensure that certificates and keys are properly loaded before creating the TLS config. - Updated unit tests to cover these changes. - Modified existing tests to accommodate changes in the `makeTlsConfig` function. - Ensured that the new error handling and value processing logic functions correctly in both normal and edge cases. - Refactored the code to improve readability and maintainability. --- core/dbio/database/database.go | 62 ++++++++++++++++++++--- core/dbio/database/database_mongo.go | 2 +- core/dbio/database/database_prometheus.go | 2 +- 3 files changed, 56 insertions(+), 10 deletions(-) diff --git a/core/dbio/database/database.go b/core/dbio/database/database.go index 81766440..40ae88d5 100755 --- a/core/dbio/database/database.go +++ b/core/dbio/database/database.go @@ -2957,17 +2957,63 @@ func (conn *BaseConn) credsProvided(provider string) bool { return false } -func (conn *BaseConn) getTlsConfig() (tlsConfig *tls.Config, err error) { +func (conn *BaseConn) makeTlsConfig() (tlsConfig *tls.Config, err error) { if cast.ToBool(conn.GetProp("tls")) { tlsConfig = &tls.Config{} } - cert := conn.GetProp("cert_file") - key := conn.GetProp("cert_key_file") - caCert := conn.GetProp("cert_ca_file") + // legacy only handles files + certificate := conn.GetProp("cert_file") + certificateKey := conn.GetProp("cert_key_file") + certificateCA := conn.GetProp("cert_ca_file") - if key != "" && cert != "" { - cert, err := tls.LoadX509KeyPair(cert, key) + // processValue handles files or values + processValue := func(val string) (string, error) { + // check if file exists + if _, err := os.Stat(val); err == nil { + bytes, err := os.ReadFile(val) + if err != nil { + return "", g.Error(err, "Failed to load file: %s", val) + } + val = string(bytes) + } + + // check if it is a pem encoded certificate + val = strings.TrimSpace(val) + if strings.HasPrefix(val, "-----BEGIN") { + return val, nil + } + + // not a file or pem encoded certificate + return "", g.Error("invalid certificate value or file path") + } + + if val := conn.GetProp("certificate"); val != "" { + certificate, err = processValue(val) + if err != nil { + return nil, g.Error(err, "Failed to load client certificate") + } + conn.SetProp("certificate", certificate) // overwrite + } + + if val := conn.GetProp("certificate_key"); val != "" { + certificateKey, err = processValue(val) + if err != nil { + return nil, g.Error(err, "Failed to load client certificate key") + } + conn.SetProp("certificate_key", certificateKey) // overwrite + } + + if val := conn.GetProp("certificate_ca"); val != "" { + certificateCA, err = processValue(val) + if err != nil { + return nil, g.Error(err, "Failed to load CA certificate") + } + conn.SetProp("certificate_ca", certificateCA) // overwrite + } + + if certificateKey != "" && certificate != "" { + cert, err := tls.LoadX509KeyPair(certificate, certificateKey) if err != nil { return nil, g.Error(err, "Failed to load client certificate") } @@ -2976,8 +3022,8 @@ func (conn *BaseConn) getTlsConfig() (tlsConfig *tls.Config, err error) { Certificates: []tls.Certificate{cert}, } - if caCert != "" { - caCert, err := os.ReadFile(caCert) + if certificateCA != "" { + caCert, err := os.ReadFile(certificateCA) if err != nil { return nil, g.Error(err, "Failed to load CA certificate") } diff --git a/core/dbio/database/database_mongo.go 
b/core/dbio/database/database_mongo.go index eb565451..0fa6a8b2 100644 --- a/core/dbio/database/database_mongo.go +++ b/core/dbio/database/database_mongo.go @@ -60,7 +60,7 @@ func (conn *MongoDBConn) getNewClient(timeOut ...int) (client *mongo.Client, err options.Client().SetCompressors([]string{"zstd", "snappy", "zlib"}), } - tlsConfig, err := conn.getTlsConfig() + tlsConfig, err := conn.makeTlsConfig() if err != nil { return nil, g.Error(err) } else if tlsConfig != nil { diff --git a/core/dbio/database/database_prometheus.go b/core/dbio/database/database_prometheus.go index b86adf2b..fd9f6e70 100644 --- a/core/dbio/database/database_prometheus.go +++ b/core/dbio/database/database_prometheus.go @@ -43,7 +43,7 @@ func (conn *PrometheusConn) getNewClient(timeOut ...int) (client v1.API, err err rt := api.DefaultRoundTripper // get tls - tlsConfig, err := conn.getTlsConfig() + tlsConfig, err := conn.makeTlsConfig() if err != nil { return nil, g.Error(err) } else if tlsConfig != nil { From 561b49725ca73719076eb9843289121ba0e5932a Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Sat, 30 Nov 2024 18:03:52 -0300 Subject: [PATCH 10/15] =?UTF-8?q?=F0=9F=90=9B=20fix(dbio):=20change=20warn?= =?UTF-8?q?ing=20log=20to=20debug=20log?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/dbio/database/database_duckdb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/dbio/database/database_duckdb.go b/core/dbio/database/database_duckdb.go index 2d27fa28..9524ebc9 100644 --- a/core/dbio/database/database_duckdb.go +++ b/core/dbio/database/database_duckdb.go @@ -89,7 +89,7 @@ func (conn *DuckDbConn) Connect(timeOut ...int) (err error) { if err != nil { return g.Error(err, "could not get db path") } else if conn.GetType() != dbio.TypeDbMotherDuck && !g.PathExists(dbPath) { - g.Warn("The file %s does not exist, however it will be created if needed.", dbPath) + g.Debug("The file %s does not exist, however it will be created if needed.", dbPath) } connPool.Mux.Lock() From 4ad4569ba8a4e0b081ea33e991dfedd30f71c8a8 Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Sat, 30 Nov 2024 18:13:07 -0300 Subject: [PATCH 11/15] =?UTF-8?q?=F0=9F=90=9B=20fix(dbio):=20remove=20unne?= =?UTF-8?q?cessary=20trim=5Fspace=20option?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Removed the `trim_space` option from the `StreamConfig` struct and related functions. - This option was redundant as the trimming of spaces is now handled implicitly based on the column type. - This change simplifies the code and improves maintainability. - Updated related test cases to reflect the removal of the `trim_space` option. 
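For streams that still need explicit trimming on string columns, the `trim_space` transform covers the same ground as the removed option. The snippet below is an illustrative excerpt mirroring the stream-level source options kept in the CLI test replication config (updated in the next patch); the column list and delimiter come from that test and are not a recommendation:

    source_options:
      columns: { pro: 'decimal(10,4)', pro2: 'string' }
      delimiter: "|"
      transforms: [trim_space]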
--- core/dbio/iop/csv_test.go | 1 - core/dbio/iop/stream_processor.go | 6 +----- core/sling/config.go | 5 ----- 3 files changed, 1 insertion(+), 11 deletions(-) diff --git a/core/dbio/iop/csv_test.go b/core/dbio/iop/csv_test.go index de4bb475..9a6d94f6 100755 --- a/core/dbio/iop/csv_test.go +++ b/core/dbio/iop/csv_test.go @@ -192,7 +192,6 @@ func TestSreamOptions(t *testing.T) { assert.Equal(t, nil, data.Rows[20][0]) configMap["null_if"] = "NULL" - configMap["trim_space"] = "TRUE" configMap["skip_blank_lines"] = "TRUE" configMap["datetime_format"] = "DD-MM-YYYY HH:mm:ss.s" data = consume() diff --git a/core/dbio/iop/stream_processor.go b/core/dbio/iop/stream_processor.go index f8c819a9..afe6fd0b 100644 --- a/core/dbio/iop/stream_processor.go +++ b/core/dbio/iop/stream_processor.go @@ -45,7 +45,6 @@ type StreamProcessor struct { } type StreamConfig struct { - TrimSpace bool `json:"trim_space"` EmptyAsNull bool `json:"empty_as_null"` Header bool `json:"header"` Compression CompressorType `json:"compression"` // AUTO | ZIP | GZIP | SNAPPY | NONE @@ -335,9 +334,6 @@ func (sp *StreamProcessor) SetConfig(configMap map[string]string) { if val, ok := configMap["null_as"]; ok { sp.Config.NullAs = val } - if val, ok := configMap["trim_space"]; ok { - sp.Config.TrimSpace = cast.ToBool(val) - } if val, ok := configMap["jmespath"]; ok { sp.Config.Jmespath = cast.ToString(val) @@ -589,7 +585,7 @@ func (sp *StreamProcessor) CastVal(i int, val interface{}, col *Column) interfac } isString = true - if sp.Config.TrimSpace || !col.IsString() { + if !col.IsString() { // if colType is not string, and the value is string, we should trim it // in case it comes from a CSV. If it's empty, it should be considered nil sVal = strings.TrimSpace(sVal) diff --git a/core/sling/config.go b/core/sling/config.go index e5214b6c..d68d1e2a 100644 --- a/core/sling/config.go +++ b/core/sling/config.go @@ -1275,7 +1275,6 @@ func (t *Target) MD5() string { // SourceOptions are connection and stream processing options type SourceOptions struct { - TrimSpace *bool `json:"trim_space,omitempty" yaml:"trim_space,omitempty"` EmptyAsNull *bool `json:"empty_as_null,omitempty" yaml:"empty_as_null,omitempty"` Header *bool `json:"header,omitempty" yaml:"header,omitempty"` Flatten *bool `json:"flatten,omitempty" yaml:"flatten,omitempty"` @@ -1329,7 +1328,6 @@ type TargetOptions struct { } var SourceFileOptionsDefault = SourceOptions{ - TrimSpace: g.Bool(false), EmptyAsNull: g.Bool(true), Header: g.Bool(true), Flatten: g.Bool(false), @@ -1399,9 +1397,6 @@ func (o *SourceOptions) SetDefaults(sourceOptions SourceOptions) { if o == nil { o = &sourceOptions } - if o.TrimSpace == nil { - o.TrimSpace = sourceOptions.TrimSpace - } if o.EmptyAsNull == nil { o.EmptyAsNull = sourceOptions.EmptyAsNull } From cf49abc4e36ae33af9e93f034ecc738ad62b4de3 Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Sat, 30 Nov 2024 18:32:21 -0300 Subject: [PATCH 12/15] =?UTF-8?q?=F0=9F=90=9B=20fix(sling,=20dbio):=20remo?= =?UTF-8?q?ve=20unnecessary=20trim=5Fspace=20option?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Removed the `trim_space` option from the sling configuration. - Updated the stream processor to handle empty values more robustly, using `empty_as_null` instead of relying on `trim_space`. - Improved the handling of transforms in the stream processor, adding a dedicated `empty_as_null` transform. 
- Refactored the `transforms` field in `StreamConfig` to use a `TransformList` instead of a simple array, allowing for more efficient transform lookup. - Updated the test cases to reflect the changes. --- cmd/sling/sling_test.go | 4 --- core/dbio/iop/stream_processor.go | 57 ++++++++++++++++--------------- core/dbio/iop/transforms.go | 19 +++++++++++ 3 files changed, 48 insertions(+), 32 deletions(-) diff --git a/cmd/sling/sling_test.go b/cmd/sling/sling_test.go index 42119a5a..e6d38c13 100755 --- a/cmd/sling/sling_test.go +++ b/cmd/sling/sling_test.go @@ -967,7 +967,6 @@ defaults: primary_key: [col1, col2] update_key: col3 source_options: - trim_space: false delimiter: "," target_options: file_max_rows: 500000 @@ -985,7 +984,6 @@ streams: update_key: col2 source_options: columns: { pro: 'decimal(10,4)', pro2: 'string' } - trim_space: true delimiter: "|" transforms: [trim_space] target_options: @@ -1040,7 +1038,6 @@ streams: assert.Equal(t, []string{"col1", "col2", "col3"}, config.Source.Select) assert.Equal(t, []string{"col1", "col2"}, config.Source.PrimaryKey()) assert.Equal(t, "col3", config.Source.UpdateKey) - assert.Equal(t, g.Bool(false), config.Source.Options.TrimSpace) assert.Equal(t, ",", config.Source.Options.Delimiter) assert.Equal(t, "postgres", config.Target.Conn) @@ -1058,7 +1055,6 @@ streams: assert.Equal(t, []string{"col1"}, config.Source.Select) assert.Equal(t, []string{"col3"}, config.Source.PrimaryKey()) assert.Equal(t, "col2", config.Source.UpdateKey) - assert.Equal(t, g.Bool(true), config.Source.Options.TrimSpace) assert.Equal(t, "|", config.Source.Options.Delimiter) assert.Equal(t, "[{\"name\":\"pro\",\"type\":\"decimal(10,4)\"},{\"name\":\"pro2\",\"type\":\"string\"}]", g.Marshal(config.Target.Columns)) assert.Equal(t, `["trim_space"]`, g.Marshal(config.Transforms)) diff --git a/core/dbio/iop/stream_processor.go b/core/dbio/iop/stream_processor.go index afe6fd0b..5b8b33f9 100644 --- a/core/dbio/iop/stream_processor.go +++ b/core/dbio/iop/stream_processor.go @@ -45,29 +45,29 @@ type StreamProcessor struct { } type StreamConfig struct { - EmptyAsNull bool `json:"empty_as_null"` - Header bool `json:"header"` - Compression CompressorType `json:"compression"` // AUTO | ZIP | GZIP | SNAPPY | NONE - NullIf string `json:"null_if"` - NullAs string `json:"null_as"` - DatetimeFormat string `json:"datetime_format"` - SkipBlankLines bool `json:"skip_blank_lines"` - Delimiter string `json:"delimiter"` - Escape string `json:"escape"` - Quote string `json:"quote"` - FileMaxRows int64 `json:"file_max_rows"` - FileMaxBytes int64 `json:"file_max_bytes"` - BatchLimit int64 `json:"batch_limit"` - MaxDecimals int `json:"max_decimals"` - Flatten bool `json:"flatten"` - FieldsPerRec int `json:"fields_per_rec"` - Jmespath string `json:"jmespath"` - Sheet string `json:"sheet"` - ColumnCasing ColumnCasing `json:"column_casing"` - BoolAsInt bool `json:"-"` - Columns Columns `json:"columns"` // list of column types. Can be partial list! likely is! 
- transforms map[string][]Transform // array of transform functions to apply - maxDecimalsFormat string `json:"-"` + EmptyAsNull bool `json:"empty_as_null"` + Header bool `json:"header"` + Compression CompressorType `json:"compression"` // AUTO | ZIP | GZIP | SNAPPY | NONE + NullIf string `json:"null_if"` + NullAs string `json:"null_as"` + DatetimeFormat string `json:"datetime_format"` + SkipBlankLines bool `json:"skip_blank_lines"` + Delimiter string `json:"delimiter"` + Escape string `json:"escape"` + Quote string `json:"quote"` + FileMaxRows int64 `json:"file_max_rows"` + FileMaxBytes int64 `json:"file_max_bytes"` + BatchLimit int64 `json:"batch_limit"` + MaxDecimals int `json:"max_decimals"` + Flatten bool `json:"flatten"` + FieldsPerRec int `json:"fields_per_rec"` + Jmespath string `json:"jmespath"` + Sheet string `json:"sheet"` + ColumnCasing ColumnCasing `json:"column_casing"` + BoolAsInt bool `json:"-"` + Columns Columns `json:"columns"` // list of column types. Can be partial list! likely is! + transforms map[string]TransformList // array of transform functions to apply + maxDecimalsFormat string `json:"-"` Map map[string]string `json:"-"` } @@ -384,9 +384,9 @@ func makeColumnTransforms(transformsPayload string) map[string][]string { func (sp *StreamProcessor) applyTransforms(transformsPayload string) { columnTransforms := makeColumnTransforms(transformsPayload) - sp.Config.transforms = map[string][]Transform{} + sp.Config.transforms = map[string]TransformList{} for key, names := range columnTransforms { - sp.Config.transforms[key] = []Transform{} + sp.Config.transforms[key] = TransformList{} for _, name := range names { t, ok := TransformsMap[name] if ok { @@ -567,6 +567,8 @@ func (sp *StreamProcessor) CastVal(i int, val interface{}, col *Column) interfac return nil } + colKey := strings.ToLower(col.Name) + switch v := val.(type) { case big.Int: val = v.Int64() @@ -593,7 +595,7 @@ func (sp *StreamProcessor) CastVal(i int, val interface{}, col *Column) interfac } if sVal == "" { sp.rowBlankValCnt++ - if sp.Config.EmptyAsNull || !col.IsString() { + if sp.Config.EmptyAsNull || !col.IsString() || sp.Config.transforms[colKey].HasTransform(TransformEmptyAsNull) { cs.TotalCnt++ cs.NullCnt++ return nil @@ -606,8 +608,7 @@ func (sp *StreamProcessor) CastVal(i int, val interface{}, col *Column) interfac } // get transforms - key := strings.ToLower(col.Name) - transforms := append(sp.Config.transforms[key], sp.Config.transforms["*"]...) + transforms := append(sp.Config.transforms[colKey], sp.Config.transforms["*"]...) 
switch { case col.Type.IsString(): diff --git a/core/dbio/iop/transforms.go b/core/dbio/iop/transforms.go index 9da621d3..ceb7663b 100644 --- a/core/dbio/iop/transforms.go +++ b/core/dbio/iop/transforms.go @@ -68,6 +68,17 @@ type Transform struct { makeFunc func(t *Transform, params ...any) error } +type TransformList []Transform + +func (tl TransformList) HasTransform(t Transform) bool { + for _, t0 := range tl { + if t.Name == t0.Name { + return true + } + } + return false +} + var ( TransformDecodeLatin1 = Transform{ Name: "decode_latin1", @@ -283,6 +294,14 @@ var ( }, } + // used as lookup, cannot return null since is not pointer + TransformEmptyAsNull = Transform{ + Name: "empty_as_null", + FuncString: func(sp *StreamProcessor, val string) (string, error) { + return val, nil + }, + } + TransformSetTimezone = Transform{ Name: "set_timezone", makeFunc: func(t *Transform, location ...any) error { From 0fdb025f421f00c3984c59c49d7e6c55c71fd26c Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Sat, 30 Nov 2024 18:39:13 -0300 Subject: [PATCH 13/15] =?UTF-8?q?=F0=9F=90=9B=20fix(datastream):=20handle?= =?UTF-8?q?=20edge=20cases=20in=20data=20processing=20loop?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Prevent unnecessary processing when SampleSize is 0 - Prevent unnecessary processing when all columns are coerced to strings --- core/dbio/iop/datastream.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/dbio/iop/datastream.go b/core/dbio/iop/datastream.go index e9115d69..dca57bd3 100644 --- a/core/dbio/iop/datastream.go +++ b/core/dbio/iop/datastream.go @@ -698,6 +698,12 @@ loop: default: if ds.it.Counter == 1 && !ds.NoDebug { g.Trace("%#v", ds.it.Row) // trace first row for debugging + } else if SampleSize == 0 { + // if sample size if zero, don't process rows + break loop + } else if cols := ds.Sp.Config.Columns; len(cols) == 1 && cols[0].Name == "*" && cols[0].IsString() { + // if specified to coerce all columns to string, don't process rows + break loop } row := ds.Sp.ProcessRow(ds.it.Row) From 578efa81b13ea35de2dbe8cbae8f4946f646e2e2 Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Sun, 1 Dec 2024 15:59:10 -0300 Subject: [PATCH 14/15] improve CLI test suite and add partitioned parquet support --- README.md | 2 +- cmd/sling/sling_run.go | 4 +- cmd/sling/tests/suite.cli.tsv | 1 + core/dbio/database/database_duckdb.go | 7 +- core/dbio/filesys/fs.go | 83 +++++++++++++++++++- core/dbio/iop/dataflow.go | 8 ++ core/dbio/iop/duckdb.go | 91 ++++++++++++++++++++++ core/dbio/templates/duckdb.yaml | 23 ++++++ core/env/env.go | 6 ++ core/sling/config.go | 21 +++-- core/sling/task.go | 9 +++ core/sling/task_func.go | 20 +++++ core/sling/task_run_write.go | 107 ++++++++++++++++++++++++-- 13 files changed, 365 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 04aeecf3..3c9347f0 100644 --- a/README.md +++ b/README.md @@ -230,7 +230,7 @@ You can target specific tests or suites using environment variables: 3. CLI Suite: ```sh cd cmd/sling - export SLING_BIN=../../sling + export SLING_BIN=./sling go test -v -run TestCLI # run all CLI tests TESTS="31+" go test -v -run TestCLI # run CLI tests 31 and all subsequent tests ``` diff --git a/cmd/sling/sling_run.go b/cmd/sling/sling_run.go index 54266abb..6cfe4346 100755 --- a/cmd/sling/sling_run.go +++ b/cmd/sling/sling_run.go @@ -460,6 +460,8 @@ func runReplication(cfgPath string, cfgOverwrite *sling.Config, selectStreams .. 
break } + env.LogSink = nil // clear log sink + if cfg.ReplicationStream.Disabled { println() g.Debug("skipping stream %s since it is disabled", cfg.StreamName) @@ -472,8 +474,6 @@ func runReplication(cfgPath string, cfgOverwrite *sling.Config, selectStreams .. g.Info("[%d / %d] running stream %s", counter, streamCnt, cfg.StreamName) } - env.LogSink = nil // clear log sink - env.TelMap = g.M("begin_time", time.Now().UnixMicro(), "run_mode", "replication") // reset map env.SetTelVal("replication_md5", replication.MD5()) err = runTask(cfg, &replication) diff --git a/cmd/sling/tests/suite.cli.tsv b/cmd/sling/tests/suite.cli.tsv index da8d42f9..0d8fc2ab 100644 --- a/cmd/sling/tests/suite.cli.tsv +++ b/cmd/sling/tests/suite.cli.tsv @@ -56,3 +56,4 @@ n test_name rows bytes streams fails output_contains command 55 Run sling with CSV source and $symbol quote 3 "sling run --src-conn LOCAL --src-stream file://cmd/sling/tests/files/test8.csv --src-options '{ delimiter: ""|"", quote: ""$"", escape: ""\\"" }' --stdout > /dev/null" 56 Run sling with direct insert full-refresh >10 >1 streaming data (direct insert) SLING_DIRECT_INSERT=true sling run -r cmd/sling/tests/replications/r.09.yaml 57 Run sling with direct insert incremental 0 Nothing to insert|streaming data (direct insert) SLING_DIRECT_INSERT=true sling run --src-conn postgres --src-stream public.test1k --tgt-conn snowflake --tgt-object 'public.{stream_schema}_{stream_table}' --mode incremental --update-key create_date +58 Run sling writing to partitioned parquet 1000 partition_by ( sling run --src-stream file://cmd/sling/tests/files/test1.csv --tgt-object 'file:///tmp/sling/output8/{part_year}/{part_month}' -d --tgt-options '{ format: parquet }' --update-key create_dt diff --git a/core/dbio/database/database_duckdb.go b/core/dbio/database/database_duckdb.go index 9524ebc9..a854f65d 100644 --- a/core/dbio/database/database_duckdb.go +++ b/core/dbio/database/database_duckdb.go @@ -72,6 +72,11 @@ func (conn *DuckDbConn) GetURL(newURL ...string) string { return URL } +// DuckDb returns the DuckDb instance +func (conn *DuckDbConn) DuckDb() *iop.DuckDb { + return conn.duck +} + func (conn *DuckDbConn) dbPath() (string, error) { dbPathU, err := net.NewURL(conn.GetURL()) if err != nil { @@ -182,7 +187,7 @@ func (conn *DuckDbConn) importViaTempCSVs(tableFName string, df *iop.Dataflow) ( } config := iop.DefaultStreamConfig() - config.BatchLimit = 250000 + config.FileMaxRows = 250000 config.Header = true config.Delimiter = "," config.Escape = `"` diff --git a/core/dbio/filesys/fs.go b/core/dbio/filesys/fs.go index 8a55fcf1..da88cb8a 100755 --- a/core/dbio/filesys/fs.go +++ b/core/dbio/filesys/fs.go @@ -7,6 +7,7 @@ import ( "io" "os" "path" + "path/filepath" "runtime" "runtime/debug" "strings" @@ -662,13 +663,13 @@ func (fs *BaseFileSysClient) WriteDataflowReady(df *iop.Dataflow, url string, fi fileExt := cast.ToString(fs.GetProp("FILE_EXTENSION")) // use provided config or get from dataflow - if val := fs.GetProp("COMPRESSION"); val != "" { + if val := fs.GetProp("COMPRESSION"); val != "" && sc.Compression == iop.NoneCompressorType { sc.Compression = iop.CompressorType(strings.ToLower(val)) } - if val := fs.GetProp("FILE_MAX_ROWS"); val != "" { + if val := fs.GetProp("FILE_MAX_ROWS"); val != "" && sc.FileMaxRows == 0 { sc.FileMaxRows = cast.ToInt64(val) } - if val := fs.GetProp("FILE_MAX_BYTES"); val != "" { + if val := fs.GetProp("FILE_MAX_BYTES"); val != "" && sc.FileMaxBytes == 0 { sc.FileMaxBytes = cast.ToInt64(val) } @@ -848,6 +849,9 @@ func (fs 
*BaseFileSysClient) WriteDataflowReady(df *iop.Dataflow, url string, fi } } + // set default batch limit + df.SetBatchLimit(sc.BatchLimit) + partCnt := 1 var streamCh chan *iop.Datastream @@ -1322,3 +1326,76 @@ func InferFileFormat(path string, defaults ...dbio.FileType) dbio.FileType { // default is csv return dbio.FileTypeCsv } + +// CopyFromLocalRecursive copies a local file or directory recursively to a remote filesystem +func CopyFromLocalRecursive(fs FileSysClient, localPath string, remotePath string) (totalBytes int64, err error) { + // Check if source exists + info, err := os.Stat(localPath) + if err != nil { + return 0, g.Error(err, "Error accessing local path: "+localPath) + } + + // If it's a single file, copy it directly + if !info.IsDir() { + file, err := os.Open(localPath) + if err != nil { + return 0, g.Error(err, "Error opening local file: "+localPath) + } + defer file.Close() + + bw, err := fs.Write(remotePath, file) + if err != nil { + return 0, g.Error(err, "Error writing to remote path: "+remotePath) + } + return bw, nil + } + + // For directories, walk through and copy each file + err = filepath.Walk(localPath, func(path string, info os.FileInfo, err error) error { + if err != nil { + return g.Error(err, "Error walking path: "+path) + } + + // Skip directories themselves + if info.IsDir() { + return nil + } + + // Calculate relative path to maintain directory structure + relPath, err := filepath.Rel(localPath, path) + if err != nil { + return g.Error(err, "Error getting relative path for: "+path) + } + + // Convert path separators to forward slashes for consistency + relPath = filepath.ToSlash(relPath) + + // Construct remote path + remoteFilePath := remotePath + if !strings.HasSuffix(remotePath, "/") { + remoteFilePath += "/" + } + remoteFilePath += relPath + + // Open and copy the file + file, err := os.Open(path) + if err != nil { + return g.Error(err, "Error opening file: "+path) + } + defer file.Close() + + bw, err := fs.Write(remoteFilePath, file) + if err != nil { + return g.Error(err, "Error writing to remote path: "+remoteFilePath) + } + + totalBytes += bw + return nil + }) + + if err != nil { + return totalBytes, g.Error(err, "Error during recursive copy") + } + + return totalBytes, nil +} diff --git a/core/dbio/iop/dataflow.go b/core/dbio/iop/dataflow.go index be8fadf5..c0c3af0a 100644 --- a/core/dbio/iop/dataflow.go +++ b/core/dbio/iop/dataflow.go @@ -151,6 +151,14 @@ func (df *Dataflow) Close() { df.closed = true } +// BufferDataset return the buffer as a dataset +func (df *Dataflow) BufferDataset() Dataset { + data := NewDataset(df.Columns) + data.Rows = df.Buffer + data.Inferred = df.Inferred + return data +} + // Pause pauses all streams func (df *Dataflow) Pause(exceptDs ...string) bool { if df.Ready { diff --git a/core/dbio/iop/duckdb.go b/core/dbio/iop/duckdb.go index f517c945..fc533416 100644 --- a/core/dbio/iop/duckdb.go +++ b/core/dbio/iop/duckdb.go @@ -681,6 +681,97 @@ func (duck *DuckDb) initScanner() { } +type DuckDbCopyOptions struct { + Compression string + PartitionFields []string // part_year, part_month, part_day, etc. 
+ PartitionKey string + WritePartitionCols bool + FileSizeBytes int64 +} + +func (duck *DuckDb) GenerateCopyStatement(fromTable, toLocalPath string, options DuckDbCopyOptions) (sql string, err error) { + type partExpression struct { + alias string + expression string + } + + var partExpressions []partExpression + + // validate PartitionLevels + for _, pl := range options.PartitionFields { + if !g.In(pl, "part_year", "part_month", "part_week", "part_day", "part_hour", "part_minute", "part_year_month") { + return sql, g.Error("invalid partition level: %s", pl) + } else if options.PartitionKey == "" { + return "", g.Error("missing partition key") + } + + datePart := strings.TrimPrefix(pl, "part_") + pe := partExpression{ + alias: g.F("%s_%s", dbio.TypeDbDuckDb.Unquote(options.PartitionKey), datePart), + expression: g.F("date_part('%s', %s)", datePart, options.PartitionKey), + } + + switch pl { + case "part_year_month": + pe.expression = g.F("strftime(%s, '%s')", options.PartitionKey, "%Y-%m") + case "part_month": + pe.expression = g.F("strftime(%s, '%s')", options.PartitionKey, "%m") + case "part_week": + pe.expression = g.F("strftime(%s, '%s')", options.PartitionKey, "%V") + case "part_day": + pe.expression = g.F("strftime(%s, '%s')", options.PartitionKey, "%d") + case "part_hour": + pe.expression = g.F("strftime(%s, '%s')", options.PartitionKey, "%H") + case "part_minute": + pe.expression = g.F("strftime(%s, '%s')", options.PartitionKey, "%M") + } + + partExpressions = append(partExpressions, pe) + } + + // validate compression + options.Compression = strings.ToLower(options.Compression) + if g.In(options.Compression, "", "none", "auto") { + options.Compression = "snappy" + } + + fileSizeBytesExpr := "" + if options.FileSizeBytes > 0 { + fileSizeBytesExpr = g.F("file_size_bytes %d,", options.FileSizeBytes) + } + + if len(partExpressions) > 0 { + partSqlColumns := make([]string, len(partExpressions)) + partSqlExpressions := make([]string, len(partExpressions)) + for i, partExpression := range partExpressions { + aliasQ := dbio.TypeDbDuckDb.Quote(partExpression.alias) + partSqlColumns[i] = aliasQ + partSqlExpressions[i] = g.F("%s as %s", partExpression.expression, aliasQ) + } + + sql = g.R( + dbio.TypeDbDuckDb.GetTemplateValue("core.export_parquet_partitions"), + "table", fromTable, + "local_path", toLocalPath, + "file_size_bytes_expr", fileSizeBytesExpr, + "partition_expressions", strings.Join(partSqlExpressions, ", "), + "partition_columns", strings.Join(partSqlColumns, ", "), + "compression", cast.ToString(options.Compression), + "write_partition_columns", cast.ToString(options.WritePartitionCols), + ) + } else { + sql = g.R( + dbio.TypeDbDuckDb.GetTemplateValue("core.export_parquet"), + "table", fromTable, + "local_path", toLocalPath, + "file_size_bytes_expr", fileSizeBytesExpr, + "compression", cast.ToString(options.Compression), + ) + } + + return +} + // Quote quotes a column name func (duck *DuckDb) Quote(col string) (qName string) { qName = `"` + col + `"` diff --git a/core/dbio/templates/duckdb.yaml b/core/dbio/templates/duckdb.yaml index 9d610970..e399c87c 100755 --- a/core/dbio/templates/duckdb.yaml +++ b/core/dbio/templates/duckdb.yaml @@ -10,6 +10,29 @@ core: insert_option: "" modify_column: 'alter {column} type {type}' select_stream_scanner: select {fields} from {stream_scanner} {where} + export_parquet: | + COPY ( + select * + from {table} + ) TO '{local_path}' + ( + format 'parquet', {file_size_bytes_expr} + compression '{compression}' + ) + export_parquet_partitions: | + 
COPY ( + select + *, + {partition_expressions} + from {table} + ) TO '{local_path}' + ( + format 'parquet', {file_size_bytes_expr} + compression '{compression}', + overwrite true, + write_partition_columns {write_partition_columns}, + partition_by ( {partition_columns} ) + ) metadata: diff --git a/core/env/env.go b/core/env/env.go index bb0c5d3f..0f2c9a58 100755 --- a/core/env/env.go +++ b/core/env/env.go @@ -165,6 +165,12 @@ func LoadSlingEnvFile() (ef EnvFile) { } func LoadSlingEnvFileBody(body string) (ef EnvFile, err error) { + if body == "" { + return EnvFile{ + Connections: map[string]map[string]interface{}{}, + Variables: map[string]interface{}{}, + }, nil + } err = yaml.Unmarshal([]byte(body), &ef) return } diff --git a/core/sling/config.go b/core/sling/config.go index d68d1e2a..20228d0b 100644 --- a/core/sling/config.go +++ b/core/sling/config.go @@ -569,16 +569,20 @@ func (cfg *Config) Prepare() (err error) { } else if strings.Contains(cfg.Target.Object, "{") { words := []string{} for _, m := range g.Matches(cfg.Target.Object, `\{([^}]+)\}`) { - if len(m.Group) > 0 { + if len(m.Group) > 0 && !strings.HasPrefix(m.Group[0], "part_") { words = append(words, m.Group[0]) } } - g.Debug("Could not successfully format target object name. Blank values for: %s", strings.Join(words, ", ")) - for _, word := range words { - cfg.Target.Object = strings.ReplaceAll(cfg.Target.Object, "{"+word+"}", "") + if len(words) > 0 { + g.Debug("Could not successfully format target object name. Blank values for: %s", strings.Join(words, ", ")) + for _, word := range words { + cfg.Target.Object = strings.ReplaceAll(cfg.Target.Object, "{"+word+"}", "") + } + } + if cfg.ReplicationStream != nil { + cfg.ReplicationStream.Object = cfg.Target.Object } - cfg.ReplicationStream.Object = cfg.Target.Object } // add md5 of options, so that wee reconnect for various options @@ -1259,6 +1263,13 @@ type Target struct { columns iop.Columns `json:"-" yaml:"-"` } +func (t *Target) ObjectFileFormat() dbio.FileType { + if t.Options != nil && t.Options.Format != dbio.FileTypeNone { + return t.Options.Format + } + return filesys.InferFileFormat(t.Object) +} + func (t *Target) MD5() string { payload := g.Marshal([]any{ g.M("conn", t.Conn), diff --git a/core/sling/task.go b/core/sling/task.go index 3e2a40f0..506403e4 100644 --- a/core/sling/task.go +++ b/core/sling/task.go @@ -387,6 +387,15 @@ func (t *TaskExecution) Cleanup() { } } +// shouldWriteViaDuckDB determines whether we should use duckdb +// at the moment, use duckdb only for partitioned target parquet files +func (t *TaskExecution) shouldWriteViaDuckDB(uri string) bool { + if g.In(t.Config.Target.ObjectFileFormat(), dbio.FileTypeParquet) { + return len(extractPartFields(uri)) > 0 + } + return false +} + // isIncrementalWithUpdateKey means it has an update_key and is incremental mode func (t *TaskExecution) isIncrementalWithUpdateKey() bool { return t.Config.Source.HasUpdateKey() && t.Config.Mode == IncrementalMode diff --git a/core/sling/task_func.go b/core/sling/task_func.go index a86bbbc7..872c9ace 100644 --- a/core/sling/task_func.go +++ b/core/sling/task_func.go @@ -3,6 +3,7 @@ package sling import ( "math" "os" + "regexp" "strings" "time" @@ -94,6 +95,25 @@ func pullTargetTableColumns(cfg *Config, tgtConn database.Connection, force bool return cfg.Target.columns, nil } +// extractPartFields extract the partition fields from the given path +func extractPartFields(path string) []string { + // Regex pattern to match {part_*} fields + pattern := 
regexp.MustCompile(`{(part_[^}]+)}`) + + // Find all matches in the path + matches := pattern.FindAllStringSubmatch(path, -1) + + // Extract the captured groups (without braces) + result := make([]string, 0, len(matches)) + for _, match := range matches { + if len(match) > 1 { + result = append(result, match[1]) + } + } + + return result +} + func pullTargetTempTableColumns(cfg *Config, tgtConn database.Connection, force bool) (cols iop.Columns, err error) { if len(cfg.Target.columns) == 0 || force { cfg.Target.columns, err = tgtConn.GetColumns(cfg.Target.Options.TableTmp) diff --git a/core/sling/task_run_write.go b/core/sling/task_run_write.go index e823036f..bccfc23f 100644 --- a/core/sling/task_run_write.go +++ b/core/sling/task_run_write.go @@ -6,6 +6,7 @@ import ( "database/sql" "fmt" "os" + "path" "strings" "time" @@ -16,6 +17,7 @@ import ( "github.com/slingdata-io/sling-cli/core/dbio/database" "github.com/slingdata-io/sling-cli/core/dbio/filesys" "github.com/slingdata-io/sling-cli/core/dbio/iop" + "github.com/slingdata-io/sling-cli/core/env" "github.com/spf13/cast" ) @@ -51,9 +53,15 @@ func (t *TaskExecution) WriteToFile(cfg *Config, df *iop.Dataflow) (cnt uint64, // apply column casing applyColumnCasingToDf(df, fs.FsType(), t.Config.Target.Options.ColumnCasing) - bw, err = filesys.WriteDataflow(fs, df, uri) + // use duckdb for writing parquet + if t.shouldWriteViaDuckDB(uri) { + // push to temp duck file + bw, err = writeDataflowViaDuckDB(t, df, fs, uri) + } else { + bw, err = filesys.WriteDataflow(fs, df, uri) + } if err != nil { - err = g.Error(err, "Could not FileSysWriteDataflow") + err = g.Error(err, "Could not write") return cnt, err } cnt = df.Count() @@ -659,9 +667,7 @@ func prepareDataflow(t *TaskExecution, df *iop.Dataflow, tgtConn database.Connec // apply column casing applyColumnCasingToDf(df, tgtConn.GetType(), t.Config.Target.Options.ColumnCasing) - sampleData := iop.NewDataset(df.Columns) - sampleData.Rows = df.Buffer - sampleData.Inferred = df.Inferred + sampleData := df.BufferDataset() if !sampleData.Inferred { sampleData.SafeInference = true sampleData.InferColumnTypes() @@ -802,6 +808,97 @@ func truncateTable(t *TaskExecution, tgtConn database.Connection, tableName stri return nil } +// writeDataflowViaDuckDB is to use a temporary duckdb, especially for writing parquet files. 
+// duckdb has the best parquet file writer, also allows partitioning +func writeDataflowViaDuckDB(t *TaskExecution, df *iop.Dataflow, fs filesys.FileSysClient, uri string) (bw int64, err error) { + // push to temp duck file + var duckConn database.Connection + + tempTable, _ := database.ParseTableName("main.sling_parquet_temp", dbio.TypeDbDuckDb) + folder := path.Join(env.GetTempFolder(), "duckdb", g.RandSuffix(tempTable.Name, 3)) + defer env.RemoveAllLocalTempFile(folder) + + duckPath := env.CleanWindowsPath(path.Join(folder, "db")) + duckConn, err = database.NewConnContext(t.Context.Ctx, "duckdb://"+duckPath) + if err != nil { + err = g.Error(err, "Could not create temp duckdb connection") + return bw, err + } + defer duckConn.Close() + + // create table + _, err = createTableIfNotExists(duckConn, df.BufferDataset(), &tempTable, false) + if err != nil { + err = g.Error(err, "Could not create temp duckdb table") + return bw, err + } + + // insert into table + _, err = duckConn.BulkImportFlow(tempTable.Name, df) + if err != nil { + err = g.Error(err, "Could not write to temp duckdb table") + return bw, err + } + + // export to local file + if err = os.MkdirAll(folder, 0755); err != nil { + err = g.Error(err, "Could not create temp duckdb output folder") + return bw, err + } + + // get duckdb instance + duck := duckConn.(*database.DuckDbConn).DuckDb() + + copyOptions := iop.DuckDbCopyOptions{ + Compression: string(g.PtrVal(t.Config.Target.Options.Compression)), + PartitionFields: extractPartFields(uri), + PartitionKey: t.Config.Source.UpdateKey, + WritePartitionCols: true, + FileSizeBytes: g.PtrVal(t.Config.Target.Options.FileMaxBytes), + } + + if len(copyOptions.PartitionFields) > 0 && copyOptions.PartitionKey == "" { + return bw, g.Error("missing update_key in order to partition") + } + + // duckdb does not allow limiting by number of rows + if g.PtrVal(t.Config.Target.Options.FileMaxRows) > 0 { + return bw, g.Error("can no longer use file_max_rows to write to parquet (use file_max_bytes instead).") + } + + // if * is specified, set default FileSizeBytes, + if strings.Contains(uri, "*") && copyOptions.FileSizeBytes == 0 { + copyOptions.FileSizeBytes = 50 * 1024 * 1024 // 50MB default file size + } + + // generate sql for parquet export + localPath := env.CleanWindowsPath(path.Join(folder, "output")) + sql, err := duck.GenerateCopyStatement(tempTable.FullName(), localPath, copyOptions) + if err != nil { + err = g.Error(err, "Could not generate duckdb copy statement") + return bw, err + } + + _, err = duckConn.Exec(sql) + if err != nil { + err = g.Error(err, "Could not write to parquet file") + return bw, err + } + + // copy files bytes recursively to target + if strings.Contains(uri, "*") { + uri = filesys.GetDeepestParent(uri) // get target folder, since split by files + } + + // remove partition fields from url + for _, field := range copyOptions.PartitionFields { + uri = strings.ReplaceAll(uri, g.F("/{%s}", field), "") + } + bw, err = filesys.CopyFromLocalRecursive(fs, localPath, uri) + + return bw, err +} + func performUpsert(tgtConn database.Connection, tableTmp, targetTable database.Table, cfg *Config) error { tgtPrimaryKey := cfg.Source.PrimaryKey() if casing := cfg.Target.Options.ColumnCasing; casing != nil { From 9fdb5ff46c9d9151e568b78b6a8293cc0d0cfefe Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Sun, 1 Dec 2024 16:15:21 -0300 Subject: [PATCH 15/15] fix(dataflow): handle nil CurrentBatch in SetBatchLimit --- core/dbio/iop/dataflow.go | 4 +++- 1 file changed, 3 insertions(+), 
1 deletion(-) diff --git a/core/dbio/iop/dataflow.go b/core/dbio/iop/dataflow.go index c0c3af0a..e4a3db90 100644 --- a/core/dbio/iop/dataflow.go +++ b/core/dbio/iop/dataflow.go @@ -130,7 +130,9 @@ func (df *Dataflow) SetBatchLimit(limit int64) { defer df.mux.Unlock() for _, ds := range df.Streams { ds.Sp.Config.BatchLimit = limit - ds.CurrentBatch.Limit = limit + if ds.CurrentBatch != nil { + ds.CurrentBatch.Limit = limit + } } }
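Usage note for the partitioned parquet support introduced in patch 14: a file target whose object path embeds `{part_*}` tokens (e.g. `{part_year}/{part_month}`) and whose format is parquet is routed through writeDataflowViaDuckDB, which requires an `update_key` to derive the partition values and rejects `file_max_rows` in favor of `file_max_bytes`. CLI suite test 58 exercises exactly this path:

    sling run --src-stream file://cmd/sling/tests/files/test1.csv --tgt-object 'file:///tmp/sling/output8/{part_year}/{part_month}' -d --tgt-options '{ format: parquet }' --update-key create_dt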