diff --git a/python/tests/test_beagle.py b/python/tests/test_beagle.py index be1715d67e..8f3c22ddbd 100644 --- a/python/tests/test_beagle.py +++ b/python/tests/test_beagle.py @@ -9,9 +9,22 @@ The reference and target haplotype matrices are of the same size. +h = Number of reference haplotypes. +m = Number of genotyped markers. +x = Number of imputed markers. + +The forward, backward, and HMM state probability matrices are of size (m, h). +The interpolated HMM state probability matrix is of size (x, h). + For simplicity, we assume that: 1. All the sites biallelic (0/1 encoding). 2. The genetic map is defined by equating 1 Mbp to 1 cM. + +For efficiency, BEAGLE uses aggregated markers, which are clusters of markers +within a 0.005 cM interval (default). Because the genotypes are phased, +the alleles in the aggregated markers form distinct allele sequences. +Below, we do not use aggregated markers or allele sequences, which simplifies +the implementation. """ import numpy as np @@ -26,8 +39,8 @@ def convert_to_genetic_map_position(pos): This trivial function is meant for documentation. - :param numpy.ndarray pos: Site positions (bp). - :return: Genetic map positions (cM) + :param numpy.ndarray pos: Site positions. + :return: Genetic map positions. :rtype: numpy.ndarray """ return pos / 1e6 @@ -47,8 +60,7 @@ def get_weights(ref_pos, target_pos): where g(i) = genetic map position of marker i. BEAGLE uses aggregated markers, which are clusters of markers - that are close together (within a specified max genetic distance). - Here, each marker is treated separately. + (within a 0.005 cM interval). Here, each marker is treated separately. :param numpy.ndarray ref_pos: Site positions of genotyped markers. :param numpy.ndarray target_pos: Site positions of imputed markers. @@ -136,14 +148,14 @@ def compute_forward_probability_matrix(ref_h, query_h, rho, mu): :return: Forward probability matrix. :rtype: numpy.ndarray """ - assert ref_h.shape[0] == len(query_h) - assert ref_h.shape[0] == len(rho) - assert ref_h.shape[0] == len(mu) + m = ref_h.shape[0] + h = ref_h.shape[1] + assert m == len(query_h) + assert m == len(rho) + assert m == len(mu) assert 0 <= np.min(rho) and np.max(rho) <= 1 # BEAGLE caps mismatch probabilities at 0.5. assert 0 <= np.min(mu) and np.max(mu) <= 0.5 - m = ref_h.shape[0] - h = ref_h.shape[1] fm = np.zeros((m, h), dtype=np.float64) last_sum = 1.0 # Part of normalization factor for i in np.arange(m): @@ -178,13 +190,14 @@ def compute_backward_probability_matrix(ref_h, query_h, rho, mu): :return: Backward probability matrix. :rtype: numpy.ndarray """ - assert ref_h.shape[0] == len(query_h) - assert ref_h.shape[0] == len(rho) == len(mu) - assert np.max(rho) <= 1 and np.min(rho) >= 0 - # BEAGLE caps mismatch probabilities at 0.5. - assert np.max(mu) <= 0.5 and np.min(mu) >= 0 m = ref_h.shape[0] h = ref_h.shape[1] + assert m == len(query_h) + assert m == len(rho) + assert m == len(mu) + assert 0 <= np.min(rho) and np.max(rho) <= 1 + # BEAGLE caps mismatch probabilities at 0.5. + assert 0 <= np.min(mu) and np.max(mu) <= 0.5 bm = np.zeros((m, h), dtype=np.float64) # Initialise the last column bm[m - 1, :] = 1.0 / h @@ -216,16 +229,19 @@ def compute_state_probability_matrix(fm, bm, ref_h, query_h, rho, mu): :param numpy.ndarray query_h: One query haplotype. :param numpy.ndarray rho: Switch probabilities. :param numpy.ndarray mu: Mismatch probabilities. - :return: State probability matrix. + :return: HMM state probability matrix. :rtype: numpy.ndarray """ - assert ref_h.shape[0] == len(query_h) - assert ref_h.shape[0] == len(rho) == len(mu) - assert np.max(rho) <= 1 and np.min(rho) >= 0 - # BEAGLE caps mismatch probabilities at 0.5. - assert np.max(mu) <= 0.5 and np.min(mu) >= 0 m = ref_h.shape[0] h = ref_h.shape[1] + assert m == len(query_h) + assert m == len(rho) + assert m == len(mu) + assert 0 <= np.min(rho) and np.max(rho) <= 1 + # BEAGLE caps mismatch probabilities at 0.5. + assert 0 <= np.min(mu) and np.max(mu) <= 0.5 + assert fm.shape == (m, h) + assert bm.shape == (m, h) state_probs = np.zeros((m, h), dtype=np.float64) for i in np.arange(m - 1, 0, -1): fwd_hap_probs = np.zeros(m, dtype=np.float64) @@ -240,59 +256,59 @@ def compute_state_probability_matrix(fm, bm, ref_h, query_h, rho, mu): return state_probs -def compute_interpolated_state_probability_matrix(sm, ref_pos, target_pos): +def compute_interpolated_state_probability_matrix(sm, genotyped_pos, imputed_pos): """ Compute interpolated HMM state probability matrix. This implementation is based on Equation 1 in BB2016. - The state probability matrix is of size m x h, + The state probability matrix is of size (m, h), where m is the number of genotyped markers and h is the number of reference haplotypes. - The interpolated state probability matrix is of size x x h, - where x is the number of target markers + The interpolated state probability matrix is of size (x, h), + where x is the number of imputed markers and h is the number of reference haplotypes. :param sm: HMM state probability matrix. - :param ref_pos: Site positions of genotyped markers. - :param target_pos: Site positions of target markers. + :param genotyped_pos: Site positions of genotyped markers. + :param imputed_pos: Site positions of imputed markers. :return: Interpolated HMM state probability matrix. :rtype: numpy.ndarray """ m = sm.shape[0] h = sm.shape[1] - x = len(target_pos) - assert len(ref_pos) == m + x = len(imputed_pos) + assert len(genotyped_pos) == m i_sm = np.zeros((x, h), dtype=np.float64) # Interpolated matrix - ref_cm = convert_to_genetic_map_position(ref_pos) - target_cm = convert_to_genetic_map_position(target_pos) - weights = get_weights(ref_cm, target_cm) + genotyped_cm = convert_to_genetic_map_position(genotyped_pos) + imputed_cm = convert_to_genetic_map_position(imputed_pos) + weights = get_weights(genotyped_cm, imputed_cm) assert len(weights) == x for i in np.arange(x): - if target_cm[i] < ref_cm[0]: + if imputed_cm[i] < genotyped_cm[0]: m = 0 - elif target_cm[i] > ref_cm[-1]: + elif imputed_cm[i] > genotyped_cm[-1]: m = -2 else: - m = np.min(np.where(ref_cm > target_cm[i])) - # Vectorisd over reference haplotypes + m = np.min(np.where(genotyped_cm > imputed_cm[i])) + # Vectorised over reference haplotypes i_sm[i, :] = weights[i] * sm[m, :] + (1 - weights[i]) * sm[m + 1, :] return i_sm def compute_allele_probabilities(i_sm, ref_h_target): """ - Compute allele probabilities at target markers. + Compute allele probabilities at imputed markers. - Assuming biallelic sites, it is a matrix of x by 2, - where x is the number of target markers. + Assuming all biallelic sites, it is a matrix of size (x, 2), + where x is the number of imputed markers. This implementation is based on Equation 1 in BB2016. It computes the summand per allele. :param numpy.ndarray i_sm: Interpolated HMM state probability matrix. - :param numpy.ndarray ref_h_target: Reference haplotypes subsetted to target markers. + :param numpy.ndarray ref_h_target: Reference haplotypes subsetted to imputed markers. :return: Allele probabilities. :rtype: numpy.ndarray """