In [[06-激动人心的新线索]] we mentioned that, by building a clumpFinding function, we can find the k-mers forming a specified (L, t)-clump. I usually take t here to be the maximum, i.e. the largest number of times any k-mer occurs.
However, an algorithm whose time complexity is roughly O(|genome| × L) is far too sluggish for a genome of about four million bases. Can we make it more efficient?
Let's revisit the earlier definition: if a fragment of length k occurs t times within some window of length L, and t is the largest such count in that window, the fragment forms a k-mer (L, t)-clump. This k-mer is the repeated length-k fragment, i.e. the one that occurs most often among all length-k fragments in the window.
Suppose we have a sequence of length 14: AAGCAAAGGTGGGC. We compute the 2-mer frequency array for each window of length L = 13, which gives two windows, and most of their content is identical! Sliding the window one step only drops the first base and takes in one new base, so the middle L − 2 = 11 of the 2-mers are shared:

```
Text  = AAGCAAAGGTGGG
Text′ = AGCAAAGGTGGGC
```
So the frequency array of one window differs from that of the next at only two entries: one k-mer dropped at the front, one k-mer added at the end.
Moreover, in this situation only the count of the k-mer entering at the end can increase, so we just compare its new count against the window's current maxcount to track the most frequent k-mers.
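We can verify this on the toy sequence with a quick sketch (my own illustration, using Python's `Counter` rather than the course's static array):

```python
from collections import Counter

def kmer_counts(text, k):
    """Count every k-mer in one window."""
    return Counter(text[i:i+k] for i in range(len(text) - k + 1))

old = kmer_counts("AAGCAAAGGTGGG", 2)   # first window, L = 13
new = kmer_counts("AGCAAAGGTGGGC", 2)   # window slid one base to the right

print(new - old)  # Counter({'GC': 1}): only the entering 2-mer gained a count
print(old - new)  # Counter({'AA': 1}): only the leaving 2-mer lost a count
```

All other 2-mer counts are identical between the two windows.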
The method used here builds on the code from part five of [[04-算法01-频繁出现的秘密]], whose core is a static array indexed by a base-4-to-decimal conversion. Note that the course's method converts every A/C/G/T combination directly to a decimal index, so it does not handle the reverse-complement special case. Since my focus here is the dynamic-array method, I won't treat that case specially either.
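The index is simply the k-mer read as a base-4 numeral with A=0, C=1, G=2, T=3. A one-line sketch of the idea (my own illustration, equivalent in spirit to the course's recursive conversion):

```python
DIGITS = {'A': '0', 'C': '1', 'G': '2', 'T': '3'}

def pattern_to_index(pattern):
    """Read a k-mer as a base-4 numeral, e.g. AGT -> 023 (base 4) -> 11."""
    return int(''.join(DIGITS[b] for b in pattern), 4)

print(pattern_to_index("AGT"))  # 0*16 + 2*4 + 3 = 11
print(pattern_to_index("AA"))   # 0: note the index alone does not encode k
```

The second example shows why converting an index back to a pattern needs k as an argument: leading A's are leading zeros, so they must be padded back in.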
The full code follows; I won't walk through it line by line:
```python
import sys
import time

text_path = sys.argv[1]
with open(text_path) as a:
    text = a.read()
k = int(sys.argv[2])
L = int(sys.argv[3])

def PatterntoNumber(pattern):
    if len(pattern) == 0:
        return 0
    numberdict = {'A': 0, 'C': 1, 'G': 2, 'T': 3}
    symbol = pattern[-1]
    prefix = pattern[:-1]
    return 4 * PatterntoNumber(prefix) + numberdict[symbol]

def NumbertoPattern(index, k):
    pattern = ''
    numberdict = {0: 'A', 1: 'C', 2: 'G', 3: 'T'}
    if k == 1:
        return numberdict[index]
    while index != 0:
        prefixIndex = index // 4
        remainder = index % 4
        pattern = numberdict[remainder] + pattern
        index = prefixIndex
    while len(pattern) < k:
        pattern = 'A' + pattern
    return pattern

def ComputingFrequencies(Text, k):
    frequencyarray = [0] * 4**k
    for i in range(len(Text) - k + 1):
        pattern = Text[i:i+k]
        j = PatterntoNumber(pattern)
        frequencyarray[j] += 1
    return frequencyarray

def BetterClumpFinding(genome, k, L):
    genome = genome.upper()
    max_frequentpattern = {}
    FrequencyArray = ComputingFrequencies(genome[:L], k)
    max_count = max(FrequencyArray)
    for i in range(4**k):
        if FrequencyArray[i] == max_count:
            max_frequentpattern[NumbertoPattern(i, k)] = max_count
    for i in range(len(genome) - L):
        firstnumber = PatterntoNumber(genome[i:i+k])  # the k-mer leaving at the front
        FrequencyArray[firstnumber] = FrequencyArray[firstnumber] - 1
        lastnumber = PatterntoNumber(genome[i+L-k+1:i+L+1])  # the k-mer entering at the end
        FrequencyArray[lastnumber] = FrequencyArray[lastnumber] + 1
        if FrequencyArray[lastnumber] >= max_count:
            max_count = max(FrequencyArray)
            max_frequentpattern[NumbertoPattern(lastnumber, k)] = max_count
    max_count2 = max(max_frequentpattern.values())
    frequent_patterns = set(p for p, v in max_frequentpattern.items() if v == max_count2)
    frequent_patterns2 = {max_count: frequent_patterns, "times": len(frequent_patterns)}
    return frequent_patterns2

def main():
    time_start = time.time()
    print(BetterClumpFinding(text, k, L))
    time_end = time.time()
    print('totally cost %f seconds' % (time_end - time_start))

if __name__ == "__main__":
    main()
```
```
python3 ../3-better_clumping/step07_better_clump_finding1.py ./Input/E_coli.txt 9 500
{9: {'CGGGGAACT', 'GCTGGCGCG', 'GGCGCGGGG', 'ATCCCCGCT', 'CGCGGGGAA', 'CCCCGCTGG', 'TTATCCCCG', 'CGGTTTATC', 'GCGGGGAAC', 'GGGGAACTC', 'TATCCCCGC', 'TGGCGCGGG', 'GGTTTATCC', 'GCGCGGGGA', 'CCGCTGGCG', 'TTTATCCCC', 'TCCCCGCTG', 'CGCTGGCGC', 'CTGGCGCGG', 'GTTTATCCC', 'CCCGCTGGC'}, 'times': 21}
totally cost 102.834948 seconds
```
For a genome of about four million bases and a clump window of 500, there are 21 such 9-mers.
Now let's revisit the main body of the earlier code:
```python
for i in range(len(genome) - L + 1):
    gene = genome[i:i+L]
    max_frequency_array_tmp = fasterFrequentWords(gene, k)
    if i > 0:
        tmp_pattern = max_frequency_array_tmp.keys()
        array_pattern = max_frequency_array.keys()
        intersect_pattern = [x for x in tmp_pattern if x in array_pattern]
        diff_pattern = tmp_pattern - array_pattern
        diff_array = {p: v for p, v in max_frequency_array_tmp.items() if p in diff_pattern}
        for x in intersect_pattern:
            if max_frequency_array_tmp[x] > max_frequency_array[x]:
                max_frequency_array[x] = max_frequency_array_tmp[x]
        max_frequency_array.update(diff_array)
    else:
        max_frequency_array.update(max_frequency_array_tmp)
```
As we said above, max_frequency_array_tmp does not have to be recomputed from scratch by fasterFrequentWords every time. So how do we update it instead?
Let's look at the new main body:
```python
def betterclumpFinding2(genome, k, L):
    genome = genome.lower()
    frequent_patterns_location = {}
    max_frequency_array = {}
    for i in range(len(genome) - L + 1):
        if i == 0:
            gene = genome[i:i+L]
            max_frequency_array_tmp = fasterFrequentWords(gene, k)
            max_frequency_array.update(max_frequency_array_tmp)
            max_count = max(max_frequency_array.values())
        else:
            first_pattern = genome[i-1:i+k-1]
            last_pattern = genome[i+L-k:i+L]
            if first_pattern in max_frequency_array_tmp:
                max_frequency_array_tmp[first_pattern] -= 1
            if last_pattern in max_frequency_array_tmp:
                max_frequency_array_tmp[last_pattern] += 1
                if max_frequency_array_tmp[last_pattern] >= max_count:
                    max_count = max_frequency_array_tmp[last_pattern]
                    max_frequency_array[last_pattern] = max_frequency_array_tmp[last_pattern]
            else:
                max_frequency_array_tmp[last_pattern] = 1
    max_count = max(max_frequency_array.values())
    frequent_patterns = [p for p, v in max_frequency_array.items() if v == max_count]
    # for i in frequent_patterns:
    #     location = patternLocalizing(genome, i)
    #     frequent_patterns_location[i] = {"location": location,
    #                                      "rev_pattern": reverse_complement_2(i)}
    # frequent_patterns_location["counts"] = max_count
    # return frequent_patterns_location
    frequent_patterns2 = {max_count: frequent_patterns, "pattern_number": len(frequent_patterns)}
    return frequent_patterns2
```
The same kind of check is done here: if last_pattern already exists in the array we built at the start, we increment it dynamically, and if its count reaches max_count it overwrites the previous value kept in max_frequency_array. Likewise, a last_pattern that does not yet exist is added with a count of 1, and as the window keeps sliding its count keeps changing, including being decremented once that last_pattern becomes a first_pattern.
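To convince ourselves that this incremental bookkeeping is sound, here is a standalone sanity check (my own sketch, separate from the code above): maintain the full count dictionary incrementally and compare each window against a fresh recomputation.

```python
from collections import Counter

def window_counts(text, k):
    """Brute-force k-mer counts of one window."""
    return Counter(text[i:i+k] for i in range(len(text) - k + 1))

def check_incremental(genome, k, L):
    counts = window_counts(genome[:L], k)
    for i in range(1, len(genome) - L + 1):
        counts[genome[i-1:i-1+k]] -= 1   # k-mer leaving at the front
        counts[genome[i+L-k:i+L]] += 1   # k-mer entering at the end
        # unary + drops zero-count entries before comparing
        assert +counts == window_counts(genome[i:i+L], k)

check_incremental("AAGCAAAGGTGGGCATTACGGA", 3, 10)
print("incremental updates match full recomputation")
```

Each slide touches only two dictionary entries, which is why the incremental version is so much cheaper than recounting the whole window.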
The full code:
```python
import sys
import time

text_path = sys.argv[1]
with open(text_path) as a:
    text = a.read()
k = int(sys.argv[2])
L = int(sys.argv[3])

def reverse_complement_2(pattern):
    re_pattern = ''
    complement = {'a': 't', 't': 'a', 'c': 'g', 'g': 'c'}
    for i in pattern:
        re_pattern += complement[i]
    return re_pattern[::-1]

def fasterFrequentWords(Text, k):
    frequency_array = {}
    for i in range(len(Text) - k + 1):
        pattern = Text[i:i+k]
        # rev_pattern = reverse_complement_2(pattern)
        if pattern in frequency_array:
            frequency_array[pattern] += 1
        # elif rev_pattern in frequency_array:
        #     frequency_array[rev_pattern] += 1
        else:
            frequency_array[pattern] = 1
    max_count = max(frequency_array.values())
    max_frequency_array_tmp = {p: v for p, v in frequency_array.items() if v == max_count}
    return max_frequency_array_tmp

def patternLocalizing(Text, Pattern):
    # rev_pattern = reverse_complement_2(Pattern)
    count = []
    for i in range(len(Text) - len(Pattern) + 1):
        if Pattern == Text[i:i+len(Pattern)]:
        # if Pattern == Text[i:i+len(Pattern)] or rev_pattern == Text[i:i+len(Pattern)]:
            count.append(i)
    return count

def betterclumpFinding2(genome, k, L):
    genome = genome.lower()
    frequent_patterns_location = {}
    max_frequency_array = {}
    for i in range(len(genome) - L + 1):
        if i == 0:
            gene = genome[i:i+L]
            max_frequency_array_tmp = fasterFrequentWords(gene, k)
            max_frequency_array.update(max_frequency_array_tmp)
            max_count = max(max_frequency_array.values())
        else:
            first_pattern = genome[i-1:i+k-1]
            last_pattern = genome[i+L-k:i+L]
            if first_pattern in max_frequency_array_tmp:
                max_frequency_array_tmp[first_pattern] -= 1
            if last_pattern in max_frequency_array_tmp:
                max_frequency_array_tmp[last_pattern] += 1
                if max_frequency_array_tmp[last_pattern] >= max_count:
                    max_count = max_frequency_array_tmp[last_pattern]
                    max_frequency_array[last_pattern] = max_frequency_array_tmp[last_pattern]
            else:
                max_frequency_array_tmp[last_pattern] = 1
    max_count = max(max_frequency_array.values())
    frequent_patterns = [p for p, v in max_frequency_array.items() if v == max_count]
    # for i in frequent_patterns:
    #     location = patternLocalizing(genome, i)
    #     frequent_patterns_location[i] = {"location": location,
    #                                      "rev_pattern": reverse_complement_2(i)}
    # frequent_patterns_location["counts"] = max_count
    # return frequent_patterns_location
    frequent_patterns2 = {max_count: frequent_patterns, "pattern_number": len(frequent_patterns)}
    return frequent_patterns2

def main():
    time_start = time.time()
    result = betterclumpFinding2(text, k, L)
    print(result)
    time_end = time.time()
    print('totally cost %f seconds' % (time_end - time_start))

if __name__ == "__main__":
    main()
```
Excellent. This is about 7.5 times faster than the course's method (13.6 s vs 102.8 s):
```
% python3 ../3-better_clumping/step08_better_clump_finding2.py ./Input/E_coli.txt 9 500
{9: ['cggtttatc', 'ggtttatcc', 'gtttatccc', 'gcggggaac', 'ggggaactc', 'cgcggggaa', 'cggggaact', 'tttatcccc', 'ttatccccg', 'tatccccgc', 'atccccgct', 'tccccgctg', 'ccccgctgg', 'cccgctggc', 'ccgctggcg', 'cgctggcgc', 'gctggcgcg', 'ctggcgcgg', 'tggcgcggg', 'ggcgcgggg', 'gcgcgggga'], 'pattern_number': 21}
totally cost 13.634644 seconds
```
For comparison:
```
python3 ../3-better_clumping/step07_better_clump_finding1.py ./Input/E_coli.txt 9 500
{9: {'CGGGGAACT', 'GCTGGCGCG', 'GGCGCGGGG', 'ATCCCCGCT', 'CGCGGGGAA', 'CCCCGCTGG', 'TTATCCCCG', 'CGGTTTATC', 'GCGGGGAAC', 'GGGGAACTC', 'TATCCCCGC', 'TGGCGCGGG', 'GGTTTATCC', 'GCGCGGGGA', 'CCGCTGGCG', 'TTTATCCCC', 'TCCCCGCTG', 'CGCTGGCGC', 'CTGGCGCGG', 'GTTTATCCC', 'CCCGCTGGC'}, 'times': 21}
totally cost 102.834948 seconds
```
What's more, our dynamic array never allocates an array of size 4**k, and if we add a check that deletes a key once its count drops to 0, we can save even more space.
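A minimal sketch of that zero-count pruning (a hypothetical helper of my own, not part of the code above): delete the key right after the decrement, so the dictionary never holds more than the L − k + 1 patterns actually inside the window.

```python
def slide(counts, first, last):
    """Update k-mer counts as the window moves one base right, pruning zeros."""
    counts[first] -= 1
    if counts[first] == 0:
        del counts[first]            # prune so vanished k-mers cost no memory
    counts[last] = counts.get(last, 0) + 1
    return counts

counts = {"AA": 1, "AG": 1, "GC": 1, "CA": 1}  # 2-mers of window "AAGCA"
slide(counts, first="AA", last="AA")           # window slides to "AGCAA"
print(counts)  # {'AG': 1, 'GC': 1, 'CA': 1, 'AA': 1}
```

Here "AA" is deleted and immediately re-inserted, so the dictionary stays exactly the size of the current window's distinct k-mers.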
Can you work out the time complexities of methods 3 and 4?
[1] Bioinformatics | Coursera: https://www.coursera.org/specializations/bioinformatics#courses