From 8ff4bfcf5d7ecbc3964c4c5dea4c168e3eacc8cc Mon Sep 17 00:00:00 2001 From: Kirill Tsukanov Date: Tue, 25 Jun 2024 13:46:05 +0100 Subject: [PATCH] feat: implement UKB PPP (EUR) ingestion & harmonisation (#652) * feat: implement UKB PPP (EUR) ingestion & harmonisation * fix: correct module name for docs * fix: definitely correct module name for docs * test: update output of neglog_pvalue_to_mantissa_and_exponent * fix: test syntax with * Update src/gentropy/datasource/ukb_ppp_eur/summary_stats.py Co-authored-by: Szymon Szyszkowski <69353402+project-defiant@users.noreply.github.com> * fix: code review updates for docs and version * fix: syntax for concat_ws * style: list harmonisation steps in the docstring * style: rename freq to MAF * style: use concat_ws * style: use two distinct parameters for study index and summary stats output paths --------- Co-authored-by: Szymon Szyszkowski <69353402+project-defiant@users.noreply.github.com> --- .../ot_ukb_ppp_eur_sumstat_preprocess.yaml | 13 ++ docs/development/contributing.md | 26 +-- docs/python_api/datasources/_datasources.md | 1 + .../datasources/ukb_ppp_eur/_ukb_ppp_eur.md | 7 + .../datasources/ukb_ppp_eur/study_index.md | 5 + .../datasources/ukb_ppp_eur/summary_stats.md | 5 + .../steps/ukb_ppp_eur_sumstat_preprocess.md | 5 + poetry.lock | 2 +- src/airflow/dags/ukb_ppp_eur.py | 44 +++++ src/gentropy/common/spark_helpers.py | 14 +- src/gentropy/config.py | 14 ++ .../datasource/ukb_ppp_eur/__init__.py | 3 + .../datasource/ukb_ppp_eur/study_index.py | 76 ++++++++ .../datasource/ukb_ppp_eur/summary_stats.py | 168 ++++++++++++++++++ .../ukb_ppp_eur_sumstat_preprocess.py | 112 ++++++++++++ 15 files changed, 477 insertions(+), 18 deletions(-) create mode 100644 config/step/ot_ukb_ppp_eur_sumstat_preprocess.yaml create mode 100644 docs/python_api/datasources/ukb_ppp_eur/_ukb_ppp_eur.md create mode 100644 docs/python_api/datasources/ukb_ppp_eur/study_index.md create mode 100644 docs/python_api/datasources/ukb_ppp_eur/summary_stats.md create mode 100644 docs/python_api/steps/ukb_ppp_eur_sumstat_preprocess.md create mode 100644 src/airflow/dags/ukb_ppp_eur.py create mode 100644 src/gentropy/datasource/ukb_ppp_eur/__init__.py create mode 100644 src/gentropy/datasource/ukb_ppp_eur/study_index.py create mode 100644 src/gentropy/datasource/ukb_ppp_eur/summary_stats.py create mode 100644 src/gentropy/ukb_ppp_eur_sumstat_preprocess.py diff --git a/config/step/ot_ukb_ppp_eur_sumstat_preprocess.yaml b/config/step/ot_ukb_ppp_eur_sumstat_preprocess.yaml new file mode 100644 index 000000000..1f8c108bc --- /dev/null +++ b/config/step/ot_ukb_ppp_eur_sumstat_preprocess.yaml @@ -0,0 +1,13 @@ +defaults: + - ukb_ppp_eur_sumstat_preprocess + +raw_study_index_path: ??? +raw_summary_stats_path: ??? +variant_annotation_path: ??? +tmp_variant_annotation_path: ??? +study_index_output_path: ??? +summary_stats_output_path: ??? + +session: + extended_spark_conf: + "spark.sql.shuffle.partitions": "3200" diff --git a/docs/development/contributing.md b/docs/development/contributing.md index 9742d3742..3a363210f 100644 --- a/docs/development/contributing.md +++ b/docs/development/contributing.md @@ -41,7 +41,7 @@ In order to run the code: - Note that the version must comply with [PEP440 conventions](https://peps.python.org/pep-0440/#normalization), otherwise Poetry will not allow it to be deployed. - Do not use underscores or hyphens in your version name. When building the WHL file, they will be automatically converted to dots, which means the file name will no longer match the version and the build will fail. Use dots instead. -3. Manually edit your local `src/airflow/dags/common_airflow.py` and set `OTG_VERSION` to the same version as you did in the previous step. +3. Manually edit your local `src/airflow/dags/common_airflow.py` and set `GENTROPY_VERSION` to the same version as you did in the previous step. 4. Run `make build`. @@ -66,21 +66,27 @@ For more details on each of these steps, see the sections below. - If during development you had a question which wasn't covered in the documentation, and someone explained it to you, add it to the documentation. The same applies if you encountered any instructions in the documentation which were obsolete or incorrect. - Documentation autogeneration expressions start with `:::`. They will automatically generate sections of the documentation based on class and method docstrings. Be sure to update them for: - - Dataset definitions in `docs/python_api/datasource/STEP` (example: `docs/python_api/datasource/finngen/study_index.md`) - - Step definition in `docs/python_api/step/STEP.md` (example: `docs/python_api/step/finngen.md`) + - Datasource main page, for example: `docs/python_api/datasources/finngen/_finngen.md` + - Dataset definitions, for example: `docs/python_api/datasources/finngen/study_index.md` + - Step definition, for example: `docs/python_api/steps/finngen_sumstat_preprocess.md` ### Configuration -- Input and output paths in `config/datasets/gcp.yaml` -- Step configuration in `config/step/STEP.yaml` (example: `config/step/finngen.yaml`) +- Input and output paths in `config/datasets/ot_gcp.yaml` +- Step configuration, for example: `config/step/ot_finngen_sumstat_preprocess.yaml` ### Classes -- Dataset class in `src/gentropy/datasource/STEP` (example: `src/gentropy/datasource/finngen/study_index.py` → `FinnGenStudyIndex`) -- Step main running class in `src/gentropy/STEP.py` (example: `src/gentropy/finngen.py`) +- Datasource init, for example: `src/gentropy/datasource/finngen/__init__.py` +- Dataset classes, for example: `src/gentropy/datasource/finngen/study_index.py` → `FinnGenStudyIndex` +- Step main running class, for example: `src/gentropy/finngen_sumstat_preprocess.py` ### Tests -- Test study fixture in `tests/conftest.py` (example: `mock_study_index_finngen` in that module) -- Test sample data in `tests/data_samples` (example: `tests/gentropy/data_samples/finngen_studies_sample.json`) -- Test definition in `tests/` (example: `tests/dataset/test_study_index.py` → `test_study_index_finngen_creation`) +- Test study fixture in `tests/conftest.py`, for example: `mock_study_index_finngen` in that module +- Test sample data, for example: `tests/gentropy/data_samples/finngen_studies_sample.json` +- Test definition, for example: `tests/dataset/test_study_index.py` → `test_study_index_finngen_creation`) + +### Orchestration + +- Airflow DAG, for example: `src/airflow/dags/finngen_harmonisation.py` diff --git a/docs/python_api/datasources/_datasources.md b/docs/python_api/datasources/_datasources.md index 036dbcb37..f05a2b834 100644 --- a/docs/python_api/datasources/_datasources.md +++ b/docs/python_api/datasources/_datasources.md @@ -10,6 +10,7 @@ This section contains information about the data source harmonisation tools avai 1. [GWAS Catalog](gwas_catalog/_gwas_catalog.md) (with or without full summary statistics) 1. [FinnGen](finngen/_finngen.md) +1. [UKB PPP (EUR)](ukb_ppp_eur/_ukb_ppp_eur.md) ## Molecular QTLs diff --git a/docs/python_api/datasources/ukb_ppp_eur/_ukb_ppp_eur.md b/docs/python_api/datasources/ukb_ppp_eur/_ukb_ppp_eur.md new file mode 100644 index 000000000..e416e1f32 --- /dev/null +++ b/docs/python_api/datasources/ukb_ppp_eur/_ukb_ppp_eur.md @@ -0,0 +1,7 @@ +--- +title: UKB-PPP (EUR) +--- + +The UKB-PPP is a collaboration between the UK Biobank (UKB) and thirteen biopharmaceutical companies characterising the plasma proteomic profiles of 54,219 UKB participants. + +The original data is available at https://www.synapse.org/#!Synapse:syn51364943/. The associated paper is https://www.nature.com/articles/s41586-023-06592-6. diff --git a/docs/python_api/datasources/ukb_ppp_eur/study_index.md b/docs/python_api/datasources/ukb_ppp_eur/study_index.md new file mode 100644 index 000000000..b7f0d91a0 --- /dev/null +++ b/docs/python_api/datasources/ukb_ppp_eur/study_index.md @@ -0,0 +1,5 @@ +--- +title: Study Index +--- + +::: gentropy.datasource.ukb_ppp_eur.study_index.UkbPppEurStudyIndex diff --git a/docs/python_api/datasources/ukb_ppp_eur/summary_stats.md b/docs/python_api/datasources/ukb_ppp_eur/summary_stats.md new file mode 100644 index 000000000..e2db55c25 --- /dev/null +++ b/docs/python_api/datasources/ukb_ppp_eur/summary_stats.md @@ -0,0 +1,5 @@ +--- +title: Summary Statistics +--- + +::: gentropy.datasource.ukb_ppp_eur.summary_stats.UkbPppEurSummaryStats diff --git a/docs/python_api/steps/ukb_ppp_eur_sumstat_preprocess.md b/docs/python_api/steps/ukb_ppp_eur_sumstat_preprocess.md new file mode 100644 index 000000000..4dd74f1df --- /dev/null +++ b/docs/python_api/steps/ukb_ppp_eur_sumstat_preprocess.md @@ -0,0 +1,5 @@ +--- +title: ukb_ppp_eur_sumstat_preprocess +--- + +::: gentropy.ukb_ppp_eur_sumstat_preprocess.UkbPppEurStep diff --git a/poetry.lock b/poetry.lock index 17ef009f0..94f560f9a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.0 and should not be changed by hand. [[package]] name = "aiodns" diff --git a/src/airflow/dags/ukb_ppp_eur.py b/src/airflow/dags/ukb_ppp_eur.py new file mode 100644 index 000000000..f8a7c2342 --- /dev/null +++ b/src/airflow/dags/ukb_ppp_eur.py @@ -0,0 +1,44 @@ +"""Airflow DAG to ingest and harmonise UKB PPP (EUR) data.""" + +from __future__ import annotations + +from pathlib import Path + +import common_airflow as common +from airflow.models.dag import DAG + +CLUSTER_NAME = "otg-ukb-ppp-eur" + +# Input location. +UKB_PPP_EUR_STUDY_INDEX = "gs://gentropy-tmp/batch/output/ukb_ppp_eur/study_index.tsv" +UKB_PPP_EUR_SUMMARY_STATS = "gs://gentropy-tmp/batch/output/ukb_ppp_eur/summary_stats.parquet" +VARIANT_ANNOTATION = "gs://genetics_etl_python_playground/output/python_etl/parquet/XX.XX/variant_annotation" + +# Output locations. +TMP_VARIANT_ANNOTATION = "gs://gentropy-tmp/variant_annotation" +UKB_PPP_EUR_OUTPUT_STUDY_INDEX = "gs://ukb_ppp_eur_data/study_index" +UKB_PPP_EUR_OUTPUT_SUMMARY_STATS = "gs://ukb_ppp_eur_data/summary_stats" + +with DAG( + dag_id=Path(__file__).stem, + description="Open Targets Genetics — Ingest UKB PPP (EUR)", + default_args=common.shared_dag_args, + **common.shared_dag_kwargs, +): + dag = common.generate_dag( + cluster_name=CLUSTER_NAME, + tasks=[ + common.submit_step( + cluster_name=CLUSTER_NAME, + step_id="ot_ukb_ppp_eur_sumstat_preprocess", + other_args=[ + f"step.raw_study_index_path={UKB_PPP_EUR_STUDY_INDEX}", + f"step.raw_summary_stats_path={UKB_PPP_EUR_SUMMARY_STATS}", + f"step.variant_annotation_path={VARIANT_ANNOTATION}", + f"step.tmp_variant_annotation_path={TMP_VARIANT_ANNOTATION}", + f"step.study_index_output_path={UKB_PPP_EUR_OUTPUT_STUDY_INDEX}", + f"step.summary_stats_output_path={UKB_PPP_EUR_OUTPUT_SUMMARY_STATS}", + ] + ) + ] + ) diff --git a/src/gentropy/common/spark_helpers.py b/src/gentropy/common/spark_helpers.py index 60b49d88d..723e877c3 100644 --- a/src/gentropy/common/spark_helpers.py +++ b/src/gentropy/common/spark_helpers.py @@ -267,19 +267,19 @@ def neglog_pvalue_to_mantissa_and_exponent(p_value: Column) -> tuple[Column, Col ... .select('negLogPv',*neglog_pvalue_to_mantissa_and_exponent(f.col('negLogPv'))) ... .show() ... ) - +--------+------------------+--------------+ - |negLogPv| pValueMantissa|pValueExponent| - +--------+------------------+--------------+ - | 4.56| 3.63078054770101| -5| - | 2109.23|1.6982436524618154| -2110| - +--------+------------------+--------------+ + +--------+--------------+--------------+ + |negLogPv|pValueMantissa|pValueExponent| + +--------+--------------+--------------+ + | 4.56| 3.6307805| -5| + | 2109.23| 1.6982436| -2110| + +--------+--------------+--------------+ """ exponent: Column = f.ceil(p_value) mantissa: Column = f.pow(f.lit(10), (p_value - exponent + f.lit(1))) return ( - mantissa.cast(t.DoubleType()).alias("pValueMantissa"), + mantissa.cast(t.FloatType()).alias("pValueMantissa"), (-1 * exponent).cast(t.IntegerType()).alias("pValueExponent"), ) diff --git a/src/gentropy/config.py b/src/gentropy/config.py index f8077722f..4ebf4cbe8 100644 --- a/src/gentropy/config.py +++ b/src/gentropy/config.py @@ -288,6 +288,19 @@ class PICSConfig(StepConfig): _target_: str = "gentropy.pics.PICSStep" +@dataclass +class UkbPppEurConfig(StepConfig): + """UKB PPP (EUR) ingestion step configuration.""" + + raw_study_index_path: str = MISSING + raw_summary_stats_path: str = MISSING + tmp_variant_annotation_path: str = MISSING + variant_annotation_path: str = MISSING + study_index_output_path: str = MISSING + summary_stats_output_path: str = MISSING + _target_: str = "gentropy.ukb_ppp_eur_sumstat_preprocess.UkbPppEurStep" + + @dataclass class VariantAnnotationConfig(StepConfig): """Variant annotation step configuration.""" @@ -478,6 +491,7 @@ def register_config() -> None: ) cs.store(group="step", name="pics", node=PICSConfig) + cs.store(group="step", name="ukb_ppp_eur_sumstat_preprocess", node=UkbPppEurConfig) cs.store(group="step", name="variant_annotation", node=VariantAnnotationConfig) cs.store(group="step", name="variant_index", node=VariantIndexConfig) cs.store(group="step", name="variant_to_gene", node=VariantToGeneConfig) diff --git a/src/gentropy/datasource/ukb_ppp_eur/__init__.py b/src/gentropy/datasource/ukb_ppp_eur/__init__.py new file mode 100644 index 000000000..784877a26 --- /dev/null +++ b/src/gentropy/datasource/ukb_ppp_eur/__init__.py @@ -0,0 +1,3 @@ +"""UKB PPP (EUR) data source.""" + +from __future__ import annotations diff --git a/src/gentropy/datasource/ukb_ppp_eur/study_index.py b/src/gentropy/datasource/ukb_ppp_eur/study_index.py new file mode 100644 index 000000000..3e0a7d782 --- /dev/null +++ b/src/gentropy/datasource/ukb_ppp_eur/study_index.py @@ -0,0 +1,76 @@ +"""Study Index for Finngen data source.""" +from __future__ import annotations + +import pyspark.sql.functions as f +from pyspark.sql import SparkSession + +from gentropy.dataset.study_index import StudyIndex + + +class UkbPppEurStudyIndex(StudyIndex): + """Study index dataset from UKB PPP (EUR).""" + + @classmethod + def from_source( + cls: type[UkbPppEurStudyIndex], + spark: SparkSession, + raw_study_index_path: str, + raw_summary_stats_path: str, + ) -> StudyIndex: + """This function ingests study level metadata from UKB PPP (EUR). + + Args: + spark (SparkSession): Spark session object. + raw_study_index_path (str): Raw study index path. + raw_summary_stats_path (str): Raw summary stats path. + + Returns: + StudyIndex: Parsed and annotated UKB PPP (EUR) study table. + """ + # In order to populate the nSamples column, we need to peek inside the summary stats dataframe. + num_of_samples = ( + spark + .read + .parquet(raw_summary_stats_path) + .filter(f.col("chromosome") == "22") + .groupBy("studyId") + .agg(f.first("N").cast("integer").alias("nSamples")) + .select("*") + ) + # Now we can read the raw study index and complete the processing. + study_index_df = ( + spark.read.csv(raw_study_index_path, sep="\t", header=True) + .select( + f.lit("pqtl").alias("studyType"), + f.lit("UKB_PPP_EUR").alias("projectId"), + f.col("_gentropy_study_id").alias("studyId"), + f.col("UKBPPP_ProteinID").alias("traitFromSource"), + f.lit("UBERON_0001969").alias("tissueFromSourceId"), + f.col("ensembl_id").alias("geneId"), + f.lit(True).alias("hasSumstats"), + f.col("_gentropy_summary_stats_link").alias("summarystatsLocation"), + ) + .join(num_of_samples, "studyId", "inner") + ) + # Add population structure. + study_index_df = ( + study_index_df + .withColumn( + "discoverySamples", + f.array( + f.struct( + f.col("nSamples").cast("integer").alias("sampleSize"), + f.lit("European").alias("ancestry"), + ) + ) + ) + .withColumn( + "ldPopulationStructure", + cls.aggregate_and_map_ancestries(f.col("discoverySamples")), + ) + ) + + return StudyIndex( + _df=study_index_df, + _schema=StudyIndex.get_schema(), + ) diff --git a/src/gentropy/datasource/ukb_ppp_eur/summary_stats.py b/src/gentropy/datasource/ukb_ppp_eur/summary_stats.py new file mode 100644 index 000000000..895900b41 --- /dev/null +++ b/src/gentropy/datasource/ukb_ppp_eur/summary_stats.py @@ -0,0 +1,168 @@ +"""Summary statistics ingestion for UKB PPP (EUR).""" + +from __future__ import annotations + +from dataclasses import dataclass + +import pyspark.sql.functions as f +import pyspark.sql.types as t +from pyspark.sql import SparkSession + +from gentropy.common.spark_helpers import neglog_pvalue_to_mantissa_and_exponent +from gentropy.dataset.summary_statistics import SummaryStatistics + + +@dataclass +class UkbPppEurSummaryStats: + """Summary statistics dataset for UKB PPP (EUR).""" + + @classmethod + def from_source( + cls: type[UkbPppEurSummaryStats], + spark: SparkSession, + raw_summary_stats_path: str, + tmp_variant_annotation_path: str, + chromosome: str, + ) -> SummaryStatistics: + """Ingest and harmonise all summary stats for UKB PPP (EUR) data. + + 1. Rename chromosome 23 to X. + 2. Filter out low INFO rows. + 3. Filter out low frequency rows. + 4. Assign variant types. + 5. Create variant ID for joining the variant annotation dataset. + 6. Join with the Variant Annotation dataset. + 7. Drop bad quality variants. + + Args: + spark (SparkSession): Spark session object. + raw_summary_stats_path (str): Input raw summary stats path. + tmp_variant_annotation_path (str): Input variant annotation dataset path. + chromosome (str): Which chromosome to process. + + Returns: + SummaryStatistics: Processed summary statistics dataset for a given chromosome. + """ + # Read the precomputed variant annotation dataset. + va_df = ( + spark + .read + .parquet(tmp_variant_annotation_path) + .filter(f.col("vaChromosome") == ("X" if chromosome == "23" else chromosome)) + .persist() + ) + + # Read and process the summary stats dataset. + df = ( + spark + .read + .parquet(raw_summary_stats_path) + .filter(f.col("chromosome") == chromosome) + # Harmonise, 1: Rename chromosome 23 to X. + .withColumn( + "chromosome", + f.when( + f.col("chromosome") == "23", "X" + ).otherwise(f.col("chromosome")) + ) + # Harmonise, 2: Filter out low INFO rows. + .filter(f.col("INFO") >= 0.8) + # Harmonise, 3: Filter out low frequency rows. + .withColumn( + "MAF", + f.when(f.col("A1FREQ") < 0.5, f.col("A1FREQ")) + .otherwise(1 - f.col("A1FREQ")) + ) + .filter(f.col("MAF") >= 0.0001) + .drop("MAF") + # Harmonise, 4: Assign variant types. + .withColumn( + "variant_type", + f.when( + (f.length("ALLELE0") == 1) & (f.length("ALLELE1") == 1), + f.when( + ((f.col("ALLELE0") == "A") & (f.col("ALLELE1") == "T")) | + ((f.col("ALLELE0") == "T") & (f.col("ALLELE1") == "A")) | + ((f.col("ALLELE0") == "G") & (f.col("ALLELE1") == "C")) | + ((f.col("ALLELE0") == "C") & (f.col("ALLELE1") == "G")), + "snp_c" + ) + .otherwise( + "snp_n" + ) + ) + .otherwise( + "indel" + ) + ) + # Harmonise, 5: Create variant ID for joining the variant annotation dataset. + .withColumn( + "GENPOS", + f.col("GENPOS").cast("integer") + ) + .withColumn( + "ukb_ppp_id", + f.concat_ws( + "_", + f.col("chromosome"), + f.col("GENPOS"), + f.col("ALLELE0"), + f.col("ALLELE1") + ) + ) + ) + # Harmonise, 6: Join with the Variant Annotation dataset. + df = ( + df + .join(va_df, (df["chromosome"] == va_df["vaChromosome"]) & (df["ukb_ppp_id"] == va_df["ukb_ppp_id"]), "inner") + .drop("vaChromosome", "ukb_ppp_id") + .withColumn( + "effectAlleleFrequencyFromSource", + f.when( + f.col("direction") == "direct", + f.col("A1FREQ").cast("float") + ).otherwise(1 - f.col("A1FREQ").cast("float")) + ) + .withColumn( + "beta", + f.when( + f.col("direction") == "direct", + f.col("BETA").cast("double") + ).otherwise(-f.col("BETA").cast("double")) + ) + ) + df = ( + # Harmonise, 7: Drop bad quality variants. + df + .filter( + ~ ((f.col("variant_type") == "snp_c") & (f.col("direction") == "flip")) + ) + ) + + # Prepare the fields according to schema. + df = ( + df + .select( + f.col("studyId"), + f.col("chromosome"), + f.col("variantId"), + f.col("beta"), + f.col("GENPOS").cast(t.IntegerType()).alias("position"), + # Parse p-value into mantissa and exponent. + *neglog_pvalue_to_mantissa_and_exponent(f.col("LOG10P").cast(t.DoubleType())), + # Add standard error and sample size information. + f.col("SE").cast("double").alias("standardError"), + f.col("N").cast("integer").alias("sampleSize"), + ) + # Drop rows which don't have proper position or beta value. + .filter( + f.col("position").cast(t.IntegerType()).isNotNull() + & (f.col("beta") != 0) + ) + ) + + # Create the summary statistics object. + return SummaryStatistics( + _df=df, + _schema=SummaryStatistics.get_schema(), + ) diff --git a/src/gentropy/ukb_ppp_eur_sumstat_preprocess.py b/src/gentropy/ukb_ppp_eur_sumstat_preprocess.py new file mode 100644 index 000000000..fc7fed548 --- /dev/null +++ b/src/gentropy/ukb_ppp_eur_sumstat_preprocess.py @@ -0,0 +1,112 @@ +"""Step to run UKB PPP (EUR) data ingestion.""" + +from __future__ import annotations + +import pyspark.sql.functions as f + +from gentropy.common.session import Session +from gentropy.datasource.ukb_ppp_eur.study_index import UkbPppEurStudyIndex +from gentropy.datasource.ukb_ppp_eur.summary_stats import UkbPppEurSummaryStats + + +class UkbPppEurStep: + """UKB PPP (EUR) data ingestion and harmonisation.""" + + def __init__( + self, session: Session, raw_study_index_path: str, raw_summary_stats_path: str, variant_annotation_path: str, tmp_variant_annotation_path: str, study_index_output_path: str, summary_stats_output_path: str + ) -> None: + """Run UKB PPP (EUR) data ingestion and harmonisation step. + + Args: + session (Session): Session object. + raw_study_index_path (str): Input raw study index path. + raw_summary_stats_path (str): Input raw summary stats path. + variant_annotation_path (str): Input variant annotation dataset path. + tmp_variant_annotation_path (str): Temporary output path for variant annotation dataset. + study_index_output_path (str): Study index output path. + summary_stats_output_path (str): Summary stats output path. + """ + session.logger.info("Pre-compute the direct and flipped variant annotation dataset.") + va_df = ( + session + .spark + .read + .parquet(variant_annotation_path) + ) + va_df_direct = ( + va_df. + select( + f.col("chromosome").alias("vaChromosome"), + f.col("variantId"), + f.concat_ws( + "_", + f.col("chromosome"), + f.col("position"), + f.col("referenceAllele"), + f.col("alternateAllele") + ).alias("ukb_ppp_id"), + f.lit("direct").alias("direction") + ) + ) + va_df_flip = ( + va_df. + select( + f.col("chromosome").alias("vaChromosome"), + f.col("variantId"), + f.concat_ws( + "_", + f.col("chromosome"), + f.col("position"), + f.col("alternateAllele"), + f.col("referenceAllele") + ).alias("ukb_ppp_id"), + f.lit("flip").alias("direction") + ) + ) + ( + va_df_direct.union(va_df_flip) + .coalesce(1) + .repartition("vaChromosome") + .write + .partitionBy("vaChromosome") + .mode("overwrite") + .parquet(tmp_variant_annotation_path) + ) + + session.logger.info("Process study index.") + ( + UkbPppEurStudyIndex.from_source( + spark=session.spark, + raw_study_index_path=raw_study_index_path, + raw_summary_stats_path=raw_summary_stats_path, + ) + .df + .write + .mode("overwrite") + .parquet(study_index_output_path) + ) + + session.logger.info("Process and harmonise summary stats.") + # Set mode to overwrite for processing the first chromosome. + write_mode = "overwrite" + # Chromosome 23 is X, this is handled downstream. + for chromosome in list(range(1, 24)): + logging_message = f" Processing chromosome {chromosome}" + session.logger.info(logging_message) + ( + UkbPppEurSummaryStats.from_source( + spark=session.spark, + raw_summary_stats_path=raw_summary_stats_path, + tmp_variant_annotation_path=tmp_variant_annotation_path, + chromosome=str(chromosome), + ) + .df + .coalesce(1) + .repartition("studyId", "chromosome") + .write + .partitionBy("studyId", "chromosome") + .mode(write_mode) + .parquet(summary_stats_output_path) + ) + # Now that we have written the first chromosome, change mode to append for subsequent operations. + write_mode = "append"