Timscompress #32

Merged: 14 commits, Oct 18, 2024
373 changes: 191 additions & 182 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions Cargo.toml
@@ -17,13 +17,14 @@ keywords = ["MS", "LC-TIMS-TOF", "PASEF"]
 zstd = "0.13.2"
 rayon = "1.10.0"
 linreg = "0.2.0"
-bytemuck = "1.13.1"
+bytemuck = "1.18.0"
 thiserror = "1.0.0"
 memmap2 = "0.9.3"
-rusqlite = { version = "0.31.0", features = ["bundled"], optional = true}
-parquet = { version = "42.0.0", optional = true }
-serde = { version = "1.0.209", features = ["derive"], optional = true }
-serde_json = { version = "1.0.127", optional = true }
+rusqlite = { version = "0.32.0", features = ["bundled"], optional = true }
+parquet = { version = "53.0.0", optional = true }
+serde = { version = "1.0.210", features = ["derive"], optional = true }
+serde_json = { version = "1.0.128", optional = true }
+timscompress = {optional=true}
 
 [features]
 tdf = ["rusqlite"]
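Reviewer note: the new `timscompress = {optional=true}` entry carries no version or source in this hunk, so the dependency must be resolved elsewhere (for example a workspace or git source); optional dependencies only compile when a feature pulls them in, and the `#[cfg(feature = "timscompress")]` gates later in this PR confirm the feature name. A hedged sketch of the usual wiring; the `dep:` line is an assumption, since the relevant part of the `[features]` block is collapsed in this diff:

```toml
# Sketch, not the PR's actual [features] block: how an optional
# dependency is typically tied to a feature of the same name.
[dependencies]
timscompress = { optional = true }  # source/version supplied elsewhere

[features]
timscompress = ["dep:timscompress"]  # assumed wiring
```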
3 changes: 3 additions & 0 deletions README.md
@@ -53,6 +53,9 @@ TODO
 * Improve docs
 * Improve tests
 * Pase CompressionType1
+* Tarred file reader
+* Clean up src (FrameReader, ...)
+* Cleaner try_from conversions/readers
 * Make Path of TimsTOF data into special type
 * Single access point for all readers?
 * Few unchecked unwraps left
9 changes: 4 additions & 5 deletions src/io/readers/file_readers/parquet_reader.rs
@@ -19,20 +19,19 @@ pub trait ReadableParquetTable {
         let file: File = File::open(file_name)?;
         let reader: SerializedFileReader<File> =
             SerializedFileReader::new(file)?;
-        let results: Vec<Self> = reader
+        reader
             .get_row_iter(None)?
             .map(|record| {
                 let mut result = Self::default();
-                for (name, field) in record.get_column_iter() {
+                for (name, field) in record?.get_column_iter() {
                     result.update_from_parquet_file(
                         name.to_string().as_str(),
                         field.to_string(),
                     );
                 }
-                result
+                Ok(result)
             })
-            .collect();
-        Ok(results)
+            .collect()
     }
 }
 
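Reviewer note: the `record?` reflects the row iterator yielding `Result<Row, _>` items in the parquet 53 line this PR upgrades to, so the closure now returns `Result` and the final `.collect()` gathers everything into a single `Result<Vec<_>, _>`. A minimal standalone sketch of that collect pattern, with a hypothetical `ParseError` type and made-up inputs:

```rust
// Sketch (not the crate's code): collecting an iterator of `Result`
// items into one `Result<Vec<_>, _>`, as parquet_reader.rs now does.
#[derive(Debug)]
struct ParseError(String);

fn parse(raw: &str) -> Result<u32, ParseError> {
    raw.parse::<u32>().map_err(|e| ParseError(e.to_string()))
}

fn parse_all(raws: &[&str]) -> Result<Vec<u32>, ParseError> {
    raws.iter()
        .map(|raw| parse(raw)) // each item is a Result
        .collect() // the first Err short-circuits the whole collect
}

fn main() {
    assert_eq!(parse_all(&["1", "2"]).unwrap(), vec![1, 2]);
    assert!(parse_all(&["1", "x"]).is_err());
}
```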
3 changes: 3 additions & 0 deletions src/io/readers/file_readers/tdf_blob_reader.rs
@@ -65,12 +65,14 @@ impl TdfBlobReader {
     }
 }
 
+#[cfg(feature = "minitdf")]
 #[derive(Debug)]
 pub struct IndexedTdfBlobReader {
     blob_reader: TdfBlobReader,
     binary_offsets: Vec<usize>,
 }
 
+#[cfg(feature = "minitdf")]
 impl IndexedTdfBlobReader {
     pub fn new(
         file_name: impl AsRef<Path>,
@@ -115,6 +117,7 @@ pub enum TdfBlobReaderError {
 pub enum IndexedTdfBlobReaderError {
     #[error("{0}")]
     TdfBlobReaderError(#[from] TdfBlobReaderError),
+    #[cfg(feature = "minitdf")]
     #[error("Invalid index {0}")]
     InvalidIndex(usize),
 }
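Reviewer note: the gate is applied consistently to the struct, its impl, and the error variant, so builds without `minitdf` neither compile the indexed reader nor expose an error variant that can never be produced. A self-contained sketch of the pattern, using a made-up `extras` feature and illustrative names:

```rust
// Sketch: feature-gating a type, its impl, and the matching error
// variant together. The `extras` feature name is hypothetical; a real
// crate would declare it under [features] in Cargo.toml.
#[cfg(feature = "extras")]
pub struct ExtraReader {
    offsets: Vec<usize>,
}

#[cfg(feature = "extras")]
impl ExtraReader {
    pub fn get(&self, index: usize) -> Result<usize, ReaderError> {
        self.offsets
            .get(index)
            .copied()
            .ok_or(ReaderError::InvalidIndex(index))
    }
}

#[derive(Debug)]
pub enum ReaderError {
    // Only compiled in when the gated reader that produces it exists.
    #[cfg(feature = "extras")]
    InvalidIndex(usize),
}
```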
8 changes: 8 additions & 0 deletions src/io/readers/file_readers/tdf_blob_reader/tdf_blobs.rs
@@ -14,6 +14,14 @@ impl TdfBlob {
         }
     }
 
+    pub fn get_all(&self) -> Vec<u32> {
+        (0..self.len())
+            .map(|index| self.get(index).expect(
+                "When iterating over the length of a tdf blob, you cannot go out of bounds"
+            ))
+            .collect()
+    }
+
     pub fn get(&self, index: usize) -> Option<u32> {
         if index >= self.len() {
             None
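Reviewer note: `get_all` trades per-element `Option` handling for one bulk read, and since the indices come straight from `0..self.len()`, the inner `expect` is unreachable by construction. A hedged sketch of the same bounds-safe-by-construction pattern on a plain wrapper type (names are illustrative, not the crate's API):

```rust
// Sketch: a checked per-index getter plus a bulk `get_all` built on it.
struct Blob {
    data: Vec<u32>,
}

impl Blob {
    fn len(&self) -> usize {
        self.data.len()
    }

    fn get(&self, index: usize) -> Option<u32> {
        self.data.get(index).copied()
    }

    fn get_all(&self) -> Vec<u32> {
        (0..self.len())
            .map(|index| {
                self.get(index)
                    .expect("index < len, so this cannot be out of bounds")
            })
            .collect()
    }
}

fn main() {
    let blob = Blob { data: vec![3, 1, 4] };
    assert_eq!(blob.get_all(), vec![3, 1, 4]);
}
```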
110 changes: 106 additions & 4 deletions src/io/readers/frame_reader.rs
@@ -5,6 +5,10 @@ use std::{
 };
 
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
+#[cfg(feature = "timscompress")]
+use timscompress::reader::{
+    CompressedTdfBlobReader, CompressedTdfBlobReaderError,
+};
 
 use crate::{
     ms_data::{AcquisitionType, Frame, MSLevel, QuadrupoleSettings},
@@ -19,30 +23,50 @@ use super::{
         },
         tdf_blob_reader::{TdfBlob, TdfBlobReader, TdfBlobReaderError},
     },
-    QuadrupoleSettingsReader, QuadrupoleSettingsReaderError,
+    MetadataReader, MetadataReaderError, QuadrupoleSettingsReader,
+    QuadrupoleSettingsReaderError,
 };
 
 #[derive(Debug)]
 pub struct FrameReader {
     path: PathBuf,
     tdf_bin_reader: TdfBlobReader,
+    #[cfg(feature = "timscompress")]
+    compressed_reader: CompressedTdfBlobReader,
     frames: Vec<Frame>,
     acquisition: AcquisitionType,
     offsets: Vec<usize>,
     dia_windows: Option<Vec<Arc<QuadrupoleSettings>>>,
+    compression_type: u8,
+    #[cfg(feature = "timscompress")]
+    scan_count: usize,
 }
 
 impl FrameReader {
     pub fn new(path: impl AsRef<Path>) -> Result<Self, FrameReaderError> {
         let sql_path = find_extension(&path, "analysis.tdf").ok_or(
             FrameReaderError::FileNotFound("analysis.tdf".to_string()),
         )?;
+        let compression_type =
+            match MetadataReader::new(&sql_path)?.compression_type {
+                2 => 2,
+                #[cfg(feature = "timscompress")]
+                3 => 3,
+                compression_type => {
+                    return Err(FrameReaderError::CompressionTypeError(
+                        compression_type,
+                    ))
+                },
+            };
+
         let tdf_sql_reader = SqlReader::open(sql_path)?;
         let sql_frames = SqlFrame::from_sql_reader(&tdf_sql_reader)?;
         let bin_path = find_extension(&path, "analysis.tdf_bin").ok_or(
             FrameReaderError::FileNotFound("analysis.tdf_bin".to_string()),
         )?;
-        let tdf_bin_reader = TdfBlobReader::new(bin_path)?;
+        let tdf_bin_reader = TdfBlobReader::new(&bin_path)?;
+        #[cfg(feature = "timscompress")]
+        let compressed_reader = CompressedTdfBlobReader::new(&bin_path)?;
         let acquisition = if sql_frames.iter().any(|x| x.msms_type == 8) {
             AcquisitionType::DDAPASEF
         } else if sql_frames.iter().any(|x| x.msms_type == 9) {
@@ -81,6 +105,13 @@ impl FrameReader {
                 )
             })
             .collect();
+        #[cfg(feature = "timscompress")]
+        let scan_count = sql_frames
+            .iter()
+            .map(|frame| frame.scan_count)
+            .max()
+            .expect("Frame table cannot be empty")
+            as usize;
         let offsets = sql_frames.iter().map(|x| x.binary_offset).collect();
         let reader = Self {
             path: path.as_ref().to_path_buf(),
@@ -92,10 +123,19 @@
                 AcquisitionType::DIAPASEF => Some(quadrupole_settings),
                 _ => None,
             },
+            compression_type,
+            #[cfg(feature = "timscompress")]
+            compressed_reader,
+            #[cfg(feature = "timscompress")]
+            scan_count,
         };
         Ok(reader)
     }
 
+    pub fn get_binary_offset(&self, index: usize) -> usize {
+        self.offsets[index]
+    }
+
     pub fn parallel_filter<'a, F: Fn(&Frame) -> bool + Sync + Send + 'a>(
         &'a self,
         predicate: F,
@@ -107,14 +147,37 @@
             .map(move |x| self.get(x))
     }
 
+    pub fn filter<'a, F: Fn(&Frame) -> bool + Sync + Send + 'a>(
+        &'a self,
+        predicate: F,
+    ) -> impl Iterator<Item = Result<Frame, FrameReaderError>> + 'a {
+        (0..self.len())
+            .filter(move |x| predicate(&self.frames[*x]))
+            .map(move |x| self.get(x))
+    }
+
     pub fn get_dia_windows(&self) -> Option<Vec<Arc<QuadrupoleSettings>>> {
         self.dia_windows.clone()
     }
 
     pub fn get(&self, index: usize) -> Result<Frame, FrameReaderError> {
+        match self.compression_type {
+            2 => self.get_from_compression_type_2(index),
+            #[cfg(feature = "timscompress")]
+            3 => self.get_from_compression_type_3(index),
+            _ => Err(FrameReaderError::CompressionTypeError(
+                self.compression_type,
+            )),
+        }
+    }
+
+    fn get_from_compression_type_2(
+        &self,
+        index: usize,
+    ) -> Result<Frame, FrameReaderError> {
         // NOTE: get does it by 0-offsetting the vec, not by Frame index!!!
-        let mut frame = self.frames[index].clone();
-        let offset = self.offsets[index];
+        let mut frame = self.get_frame_without_coordinates(index)?;
+        let offset = self.get_binary_offset(index);
         let blob = self.tdf_bin_reader.get(offset)?;
         let scan_count: usize =
             blob.get(0).ok_or(FrameReaderError::CorruptFrame)? as usize;
@@ -130,6 +193,36 @@
         Ok(frame)
     }
 
+    #[cfg(feature = "timscompress")]
+    fn get_from_compression_type_3(
+        &self,
+        index: usize,
+    ) -> Result<Frame, FrameReaderError> {
+        // NOTE: get does it by 0-offsetting the vec, not by Frame index!!!
+        // TODO
+        let mut frame = self.get_frame_without_coordinates(index)?;
+        let offset = self.get_binary_offset(index);
+        let raw_frame = self
+            .compressed_reader
+            .get_raw_frame_data(offset, self.scan_count);
+        frame.tof_indices = raw_frame.tof_indices;
+        frame.intensities = raw_frame.intensities;
+        frame.scan_offsets = raw_frame.scan_offsets;
+        Ok(frame)
+    }
+
+    pub fn get_frame_without_coordinates(
+        &self,
+        index: usize,
+    ) -> Result<Frame, FrameReaderError> {
+        let frame = self
+            .frames
+            .get(index)
+            .ok_or(FrameReaderError::IndexOutOfBounds)?
+            .clone();
+        Ok(frame)
+    }
+
     pub fn get_all(&self) -> Vec<Result<Frame, FrameReaderError>> {
         self.parallel_filter(|_| true).collect()
     }
@@ -239,14 +332,23 @@ fn get_frame_without_data(
 
 #[derive(Debug, thiserror::Error)]
 pub enum FrameReaderError {
+    #[cfg(feature = "timscompress")]
+    #[error("{0}")]
+    CompressedTdfBlobReaderError(#[from] CompressedTdfBlobReaderError),
     #[error("{0}")]
     TdfBlobReaderError(#[from] TdfBlobReaderError),
+    #[error("{0}")]
+    MetadataReaderError(#[from] MetadataReaderError),
     #[error("{0}")]
     FileNotFound(String),
     #[error("{0}")]
     SqlError(#[from] SqlError),
     #[error("Corrupt Frame")]
     CorruptFrame,
     #[error("{0}")]
     QuadrupoleSettingsReaderError(#[from] QuadrupoleSettingsReaderError),
+    #[error("Index out of bounds")]
+    IndexOutOfBounds,
+    #[error("Compression type {0} not understood")]
+    CompressionTypeError(u8),
 }
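Reviewer note: taken together, this file turns `FrameReader::get` into a dispatcher on the `compression_type` read from `analysis.tdf`: type 2 goes through the existing `TdfBlobReader`, type 3 through timscompress's `CompressedTdfBlobReader` behind the feature flag, and anything else is rejected up front in `new` with `CompressionTypeError`. A hedged sketch of how a caller might consume this API; the data path is made up, and the import path and `Frame` fields are assumptions based on timsrust's public API:

```rust
// Sketch of downstream usage, assuming timsrust re-exports FrameReader
// and FrameReaderError under `readers`; "/data/sample.d" is made up.
use timsrust::readers::{FrameReader, FrameReaderError};

fn main() -> Result<(), FrameReaderError> {
    // `new` already fails with CompressionTypeError for unsupported
    // files, so a reader that constructs successfully can be read.
    let reader = FrameReader::new("/data/sample.d")?;

    // Metadata-only access: no decompression of the binary blob.
    let frame = reader.get_frame_without_coordinates(0)?;
    println!("frame {} at offset {}", frame.index, reader.get_binary_offset(0));

    // Full access: dispatches on the compression type internally.
    let frame = reader.get(0)?;
    println!("frame 0 has {} peaks", frame.tof_indices.len());
    Ok(())
}
```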
5 changes: 4 additions & 1 deletion src/io/readers/metadata_reader.rs
@@ -97,7 +97,10 @@ fn get_im_converter(
 ) -> Result<Scan2ImConverter, MetadataReaderError> {
     let scan_counts: Vec<u32> =
         tdf_sql_reader.read_column_from_table("NumScans", "Frames")?;
-    let scan_max_index = *scan_counts.iter().max().unwrap(); // SqlReader cannot return empty vecs, so always succeeds
+    let scan_max_index = *scan_counts
+        .iter()
+        .max()
+        .expect("SqlReader cannot return empty vecs, so there is always a max scan index");
     let (im_min, im_max) = get_im_bounds(sql_metadata)?;
     Ok(Scan2ImConverter::from_boundaries(
         im_min,
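Reviewer note: this change, repeated in the quad-settings and DDA readers below, moves the invariant out of a side comment and into the panic message, so a violated assumption is reported at the crash site instead of as a bare `unwrap` failure. A tiny sketch of the difference, with made-up values:

```rust
// Sketch: `unwrap` with a side comment vs `expect` with the invariant
// in the message. Only the second prints the reason on panic.
fn main() {
    let scan_counts: Vec<u32> = vec![3, 7, 5];

    // Before: panics with "called `Option::unwrap()` on a `None` value".
    let max_a = *scan_counts.iter().max().unwrap(); // never empty

    // After: the invariant travels with the panic message.
    let max_b = *scan_counts
        .iter()
        .max()
        .expect("SqlReader cannot return empty vecs, so there is always a max");

    assert_eq!(max_a, max_b);
}
```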
18 changes: 14 additions & 4 deletions src/io/readers/quad_settings_reader.rs
@@ -37,7 +37,8 @@ impl QuadrupoleSettingsReader {
             .iter()
             .map(|x| x.window_group)
             .max()
-            .unwrap() as usize; // SqlReader cannot return empty vecs, so always succeeds
+            .expect("SqlReader cannot return empty vecs, so there is always a max window_group")
+            as usize;
         let quadrupole_settings = (0..window_group_count)
             .map(|window_group| {
                 let mut quad = QuadrupoleSettings::default();
@@ -306,9 +307,18 @@ fn expand_window_settings(
         let window = window_group.window_group;
         let frame = window_group.frame;
         let group = &quadrupole_settings[window as usize - 1];
-        let window_group_start =
-            group.scan_starts.iter().min().unwrap().clone(); // SqlReader cannot return empty vecs, so always succeeds
-        let window_group_end = group.scan_ends.iter().max().unwrap().clone(); // SqlReader cannot return empty vecs, so always succeeds
+        let window_group_start = group
+            .scan_starts
+            .iter()
+            .min()
+            .expect("SqlReader cannot return empty vecs, so there is always min window_group index")
+            .clone();
+        let window_group_end = group
+            .scan_ends
+            .iter()
+            .max()
+            .expect("SqlReader cannot return empty vecs, so there is always max window_group index")
+            .clone();
         for (sws, swe) in
             scan_range_subsplit(window_group_start, window_group_end, &strategy)
         {
11 changes: 1 addition & 10 deletions src/io/readers/spectrum_reader.rs
@@ -2,12 +2,9 @@
 mod minitdf;
 #[cfg(feature = "tdf")]
 mod tdf;
-
-use core::fmt;
-
 #[cfg(feature = "minitdf")]
 use minitdf::{MiniTDFSpectrumReader, MiniTDFSpectrumReaderError};
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
+use rayon::prelude::*;
 #[cfg(feature = "serialize")]
 use serde::{Deserialize, Serialize};
 use std::path::{Path, PathBuf};
@@ -23,12 +20,6 @@ pub struct SpectrumReader {
     spectrum_reader: Box<dyn SpectrumReaderTrait>,
 }
 
-impl fmt::Debug for SpectrumReader {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "SpectrumReader {{ /* fields omitted */ }}")
-    }
-}
-
 impl SpectrumReader {
     pub fn build() -> SpectrumReaderBuilder {
         SpectrumReaderBuilder::default()
7 changes: 1 addition & 6 deletions src/io/readers/spectrum_reader/minitdf.rs
@@ -70,12 +70,7 @@ impl MiniTDFSpectrumReader {
         spectrum.index = index;
         let blob = self.blob_reader.get(index)?;
         if !blob.is_empty() {
-            let size: usize = blob.len();
-            let spectrum_data: Vec<u32> = (0..size)
-                .map(|i| {
-                    blob.get(i).ok_or(MiniTDFSpectrumReaderError::BlobError)
-                })
-                .collect::<Result<Vec<u32>, _>>()?;
+            let spectrum_data: Vec<u32> = blob.get_all();
             let scan_count: usize = blob.len() / 3;
             let tof_indices_bytes: &[u32] =
                 &spectrum_data[..scan_count as usize * 2];
5 changes: 4 additions & 1 deletion src/io/readers/spectrum_reader/tdf/dda.rs
@@ -30,7 +30,10 @@ impl DDARawSpectrumReader {
         let pasef_precursors =
             &pasef_frames.iter().map(|x| x.precursor).collect();
         let order: Vec<usize> = argsort(&pasef_precursors);
-        let max_precursor = pasef_precursors.iter().max().unwrap(); // SqlReader cannot return empty vecs, so always succeeds
+        let max_precursor = pasef_precursors
+            .iter()
+            .max()
+            .expect("SqlReader cannot return empty vecs, so there is always a max precursor index");
         let mut offsets: Vec<usize> = Vec::with_capacity(max_precursor + 1);
         offsets.push(0);
         for (offset, &index) in order.iter().enumerate().take(order.len() - 1) {
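Reviewer note: for readers unfamiliar with this pattern, `argsort` returns the permutation that groups PASEF frames by precursor, and the `offsets` vector being built makes all frames for precursor `p` addressable as `order[offsets[p]..offsets[p + 1]]`, CSR-style. A self-contained sketch of the same layout; the `argsort` helper and the counting-sort construction here are stand-ins (the crate builds the offsets by walking the sorted order instead), and the data is made up:

```rust
// Sketch: argsort + CSR-style offsets, as in DDARawSpectrumReader.
fn argsort(values: &[usize]) -> Vec<usize> {
    let mut order: Vec<usize> = (0..values.len()).collect();
    order.sort_by_key(|&i| values[i]); // stable, so ties keep file order
    order
}

fn main() {
    // Precursor id per PASEF frame (made-up data).
    let precursors = vec![1, 0, 1, 2, 0];
    let order = argsort(&precursors); // frames regrouped by precursor
    let max_precursor = *precursors
        .iter()
        .max()
        .expect("non-empty input, so there is always a max");

    // offsets[p]..offsets[p + 1] spans the block of `order` whose
    // frames share precursor p.
    let mut offsets = vec![0usize; max_precursor + 2];
    for &p in &precursors {
        offsets[p + 1] += 1; // count frames per precursor
    }
    for p in 0..=max_precursor {
        offsets[p + 1] += offsets[p]; // prefix-sum into offsets
    }

    for p in 0..=max_precursor {
        let block = &order[offsets[p]..offsets[p + 1]];
        println!("precursor {p}: frames {block:?}");
    }
}
```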