Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Amit-CloudSufi committed Jan 9, 2025
1 parent f29f859 commit c1eb99e
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ public static String getAccessTokenByRefreshToken(CloseableHttpClient httpclient
String encondedAuthorization = new String(Base64.getEncoder().encode(stringToEncode.getBytes()));
httppost.setHeader("Authorization", String.format("Basic %s", encondedAuthorization));


CloseableHttpResponse response = httpclient.execute(httppost);
String responseString = EntityUtils.toString(response.getEntity(), "UTF-8");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import java.util.List;


/**
* Error details provided for the Snowflake
**/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@
package io.cdap.plugin.snowflake.common;

import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCodeType;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil;
import net.snowflake.client.jdbc.ErrorCode;

import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* Error Type provided based on the Snowflake error message code
Expand Down Expand Up @@ -140,7 +147,7 @@ public class SnowflakeErrorType {
* @param errorCode the error code to classify
* @return the corresponding ErrorType (USER, SYSTEM, UNKNOWN)
*/
public static ErrorType getErrorTypeFromErrorCode(int errorCode) {
private static ErrorType getErrorTypeFromErrorCode(int errorCode) {
if (ERROR_CODE_TO_ERROR_TYPE.containsKey(errorCode)) {
return ERROR_CODE_TO_ERROR_TYPE.get(errorCode);
}
Expand All @@ -154,10 +161,23 @@ public static ErrorType getErrorTypeFromErrorCode(int errorCode) {
* @param errorCode the error code to classify
* @return the corresponding ErrorCategory
*/
public static ErrorCategory getErrorCategoryFromSqlState(int errorCode) {
private static ErrorCategory getErrorCategoryFromSqlState(int errorCode) {
if (ERROR_CODE_TO_ERROR_CATEGORY.containsKey(errorCode)) {
return ERROR_CODE_TO_ERROR_CATEGORY.get(errorCode);
}
return new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN);
}

public static void fetchProgramFailureException(SQLException e, String errorReason, String errorMessage) {
Optional<ErrorCode> errorCodes = Arrays.stream(ErrorCode.values())
.filter(errorCode -> errorCode.getSqlState().equals(e.getSQLState()))
.findFirst();
ErrorCategory errorCategory = errorCodes.isPresent() ?
getErrorCategoryFromSqlState(errorCodes.get().getMessageCode()) :
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN);
ErrorType errorType = errorCodes.isPresent() ? getErrorTypeFromErrorCode(errorCodes.get().getMessageCode()) :
ErrorType.UNKNOWN;
throw ErrorUtils.getProgramFailureException(errorCategory, errorReason, errorMessage, errorType,
true, ErrorCodeType.SQLSTATE, e.getSQLState(), DocumentUrlUtil.getSupportedDocumentUrl(), e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.common.base.Strings;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCodeType;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.common.KeyValueListParser;
Expand All @@ -29,7 +28,6 @@
import io.cdap.plugin.snowflake.common.exception.ConnectionTimeoutException;
import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil;
import io.cdap.plugin.snowflake.common.util.QueryUtil;
import net.snowflake.client.jdbc.ErrorCode;
import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
import org.apache.http.impl.client.HttpClients;

Expand All @@ -44,9 +42,7 @@
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

/**
Expand Down Expand Up @@ -74,14 +70,8 @@ public void runSQL(String query) {
query, e.getSQLState(), e.getErrorCode(), e.getMessage());
String errorReason = String.format("Statement '%s' failed with SQL state %s and error code %s. For more " +
"details see %s.", query, e.getSQLState(), e.getErrorCode(), DocumentUrlUtil.getSupportedDocumentUrl());
Optional<ErrorCode> errorCodes = Arrays.stream(ErrorCode.values())
.filter(errorCode -> errorCode.getSqlState().equals(e.getSQLState())).findFirst();
ErrorCategory errorCategory = SnowflakeErrorType.getErrorCategoryFromSqlState(errorCodes.get().getMessageCode());
ErrorType errorType = SnowflakeErrorType.getErrorTypeFromErrorCode(errorCodes.get().getMessageCode());
throw ErrorUtils.getProgramFailureException(errorCategory, errorReason, errorMessage, errorType,
true, ErrorCodeType.SQLSTATE, e.getSQLState(), DocumentUrlUtil.getSupportedDocumentUrl(), e);
SnowflakeErrorType.fetchProgramFailureException(e, errorReason, errorMessage);
}

}

/**
Expand Down Expand Up @@ -110,12 +100,7 @@ public List<SnowflakeFieldDescriptor> describeQuery(String query) throws IOExcep
"code %s with message: %s.", e.getSQLState(), e.getErrorCode(), e.getMessage());
String errorReason = String.format("Failed to execute query to fetch descriptors with SQL State %s and error " +
"code %s. For more details %s", e.getSQLState(), e.getErrorCode(), DocumentUrlUtil.getSupportedDocumentUrl());
Optional<ErrorCode> errorCodes = Arrays.stream(ErrorCode.values())
.filter(errorCode -> errorCode.getSqlState().equals(e.getSQLState())).findFirst();
ErrorCategory errorCategory = SnowflakeErrorType.getErrorCategoryFromSqlState(errorCodes.get().getMessageCode());
ErrorType errorType = SnowflakeErrorType.getErrorTypeFromErrorCode(errorCodes.get().getMessageCode());
throw ErrorUtils.getProgramFailureException(errorCategory, errorReason, errorMessage, errorType,
true, ErrorCodeType.SQLSTATE, e.getSQLState(), DocumentUrlUtil.getSupportedDocumentUrl(), e);
SnowflakeErrorType.fetchProgramFailureException(e, errorReason, errorMessage);
}
return fieldDescriptors;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,10 @@ public void commitJob(JobContext jobContext) {
String destinationStagePath = conf.get(DESTINATION_STAGE_PATH_PROPERTY);

SnowflakeSinkAccessor snowflakeAccessor = new SnowflakeSinkAccessor(config);
snowflakeAccessor.populateTable(destinationStagePath);
snowflakeAccessor.removeDirectory(destinationStagePath);

snowflakeAccessor.populateTable(destinationStagePath);
snowflakeAccessor.removeDirectory(destinationStagePath);
}

@Override
public void setupTask(TaskAttemptContext taskAttemptContext) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ public void write(NullWritable key, CSVRecord csvRecord) {
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorReason, errorMessage, ErrorType.SYSTEM, true, e);
}

}

private void submitCurrentBatch() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,15 @@

package io.cdap.plugin.snowflake.sink.batch;

import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCodeType;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.snowflake.common.SnowflakeErrorType;
import io.cdap.plugin.snowflake.common.client.SnowflakeAccessor;
import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil;
import net.snowflake.client.jdbc.ErrorCode;
import net.snowflake.client.jdbc.SnowflakeConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Optional;
import java.util.UUID;

/**
Expand Down Expand Up @@ -68,12 +61,7 @@ public void uploadStream(InputStream inputStream, String stageDir) {
String errorMessage = String.format("Failed to compress '%s' and upload data to destination stage '%s' with " +
"errorCode: '%s' and sqlState: '%s' with message: %s.", filename, stageDir, e.getErrorCode(), e.getSQLState(),
e.getMessage());
Optional<ErrorCode> errorCodes = Arrays.stream(ErrorCode.values())
.filter(errorCode -> errorCode.getSqlState().equals(e.getSQLState())).findFirst();
ErrorCategory errorCategory = SnowflakeErrorType.getErrorCategoryFromSqlState(errorCodes.get().getMessageCode());
ErrorType errorType = SnowflakeErrorType.getErrorTypeFromErrorCode(errorCodes.get().getMessageCode());
throw ErrorUtils.getProgramFailureException(errorCategory, errorReason, errorMessage, errorType,
true, ErrorCodeType.SQLSTATE, e.getSQLState(), DocumentUrlUtil.getSupportedDocumentUrl(), e);
SnowflakeErrorType.fetchProgramFailureException(e, errorReason, errorMessage);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@
package io.cdap.plugin.snowflake.source.batch;

import au.com.bytecode.opencsv.CSVReader;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorCodeType;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.plugin.snowflake.common.SnowflakeErrorType;
import io.cdap.plugin.snowflake.common.client.SnowflakeAccessor;
import io.cdap.plugin.snowflake.common.util.DocumentUrlUtil;
import io.cdap.plugin.snowflake.common.util.QueryUtil;
import net.snowflake.client.jdbc.ErrorCode;
import net.snowflake.client.jdbc.SnowflakeConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,9 +32,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

/**
Expand Down Expand Up @@ -105,17 +98,12 @@ public List<String> prepareStageSplits() {
DocumentUrlUtil.getSupportedDocumentUrl());
String errorMessage = String.format("Failed to load data into stage '%s' with sqlState %s and errorCode %s. " +
"Failed to execute query with message: %s.", STAGE_PATH, e.getSQLState(), e.getErrorCode(), e.getMessage());
Optional<ErrorCode> errorCodes = Arrays.stream(ErrorCode.values())
.filter(errorCode -> errorCode.getSqlState().equals(e.getSQLState()))
.findFirst();
ErrorCategory errorCategory = SnowflakeErrorType.getErrorCategoryFromSqlState(errorCodes.get().getMessageCode());
ErrorType errorType = SnowflakeErrorType.getErrorTypeFromErrorCode(errorCodes.get().getMessageCode());
throw ErrorUtils.getProgramFailureException(errorCategory, errorReason, errorMessage, errorType,
true, ErrorCodeType.SQLSTATE, e.getSQLState(), DocumentUrlUtil.getSupportedDocumentUrl(), e);
SnowflakeErrorType.fetchProgramFailureException(e, errorReason, errorMessage);
}
return stageSplits;
}


/**
* Remove a file from stage.
* @param stageSplit path to file in Snowflake stage.
Expand Down Expand Up @@ -143,13 +131,8 @@ public CSVReader buildCsvReader(String stageSplit) {
String errorMessage = String.format("Failed to execute the query with sqlState: '%s' & errorCode: '%s' " +
"with message: %s, stage split at %s.", e.getSQLState(), e.getErrorCode(),
e.getMessage(), stageSplit);
Optional<ErrorCode> errorCodes = Arrays.stream(ErrorCode.values())
.filter(errorCode -> errorCode.getSqlState().equals(e.getSQLState()))
.findFirst();
ErrorCategory errorCategory = SnowflakeErrorType.getErrorCategoryFromSqlState(errorCodes.get().getMessageCode());
ErrorType errorType = SnowflakeErrorType.getErrorTypeFromErrorCode(errorCodes.get().getMessageCode());
throw ErrorUtils.getProgramFailureException(errorCategory, errorReason, errorMessage, errorType,
true, ErrorCodeType.SQLSTATE, e.getSQLState(), DocumentUrlUtil.getSupportedDocumentUrl(), e);
SnowflakeErrorType.fetchProgramFailureException(e, errorReason, errorMessage);
}
return null;
}
}

0 comments on commit c1eb99e

Please sign in to comment.