Package Bio :: Module ParserSupport
[hide private]
[frames | no frames]

Source Code for Module Bio.ParserSupport

  1  # Copyright 1999 by Jeffrey Chang.  All rights reserved. 
  2  # This code is part of the Biopython distribution and governed by its 
  3  # license.  Please see the LICENSE file that should have been included 
  4  # as part of this package. 
  5   
  6  """Code to support writing parsers. 
  7   
  8   
  9   
 10  Classes: 
 11  AbstractParser         Base class for parsers. 
 12  AbstractConsumer       Base class of all Consumers. 
 13  TaggingConsumer        Consumer that tags output with its event.  For debugging 
 14  SGMLStrippingConsumer  Consumer that strips SGML tags from output. 
 15  EventGenerator         Generate Biopython Events from Martel XML output 
 16                         (note that Martel is now DEPRECATED) 
 17   
 18  Functions: 
 19  safe_readline          Read a line from a handle, with check for EOF. 
 20  safe_peekline          Peek at next line, with check for EOF. 
 21  read_and_call          Read a line from a handle and pass it to a method. 
 22  read_and_call_while    Read many lines, as long as a condition is met. 
 23  read_and_call_until    Read many lines, until a condition is met. 
 24  attempt_read_and_call  Like read_and_call, but forgiving of errors. 
 25  is_blank_line          Test whether a line is blank. 
 26   
 27  """ 
 28   
 29  import sys 
 30  import traceback 
 31  from types import * 
 32   
 33  from Bio import File 
 34   
 35  # XML from python 2.0 
 36  try: 
 37      from xml.sax import handler 
 38      xml_support = 1 
 39  except ImportError: 
 40      sys.stderr.write("Warning: Could not import SAX for dealing with XML.\n" + 
 41                       "This causes problems with some ParserSupport modules\n") 
 42      xml_support = 0 
 43   
class AbstractParser(object):
    """Base class for other parsers.

    Subclasses provide parse(); parse_str() and parse_file() are
    conveniences that feed a string or a named file through it.

    """
    def parse(self, handle):
        """Parse the data read from handle.  Must be overridden."""
        raise NotImplementedError("Please implement in a derived class")

    def parse_str(self, string):
        """Wrap string in a File.StringHandle and return parse() of it."""
        handle = File.StringHandle(string)
        return self.parse(handle)

    def parse_file(self, filename):
        """Open filename, return parse() of it, and always close the file."""
        handle = open(filename)
        try:
            return self.parse(handle)
        finally:
            # Runs whether parse() returned or raised.
            handle.close()
61
class AbstractConsumer(object):
    """Base class for other Consumers.

    Derive Consumers from this class and implement a method for each
    event that you want to receive.  Events without a method of their
    own are routed to one of the do-nothing handlers below.

    """
    def _unhandled_section(self):
        """Ignore a section event (start_* / end_*); takes no data."""
        pass

    def _unhandled(self, data):
        """Ignore a data event."""
        pass

    def __getattr__(self, attr):
        """Return the do-nothing handler matching the requested event name.

        Python only calls this when normal attribute lookup fails, so
        methods defined by a subclass are never shadowed.
        """
        if attr.startswith('start_') or attr.startswith('end_'):
            return self._unhandled_section
        return self._unhandled
79
class TaggingConsumer(AbstractConsumer):
    """A Consumer that tags the data stream with the event and
    prints it to a handle.  Useful for debugging.

    """
    def __init__(self, handle=None, colwidth=15, maxwidth=80):
        """TaggingConsumer(handle=sys.stdout, colwidth=15, maxwidth=80)"""
        # sys.stdout is resolved here rather than in the signature: a
        # default argument is evaluated only once, so it would pin whatever
        # sys.stdout was at definition time, which the user may since have
        # replaced or closed.
        if handle is None:
            handle = sys.stdout
        self._handle = handle
        self._colwidth = colwidth
        self._maxwidth = maxwidth

    def unhandled_section(self):
        """Print the tag for an unhandled section event."""
        self._print_name('unhandled_section')

    def unhandled(self, data):
        """Print the tag and data for an unhandled data event."""
        self._print_name('unhandled', data)

    def _print_name(self, name, data=None):
        """Write one tagged line to the handle.

        Without data, writes colwidth asterisks followed by the name.
        With data, writes the name (truncated and left-justified to
        colwidth), then ': ', then the data truncated so that the whole
        line fits in maxwidth, with trailing whitespace removed.
        """
        width = self._colwidth
        if data is None:
            # Write the name of a section.
            text = "%s %s\n" % ("*" * width, name)
        else:
            # Write the tag and line.  The 2 accounts for ': '.
            room = self._maxwidth - width - 2
            text = "%-*s: %s\n" % (width, name[:width], data[:room].rstrip())
        self._handle.write(text)

    def __getattr__(self, attr):
        """Return a callable that prints attr as the tag for any event.

        start_* / end_* names get a callable taking no data; every other
        name gets a callable taking the data as its first argument.
        """
        if attr[:6] == 'start_' or attr[:4] == 'end_':
            def tagger(a=attr, s=self):
                return s._print_name(a)
        else:
            def tagger(x, a=attr, s=self):
                return s._print_name(a, x)
        return tagger
120
class SGMLStrippingConsumer(object):
    """A consumer that strips off SGML tags.

    This is meant to be used as a decorator for other consumers: it
    forwards every attribute lookup to the wrapped consumer, but hands
    back a wrapper for data-event methods so that their argument is run
    through File.SGMLStripper first.

    """
    def __init__(self, consumer):
        """Wrap consumer, which must be an instance rather than a class.

        Raises ValueError if a class is passed.
        """
        # Function-scope import so this block adds no file-level dependency.
        import inspect
        # Reject classes explicitly.  The previous test against
        # types.InstanceType only accepted old-style instances, so it
        # refused every new-style consumer.
        if inspect.isclass(consumer):
            raise ValueError("consumer should be an instance")
        self._consumer = consumer
        self._prev_attr = None
        self._stripper = File.SGMLStripper()

    def _apply_clean_data(self, data):
        """Strip data and pass it to the most recently looked-up handler."""
        clean = self._stripper.strip(data)
        self._prev_attr(clean)

    def __getattr__(self, name):
        """Delegate the lookup of name to the wrapped consumer.

        Non-method attributes and start_* / end_* section methods are
        returned unchanged.  Any other bound method is treated as a data
        event and wrapped so that its argument is stripped first.
        """
        # __getattr__ only runs when normal lookup failed, so reaching here
        # for one of our own attributes means it has not been set yet (for
        # example on an instance created without __init__).  Asking
        # getattr(self, ...) again would recurse without bound.
        if name in ('_consumer', '_prev_attr', '_stripper'):
            raise AttributeError(name)
        attr = getattr(self._consumer, name)
        # If this is not a method, then return it as is.
        if type(attr) is not MethodType:
            return attr
        # If it's a section method, then return it.
        if name[:6] == 'start_' or name[:4] == 'end_':
            return attr
        # Otherwise, it's an info event.  _prev_attr is still updated so
        # that _apply_clean_data keeps working.
        self._prev_attr = attr

        # Bind the handler in a closure rather than relying on the shared
        # _prev_attr slot; otherwise fetching two handlers before calling
        # the first would send the data to the second.
        def strip_and_forward(data):
            attr(self._stripper.strip(data))
        return strip_and_forward
# only use the Event Generator if XML handling is okay
if xml_support:
    class EventGenerator(handler.ContentHandler):
        """Handler to generate events associated with a Martel parsed file.

        This acts like a normal SAX handler, and accepts XML generated by
        Martel during parsing. These events are then converted into
        'Biopython events', which can then be caught by a standard
        biopython consumer.

        Note that Martel is now DEPRECATED.
        """
        def __init__(self, consumer, interest_tags, callback_finalizer=None,
                     exempt_tags=None):
            """Initialize to begin catching and firing off events.

            Arguments:
            o consumer - The consumer that we'll send Biopython events to.

            o interest_tags - A listing of all the tags we are interested in.

            o callback_finalizer - A function to deal with the collected
            information before passing it on to the consumer. By default
            the collected information is a list of all of the lines read
            for a particular tag -- if there are multiple tags in a row
            like:

            <some_info>Spam</some_info>
            <some_info>More Spam</some_info>

            In this case the list of information would be:

            ['Spam', 'More Spam']

            This list of lines will be passed to the callback finalizer if
            it is present. Otherwise the consumer will be called with the
            list of content information.

            o exempt_tags - A listing of particular tags that are exempt from
            being processed by the callback_finalizer. This allows you to
            use a finalizer to deal with most tags, but leave those you don't
            want touched. Defaults to no exempt tags.
            """
            # ContentHandler may be an old-style class, so call it directly
            # rather than through super().
            handler.ContentHandler.__init__(self)

            self._consumer = consumer
            self.interest_tags = interest_tags
            self._finalizer = callback_finalizer
            # A fresh list per instance; a literal [] default in the
            # signature would be one object shared by every instance.
            if exempt_tags is None:
                exempt_tags = []
            self._exempt_tags = exempt_tags

            # a dictionary of content for each tag of interest
            # the information for each tag is held as a list of the lines.
            # This allows us to collect information from multiple tags
            # in a row, and return it all at once.
            self.info = {}
            for tag in self.interest_tags:
                self.info[tag] = []

            # the previous tag we were collecting information for.
            # We set a delay in sending info to the consumer so that we can
            # collect a bunch of tags in a row and append all of the info
            # together.
            self._previous_tag = ''

            # the current character information for a tag
            self._cur_content = []
            # whether we should be collecting information
            self._collect_characters = 0

        def startElement(self, name, attrs):
            """Determine if we should collect characters from this tag.
            """
            if name in self.interest_tags:
                self._collect_characters = 1

        def characters(self, content):
            """Extract the information if we are interested in it.
            """
            if self._collect_characters:
                self._cur_content.append(content)

        def endElement(self, name):
            """Send the information to the consumer.

            Once we've got the end element we've collected up all of the
            character information we need, and we need to send this on to
            the consumer to do something with it.

            We have a delay of one tag on doing this, so that we can collect
            all of the info from multiple calls to the same element at once.
            """
            # only deal with the tag if we are currently collecting
            if not self._collect_characters:
                return
            # A tag we are not interested in can close while we are still
            # inside an interesting one (markup nested in its content).
            # Keep collecting until the interesting tag itself closes;
            # indexing self.info with the nested name would raise KeyError.
            if name not in self.interest_tags:
                return

            # add all of the information collected inside this tag
            self.info.setdefault(name, []).append("".join(self._cur_content))
            # reset our information and flags
            self._cur_content = []
            self._collect_characters = 0

            # if we are at a new tag, pass on the info from the last tag
            if self._previous_tag and self._previous_tag != name:
                self._make_callback(self._previous_tag)

            # set this tag as the next to be passed
            self._previous_tag = name

        def _make_callback(self, name):
            """Call the callback function with the info with the given name.
            """
            # the consumer method that shares its name with the tag
            callback_function = getattr(self._consumer, name)

            # --- pass back the information
            # if there is a finalizer, use that
            if self._finalizer is not None and name not in self._exempt_tags:
                info_to_pass = self._finalizer(self.info[name])
            # otherwise pass back the entire list of information
            else:
                info_to_pass = self.info[name]

            callback_function(info_to_pass)

            # reset the information for the tag
            self.info[name] = []

        def endDocument(self):
            """Make sure all of our information has been passed.

            This just flushes out any stored tags that need to be passed.
            """
            if self._previous_tag:
                self._make_callback(self._previous_tag)
283
def read_and_call(uhandle, method, **keywds):
    """read_and_call(uhandle, method[, start][, end][, contains][, blank][, has_re])

    Read one line from uhandle, check it against the given conditions,
    and pass it to method.  Raises a ValueError if the line does not
    pass the checks.

    start, end, contains, blank, and has_re are optional conditions
    that the line must satisfy.  start and end give what the line must
    begin or end with (trailing whitespace is ignored for end).
    contains gives a substring that must be found in the line.  If
    blank is a true value the line must be blank; if it is a false
    value other than None the line must not be blank.  has_re should be
    a regular expression object whose pattern must match somewhere in
    the line.

    """
    line = safe_readline(uhandle)
    problem = _fails_conditions(line, **keywds)
    if problem is not None:
        raise ValueError(problem)
    method(line)
304
def read_and_call_while(uhandle, method, **keywds):
    """read_and_call_while(uhandle, method[, start][, end][, contains][, blank][, has_re]) -> number of lines

    Keep reading lines from uhandle and passing them to method for as
    long as they satisfy the conditions.  The first line that does not
    is pushed back onto uhandle with saveline().  Returns the number of
    lines that were passed to method.

    See the docstring for read_and_call for a description of the parameters.

    """
    count = 0
    line = safe_readline(uhandle)
    while not _fails_conditions(line, **keywds):
        method(line)
        count += 1
        line = safe_readline(uhandle)
    # This line failed the conditions; leave it for the next reader.
    uhandle.saveline(line)
    return count
324
def read_and_call_until(uhandle, method, **keywds):
    """read_and_call_until(uhandle, method[, start][, end][, contains][, blank][, has_re]) -> number of lines

    Keep reading lines from uhandle and passing them to method until a
    line satisfies the conditions.  That line is pushed back onto
    uhandle with saveline().  Returns the number of lines that were
    passed to method.

    See the docstring for read_and_call for a description of the parameters.

    """
    count = 0
    line = safe_readline(uhandle)
    while _fails_conditions(line, **keywds):
        method(line)
        count += 1
        line = safe_readline(uhandle)
    # This line met the conditions; leave it for the next reader.
    uhandle.saveline(line)
    return count
345
def attempt_read_and_call(uhandle, method, **keywds):
    """attempt_read_and_call(uhandle, method, **keywds) -> boolean

    Like read_and_call, but instead of raising when the line fails the
    checks, push the line back onto uhandle with saveline() and return
    False.  Returns True when the line passed and was given to method.

    See docs for read_and_call for a description of the function
    arguments.

    """
    line = safe_readline(uhandle)
    if _fails_conditions(line, **keywds):
        uhandle.saveline(line)
        return False
    method(line)
    return True
364
365 -def _fails_conditions(line, start=None, end=None, contains=None, blank=None, 366 has_re=None):
367 if start is not None: 368 if line[:len(start)] != start: 369 return "Line does not start with '%s':\n%s" % (start, line) 370 if end is not None: 371 if line.rstrip()[-len(end):] != end: 372 return "Line does not end with '%s':\n%s" % (end, line) 373 if contains is not None: 374 if line.find(contains) == -1: 375 return "Line does not contain '%s':\n%s" % (contains, line) 376 if blank is not None: 377 if blank: 378 if not is_blank_line(line): 379 return "Expected blank line, but got:\n%s" % line 380 else: 381 if is_blank_line(line): 382 return "Expected non-blank line, but got a blank one" 383 if has_re is not None: 384 if has_re.search(line) is None: 385 return "Line does not match regex '%s':\n%s" % ( 386 has_re.pattern, line) 387 return None
388
def is_blank_line(line, allow_spaces=0):
    """is_blank_line(line, allow_spaces=0) -> boolean

    Return whether a line is blank.  An empty string always counts as
    blank.  By default a line is blank only if its first character is
    an end-of-line character.  If allow_spaces is true, a line made up
    solely of whitespace also counts as blank.

    """
    if not line:
        return 1
    if not allow_spaces:
        # Strict mode: the line must start with its EOL character.
        return line[0] in ('\n', '\r')
    return not line.rstrip()
403
def safe_readline(handle):
    """safe_readline(handle) -> line

    Return the next line read from handle via readline().  Raises a
    ValueError if readline() returns nothing, i.e. there are no more
    lines to read.

    """
    line = handle.readline()
    if line:
        return line
    raise ValueError("Unexpected end of stream.")
415
def safe_peekline(handle):
    """safe_peekline(handle) -> line

    Return the line obtained from handle.peekline().  Raises a
    ValueError if peekline() returns nothing, i.e. there are no more
    lines to peek at.

    """
    line = handle.peekline()
    if line:
        return line
    raise ValueError("Unexpected end of stream.")
427