Source code for genepi.step2_estimateLD

# -*- coding: utf-8 -*-
"""
Created on Feb 2018

@author: Chester (Yu-Chuan Chang)
"""

""""""""""""""""""""""""""""""
# import libraries
""""""""""""""""""""""""""""""
import os
import sys
import numpy as np

""""""""""""""""""""""""""""""
# define functions 
""""""""""""""""""""""""""""""
[docs]def EstimateAlleleFrequency(gen_snp): """ A function for estimating allele frequency of a single varaint Args: gen_snp (list): The genotypes of a variant of all samples Returns: (tuple): tuple containing: - float_frequency_A (float): The reference allele type frequency - float_frequency_B (float): The alternative allele type frequency """ ### get all subject's genotype list_snp = gen_snp.split(" ")[5:] ### get the number of subjects int_num_subject = int(len(list_snp)/3) ### generate count table (AA, AB, BB) list_count = [0, 0, 0] for idx_subject in range(0, int_num_subject): idx_col = np.argmax(list_snp[idx_subject * 3: idx_subject * 3 + 3]) list_count[idx_col] = list_count[idx_col] + 1 ### calculate allele frequency ### frequency of A = AA + AB/2 ### frequency of B = BB + AB/2 float_frequency_A = float(list_count[0] + list_count[1] / 2) / int_num_subject float_frequency_B = float(list_count[2] + list_count[1] / 2) / int_num_subject return float_frequency_A, float_frequency_B
[docs]def EstimatePairwiseLD(gen_snp_1, gen_snp_2): """ Lewontin (1964) linkage disequilibrium (LD) estimation. Args: gen_snp_1 (list): The genotypes of first variant of all samples gen_snp_2 (list): The genotypes of second variant of all samples Returns: (tuple): tuple containing: - float_D_prime (float): The DPrime of these two variants - float_R_square (float): The RSquare of these two variants """ ### get all subject's genotype list_snp1 = gen_snp_1.split(" ")[5:] list_snp2 = gen_snp_2.split(" ")[5:] ### get the number of subjects int_num_subject = int(len(list_snp1)/3) ### generate contigency table ### row: SNP1_AA; SNP1_Aa; SNP1_aa ### col: SNP2_bb; SNP2_Bb; SNP2_bb np_contigency = np.zeros((3, 3), np.dtype(int)) for idx_subject in range(0, int_num_subject): idx_row = np.argmax(np.array(list_snp1[idx_subject * 3: idx_subject * 3 + 3])) idx_col = np.argmax(np.array(list_snp2[idx_subject * 3: idx_subject * 3 + 3])) np_contigency[idx_row, idx_col] = np_contigency[idx_row, idx_col] + 1 ### estimate single locus haplotyes ### snp1_A = (AABB + AABb + AAbb) + (AaBB + AaBb + Aabb)/2; snp1_a = snp1_A - 1 float_probability_A = float(np.sum(np_contigency[0, :]) + float(np.sum(np_contigency[1, :])) / 2) / int_num_subject float_probability_a = 1 - float_probability_A ### snp2_B = (AABB + AaBB + aaBB) + (AABb + AaBb + aaBb)/2; snp2_b = snp2_B - 1 float_probability_B = float(np.sum(np_contigency[:, 0]) + float(np.sum(np_contigency[:, 1])) / 2) / int_num_subject float_probability_b = 1 - float_probability_B ### set arbitrary probability of AB float_probability_AB = float_probability_A * float_probability_B try: ### EM algorithm for idx_loop in range(0, 10000): ### E(num_AB|prob_AB) = 2 * num_AABB + num_AABb + num_AaBB + ### (prob_AB * (1 + prob_AB - prob_A - prob_B) * num_AbBb) / ### ((prob_A - prob_AB) * (prob_B - prob_AB) + prob_AB * (1 + prob_AB - prob_A - prob_B)) float_num_AB_estimateByEM = 2 * float(np_contigency[0, 0]) + float(np_contigency[0, 1]) + float(np_contigency[1, 0]) + (float_probability_AB * (1 + float_probability_AB - float_probability_A - float_probability_B) * float(np_contigency[1, 1])) / ((float_probability_A - float_probability_AB) * (float_probability_B - float_probability_AB) + float_probability_AB * (1 + float_probability_AB - float_probability_A - float_probability_B)) float_probability_AB_estimateByEM = float_num_AB_estimateByEM / (int_num_subject * 2) if abs(float_probability_AB_estimateByEM - float_probability_AB) < 0.0000001: break else: float_probability_AB = float_probability_AB_estimateByEM ### calculate D float_D = float_probability_AB - float_probability_A * float_probability_B ### calculate D prime if float_D >= 0: float_D_min = min([float_probability_A * (1 - float_probability_B), (1 - float_probability_A) * float_probability_B]) else: float_D_min = max([-float_probability_A * float_probability_B, -(1 - float_probability_A) * (1 - float_probability_B)]) float_D_prime = float_D / float_D_min ### calculate R square float_R_square = (float_D**2) / (float_probability_A * float_probability_a * float_probability_B * float_probability_b) return float_D_prime, float_R_square except ZeroDivisionError: return 1.0, 1.0
"""""""""""""""""""""""""""""" # main function """"""""""""""""""""""""""""""
[docs]def EstimateLDBlock(str_inputFileName_genotype, str_outputFilePath = "", float_threshold_DPrime = 0.8, float_threshold_RSquare = 0.8): """ A function for implementing linkage disequilibrium (LD) dimension reduction. In genotype data, a variant often exhibits high dependency with its nearby variants because of LD. In the practical implantation, we prefer to group these dependent features to reduce the dimension of features. In other words, we can take the advantages of LD to reduce the dimensionality of genetic features. In this regard, this function adopted the same approach developed by Lewontin (1964) to estimate LD. We used D’ and r2 as the criteria to group highly dependent genetic features as blocks. In each block, we chose the features with the largest minor allele frequency to represent other features in the same block. Args: str_inputFileName_genotype (str): File name of input genotype data str_outputFilePath (str): File path of output file float_threshold_DPrime (float): The Dprime threshold for discriminating a LD block (default: 0.8) float_threshold_RSquare (float): The RSquare threshold for discriminating a LD block (default: 0.8) Returns: - Expected Success Response:: "step2: Estimate LD. DONE!" """ ### set default output path if str_outputFilePath == "": str_outputFilePath = os.path.dirname(str_inputFileName_genotype) ### get the number of snp int_num_snp = sum(1 for line in open(str_inputFileName_genotype)) ### read .gen file and estimate the LD block list_outputLDBlock = [] with open(str_inputFileName_genotype, "r") as file_inputFile: with open(os.path.join(str_outputFilePath, os.path.basename(str_inputFileName_genotype).replace(".gen", "_LDReduced.gen")), "w") as file_outputFile: ### create dictionary for LD block ### key: rsID; value:[minor allele requency, raw genotypes data] dict_thisLDBlock = {} ### put first snp into dictionary line_previousSnp = file_inputFile.readline() list_previousSnp = line_previousSnp.strip().split(" ") dict_thisLDBlock[list_previousSnp[1]] = [min(EstimateAlleleFrequency(line_previousSnp)), line_previousSnp] ### scan all other snps int_count_snp = 1 for line in file_inputFile: list_thisSnp = line.strip().split(" ") ### estimate pairwise LD for all of the snps in dictionary bool_flag_inLD = True for key in dict_thisLDBlock.keys(): float_DPrime, float_RSquare = EstimatePairwiseLD(dict_thisLDBlock[key][1], line) if float_DPrime < float_threshold_DPrime or float_RSquare < float_threshold_RSquare: bool_flag_inLD = False break ### if this snp not in this LD block, then output and clear the content of dictionary if bool_flag_inLD == False: ### find a snp with maximum minor allele frequency to be representative snp str_representative_rsid = list(dict_thisLDBlock.keys())[0] for key in dict_thisLDBlock.keys(): if dict_thisLDBlock[key][0] > dict_thisLDBlock[str_representative_rsid][0]: str_representative_rsid = key list_outputLDBlock.append(str_representative_rsid + ":" + ",".join(dict_thisLDBlock.keys())) file_outputFile.writelines(dict_thisLDBlock[str_representative_rsid][1]) dict_thisLDBlock.clear() ### add this snp to current dictionary dict_thisLDBlock[list_thisSnp[1]] = [min(EstimateAlleleFrequency(line)), line] ### show progress int_count_snp = int_count_snp + 1 str_print = "step2: Processing: " + "{0:.2f}".format(float(int_count_snp) / int_num_snp * 100) + "%" sys.stdout.write('%s\r' % str_print) sys.stdout.flush() ### output the final LD block in dictionary ### find a snp with maximum minor allele frequency to be representative snp str_representative_rsid = list(dict_thisLDBlock.keys())[0] for key in dict_thisLDBlock.keys(): if dict_thisLDBlock[key][0] > dict_thisLDBlock[str_representative_rsid][0]: str_representative_rsid = key list_outputLDBlock.append(str_representative_rsid + ":" + ",".join(dict_thisLDBlock.keys())) file_outputFile.writelines(dict_thisLDBlock[str_representative_rsid][1]) ### output the file of LD block ### output file format: rsid_representative: rsid_1,rsid_2,rsid_3,...(the snps in the same LD block) with open(os.path.join(str_outputFilePath, os.path.basename(str_inputFileName_genotype).replace(".gen", ".LDBlock")), "w") as file_outputFile: for item in list_outputLDBlock: file_outputFile.writelines(item + "\n") print("step2: Estimate LD. DONE! \t\t\t\t")