From fb431920da88234048f6515c5a5bb7fc5cac04e4 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 25 Apr 2024 06:39:44 -0700 Subject: [PATCH] fix: copy arrays when placing them in the v2 writer's accumulation queue (#2249) When a record batch crosses the FFI layer we lose the ability to deallocate that batch one array at a time (every array in the batch keeps a reference counted pointer to the batch). This is a problem for the writer because we usually want to flush some arrays to disk and keep other arrays in memory for a while longer. However, we do want to release the arrays we flush to disk to avoid memory leaks. This issue was highlighted in a test / demo that attempted to write video data to a lance file. In this situation we were writing the data one row at a time. There were only 30,000 rows in the source data but this was hundreds of GB. None of the metadata columns would ever flush to disk (e.g. a 4 byte int column @ 30,000 rows is only 120KB). These metadata arrays were keeping the video data alive in memory and the writer was taking up too much data. The solution in this PR is to perform a deep copy of any data we are planning on flushing to disk. This is unfortunate, and there is a configuration parameter to disable this copy (I have, intentionally, chosen not to expose this in the python API since data from python always crosses an FFI boundary and is susceptible to this problem). However, copying this data by default is a much safer course of action. In the future we could investigate alternative ways of passing the data across the FFI boundary (e.g. instead of sending the entire record batch we could send individual arrays across and then reassemble them into a record batch on the other side). However, I don't think we need to worry too much about this until we see writer CPU performance become an issue. In most cases the batches will probably be in the CPU cache already and so this should be a pretty quick write. Also, writers that are writing any significant amount of data will be I/O bound. --- python/python/lance/file.py | 16 ++++- python/python/lance/lance/__init__.pyi | 8 ++- python/src/file.rs | 26 ++++++-- rust/lance-arrow/src/deepcopy.rs | 60 +++++++++++++++++++ rust/lance-arrow/src/lib.rs | 1 + rust/lance-encoding/src/encoder.rs | 19 +++++- .../src/encodings/logical/binary.rs | 4 +- .../src/encodings/logical/list.rs | 4 +- .../src/encodings/logical/primitive.rs | 20 ++++++- rust/lance-encoding/src/testing.rs | 2 + rust/lance-file/Cargo.toml | 1 + rust/lance-file/src/v2/writer.rs | 22 ++++++- 12 files changed, 168 insertions(+), 15 deletions(-) create mode 100644 rust/lance-arrow/src/deepcopy.rs diff --git a/python/python/lance/file.py b/python/python/lance/file.py index 09dc249e41..6325a59865 100644 --- a/python/python/lance/file.py +++ b/python/python/lance/file.py @@ -124,7 +124,14 @@ class LanceFileWriter: Lance datasets then you should use the LanceDataset class instead. """ - def __init__(self, path: str, schema: pa.Schema, **kwargs): + def __init__( + self, + path: str, + schema: pa.Schema, + *, + data_cache_bytes: int = None, + **kwargs, + ): """ Create a new LanceFileWriter to write to the given path @@ -135,8 +142,13 @@ def __init__(self, path: str, schema: pa.Schema, **kwargs): or a URI for remote storage. schema: pa.Schema The schema of data that will be written + data_cache_bytes: int + How many bytes (per column) to cache before writing a page. The + default is an appropriate value based on the filesystem. """ - self._writer = _LanceFileWriter(path, schema, **kwargs) + self._writer = _LanceFileWriter( + path, schema, data_cache_bytes=data_cache_bytes, **kwargs + ) self.closed = False def write_batch(self, batch: Union[pa.RecordBatch, pa.Table]) -> None: diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index 070b6993ef..9f090d1c7c 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -34,7 +34,13 @@ class CompactionMetrics: files_added: int class LanceFileWriter: - def __init__(self, path: str, schema: pa.Schema): ... + def __init__( + self, + path: str, + schema: pa.Schema, + data_cache_bytes: int, + keep_original_array: bool, + ): ... def write_batch(self, batch: pa.RecordBatch) -> None: ... def finish(self) -> int: ... diff --git a/python/src/file.rs b/python/src/file.rs index eb047b45b0..94f8f5e054 100644 --- a/python/src/file.rs +++ b/python/src/file.rs @@ -168,7 +168,12 @@ pub struct LanceFileWriter { } impl LanceFileWriter { - async fn open(uri_or_path: String, schema: PyArrowType) -> PyResult { + async fn open( + uri_or_path: String, + schema: PyArrowType, + data_cache_bytes: Option, + keep_original_array: Option, + ) -> PyResult { let (object_store, path) = object_store_from_uri_or_path(uri_or_path).await?; let object_writer = object_store.create(&path).await.infer_error()?; let lance_schema = lance_core::datatypes::Schema::try_from(&schema.0).infer_error()?; @@ -176,7 +181,10 @@ impl LanceFileWriter { object_writer, path.to_string(), lance_schema, - FileWriterOptions::default(), + FileWriterOptions { + data_cache_bytes, + keep_original_array, + }, ) .infer_error()?; Ok(Self { @@ -188,8 +196,18 @@ impl LanceFileWriter { #[pymethods] impl LanceFileWriter { #[new] - pub fn new(path: String, schema: PyArrowType) -> PyResult { - RT.runtime.block_on(Self::open(path, schema)) + pub fn new( + path: String, + schema: PyArrowType, + data_cache_bytes: Option, + keep_original_array: Option, + ) -> PyResult { + RT.runtime.block_on(Self::open( + path, + schema, + data_cache_bytes, + keep_original_array, + )) } pub fn write_batch(&mut self, batch: PyArrowType) -> PyResult<()> { diff --git a/rust/lance-arrow/src/deepcopy.rs b/rust/lance-arrow/src/deepcopy.rs new file mode 100644 index 0000000000..a60c2c3f3b --- /dev/null +++ b/rust/lance-arrow/src/deepcopy.rs @@ -0,0 +1,60 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::sync::Arc; + +use arrow_array::{make_array, Array, RecordBatch}; +use arrow_buffer::{Buffer, NullBuffer}; +use arrow_data::ArrayData; + +pub fn deep_copy_buffer(buffer: &Buffer) -> Buffer { + Buffer::from(Vec::from(buffer.as_slice())) +} + +fn deep_copy_nulls(nulls: &NullBuffer) -> Buffer { + deep_copy_buffer(nulls.inner().inner()) +} + +pub fn deep_copy_array_data(data: &ArrayData) -> ArrayData { + let data_type = data.data_type().clone(); + let len = data.len(); + let null_count = data.null_count(); + let null_bit_buffer = data.nulls().map(deep_copy_nulls); + let offset = data.offset(); + let buffers = data + .buffers() + .iter() + .map(deep_copy_buffer) + .collect::>(); + let child_data = data + .child_data() + .iter() + .map(deep_copy_array_data) + .collect::>(); + unsafe { + ArrayData::new_unchecked( + data_type, + len, + Some(null_count), + null_bit_buffer, + offset, + buffers, + child_data, + ) + } +} + +pub fn deep_copy_array(array: &dyn Array) -> Arc { + let data = array.to_data(); + let data = deep_copy_array_data(&data); + make_array(data) +} + +pub fn deep_copy_batch(batch: &RecordBatch) -> crate::Result { + let arrays = batch + .columns() + .iter() + .map(|array| deep_copy_array(array)) + .collect::>(); + RecordBatch::try_new(batch.schema().clone(), arrays) +} diff --git a/rust/lance-arrow/src/lib.rs b/rust/lance-arrow/src/lib.rs index 86dd114ab4..ea2ab91d06 100644 --- a/rust/lance-arrow/src/lib.rs +++ b/rust/lance-arrow/src/lib.rs @@ -17,6 +17,7 @@ use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit, use arrow_select::take::take; use rand::prelude::*; +pub mod deepcopy; pub mod schema; pub use schema::*; pub mod bfloat16; diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index b3a2b87d62..10bcc44a84 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -165,6 +165,7 @@ impl BatchEncoder { pub(crate) fn get_encoder_for_field( field: &Field, cache_bytes_per_column: u64, + keep_original_array: bool, col_idx: &mut u32, field_col_mapping: &mut Vec<(i32, i32)>, ) -> Result> { @@ -198,6 +199,7 @@ impl BatchEncoder { field_col_mapping.push((field.id, my_col_idx as i32)); Ok(Box::new(PrimitiveFieldEncoder::try_new( cache_bytes_per_column, + keep_original_array, &field.data_type(), my_col_idx, )?)) @@ -209,12 +211,14 @@ impl BatchEncoder { let inner_encoding = Self::get_encoder_for_field( &field.children[0], cache_bytes_per_column, + keep_original_array, col_idx, field_col_mapping, )?; Ok(Box::new(ListFieldEncoder::new( inner_encoding, cache_bytes_per_column, + keep_original_array, my_col_idx, ))) } @@ -229,6 +233,7 @@ impl BatchEncoder { Self::get_encoder_for_field( field, cache_bytes_per_column, + keep_original_array, col_idx, field_col_mapping, ) @@ -245,6 +250,7 @@ impl BatchEncoder { *col_idx += 2; Ok(Box::new(BinaryFieldEncoder::new( cache_bytes_per_column, + keep_original_array, my_col_idx, ))) } @@ -252,7 +258,11 @@ impl BatchEncoder { } } - pub fn try_new(schema: &Schema, cache_bytes_per_column: u64) -> Result { + pub fn try_new( + schema: &Schema, + cache_bytes_per_column: u64, + keep_original_array: bool, + ) -> Result { let mut col_idx = 0; let mut field_col_mapping = Vec::new(); let field_encoders = schema @@ -262,6 +272,7 @@ impl BatchEncoder { Self::get_encoder_for_field( field, cache_bytes_per_column, + keep_original_array, &mut col_idx, &mut field_col_mapping, ) @@ -301,7 +312,11 @@ pub async fn encode_batch( ) -> Result { let mut data_buffer = BytesMut::new(); let lance_schema = Schema::try_from(batch.schema().as_ref())?; - let batch_encoder = BatchEncoder::try_new(&lance_schema, cache_bytes_per_column)?; + // At this point, this is just a test utility, and there is no point in copying allocations + // This could become configurable in the future if needed. + let keep_original_array = true; + let batch_encoder = + BatchEncoder::try_new(&lance_schema, cache_bytes_per_column, keep_original_array)?; let mut page_table = Vec::new(); for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) { let mut tasks = encoder.maybe_encode(arr.clone())?; diff --git a/rust/lance-encoding/src/encodings/logical/binary.rs b/rust/lance-encoding/src/encodings/logical/binary.rs index 57e1f0b633..af531e034c 100644 --- a/rust/lance-encoding/src/encodings/logical/binary.rs +++ b/rust/lance-encoding/src/encodings/logical/binary.rs @@ -166,10 +166,11 @@ pub struct BinaryFieldEncoder { } impl BinaryFieldEncoder { - pub fn new(cache_bytes_per_column: u64, column_index: u32) -> Self { + pub fn new(cache_bytes_per_column: u64, keep_original_array: bool, column_index: u32) -> Self { let items_encoder = Box::new( PrimitiveFieldEncoder::try_new( cache_bytes_per_column, + keep_original_array, &DataType::UInt8, column_index + 1, ) @@ -179,6 +180,7 @@ impl BinaryFieldEncoder { varbin_encoder: Box::new(ListFieldEncoder::new( items_encoder, cache_bytes_per_column, + keep_original_array, column_index, )), } diff --git a/rust/lance-encoding/src/encodings/logical/list.rs b/rust/lance-encoding/src/encodings/logical/list.rs index 8a82e6a743..2f8fff18d6 100644 --- a/rust/lance-encoding/src/encodings/logical/list.rs +++ b/rust/lance-encoding/src/encodings/logical/list.rs @@ -447,7 +447,6 @@ impl ArrayEncoder for ListOffsetsEncoder { // Nothing to patch, don't incur a copy return self.inner.encode(arrays, buffer_index); } - println!("Stitching offsets {:?}", arrays); let num_offsets = arrays.iter().map(|array| array.len()).sum::() - (arrays.len() - 1); let mut offsets = Vec::with_capacity(num_offsets); @@ -472,7 +471,6 @@ impl ArrayEncoder for ListOffsetsEncoder { .map(|&v| v + last_prev_offset - first_curr_offset), ); } - println!("Stitched offsets {:?}", offsets); self.inner .encode(&[Arc::new(Int32Array::from(offsets))], buffer_index) } @@ -487,6 +485,7 @@ impl ListFieldEncoder { pub fn new( items_encoder: Box, cache_bytes_per_columns: u64, + keep_original_array: bool, column_index: u32, ) -> Self { let inner_encoder = @@ -497,6 +496,7 @@ impl ListFieldEncoder { Self { offsets_encoder: PrimitiveFieldEncoder::new_with_encoder( cache_bytes_per_columns, + keep_original_array, column_index, offsets_encoder, ), diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs index 62dd968ec2..a6d41935ef 100644 --- a/rust/lance-encoding/src/encodings/logical/primitive.rs +++ b/rust/lance-encoding/src/encodings/logical/primitive.rs @@ -20,6 +20,7 @@ use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, ScalarBuffer}; use arrow_schema::{DataType, IntervalUnit, TimeUnit}; use bytes::BytesMut; use futures::{future::BoxFuture, FutureExt}; +use lance_arrow::deepcopy::deep_copy_array; use log::{debug, trace}; use snafu::{location, Location}; @@ -430,6 +431,7 @@ impl LogicalPageDecoder for PrimitiveFieldDecoder { pub struct PrimitiveFieldEncoder { cache_bytes: u64, + keep_original_array: bool, buffered_arrays: Vec, current_bytes: u64, encoder: Arc, @@ -451,9 +453,15 @@ impl PrimitiveFieldEncoder { } } - pub fn try_new(cache_bytes: u64, data_type: &DataType, column_index: u32) -> Result { + pub fn try_new( + cache_bytes: u64, + keep_original_array: bool, + data_type: &DataType, + column_index: u32, + ) -> Result { Ok(Self { cache_bytes, + keep_original_array, column_index, buffered_arrays: Vec::new(), current_bytes: 0, @@ -463,11 +471,13 @@ impl PrimitiveFieldEncoder { pub fn new_with_encoder( cache_bytes: u64, + keep_original_array: bool, column_index: u32, encoder: Arc, ) -> Self { Self { cache_bytes, + keep_original_array, column_index, buffered_arrays: Vec::new(), current_bytes: 0, @@ -502,14 +512,20 @@ impl FieldEncoder for PrimitiveFieldEncoder { // Buffers data, if there is enough to write a page then we create an encode task fn maybe_encode(&mut self, array: ArrayRef) -> Result> { self.current_bytes += array.get_array_memory_size() as u64; - self.buffered_arrays.push(array); if self.current_bytes > self.cache_bytes { + // Push into buffered_arrays without copy since we are about to flush anyways + self.buffered_arrays.push(array); debug!( "Flushing column {} page of size {} bytes (unencoded)", self.column_index, self.current_bytes ); Ok(vec![self.do_flush()]) } else { + if self.keep_original_array { + self.buffered_arrays.push(array); + } else { + self.buffered_arrays.push(deep_copy_array(array.as_ref())) + } trace!( "Accumulating data for column {}. Now at {} bytes", self.column_index, diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs index ddf3823859..59c350936a 100644 --- a/rust/lance-encoding/src/testing.rs +++ b/rust/lance-encoding/src/testing.rs @@ -96,6 +96,7 @@ pub async fn check_round_trip_encoding_random(field: Field) { BatchEncoder::get_encoder_for_field( &lance_field, page_size, + /*keep_original_array=*/ true, &mut col_idx, &mut field_id_to_col_index, ) @@ -149,6 +150,7 @@ pub async fn check_round_trip_encoding_of_data(data: Vec>, test_c let encoder = BatchEncoder::get_encoder_for_field( &lance_field, page_size, + /*keep_original=*/ true, &mut col_idx, &mut field_id_to_col_index, ) diff --git a/rust/lance-file/Cargo.toml b/rust/lance-file/Cargo.toml index 1ac2d32fe6..6102c5fd51 100644 --- a/rust/lance-file/Cargo.toml +++ b/rust/lance-file/Cargo.toml @@ -19,6 +19,7 @@ lance-io.workspace = true arrow-arith.workspace = true arrow-array.workspace = true arrow-buffer.workspace = true +arrow-data.workspace = true arrow-schema.workspace = true arrow-select.workspace = true async-recursion.workspace = true diff --git a/rust/lance-file/src/v2/writer.rs b/rust/lance-file/src/v2/writer.rs index 7a76b49088..f06280a24b 100644 --- a/rust/lance-file/src/v2/writer.rs +++ b/rust/lance-file/src/v2/writer.rs @@ -41,6 +41,24 @@ pub struct FileWriterOptions { /// The default will use 8MiB per column which should be reasonable for most cases. // TODO: Do we need to be able to set this on a per-column basis? pub data_cache_bytes: Option, + /// The file writer buffers columns until enough data has arrived to flush a page + /// to disk. + /// + /// Some columns with small data types may not flush very often. These arrays can + /// stick around for a long time. These arrays might also be keeping larger data + /// structures alive. By default, the writer will make a deep copy of this array + /// to avoid any potential memory leaks. However, this can be disabled for a + /// (probably minor) performance boost if you are sure that arrays are not keeping + /// any sibling structures alive (this typically means the array was allocated in + /// the same language / runtime as the writer) + /// + /// Do not enable this if your data is arriving from the C data interface. + /// Data typically arrives one "batch" at a time (encoded in the C data interface + /// as a struct array). Each array in that batch keeps the entire batch alive. + /// This means a small boolean array (which we will buffer in memory for quite a + /// while) might keep a much larger record batch around in memory (even though most + /// of that batch's data has been written to disk) + pub keep_original_array: Option, } pub struct FileWriter { @@ -70,7 +88,9 @@ impl FileWriter { schema.validate()?; - let encoder = BatchEncoder::try_new(&schema, cache_bytes_per_column)?; + let keep_original_array = options.keep_original_array.unwrap_or(false); + + let encoder = BatchEncoder::try_new(&schema, cache_bytes_per_column, keep_original_array)?; let num_columns = encoder.num_columns(); let column_writers = encoder.field_encoders;