diff --git a/docs/python_api/methods/clumping.md b/docs/python_api/methods/clumping.md index 11a197a41..86968dfe0 100644 --- a/docs/python_api/methods/clumping.md +++ b/docs/python_api/methods/clumping.md @@ -24,4 +24,4 @@ The algorithmic logic is similar to classic clumping approaches from PLINK (Refe # Locus-breaker clumping -::: gentropy.method.locus_breaker_clumping.locus_breaker +::: gentropy.method.locus_breaker_clumping.LocusBreakerClumping diff --git a/src/gentropy/config.py b/src/gentropy/config.py index 409826ca4..663b90a73 100644 --- a/src/gentropy/config.py +++ b/src/gentropy/config.py @@ -382,7 +382,7 @@ class LocusBreakerClumpingConfig(StepConfig): } ) distance_cutoff: int = 250_000 - flankig_distance: int = 100_000 + flanking_distance: int = 100_000 baseline_pvalue_cutoff: float = 1e-5 diff --git a/src/gentropy/dataset/summary_statistics.py b/src/gentropy/dataset/summary_statistics.py index 0f6f51d3f..c1e1b3af5 100644 --- a/src/gentropy/dataset/summary_statistics.py +++ b/src/gentropy/dataset/summary_statistics.py @@ -86,7 +86,7 @@ def locus_breaker_clumping( baseline_pvalue_cutoff: float = LocusBreakerClumpingConfig().baseline_pvalue_cutoff, distance_cutoff: int = LocusBreakerClumpingConfig().distance_cutoff, pvalue_cutoff: float = WindowBasedClumpingStepConfig().gwas_significance, - flankig_distance: int = LocusBreakerClumpingConfig().flankig_distance, + flanking_distance: int = LocusBreakerClumpingConfig().flanking_distance, ) -> StudyLocus: """Generate study-locus from summary statistics using locus-breaker clumping method with locus boundaries. @@ -96,20 +96,20 @@ def locus_breaker_clumping( baseline_pvalue_cutoff (float, optional): Baseline significance we consider for the locus. distance_cutoff (int, optional): Distance in base pairs to be used for clumping. pvalue_cutoff (float, optional): GWAS significance threshold. - flankig_distance (int, optional): Flank distance in base pairs to be used for clumping. + flanking_distance (int, optional): Flank distance in base pairs to be used for clumping. Returns: StudyLocus: Clumped study-locus optionally containing variants based on window. Check LocusBreakerClumpingConfig object for default values. """ - from gentropy.method.locus_breaker_clumping import locus_breaker + from gentropy.method.locus_breaker_clumping import LocusBreakerClumping - return locus_breaker( + return LocusBreakerClumping.locus_breaker( self, baseline_pvalue_cutoff, distance_cutoff, pvalue_cutoff, - flankig_distance, + flanking_distance, ) def exclude_region(self: SummaryStatistics, region: str) -> SummaryStatistics: diff --git a/src/gentropy/locus_breaker_clumping.py b/src/gentropy/locus_breaker_clumping.py new file mode 100644 index 000000000..ce546c523 --- /dev/null +++ b/src/gentropy/locus_breaker_clumping.py @@ -0,0 +1,73 @@ +"""Step to apply linkageg based clumping on study-locus dataset.""" + +from __future__ import annotations + +from gentropy.common.session import Session +from gentropy.dataset.summary_statistics import SummaryStatistics +from gentropy.method.locus_breaker_clumping import LocusBreakerClumping + + +class LocusBreakerClumpingStep: + """Step to perform locus-breaker clumping on a study.""" + + def __init__( + self, + session: Session, + summary_statistics_input_path: str, + clumped_study_locus_output_path: str, + lbc_baseline_pvalue: float, + lbc_distance_cutoff: int, + lbc_pvalue_threshold: float, + lbc_flanking_distance: int, + large_loci_size: int, + wbc_clump_distance: int, + wbc_pvalue_threshold: float, + collect_locus: bool = False, + remove_mhc: bool = True, + ) -> None: + """Run locus-breaker clumping step. + + This step will perform locus-breaker clumping on the full set of summary statistics. + StudyLocus larger than the large_loci_size, by distance, will be further clumped with window-based + clumping. + + Args: + session (Session): Session object. + summary_statistics_input_path (str): Path to the input study locus. + clumped_study_locus_output_path (str): path of the resulting, clumped study-locus dataset. + lbc_baseline_pvalue (float): Baseline p-value for locus breaker clumping. + lbc_distance_cutoff (int): Distance cutoff for locus breaker clumping. + lbc_pvalue_threshold (float): P-value threshold for locus breaker clumping. + lbc_flanking_distance (int): Flanking distance for locus breaker clumping. + large_loci_size (int): Threshold distance to define large loci for window-based clumping. + wbc_clump_distance (int): Clump distance for window breaker clumping. + wbc_pvalue_threshold (float): P-value threshold for window breaker clumping. + collect_locus (bool, optional): Whether to collect locus. Defaults to False. + remove_mhc (bool, optional): If true will use exclude_region() to remove the MHC region. + """ + sum_stats = SummaryStatistics.from_parquet( + session, summary_statistics_input_path, recursiveFileLookup=True + ).persist() + lbc = sum_stats.locus_breaker_clumping( + lbc_baseline_pvalue, + lbc_distance_cutoff, + lbc_pvalue_threshold, + lbc_flanking_distance, + ) + clumped_result = LocusBreakerClumping.process_locus_breaker_output( + lbc, + sum_stats, + large_loci_size, + wbc_clump_distance, + wbc_pvalue_threshold, + ) + if remove_mhc: + clumped_result = clumped_result.exclude_region("chr6:25726063-33400556") + + if collect_locus: + clumped_result = clumped_result.annotate_locus_statistics_boundaries( + sum_stats + ) + clumped_result.df.write.mode(session.write_mode).parquet( + clumped_study_locus_output_path + ) diff --git a/src/gentropy/method/locus_breaker_clumping.py b/src/gentropy/method/locus_breaker_clumping.py index f272a821c..8b8e3e0f6 100644 --- a/src/gentropy/method/locus_breaker_clumping.py +++ b/src/gentropy/method/locus_breaker_clumping.py @@ -3,7 +3,6 @@ from __future__ import annotations import sys -from typing import TYPE_CHECKING import numpy as np import pyspark.sql.functions as f @@ -12,103 +11,164 @@ from gentropy.common.spark_helpers import calculate_neglog_pvalue from gentropy.dataset.study_locus import StudyLocus +from gentropy.dataset.summary_statistics import SummaryStatistics -if TYPE_CHECKING: - - from gentropy.dataset.summary_statistics import SummaryStatistics - - -def locus_breaker( - summary_statistics: SummaryStatistics, - baseline_pvalue_cutoff: float, - distance_cutoff: int, - pvalue_cutoff: float, - flankig_distance: int, -) -> StudyLocus: - """Identify GWAS associated loci based on the provided p-value and distance cutoff. - - - The GWAS associated loci identified by this method have a varying width, and are separated by a distance greater than the provided distance cutoff. - - The distance is only calculted between single point associations that reach the baseline p-value cutoff. - - As the width of the selected genomic region dynamically depends on the loci, the resulting StudyLocus object will contain the locus start and end position. - - To ensure completeness, the locus is extended by a flanking distance in both ends. - - Args: - summary_statistics (SummaryStatistics): Input summary statistics dataset. - baseline_pvalue_cutoff (float): baseline significance we consider for the locus. - distance_cutoff (int): minimum distance that separates two loci. - pvalue_cutoff (float): the minimum significance the locus should have. - flankig_distance (int): the distance to extend the locus in both directions. - - Returns: - StudyLocus: clumped study loci with locus start and end positions + lead variant from the locus. - """ - # Extract columns from the summary statistics: - columns_sumstats_columns = summary_statistics.df.columns - # Convert pvalue_cutoff to neglog scale: - neglog_pv_cutoff = -np.log10(pvalue_cutoff) - - # First window to calculate the distance between consecutive positions: - w1 = Window.partitionBy("studyId", "chromosome").orderBy("position") - - # Second window to calculate the locus start and end: - w2 = ( - Window.partitionBy("studyId", "chromosome", "locusStart") - .orderBy("position") - .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) - ) - - # Third window to rank the variants within the locus based on neglog p-value to find top loci: - w3 = Window.partitionBy("studyId", "chromosome", "locusStart", "locusEnd").orderBy( - f.col("negLogPValue").desc() - ) - - return StudyLocus( - _df=( - # Applying the baseline p-value cutoff: - summary_statistics.pvalue_filter(baseline_pvalue_cutoff) - # Calculating the neglog p-value for easier sorting: - .df.withColumn( - "negLogPValue", - calculate_neglog_pvalue( - f.col("pValueMantissa"), f.col("pValueExponent") - ), - ) - # Calculating the distance between consecutive positions, then identifying the locus start and end: - .withColumn("next_position", f.lag(f.col("position")).over(w1)) - .withColumn("distance", f.col("position") - f.col("next_position")) - .withColumn( - "locusStart", - f.when( - (f.col("distance") > distance_cutoff) | f.col("distance").isNull(), - f.col("position"), - ), - ) - .withColumn( - "locusStart", - f.when( - f.last(f.col("locusStart") - flankig_distance, True).over( - w1.rowsBetween(-sys.maxsize, 0) - ) - > 0, - f.last(f.col("locusStart") - flankig_distance, True).over( - w1.rowsBetween(-sys.maxsize, 0) + +class LocusBreakerClumping: + """Locus-breaker clumping method.""" + + @staticmethod + def locus_breaker( + summary_statistics: SummaryStatistics, + baseline_pvalue_cutoff: float, + distance_cutoff: int, + pvalue_cutoff: float, + flanking_distance: int, + ) -> StudyLocus: + """Identify GWAS associated loci based on the provided p-value and distance cutoff. + + - The GWAS associated loci identified by this method have a varying width, and are separated by a distance greater than the provided distance cutoff. + - The distance is only calculted between single point associations that reach the baseline p-value cutoff. + - As the width of the selected genomic region dynamically depends on the loci, the resulting StudyLocus object will contain the locus start and end position. + - To ensure completeness, the locus is extended by a flanking distance in both ends. + + Args: + summary_statistics (SummaryStatistics): Input summary statistics dataset. + baseline_pvalue_cutoff (float): baseline significance we consider for the locus. + distance_cutoff (int): minimum distance that separates two loci. + pvalue_cutoff (float): the minimum significance the locus should have. + flanking_distance (int): the distance to extend the locus in both directions. + + Returns: + StudyLocus: clumped study loci with locus start and end positions + lead variant from the locus. + """ + # Extract columns from the summary statistics: + columns_sumstats_columns = summary_statistics.df.columns + # Convert pvalue_cutoff to neglog scale: + neglog_pv_cutoff = -np.log10(pvalue_cutoff) + + # First window to calculate the distance between consecutive positions: + w1 = Window.partitionBy("studyId", "chromosome").orderBy("position") + + # Second window to calculate the locus start and end: + w2 = ( + Window.partitionBy("studyId", "chromosome", "locusStart") + .orderBy("position") + .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) + ) + + # Third window to rank the variants within the locus based on neglog p-value to find top loci: + w3 = Window.partitionBy( + "studyId", "chromosome", "locusStart", "locusEnd" + ).orderBy(f.col("negLogPValue").desc()) + + return StudyLocus( + _df=( + # Applying the baseline p-value cutoff: + summary_statistics.pvalue_filter(baseline_pvalue_cutoff) + # Calculating the neglog p-value for easier sorting: + .df.withColumn( + "negLogPValue", + calculate_neglog_pvalue( + f.col("pValueMantissa"), f.col("pValueExponent") ), - ).otherwise(f.lit(0)), - ) - .withColumn( - "locusEnd", f.max(f.col("position") + flankig_distance).over(w2) + ) + # Calculating the distance between consecutive positions, then identifying the locus start and end: + .withColumn("next_position", f.lag(f.col("position")).over(w1)) + .withColumn("distance", f.col("position") - f.col("next_position")) + .withColumn( + "locusStart", + f.when( + (f.col("distance") > distance_cutoff) + | f.col("distance").isNull(), + f.col("position"), + ), + ) + .withColumn( + "locusStart", + f.when( + f.last(f.col("locusStart") - flanking_distance, True).over( + w1.rowsBetween(-sys.maxsize, 0) + ) + > 0, + f.last(f.col("locusStart") - flanking_distance, True).over( + w1.rowsBetween(-sys.maxsize, 0) + ), + ).otherwise(f.lit(0)), + ) + .withColumn( + "locusEnd", f.max(f.col("position") + flanking_distance).over(w2) + ) + .withColumn("rank", f.rank().over(w3)) + .filter( + (f.col("rank") == 1) & (f.col("negLogPValue") > neglog_pv_cutoff) + ) + .select( + *columns_sumstats_columns, + # To make sure that the type of locusStart and locusEnd follows schema of StudyLocus: + f.col("locusStart").cast(t.IntegerType()).alias("locusStart"), + f.col("locusEnd").cast(t.IntegerType()).alias("locusEnd"), + f.lit(None) + .cast(t.ArrayType(t.StringType())) + .alias("qualityControls"), + StudyLocus.assign_study_locus_id( + f.col("studyId"), f.col("variantId") + ).alias("studyLocusId"), + ) + ), + _schema=StudyLocus.get_schema(), + ) + + @staticmethod + def process_locus_breaker_output( + study_locus: StudyLocus, + sum_stats: SummaryStatistics, + large_loci_size: int, + wbc_clump_distance: int, + gwas_threshold: float, + ) -> StudyLocus: + """Process the locus breaker method result, and run window-based clumping on large loci. + + Args: + study_locus (StudyLocus): StudyLocus object with locus start and end positions. + sum_stats (SummaryStatistics): Input summary statistics dataset. + large_loci_size (int): the size to define large loci which should be broken with wbc. + wbc_clump_distance (int): Clump distance for window-based clumping. + gwas_threshold (float): P-value threshold to be used in window-based clumping. + + Returns: + StudyLocus: clumped study loci with large loci broken by window-based clumping. + """ + small_loci = study_locus.filter( + (f.col("locusEnd") - f.col("locusStart")) <= large_loci_size + ) + large_loci = study_locus.filter( + (f.col("locusEnd") - f.col("locusStart")) > large_loci_size + ) + large_loci_wbc = ( + SummaryStatistics( + sum_stats.df.alias("ss") + .join( + large_loci.df.alias("ll"), + (f.col("ss.studyId") == f.col("ll.studyId")) + & (f.col("ss.chromosome") == f.col("ll.chromosome")) + & (f.col("ss.position") >= f.col("ll.locusStart")) + & (f.col("ss.position") <= f.col("ll.locusEnd")), + "inner", + ) + .select([f.col("ss." + col) for col in sum_stats.df.columns]), + SummaryStatistics.get_schema(), ) - .withColumn("rank", f.rank().over(w3)) - .filter((f.col("rank") == 1) & (f.col("negLogPValue") > neglog_pv_cutoff)) - .select( - *columns_sumstats_columns, - # To make sure that the type of locusStart and locusEnd follows schema of StudyLocus: - f.col("locusStart").cast(t.IntegerType()).alias("locusStart"), - f.col("locusEnd").cast(t.IntegerType()).alias("locusEnd"), - StudyLocus.assign_study_locus_id( - f.col("studyId"), f.col("variantId") - ).alias("studyLocusId"), + .window_based_clumping(wbc_clump_distance, gwas_threshold) + .df.withColumns( + { + "locusStart": f.col("position") - large_loci_size // 2, + "locusEnd": f.col("position") + large_loci_size // 2, + } ) - ), - _schema=StudyLocus.get_schema(), - ) + ) + + return StudyLocus( + large_loci_wbc.unionByName(small_loci.df), + StudyLocus.get_schema(), + )