add lineage output files
phoenixAja committed Nov 8, 2023
2 parents 9e8da5f + b1d060e commit 9af5002
Showing 55 changed files with 3,329 additions and 355 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/wdl-ci-integration.yml
@@ -10,14 +10,14 @@ env:

jobs:
cancel-previous:
runs-on: self-hosted
runs-on: [self-hosted, idseq-dev]
steps:
- uses: styfle/[email protected]
with:
access_token: ${{ github.token }}

list-workflow-dirs:
runs-on: self-hosted
runs-on: [self-hosted, idseq-dev]
outputs:
matrix: ${{steps.list_dirs.outputs.matrix}}
steps:
@@ -29,7 +29,7 @@ jobs:
run: echo "matrix=$(./scripts/diff-workflows.sh |jq -cnR '[inputs|select(length>0)]')" >> $GITHUB_OUTPUT

wdl-ci:
runs-on: self-hosted
runs-on: [self-hosted, idseq-dev]
needs: list-workflow-dirs
if: ${{ needs['list-workflow-dirs'].outputs.matrix != '[]' && needs['list-workflow-dirs'].outputs.matrix != '' }}
strategy:
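For context, the `list-workflow-dirs` job above pipes the output of `./scripts/diff-workflows.sh` through `jq -cnR '[inputs|select(length>0)]'`, which turns newline-separated directory names into a compact JSON array for the `wdl-ci` job matrix. A minimal Python sketch of the same transformation (the directory names are invented for illustration):

```python
# Rough equivalent of the jq one-liner used to build the workflow matrix:
# keep non-empty lines and emit them as a compact JSON array.
import json

def build_matrix(raw_output: str) -> str:
    dirs = [line for line in raw_output.splitlines() if line]  # select(length>0)
    return json.dumps(dirs, separators=(",", ":"))             # -c compact output

print(build_matrix("workflows/amr\nworkflows/consensus-genome\n\n"))
# ["workflows/amr","workflows/consensus-genome"]
```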
6 changes: 6 additions & 0 deletions .gitignore
@@ -1,2 +1,8 @@
__pycache__
pyrightconfig.json


# Makefile kludge
.venv
.build-*
.python_dependencies_installed
3 changes: 2 additions & 1 deletion Makefile
@@ -95,7 +95,8 @@ run: build python-dependencies ## run a miniwdl workflow. eg. make run WORKFLOW=

.PHONY: miniwdl-explore
miniwdl-explore: ## !ADVANCED! explore a previous miniwdl workflow run in the cli. eg. make miniwdl-explore OUTPATH='/mnt/path/to/output/'
cat $(OUTPATH)/inputs.json | jq ' [values[]] | flatten | .[] | tostring | select(startswith("s3") or startswith("/"))' | xargs -I {} s3parcp {} $(OUTPATH)/work/_miniwdl_inputs/0/
cat $(OUTPATH)/inputs.json | jq ' [values[]] | flatten | .[] | tostring | select(startswith("s3"))' | xargs -I {} aws s3 cp "{}" $(OUTPATH)/work/_miniwdl_inputs/0/
cat $(OUTPATH)/inputs.json | jq ' [values[]] | flatten | .[] | tostring | select(startswith("/"))' | xargs -I {} cp "{}" $(OUTPATH)/work/_miniwdl_inputs/0/
docker run -it --entrypoint bash -w /mnt/miniwdl_task_container/work -v$(OUTPATH):/mnt/miniwdl_task_container czid-$(WORKFLOW)

.PHONY: ls
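The updated `miniwdl-explore` recipe now stages inputs in two passes: values in `inputs.json` that start with `s3` are fetched with `aws s3 cp`, while local paths starting with `/` are copied with plain `cp`. A rough Python sketch of that staging logic, assuming `inputs.json` values are strings or lists of strings:

```python
# Illustrative sketch of the input staging done by miniwdl-explore:
# collect every value from inputs.json, download s3 inputs with the aws
# CLI, and copy local paths into the run's _miniwdl_inputs directory.
import json
import shutil
import subprocess
from pathlib import Path

def stage_inputs(outpath: str) -> None:
    dest = Path(outpath) / "work" / "_miniwdl_inputs" / "0"
    dest.mkdir(parents=True, exist_ok=True)
    with open(Path(outpath) / "inputs.json") as f:
        inputs = json.load(f)

    values = []
    for value in inputs.values():              # [values[]] | flatten
        values.extend(value if isinstance(value, list) else [value])

    for item in (str(v) for v in values):      # tostring
        if item.startswith("s3"):              # remote inputs -> aws s3 cp
            subprocess.run(["aws", "s3", "cp", item, str(dest)], check=True)
        elif item.startswith("/"):             # local inputs -> plain copy
            shutil.copy(item, dest)
```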
5 changes: 3 additions & 2 deletions README.md
@@ -17,8 +17,8 @@ Currently we have 5 main workflows. The details of each pipeline are in a README
* [short-read-mngs](workflows/short-read-mngs/README.md)
* [consensus-genome](workflows/consensus-genome/README.md)
* [phylotree-ng](workflows/phylotree-ng/README.md)
* long-read-mngs (Beta)
* amr (Beta)
* [long-read-mngs](workflows/long-read-mngs/README.md)
* [amr](workflows/amr/README.md)

## Running these workflows
This repository contains [WDL](https://openwdl.org/) workflows that the [CZ ID](https://czid.org) platform uses in
@@ -28,6 +28,7 @@ to get started with them.
### System Requirements
The system requirements differ for each workflow and depend on the database being used. For example, running the short-read-mngs workflow with the full NT and NR databases would require an instance with >1TB of disk space and >100GB of memory. Running other workflows (e.g. consensus-genome, amr) requires much less space.

If using an ARM Mac, try following [these](RunningCZIDWorkflowsOnARMMacs.md) setup instructions.
### Software requirements
* docker with buildx support (version >= 19.03)
* python3
39 changes: 39 additions & 0 deletions RunningCZIDWorkflowsOnARMMacs.md
@@ -0,0 +1,39 @@
## Running czid-workflows on Apple Silicon (M1/M2) Macs

Creating the docker containers for the workflows is challenging on machines that don't use the `x86` architecture, since many of the dependencies we currently use can't be built for other architectures.

The following shows how to set up a virtual machine with `colima` in which you can build and run the docker containers locally on machines that use an ARM CPU.


### Install Colima

`brew install colima`

Go to the czid-workflows repo

`cd ~/czid-workflows`

### Boot and mount the colima VM
```
# uses 4GB of memory & 4 CPUs, change this to suit your needs
colima start --mount $PWD:/work:w -m 4 -c 4
```

### Set up the VM

SSH into the VM

`colima ssh`

In the VM run:

`sudo apk add py3-virtualenv gcc python3-dev musl-dev linux-headers make`

Go to where the `czid-workflows` repo is mounted

`cd /work`

Then you should be able to build and run the docker containers as normal

`make build WORKFLOW=minimap2`
10 changes: 5 additions & 5 deletions lib/bin/filter_count
@@ -12,15 +12,15 @@

set -eo pipefail

if [[ $1 == *gz ]]; then
if [[ "$1" == *gz ]]; then
# gzip -t checks the whole file so might be a bit slow.
# a faster but more imperfect method would be to check that the first 2 bytes equal 0x8b1f
if ! gzip -t $1 > /dev/null 2>&1; then
raise_error InvalidFileFormatError "The file: $(basename $1) is not a proper gzip file"
if ! gzip -t "$1" > /dev/null 2>&1; then
raise_error InvalidFileFormatError "The file: $(basename "$1") is not a proper gzip file"
fi
WC_OUT=$(gzip -dc $1 | awk 'NR % 4 == 2' | wc)
WC_OUT=$(gzip -dc "$1" | awk 'NR % 4 == 2' | wc)
else
WC_OUT=$(cat $1 | awk 'NR % 4 == 2' | wc)
WC_OUT=$(cat "$1" | awk 'NR % 4 == 2' | wc)
fi
LINES=$(echo $WC_OUT | cut -f 2 -d ' ')
CHARS=$(echo $WC_OUT | cut -f 3 -d ' ')
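The quoting fixes above do not change what `filter_count` computes: for FASTQ input it selects the sequence lines (`NR % 4 == 2`, the second line of every four-line record) and derives a read count and a character count from `wc`. A small Python sketch of that counting, assuming an uncompressed FASTQ and leaving out the gzip handling and `raise_error` reporting:

```python
# Minimal sketch of the counting filter_count performs: every FASTQ record
# is 4 lines and the sequence is line 2, so NR % 4 == 2 selects exactly the
# sequence lines. The real script takes wc's raw character count, which
# includes the trailing newlines.
def count_reads_and_chars(fastq_path: str) -> tuple[int, int]:
    reads = 0
    chars = 0
    with open(fastq_path) as handle:
        for line_number, line in enumerate(handle, start=1):
            if line_number % 4 == 2:       # sequence line of each record
                reads += 1
                chars += len(line)         # counts the newline, like wc
    return reads, chars
```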
63 changes: 60 additions & 3 deletions lib/idseq-dag/idseq_dag/steps/generate_taxid_fasta.py
@@ -1,4 +1,5 @@
from typing import List
from shutil import copy as file_copy
from idseq_dag.util.parsing import HitSummaryMergedReader
from idseq_dag.engine.pipeline_step import PipelineStep
import idseq_dag.util.lineage as lineage
@@ -56,18 +57,64 @@ def generate_taxid_fasta(
output_fa.write(read.sequence + "\n")


CONFORMING_PREAMBLE = ">family_nr:-300:family_nt:-300:genus_nr:-200:genus_nt:-200:species_nr:-100:species_nt:-100:"
def conform_unmapped_read_header(header: str):
"""Converts fasta headers from unmapped reads to match structure of mapped.
Implementation is very, very tightly coupled with how we output unmapped
(AKA, unidentified) reads from GenerateAnnotatedFasta task. This function
takes the headers of those unmapped reads and converts them to match the
structure we expect for all the mapped reads, eg where `family_nr` etc
are all explicitly given. See above `CONFORMING_PREAMBLE` for what that
looks like. Result is a header that looks like this:
>family_nr:-300:...:species_nt:-100:NR::NT::ORIGINAL_RAW_ID_GOES_HERE
See `generate_fasta_with_unmapped_included` below for why we do this."""
return CONFORMING_PREAMBLE + header.lstrip('>')


def generate_fasta_with_unmapped_included(
mapped_fa_path: str,
unmapped_fa_path: str,
output_with_unmapped_path: str,
):
"""Creates additional fasta that has both mapped and unmapped reads.
This takes an already existing fasta (expected to be the mapped reads with
taxids in place), copies it, then appends the unmapped (AKA, unidentified)
reads to it while conforming the headers of those unmapped reads so they
match up to the same structure used for the taxid+accession mapped reads.
Intent: while some steps need only the mapped reads to work properly, other
steps want to have all the non-host reads, both mapped and unmapped. That
is also what our users generally expect when downloading a sample's reads.
This provides another fasta that has **all** the non-host reads, but with
the headers all in a consistent structure so that any downstream parsing
won't blow up because of varying header structures (and so long as those
downstream steps aren't very strongly assuming just getting mapped reads).
"""
file_copy(mapped_fa_path, output_with_unmapped_path)
with open(output_with_unmapped_path, "a") as output_fa:
for read in fasta.iterator(unmapped_fa_path):
conformed_header = conform_unmapped_read_header(read.header)
output_fa.write(conformed_header + "\n")
output_fa.write(read.sequence + "\n")


class PipelineStepGenerateTaxidFasta(PipelineStep):
"""Generate taxid FASTA from hit summaries. Intermediate conversion step
that includes handling of non-specific hits with artificial tax_ids.
"""

def run(self):
input_fa_name = self.input_files_local[0][0]
if len(self.input_files_local) > 1:
# We determine if intended for use in `short-read-mngs/postprocess.wdl`
# or `short-read-mngs/experimental.wdl` by shape of input files list.
is_structured_as_postprocess_call = (len(self.input_files_local) > 1)
if is_structured_as_postprocess_call: # short-read-mngs/postprocess.wdl
input_fa_name = self.input_files_local[0][0]
nt_hit_summary_path, nr_hit_summary_path = self.input_files_local[1][2], self.input_files_local[2][2]
else:
# This is used in `short-read-mngs/experimental.wdl`
else: # short-read-mngs/experimental.wdl
input_fa_name = self.input_files_local[0][0]
nt_hit_summary_path, nr_hit_summary_path = self.input_files_local[0][1], self.input_files_local[0][2]

@@ -78,3 +125,13 @@ def run(self):
self.additional_files["lineage_db"],
self.output_files_local()[0],
)

if is_structured_as_postprocess_call:
# For short-read-mngs/postprocess.wdl ONLY we generate additional
# FASTA that has both the mapped reads we just did and unmappeds.
input_unidentified_fa_name = self.input_files_local[0][1]
generate_fasta_with_unmapped_included(
self.output_files_local()[0],
input_unidentified_fa_name,
self.output_files_local()[1],
)
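To make the new header conforming concrete, here is a small illustration of the rewrite `conform_unmapped_read_header` performs. The function body and `CONFORMING_PREAMBLE` are copied from the diff above; the raw unmapped header, including the `NR::NT::` annotation the docstring describes, is an invented example:

```python
# Illustration of the header conforming applied to unmapped reads: the
# preamble supplies explicit family/genus/species placeholders so unmapped
# headers parse the same way as mapped ones downstream.
CONFORMING_PREAMBLE = (
    ">family_nr:-300:family_nt:-300:genus_nr:-200:genus_nt:-200:"
    "species_nr:-100:species_nt:-100:"
)

def conform_unmapped_read_header(header: str) -> str:
    return CONFORMING_PREAMBLE + header.lstrip(">")

raw_unmapped_header = ">NR::NT::A00111:123:ABCDE:1:1101:1000:2000"  # invented example
print(conform_unmapped_read_header(raw_unmapped_header))
# >family_nr:-300:family_nt:-300:genus_nr:-200:genus_nt:-200:species_nr:-100:species_nt:-100:NR::NT::A00111:123:ABCDE:1:1101:1000:2000
```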
70 changes: 12 additions & 58 deletions lib/idseq-dag/idseq_dag/steps/nonhost_fastq.py
@@ -20,24 +20,12 @@ def run(self) -> None:
# could be several GBs in the worst case.
clusters_dict = parse_clusters_file(self.input_files_local[2][0])

self.run_with_tax_ids(None, None, clusters_dict)
# TODO: (gdingle): Generate taxon-specific downloads in idseq-web at
# time of download. See https://jira.czi.team/browse/IDSEQ-2599.
betacoronaviruses = {
2697049, # SARS-CoV2
694002, # betacoronavirus genus
}
self.run_with_tax_ids(betacoronaviruses, "betacoronavirus", clusters_dict)

def run_with_tax_ids(
self.run_nonhost_fastq_generation(clusters_dict)

def run_nonhost_fastq_generation(
self,
tax_ids: Optional[Set[int]],
filename: Optional[str],
clusters_dict: Dict[str, List] = None,
) -> None:
assert (tax_ids and filename) or not (
tax_ids or filename), 'Must be supplied with tax_ids and filename or neither'

scratch_dir = os.path.join(self.output_dir_local, "scratch_nonhost_fastq")
command.make_dirs(scratch_dir)
self.nonhost_headers = [
@@ -50,17 +38,11 @@ def run_with_tax_ids(

nonhost_fasta = self.input_files_local[1][0]

if filename is None:
output_fastqs = self.output_files_local()
else:
output_fastqs = [
f"{os.path.dirname(fastq)}/{filename}__{os.path.basename(self.output_files_local()[i])}"
for i, fastq in enumerate(fastqs)]
self.additional_output_files_hidden.extend(output_fastqs)
output_fastqs = self.output_files_local()

fastqs = self.unzip_files(fastqs)

self.generate_nonhost_headers(nonhost_fasta, clusters_dict, tax_ids)
self.generate_nonhost_headers(nonhost_fasta, clusters_dict)

for i in range(len(fastqs)):
self.generate_nonhost_fastq(self.nonhost_headers[i], fastqs[i], output_fastqs[i])
@@ -120,61 +102,33 @@ def extract_header_from_line(line: str) -> Tuple[int, str, Set[int]]:
nt_index = fragments.index("NT")
header = ":".join(fragments[nt_index + 2:])

annot_tax_ids = set(
int(fragments[fragments.index(annot_type) + 1])
for annot_type in [
"species_nt",
"species_nr",
"genus_nt",
"genus_nr",
]
)

return read_index, header, annot_tax_ids
return read_index, header

def generate_nonhost_headers(
self,
nonhost_fasta_file: str,
clusters_dict: Dict[str, List] = None,
tax_ids: Set[int] = None
):
# This var is only needed when tax_ids, because tax_id
# may match only on one half of a read pair. In that case, we still
# want to include both.
seen = set()
with open(nonhost_fasta_file, "r") as input_file, \
open(self.nonhost_headers[0], "w") as output_file_0, \
open(self.nonhost_headers[1], "w") as output_file_1:
for line in input_file:
# Assumes that the header line in the nonhost_fasta starts with ">"
if line[0] != ">":
continue
read_index, header, annot_tax_ids = PipelineStepNonhostFastq.extract_header_from_line(line)
read_index, header = PipelineStepNonhostFastq.extract_header_from_line(line)
if clusters_dict:
if header not in clusters_dict:
header += "/2" if read_index else "/1"
other_headers = clusters_dict[header][1:]
else:
other_headers = []
other_headers = clusters_dict[header][1:] if clusters_dict else []
if tax_ids:
if tax_ids.intersection(annot_tax_ids) and (header not in seen):
output_file_0.write(header + "\n")
output_file_1.write(header + "\n")
seen.add(header)
for other_header in other_headers:
output_file_0.write(other_header + "\n")
output_file_1.write(other_header + "\n")
# Add other headers just in case something has gone
# wrong upstream with czid-dedup.
seen.add(other_header)
continue
else:
output_file = output_file_0 if read_index == 0 else output_file_1
output_file.write(header + "\n")
for other_header in other_headers:
output_file.write(other_header + "\n")
continue

output_file = output_file_0 if read_index == 0 else output_file_1
output_file.write(header + "\n")
for other_header in other_headers:
output_file.write(other_header + "\n")

@staticmethod
# Use seqtk, which is orders of magnitude faster than Python for this particular step.
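The parsing kept in `extract_header_from_line` (locate the `NT` field and re-join everything two fields later) is also why the conforming preamble added in `generate_taxid_fasta.py` matters: once unmapped reads carry the same `...:NR:<accession>:NT:<accession>:<original id>` shape, the original read ID can be recovered even when it contains colons. A small illustration of just that slice of the parsing, assuming the header is split on `:` as the re-join suggests; the sample header is invented and the read-index handling is omitted:

```python
# Recover the original read ID from a conformed nonhost header by finding
# the "NT" field and re-joining everything two fields later, mirroring the
# nt_index + 2 join in the diff above.
def original_read_id(header_line: str) -> str:
    fragments = header_line.lstrip(">").rstrip("\n").split(":")
    nt_index = fragments.index("NT")
    return ":".join(fragments[nt_index + 2:])

conformed = (
    ">family_nr:-300:family_nt:-300:genus_nr:-200:genus_nt:-200:"
    "species_nr:-100:species_nt:-100:NR::NT::A00111:123:ABCDE:1:1101:1000:2000"
)
print(original_read_id(conformed))  # A00111:123:ABCDE:1:1101:1000:2000
```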
1 change: 0 additions & 1 deletion lib/idseq-dag/idseq_dag/steps/run_assembly.py
@@ -189,7 +189,6 @@ def assemble(input_fasta,
command.write_text_to_file('@NO INFO', bowtie_sam)
command.write_text_to_file('{}', contig_stats)
traceback.print_exc()
command.remove_rf(assembled_dir)

@staticmethod
def generate_read_to_contig_mapping(assembled_contig,
2 changes: 1 addition & 1 deletion lib/idseq-dag/idseq_dag/util/m8.py
@@ -487,7 +487,7 @@ def is_whitelisted(hits: Iterable[str]):
def should_keep(hits: Iterable[str]):
# In some places in the code taxids are ints rather than strings, this would lead
# to a silent failure here so it is worth the explicit check.
non_strings = [h for h in hits if type(h) != str]
non_strings = [h for h in hits if not isinstance(h, str)]
assert not non_strings, f"should_keep received non-string inputs {non_strings}"
return is_whitelisted(hits) and not is_blacklisted(hits)

2 changes: 1 addition & 1 deletion scripts/docker-build.sh
@@ -2,4 +2,4 @@

set -euxo pipefail

docker buildx build --build-context lib=lib "$@"
docker buildx build --platform linux/amd64 --build-context lib=lib "$@"
10 changes: 9 additions & 1 deletion scripts/release.sh
@@ -28,6 +28,11 @@ else
exit 1
fi

# Fix until we can update this script to follow semantic versioning
if [[ $WORKFLOW_NAME == amr ]]; then
TAG="amr-v1.3.0"
fi

if [[ $( git branch --show-current) != "main" ]]; then
COMMIT=$(git rev-parse --short HEAD)
TAG=$TAG"-$COMMIT"
@@ -37,7 +42,10 @@ TAG_MSG=$(mktemp)
echo "# Changes for ${TAG} ($(date +%Y-%m-%d))" > $TAG_MSG
echo "$RELEASE_NOTES" >> $TAG_MSG
git log --pretty=format:%s ${OLD_TAG}..HEAD >> $TAG_MSG || true
git config --get user.name || git config user.name "CZ ID release action triggered by ${GITHUB_ACTOR:-$(whoami)}"
if ! git config --get user.name ; then
git config user.name "CZ ID release action triggered by ${GITHUB_ACTOR:-$(whoami)}"
fi
git config --get user.name
git tag --annotate --file $TAG_MSG "$TAG"
git push origin "$TAG"

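Two behaviours of the release script are worth spelling out: the tag is pinned to `amr-v1.3.0` for the amr workflow until the script follows semantic versioning, and any release cut from a branch other than `main` gets the short commit hash appended. A tiny sketch of that tag construction; how the base tag is computed for other workflows is outside this diff, so it is simply passed in:

```python
# Sketch of the tag logic visible in the release.sh diff. The base_tag
# argument stands in for whatever the script computed before this point.
def release_tag(workflow_name: str, base_tag: str, branch: str, short_commit: str) -> str:
    tag = base_tag
    if workflow_name == "amr":
        tag = "amr-v1.3.0"                 # temporary pin until semantic versioning
    if branch != "main":
        tag = f"{tag}-{short_commit}"      # off-main releases get the short commit hash
    return tag

print(release_tag("amr", "amr-v0", "my-feature-branch", "9af5002"))
# amr-v1.3.0-9af5002
```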
4 changes: 2 additions & 2 deletions scripts/remote_to_local_helper.sh
@@ -4,9 +4,9 @@ set -euo pipefail
## Usage ./scripts/remote_to_local_helper.sh inputs-copied-from-remote-run.json s3://bucket-where-outputs-really-are

INPUT_JSON=$1
S3_BUCKET_PATH=$(echo $2 | sed 's/\//\\\//g')
S3_BUCKET_PATH=$(echo $2 | sed 's/\/$//' | sed 's/\//\\\//g')
REPLACE_MATCH=$(echo /mnt | sed 's/\//\\\//g')


jq '.' $INPUT_JSON | sed -E "s/(.*)${REPLACE_MATCH}.*(\/[A-Za-z0-9]+)(\/?)/\1${S3_BUCKET_PATH}\2/g" \
| sed -E "s/(.*\"docker_image_id\": \")(.*\/)([A-Za-z0-9]+)(:.*)(\",)/\1czid-\3\5/g"
| sed -E "s/(.*\"docker_image_id\": \")(.*\/)([A-Za-z0-9_-]+)(:.*)(\",)/\1czid-\3\5/g"
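The second `sed` expression rewrites each `docker_image_id` from a remote registry reference to the local `czid-<name>` image, and widening the character class to `[A-Za-z0-9_-]+` lets it match image names that contain hyphens or underscores (for example `short-read-mngs`). A rough Python restatement of that substitution; the registry URL in the sample is invented:

```python
# Rough equivalent of the docker_image_id sed rule: keep everything before
# the value, drop the registry path and tag, and prefix the image name with
# "czid-", preserving the trailing quote and comma.
import re

IMAGE_ID_PATTERN = re.compile(r'(.*"docker_image_id": ")(.*/)([A-Za-z0-9_-]+)(:.*)(",)')

def localize_image_id(line: str) -> str:
    return IMAGE_ID_PATTERN.sub(r"\g<1>czid-\g<3>\g<5>", line)

sample = '  "docker_image_id": "123456789012.dkr.ecr.us-west-2.amazonaws.com/short-read-mngs:v1.2.3",'
print(localize_image_id(sample))
#   "docker_image_id": "czid-short-read-mngs",
```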