Skip to content

Commit

Permalink
feat: allow replacement of entire datafile when the schema lines up c…
Browse files Browse the repository at this point in the history
…orrectly
  • Loading branch information
chebbyChefNEQ committed Jan 30, 2025
1 parent a7c5216 commit f5078d3
Showing 1 changed file with 130 additions and 12 deletions.
142 changes: 130 additions & 12 deletions rust/lance/src/dataset/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,26 @@
//! them to be compatible.
//!
//! | | Append | Delete / Update | Overwrite/Create | Create Index | Rewrite | Merge | Project | UpdateConfig |
//! |------------------|--------|-----------------|------------------|--------------|---------|-------|---------|-------------|
//! | Append | ✅ | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ | ✅ |
//! | Delete / Update | ✅ | (1) | ❌ | ✅ | (1) | ❌ | ❌ | ✅ |
//! | Overwrite/Create | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | (2) |
//! | Create index | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ |
//! | Rewrite | ✅ | (1) | ❌ | ❌ | (1) | ❌ | ❌ | ✅ |
//! | Merge | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ | ❌ | ✅ |
//! | Project | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ |
//! | UpdateConfig | ✅ | ✅ | (2) | ✅ | ✅ | ✅ | ✅ | (2) |
//! |------------------|--------|-----------------|------------------|--------------|---------|-------|---------|--------------|
//! | Append | ✅ | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ | ✅ |
//! | Delete / Update | ✅ | 1️⃣ | ❌ | ✅ | 1️⃣ | ❌ | ❌ | ✅ |
//! | Overwrite/Create | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 2️⃣ |
//! | Create index | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ |
//! | Rewrite | ✅ | 1️⃣ | ❌ | ❌ | 1️⃣ | ❌ | ❌ | ✅ |
//! | Merge | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ | ❌ | ✅ |
//! | Project | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ |
//! | UpdateConfig | ✅ | ✅ | 2️⃣ | ✅ | ✅ | ✅ | ✅ | 2️⃣ |
//! | DataReplacement | ✅ | ✅ | ❌ | 3️⃣ | 3️⃣ | 3️⃣ | ❌* | ✅ |
//!
//! (1) Delete, update, and rewrite are compatible with each other and themselves only if
//! 1️⃣ Delete, update, and rewrite are compatible with each other and themselves only if
//! they affect distinct fragments. Otherwise, they conflict.
//! (2) Operations that mutate the config conflict if one of the operations upserts a key
//! 2️⃣ Operations that mutate the config conflict if one of the operations upserts a key
//! that if referenced by another concurrent operation or if both operations modify the schema
//! metadata or the same field metadata.
//! 3️⃣ DataReplacement on a column without index is compatible with any operation AS LONG AS
//! the operation does not modify the region of the column being replaced.
//! * This could become allowed in the future
//!
use std::{
collections::{HashMap, HashSet},
Expand All @@ -51,7 +56,7 @@ use lance_io::object_store::ObjectStore;
use lance_table::{
format::{
pb::{self, IndexMetadata},
DataStorageFormat, Fragment, Index, Manifest, RowIdMeta,
DataFile, DataStorageFormat, Fragment, Index, Manifest, RowIdMeta,
},
io::{
commit::CommitHandler,
Expand Down Expand Up @@ -136,6 +141,25 @@ pub enum Operation {
/// Indices that have been updated with the new row addresses
rewritten_indices: Vec<RewrittenIndex>,
},
/// Replace data in a column in the dataset with a new data. This is used for
/// null column population where we replace an entirely null column with a
/// new column that has data.
///
/// This operation will only allow replacing files that contains the same schema
/// e.g. if the original files contains column A, B, C and the new files contains
/// only column A, B then the operation is not allowed. As we would need to split
/// the original files into two files, one with column A, B and the other with column C.
///
/// Corollary to the above: the operation will also not allow replacing files layouts
/// that are not uniform across all fragments.
/// e.g. if fragments being replaced contains files with different schema layouts on
/// the column being replaced, the operation is not allowed.
/// say frag_1: [A] [B, C] and frag_2: [A, B] [C] and we are trying to replace column A
/// with a new column A the operation is not allowed.
DataReplacement {
old_fragments: Vec<Fragment>,
new_datafiles: Vec<DataFile>,
},
/// Merge a new column in
Merge {
fragments: Vec<Fragment>,
Expand Down Expand Up @@ -229,6 +253,9 @@ impl Operation {
.map(|f| f.id)
.chain(removed_fragment_ids.iter().copied()),
),
Self::DataReplacement { old_fragments, .. } => {
Box::new(old_fragments.iter().map(|f| f.id))
}
}
}

Expand Down Expand Up @@ -332,6 +359,7 @@ impl Operation {
Self::Update { .. } => "Update",
Self::Project { .. } => "Project",
Self::UpdateConfig { .. } => "UpdateConfig",
Self::DataReplacement { .. } => "DataReplacement",
}
}
}
Expand Down Expand Up @@ -370,6 +398,7 @@ impl Transaction {
Operation::ReserveFragments { .. } => false,
Operation::Project { .. } => false,
Operation::UpdateConfig { .. } => false,
Operation::DataReplacement { .. } => false,
_ => true,
},
Operation::Rewrite { .. } => match &other.operation {
Expand Down Expand Up @@ -411,6 +440,7 @@ impl Transaction {
// if the rewrite changed more than X% of row ids.
Operation::Rewrite { .. } => true,
Operation::UpdateConfig { .. } => false,
Operation::DataReplacement { .. } => false,
_ => true,
},
Operation::Delete { .. } | Operation::Update { .. } => match &other.operation {
Expand Down Expand Up @@ -467,6 +497,10 @@ impl Transaction {
Operation::UpdateConfig { .. } => false,
_ => true,
},
// TODO: implement cell based conflict resolution
Operation::DataReplacement { .. } => match &other.operation {
_ => true,
},
}
}

Expand Down Expand Up @@ -744,6 +778,90 @@ impl Transaction {
Operation::Restore { .. } => {
unreachable!()
}
Operation::DataReplacement {
old_fragments,
new_datafiles,
} => {
// 0. check we have the same number of old fragments as new data files
if old_fragments.len() != new_datafiles.len() {
return Err(Error::invalid_input(
"Number of old fragments must match number of new data files",
location!(),
));
}

// 1. make sure the new files all have the same fields
if new_datafiles
.iter()
.map(|f| f.fields.clone())
.collect::<HashSet<_>>()
.len()
!= 1
{
// TODO: better error message to explain what's wrong
return Err(Error::invalid_input(
"All new data files must have the same fields",
location!(),
));
}

// 2. check that the fragments being modified have isomorphic layouts along the columns being replaced
// 3. add modified fragments to final_fragments
for (frag, new_file) in old_fragments.into_iter().zip(new_datafiles) {
let mut new_frag = frag.clone();

// TODO: check new file and fragment are the same length

let mut columns_covered = HashSet::new();
for file in &mut new_frag.files {
if file.fields == new_file.fields
&& file.file_major_version == new_file.file_major_version
&& file.file_minor_version == new_file.file_minor_version
{
// assign the new file path to the fragment
file.path = new_file.path.clone();
}
columns_covered.extend(file.fields.iter());
}
// SPECIAL CASE: if the column(s) being replaced are not covered by the fragment
// Then it means it's a all-NULL column that is being replaced with real data
// just add it to the final fragments
if columns_covered.is_disjoint(&new_file.fields.iter().collect()) {
new_frag.add_file(
new_file.path.clone(),
new_file.fields.clone(),
new_file.column_indices.clone(),
&LanceFileVersion::try_from_major_minor(
new_file.file_major_version,
new_file.file_minor_version,
)
.expect("Expected valid file version"),
);
}

// Nothing changed in the current fragment, which is not expected -- error out
if &new_frag == frag {
// TODO: better error message to explain what's wrong
return Err(Error::invalid_input(
"Expected to modify the fragment but no changes were made",
location!(),
));
}
final_fragments.push(new_frag);
}

let fragments_changed =
final_fragments.iter().map(|f| f.id).collect::<HashSet<_>>();

// 4. push fragments that didn't change back to final_fragments
let unmodified_fragments = maybe_existing_fragments?
.iter()
.filter(|f| !fragments_changed.contains(&f.id))
.cloned()
.collect::<Vec<_>>();

final_fragments.extend(unmodified_fragments);
}
};

// If a fragment was reserved then it may not belong at the end of the fragments list.
Expand Down

0 comments on commit f5078d3

Please sign in to comment.