Package flumotion :: Package component :: Module eater
[hide private]

Source Code for Module flumotion.component.eater

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import time 
 23   
 24  from twisted.internet import reactor 
 25   
 26  from flumotion.common import componentui 
 27   
 28  __version__ = "$Rev: 8651 $" 
 29   
 30   
31 -class Eater:
32 """ 33 This class groups eater-related information as used by a Feed Component. 34 35 @ivar eaterAlias: the alias of this eater (e.g. "default", "video", 36 ...) 37 @ivar feedId: id of the feed this is eating from 38 @ivar uiState: the serializable UI State for this eater 39 """ 40
41 - def __init__(self, eaterAlias, eaterName):
42 self.eaterAlias = eaterAlias 43 self.eaterName = eaterName 44 self.feedId = None 45 self.fd = None 46 self.elementName = 'eater:' + eaterAlias 47 self.depayName = self.elementName + '-depay' 48 # for use to detect duplicate streamheader buffers 49 self.streamheaderBufferProbeHandler = None 50 self.streamheader = [] 51 self.setPadMonitor(None) 52 self.uiState = componentui.WorkerComponentUIState() 53 self.uiState.addKey('eater-alias') 54 self.uiState.set('eater-alias', eaterAlias) 55 self.uiState.addKey('eater-name') 56 self.uiState.set('eater-name', eaterName) 57 # dict for the current connection 58 connectionDict = { 59 "feed-id": None, 60 "time-timestamp-discont": None, 61 "timestamp-timestamp-discont": 0.0, # ts of buffer after discont, 62 # in float seconds 63 "last-timestamp-discont": 0.0, 64 "total-timestamp-discont": 0.0, 65 "count-timestamp-discont": 0, 66 "time-offset-discont": None, 67 "offset-offset-discont": 0, # offset of buffer 68 # after discont 69 "last-offset-discont": 0, 70 "total-offset-discont": 0, 71 "count-offset-discont": 0} 72 self.uiState.addDictKey('connection', connectionDict) 73 74 for key in ( 75 'last-connect', # last client connection, in epoch sec 76 'last-disconnect', # last client disconnect, in epoch sec 77 'total-connections', # number of connections by this client 78 'count-timestamp-discont', # number of timestamp disconts seen 79 'count-offset-discont', # number of timestamp disconts seen 80 ): 81 self.uiState.addKey(key, 0) 82 for key in ( 83 'total-timestamp-discont', # total timestamp discontinuity 84 'total-offset-discont', # total offset discontinuity 85 ): 86 self.uiState.addKey(key, 0.0) 87 self.uiState.addKey('fd', None)
88
89 - def __repr__(self):
90 return '<Eater %s %s>' % (self.eaterAlias, 91 (self.feedId and '(disconnected)' 92 or ('eating from %s' % self.feedId)))
93
94 - def connected(self, fd, feedId, when=None):
95 """ 96 The eater has been connected. 97 Update related stats. 98 """ 99 if not when: 100 when = time.time() 101 102 self.feedId = feedId 103 self.fd = fd 104 105 self.uiState.set('last-connect', when) 106 self.uiState.set('fd', fd) 107 self.uiState.set('total-connections', 108 self.uiState.get('total-connections', 0) + 1) 109 110 self.uiState.setitem("connection", 'feed-id', feedId) 111 self.uiState.setitem("connection", "count-timestamp-discont", 0) 112 self.uiState.setitem("connection", "time-timestamp-discont", None) 113 self.uiState.setitem("connection", "last-timestamp-discont", 0.0) 114 self.uiState.setitem("connection", "total-timestamp-discont", 0.0) 115 self.uiState.setitem("connection", "count-offset-discont", 0) 116 self.uiState.setitem("connection", "time-offset-discont", None) 117 self.uiState.setitem("connection", "last-offset-discont", 0) 118 self.uiState.setitem("connection", "total-offset-discont", 0)
119
120 - def disconnected(self, when=None):
121 """ 122 The eater has been disconnected. 123 Update related stats. 124 """ 125 if not when: 126 when = time.time() 127 128 def updateUIState(): 129 self.uiState.set('last-disconnect', when) 130 self.fd = None 131 self.uiState.set('fd', None)
132 133 reactor.callFromThread(updateUIState)
134
135 - def setPadMonitor(self, monitor):
136 self._padMonitor = monitor
137
138 - def isActive(self):
139 return self._padMonitor and self._padMonitor.isActive()
140
141 - def addWatch(self, setActive, setInactive):
142 self._padMonitor.addWatch(lambda _: setActive(self.eaterAlias), 143 lambda _: setInactive(self.eaterAlias))
144
145 - def timestampDiscont(self, seconds, timestamp):
146 """ 147 @param seconds: discont duration in seconds 148 @param timestamp: GStreamer timestamp of new buffer, in seconds. 149 150 Inform the eater of a timestamp discontinuity. 151 This is called from a bus message handler, so in the main thread. 152 """ 153 uiState = self.uiState 154 155 c = uiState.get('connection') # dict 156 uiState.setitem('connection', 'count-timestamp-discont', 157 c.get('count-timestamp-discont', 0) + 1) 158 uiState.set('count-timestamp-discont', 159 uiState.get('count-timestamp-discont', 0) + 1) 160 161 uiState.setitem('connection', 'time-timestamp-discont', time.time()) 162 uiState.setitem('connection', 'timestamp-timestamp-discont', timestamp) 163 uiState.setitem('connection', 'last-timestamp-discont', seconds) 164 uiState.setitem('connection', 'total-timestamp-discont', 165 c.get('total-timestamp-discont', 0) + seconds) 166 uiState.set('total-timestamp-discont', 167 uiState.get('total-timestamp-discont', 0) + seconds)
168
169 - def offsetDiscont(self, units, offset):
170 """ 171 Inform the eater of an offset discontinuity. 172 This is called from a bus message handler, so in the main thread. 173 """ 174 uiState = self.uiState 175 176 c = uiState.get('connection') # dict 177 uiState.setitem('connection', 'count-offset-discont', 178 c.get('count-offset-discont', 0) + 1) 179 uiState.set('count-offset-discont', 180 uiState.get('count-offset-discont', 0) + 1) 181 182 uiState.setitem('connection', 'time-offset-discont', time.time()) 183 uiState.setitem('connection', 'offset-offset-discont', offset) 184 uiState.setitem('connection', 'last-offset-discont', units) 185 uiState.setitem('connection', 'total-offset-discont', 186 c.get('total-offset-discont', 0) + units) 187 uiState.set('total-offset-discont', 188 uiState.get('total-offset-discont', 0) + units)
189