Package Bio :: Package SeqIO :: Module AbiIO
[hide private]
[frames] | no frames]

Source Code for Module Bio.SeqIO.AbiIO

  1  # Copyright 2011 by Wibowo Arindrarto (w.arindrarto@gmail.com) 
  2  # Revisions copyright 2011 by Peter Cock. 
  3  # This code is part of the Biopython distribution and governed by its 
  4  # license. Please see the LICENSE file that should have been included 
  5  # as part of this package. 
  6   
  7  """Bio.SeqIO parser for the ABI format. 
  8   
  9  ABI is the format used by Applied Biosystem's sequencing machines to store 
 10  sequencing results.  
 11   
 12  For more details on the format specification, visit: 
 13  http://www.appliedbiosystem.com/support/software_community/ABIF_File_Format.pdf 
 14   
 15  """ 
 16   
 17  __docformat__ = "epytext en" 
 18   
 19  import datetime 
 20  import struct 
 21   
 22  from os.path import basename 
 23  from sys import version_info 
 24   
 25  from Bio import Alphabet 
 26  from Bio.Alphabet.IUPAC import ambiguous_dna, unambiguous_dna 
 27  from Bio.Seq import Seq 
 28  from Bio.SeqRecord import SeqRecord 
 29  from Bio._py3k import _bytes_to_string, _as_bytes 
 30   
 31  # dictionary for determining which tags goes into SeqRecord annotation 
 32  # each key is tag_name + tag_number 
 33  # if a tag entry needs to be added, just add its key and its key 
 34  # for the annotations dictionary as the value 
 35  _EXTRACT = { 
 36              'TUBE1': 'sample_well', 
 37              'DySN1': 'dye', 
 38              'GTyp1': 'polymer', 
 39              'MODL1': 'machine_model', 
 40             } 
 41  # dictionary for tags that require preprocessing before use in creating 
 42  # seqrecords 
 43  _SPCTAGS = [ 
 44              'PBAS2',    # base-called sequence 
 45              'PCON2',    # quality values of base-called sequence 
 46              'SMPL1',    # sample id inputted before sequencing run 
 47              'RUND1',    # run start date 
 48              'RUND2',    # run finish date 
 49              'RUNT1',    # run start time 
 50              'RUNT2',    # run finish time 
 51             ] 
 52  # dictionary for data unpacking format 
 53  _BYTEFMT = { 
 54              1: 'b',     # byte 
 55              2: 's',     # char 
 56              3: 'H',     # word 
 57              4: 'h',     # short 
 58              5: 'i',     # long 
 59              6: '2i',    # rational, legacy unsupported 
 60              7: 'f',     # float 
 61              8: 'd',     # double 
 62              10: 'h2B',  # date 
 63              11: '4B',   # time 
 64              12: '2i2b', # thumb 
 65              13: 'B',    # bool 
 66              14: '2h',   # point, legacy unsupported 
 67              15: '4h',   # rect, legacy unsupported 
 68              16: '2i',   # vPoint, legacy unsupported 
 69              17: '4i',   # vRect, legacy unsupported 
 70              18: 's',    # pString 
 71              19: 's',    # cString 
 72              20: '2i',   # tag, legacy unsupported 
 73             } 
 74  # header data structure (exluding 4 byte ABIF marker) 
 75  _HEADFMT = '>H4sI2H3I' 
 76  # directory data structure 
 77  _DIRFMT = '>4sI2H4I' 
 78   
79 -def AbiIterator(handle, alphabet=None, trim=False):
80 """Iterator for the Abi file format. 81 """ 82 # raise exception is alphabet is not dna 83 if alphabet is not None: 84 if isinstance(Alphabet._get_base_alphabet(alphabet), 85 Alphabet.ProteinAlphabet): 86 raise ValueError("Invalid alphabet, ABI files do not hold proteins.") 87 if isinstance(Alphabet._get_base_alphabet(alphabet), 88 Alphabet.RNAAlphabet): 89 raise ValueError("Invalid alphabet, ABI files do not hold RNA.") 90 91 # raise exception if handle mode is not 'rb' 92 if hasattr(handle, 'mode'): 93 if set('rb') != set(handle.mode.lower()): 94 raise ValueError("ABI files has to be opened in 'rb' mode.") 95 96 # check if input file is a valid Abi file 97 handle.seek(0) 98 marker = handle.read(4) 99 if not marker: 100 # handle empty file gracefully 101 raise StopIteration 102 if marker != _as_bytes('ABIF'): 103 raise IOError('File should start ABIF, not %r' % marker) 104 105 # dirty hack for handling time information 106 times = {'RUND1': '', 'RUND2': '', 'RUNT1': '', 'RUNT2': '', } 107 108 # initialize annotations 109 annot = dict(zip(_EXTRACT.values(), [None] * len(_EXTRACT))) 110 111 # parse header and extract data from directories 112 header = struct.unpack(_HEADFMT, \ 113 handle.read(struct.calcsize(_HEADFMT))) 114 115 for tag_name, tag_number, tag_data in _abi_parse_header(header, handle): 116 # stop iteration if all desired tags have been extracted 117 # 4 tags from _EXTRACT + 2 time tags from _SPCTAGS - 3, 118 # and seq, qual, id 119 # todo 120 121 key = tag_name + str(tag_number) 122 123 # PBAS2 is base-called sequence 124 if key == 'PBAS2': 125 seq = tag_data 126 ambigs = 'KYWMRS' 127 if alphabet is None: 128 if set(seq).intersection(ambigs): 129 alphabet = ambiguous_dna 130 else: 131 alphabet = unambiguous_dna 132 # PCON2 is quality values of base-called sequence 133 elif key == 'PCON2': 134 qual = [ord(val) for val in tag_data] 135 # SMPL1 is sample id entered before sequencing run 136 elif key == 'SMPL1': 137 sample_id = tag_data 138 elif key in times: 139 times[key] = tag_data 140 else: 141 # extract sequence annotation as defined in _EXTRACT 142 if key in _EXTRACT: 143 annot[_EXTRACT[key]] = tag_data 144 145 # set time annotations 146 annot['run_start'] = '%s %s' % (times['RUND1'], times['RUNT1']) 147 annot['run_finish'] = '%s %s' % (times['RUND2'], times['RUNT2']) 148 149 # use the file name as SeqRecord.name if available 150 try: 151 file_name = basename(handle.name).replace('.ab1','') 152 except: 153 file_name = "" 154 155 record = SeqRecord(Seq(seq, alphabet), 156 id=sample_id, name=file_name, 157 description='', 158 annotations=annot, 159 letter_annotations={'phred_quality': qual}) 160 161 if not trim: 162 yield record 163 else: 164 yield _abi_trim(record)
165
166 -def _AbiTrimIterator(handle):
167 """Iterator for the Abi file format that yields trimmed SeqRecord objects. 168 """ 169 return AbiIterator(handle, trim=True)
170
171 -def _abi_parse_header(header, handle):
172 """Generator that returns directory contents. 173 """ 174 # header structure (after ABIF marker): 175 # file version, tag name, tag number, 176 # element type code, element size, number of elements 177 # data size, data offset, handle (not file handle) 178 head_elem_size = header[4] 179 head_elem_num = header[5] 180 head_offset = header[7] 181 index = 0 182 183 while index < head_elem_num: 184 start = head_offset + index * head_elem_size 185 # add directory offset to tuple 186 # to handle directories with data size <= 4 bytes 187 handle.seek(start) 188 dir_entry = struct.unpack(_DIRFMT, \ 189 handle.read(struct.calcsize(_DIRFMT))) + (start,) 190 index += 1 191 # only parse desired dirs 192 key = _bytes_to_string(dir_entry[0]) 193 key += str(dir_entry[1]) 194 if key in (list(_EXTRACT.keys()) + _SPCTAGS): 195 tag_name = _bytes_to_string(dir_entry[0]) 196 tag_number = dir_entry[1] 197 elem_code = dir_entry[2] 198 elem_num = dir_entry[4] 199 data_size = dir_entry[5] 200 data_offset = dir_entry[6] 201 tag_offset = dir_entry[8] 202 # if data size <= 4 bytes, data is stored inside tag 203 # so offset needs to be changed 204 if data_size <= 4: 205 data_offset = tag_offset + 20 206 handle.seek(data_offset) 207 data = handle.read(data_size) 208 yield tag_name, tag_number, \ 209 _parse_tag_data(elem_code, elem_num, data)
210 211
212 -def _abi_trim(seq_record):
213 """Trims the sequence using Richard Mott's modified trimming algorithm. 214 215 seq_record - SeqRecord object to be trimmed. 216 217 Trimmed bases are determined from their segment score, which is a 218 cumulative sum of each base's score. Base scores are calculated from 219 their quality values. 220 221 More about the trimming algorithm: 222 http://www.phrap.org/phredphrap/phred.html 223 http://www.clcbio.com/manual/genomics/Quality_abif_trimming.html 224 """ 225 226 start = False # flag for starting position of trimmed sequence 227 segment = 20 # minimum sequence length 228 trim_start = 0 # init start index 229 cutoff = 0.05 # default cutoff value for calculating base score 230 231 if len(seq_record) <= segment: 232 return seq_record 233 else: 234 # calculate base score 235 score_list = [cutoff - (10 ** (qual/-10.0)) for qual in 236 seq_record.letter_annotations['phred_quality']] 237 238 # calculate cummulative score 239 # if cummulative value < 0, set it to 0 240 # first value is set to 0, because of the assumption that 241 # the first base will always be trimmed out 242 cummul_score = [0] 243 for i in range(1, len(score_list)): 244 score = cummul_score[-1] + score_list[i] 245 if score < 0: 246 cummul_score.append(0) 247 else: 248 cummul_score.append(score) 249 if not start: 250 # trim_start = value when cummulative score is first > 0 251 trim_start = i 252 start = True 253 254 # trim_finish = index of highest cummulative score, 255 # marking the end of sequence segment with highest cummulative score 256 trim_finish = cummul_score.index(max(cummul_score)) 257 258 return seq_record[trim_start:trim_finish]
259
260 -def _parse_tag_data(elem_code, elem_num, raw_data):
261 """Returns single data value. 262 263 elem_code - What kind of data 264 elem_num - How many data points 265 raw_data - abi file object from which the tags would be unpacked 266 """ 267 if elem_code in _BYTEFMT: 268 # because '>1s' unpack differently from '>s' 269 if elem_num == 1: 270 num = '' 271 else: 272 num = str(elem_num) 273 fmt = '>' + num + _BYTEFMT[elem_code] 274 275 assert len(raw_data) == struct.calcsize(fmt) 276 data = struct.unpack(fmt, raw_data) 277 278 # no need to use tuple if len(data) == 1 279 # also if data is date / time 280 if elem_code not in [10, 11] and len(data) == 1: 281 data = data[0] 282 283 # account for different data types 284 if elem_code == 2: 285 return _bytes_to_string(data) 286 elif elem_code == 10: 287 return str(datetime.date(*data)) 288 elif elem_code == 11: 289 return str(datetime.time(*data[:3])) 290 elif elem_code == 13: 291 return bool(data) 292 elif elem_code == 18: 293 return _bytes_to_string(data[1:]) 294 elif self.elem_code == 19: 295 return _bytes_to_string(data[:-1]) 296 else: 297 return data 298 else: 299 return None
300 301 if __name__ == '__main__': 302 pass 303