Skip to content

Commit

Permalink
Merge pull request #28 from EBI-Metagenomics/analysis-state-filtering
Browse files Browse the repository at this point in the history
Filter by Statuses
  • Loading branch information
KateSakharova authored Jan 15, 2025
2 parents ebcbe13 + 06f5495 commit 8014248
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 24 deletions.
4 changes: 4 additions & 0 deletions .talismanrc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ fileignoreconfig:
checksum: 0e93196d244417801fd40ad6bfa01f53a6bf04257371378a1dc3e14cbc7a04d8
- filename: slurm-dev-environment/fs/nfs/public/tests/amplicon_v6_output/SRR6180434/taxonomy-summary/SILVA-SSU/SRR6180434.html
checksum: be322a6ac4686d8fa31b271a862510603403c2da8dbdce60dbb7ad3877aa7b47
- filename: analyses/tests.py
allowed_patterns: [key]
- filename: analyses/base_models/with_status_models.py
allowed_patterns: [key]
allowed_patterns:
- pagination_key
- CommonMetadataKeys
69 changes: 69 additions & 0 deletions analyses/base_models/with_status_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# TODO: refactor the "state" field to a base model here, including helpers like a default state classmethod
# and a mark-status method
from enum import Enum
from typing import List, Union

from django.db.models import Q


class SelectByStatusManagerMixin:
    """
    A mixin for model managers that provides queryset filtering by the truthiness of keys in a model object's json field.
    Helpful for filtering e.g. analyses.objects.filter(status__is_it_started=True, status__is_it_finished=False).
    Use:
        class MyModelManager(SelectByStatusManagerMixin, models.Manager):...
            STATUS_FIELDNAME = "my_status_field"
        class MyModel(...):
            objects = MyModelManager()
        MyModel.objects.filter_by_statuses(["is_it_started"]).exclude_by_statuses(["is_it_finished"]).
    """

    # Name of the JSON field on the model that holds the status flags.
    STATUS_FIELDNAME = "status"

    def _build_q_objects(
        self, keys: List[Union[str, Enum]], require_key: bool, truthy_target: bool
    ) -> List[Q]:
        """
        Build one Q object per status key, each matching ``truthy_target`` in the JSON status field.

        :param keys: Status keys, as strings or Enum members (an Enum member's ``.value`` is used).
        :param require_key: If True, a key must be present in the JSON field with the target
            value to match. If False, a missing (null) key is also accepted as a match.
            (Renamed from ``allow_null``, which meant the opposite of what it said.)
        :param truthy_target: The boolean value each key should hold to match.
        :return: A list of Q objects, one per key, intended to be ANDed together by the caller.
        """
        q_objects = []
        for status in keys:
            status_label = status.value if isinstance(status, Enum) else status
            lookup = Q(**{f"{self.STATUS_FIELDNAME}__{status_label}": truthy_target})
            if not require_key:
                # Tolerate objects where this key is absent from the JSON field.
                lookup |= Q(**{f"{self.STATUS_FIELDNAME}__{status_label}__isnull": True})
            q_objects.append(lookup)
        return q_objects

    def filter_by_statuses(
        self, statuses_to_be_true: List[Union[str, Enum]] = None, strict: bool = True
    ):
        """
        Filter queryset by a combination of statuses in the object's status json field.
        :param statuses_to_be_true: List of keys that should resolve to true values in the model's STATUS_FIELDNAME json field.
        :param strict: If True, objects will be filtered out if the key is not present. If False, null values are also acceptable i.e. only falsey values will be excluded.
        """
        if not statuses_to_be_true:
            # Nothing to filter on: return the unfiltered queryset.
            return super().get_queryset()

        filters = self._build_q_objects(statuses_to_be_true, strict, True)
        return super().get_queryset().filter(*filters)

    def exclude_by_statuses(
        self, statuses_to_exclude: List[Union[str, Enum]] = None, strict: bool = True
    ):
        """
        Filter queryset by excluding a combination of statuses in the object's status json field.
        :param statuses_to_exclude: List of keys that, if they resolve to false values in the model's STATUS_FIELDNAME json field, will exclude that object from the queryset.
        :param strict: If True, objects will only be excluded if the key is present AND truthy. If False, null values are also excluded.
        """
        if not statuses_to_exclude:
            return super().get_queryset()

        # Keep objects whose keys are falsey; when strict, a missing key also keeps the object.
        filters = self._build_q_objects(statuses_to_exclude, not strict, False)
        return super().get_queryset().filter(*filters)
9 changes: 6 additions & 3 deletions analyses/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
)
from analyses.base_models.mgnify_accessioned_models import MGnifyAccessionField
from analyses.base_models.with_downloads_models import WithDownloadsModel
from analyses.base_models.with_status_models import SelectByStatusManagerMixin
from emgapiv2.async_utils import anysync_property

# Some models associated with MGnify Analyses (MGYS, MGYA etc).
Expand Down Expand Up @@ -215,7 +216,7 @@ def __str__(self):
return f"{self.name} {self.version}" if self.version is not None else self.name


class AssemblyManager(ENADerivedManager):
class AssemblyManager(SelectByStatusManagerMixin, ENADerivedManager):
def get_queryset(self):
return super().get_queryset().select_related("run")

Expand Down Expand Up @@ -275,6 +276,7 @@ class AssemblyStates(str, Enum):
def default_status(cls):
return {
cls.ASSEMBLY_STARTED: False,
cls.PRE_ASSEMBLY_QC_FAILED: False,
cls.ASSEMBLY_FAILED: False,
cls.ASSEMBLY_COMPLETED: False,
cls.ASSEMBLY_BLOCKED: False,
Expand Down Expand Up @@ -407,7 +409,7 @@ def __str__(self):
return f"ComputeResourceHeuristic {self.id} ({self.process})"


class AnalysisManagerDeferringAnnotations(models.Manager):
class AnalysisManagerDeferringAnnotations(SelectByStatusManagerMixin, models.Manager):
"""
The annotations field is a potentially large JSONB field.
Defer it by default, since most queries don't need to transfer this large dataset.
Expand All @@ -417,7 +419,7 @@ def get_queryset(self):
return super().get_queryset().defer("annotations")


class AnalysisManagerIncludingAnnotations(models.Manager):
class AnalysisManagerIncludingAnnotations(SelectByStatusManagerMixin, models.Manager):
def get_queryset(self):
return super().get_queryset()

Expand Down Expand Up @@ -543,6 +545,7 @@ class AnalysisStates(str, Enum):
def default_status(cls):
return {
cls.ANALYSIS_STARTED: False,
cls.ANALYSIS_QC_FAILED: False,
cls.ANALYSIS_COMPLETED: False,
cls.ANALYSIS_BLOCKED: False,
cls.ANALYSIS_FAILED: False,
Expand Down
109 changes: 109 additions & 0 deletions analyses/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,3 +199,112 @@ def test_analysis_inheritance(
analysis.refresh_from_db()
assert analysis.experiment_type != analysis.ExperimentTypes.UNKNOWN
assert analysis.experiment_type == run.experiment_type


@pytest.mark.django_db(transaction=True)
def test_status_filtering(
    raw_read_run,
):
    """
    Exercise SelectByStatusManagerMixin behavior on Analysis.status (a JSON field):
    plain JSONField lookups, filter_by_statuses / exclude_by_statuses, and the
    strict vs non-strict handling of keys missing from the status dict.
    """
    run = Run.objects.first()
    analysis = Analysis.objects.create(
        study=run.study, run=run, ena_study=run.ena_study, sample=run.sample
    )
    # Use the string value consistently as the JSON key (was a mix of Enum member and .value).
    completed_key = Analysis.AnalysisStates.ANALYSIS_COMPLETED.value

    # A freshly created analysis carries the default status dict: key present, value False.
    assert completed_key in analysis.status
    assert analysis.status[completed_key] is False  # `is`, not `== False` (flake8 E712)

    assert run.study.analyses.count() == 1

    # Plain JSONField key lookups behave as expected.
    assert run.study.analyses.filter(**{f"status__{completed_key}": False}).count() == 1
    assert run.study.analyses.filter(**{f"status__{completed_key}": True}).count() == 0

    assert (
        run.study.analyses.filter_by_statuses(
            [Analysis.AnalysisStates.ANALYSIS_COMPLETED]
        ).count()
        == 0
    )

    # A missing key should only match when strict=False.
    assert run.study.analyses.filter_by_statuses(["non_existent_key"]).count() == 0
    assert (
        run.study.analyses.filter_by_statuses(
            ["non_existent_key"], strict=False
        ).count()
        == 1
    )

    analysis.status[completed_key] = True
    analysis.save()
    assert (
        run.study.analyses.filter_by_statuses(
            [Analysis.AnalysisStates.ANALYSIS_COMPLETED]
        ).count()
        == 1
    )

    # Strict filtering requires every listed key to be present and truthy.
    assert (
        run.study.analyses.filter_by_statuses(
            [Analysis.AnalysisStates.ANALYSIS_COMPLETED, "non_existent_key"]
        ).count()
        == 0
    )
    assert (
        run.study.analyses.filter_by_statuses(
            [Analysis.AnalysisStates.ANALYSIS_COMPLETED, "non_existent_key"],
            strict=False,
        ).count()
        == 1
    )

    analysis.status["an_extra_key"] = True
    analysis.save()
    assert (
        run.study.analyses.filter_by_statuses(
            [Analysis.AnalysisStates.ANALYSIS_COMPLETED, "an_extra_key"]
        ).count()
        == 1
    )

    # EXCLUSIONS
    assert run.study.analyses.count() == 1
    assert (
        run.study.analyses.exclude_by_statuses(
            [Analysis.AnalysisStates.ANALYSIS_COMPLETED]
        ).count()
        == 0
    )
    assert (
        run.study.analyses.exclude_by_statuses(
            [Analysis.AnalysisStates.ANALYSIS_FAILED]
        ).count()
        == 1
    )
    assert (
        run.study.analyses.exclude_by_statuses(
            [Analysis.AnalysisStates.ANALYSIS_FAILED, "an_extra_key"]
        ).count()
        == 0
    )

    # Missing keys only cause exclusion when strict=False.
    assert (
        run.study.analyses.exclude_by_statuses(
            [Analysis.AnalysisStates.ANALYSIS_FAILED, "non_existent_key"]
        ).count()
        == 1
    )
    assert (
        run.study.analyses.exclude_by_statuses(
            [Analysis.AnalysisStates.ANALYSIS_FAILED, "non_existent_key"], strict=False
        ).count()
        == 0
    )
33 changes: 18 additions & 15 deletions workflows/flows/analysis_amplicon_study.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
FASTQ_FTPS = analyses.models.Run.CommonMetadataKeys.FASTQ_FTPS
METADATA__FASTQ_FTPS = f"{analyses.models.Run.metadata.field.name}__{FASTQ_FTPS}"
EMG_CONFIG = settings.EMG_CONFIG
AnalysisStates = analyses.models.Analysis.AnalysisStates


@task(
Expand All @@ -60,16 +61,18 @@ def get_analyses_to_attempt(
"""
study.refresh_from_db()
analyses_worth_trying = (
study.analyses.filter(
**{
f"status__{analyses.models.Analysis.AnalysisStates.ANALYSIS_COMPLETED}": False,
f"status__{analyses.models.Analysis.AnalysisStates.ANALYSIS_BLOCKED}": False,
}
study.analyses.exclude_by_statuses(
[
analyses.models.Analysis.AnalysisStates.ANALYSIS_QC_FAILED,
analyses.models.Analysis.AnalysisStates.ANALYSIS_COMPLETED,
analyses.models.Analysis.AnalysisStates.ANALYSIS_BLOCKED,
]
)
.filter(experiment_type=for_experiment_type)
.filter(experiment_type=for_experiment_type.value)
.order_by("id")
.values_list("id", flat=True)
)

print(f"Got {len(analyses_worth_trying)} analyses to attempt")
return analyses_worth_trying

Expand Down Expand Up @@ -109,12 +112,12 @@ def create_analyses(

@task(log_prints=True)
def mark_analysis_as_started(analysis: analyses.models.Analysis):
analysis.mark_status(analysis.AnalysisStates.ANALYSIS_STARTED)
analysis.mark_status(AnalysisStates.ANALYSIS_STARTED)


@task(log_prints=True)
def mark_analysis_as_failed(analysis: analyses.models.Analysis):
analysis.mark_status(analysis.AnalysisStates.ANALYSIS_FAILED)
analysis.mark_status(AnalysisStates.ANALYSIS_FAILED)


@task(
Expand Down Expand Up @@ -472,7 +475,7 @@ def sanity_check_amplicon_results(
if reason:
task_mark_analysis_status(
analysis,
status=analyses.models.Analysis.AnalysisStates.ANALYSIS_POST_SANITY_CHECK_FAILED,
status=AnalysisStates.ANALYSIS_POST_SANITY_CHECK_FAILED,
reason=reason,
)

Expand All @@ -483,7 +486,7 @@ def import_completed_analysis(
):
for analysis in amplicon_analyses:
analysis.refresh_from_db()
if not analysis.status.get(analysis.AnalysisStates.ANALYSIS_COMPLETED):
if not analysis.status.get(AnalysisStates.ANALYSIS_COMPLETED):
print(f"{analysis} is not completed successfuly. Skipping.")
continue
if analysis.annotations.get(analysis.TAXONOMIES):
Expand Down Expand Up @@ -546,17 +549,17 @@ def set_post_analysis_states(amplicon_current_outdir: Path, amplicon_analyses: L
if analysis.run.first_accession in qc_failed_runs:
task_mark_analysis_status(
analysis,
status=analyses.models.Analysis.AnalysisStates.ANALYSIS_FAILED,
status=AnalysisStates.ANALYSIS_QC_FAILED,
reason=qc_failed_runs[analysis.run.first_accession],
)
elif analysis.run.first_accession in qc_completed_runs:
task_mark_analysis_status(
analysis,
status=analyses.models.Analysis.AnalysisStates.ANALYSIS_COMPLETED,
status=AnalysisStates.ANALYSIS_COMPLETED,
reason=qc_completed_runs[analysis.run.first_accession],
unset_statuses=[
analyses.models.Analysis.AnalysisStates.ANALYSIS_FAILED,
analyses.models.Analysis.AnalysisStates.ANALYSIS_BLOCKED,
AnalysisStates.ANALYSIS_FAILED,
AnalysisStates.ANALYSIS_BLOCKED,
],
)
sanity_check_amplicon_results(
Expand All @@ -566,7 +569,7 @@ def set_post_analysis_states(amplicon_current_outdir: Path, amplicon_analyses: L
else:
task_mark_analysis_status(
analysis,
status=analyses.models.Analysis.AnalysisStates.ANALYSIS_FAILED,
status=AnalysisStates.ANALYSIS_FAILED,
reason="Missing run in execution",
)

Expand Down
11 changes: 6 additions & 5 deletions workflows/flows/assemble_study.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,12 @@ def get_assemblies_to_attempt(study: analyses.models.Study) -> List[Union[str, i
:return:
"""
study.refresh_from_db()
assemblies_worth_trying = study.assemblies_reads.filter(
**{
f"status__{analyses.models.Assembly.AssemblyStates.ASSEMBLY_COMPLETED}": False,
f"status__{analyses.models.Assembly.AssemblyStates.ASSEMBLY_BLOCKED}": False,
}
assemblies_worth_trying = study.assemblies_reads.exclude_by_statuses(
[
analyses.models.Assembly.AssemblyStates.PRE_ASSEMBLY_QC_FAILED,
analyses.models.Assembly.AssemblyStates.ASSEMBLY_COMPLETED,
analyses.models.Assembly.AssemblyStates.ASSEMBLY_BLOCKED,
]
).values_list("id", flat=True)
return assemblies_worth_trying

Expand Down
2 changes: 1 addition & 1 deletion workflows/tests/test_analysis_amplicon_study_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ async def test_prefect_analyse_amplicon_flow(
# check failed runs
assert (
await analyses.models.Analysis.objects.filter(
status__analysis_failed=True
status__analysis_qc_failed=True
).acount()
== 1
)
Expand Down

0 comments on commit 8014248

Please sign in to comment.