diff --git a/.talismanrc b/.talismanrc index 04af00e..5a6f0a6 100644 --- a/.talismanrc +++ b/.talismanrc @@ -13,6 +13,10 @@ fileignoreconfig: checksum: 0e93196d244417801fd40ad6bfa01f53a6bf04257371378a1dc3e14cbc7a04d8 - filename: slurm-dev-environment/fs/nfs/public/tests/amplicon_v6_output/SRR6180434/taxonomy-summary/SILVA-SSU/SRR6180434.html checksum: be322a6ac4686d8fa31b271a862510603403c2da8dbdce60dbb7ad3877aa7b47 +- filename: analyses/tests.py + allowed_patterns: [key] +- filename: analyses/base_models/with_status_models.py + allowed_patterns: [key] allowed_patterns: - pagination_key - CommonMetadataKeys diff --git a/analyses/base_models/with_status_models.py b/analyses/base_models/with_status_models.py new file mode 100644 index 0000000..c67b3d0 --- /dev/null +++ b/analyses/base_models/with_status_models.py @@ -0,0 +1,69 @@ +# TODO: refactor the "state" field to a base model here, including helpers like a default state classmethod +# and a mark-status method +from enum import Enum +from typing import List, Union + +from django.db.models import Q + + +class SelectByStatusManagerMixin: + """ + A mixin for model managers that provides queryset filtering by the truthiness of keys in a model object's json field. + Helpful for filtering e.g. analyses.objects.filter(status__is_it_started=True, status__is_it_finished=False). + + Use: + class MyModelManager(SelectByStatusManagerMixin, models.Manager):... + STATUS_FIELDNAME = "my_status_field" + + class MyModel(...): + objects = MyModelManager() + + MyModel.objects.filter_by_statuses(["is_it_started"]).exclude_by_statuses(["is_it_finished"]). + """ + + STATUS_FIELDNAME = "status" + + def _build_q_objects( + self, keys: List[Union[str, Enum]], allow_null: bool, truthy_target: bool + ): + filters = [] + for status in keys: + status_label = status.value if isinstance(status, Enum) else status + if allow_null: + filters.append( + Q(**{f"{self.STATUS_FIELDNAME}__{status_label}": truthy_target}) + ) + else: + filters.append( + Q(**{f"{self.STATUS_FIELDNAME}__{status_label}": truthy_target}) + | Q(**{f"{self.STATUS_FIELDNAME}__{status_label}__isnull": True}) + ) + return filters + + def filter_by_statuses( + self, statuses_to_be_true: List[Union[str, Enum]] = None, strict: bool = True + ): + """ + Filter queryset by a combination of statuses in the object's status json field. + :param statuses_to_be_true: List of keys that should resolve to true values in the model's STATUS_FIELDNAME json field. + :param strict: If True, objects will be filtered out if the key is not present. If False, null values are also acceptable i.e. only falsey values will be excluded. + """ + if not statuses_to_be_true: + return super().get_queryset() + + filters = self._build_q_objects(statuses_to_be_true, strict, True) + return super().get_queryset().filter(*filters) + + def exclude_by_statuses( + self, statuses_to_exclude: List[Union[str, Enum]] = None, strict: bool = True + ): + """ + Filter queryset by excluding a combination of statuses in the object's status json field. + :param statuses_to_exclude: List of keys that, if they resolve to false values in the model's STATUS_FIELDNAME json field, will exclude that object from the queryset. + :param strict: If True, objects will only be excluded if the key is present AND truthy. If False, null values are also excluded. + """ + if not statuses_to_exclude: + return super().get_queryset() + + filters = self._build_q_objects(statuses_to_exclude, not strict, False) + return super().get_queryset().filter(*filters) diff --git a/analyses/models.py b/analyses/models.py index ed8d22b..c198659 100644 --- a/analyses/models.py +++ b/analyses/models.py @@ -26,6 +26,7 @@ ) from analyses.base_models.mgnify_accessioned_models import MGnifyAccessionField from analyses.base_models.with_downloads_models import WithDownloadsModel +from analyses.base_models.with_status_models import SelectByStatusManagerMixin from emgapiv2.async_utils import anysync_property # Some models associated with MGnify Analyses (MGYS, MGYA etc). @@ -215,7 +216,7 @@ def __str__(self): return f"{self.name} {self.version}" if self.version is not None else self.name -class AssemblyManager(ENADerivedManager): +class AssemblyManager(SelectByStatusManagerMixin, ENADerivedManager): def get_queryset(self): return super().get_queryset().select_related("run") @@ -275,6 +276,7 @@ class AssemblyStates(str, Enum): def default_status(cls): return { cls.ASSEMBLY_STARTED: False, + cls.PRE_ASSEMBLY_QC_FAILED: False, cls.ASSEMBLY_FAILED: False, cls.ASSEMBLY_COMPLETED: False, cls.ASSEMBLY_BLOCKED: False, @@ -407,7 +409,7 @@ def __str__(self): return f"ComputeResourceHeuristic {self.id} ({self.process})" -class AnalysisManagerDeferringAnnotations(models.Manager): +class AnalysisManagerDeferringAnnotations(SelectByStatusManagerMixin, models.Manager): """ The annotations field is a potentially large JSONB field. Defer it by default, since most queries don't need to transfer this large dataset. @@ -417,7 +419,7 @@ def get_queryset(self): return super().get_queryset().defer("annotations") -class AnalysisManagerIncludingAnnotations(models.Manager): +class AnalysisManagerIncludingAnnotations(SelectByStatusManagerMixin, models.Manager): def get_queryset(self): return super().get_queryset() @@ -543,6 +545,7 @@ class AnalysisStates(str, Enum): def default_status(cls): return { cls.ANALYSIS_STARTED: False, + cls.ANALYSIS_QC_FAILED: False, cls.ANALYSIS_COMPLETED: False, cls.ANALYSIS_BLOCKED: False, cls.ANALYSIS_FAILED: False, diff --git a/analyses/tests.py b/analyses/tests.py index faed6ef..59ed445 100644 --- a/analyses/tests.py +++ b/analyses/tests.py @@ -199,3 +199,112 @@ def test_analysis_inheritance( analysis.refresh_from_db() assert analysis.experiment_type != analysis.ExperimentTypes.UNKNOWN assert analysis.experiment_type == run.experiment_type + + +@pytest.mark.django_db(transaction=True) +def test_status_filtering( + raw_read_run, +): + run = Run.objects.first() + analysis = Analysis.objects.create( + study=run.study, run=run, ena_study=run.ena_study, sample=run.sample + ) + assert analysis.AnalysisStates.ANALYSIS_COMPLETED.value in analysis.status + assert analysis.status[analysis.AnalysisStates.ANALYSIS_COMPLETED.value] == False + + assert run.study.analyses.count() == 1 + + assert ( + run.study.analyses.filter( + **{f"status__{analysis.AnalysisStates.ANALYSIS_COMPLETED.value}": False} + ).count() + == 1 + ) + assert ( + run.study.analyses.filter( + **{f"status__{analysis.AnalysisStates.ANALYSIS_COMPLETED.value}": True} + ).count() + == 0 + ) + + assert ( + run.study.analyses.filter_by_statuses( + [analysis.AnalysisStates.ANALYSIS_COMPLETED] + ).count() + == 0 + ) + + # should include a missing key if not strict + assert run.study.analyses.filter_by_statuses(["non_existent_key"]).count() == 0 + assert ( + run.study.analyses.filter_by_statuses( + ["non_existent_key"], strict=False + ).count() + == 1 + ) + + analysis.status[analysis.AnalysisStates.ANALYSIS_COMPLETED] = True + analysis.save() + assert ( + run.study.analyses.filter_by_statuses( + [analysis.AnalysisStates.ANALYSIS_COMPLETED] + ).count() + == 1 + ) + + assert ( + run.study.analyses.filter_by_statuses( + [analysis.AnalysisStates.ANALYSIS_COMPLETED, "non_existent_key"] + ).count() + == 0 + ) + assert ( + run.study.analyses.filter_by_statuses( + [analysis.AnalysisStates.ANALYSIS_COMPLETED, "non_existent_key"], + strict=False, + ).count() + == 1 + ) + + analysis.status["an_extra_key"] = True + analysis.save() + assert ( + run.study.analyses.filter_by_statuses( + [analysis.AnalysisStates.ANALYSIS_COMPLETED, "an_extra_key"] + ).count() + == 1 + ) + + # EXCLUSIONS + assert run.study.analyses.count() == 1 + assert ( + run.study.analyses.exclude_by_statuses( + [analysis.AnalysisStates.ANALYSIS_COMPLETED] + ).count() + == 0 + ) + assert ( + run.study.analyses.exclude_by_statuses( + [analysis.AnalysisStates.ANALYSIS_FAILED] + ).count() + == 1 + ) + assert ( + run.study.analyses.exclude_by_statuses( + [analysis.AnalysisStates.ANALYSIS_FAILED, "an_extra_key"] + ).count() + == 0 + ) + + assert ( + run.study.analyses.exclude_by_statuses( + [analysis.AnalysisStates.ANALYSIS_FAILED, "non_existent_key"] + ).count() + == 1 + ) + assert ( + run.study.analyses.exclude_by_statuses( + [analysis.AnalysisStates.ANALYSIS_FAILED, "non_existent_key"], strict=False + ).count() + == 0 + ) diff --git a/workflows/flows/analysis_amplicon_study.py b/workflows/flows/analysis_amplicon_study.py index 1dfc1a4..7e36a6f 100644 --- a/workflows/flows/analysis_amplicon_study.py +++ b/workflows/flows/analysis_amplicon_study.py @@ -43,6 +43,7 @@ FASTQ_FTPS = analyses.models.Run.CommonMetadataKeys.FASTQ_FTPS METADATA__FASTQ_FTPS = f"{analyses.models.Run.metadata.field.name}__{FASTQ_FTPS}" EMG_CONFIG = settings.EMG_CONFIG +AnalysisStates = analyses.models.Analysis.AnalysisStates @task( @@ -60,16 +61,18 @@ def get_analyses_to_attempt( """ study.refresh_from_db() analyses_worth_trying = ( - study.analyses.filter( - **{ - f"status__{analyses.models.Analysis.AnalysisStates.ANALYSIS_COMPLETED}": False, - f"status__{analyses.models.Analysis.AnalysisStates.ANALYSIS_BLOCKED}": False, - } + study.analyses.exclude_by_statuses( + [ + analyses.models.Analysis.AnalysisStates.ANALYSIS_QC_FAILED, + analyses.models.Analysis.AnalysisStates.ANALYSIS_COMPLETED, + analyses.models.Analysis.AnalysisStates.ANALYSIS_BLOCKED, + ] ) - .filter(experiment_type=for_experiment_type) + .filter(experiment_type=for_experiment_type.value) .order_by("id") .values_list("id", flat=True) ) + print(f"Got {len(analyses_worth_trying)} analyses to attempt") return analyses_worth_trying @@ -109,12 +112,12 @@ def create_analyses( @task(log_prints=True) def mark_analysis_as_started(analysis: analyses.models.Analysis): - analysis.mark_status(analysis.AnalysisStates.ANALYSIS_STARTED) + analysis.mark_status(AnalysisStates.ANALYSIS_STARTED) @task(log_prints=True) def mark_analysis_as_failed(analysis: analyses.models.Analysis): - analysis.mark_status(analysis.AnalysisStates.ANALYSIS_FAILED) + analysis.mark_status(AnalysisStates.ANALYSIS_FAILED) @task( @@ -472,7 +475,7 @@ def sanity_check_amplicon_results( if reason: task_mark_analysis_status( analysis, - status=analyses.models.Analysis.AnalysisStates.ANALYSIS_POST_SANITY_CHECK_FAILED, + status=AnalysisStates.ANALYSIS_POST_SANITY_CHECK_FAILED, reason=reason, ) @@ -483,7 +486,7 @@ def import_completed_analysis( ): for analysis in amplicon_analyses: analysis.refresh_from_db() - if not analysis.status.get(analysis.AnalysisStates.ANALYSIS_COMPLETED): + if not analysis.status.get(AnalysisStates.ANALYSIS_COMPLETED): print(f"{analysis} is not completed successfuly. Skipping.") continue if analysis.annotations.get(analysis.TAXONOMIES): @@ -546,17 +549,17 @@ def set_post_analysis_states(amplicon_current_outdir: Path, amplicon_analyses: L if analysis.run.first_accession in qc_failed_runs: task_mark_analysis_status( analysis, - status=analyses.models.Analysis.AnalysisStates.ANALYSIS_FAILED, + status=AnalysisStates.ANALYSIS_QC_FAILED, reason=qc_failed_runs[analysis.run.first_accession], ) elif analysis.run.first_accession in qc_completed_runs: task_mark_analysis_status( analysis, - status=analyses.models.Analysis.AnalysisStates.ANALYSIS_COMPLETED, + status=AnalysisStates.ANALYSIS_COMPLETED, reason=qc_completed_runs[analysis.run.first_accession], unset_statuses=[ - analyses.models.Analysis.AnalysisStates.ANALYSIS_FAILED, - analyses.models.Analysis.AnalysisStates.ANALYSIS_BLOCKED, + AnalysisStates.ANALYSIS_FAILED, + AnalysisStates.ANALYSIS_BLOCKED, ], ) sanity_check_amplicon_results( @@ -566,7 +569,7 @@ def set_post_analysis_states(amplicon_current_outdir: Path, amplicon_analyses: L else: task_mark_analysis_status( analysis, - status=analyses.models.Analysis.AnalysisStates.ANALYSIS_FAILED, + status=AnalysisStates.ANALYSIS_FAILED, reason="Missing run in execution", ) diff --git a/workflows/flows/assemble_study.py b/workflows/flows/assemble_study.py index 8722b2b..92fa782 100644 --- a/workflows/flows/assemble_study.py +++ b/workflows/flows/assemble_study.py @@ -122,11 +122,12 @@ def get_assemblies_to_attempt(study: analyses.models.Study) -> List[Union[str, i :return: """ study.refresh_from_db() - assemblies_worth_trying = study.assemblies_reads.filter( - **{ - f"status__{analyses.models.Assembly.AssemblyStates.ASSEMBLY_COMPLETED}": False, - f"status__{analyses.models.Assembly.AssemblyStates.ASSEMBLY_BLOCKED}": False, - } + assemblies_worth_trying = study.assemblies_reads.exclude_by_statuses( + [ + analyses.models.Assembly.AssemblyStates.PRE_ASSEMBLY_QC_FAILED, + analyses.models.Assembly.AssemblyStates.ASSEMBLY_COMPLETED, + analyses.models.Assembly.AssemblyStates.ASSEMBLY_BLOCKED, + ] ).values_list("id", flat=True) return assemblies_worth_trying diff --git a/workflows/tests/test_analysis_amplicon_study_flow.py b/workflows/tests/test_analysis_amplicon_study_flow.py index a0452a1..99a31fa 100644 --- a/workflows/tests/test_analysis_amplicon_study_flow.py +++ b/workflows/tests/test_analysis_amplicon_study_flow.py @@ -505,7 +505,7 @@ async def test_prefect_analyse_amplicon_flow( # check failed runs assert ( await analyses.models.Analysis.objects.filter( - status__analysis_failed=True + status__analysis_qc_failed=True ).acount() == 1 )