Allow adding TRUNCATECOLUMNS option to Redshift COPY #43

Open · wants to merge 12 commits into `master`
1 change: 1 addition & 0 deletions README.md
@@ -80,6 +80,7 @@ here.
| `max_buffer_size` | `["integer", "null"]` | `104857600` (100MB in bytes) | The maximum number of bytes to buffer in memory before writing to the destination table in Redshift
| `batch_detection_threshold` | `["integer", "null"]` | `5000`, or 1/40th `max_batch_rows` | How often, in rows received, to count the buffered rows and bytes to check if a flush is necessary. There's a slight performance penalty to checking the buffered records count or bytesize, so this controls how often this is polled in order to mitigate the penalty. This value is usually not necessary to set as the default is dynamically adjusted to check reasonably often.
| `persist_empty_tables` | `["boolean", "null"]` | `False` | Whether the Target should create tables which have no records present in Remote. |
| `truncate_columns` | `["boolean", "null"]` | `False` | Whether the Target should truncate values longer than `default_column_length` when loading into Redshift. Adds the [`TRUNCATECOLUMNS` data conversion parameter](https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-data-conversion.html#copy-truncatecolumns) to Redshift's [COPY](https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html).
| `default_column_length` | `["integer", "null"]` | `1000` | All columns with the VARCHAR (CHARACTER VARYING) type will have this length. Range: 1-65535. |
| `state_support` | `["boolean", "null"]` | `True` | Whether the Target should emit `STATE` messages to stdout for further consumption. In this mode, which is on by default, STATE messages are buffered in memory until all the records that occurred before them are flushed according to the batch flushing schedule the target is configured with. |
| `target_s3` | `["object"]` | `N/A` | See `S3` below |
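The `truncate_columns` row above describes the new flag alongside the existing `default_column_length` setting. As a minimal sketch (not part of this diff), this is how the options might appear in a config dict passed to `main()`; all connection values below are hypothetical placeholders.

```python
# Hypothetical target config sketch; connection values are placeholders.
config = {
    'redshift_host': 'example-cluster.redshift.amazonaws.com',
    'redshift_database': 'example_db',
    'redshift_username': 'example_user',
    'redshift_password': 'example_password',
    'redshift_schema': 'public',
    'default_column_length': 1000,   # VARCHAR length used for string columns
    'truncate_columns': True,        # new flag: adds TRUNCATECOLUMNS to COPY
    'target_s3': {},                 # S3 settings omitted; see the S3 section of the README
}
```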
3 changes: 2 additions & 1 deletion target_redshift/__init__.py
@@ -40,7 +40,8 @@ def main(config, input_stream=None):
redshift_schema=config.get('redshift_schema', 'public'),
logging_level=config.get('logging_level'),
default_column_length=config.get('default_column_length', 1000),
persist_empty_tables=config.get('persist_empty_tables')
persist_empty_tables=config.get('persist_empty_tables'),
truncate_columns=config.get('truncate_columns')
)

if input_stream:
8 changes: 6 additions & 2 deletions target_redshift/redshift.py
@@ -44,12 +44,14 @@ class RedshiftTarget(PostgresTarget):
MAX_VARCHAR = 65535
CREATE_TABLE_INITIAL_COLUMN = '_sdc_target_redshift_create_table_placeholder'
CREATE_TABLE_INITIAL_COLUMN_TYPE = 'BOOLEAN'
TRUNCATE_COLUMNS_LITERAL = 'TRUNCATECOLUMNS'

def __init__(self, connection, s3, *args,
redshift_schema='public',
logging_level=None,
default_column_length=DEFAULT_COLUMN_LENGTH,
persist_empty_tables=False,
truncate_columns=False,
**kwargs):

self.LOGGER.info(
@@ -58,6 +60,7 @@ def __init__(self, connection, s3, *args,

self.s3 = s3
self.default_column_length = default_column_length
self.truncate_columns = truncate_columns
PostgresTarget.__init__(self, connection, postgres_schema=redshift_schema, logging_level=logging_level,
persist_empty_tables=persist_empty_tables, add_upsert_indexes=False)

@@ -155,7 +158,7 @@ def persist_csv_rows(self,
aws_secret_access_key = credentials.get('aws_secret_access_key')
aws_session_token = credentials.get('aws_session_token')

copy_sql = sql.SQL('COPY {}.{} ({}) FROM {} CREDENTIALS {} FORMAT AS CSV NULL AS {}').format(
copy_sql = sql.SQL('COPY {}.{} ({}) FROM {} CREDENTIALS {} FORMAT AS CSV NULL AS {} {}').format(
sql.Identifier(self.postgres_schema),
sql.Identifier(temp_table_name),
sql.SQL(', ').join(map(sql.Identifier, columns)),
@@ -165,7 +168,8 @@
aws_secret_access_key,
";token={}".format(aws_session_token) if aws_session_token else '',
)),
sql.Literal(RESERVED_NULL_DEFAULT))
sql.Literal(RESERVED_NULL_DEFAULT),
sql.SQL(self.TRUNCATE_COLUMNS_LITERAL if self.truncate_columns else ''))

cur.execute(copy_sql)

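For context, a minimal sketch (not part of this diff) of how the optional keyword is spliced in with `psycopg2.sql`: when the flag is false, `sql.SQL('')` renders as an empty string and the COPY statement is unchanged. The schema, table, and S3 path below are hypothetical placeholders.

```python
from psycopg2 import sql

# Minimal sketch with placeholder names; shows only the conditional keyword.
truncate_columns = True
copy_sql = sql.SQL('COPY {}.{} FROM {} FORMAT AS CSV {}').format(
    sql.Identifier('public'),                      # schema (placeholder)
    sql.Identifier('tmp_cats'),                    # temp table (placeholder)
    sql.Literal('s3://example-bucket/cats.csv'),   # S3 object (placeholder)
    sql.SQL('TRUNCATECOLUMNS' if truncate_columns else ''),
)
# copy_sql.as_string(conn) would render the statement with or without TRUNCATECOLUMNS.
```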
13 changes: 13 additions & 0 deletions tests/fixtures.py
@@ -65,6 +65,9 @@
'age': {
'type': ['null', 'integer']
},
'description': {
'type': ['null', 'string']
},
'adoption': {
'type': ['object', 'null'],
'properties': {
@@ -217,6 +220,16 @@ def generate_record(self):
}


class LongCatStream(CatStream):
def generate_record(self):
record = CatStream.generate_record(self)

# add some seriously long text
record['description'] = fake.paragraph(nb_sentences=1000)

return record


class InvalidCatStream(CatStream):
def generate_record(self):
record = CatStream.generate_record(self)
37 changes: 36 additions & 1 deletion tests/test_target_redshift.py
@@ -6,7 +6,7 @@
import psycopg2.extras
import pytest

from fixtures import CatStream, CONFIG, db_prep, MultiTypeStream, NestedStream, TEST_DB
from fixtures import CatStream, CONFIG, db_prep, MultiTypeStream, NestedStream, TEST_DB, LongCatStream
from target_postgres import singer_stream
from target_postgres.target_tools import TargetError

@@ -711,3 +711,38 @@ def test_deduplication_existing_new_rows(db_prep):

assert len(sequences) == 1
assert sequences[0][0] == original_sequence


def test_truncate_columns(db_prep):
stream = LongCatStream(100, version=1, nested_count=2)

# this is what we're testing for
CONFIG['truncate_columns'] = True
CONFIG['default_column_length'] = 1000

main(CONFIG, input_stream=stream)

with psycopg2.connect(**TEST_DB) as conn:
with conn.cursor() as cur:
cur.execute(get_count_sql('cats'))
table_count = cur.fetchone()[0]

cur.execute(sql.SQL('SELECT {}, {} FROM {}.{}').format(
sql.SQL('MAX(LEN(description))'),
sql.SQL('MIN(LEN(description))'),
sql.Identifier(CONFIG['redshift_schema']),
sql.Identifier('cats')
))

result = cur.fetchone()
max_length = result[0]
min_length = result[1]

# check if all records were inserted
assert table_count == 100

# check that values were truncated properly.
# LongCatStream's description is always longer than 1000 bytes,
# so every loaded value should end up at exactly the configured length
assert max_length == CONFIG['default_column_length']
assert min_length == CONFIG['default_column_length']