1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import time
23
24 import gst
25 from twisted.internet import reactor, defer
26
27 from flumotion.common import log
28 from flumotion.common.poller import Poller
29
30 __version__ = "$Rev: 7532 $"
31
32
34 PAD_MONITOR_PROBE_FREQUENCY = 5.0
35 PAD_MONITOR_TIMEOUT = PAD_MONITOR_PROBE_FREQUENCY * 2.5
36
37 - def __init__(self, pad, name, setActive, setInactive):
59
65
68
70 self.check_poller.stop()
71 self.watch_poller.stop()
72 self._running = False
73
74
75
76
77 d, probe_id = self._probe_id.pop("id", (None, None))
78 if probe_id:
79 self._pad.remove_buffer_probe(probe_id)
80 d.callback(None)
81
83
84 def probe_cb(pad, buffer):
85 """
86 Periodically scheduled buffer probe, that ensures that we're
87 currently actually having dataflow through our eater
88 elements.
89
90 Called from GStreamer threads.
91
92 @param pad: The gst.Pad srcpad for one eater in this
93 component.
94 @param buffer: A gst.Buffer that has arrived on this pad
95 """
96 self._last_data_time = time.time()
97
98 self.logMessage('buffer probe on %s has timestamp %s', self.name,
99 gst.TIME_ARGS(buffer.timestamp))
100
101 deferred, probe_id = self._probe_id.pop("id", (None, None))
102 if probe_id:
103
104 self._pad.remove_buffer_probe(probe_id)
105
106 reactor.callFromThread(deferred.callback, None)
107
108 reactor.callFromThread(self.watch_poller.run)
109
110 self._first = False
111
112
113 return True
114
115 d = defer.Deferred()
116
117
118 self._probe_id['id'] = (d, self._pad.add_buffer_probe(probe_cb))
119 return d
120
122 self.log('last buffer for %s at %r', self.name, self._last_data_time)
123
124 now = time.time()
125
126 if self._last_data_time < 0:
127
128 self._last_data_time = 0
129 self.setInactive()
130 elif self._last_data_time == 0:
131
132 pass
133 else:
134
135 delta = now - self._last_data_time
136
137 if self._active and delta > self.PAD_MONITOR_TIMEOUT:
138 self.info("No data received on pad %s for > %r seconds, "
139 "setting to hungry",
140 self.name, self.PAD_MONITOR_TIMEOUT)
141 self.setInactive()
142 elif not self._active and delta < self.PAD_MONITOR_TIMEOUT:
143 self.info("Receiving data again on pad %s, flow active",
144 self.name)
145 self.setActive()
146
147 - def addWatch(self, setActive, setInactive):
150
155
160
161
163
164 - def __init__(self, pad, name, setActive, setInactive,
165 reconnectEater, *args):
171
173 PadMonitor.setInactive(self)
174
175
176
177
178
179
180
181
182
183
184
185
186 if self._running:
187
188
189
190
191
192 self._last_data_time = 0
193
194 self.debug('starting the reconnect poller')
195 self._reconnectPoller.start(immediately=True)
196
201
206
207
209
210 - def __init__(self, setActive, setInactive):
211
212
213 self._doSetActive = setActive
214 self._doSetInactive = setInactive
215 self._wasActive = True
216
218 """
219 Watch for data flow through this pad periodically.
220 If data flow ceases for too long, we turn hungry. If data flow resumes,
221 we return to happy.
222 """
223
224 def monitorActive(name):
225 self.info('Pad data flow at %s is active', name)
226 if self.isActive() and not self._wasActive:
227
228
229
230
231 self._wasActive = True
232 self._doSetActive()
233
234 def monitorInactive(name):
235 self.info('Pad data flow at %s is inactive', name)
236 if self._wasActive:
237 self._doSetInactive()
238 self._wasActive = False
239
240 assert name not in self
241 monitor = klass(pad, name, monitorActive, monitorInactive, *args)
242 self[monitor.name] = monitor
243 self.info("Added pad monitor %s", monitor.name)
244
246 if name not in self:
247 self.warning("No pad monitor with name %s", name)
248 return
249
250 monitor = self.pop(name)
251 monitor.detach()
252
254 for monitor in self.values():
255 if not monitor.isActive():
256 return False
257 return True
258