Skip to content

Commit

Permalink
Use correct (relative) delta paths when writing to object stores (#693)
Browse files (browse the repository at this point in the history)
  • Loading branch information
mwylde authored Jul 26, 2024
1 parent 4ca6edf commit ec5305e
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 17 deletions.
21 changes: 11 additions & 10 deletions crates/arroyo-connectors/src/filesystem/sink/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,18 @@ use object_store::{aws::AmazonS3ConfigKey, path::Path};
use once_cell::sync::Lazy;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::SystemTime,
};
use tracing::info;
use tracing::debug;

/// One-time initializer for Delta Lake's AWS support.
///
/// The closure calls `deltalake::aws::register_handlers(None)`. Because it is
/// wrapped in `once_cell::sync::Lazy`, it runs at most once, on first access.
/// NOTE(review): the place where `INIT` is forced is not visible in this
/// excerpt — presumably before any table is loaded or created; confirm.
static INIT: Lazy<()> = Lazy::new(|| {
deltalake::aws::register_handlers(None);
});

pub(crate) async fn commit_files_to_delta(
finished_files: Vec<FinishedFile>,
relative_table_path: Path,
storage_provider: Arc<StorageProvider>,
finished_files: &[FinishedFile],
relative_table_path: &Path,
storage_provider: &StorageProvider,
last_version: i64,
schema: SchemaRef,
) -> Result<Option<i64>> {
Expand All @@ -37,8 +36,8 @@ pub(crate) async fn commit_files_to_delta(

let add_actions = create_add_actions(&finished_files, &relative_table_path)?;
let table_path = build_table_path(&storage_provider, &relative_table_path);
let storage_options = configure_storage_options(&table_path, storage_provider.clone()).await?;
let mut table = load_or_create_table(&table_path, storage_options.clone(), &schema).await?;
let storage_options = configure_storage_options(&table_path, &storage_provider).await?;
let mut table = load_or_create_table(&table_path, storage_options, &schema).await?;

if let Some(new_version) = check_existing_files(
&mut table,
Expand Down Expand Up @@ -93,7 +92,7 @@ async fn create_new_table(

async fn configure_storage_options(
table_path: &str,
storage_provider: Arc<StorageProvider>,
storage_provider: &StorageProvider,
) -> Result<HashMap<String, String>> {
let mut options = storage_provider.storage_options().clone();
if table_path.starts_with("s3://") {
Expand Down Expand Up @@ -132,7 +131,7 @@ fn create_add_actions(
}

fn create_add_action(file: &FinishedFile, relative_table_path: &Path) -> Result<Action> {
info!(
debug!(
"creating add action for file {:?}, relative table path {}",
file, relative_table_path
);
Expand All @@ -142,7 +141,9 @@ fn create_add_action(file: &FinishedFile, relative_table_path: &Path) -> Result<
.context(format!(
"File {} is not in table {}",
file.filename, relative_table_path
))?;
))?
.trim_start_matches('/');

Ok(Action::Add(Add {
path: subpath.to_string(),
size: file.size as i64,
Expand Down
6 changes: 3 additions & 3 deletions crates/arroyo-connectors/src/filesystem/sink/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,9 @@ impl<V: LocalWriter + Send + 'static> TwoPhaseCommitter for LocalFileSystemWrite
if let CommitState::DeltaLake { last_version } = self.commit_state {
let storage_provider = Arc::new(StorageProvider::for_url("/").await?);
if let Some(version) = delta::commit_files_to_delta(
finished_files,
object_store::path::Path::parse(&self.final_dir)?,
storage_provider,
&finished_files,
&object_store::path::Path::parse(&self.final_dir)?,
&storage_provider,
last_version,
Arc::new(self.schema.as_ref().unwrap().schema_without_timestamp()),
)
Expand Down
6 changes: 3 additions & 3 deletions crates/arroyo-connectors/src/filesystem/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -935,9 +935,9 @@ where
}
if let CommitState::DeltaLake { last_version } = self.commit_state {
if let Some(new_version) = delta::commit_files_to_delta(
finished_files,
self.path.clone(),
self.object_store.clone(),
&finished_files,
&self.path,
&self.object_store,
last_version,
Arc::new(self.schema.schema_without_timestamp()),
)
Expand Down
1 change: 1 addition & 0 deletions crates/arroyo-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ impl StorageProvider {
let mut builder = AmazonS3Builder::from_env().with_bucket_name(&config.bucket);
let mut aws_key_manually_set = false;
let mut s3_options = HashMap::new();

for (key, value) in options {
let s3_config_key = key.parse().map_err(|_| {
StorageError::CredentialsError(format!("invalid S3 config key: {}", key))
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ impl WorkerGrpc for WorkerServer {

async fn commit(&self, request: Request<CommitReq>) -> Result<Response<CommitResp>, Status> {
let req = request.into_inner();
info!("received commit request {:?}", req);
debug!("received commit request {:?}", req);
let sender_commit_map_pairs = {
let state_mutex = self.state.lock().unwrap();
let Some(state) = state_mutex.as_ref() else {
Expand Down

0 comments on commit ec5305e

Please sign in to comment.