Skip to content

Commit

Permalink
Always split indicators into their own dedicated chunks (#8833)
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc authored Jan 28, 2025
1 parent a2a0079 commit 6ab4951
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 57 deletions.
18 changes: 4 additions & 14 deletions crates/build/re_types_builder/src/codegen/python/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2643,19 +2643,6 @@ fn quote_columnar_methods(reporter: &Reporter, obj: &Object, objects: &Objects)
};
let doc_block = indent::indent_by(12, quote_doc_lines(doc_string_lines));

// NOTE(#8768): Scalar indicators are extremely wasteful, and not actually used for anything.
let has_indicator = obj.fqname.as_str() != "rerun.archetypes.Scalar";
let pack_and_return = if has_indicator {
indent::indent_by(12, unindent("\
indicator_batch = DescribedComponentBatch(cls.indicator(), cls.indicator().component_descriptor())
indicator_column = indicator_batch.partition(np.zeros(len(lengths)))
return ComponentColumnList([indicator_column] + columns)
"))
} else {
"return ComponentColumnList(columns)".to_owned()
};

// NOTE: Calling `update_fields` is not an option: we need to be able to pass
// plural data, even to singular fields (mono-components).
unindent(&format!(
Expand All @@ -2680,7 +2667,10 @@ fn quote_columnar_methods(reporter: &Reporter, obj: &Object, objects: &Objects)
lengths = np.ones(len(batches[0]._batch.as_arrow_array()))
columns = [batch.partition(lengths) for batch in batches]
{pack_and_return}
indicator_batch = DescribedComponentBatch(cls.indicator(), cls.indicator().component_descriptor())
indicator_column = indicator_batch.partition(np.zeros(len(lengths)))
return ComponentColumnList([indicator_column] + columns)
"#
))
}
Expand Down
13 changes: 2 additions & 11 deletions crates/build/re_types_builder/src/codegen/rust/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1638,13 +1638,9 @@ fn quote_builder_from_obj(reporter: &Reporter, objects: &Objects, obj: &Object)
");
let columns_unary_doc = quote_doc_lines(&columns_unary_doc.lines().map(|l| l.to_owned()).collect_vec());

let has_indicator = obj.fqname.as_str() != "rerun.archetypes.Scalar";
let num_fields = required.iter().chain(optional.iter()).count();

let fields = required.iter().chain(optional.iter()).map(|field| {
let field_name = format_ident!("{}", field.name);
let clone = if num_fields == 1 && !has_indicator { quote!(.into_iter()) } else { quote!(.clone()) };
quote!(self.#field_name.map(|#field_name| #field_name.partitioned(_lengths #clone)).transpose()?)
quote!(self.#field_name.map(|#field_name| #field_name.partitioned(_lengths.clone())).transpose()?)
});

let field_lengths = required.iter().chain(optional.iter()).map(|field| {
Expand All @@ -1656,12 +1652,7 @@ fn quote_builder_from_obj(reporter: &Reporter, objects: &Objects, obj: &Object)
quote!(let #len_field_name = self.#field_name.as_ref().map(|b| b.array.len()))
});

let indicator_column = if !has_indicator {
// NOTE(#8768): Scalar indicators are extremely wasteful, and not actually used for anything.
quote!(None)
} else {
quote!(::re_types_core::indicator_column::<Self>(_lengths.into_iter().count())?)
};
let indicator_column = quote!(::re_types_core::indicator_column::<Self>(_lengths.into_iter().count())?);

quote! {
#columns_doc
Expand Down
24 changes: 20 additions & 4 deletions crates/store/re_chunk/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ fn batching_thread(config: ChunkBatcherConfig, rx_cmd: Receiver<Command>, tx_chu
let chunks =
PendingRow::many_into_chunks(acc.entity_path.clone(), chunk_max_rows_if_unsorted, rows);
for chunk in chunks {
let chunk = match chunk {
let mut chunk = match chunk {
Ok(chunk) => chunk,
Err(err) => {
re_log::error!(%err, "corrupt chunk detected, dropping");
Expand All @@ -571,7 +571,15 @@ fn batching_thread(config: ChunkBatcherConfig, rx_cmd: Receiver<Command>, tx_chu

// NOTE: This can only fail if all receivers have been dropped, which simply cannot happen
// as long as the batching thread is alive… which is where we currently are.
tx_chunk.send(chunk).ok();

let split_indicators = chunk.split_indicators();
if !chunk.components.is_empty() {
// make sure the chunk didn't contain *only* indicators!
tx_chunk.send(chunk).ok();
}
if let Some(split_indicators) = split_indicators {
tx_chunk.send(split_indicators).ok();
}
}

acc.reset();
Expand Down Expand Up @@ -600,10 +608,18 @@ fn batching_thread(config: ChunkBatcherConfig, rx_cmd: Receiver<Command>, tx_chu


match cmd {
Command::AppendChunk(chunk) => {
Command::AppendChunk(mut chunk) => {
// NOTE: This can only fail if all receivers have been dropped, which simply cannot happen
// as long as the batching thread is alive… which is where we currently are.
tx_chunk.send(chunk).ok();

let split_indicators = chunk.split_indicators();
if !chunk.components.is_empty() {
// make sure the chunk didn't contain *only* indicators!
tx_chunk.send(chunk).ok();
}
if let Some(split_indicators) = split_indicators {
tx_chunk.send(split_indicators).ok();
}
},
Command::AppendRow(entity_path, row) => {
let acc = accs.entry(entity_path.clone())
Expand Down
41 changes: 41 additions & 0 deletions crates/store/re_chunk/src/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,47 @@ impl Chunk {
&& self.same_datatypes(rhs)
&& self.same_descriptors(rhs)
}

/// Moves all indicator components from `self` into a new, dedicated chunk.
///
/// The new chunk contains only the first index from each index column, and all the indicators,
/// packed in a single row.
/// Beware: `self` might be left with no component columns at all after this operation.
///
/// This greatly reduces the overhead of indicators, both in the row-oriented and
/// column-oriented APIs.
/// See <https://github.com/rerun-io/rerun/issues/8768> for further rationale.
/// Moves all indicator components from `self` into a new, dedicated chunk.
///
/// The new chunk contains only the first index from each index column, and all the indicators,
/// packed in a single row.
/// Beware: `self` might be left with no component columns at all after this operation.
///
/// This greatly reduces the overhead of indicators, both in the row-oriented and
/// column-oriented APIs.
/// See <https://github.com/rerun-io/rerun/issues/8768> for further rationale.
pub fn split_indicators(&mut self) -> Option<Self> {
    // Gather every non-empty indicator column, keeping only its first entry:
    // a single row is enough to convey "this archetype was logged here".
    let indicators: ChunkComponents = self
        .components
        .iter_flattened()
        .filter_map(|(descr, list_array)| {
            let keep = descr.component_name.is_indicator_component() && !list_array.is_empty();
            keep.then(|| (descr.clone(), list_array.slice(0, 1)))
        })
        .collect();

    if indicators.is_empty() {
        return None;
    }

    // The dedicated chunk gets one row per timeline: the first index of each
    // original index column.
    let timelines = self
        .timelines
        .iter()
        .map(|(timeline, column)| (*timeline, column.row_sliced(0, 1)))
        .collect();

    let result = Self::from_auto_row_ids(
        ChunkId::new(),
        self.entity_path.clone(),
        timelines,
        indicators,
    );

    // Only strip the indicators out of `self` once the new chunk is known to be valid;
    // on failure, `self` is left untouched and no chunk is returned.
    result.ok().map(|indicator_chunk| {
        self.components
            .retain(|name, _columns| !name.is_indicator_component());
        indicator_chunk
    })
}
}

impl TimeColumn {
Expand Down
5 changes: 3 additions & 2 deletions crates/store/re_types/src/archetypes/scalar.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 42 additions & 25 deletions docs/snippets/all/descriptors/descr_builtin_archetype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,36 +44,53 @@ fn check_tags(rec: &rerun::RecordingStream) {

let store = stores.into_values().next().unwrap();
let chunks = store.iter_chunks().collect::<Vec<_>>();
assert_eq!(1, chunks.len());
assert_eq!(2, chunks.len());

let chunk = chunks.into_iter().next().unwrap();
{
let chunk = &chunks[0];

let mut descriptors = chunk
.components()
.values()
.flat_map(|per_desc| per_desc.keys())
.cloned()
.collect::<Vec<_>>();
descriptors.sort();
let mut descriptors = chunk
.components()
.values()
.flat_map(|per_desc| per_desc.keys())
.cloned()
.collect::<Vec<_>>();
descriptors.sort();

let expected = vec![
ComponentDescriptor {
let expected = vec![
ComponentDescriptor {
archetype_name: Some("rerun.archetypes.Points3D".into()),
archetype_field_name: Some("positions".into()),
component_name: "rerun.components.Position3D".into(),
},
ComponentDescriptor {
archetype_name: Some("rerun.archetypes.Points3D".into()),
archetype_field_name: Some("radii".into()),
component_name: "rerun.components.Radius".into(),
},
];

similar_asserts::assert_eq!(expected, descriptors);
}

{
let chunk = &chunks[1];

let mut descriptors = chunk
.components()
.values()
.flat_map(|per_desc| per_desc.keys())
.cloned()
.collect::<Vec<_>>();
descriptors.sort();

let expected = vec![ComponentDescriptor {
archetype_name: None,
archetype_field_name: None,
component_name: "rerun.components.Points3DIndicator".into(),
},
ComponentDescriptor {
archetype_name: Some("rerun.archetypes.Points3D".into()),
archetype_field_name: Some("positions".into()),
component_name: "rerun.components.Position3D".into(),
},
ComponentDescriptor {
archetype_name: Some("rerun.archetypes.Points3D".into()),
archetype_field_name: Some("radii".into()),
component_name: "rerun.components.Radius".into(),
},
];

similar_asserts::assert_eq!(expected, descriptors);
}];

similar_asserts::assert_eq!(expected, descriptors);
}
}
}
5 changes: 4 additions & 1 deletion rerun_py/rerun_sdk/rerun/archetypes/scalar.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 6ab4951

Please sign in to comment.