diff --git a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoader.java b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoader.java index cca06cc2308..ea0be4bbd41 100644 --- a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoader.java +++ b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoader.java @@ -146,7 +146,7 @@ public boolean processRow() throws HopException { } data.outputRowMeta = getInputRowMeta().clone(); - meta.getFields(data.outputRowMeta, getTransformName(), null, null, this, metadataProvider); + meta.getFields(data.insertRowMeta, getTransformName(), null, null, this, metadataProvider); if(meta.isStreamToS3Csv()){ @@ -170,7 +170,7 @@ public boolean processRow() throws HopException { for (int i = 0; i < meta.getFields().size(); i++) { int streamFieldLocation = - data.outputRowMeta.indexOfValue( + data.insertRowMeta.indexOfValue( meta.getFields().get(i).getStreamField()); if (streamFieldLocation < 0) { throw new HopTransformException( @@ -208,10 +208,10 @@ public boolean processRow() throws HopException { // Cache the position of the selected fields in the row array data.selectedRowFieldIndices = new int[numberOfInsertFields]; - for(int i=0; i < data.dbFields.size(); i++){ + for(int i=0; i < meta.getFields().size(); i++){ RedshiftBulkLoaderField vbf = meta.getFields().get(i); String inputFieldName = vbf.getStreamField(); - int inputFieldIdx = getInputRowMeta().indexOfValue(inputFieldName); + int inputFieldIdx = i; if (inputFieldIdx < 0) { throw new HopTransformException( BaseMessages.getString( @@ -239,7 +239,7 @@ public boolean processRow() throws HopException { } if(meta.isStreamToS3Csv()){ - writeRowToFile(data.insertRowMeta, r); + writeRowToFile(data.outputRowMeta, r); putRow(data.outputRowMeta, r); } @@ -287,12 +287,13 @@ private String buildCopyStatementSqlString() { if(meta.isStreamToS3Csv() || meta.getLoadFromExistingFileFormat().equals("CSV")){ sb.append(" ("); - final IRowMeta fields = data.outputRowMeta; - for (int i = 0; i < fields.size(); i++) { - if (i > 0) { - sb.append(", " + fields.getValueMeta(i).getName()); + List fieldList = meta.getFields(); + for(int i=0; i < fieldList.size(); i++){ + RedshiftBulkLoaderField field = fieldList.get(i); + if( i > 0){ + sb.append(", " + field.getDatabaseField()); }else{ - sb.append(fields.getValueMeta(i).getName()); + sb.append(field.getDatabaseField()); } } sb.append(")"); @@ -302,7 +303,8 @@ private String buildCopyStatementSqlString() { sb.append(" NULL '' "); sb.append(" EMPTYASNULL "); if(meta.isStreamToS3Csv() || meta.getLoadFromExistingFileFormat().equals("CSV")){ - sb.append(" delimiter ','"); + sb.append(" DELIMITER ',' "); + sb.append(" CSV QUOTE AS '\"'"); } if(meta.isUseAwsIamRole()){ sb.append(" iam_role '" + meta.getAwsIamRole() + "'"); @@ -477,10 +479,10 @@ private void writeRowToFile(IRowMeta rowMeta, Object[] row) throws HopTransformE if(streamIndex >= 0){ if(needConversion){ IValueMeta valueMeta = rowMeta.getValueMeta(streamIndex); - Object obj = row[i]; + Object obj = row[streamIndex]; valueData = v.convertData(valueMeta, obj); }else{ - valueData = row[i]; + valueData = row[streamIndex]; } } else if (meta.isErrorColumnMismatch()) { throw new HopException( @@ -495,7 +497,7 @@ private void writeRowToFile(IRowMeta rowMeta, Object[] row) throws HopTransformE } else { int jsonField = data.fieldnrs.get("json"); data.writer.write( - data.outputRowMeta.getString(row, jsonField).getBytes(StandardCharsets.UTF_8)); + data.insertRowMeta.getString(row, jsonField).getBytes(StandardCharsets.UTF_8)); data.writer.write(data.binaryNewline); } } catch (Exception e) { @@ -530,6 +532,8 @@ private void writeField(IValueMeta v, Object valueData, byte[] nullString) boolean writeEnclosures = false; if (v.isString()) { + writeEnclosures = true; + if (containsSeparatorOrEnclosure( str, data.binarySeparator, data.binaryEnclosure, data.escapeCharacters)) { writeEnclosures = true; diff --git a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderMeta.java b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderMeta.java index 5c8503c2b14..abec768573b 100644 --- a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderMeta.java +++ b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderMeta.java @@ -63,7 +63,7 @@ public class RedshiftBulkLoaderMeta public static final String CSV_DELIMITER = ","; public static final String CSV_RECORD_DELIMITER = "\n"; - public static final String CSV_ESCAPE_CHAR = "\\"; + public static final String CSV_ESCAPE_CHAR = "\""; public static final String ENCLOSURE = "\""; @HopMetadataProperty(