From 74bb816c02f103f6e81f4f3122761ed4a0782d8b Mon Sep 17 00:00:00 2001
From: Bart Maertens
Date: Sat, 21 Oct 2023 18:12:03 +0200
Subject: [PATCH] ui minor fix, select field list rewrite. #3281

---
 .../bulkloader/RedshiftBulkLoader.java        | 89 +++++++++++--------
 .../bulkloader/RedshiftBulkLoaderDialog.java  |  7 +-
 2 files changed, 56 insertions(+), 40 deletions(-)

diff --git a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoader.java b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoader.java
index c55448c8e06..cca06cc2308 100644
--- a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoader.java
+++ b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoader.java
@@ -28,9 +28,7 @@
 import org.apache.hop.core.row.IRowMeta;
 import org.apache.hop.core.row.IValueMeta;
 import org.apache.hop.core.row.RowMeta;
-import org.apache.hop.core.row.value.ValueMetaBigNumber;
 import org.apache.hop.core.row.value.ValueMetaDate;
-import org.apache.hop.core.row.value.ValueMetaString;
 import org.apache.hop.core.util.Utils;
 import org.apache.hop.core.vfs.HopVfs;
 import org.apache.hop.i18n.BaseMessages;
@@ -80,11 +78,12 @@ public boolean init() {
 
     if(meta.isStreamToS3Csv()){
 
       // get the file output stream to write to S3
-      data.writer = HopVfs.getOutputStream(meta.getCopyFromFilename(), false);
+      data.writer = HopVfs.getOutputStream(resolve(meta.getCopyFromFilename()), false);
     }
 
     data.db = new Database(this, this, data.databaseMeta);
     data.db.connect();
+    getDbFields();
 
     if (log.isBasic()) {
       logBasic(BaseMessages.getString(PKG, "RedshiftBulkLoader.Connection.Connected", data.db.getDatabaseMeta()));
@@ -124,8 +123,14 @@ public boolean processRow() throws HopException {
         stmt.close();
         conn.close();
       }catch(SQLException sqle){
+        setErrors(1);
+        stopAll();
+        setOutputDone(); // signal end to receiver(s)
         throw new HopDatabaseException("Error executing COPY statements", sqle);
       } catch (IOException ioe) {
+        setErrors(1);
+        stopAll();
+        setOutputDone(); // signal end to receiver(s)
         throw new HopTransformException("Error releasing resources", ioe);
       }
       return false;
@@ -134,6 +139,7 @@ public boolean processRow() throws HopException {
 
     if (first && meta.isStreamToS3Csv()) {
       first = false;
+      data.fieldnrs = new HashMap<>();
 
       if (meta.isTruncateTable()) {
         truncateTable();
@@ -145,13 +151,13 @@ public boolean processRow() throws HopException {
       if(meta.isStreamToS3Csv()){
 
       }
+      // write all fields in the stream to Redshift
       if (!meta.specifyFields()){
 
         // Just take the whole input row
        data.insertRowMeta = getInputRowMeta().clone();
        data.selectedRowFieldIndices = new int[data.insertRowMeta.size()];
-        data.fieldnrs = new HashMap<>();
 
        try{
          getDbFields();
        }catch(HopException e){
@@ -196,13 +202,14 @@ public boolean processRow() throws HopException {
 
      } else {
 
+        // use the columns/fields mapping.
        int numberOfInsertFields = meta.getFields().size();
        data.insertRowMeta = new RowMeta();
 
        // Cache the position of the selected fields in the row array
        data.selectedRowFieldIndices = new int[numberOfInsertFields];
-        for (int insertFieldIdx = 0; insertFieldIdx < numberOfInsertFields; insertFieldIdx++) {
-          RedshiftBulkLoaderField vbf = meta.getFields().get(insertFieldIdx);
+        for(int i=0; i < data.dbFields.size(); i++){
+          RedshiftBulkLoaderField vbf = meta.getFields().get(i);
          String inputFieldName = vbf.getStreamField();
          int inputFieldIdx = getInputRowMeta().indexOfValue(inputFieldName);
          if (inputFieldIdx < 0) {
@@ -212,7 +219,7 @@ public boolean processRow() throws HopException {
                  "RedshiftBulkLoader.Exception.FieldRequired", inputFieldName)); //$NON-NLS-1$
          }
-          data.selectedRowFieldIndices[insertFieldIdx] = inputFieldIdx;
+          data.selectedRowFieldIndices[i] = inputFieldIdx;
 
          String insertFieldName = vbf.getDatabaseField();
          IValueMeta inputValueMeta = getInputRowMeta().getValueMeta(inputFieldIdx);
@@ -226,12 +233,13 @@ public boolean processRow() throws HopException {
          IValueMeta insertValueMeta = inputValueMeta.clone();
          insertValueMeta.setName(insertFieldName);
          data.insertRowMeta.addValueMeta(insertValueMeta);
+          data.fieldnrs.put(meta.getFields().get(i).getDatabaseField().toUpperCase(), inputFieldIdx);
        }
      }
    }
 
    if(meta.isStreamToS3Csv()){
-      writeRowToFile(data.outputRowMeta, r);
+      writeRowToFile(data.insertRowMeta, r);
      putRow(data.outputRowMeta, r);
    }
 
@@ -279,7 +287,7 @@ private String buildCopyStatementSqlString() {
 
    if(meta.isStreamToS3Csv() || meta.getLoadFromExistingFileFormat().equals("CSV")){
      sb.append(" (");
-      final IRowMeta fields = data.insertRowMeta;
+      final IRowMeta fields = data.outputRowMeta;
      for (int i = 0; i < fields.size(); i++) {
        if (i > 0) {
          sb.append(", " + fields.getValueMeta(i).getName());
@@ -291,6 +299,8 @@ private String buildCopyStatementSqlString() {
    }
 
    sb.append(" FROM '" + resolve(meta.getCopyFromFilename()) + "'");
+    sb.append(" NULL '' ");
+    sb.append(" EMPTYASNULL ");
    if(meta.isStreamToS3Csv() || meta.getLoadFromExistingFileFormat().equals("CSV")){
      sb.append(" delimiter ','");
    }
@@ -308,7 +318,7 @@ private String buildCopyStatementSqlString() {
      }
      sb.append(" CREDENTIALS 'aws_access_key_id=" + awsAccessKeyId + ";aws_secret_access_key=" + awsSecretAccessKey + "'");
    }
-    if(meta.getLoadFromExistingFileFormat().equals("Parquet")){
+    if(!StringUtils.isEmpty(meta.getLoadFromExistingFileFormat()) && meta.getLoadFromExistingFileFormat().equals("Parquet")){
      sb.append(" FORMAT AS PARQUET;");
    }
 
@@ -343,11 +353,9 @@ private Object[] writeToOutputStream(Object[] r) throws HopException, IOExceptio
    */
   private void getDbFields() throws HopException {
    data.dbFields = new ArrayList<>();
-    String sql = "desc table ";
    IRowMeta rowMeta = null;
-
    if (!StringUtils.isEmpty(resolve(meta.getSchemaName()))) {
      rowMeta = data.db.getTableFields(meta.getSchemaName() + "." + meta.getTableName());
    }else {
@@ -432,47 +440,52 @@ private void writeRowToFile(IRowMeta rowMeta, Object[] row) throws HopTransformE
        writeField(v, valueData, null);
      }
      data.writer.write(data.binaryNewline);
-    } else if (meta.isStreamToS3Csv()) {
+    } else if (meta.isStreamToS3Csv() && meta.isSpecifyFields()) {
 
      /*
       * Only write the fields specified!
       */
-      for (int i = 0; i < data.dbFields.size(); i++) {
-        if (data.dbFields.get(i) != null) {
+      for(int i=0; i < meta.getFields().size(); i++){
+        if(meta.getFields().get(i).getDatabaseField() != null){
          if (i > 0 && data.binarySeparator.length > 0) {
            data.writer.write(data.binarySeparator);
          }
-          String[] field = data.dbFields.get(i);
-          IValueMeta v;
+          IValueMeta v = null;
+          String streamFieldName = meta.getFields().get(i).getStreamField();
+          String[] rowFields = data.outputRowMeta.getFieldNames();
+          String streamFieldType = "";
+          int streamIndex = -1;
+          for(int j=0; j < rowFields.length ; j++){
+            if(streamFieldName.equals(rowFields[j])){
+              v = rowMeta.getValueMeta(j);
+              streamIndex = j;
+            }
+          }
 
-          if (field[1].toUpperCase().startsWith("TIMESTAMP")) {
-            v = new ValueMetaDate();
-            v.setConversionMask("yyyy-MM-dd HH:mm:ss.SSS");
-          } else if (field[1].toUpperCase().startsWith("DATE")) {
+          boolean needConversion = false;
+          if (v.getType() == IValueMeta.TYPE_TIMESTAMP) {
            v = new ValueMetaDate();
-            v.setConversionMask("yyyy-MM-dd");
-          } else if (field[1].toUpperCase().startsWith("TIME")) {
+            v.setConversionMask("yyyy/MM/dd HH:mm:ss");
+            needConversion = true;
+          } else if (v.getType() == IValueMeta.TYPE_DATE) {
            v = new ValueMetaDate();
-            v.setConversionMask("HH:mm:ss.SSS");
-          } else if (field[1].toUpperCase().startsWith("NUMBER")
-              || field[1].toUpperCase().startsWith("FLOAT")) {
-            v = new ValueMetaBigNumber();
-          } else {
-            v = new ValueMetaString();
-            v.setLength(-1);
+            v.setConversionMask("yyyy/MM/dd");
+            needConversion = true;
          }
 
-          int fieldIndex = -1;
-          if (data.fieldnrs.get(data.dbFields.get(i)[0]) != null) {
-            fieldIndex = data.fieldnrs.get(data.dbFields.get(i)[0]);
-          }
          Object valueData = null;
-          if (fieldIndex >= 0) {
-            valueData = v.convertData(rowMeta.getValueMeta(fieldIndex), row[fieldIndex]);
+          if(streamIndex >= 0){
+            if(needConversion){
+              IValueMeta valueMeta = rowMeta.getValueMeta(streamIndex);
+              Object obj = row[i];
+              valueData = v.convertData(valueMeta, obj);
+            }else{
+              valueData = row[i];
+            }
          } else if (meta.isErrorColumnMismatch()) {
            throw new HopException(
-                "Error column mismatch: Database field "
-                + data.dbFields.get(i)[0]
+                "Error column mismatch: Database streamField "
+                + meta.getFields().get(i).getStreamField()
                + " not found on stream.");
          }
          writeField(v, valueData, data.binaryNullValue);
diff --git a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderDialog.java b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderDialog.java
index d667f1f9d98..b9996ed03af 100644
--- a/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderDialog.java
+++ b/plugins/tech/aws/src/main/java/org/apache/hop/pipeline/transforms/redshift/bulkloader/RedshiftBulkLoaderDialog.java
@@ -767,9 +767,12 @@ private void generateMappings() {
 
    // refresh data
    input.setTablename(variables.resolve(wTable.getText()));
-    ITransformMeta transformMetaInterface = transformMeta.getTransform();
+    ITransformMeta transformMeta = this.transformMeta.getTransform();
+    if(StringUtils.isEmpty(input.getConnection())){
+      input.setConnection(wConnection.getText());
+    }
    try {
-      targetFields = transformMetaInterface.getRequiredFields(variables);
+      targetFields = transformMeta.getRequiredFields(variables);
    } catch (HopException e) {
      new ErrorDialog(
          shell,