Skip to content

Commit

Permalink
feat(studyLocus): adding new locus collection using boundaries (#644)
Browse files Browse the repository at this point in the history
* feat(studyLocus): adding new locus collection using boundaries

* fix: fix in test

* Update tests/gentropy/dataset/test_study_locus.py

Co-authored-by: Szymon Szyszkowski <[email protected]>

* chore: pre-commit auto fixes [...]

---------

Co-authored-by: Szymon Szyszkowski <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Jun 19, 2024
1 parent 6d93192 commit 2fd6d1f
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 2 deletions.
65 changes: 65 additions & 0 deletions src/gentropy/dataset/study_locus.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,3 +651,68 @@ def _qc_no_population(self: StudyLocus) -> StudyLocus:
),
)
return self

def annotate_locus_statistics_boundaries(
    self: StudyLocus,
    summary_statistics: SummaryStatistics,
) -> StudyLocus:
    """Collect the summary statistics lying between `locusStart` and `locusEnd` of each study locus.

    A summary statistics row is assigned to a locus when it shares the study and
    chromosome of that locus and its position falls inside the inclusive
    `[locusStart, locusEnd]` interval. Matching rows are gathered per `studyLocusId`
    into the `locus` column, which replaces any `locus` column already present.

    Args:
        summary_statistics (SummaryStatistics): Summary statistics to be used for annotation.

    Returns:
        StudyLocus: This object, with its dataframe left-joined to the collected `locus` column. Loci without matching statistics keep their row and have no `locus` value (null from the left join).
    """
    # The study locus dataframe is read twice below (broadcast side and final join),
    # so it is marked for persistence first:
    self.df.persist()

    # Prefix every summary statistics column with `tag_` so they cannot clash with
    # the study locus columns after the join:
    tag_expressions = [
        f"{name} as tag_{name}" for name in summary_statistics.df.columns
    ]
    tagged_sumstats = summary_statistics.df.selectExpr(*tag_expressions).alias(
        "sumstat"
    )

    # Only the columns describing each locus window are broadcast:
    locus_windows = f.broadcast(
        self.df.alias("clumped").select(
            "position",
            "chromosome",
            "studyId",
            "studyLocusId",
            "locusStart",
            "locusEnd",
        )
    )

    # Join conditions; a list of Columns passed to `on` is combined with AND:
    within_locus = [
        f.col("sumstat.tag_studyId") == f.col("clumped.studyId"),
        f.col("sumstat.tag_chromosome") == f.col("clumped.chromosome"),
        f.col("sumstat.tag_position") >= f.col("clumped.locusStart"),
        f.col("sumstat.tag_position") <= f.col("clumped.locusEnd"),
    ]

    # One struct per tagging variant, with the `tag_` prefix removed again:
    tag_statistics = f.struct(
        f.col("tag_variantId").alias("variantId"),
        f.col("tag_beta").alias("beta"),
        f.col("tag_pValueMantissa").alias("pValueMantissa"),
        f.col("tag_pValueExponent").alias("pValueExponent"),
        f.col("tag_standardError").alias("standardError"),
    )

    collected_loci = (
        tagged_sumstats.join(locus_windows, on=within_locus, how="inner")
        .withColumn("locus", tag_statistics)
        .groupBy("studyLocusId")
        .agg(f.collect_list(f.col("locus")).alias("locus"))
    )

    # Swap the old `locus` column (if any) for the freshly collected one:
    self.df = self.df.drop("locus").join(
        collected_loci, on="studyLocusId", how="left"
    )

    return self
4 changes: 2 additions & 2 deletions tests/gentropy/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,13 @@ def mock_study_locus_data(spark: SparkSession) -> DataFrame:
)
.withSchema(sl_schema)
.withColumnSpec("chromosome", percentNulls=0.1)
.withColumnSpec("position", percentNulls=0.1)
.withColumnSpec("position", minValue=100, percentNulls=0.1)
.withColumnSpec("beta", percentNulls=0.1)
.withColumnSpec("effectAlleleFrequencyFromSource", percentNulls=0.1)
.withColumnSpec("standardError", percentNulls=0.1)
.withColumnSpec("subStudyDescription", percentNulls=0.1)
.withColumnSpec("pValueMantissa", minValue=1, percentNulls=0.1)
.withColumnSpec("pValueExponent", minValue=1, percentNulls=0.1)
.withColumnSpec("pValueExponent", minValue=-10, percentNulls=0.1)
.withColumnSpec(
"qualityControls",
expr="array(cast(rand() as string))",
Expand Down
14 changes: 14 additions & 0 deletions tests/gentropy/dataset/test_study_locus.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,3 +540,17 @@ def test_filter_ld_set(spark: SparkSession) -> None:
assert (
observed_df.filter(f.size("ldSet") > 1).count() == expected_tags_in_ld
), "Expected tags in ld set differ from observed."


def test_annotate_locus_statistics_boundaries(
    mock_study_locus: StudyLocus, mock_summary_statistics: SummaryStatistics
) -> None:
    """Check that boundary-based locus annotation yields a StudyLocus.

    The mock study locus first gets `locusStart`/`locusEnd` set to 10 positions
    on either side of `position`, then is annotated with the mock summary statistics.
    """
    lead_position = f.col("position")
    windowed_df = mock_study_locus.df.withColumn(
        "locusStart", lead_position - 10
    ).withColumn("locusEnd", lead_position + 10)

    study_locus = StudyLocus(windowed_df, StudyLocus.get_schema())
    annotated = study_locus.annotate_locus_statistics_boundaries(
        mock_summary_statistics
    )

    assert isinstance(annotated, StudyLocus)

0 comments on commit 2fd6d1f

Please sign in to comment.