Transfer file2udf_quantit_qc to cg_lims and update file_to_udf (#574) (minor)

### Added
- New EPP, cg_lims/EPPs/files/parsers/quantit_excel_to_udf.py
- New common functions in cg_lims/get/fields.py
- New common functions in cg_lims/get/artifacts.py
- New features in cg_lims/EPPs/files/parsers/file_to_udf.py for parsing a multitude of values and files (a short illustration follows the file summary below)

### Changed
- Moved all parser EPPs into a common sub dir, cg_lims/EPPs/files/parsers
- Refactored the CSV parser, cg_lims/EPPs/files/parsers/file_to_udf.py
1 parent 8c07c65 · commit 589a423
Showing 10 changed files with 303 additions and 105 deletions.
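As a rough illustration of the multi-value parsing mentioned above: each artifact UDF passed to the EPP is paired with one value-field column of the result file, and that pairing is what make_udf_dict() in the refactored file_to_udf.py (shown further down) builds. The UDF and column names in this sketch are invented, not taken from the repository.

```python
# Invented UDF and column names, purely for illustration.
udfs = ("Concentration", "Size (bp)")
value_fields = ("Quantity_ng_per_ul", "Peak Size")

# Equivalent to make_udf_dict(udfs=udfs, value_fields=value_fields) below:
udf_vf_dict = dict(zip(udfs, value_fields))
# -> {"Concentration": "Quantity_ng_per_ul", "Size (bp)": "Peak Size"}
```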
@@ -0,0 +1,151 @@ — new file (the refactored CSV parser, cg_lims/EPPs/files/parsers/file_to_udf.py)
import csv
import logging
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import click
from cg_lims import options
from cg_lims.exceptions import ArgumentError, LimsError, MissingArtifactError, MissingFileError
from cg_lims.get.artifacts import create_well_dict, get_artifact_by_name
from cg_lims.get.files import get_file_path
from genologics.entities import Artifact, Process

LOG = logging.getLogger(__name__)


def make_udf_dict(udfs: Tuple[str], value_fields: Tuple[str]) -> Dict[str, str]:
    """Create dictionary containing UDF names and their corresponding value field names."""
    if len(udfs) != len(value_fields):
        raise ArgumentError(
            "The number of artifact-udfs to update and file value fields must be the same."
        )
    udf_vf_dict: dict = {}
    for i in range(len(udfs)):
        udf_vf_dict[udfs[i]] = value_fields[i]
    return udf_vf_dict


def get_file_placeholder_paths(placeholder_names: List[str], process: Process) -> List[str]:
    """Convert a list of file placeholder names to complete file paths."""
    file_paths: List[str] = []
    for placeholder_name in placeholder_names:
        file_artifact: Artifact = get_artifact_by_name(process=process, name=placeholder_name)
        file_paths.append(get_file_path(file_artifact=file_artifact))
    return file_paths


def set_udfs_from_file(
    well_field: str, udf_vf_dict: Dict[str, str], well_dict: dict, result_file: Path
) -> List[str]:
    """Parse a CSV file and set the corresponding UDF values for each sample."""
    error_msg: List[str] = []
    passed_arts: int = 0
    with open(result_file, newline="", encoding="latin1") as csvfile:
        reader: csv.DictReader = csv.DictReader(csvfile)
        for udf_name in list(udf_vf_dict.keys()):
            if udf_vf_dict[udf_name] not in reader.fieldnames:
                LOG.info(
                    f"Value {udf_vf_dict[udf_name]} does not exist in file {result_file}, skipping."
                )
                continue
            value_field: str = udf_vf_dict.pop(udf_name)

            for sample in reader:
                well: str = sample.get(well_field)
                if well not in well_dict:
                    LOG.info(f"Well {well} was not found in the step. Skipping!")
                    continue
                artifact: Artifact = well_dict[well]
                value: Any = sample.get(value_field)
                if not value:
                    error_msg.append("Some samples in the file had missing values.")
                    LOG.info(f"Missing value for sample {sample} in well {well}. Skipping!")
                    continue
                try:
                    # Text UDFs accept the value as a string; numeric UDFs reject it,
                    # so fall back to setting a float.
                    artifact.udf[udf_name] = str(value)
                except Exception:
                    artifact.udf[udf_name] = float(value)
                artifact.put()
                passed_arts += 1

    if passed_arts < len(well_dict.keys()):
        error_msg.append("Some samples in the step were not represented in the file.")

    return error_msg


def set_udfs(
    well_fields: List[str],
    udf_vf_dict: Dict[str, str],
    well_dict: dict,
    file_placeholders: List[str],
    local_files: Optional[List[str]],
    process: Process,
) -> None:
    """Loop through each given file and parse out the given values which are then set to their corresponding UDFs."""
    if local_files:
        files: List[str] = local_files
    else:
        files: List[str] = get_file_placeholder_paths(
            placeholder_names=file_placeholders, process=process
        )
    if len(well_fields) != len(files):
        raise ArgumentError("The number of files to read and well fields must be the same.")

    file_well_list: zip = zip(files, well_fields)
    error_message: List[str] = []

    for file_tuple in file_well_list:
        file: str = file_tuple[0]
        well_field: str = file_tuple[1]
        if not Path(file).is_file():
            raise MissingFileError(f"No such file: {file}")
        error_message += set_udfs_from_file(
            well_field=well_field,
            udf_vf_dict=udf_vf_dict,
            well_dict=well_dict,
            result_file=Path(file),
        )

    if error_message:
        error_string: str = " ".join(list(set(error_message)))
        raise MissingArtifactError(error_string + " See the log for details.")


@click.command()
@options.file_placeholders(help="File placeholder name.")
@options.local_files()
@options.udfs()
@options.well_fields()
@options.value_fields()
@options.input()
@click.pass_context
def csv_well_to_udf(
    ctx,
    files: Tuple[str],
    local_files: Tuple[str],
    udfs: Tuple[str],
    well_fields: Tuple[str],
    value_fields: Tuple[str],
    input: bool,
):
    """Script to copy data from files to UDFs based on well position."""

    LOG.info(f"Running {ctx.command_path} with params: {ctx.params}")
    process: Process = ctx.obj["process"]

    try:
        well_dict: Dict[str, Artifact] = create_well_dict(process=process, input_flag=input)
        udf_vf_dict: Dict[str, str] = make_udf_dict(udfs=udfs, value_fields=value_fields)
        set_udfs(
            well_fields=list(well_fields),
            udf_vf_dict=udf_vf_dict,
            well_dict=well_dict,
            file_placeholders=list(files),
            local_files=list(local_files),
            process=process,
        )
        click.echo("The UDFs were successfully populated.")
    except LimsError as e:
        sys.exit(e.message)
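For orientation, here is a minimal sketch of how make_udf_dict() and set_udfs_from_file() above work together (assuming the functions above are importable). The CSV column names, the UDF name, and the stand-in artifact class are invented for illustration; a real run uses genologics Artifact objects and a result file attached to the step.

```python
import tempfile
from pathlib import Path


class FakeArtifact:
    """Stand-in for genologics.entities.Artifact, for this sketch only."""

    def __init__(self):
        self.udf = {}

    def put(self):
        pass  # a real Artifact would push the UDF update back to the LIMS here


# Invented result file: one well column and one value column.
csv_text = "SampleWell,Quantity_ng_per_ul\nA1,3.4\nB1,\n"
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as handle:
    handle.write(csv_text)

well_dict = {"A1": FakeArtifact(), "B1": FakeArtifact()}
udf_vf_dict = make_udf_dict(udfs=("Concentration",), value_fields=("Quantity_ng_per_ul",))
errors = set_udfs_from_file(
    well_field="SampleWell",
    udf_vf_dict=udf_vf_dict,
    well_dict=well_dict,
    result_file=Path(handle.name),
)
# A1 gets the UDF set; B1 has an empty value, so `errors` notes the missing value
# and, because only one of the two wells passed, the under-representation as well.
```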
File renamed without changes.
File renamed without changes.
@@ -0,0 +1,80 @@ — new file (the new EPP, cg_lims/EPPs/files/parsers/quantit_excel_to_udf.py)
import logging
import sys
from pathlib import Path
from typing import Dict

import click
import pandas as pd
from cg_lims import options
from cg_lims.exceptions import LimsError, MissingArtifactError, MissingFileError
from cg_lims.get.artifacts import create_well_dict, get_artifact_by_name
from cg_lims.get.files import get_file_path
from genologics.entities import Artifact, Process

LOG = logging.getLogger(__name__)


def set_udfs(udf: str, well_dict: dict, result_file: Path):
    """Reads the Quant-iT Excel file and sets the value for each sample."""

    failed_artifacts: int = 0
    skipped_artifacts: int = 0
    # Skip the first 11 rows of the Quant-iT export and read the data without a header:
    # column 0 holds the well, column 2 the concentration.
    df: pd.DataFrame = pd.read_excel(result_file, skiprows=11, header=None)
    for index, row in df.iterrows():
        if row[0] not in well_dict.keys():
            LOG.info(f"Well {row[0]} is not used by a sample in the step, skipping.")
            skipped_artifacts += 1
            continue
        elif pd.isna(row[2]):
            LOG.info(
                f"Well {row[0]} does not have a valid concentration value ({row[2]}), skipping."
            )
            failed_artifacts += 1
            continue
        artifact: Artifact = well_dict[row[0]]
        artifact.udf[udf] = row[2]
        artifact.put()

    if failed_artifacts or skipped_artifacts:
        error_message: str = "Warning:"
        if failed_artifacts:
            error_message += f" Skipped {failed_artifacts} artifact(s) with wrong and/or blank values for some UDFs."
        if skipped_artifacts:
            error_message += f" Skipped {skipped_artifacts} artifact(s) as they weren't represented in the result file."
        raise MissingArtifactError(error_message)


@click.command()
@options.file_placeholder(help="File placeholder name.")
@options.local_file()
@options.udf()
@options.input()
@click.pass_context
def quantit_excel_to_udf(
    ctx,
    file: str,
    local_file: str,
    udf: str,
    input: bool,
):
    """Script to copy data from a Quant-iT result Excel file to concentration UDFs based on well position."""

    LOG.info(f"Running {ctx.command_path} with params: {ctx.params}")
    process: Process = ctx.obj["process"]

    if local_file:
        file_path: str = local_file
    else:
        file_art: Artifact = get_artifact_by_name(process=process, name=file)
        file_path: str = get_file_path(file_art)

    try:
        if not Path(file_path).is_file():
            raise MissingFileError(f"No such file: {file_path}")
        well_dict: Dict[str, Artifact] = create_well_dict(
            process=process, input_flag=input, quantit_well_format=True
        )
        set_udfs(udf=udf, well_dict=well_dict, result_file=Path(file_path))
        click.echo(f"Updated {len(well_dict.keys())} artifact(s) successfully.")
    except LimsError as e:
        sys.exit(e.message)
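To make the expected Quant-iT layout concrete: set_udfs() above skips the first eleven rows of the Excel export and then takes the well from column 0 and the concentration from column 2. Below is a rough sketch that fabricates such a file and runs the parser on it; the well IDs, concentrations, UDF name, and the stand-in artifact class are invented, the plain "A1" well format is an assumption, and writing/reading .xlsx assumes an engine such as openpyxl is installed.

```python
import pandas as pd
from pathlib import Path


class FakeArtifact:
    """Stand-in for genologics.entities.Artifact, for this sketch only."""

    def __init__(self):
        self.udf = {}

    def put(self):
        pass  # a real Artifact would push the UDF update back to the LIMS here


# Eleven padding rows mimic the header block that the parser skips,
# followed by data rows: column 0 = well, column 2 = concentration.
padding = pd.DataFrame([["Quant-iT export header", None, None]] * 11)
data = pd.DataFrame([["A1", None, 12.5], ["B1", None, 8.3]])
result_file = Path("quantit_example.xlsx")
pd.concat([padding, data]).to_excel(result_file, header=False, index=False)

well_dict = {"A1": FakeArtifact(), "B1": FakeArtifact()}
set_udfs(udf="Concentration", well_dict=well_dict, result_file=result_file)
print(well_dict["A1"].udf)  # {'Concentration': 12.5}
```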