1 """Deal with representations of Markov Models.
2 """
3
4 import copy
5 import math
6 import random
7
8
9
10
11
12 from Bio.Seq import MutableSeq
13
15 """ Return an array of n random numbers, where the elements of the array sum
16 to 1.0"""
17 randArray = [random.random() for i in range(n)]
18 total = sum(randArray)
19 normalizedRandArray = [x/total for x in randArray]
20
21 return normalizedRandArray
22
24 """Calculate which symbols can be emitted in each state
25 """
26
27
28 emissions = dict()
29 for state, symbol in emission_probs:
30 try:
31 emissions[state].append(symbol)
32 except KeyError:
33 emissions[state] = [symbol]
34
35 return emissions
36
38 """Calculate which 'from transitions' are allowed for each state
39
40 This looks through all of the trans_probs, and uses this dictionary
41 to determine allowed transitions. It converts this information into
42 a dictionary, whose keys are source states and whose values are
43 lists of destination states reachable from the source state via a
44 transition.
45 """
46 transitions = dict()
47 for from_state, to_state in trans_probs:
48 try:
49 transitions[from_state].append(to_state)
50 except KeyError:
51 transitions[from_state] = [to_state]
52
53 return transitions
54
56 """Calculate which 'to transitions' are allowed for each state
57
58 This looks through all of the trans_probs, and uses this dictionary
59 to determine allowed transitions. It converts this information into
60 a dictionary, whose keys are destination states and whose values are
61 lists of source states from which the destination is reachable via a
62 transition.
63 """
64 transitions = dict()
65 for from_state, to_state in trans_probs:
66 try:
67 transitions[to_state].append(from_state)
68 except KeyError:
69 transitions[to_state] = [from_state]
70
71 return transitions
72
74 """Interface to build up a Markov Model.
75
76 This class is designed to try to separate the task of specifying the
77 Markov Model from the actual model itself. This is in hopes of making
78 the actual Markov Model classes smaller.
79
80 So, this builder class should be used to create Markov models instead
81 of trying to initiate a Markov Model directly.
82 """
83
84 DEFAULT_PSEUDO = 1
85
86 - def __init__(self, state_alphabet, emission_alphabet):
87 """Initialize a builder to create Markov Models.
88
89 Arguments:
90
91 o state_alphabet -- An alphabet containing all of the letters that
92 can appear in the states
93
94 o emission_alphabet -- An alphabet containing all of the letters for
95 states that can be emitted by the HMM.
96 """
97 self._state_alphabet = state_alphabet
98 self._emission_alphabet = emission_alphabet
99
100
101
102 self.initial_prob = {}
103
104
105
106 self.transition_prob = {}
107 self.emission_prob = self._all_blank(state_alphabet,
108 emission_alphabet)
109
110
111 self.transition_pseudo = {}
112 self.emission_pseudo = self._all_pseudo(state_alphabet,
113 emission_alphabet)
114
115 - def _all_blank(self, first_alphabet, second_alphabet):
116 """Return a dictionary with all counts set to zero.
117
118 This uses the letters in the first and second alphabet to create
119 a dictionary with keys of two tuples organized as
120 (letter of first alphabet, letter of second alphabet). The values
121 are all set to 0.
122 """
123 all_blank = {}
124 for first_state in first_alphabet.letters:
125 for second_state in second_alphabet.letters:
126 all_blank[(first_state, second_state)] = 0
127
128 return all_blank
129
130 - def _all_pseudo(self, first_alphabet, second_alphabet):
131 """Return a dictionary with all counts set to a default value.
132
133 This takes the letters in first alphabet and second alphabet and
134 creates a dictionary with keys of two tuples organized as:
135 (letter of first alphabet, letter of second alphabet). The values
136 are all set to the value of the class attribute DEFAULT_PSEUDO.
137 """
138 all_counts = {}
139 for first_state in first_alphabet.letters:
140 for second_state in second_alphabet.letters:
141 all_counts[(first_state, second_state)] = self.DEFAULT_PSEUDO
142
143 return all_counts
144
146 """Return the markov model corresponding with the current parameters.
147
148 Each markov model returned by a call to this function is unique
149 (ie. they don't influence each other).
150 """
151
152
153 if not self.initial_prob:
154 raise Exception("set_initial_probabilities must be called to " +
155 "fully initialize the Markov model")
156
157 initial_prob = copy.deepcopy(self.initial_prob)
158 transition_prob = copy.deepcopy(self.transition_prob)
159 emission_prob = copy.deepcopy(self.emission_prob)
160 transition_pseudo = copy.deepcopy(self.transition_pseudo)
161 emission_pseudo = copy.deepcopy(self.emission_pseudo)
162
163 return HiddenMarkovModel(initial_prob, transition_prob, emission_prob,
164 transition_pseudo, emission_pseudo)
165
167 """Set initial state probabilities.
168
169 initial_prob is a dictionary mapping states to probabilities.
170 Suppose, for example, that the state alphabet is ['A', 'B']. Call
171 set_initial_prob({'A': 1}) to guarantee that the initial
172 state will be 'A'. Call set_initial_prob({'A': 0.5, 'B': 0.5})
173 to make each initial state equally probable.
174
175 This method must now be called in order to use the Markov model
176 because the calculation of initial probabilities has changed
177 incompatibly; the previous calculation was incorrect.
178
179 If initial probabilities are set for all states, then they should add up
180 to 1. Otherwise the sum should be <= 1. The residual probability is
181 divided up evenly between all the states for which the initial
182 probability has not been set. For example, calling
183 set_initial_prob({}) results in P('A') = 0.5 and P('B') = 0.5,
184 for the above example.
185 """
186 self.initial_prob = copy.copy(initial_prob)
187
188
189 for state in initial_prob.iterkeys():
190 assert state in self._state_alphabet.letters, \
191 "State %s was not found in the sequence alphabet" % state
192
193
194 num_states_not_set =\
195 len(self._state_alphabet.letters) - len(self.initial_prob)
196 if num_states_not_set < 0:
197 raise Exception("Initial probabilities can't exceed # of states")
198 prob_sum = sum(self.initial_prob.values())
199 if prob_sum > 1.0:
200 raise Exception("Total initial probability cannot exceed 1.0")
201 if num_states_not_set > 0:
202 prob = (1.0 - prob_sum) / num_states_not_set
203 for state in self._state_alphabet.letters:
204 if not state in self.initial_prob:
205 self.initial_prob[state] = prob
206
208 """Reset all probabilities to be an average value.
209
210 Resets the values of all initial probabilities and all allowed
211 transitions and all allowed emissions to be equal to 1 divided by the
212 number of possible elements.
213
214 This is useful if you just want to initialize a Markov Model to
215 starting values (ie. if you have no prior notions of what the
216 probabilities should be -- or if you are just feeling too lazy
217 to calculate them :-).
218
219 Warning 1 -- this will reset all currently set probabilities.
220
221 Warning 2 -- This just sets all probabilities for transitions and
222 emissions to total up to 1, so it doesn't ensure that the sum of
223 each set of transitions adds up to 1.
224 """
225
226
227 new_initial_prob = float(1) / float(len(self.transition_prob))
228 for state in self._state_alphabet.letters:
229 self.initial_prob[state] = new_initial_prob
230
231
232 new_trans_prob = float(1) / float(len(self.transition_prob))
233 for key in self.transition_prob:
234 self.transition_prob[key] = new_trans_prob
235
236
237 new_emission_prob = float(1) / float(len(self.emission_prob))
238 for key in self.emission_prob:
239 self.emission_prob[key] = new_emission_prob
240
241
243 """Set all initial state probabilities to a randomly generated distribution.
244 Returns the dictionary containing the initial probabilities.
245 """
246 initial_freqs = _gen_random_array(len(self._state_alphabet.letters))
247 for state in self._state_alphabet.letters:
248 self.initial_prob[state] = initial_freqs.pop()
249
250 return self.initial_prob
251
253 """Set all allowed transition probabilities to a randomly generated distribution.
254 Returns the dictionary containing the transition probabilities.
255 """
256
257 if not self.transition_prob:
258 raise Exception("No transitions have been allowed yet. " +
259 "Allow some or all transitions by calling " +
260 "allow_transition or allow_all_transitions first.")
261
262 transitions_from = _calculate_from_transitions(self.transition_prob)
263 for from_state in transitions_from.keys():
264 freqs = _gen_random_array(len(transitions_from[from_state]))
265 for to_state in transitions_from[from_state]:
266 self.transition_prob[(from_state, to_state)] = freqs.pop()
267
268 return self.transition_prob
269
271 """Set all allowed emission probabilities to a randomly generated
272 distribution. Returns the dictionary containing the emission
273 probabilities.
274 """
275
276 if not self.emission_prob:
277 raise Exception("No emissions have been allowed yet. " +
278 "Allow some or all emissions.")
279
280 emissions = _calculate_emissions(self.emission_prob)
281 for state in emissions.iterkeys():
282 freqs = _gen_random_array(len(emissions[state]))
283 for symbol in emissions[state]:
284 self.emission_prob[(state, symbol)] = freqs.pop()
285
286 return self.emission_prob
287
288
298
299
300
302 """A convenience function to create transitions between all states.
303
304 By default all transitions within the alphabet are disallowed; this
305 is a way to change this to allow all possible transitions.
306 """
307
308
309 all_probs = self._all_blank(self._state_alphabet,
310 self._state_alphabet)
311
312 all_pseudo = self._all_pseudo(self._state_alphabet,
313 self._state_alphabet)
314
315
316
317 for set_key in self.transition_prob:
318 all_probs[set_key] = self.transition_prob[set_key]
319
320 for set_key in self.transition_pseudo:
321 all_pseudo[set_key] = self.transition_pseudo[set_key]
322
323
324 self.transition_prob = all_probs
325 self.transition_pseudo = all_pseudo
326
327 - def allow_transition(self, from_state, to_state, probability = None,
328 pseudocount = None):
329 """Set a transition as being possible between the two states.
330
331 probability and pseudocount are optional arguments
332 specifying the probabilities and pseudo counts for the transition.
333 If these are not supplied, then the values are set to the
334 default values.
335
336 Raises:
337 KeyError -- if the two states already have an allowed transition.
338 """
339
340 for state in [from_state, to_state]:
341 assert state in self._state_alphabet.letters, \
342 "State %s was not found in the sequence alphabet" % state
343
344
345 if ((from_state, to_state) not in self.transition_prob and
346 (from_state, to_state) not in self.transition_pseudo):
347
348 if probability is None:
349 probability = 0
350 self.transition_prob[(from_state, to_state)] = probability
351
352
353 if pseudocount is None:
354 pseudcount = self.DEFAULT_PSEUDO
355 self.transition_pseudo[(from_state, to_state)] = pseudocount
356 else:
357 raise KeyError("Transition from %s to %s is already allowed."
358 % (from_state, to_state))
359
361 """Restrict transitions between the two states.
362
363 Raises:
364 KeyError if the transition is not currently allowed.
365 """
366 try:
367 del self.transition_prob[(from_state, to_state)]
368 del self.transition_pseudo[(from_state, to_state)]
369 except KeyError:
370 raise KeyError("Transition from %s to %s is already disallowed."
371 % (from_state, to_state))
372
374 """Set the probability of a transition between two states.
375
376 Raises:
377 KeyError if the transition is not allowed.
378 """
379 if (from_state, to_state) in self.transition_prob:
380 self.transition_prob[(from_state, to_state)] = probability
381 else:
382 raise KeyError("Transition from %s to %s is not allowed."
383 % (from_state, to_state))
384
386 """Set the default pseudocount for a transition.
387
388 To avoid computational problems, it is helpful to be able to
389 set a 'default' pseudocount to start with for estimating
390 transition and emission probabilities (see p62 in Durbin et al
391 for more discussion on this. By default, all transitions have
392 a pseudocount of 1.
393
394 Raises:
395 KeyError if the transition is not allowed.
396 """
397 if (from_state, to_state) in self.transition_pseudo:
398 self.transition_pseudo[(from_state, to_state)] = count
399 else:
400 raise KeyError("Transition from %s to %s is not allowed."
401 % (from_state, to_state))
402
403
404
406 """Set the probability of a emission from a particular state.
407
408 Raises:
409 KeyError if the emission from the given state is not allowed.
410 """
411 if (seq_state, emission_state) in self.emission_prob:
412 self.emission_prob[(seq_state, emission_state)] = probability
413 else:
414 raise KeyError("Emission of %s from %s is not allowed."
415 % (emission_state, seq_state))
416
418 """Set the default pseudocount for an emission.
419
420 To avoid computational problems, it is helpful to be able to
421 set a 'default' pseudocount to start with for estimating
422 transition and emission probabilities (see p62 in Durbin et al
423 for more discussion on this. By default, all emissions have
424 a pseudocount of 1.
425
426 Raises:
427 KeyError if the emission from the given state is not allowed.
428 """
429 if (seq_state, emission_state) in self.emission_pseudo:
430 self.emission_pseudo[(seq_state, emission_state)] = count
431 else:
432 raise KeyError("Emission of %s from %s is not allowed."
433 % (emission_state, seq_state))
434
436 """Represent a hidden markov model that can be used for state estimation.
437 """
438 - def __init__(self, initial_prob, transition_prob, emission_prob,
439 transition_pseudo, emission_pseudo):
440 """Initialize a Markov Model.
441
442 Note: You should use the MarkovModelBuilder class instead of
443 initiating this class directly.
444
445 Arguments:
446
447 o initial_prob - A dictionary of initial probabilities for all states.
448
449 o transition_prob -- A dictionary of transition probabilities for all
450 possible transitions in the sequence.
451
452 o emission_prob -- A dictionary of emission probabilities for all
453 possible emissions from the sequence states.
454
455 o transition_pseudo -- Pseudo-counts to be used for the transitions,
456 when counting for purposes of estimating transition probabilities.
457
458 o emission_pseudo -- Pseudo-counts to be used for the emissions,
459 when counting for purposes of estimating emission probabilities.
460 """
461
462 self.initial_prob = initial_prob
463
464 self._transition_pseudo = transition_pseudo
465 self._emission_pseudo = emission_pseudo
466
467 self.transition_prob = transition_prob
468 self.emission_prob = emission_prob
469
470
471
472
473 self._transitions_from = \
474 _calculate_from_transitions(self.transition_prob)
475
476
477
478
479 self._transitions_to = \
480 _calculate_to_transitions(self.transition_prob)
481
482
484 """Get the default transitions for the model.
485
486 Returns a dictionary of all of the default transitions between any
487 two letters in the sequence alphabet. The dictionary is structured
488 with keys as (letter1, letter2) and values as the starting number
489 of transitions.
490 """
491 return self._transition_pseudo
492
494 """Get the starting default emmissions for each sequence.
495
496 This returns a dictionary of the default emmissions for each
497 letter. The dictionary is structured with keys as
498 (seq_letter, emmission_letter) and values as the starting number
499 of emmissions.
500 """
501 return self._emission_pseudo
502
504 """Get all destination states to which there are transitions from the
505 state_letter source state.
506
507 This returns all letters which the given state_letter can transition
508 to. An empty list is returned if state_letter has no outgoing
509 transitions.
510 """
511 if state_letter in self._transitions_from:
512 return self._transitions_from[state_letter]
513 else:
514 return []
515
517 """Get all source states from which there are transitions to the
518 state_letter destination state.
519
520 This returns all letters which the given state_letter is reachable
521 from. An empty list is returned if state_letter is unreachable.
522 """
523 if state_letter in self._transitions_to:
524 return self._transitions_to[state_letter]
525 else:
526 return []
527
528 - def viterbi(self, sequence, state_alphabet):
529 """Calculate the most probable state path using the Viterbi algorithm.
530
531 This implements the Viterbi algorithm (see pgs 55-57 in Durbin et
532 al for a full explanation -- this is where I took my implementation
533 ideas from), to allow decoding of the state path, given a sequence
534 of emissions.
535
536 Arguments:
537
538 o sequence -- A Seq object with the emission sequence that we
539 want to decode.
540
541 o state_alphabet -- The alphabet of the possible state sequences
542 that can be generated.
543 """
544
545
546 log_initial = self._log_transform(self.initial_prob)
547 log_trans = self._log_transform(self.transition_prob)
548 log_emission = self._log_transform(self.emission_prob)
549
550 viterbi_probs = {}
551 pred_state_seq = {}
552 state_letters = state_alphabet.letters
553
554
555
556
557
558
559 for i in range(0, len(sequence)):
560
561 for cur_state in state_letters:
562
563 emission_part = log_emission[(cur_state, sequence[i])]
564
565 max_prob = 0
566 if i == 0:
567
568
569 max_prob = log_initial[cur_state]
570 else:
571
572 possible_state_probs = {}
573 for prev_state in self.transitions_to(cur_state):
574
575 trans_part = log_trans[(prev_state, cur_state)]
576
577
578 viterbi_part = viterbi_probs[(prev_state, i - 1)]
579 cur_prob = viterbi_part + trans_part
580
581 possible_state_probs[prev_state] = cur_prob
582
583
584 max_prob = max(possible_state_probs.values())
585
586
587 viterbi_probs[(cur_state, i)] = (emission_part + max_prob)
588
589 if i > 0:
590
591 for state in possible_state_probs:
592 if possible_state_probs[state] == max_prob:
593 pred_state_seq[(i - 1, cur_state)] = state
594 break
595
596
597
598
599 all_probs = {}
600 for state in state_letters:
601
602 all_probs[state] = viterbi_probs[(state, len(sequence) - 1)]
603
604 state_path_prob = max(all_probs.values())
605
606
607 last_state = ''
608 for state in all_probs:
609 if all_probs[state] == state_path_prob:
610 last_state = state
611
612 assert last_state != '', "Didn't find the last state to trace from!"
613
614
615 traceback_seq = MutableSeq('', state_alphabet)
616
617 loop_seq = range(1, len(sequence))
618 loop_seq.reverse()
619
620
621
622
623
624 state = last_state
625 traceback_seq.append(state)
626 for i in loop_seq:
627 state = pred_state_seq[(i - 1, state)]
628 traceback_seq.append(state)
629
630
631 traceback_seq.reverse()
632
633 return traceback_seq.toseq(), state_path_prob
634
658