Skip to content

Commit

Permalink
Add current work of matrix snp phased ancestry
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesYang007 committed Nov 14, 2023
1 parent 6858b4d commit 731b934
Show file tree
Hide file tree
Showing 16 changed files with 1,310 additions and 229 deletions.
122 changes: 121 additions & 1 deletion adelie/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def create_snp_unphased(
n : int
Number of data points.
p : int
Number of features.
Number of SNPs.
sparsity : float, optional
Proportion of :math:`\\beta` entries to be zeroed out.
Default is ``0.95``.
Expand Down Expand Up @@ -227,4 +227,124 @@ def create_snp_unphased(
"groups": groups,
"group_sizes": group_sizes,
"penalty": penalty,
}


def create_snp_phased_ancestry(
n: int,
s: int,
A: int,
*,
sparsity: float =0.95,
one_ratio: float =0.25,
two_ratio: float =0.05,
zero_penalty: float =0,
snr: float =1,
seed: int =0,
):
"""Creates a SNP Unphased dataset.
The data matrix ``X`` is a phased version of a matrix with sparsity ratio ``1 - one_ratio - two_ratio``
where ``one_ratio`` of the entries are randomly set to ``1``
and ``two_ratio`` are randomly set to ``2``.
The response ``y`` is generated from a linear model :math:`y = X\\beta + \\epsilon`
with :math:`\\epsilon \\sim N(0, \\sigma^2 I_n)`
and :math:`\\beta` such that ``sparsity`` proportion of the entries are set to :math:`0`.
We compute :math:`\\sigma^2` such that the signal-to-noise ratio is given by ``snr``.
The penalty factors are by default set to ``np.sqrt(group_sizes)``,
however if ``zero_penalty > 0``, a random set of penalties will be set to zero,
in which case, ``penalty`` is rescaled such that the :math:`\\ell_2` norm squared is ``p``.
Parameters
----------
n : int
Number of data points.
s : int
Number of SNPs.
A : int
Number of ancestries.
sparsity : float, optional
Proportion of :math:`\\beta` entries to be zeroed out.
Default is ``0.95``.
one_ratio : float, optional
Proportion of the entries of ``X`` that is set to ``1``.
Default is ``0.25``.
two_ratio : float, optional
Proportion of the entries of ``X`` that is set to ``2``.
Default is ``0.05``.
zero_penalty : float, optional
Proportion of ``penalty`` entries to be zeroed out.
Default is ``0``.
snr : float, optional
Signal-to-noise ratio.
Default is ``1``.
seed : int, optional
Random seed.
Default is ``0``.
Returns
-------
data : dict
A dictionary containing the generated data:
- ``"X"``: feature matrix.
- ``"y"``: response vector.
- ``"groups"``: mapping of group index to the starting column index of ``X``.
- ``"group_sizes"``: mapping of group index to the group size.
- ``"penalty"``: penalty factor for each group index.
"""
assert n >= 1
assert s >= 1
assert A >= 1
assert sparsity >= 0 and sparsity <= 1
assert one_ratio >= 0 and one_ratio <= 1
assert two_ratio >= 0 and two_ratio <= 1
assert zero_penalty >= 0 and zero_penalty <= 1
assert snr > 0
assert seed >= 0

np.random.seed(seed)

nnz_ratio = one_ratio + two_ratio
one_ratio = one_ratio / nnz_ratio
two_ratio = two_ratio / nnz_ratio
nnz = int(nnz_ratio * n * s)
nnz_indices = np.random.choice(n * s, nnz, replace=False)
nnz_indices = np.random.permutation(nnz_indices)
one_indices = nnz_indices[:int(one_ratio * nnz)]
two_indices = nnz_indices[int(one_ratio * nnz):]
one_indices = 2 * one_indices + np.random.binomial(1, 0.5, len(one_indices))
two_indices *= 2
X = np.zeros((n, 2 * s), dtype=np.int8)
X.ravel()[one_indices] = 1
X.ravel()[two_indices] = 1
X.ravel()[two_indices + 1] = 1
ancestries = np.zeros((n, 2 * s), dtype=np.int8)
ancestries.ravel()[one_indices] = np.random.choice(A, len(one_indices), replace=True)
ancestries.ravel()[two_indices] = np.random.choice(A, len(two_indices), replace=True)
ancestries.ravel()[two_indices + 1] = np.random.choice(A, len(two_indices), replace=True)

groups = np.arange(s)
group_sizes = np.full(s, A)

penalty = np.sqrt(group_sizes)
penalty[np.random.choice(s, int(zero_penalty * s), replace=False)] = 0
penalty /= np.linalg.norm(penalty) / np.sqrt(s * A)

beta = np.random.normal(0, 1, 2 * s)
beta_nnz_indices = np.random.choice(2 * s, int((1-sparsity) * s * 2), replace=False)
X_sub = X[:, beta_nnz_indices]
beta_sub = beta[beta_nnz_indices]

signal_var = np.dot(beta_sub, np.dot(np.cov(X_sub.T), beta_sub))
noise_scale = np.sqrt(signal_var / snr)
y = X_sub @ beta_sub + noise_scale * np.random.normal(0, 1, n)

return {
"X": X,
"ancestries": ancestries,
"y": y,
"groups": groups,
"group_sizes": group_sizes,
"penalty": penalty,
}
166 changes: 138 additions & 28 deletions adelie/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,10 @@
from .adelie_core import io as core_io


class snp_unphased:
"""IO handler for SNP Unphased data.
Parameters
----------
filename : str
File name to either read from or write to related to the SNP data.
"""
def __init__(
self,
filename,
):
self._core = core_io.IOSNPUnphased(filename)

class snp_base:
def __init__(self, core):
self._core = core

def endian(self):
"""Gets the endianness used in the file.
Expand Down Expand Up @@ -50,6 +40,47 @@ def cols(self):
"""
return self._core.cols()

def read(self):
"""Read and load the matrix from file.
Returns
-------
total_bytes : int
Number of bytes read.
"""
return self._core.read()

def to_dense(self, n_threads: int =1):
"""Creates a dense matrix.
Parameters
----------
n_threads : int, optional
Number of threads.
Default is ``1``.
Returns
-------
dense : (n, p) np.ndarray
Dense matrix.
"""
return self._core.to_dense(n_threads)


class snp_unphased(snp_base):
"""IO handler for SNP Unphased data.
Parameters
----------
filename : str
File name to either read from or write to related to the SNP data.
"""
def __init__(
self,
filename,
):
super().__init__(core_io.IOSNPUnphased(filename))

def outer(self):
"""Gets the outer indexing vector.
Expand Down Expand Up @@ -105,43 +136,121 @@ def value(self, j: int):
"""
return self._core.value(j)

def to_dense(self, n_threads: int =1):
"""Creates a dense matrix.
def write(
self,
calldata: np.ndarray,
n_threads: int =1,
):
"""Write dense array to the file in special format.
Parameters
----------
calldata : (n, p) np.ndarray
SNP unphased calldata in dense format.
n_threads : int, optional
Number of threads.
Default is ``1``.
Returns
-------
dense : (n, p) np.ndarray
Dense matrix.
total_bytes : int
Number of bytes written.
"""
return self._core.to_dense(n_threads)
return self._core.write(calldata, n_threads)

def read(self):
"""Read and load the matrix from file.

class snp_phased_ancestry(snp_base):
"""IO handler for SNP Phased Ancestry data.
Parameters
----------
filename : str
File name to either read from or write to related to the SNP data.
"""
def __init__(
self,
filename,
):
super().__init__(core_io.IOSNPPhasedAncestry(filename))

def outer(self):
"""Gets the outer indexing vector.
Returns
-------
total_bytes : int
Number of bytes read.
outer : (2*s+1,) np.ndarray
Outer indexing vector.
"""
return self._core.read()
return self._core.outer()

def nnz(self, j: int, hap: int):
"""Gets the number of non-zero entries at SNP and haplotype.
Parameters
----------
j : int
SNP index.
hap : int
Haplotype for the corresponding SNP.
Returns
-------
nnz : int
Number of non-zero entries at SNP ``j`` and haplotype ``hap``.
"""
return self._core.nnz(j, hap)

def inner(self, j: int, hap: int):
"""Gets the inner indexing vector at SNP and haplotype.
Parameters
----------
j : int
SNP index.
hap : int
Haplotype for the corresponding SNP.
Returns
-------
inner : np.ndarray
Inner indexing vector at SNP ``j`` and haplotype ``hap``.
"""
return self._core.inner(j, hap)

def ancestry(self, j: int, hap: int):
"""Gets the value vector at a SNP and haplotype.
Parameters
----------
j : int
SNP index.
hap : int
Haplotype for the corresponding SNP.
Returns
-------
v : np.ndarray
Ancestry vector at SNP ``j`` and haplotype ``hap``.
"""
return self._core.ancestry(j, hap)

def write(
self,
calldata: np.ndarray,
ancestries: np.ndarray,
A: int,
n_threads: int =1,
):
"""Write dense array to the file in special format.
"""Write dense arrays to the file in special format.
Parameters
----------
calldata : (n, p) np.ndarray
SNP unphased calldata in dense format.
calldata : (n, 2*s) np.ndarray
SNP phased calldata in dense format.
ancestries : (n, 2*s) np.ndarray
Ancestry in dense format.
A : int
Number of ancestries.
n_threads : int, optional
Number of threads.
Default is ``1``.
Expand All @@ -151,4 +260,5 @@ def write(
total_bytes : int
Number of bytes written.
"""
return self._core.write(calldata, n_threads)
return self._core.write(calldata, ancestries, A, n_threads)

Loading

0 comments on commit 731b934

Please sign in to comment.