Skip to content

Commit

Permalink
Add draft implementation of BEAGLE 4.1
Browse files Browse the repository at this point in the history
  • Loading branch information
szhan committed Aug 17, 2023
1 parent 12aa711 commit 20c3bf2
Showing 1 changed file with 135 additions and 0 deletions.
135 changes: 135 additions & 0 deletions python/tests/test_beagle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""
Implementation of the BEAGLE 4.1 algorithm to impute genotypes.
"""
import numpy as np


def set_hmm_parameters(ref_h, site_pos, error_rate):
"""
Set HMM parameters as per BEAGLE 4.1.
:param ref_h: Panel of reference haplotypes.
:param site_pos: Genomic site positions.
:param error_rate: Mismatch probability.
:return: Switch probabilities and mismatch probabilities.
:rtype: list(numpy.ndarray, numpy.ndarray)
"""
assert ref_h.shape[0] == len(site_pos)
assert 0 <= error_rate <= 0.5
h = ref_h.shape[1]
# Define probability of switching for each genotyped marker site.
MIN_CM_DIST = 1e-7
Ne = 10 # By default, BEAGLE sets this to 1e6
# 1 cM / 1 Mb if no map is provided.
gen_pos = site_pos / 1e6
gen_pos = np.concatenate([[0], gen_pos])
gen_dist = np.maximum(
# In BEAGLE, current genetic distance minus previous genetic distance.
np.abs(gen_pos[1:] - gen_pos[:-1]),
np.repeat(MIN_CM_DIST, len(gen_pos) - 1),
)
rho = -np.expm1(-(0.04 * Ne / h) * gen_dist)
# Define probability of mismatch for each genotyped marker site.
mu = np.repeat(error_rate, len(site_pos))
return [rho, mu]


def compute_forward_probability_matrix(ref_h, query_h, rho, mu):
"""
Implement the forward algorithm in BEAGLE 4.1.
:param ref_h: Panel of reference haplotypes.
:param query_h: One query haplotype.
:param rho: Per-site switch probability.
:param mu: Per-site mismatch probability.
:return: Forward probability matrix.
:rtype: numpy.ndarray
"""
assert ref_h.shape[0] == len(query_h)
assert ref_h.shape[0] == len(rho) == len(mu)
assert np.max(rho) <= 1 and np.min(rho) >= 0
# BEAGLE caps mismatch probabilities at 0.5.
assert np.max(mu) <= 0.5 and np.min(mu) >= 0
m = ref_h.shape[0]
h = ref_h.shape[1]
fm = np.zeros((m, h), dtype=np.float64)
last_sum = 1.0 # Part of normalization factor
for i in np.arange(m):
# Get site-specific parameters
shift = rho[i] / h
scale = (1 - rho[i]) / last_sum
query_a = query_h[i]
for j in np.arange(h):
ref_a = ref_h[i, j]
# Get emission probability
em_prob = mu[i] if query_a != ref_a else 1.0 - mu[i]
if m == 0:
fm[i, j] = em_prob
else:
fm[i, j] = em_prob * (scale * fm[i - 1, j] + shift)
site_sum = np.sum(fm[i, :])
last_sum = site_sum / h if m == 0 else site_sum
return fm


def compute_backward_probability_matrix(ref_h, query_h, rho, mu):
"""
Implement the backward algorithm in BEAGLE 4.1.
In BEAGLE, the values are kept one site at a time.
:param ref_h: Panel of reference haplotypes.
:param query_h: One query haplotype.
:param rho: Per-site switch probability.
:param mu: Per-site mismatch probability.
:return: Backward probability matrix.
:rtype: numpy.ndarray
"""
assert ref_h.shape[0] == len(query_h)
assert ref_h.shape[0] == len(rho) == len(mu)
assert np.max(rho) <= 1 and np.min(rho) >= 0
# BEAGLE caps mismatch probabilities at 0.5.
assert np.max(mu) <= 0.5 and np.min(mu) >= 0
m = ref_h.shape[0]
h = ref_h.shape[1]
bm = np.zeros((m, h), dtype=np.float64)
# Initialise the last column
bm[m - 1, :] = 1.0 / h
for i in np.arange(m - 2, -1, -1):
sum_site = 0.0
query_a = query_h[i + 1]
for j in np.arange(h):
ref_a = ref_h[i + 1, j]
em_prob = mu[i + 1] if ref_a != query_a else 1.0 - mu[i + 1]
bm[i, j] *= em_prob
sum_site += bm[i, j]
scale = (1 - rho[i + 1]) / sum_site
shift = rho[i + 1] / h
for j in np.arange(h):
bm[i, j] = scale * bm[i, j] + shift
return bm


def compute_state_probability_matrix(fm, bm, ref_h, query_h, rho, mu):
"""
Compute state probability matrix.
"""
assert ref_h.shape[0] == len(query_h)
assert ref_h.shape[0] == len(rho) == len(mu)
assert np.max(rho) <= 1 and np.min(rho) >= 0
# BEAGLE caps mismatch probabilities at 0.5.
assert np.max(mu) <= 0.5 and np.min(mu) >= 0
m = ref_h.shape[0]
h = ref_h.shape[1]
state_probs = np.zeros((m, h), dtype=np.float64)
for i in np.arange(m - 1, 0, -1):
fwd_hap_probs = np.zeros(m, dtype=np.float64)
bwd_hap_probs = np.zeros(m, dtype=np.float64)
for j in np.arange(h):
state_probs[i, j] = fm[i, j] * bm[i, j]
fwd_hap_probs[i, j] += state_probs[i, j] # Not quite
bwd_hap_probs[i, j] += state_probs[i, j]
sum_fwd_hap_probs = np.sum(fwd_hap_probs[i, :])
fwd_hap_probs[i, :] /= sum_fwd_hap_probs
bwd_hap_probs[i, :] /= sum_fwd_hap_probs # Note: forward hap probs
return state_probs

0 comments on commit 20c3bf2

Please sign in to comment.