feat: locus_breaker_clumping (#655)
* feat: locus_breaker_clumping

* fix: dosctring

* feat: _process_locus_breaker function

* feat: locus breaker clumping step

* fix: tidying parameters

* feat: option to remove MHC region

* fix: description for LocusBreakerClumpingStep

* fix: removing division of distance

* fix: adding new parameters for wbc distance separate from large_loci_size

* fix: resolving comments

* refactor: refactored code in process_locus_breaker_output

* fix: removing superfluous variable

* fix: persisting sumstats parquet to improve analysis plan

---------

Co-authored-by: Yakov <[email protected]>
Daniel-Considine and addramir authored Jul 8, 2024
1 parent 6487b31 commit 5f97232
Showing 5 changed files with 237 additions and 104 deletions.
2 changes: 1 addition & 1 deletion docs/python_api/methods/clumping.md
@@ -24,4 +24,4 @@ The algorithmic logic is similar to classic clumping approaches from PLINK (Refe

# Locus-breaker clumping

::: gentropy.method.locus_breaker_clumping.locus_breaker
::: gentropy.method.locus_breaker_clumping.LocusBreakerClumping
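The documentation entry now points at the new class rather than the removed module-level function. A minimal import sketch of that move (editor's illustration, not part of the diff):

```python
# After this commit, the public entry point is the class:
from gentropy.method.locus_breaker_clumping import LocusBreakerClumping
# It replaces the module-level function import:
# from gentropy.method.locus_breaker_clumping import locus_breaker
```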
2 changes: 1 addition & 1 deletion src/gentropy/config.py
@@ -382,7 +382,7 @@ class LocusBreakerClumpingConfig(StepConfig):
}
)
distance_cutoff: int = 250_000
flankig_distance: int = 100_000
flanking_distance: int = 100_000
baseline_pvalue_cutoff: float = 1e-5


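Only the misspelled `flankig_distance` field is renamed; its default value is unchanged. A quick check of the step defaults, as a sketch (zero-argument construction mirrors how the config is used in `summary_statistics.py` below):

```python
# Sketch, not part of this commit: read the defaults defined in LocusBreakerClumpingConfig.
from gentropy.config import LocusBreakerClumpingConfig

config = LocusBreakerClumpingConfig()
assert config.distance_cutoff == 250_000
assert config.flanking_distance == 100_000  # renamed from the misspelled `flankig_distance`
assert config.baseline_pvalue_cutoff == 1e-5
```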
10 changes: 5 additions & 5 deletions src/gentropy/dataset/summary_statistics.py
@@ -86,7 +86,7 @@ def locus_breaker_clumping(
baseline_pvalue_cutoff: float = LocusBreakerClumpingConfig().baseline_pvalue_cutoff,
distance_cutoff: int = LocusBreakerClumpingConfig().distance_cutoff,
pvalue_cutoff: float = WindowBasedClumpingStepConfig().gwas_significance,
flankig_distance: int = LocusBreakerClumpingConfig().flankig_distance,
flanking_distance: int = LocusBreakerClumpingConfig().flanking_distance,
) -> StudyLocus:
"""Generate study-locus from summary statistics using locus-breaker clumping method with locus boundaries.
@@ -96,20 +96,20 @@
baseline_pvalue_cutoff (float, optional): Baseline significance we consider for the locus.
distance_cutoff (int, optional): Distance in base pairs to be used for clumping.
pvalue_cutoff (float, optional): GWAS significance threshold.
flankig_distance (int, optional): Flank distance in base pairs to be used for clumping.
flanking_distance (int, optional): Flank distance in base pairs to be used for clumping.
Returns:
StudyLocus: Clumped study-locus optionally containing variants based on window.
Check LocusBreakerClumpingConfig object for default values.
"""
from gentropy.method.locus_breaker_clumping import locus_breaker
from gentropy.method.locus_breaker_clumping import LocusBreakerClumping

return locus_breaker(
return LocusBreakerClumping.locus_breaker(
self,
baseline_pvalue_cutoff,
distance_cutoff,
pvalue_cutoff,
flankig_distance,
flanking_distance,
)

def exclude_region(self: SummaryStatistics, region: str) -> SummaryStatistics:
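A hedged usage sketch of the renamed entry point on the dataset side; the parquet path and the bare `Session()` construction are illustrative, not taken from this diff:

```python
# Sketch: clump summary statistics with the locus-breaker method, using config defaults.
from gentropy.common.session import Session
from gentropy.dataset.summary_statistics import SummaryStatistics

session = Session()  # assumes the default local Spark session
sum_stats = SummaryStatistics.from_parquet(session, "path/to/sumstats")  # hypothetical path
study_loci = sum_stats.locus_breaker_clumping()  # thresholds fall back to LocusBreakerClumpingConfig
```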
73 changes: 73 additions & 0 deletions src/gentropy/locus_breaker_clumping.py
@@ -0,0 +1,73 @@
"""Step to apply linkageg based clumping on study-locus dataset."""

from __future__ import annotations

from gentropy.common.session import Session
from gentropy.dataset.summary_statistics import SummaryStatistics
from gentropy.method.locus_breaker_clumping import LocusBreakerClumping


class LocusBreakerClumpingStep:
"""Step to perform locus-breaker clumping on a study."""

def __init__(
self,
session: Session,
summary_statistics_input_path: str,
clumped_study_locus_output_path: str,
lbc_baseline_pvalue: float,
lbc_distance_cutoff: int,
lbc_pvalue_threshold: float,
lbc_flanking_distance: int,
large_loci_size: int,
wbc_clump_distance: int,
wbc_pvalue_threshold: float,
collect_locus: bool = False,
remove_mhc: bool = True,
) -> None:
"""Run locus-breaker clumping step.
This step will perform locus-breaker clumping on the full set of summary statistics.
StudyLocus larger than the large_loci_size, by distance, will be further clumped with window-based
clumping.
Args:
session (Session): Session object.
summary_statistics_input_path (str): Path to the input study locus.
clumped_study_locus_output_path (str): path of the resulting, clumped study-locus dataset.
lbc_baseline_pvalue (float): Baseline p-value for locus breaker clumping.
lbc_distance_cutoff (int): Distance cutoff for locus breaker clumping.
lbc_pvalue_threshold (float): P-value threshold for locus breaker clumping.
lbc_flanking_distance (int): Flanking distance for locus breaker clumping.
large_loci_size (int): Threshold distance to define large loci for window-based clumping.
wbc_clump_distance (int): Clump distance for window breaker clumping.
wbc_pvalue_threshold (float): P-value threshold for window breaker clumping.
collect_locus (bool, optional): Whether to collect locus. Defaults to False.
remove_mhc (bool, optional): If true will use exclude_region() to remove the MHC region.
"""
sum_stats = SummaryStatistics.from_parquet(
session, summary_statistics_input_path, recursiveFileLookup=True
).persist()
lbc = sum_stats.locus_breaker_clumping(
lbc_baseline_pvalue,
lbc_distance_cutoff,
lbc_pvalue_threshold,
lbc_flanking_distance,
)
clumped_result = LocusBreakerClumping.process_locus_breaker_output(
lbc,
sum_stats,
large_loci_size,
wbc_clump_distance,
wbc_pvalue_threshold,
)
if remove_mhc:
clumped_result = clumped_result.exclude_region("chr6:25726063-33400556")

if collect_locus:
clumped_result = clumped_result.annotate_locus_statistics_boundaries(
sum_stats
)
clumped_result.df.write.mode(session.write_mode).parquet(
clumped_study_locus_output_path
)
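A hedged sketch of invoking the new step directly. The paths, `large_loci_size`, and the window-based-clumping values are illustrative; the `lbc_*` values mirror the config defaults above, and 5e-8 is the conventional GWAS significance threshold. Note that the step persists the summary statistics up front (per the "persisting sumstats parquet" commit above) and, unless `remove_mhc=False`, excludes `chr6:25726063-33400556`, which appears to correspond to the extended MHC region in GRCh38 coordinates (an editorial reading, not stated in the diff):

```python
# Sketch, not part of this commit: run the step end to end.
from gentropy.common.session import Session
from gentropy.locus_breaker_clumping import LocusBreakerClumpingStep

LocusBreakerClumpingStep(
    session=Session(),
    summary_statistics_input_path="path/to/sumstats",        # hypothetical
    clumped_study_locus_output_path="path/to/clumped_loci",  # hypothetical
    lbc_baseline_pvalue=1e-5,
    lbc_distance_cutoff=250_000,
    lbc_pvalue_threshold=5e-8,
    lbc_flanking_distance=100_000,
    large_loci_size=1_500_000,   # illustrative
    wbc_clump_distance=500_000,  # illustrative
    wbc_pvalue_threshold=5e-8,
)
```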
254 changes: 157 additions & 97 deletions src/gentropy/method/locus_breaker_clumping.py
@@ -3,7 +3,6 @@
from __future__ import annotations

import sys
from typing import TYPE_CHECKING

import numpy as np
import pyspark.sql.functions as f
@@ -12,103 +11,164 @@

from gentropy.common.spark_helpers import calculate_neglog_pvalue
from gentropy.dataset.study_locus import StudyLocus
from gentropy.dataset.summary_statistics import SummaryStatistics

if TYPE_CHECKING:
    from gentropy.dataset.summary_statistics import SummaryStatistics


def locus_breaker(
    summary_statistics: SummaryStatistics,
    baseline_pvalue_cutoff: float,
    distance_cutoff: int,
    pvalue_cutoff: float,
    flankig_distance: int,
) -> StudyLocus:
    """Identify GWAS associated loci based on the provided p-value and distance cutoff.

    - The GWAS associated loci identified by this method have a varying width, and are separated by a distance greater than the provided distance cutoff.
    - The distance is only calculated between single point associations that reach the baseline p-value cutoff.
    - As the width of the selected genomic region dynamically depends on the loci, the resulting StudyLocus object will contain the locus start and end position.
    - To ensure completeness, the locus is extended by a flanking distance at both ends.

    Args:
        summary_statistics (SummaryStatistics): Input summary statistics dataset.
        baseline_pvalue_cutoff (float): baseline significance we consider for the locus.
        distance_cutoff (int): minimum distance that separates two loci.
        pvalue_cutoff (float): the minimum significance the locus should have.
        flankig_distance (int): the distance to extend the locus in both directions.

    Returns:
        StudyLocus: clumped study loci with locus start and end positions + lead variant from the locus.
    """
    # Extract columns from the summary statistics:
    columns_sumstats_columns = summary_statistics.df.columns
    # Convert pvalue_cutoff to neglog scale:
    neglog_pv_cutoff = -np.log10(pvalue_cutoff)

    # First window to calculate the distance between consecutive positions:
    w1 = Window.partitionBy("studyId", "chromosome").orderBy("position")

    # Second window to calculate the locus start and end:
    w2 = (
        Window.partitionBy("studyId", "chromosome", "locusStart")
        .orderBy("position")
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    )

    # Third window to rank the variants within the locus based on neglog p-value to find top loci:
    w3 = Window.partitionBy("studyId", "chromosome", "locusStart", "locusEnd").orderBy(
        f.col("negLogPValue").desc()
    )

    return StudyLocus(
        _df=(
            # Applying the baseline p-value cutoff:
            summary_statistics.pvalue_filter(baseline_pvalue_cutoff)
            # Calculating the neglog p-value for easier sorting:
            .df.withColumn(
                "negLogPValue",
                calculate_neglog_pvalue(
                    f.col("pValueMantissa"), f.col("pValueExponent")
                ),
            )
            # Calculating the distance between consecutive positions, then identifying the locus start and end:
            .withColumn("next_position", f.lag(f.col("position")).over(w1))
            .withColumn("distance", f.col("position") - f.col("next_position"))
            .withColumn(
                "locusStart",
                f.when(
                    (f.col("distance") > distance_cutoff) | f.col("distance").isNull(),
                    f.col("position"),
                ),
            )
            .withColumn(
                "locusStart",
                f.when(
                    f.last(f.col("locusStart") - flankig_distance, True).over(
                        w1.rowsBetween(-sys.maxsize, 0)
                    )
                    > 0,
                    f.last(f.col("locusStart") - flankig_distance, True).over(
                        w1.rowsBetween(-sys.maxsize, 0)
                    ),
                ).otherwise(f.lit(0)),
            )
            .withColumn(
                "locusEnd", f.max(f.col("position") + flankig_distance).over(w2)
            )
            .withColumn("rank", f.rank().over(w3))
            .filter((f.col("rank") == 1) & (f.col("negLogPValue") > neglog_pv_cutoff))
            .select(
                *columns_sumstats_columns,
                # To make sure that the type of locusStart and locusEnd follows schema of StudyLocus:
                f.col("locusStart").cast(t.IntegerType()).alias("locusStart"),
                f.col("locusEnd").cast(t.IntegerType()).alias("locusEnd"),
                StudyLocus.assign_study_locus_id(
                    f.col("studyId"), f.col("variantId")
                ).alias("studyLocusId"),
            )
        ),
        _schema=StudyLocus.get_schema(),
    )


class LocusBreakerClumping:
    """Locus-breaker clumping method."""

    @staticmethod
    def locus_breaker(
        summary_statistics: SummaryStatistics,
        baseline_pvalue_cutoff: float,
        distance_cutoff: int,
        pvalue_cutoff: float,
        flanking_distance: int,
    ) -> StudyLocus:
        """Identify GWAS associated loci based on the provided p-value and distance cutoff.

        - The GWAS associated loci identified by this method have a varying width, and are separated by a distance greater than the provided distance cutoff.
        - The distance is only calculated between single point associations that reach the baseline p-value cutoff.
        - As the width of the selected genomic region dynamically depends on the loci, the resulting StudyLocus object will contain the locus start and end position.
        - To ensure completeness, the locus is extended by a flanking distance at both ends.

        Args:
            summary_statistics (SummaryStatistics): Input summary statistics dataset.
            baseline_pvalue_cutoff (float): baseline significance we consider for the locus.
            distance_cutoff (int): minimum distance that separates two loci.
            pvalue_cutoff (float): the minimum significance the locus should have.
            flanking_distance (int): the distance to extend the locus in both directions.

        Returns:
            StudyLocus: clumped study loci with locus start and end positions + lead variant from the locus.
        """
        # Extract columns from the summary statistics:
        columns_sumstats_columns = summary_statistics.df.columns
        # Convert pvalue_cutoff to neglog scale:
        neglog_pv_cutoff = -np.log10(pvalue_cutoff)

        # First window to calculate the distance between consecutive positions:
        w1 = Window.partitionBy("studyId", "chromosome").orderBy("position")

        # Second window to calculate the locus start and end:
        w2 = (
            Window.partitionBy("studyId", "chromosome", "locusStart")
            .orderBy("position")
            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
        )

        # Third window to rank the variants within the locus based on neglog p-value to find top loci:
        w3 = Window.partitionBy(
            "studyId", "chromosome", "locusStart", "locusEnd"
        ).orderBy(f.col("negLogPValue").desc())

        return StudyLocus(
            _df=(
                # Applying the baseline p-value cutoff:
                summary_statistics.pvalue_filter(baseline_pvalue_cutoff)
                # Calculating the neglog p-value for easier sorting:
                .df.withColumn(
                    "negLogPValue",
                    calculate_neglog_pvalue(
                        f.col("pValueMantissa"), f.col("pValueExponent")
                    ),
                )
                # Calculating the distance between consecutive positions, then identifying the locus start and end:
                .withColumn("next_position", f.lag(f.col("position")).over(w1))
                .withColumn("distance", f.col("position") - f.col("next_position"))
                .withColumn(
                    "locusStart",
                    f.when(
                        (f.col("distance") > distance_cutoff)
                        | f.col("distance").isNull(),
                        f.col("position"),
                    ),
                )
                .withColumn(
                    "locusStart",
                    f.when(
                        f.last(f.col("locusStart") - flanking_distance, True).over(
                            w1.rowsBetween(-sys.maxsize, 0)
                        )
                        > 0,
                        f.last(f.col("locusStart") - flanking_distance, True).over(
                            w1.rowsBetween(-sys.maxsize, 0)
                        ),
                    ).otherwise(f.lit(0)),
                )
                .withColumn(
                    "locusEnd", f.max(f.col("position") + flanking_distance).over(w2)
                )
                .withColumn("rank", f.rank().over(w3))
                .filter(
                    (f.col("rank") == 1) & (f.col("negLogPValue") > neglog_pv_cutoff)
                )
                .select(
                    *columns_sumstats_columns,
                    # To make sure that the type of locusStart and locusEnd follows schema of StudyLocus:
                    f.col("locusStart").cast(t.IntegerType()).alias("locusStart"),
                    f.col("locusEnd").cast(t.IntegerType()).alias("locusEnd"),
                    f.lit(None)
                    .cast(t.ArrayType(t.StringType()))
                    .alias("qualityControls"),
                    StudyLocus.assign_study_locus_id(
                        f.col("studyId"), f.col("variantId")
                    ).alias("studyLocusId"),
                )
            ),
            _schema=StudyLocus.get_schema(),
        )

    @staticmethod
    def process_locus_breaker_output(
        study_locus: StudyLocus,
        sum_stats: SummaryStatistics,
        large_loci_size: int,
        wbc_clump_distance: int,
        gwas_threshold: float,
    ) -> StudyLocus:
        """Process the locus-breaker method result, and run window-based clumping on large loci.

        Args:
            study_locus (StudyLocus): StudyLocus object with locus start and end positions.
            sum_stats (SummaryStatistics): Input summary statistics dataset.
            large_loci_size (int): the size above which a locus is considered large and is re-clumped with window-based clumping.
            wbc_clump_distance (int): Clump distance for window-based clumping.
            gwas_threshold (float): P-value threshold to be used in window-based clumping.

        Returns:
            StudyLocus: clumped study loci with large loci broken by window-based clumping.
        """
        small_loci = study_locus.filter(
            (f.col("locusEnd") - f.col("locusStart")) <= large_loci_size
        )
        large_loci = study_locus.filter(
            (f.col("locusEnd") - f.col("locusStart")) > large_loci_size
        )
        # Restrict summary statistics to the large loci, re-clump them with window-based
        # clumping, and recentre the locus boundaries around each new lead variant:
        large_loci_wbc = (
            SummaryStatistics(
                sum_stats.df.alias("ss")
                .join(
                    large_loci.df.alias("ll"),
                    (f.col("ss.studyId") == f.col("ll.studyId"))
                    & (f.col("ss.chromosome") == f.col("ll.chromosome"))
                    & (f.col("ss.position") >= f.col("ll.locusStart"))
                    & (f.col("ss.position") <= f.col("ll.locusEnd")),
                    "inner",
                )
                .select([f.col("ss." + col) for col in sum_stats.df.columns]),
                SummaryStatistics.get_schema(),
            )
            .window_based_clumping(wbc_clump_distance, gwas_threshold)
            .df.withColumns(
                {
                    "locusStart": f.col("position") - large_loci_size // 2,
                    "locusEnd": f.col("position") + large_loci_size // 2,
                }
            )
        )

        return StudyLocus(
            large_loci_wbc.unionByName(small_loci.df),
            StudyLocus.get_schema(),
        )
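To make the locus-boundary logic concrete, here is a small pure-Python sketch of the same idea with made-up positions (editor's illustration; the real implementation runs per study and chromosome over Spark windows, clamps locus starts at zero, and keeps only the top-ranked variant per locus):

```python
# Baseline-significant variant positions on one chromosome, sorted:
positions = [1_000_000, 1_050_000, 1_600_000]
distance_cutoff, flanking_distance = 250_000, 100_000

loci, start = [], positions[0]
for prev, cur in zip(positions, positions[1:]):
    if cur - prev > distance_cutoff:  # a gap wider than the cutoff breaks the locus
        loci.append((start - flanking_distance, prev + flanking_distance))
        start = cur
loci.append((start - flanking_distance, positions[-1] + flanking_distance))
print(loci)  # [(900000, 1150000), (1500000, 1700000)]
```

Loci wider than `large_loci_size` are then re-clumped by `process_locus_breaker_output`: the summary statistics falling inside each large locus are window-based clumped, and `locusStart`/`locusEnd` are recentred to half of `large_loci_size` on either side of each new lead variant.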
