ui minor fix, select field list rewrite. #3281
bamaer committed Oct 21, 2023
1 parent 6391395 commit 74bb816
Showing 2 changed files with 56 additions and 40 deletions.
@@ -28,9 +28,7 @@
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.IValueMeta;
import org.apache.hop.core.row.RowMeta;
import org.apache.hop.core.row.value.ValueMetaBigNumber;
import org.apache.hop.core.row.value.ValueMetaDate;
import org.apache.hop.core.row.value.ValueMetaString;
import org.apache.hop.core.util.Utils;
import org.apache.hop.core.vfs.HopVfs;
import org.apache.hop.i18n.BaseMessages;
@@ -80,11 +78,12 @@ public boolean init() {

if(meta.isStreamToS3Csv()){
// get the file output stream to write to S3
data.writer = HopVfs.getOutputStream(meta.getCopyFromFilename(), false);
data.writer = HopVfs.getOutputStream(resolve(meta.getCopyFromFilename()), false);
}

data.db = new Database(this, this, data.databaseMeta);
data.db.connect();
getDbFields();

if (log.isBasic()) {
logBasic(BaseMessages.getString(PKG, "RedshiftBulkLoader.Connection.Connected", data.db.getDatabaseMeta()));
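Two behavioural changes in init(): the S3 staging filename now goes through variable resolution before the VFS stream is opened, and the target table's column metadata is fetched (getDbFields()) right after connecting, so it is available before the first row arrives. A minimal sketch of the resolution step, with an illustrative variable and path that are not from the commit:

  // resolve() is inherited from BaseTransform and expands Hop variables
  // before the file is opened through VFS.
  String rawName = "${TEMP_DIR}/redshift-staging.csv";   // illustrative value
  String resolved = resolve(rawName);                    // e.g. /tmp/redshift-staging.csv
  data.writer = HopVfs.getOutputStream(resolved, false); // false = truncate, do not append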
@@ -124,8 +123,14 @@ public boolean processRow() throws HopException {
stmt.close();
conn.close();
}catch(SQLException sqle){
setErrors(1);
stopAll();
setOutputDone(); // signal end to receiver(s)
throw new HopDatabaseException("Error executing COPY statements", sqle);
} catch (IOException ioe) {
setErrors(1);
stopAll();
setOutputDone(); // signal end to receiver(s)
throw new HopTransformException("Error releasing resources", ioe);
}
return false;
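The catch blocks above now fail fast instead of only rethrowing: the transform is flagged as errored, the pipeline is stopped, and end-of-stream is signalled so downstream transforms do not block on rows that will never arrive. The pattern, reduced to its skeleton with the intent spelled out (a sketch of the code above, not an addition):

  try {
    stmt.close();
    conn.close();
  } catch (SQLException sqle) {
    setErrors(1);    // mark this transform as failed
    stopAll();       // request a stop across the whole pipeline
    setOutputDone(); // signal end of stream so receivers stop waiting
    throw new HopDatabaseException("Error executing COPY statements", sqle);
  }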
@@ -134,6 +139,7 @@ public boolean processRow() throws HopException {
if (first && meta.isStreamToS3Csv()) {

first = false;
data.fieldnrs = new HashMap<>();

if (meta.isTruncateTable()) {
truncateTable();
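data.fieldnrs is initialised once, on the first row. It caches the position of each mapped value in the incoming row, keyed by the upper-cased database column name, so the per-row write path can look fields up without scanning. Illustratively (the column and field names are made up):

  // Map database column names (upper-cased) to input-row indices.
  data.fieldnrs = new HashMap<>();
  int idx = getInputRowMeta().indexOfValue("order_id"); // stream position of "order_id"
  data.fieldnrs.put("ORDER_ID", idx);                   // keyed by upper-cased column name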
@@ -145,13 +151,13 @@
if(meta.isStreamToS3Csv()){

}
// write all fields in the stream to Redshift
if (!meta.specifyFields()){

// Just take the whole input row
data.insertRowMeta = getInputRowMeta().clone();
data.selectedRowFieldIndices = new int[data.insertRowMeta.size()];

data.fieldnrs = new HashMap<>();
try{
getDbFields();
}catch(HopException e){
@@ -196,13 +202,14 @@ public boolean processRow() throws HopException {

} else {

// use the columns/fields mapping.
int numberOfInsertFields = meta.getFields().size();
data.insertRowMeta = new RowMeta();

// Cache the position of the selected fields in the row array
data.selectedRowFieldIndices = new int[numberOfInsertFields];
for (int insertFieldIdx = 0; insertFieldIdx < numberOfInsertFields; insertFieldIdx++) {
RedshiftBulkLoaderField vbf = meta.getFields().get(insertFieldIdx);
for(int i=0; i < data.dbFields.size(); i++){
RedshiftBulkLoaderField vbf = meta.getFields().get(i);
String inputFieldName = vbf.getStreamField();
int inputFieldIdx = getInputRowMeta().indexOfValue(inputFieldName);
if (inputFieldIdx < 0) {
@@ -212,7 +219,7 @@
"RedshiftBulkLoader.Exception.FieldRequired",
inputFieldName)); //$NON-NLS-1$
}
data.selectedRowFieldIndices[insertFieldIdx] = inputFieldIdx;
data.selectedRowFieldIndices[i] = inputFieldIdx;

String insertFieldName = vbf.getDatabaseField();
IValueMeta inputValueMeta = getInputRowMeta().getValueMeta(inputFieldIdx);
@@ -226,12 +233,13 @@
IValueMeta insertValueMeta = inputValueMeta.clone();
insertValueMeta.setName(insertFieldName);
data.insertRowMeta.addValueMeta(insertValueMeta);
data.fieldnrs.put(meta.getFields().get(i).getDatabaseField().toUpperCase(), inputFieldIdx);
}
}
}
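In the specify-fields branch, the loop is now driven by the database column list (data.dbFields) fetched in init() rather than by the dialog's field count, and each resolved stream position is recorded twice: in selectedRowFieldIndices for row reordering and in fieldnrs for name-based lookup. One iteration, condensed, with illustrative names:

  // Stream field "customerName" mapped to database column "customer_name".
  RedshiftBulkLoaderField vbf = meta.getFields().get(i);
  int inputFieldIdx = getInputRowMeta().indexOfValue(vbf.getStreamField()); // -1 if absent
  data.selectedRowFieldIndices[i] = inputFieldIdx;
  data.fieldnrs.put(vbf.getDatabaseField().toUpperCase(), inputFieldIdx);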

if(meta.isStreamToS3Csv()){
writeRowToFile(data.outputRowMeta, r);
writeRowToFile(data.insertRowMeta, r);
putRow(data.outputRowMeta, r);
}

@@ -279,7 +287,7 @@ private String buildCopyStatementSqlString() {

if(meta.isStreamToS3Csv() || meta.getLoadFromExistingFileFormat().equals("CSV")){
sb.append(" (");
final IRowMeta fields = data.insertRowMeta;
final IRowMeta fields = data.outputRowMeta;
for (int i = 0; i < fields.size(); i++) {
if (i > 0) {
sb.append(", " + fields.getValueMeta(i).getName());
@@ -291,6 +299,8 @@
}

sb.append(" FROM '" + resolve(meta.getCopyFromFilename()) + "'");
sb.append(" NULL '' ");
sb.append(" EMPTYASNULL ");
if(meta.isStreamToS3Csv() || meta.getLoadFromExistingFileFormat().equals("CSV")){
sb.append(" delimiter ','");
}
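NULL '' and EMPTYASNULL are Redshift COPY options; together they make empty CSV fields arrive as SQL NULLs rather than empty strings. For orientation, the finished statement for the CSV path would look roughly like this (schema, table, bucket, and credentials are placeholders, not from the commit):

  // Illustrative result of buildCopyStatementSqlString():
  //   COPY my_schema.my_table (id, name, created_at)
  //   FROM 's3://my-bucket/staging/load.csv'
  //   NULL '' EMPTYASNULL delimiter ','
  //   CREDENTIALS 'aws_access_key_id=...;aws_secret_access_key=...'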
@@ -308,7 +318,7 @@
}
sb.append(" CREDENTIALS 'aws_access_key_id=" + awsAccessKeyId + ";aws_secret_access_key=" + awsSecretAccessKey + "'");
}
if(meta.getLoadFromExistingFileFormat().equals("Parquet")){
if(!StringUtils.isEmpty(meta.getLoadFromExistingFileFormat()) && meta.getLoadFromExistingFileFormat().equals("Parquet")){
sb.append(" FORMAT AS PARQUET;");
}
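The Parquet branch is now null-safe. Flipping the comparison would achieve the same without the extra guard, since "Parquet".equals(null) is simply false (a behaviour-equivalent sketch, not the committed code):

  // "Parquet".equals(...) is null-safe on its own, so the StringUtils
  // check can be folded away:
  if ("Parquet".equals(meta.getLoadFromExistingFileFormat())) {
    sb.append(" FORMAT AS PARQUET;");
  }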

@@ -343,11 +353,9 @@ private Object[] writeToOutputStream(Object[] r) throws HopException, IOException {
*/
private void getDbFields() throws HopException {
data.dbFields = new ArrayList<>();
String sql = "desc table ";

IRowMeta rowMeta = null;


if (!StringUtils.isEmpty(resolve(meta.getSchemaName()))) {
rowMeta = data.db.getTableFields(meta.getSchemaName() + "." + meta.getTableName());
}else {
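getDbFields() also drops the unused "desc table" SQL string and reads column metadata through the generic JDBC layer rather than a dialect-specific statement. A sketch of the resulting shape, assuming dbFields holds {name, type} pairs as the write path suggests (schema and table names are illustrative):

  IRowMeta tableMeta = data.db.getTableFields("my_schema.my_table");
  for (IValueMeta valueMeta : tableMeta.getValueMetaList()) {
    // [0] = column name, [1] = type description, e.g. {"CREATED_AT", "Timestamp"}
    data.dbFields.add(new String[] {valueMeta.getName(), valueMeta.getTypeDesc()});
  }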
@@ -432,47 +440,52 @@ private void writeRowToFile(IRowMeta rowMeta, Object[] row) throws HopTransformException {
writeField(v, valueData, null);
}
data.writer.write(data.binaryNewline);
} else if (meta.isStreamToS3Csv()) {
} else if (meta.isStreamToS3Csv() && meta.isSpecifyFields()) {
/*
* Only write the fields specified!
*/
for (int i = 0; i < data.dbFields.size(); i++) {
if (data.dbFields.get(i) != null) {
for(int i=0; i < meta.getFields().size(); i++){
if(meta.getFields().get(i).getDatabaseField() != null){
if (i > 0 && data.binarySeparator.length > 0) {
data.writer.write(data.binarySeparator);
}

String[] field = data.dbFields.get(i);
IValueMeta v;
IValueMeta v = null;
String streamFieldName = meta.getFields().get(i).getStreamField();
String[] rowFields = data.outputRowMeta.getFieldNames();
String streamFieldType = "";
int streamIndex = -1;
for(int j=0; j < rowFields.length ; j++){
if(streamFieldName.equals(rowFields[j])){
v = rowMeta.getValueMeta(j);
streamIndex = j;
}
}

if (field[1].toUpperCase().startsWith("TIMESTAMP")) {
v = new ValueMetaDate();
v.setConversionMask("yyyy-MM-dd HH:mm:ss.SSS");
} else if (field[1].toUpperCase().startsWith("DATE")) {
boolean needConversion = false;
if (v.getType() == IValueMeta.TYPE_TIMESTAMP) {
v = new ValueMetaDate();
v.setConversionMask("yyyy-MM-dd");
} else if (field[1].toUpperCase().startsWith("TIME")) {
v.setConversionMask("yyyy/MM/dd HH:mm:ss");
needConversion = true;
} else if (v.getType() == IValueMeta.TYPE_DATE) {
v = new ValueMetaDate();
v.setConversionMask("HH:mm:ss.SSS");
} else if (field[1].toUpperCase().startsWith("NUMBER")
|| field[1].toUpperCase().startsWith("FLOAT")) {
v = new ValueMetaBigNumber();
} else {
v = new ValueMetaString();
v.setLength(-1);
v.setConversionMask("yyyy/MM/dd");
needConversion = true;
}

int fieldIndex = -1;
if (data.fieldnrs.get(data.dbFields.get(i)[0]) != null) {
fieldIndex = data.fieldnrs.get(data.dbFields.get(i)[0]);
}
Object valueData = null;
if (fieldIndex >= 0) {
valueData = v.convertData(rowMeta.getValueMeta(fieldIndex), row[fieldIndex]);
if(streamIndex >= 0){
if(needConversion){
IValueMeta valueMeta = rowMeta.getValueMeta(streamIndex);
Object obj = row[i];
valueData = v.convertData(valueMeta, obj);
}else{
valueData = row[i];
}
} else if (meta.isErrorColumnMismatch()) {
throw new HopException(
"Error column mismatch: Database field "
+ data.dbFields.get(i)[0]
"Error column mismatch: Database streamField "
+ meta.getFields().get(i).getStreamField()
+ " not found on stream.");
}
writeField(v, valueData, data.binaryNullValue);
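The rewritten branch resolves each configured field by its stream name against the row metadata and applies a conversion mask only to temporal types; everything else is written through unchanged. One iteration, condensed, reading the value at the resolved stream position (a sketch, not the committed code):

  int streamIndex = rowMeta.indexOfValue(meta.getFields().get(i).getStreamField());
  IValueMeta v = rowMeta.getValueMeta(streamIndex);
  Object valueData = row[streamIndex]; // value taken at the resolved position
  if (v.getType() == IValueMeta.TYPE_TIMESTAMP || v.getType() == IValueMeta.TYPE_DATE) {
    // Fixed masks so Redshift's CSV parser sees a predictable format.
    v.setConversionMask(
        v.getType() == IValueMeta.TYPE_TIMESTAMP ? "yyyy/MM/dd HH:mm:ss" : "yyyy/MM/dd");
    valueData = v.convertData(rowMeta.getValueMeta(streamIndex), valueData);
  }
  writeField(v, valueData, data.binaryNullValue);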
@@ -767,9 +767,12 @@ private void generateMappings() {

// refresh data
input.setTablename(variables.resolve(wTable.getText()));
ITransformMeta transformMetaInterface = transformMeta.getTransform();
ITransformMeta transformMeta = this.transformMeta.getTransform();
if(StringUtils.isEmpty(input.getConnection())){
input.setConnection(wConnection.getText());
}
try {
targetFields = transformMetaInterface.getRequiredFields(variables);
targetFields = transformMeta.getRequiredFields(variables);
} catch (HopException e) {
new ErrorDialog(
shell,
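On the dialog side, the local ITransformMeta no longer hides behind the transformMetaInterface name, and the connection name falls back to the combo's current selection when the metadata has none yet, so getRequiredFields(variables) can resolve the target table against a live connection. The fallback in isolation (wConnection is the dialog's connection combo, as in the diff):

  // Adopt the connection selected in the dialog if the transform
  // metadata does not carry one yet.
  if (StringUtils.isEmpty(input.getConnection())) {
    input.setConnection(wConnection.getText());
  }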
