Skip to content

Commit

Permalink
Merge pull request #712 from umccr/dev
Browse files Browse the repository at this point in the history
Release 2.2.3rc1
  • Loading branch information
scwatts authored Jul 30, 2024
2 parents 9eabb05 + 5db84d2 commit 0f5354f
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import json
import logging
from typing import Optional
from urllib.parse import urlparse

from libumccr.aws import libssm, libsqs

Expand Down Expand Up @@ -99,10 +100,8 @@ def prepare_oncoanalyser_wgts_job(this_workflow: Workflow) -> dict:
# WGS Tumor/Normal BAMs output from DRAGEN alignment in ICA/GDS
tumor_wgs_sample_id = wgs_input['tumor_wgs_sample_id']
tumor_wgs_library_id = wgs_input['tumor_wgs_library_id']
tumor_wgs_bam = wgs_input['tumor_wgs_bam']
normal_wgs_sample_id = wgs_input['normal_wgs_sample_id']
normal_wgs_library_id = wgs_input['normal_wgs_library_id']
normal_wgs_bam = wgs_input['normal_wgs_bam']

# WTS BAM output from STAR aligner
tumor_wts_sample_id = wts_input['tumor_wts_sample_id']
Expand All @@ -113,6 +112,10 @@ def prepare_oncoanalyser_wgts_job(this_workflow: Workflow) -> dict:
existing_wgs_dir = get_existing_wgs_dir(wgs_wf)
existing_wts_dir = get_existing_wts_dir(wts_wf)

# Select existing WGS MarkDups BAMs; DRAGEN BAMs from original WGS input are ignored
tumor_wgs_bam = get_existing_wgs_markdups_bam(existing_wgs_dir, tumor_wgs_sample_id)
normal_wgs_bam = get_existing_wgs_markdups_bam(existing_wgs_dir, normal_wgs_sample_id)

payload = {
"subject_id": subject_id,
"tumor_wgs_sample_id": tumor_wgs_sample_id,
Expand Down Expand Up @@ -203,6 +206,32 @@ def get_existing_wgs_dir(this_workflow: Workflow) -> str:
raise ValueError("Found none or many output directory")


def get_existing_wgs_markdups_bam(existing_wgs_dir: str, sample_id: str):
"""
Locate existing WGS MarkDups BAMs and return corresponding path.
:param str existing_wgs_dir: S3 path to previously generated WGS output
:param str sample_id: identifier for the WGS tumor and normal sample
:return: MarkDups BAM path
:rtype: str
:raises ValueError: if exactly one MarkDups BAM isn't found
"""

existing_wgs_dir_prefix = urlparse(existing_wgs_dir).path.lstrip('/')
results = s3object_srv.get_s3_files_for_path_tokens(path_tokens=[
existing_wgs_dir_prefix,
f"alignments/dna/{sample_id}.markdups.bam",
])

filtered_list = list(filter(lambda x: str(x).endswith(".bam"), results))

if len(filtered_list) != 1:
message_component = "No MarkDups BAM" if len(filtered_list) == 0 else "Multiple MarkDups BAMs"
raise ValueError(f"{message_component} found in existing WGS output: {existing_wgs_dir}")

return filtered_list[0]


def get_existing_wts_dir(this_workflow: Workflow) -> str:
"""
this_workflow is succeeded oncoanalyser_wts type
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import json
from datetime import timedelta, datetime

from django.utils.timezone import make_aware
from django.utils.timezone import now, make_aware

from data_portal.models import S3Object
from data_portal.models import Workflow
from data_portal.models.labmetadata import LabMetadata
from data_portal.models.libraryrun import LibraryRun
Expand All @@ -16,6 +17,37 @@
from data_processors.pipeline.tests.case import PipelineUnitTestCase, logger


def generate_mock_markdups_bam(include_bad_duplicate=False):
_ = S3Object.objects.create(
bucket="bucket1",
key=(
f"analysis_data/{TestConstant.subject_id.value}/oncoanalyser/"
f"{TestConstant.portal_run_id_oncoanalyser.value}/wgs/"
f"{TestConstant.library_id_tumor.value}__{TestConstant.library_id_normal.value}/"
f"{TestConstant.subject_id.value}__{TestConstant.sample_id.value}/"
f"alignments/dna/{TestConstant.sample_id.value}.markdups.bam"
),
size=1000,
last_modified_date=now(),
e_tag="abcdefghi123456"
)

if include_bad_duplicate:
_ = S3Object.objects.create(
bucket="bucket1",
key=(
f"analysis_data/{TestConstant.subject_id.value}/oncoanalyser/"
f"{TestConstant.portal_run_id_oncoanalyser.value}/wgs/"
f"{TestConstant.library_id_tumor.value}__{TestConstant.library_id_normal.value}/"
f"{TestConstant.subject_id.value}__{TestConstant.sample_id.value}//"
f"alignments/dna/{TestConstant.sample_id.value}.markdups.bam"
),
size=1000,
last_modified_date=now(),
e_tag="abcdefghi123456"
)


class OncoanalyserWgtsExistingBothStepUnitTests(PipelineUnitTestCase):

def test_perform(self):
Expand Down Expand Up @@ -47,6 +79,8 @@ def test_perform(self):
workflow=mock_wts_wfl,
)

_ = generate_mock_markdups_bam()

result = oncoanalyser_wgts_existing_both_step.perform(this_workflow=mock_wgs_wfl)

self.assertIsNotNone(result)
Expand Down Expand Up @@ -288,3 +322,43 @@ def test_get_existing_wts_dir_raises_error(self):

logger.exception(f"THIS ERROR EXCEPTION IS INTENTIONAL FOR TEST. NOT ACTUAL ERROR. \n{str(e)}")
self.assertIn("Found none", str(e))

def test_find_markdup_bam_more_than_1(self):
"""
python manage.py test data_processors.pipeline.orchestration.tests.test_oncoanalyser_wgts_existing_both_step.OncoanalyserWgtsExistingBothStepUnitTests.test_find_markdup_bam_more_than_1
"""
self.verify_local()

mock_wgs_wfl = OncoanalyserWgsWorkflowFactory()

_ = OncoanalyserWgsS3ObjectOutputFactory()

mock_existing_wgs_dir = oncoanalyser_wgts_existing_both_step.get_existing_wgs_dir(mock_wgs_wfl)

_ = generate_mock_markdups_bam(include_bad_duplicate=True)

with self.assertRaises(ValueError) as cm:
_ = oncoanalyser_wgts_existing_both_step.get_existing_wgs_markdups_bam(mock_existing_wgs_dir, TestConstant.sample_id.value)
e = cm.exception

logger.exception(f"THIS ERROR EXCEPTION IS INTENTIONAL FOR TEST. NOT ACTUAL ERROR. \n{str(e)}")
self.assertIn("Multiple MarkDups BAMs found", str(e))

def test_find_markdup_bam_less_than_1(self):
"""
python manage.py test data_processors.pipeline.orchestration.tests.test_oncoanalyser_wgts_existing_both_step.OncoanalyserWgtsExistingBothStepUnitTests.test_find_markdup_bam_less_than_1
"""
self.verify_local()

mock_wgs_wfl = OncoanalyserWgsWorkflowFactory()

_ = OncoanalyserWgsS3ObjectOutputFactory()

mock_existing_wgs_dir = oncoanalyser_wgts_existing_both_step.get_existing_wgs_dir(mock_wgs_wfl)

with self.assertRaises(ValueError) as cm:
_ = oncoanalyser_wgts_existing_both_step.get_existing_wgs_markdups_bam(mock_existing_wgs_dir, TestConstant.sample_id.value)
e = cm.exception

logger.exception(f"THIS ERROR EXCEPTION IS INTENTIONAL FOR TEST. NOT ACTUAL ERROR. \n{str(e)}")
self.assertIn("No MarkDups BAM found", str(e))

0 comments on commit 0f5354f

Please sign in to comment.