diff --git a/src/chunkers.rs b/src/chunkers.rs
index 240393a..5286e03 100644
--- a/src/chunkers.rs
+++ b/src/chunkers.rs
@@ -1,8 +1,9 @@
-use anyhow::{anyhow, Context, Result};
+use anyhow::{anyhow, ensure, Context, Result};
 use io::prelude::*;
 use std::fs::File;
 use std::fs::OpenOptions;
 use std::io;
+use std::io::Take;
 use std::ops::Range;
 use std::os::unix::fs::FileExt;
 use std::path::Path;
@@ -21,20 +22,21 @@ pub enum Chunk {
 }
 
 pub struct ThickChunker {
-    input: File,
+    input: Take<File>,
     input_size: u64,
     total_read: u64,
-    block_size: usize,
+    block_size: u64,
 }
 
 impl ThickChunker {
-    pub fn new(input_path: &Path, block_size: usize) -> Result<Self> {
+    pub fn new(input_path: &Path, block_size: u64) -> Result<Self> {
         let input_size = thinp::file_utils::file_size(input_path)?;
         let input = OpenOptions::new()
             .read(true)
             .write(false)
             .open(input_path)
-            .context("couldn't open input file/dev")?;
+            .context("couldn't open input file/dev")?
+            .take(0);
 
         Ok(Self {
             input,
@@ -44,9 +46,11 @@ impl ThickChunker {
         })
     }
 
-    // FIXME: stop reallocating and zeroing these buffers
-    fn do_read(&mut self, mut buffer: Vec<u8>) -> Result<Option<Chunk>> {
-        self.input.read_exact(&mut buffer)?;
+    fn do_read(&mut self, size: u64) -> Result<Option<Chunk>> {
+        let mut buffer = Vec::with_capacity(size as usize);
+        self.input.set_limit(size);
+        let read_size = self.input.read_to_end(&mut buffer)?;
+        ensure!(read_size == size as usize, "short read");
         self.total_read += buffer.len() as u64;
         Ok(Some(Chunk::Mapped(buffer)))
     }
@@ -56,12 +60,10 @@ impl ThickChunker {
 
         if remaining == 0 {
             Ok(None)
-        } else if remaining >= self.block_size as u64 {
-            let buf = vec![0; self.block_size];
-            self.do_read(buf)
+        } else if remaining >= self.block_size {
+            self.do_read(self.block_size)
         } else {
-            let buf = vec![0; remaining as usize];
-            self.do_read(buf)
+            self.do_read(remaining)
         }
     }
 }
@@ -141,12 +143,7 @@ impl Iterator for ThinChunker {
     type Item = Result<Chunk>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        let mc = self.next_chunk();
-        match mc {
-            Err(e) => Some(Err(e)),
-            Ok(Some(c)) => Some(Ok(c)),
-            Ok(None) => None,
-        }
+        self.next_chunk().transpose()
     }
 }
 
@@ -223,12 +220,7 @@ impl Iterator for DeltaChunker {
     type Item = Result<Chunk>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        let mc = self.next_chunk();
-        match mc {
-            Err(e) => Some(Err(e)),
-            Ok(Some(c)) => Some(Ok(c)),
-            Ok(None) => None,
-        }
+        self.next_chunk().transpose()
     }
 }
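The reworked ThickChunker::do_read leans on std::io::Take: set_limit() caps the next read, and read_to_end() into a Vec::with_capacity buffer avoids the zero-fill that the old read_exact path paid for on every block. The Iterator impls also shrink to self.next_chunk().transpose(), which turns a Result<Option<Chunk>> into the Option<Result<Chunk>> shape that next() wants. A minimal standalone sketch of the read pattern follows; the function name and error strings are illustrative, not part of the patch.

use anyhow::{ensure, Result};
use std::fs::File;
use std::io::{Read, Take};

// Read exactly `size` bytes through a Take wrapper without zero-initialising
// the buffer first; read_to_end() stops at the limit set by set_limit().
fn read_block(input: &mut Take<File>, size: u64) -> Result<Vec<u8>> {
    let mut buffer = Vec::with_capacity(size as usize);
    input.set_limit(size);
    let n = input.read_to_end(&mut buffer)?;
    ensure!(n == size as usize, "short read");
    Ok(buffer)
}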
diff --git a/src/content_sensitive_splitter.rs b/src/content_sensitive_splitter.rs
index c07d448..a2e6d1e 100644
--- a/src/content_sensitive_splitter.rs
+++ b/src/content_sensitive_splitter.rs
@@ -136,36 +136,41 @@ impl ContentSensitiveSplitter {
         let mut offset = 0;
         let mut remainder = self.unconsumed_len as usize;
         let min_size = self.window_size as usize / 4;
+        let max_size = self.window_size as usize * 8;
         let ws = self.window_size as usize;
 
+        if remainder < min_size {
+            offset += min_size - remainder;
+            remainder = min_size;
+        }
         while offset < data.len() {
-            let end = data.len();
-            if let Some(boundary) = self.hasher.next_match(&data[offset..end], self.mask_s) {
-                consumes.push(remainder + boundary);
-                offset += boundary;
-
-                let skip_size = std::cmp::min(data.len() - offset, min_size);
-                offset += skip_size;
-                remainder = skip_size;
-                continue;
-            } else {
-                offset += ws;
-                remainder += ws;
-            }
+            let len_s = ws - remainder;
+            if len_s > 0 {
+                let end = std::cmp::min(data.len(), offset + len_s);
+                if let Some(boundary) = self.hasher.next_match(&data[offset..end], self.mask_s) {
+                    consumes.push(remainder + boundary);
+                    offset += boundary + min_size;
+                    remainder = min_size;
+                    continue;
+                } else {
+                    offset += len_s;
+                    remainder += len_s;
+                }
 
-            if offset >= data.len() {
-                break;
+                if offset >= data.len() {
+                    break;
+                }
             }
-
-            if let Some(boundary) = self.hasher.next_match(&data[offset..], self.mask_l) {
+            let len_l = max_size - remainder;
+            let end = std::cmp::min(data.len(), offset + len_l);
+            if let Some(boundary) = self.hasher.next_match(&data[offset..end], self.mask_l) {
                 consumes.push(remainder + boundary);
-                offset += boundary;
-
-                let skip_size = std::cmp::min(data.len() - offset, min_size);
-                offset += skip_size;
-                remainder = skip_size;
+                offset += boundary + min_size;
+                remainder = min_size;
             } else {
-                break;
+                consumes.push(end - offset + remainder);
+                offset = end + min_size;
+                remainder = min_size;
             }
         }
 
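The splitter now enforces a minimum chunk size (window / 4) by skipping min_size bytes after every cut, and a maximum (window * 8) by forcing a cut when neither mask matches within max_size bytes of the last boundary. Below is a rough sketch of that min/max clamping in isolation, using a throwaway rolling hash and a single mask instead of the splitter's own hasher and mask_s/mask_l pair; none of these names come from the patch.

// Content-defined chunking with min/max bounds: scan for a hash match, but
// never cut before min_size and always cut by max_size.
fn chunk_lengths(data: &[u8], min_size: usize, max_size: usize, mask: u32) -> Vec<usize> {
    let mut lengths = Vec::new();
    let mut start = 0;
    while start < data.len() {
        let limit = std::cmp::min(data.len(), start + max_size);
        let mut cut = limit; // forced cut if no boundary is found
        let mut hash = 0u32;
        for (i, b) in data[start..limit].iter().enumerate() {
            hash = hash.rotate_left(1) ^ u32::from(*b);
            if i + 1 >= min_size && hash & mask == 0 {
                cut = start + i + 1;
                break;
            }
        }
        lengths.push(cut - start);
        start = cut;
    }
    lengths
}

Every length this sketch returns is at most max_size, and at least min_size except possibly for the final tail of the input, which is the same invariant the new loop maintains through remainder and the min_size skips.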
diff --git a/src/slab.rs b/src/slab.rs
index 249467c..d8349b0 100644
--- a/src/slab.rs
+++ b/src/slab.rs
@@ -119,7 +119,7 @@ impl SlabOffsets {
 // derived data, and can be rebuilt with the repair fn.
 //
 // file := *
-// header :=
+// header :=
 // slab :=
 
 const FILE_MAGIC: u64 = 0xb927f96a6b611180;
@@ -196,7 +196,6 @@ fn writer_(shared: Arc>, rx: Receiver) -> Result<()>
         let buf = rx.recv();
         if buf.is_err() {
             // all send ends have been closed, so we're done.
-            assert!(queued.is_empty());
             break;
         }
 
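For context on the writer_ loop that loses its assert: a receive error on the queue only occurs once every send end has been dropped, so the error arm is the normal shutdown path rather than a failure. A small self-contained illustration of that convention using std::sync::mpsc follows; the patch's channel may come from a different crate, and every name below is made up rather than taken from slab.rs.

use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (tx, rx) = channel::<u64>();

    // The worker mirrors the writer_ loop: drain until every sender is gone.
    let worker = thread::spawn(move || {
        let mut total = 0;
        loop {
            match rx.recv() {
                Ok(v) => total += v,
                // all send ends have been closed, so we're done
                Err(_) => break,
            }
        }
        total
    });

    for v in 1..=10 {
        tx.send(v).unwrap();
    }
    drop(tx); // dropping the last sender is the shutdown signal
    assert_eq!(worker.join().unwrap(), 55);
}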