diff --git a/src/maldi_tools/load_maldi_data.py b/src/maldi_tools/load_maldi_data.py
index 9798111..32b8efe 100644
--- a/src/maldi_tools/load_maldi_data.py
+++ b/src/maldi_tools/load_maldi_data.py
@@ -1,18 +1,18 @@
-# """Used for loading and binning. spectra data across multiple TMA runs into a single DataFrame.
+"""Used for loading and binning spectra data across multiple TMA runs into a single DataFrame.
 
-# This is expected to serve as a drop-in replacement for SCiLS. Because we to combine, normalize,
-# and bin across multiple, we need to directly interact with pyTDFSDK over using an existing tool
-# like timsconvert.
-
-# TODO: need to access TIC normalization, or else add it in manually.
-# """
+This is expected to serve as a drop-in replacement for SCiLS. Because we need to combine, normalize,
+and bin across multiple runs, we interact with pyTDFSDK directly rather than using an existing tool
+like timsconvert.
+TODO: need to access TIC normalization, or else add it in manually.
+"""
 import os
 from bisect import bisect_left
+from concurrent.futures import ProcessPoolExecutor, as_completed
 from ctypes import CDLL
 from pathlib import Path
-from typing import Dict, List, Optional, Tuple, Union
+from typing import Dict, List, Tuple, Union
 
 import numpy as np
 import pandas as pd
@@ -22,10 +22,12 @@
 
 # Initialize the TDFSDK binary loader
 BASE_PATH = Path(os.path.dirname(os.path.realpath(__file__)))
-TDF_SDK_BINARY: CDLL = init_tdf_sdk_api(os.path.join(BASE_PATH, "timsdata.dll"))
 
 
-def generate_mz_bins(min_mz: float = 800, max_mz: float = 4000) -> np.ndarray:
+def generate_mz_bins(
+    min_mz: float = 800,
+    max_mz: float = 4000
+) -> np.ndarray:
     """Given a range of mz values, generate the bins as would be calculated by Bruker's SCiLS.
 
     To convert from an mz value to its corresponding bin size, use mz_val / 200 / 1000, since the
@@ -55,26 +57,30 @@ def generate_mz_bins(min_mz: float = 800, max_mz: float = 4000) -> np.ndarray:
     return np.array(mz_bins)
 
 
-def init_tsf_load_object(maldi_data_path: Union[str, Path]) -> TsfData:
+def init_tsf_load_object(maldi_data_path: Union[str, Path], tdf_sdk_binary: CDLL) -> TsfData:
     """Initialize the cursor (TsfData object).
 
     Args:
     ----
         maldi_data_path (Union[str, pathlib.Path]):
             The path to the raw MALDI data, must end with a `.d` extension
+        tdf_sdk_binary (CDLL):
+            The dynamically loaded library created for the TsfData object
 
     Returns:
     -------
         pyTDFSDK.classes.TsfData:
             The pyTDFSDK cursor for the MALDI data_path provided
     """
-    return TsfData(maldi_data_path, tdf_sdk=TDF_SDK_BINARY)
+    return TsfData(maldi_data_path, tdf_sdk=tdf_sdk_binary)
 
 
 def extract_maldi_tsf_data(
-    maldi_data_path: Union[str, Path], min_mz: float = 800, max_mz: float = 4000
+    maldi_data_path: Union[str, Path],
+    min_mz: float = 800,
+    max_mz: float = 4000
 ) -> Tuple[pd.DataFrame, pd.DataFrame]:
-    """Extract the spectra data for a particular MALDI run
+    """Extract the spectra data for a particular MALDI run.
 
     Args:
     ====
@@ -90,9 +96,8 @@ def extract_maldi_tsf_data(
         Tuple[pandas.DataFrame, pandas.DataFrame]:
             Two DataFrames containing the spectra and poslog info across the run respectively
     """
-    tsf_cursor: TsfData = init_tsf_load_object(maldi_data_path)
-
-    tsf_poslog: pd.DataFrame = tsf_cursor.analysis["MaldiFrameInfo"]
+    tdf_sdk_binary: CDLL = init_tdf_sdk_api(os.path.join(BASE_PATH, "timsdata.dll"))
+    tsf_cursor: TsfData = init_tsf_load_object(maldi_data_path, tdf_sdk_binary)
     mz_bins: np.ndarray = generate_mz_bins(min_mz, max_mz)
 
     spectra_dict: Dict[float, float] = {}
@@ -107,26 +112,31 @@
         )
 
         for mz, intensity in zip(mz_arr, intensity_arr):
-            binned_mz = bisect_left(mz_bins, mz)
+            binned_mz = mz_bins[bisect_left(mz_bins, mz)]
             spectra_dict[binned_mz] = (
                 0 if binned_mz not in spectra_dict else spectra_dict[binned_mz]
             ) + intensity
 
         if sid % 5000 == 0:
             print(f"Processed {sid} spots")
 
-    tsf_spectra: pd.DataFrame = pd.DataFrame(
-        spectra_dict.items(), columns=["m/z", "intensity"]
-    )
-    tsf_spectra["run_name"] = os.path.basename(os.path.splitext(maldi_data_path)[0])
+    run_name = os.path.basename(os.path.splitext(maldi_data_path)[0])
+    tsf_spectra: pd.DataFrame = pd.DataFrame(spectra_dict.items(), columns=["m/z", "intensity"])
+    tsf_spectra["run_name"] = run_name
     tsf_spectra.sort_values(by="m/z", inplace=True)
 
+    tsf_poslog: pd.DataFrame = tsf_cursor.analysis["MaldiFrameInfo"]
+    tsf_poslog["run_name"] = run_name
+
     return tsf_spectra, tsf_poslog
 
 
 def extract_maldi_run_spectra(
-    maldi_paths: List[Union[str, Path]], min_mz: float = 800, max_mz: float = 4000
+    maldi_paths: List[Union[str, Path]],
+    min_mz: float = 800,
+    max_mz: float = 4000,
+    num_workers: int = 32
 ) -> Tuple[pd.DataFrame, pd.DataFrame]:
-    """Extract the full spectra and corresponding poslog information from the MALDI files
+    """Extract the full spectra and corresponding poslog information from the MALDI files.
 
     Args:
     ----
@@ -136,16 +146,35 @@ def extract_maldi_run_spectra(
             The minimum m/z value observed during the run
         max_mz (float):
             The maximum m/z value observed during the run
+        num_workers (int):
+            The number of worker processes to use, defaults to 32
 
     Returns:
     -------
         Tuple[pandas.DataFrame, pandas.DataFrame:
             Two DataFrames containing the spectra and poslog info across all runs respectively
     """
+    if num_workers <= 0:
+        raise ValueError("num_workers specified must be positive")
+
    poslog_df: pd.DataFrame = pd.DataFrame()
    spectra_df: pd.DataFrame = pd.DataFrame()
 
-    from timeit import default_timer
+    # with ProcessPoolExecutor(max_workers=num_workers) as executor:
+    #     future_maldi_data = {
+    #         executor.submit(extract_maldi_run_spectra, mp, min_mz, max_mz): mp for mp in maldi_paths
+    #     }
+
+    #     for future in as_completed(future_maldi_data):
+    #         mp = future_maldi_data[future]
+    #         try:
+    #             poslog_mp, spectra_mp = future.result()
+    #             poslog_df = pd.concat([poslog_df, poslog_mp])
+    #             spectra_df = pd.concat([spectra_df, spectra_mp])
+    #         except Exception as e:
+    #             print(f"Exception raised while processing {mp}")
+    #             print(e)
+
     for mp in maldi_paths:
         print(f"Processing data {mp}")
         start = default_timer()
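
A minimal sketch of the binning step inside `extract_maldi_tsf_data`, assuming the intent of the diff is to key the accumulated intensities by the matching entry of `mz_bins` (the SCiLS-style bin value) rather than by the raw `bisect_left` index. The helper name `bin_spectrum` and the index clamp are illustrative additions, not part of the diff:

```python
from bisect import bisect_left
from typing import Dict

import numpy as np


def bin_spectrum(
    mz_arr: np.ndarray, intensity_arr: np.ndarray, mz_bins: np.ndarray
) -> Dict[float, float]:
    """Sketch: accumulate a spectrum's intensities into the SCiLS-style m/z bins."""
    spectra_dict: Dict[float, float] = {}
    for mz, intensity in zip(mz_arr, intensity_arr):
        # bisect_left returns the insertion index into mz_bins; clamp it so an
        # observed m/z above the last generated bin still maps to a valid bin value
        idx = min(bisect_left(mz_bins, mz), len(mz_bins) - 1)
        binned_mz = mz_bins[idx]
        spectra_dict[binned_mz] = spectra_dict.get(binned_mz, 0) + intensity
    return spectra_dict
```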
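The `ProcessPoolExecutor` path in `extract_maldi_run_spectra` is left commented out in the diff, and as written it submits `extract_maldi_run_spectra` itself (which would recurse) and unpacks the result as `(poslog, spectra)`. Below is a minimal sketch of how that block could be wired up, assuming the per-run worker is `extract_maldi_tsf_data` and that it returns `(tsf_spectra, tsf_poslog)` in that order, as in the diff. The wrapper name `extract_maldi_run_spectra_parallel` is illustrative only:

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from typing import List, Tuple, Union

import pandas as pd

# Assumes extract_maldi_tsf_data is defined in the same module, as in the diff above.


def extract_maldi_run_spectra_parallel(
    maldi_paths: List[Union[str, Path]],
    min_mz: float = 800,
    max_mz: float = 4000,
    num_workers: int = 32,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Sketch: extract spectra and poslog info for each run in a separate process."""
    spectra_frames: List[pd.DataFrame] = []
    poslog_frames: List[pd.DataFrame] = []

    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # Map each future back to the run path it came from, for error reporting
        future_maldi_data = {
            executor.submit(extract_maldi_tsf_data, mp, min_mz, max_mz): mp for mp in maldi_paths
        }

        for future in as_completed(future_maldi_data):
            mp = future_maldi_data[future]
            try:
                # extract_maldi_tsf_data returns (tsf_spectra, tsf_poslog) in that order
                spectra_mp, poslog_mp = future.result()
            except Exception as e:
                print(f"Exception raised while processing {mp}")
                print(e)
                continue
            spectra_frames.append(spectra_mp)
            poslog_frames.append(poslog_mp)

    # Concatenate once at the end instead of growing the DataFrames run by run
    spectra_df = pd.concat(spectra_frames, ignore_index=True)
    poslog_df = pd.concat(poslog_frames, ignore_index=True)
    return spectra_df, poslog_df
```

Collecting the per-run frames in lists and calling `pd.concat` once avoids repeatedly copying the growing DataFrames, and on Windows the executor should be created under the usual `if __name__ == "__main__":` guard.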