From 3b50980182591410051b8b7c9e53f9eea8fb6c7d Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Tue, 7 Jan 2025 16:29:47 -0800 Subject: [PATCH] Use our object store in delta lake to properly handle rotating credentials --- Cargo.lock | 19 +++ crates/arroyo-connectors/Cargo.toml | 2 +- .../src/filesystem/sink/delta.rs | 150 +++++++----------- .../src/filesystem/sink/local.rs | 89 +++++------ .../src/filesystem/sink/mod.rs | 88 +++++----- crates/arroyo-storage/src/lib.rs | 27 +--- 6 files changed, 168 insertions(+), 207 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f1b64dd71..1ad8ee070 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3369,6 +3369,7 @@ source = "git+https://github.com/delta-io/delta-rs?rev=25ce38956e25722ba7a6cbc0f dependencies = [ "deltalake-aws", "deltalake-core", + "deltalake-gcp", ] [[package]] @@ -3454,6 +3455,24 @@ dependencies = [ "z85", ] +[[package]] +name = "deltalake-gcp" +version = "0.6.0" +source = "git+https://github.com/delta-io/delta-rs?rev=25ce38956e25722ba7a6cbc0f5a7dba6b3361554#25ce38956e25722ba7a6cbc0f5a7dba6b3361554" +dependencies = [ + "async-trait", + "bytes", + "deltalake-core", + "futures", + "lazy_static", + "object_store", + "regex", + "thiserror 2.0.3", + "tokio", + "tracing", + "url", +] + [[package]] name = "der" version = "0.6.1" diff --git a/crates/arroyo-connectors/Cargo.toml b/crates/arroyo-connectors/Cargo.toml index edd62af6e..22331d691 100644 --- a/crates/arroyo-connectors/Cargo.toml +++ b/crates/arroyo-connectors/Cargo.toml @@ -83,7 +83,7 @@ uuid = { version = "1.7.0", features = ["v4"] } # Filesystem parquet = { workspace = true, features = ["async"]} object_store = { workspace = true } -deltalake = { workspace = true, features = ["s3"] } +deltalake = { workspace = true, features = ["s3", "gcs"] } async-compression = { version = "0.4.3", features = ["tokio", "zstd", "gzip"] } # MQTT diff --git a/crates/arroyo-connectors/src/filesystem/sink/delta.rs b/crates/arroyo-connectors/src/filesystem/sink/delta.rs index 3dd3eff7b..9dfd4111b 100644 --- a/crates/arroyo-connectors/src/filesystem/sink/delta.rs +++ b/crates/arroyo-connectors/src/filesystem/sink/delta.rs @@ -1,52 +1,40 @@ use super::FinishedFile; use anyhow::{Context, Result}; -use arrow::datatypes::{Schema, SchemaRef}; -use arroyo_storage::{get_current_credentials, StorageProvider}; +use arrow::datatypes::Schema; +use arroyo_storage::{BackendConfig, StorageProvider}; use arroyo_types::to_millis; +use deltalake::aws::storage::S3StorageBackend; use deltalake::TableProperty::{MinReaderVersion, MinWriterVersion}; use deltalake::{ - aws::storage::s3_constants::AWS_S3_ALLOW_UNSAFE_RENAME, kernel::{Action, Add}, operations::create::CreateBuilder, protocol::SaveMode, table::PeekCommit, - DeltaTableBuilder, + DeltaTable, DeltaTableBuilder, }; -use object_store::{aws::AmazonS3ConfigKey, path::Path}; -use once_cell::sync::Lazy; +use object_store::{path::Path, ObjectStore}; +use std::sync::Arc; use std::{ collections::{HashMap, HashSet}, time::SystemTime, }; use tracing::debug; - -static INIT: Lazy<()> = Lazy::new(|| { - deltalake::aws::register_handlers(None); -}); +use url::Url; pub(crate) async fn commit_files_to_delta( finished_files: &[FinishedFile], relative_table_path: &Path, - storage_provider: &StorageProvider, + table: &mut DeltaTable, last_version: i64, - schema: SchemaRef, ) -> Result> { if finished_files.is_empty() { return Ok(None); } let add_actions = create_add_actions(finished_files, relative_table_path)?; - let table_path = build_table_path(storage_provider, 
relative_table_path); - let storage_options = configure_storage_options(&table_path, storage_provider).await?; - let mut table = load_or_create_table(&table_path, storage_options, &schema).await?; - - if let Some(new_version) = check_existing_files( - &mut table, - last_version, - finished_files, - relative_table_path, - ) - .await? + + if let Some(new_version) = + check_existing_files(table, last_version, finished_files, relative_table_path).await? { return Ok(Some(new_version)); } @@ -55,72 +43,50 @@ pub(crate) async fn commit_files_to_delta( Ok(Some(new_version)) } -async fn load_or_create_table( - table_path: &str, - storage_options: HashMap, +pub(crate) async fn load_or_create_table( + table_path: &Path, + storage_provider: &StorageProvider, schema: &Schema, -) -> Result { - Lazy::force(&INIT); +) -> Result { deltalake::aws::register_handlers(None); - match DeltaTableBuilder::from_uri(table_path) - .with_storage_options(storage_options.clone()) - .load() - .await - { - Ok(table) => Ok(table), - Err(deltalake::DeltaTableError::NotATable(_)) => { - create_new_table(table_path, storage_options, schema).await - } - Err(err) => Err(err.into()), - } -} - -async fn create_new_table( - table_path: &str, - storage_options: HashMap, - schema: &Schema, -) -> Result { - let delta_object_store = DeltaTableBuilder::from_uri(table_path) - .with_storage_options(storage_options) - .build_storage()?; - let delta_schema: deltalake::kernel::Schema = (schema).try_into()?; - CreateBuilder::new() - .with_log_store(delta_object_store) - .with_columns(delta_schema.fields().cloned()) - .with_configuration_property(MinReaderVersion, Some("3")) - .with_configuration_property(MinWriterVersion, Some("7")) - .await - .map_err(Into::into) -} - -async fn configure_storage_options( - table_path: &str, - storage_provider: &StorageProvider, -) -> Result> { - let mut options = storage_provider.storage_options().clone(); - if table_path.starts_with("s3://") { - update_s3_credentials(&mut options).await?; - } - Ok(options) -} - -async fn update_s3_credentials(options: &mut HashMap) -> Result<()> { - if !options.contains_key(AmazonS3ConfigKey::SecretAccessKey.as_ref()) { - let tmp_credentials = get_current_credentials().await?; - options.insert( - AmazonS3ConfigKey::AccessKeyId.as_ref().to_string(), - tmp_credentials.key_id.clone(), - ); - options.insert( - AmazonS3ConfigKey::SecretAccessKey.as_ref().to_string(), - tmp_credentials.secret_key.clone(), - ); - if let Some(token) = tmp_credentials.token.as_ref() { - options.insert(AmazonS3ConfigKey::Token.as_ref().to_string(), token.clone()); - } + deltalake::gcp::register_handlers(None); + + let (backing_store, url): (Arc, _) = match storage_provider.config() { + BackendConfig::S3(_) => ( + Arc::new(S3StorageBackend::try_new( + storage_provider.get_backing_store(), + true, + )?), + format!("s3://{}", storage_provider.qualify_path(table_path)), + ), + BackendConfig::GCS(_) => ( + storage_provider.get_backing_store(), + format!("gs://{}", storage_provider.qualify_path(table_path)), + ), + BackendConfig::Local(_) => (storage_provider.get_backing_store(), table_path.to_string()), + }; + + let mut delta = DeltaTableBuilder::from_uri(&url) + .with_storage_backend( + backing_store, + Url::parse(&storage_provider.canonical_url())?, + ) + .build()?; + + println!("Table uri = {}", delta.table_uri()); + + if delta.verify_deltatable_existence().await? 
{ + delta.load().await?; + Ok(delta) + } else { + let delta_schema: deltalake::kernel::Schema = schema.try_into()?; + Ok(CreateBuilder::new() + .with_log_store(delta.log_store()) + .with_columns(delta_schema.fields().cloned()) + .with_configuration_property(MinReaderVersion, Some("3")) + .with_configuration_property(MinWriterVersion, Some("7")) + .await?) } - options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_string(), "true".to_string()); - Ok(()) } fn create_add_actions( @@ -158,7 +124,7 @@ fn create_add_action(file: &FinishedFile, relative_table_path: &Path) -> Result< } async fn check_existing_files( - table: &mut deltalake::DeltaTable, + table: &mut DeltaTable, last_version: i64, finished_files: &[FinishedFile], relative_table_path: &Path, @@ -191,7 +157,7 @@ async fn check_existing_files( Ok(None) } -async fn commit_to_delta(table: deltalake::DeltaTable, add_actions: Vec) -> Result { +async fn commit_to_delta(table: &mut DeltaTable, add_actions: Vec) -> Result { Ok(deltalake::operations::transaction::CommitBuilder::default() .with_actions(add_actions) .build( @@ -206,11 +172,3 @@ async fn commit_to_delta(table: deltalake::DeltaTable, add_actions: Vec) .await? .version) } - -fn build_table_path(storage_provider: &StorageProvider, relative_table_path: &Path) -> String { - format!( - "{}/{}", - storage_provider.object_store_base_url(), - relative_table_path - ) -} diff --git a/crates/arroyo-connectors/src/filesystem/sink/local.rs b/crates/arroyo-connectors/src/filesystem/sink/local.rs index 96b426547..a9a8ae07b 100644 --- a/crates/arroyo-connectors/src/filesystem/sink/local.rs +++ b/crates/arroyo-connectors/src/filesystem/sink/local.rs @@ -2,29 +2,25 @@ use std::{collections::HashMap, fs::create_dir_all, path::Path, sync::Arc, time: use arrow::record_batch::RecordBatch; use arroyo_operator::context::OperatorContext; -use arroyo_rpc::{ - df::{ArroyoSchema, ArroyoSchemaRef}, - formats::Format, - OperatorConfig, -}; +use arroyo_rpc::{df::ArroyoSchemaRef, formats::Format, OperatorConfig}; use arroyo_storage::StorageProvider; use arroyo_types::TaskInfo; use async_trait::async_trait; use bincode::{Decode, Encode}; use datafusion::physical_plan::PhysicalExpr; use tokio::{fs::OpenOptions, io::AsyncWriteExt}; -use tracing::info; +use tracing::debug; use uuid::Uuid; -use crate::filesystem::{sink::two_phase_committer::TwoPhaseCommitter, FileSettings}; -use anyhow::{bail, Result}; - use super::{ add_suffix_prefix, delta, get_partitioner_from_file_settings, parquet::batches_by_partition, two_phase_committer::TwoPhaseCommitterOperator, CommitState, CommitStyle, FileNaming, FileSystemTable, FilenameStrategy, FinishedFile, MultiPartWriterStats, RollingPolicy, TableType, }; +use crate::filesystem::sink::delta::load_or_create_table; +use crate::filesystem::{sink::two_phase_committer::TwoPhaseCommitter, FileSettings}; +use anyhow::{bail, Result}; pub struct LocalFileSystemWriter { // writer to a local tmp file @@ -40,7 +36,7 @@ pub struct LocalFileSystemWriter { file_settings: FileSettings, format: Option, schema: Option, - commit_state: CommitState, + commit_state: Option, filenaming: FileNaming, } @@ -64,10 +60,6 @@ impl LocalFileSystemWriter { else { unreachable!("LocalFileSystemWriter can only be used as a sink") }; - let commit_state = match file_settings.as_ref().unwrap().commit_style.unwrap() { - CommitStyle::DeltaLake => CommitState::DeltaLake { last_version: -1 }, - CommitStyle::Direct => CommitState::VanillaParquet, - }; let mut filenaming = file_settings .clone() @@ -92,36 +84,16 @@ impl 
LocalFileSystemWriter { partitioner: None, finished_files: Vec::new(), file_settings: file_settings.clone().unwrap(), - schema: None, format: config.format, rolling_policy: RollingPolicy::from_file_settings(file_settings.as_ref().unwrap()), table_properties, - commit_state, + schema: None, + commit_state: None, filenaming, }; TwoPhaseCommitterOperator::new(writer) } - fn init_schema_and_partitioner(&mut self, record_batch: &RecordBatch) -> Result<()> { - if self.schema.is_none() { - self.schema = Some(Arc::new(ArroyoSchema::from_fields( - record_batch - .schema() - .fields() - .into_iter() - .map(|field| field.as_ref().clone()) - .collect(), - ))); - } - if self.partitioner.is_none() { - self.partitioner = get_partitioner_from_file_settings( - self.file_settings.clone(), - self.schema.as_ref().unwrap().clone(), - ); - } - Ok(()) - } - fn get_or_insert_writer(&mut self, partition: &Option) -> &mut V { let filename_strategy = match self.filenaming.strategy { Some(FilenameStrategy::Uuid) => FilenameStrategy::Uuid, @@ -258,13 +230,33 @@ impl TwoPhaseCommitter for LocalFileSystemWrite self.subtask_id = ctx.task_info.task_index as usize; self.finished_files = recovered_files; self.next_file_index = max_file_index; + + let storage_provider = StorageProvider::for_url(&self.final_dir).await?; + + let schema = Arc::new(ctx.in_schemas[0].clone()); + + self.commit_state = Some(match self.file_settings.commit_style.unwrap() { + CommitStyle::DeltaLake => CommitState::DeltaLake { + last_version: -1, + table: load_or_create_table( + &object_store::path::Path::parse(&self.final_dir)?, + &storage_provider, + &schema.schema_without_timestamp(), + ) + .await?, + }, + CommitStyle::Direct => CommitState::VanillaParquet, + }); + + self.partitioner = + get_partitioner_from_file_settings(self.file_settings.clone(), schema.clone()); + + self.schema = Some(schema); + Ok(()) } async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { - if self.schema.is_none() { - self.init_schema_and_partitioner(&batch)?; - } if let Some(partitioner) = self.partitioner.as_ref() { for (batch, partition) in batches_by_partition(batch, partitioner.clone())? { let writer = self.get_or_insert_writer(&partition); @@ -298,7 +290,7 @@ impl TwoPhaseCommitter for LocalFileSystemWrite if !tmp_file.exists() { bail!("tmp file {} does not exist", tmp_file.to_string_lossy()); } - info!( + debug!( "committing file {} to {}", tmp_file.to_string_lossy(), destination.to_string_lossy() @@ -311,20 +303,21 @@ impl TwoPhaseCommitter for LocalFileSystemWrite size: destination.metadata()?.len() as usize, }); } - if let CommitState::DeltaLake { last_version } = self.commit_state { - let storage_provider = Arc::new(StorageProvider::for_url("/").await?); + + if let CommitState::DeltaLake { + last_version, + table, + } = self.commit_state.as_mut().unwrap() + { if let Some(version) = delta::commit_files_to_delta( &finished_files, &object_store::path::Path::parse(&self.final_dir)?, - &storage_provider, - last_version, - Arc::new(self.schema.as_ref().unwrap().schema_without_timestamp()), + table, + *last_version, ) .await? 
{ - self.commit_state = CommitState::DeltaLake { - last_version: version, - }; + *last_version = version; } } Ok(()) diff --git a/crates/arroyo-connectors/src/filesystem/sink/mod.rs b/crates/arroyo-connectors/src/filesystem/sink/mod.rs index 8c0c24249..92ed70665 100644 --- a/crates/arroyo-connectors/src/filesystem/sink/mod.rs +++ b/crates/arroyo-connectors/src/filesystem/sink/mod.rs @@ -32,6 +32,7 @@ use datafusion::{ physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}, scalar::ScalarValue, }; +use deltalake::DeltaTable; use futures::{stream::FuturesUnordered, Future}; use futures::{stream::StreamExt, TryStreamExt}; use object_store::{multipart::PartId, path::Path, MultipartId}; @@ -56,6 +57,7 @@ use self::{ }, }; +use crate::filesystem::sink::delta::load_or_create_table; use crate::filesystem::{ CommitStyle, FileNaming, FileSettings, FileSystemTable, FilenameStrategy, TableType, }; @@ -109,7 +111,7 @@ impl FileSystemSink { Self::create_and_start(table_properties, config.format) } - pub fn start(&mut self, schema: ArroyoSchemaRef) -> Result<()> { + pub async fn start(&mut self, schema: ArroyoSchemaRef) -> Result<()> { let TableType::Sink { write_path, file_settings, @@ -132,21 +134,22 @@ impl FileSystemSink { self.partitioner = partition_func; let table = self.table.clone(); let format = self.format.clone(); + let storage_path: Path = StorageProvider::get_key(&write_path).unwrap(); + let provider = StorageProvider::for_url_with_options(&write_path, storage_options.clone()) + .await + .unwrap(); + let mut writer = AsyncMultipartFileSystemWriter::::new( + storage_path, + Arc::new(provider), + receiver, + checkpoint_sender, + table, + format, + schema, + ) + .await?; + tokio::spawn(async move { - let storage_path: Path = StorageProvider::get_key(&write_path).unwrap(); - let provider = - StorageProvider::for_url_with_options(&write_path, storage_options.clone()) - .await - .unwrap(); - let mut writer = AsyncMultipartFileSystemWriter::::new( - storage_path, - Arc::new(provider), - receiver, - checkpoint_sender, - table, - format, - schema, - ); writer.run().await.unwrap(); }); Ok(()) @@ -437,9 +440,12 @@ struct AsyncMultipartFileSystemWriter { schema: ArroyoSchemaRef, } -#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] +#[derive(Debug)] pub enum CommitState { - DeltaLake { last_version: i64 }, + DeltaLake { + last_version: i64, + table: DeltaTable, + }, VanillaParquet, } @@ -701,7 +707,7 @@ impl AsyncMultipartFileSystemWriter where R: MultiPartWriter, { - fn new( + async fn new( path: Path, object_store: Arc, receiver: Receiver, @@ -709,7 +715,7 @@ where writer_properties: FileSystemTable, format: Option, schema: ArroyoSchemaRef, - ) -> Self { + ) -> Result { let file_settings = if let TableType::Sink { ref file_settings, .. 
} = writer_properties.table_type @@ -720,7 +726,15 @@ where }; let commit_state = match file_settings.commit_style.unwrap() { - CommitStyle::DeltaLake => CommitState::DeltaLake { last_version: -1 }, + CommitStyle::DeltaLake => CommitState::DeltaLake { + last_version: -1, + table: load_or_create_table( + &path, + &object_store, + &schema.schema_without_timestamp(), + ) + .await?, + }, CommitStyle::Direct => CommitState::VanillaParquet, }; let mut file_naming = file_settings.file_naming.clone().unwrap_or(FileNaming { @@ -732,7 +746,7 @@ where file_naming.suffix = Some(R::suffix()); } - Self { + Ok(Self { path, active_writers: HashMap::new(), watermark: None, @@ -750,7 +764,7 @@ where file_naming, format, schema, - } + }) } fn add_part_to_finish(&mut self, file_to_finish: FileToFinish) { @@ -926,19 +940,16 @@ where finished_files.push(file); } } - if let CommitState::DeltaLake { last_version } = self.commit_state { - if let Some(new_version) = delta::commit_files_to_delta( - &finished_files, - &self.path, - &self.object_store, - last_version, - Arc::new(self.schema.schema_without_timestamp()), - ) - .await? + if let CommitState::DeltaLake { + last_version, + table, + } = &mut self.commit_state + { + if let Some(new_version) = + delta::commit_files_to_delta(&finished_files, &self.path, table, *last_version) + .await? { - self.commit_state = CommitState::DeltaLake { - last_version: new_version, - }; + *last_version = new_version; } } let finished_message = CheckpointData::Finished { @@ -950,8 +961,8 @@ where } fn delta_version(&mut self) -> i64 { - match self.commit_state { - CommitState::DeltaLake { last_version } => last_version, + match &self.commit_state { + CommitState::DeltaLake { last_version, .. } => *last_version, CommitState::VanillaParquet => 0, } } @@ -1553,9 +1564,12 @@ impl TwoPhaseCommitter for FileSystemSink, ) -> Result<()> { - self.start(Arc::new(ctx.in_schemas.first().unwrap().clone()))?; + self.start(Arc::new(ctx.in_schemas.first().unwrap().clone())) + .await?; + let mut max_file_index = 0; let mut recovered_files = Vec::new(); + for file_system_data_recovery in data_recovery { max_file_index = max_file_index.max(file_system_data_recovery.next_file_index); // task 0 is responsible for recovering all files. diff --git a/crates/arroyo-storage/src/lib.rs b/crates/arroyo-storage/src/lib.rs index b187d672d..b7a6eacef 100644 --- a/crates/arroyo-storage/src/lib.rs +++ b/crates/arroyo-storage/src/lib.rs @@ -2,15 +2,15 @@ use arroyo_rpc::retry; use aws::ArroyoCredentialProvider; use bytes::Bytes; use futures::{Stream, StreamExt}; -use object_store::aws::{AmazonS3ConfigKey, AwsCredential}; +use object_store::aws::AmazonS3ConfigKey; use object_store::buffered::BufWriter; use object_store::gcp::GoogleCloudStorageBuilder; use object_store::multipart::{MultipartStore, PartId}; use object_store::path::Path; +use object_store::ObjectMeta; use object_store::{ aws::AmazonS3Builder, local::LocalFileSystem, MultipartId, ObjectStore, PutPayload, }; -use object_store::{CredentialProvider, ObjectMeta}; use regex::{Captures, Regex}; use std::borrow::Cow; use std::fmt::{Debug, Formatter}; @@ -35,9 +35,6 @@ pub struct StorageProvider { object_store: Arc, multipart_store: Option>, canonical_url: String, - // A URL that object_store can parse. 
- // May require storage_options to properly instantiate - object_store_base_url: String, storage_options: HashMap, } @@ -305,12 +302,6 @@ fn last(opts: [Option; COUNT]) -> Option { opts.into_iter().flatten().last() } -pub async fn get_current_credentials() -> Result, StorageError> { - let provider = ArroyoCredentialProvider::try_new().await?; - let credentials = provider.get_credential().await?; - Ok(credentials) -} - impl StorageProvider { pub async fn for_url(url: &str) -> Result { Self::for_url_with_options(url, HashMap::new()).await @@ -424,7 +415,6 @@ impl StorageProvider { if let Some(key) = &config.key { canonical_url = format!("{}/{}", canonical_url, key); } - let object_store_base_url = format!("s3://{}", config.bucket); let object_store = Arc::new(builder.build().map_err(Into::::into)?); @@ -433,7 +423,6 @@ impl StorageProvider { object_store: object_store.clone(), multipart_store: Some(object_store), canonical_url, - object_store_base_url, storage_options: s3_options .into_iter() .map(|(k, v)| (k.as_ref().to_string(), v)) @@ -454,15 +443,12 @@ impl StorageProvider { canonical_url = format!("{}/{}", canonical_url, key); } - let object_store_base_url = format!("https://{}.storage.googleapis.com", config.bucket); - let object_store = Arc::new(builder.build()?); Ok(Self { config: BackendConfig::GCS(config), object_store: object_store.clone(), multipart_store: Some(object_store), - object_store_base_url, canonical_url, storage_options: HashMap::new(), }) @@ -481,13 +467,11 @@ impl StorageProvider { ); let canonical_url = format!("file://{}", config.path); - let object_store_base_url = canonical_url.clone(); Ok(Self { config: BackendConfig::Local(config), object_store, multipart_store: None, canonical_url, - object_store_base_url, storage_options: HashMap::new(), }) } @@ -627,13 +611,6 @@ impl StorageProvider { format!("{}/{}", self.canonical_url, path) } - // Returns a url that will, combined with storage_options, parse to - // the same ObjectStore as self.object_store. - // Needed for systems that build their own ObjectStore, such as delta-rs - pub fn object_store_base_url(&self) -> &str { - &self.object_store_base_url - } - pub fn storage_options(&self) -> &HashMap { &self.storage_options }
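
Not part of the patch, but for reviewers: a minimal sketch of how the reworked Delta path is meant to be driven, based only on the signatures introduced above. It assumes the code lives inside crates/arroyo-connectors/src/filesystem/sink, where the pub(crate) helpers load_or_create_table and commit_files_to_delta (and the FinishedFile type) are visible; the bucket and key names are invented for illustration.

    use anyhow::Result;
    use arrow::datatypes::Schema;
    use arroyo_storage::StorageProvider;
    use object_store::path::Path;

    // Hypothetical driver; `finished_files` and `schema` stand in for values the sink produces.
    async fn commit_example(finished_files: &[FinishedFile], schema: &Schema) -> Result<()> {
        // Example URL only. The StorageProvider owns the backing ObjectStore and refreshes
        // its credentials, which is the point of this change.
        let write_path = "s3://my-bucket/tables/events".to_string();
        let table_path: Path = StorageProvider::get_key(&write_path).unwrap();
        let provider = StorageProvider::for_url(&write_path).await?;

        // Loads the Delta table if it exists, otherwise creates it from the Arrow schema.
        // Internally this hands the provider's ObjectStore to delta-rs via with_storage_backend
        // instead of copying a static access key / secret key pair into storage options.
        let mut table = load_or_create_table(&table_path, &provider, schema).await?;

        // Commit finished parquet files; -1 means no previously committed version.
        if let Some(new_version) =
            commit_files_to_delta(finished_files, &table_path, &mut table, -1).await?
        {
            tracing::debug!("committed delta version {}", new_version);
        }
        Ok(())
    }

For S3, the provider's store is additionally wrapped in deltalake's S3StorageBackend (with its second argument set to true, which appears to stand in for the old AWS_S3_ALLOW_UNSAFE_RENAME storage option); GCS and local stores are passed through to delta-rs unchanged.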