-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcalc_divergence.py
executable file
·172 lines (136 loc) · 5.4 KB
/
calc_divergence.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import os
import sys
import collections
import argparse
from itertools import groupby
__author__ = 'colin anthony'
def py3_fasta_iter(fasta_name):
"""
modified from Brent Pedersen: https://www.biostars.org/p/710/#1412
given a fasta file. yield tuples of header, sequence
"""
fh = open(str(fasta_name), 'r')
faiter = (x[1] for x in groupby(fh, lambda line: line[0] == ">"))
for header in faiter:
# drop the ">"
header_str = header.__next__()[1:].strip()
# join all sequence lines to one.
seq = "".join(s.strip() for s in faiter.__next__())
yield (header_str, seq)
def fasta_to_dct(file_name):
"""
:param file_name: The fasta formatted file to read from.
:return: a dictionary of the contents of the file name given. Dictionary in the format:
{sequence_id: sequence_string, id_2: sequence_2, etc.}
"""
dct = collections.defaultdict(str)
my_gen = py3_fasta_iter(file_name)
for k, v in my_gen:
new_key = k.replace(" ", "_")
if new_key in dct.keys():
print("Duplicate sequence ids found. Exiting")
raise KeyError("Duplicate sequence ids found")
dct[new_key] = str(v).replace("~", "_").upper()
return dct
def gethxb2(dict):
"""
:param dict: a dictionary of your aligned input sequences. Must contain HXB2, with HXB2 in the header
:return: the HXB2 sequence as a string
"""
found = False
hxb2_seq = None
hxb2_key = None
for k in dict.keys():
if "HXB2" in k.upper():
found = True
hxb2_key = k
hxb2_seq = dict[k]
print("Found hxb2 ref. seq. Its full name is: ", hxb2_key)
break
if not found:
print("We could not find a sequence with 'HXB2' in its name. "
"Please make sure you have an HXB2 ref. seq. included")
return str(hxb2_key), str(hxb2_seq)
def customdist(s1, s2):
if len(s1) != len(s2):
print("sequences must be the same length")
sys.exit()
dist = 0
for c1, c2 in zip(s1, s2):
if c1 != c2:
dist += 1
diff = 0
for i in range(len(s1)-1):
if s1[i] != s2[i]:
if (s1[i] == "-" and s1[i+1] == "-" and s2[i+1] != "-") \
or (s2[i] == "-" and s2[i+1] == "-" and s1[i+1] != "-"):
diff += 1
return (dist-diff)
def normcustomdist(s1, s2):
if len(s1) != len(s2):
print("sequences must be the same length")
sys.exit()
dist = 0
for c1, c2 in zip(s1, s2):
if c1 != c2:
dist += 1
diff = 0
for i in range(len(s1)-1):
if s1[i] != s2[i]:
if (s1[i] == "-" and s1[i+1] == "-" and s2[i+1] != "-") \
or (s2[i] == "-" and s2[i+1] == "-" and s1[i+1] != "-"):
diff += 1
dist = dist-diff
normdist = dist / len(s1)
return normdist
def main(ref, align_file, outpath, name):
print(align_file)
# set the outfile name
name = name + "_divergence.csv"
outfile = os.path.join(outpath, name)
# import the reference
ref = fasta_to_dct(ref)
ref_seq = str(list(ref.values())[0]).upper()
ref_name = str(list(ref.keys())[0])
# store all sequnces in a dict
all_sequences = fasta_to_dct(align_file)
# get hxb2 seq and remove it from the dict
hxb2_name, hxb2_seq = gethxb2(all_sequences)
try:
del all_sequences[hxb2_name]
except KeyError:
print("Could not find HXB2 in alignment")
sys.exit()
# write the headings to the outfile
with open(outfile, "w") as handle:
handle.write("Time,Normalised_hamming_distance_adjusted_(changes_per_100_bases),sequence_id\n")
# calculate the divergence from the reference for each sequnce
for seq_name, seq in sorted(all_sequences.items()):
if len(seq) != len(ref_seq):
print("input sequence and reference sequence were not the same length.")
sys.exit()
else:
time = seq_name.split("_")[2][:-3]
# hamdist = distance.hamming(seq, ref_seq, normalized=True)
normadjustdist_perc = round(normcustomdist(seq, ref_seq) * 100, 2)
with open(outfile, "a") as handle:
handle.write(",".join([str(x) for x in [time, normadjustdist_perc, seq_name]]) + "\n")
print("Divergence calculations are complete")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='calculate genetic distance from a reference sequence',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-in', '--infile', type=str, required=True,
help='The path to the fasta file (ie: /path/to/5haplotype')
parser.add_argument('-r', '--reference', type=str, required=True,
help='The fasta file with only the reference sequence. '
'Usually taken from the most abundant haplotype at the first time point')
parser.add_argument('-o', '--outpath', type=str, required=True,
help='The path to the where the outfile should be written')
parser.add_argument('-n', '--name', type=str, required=True,
help='The name of the outfile (no extension')
args = parser.parse_args()
ref = args.reference
infile = args.infile
outpath = args.outpath
name = args.name
main(ref, infile, outpath, name)