diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index e390ecddcc..95cbff145a 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -23,21 +23,26 @@ //! them to be compatible. //! //! | | Append | Delete / Update | Overwrite/Create | Create Index | Rewrite | Merge | Project | UpdateConfig | -//! |------------------|--------|-----------------|------------------|--------------|---------|-------|---------|-------------| -//! | Append | ✅ | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ | ✅ | -//! | Delete / Update | ✅ | (1) | ❌ | ✅ | (1) | ❌ | ❌ | ✅ | -//! | Overwrite/Create | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | (2) | -//! | Create index | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ | -//! | Rewrite | ✅ | (1) | ❌ | ❌ | (1) | ❌ | ❌ | ✅ | -//! | Merge | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ | ❌ | ✅ | -//! | Project | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ | -//! | UpdateConfig | ✅ | ✅ | (2) | ✅ | ✅ | ✅ | ✅ | (2) | +//! |------------------|--------|-----------------|------------------|--------------|---------|-------|---------|--------------| +//! | Append | ✅ | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ | ✅ | +//! | Delete / Update | ✅ | 1️⃣ | ❌ | ✅ | 1️⃣ | ❌ | ❌ | ✅ | +//! | Overwrite/Create | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 2️⃣ | +//! | Create index | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ | +//! | Rewrite | ✅ | 1️⃣ | ❌ | ❌ | 1️⃣ | ❌ | ❌ | ✅ | +//! | Merge | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ | ❌ | ✅ | +//! | Project | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ | +//! | UpdateConfig | ✅ | ✅ | 2️⃣ | ✅ | ✅ | ✅ | ✅ | 2️⃣ | +//! | DataReplacement | ✅ | ✅ | ❌ | 3️⃣ | 3️⃣ | 3️⃣ | ❌* | ✅ | //! -//! (1) Delete, update, and rewrite are compatible with each other and themselves only if +//! 1️⃣ Delete, update, and rewrite are compatible with each other and themselves only if //! they affect distinct fragments. Otherwise, they conflict. -//! (2) Operations that mutate the config conflict if one of the operations upserts a key +//! 2️⃣ Operations that mutate the config conflict if one of the operations upserts a key //! that if referenced by another concurrent operation or if both operations modify the schema //! metadata or the same field metadata. +//! 3️⃣ DataReplacement on a column without index is compatible with any operation AS LONG AS +//! the operation does not modify the region of the column being replaced. +//! * This could become allowed in the future +//! use std::{ collections::{HashMap, HashSet}, @@ -51,7 +56,7 @@ use lance_io::object_store::ObjectStore; use lance_table::{ format::{ pb::{self, IndexMetadata}, - DataStorageFormat, Fragment, Index, Manifest, RowIdMeta, + DataFile, DataStorageFormat, Fragment, Index, Manifest, RowIdMeta, }, io::{ commit::CommitHandler, @@ -136,6 +141,25 @@ pub enum Operation { /// Indices that have been updated with the new row addresses rewritten_indices: Vec, }, + /// Replace data in a column in the dataset with a new data. This is used for + /// null column population where we replace an entirely null column with a + /// new column that has data. + /// + /// This operation will only allow replacing files that contains the same schema + /// e.g. if the original files contains column A, B, C and the new files contains + /// only column A, B then the operation is not allowed. As we would need to split + /// the original files into two files, one with column A, B and the other with column C. + /// + /// Corollary to the above: the operation will also not allow replacing files layouts + /// that are not uniform across all fragments. + /// e.g. if fragments being replaced contains files with different schema layouts on + /// the column being replaced, the operation is not allowed. + /// say frag_1: [A] [B, C] and frag_2: [A, B] [C] and we are trying to replace column A + /// with a new column A the operation is not allowed. + DataReplacement { + old_fragments: Vec, + new_datafiles: Vec, + }, /// Merge a new column in Merge { fragments: Vec, @@ -229,6 +253,9 @@ impl Operation { .map(|f| f.id) .chain(removed_fragment_ids.iter().copied()), ), + Self::DataReplacement { old_fragments, .. } => { + Box::new(old_fragments.iter().map(|f| f.id)) + } } } @@ -332,6 +359,7 @@ impl Operation { Self::Update { .. } => "Update", Self::Project { .. } => "Project", Self::UpdateConfig { .. } => "UpdateConfig", + Self::DataReplacement { .. } => "DataReplacement", } } } @@ -370,6 +398,7 @@ impl Transaction { Operation::ReserveFragments { .. } => false, Operation::Project { .. } => false, Operation::UpdateConfig { .. } => false, + Operation::DataReplacement { .. } => false, _ => true, }, Operation::Rewrite { .. } => match &other.operation { @@ -411,6 +440,7 @@ impl Transaction { // if the rewrite changed more than X% of row ids. Operation::Rewrite { .. } => true, Operation::UpdateConfig { .. } => false, + Operation::DataReplacement { .. } => false, _ => true, }, Operation::Delete { .. } | Operation::Update { .. } => match &other.operation { @@ -467,6 +497,10 @@ impl Transaction { Operation::UpdateConfig { .. } => false, _ => true, }, + // TODO: implement cell based conflict resolution + Operation::DataReplacement { .. } => match &other.operation { + _ => true, + }, } } @@ -744,6 +778,90 @@ impl Transaction { Operation::Restore { .. } => { unreachable!() } + Operation::DataReplacement { + old_fragments, + new_datafiles, + } => { + // 0. check we have the same number of old fragments as new data files + if old_fragments.len() != new_datafiles.len() { + return Err(Error::invalid_input( + "Number of old fragments must match number of new data files", + location!(), + )); + } + + // 1. make sure the new files all have the same fields + if new_datafiles + .iter() + .map(|f| f.fields.clone()) + .collect::>() + .len() + != 1 + { + // TODO: better error message to explain what's wrong + return Err(Error::invalid_input( + "All new data files must have the same fields", + location!(), + )); + } + + // 2. check that the fragments being modified have isomorphic layouts along the columns being replaced + // 3. add modified fragments to final_fragments + for (frag, new_file) in old_fragments.into_iter().zip(new_datafiles) { + let mut new_frag = frag.clone(); + + // TODO: check new file and fragment are the same length + + let mut columns_covered = HashSet::new(); + for file in &mut new_frag.files { + if file.fields == new_file.fields + && file.file_major_version == new_file.file_major_version + && file.file_minor_version == new_file.file_minor_version + { + // assign the new file path to the fragment + file.path = new_file.path.clone(); + } + columns_covered.extend(file.fields.iter()); + } + // SPECIAL CASE: if the column(s) being replaced are not covered by the fragment + // Then it means it's a all-NULL column that is being replaced with real data + // just add it to the final fragments + if columns_covered.is_disjoint(&new_file.fields.iter().collect()) { + new_frag.add_file( + new_file.path.clone(), + new_file.fields.clone(), + new_file.column_indices.clone(), + &LanceFileVersion::try_from_major_minor( + new_file.file_major_version, + new_file.file_minor_version, + ) + .expect("Expected valid file version"), + ); + } + + // Nothing changed in the current fragment, which is not expected -- error out + if &new_frag == frag { + // TODO: better error message to explain what's wrong + return Err(Error::invalid_input( + "Expected to modify the fragment but no changes were made", + location!(), + )); + } + final_fragments.push(new_frag); + } + + let fragments_changed = + final_fragments.iter().map(|f| f.id).collect::>(); + + // 4. push fragments that didn't change back to final_fragments + let unmodified_fragments = maybe_existing_fragments? + .iter() + .filter(|f| !fragments_changed.contains(&f.id)) + .cloned() + .collect::>(); + + final_fragments.extend(unmodified_fragments); + } }; // If a fragment was reserved then it may not belong at the end of the fragments list.