Merge pull request #303 from slingdata-io/v1.2.11
V1.2.11
flarco authored Jun 4, 2024
2 parents b38e6c0 + 439c05f commit 58465e1
Showing 31 changed files with 162 additions and 69 deletions.
34 changes: 31 additions & 3 deletions cmd/sling/sling_test.go
@@ -486,6 +486,7 @@ func TestSuiteDatabasePostgres(t *testing.T) {

// func TestSuiteDatabaseRedshift(t *testing.T) {
// t.Parallel()
// os.Setenv("SAMPLE_SIZE", "1200") // adjust_column_type does not work in Redshift
// testSuite(t, dbio.TypeDbRedshift)
// }

@@ -746,8 +747,16 @@ streams:
file_max_rows: 0
post_sql: ""
disabled: true
file://tests/files/parquet/*.parquet:
single: true
object: my_schema3.table3
file://tests/files/*.csv:
object: my_schema3.table3
`
replication, err := sling.LoadReplicationConfig(replicationCfg)
replication, err := sling.LoadReplicationConfig(strings.ReplaceAll(replicationCfg, "\t", " "))
if !g.AssertNoError(t, err) {
return
}
@@ -757,7 +766,7 @@ streams:
return
}

if !assert.Len(t, taskConfigs, 3) {
if !assert.GreaterOrEqual(t, len(taskConfigs), 5) {
return
}

@@ -790,14 +799,15 @@ streams:
assert.Equal(t, g.Bool(true), config.Source.Options.TrimSpace)
assert.Equal(t, "|", config.Source.Options.Delimiter)

assert.Equal(t, "my_schema2.table2", config.Target.Object)
assert.Equal(t, g.Bool(true), config.Target.Options.AddNewColumns)
assert.EqualValues(t, g.Int64(600000), config.Target.Options.FileMaxRows)
assert.EqualValues(t, g.String("some sql"), config.Target.Options.PostSQL)
assert.EqualValues(t, false, config.ReplicationStream.Disabled)
}

{
// Second Stream: stream_2
// Third Stream: stream_2
config := taskConfigs[2]
assert.Equal(t, "stream_2", config.Source.Stream)
assert.Equal(t, []string{}, config.Source.Select)
@@ -808,6 +818,24 @@
assert.EqualValues(t, true, config.ReplicationStream.Disabled)
}

{
// Fourth Stream: file://tests/files/parquet/*.parquet
// single, wildcard not expanded
config := taskConfigs[3]
assert.Equal(t, config.Source.Stream, "file://tests/files/parquet/*.parquet")
assert.Equal(t, "my_schema3.table3", config.Target.Object)
}

{
// Fifth Stream: file://tests/files/*.csv
// wildcard expanded
config := taskConfigs[4]
assert.True(t, strings.HasPrefix(config.Source.Stream, "file://tests/files/"))
assert.NotEqual(t, config.Source.Stream, "file://tests/files/*.csv")
assert.Equal(t, "my_schema3.table3", config.Target.Object)
// g.Info(g.Pretty(config))
}

// g.Debug(g.Pretty(taskConfigs))
}

4 changes: 3 additions & 1 deletion cmd/sling/tests/replications/r.08.yaml
@@ -6,4 +6,6 @@ defaults:
object: 'main.do_{stream_file_name}'

streams:
s3://ocral/test.fs.write/*:
s3://ocral/test.fs.write/*:
s3://ocral/test.fs.write/*.csv:
single: true
3 changes: 1 addition & 2 deletions core/dbio/database/database.go
@@ -383,8 +383,7 @@ func (conn *BaseConn) GetURL(newURL ...string) string {
// Init initiates the connection object & add default port if missing
func (conn *BaseConn) Init() (err error) {
if conn.instance == nil {
var instance Connection
instance = conn
instance := Connection(conn)
conn.instance = &instance
}

21 changes: 8 additions & 13 deletions core/dbio/database/database_bigquery.go
@@ -824,13 +824,15 @@ func (conn *BigQueryConn) BulkExportFlow(tables ...Table) (df *iop.Dataflow, err
return
}

fs.SetProp("header", "false")
fs.SetProp("header", "true")
fs.SetProp("format", "csv")
fs.SetProp("null_if", `\N`)
fs.SetProp("columns", g.Marshal(columns))
fs.SetProp("metadata", conn.GetProp("metadata"))

// setting empty_as_null=true. no way to export with proper null_marker.
// Parquet export doesn't support JSON types
// gcsRef.NullMarker = `\N` does not work, no way to do so in EXPORT DATA OPTIONS
// Also, Parquet export doesn't support JSON types
fs.SetProp("empty_as_null", "true")

df, err = fs.ReadDataflow(gsURL)
@@ -839,9 +841,6 @@ func (conn *BigQueryConn) BulkExportFlow(tables ...Table) (df *iop.Dataflow, err
return
}

// need to set columns so they match the source table
// df.MergeColumns(columns, df.Columns.Sourced()) // overwrite types so we don't need to infer

df.Defer(func() { filesys.Delete(fs, gsURL) })

return
@@ -914,7 +913,7 @@ func (conn *BigQueryConn) ExportToGCS(sql string, gcsURI string) error {
}

func (conn *BigQueryConn) CopyToGCS(table Table, gcsURI string) error {
if true || table.IsQuery() || table.IsView {
if table.IsQuery() || table.IsView {
return conn.ExportToGCS(table.Select(0), gcsURI)
}

@@ -924,16 +923,12 @@ func (conn *BigQueryConn) CopyToGCS(table Table, gcsURI string) error {
}
defer client.Close()

if strings.ToUpper(conn.GetProp("COMPRESSION")) == "GZIP" {
gcsURI = gcsURI + ".gz"
}
gcsRef := bigquery.NewGCSReference(gcsURI)
gcsRef.FieldDelimiter = ","
gcsRef.AllowQuotedNewlines = true
gcsRef.Quote = `"`
if strings.ToUpper(conn.GetProp("COMPRESSION")) == "GZIP" {
gcsRef.Compression = bigquery.Gzip
}
// gcsRef.NullMarker = `\N` // does not work for export, only importing
gcsRef.Compression = bigquery.Gzip
gcsRef.MaxBadRecords = 0

extractor := client.DatasetInProject(conn.ProjectID, table.Schema).Table(table.Name).ExtractorTo(gcsRef)
@@ -951,7 +946,7 @@ func (conn *BigQueryConn) CopyToGCS(table Table, gcsURI string) error {
return g.Error(err, "Error in task.Wait")
}
if err := status.Err(); err != nil {
if strings.Contains(err.Error(), "it is currently a VIEW") {
if strings.Contains(err.Error(), "it is currently a VIEW") || strings.Contains(err.Error(), "it currently has type VIEW") {
table.IsView = true
return conn.CopyToGCS(table, gcsURI)
}
3 changes: 0 additions & 3 deletions core/dbio/database/database_clickhouse.go
@@ -73,9 +73,6 @@ func (conn *ClickhouseConn) NewTransaction(ctx context.Context, options ...*sql.
Tx := &BaseTransaction{Tx: tx, Conn: conn.Self(), context: &context}
conn.tx = Tx

// CH does not support transactions at the moment
// Tx := &BlankTransaction{Conn: conn.Self(), context: &context}

return Tx, nil
}

1 change: 1 addition & 0 deletions core/dbio/database/database_postgres.go
@@ -193,6 +193,7 @@ func (conn *PostgresConn) BulkImportStream(tableFName string, ds *iop.Datastream

err = stmt.Close()
if err != nil {
g.Warn("%#v", err)
return g.Error(err, "could not close statement")
}

6 changes: 4 additions & 2 deletions core/dbio/database/database_redshift.go
@@ -159,7 +159,7 @@ func (conn *RedshiftConn) BulkExportFlow(tables ...Table) (df *iop.Dataflow, err
if len(tables) == 0 {
return df, g.Error("no table/query provided")
} else if conn.GetProp("AWS_BUCKET") == "" {
g.Debug("using cursor to export. Please set AWS creds for Sling to use the UNLOAD function (for bigger datasets). See https://docs.slingdata.io/connections/database-connections/redshift")
g.Warn("using cursor to export. Please set AWS creds for Sling to use the Redshift UNLOAD function (for bigger datasets). See https://docs.slingdata.io/connections/database-connections/redshift")
return conn.BaseConn.BulkExportFlow(tables...)
}

@@ -182,8 +182,10 @@ func (conn *RedshiftConn) BulkExportFlow(tables ...Table) (df *iop.Dataflow, err
return
}

fs.SetProp("header", "false")
fs.SetProp("format", "csv")
fs.SetProp("delimiter", ",")
fs.SetProp("header", "true")
fs.SetProp("null_if", `\N`)
fs.SetProp("columns", g.Marshal(columns))
fs.SetProp("metadata", conn.GetProp("metadata"))
df, err = fs.ReadDataflow(s3Path)
10 changes: 10 additions & 0 deletions core/dbio/database/database_sqlserver.go
@@ -48,6 +48,16 @@ func (conn *MsSQLServerConn) Init() error {
instance := Connection(conn)
conn.BaseConn.instance = &instance

// https://github.com/slingdata-io/sling-cli/issues/310
// If both a portNumber and instanceName are used, the portNumber will take precedence and the instanceName will be ignored.
// therefore, if instanceName is provided, don't set a defaultPort
if u, err := dburl.Parse(conn.URL); err == nil {
if instanceName := strings.TrimPrefix(u.Path, "/"); instanceName != "" {
// set as 0 so BaseConn.Init won't inject port number
conn.BaseConn.defaultPort = 0
}
}

return conn.BaseConn.Init()
}

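The instance-name check in MsSQLServerConn.Init keys off the URL path: a SQL Server named instance travels in the path component, and injecting a default port alongside it would make the driver ignore the instance. A minimal standalone sketch of that parsing, using the same github.com/xo/dburl package imported elsewhere in this repo (the DSN and instance name below are made-up examples):

```go
package main

import (
	"fmt"
	"strings"

	"github.com/xo/dburl"
)

func main() {
	// Hypothetical DSN: "SQLExpress" is a named instance carried in the URL path.
	u, err := dburl.Parse("sqlserver://myuser:mypass@myhost/SQLExpress?database=master")
	if err != nil {
		panic(err)
	}

	// Same extraction as in MsSQLServerConn.Init above: a non-empty path
	// (minus the leading slash) means an instance name was supplied, so a
	// default port should not be injected into the connection URL.
	instanceName := strings.TrimPrefix(u.Path, "/")
	fmt.Println(instanceName) // SQLExpress
}
```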
8 changes: 7 additions & 1 deletion core/dbio/database/database_test.go
@@ -5,6 +5,7 @@ import (
"io"
"log"
"math"
"net/url"
"os"
"os/exec"
"path/filepath"
@@ -21,7 +22,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/xo/dburl"
"syreclabs.com/go/faker"
// "github.com/gobuffalo/packr"
)

var (
@@ -1382,6 +1382,12 @@ func TestConcurrentDuckDb(t *testing.T) {

}

func TestParseURL(t *testing.T) {
u, err := url.Parse("sqlserver://myuser:[email protected]?database=master")
g.AssertNoError(t, err)
g.Info(g.Marshal(u))
}

func TestInteractiveDuckDb(t *testing.T) {
var err error

14 changes: 12 additions & 2 deletions core/dbio/dbio_types.go
@@ -23,6 +23,15 @@
KindUnknown Kind = ""
)

var AllKind = []struct {
Value Kind
TSName string
}{
{KindDatabase, "KindDatabase"},
{KindFile, "KindFile"},
{KindUnknown, "KindUnknown"},
}

// Type is the connection type
type Type string

@@ -100,8 +109,9 @@ func ValidateType(tStr string) (Type, bool) {
t := Type(strings.ToLower(tStr))

tMap := map[string]Type{
"postgresql": TypeDbPostgres,
"file": TypeFileLocal,
"postgresql": TypeDbPostgres,
"mongodb+srv": TypeDbMongoDB,
"file": TypeFileLocal,
}

if tMatched, ok := tMap[tStr]; ok {
9 changes: 7 additions & 2 deletions core/dbio/filesys/fs.go
@@ -566,8 +566,13 @@ func WriteDataflow(fs FileSysClient, df *iop.Dataflow, url string) (bw int64, er
if cast.ToBool(fs.GetProp("ignore_existing")) {
paths, err := fs.List(url)
if err != nil {
return 0, g.Error(err, "could not list files")
} else if len(paths) > 0 {
if g.IsDebugLow() {
g.Warn("could not list path %s\n%s", url, err.Error())
}
err = nil
}

if len(paths) > 0 {
g.Debug("not writing since file/folder exists at %s (ignore_existing=true)", url)

// close datastreams
8 changes: 4 additions & 4 deletions core/dbio/iop/dataflow.go
@@ -110,13 +110,13 @@ func (df *Dataflow) SetConfig(cfg *StreamConfig) {
}
}

// ResetConfig resets the Sp config, so that, for example,
// delimiter settings are not carried through.
func (df *Dataflow) ResetConfig() {
// SetBatchLimit sets the ds.Batch.Limit
func (df *Dataflow) SetBatchLimit(limit int64) {
df.mux.Lock()
defer df.mux.Unlock()
for _, ds := range df.Streams {
ds.Sp.ResetConfig()
ds.Sp.Config.BatchLimit = limit
ds.CurrentBatch.Limit = limit
}
}

11 changes: 0 additions & 11 deletions core/dbio/iop/datastream.go
@@ -2340,17 +2340,6 @@ func (ds *Datastream) NewCsvReader(rowLimit int, bytesLimit int64) *io.PipeReade
return
}

// ensure that previous batch has same amount of columns
if pBatch := batch.Previous; pBatch != nil {
if len(pBatch.Columns) != len(batch.Columns) {
err := g.Error("number of columns have changed across files")
ds.Context.CaptureErr(err)
ds.Context.Cancel()
pipeW.Close()
return
}
}

c := 0 // local counter
w := csv.NewWriter(pipeW)
w.Comma = ','
1 change: 1 addition & 0 deletions core/dbio/iop/datastream_batch.go
@@ -32,6 +32,7 @@ func (ds *Datastream) NewBatch(columns Columns) *Batch {
Rows: MakeRowsChan(),
Previous: ds.LatestBatch(),
ds: ds,
Limit: ds.Sp.Config.BatchLimit,
closeChan: make(chan struct{}),
transforms: []func(row []any) []any{},
context: &ctx,
12 changes: 10 additions & 2 deletions core/dbio/iop/parquet_arrow.go
@@ -581,7 +581,7 @@ func (p *ParquetArrowWriter) Columns() Columns {
}

func (p *ParquetArrowWriter) makeSchema() (s *schema.Schema, err error) {
rep := parquet.Repetitions.Undefined
rep := parquet.Repetitions.Optional
fields := make([]schema.Node, len(p.Columns()))
for i, col := range p.Columns() {
fields[i], _ = schema.NewPrimitiveNode(col.Name, rep, parquetMapPhysicalType[col.Type], -1, 12)
@@ -623,7 +623,7 @@ func (p *ParquetArrowWriter) makeSchema() (s *schema.Schema, err error) {
fields[i] = node
}

node, _ := schema.NewGroupNode("schema", parquet.Repetitions.Required, fields, -1)
node, _ := schema.NewGroupNode("schema", parquet.Repetitions.Optional, fields, -1)
return schema.NewSchema(node), nil
}

@@ -718,6 +718,10 @@ func (p *ParquetArrowWriter) writeColumnValues(col *Column, writer file.ColumnCh
case *file.ByteArrayColumnChunkWriter:
values := make([]parquet.ByteArray, len(colValuesBatch))
for i, val := range colValuesBatch {
if val == nil {
values[i] = nil // still does not write null
continue
}
valS := cast.ToString(val)
if col.Type == DecimalType {
values[i] = StringToDecimalByteArray(valS, p.decNumScale[col.Position-1], parquet.Types.ByteArray, 16)
@@ -730,6 +734,10 @@
values := make([]parquet.FixedLenByteArray, len(colValuesBatch))

for i, val := range colValuesBatch {
if val == nil {
values[i] = nil // still does not write null
continue
}
valS := cast.ToString(val)
if col.Type == DecimalType {
values[i] = StringToDecimalByteArray(valS, p.decNumScale[col.Position-1], parquet.Types.FixedLenByteArray, 16)
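The values[i] = nil guards above keep batch slots aligned but, as their comments note, still do not emit a null on disk: with parquet.Repetitions.Optional, nullability is communicated to the low-level column writers through definition levels, with the values slice carrying only the present values. A rough, self-contained sketch of that convention (buildDefLevels is a hypothetical helper, not part of this diff):

```go
package main

import "fmt"

// buildDefLevels illustrates the definition-level convention for an Optional
// column (max definition level 1): level 1 marks a present value, level 0
// marks a null. A WriteBatch-style call would receive these per-row levels
// plus a dense slice of only the non-null values, which is why assigning nil
// into the values slice by itself does not produce a null.
func buildDefLevels(vals []any) (defLevels []int16, nonNullCount int) {
	defLevels = make([]int16, len(vals))
	for i, v := range vals {
		if v != nil {
			defLevels[i] = 1
			nonNullCount++
		}
	}
	return defLevels, nonNullCount
}

func main() {
	levels, n := buildDefLevels([]any{"a", nil, "c"})
	fmt.Println(levels, n) // [1 0 1] 2
}
```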
5 changes: 5 additions & 0 deletions core/dbio/iop/stream_processor.go
@@ -51,6 +51,7 @@ type StreamConfig struct {
Delimiter string `json:"delimiter"`
Escape string `json:"escape"`
FileMaxRows int64 `json:"file_max_rows"`
BatchLimit int64 `json:"batch_limit"`
MaxDecimals int `json:"max_decimals"`
Flatten bool `json:"flatten"`
FieldsPerRec int `json:"fields_per_rec"`
@@ -251,6 +252,10 @@ func (sp *StreamProcessor) SetConfig(configMap map[string]string) {
sp.Config.FileMaxRows = cast.ToInt64(configMap["file_max_rows"])
}

if configMap["batch_limit"] != "" {
sp.Config.BatchLimit = cast.ToInt64(configMap["batch_limit"])
}

if configMap["header"] != "" {
sp.Config.Header = cast.ToBool(configMap["header"])
} else {
Expand Down
