1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import gst
23 import gobject
24
25 from twisted.internet import defer, reactor
26
27 from flumotion.common import errors, messages, log, python
28 from flumotion.common.i18n import N_, gettexter
29 from flumotion.common.planet import moods
30 from flumotion.component import feedcomponent
31 from flumotion.component.base import scheduler
32 from flumotion.component.padmonitor import PadMonitor
33 from flumotion.component.plugs import base
34 from flumotion.worker.checks import check
35
36 __version__ = "$Rev$"
37 T_ = gettexter()
38
39
50
51
53 logCategory = "ical-switch"
54
55 - def start(self, component):
56 self._sid = None
57 self.sched = None
58 try:
59
60 def eventStarted(eventInstance):
61 self.debug("event started %r", eventInstance.event.uid)
62 component.switch_to("backup")
63
64 def eventEnded(eventInstance):
65 self.debug("event ended %r", eventInstance.event.uid)
66 component.switch_to("master")
67
68
69
70 filename = self.args['properties']['ical-schedule']
71 self.sched = scheduler.ICalScheduler(open(filename, 'r'))
72 self._sid = self.sched.subscribe(eventStarted, eventEnded)
73 if self.sched.getCalendar().getActiveEventInstances():
74 component.idealFeed = "backup"
75 except ValueError:
76 fmt = N_("Error parsing ical file %s, so not scheduling "
77 "any events.")
78 component.addWarning("error-parsing-ical", fmt, filename)
79 except ImportError, e:
80 fmt = N_("An ical file has been specified for scheduling, "
81 "but the necessary modules are not installed.")
82 component.addWarning("error-parsing-ical", fmt, debug=e.message)
83
84 - def stop(self, component):
87
88
89 -class Switch(feedcomponent.MultiInputParseLaunchComponent):
90 logCategory = 'switch'
91 componentMediumClass = SwitchMedium
92
94 self.uiState.addKey("active-eater")
95 self.icalScheduler = None
96
97
98
99
100
101
102
103
104
105
106
107 self.logicalFeeds = {}
108
109 self.feedsByPriority = []
110
111
112 self.switchPads = {}
113
114
115
116
117
118
119
120
121 self.idealFeed = None
122 self.activeFeed = None
123
124
125
126
127 self.newSegmentEvents = {}
128
129
130
131 self.eventProbeIds = {}
132 self.bufferProbeIds = {}
133
134
135 self._padMonitors = {}
136
137 - def addWarning(self, id, format, *args, **kwargs):
142
144 for m in self.state.get('messages')[:]:
145 if m.id == id:
146 self.state.remove('messages', m)
147
149
150 def checkSignal(fact):
151 fact = fact.load()
152 signals = gobject.signal_list_names(fact.get_element_type())
153 return 'block' in signals
154
155 def cb(result):
156 for m in result.messages:
157 self.addMessage(m)
158 return result.value
159
160 self.debug("checking for input-selector element")
161 d = check.checkPlugin('selector', 'gst-plugins-bad',
162 (0, 10, 5, 2), 'input-selector', checkSignal)
163 d.addCallback(cb)
164 return d
165
167 ical = self.config['properties'].get('ical-schedule', None)
168 if ical:
169 args = {'properties': {'ical-schedule': ical}}
170 self.icalScheduler = ICalSwitchPlug(args)
171 self.icalScheduler.start(self)
172
186
188 raise errors.NotImplementedError('subclasses should implement '
189 'get_logical_feeds')
190
199
200 switchElements = self.get_switch_elements(pipeline)
201 for alias in self.eaters:
202 e = pipeline.get_by_name(self.eaters[alias].elementName)
203 pad = None
204 while e not in switchElements:
205 self.log("Element: %s", e.get_name())
206 pad, e = getDownstreamElement(e)
207 self.debug('eater %s maps to pad %s', alias, pad)
208 self.switchPads[alias] = pad, e
209
210
211
212 pairs = [self.switchPads[alias]
213 for alias in self.logicalFeeds[self.idealFeed]]
214
215 for p, s in pairs:
216 s.set_property('active-pad', p)
217 self.activeFeed = self.idealFeed
218 self.uiState.set("active-eater", self.idealFeed)
219
220 self.install_logical_feed_watches()
221
222 self.do_switch()
223
224
225
235
236 def eaterSetInactive(eaterAlias):
237 for feed, aliases in self.logicalFeeds.items():
238 if eaterAlias in aliases and feed in activeFeeds:
239 activeFeeds.remove(feed)
240 self.feedSetInactive(feed)
241
242
243
244
245
246
247 pad = self.switchPads[eaterAlias][0]
248 self.eventProbeIds[pad] = \
249 pad.add_event_probe(self._eventProbe)
250 self.bufferProbeIds[pad] = \
251 pad.add_buffer_probe(self._bufferProbe)
252 return
253
254 activeFeeds = []
255 for alias in self.eaters:
256 self._padMonitors[alias] = PadMonitor(self.switchPads[alias][0],
257 alias, eaterSetActive, eaterSetInactive)
258
260
261 ret = True
262 if event.type == gst.EVENT_NEWSEGMENT:
263 ret = False
264 self.newSegmentEvents[pad] = event
265 if self.eventProbeIds[pad]:
266 pad.remove_event_probe(self.eventProbeIds[pad])
267 del self.eventProbeIds[pad]
268 return ret
269
271
272 ts = buffer.timestamp
273 if pad in self.newSegmentEvents:
274 parsed = self.newSegmentEvents[pad].parse_new_segment()
275 newEvent = gst.event_new_new_segment(parsed[0], parsed[1],
276 parsed[2], ts, parsed[4], parsed[5])
277 pad.push_event(newEvent)
278 del self.newSegmentEvents[pad]
279 if pad in self.bufferProbeIds:
280 pad.remove_buffer_probe(self.bufferProbeIds[pad])
281 del self.bufferProbeIds[pad]
282 return True
283
285 raise errors.NotImplementedError('subclasses should implement '
286 'get_switch_elements')
287
289 return python.all([self.eaters[alias].isActive()
290 for alias in self.logicalFeeds[feed]])
291
296
298 self.debug('feed %r is now inactive', feed)
299
300
301
303 allFeeds = self.feedsByPriority[:]
304 feed = None
305 while allFeeds:
306 feed = allFeeds.pop(0)
307 if self.is_active(feed):
308 self.debug('autoswitch selects feed %r', feed)
309 self.do_switch(feed)
310 break
311 else:
312 self.debug("could not select feed %r because not active", feed)
313 if feed is None:
314 feed = self.feedsByPriority.get(0, None)
315 self.debug('no feeds active during autoswitch, choosing %r',
316 feed)
317 self.do_switch(feed)
318
319
320
321
323 """
324 @param feed: a logical feed
325 """
326 if feed not in self.logicalFeeds:
327 self.warning("unknown logical feed: %s", feed)
328 return None
329
330 self.debug('scheduling switch to feed %s', feed)
331 self.idealFeed = feed
332
333 self.feedsByPriority = [feed]
334 for name, aliases in self.get_logical_feeds():
335 if name != feed:
336 self.feedsByPriority.append(name)
337
338 if not self.pipeline:
339 return
340
341 if self.is_active(feed):
342 self.do_switch()
343 else:
344 fmt = N_("Tried to switch to %s, but feed is unavailable. "
345 "Will retry when the feed is back.")
346 self.addWarning("temporary-switch-problem", fmt, feed)
347
348
349
350
351
352
353
354
356 if feed == None:
357 feed = self.idealFeed
358
359 self.clearWarning('temporary-switch-problem')
360 if feed == self.activeFeed:
361 self.debug("already streaming from feed %r", feed)
362 return
363 if feed not in self.logicalFeeds:
364 self.warning("unknown logical feed: %s", feed)
365 return
366
367
368 pairs = [self.switchPads[alias]
369 for alias in self.logicalFeeds[feed]]
370
371 stop_times = [e.emit('block') for p, e in pairs]
372 start_times = [p.get_property('running-time') for p, e in pairs]
373
374 stop_time = max(stop_times)
375 self.debug('stop time = %d', stop_time)
376 self.debug('stop time = %s', gst.TIME_ARGS(stop_time))
377
378 if stop_time != gst.CLOCK_TIME_NONE:
379 diff = float(max(stop_times) - min(stop_times))
380 if diff > gst.SECOND * 10:
381 fmt = N_("When switching to %s, feed timestamps out"
382 " of sync by %us")
383 self.addWarning('large-timestamp-difference', fmt,
384 feed, diff / gst.SECOND, priority=40)
385
386 start_time = min(start_times)
387 self.debug('start time = %s', gst.TIME_ARGS(start_time))
388
389 self.debug('switching from %r to %r', self.activeFeed, feed)
390 for p, e in pairs:
391 self.debug("switching to pad %r", p)
392 e.emit('switch', p, stop_time, start_time)
393
394 self.activeFeed = feed
395 self.uiState.set("active-eater", feed)
396
397
399 logCategory = "single-switch"
400
402 return [('master', ['master']),
403 ('backup', ['backup'])]
404
406 return ("input-selector name=muxer ! "
407 "identity silent=true single-segment=true name=iden ")
408
410 return [pipeline.get_by_name('muxer')]
411
412
414 logCategory = "av-switch"
415
417
418 self.vparms = {'video-width': 'width', 'video-height': 'height',
419 'video-framerate': 'framerate',
420 'video-pixel-aspect-ratio': 'par'}
421 self.aparms = {'audio-channels': 'channels',
422 'audio-samplerate': 'samplerate'}
423
425 return [('master', ['video-master', 'audio-master']),
426 ('backup', ['video-backup', 'audio-backup'])]
427
429
430
431 return [pipeline.get_by_name('vswitch'),
432 pipeline.get_by_name('aswitch')]
433
434 - def addError(self, id, format, *args, **kwargs):
440
442 propkeys = python.set(self.config['properties'].keys())
443 vparms = python.set(self.vparms.keys())
444 aparms = python.set(self.aparms.keys())
445
446 for kind, parms in ('Video', vparms), ('Audio', aparms):
447 missing = parms - (propkeys & parms)
448 if missing and missing != parms:
449 fmt = N_("%s parameter(s) were specified but not all. "
450 "Missing parameters are: %r")
451 self.addError("video-params-not-specified", fmt, kind,
452 list(missing))
453
455
456 def i420caps(framerate, par, width, height):
457 return ("video/x-raw-yuv,width=%d,height=%d,framerate=%d/%d,"
458 "pixel-aspect-ratio=%d/%d,format=(fourcc)I420"
459 % (width, height, framerate[0], framerate[1],
460 par[0], par[1]))
461
462 def audiocaps(channels, samplerate):
463 return ("audio/x-raw-int,channels=%d,samplerate=%d,width=16,"
464 "depth=16,signed=true" % (channels, samplerate))
465
466 def props2caps(proc, parms, prefix, suffix=' ! '):
467 kw = dict([(parms[prop], properties[prop])
468 for prop in properties if prop in parms])
469 if kw:
470 return prefix + proc(**kw) + suffix
471 else:
472 return ''
473
474 vforce = props2caps(i420caps, self.vparms,
475 "ffmpegcolorspace ! videorate ! videoscale "
476 "! capsfilter caps=")
477 aforce = props2caps(audiocaps, self.aparms,
478 "audioconvert ! audioconvert ! capsfilter caps=")
479
480 pipeline = ("input-selector name=vswitch"
481 " ! identity silent=true single-segment=true"
482 " ! @feeder:video@ "
483 "input-selector name=aswitch"
484 " ! identity silent=true single-segment=true"
485 " ! @feeder:audio@ ")
486 for alias in self.eaters:
487 if "video" in alias:
488 pipeline += '@eater:%s@ ! %s vswitch. ' % (alias, vforce)
489 elif "audio" in alias:
490 pipeline += '@eater:%s@ ! %s aswitch. ' % (alias, aforce)
491 else:
492 raise AssertionError()
493
494 return pipeline
495