Skip to content

Commit

Permalink
Merge pull request #22 from bmarzins/splitter_work
Browse files Browse the repository at this point in the history
Splitter man page and other misc work
  • Loading branch information
jthornber authored Nov 20, 2024
2 parents 701a0d8 + 90e338b commit cb3e8c6
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 50 deletions.
42 changes: 17 additions & 25 deletions src/chunkers.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use anyhow::{anyhow, Context, Result};
use anyhow::{anyhow, ensure, Context, Result};
use io::prelude::*;
use std::fs::File;
use std::fs::OpenOptions;
use std::io;
use std::io::Take;
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::Path;
Expand All @@ -21,20 +22,21 @@ pub enum Chunk {
}

pub struct ThickChunker {
input: File,
input: Take<File>,
input_size: u64,
total_read: u64,
block_size: usize,
block_size: u64,
}

impl ThickChunker {
pub fn new(input_path: &Path, block_size: usize) -> Result<Self> {
pub fn new(input_path: &Path, block_size: u64) -> Result<Self> {
let input_size = thinp::file_utils::file_size(input_path)?;
let input = OpenOptions::new()
.read(true)
.write(false)
.open(input_path)
.context("couldn't open input file/dev")?;
.context("couldn't open input file/dev")?
.take(0);

Ok(Self {
input,
Expand All @@ -44,9 +46,11 @@ impl ThickChunker {
})
}

// FIXME: stop reallocating and zeroing these buffers
fn do_read(&mut self, mut buffer: Vec<u8>) -> Result<Option<Chunk>> {
self.input.read_exact(&mut buffer)?;
fn do_read(&mut self, size: u64) -> Result<Option<Chunk>> {
let mut buffer = Vec::with_capacity(size as usize);
self.input.set_limit(size);
let read_size = self.input.read_to_end(&mut buffer)?;
ensure!(read_size == size as usize, "short read");
self.total_read += buffer.len() as u64;
Ok(Some(Chunk::Mapped(buffer)))
}
Expand All @@ -56,12 +60,10 @@ impl ThickChunker {

if remaining == 0 {
Ok(None)
} else if remaining >= self.block_size as u64 {
let buf = vec![0; self.block_size];
self.do_read(buf)
} else if remaining >= self.block_size {
self.do_read(self.block_size)
} else {
let buf = vec![0; remaining as usize];
self.do_read(buf)
self.do_read(remaining)
}
}
}
Expand Down Expand Up @@ -141,12 +143,7 @@ impl Iterator for ThinChunker {
type Item = Result<Chunk>;

fn next(&mut self) -> Option<Self::Item> {
let mc = self.next_chunk();
match mc {
Err(e) => Some(Err(e)),
Ok(Some(c)) => Some(Ok(c)),
Ok(None) => None,
}
self.next_chunk().transpose()
}
}

Expand Down Expand Up @@ -223,12 +220,7 @@ impl Iterator for DeltaChunker {
type Item = Result<Chunk>;

fn next(&mut self) -> Option<Self::Item> {
let mc = self.next_chunk();
match mc {
Err(e) => Some(Err(e)),
Ok(Some(c)) => Some(Ok(c)),
Ok(None) => None,
}
self.next_chunk().transpose()
}
}

Expand Down
51 changes: 28 additions & 23 deletions src/content_sensitive_splitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,36 +136,41 @@ impl ContentSensitiveSplitter {
let mut offset = 0;
let mut remainder = self.unconsumed_len as usize;
let min_size = self.window_size as usize / 4;
let max_size = self.window_size as usize * 8;
let ws = self.window_size as usize;

if remainder < min_size {
offset += min_size - remainder;
remainder = min_size;
}
while offset < data.len() {
let end = data.len();
if let Some(boundary) = self.hasher.next_match(&data[offset..end], self.mask_s) {
consumes.push(remainder + boundary);
offset += boundary;

let skip_size = std::cmp::min(data.len() - offset, min_size);
offset += skip_size;
remainder = skip_size;
continue;
} else {
offset += ws;
remainder += ws;
}
let len_s = ws - remainder;
if len_s > 0 {
let end = std::cmp::min(data.len(), offset + len_s);
if let Some(boundary) = self.hasher.next_match(&data[offset..end], self.mask_s) {
consumes.push(remainder + boundary);
offset += boundary + min_size;
remainder = min_size;
continue;
} else {
offset += len_s;
remainder += len_s;
}

if offset >= data.len() {
break;
if offset >= data.len() {
break;
}
}

if let Some(boundary) = self.hasher.next_match(&data[offset..], self.mask_l) {
let len_l = max_size - remainder;
let end = std::cmp::min(data.len(), offset + len_l);
if let Some(boundary) = self.hasher.next_match(&data[offset..end], self.mask_l) {
consumes.push(remainder + boundary);
offset += boundary;

let skip_size = std::cmp::min(data.len() - offset, min_size);
offset += skip_size;
remainder = skip_size;
offset += boundary + min_size;
remainder = min_size;
} else {
break;
consumes.push(end - offset + remainder);
offset = end + min_size;
remainder = min_size;
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/slab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl SlabOffsets {
// derived data, and can be rebuilt with the repair fn.
//
// file := <header> <slab>*
// header := <magic nr> <slab format version>
// header := <magic nr> <slab format version> <flags>
// slab := <magic nr> <len> <checksum> <compressed data>

const FILE_MAGIC: u64 = 0xb927f96a6b611180;
Expand Down Expand Up @@ -196,7 +196,6 @@ fn writer_(shared: Arc<Mutex<SlabShared>>, rx: Receiver<SlabData>) -> Result<()>
let buf = rx.recv();
if buf.is_err() {
// all send ends have been closed, so we're done.
assert!(queued.is_empty());
break;
}

Expand Down

0 comments on commit cb3e8c6

Please sign in to comment.