Commit

SNOW-1886186 - gather threadExecutor callables and call Future.get() to prevent silent fails (#2035)
sfc-gh-mkubik authored Feb 3, 2025
1 parent 19db9b3 commit 4d73661
Showing 5 changed files with 194 additions and 132 deletions.
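The change addresses a silent-failure mode in the file transfer code: upload and download workers were submitted to an ExecutorService and the caller only invoked awaitTermination(), which waits for the threads to finish but never surfaces exceptions thrown inside the individual callables. The fix gathers the Future returned by each submit() call and calls Future.get() on every completed future after the executor finishes, so a worker failure is rethrown as an ExecutionException and wrapped in a SnowflakeSQLException with the new FILE_OPERATION_UPLOAD_ERROR / FILE_OPERATION_DOWNLOAD_ERROR codes. Below is a minimal, self-contained sketch of that pattern; the class and job names are illustrative and are not part of the driver.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class GatherFuturesSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    List<Future<Void>> futures = new ArrayList<>();

    for (int i = 0; i < 8; i++) {
      final int jobId = i;
      Callable<Void> job =
          () -> {
            if (jobId == 3) {
              // Simulates a worker failure that awaitTermination() alone would swallow.
              throw new IllegalStateException("transfer failed for job " + jobId);
            }
            return null;
          };
      // Keep every Future instead of discarding the result of submit().
      futures.add(executor.submit(job));
    }

    executor.shutdown();
    // Wait for all workers to finish, effectively without a timeout.
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);

    try {
      for (Future<Void> future : futures) {
        if (future.isDone()) {
          // get() rethrows any exception from the worker, wrapped in ExecutionException.
          future.get();
        }
      }
    } catch (ExecutionException ex) {
      // The driver wraps ex.getCause() in a SnowflakeSQLException using the new
      // FILE_OPERATION_UPLOAD_ERROR / FILE_OPERATION_DOWNLOAD_ERROR message codes.
      throw new RuntimeException("file operation failed", ex.getCause());
    }
  }
}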
4 changes: 3 additions & 1 deletion src/main/java/net/snowflake/client/jdbc/ErrorCode.java
@@ -85,7 +85,9 @@ public enum ErrorCode {
AUTHENTICATOR_REQUEST_TIMEOUT(200062, SqlState.CONNECTION_EXCEPTION),
INVALID_STRUCT_DATA(200063, SqlState.DATA_EXCEPTION),
DISABLEOCSP_INSECUREMODE_VALUE_MISMATCH(200064, SqlState.INVALID_PARAMETER_VALUE),
TOO_MANY_FILES_TO_DOWNLOAD_AS_STREAM(200065, SqlState.DATA_EXCEPTION);
TOO_MANY_FILES_TO_DOWNLOAD_AS_STREAM(200065, SqlState.DATA_EXCEPTION),
FILE_OPERATION_UPLOAD_ERROR(200066, SqlState.INTERNAL_ERROR),
FILE_OPERATION_DOWNLOAD_ERROR(200067, SqlState.INTERNAL_ERROR);

public static final String errorMessageResource = "net.snowflake.client.jdbc.jdbc_error_messages";

@@ -34,6 +34,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -1667,7 +1668,7 @@ private void uploadStream() throws SnowflakeSQLException {
queryID,
ex.getCause(),
SqlState.INTERNAL_ERROR,
ErrorCode.INTERNAL_ERROR.getMessageCode());
ErrorCode.FILE_OPERATION_UPLOAD_ERROR.getMessageCode());
}
logger.debug("Done with uploading from a stream");
} finally {
@@ -1752,6 +1753,7 @@ private void downloadFiles() throws SnowflakeSQLException {
try {
threadExecutor = SnowflakeUtil.createDefaultExecutorService("sf-file-download-worker-", 1);

List<Future<Void>> downloadFileFutures = new LinkedList<>();
for (String srcFile : sourceFiles) {
FileMetadata fileMetadata = fileMetadataMap.get(srcFile);

@@ -1768,21 +1770,22 @@

RemoteStoreFileEncryptionMaterial encMat = srcFileToEncMat.get(srcFile);
String presignedUrl = srcFileToPresignedUrl.get(srcFile);
threadExecutor.submit(
getDownloadFileCallable(
stageInfo,
srcFile,
localLocation,
fileMetadataMap,
(stageInfo.getStageType() == StageInfo.StageType.LOCAL_FS)
? null
: storageFactory.createClient(stageInfo, parallel, encMat, session),
session,
command,
parallel,
encMat,
presignedUrl,
queryID));
downloadFileFutures.add(
threadExecutor.submit(
getDownloadFileCallable(
stageInfo,
srcFile,
localLocation,
fileMetadataMap,
(stageInfo.getStageType() == StageInfo.StageType.LOCAL_FS)
? null
: storageFactory.createClient(stageInfo, parallel, encMat, session),
session,
command,
parallel,
encMat,
presignedUrl,
queryID)));

logger.debug("Submitted download job for: {}", srcFile);
}
@@ -1792,9 +1795,20 @@
try {
// wait for all threads to complete without timeout
threadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
for (Future<Void> downloadFileFuture : downloadFileFutures) {
if (downloadFileFuture.isDone()) {
downloadFileFuture.get();
}
}
} catch (InterruptedException ex) {
throw new SnowflakeSQLLoggedException(
queryID, session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED);
} catch (ExecutionException ex) {
throw new SnowflakeSQLException(
queryID,
ex.getCause(),
SqlState.INTERNAL_ERROR,
ErrorCode.FILE_OPERATION_DOWNLOAD_ERROR.getMessageCode());
}
logger.debug("Done with downloading");
} finally {
@@ -1835,6 +1849,7 @@ private void uploadFiles(Set<String> fileList, int parallel) throws SnowflakeSQL
threadExecutor =
SnowflakeUtil.createDefaultExecutorService("sf-file-upload-worker-", parallel);

List<Future<Void>> uploadFileFutures = new LinkedList<>();
for (String srcFile : fileList) {
FileMetadata fileMetadata = fileMetadataMap.get(srcFile);

@@ -1862,23 +1877,24 @@ private void uploadFiles(Set<String> fileList, int parallel) throws SnowflakeSQL
int delay = session.getInjectWaitInPut();
setUploadDelay(delay);

threadExecutor.submit(
getUploadFileCallable(
stageInfo,
srcFile,
fileMetadata,
(stageInfo.getStageType() == StageInfo.StageType.LOCAL_FS)
? null
: storageFactory.createClient(
stageInfo, parallel, encryptionMaterial.get(0), session),
session,
command,
null,
false,
(parallel > 1 ? 1 : this.parallel),
srcFileObj,
encryptionMaterial.get(0),
queryID));
uploadFileFutures.add(
threadExecutor.submit(
getUploadFileCallable(
stageInfo,
srcFile,
fileMetadata,
(stageInfo.getStageType() == StageInfo.StageType.LOCAL_FS)
? null
: storageFactory.createClient(
stageInfo, parallel, encryptionMaterial.get(0), session),
session,
command,
null,
false,
(parallel > 1 ? 1 : this.parallel),
srcFileObj,
encryptionMaterial.get(0),
queryID)));

logger.debug("Submitted copy job for: {}", srcFile);
}
@@ -1889,9 +1905,20 @@ private void uploadFiles(Set<String> fileList, int parallel) throws SnowflakeSQL
try {
// wait for all threads to complete without timeout
threadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
for (Future<Void> uploadFileFuture : uploadFileFutures) {
if (uploadFileFuture.isDone()) {
uploadFileFuture.get();
}
}
} catch (InterruptedException ex) {
throw new SnowflakeSQLLoggedException(
queryID, session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED);
} catch (ExecutionException ex) {
throw new SnowflakeSQLException(
queryID,
ex.getCause(),
SqlState.INTERNAL_ERROR,
ErrorCode.FILE_OPERATION_UPLOAD_ERROR.getMessageCode());
}
logger.debug("Done with uploading");

@@ -352,13 +352,21 @@ public void download(
// Get the user-defined BLOB metadata
Map<String, String> userDefinedMetadata =
SnowflakeUtil.createCaseInsensitiveMap(blob.getMetadata());
AbstractMap.SimpleEntry<String, String> encryptionData =
parseEncryptionData(userDefinedMetadata.get(AZ_ENCRYPTIONDATAPROP), queryId);

String key = encryptionData.getKey();
String iv = encryptionData.getValue();

if (this.isEncrypting() && this.getEncryptionKeySize() <= 256) {
if (!userDefinedMetadata.containsKey(AZ_ENCRYPTIONDATAPROP)) {
throw new SnowflakeSQLLoggedException(
queryId,
session,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
SqlState.INTERNAL_ERROR,
"Encryption data not found in the metadata of a file being downloaded");
}
AbstractMap.SimpleEntry<String, String> encryptionData =
parseEncryptionData(userDefinedMetadata.get(AZ_ENCRYPTIONDATAPROP), queryId);

String key = encryptionData.getKey();
String iv = encryptionData.getValue();
stopwatch.restart();
if (key == null || iv == null) {
throw new SnowflakeSQLLoggedException(
@@ -452,12 +460,20 @@ public InputStream downloadToStream(
long downloadMillis = stopwatch.elapsedMillis();
Map<String, String> userDefinedMetadata =
SnowflakeUtil.createCaseInsensitiveMap(blob.getMetadata());
AbstractMap.SimpleEntry<String, String> encryptionData =
parseEncryptionData(userDefinedMetadata.get(AZ_ENCRYPTIONDATAPROP), queryId);
String key = encryptionData.getKey();
String iv = encryptionData.getValue();

if (this.isEncrypting() && this.getEncryptionKeySize() <= 256) {
if (!userDefinedMetadata.containsKey(AZ_ENCRYPTIONDATAPROP)) {
throw new SnowflakeSQLLoggedException(
queryId,
session,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
SqlState.INTERNAL_ERROR,
"Encryption data not found in the metadata of a file being downloaded");
}
AbstractMap.SimpleEntry<String, String> encryptionData =
parseEncryptionData(userDefinedMetadata.get(AZ_ENCRYPTIONDATAPROP), queryId);
String key = encryptionData.getKey();
String iv = encryptionData.getValue();
stopwatch.restart();
if (key == null || iv == null) {
throw new SnowflakeSQLLoggedException(
@@ -82,4 +82,7 @@ Error message={3}, Extended error info={4}
200061=GCS operation failed: Operation={0}, Error code={1}, Message={2}, Reason={3}
200062=Authentication timed out.
200063=Invalid data - Cannot be parsed and converted to structured type.

200064=The values for 'disableOCSPChecks' and 'insecureMode' must be identical.
200065=Too many files to download as stream
200066=JDBC driver file operation error while performing stage upload.
200067=JDBC driver file operation error while performing stage download.