feat: implement UKB PPP (EUR) ingestion & harmonisation (#652)
* feat: implement UKB PPP (EUR) ingestion & harmonisation

* fix: correct module name for docs

* fix: definitely correct module name for docs

* test: update output of neglog_pvalue_to_mantissa_and_exponent

* fix: test syntax with <BLANKLINE>

* Update src/gentropy/datasource/ukb_ppp_eur/summary_stats.py

Co-authored-by: Szymon Szyszkowski <[email protected]>

* fix: code review updates for docs and version

* fix: syntax for concat_ws

* style: list harmonisation steps in the docstring

* style: rename freq to MAF

* style: use concat_ws

* style: use two distinct parameters for study index and summary stats output paths

---------

Co-authored-by: Szymon Szyszkowski <[email protected]>
tskir and project-defiant authored Jun 25, 2024
1 parent 0d9160f commit 8ff4bfc
Showing 15 changed files with 477 additions and 18 deletions.
13 changes: 13 additions & 0 deletions config/step/ot_ukb_ppp_eur_sumstat_preprocess.yaml
@@ -0,0 +1,13 @@
defaults:
  - ukb_ppp_eur_sumstat_preprocess

raw_study_index_path: ???
raw_summary_stats_path: ???
variant_annotation_path: ???
tmp_variant_annotation_path: ???
study_index_output_path: ???
summary_stats_output_path: ???

session:
  extended_spark_conf:
    "spark.sql.shuffle.partitions": "3200"
26 changes: 16 additions & 10 deletions docs/development/contributing.md
@@ -41,7 +41,7 @@ In order to run the code:
- Note that the version must comply with [PEP440 conventions](https://peps.python.org/pep-0440/#normalization), otherwise Poetry will not allow it to be deployed.
- Do not use underscores or hyphens in your version name. When building the WHL file, they will be automatically converted to dots, which means the file name will no longer match the version and the build will fail. Use dots instead.

3. Manually edit your local `src/airflow/dags/common_airflow.py` and set `OTG_VERSION` to the same version as you did in the previous step.
3. Manually edit your local `src/airflow/dags/common_airflow.py` and set `GENTROPY_VERSION` to the same version as you did in the previous step.

4. Run `make build`.

@@ -66,21 +66,27 @@ For more details on each of these steps, see the sections below.

- If during development you had a question which wasn't covered in the documentation, and someone explained it to you, add it to the documentation. The same applies if you encountered any instructions in the documentation which were obsolete or incorrect.
- Documentation autogeneration expressions start with `:::`. They will automatically generate sections of the documentation based on class and method docstrings. Be sure to update them for:
  - Dataset definitions in `docs/python_api/datasource/STEP` (example: `docs/python_api/datasource/finngen/study_index.md`)
  - Step definition in `docs/python_api/step/STEP.md` (example: `docs/python_api/step/finngen.md`)
  - Datasource main page, for example: `docs/python_api/datasources/finngen/_finngen.md`
  - Dataset definitions, for example: `docs/python_api/datasources/finngen/study_index.md`
  - Step definition, for example: `docs/python_api/steps/finngen_sumstat_preprocess.md`

### Configuration

- Input and output paths in `config/datasets/gcp.yaml`
- Step configuration in `config/step/STEP.yaml` (example: `config/step/finngen.yaml`)
- Input and output paths in `config/datasets/ot_gcp.yaml`
- Step configuration, for example: `config/step/ot_finngen_sumstat_preprocess.yaml`

### Classes

- Dataset class in `src/gentropy/datasource/STEP` (example: `src/gentropy/datasource/finngen/study_index.py` → `FinnGenStudyIndex`)
- Step main running class in `src/gentropy/STEP.py` (example: `src/gentropy/finngen.py`)
- Datasource init, for example: `src/gentropy/datasource/finngen/__init__.py`
- Dataset classes, for example: `src/gentropy/datasource/finngen/study_index.py` → `FinnGenStudyIndex`
- Step main running class, for example: `src/gentropy/finngen_sumstat_preprocess.py`

### Tests

- Test study fixture in `tests/conftest.py` (example: `mock_study_index_finngen` in that module)
- Test sample data in `tests/data_samples` (example: `tests/gentropy/data_samples/finngen_studies_sample.json`)
- Test definition in `tests/` (example: `tests/dataset/test_study_index.py` → `test_study_index_finngen_creation`)
- Test study fixture in `tests/conftest.py`, for example: `mock_study_index_finngen` in that module
- Test sample data, for example: `tests/gentropy/data_samples/finngen_studies_sample.json`
- Test definition, for example: `tests/dataset/test_study_index.py` → `test_study_index_finngen_creation`

### Orchestration

- Airflow DAG, for example: `src/airflow/dags/finngen_harmonisation.py`
1 change: 1 addition & 0 deletions docs/python_api/datasources/_datasources.md
@@ -10,6 +10,7 @@ This section contains information about the data source harmonisation tools avai

1. [GWAS Catalog](gwas_catalog/_gwas_catalog.md) (with or without full summary statistics)
1. [FinnGen](finngen/_finngen.md)
1. [UKB PPP (EUR)](ukb_ppp_eur/_ukb_ppp_eur.md)

## Molecular QTLs

7 changes: 7 additions & 0 deletions docs/python_api/datasources/ukb_ppp_eur/_ukb_ppp_eur.md
@@ -0,0 +1,7 @@
---
title: UKB-PPP (EUR)
---

The UKB-PPP is a collaboration between the UK Biobank (UKB) and thirteen biopharmaceutical companies characterising the plasma proteomic profiles of 54,219 UKB participants.

The original data is available at https://www.synapse.org/#!Synapse:syn51364943/. The associated paper is https://www.nature.com/articles/s41586-023-06592-6.
5 changes: 5 additions & 0 deletions docs/python_api/datasources/ukb_ppp_eur/study_index.md
@@ -0,0 +1,5 @@
---
title: Study Index
---

::: gentropy.datasource.ukb_ppp_eur.study_index.UkbPppEurStudyIndex
5 changes: 5 additions & 0 deletions docs/python_api/datasources/ukb_ppp_eur/summary_stats.md
@@ -0,0 +1,5 @@
---
title: Summary Statistics
---

::: gentropy.datasource.ukb_ppp_eur.summary_stats.UkbPppEurSummaryStats
5 changes: 5 additions & 0 deletions docs/python_api/steps/ukb_ppp_eur_sumstat_preprocess.md
@@ -0,0 +1,5 @@
---
title: ukb_ppp_eur_sumstat_preprocess
---

::: gentropy.ukb_ppp_eur_sumstat_preprocess.UkbPppEurStep
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default.

44 changes: 44 additions & 0 deletions src/airflow/dags/ukb_ppp_eur.py
@@ -0,0 +1,44 @@
"""Airflow DAG to ingest and harmonise UKB PPP (EUR) data."""

from __future__ import annotations

from pathlib import Path

import common_airflow as common
from airflow.models.dag import DAG

CLUSTER_NAME = "otg-ukb-ppp-eur"

# Input location.
UKB_PPP_EUR_STUDY_INDEX = "gs://gentropy-tmp/batch/output/ukb_ppp_eur/study_index.tsv"
UKB_PPP_EUR_SUMMARY_STATS = "gs://gentropy-tmp/batch/output/ukb_ppp_eur/summary_stats.parquet"
VARIANT_ANNOTATION = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX/variant_annotation"

# Output locations.
TMP_VARIANT_ANNOTATION = "gs://gentropy-tmp/variant_annotation"
UKB_PPP_EUR_OUTPUT_STUDY_INDEX = "gs://ukb_ppp_eur_data/study_index"
UKB_PPP_EUR_OUTPUT_SUMMARY_STATS = "gs://ukb_ppp_eur_data/summary_stats"

with DAG(
    dag_id=Path(__file__).stem,
    description="Open Targets Genetics — Ingest UKB PPP (EUR)",
    default_args=common.shared_dag_args,
    **common.shared_dag_kwargs,
):
    dag = common.generate_dag(
        cluster_name=CLUSTER_NAME,
        tasks=[
            common.submit_step(
                cluster_name=CLUSTER_NAME,
                step_id="ot_ukb_ppp_eur_sumstat_preprocess",
                other_args=[
                    f"step.raw_study_index_path={UKB_PPP_EUR_STUDY_INDEX}",
                    f"step.raw_summary_stats_path={UKB_PPP_EUR_SUMMARY_STATS}",
                    f"step.variant_annotation_path={VARIANT_ANNOTATION}",
                    f"step.tmp_variant_annotation_path={TMP_VARIANT_ANNOTATION}",
                    f"step.study_index_output_path={UKB_PPP_EUR_OUTPUT_STUDY_INDEX}",
                    f"step.summary_stats_output_path={UKB_PPP_EUR_OUTPUT_SUMMARY_STATS}",
                ]
            )
        ]
    )
14 changes: 7 additions & 7 deletions src/gentropy/common/spark_helpers.py
@@ -267,19 +267,19 @@ def neglog_pvalue_to_mantissa_and_exponent(p_value: Column) -> tuple[Column, Col
        ...     .select('negLogPv',*neglog_pvalue_to_mantissa_and_exponent(f.col('negLogPv')))
        ...     .show()
        ... )
        +--------+------------------+--------------+
        |negLogPv|    pValueMantissa|pValueExponent|
        +--------+------------------+--------------+
        |    4.56|  3.63078054770101|            -5|
        | 2109.23|1.6982436524618154|         -2110|
        +--------+------------------+--------------+
        +--------+--------------+--------------+
        |negLogPv|pValueMantissa|pValueExponent|
        +--------+--------------+--------------+
        |    4.56|     3.6307805|            -5|
        | 2109.23|     1.6982436|         -2110|
        +--------+--------------+--------------+
        <BLANKLINE>
    """
    exponent: Column = f.ceil(p_value)
    mantissa: Column = f.pow(f.lit(10), (p_value - exponent + f.lit(1)))

    return (
        mantissa.cast(t.DoubleType()).alias("pValueMantissa"),
        mantissa.cast(t.FloatType()).alias("pValueMantissa"),
        (-1 * exponent).cast(t.IntegerType()).alias("pValueExponent"),
    )

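The doctest above changed because the mantissa is now cast to `FloatType`: Spark prints fewer significant digits (3.6307805 rather than 3.63078054770101), and `<BLANKLINE>` stands for the trailing empty line in the captured `.show()` output. A pure-Python sketch that mirrors the committed formula and reproduces those numbers; the function name is illustrative only:

import math


def neglog_to_mantissa_and_exponent(neg_log_pv: float) -> tuple[float, int]:
    # Same arithmetic as the Spark column expressions above:
    # exponent = ceil(negLogPv); mantissa = 10 ** (negLogPv - exponent + 1); exponent is negated on return.
    exponent = math.ceil(neg_log_pv)
    mantissa = 10 ** (neg_log_pv - exponent + 1)
    return mantissa, -exponent


print(neglog_to_mantissa_and_exponent(4.56))     # ~(3.63078054770101, -5), shown as 3.6307805 after the float cast
print(neglog_to_mantissa_and_exponent(2109.23))  # ~(1.69824365246..., -2110), shown as 1.6982436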
14 changes: 14 additions & 0 deletions src/gentropy/config.py
@@ -288,6 +288,19 @@ class PICSConfig(StepConfig):
_target_: str = "gentropy.pics.PICSStep"


@dataclass
class UkbPppEurConfig(StepConfig):
    """UKB PPP (EUR) ingestion step configuration."""

    raw_study_index_path: str = MISSING
    raw_summary_stats_path: str = MISSING
    tmp_variant_annotation_path: str = MISSING
    variant_annotation_path: str = MISSING
    study_index_output_path: str = MISSING
    summary_stats_output_path: str = MISSING
    _target_: str = "gentropy.ukb_ppp_eur_sumstat_preprocess.UkbPppEurStep"


@dataclass
class VariantAnnotationConfig(StepConfig):
    """Variant annotation step configuration."""
@@ -478,6 +491,7 @@ def register_config() -> None:
    )

    cs.store(group="step", name="pics", node=PICSConfig)
    cs.store(group="step", name="ukb_ppp_eur_sumstat_preprocess", node=UkbPppEurConfig)
    cs.store(group="step", name="variant_annotation", node=VariantAnnotationConfig)
    cs.store(group="step", name="variant_index", node=VariantIndexConfig)
    cs.store(group="step", name="variant_to_gene", node=VariantToGeneConfig)
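`UkbPppEurConfig` follows the same structured-config pattern as the other steps: a dataclass of `MISSING` fields registered in Hydra's `ConfigStore`, with the concrete paths filled in by overrides at run time. A minimal, self-contained sketch of that mechanism, assuming `hydra-core` is installed; the config name, fields and paths below are illustrative and not taken from gentropy:

from dataclasses import dataclass

from hydra import compose, initialize
from hydra.core.config_store import ConfigStore
from omegaconf import MISSING, OmegaConf


@dataclass
class DemoStepConfig:
    raw_study_index_path: str = MISSING
    study_index_output_path: str = MISSING


# Register the structured config, then compose it with overrides that fill the MISSING fields.
cs = ConfigStore.instance()
cs.store(name="demo_step", node=DemoStepConfig)

with initialize(version_base=None, config_path=None):
    cfg = compose(
        config_name="demo_step",
        overrides=[
            "raw_study_index_path=gs://example-bucket/study_index.tsv",
            "study_index_output_path=gs://example-bucket/study_index",
        ],
    )
    print(OmegaConf.to_yaml(cfg))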
3 changes: 3 additions & 0 deletions src/gentropy/datasource/ukb_ppp_eur/__init__.py
@@ -0,0 +1,3 @@
"""UKB PPP (EUR) data source."""

from __future__ import annotations
76 changes: 76 additions & 0 deletions src/gentropy/datasource/ukb_ppp_eur/study_index.py
@@ -0,0 +1,76 @@
"""Study Index for Finngen data source."""
from __future__ import annotations

import pyspark.sql.functions as f
from pyspark.sql import SparkSession

from gentropy.dataset.study_index import StudyIndex


class UkbPppEurStudyIndex(StudyIndex):
    """Study index dataset from UKB PPP (EUR)."""

    @classmethod
    def from_source(
        cls: type[UkbPppEurStudyIndex],
        spark: SparkSession,
        raw_study_index_path: str,
        raw_summary_stats_path: str,
    ) -> StudyIndex:
        """This function ingests study level metadata from UKB PPP (EUR).

        Args:
            spark (SparkSession): Spark session object.
            raw_study_index_path (str): Raw study index path.
            raw_summary_stats_path (str): Raw summary stats path.

        Returns:
            StudyIndex: Parsed and annotated UKB PPP (EUR) study table.
        """
        # In order to populate the nSamples column, we need to peek inside the summary stats dataframe.
        num_of_samples = (
            spark
            .read
            .parquet(raw_summary_stats_path)
            .filter(f.col("chromosome") == "22")
            .groupBy("studyId")
            .agg(f.first("N").cast("integer").alias("nSamples"))
            .select("*")
        )
        # Now we can read the raw study index and complete the processing.
        study_index_df = (
            spark.read.csv(raw_study_index_path, sep="\t", header=True)
            .select(
                f.lit("pqtl").alias("studyType"),
                f.lit("UKB_PPP_EUR").alias("projectId"),
                f.col("_gentropy_study_id").alias("studyId"),
                f.col("UKBPPP_ProteinID").alias("traitFromSource"),
                f.lit("UBERON_0001969").alias("tissueFromSourceId"),
                f.col("ensembl_id").alias("geneId"),
                f.lit(True).alias("hasSumstats"),
                f.col("_gentropy_summary_stats_link").alias("summarystatsLocation"),
            )
            .join(num_of_samples, "studyId", "inner")
        )
        # Add population structure.
        study_index_df = (
            study_index_df
            .withColumn(
                "discoverySamples",
                f.array(
                    f.struct(
                        f.col("nSamples").cast("integer").alias("sampleSize"),
                        f.lit("European").alias("ancestry"),
                    )
                )
            )
            .withColumn(
                "ldPopulationStructure",
                cls.aggregate_and_map_ancestries(f.col("discoverySamples")),
            )
        )

        return StudyIndex(
            _df=study_index_df,
            _schema=StudyIndex.get_schema(),
        )
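
A hypothetical usage sketch for the new class; the bucket paths and Spark session setup are placeholders, and it assumes the returned `StudyIndex` exposes its underlying DataFrame through the usual `df` property:

from pyspark.sql import SparkSession

from gentropy.datasource.ukb_ppp_eur.study_index import UkbPppEurStudyIndex

spark = SparkSession.builder.appName("ukb_ppp_eur_study_index_demo").getOrCreate()

# Both inputs are placeholder locations; the raw study index is a TSV and the
# raw summary stats are the Parquet files from which the nSamples column is peeked.
study_index = UkbPppEurStudyIndex.from_source(
    spark=spark,
    raw_study_index_path="gs://example-bucket/ukb_ppp_eur/study_index.tsv",
    raw_summary_stats_path="gs://example-bucket/ukb_ppp_eur/summary_stats.parquet",
)
study_index.df.show(5, truncate=False)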
