Skip to content

Commit

Permalink
Add async subpath listing
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jan 6, 2025
1 parent e02e00d commit 868efcf
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 19 deletions.
50 changes: 32 additions & 18 deletions columnar/src/columnar/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ fn read_all_columns_in_stream(
Ok(results)
}

fn column_dictionary_prefix_for_column_name(column_name: &str) -> String {
// Each column is a associated to a given `column_key`,
// that starts by `column_name\0column_header`.
//
// Listing the columns associated to the given column name is therefore equivalent to
// listing `column_key` with the prefix `column_name\0`.
format!("{}{}", column_name, '\0')
}

fn column_dictionary_prefix_for_subpath(root_path: &str) -> String {
format!("{}{}", root_path, JSON_PATH_SEGMENT_SEP as char)
}

impl ColumnarReader {
/// Opens a new Columnar file.
pub fn open<F>(file_slice: F) -> io::Result<ColumnarReader>
Expand Down Expand Up @@ -145,26 +158,14 @@ impl ColumnarReader {
Ok(self.iter_columns()?.collect())
}

fn stream_for_column_range(&self, column_name: &str) -> sstable::StreamerBuilder<RangeSSTable> {
// Each column is a associated to a given `column_key`,
// that starts by `column_name\0column_header`.
//
// Listing the columns associated to the given column name is therefore equivalent to
// listing `column_key` with the prefix `column_name\0`.
//
// This is in turn equivalent to searching for the range
// `[column_name,\0`..column_name\1)`.
let mut prefix = column_name.to_string();
prefix.push('\0');
self.column_dictionary.prefix_range(prefix.as_bytes())
}

pub async fn read_columns_async(
&self,
column_name: &str,
) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_column_name(column_name);
let stream = self
.stream_for_column_range(column_name)
.column_dictionary
.prefix_range(prefix)
.into_stream_async()
.await?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
Expand All @@ -175,7 +176,21 @@ impl ColumnarReader {
/// There can be more than one column associated to a given column name, provided they have
/// different types.
pub fn read_columns(&self, column_name: &str) -> io::Result<Vec<DynamicColumnHandle>> {
let stream = self.stream_for_column_range(column_name).into_stream()?;
let prefix = column_dictionary_prefix_for_column_name(column_name);
let stream = self.column_dictionary.prefix_range(prefix).into_stream()?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}

pub async fn read_subpath_columns_async(
&self,
root_path: &str,
) -> io::Result<Vec<DynamicColumnHandle>> {
let prefix = column_dictionary_prefix_for_subpath(root_path);
let stream = self
.column_dictionary
.prefix_range(prefix)
.into_stream_async()
.await?;
read_all_columns_in_stream(stream, &self.column_data, self.format_version)
}

Expand All @@ -185,8 +200,7 @@ impl ColumnarReader {
/// There can be more than one column associated to each path within the JSON structure,
/// provided they have different types.
pub fn read_subpath_columns(&self, root_path: &str) -> io::Result<Vec<DynamicColumnHandle>> {
let mut prefix = root_path.to_string();
prefix.push(JSON_PATH_SEGMENT_SEP as char);
let prefix = column_dictionary_prefix_for_subpath(root_path);
let stream = self
.column_dictionary
.prefix_range(prefix.as_bytes())
Expand Down
26 changes: 25 additions & 1 deletion src/fastfield/readers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,21 @@ impl FastFieldReaders {
Ok(columns)
}

#[doc(hidden)]
pub async fn list_subpath_dynamic_column_handles(
&self,
root_path: &str,
) -> crate::Result<Vec<DynamicColumnHandle>> {
let Some(resolved_field_name) = self.resolve_field(root_path)? else {
return Ok(Vec::new());
};
let columns = self
.columnar
.read_subpath_columns_async(&resolved_field_name)
.await?;
Ok(columns)
}

/// Returns the `u64` column used to represent any `u64`-mapped typed (String/Bytes term ids,
/// i64, u64, f64, DateTime).
///
Expand Down Expand Up @@ -492,6 +507,15 @@ mod tests {
.iter()
.any(|column| column.column_type() == ColumnType::Str));

println!("*** {:?}", fast_fields.columnar().list_columns());
let json_columns = fast_fields.dynamic_column_handles("json").unwrap();
assert_eq!(json_columns.len(), 0);

let json_subcolumns = fast_fields.dynamic_subpath_column_handles("json").unwrap();
assert_eq!(json_subcolumns.len(), 3);

let foo_subcolumns = fast_fields
.dynamic_subpath_column_handles("json.foo")
.unwrap();
assert_eq!(foo_subcolumns.len(), 0);
}
}

0 comments on commit 868efcf

Please sign in to comment.