Merge pull request #410 from slingdata-io/v1.2.22
v1.2.22
flarco authored Oct 24, 2024
2 parents 73216e1 + 2a1975b commit 6b272de
Showing 32 changed files with 571 additions and 410 deletions.
43 changes: 0 additions & 43 deletions .github/workflows/build-release.yml
@@ -13,49 +13,6 @@ env:
GOPRIVATE: github.com/slingdata-io/*

jobs:
-build:
-runs-on: [self-hosted, linux, ubuntu-20]
-timeout-minutes: 5
-
-outputs:
-VERSION: ${{ steps.get_version.outputs.VERSION }}
-
-steps:
-- uses: actions/checkout@v2
-
-- name: Set up GoLang
-uses: actions/setup-go@v3
-with:
-go-version: "1.22"
-cache: false
-
-- name: Get the version
-id: get_version
-run: |
-git pull --ff-only
-git tag -l --sort=-creatordate | head -n 9
-TAG=$(git tag -l --sort=-creatordate | head -n 1)
-VERSION=$(echo $TAG | sed 's/v//')
-echo "VERSION -> $VERSION"
-echo ::set-output name=VERSION::$VERSION
-- id: step1
-name: Build
-env:
-VERSION: ${{ steps.get_version.outputs.VERSION }}
-run: |
-bash scripts/prep.gomod.sh
-export WORK_FOLDER=/__/work/sling-cli/${GITHUB_RUN_NUMBER}
-mkdir -p $WORK_FOLDER
-cp -r . $WORK_FOLDER/sling
-sudo chmod -R 777 $WORK_FOLDER
-docker run --rm -v /__/docker-data/devbox/root/go:/root/go -v /__:/__ -v /tmp:/tmp -v $WORK_FOLDER:/work --workdir /work/sling flarco/devbox:base bash scripts/build.test.sh $VERSION
-sudo rm -rf $WORK_FOLDER/sling
-echo $VERSION
-VERSION=$(/__/bin/sling --version | sed 's/Version: //')
-echo ::set-output name=version::$VERSION

release-python:
needs: [ release-linux-amd64, release-brew, release-scoop ]
55 changes: 49 additions & 6 deletions .github/workflows/build-test.yml
@@ -11,14 +11,15 @@ env:

jobs:

-build-test:
+build-test-sling:
if: "! (contains(github.event.head_commit.message, '[skip ci]') || contains(github.event.head_commit.message, '[bump]'))"

runs-on: [self-hosted, linux, ubuntu-20]
-timeout-minutes: 25
+timeout-minutes: 40

steps:
- uses: actions/checkout@v2

- uses: myrotvorets/set-commit-status-action@master
with:
token: ${{ secrets.GITHUB_TOKEN }}
@@ -65,17 +66,59 @@ jobs:
bash scripts/test.sh
+build-test-dbio:
+if: "! (contains(github.event.head_commit.message, '[skip ci]') || contains(github.event.head_commit.message, '[bump]'))"
+
+runs-on: [self-hosted, linux, ubuntu-20]
+timeout-minutes: 40
+
+steps:
+- uses: actions/checkout@v2
+
+- name: Set up GoLang
+uses: actions/setup-go@v3
+with:
+go-version: "1.22"
+cache: false
+
+- name: Load Secrets
+uses: flarco/infisical-action@v3
+with:
+version: 0.28.1
+client_id: ${{ secrets.INFISICAL_CLIENT_ID }}
+client_secret: ${{ secrets.INFISICAL_CLIENT_SECRET }}
+
+- name: Load Secrets (dbio)
+uses: flarco/infisical-action@v3
+with:
+version: 0.28.1
+client_id: ${{ secrets.INFISICAL_CLIENT_ID }}
+client_secret: ${{ secrets.INFISICAL_CLIENT_SECRET }}
+path: /dbio
+env: dev
+
+- name: Build Binary
+run: |
+# Prep
+bash scripts/ci/prep.linux.sh
+# build
+bash scripts/ci/build.linux.sh dev
+- name: Run Go Tests (dbio)
+run: |
+export DEBUG=''
+# Oracle env
+export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$LD_LIBRARY_PATH
+export PATH="$PATH:$ORACLE_HOME/bin"
+cd core/dbio
+bash scripts/test.sh
build-test-success:

runs-on: [self-hosted, linux, ubuntu-20]
+needs: [ build-test-sling, build-test-dbio ]

steps:
- uses: actions/checkout@v2

- uses: myrotvorets/set-commit-status-action@master
if: always()
2 changes: 2 additions & 0 deletions cmd/sling/tests/suite.cli.tsv
@@ -54,3 +54,5 @@ n test_name rows bytes streams fails output_contains command
53 Run sling with empty input 0 execution succeeded echo '' | sling run --stdout
54 Run sling with CSV source and single quote 3 "sling run --src-conn LOCAL --src-stream file://cmd/sling/tests/files/test7.csv --src-options '{ delimiter: ""|"", quote: ""'\''"", escape: ""\\"" }' --stdout > /dev/null"
55 Run sling with CSV source and $symbol quote 3 "sling run --src-conn LOCAL --src-stream file://cmd/sling/tests/files/test8.csv --src-options '{ delimiter: ""|"", quote: ""$"", escape: ""\\"" }' --stdout > /dev/null"
+56 Run sling with direct insert full-refresh >10 >1 streaming data (direct insert) SLING_DIRECT_INSERT=true sling run -r cmd/sling/tests/replications/r.09.yaml
+57 Run sling with direct insert incremental 0 Nothing to insert|streaming data (direct insert) SLING_DIRECT_INSERT=true sling run --src-conn postgres --src-stream public.test1k --tgt-conn snowflake --tgt-object 'public.{stream_schema}_{stream_table}' --mode incremental --update-key create_date
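Note: tests 56 and 57 exercise the new direct-insert path, toggled via the SLING_DIRECT_INSERT environment variable, and assert on the "streaming data (direct insert)" log line. A minimal sketch of how such a flag could be read on the Go side (illustrative only; sling's actual flag wiring is not part of this diff):

package main

import (
	"fmt"
	"os"
	"strconv"
)

// directInsert reports whether SLING_DIRECT_INSERT is set to a truthy value
// ("true", "1", ...). Illustrative only; not sling's real flag handling.
func directInsert() bool {
	v, err := strconv.ParseBool(os.Getenv("SLING_DIRECT_INSERT"))
	return err == nil && v
}

func main() {
	fmt.Println("direct insert enabled:", directInsert())
}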
10 changes: 0 additions & 10 deletions core/dbio/database/database.go
@@ -2053,16 +2053,6 @@ func (conn *BaseConn) CastColumnsForSelect(srcColumns iop.Columns, tgtColumns io
// It will return quoted field names as `newColNames`, the same length as `colNames`
func (conn *BaseConn) ValidateColumnNames(tgtCols iop.Columns, colNames []string, quote bool) (newCols iop.Columns, err error) {

-tgtFields := map[string]string{}
-for _, colName := range tgtCols.Names() {
-colName = conn.Self().Unquote(colName)
-if quote {
-tgtFields[strings.ToLower(colName)] = conn.Self().Quote(colName)
-} else {
-tgtFields[strings.ToLower(colName)] = colName
-}
-}

mismatches := []string{}
for _, colName := range colNames {
newCol := tgtCols.GetColumn(colName)
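Note: the deleted block built a lowercased-name lookup map by hand; ValidateColumnNames now resolves names through tgtCols.GetColumn, which presumably matches case-insensitively. A self-contained sketch of that style of lookup (assumed behavior, not sling's actual implementation):

package main

import (
	"fmt"
	"strings"
)

type column struct{ Name string }

// getColumn resolves a column by name, ignoring case, which is the behavior a
// helper like Columns.GetColumn is assumed to provide here.
func getColumn(cols []column, name string) *column {
	for i := range cols {
		if strings.EqualFold(cols[i].Name, name) {
			return &cols[i]
		}
	}
	return nil
}

func main() {
	cols := []column{{"ID"}, {"Create_Date"}}
	fmt.Println(getColumn(cols, "create_date").Name) // Create_Date
}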
10 changes: 5 additions & 5 deletions core/dbio/database/database_bigquery.go
@@ -573,7 +573,7 @@ func (conn *BigQueryConn) importViaLocalStorage(tableFName string, df *iop.Dataf
return
}

-localPath := path.Join(env.GetTempFolder(), "bigquery", tableFName, g.NowFileStr())
+localPath := path.Join(env.GetTempFolder(), "bigquery", env.CleanTableName(tableFName), g.NowFileStr())
err = filesys.Delete(fs, localPath)
if err != nil {
return count, g.Error(err, "Could not Delete: "+localPath)
@@ -586,8 +586,8 @@
fileReadyChn := make(chan filesys.FileReady, 10)

go func() {
-fs.SetProp("null_as", `\N`)
-_, err = fs.WriteDataflowReady(df, localPath, fileReadyChn, iop.DefaultStreamConfig())
+config := iop.LoaderStreamConfig(true)
+_, err = fs.WriteDataflowReady(df, localPath, fileReadyChn, config)

if err != nil {
df.Context.CaptureErr(g.Error(err, "error writing dataflow to local storage: "+localPath))
@@ -675,8 +675,8 @@ func (conn *BigQueryConn) importViaGoogleStorage(tableFName string, df *iop.Data
fileReadyChn := make(chan filesys.FileReady, 10)

go func() {
-fs.SetProp("null_as", `\N`)
-_, err = fs.WriteDataflowReady(df, gcsPath, fileReadyChn, iop.DefaultStreamConfig())
+config := iop.LoaderStreamConfig(true)
+_, err = fs.WriteDataflowReady(df, gcsPath, fileReadyChn, config)

if err != nil {
g.LogError(err, "error writing dataflow to google storage: "+gcsPath)
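Note: a qualified table name such as "project"."dataset"."table" can carry quotes and separators that are awkward in a filesystem path, which is presumably why the temp paths above now pass through env.CleanTableName. A hypothetical sketch of such a sanitizer (the real implementation is not shown in this diff):

package main

import (
	"fmt"
	"strings"
)

// cleanTableName strips quoting and path characters from a qualified table
// name so it can be embedded in a temp-folder path. Hypothetical stand-in
// for env.CleanTableName.
func cleanTableName(tableFName string) string {
	r := strings.NewReplacer(`"`, "", "`", "", "[", "", "]", "", "/", "_", `\`, "_", ":", "_")
	return r.Replace(tableFName)
}

func main() {
	fmt.Println(cleanTableName(`"project"."dataset"."table"`)) // project.dataset.table
}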
8 changes: 2 additions & 6 deletions core/dbio/database/database_duckdb.go
@@ -171,7 +171,7 @@ func (conn *DuckDbConn) importViaTempCSVs(tableFName string, df *iop.Dataflow) (
return
}

-folderPath := path.Join(env.GetTempFolder(), "duckdb", "import", g.NowFileStr())
+folderPath := path.Join(env.GetTempFolder(), "duckdb", "import", env.CleanTableName(tableFName), g.NowFileStr())
fileReadyChn := make(chan filesys.FileReady, 3)

go func() {
@@ -188,11 +188,7 @@
config.Escape = `"`
config.Quote = `"`
config.NullAs = `\N`

-timestampZLayout := conn.Type.GetTemplateValue("variable.timestampz_layout")
-// config.DatetimeFormat = timestampZLayout // no effect since overwritten
-// see ds.SetConfig(fs.Props()) in fs.go
-fs.SetProp("datetime_format", timestampZLayout)
+config.DatetimeFormat = conn.Type.GetTemplateValue("variable.timestampz_layout")

_, err = fs.WriteDataflowReady(df, folderPath, fileReadyChn, config)
if err != nil {
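Note: assigning config.DatetimeFormat directly also fixes the issue the deleted comments describe, where the value set via fs.SetProp was later overwritten by ds.SetConfig(fs.Props()) in fs.go. The template value variable.timestampz_layout resolves to a Go reference-time layout, as in this small example:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Go layouts are written against the reference time Mon Jan 2 15:04:05 2006;
	// ".000" prints milliseconds, and the trailing "Z" here is a literal character.
	layout := "2006-01-02 15:04:05.000Z"
	t := time.Date(2024, 10, 24, 9, 30, 0, 0, time.UTC)
	fmt.Println(t.Format(layout)) // 2024-10-24 09:30:00.000Z
}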
6 changes: 2 additions & 4 deletions core/dbio/database/database_duckdb_unix.go
@@ -32,7 +32,7 @@ func (conn *DuckDbConn) importViaNamedPipe(tableFName string, df *iop.Dataflow)
}

// Create a named pipe
-folderPath := path.Join(env.GetTempFolder(), "duckdb", "import", g.NowFileStr())
+folderPath := path.Join(env.GetTempFolder(), "duckdb", "import", env.CleanTableName(tableFName), g.NowFileStr())
if err = os.MkdirAll(folderPath, 0755); err != nil {
return 0, g.Error(err, "could not create temp folder: %s", folderPath)
}
@@ -70,9 +70,7 @@ func (conn *DuckDbConn) importViaNamedPipe(tableFName string, df *iop.Dataflow)

tbw := int64(0)
for ds := range df.StreamCh {
-ds.Sp.Config = config
-
-for batchR := range ds.NewCsvReaderChnl(0, 0) {
+for batchR := range ds.NewCsvReaderChnl(config) {
bw, err := io.Copy(bufWriter, batchR.Reader)
if err != nil {
err = g.Error(err, "Error writing from reader")
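For context, a named pipe lets DuckDB read a "file" that sling is still writing, so no full temp copy lands on disk. A standalone illustration of the Unix mechanics this import path builds on (paths and data are illustrative, not sling's code):

package main

import (
	"fmt"
	"io"
	"os"
	"strings"
	"syscall"
)

func main() {
	pipePath := "/tmp/sling_demo.pipe" // illustrative path
	os.Remove(pipePath)
	if err := syscall.Mkfifo(pipePath, 0644); err != nil {
		panic(err)
	}
	defer os.Remove(pipePath)

	go func() {
		// writer side (sling's role): open blocks until a reader appears
		w, err := os.OpenFile(pipePath, os.O_WRONLY, 0)
		if err != nil {
			panic(err)
		}
		defer w.Close()
		io.Copy(w, strings.NewReader("id,name\n1,a\n"))
	}()

	// reader side (DuckDB's role): consumes the stream as it is written
	r, err := os.Open(pipePath)
	if err != nil {
		panic(err)
	}
	b, _ := io.ReadAll(r)
	fmt.Print(string(b))
}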
2 changes: 1 addition & 1 deletion core/dbio/database/database_mysql.go
@@ -275,7 +275,7 @@ func (conn *MySQLConn) LoadDataInFile(tableFName string, ds *iop.Datastream) (co
)

proc.Stderr = &stderr
-proc.Stdin = ds.NewCsvReader(0, 0)
+proc.Stdin = ds.NewCsvReader(iop.DefaultStreamConfig())

err = proc.Run()
if err != nil {
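Note: LoadDataInFile streams CSV straight into the mysql client's stdin; only the reader construction changed here, from positional arguments to an explicit iop.DefaultStreamConfig(). The underlying exec pattern, reduced to a runnable toy (cat stands in for the mysql binary):

package main

import (
	"bytes"
	"os"
	"os/exec"
	"strings"
)

func main() {
	var stderr bytes.Buffer

	// "cat" stands in for the mysql client; sling assigns ds.NewCsvReader(...)
	// to proc.Stdin in the same way
	proc := exec.Command("cat")
	proc.Stdin = strings.NewReader("id,name\n1,a\n")
	proc.Stdout = os.Stdout
	proc.Stderr = &stderr

	if err := proc.Run(); err != nil {
		panic(stderr.String())
	}
}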
6 changes: 3 additions & 3 deletions core/dbio/database/database_oracle.go
@@ -279,7 +279,7 @@ func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count ui
}

// write to ctlPath
-ctlPath := path.Join(env.GetTempFolder(), g.NewTsID("oracle.data.sqlldr")+".ctl")
+ctlPath := path.Join(env.GetTempFolder(), g.NewTsID(g.F("oracle.%s.sqlldr", env.CleanTableName(tableFName)))+".ctl")
ctlStr := g.R(
conn.BaseConn.GetTemplateValue("core.sqlldr"),
"table", tableFName,
@@ -310,8 +310,8 @@ func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count ui
postUpdates := cmap.New[int]()

if runtime.GOOS == "windows" {
-dataPath = path.Join(env.GetTempFolder(), g.NewTsID("oracle.data.temp")+".csv")
-logPath = path.Join(env.GetTempFolder(), g.NewTsID("oracle.log.temp"))
+dataPath = path.Join(env.GetTempFolder(), g.NewTsID(g.F("oracle.%s", env.CleanTableName(tableFName)))+".temp.csv")
+logPath = path.Join(env.GetTempFolder(), g.NewTsID(g.F("oracle.%s", env.CleanTableName(tableFName)))+".log")

file, err := os.Create(dataPath)
if err != nil {
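Note: embedding the cleaned table name in the sqlldr control, data, and log file names keeps concurrent loads into different tables from colliding. g.NewTsID presumably appends a timestamp-derived unique suffix; a guess at its shape (hypothetical, as the flarco/g implementation is not shown here):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// newTsID sketches a timestamp-based ID: prefix, sortable time, random tail.
// Hypothetical; g.NewTsID's actual format is not shown in this diff.
func newTsID(prefix string) string {
	return fmt.Sprintf("%s.%s.%04d", prefix, time.Now().Format("20060102_150405"), rand.Intn(10000))
}

func main() {
	fmt.Println(newTsID("oracle.public_test1k.sqlldr") + ".ctl")
}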
4 changes: 2 additions & 2 deletions core/dbio/database/database_prometheus.go
@@ -51,11 +51,11 @@ func (conn *PrometheusConn) getNewClient(timeOut ...int) (client v1.API, err err
}

if token := conn.GetProp("token"); token != "" {
-rt = config.NewAuthorizationCredentialsRoundTripper("Bearer", config.Secret(token), rt)
+rt = config.NewAuthorizationCredentialsRoundTripper("Bearer", config.NewInlineSecret(token), rt)
}

if user := conn.GetProp("user"); user != "" {
-rt = config.NewBasicAuthRoundTripper(user, config.Secret(conn.GetProp("password")), "", "", rt)
+rt = config.NewBasicAuthRoundTripper(config.NewFileSecret(user), config.NewInlineSecret(conn.GetProp("password")), rt)
}

c, err := api.NewClient(api.Config{
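Note: this tracks an API change in prometheus/common's config package, where plain config.Secret values gave way to SecretReader implementations: NewInlineSecret wraps a literal, while NewFileSecret reads from a file path at request time. A compilable sketch of chaining a bearer-token round tripper under that API (the token is a placeholder):

package main

import (
	"net/http"

	"github.com/prometheus/common/config"
)

func main() {
	var rt http.RoundTripper = http.DefaultTransport

	// a literal token held in memory; config.NewFileSecret("/path") would
	// instead read the secret from disk on each request
	rt = config.NewAuthorizationCredentialsRoundTripper(
		"Bearer", config.NewInlineSecret("my-token"), rt)

	_ = rt // hand to api.NewClient as a custom RoundTripper
}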
9 changes: 4 additions & 5 deletions core/dbio/database/database_snowflake.go
@@ -735,7 +735,7 @@ func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (co
}

// Write the ds to a temp file
-folderPath := path.Join(env.GetTempFolder(), "snowflake", "put", tableFName, g.NowFileStr())
+folderPath := path.Join(env.GetTempFolder(), "snowflake", "put", env.CleanTableName(tableFName), g.NowFileStr())

// delete folder when done
df.Defer(func() { env.RemoveAllLocalTempFile(folderPath) })
@@ -748,8 +748,8 @@ func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (co
return
}

-fs.SetProp("null_as", `\N`)
-_, err = fs.WriteDataflowReady(df, folderPath, fileReadyChn, iop.DefaultStreamConfig())
+config := iop.LoaderStreamConfig(true)
+_, err = fs.WriteDataflowReady(df, folderPath, fileReadyChn, config)

if err != nil {
df.Context.CaptureErr(g.Error(err, "Error writing dataflow to disk: "+folderPath))
@@ -759,15 +759,14 @@ func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (co
}()

// Import to staging
-stageFolderPath := g.F("@%s.%s/%s/%s", conn.GetProp("schema"), conn.GetProp("internalStage"), tableFName, g.NowFileStr())
+stageFolderPath := g.F("@%s.%s/%s/%s", conn.GetProp("schema"), conn.GetProp("internalStage"), env.CleanTableName(tableFName), g.NowFileStr())
conn.Exec("USE SCHEMA " + conn.GetProp("schema"))
_, err = conn.Exec("REMOVE " + stageFolderPath)
if err != nil {
err = g.Error(err, "REMOVE: "+stageFolderPath)
return
}
df.Defer(func() {
_, err := conn.Exec("REMOVE " + stageFolderPath)
if err != nil && strings.Contains(err.Error(), "transaction") {
conn.tx = nil // clear any failed transactions
conn.Exec("REMOVE " + stageFolderPath)
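Note: the same swap as in the BigQuery loader: instead of setting null_as by hand via fs.SetProp, the writer takes iop.LoaderStreamConfig(true). Judging from the properties the old code set, that constructor presumably returns a StreamConfig preconfigured for bulk loaders (header on, NULLs rendered as \N); a guess at its shape, not the actual implementation:

package main

import "fmt"

// streamConfig is a simplified stand-in for sling's iop.StreamConfig.
type streamConfig struct {
	Header bool
	NullAs string
}

// loaderStreamConfig guesses what iop.LoaderStreamConfig(header) bundles,
// based on the fs.SetProp("null_as", `\N`) call it replaces.
func loaderStreamConfig(header bool) streamConfig {
	return streamConfig{Header: header, NullAs: `\N`}
}

func main() {
	fmt.Printf("%+v\n", loaderStreamConfig(true))
}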
23 changes: 11 additions & 12 deletions core/dbio/database/database_sqlite.go
@@ -153,19 +153,18 @@ func (conn *SQLiteConn) BulkImportStream(tableFName string, ds *iop.Datastream)
sameCols := g.Marshal(ds.Columns.Names(true, true)) == g.Marshal(columns.Names(true, true))

// write to temp CSV
-csvPath := path.Join(env.GetTempFolder(), g.NewTsID("sqlite.temp")+".csv")
-sqlPath := path.Join(env.GetTempFolder(), g.NewTsID("sqlite.temp")+".sql")
+csvPath := path.Join(env.GetTempFolder(), g.NewTsID(g.F("sqlite.%s", env.CleanTableName(tableFName)))+".temp.csv")
+sqlPath := path.Join(env.GetTempFolder(), g.NewTsID(g.F("sqlite.%s", env.CleanTableName(tableFName)))+".temp.sql")

// set header. not needed if not creating a temp table
-cfgMap := ds.GetConfig()
-cfgMap["delimiter"] = ","
-cfgMap["bool_at_int"] = "true"
-cfgMap["header"] = lo.Ternary(sameCols, "false", "true")
-cfgMap["datetime_format"] = conn.GetProp("datetime_format")
-if strings.ToLower(cfgMap["datetime_format"]) == "auto" {
-cfgMap["datetime_format"] = "2006-01-02 15:04:05.000Z"
+cfg := iop.DefaultStreamConfig()
+cfg.Delimiter = ","
+cfg.BoolAsInt = true
+cfg.Header = lo.Ternary(sameCols, false, true)
+cfg.DatetimeFormat = conn.GetProp("datetime_format")
+if strings.ToLower(cfg.DatetimeFormat) == "auto" || cfg.DatetimeFormat == "" {
+cfg.DatetimeFormat = "2006-01-02 15:04:05.000Z"
}
-ds.SetConfig(cfgMap)

if runtime.GOOS == "windows" {
fs, err := filesys.NewFileSysClient(dbio.TypeFileLocal)
@@ -174,7 +173,7 @@ func (conn *SQLiteConn) BulkImportStream(tableFName string, ds *iop.Datastream)
return 0, err
}

-_, err = fs.Write("file://"+csvPath, ds.NewCsvReader(0, 0))
+_, err = fs.Write("file://"+csvPath, ds.NewCsvReader(cfg))
if err != nil {
err = g.Error(err, "could not write to temp file")
return 0, err
@@ -183,7 +182,7 @@ func (conn *SQLiteConn) BulkImportStream(tableFName string, ds *iop.Datastream)

} else {
csvPath = "/dev/stdin"
-cmd.Stdin = ds.NewCsvReader(0, 0)
+cmd.Stdin = ds.NewCsvReader(cfg)
}

tempTable := g.RandSuffix("temp_", 4)
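Note: the SQLite loader now builds a typed iop.DefaultStreamConfig() and passes it to NewCsvReader(cfg), instead of round-tripping options through a string map and ds.SetConfig. A toy contrast of the two styles (simplified types, not sling's structs):

package main

import "fmt"

// simplified stand-in for a typed stream config
type streamConfig struct {
	Delimiter      string
	BoolAsInt      bool
	Header         bool
	DatetimeFormat string
}

func main() {
	// before: stringly typed; consumers must re-parse "true"/"false"
	cfgMap := map[string]string{"delimiter": ",", "bool_at_int": "true"}

	// after: typed fields, checked at compile time
	cfg := streamConfig{Delimiter: ",", BoolAsInt: true, Header: false}

	fmt.Println(cfgMap["bool_at_int"], cfg.BoolAsInt)
}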