Skip to content

Commit

Permalink
feat: vep annotation (dockerised + google batch + airflow) (#608)
Browse files Browse the repository at this point in the history
* feat: custom dockerfile to run ensembl vep

* ci: automate vep image build and artifact registry

* chore: update airflow google operators (not required)

* feat: working version of google batch airflow vep job

* feat: working version of google batch airflow vep job

* feat(VEP): adding CADD plugin

* feat: local loftee file

* feat: working with input bucket full of input files

* feat: prevent writing html

* fix: minor adjustments to retry strategy

* feat(airflow): separating mounting points for input/output and cache

* fix: typo in airflow dag

* fix: pre-commit pain

* chore: rename airflow dag file

---------

Co-authored-by: DSuveges <[email protected]>
Co-authored-by: Szymon Szyszkowski <[email protected]>
  • Loading branch information
3 people authored Jul 4, 2024
1 parent 768b5d2 commit 6487b31
Show file tree
Hide file tree
Showing 5 changed files with 330 additions and 5 deletions.
12 changes: 9 additions & 3 deletions .github/workflows/artifact.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ env:
PROJECT_ID: open-targets-genetics-dev
REGION: europe-west1
GAR_LOCATION: europe-west1-docker.pkg.dev/open-targets-genetics-dev
IMAGE_NAME: gentropy-app
REPOSITORY: gentropy-app

jobs:
build-push-artifact:
Expand All @@ -33,7 +33,13 @@ jobs:
gcloud auth configure-docker ${{ env.REGION }}-docker.pkg.dev --quiet
- name: Build image
run: docker build . --tag "${{ env.GAR_LOCATION }}/${{ env.IMAGE_NAME }}/gentropy:${{ github.ref_name }}"
run: docker build . --tag "${{ env.GAR_LOCATION }}/${{ env.REPOSITORY }}/gentropy:${{ github.ref_name }}"

- name: Push image
run: docker push "${{ env.GAR_LOCATION }}/${{ env.IMAGE_NAME }}/gentropy:${{ github.ref_name }}"
run: docker push "${{ env.GAR_LOCATION }}/${{ env.REPOSITORY }}/gentropy:${{ github.ref_name }}"

- name: Build VEP image
run: docker build src/vep --tag "${{ env.GAR_LOCATION }}/${{ env.REPOSITORY }}/custom_ensembl_vep:${{ github.ref_name }}"

- name: Push VEP image
run: docker push "${{ env.GAR_LOCATION }}/${{ env.REPOSITORY }}/custom_ensembl_vep:${{ github.ref_name }}"
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ create-dev-cluster: build ## Spin up a simple dataproc cluster with all dependen
--master-machine-type n1-standard-16 \
--initialization-actions=gs://genetics_etl_python_playground/initialisation/${VERSION_NO}/install_dependencies_on_cluster.sh \
--metadata="PACKAGE=gs://genetics_etl_python_playground/initialisation/${VERSION_NO}/gentropy-${VERSION_NO}-py3-none-any.whl,CONFIGTAR=gs://genetics_etl_python_playground/initialisation/${VERSION_NO}/config.tar.gz" \
--single-node \
--num-workers 4 \
--primary-worker-type n1-standard-8 \
--worker-machine-type n1-standard-4 \
--worker-boot-disk-size 500 \
--autoscaling_policy=f"projects/${PROJECT_ID}/regions/${REGION}/autoscalingPolicies/eqtl-preprocess", \
--optional-components=JUPYTER \
--enable-component-gateway

Expand Down
284 changes: 284 additions & 0 deletions src/airflow/dags/vep_batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
"""Airflow DAG for the harmonisation part of the pipeline."""

from __future__ import annotations

import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, List

import common_airflow as common
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_batch import (
CloudBatchSubmitJobOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from google.cloud import batch_v1

PROJECT_ID = "open-targets-genetics-dev"
REGION = "europe-west1"

# Required parameters:
VEP_DOCKER_IMAGE = "europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/custom_ensembl_vep:dev"

VCF_INPUT_BUCKET = "gs://genetics_etl_python_playground/vep/test_vep_input"
VEP_OUTPUT_BUCKET = "gs://genetics_etl_python_playground/vep/test_vep_output"
VEP_CACHE_BUCKET = "gs://genetics_etl_python_playground/vep/cache"

# Internal parameters for the docker image:
MOUNT_DIR = "/mnt/disks/share"

# Configuration for the machine types:
MACHINES = {
"VEPMACHINE": {
"machine_type": "e2-standard-4",
"cpu_milli": 2000,
"memory_mib": 2000,
"boot_disk_mib": 10000,
},
}


@dataclass
class PathManager:
"""It is quite complicated to keep track of all the input/output buckets, the corresponding mounting points prefixes etc..."""

VCF_INPUT_BUCKET: str
VEP_OUTPUT_BUCKET: str
VEP_CACHE_BUCKET: str
MOUNT_DIR_ROOT: str

# Derived parameters to find the list of files to process:
input_path: str | None = None
input_bucket: str | None = None

# Derived parameters to initialise the docker image:
path_dictionary: dict[str, dict[str, str]] | None = None

# Derived parameters to point to the right mouting points:
cache_dir: str | None = None
input_dir: str | None = None
output_dir: str | None = None

def __post_init__(self: PathManager) -> None:
"""Build paths based on the input parameters."""
self.path_dictionary = {
"input": {
"remote_path": self.VCF_INPUT_BUCKET.replace("gs://", ""),
"mount_point": f"{self.MOUNT_DIR_ROOT}/input",
},
"output": {
"remote_path": self.VEP_OUTPUT_BUCKET.replace("gs://", ""),
"mount_point": f"{self.MOUNT_DIR_ROOT}/output",
},
"cache": {
"remote_path": self.VEP_CACHE_BUCKET.replace("gs://", ""),
"mount_point": f"{self.MOUNT_DIR_ROOT}/cache",
},
}
# Parameters for fetching files:
self.input_path = self.VCF_INPUT_BUCKET.replace("gs://", "") + "/"
self.input_bucket = self.VCF_INPUT_BUCKET.split("/")[2]

# Parameters for VEP:
self.cache_dir = f"{self.MOUNT_DIR_ROOT}/cache"
self.input_dir = f"{self.MOUNT_DIR_ROOT}/input"
self.output_dir = f"{self.MOUNT_DIR_ROOT}/output"

def get_mount_config(self) -> list[dict[str, str]]:
"""Return the mount configuration.
Returns:
list[dict[str, str]]: The mount configuration.
"""
assert self.path_dictionary is not None, "Path dictionary not initialized."
return list(self.path_dictionary.values())


def create_container_runnable(image: str, commands: List[str]) -> batch_v1.Runnable:
"""Create a container runnable for a Batch job.
Args:
image (str): The Docker image to use.
commands (List[str]): The commands to run in the container.
Returns:
batch_v1.Runnable: The container runnable.
"""
runnable = batch_v1.Runnable()
runnable.container = batch_v1.Runnable.Container()
runnable.container.image_uri = image
runnable.container.entrypoint = "/bin/sh"
runnable.container.commands = commands
return runnable


def create_task_spec(image: str, commands: List[str]) -> batch_v1.TaskSpec:
"""Create a task for a Batch job.
Args:
image (str): The Docker image to use.
commands (List[str]): The commands to run in the container.
Returns:
batch_v1.TaskSpec: The task specification.
"""
task = batch_v1.TaskSpec()
task.runnables = [
create_container_runnable(image, commands)
# msg_runnable()
]
return task


def set_up_mouting_points(
mounting_points: list[dict[str, str]],
) -> list[batch_v1.Volume]:
"""Set up the mounting points for the container.
Args:
mounting_points (list[dict[str, str]]): The mounting points.
Returns:
list[batch_v1.Volume]: The volumes.
"""
volumes = []
for mount in mounting_points:
gcs_bucket = batch_v1.GCS()
gcs_bucket.remote_path = mount["remote_path"]
gcs_volume = batch_v1.Volume()
gcs_volume.gcs = gcs_bucket
gcs_volume.mount_path = mount["mount_point"]
volumes.append(gcs_volume)
return volumes


def create_batch_job(
task: batch_v1.TaskSpec,
machine: str,
task_env: list[batch_v1.Environment],
mounting_points: list[dict[str, str]],
) -> batch_v1.Job:
"""Create a Google Batch job.
Args:
task (batch_v1.TaskSpec): The task specification.
machine (str): The machine type to use.
task_env (list[batch_v1.Environment]): The environment variables for the task.
mounting_points (list[dict[str, str]]): List of mounting points.
Returns:
batch_v1.Job: The Batch job.
"""
resources = batch_v1.ComputeResource()
resources.cpu_milli = MACHINES[machine]["cpu_milli"]
resources.memory_mib = MACHINES[machine]["memory_mib"]
resources.boot_disk_mib = MACHINES[machine]["boot_disk_mib"]
task.compute_resource = resources

task.max_retry_count = 3
task.max_run_duration = "43200s"

# The mounting points are set up and assigned to the task:
task.volumes = set_up_mouting_points(mounting_points)

group = batch_v1.TaskGroup()
group.task_spec = task
group.task_environments = task_env

policy = batch_v1.AllocationPolicy.InstancePolicy()
policy.machine_type = MACHINES[machine]["machine_type"]
policy.provisioning_model = "SPOT"

instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
instances.policy = policy
allocation_policy = batch_v1.AllocationPolicy()
allocation_policy.instances = [instances]

job = batch_v1.Job()
job.task_groups = [group]
job.allocation_policy = allocation_policy
job.logs_policy = batch_v1.LogsPolicy()
job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

return job


@task(task_id="vep_annotation")
def vep_annotation(pm: PathManager, **kwargs: Any) -> None:
"""Submit a Batch job to download cache for VEP.
Args:
pm (PathManager): The path manager with all the required path related information.
**kwargs (Any): Keyword arguments.
"""
# Get the filenames to process:
ti = kwargs["ti"]
filenames = [
os.path.basename(os.path.splitext(path)[0])
for path in ti.xcom_pull(task_ids="get_vep_todo_list", key="return_value")
]
# Stop process if no files was found:
assert filenames, "No files found to process."

# Based on the filenames, build the environment variables for the batch job:
task_env = [
batch_v1.Environment(
variables={
"INPUT_FILE": filename + ".tsv",
"OUTPUT_FILE": filename + ".json",
}
)
for filename in filenames
]
# Build the command to run in the container:
command = [
"-c",
rf"vep --cache --offline --format vcf --force_overwrite \
--no_stats \
--dir_cache {pm.cache_dir} \
--input_file {pm.input_dir}/$INPUT_FILE \
--output_file {pm.output_dir}/$OUTPUT_FILE --json \
--dir_plugins {pm.cache_dir}/VEP_plugins \
--sift b \
--polyphen b \
--uniprot \
--check_existing \
--exclude_null_alleles \
--canonical \
--plugin LoF,loftee_path:{pm.cache_dir}/VEP_plugins,gerp_bigwig:{pm.cache_dir}/gerp_conservation_scores.homo_sapiens.GRCh38.bw,human_ancestor_fa:{pm.cache_dir}/human_ancestor.fa.gz,conservation_file:/opt/vep/loftee.sql \
--plugin AlphaMissense,file={pm.cache_dir}/AlphaMissense_hg38.tsv.gz,transcript_match=1 \
--plugin CADD,snv={pm.cache_dir}/CADD_GRCh38_whole_genome_SNVs.tsv.gz",
]
task = create_task_spec(VEP_DOCKER_IMAGE, command)
batch_task = CloudBatchSubmitJobOperator(
task_id="vep_batch_job",
project_id=PROJECT_ID,
region=REGION,
job_name=f"vep-job-{time.strftime('%Y%m%d-%H%M%S')}",
job=create_batch_job(task, "VEPMACHINE", task_env, pm.get_mount_config()),
deferrable=False,
)
batch_task.execute(context=kwargs)


with DAG(
dag_id=Path(__file__).stem,
description="Open Targets Genetics — Ensembl VEP",
default_args=common.shared_dag_args,
**common.shared_dag_kwargs,
):
# Initialise parameter manager:
pm = PathManager(VCF_INPUT_BUCKET, VEP_OUTPUT_BUCKET, VEP_CACHE_BUCKET, MOUNT_DIR)

# Get a list of files to process from the input bucket:
get_vep_todo_list = GCSListObjectsOperator(
task_id="get_vep_todo_list",
bucket=pm.input_bucket,
prefix=pm.input_path,
match_glob="**tsv",
)

get_vep_todo_list >> vep_annotation(pm)
2 changes: 1 addition & 1 deletion src/airflow/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
apache-airflow-providers-google==10.10.1
apache-airflow-providers-google==10.17.0
apache-airflow-providers-apache-beam==5.6.1
psycopg2-binary==2.9.9
31 changes: 31 additions & 0 deletions src/vep/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
FROM ensemblorg/ensembl-vep:release_111.0

USER root

RUN apt-get update && apt-get -y install \
wget \
libncurses5-dev \
libncursesw5-dev \
libbz2-dev \
liblzma-dev \
sqlite3 \
libsqlite3-dev \
cpanminus \
git \
&& rm -rf /var/lib/apt/lists/*

RUN cpanm DBD::SQLite

RUN wget --progress=dot:giga https://github.com/samtools/samtools/releases/download/1.7/samtools-1.7.tar.bz2 && \
tar xjvf samtools-1.7.tar.bz2 && \
cd samtools-1.7 && \
make && \
make install

RUN wget --progress=dot:giga https://personal.broadinstitute.org/konradk/loftee_data/GRCh38/loftee.sql.gz --directory-prefix=/opt/vep/ && \
gunzip /opt/vep/loftee.sql.gz

# Make sure the mounting points exist:
RUN mkdir -p /mnt/disks/share/cache && \
mkdir -p /mnt/disks/share/input && \
mkdir -p /mnt/disks/share/output

0 comments on commit 6487b31

Please sign in to comment.