From fde423e69b4edb08f40922988096619d9acf28cc Mon Sep 17 00:00:00 2001 From: Sam Ansmink Date: Thu, 11 Jul 2024 15:57:08 +0200 Subject: [PATCH] add support for r2 and gcs --- src/functions/delta_scan.cpp | 56 ++++++++++++++-- test/sql/cloud/minio_local/gcs_r2.test | 93 ++++++++++++++++++++++++++ 2 files changed, 145 insertions(+), 4 deletions(-) create mode 100644 test/sql/cloud/minio_local/gcs_r2.test diff --git a/src/functions/delta_scan.cpp b/src/functions/delta_scan.cpp index a1c435c..5d50c8b 100644 --- a/src/functions/delta_scan.cpp +++ b/src/functions/delta_scan.cpp @@ -112,7 +112,14 @@ static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &p ffi::EngineBuilder* builder; // For "regular" paths we early out with the default builder config - if (!StringUtil::StartsWith(path, "s3://") && !StringUtil::StartsWith(path, "azure://") && !StringUtil::StartsWith(path, "az://") && !StringUtil::StartsWith(path, "abfs://") && !StringUtil::StartsWith(path, "abfss://")) { + if (!StringUtil::StartsWith(path, "s3://") && + !StringUtil::StartsWith(path, "gcs://") && + !StringUtil::StartsWith(path, "gs://") && + !StringUtil::StartsWith(path, "r2://") && + !StringUtil::StartsWith(path, "azure://") && + !StringUtil::StartsWith(path, "az://") && + !StringUtil::StartsWith(path, "abfs://") && + !StringUtil::StartsWith(path, "abfss://")) { auto interface_builder_res = ffi::get_engine_builder(KernelUtils::ToDeltaString(path), DuckDBEngineError::AllocateError); return KernelUtils::UnpackResult(interface_builder_res, "get_engine_interface_builder for path " + path); } @@ -130,6 +137,33 @@ static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &p bucket = path.substr(5, end_of_container-5); path_in_bucket = path.substr(end_of_container); secret_type = "s3"; + } else if (StringUtil::StartsWith(path, "gcs://")) { + auto end_of_container = path.find('/',6); + + if(end_of_container == string::npos) { + throw IOException("Invalid gcs url passed to delta scan: %s", path); + } + bucket = path.substr(6, end_of_container-6); + path_in_bucket = path.substr(end_of_container); + secret_type = "gcs"; + } else if (StringUtil::StartsWith(path, "gs://")) { + auto end_of_container = path.find('/',5); + + if(end_of_container == string::npos) { + throw IOException("Invalid gcs url passed to delta scan: %s", path); + } + bucket = path.substr(5, end_of_container-5); + path_in_bucket = path.substr(end_of_container); + secret_type = "gcs"; + } else if (StringUtil::StartsWith(path, "r2://")) { + auto end_of_container = path.find('/',5); + + if(end_of_container == string::npos) { + throw IOException("Invalid gcs url passed to delta scan: %s", path); + } + bucket = path.substr(5, end_of_container-5); + path_in_bucket = path.substr(end_of_container); + secret_type = "r2"; } else if ((StringUtil::StartsWith(path, "azure://")) || (StringUtil::StartsWith(path, "abfss://"))) { auto end_of_container = path.find('/',8); @@ -159,8 +193,18 @@ static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &p secret_type = "azure"; } - auto interface_builder_res = ffi::get_engine_builder(KernelUtils::ToDeltaString(path), DuckDBEngineError::AllocateError); - builder = KernelUtils::UnpackResult(interface_builder_res, "get_engine_interface_builder for path " + path); + // We need to substitute DuckDB's usage of s3 and r2 paths because delta kernel needs to just interpret them as s3 protocol servers. + string cleaned_path; + if (StringUtil::StartsWith(path, "r2://") || StringUtil::StartsWith(path, "gs://") ) { + cleaned_path = "s3://" + path.substr(5); + } else if (StringUtil::StartsWith(path, "gcs://")) { + cleaned_path = "s3://" + path.substr(6); + } else { + cleaned_path = path; + } + + auto interface_builder_res = ffi::get_engine_builder(KernelUtils::ToDeltaString(cleaned_path), DuckDBEngineError::AllocateError); + builder = KernelUtils::UnpackResult(interface_builder_res, "get_engine_interface_builder for path " + cleaned_path); // For S3 or Azure paths we need to trim the url, set the container, and fetch a potential secret auto &secret_manager = SecretManager::Get(context); @@ -170,13 +214,17 @@ static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &p // No secret: nothing left to do here! if (!secret_match.HasMatch()) { + if (StringUtil::StartsWith(path, "r2://") || StringUtil::StartsWith(path, "gs://") || StringUtil::StartsWith(path, "gcs://")) { + throw NotImplementedException("Can not scan a gcs:// gs:// or r2:// url without a secret providing its endpoint currently. Please create an R2 or GCS secret containing the credentials for this endpoint and try again."); + } + return builder; } const auto &kv_secret = dynamic_cast(*secret_match.secret_entry->secret); // Here you would need to add the logic for setting the builder options for Azure // This is just a placeholder and will need to be replaced with the actual logic - if (secret_type == "s3") { + if (secret_type == "s3" || secret_type == "gcs" || secret_type == "r2") { auto key_id = kv_secret.TryGetValue("key_id").ToString(); auto secret = kv_secret.TryGetValue("secret").ToString(); auto session_token = kv_secret.TryGetValue("session_token").ToString(); diff --git a/test/sql/cloud/minio_local/gcs_r2.test b/test/sql/cloud/minio_local/gcs_r2.test new file mode 100644 index 0000000..319380c --- /dev/null +++ b/test/sql/cloud/minio_local/gcs_r2.test @@ -0,0 +1,93 @@ +# name: test/sql/cloud/minio_local/gcs_r2.test +# description: test delta extension with GCS and R2 +# group: [aws] + +require httpfs + +require parquet + +require delta + +require aws + +require-env DUCKDB_MINIO_TEST_SERVER_AVAILABLE + +require-env AWS_ACCESS_KEY_ID + +require-env AWS_SECRET_ACCESS_KEY + +require-env AWS_DEFAULT_REGION + +require-env AWS_ENDPOINT + +statement ok +set secret_directory='__TEST_DIR__/minio_local_gcs_env' + +statement error +FROM delta_scan('gcs://test-bucket/dat/all_primitive_types/delta') +---- +Can not scan a gcs:// gs:// or r2:// url without a secret providing its endpoint currently. Please create an R2 or GCS secret containing the credentials for this endpoint and try again. + +statement error +FROM delta_scan('gs://test-bucket/dat/all_primitive_types/delta') +---- +Can not scan a gcs:// gs:// or r2:// url without a secret providing its endpoint currently. Please create an R2 or GCS secret containing the credentials for this endpoint and try again. + +statement error +FROM delta_scan('r2://test-bucket/dat/all_primitive_types/delta') +---- +Can not scan a gcs:// gs:// or r2:// url without a secret providing its endpoint currently. Please create an R2 or GCS secret containing the credentials for this endpoint and try again. + +# create a fake gcs secret +statement ok +CREATE SECRET ( + TYPE GCS, + KEY_ID '${AWS_ACCESS_KEY_ID}', + SECRET '${AWS_SECRET_ACCESS_KEY}', + REGION '${AWS_DEFAULT_REGION}', + ENDPOINT '${AWS_ENDPOINT}', + USE_SSL false +) + +query I +SELECT int32 +FROM delta_scan('gcs://test-bucket-public/dat/all_primitive_types/delta') +---- +0 +1 +2 +3 +4 + +query I +SELECT int32 +FROM delta_scan('gs://test-bucket-public/dat/all_primitive_types/delta') +---- +0 +1 +2 +3 +4 + +# create a fake r2 secret +statement ok +CREATE SECRET s1 ( + TYPE R2, + PROVIDER config, + account_id 'some_bogus_account', + KEY_ID '${AWS_ACCESS_KEY_ID}', + SECRET '${AWS_SECRET_ACCESS_KEY}', + REGION '${AWS_DEFAULT_REGION}', + ENDPOINT '${AWS_ENDPOINT}', + USE_SSL false +) + +query I +SELECT int32 +FROM delta_scan('r2://test-bucket-public/dat/all_primitive_types/delta') +---- +0 +1 +2 +3 +4