Package Bio :: Package HMM :: Module MarkovModel
[hide private]
[frames] | no frames]

Source Code for Module Bio.HMM.MarkovModel

  1  """Deal with representations of Markov Models. 
  2  """ 
  3  # standard modules 
  4  import copy 
  5  import math 
  6  import random 
  7   
  8  #TODO - Take advantage of defaultdict once Python 2.4 is dead? 
  9  #from collections import defaultdict 
 10   
 11  # biopython 
 12  from Bio.Seq import MutableSeq 
 13   
14 -def _gen_random_array(n):
15 """ Return an array of n random numbers, where the elements of the array sum 16 to 1.0""" 17 randArray = [random.random() for i in range(n)] 18 total = sum(randArray) 19 normalizedRandArray = [x/total for x in randArray] 20 21 return normalizedRandArray
22
23 -def _calculate_emissions(emission_probs):
24 """Calculate which symbols can be emitted in each state 25 """ 26 # loop over all of the state-symbol duples, mapping states to 27 # lists of emitted symbols 28 emissions = dict() 29 for state, symbol in emission_probs: 30 try: 31 emissions[state].append(symbol) 32 except KeyError: 33 emissions[state] = [symbol] 34 35 return emissions
36
37 -def _calculate_from_transitions(trans_probs):
38 """Calculate which 'from transitions' are allowed for each state 39 40 This looks through all of the trans_probs, and uses this dictionary 41 to determine allowed transitions. It converts this information into 42 a dictionary, whose keys are source states and whose values are 43 lists of destination states reachable from the source state via a 44 transition. 45 """ 46 transitions = dict() 47 for from_state, to_state in trans_probs: 48 try: 49 transitions[from_state].append(to_state) 50 except KeyError: 51 transitions[from_state] = [to_state] 52 53 return transitions
54
55 -def _calculate_to_transitions(trans_probs):
56 """Calculate which 'to transitions' are allowed for each state 57 58 This looks through all of the trans_probs, and uses this dictionary 59 to determine allowed transitions. It converts this information into 60 a dictionary, whose keys are destination states and whose values are 61 lists of source states from which the destination is reachable via a 62 transition. 63 """ 64 transitions = dict() 65 for from_state, to_state in trans_probs: 66 try: 67 transitions[to_state].append(from_state) 68 except KeyError: 69 transitions[to_state] = [from_state] 70 71 return transitions
72
73 -class MarkovModelBuilder(object):
74 """Interface to build up a Markov Model. 75 76 This class is designed to try to separate the task of specifying the 77 Markov Model from the actual model itself. This is in hopes of making 78 the actual Markov Model classes smaller. 79 80 So, this builder class should be used to create Markov models instead 81 of trying to initiate a Markov Model directly. 82 """ 83 # the default pseudo counts to use 84 DEFAULT_PSEUDO = 1 85
86 - def __init__(self, state_alphabet, emission_alphabet):
87 """Initialize a builder to create Markov Models. 88 89 Arguments: 90 91 o state_alphabet -- An alphabet containing all of the letters that 92 can appear in the states 93 94 o emission_alphabet -- An alphabet containing all of the letters for 95 states that can be emitted by the HMM. 96 """ 97 self._state_alphabet = state_alphabet 98 self._emission_alphabet = emission_alphabet 99 100 # probabilities for the initial state, initialized by calling 101 # set_initial_probabilities (required) 102 self.initial_prob = {} 103 104 # the probabilities for transitions and emissions 105 # by default we have no transitions and all possible emissions 106 self.transition_prob = {} 107 self.emission_prob = self._all_blank(state_alphabet, 108 emission_alphabet) 109 110 # the default pseudocounts for transition and emission counting 111 self.transition_pseudo = {} 112 self.emission_pseudo = self._all_pseudo(state_alphabet, 113 emission_alphabet)
114
115 - def _all_blank(self, first_alphabet, second_alphabet):
116 """Return a dictionary with all counts set to zero. 117 118 This uses the letters in the first and second alphabet to create 119 a dictionary with keys of two tuples organized as 120 (letter of first alphabet, letter of second alphabet). The values 121 are all set to 0. 122 """ 123 all_blank = {} 124 for first_state in first_alphabet.letters: 125 for second_state in second_alphabet.letters: 126 all_blank[(first_state, second_state)] = 0 127 128 return all_blank
129
130 - def _all_pseudo(self, first_alphabet, second_alphabet):
131 """Return a dictionary with all counts set to a default value. 132 133 This takes the letters in first alphabet and second alphabet and 134 creates a dictionary with keys of two tuples organized as: 135 (letter of first alphabet, letter of second alphabet). The values 136 are all set to the value of the class attribute DEFAULT_PSEUDO. 137 """ 138 all_counts = {} 139 for first_state in first_alphabet.letters: 140 for second_state in second_alphabet.letters: 141 all_counts[(first_state, second_state)] = self.DEFAULT_PSEUDO 142 143 return all_counts
144
145 - def get_markov_model(self):
146 """Return the markov model corresponding with the current parameters. 147 148 Each markov model returned by a call to this function is unique 149 (ie. they don't influence each other). 150 """ 151 152 # user must set initial probabilities 153 if not self.initial_prob: 154 raise Exception("set_initial_probabilities must be called to " + 155 "fully initialize the Markov model") 156 157 initial_prob = copy.deepcopy(self.initial_prob) 158 transition_prob = copy.deepcopy(self.transition_prob) 159 emission_prob = copy.deepcopy(self.emission_prob) 160 transition_pseudo = copy.deepcopy(self.transition_pseudo) 161 emission_pseudo = copy.deepcopy(self.emission_pseudo) 162 163 return HiddenMarkovModel(initial_prob, transition_prob, emission_prob, 164 transition_pseudo, emission_pseudo)
165
166 - def set_initial_probabilities(self, initial_prob):
167 """Set initial state probabilities. 168 169 initial_prob is a dictionary mapping states to probabilities. 170 Suppose, for example, that the state alphabet is ['A', 'B']. Call 171 set_initial_prob({'A': 1}) to guarantee that the initial 172 state will be 'A'. Call set_initial_prob({'A': 0.5, 'B': 0.5}) 173 to make each initial state equally probable. 174 175 This method must now be called in order to use the Markov model 176 because the calculation of initial probabilities has changed 177 incompatibly; the previous calculation was incorrect. 178 179 If initial probabilities are set for all states, then they should add up 180 to 1. Otherwise the sum should be <= 1. The residual probability is 181 divided up evenly between all the states for which the initial 182 probability has not been set. For example, calling 183 set_initial_prob({}) results in P('A') = 0.5 and P('B') = 0.5, 184 for the above example. 185 """ 186 self.initial_prob = copy.copy(initial_prob) 187 188 # ensure that all referenced states are valid 189 for state in initial_prob.iterkeys(): 190 assert state in self._state_alphabet.letters, \ 191 "State %s was not found in the sequence alphabet" % state 192 193 # distribute the residual probability, if any 194 num_states_not_set =\ 195 len(self._state_alphabet.letters) - len(self.initial_prob) 196 if num_states_not_set < 0: 197 raise Exception("Initial probabilities can't exceed # of states") 198 prob_sum = sum(self.initial_prob.values()) 199 if prob_sum > 1.0: 200 raise Exception("Total initial probability cannot exceed 1.0") 201 if num_states_not_set > 0: 202 prob = (1.0 - prob_sum) / num_states_not_set 203 for state in self._state_alphabet.letters: 204 if not state in self.initial_prob: 205 self.initial_prob[state] = prob
206
207 - def set_equal_probabilities(self):
208 """Reset all probabilities to be an average value. 209 210 Resets the values of all initial probabilities and all allowed 211 transitions and all allowed emissions to be equal to 1 divided by the 212 number of possible elements. 213 214 This is useful if you just want to initialize a Markov Model to 215 starting values (ie. if you have no prior notions of what the 216 probabilities should be -- or if you are just feeling too lazy 217 to calculate them :-). 218 219 Warning 1 -- this will reset all currently set probabilities. 220 221 Warning 2 -- This just sets all probabilities for transitions and 222 emissions to total up to 1, so it doesn't ensure that the sum of 223 each set of transitions adds up to 1. 224 """ 225 226 # set initial state probabilities 227 new_initial_prob = float(1) / float(len(self.transition_prob)) 228 for state in self._state_alphabet.letters: 229 self.initial_prob[state] = new_initial_prob 230 231 # set the transitions 232 new_trans_prob = float(1) / float(len(self.transition_prob)) 233 for key in self.transition_prob: 234 self.transition_prob[key] = new_trans_prob 235 236 # set the emissions 237 new_emission_prob = float(1) / float(len(self.emission_prob)) 238 for key in self.emission_prob: 239 self.emission_prob[key] = new_emission_prob
240 241
243 """Set all initial state probabilities to a randomly generated distribution. 244 Returns the dictionary containing the initial probabilities. 245 """ 246 initial_freqs = _gen_random_array(len(self._state_alphabet.letters)) 247 for state in self._state_alphabet.letters: 248 self.initial_prob[state] = initial_freqs.pop() 249 250 return self.initial_prob
251
253 """Set all allowed transition probabilities to a randomly generated distribution. 254 Returns the dictionary containing the transition probabilities. 255 """ 256 257 if not self.transition_prob: 258 raise Exception("No transitions have been allowed yet. " + 259 "Allow some or all transitions by calling " + 260 "allow_transition or allow_all_transitions first.") 261 262 transitions_from = _calculate_from_transitions(self.transition_prob) 263 for from_state in transitions_from.keys(): 264 freqs = _gen_random_array(len(transitions_from[from_state])) 265 for to_state in transitions_from[from_state]: 266 self.transition_prob[(from_state, to_state)] = freqs.pop() 267 268 return self.transition_prob
269
271 """Set all allowed emission probabilities to a randomly generated 272 distribution. Returns the dictionary containing the emission 273 probabilities. 274 """ 275 276 if not self.emission_prob: 277 raise Exception("No emissions have been allowed yet. " + 278 "Allow some or all emissions.") 279 280 emissions = _calculate_emissions(self.emission_prob) 281 for state in emissions.iterkeys(): 282 freqs = _gen_random_array(len(emissions[state])) 283 for symbol in emissions[state]: 284 self.emission_prob[(state, symbol)] = freqs.pop() 285 286 return self.emission_prob
287 288
289 - def set_random_probabilities(self):
290 """Set all probabilities to randomly generated numbers. 291 292 Resets probabilities of all initial states, transitions, and 293 emissions to random values. 294 """ 295 self.set_random_initial_probabilities() 296 self.set_random_transition_probabilities() 297 self.set_random_emission_probabilities()
298 299 # --- functions to deal with the transitions in the sequence 300
301 - def allow_all_transitions(self):
302 """A convenience function to create transitions between all states. 303 304 By default all transitions within the alphabet are disallowed; this 305 is a way to change this to allow all possible transitions. 306 """ 307 # first get all probabilities and pseudo counts set 308 # to the default values 309 all_probs = self._all_blank(self._state_alphabet, 310 self._state_alphabet) 311 312 all_pseudo = self._all_pseudo(self._state_alphabet, 313 self._state_alphabet) 314 315 # now set any probabilities and pseudo counts that 316 # were previously set 317 for set_key in self.transition_prob: 318 all_probs[set_key] = self.transition_prob[set_key] 319 320 for set_key in self.transition_pseudo: 321 all_pseudo[set_key] = self.transition_pseudo[set_key] 322 323 # finally reinitialize the transition probs and pseudo counts 324 self.transition_prob = all_probs 325 self.transition_pseudo = all_pseudo
326
327 - def allow_transition(self, from_state, to_state, probability = None, 328 pseudocount = None):
329 """Set a transition as being possible between the two states. 330 331 probability and pseudocount are optional arguments 332 specifying the probabilities and pseudo counts for the transition. 333 If these are not supplied, then the values are set to the 334 default values. 335 336 Raises: 337 KeyError -- if the two states already have an allowed transition. 338 """ 339 # check the sanity of adding these states 340 for state in [from_state, to_state]: 341 assert state in self._state_alphabet.letters, \ 342 "State %s was not found in the sequence alphabet" % state 343 344 # ensure that the states are not already set 345 if ((from_state, to_state) not in self.transition_prob and 346 (from_state, to_state) not in self.transition_pseudo): 347 # set the initial probability 348 if probability is None: 349 probability = 0 350 self.transition_prob[(from_state, to_state)] = probability 351 352 # set the initial pseudocounts 353 if pseudocount is None: 354 pseudcount = self.DEFAULT_PSEUDO 355 self.transition_pseudo[(from_state, to_state)] = pseudocount 356 else: 357 raise KeyError("Transition from %s to %s is already allowed." 358 % (from_state, to_state))
359
360 - def destroy_transition(self, from_state, to_state):
361 """Restrict transitions between the two states. 362 363 Raises: 364 KeyError if the transition is not currently allowed. 365 """ 366 try: 367 del self.transition_prob[(from_state, to_state)] 368 del self.transition_pseudo[(from_state, to_state)] 369 except KeyError: 370 raise KeyError("Transition from %s to %s is already disallowed." 371 % (from_state, to_state))
372
373 - def set_transition_score(self, from_state, to_state, probability):
374 """Set the probability of a transition between two states. 375 376 Raises: 377 KeyError if the transition is not allowed. 378 """ 379 if (from_state, to_state) in self.transition_prob: 380 self.transition_prob[(from_state, to_state)] = probability 381 else: 382 raise KeyError("Transition from %s to %s is not allowed." 383 % (from_state, to_state))
384
385 - def set_transition_pseudocount(self, from_state, to_state, count):
386 """Set the default pseudocount for a transition. 387 388 To avoid computational problems, it is helpful to be able to 389 set a 'default' pseudocount to start with for estimating 390 transition and emission probabilities (see p62 in Durbin et al 391 for more discussion on this. By default, all transitions have 392 a pseudocount of 1. 393 394 Raises: 395 KeyError if the transition is not allowed. 396 """ 397 if (from_state, to_state) in self.transition_pseudo: 398 self.transition_pseudo[(from_state, to_state)] = count 399 else: 400 raise KeyError("Transition from %s to %s is not allowed." 401 % (from_state, to_state))
402 403 # --- functions to deal with emissions from the sequence 404
405 - def set_emission_score(self, seq_state, emission_state, probability):
406 """Set the probability of a emission from a particular state. 407 408 Raises: 409 KeyError if the emission from the given state is not allowed. 410 """ 411 if (seq_state, emission_state) in self.emission_prob: 412 self.emission_prob[(seq_state, emission_state)] = probability 413 else: 414 raise KeyError("Emission of %s from %s is not allowed." 415 % (emission_state, seq_state))
416
417 - def set_emission_pseudocount(self, seq_state, emission_state, count):
418 """Set the default pseudocount for an emission. 419 420 To avoid computational problems, it is helpful to be able to 421 set a 'default' pseudocount to start with for estimating 422 transition and emission probabilities (see p62 in Durbin et al 423 for more discussion on this. By default, all emissions have 424 a pseudocount of 1. 425 426 Raises: 427 KeyError if the emission from the given state is not allowed. 428 """ 429 if (seq_state, emission_state) in self.emission_pseudo: 430 self.emission_pseudo[(seq_state, emission_state)] = count 431 else: 432 raise KeyError("Emission of %s from %s is not allowed." 433 % (emission_state, seq_state))
434
435 -class HiddenMarkovModel(object):
436 """Represent a hidden markov model that can be used for state estimation. 437 """
438 - def __init__(self, initial_prob, transition_prob, emission_prob, 439 transition_pseudo, emission_pseudo):
440 """Initialize a Markov Model. 441 442 Note: You should use the MarkovModelBuilder class instead of 443 initiating this class directly. 444 445 Arguments: 446 447 o initial_prob - A dictionary of initial probabilities for all states. 448 449 o transition_prob -- A dictionary of transition probabilities for all 450 possible transitions in the sequence. 451 452 o emission_prob -- A dictionary of emission probabilities for all 453 possible emissions from the sequence states. 454 455 o transition_pseudo -- Pseudo-counts to be used for the transitions, 456 when counting for purposes of estimating transition probabilities. 457 458 o emission_pseudo -- Pseudo-counts to be used for the emissions, 459 when counting for purposes of estimating emission probabilities. 460 """ 461 462 self.initial_prob = initial_prob 463 464 self._transition_pseudo = transition_pseudo 465 self._emission_pseudo = emission_pseudo 466 467 self.transition_prob = transition_prob 468 self.emission_prob = emission_prob 469 470 # a dictionary of the possible transitions from each state 471 # each key is a source state, mapped to a list of the destination states 472 # that are reachable from the source state via a transition 473 self._transitions_from = \ 474 _calculate_from_transitions(self.transition_prob) 475 476 # a dictionary of the possible transitions to each state 477 # each key is a destination state, mapped to a list of source states 478 # from which the destination is reachable via a transition 479 self._transitions_to = \ 480 _calculate_to_transitions(self.transition_prob)
481 482
483 - def get_blank_transitions(self):
484 """Get the default transitions for the model. 485 486 Returns a dictionary of all of the default transitions between any 487 two letters in the sequence alphabet. The dictionary is structured 488 with keys as (letter1, letter2) and values as the starting number 489 of transitions. 490 """ 491 return self._transition_pseudo
492
493 - def get_blank_emissions(self):
494 """Get the starting default emmissions for each sequence. 495 496 This returns a dictionary of the default emmissions for each 497 letter. The dictionary is structured with keys as 498 (seq_letter, emmission_letter) and values as the starting number 499 of emmissions. 500 """ 501 return self._emission_pseudo
502
503 - def transitions_from(self, state_letter):
504 """Get all destination states to which there are transitions from the 505 state_letter source state. 506 507 This returns all letters which the given state_letter can transition 508 to. An empty list is returned if state_letter has no outgoing 509 transitions. 510 """ 511 if state_letter in self._transitions_from: 512 return self._transitions_from[state_letter] 513 else: 514 return []
515
516 - def transitions_to(self, state_letter):
517 """Get all source states from which there are transitions to the 518 state_letter destination state. 519 520 This returns all letters which the given state_letter is reachable 521 from. An empty list is returned if state_letter is unreachable. 522 """ 523 if state_letter in self._transitions_to: 524 return self._transitions_to[state_letter] 525 else: 526 return []
527
528 - def viterbi(self, sequence, state_alphabet):
529 """Calculate the most probable state path using the Viterbi algorithm. 530 531 This implements the Viterbi algorithm (see pgs 55-57 in Durbin et 532 al for a full explanation -- this is where I took my implementation 533 ideas from), to allow decoding of the state path, given a sequence 534 of emissions. 535 536 Arguments: 537 538 o sequence -- A Seq object with the emission sequence that we 539 want to decode. 540 541 o state_alphabet -- The alphabet of the possible state sequences 542 that can be generated. 543 """ 544 545 # calculate logarithms of the initial, transition, and emission probs 546 log_initial = self._log_transform(self.initial_prob) 547 log_trans = self._log_transform(self.transition_prob) 548 log_emission = self._log_transform(self.emission_prob) 549 550 viterbi_probs = {} 551 pred_state_seq = {} 552 state_letters = state_alphabet.letters 553 554 # --- recursion 555 # loop over the training squence (i = 1 .. L) 556 # NOTE: My index numbers are one less than what is given in Durbin 557 # et al, since we are indexing the sequence going from 0 to 558 # (Length - 1) not 1 to Length, like in Durbin et al. 559 for i in range(0, len(sequence)): 560 # loop over all of the possible i-th states in the state path 561 for cur_state in state_letters: 562 # e_{l}(x_{i}) 563 emission_part = log_emission[(cur_state, sequence[i])] 564 565 max_prob = 0 566 if i == 0: 567 # for the first state, use the initial probability rather 568 # than looking back to previous states 569 max_prob = log_initial[cur_state] 570 else: 571 # loop over all possible (i-1)-th previous states 572 possible_state_probs = {} 573 for prev_state in self.transitions_to(cur_state): 574 # a_{kl} 575 trans_part = log_trans[(prev_state, cur_state)] 576 577 # v_{k}(i - 1) 578 viterbi_part = viterbi_probs[(prev_state, i - 1)] 579 cur_prob = viterbi_part + trans_part 580 581 possible_state_probs[prev_state] = cur_prob 582 583 # calculate the viterbi probability using the max 584 max_prob = max(possible_state_probs.values()) 585 586 # v_{k}(i) 587 viterbi_probs[(cur_state, i)] = (emission_part + max_prob) 588 589 if i > 0: 590 # get the most likely prev_state leading to cur_state 591 for state in possible_state_probs: 592 if possible_state_probs[state] == max_prob: 593 pred_state_seq[(i - 1, cur_state)] = state 594 break 595 596 # --- termination 597 # calculate the probability of the state path 598 # loop over all states 599 all_probs = {} 600 for state in state_letters: 601 # v_{k}(L) 602 all_probs[state] = viterbi_probs[(state, len(sequence) - 1)] 603 604 state_path_prob = max(all_probs.values()) 605 606 # find the last pointer we need to trace back from 607 last_state = '' 608 for state in all_probs: 609 if all_probs[state] == state_path_prob: 610 last_state = state 611 612 assert last_state != '', "Didn't find the last state to trace from!" 613 614 # --- traceback 615 traceback_seq = MutableSeq('', state_alphabet) 616 617 loop_seq = range(1, len(sequence)) 618 loop_seq.reverse() 619 620 # last_state is the last state in the most probable state sequence. 621 # Compute that sequence by walking backwards in time. From the i-th 622 # state in the sequence, find the (i-1)-th state as the most 623 # probable state preceding the i-th state. 624 state = last_state 625 traceback_seq.append(state) 626 for i in loop_seq: 627 state = pred_state_seq[(i - 1, state)] 628 traceback_seq.append(state) 629 630 # put the traceback sequence in the proper orientation 631 traceback_seq.reverse() 632 633 return traceback_seq.toseq(), state_path_prob
634
635 - def _log_transform(self, probability):
636 """Return log transform of the given probability dictionary. 637 638 When calculating the Viterbi equation, add logs of probabilities rather 639 than multiplying probabilities, to avoid underflow errors. This method 640 returns a new dictionary with the same keys as the given dictionary 641 and log-transformed values. 642 """ 643 log_prob = copy.copy(probability) 644 try: 645 neg_inf = float("-inf") 646 except ValueError: 647 #On Python 2.5 or older that was handled in C code, 648 #and failed on Windows XP 32bit 649 neg_inf = - 1E400 650 for key in log_prob: 651 prob = log_prob[key] 652 if prob > 0: 653 log_prob[key] = math.log(log_prob[key]) 654 else: 655 log_prob[key] = neg_inf 656 657 return log_prob
658