Skip to content

Commit

Permalink
add support for r2 and gcs
Browse files Browse the repository at this point in the history
  • Loading branch information
samansmink committed Jul 11, 2024
1 parent f0aae97 commit fde423e
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 4 deletions.
56 changes: 52 additions & 4 deletions src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,14 @@ static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &p
ffi::EngineBuilder* builder;

// For "regular" paths we early out with the default builder config
if (!StringUtil::StartsWith(path, "s3://") && !StringUtil::StartsWith(path, "azure://") && !StringUtil::StartsWith(path, "az://") && !StringUtil::StartsWith(path, "abfs://") && !StringUtil::StartsWith(path, "abfss://")) {
if (!StringUtil::StartsWith(path, "s3://") &&
!StringUtil::StartsWith(path, "gcs://") &&
!StringUtil::StartsWith(path, "gs://") &&
!StringUtil::StartsWith(path, "r2://") &&
!StringUtil::StartsWith(path, "azure://") &&
!StringUtil::StartsWith(path, "az://") &&
!StringUtil::StartsWith(path, "abfs://") &&
!StringUtil::StartsWith(path, "abfss://")) {
auto interface_builder_res = ffi::get_engine_builder(KernelUtils::ToDeltaString(path), DuckDBEngineError::AllocateError);
return KernelUtils::UnpackResult(interface_builder_res, "get_engine_interface_builder for path " + path);
}
Expand All @@ -130,6 +137,33 @@ static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &p
bucket = path.substr(5, end_of_container-5);
path_in_bucket = path.substr(end_of_container);
secret_type = "s3";
} else if (StringUtil::StartsWith(path, "gcs://")) {
auto end_of_container = path.find('/',6);

if(end_of_container == string::npos) {
throw IOException("Invalid gcs url passed to delta scan: %s", path);
}
bucket = path.substr(6, end_of_container-6);
path_in_bucket = path.substr(end_of_container);
secret_type = "gcs";
} else if (StringUtil::StartsWith(path, "gs://")) {
auto end_of_container = path.find('/',5);

if(end_of_container == string::npos) {
throw IOException("Invalid gcs url passed to delta scan: %s", path);
}
bucket = path.substr(5, end_of_container-5);
path_in_bucket = path.substr(end_of_container);
secret_type = "gcs";
} else if (StringUtil::StartsWith(path, "r2://")) {
// r2://<bucket>/<path>: the bucket name starts right after the 5-character "r2://" prefix
auto end_of_container = path.find('/',5);

if(end_of_container == string::npos) {
// This branch handles r2 urls, so report the scheme that was actually passed (not gcs)
throw IOException("Invalid r2 url passed to delta scan: %s", path);
}
bucket = path.substr(5, end_of_container-5);
// path_in_bucket keeps the '/' that terminates the bucket name
path_in_bucket = path.substr(end_of_container);
secret_type = "r2";
} else if ((StringUtil::StartsWith(path, "azure://")) || (StringUtil::StartsWith(path, "abfss://"))) {
auto end_of_container = path.find('/',8);

Expand Down Expand Up @@ -159,8 +193,18 @@ static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &p
secret_type = "azure";
}

auto interface_builder_res = ffi::get_engine_builder(KernelUtils::ToDeltaString(path), DuckDBEngineError::AllocateError);
builder = KernelUtils::UnpackResult(interface_builder_res, "get_engine_interface_builder for path " + path);
// DuckDB's r2://, gs:// and gcs:// schemes are rewritten to s3:// because the delta kernel needs to just interpret them as s3 protocol servers.
string cleaned_path;
if (StringUtil::StartsWith(path, "r2://") || StringUtil::StartsWith(path, "gs://") ) {
cleaned_path = "s3://" + path.substr(5);
} else if (StringUtil::StartsWith(path, "gcs://")) {
cleaned_path = "s3://" + path.substr(6);
} else {
cleaned_path = path;
}

auto interface_builder_res = ffi::get_engine_builder(KernelUtils::ToDeltaString(cleaned_path), DuckDBEngineError::AllocateError);
builder = KernelUtils::UnpackResult(interface_builder_res, "get_engine_interface_builder for path " + cleaned_path);

// For S3, GCS, R2 or Azure paths we need to trim the url, set the container, and fetch a potential secret
auto &secret_manager = SecretManager::Get(context);
Expand All @@ -170,13 +214,17 @@ static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &p

// No secret: nothing left to do here!
if (!secret_match.HasMatch()) {
if (StringUtil::StartsWith(path, "r2://") || StringUtil::StartsWith(path, "gs://") || StringUtil::StartsWith(path, "gcs://")) {
throw NotImplementedException("Can not scan a gcs:// gs:// or r2:// url without a secret providing its endpoint currently. Please create an R2 or GCS secret containing the credentials for this endpoint and try again.");
}

return builder;
}
const auto &kv_secret = dynamic_cast<const KeyValueSecret &>(*secret_match.secret_entry->secret);

// Here you would need to add the logic for setting the builder options for Azure
// This is just a placeholder and will need to be replaced with the actual logic
if (secret_type == "s3") {
if (secret_type == "s3" || secret_type == "gcs" || secret_type == "r2") {
auto key_id = kv_secret.TryGetValue("key_id").ToString();
auto secret = kv_secret.TryGetValue("secret").ToString();
auto session_token = kv_secret.TryGetValue("session_token").ToString();
Expand Down
93 changes: 93 additions & 0 deletions test/sql/cloud/minio_local/gcs_r2.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# name: test/sql/cloud/minio_local/gcs_r2.test
# description: test delta extension with GCS and R2
# group: [aws]

# Flow of this test:
#   1. gcs://, gs:// and r2:// scans are rejected while no secret exists
#   2. a GCS secret is created, after which gcs:// and gs:// scans return data
#   3. an R2 secret is created, after which r2:// scans return data

require httpfs

require parquet

require delta

require aws

# The values of the AWS_* variables below are substituted into the secrets created further down;
# presumably they describe the local minio server flagged by DUCKDB_MINIO_TEST_SERVER_AVAILABLE.
require-env DUCKDB_MINIO_TEST_SERVER_AVAILABLE

require-env AWS_ACCESS_KEY_ID

require-env AWS_SECRET_ACCESS_KEY

require-env AWS_DEFAULT_REGION

require-env AWS_ENDPOINT

# point the secret directory at a location inside the test directory
statement ok
set secret_directory='__TEST_DIR__/minio_local_gcs_env'

# no secret exists yet: each of the three url schemes must fail with the "without a secret" error
statement error
FROM delta_scan('gcs://test-bucket/dat/all_primitive_types/delta')
----
Can not scan a gcs:// gs:// or r2:// url without a secret providing its endpoint currently. Please create an R2 or GCS secret containing the credentials for this endpoint and try again.

statement error
FROM delta_scan('gs://test-bucket/dat/all_primitive_types/delta')
----
Can not scan a gcs:// gs:// or r2:// url without a secret providing its endpoint currently. Please create an R2 or GCS secret containing the credentials for this endpoint and try again.

statement error
FROM delta_scan('r2://test-bucket/dat/all_primitive_types/delta')
----
Can not scan a gcs:// gs:// or r2:// url without a secret providing its endpoint currently. Please create an R2 or GCS secret containing the credentials for this endpoint and try again.

# create a fake gcs secret
# (type GCS, but filled with the AWS_* credentials and endpoint from the environment)
statement ok
CREATE SECRET (
TYPE GCS,
KEY_ID '${AWS_ACCESS_KEY_ID}',
SECRET '${AWS_SECRET_ACCESS_KEY}',
REGION '${AWS_DEFAULT_REGION}',
ENDPOINT '${AWS_ENDPOINT}',
USE_SSL false
)

# with the GCS secret in place, the gcs:// spelling can be scanned
query I
SELECT int32
FROM delta_scan('gcs://test-bucket-public/dat/all_primitive_types/delta')
----
0
1
2
3
4

# the gs:// spelling must return the same rows
query I
SELECT int32
FROM delta_scan('gs://test-bucket-public/dat/all_primitive_types/delta')
----
0
1
2
3
4

# create a fake r2 secret
# (the account_id is a dummy value; the endpoint is given explicitly via ENDPOINT)
statement ok
CREATE SECRET s1 (
TYPE R2,
PROVIDER config,
account_id 'some_bogus_account',
KEY_ID '${AWS_ACCESS_KEY_ID}',
SECRET '${AWS_SECRET_ACCESS_KEY}',
REGION '${AWS_DEFAULT_REGION}',
ENDPOINT '${AWS_ENDPOINT}',
USE_SSL false
)

# with the R2 secret in place, r2:// urls can be scanned as well
query I
SELECT int32
FROM delta_scan('r2://test-bucket-public/dat/all_primitive_types/delta')
----
0
1
2
3
4

0 comments on commit fde423e

Please sign in to comment.