Package Bio :: Package SeqIO :: Module InsdcIO
[hide private]
[frames] | no frames]

Source Code for Module Bio.SeqIO.InsdcIO

   1  # Copyright 2007-2010 by Peter Cock.  All rights reserved. 
   2  # 
   3  # This code is part of the Biopython distribution and governed by its 
   4  # license.  Please see the LICENSE file that should have been included 
   5  # as part of this package.. 
   6   
   7  """Bio.SeqIO support for the "genbank" and "embl" file formats. 
   8   
   9  You are expected to use this module via the Bio.SeqIO functions. 
  10  Note that internally this module calls Bio.GenBank to do the actual 
  11  parsing of GenBank, EMBL and IMGT files. 
  12   
  13  See also: 
  14   
  15  International Nucleotide Sequence Database Collaboration 
  16  http://www.insdc.org/ 
  17    
  18  GenBank 
  19  http://www.ncbi.nlm.nih.gov/Genbank/ 
  20   
  21  EMBL Nucleotide Sequence Database 
  22  http://www.ebi.ac.uk/embl/ 
  23   
  24  DDBJ (DNA Data Bank of Japan) 
  25  http://www.ddbj.nig.ac.jp/ 
  26   
  27  IMGT (use a variant of EMBL format with longer feature indents) 
  28  http://imgt.cines.fr/download/LIGM-DB/userman_doc.html 
  29  http://imgt.cines.fr/download/LIGM-DB/ftable_doc.html 
  30  http://www.ebi.ac.uk/imgt/hla/docs/manual.html 
  31   
  32  """ 
  33   
  34  from Bio.Seq import UnknownSeq 
  35  from Bio.GenBank.Scanner import GenBankScanner, EmblScanner, _ImgtScanner 
  36  from Bio import Alphabet 
  37  from Interfaces import SequentialSequenceWriter 
  38  from Bio import SeqFeature 
  39   
  40  from Bio._py3k import _is_int_or_long 
  41   
  42  # NOTE 
  43  # ==== 
  44  # The "brains" for parsing GenBank, EMBL and IMGT files (and any 
  45  # other flat file variants from the INSDC in future) is in 
  46  # Bio.GenBank.Scanner (plus the _FeatureConsumer in Bio.GenBank) 
  47  # However, all the writing code is in this file. 
  48   
  49   
50 -def GenBankIterator(handle):
51 """Breaks up a Genbank file into SeqRecord objects. 52 53 Every section from the LOCUS line to the terminating // becomes 54 a single SeqRecord with associated annotation and features. 55 56 Note that for genomes or chromosomes, there is typically only 57 one record.""" 58 #This calls a generator function: 59 return GenBankScanner(debug=0).parse_records(handle)
60
61 -def EmblIterator(handle):
62 """Breaks up an EMBL file into SeqRecord objects. 63 64 Every section from the LOCUS line to the terminating // becomes 65 a single SeqRecord with associated annotation and features. 66 67 Note that for genomes or chromosomes, there is typically only 68 one record.""" 69 #This calls a generator function: 70 return EmblScanner(debug=0).parse_records(handle)
71
72 -def ImgtIterator(handle):
73 """Breaks up an IMGT file into SeqRecord objects. 74 75 Every section from the LOCUS line to the terminating // becomes 76 a single SeqRecord with associated annotation and features. 77 78 Note that for genomes or chromosomes, there is typically only 79 one record.""" 80 #This calls a generator function: 81 return _ImgtScanner(debug=0).parse_records(handle)
82
83 -def GenBankCdsFeatureIterator(handle, alphabet=Alphabet.generic_protein):
84 """Breaks up a Genbank file into SeqRecord objects for each CDS feature. 85 86 Every section from the LOCUS line to the terminating // can contain 87 many CDS features. These are returned as with the stated amino acid 88 translation sequence (if given). 89 """ 90 #This calls a generator function: 91 return GenBankScanner(debug=0).parse_cds_features(handle, alphabet)
92
93 -def EmblCdsFeatureIterator(handle, alphabet=Alphabet.generic_protein):
94 """Breaks up a EMBL file into SeqRecord objects for each CDS feature. 95 96 Every section from the LOCUS line to the terminating // can contain 97 many CDS features. These are returned as with the stated amino acid 98 translation sequence (if given). 99 """ 100 #This calls a generator function: 101 return EmblScanner(debug=0).parse_cds_features(handle, alphabet)
102
103 -def _insdc_feature_position_string(pos, offset=0):
104 """Build a GenBank/EMBL position string (PRIVATE). 105 106 Use offset=1 to add one to convert a start position from python counting. 107 """ 108 if isinstance(pos, SeqFeature.ExactPosition): 109 return "%i" % (pos.position+offset) 110 elif isinstance(pos, SeqFeature.WithinPosition): 111 return "(%i.%i)" % (pos.position + offset, 112 pos.position + pos.extension + offset) 113 elif isinstance(pos, SeqFeature.BetweenPosition): 114 return "(%i^%i)" % (pos.position + offset, 115 pos.position + pos.extension + offset) 116 elif isinstance(pos, SeqFeature.BeforePosition): 117 return "<%i" % (pos.position + offset) 118 elif isinstance(pos, SeqFeature.AfterPosition): 119 return ">%i" % (pos.position + offset) 120 elif isinstance(pos, SeqFeature.OneOfPosition): 121 return "one-of(%s)" \ 122 % ",".join([_insdc_feature_position_string(p,offset) \ 123 for p in pos.position_choices]) 124 elif isinstance(pos, SeqFeature.AbstractPosition): 125 raise NotImplementedError("Please report this as a bug in Biopython.") 126 else: 127 raise ValueError("Expected a SeqFeature position object.")
128 129
130 -def _insdc_location_string_ignoring_strand_and_subfeatures(feature, rec_length):
131 if feature.ref: 132 ref = "%s:" % feature.ref 133 else: 134 ref = "" 135 assert not feature.ref_db 136 if isinstance(feature.location.start, SeqFeature.ExactPosition) \ 137 and isinstance(feature.location.end, SeqFeature.ExactPosition) \ 138 and feature.location.start.position == feature.location.end.position: 139 #Special case, for 12:12 return 12^13 140 #(a zero length slice, meaning the point between two letters) 141 if feature.location.end.position == rec_length: 142 #Very special case, for a between position at the end of a 143 #sequence (used on some circular genomes, Bug 3098) we have 144 #N:N so return N^1 145 return "%s%i^1" % (ref, rec_length) 146 else: 147 return "%s%i^%i" % (ref, feature.location.end.position, 148 feature.location.end.position+1) 149 if isinstance(feature.location.start, SeqFeature.ExactPosition) \ 150 and isinstance(feature.location.end, SeqFeature.ExactPosition) \ 151 and feature.location.start.position+1 == feature.location.end.position: 152 #Special case, for 11:12 return 12 rather than 12..12 153 #(a length one slice, meaning a single letter) 154 return "%s%i" % (ref, feature.location.end.position) 155 elif isinstance(feature.location.start, SeqFeature.UnknownPosition) \ 156 or isinstance(feature.location.end, SeqFeature.UnknownPosition): 157 #Special case for features from SwissProt/UniProt files 158 if isinstance(feature.location.start, SeqFeature.UnknownPosition) \ 159 and isinstance(feature.location.end, SeqFeature.UnknownPosition): 160 #import warnings 161 #warnings.warn("Feature with unknown location") 162 #return "?" 163 raise ValueError("Feature with unknown location") 164 elif isinstance(feature.location.start, SeqFeature.UnknownPosition): 165 #Treat the unknown start position as a BeforePosition 166 return "%s<%i..%s" \ 167 % (ref, 168 feature.location.nofuzzy_end, 169 _insdc_feature_position_string(feature.location.end)) 170 else: 171 #Treat the unknown end position as an AfterPosition 172 return "%s%s..>%i" \ 173 % (ref, 174 _insdc_feature_position_string(feature.location.start), 175 feature.location.nofuzzy_start) 176 else: 177 #Typical case, e.g. 12..15 gets mapped to 11:15 178 return ref \ 179 + _insdc_feature_position_string(feature.location.start, +1) \ 180 + ".." + \ 181 _insdc_feature_position_string(feature.location.end)
182
183 -def _insdc_feature_location_string(feature, rec_length):
184 """Build a GenBank/EMBL location string from a SeqFeature (PRIVATE). 185 186 There is a choice of how to show joins on the reverse complement strand, 187 GenBank used "complement(join(1,10),(20,100))" while EMBL used to use 188 "join(complement(20,100),complement(1,10))" instead (but appears to have 189 now adopted the GenBank convention). Notice that the order of the entries 190 is reversed! This function therefore uses the first form. In this situation 191 we expect the parent feature and the two children to all be marked as 192 strand == -1, and in the order 0:10 then 19:100. 193 194 Also need to consider dual-strand examples like these from the Arabidopsis 195 thaliana chloroplast NC_000932: join(complement(69611..69724),139856..140650) 196 gene ArthCp047, GeneID:844801 or its CDS (protein NP_051038.1 GI:7525057) 197 which is further complicated by a splice: 198 join(complement(69611..69724),139856..140087,140625..140650) 199 200 For mixed this mixed strand feature, the parent SeqFeature should have 201 no strand (either 0 or None) while the child features should have either 202 strand +1 or -1 as appropriate, and be listed in the order given here. 203 """ 204 205 if not feature.sub_features: 206 #Non-recursive. 207 #assert feature.location_operator == "", \ 208 # "%s has no subfeatures but location_operator %s" \ 209 # % (repr(feature), feature.location_operator) 210 location = _insdc_location_string_ignoring_strand_and_subfeatures(feature, rec_length) 211 if feature.strand == -1: 212 location = "complement(%s)" % location 213 return location 214 # As noted above, treat reverse complement strand features carefully: 215 if feature.strand == -1: 216 for f in feature.sub_features: 217 assert f.strand == -1 218 return "complement(%s(%s))" \ 219 % (feature.location_operator, 220 ",".join(_insdc_location_string_ignoring_strand_and_subfeatures(f, rec_length) \ 221 for f in feature.sub_features)) 222 #if feature.strand == +1: 223 # for f in feature.sub_features: 224 # assert f.strand == +1 225 #This covers typical forward strand features, and also an evil mixed strand: 226 assert feature.location_operator != "" 227 return "%s(%s)" % (feature.location_operator, 228 ",".join([_insdc_feature_location_string(f, rec_length) \ 229 for f in feature.sub_features]))
230 231
232 -class _InsdcWriter(SequentialSequenceWriter):
233 """Base class for GenBank and EMBL writers (PRIVATE).""" 234 MAX_WIDTH = 80 235 QUALIFIER_INDENT = 21 236 QUALIFIER_INDENT_STR = " "*QUALIFIER_INDENT 237 QUALIFIER_INDENT_TMP = " %s " # 21 if %s is empty 238
239 - def _write_feature_qualifier(self, key, value=None, quote=None):
240 if not value: 241 self.handle.write("%s/%s\n" % (self.QUALIFIER_INDENT_STR, key)) 242 return 243 #Quick hack with no line wrapping, may be useful for testing: 244 #self.handle.write('%s/%s="%s"\n' % (self.QUALIFIER_INDENT_STR, key, value)) 245 if quote is None: 246 #Try to mimic unwritten rules about when quotes can be left out: 247 if _is_int_or_long(value): 248 quote = False 249 else: 250 quote = True 251 if quote: 252 line = '%s/%s="%s"' % (self.QUALIFIER_INDENT_STR, key, value) 253 else: 254 line = '%s/%s=%s' % (self.QUALIFIER_INDENT_STR, key, value) 255 if len(line) <= self.MAX_WIDTH: 256 self.handle.write(line+"\n") 257 return 258 while line.lstrip(): 259 if len(line) <= self.MAX_WIDTH: 260 self.handle.write(line+"\n") 261 return 262 #Insert line break... 263 for index in range(min(len(line)-1, self.MAX_WIDTH), 264 self.QUALIFIER_INDENT+1,-1): 265 if line[index] == " " : break 266 if line[index] != " ": 267 #No nice place to break... 268 index = self.MAX_WIDTH 269 assert index <= self.MAX_WIDTH 270 self.handle.write(line[:index] + "\n") 271 line = self.QUALIFIER_INDENT_STR + line[index:].lstrip()
272
273 - def _wrap_location(self, location):
274 """Split a feature location into lines (break at commas).""" 275 #TODO - Rewrite this not to recurse! 276 length = self.MAX_WIDTH - self.QUALIFIER_INDENT 277 if len(location) <= length: 278 return location 279 index = location[:length].rfind(",") 280 if index == -1: 281 #No good place to split (!) 282 import warnings 283 warnings.warn("Couldn't split location:\n%s" % location) 284 return location 285 return location[:index+1] + "\n" + \ 286 self.QUALIFIER_INDENT_STR + self._wrap_location(location[index+1:])
287
288 - def _write_feature(self, feature, record_length):
289 """Write a single SeqFeature object to features table.""" 290 assert feature.type, feature 291 location = _insdc_feature_location_string(feature, record_length) 292 f_type = feature.type.replace(" ","_") 293 line = (self.QUALIFIER_INDENT_TMP % f_type)[:self.QUALIFIER_INDENT] \ 294 + self._wrap_location(location) + "\n" 295 self.handle.write(line) 296 #Now the qualifiers... 297 for key, values in feature.qualifiers.iteritems(): 298 if isinstance(values, list) or isinstance(values, tuple): 299 for value in values: 300 self._write_feature_qualifier(key, value) 301 elif values: 302 #String, int, etc 303 self._write_feature_qualifier(key, values) 304 else: 305 #e.g. a /psuedo entry 306 self._write_feature_qualifier(key)
307
308 - def _get_annotation_str(self, record, key, default=".", just_first=False):
309 """Get an annotation dictionary entry (as a string). 310 311 Some entries are lists, in which case if just_first=True the first entry 312 is returned. If just_first=False (default) this verifies there is only 313 one entry before returning it.""" 314 try: 315 answer = record.annotations[key] 316 except KeyError: 317 return default 318 if isinstance(answer, list): 319 if not just_first : assert len(answer) == 1 320 return str(answer[0]) 321 else: 322 return str(answer)
323
324 - def _split_multi_line(self, text, max_len):
325 """Returns a list of strings. 326 327 Any single words which are too long get returned as a whole line 328 (e.g. URLs) without an exception or warning. 329 """ 330 #TODO - Do the line spliting while preserving white space? 331 text = text.strip() 332 if len(text) <= max_len: 333 return [text] 334 335 words = text.split() 336 text = "" 337 while words and len(text) + 1 + len(words[0]) <= max_len: 338 text += " " + words.pop(0) 339 text = text.strip() 340 #assert len(text) <= max_len 341 answer = [text] 342 while words: 343 text = words.pop(0) 344 while words and len(text) + 1 + len(words[0]) <= max_len: 345 text += " " + words.pop(0) 346 text = text.strip() 347 #assert len(text) <= max_len 348 answer.append(text) 349 assert not words 350 return answer
351
352 - def _split_contig(self, record, max_len):
353 "Returns a list of strings, splits on commas.""" 354 #TODO - Merge this with _write_multi_line method? 355 #It would need the addition of the comma splitting logic... 356 #are there any other cases where that would be sensible? 357 contig = record.annotations.get("contig", "") 358 if isinstance(contig, list) or isinstance(contig, tuple): 359 contig = "".join(contig) 360 contig = self.clean(contig) 361 i = 0 362 answer = [] 363 while contig: 364 if len(contig) > max_len: 365 #Split lines at the commas 366 pos = contig[:max_len-1].rfind(",") 367 if pos == -1: 368 raise ValueError("Could not break up CONTIG") 369 text, contig = contig[:pos+1], contig[pos+1:] 370 else: 371 text, contig = contig, "" 372 answer.append(text) 373 return answer
374
375 -class GenBankWriter(_InsdcWriter):
376 HEADER_WIDTH = 12 377 QUALIFIER_INDENT = 21 378
379 - def _write_single_line(self, tag, text):
380 "Used in the the 'header' of each GenBank record.""" 381 assert len(tag) < self.HEADER_WIDTH 382 if len(text) > self.MAX_WIDTH - self.HEADER_WIDTH: 383 import warnings 384 warnings.warn("Annotation %r too long for %s line" % (text, tag)) 385 self.handle.write("%s%s\n" % (tag.ljust(self.HEADER_WIDTH), 386 text.replace("\n", " ")))
387
388 - def _write_multi_line(self, tag, text):
389 "Used in the the 'header' of each GenBank record.""" 390 #TODO - Do the line spliting while preserving white space? 391 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 392 lines = self._split_multi_line(text, max_len) 393 self._write_single_line(tag, lines[0]) 394 for line in lines[1:]: 395 self._write_single_line("", line)
396
397 - def _write_multi_entries(self, tag, text_list):
398 #used for DBLINK and any similar later line types. 399 #If the list of strings is empty, nothing is written. 400 for i, text in enumerate(text_list): 401 if i == 0: 402 self._write_single_line(tag, text) 403 else: 404 self._write_single_line("", text)
405
406 - def _get_date(self, record) :
407 default = "01-JAN-1980" 408 try : 409 date = record.annotations["date"] 410 except KeyError : 411 return default 412 #Cope with a list of one string: 413 if isinstance(date, list) and len(date)==1 : 414 date = date[0] 415 #TODO - allow a Python date object 416 if not isinstance(date, basestring) or len(date) != 11 \ 417 or date[2] != "-" or date[6] != "-" \ 418 or not date[:2].isdigit() or not date[7:].isdigit() \ 419 or int(date[:2]) > 31 \ 420 or date[3:6] not in ["JAN", "FEB", "MAR", "APR", "MAY", "JUN", 421 "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"] : 422 #TODO - Check is a valid date (e.g. not 31 Feb) 423 return default 424 return date
425
426 - def _get_data_division(self, record):
427 try: 428 division = record.annotations["data_file_division"] 429 except KeyError: 430 division = "UNK" 431 if division in ["PRI", "ROD", "MAM", "VRT", "INV", "PLN", "BCT", 432 "VRL", "PHG", "SYN", "UNA", "EST", "PAT", "STS", 433 "GSS", "HTG", "HTC", "ENV", "CON"]: 434 #Good, already GenBank style 435 # PRI - primate sequences 436 # ROD - rodent sequences 437 # MAM - other mammalian sequences 438 # VRT - other vertebrate sequences 439 # INV - invertebrate sequences 440 # PLN - plant, fungal, and algal sequences 441 # BCT - bacterial sequences [plus archea] 442 # VRL - viral sequences 443 # PHG - bacteriophage sequences 444 # SYN - synthetic sequences 445 # UNA - unannotated sequences 446 # EST - EST sequences (expressed sequence tags) 447 # PAT - patent sequences 448 # STS - STS sequences (sequence tagged sites) 449 # GSS - GSS sequences (genome survey sequences) 450 # HTG - HTGS sequences (high throughput genomic sequences) 451 # HTC - HTC sequences (high throughput cDNA sequences) 452 # ENV - Environmental sampling sequences 453 # CON - Constructed sequences 454 # 455 #(plus UNK for unknown) 456 pass 457 else: 458 #See if this is in EMBL style: 459 # Division Code 460 # ----------------- ---- 461 # Bacteriophage PHG - common 462 # Environmental Sample ENV - common 463 # Fungal FUN - map to PLN (plants + fungal) 464 # Human HUM - map to PRI (primates) 465 # Invertebrate INV - common 466 # Other Mammal MAM - common 467 # Other Vertebrate VRT - common 468 # Mus musculus MUS - map to ROD (rodent) 469 # Plant PLN - common 470 # Prokaryote PRO - map to BCT (poor name) 471 # Other Rodent ROD - common 472 # Synthetic SYN - common 473 # Transgenic TGN - ??? map to SYN ??? 474 # Unclassified UNC - map to UNK 475 # Viral VRL - common 476 # 477 #(plus XXX for submiting which we can map to UNK) 478 embl_to_gbk = {"FUN":"PLN", 479 "HUM":"PRI", 480 "MUS":"ROD", 481 "PRO":"BCT", 482 "UNC":"UNK", 483 "XXX":"UNK", 484 } 485 try: 486 division = embl_to_gbk[division] 487 except KeyError: 488 division = "UNK" 489 assert len(division)==3 490 return division
491
492 - def _write_the_first_line(self, record):
493 """Write the LOCUS line.""" 494 495 locus = record.name 496 if not locus or locus == "<unknown name>": 497 locus = record.id 498 if not locus or locus == "<unknown id>": 499 locus = self._get_annotation_str(record, "accession", just_first=True) 500 if len(locus) > 16: 501 raise ValueError("Locus identifier %r is too long" % str(locus)) 502 503 if len(record) > 99999999999: 504 #Currently GenBank only officially support up to 350000, but 505 #the length field can take eleven digits 506 raise ValueError("Sequence too long!") 507 508 #Get the base alphabet (underneath any Gapped or StopCodon encoding) 509 a = Alphabet._get_base_alphabet(record.seq.alphabet) 510 if not isinstance(a, Alphabet.Alphabet): 511 raise TypeError("Invalid alphabet") 512 elif isinstance(a, Alphabet.ProteinAlphabet): 513 units = "aa" 514 elif isinstance(a, Alphabet.NucleotideAlphabet): 515 units = "bp" 516 else: 517 #Must be something like NucleotideAlphabet or 518 #just the generic Alphabet (default for fasta files) 519 raise ValueError("Need a Nucleotide or Protein alphabet") 520 521 #Get the molecule type 522 #TODO - record this explicitly in the parser? 523 if isinstance(a, Alphabet.ProteinAlphabet): 524 mol_type = "" 525 elif isinstance(a, Alphabet.DNAAlphabet): 526 mol_type = "DNA" 527 elif isinstance(a, Alphabet.RNAAlphabet): 528 mol_type = "RNA" 529 else: 530 #Must be something like NucleotideAlphabet or 531 #just the generic Alphabet (default for fasta files) 532 raise ValueError("Need a DNA, RNA or Protein alphabet") 533 534 division = self._get_data_division(record) 535 536 assert len(units) == 2 537 assert len(division) == 3 538 #TODO - date 539 #TODO - mol_type 540 line = "LOCUS %s %s %s %s %s %s\n" \ 541 % (locus.ljust(16), 542 str(len(record)).rjust(11), 543 units, 544 mol_type.ljust(6), 545 division, 546 self._get_date(record)) 547 assert len(line) == 79+1, repr(line) #plus one for new line 548 549 assert line[12:28].rstrip() == locus, \ 550 'LOCUS line does not contain the locus at the expected position:\n' + line 551 assert line[28:29] == " " 552 assert line[29:40].lstrip() == str(len(record)), \ 553 'LOCUS line does not contain the length at the expected position:\n' + line 554 555 #Tests copied from Bio.GenBank.Scanner 556 assert line[40:44] in [' bp ', ' aa '] , \ 557 'LOCUS line does not contain size units at expected position:\n' + line 558 assert line[44:47] in [' ', 'ss-', 'ds-', 'ms-'], \ 559 'LOCUS line does not have valid strand type (Single stranded, ...):\n' + line 560 assert line[47:54].strip() == "" \ 561 or line[47:54].strip().find('DNA') != -1 \ 562 or line[47:54].strip().find('RNA') != -1, \ 563 'LOCUS line does not contain valid sequence type (DNA, RNA, ...):\n' + line 564 assert line[54:55] == ' ', \ 565 'LOCUS line does not contain space at position 55:\n' + line 566 assert line[55:63].strip() in ['', 'linear', 'circular'], \ 567 'LOCUS line does not contain valid entry (linear, circular, ...):\n' + line 568 assert line[63:64] == ' ', \ 569 'LOCUS line does not contain space at position 64:\n' + line 570 assert line[67:68] == ' ', \ 571 'LOCUS line does not contain space at position 68:\n' + line 572 assert line[70:71] == '-', \ 573 'LOCUS line does not contain - at position 71 in date:\n' + line 574 assert line[74:75] == '-', \ 575 'LOCUS line does not contain - at position 75 in date:\n' + line 576 577 self.handle.write(line)
578
579 - def _write_references(self, record):
580 number = 0 581 for ref in record.annotations["references"]: 582 if not isinstance(ref, SeqFeature.Reference): 583 continue 584 number += 1 585 data = str(number) 586 #TODO - support more complex record reference locations? 587 if ref.location and len(ref.location)==1: 588 a = Alphabet._get_base_alphabet(record.seq.alphabet) 589 if isinstance(a, Alphabet.ProteinAlphabet): 590 units = "residues" 591 else: 592 units = "bases" 593 data += " (%s %i to %i)" % (units, 594 ref.location[0].nofuzzy_start+1, 595 ref.location[0].nofuzzy_end) 596 self._write_single_line("REFERENCE", data) 597 if ref.authors: 598 #We store the AUTHORS data as a single string 599 self._write_multi_line(" AUTHORS", ref.authors) 600 if ref.consrtm: 601 #We store the consortium as a single string 602 self._write_multi_line(" CONSRTM", ref.consrtm) 603 if ref.title: 604 #We store the title as a single string 605 self._write_multi_line(" TITLE", ref.title) 606 if ref.journal: 607 #We store this as a single string - holds the journal name, 608 #volume, year, and page numbers of the citation 609 self._write_multi_line(" JOURNAL", ref.journal) 610 if ref.medline_id: 611 #This line type is obsolete and was removed from the GenBank 612 #flatfile format in April 2005. Should we write it? 613 #Note this has a two space indent: 614 self._write_multi_line(" MEDLINE", ref.medline_id) 615 if ref.pubmed_id: 616 #Note this has a THREE space indent: 617 self._write_multi_line(" PUBMED", ref.pubmed_id) 618 if ref.comment: 619 self._write_multi_line(" REMARK", ref.comment)
620 621
622 - def _write_comment(self, record):
623 #This is a bit complicated due to the range of possible 624 #ways people might have done their annotation... 625 #Currently the parser uses a single string with newlines. 626 #A list of lines is also reasonable. 627 #A single (long) string is perhaps the most natural of all. 628 #This means we may need to deal with line wrapping. 629 comment = record.annotations["comment"] 630 if isinstance(comment, basestring): 631 lines = comment.split("\n") 632 elif isinstance(comment, list) or isinstance(comment, tuple): 633 lines = comment 634 else: 635 raise ValueError("Could not understand comment annotation") 636 self._write_multi_line("COMMENT", lines[0]) 637 for line in lines[1:]: 638 self._write_multi_line("", line)
639
640 - def _write_contig(self, record):
641 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 642 lines = self._split_contig(record, max_len) 643 self._write_single_line("CONTIG", lines[0]) 644 for text in lines[1:] : 645 self._write_single_line("", text)
646
647 - def _write_sequence(self, record):
648 #Loosely based on code from Howard Salis 649 #TODO - Force lower case? 650 LETTERS_PER_LINE = 60 651 SEQUENCE_INDENT = 9 652 653 if isinstance(record.seq, UnknownSeq): 654 #We have already recorded the length, and there is no need 655 #to record a long sequence of NNNNNNN...NNN or whatever. 656 if "contig" in record.annotations: 657 self._write_contig(record) 658 else: 659 self.handle.write("ORIGIN\n") 660 return 661 662 #Catches sequence being None: 663 data = self._get_seq_string(record).lower() 664 seq_len = len(data) 665 self.handle.write("ORIGIN\n") 666 for line_number in range(0, seq_len, LETTERS_PER_LINE): 667 self.handle.write(str(line_number+1).rjust(SEQUENCE_INDENT)) 668 for words in range(line_number, 669 min(line_number+LETTERS_PER_LINE, seq_len), 10): 670 self.handle.write(" %s" % data[words:words+10]) 671 self.handle.write("\n")
672
673 - def write_record(self, record):
674 """Write a single record to the output file.""" 675 handle = self.handle 676 self._write_the_first_line(record) 677 678 accession = self._get_annotation_str(record, "accession", 679 record.id.split(".", 1)[0], 680 just_first=True) 681 acc_with_version = accession 682 if record.id.startswith(accession+"."): 683 try: 684 acc_with_version = "%s.%i" \ 685 % (accession, 686 int(record.id.split(".", 1)[1])) 687 except ValueError: 688 pass 689 gi = self._get_annotation_str(record, "gi", just_first=True) 690 691 descr = record.description 692 if descr == "<unknown description>" : descr = "." 693 self._write_multi_line("DEFINITION", descr) 694 695 self._write_single_line("ACCESSION", accession) 696 if gi != ".": 697 self._write_single_line("VERSION", "%s GI:%s" \ 698 % (acc_with_version, gi)) 699 else: 700 self._write_single_line("VERSION", "%s" % (acc_with_version)) 701 702 #The NCBI only expect two types of link so far, 703 #e.g. "Project:28471" and "Trace Assembly Archive:123456" 704 #TODO - Filter the dbxrefs list to just these? 705 self._write_multi_entries("DBLINK", record.dbxrefs) 706 707 try: 708 #List of strings 709 #Keywords should be given separated with semi colons, 710 keywords = "; ".join(record.annotations["keywords"]) 711 #with a trailing period: 712 if not keywords.endswith(".") : 713 keywords += "." 714 except KeyError: 715 #If no keywords, there should be just a period: 716 keywords = "." 717 self._write_multi_line("KEYWORDS", keywords) 718 719 if "segment" in record.annotations: 720 #Deal with SEGMENT line found only in segmented records, 721 #e.g. AH000819 722 segment = record.annotations["segment"] 723 if isinstance(segment, list): 724 assert len(segment)==1, segment 725 segment = segment[0] 726 self._write_single_line("SEGMENT", segment) 727 728 self._write_multi_line("SOURCE", \ 729 self._get_annotation_str(record, "source")) 730 #The ORGANISM line MUST be a single line, as any continuation is the taxonomy 731 org = self._get_annotation_str(record, "organism") 732 if len(org) > self.MAX_WIDTH - self.HEADER_WIDTH: 733 org = org[:self.MAX_WIDTH - self.HEADER_WIDTH-4]+"..." 734 self._write_single_line(" ORGANISM", org) 735 try: 736 #List of strings 737 #Taxonomy should be given separated with semi colons, 738 taxonomy = "; ".join(record.annotations["taxonomy"]) 739 #with a trailing period: 740 if not taxonomy.endswith(".") : 741 taxonomy += "." 742 except KeyError: 743 taxonomy = "." 744 self._write_multi_line("", taxonomy) 745 746 if "references" in record.annotations: 747 self._write_references(record) 748 749 if "comment" in record.annotations: 750 self._write_comment(record) 751 752 handle.write("FEATURES Location/Qualifiers\n") 753 rec_length = len(record) 754 for feature in record.features: 755 self._write_feature(feature, rec_length) 756 self._write_sequence(record) 757 handle.write("//\n")
758
759 -class EmblWriter(_InsdcWriter):
760 HEADER_WIDTH = 5 761 QUALIFIER_INDENT = 21 762 QUALIFIER_INDENT_STR = "FT" + " "*(QUALIFIER_INDENT-2) 763 QUALIFIER_INDENT_TMP = "FT %s " # 21 if %s is empty 764 FEATURE_HEADER = "FH Key Location/Qualifiers\n" 765
766 - def _write_contig(self, record):
767 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 768 lines = self._split_contig(record, max_len) 769 for text in lines: 770 self._write_single_line("CO", text)
771
772 - def _write_sequence(self, record):
773 LETTERS_PER_BLOCK = 10 774 BLOCKS_PER_LINE = 6 775 LETTERS_PER_LINE = LETTERS_PER_BLOCK * BLOCKS_PER_LINE 776 POSITION_PADDING = 10 777 handle = self.handle #save looking up this multiple times 778 779 if isinstance(record.seq, UnknownSeq): 780 #We have already recorded the length, and there is no need 781 #to record a long sequence of NNNNNNN...NNN or whatever. 782 if "contig" in record.annotations: 783 self._write_contig(record) 784 else: 785 #TODO - Can the sequence just be left out as in GenBank files? 786 handle.write("SQ \n") 787 return 788 789 #Catches sequence being None 790 data = self._get_seq_string(record).lower() 791 seq_len = len(data) 792 793 #Get the base alphabet (underneath any Gapped or StopCodon encoding) 794 a = Alphabet._get_base_alphabet(record.seq.alphabet) 795 if isinstance(a, Alphabet.DNAAlphabet): 796 #TODO - What if we have RNA? 797 a_count = data.count('A') + data.count('a') 798 c_count = data.count('C') + data.count('c') 799 g_count = data.count('G') + data.count('g') 800 t_count = data.count('T') + data.count('t') 801 other = seq_len - (a_count + c_count + g_count + t_count) 802 handle.write("SQ Sequence %i BP; %i A; %i C; %i G; %i T; %i other;\n" \ 803 % (seq_len, a_count, c_count, g_count, t_count, other)) 804 else: 805 handle.write("SQ \n") 806 807 for line_number in range(0, seq_len // LETTERS_PER_LINE): 808 handle.write(" ") #Just four, not five 809 for block in range(BLOCKS_PER_LINE) : 810 index = LETTERS_PER_LINE*line_number + LETTERS_PER_BLOCK*block 811 handle.write((" %s" % data[index:index+LETTERS_PER_BLOCK])) 812 handle.write(str((line_number+1) 813 *LETTERS_PER_LINE).rjust(POSITION_PADDING)) 814 handle.write("\n") 815 if seq_len % LETTERS_PER_LINE: 816 #Final (partial) line 817 line_number = (seq_len // LETTERS_PER_LINE) 818 handle.write(" ") #Just four, not five 819 for block in range(BLOCKS_PER_LINE) : 820 index = LETTERS_PER_LINE*line_number + LETTERS_PER_BLOCK*block 821 handle.write((" %s" % data[index:index+LETTERS_PER_BLOCK]).ljust(11)) 822 handle.write(str(seq_len).rjust(POSITION_PADDING)) 823 handle.write("\n")
824
825 - def _write_single_line(self, tag, text):
826 assert len(tag)==2 827 line = tag+" "+text 828 if len(text) > self.MAX_WIDTH: 829 import warnings 830 warnings.warn("Line %r too long" % line) 831 self.handle.write(line+"\n")
832
833 - def _write_multi_line(self, tag, text):
834 max_len = self.MAX_WIDTH - self.HEADER_WIDTH 835 lines = self._split_multi_line(text, max_len) 836 for line in lines : 837 self._write_single_line(tag, line)
838
839 - def _write_the_first_lines(self, record):
840 """Write the ID and AC lines.""" 841 if "." in record.id and record.id.rsplit(".", 1)[1].isdigit(): 842 version = "SV " + record.id.rsplit(".", 1)[1] 843 accession = self._get_annotation_str(record, "accession", 844 record.id.rsplit(".", 1)[0], 845 just_first=True) 846 else : 847 version = "" 848 accession = self._get_annotation_str(record, "accession", 849 record.id, 850 just_first=True) 851 852 if ";" in accession : 853 raise ValueError("Cannot have semi-colon in EMBL accession, %s" \ 854 % repr(str(accession))) 855 if " " in accession : 856 #This is out of practicallity... might it be allowed? 857 raise ValueError("Cannot have spaces in EMBL accession, %s" \ 858 % repr(str(accession))) 859 860 #Get the molecule type 861 #TODO - record this explicitly in the parser? 862 #Get the base alphabet (underneath any Gapped or StopCodon encoding) 863 a = Alphabet._get_base_alphabet(record.seq.alphabet) 864 if not isinstance(a, Alphabet.Alphabet): 865 raise TypeError("Invalid alphabet") 866 elif isinstance(a, Alphabet.DNAAlphabet): 867 mol_type = "DNA" 868 units = "BP" 869 elif isinstance(a, Alphabet.RNAAlphabet): 870 mol_type = "RNA" 871 units = "BP" 872 elif isinstance(a, Alphabet.ProteinAlphabet): 873 mol_type = "PROTEIN" 874 units = "AA" 875 else: 876 #Must be something like NucleotideAlphabet 877 raise ValueError("Need a DNA, RNA or Protein alphabet") 878 879 #Get the taxonomy division 880 division = self._get_data_division(record) 881 882 #TODO - Full ID line 883 handle = self.handle 884 #ID <1>; SV <2>; <3>; <4>; <5>; <6>; <7> BP. 885 #1. Primary accession number 886 #2. Sequence version number 887 #3. Topology: 'circular' or 'linear' 888 #4. Molecule type 889 #5. Data class 890 #6. Taxonomic division 891 #7. Sequence length 892 self._write_single_line("ID", "%s; %s; ; %s; ; %s; %i %s." \ 893 % (accession, version, mol_type, 894 division, len(record), units)) 895 handle.write("XX\n") 896 self._write_single_line("AC", accession+";") 897 handle.write("XX\n")
898
899 - def _get_data_division(self, record):
900 try: 901 division = record.annotations["data_file_division"] 902 except KeyError: 903 division = "UNC" 904 if division in ["PHG", "ENV", "FUN", "HUM", "INV", "MAM", "VRT", 905 "MUS", "PLN", "PRO", "ROD", "SYN", "TGN", "UNC", 906 "VRL", "XXX"]: 907 #Good, already EMBL style 908 # Division Code 909 # ----------------- ---- 910 # Bacteriophage PHG 911 # Environmental Sample ENV 912 # Fungal FUN 913 # Human HUM 914 # Invertebrate INV 915 # Other Mammal MAM 916 # Other Vertebrate VRT 917 # Mus musculus MUS 918 # Plant PLN 919 # Prokaryote PRO 920 # Other Rodent ROD 921 # Synthetic SYN 922 # Transgenic TGN 923 # Unclassified UNC (i.e. unknown) 924 # Viral VRL 925 # 926 #(plus XXX used for submiting data to EMBL) 927 pass 928 else: 929 #See if this is in GenBank style & can be converted. 930 #Generally a problem as the GenBank groups are wider 931 #than those of EMBL. Note that GenBank use "BCT" for 932 #both bacteria and acherea thus this maps to EMBL's 933 #"PRO" nicely. 934 gbk_to_embl = {"BCT":"PRO", 935 "UNK":"UNC", 936 } 937 try: 938 division = gbk_to_embl[division] 939 except KeyError: 940 division = "UNC" 941 assert len(division)==3 942 return division
943
944 - def _write_references(self, record):
945 #The order should be RN, RC, RP, RX, RG, RA, RT, RL 946 number = 0 947 for ref in record.annotations["references"]: 948 if not isinstance(ref, SeqFeature.Reference): 949 continue 950 number += 1 951 self._write_single_line("RN", "[%i]" % number) 952 #TODO - support for RC line (needed in parser too) 953 #TODO - support more complex record reference locations? 954 if ref.location and len(ref.location)==1: 955 self._write_single_line("RP", "%i-%i" % (ref.location[0].nofuzzy_start+1, 956 ref.location[0].nofuzzy_end)) 957 #TODO - record any DOI or AGRICOLA identifier in the reference object? 958 if ref.pubmed_id: 959 self._write_single_line("RX", "PUBMED; %s." % ref.pubmed_id) 960 if ref.consrtm: 961 self._write_single_line("RG", "%s" % ref.consrtm) 962 if ref.authors: 963 #We store the AUTHORS data as a single string 964 self._write_multi_line("RA", ref.authors+";") 965 if ref.title: 966 #We store the title as a single string 967 self._write_multi_line("RT", '"%s";' % ref.title) 968 if ref.journal: 969 #We store this as a single string - holds the journal name, 970 #volume, year, and page numbers of the citation 971 self._write_multi_line("RL", ref.journal) 972 self.handle.write("XX\n")
973
974 - def _write_comment(self, record):
975 #This is a bit complicated due to the range of possible 976 #ways people might have done their annotation... 977 #Currently the parser uses a single string with newlines. 978 #A list of lines is also reasonable. 979 #A single (long) string is perhaps the most natural of all. 980 #This means we may need to deal with line wrapping. 981 comment = record.annotations["comment"] 982 if isinstance(comment, basestring): 983 lines = comment.split("\n") 984 elif isinstance(comment, list) or isinstance(comment, tuple): 985 lines = comment 986 else: 987 raise ValueError("Could not understand comment annotation") 988 #TODO - Merge this with the GenBank comment code? 989 if not lines : return 990 for line in lines: 991 self._write_multi_line("CC", line) 992 self.handle.write("XX\n")
993
994 - def write_record(self, record):
995 """Write a single record to the output file.""" 996 997 handle = self.handle 998 self._write_the_first_lines(record) 999 1000 #PR line (0 or 1 lines only), project identifier 1001 for xref in record.dbxrefs: 1002 if xref.startswith("Project:"): 1003 self._write_single_line("PR", xref+";") 1004 handle.write("XX\n") 1005 break 1006 1007 #TODO - DT lines (date) 1008 1009 descr = record.description 1010 if descr == "<unknown description>" : descr = "." 1011 self._write_multi_line("DE", descr) 1012 handle.write("XX\n") 1013 1014 #Should this be "source" or "organism"? 1015 self._write_multi_line("OS", self._get_annotation_str(record, "organism")) 1016 try: 1017 #List of strings 1018 taxonomy = "; ".join(record.annotations["taxonomy"]) + "." 1019 except KeyError: 1020 taxonomy = "." 1021 self._write_multi_line("OC", taxonomy) 1022 handle.write("XX\n") 1023 1024 if "references" in record.annotations: 1025 self._write_references(record) 1026 1027 if "comment" in record.annotations: 1028 self._write_comment(record) 1029 1030 handle.write(self.FEATURE_HEADER) 1031 rec_length = len(record) 1032 for feature in record.features: 1033 self._write_feature(feature, rec_length) 1034 1035 self._write_sequence(record) 1036 handle.write("//\n")
1037
1038 -class ImgtWriter(EmblWriter):
1039 HEADER_WIDTH = 5 1040 QUALIFIER_INDENT = 25 # Not 21 as in EMBL 1041 QUALIFIER_INDENT_STR = "FT" + " "*(QUALIFIER_INDENT-2) 1042 QUALIFIER_INDENT_TMP = "FT %s " # 25 if %s is empty 1043 FEATURE_HEADER = "FH Key Location/Qualifiers\n"
1044 1045 if __name__ == "__main__": 1046 print "Quick self test" 1047 import os 1048 from StringIO import StringIO 1049
1050 - def compare_record(old, new):
1051 if old.id != new.id and old.name != new.name: 1052 raise ValueError("'%s' or '%s' vs '%s' or '%s' records" \ 1053 % (old.id, old.name, new.id, new.name)) 1054 if len(old.seq) != len(new.seq): 1055 raise ValueError("%i vs %i" % (len(old.seq), len(new.seq))) 1056 if str(old.seq).upper() != str(new.seq).upper(): 1057 if len(old.seq) < 200: 1058 raise ValueError("'%s' vs '%s'" % (old.seq, new.seq)) 1059 else: 1060 raise ValueError("'%s...' vs '%s...'" % (old.seq[:100], new.seq[:100])) 1061 if old.features and new.features: 1062 return compare_features(old.features, new.features) 1063 #Just insist on at least one word in common: 1064 if (old.description or new.description) \ 1065 and not set(old.description.split()).intersection(new.description.split()): 1066 raise ValueError("%s versus %s" \ 1067 % (repr(old.description), repr(new.description))) 1068 #TODO - check annotation 1069 if "contig" in old.annotations: 1070 assert old.annotations["contig"] == \ 1071 new.annotations["contig"] 1072 return True
1073
1074 - def compare_records(old_list, new_list):
1075 """Check two lists of SeqRecords agree, raises a ValueError if mismatch.""" 1076 if len(old_list) != len(new_list): 1077 raise ValueError("%i vs %i records" % (len(old_list), len(new_list))) 1078 for old, new in zip(old_list, new_list): 1079 if not compare_record(old, new): 1080 return False 1081 return True
1082
1083 - def compare_feature(old, new, ignore_sub_features=False):
1084 """Check two SeqFeatures agree.""" 1085 if old.type != new.type: 1086 raise ValueError("Type %s versus %s" % (old.type, new.type)) 1087 if old.location.nofuzzy_start != new.location.nofuzzy_start \ 1088 or old.location.nofuzzy_end != new.location.nofuzzy_end: 1089 raise ValueError("%s versus %s:\n%s\nvs:\n%s" \ 1090 % (old.location, new.location, str(old), str(new))) 1091 if old.strand != new.strand: 1092 raise ValueError("Different strand:\n%s\nvs:\n%s" % (str(old), str(new))) 1093 if old.location.start != new.location.start: 1094 raise ValueError("Start %s versus %s:\n%s\nvs:\n%s" \ 1095 % (old.location.start, new.location.start, str(old), str(new))) 1096 if old.location.end != new.location.end: 1097 raise ValueError("End %s versus %s:\n%s\nvs:\n%s" \ 1098 % (old.location.end, new.location.end, str(old), str(new))) 1099 if not ignore_sub_features: 1100 if len(old.sub_features) != len(new.sub_features): 1101 raise ValueError("Different sub features") 1102 for a, b in zip(old.sub_features, new.sub_features): 1103 if not compare_feature(a, b): 1104 return False 1105 #This only checks key shared qualifiers 1106 #Would a white list be easier? 1107 #for key in ["name", "gene", "translation", "codon_table", "codon_start", "locus_tag"]: 1108 for key in set(old.qualifiers).intersection(new.qualifiers): 1109 if key in ["db_xref", "protein_id", "product", "note"]: 1110 #EMBL and GenBank files are use different references/notes/etc 1111 continue 1112 if old.qualifiers[key] != new.qualifiers[key]: 1113 raise ValueError("Qualifier mis-match for %s:\n%s\n%s" \ 1114 % (key, old.qualifiers[key], new.qualifiers[key])) 1115 return True
1116
1117 - def compare_features(old_list, new_list, ignore_sub_features=False):
1118 """Check two lists of SeqFeatures agree, raises a ValueError if mismatch.""" 1119 if len(old_list) != len(new_list): 1120 raise ValueError("%i vs %i features" % (len(old_list), len(new_list))) 1121 for old, new in zip(old_list, new_list): 1122 #This assumes they are in the same order 1123 if not compare_feature(old, new, ignore_sub_features): 1124 return False 1125 return True
1126
1127 - def check_genbank_writer(records):
1128 handle = StringIO() 1129 GenBankWriter(handle).write_file(records) 1130 handle.seek(0) 1131 1132 records2 = list(GenBankIterator(handle)) 1133 assert compare_records(records, records2)
1134
1135 - def check_embl_writer(records):
1136 handle = StringIO() 1137 try: 1138 EmblWriter(handle).write_file(records) 1139 except ValueError, err: 1140 print err 1141 return 1142 handle.seek(0) 1143 1144 records2 = list(EmblIterator(handle)) 1145 assert compare_records(records, records2)
1146 1147 for filename in os.listdir("../../Tests/GenBank"): 1148 if not filename.endswith(".gbk") and not filename.endswith(".gb"): 1149 continue 1150 print filename 1151 1152 handle = open("../../Tests/GenBank/%s" % filename) 1153 records = list(GenBankIterator(handle)) 1154 handle.close() 1155 1156 check_genbank_writer(records) 1157 check_embl_writer(records) 1158 1159 for filename in os.listdir("../../Tests/EMBL"): 1160 if not filename.endswith(".embl"): 1161 continue 1162 print filename 1163 1164 handle = open("../../Tests/EMBL/%s" % filename) 1165 records = list(EmblIterator(handle)) 1166 handle.close() 1167 1168 check_genbank_writer(records) 1169 check_embl_writer(records) 1170 1171 from Bio import SeqIO 1172 for filename in os.listdir("../../Tests/SwissProt"): 1173 if not filename.startswith("sp"): 1174 continue 1175 print filename 1176 1177 handle = open("../../Tests/SwissProt/%s" % filename) 1178 records = list(SeqIO.parse(handle, "swiss")) 1179 handle.close() 1180 1181 check_genbank_writer(records) 1182