Perf: use term hashmap in fastfield (#2243)

* add shared arena hashmap

* bench fastfield indexing

* use shared arena hashmap in columnar

lower minimum resize in hashtable

* clippy

* add comments
PSeitz committed Apr 10, 2024
1 parent e19d87b commit b85b3fc
Showing 8 changed files with 543 additions and 294 deletions.
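The heart of the change: the columnar dictionary's `FnvHashMap<Vec<u8>, UnorderedId>` is replaced by `stacker::SharedArenaHashMap`, which keeps keys and values in a caller-provided `MemoryArena` that can be shared across columns, avoiding one owned `Vec<u8>` allocation per term. A toy sketch of the API as used in the hunks below (signatures inferred from this diff, so treat them as an approximation rather than the crate's documented interface):

```rust
use stacker::{MemoryArena, SharedArenaHashMap};

// Toy example: count how often each byte-string term occurs.
fn count_terms(terms: &[&[u8]]) -> usize {
    let mut arena = MemoryArena::default();
    let mut map = SharedArenaHashMap::default();
    for &term in terms {
        // The closure sees the current value for `term` (if any) and
        // returns the value to write back into the arena.
        map.mutate_or_create(term, &mut arena, |count: Option<u32>| {
            count.map_or(1, |c| c + 1)
        });
    }
    // Iteration yields (key, value address); values are decoded via the arena.
    for (_term, addr) in map.iter(&arena) {
        let _count: u32 = arena.read(addr);
    }
    map.len()
}
```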
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -82,8 +82,8 @@ more-asserts = "0.3.1"
rand_distr = "0.4.3"

[target.'cfg(not(windows))'.dev-dependencies]
-criterion = "0.5"
-pprof = { git = "https://github.com/PSeitz/pprof-rs/", rev = "53af24b", features = ["flamegraph", "criterion"] } # temp fork that works with criterion 0.5
+criterion = { version = "0.5" }
+pprof = { version = "0.13", features = ["flamegraph", "criterion"] }

[dev-dependencies.fail]
version = "0.5.0"
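With the temporary pprof fork dropped in favor of the released crate, the benchmarks below build against stock criterion and pprof; on non-Windows targets (per the cfg above), `cargo bench --bench index-bench` should run them, with pprof's criterion integration producing flamegraphs.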
67 changes: 64 additions & 3 deletions benches/index-bench.rs
@@ -1,7 +1,7 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use pprof::criterion::{Output, PProfProfiler};
use tantivy::schema::{TantivyDocument, FAST, INDEXED, STORED, STRING, TEXT};
-use tantivy::{Index, IndexWriter};
+use tantivy::{tokenizer, Index, IndexWriter};

const HDFS_LOGS: &str = include_str!("hdfs.json");
const GH_LOGS: &str = include_str!("gh.json");
@@ -19,6 +19,13 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) {
schema_builder.add_text_field("severity", STRING);
schema_builder.build()
};
let schema_only_fast = {
let mut schema_builder = tantivy::schema::SchemaBuilder::new();
schema_builder.add_u64_field("timestamp", FAST);
schema_builder.add_text_field("body", FAST);
schema_builder.add_text_field("severity", FAST);
schema_builder.build()
};
let schema_with_store = {
let mut schema_builder = tantivy::schema::SchemaBuilder::new();
schema_builder.add_u64_field("timestamp", INDEXED | STORED);
@@ -83,6 +90,30 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) {
index_writer.commit().unwrap();
})
});
group.bench_function("index-hdfs-no-commit-fastfield", |b| {
let lines = get_lines(HDFS_LOGS);
b.iter(|| {
let index = Index::create_in_ram(schema_only_fast.clone());
let index_writer: IndexWriter = index.writer_with_num_threads(1, 100_000_000).unwrap();
for doc_json in &lines {
let doc = TantivyDocument::parse_json(&schema, doc_json).unwrap();
index_writer.add_document(doc).unwrap();
}
})
});
group.bench_function("index-hdfs-with-commit-fastfield", |b| {
let lines = get_lines(HDFS_LOGS);
b.iter(|| {
let index = Index::create_in_ram(schema_only_fast.clone());
let mut index_writer: IndexWriter =
index.writer_with_num_threads(1, 100_000_000).unwrap();
for doc_json in &lines {
let doc = TantivyDocument::parse_json(&schema, doc_json).unwrap();
index_writer.add_document(doc).unwrap();
}
index_writer.commit().unwrap();
})
});
group.bench_function("index-hdfs-no-commit-json-without-docstore", |b| {
let lines = get_lines(HDFS_LOGS);
b.iter(|| {
@@ -107,6 +138,18 @@ pub fn gh_index_benchmark(c: &mut Criterion) {
schema_builder.add_json_field("json", TEXT | FAST);
schema_builder.build()
};
let dynamic_schema_fast = {
let mut schema_builder = tantivy::schema::SchemaBuilder::new();
schema_builder.add_json_field("json", FAST);
schema_builder.build()
};
let ff_tokenizer_manager = tokenizer::TokenizerManager::default();
ff_tokenizer_manager.register(
"raw",
tokenizer::TextAnalyzer::builder(tokenizer::RawTokenizer::default())
.filter(tokenizer::RemoveLongFilter::limit(255))
.build(),
);

let mut group = c.benchmark_group("index-gh");
group.throughput(Throughput::Bytes(GH_LOGS.len() as u64));
@@ -115,7 +158,23 @@ pub fn gh_index_benchmark(c: &mut Criterion) {
let lines = get_lines(GH_LOGS);
b.iter(|| {
let json_field = dynamic_schema.get_field("json").unwrap();
-let index = Index::create_in_ram(dynamic_schema.clone());
+let mut index = Index::create_in_ram(dynamic_schema.clone());
+index.set_fast_field_tokenizers(ff_tokenizer_manager.clone());
let index_writer: IndexWriter = index.writer_with_num_threads(1, 100_000_000).unwrap();
for doc_json in &lines {
let json_val: serde_json::Map<String, serde_json::Value> =
serde_json::from_str(doc_json).unwrap();
let doc = tantivy::doc!(json_field=>json_val);
index_writer.add_document(doc).unwrap();
}
})
});
group.bench_function("index-gh-fast", |b| {
let lines = get_lines(GH_LOGS);
b.iter(|| {
let json_field = dynamic_schema_fast.get_field("json").unwrap();
let mut index = Index::create_in_ram(dynamic_schema_fast.clone());
index.set_fast_field_tokenizers(ff_tokenizer_manager.clone());
let index_writer: IndexWriter = index.writer_with_num_threads(1, 100_000_000).unwrap();
for doc_json in &lines {
let json_val: serde_json::Map<String, serde_json::Value> =
@@ -125,11 +184,13 @@ pub fn gh_index_benchmark(c: &mut Criterion) {
}
})
});

group.bench_function("index-gh-with-commit", |b| {
let lines = get_lines(GH_LOGS);
b.iter(|| {
let json_field = dynamic_schema.get_field("json").unwrap();
-let index = Index::create_in_ram(dynamic_schema.clone());
+let mut index = Index::create_in_ram(dynamic_schema.clone());
+index.set_fast_field_tokenizers(ff_tokenizer_manager.clone());
let mut index_writer: IndexWriter =
index.writer_with_num_threads(1, 100_000_000).unwrap();
for doc_json in &lines {
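Two notes on the benchmark additions above. The `-fastfield` and `-fast` variants use schemas whose fields are `FAST` only, so they isolate the fast-field (columnar) indexing path this commit optimizes. The "raw" analyzer registered as the fast-field tokenizer keeps each string value as a single token and filters out values longer than 255 bytes, so the columnar dictionary receives whole values rather than individual words.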
3 changes: 2 additions & 1 deletion columnar/src/columnar/writer/column_writers.rs
@@ -269,7 +269,8 @@ impl StrOrBytesColumnWriter {
dictionaries: &mut [DictionaryBuilder],
arena: &mut MemoryArena,
) {
-let unordered_id = dictionaries[self.dictionary_id as usize].get_or_allocate_id(bytes);
+let unordered_id =
+dictionaries[self.dictionary_id as usize].get_or_allocate_id(bytes, arena);
self.column_writer.record(doc, unordered_id, arena);
}

6 changes: 5 additions & 1 deletion columnar/src/columnar/writer/mod.rs
@@ -437,6 +437,7 @@ impl ColumnarWriter {
&mut symbol_byte_buffer,
),
buffers,
&self.arena,
&mut column_serializer,
)?;
column_serializer.finalize()?;
@@ -490,13 +491,15 @@ impl ColumnarWriter {

// Serialize [Dictionary, Column, dictionary num bytes U32::LE]
// Column: [Column Index, Column Values, column index num bytes U32::LE]
#[allow(clippy::too_many_arguments)]
fn serialize_bytes_or_str_column(
cardinality: Cardinality,
num_docs: RowId,
sort_values_within_row: bool,
dictionary_builder: &DictionaryBuilder,
operation_it: impl Iterator<Item = ColumnOperation<UnorderedId>>,
buffers: &mut SpareBuffers,
arena: &MemoryArena,
wrt: impl io::Write,
) -> io::Result<()> {
let SpareBuffers {
@@ -505,7 +508,8 @@ fn serialize_bytes_or_str_column(
..
} = buffers;
let mut counting_writer = CountingWriter::wrap(wrt);
-let term_id_mapping: TermIdMapping = dictionary_builder.serialize(&mut counting_writer)?;
+let term_id_mapping: TermIdMapping =
+dictionary_builder.serialize(arena, &mut counting_writer)?;
let dictionary_num_bytes: u32 = counting_writer.written_bytes() as u32;
let mut wrt = counting_writer.finish();
let operation_iterator = operation_it.map(|symbol: ColumnOperation<UnorderedId>| {
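The new `arena` parameter threaded through `serialize_bytes_or_str_column` falls out of the data-structure change: the dictionary no longer owns its keys and values, so serializing it requires access to the shared `MemoryArena` in which they live.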
50 changes: 30 additions & 20 deletions columnar/src/dictionary.rs
@@ -1,7 +1,7 @@
use std::io;

-use fnv::FnvHashMap;
use sstable::SSTable;
+use stacker::{MemoryArena, SharedArenaHashMap};

pub(crate) struct TermIdMapping {
unordered_to_ord: Vec<OrderedId>,
@@ -31,29 +31,38 @@ pub struct OrderedId(pub u32);
/// mapping.
#[derive(Default)]
pub(crate) struct DictionaryBuilder {
-dict: FnvHashMap<Vec<u8>, UnorderedId>,
-memory_consumption: usize,
+dict: SharedArenaHashMap,
}

impl DictionaryBuilder {
/// Get or allocate an unordered id.
/// (This ID is simply an auto-incremented id.)
-pub fn get_or_allocate_id(&mut self, term: &[u8]) -> UnorderedId {
-if let Some(term_id) = self.dict.get(term) {
-return *term_id;
-}
-let new_id = UnorderedId(self.dict.len() as u32);
-self.dict.insert(term.to_vec(), new_id);
-self.memory_consumption += term.len();
-self.memory_consumption += 40; // Term Metadata + HashMap overhead
-new_id
+pub fn get_or_allocate_id(&mut self, term: &[u8], arena: &mut MemoryArena) -> UnorderedId {
+let next_id = self.dict.len() as u32;
+let unordered_id = self
+.dict
+.mutate_or_create(term, arena, |unordered_id: Option<u32>| {
+if let Some(unordered_id) = unordered_id {
+unordered_id
+} else {
+next_id
+}
+});
+UnorderedId(unordered_id)
}

/// Serializes the dictionary into an fst and returns the
/// `UnorderedId -> TermOrdinal` map.
-pub fn serialize<'a, W: io::Write + 'a>(&self, wrt: &mut W) -> io::Result<TermIdMapping> {
-let mut terms: Vec<(&[u8], UnorderedId)> =
-self.dict.iter().map(|(k, v)| (k.as_slice(), *v)).collect();
+pub fn serialize<'a, W: io::Write + 'a>(
+&self,
+arena: &MemoryArena,
+wrt: &mut W,
+) -> io::Result<TermIdMapping> {
+let mut terms: Vec<(&[u8], UnorderedId)> = self
+.dict
+.iter(arena)
+.map(|(k, v)| (k, arena.read(v)))
+.collect();
terms.sort_unstable_by_key(|(key, _)| *key);
// TODO Remove the allocation.
let mut unordered_to_ord: Vec<OrderedId> = vec![OrderedId(0u32); terms.len()];
@@ -68,7 +77,7 @@ impl DictionaryBuilder {
}

pub(crate) fn mem_usage(&self) -> usize {
-self.memory_consumption
+self.dict.mem_usage()
}
}

@@ -78,12 +87,13 @@ mod tests {

#[test]
fn test_dictionary_builder() {
let mut arena = MemoryArena::default();
let mut dictionary_builder = DictionaryBuilder::default();
-let hello_uid = dictionary_builder.get_or_allocate_id(b"hello");
-let happy_uid = dictionary_builder.get_or_allocate_id(b"happy");
-let tax_uid = dictionary_builder.get_or_allocate_id(b"tax");
+let hello_uid = dictionary_builder.get_or_allocate_id(b"hello", &mut arena);
+let happy_uid = dictionary_builder.get_or_allocate_id(b"happy", &mut arena);
+let tax_uid = dictionary_builder.get_or_allocate_id(b"tax", &mut arena);
let mut buffer = Vec::new();
-let id_mapping = dictionary_builder.serialize(&mut buffer).unwrap();
+let id_mapping = dictionary_builder.serialize(&arena, &mut buffer).unwrap();
assert_eq!(id_mapping.to_ord(hello_uid), OrderedId(1));
assert_eq!(id_mapping.to_ord(happy_uid), OrderedId(0));
assert_eq!(id_mapping.to_ord(tax_uid), OrderedId(2));
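(For reference, the byte-wise sort order of the three terms is "happy" < "hello" < "tax", so their ordered ids are 0, 1, and 2; that is exactly what the assertions check, independent of insertion order.)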
[Diff truncated: the remaining 3 changed files are not shown.]
