Merge pull request #472 from slingdata-io/v1.3.4
V1.3.4
flarco authored Dec 27, 2024
2 parents 7f2ea7c + b822fb7 commit 9f2fdc5
Showing 50 changed files with 1,012 additions and 240 deletions.
7 changes: 5 additions & 2 deletions cmd/sling/sling_cli_test.go
@@ -19,8 +19,11 @@ import (
 func TestCLI(t *testing.T) {
 	bin := os.Getenv("SLING_BIN")
 	if bin == "" {
-		t.Fatalf("SLING_BIN environment variable is not set")
-		return
+		bin = "./sling"
+		if !g.PathExists(bin) {
+			t.Fatalf("SLING_BIN environment variable is not set")
+			return
+		}
 	}
 
 	p, err := process.NewProc("bash")
21 changes: 16 additions & 5 deletions cmd/sling/sling_run.go
@@ -27,6 +27,8 @@ var (
 	totalBytes        = uint64(0)
 	constraintFails   = uint64(0)
 	lookupReplication = func(id string) (r sling.ReplicationConfig, e error) { return }
+
+	runReplication func(string, *sling.Config, ...string) error = replicationRun
 )
 
 func processRun(c *g.CliSC) (ok bool, err error) {
@@ -355,15 +357,17 @@ func runTask(cfg *sling.Config, replication *sling.ReplicationConfig) (err error
 
 	if cast.ToBool(cfg.Env["SLING_DRY_RUN"]) || cast.ToBool(os.Getenv("SLING_DRY_RUN")) {
 		return nil
+	} else if replication.FailErr != "" {
+		task.Status = sling.ExecStatusError
+		task.Err = g.Error(replication.FailErr)
 	}
 
 	// set log sink
 	env.LogSink = func(ll *g.LogLine) {
 		task.AppendOutput(ll)
 	}
 
-	sling.StoreInsert(task)       // insert into store
-	defer sling.StoreUpdate(task) // update into store after
+	sling.StoreSet(task) // set into store
 
 	if task.Err != nil {
 		err = g.Error(task.Err)
@@ -373,6 +377,9 @@ func runTask(cfg *sling.Config, replication *sling.ReplicationConfig) (err error
 	// set context
 	task.Context = ctx
 
+	// set into store after
+	defer sling.StoreSet(task)
+
 	// run task
 	setTM()
 	err = task.Execute()
@@ -413,7 +420,7 @@ func runTask(cfg *sling.Config, replication *sling.ReplicationConfig) (err error
 	return nil
 }
 
-func runReplication(cfgPath string, cfgOverwrite *sling.Config, selectStreams ...string) (err error) {
+func replicationRun(cfgPath string, cfgOverwrite *sling.Config, selectStreams ...string) (err error) {
 	startTime := time.Now()
 
 	replication, err := sling.LoadReplicationConfigFromFile(cfgPath)
@@ -482,7 +489,7 @@ func runReplication(cfgPath string, cfgOverwrite *sling.Config, selectStreams ..
 
 		// if a connection issue, stop
 		if e, ok := err.(*g.ErrType); ok && strings.Contains(e.Debug(), "Could not connect to ") {
-			break
+			replication.FailErr = g.ErrMsg(e)
 		}
 	} else {
 		successes++
@@ -500,7 +507,9 @@ func runReplication(cfgPath string, cfgOverwrite *sling.Config, selectStreams ..
 		failureStr = env.GreenString(failureStr)
 	}
 
-	g.Info("Sling Replication Completed in %s | %s -> %s | %s | %s\n", g.DurationString(delta), replication.Source, replication.Target, successStr, failureStr)
+	if streamCnt > 1 {
+		g.Info("Sling Replication Completed in %s | %s -> %s | %s | %s\n", g.DurationString(delta), replication.Source, replication.Target, successStr, failureStr)
+	}
 
 	return eG.Err()
 }
@@ -537,6 +546,8 @@ func parsePayload(payload string, validate bool) (options map[string]any, err er
 
 // setProjectID attempts to get the first sha of the repo
 func setProjectID(cfgPath string) {
+	projectID = os.Getenv("SLING_PROJECT_ID")
+
 	if cfgPath == "" && !strings.HasPrefix(cfgPath, "{") {
 		return
 	}
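
Note on the diff above: runReplication is now a package-level function variable (initialized to replicationRun), which gives the CLI a test seam: production code keeps calling runReplication, while a test can swap in a stub. A minimal sketch, assuming a hypothetical _test.go file in the same package (the test name, stub body, and assertions are invented for illustration):

    // cmd/sling/sling_run_test.go (hypothetical file)
    package main

    import (
    	"testing"

    	"github.com/slingdata-io/sling-cli/core/sling"
    )

    func TestRunWithStubbedReplication(t *testing.T) {
    	orig := runReplication
    	defer func() { runReplication = orig }() // restore the real implementation

    	var gotPath string
    	runReplication = func(cfgPath string, cfg *sling.Config, streams ...string) error {
    		gotPath = cfgPath // record the call instead of executing a replication
    		return nil
    	}

    	// ... drive the CLI path (e.g. processRun) here, then assert on gotPath ...
    	_ = gotPath
    }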
5 changes: 4 additions & 1 deletion cmd/sling/sling_test.go
@@ -255,6 +255,9 @@ func testSuite(t *testing.T, connType dbio.Type, testSelect ...string) {
 	env, _ := g.UnmarshalMap(cast.ToString(rec["env"]))
 
 	if val := cast.ToString(rec["source_primary_key"]); val != "" {
+		if g.In(connType, dbio.TypeDbStarRocks) {
+			val = "id" // starrocks can't have a decimal as PK
+		}
 		streamConfig["primary_key"] = strings.Split(val, ",")
 	}
 	if val := cast.ToString(rec["source_update_key"]); val != "" {
@@ -1094,7 +1097,7 @@ streams:
 	// Fifth Stream: file://tests/files/*.csv
 	// wildcard expanded
 	config := replication.Tasks[4]
-	assert.True(t, strings.HasPrefix(config.Source.Stream, "tests/files/"))
+	assert.True(t, strings.HasPrefix(config.Source.Stream, "file://tests/files/"))
 	assert.NotEqual(t, config.Source.Stream, "tests/files/*.csv")
 	assert.Equal(t, `"my_schema3"."table3"`, config.Target.Object)
 	// g.Info(g.Pretty(config))
7 changes: 3 additions & 4 deletions cmd/sling/sling_update.go
@@ -207,17 +207,16 @@ func checkUpdate(force bool) {
 
 func getSlingPackage() string {
 	slingPackage := strings.ToLower(os.Getenv("SLING_PACKAGE"))
-	execFileName, _ := osext.Executable()
 	switch {
 	case slingPackage != "":
 		_ = slingPackage
 	case os.Getenv("SLING_SOURCE") != "" && os.Getenv("SLING_TARGET") != "":
 		slingPackage = "dagster"
-	case strings.Contains(execFileName, "homebrew"):
+	case strings.Contains(env.Executable, "homebrew"):
 		slingPackage = "homebrew"
-	case strings.Contains(execFileName, "scoop"):
+	case strings.Contains(env.Executable, "scoop"):
 		slingPackage = "scoop"
-	case strings.Contains(execFileName, "python") || strings.Contains(execFileName, "virtualenvs"):
+	case strings.Contains(env.Executable, "python") || strings.Contains(env.Executable, "virtualenvs"):
 		slingPackage = "python"
 	default:
 		slingPackage = "binary"
8 changes: 6 additions & 2 deletions cmd/sling/tests/replications/r.05.yaml
@@ -1,8 +1,8 @@
 source: OCRAL_R2
-target: SQLITE
+target: POSTGRES
 
 defaults:
-  object: 'main.{stream_file_name}_slack_data'
+  object: 'public.{stream_file_name}_slack_data'
   source_options:
     flatten: true
   mode: full-refresh
@@ -20,3 +20,7 @@ streams:
   s3://ocral/mlo.community.test/jobs/:
   s3://ocral/mlo.community.test/leadership/:
   s3://ocral/mlo.community.test/random/:
+
+env:
+  SLING_THREADS: 5
+  SLING_RETRIES: 2
18 changes: 18 additions & 0 deletions cmd/sling/tests/replications/r.17.yaml
@@ -0,0 +1,18 @@
+source: postgres
+target: aws_s3
+
+defaults:
+  mode: full-refresh
+
+streams:
+  public.test1k_postgres_pg:
+    update_key: update_dt
+    primary_key: id
+    mode: incremental
+    object: test/{stream_name}_{format}/{part_year}/{part_month}
+    target_options:
+      format: '{format}'
+
+env:
+  SLING_STATE: AWS_S3/state/r.17/${FORMAT}
+  format: ${FORMAT}
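
This new r.17.yaml replication drives the partitioned-object tests added to suite.cli.tsv below: the {part_year}/{part_month} tokens in the object path are expanded from each row's update_key value, producing hive-style directories such as create_dt_year=2018 (the string test 59 in the CLI suite greps for). A toy sketch of the assumed token expansion, not sling's actual implementation:

    package main

    import (
    	"fmt"
    	"strings"
    	"time"
    )

    func main() {
    	updateKey := "create_dt"                         // the stream's update_key
    	v := time.Date(2018, 7, 3, 0, 0, 0, 0, time.UTC) // one row's update_key value
    	path := strings.NewReplacer(
    		"{part_year}", fmt.Sprintf("%s_year=%04d", updateKey, v.Year()),
    		"{part_month}", fmt.Sprintf("%s_month=%02d", updateKey, int(v.Month())),
    	).Replace("test/{stream_name}_{format}/{part_year}/{part_month}")
    	fmt.Println(path) // test/{stream_name}_{format}/create_dt_year=2018/create_dt_month=07
    }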
14 changes: 9 additions & 5 deletions cmd/sling/tests/suite.cli.tsv
@@ -14,8 +14,8 @@ n test_name rows bytes streams fails output_contains command
 13 Run sling with echo input and empty allowed 0 6 execution succeeded echo 'a,b,c' | SLING_ALLOW_EMPTY=true sling run --tgt-object file:///tmp/test.csv
 14 Run sling with POSTGRES source and CSV output 18 sling run --src-conn POSTGRES --src-stream public.my_table --stdout > /tmp/my_table.csv
 15 Run sling with POSTGRES source and CSV target 18 sling run --src-conn POSTGRES --src-stream public.my_table --tgt-object file:///tmp/my_table.csv
-16 Run sling with POSTGRES source and select columns 2 sling run --src-conn POSTGRES --src-stream public.my_table --stdout --select 'id' -l 2
-17 Run sling with POSTGRES source and exclude columns 2 sling run --src-conn POSTGRES --src-stream public.my_table --stdout --select '-id' -l 2
+16 Run sling with POSTGRES source and select columns 2 id,email sling run --src-conn POSTGRES --src-stream public.my_table --stdout --select 'id,email' -l 2
+17 Run sling with POSTGRES source and exclude columns 2 first_name,last_name,email,target,create_dt sling run --src-conn POSTGRES --src-stream public.my_table --stdout --select '-id' -l 2
 18 Run sling with gzipped CSV source and POSTGRES target with ignore existing 0 execution succeeded cat cmd/sling/tests/files/test1.1.csv.gz | sling run --tgt-conn POSTGRES --tgt-object public.my_table --mode full-refresh --tgt-options 'ignore_existing: true'
 19 Run sling with POSTGRES source and CSV target with ignore existing 0 execution succeeded sling run --src-conn POSTGRES --src-stream public.my_table --tgt-object file:///tmp/my_table.csv --tgt-options 'ignore_existing: true'
 20 Run sling with binary CSV source and POSTGRES target 1 sling run --src-stream file://cmd/sling/tests/files/binary/test.bytes.csv --tgt-conn postgres --tgt-object public.my_table_bytes
@@ -54,6 +54,10 @@ n test_name rows bytes streams fails output_contains command
 53 Run sling with empty input 0 execution succeeded echo '' | sling run --stdout
 54 Run sling with CSV source and single quote 3 "sling run --src-conn LOCAL --src-stream file://cmd/sling/tests/files/test7.csv --src-options '{ delimiter: ""|"", quote: ""'\''"", escape: ""\\"" }' --stdout > /dev/null"
 55 Run sling with CSV source and $symbol quote 3 "sling run --src-conn LOCAL --src-stream file://cmd/sling/tests/files/test8.csv --src-options '{ delimiter: ""|"", quote: ""$"", escape: ""\\"" }' --stdout > /dev/null"
-56 Run sling with direct insert full-refresh >10 >1 streaming data (direct insert) SLING_DIRECT_INSERT=true sling run --src-conn postgres --src-stream public.test1k_postgres_pg --tgt-conn mysql --tgt-object 'mysql.{stream_schema}_{stream_table}' --mode full-refresh
-57 Run sling with direct insert incremental 0 Nothing to insert|streaming data (direct insert) SLING_DIRECT_INSERT=true sling run --src-conn postgres --src-stream public.test1k_postgres_pg --tgt-conn mysql --tgt-object 'mysql.{stream_schema}_{stream_table}' --mode incremental --update-key create_dt
-58 Run sling writing to partitioned parquet 1000 partition_by ( sling run --src-stream file://cmd/sling/tests/files/test1.csv --tgt-object 'file:///tmp/sling/output8/{part_year}/{part_month}' -d --tgt-options '{ format: parquet }' --update-key create_dt
+56 Run sling with direct insert full-refresh >10 >1 streaming data (direct insert) SLING_DIRECT_INSERT=true sling run --src-conn postgres --src-stream public.test1k_postgres_pg --tgt-conn mysql --tgt-object 'mysql.public_test1k_postgres_pg' --mode full-refresh
+57 Run sling with incremental (delete missing soft) 0 and not exists ( sling run -d --src-conn postgres --src-stream 'select * from public.test1k_postgres_pg where {incremental_where_cond} limit 900' --tgt-conn mysql --tgt-object 'mysql.public_test1k_postgres_pg' --mode incremental --primary-key id --update-key create_dt --tgt-options '{ delete_missing: soft }'
+58 Run sling with incremental (delete missing hard) 0 and not exists ( sling run -d --src-conn postgres --src-stream 'select * from public.test1k_postgres_pg where {incremental_where_cond} limit 900' --tgt-conn mysql --tgt-object 'mysql.public_test1k_postgres_pg' --mode incremental --primary-key id --update-key create_dt --tgt-options '{ delete_missing: hard }'
+59 Run sling writing to partitioned parquet (local) 1000 partition_by (|create_dt_year=2018 rm -rf /tmp/sling/output8 ; sling run --src-stream file://cmd/sling/tests/files/test1.csv --tgt-object 'file:///tmp/sling/output8/{part_year}/{part_month}' -d --tgt-options '{ format: parquet }' --update-key create_dt ; ls -l /tmp/sling/output8
+60 Run sling writing to partitioned parquet (aws) 1002 partition_by ( FORMAT=parquet sling run -d -r cmd/sling/tests/replications/r.17.yaml --mode full-refresh
+61 Run sling with incremental writing to partitioned parquet (aws) 40 partition_by ( FORMAT=parquet sling run -d -r cmd/sling/tests/replications/r.17.yaml
+62 Run sling writing to partitioned csv (aws) 1002 partition_by ( FORMAT=csv sling run -d -r cmd/sling/tests/replications/r.17.yaml --mode full-refresh
4 changes: 2 additions & 2 deletions cmd/sling/tests/suite.db.template.tsv
@@ -16,10 +16,10 @@ n test_name source_conn source_stream source_options stream_config target_conn t
 15 sql_incremental_into_postgres [conn] select * from [schema].[table] where {incremental_where_cond} "{""limit"": 10000}" {} postgres public.[table]_pg incremental id update_dt "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true}" "{""validation_file"": ""file://tests/files/test1.result.csv"", ""validation_cols"": ""0,1,2,3,4,6"", , ""validation_stream_row_count"": "">0""}"
 16 table_backfill_into_postgres [conn] [schema].[table] "{""range"":""2020-01-01,2021-01-01""}" {} postgres public.[table]_pg backfill id create_dt {}
 17 table_full_refresh_from_postgres postgres public.[table]_pg_orig {} {} [conn] [schema].[table]_pg full-refresh id "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true}" "{""validation_row_count"": "">999"", ""validation_types"": {""code"":""decimal"",""create_dt"":""timestamp"",""date"":""date"",""rating"":""decimal"",""target"":""bool"",""update_dt"":""timestampz""}}"
-18 table_incremental_from_postgres postgres public.[table]_pg {} {} [conn] [schema].[table]_pg incremental id create_dt "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": false}" "{""validation_file"": ""file://tests/files/test1.result.csv"", ""validation_cols"": ""0,1,2,3,4,6"", ""validation_stream_row_count"": "">0""}"
+18 table_incremental_from_postgres postgres public.[table]_pg {} {} [conn] [schema].[table]_pg incremental id,code create_dt "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": false, ""delete_missing"": ""soft""}" "{""validation_file"": ""file://tests/files/test1.result.csv"", ""validation_cols"": ""0,1,2,3,4,6"", ""validation_stream_row_count"": "">0""}"
 19 view_full_refresh_from_postgres postgres public.[table]_pg_vw "{""columns"": {""first_name"": ""string(100)""}}" {} [conn] [schema].[table]_vw_pg full-refresh "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true}" "{""validation_row_count"": "">0""}"
 20 sql_full_refresh_from_postgres postgres select * from public.[table]_pg_orig where 1=1 {} {} [conn] [schema].[table]_pg full-refresh id create_dt "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": true}" "{""validation_row_count"": "">900""}"
-21 sql_incremental_from_postgres postgres select * from public.[table]_pg where {incremental_where_cond} {} {} [conn] [schema].[table]_pg incremental id create_dt "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": false, ""post_sql"": ""{drop_view}""}" "{""validation_file"": ""file://tests/files/test1.result.csv"", ""validation_cols"": ""0,1,2,3,4,6"", ""validation_row_count"": "">0"", ""validation_stream_row_count"": "">0""}"
+21 sql_incremental_from_postgres postgres select * from public.[table]_pg where {incremental_where_cond} {} {} [conn] [schema].[table]_pg incremental id create_dt "{""adjust_column_type"":true, ""add_new_columns"":true, ""use_bulk"": false, ""post_sql"": ""{drop_view}"", ""delete_missing"": ""hard""}" "{""validation_file"": ""file://tests/files/test1.result.csv"", ""validation_cols"": ""0,1,2,3,4,6"", ""validation_row_count"": "">0"", ""validation_stream_row_count"": "">0""}"
 22 table_backfill_from_postgres postgres public.[table]_pg "{""range"":""2020-01-01,2021-01-01""}" {} [conn] [schema].[table]_pg backfill id create_dt "{""post_sql"": ""drop table [schema].[table] ; drop table [schema].[table]_pg; drop table [schema].[table]_vw_pg""}" "{""validation_stream_row_count"": "">0""}"
 23 discover_schemas [conn] discover "{""level"": ""schema"", ""validation_row_count"": "">0""}"
 24 discover_tables [conn] [schema].* discover "{""level"": ""table"", ""validation_row_count"": "">0""}"
26 changes: 17 additions & 9 deletions core/dbio/connection/connection.go
@@ -250,7 +250,7 @@ func (c *Connection) URL() string {
 // Close closes the connection
 func (c *Connection) Close() error {
 	// remove from cache
-	connCache.Remove(c.Name)
+	defer connCache.Remove(c.Hash())
 
 	if c.Database != nil {
 		return c.Database.Close()
@@ -265,64 +265,72 @@ func (c *Connection) Close() error {
 var connCache = cmap.New[*Connection]()
 
 func (c *Connection) AsDatabase(cache ...bool) (dc database.Connection, err error) {
+	return c.AsDatabaseContext(c.Context().Ctx, cache...)
+}
+
+func (c *Connection) AsDatabaseContext(ctx context.Context, cache ...bool) (dc database.Connection, err error) {
 	if !c.Type.IsDb() {
 		return nil, g.Error("not a database type: %s", c.Type)
 	}
 
 	// default cache to true
 	if len(cache) == 0 || (len(cache) > 0 && cache[0]) {
-		if cc, ok := connCache.Get(c.Name); ok {
+		if cc, ok := connCache.Get(c.Hash()); ok {
 			if cc.Database != nil {
 				return cc.Database, nil
 			}
 		}
 
 		if c.Database == nil {
 			c.Database, err = database.NewConnContext(
-				c.Context().Ctx, c.URL(), g.MapToKVArr(c.DataS())...,
+				ctx, c.URL(), g.MapToKVArr(c.DataS())...,
 			)
 			if err != nil {
 				return
 			}
-			connCache.Set(c.Name, c) // cache
+			connCache.Set(c.Hash(), c) // cache
 		}
 
 		return c.Database, nil
 	}
 
 	return database.NewConnContext(
-		c.Context().Ctx, c.URL(), g.MapToKVArr(c.DataS())...,
+		ctx, c.URL(), g.MapToKVArr(c.DataS())...,
 	)
 }
 
 func (c *Connection) AsFile(cache ...bool) (fc filesys.FileSysClient, err error) {
+	return c.AsFileContext(c.Context().Ctx, cache...)
+}
+
+func (c *Connection) AsFileContext(ctx context.Context, cache ...bool) (fc filesys.FileSysClient, err error) {
 	if !c.Type.IsFile() {
 		return nil, g.Error("not a file system type: %s", c.Type)
 	}
 
 	// default cache to true
 	if len(cache) == 0 || (len(cache) > 0 && cache[0]) {
-		if cc, ok := connCache.Get(c.Name); ok {
+		if cc, ok := connCache.Get(c.Hash()); ok {
			if cc.File != nil {
 				return cc.File, nil
 			}
 		}
 
 		if c.File == nil {
 			c.File, err = filesys.NewFileSysClientFromURLContext(
-				c.Context().Ctx, c.URL(), g.MapToKVArr(c.DataS())...,
+				ctx, c.URL(), g.MapToKVArr(c.DataS())...,
 			)
 			if err != nil {
 				return
 			}
-			connCache.Set(c.Name, c) // cache
+			connCache.Set(c.Hash(), c) // cache
 		}
 
 		return c.File, nil
 	}
 
 	return filesys.NewFileSysClientFromURLContext(
-		c.Context().Ctx, c.URL(), g.MapToKVArr(c.DataS())...,
+		ctx, c.URL(), g.MapToKVArr(c.DataS())...,
 	)
 }
 
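
Keying connCache on c.Hash() instead of c.Name matters when two Connection values share a name but differ in URL or properties: a name-keyed cache would hand one of them the other's live client. A self-contained toy illustrating the difference (the type and field names here are invented, not sling's):

    package main

    import (
    	"crypto/sha256"
    	"encoding/hex"
    	"fmt"
    )

    type conn struct {
    	Name string
    	URL  string
    }

    // hash over the full config, not just the name
    func (c conn) hash() string {
    	h := sha256.Sum256([]byte(c.Name + "|" + c.URL))
    	return hex.EncodeToString(h[:8])
    }

    func main() {
    	a := conn{Name: "MY_PG", URL: "postgres://hostA/db"}
    	b := conn{Name: "MY_PG", URL: "postgres://hostB/db"} // same name, different target

    	byName := map[string]conn{a.Name: a, b.Name: b} // second entry overwrites the first
    	byHash := map[string]conn{a.hash(): a, b.hash(): b}
    	fmt.Println(len(byName), len(byHash)) // prints "1 2": name-keyed entries collide
    }

The new AsDatabaseContext/AsFileContext variants serve a related goal: callers can now supply their own context instead of being pinned to the connection's internal one, with the old AsDatabase/AsFile kept as thin wrappers.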
7 changes: 7 additions & 0 deletions core/dbio/database/database.go
@@ -321,7 +321,9 @@ func NewConnContext(ctx context.Context, URL string, props ...string) (Connectio
 
 	err = conn.Init()
 
+	// set sling_conn_id
+	conn.SetProp("sling_conn_id", g.RandSuffix(g.F("conn-%s-", conn.GetType()), 3))
 
 	return conn, err
 }
 
@@ -1109,6 +1111,11 @@ func (conn *BaseConn) ExecMulti(sqls ...string) (result sql.Result, err error) {
 
 // ExecContext runs a sql query with context, returns `error`
 func (conn *BaseConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error) {
+	err = reconnectIfClosed(conn)
+	if err != nil {
+		err = g.Error(err, "Could not reconnect")
+		return
+	}
 
 	if strings.TrimSpace(q) == "" {
 		g.Warn("Empty Query")
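
The new reconnectIfClosed guard makes ExecContext resilient to idle or dropped handles by re-establishing the connection before a statement runs. Its implementation is not shown in this diff; below is a rough sketch of the assumed pattern, where the helper names ensureLive and redial are hypothetical:

    package sketch

    import "database/sql"

    // re-dial when the pooled handle no longer responds, then proceed
    func ensureLive(db *sql.DB, redial func() (*sql.DB, error)) (*sql.DB, error) {
    	if err := db.Ping(); err != nil { // a closed or broken handle fails the ping
    		return redial()
    	}
    	return db, nil
    }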
4 changes: 2 additions & 2 deletions core/dbio/database/database_clickhouse.go
@@ -312,8 +312,8 @@ func (conn *ClickhouseConn) GenerateUpsertSQL(srcTable string, tgtTable string,
 	}
 
 	sqlTempl := `
-	ALTER TABLE {tgt_table}
-	DELETE where ({pk_fields}) in (
+	alter table {tgt_table}
+	delete where ({pk_fields}) in (
 			select {pk_fields}
 			from {src_table} src
 	)
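
The casing change above only normalizes the generated SQL; behavior is unchanged. For context, ClickHouse has no native upsert, so GenerateUpsertSQL emulates one by deleting target rows whose primary keys appear in the staging table (an insert presumably follows in the portion of the template not shown here). A hypothetical rendering of the template via g.R from github.com/flarco/g, assuming the surrounding function's scope, with invented table and key names:

    stmt := g.R(
    	sqlTempl,
    	"tgt_table", `"default"."events"`,
    	"src_table", `"default"."events_tmp"`,
    	"pk_fields", "id",
    )
    // stmt is roughly: alter table "default"."events"
    //                  delete where (id) in ( select id from "default"."events_tmp" src )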