Redshift bulk loader minor tweaks, updates. #3281
bamaer committed Oct 25, 2023
1 parent 74bb816 commit 604ca84
Showing 2 changed files with 19 additions and 15 deletions.
File 1 of 2 (the bulk loader transform):
@@ -146,7 +146,7 @@ public boolean processRow() throws HopException {
}

data.outputRowMeta = getInputRowMeta().clone();
- meta.getFields(data.outputRowMeta, getTransformName(), null, null, this, metadataProvider);
+ meta.getFields(data.insertRowMeta, getTransformName(), null, null, this, metadataProvider);

if(meta.isStreamToS3Csv()){

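Note on this hunk: the two row layouts are now kept separate. data.outputRowMeta stays a plain clone of the incoming row (what the transform passes downstream), while data.insertRowMeta is the layout populated by meta.getFields(...), i.e. the shape of what is written towards Redshift. A minimal annotated sketch of the resulting split:

    // outputRowMeta: rows forwarded to the next transform (clone of the input)
    data.outputRowMeta = getInputRowMeta().clone();
    // insertRowMeta: columns streamed to S3/Redshift, filled in by getFields()
    meta.getFields(data.insertRowMeta, getTransformName(), null, null, this, metadataProvider);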
@@ -170,7 +170,7 @@ public boolean processRow() throws HopException {

for (int i = 0; i < meta.getFields().size(); i++) {
int streamFieldLocation =
- data.outputRowMeta.indexOfValue(
+ data.insertRowMeta.indexOfValue(
meta.getFields().get(i).getStreamField());
if (streamFieldLocation < 0) {
throw new HopTransformException(
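Consistent with the previous hunk: stream fields are now looked up in data.insertRowMeta instead of data.outputRowMeta. indexOfValue(...) returns -1 for an unknown field name, which is exactly what the streamFieldLocation < 0 guard catches:

    // hypothetical mapping whose stream field is missing from the row layout
    int streamFieldLocation = data.insertRowMeta.indexOfValue("customerId");
    // streamFieldLocation == -1 here triggers the HopTransformException above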
@@ -208,10 +208,10 @@ public boolean processRow() throws HopException {

// Cache the position of the selected fields in the row array
data.selectedRowFieldIndices = new int[numberOfInsertFields];
- for(int i=0; i < data.dbFields.size(); i++){
+ for(int i=0; i < meta.getFields().size(); i++){
RedshiftBulkLoaderField vbf = meta.getFields().get(i);
String inputFieldName = vbf.getStreamField();
- int inputFieldIdx = getInputRowMeta().indexOfValue(inputFieldName);
+ int inputFieldIdx = i;
if (inputFieldIdx < 0) {
throw new HopTransformException(
BaseMessages.getString(
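Two changes in this hunk: the loop now iterates the field mappings from meta.getFields() rather than data.dbFields, and the cached input index is simply the loop counter. One consequence worth noting: since i is never negative, the inputFieldIdx < 0 guard can no longer fire, and the indices cached in data.selectedRowFieldIndices follow mapping order instead of the incoming row layout. A sketch, assuming the cache is filled inside this loop as the comment above it suggests:

    // with the change, the cached position is just the mapping position
    data.selectedRowFieldIndices[i] = inputFieldIdx; // always equals i now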
@@ -239,7 +239,7 @@ public boolean processRow() throws HopException {
}

if(meta.isStreamToS3Csv()){
- writeRowToFile(data.insertRowMeta, r);
+ writeRowToFile(data.outputRowMeta, r);
putRow(data.outputRowMeta, r);
}

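writeRowToFile(...) now receives data.outputRowMeta, the same layout putRow(...) already used, so the metadata handed to the writer matches the actual row array r:

    writeRowToFile(data.outputRowMeta, r); // metadata now matches the array 'r'
    putRow(data.outputRowMeta, r);         // unchanged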
@@ -287,12 +287,13 @@ private String buildCopyStatementSqlString() {

if(meta.isStreamToS3Csv() || meta.getLoadFromExistingFileFormat().equals("CSV")){
sb.append(" (");
- final IRowMeta fields = data.outputRowMeta;
- for (int i = 0; i < fields.size(); i++) {
- if (i > 0) {
- sb.append(", " + fields.getValueMeta(i).getName());
+ List<RedshiftBulkLoaderField> fieldList = meta.getFields();
+ for(int i=0; i < fieldList.size(); i++){
+ RedshiftBulkLoaderField field = fieldList.get(i);
+ if( i > 0){
+ sb.append(", " + field.getDatabaseField());
}else{
- sb.append(fields.getValueMeta(i).getName());
+ sb.append(field.getDatabaseField());
}
}
sb.append(")");
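The COPY statement's column list is now built from the mapped database column names (getDatabaseField()) rather than the stream value names taken from data.outputRowMeta, so the generated statement matches the target table even when stream and column names differ. With a hypothetical mapping {orderId -> ORDER_ID, ts -> CREATED_AT}, the generated fragment changes from

    COPY my_table (orderId, ts)            -- before: stream field names

to

    COPY my_table (ORDER_ID, CREATED_AT)   -- after: database column names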
@@ -302,7 +303,8 @@ private String buildCopyStatementSqlString() {
sb.append(" NULL '' ");
sb.append(" EMPTYASNULL ");
if(meta.isStreamToS3Csv() || meta.getLoadFromExistingFileFormat().equals("CSV")){
- sb.append(" delimiter ','");
+ sb.append(" DELIMITER ',' ");
+ sb.append(" CSV QUOTE AS '\"'");
}
if(meta.isUseAwsIamRole()){
sb.append(" iam_role '" + meta.getAwsIamRole() + "'");
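The CSV branch now emits Redshift's CSV parsing options (uppercase DELIMITER plus CSV QUOTE AS '"'), so quoted fields containing embedded commas or newlines are parsed correctly; this pairs with the writeField change further down that always encloses string values. With a hypothetical table, bucket and IAM role, the assembled statement would look roughly like:

    COPY my_table (ORDER_ID, CREATED_AT)
    FROM 's3://my-bucket/load.csv'
    NULL '' EMPTYASNULL
    DELIMITER ',' CSV QUOTE AS '"'
    iam_role 'arn:aws:iam::123456789012:role/redshift-load'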
@@ -477,10 +479,10 @@ private void writeRowToFile(IRowMeta rowMeta, Object[] row) throws HopTransformException {
if(streamIndex >= 0){
if(needConversion){
IValueMeta valueMeta = rowMeta.getValueMeta(streamIndex);
- Object obj = row[i];
+ Object obj = row[streamIndex];
valueData = v.convertData(valueMeta, obj);
}else{
- valueData = row[i];
+ valueData = row[streamIndex];
}
} else if (meta.isErrorColumnMismatch()) {
throw new HopException(
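The fix in this hunk: the value is now read from the row array at streamIndex (the position found via indexOfValue above) instead of at the loop counter i, so the correct cell is read even when the mapping order differs from the incoming row order:

    // before: row[i]           reads the cell at the mapping position
    // after:  row[streamIndex] reads the cell where the stream field actually sits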
@@ -495,7 +497,7 @@ private void writeRowToFile(IRowMeta rowMeta, Object[] row) throws HopTransformException {
} else {
int jsonField = data.fieldnrs.get("json");
data.writer.write(
- data.outputRowMeta.getString(row, jsonField).getBytes(StandardCharsets.UTF_8));
+ data.insertRowMeta.getString(row, jsonField).getBytes(StandardCharsets.UTF_8));
data.writer.write(data.binaryNewline);
}
} catch (Exception e) {
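The JSON branch now resolves the row's string value through data.insertRowMeta, in line with the getFields(...) change in the first hunk. Annotated, the branch does:

    int jsonField = data.fieldnrs.get("json");   // cached position of the JSON payload
    data.writer.write(
        data.insertRowMeta.getString(row, jsonField).getBytes(StandardCharsets.UTF_8));
    data.writer.write(data.binaryNewline);       // record delimiter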
@@ -530,6 +532,8 @@ private void writeField(IValueMeta v, Object valueData, byte[] nullString)
boolean writeEnclosures = false;

if (v.isString()) {
+ writeEnclosures = true;
+
if (containsSeparatorOrEnclosure(
str, data.binarySeparator, data.binaryEnclosure, data.escapeCharacters)) {
writeEnclosures = true;
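writeEnclosures is now set unconditionally for string values, so every string is written enclosed; the containsSeparatorOrEnclosure(...) check below merely re-sets the same flag for strings. This matches the CSV QUOTE AS '"' COPY option added above:

    // every string value is now emitted enclosed, e.g.
    //   plain   ->  "plain"
    //   a,b     ->  "a,b"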
File 2 of 2 (RedshiftBulkLoaderMeta):
@@ -63,7 +63,7 @@ public class RedshiftBulkLoaderMeta

public static final String CSV_DELIMITER = ",";
public static final String CSV_RECORD_DELIMITER = "\n";
- public static final String CSV_ESCAPE_CHAR = "\\";
+ public static final String CSV_ESCAPE_CHAR = "\"";
public static final String ENCLOSURE = "\"";

@HopMetadataProperty(
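Switching CSV_ESCAPE_CHAR from backslash to the double quote moves the writer to RFC 4180 style escaping, where an embedded quote is represented by doubling it; this is the convention Redshift's CSV QUOTE AS '"' option expects:

    value:       He said "hi"
    CSV field:   "He said ""hi"""     (quote doubled, new behavior)
    previously:  "He said \"hi\""     (backslash escaped)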
