
Source Code for Module flumotion.component.feedcomponent010

# -*- Mode: Python; test-case-name: flumotion.test.test_feedcomponent010 -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

import gst
import gobject

import os
import time

from twisted.internet import reactor, defer

from flumotion.common import common, errors, pygobject, messages, log
from flumotion.common import gstreamer
from flumotion.common.i18n import N_, gettexter
from flumotion.common.planet import moods
from flumotion.component import component as basecomponent
from flumotion.component import feed, padmonitor
from flumotion.component.feeder import Feeder
from flumotion.component.eater import Eater

__version__ = "$Rev$"
T_ = gettexter()

class FeedComponent(basecomponent.BaseComponent):
    """
    I am a base class for all Flumotion feed components.
    """

    # how often (in seconds) to update the UIState feeder statistics
    FEEDER_STATS_UPDATE_FREQUENCY = 12.5
    keepStreamheaderForLater = False
    dropStreamHeaders = True
    swallowNewSegment = True

    logCategory = 'feedcomponent'

    ### BaseComponent interface implementations

    def init(self):
        # add keys for eaters and feeders uiState
        self.feeders = {}  # feeder feedName -> Feeder
        self.eaters = {}  # eater eaterAlias -> Eater
        self.uiState.addListKey('feeders')
        self.uiState.addListKey('eaters')
        self.uiState.addKey('gst-debug')

        self.pipeline = None
        self.pipeline_signals = []
        self.bus_signal_id = None
        self.effects = {}
        self._feeder_probe_cl = None

        self._pad_monitors = padmonitor.PadMonitorSet(
            lambda: self.setMood(moods.happy),
            lambda: self.setMood(moods.hungry))

        self._clock_slaved = False
        self.clock_provider = None
        self._master_clock_info = None  # (ip, port, basetime) if we're the
                                        # clock master

        self._change_monitor = gstreamer.StateChangeMonitor()

        # multifdsink's get-stats signal had critical bugs before this version
        self._get_stats_supported = (gstreamer.get_plugin_version('tcp')
                                     >= (0, 10, 11, 0))

    def do_setup(self):
        """
        Sets up the component.

        Invokes the L{create_pipeline} and L{set_pipeline} vmethods,
        which subclasses can provide.
        """
        config = self.config
        eater_config = config.get('eater', {})
        feeder_config = config.get('feed', [])
        source_config = config.get('source', [])

        self.debug("FeedComponent.do_setup(): eater_config %r", eater_config)
        self.debug("FeedComponent.do_setup(): feeder_config %r", feeder_config)
        self.debug("FeedComponent.do_setup(): source_config %r", source_config)
        # for upgrading code without restarting managers;
        # this only applies to components whose eater name in the registry is
        # 'default', so there is no need to import the registry and look up
        # the eater name
        if eater_config == {} and source_config != []:
            eater_config = {'default': [(x, 'default') for x in source_config]}

        for eaterName in eater_config:
            for feedId, eaterAlias in eater_config[eaterName]:
                self.eaters[eaterAlias] = Eater(eaterAlias, eaterName)
                self.uiState.append('eaters', self.eaters[eaterAlias].uiState)

        for feederName in feeder_config:
            self.feeders[feederName] = Feeder(feederName)
            self.uiState.append('feeders',
                                self.feeders[feederName].uiState)

        clockMaster = config.get('clock-master', None)
        if clockMaster:
            self._clock_slaved = clockMaster != config['avatarId']
        else:
            self._clock_slaved = False

        pipeline = self.create_pipeline()
        self.connect_feeders(pipeline)
        self.set_pipeline(pipeline)

        self.uiState.set('gst-debug', os.environ.get('GST_DEBUG', '*:0'))
        self.debug("FeedComponent.do_setup(): setup finished")

        self.try_start_pipeline()

        # no race here: messages are marshalled asynchronously via the bus
        d = self._change_monitor.add(gst.STATE_CHANGE_PAUSED_TO_PLAYING)
        d.addCallback(lambda x: self.do_pipeline_playing())

    def setup_completed(self):
        # Just log; we override the superclass to not turn happy here.
        # Instead, we turn happy once the pipeline gets to PLAYING.
        self.debug("Setup completed")

    ### FeedComponent interface for subclasses

    def create_pipeline(self):
        """
        Subclasses have to implement this method.

        @rtype: L{gst.Pipeline}
        """
        raise NotImplementedError(
            "subclass must implement create_pipeline")

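    # A minimal sketch of what a subclass's create_pipeline() could look
    # like. The launch line and element names below are illustrative only
    # and not taken from any real component:
    #
    #     def create_pipeline(self):
    #         return gst.parse_launch(
    #             'videotestsrc is-live=true ! '
    #             'multifdsink name=feeder:default')
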
    def set_pipeline(self, pipeline):
        """
        Subclasses can override me.
        They should chain up first.
        """
        if self.pipeline:
            self.cleanup()
        self.pipeline = pipeline
        self._setup_pipeline()

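    # A hypothetical sketch of a subclass overriding set_pipeline(), chaining
    # up first as the docstring asks; the 'output-queue' element name is made
    # up for illustration:
    #
    #     def set_pipeline(self, pipeline):
    #         FeedComponent.set_pipeline(self, pipeline)
    #         queue = self.pipeline.get_by_name('output-queue')
    #         if queue:
    #             queue.set_property('max-size-time', gst.SECOND)
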
    def attachPadMonitorToFeeder(self, feederName):
        elementName = self.feeders[feederName].payName
        element = self.pipeline.get_by_name(elementName)
        if not element:
            raise errors.ComponentError("No such feeder %s" % feederName)

        pad = element.get_pad('src')
        self._pad_monitors.attach(pad, elementName)

    ### FeedComponent methods

    def addEffect(self, effect):
        self.effects[effect.name] = effect
        effect.setComponent(self)

    def connect_feeders(self, pipeline):
        # Connect to the client-fd-removed signals on each feeder, so we
        # can clean up properly on removal.

        def client_fd_removed(sink, fd, feeder):
            # Called (as a signal callback) when the FD is no longer in
            # use by multifdsink.
            # This will call the registered callable on the fd.
            # Called from GStreamer threads.
            self.debug("cleaning up fd %d", fd)
            feeder.clientDisconnected(fd)

        for feeder in self.feeders.values():
            element = pipeline.get_by_name(feeder.elementName)
            if element:
                element.connect('client-fd-removed', client_fd_removed,
                                feeder)
                self.debug("Connected to client-fd-removed on %r", feeder)
            else:
                self.warning("No feeder %s in pipeline", feeder.elementName)

    def get_pipeline(self):
        return self.pipeline

    def do_pipeline_playing(self):
        """
        Invoked when the pipeline has changed its state to PLAYING.
        The default implementation sets the component's mood to happy.
        """
        self.setMood(moods.happy)

    def make_message_for_gstreamer_error(self, gerror, debug):
        """Make a flumotion error message to show to the user.

        This method may be overridden by components that have special
        knowledge about potential errors. If the component does not know
        about the error, it can chain up to this implementation, which
        will make a generic message.

        @param gerror: The GError from the error message posted on the
                       GStreamer message bus.
        @type  gerror: L{gst.GError}
        @param debug: A string with debugging information.
        @type  debug: str

        @returns: A L{flumotion.common.messages.Message} to show to the
                  user.
        """
        # generate a unique id
        mid = "%s-%s-%d" % (self.name, gerror.domain, gerror.code)
        m = messages.Error(T_(N_(
            "Internal GStreamer error.")),
            debug="%s\n%s: %d\n%s" % (
                gerror.message, gerror.domain, gerror.code, debug),
            mid=mid, priority=40)
        return m

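    # A hypothetical sketch of a subclass override with special knowledge of
    # resource errors, chaining up for everything else; the domain string
    # check is illustrative only:
    #
    #     def make_message_for_gstreamer_error(self, gerror, debug):
    #         if 'resource' in str(gerror.domain):
    #             return messages.Error(
    #                 T_(N_("Could not read or write a resource.")),
    #                 debug=debug, mid='resource-error', priority=40)
    #         return FeedComponent.make_message_for_gstreamer_error(
    #             self, gerror, debug)
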
    def bus_message_received_cb(self, bus, message):

        def state_changed():
            if src == self.pipeline:
                old, new, pending = message.parse_state_changed()
                self._change_monitor.state_changed(old, new)
                dump_filename = "%s.%s_%s" % (self.name,
                    gst.element_state_get_name(old),
                    gst.element_state_get_name(new))
                self.dump_gstreamer_debug_dot_file(dump_filename, True)

        def error():
            gerror, debug = message.parse_error()
            self.warning('element %s error %s %s',
                         src.get_path_string(), gerror, debug)
            self.setMood(moods.sad)

            # this method can fail if the component has a mistake
            try:
                m = self.make_message_for_gstreamer_error(gerror, debug)
            except Exception, e:
                msg = log.getExceptionMessage(e)
                m = messages.Error(T_(N_(
                    "Programming error in component.")),
                    debug="Bug in %r.make_message_for_gstreamer_error: %s" % (
                        self.__class__, msg))

            self.state.append('messages', m)
            self._change_monitor.have_error(self.pipeline.get_state(),
                                            message)

        def eos():
            name = src.get_name()
            if name in self._pad_monitors:
                self.info('End of stream in element %s', name)
                self._pad_monitors[name].setInactive()
            else:
                self.info("We got an eos from %s", name)

        def default():
            self.log('message received: %r', message)

        handlers = {gst.MESSAGE_STATE_CHANGED: state_changed,
                    gst.MESSAGE_ERROR: error,
                    gst.MESSAGE_EOS: eos}
        t = message.type
        src = message.src
        handlers.get(t, default)()
        return True

    def install_eater_continuity_watch(self, eaterWatchElements):
        """Watch a set of elements for discontinuity messages.

        @param eaterWatchElements: the set of elements to watch for
                                   discontinuities.
        @type  eaterWatchElements: dict of elementName -> Eater
        """

        def on_element_message(bus, message):
            src = message.src
            name = src.get_name()
            if name in eaterWatchElements:
                eater = eaterWatchElements[name]
                s = message.structure

                def timestampDiscont():
                    prevTs = s["prev-timestamp"]
                    prevDuration = s["prev-duration"]
                    curTs = s["cur-timestamp"]

                    if prevTs == gst.CLOCK_TIME_NONE:
                        self.debug("no previous timestamp")
                        return
                    if prevDuration == gst.CLOCK_TIME_NONE:
                        self.debug("no previous duration")
                        return
                    if curTs == gst.CLOCK_TIME_NONE:
                        self.debug("no current timestamp")
                        return

                    discont = curTs - (prevTs + prevDuration)
                    dSeconds = discont / float(gst.SECOND)
                    self.debug("we have a discont on eater %s of %.9f s "
                               "between %s and %s ", eater.eaterAlias,
                               dSeconds,
                               gst.TIME_ARGS(prevTs + prevDuration),
                               gst.TIME_ARGS(curTs))

                    eater.timestampDiscont(dSeconds,
                                           float(curTs) / float(gst.SECOND))

                def offsetDiscont():
                    prevOffsetEnd = s["prev-offset-end"]
                    curOffset = s["cur-offset"]
                    discont = curOffset - prevOffsetEnd
                    self.debug("we have a discont on eater %s of %d "
                               "units between %d and %d ",
                               eater.eaterAlias, discont, prevOffsetEnd,
                               curOffset)
                    eater.offsetDiscont(discont, curOffset)

                handlers = {'imperfect-timestamp': timestampDiscont,
                            'imperfect-offset': offsetDiscont}
                if s.get_name() in handlers:
                    handlers[s.get_name()]()

        # we know that there is a signal watch already installed
        bus = self.pipeline.get_bus()
        # never gets cleaned up; does that matter?
        bus.connect("message::element", on_element_message)

    def install_eater_event_probes(self, eater):

        def fdsrc_event(pad, event):
            # An event probe used to consume unwanted EOS events on eaters.
            # Called from GStreamer threads.
            if event.type == gst.EVENT_EOS:
                self.info('End of stream for eater %s, disconnect will be '
                          'triggered', eater.eaterAlias)
                # We swallow it because otherwise our component acts on the
                # EOS and we can't recover from that later. Instead, fdsrc
                # will be taken out and given a new fd on the next
                # eatFromFD call.
                return False
            return True

        def depay_event(pad, event):
            # An event probe used to consume unwanted duplicate
            # newsegment events.
            # Called from GStreamer threads.
            if event.type == gst.EVENT_NEWSEGMENT:
                # We do this because we know gdppay/gdpdepay screw up on 2nd
                # newsegments (unclear what the original reason for this
                # was, perhaps #349204)
                # Other elements might also have problems with repeated
                # newsegments coming in, so we just drop them all. Flumotion
                # operates in single segment space, so dropping newsegments
                # should be fine.
                if getattr(eater, '_gotFirstNewSegment', False):
                    self.info("Subsequent new segment event received on "
                              "depay on eater %s", eater.eaterAlias)
                    # swallow (gulp)
                    eater.streamheader = []
                    if self.swallowNewSegment:
                        return False
                else:
                    eater._gotFirstNewSegment = True
            return True

        self.debug('adding event probe for eater %s', eater.eaterAlias)
        fdsrc = self.get_element(eater.elementName)
        fdsrc.get_pad("src").add_event_probe(fdsrc_event)
        depay = self.get_element(eater.depayName)
        depay.get_pad("src").add_event_probe(depay_event)

    def _setup_pipeline(self):
        self.debug('setup_pipeline()')
        assert self.bus_signal_id is None

        self.pipeline.set_name('pipeline-' + self.getName())
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        self.bus_signal_id = bus.connect('message',
                                         self.bus_message_received_cb)
        sig_id = self.pipeline.connect('deep-notify',
                                       gstreamer.verbose_deep_notify_cb, self)
        self.pipeline_signals.append(sig_id)

        # set to READY so that multifdsinks can always receive fds, even
        # if the pipeline has a delayed start due to clock slaving
        self.pipeline.set_state(gst.STATE_READY)

        # start checking feeders, if we have a sufficiently recent multifdsink
        if self._get_stats_supported:
            self._feeder_probe_cl = reactor.callLater(
                self.FEEDER_STATS_UPDATE_FREQUENCY,
                self._feeder_probe_calllater)
        else:
            self.warning("Feeder statistics unavailable, your "
                         "gst-plugins-base is too old")
            m = messages.Warning(T_(N_(
                "Your gst-plugins-base is too old, so "
                "feeder statistics will be unavailable.")),
                mid='multifdsink')
            m.add(T_(N_(
                "Please upgrade '%s' to version %s."), 'gst-plugins-base',
                '0.10.11'))
            self.addMessage(m)

        for eater in self.eaters.values():
            self.install_eater_event_probes(eater)
            pad = self.get_element(eater.elementName).get_pad('src')
            self._pad_monitors.attach(pad, eater.elementName,
                                      padmonitor.EaterPadMonitor,
                                      self.reconnectEater,
                                      eater.eaterAlias)
            eater.setPadMonitor(self._pad_monitors[eater.elementName])

    def stop_pipeline(self):
        if not self.pipeline:
            return

        if self.clock_provider:
            self.clock_provider.set_property('active', False)
            self.clock_provider = None
        retval = self.pipeline.set_state(gst.STATE_NULL)
        if retval != gst.STATE_CHANGE_SUCCESS:
            self.warning('Setting pipeline to NULL failed')

    def cleanup(self):
        self.debug("cleaning up")

        assert self.pipeline is not None

        self.stop_pipeline()
        # Disconnect signals
        map(self.pipeline.disconnect, self.pipeline_signals)
        self.pipeline_signals = []
        if self.bus_signal_id:
            self.pipeline.get_bus().disconnect(self.bus_signal_id)
            self.pipeline.get_bus().remove_signal_watch()
            self.bus_signal_id = None
        self.pipeline = None

        if self._feeder_probe_cl:
            self._feeder_probe_cl.cancel()
            self._feeder_probe_cl = None

        # clean up checkEater callLaters
        for eater in self.eaters.values():
            self._pad_monitors.remove(eater.elementName)
            eater.setPadMonitor(None)

    def do_stop(self):
        self.debug('Stopping')
        if self.pipeline:
            self.cleanup()
        self.debug('Stopped')
        return defer.succeed(None)

    def set_master_clock(self, ip, port, base_time):
        self.debug("Master clock set to %s:%d with base_time %s", ip, port,
                   gst.TIME_ARGS(base_time))

        assert self._clock_slaved
        if self._master_clock_info == (ip, port, base_time):
            self.debug("Same master clock info, returning directly")
            return defer.succeed(None)
        elif self._master_clock_info:
            self.stop_pipeline()

        self._master_clock_info = ip, port, base_time

        clock = gst.NetClientClock(None, ip, port, base_time)
        # disable the pipeline's management of base_time -- we're going
        # to set it ourselves
        self.pipeline.set_new_stream_time(gst.CLOCK_TIME_NONE)
        self.pipeline.set_base_time(base_time)
        self.pipeline.use_clock(clock)

        self.try_start_pipeline()

    def get_master_clock(self):
        """
        Return the connection details for the network clock provided by
        this component, if any.
        """
        if self.clock_provider:
            ip, port, base_time = self._master_clock_info
            return ip, port, base_time
        else:
            return None

    def provide_master_clock(self, port):
        """
        Tell the component to provide a master clock on the given port.

        @returns: a deferred firing an (ip, port, base_time) triple.
        """

        def pipelinePaused(r):
            clock = self.pipeline.get_clock()
            # make sure the pipeline sticks with this clock
            self.pipeline.use_clock(clock)

            self.clock_provider = gst.NetTimeProvider(clock, None, port)
            realport = self.clock_provider.get_property('port')

            base_time = self.pipeline.get_base_time()

            self.debug('provided master clock from %r, base time %s',
                       clock, gst.TIME_ARGS(base_time))

            if self.medium:
                # FIXME: This isn't always correct. We need a more
                # flexible API, and a proper network map, to do this.
                # Even then, it's not always going to be possible.
                ip = self.medium.getIP()
            else:
                ip = "127.0.0.1"

            self._master_clock_info = (ip, realport, base_time)
            return self.get_master_clock()

        assert self.pipeline
        assert not self._clock_slaved
        (ret, state, pending) = self.pipeline.get_state(0)
        if state != gst.STATE_PAUSED and state != gst.STATE_PLAYING:
            self.debug("pipeline still spinning up: %r", state)
            d = self._change_monitor.add(gst.STATE_CHANGE_READY_TO_PAUSED)
            d.addCallback(pipelinePaused)
            return d
        elif self.clock_provider:
            self.debug("returning existing master clock info")
            return defer.succeed(self.get_master_clock())
        else:
            return defer.maybeDeferred(pipelinePaused, None)

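    # A hedged sketch of how calling code could tie the clock methods
    # together: the component elected clock master provides the clock, and
    # the resulting (ip, port, base_time) triple is then pushed to the slaved
    # components. The variable names here are illustrative only:
    #
    #     d = master.provide_master_clock(port)
    #
    #     def distribute((ip, port, base_time)):
    #         for slave in slaved_components:
    #             slave.set_master_clock(ip, port, base_time)
    #     d.addCallback(distribute)
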
    def dump_gstreamer_debug_dot_file(self, filename, with_timestamp=False):
        """
        Dumps a graphviz dot file of the pipeline's current state to disk.
        This will only actually do anything if the environment variable
        GST_DEBUG_DUMP_DOT_DIR is set.

        @param filename: the filename to store the dump under
        @param with_timestamp: if True, a timestamp will be prepended to
                               the filename
        """
        if hasattr(gst, "DEBUG_BIN_TO_DOT_FILE"):
            method = gst.DEBUG_BIN_TO_DOT_FILE
            if with_timestamp:
                method = gst.DEBUG_BIN_TO_DOT_FILE_WITH_TS
            method(self.pipeline, gst.DEBUG_GRAPH_SHOW_ALL, filename)

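    # Usage note: dumps only happen when GST_DEBUG_DUMP_DOT_DIR is set in the
    # component's environment, e.g. GST_DEBUG_DUMP_DOT_DIR=/tmp. The
    # resulting .dot files can then be rendered with graphviz, for example
    # with "dot -Tpng <file>.dot -o pipeline.png" (an illustrative command;
    # this module does not run it itself).
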
    ### BaseComponent interface implementation

    def try_start_pipeline(self, force=False):
        """
        Tell the component to start.
        Whatever is using the component is responsible for making sure all
        eaters have received their file descriptor to eat from.
        """
        (ret, state, pending) = self.pipeline.get_state(0)
        if state == gst.STATE_PLAYING:
            self.log('already PLAYING')
            if not force:
                return
            self.debug('pipeline PLAYING, but starting anyway as requested')

        if self._clock_slaved and not self._master_clock_info:
            self.debug("Missing master clock info, deferring set to PLAYING")
            return

        for eater in self.eaters.values():
            if not eater.fd:
                self.debug('eater %s not yet connected, deferring set to '
                           'PLAYING', eater.eaterAlias)
                return

        self.debug("Setting pipeline %r to GST_STATE_PLAYING", self.pipeline)
        self.pipeline.set_state(gst.STATE_PLAYING)

    def _feeder_probe_calllater(self):
        for feedId, feeder in self.feeders.items():
            feederElement = self.get_element(feeder.elementName)
            for client in feeder.getClients():
                # a currently disconnected client will have fd None
                if client.fd is not None:
                    array = feederElement.emit('get-stats', client.fd)
                    if len(array) == 0:
                        # There is an unavoidable race here: we can't know
                        # whether the fd has been removed from multifdsink.
                        # However, if we call get-stats on an fd that
                        # multifdsink doesn't know about, we just get a
                        # 0-length array. We ensure that we don't reuse
                        # the fd too soon, so this can't result in calling
                        # this on a valid but WRONG fd.
                        self.debug('Feeder element for feed %s does not know '
                                   'client fd %d' % (feedId, client.fd))
                    else:
                        client.setStats(array)
        self._feeder_probe_cl = reactor.callLater(
            self.FEEDER_STATS_UPDATE_FREQUENCY,
            self._feeder_probe_calllater)

    def unblock_eater(self, eaterAlias):
        """
        After this function returns, the stream lock for this eater must have
        been released. If your component needs to do something here, override
        this method.
        """
        pass

    def get_element(self, element_name):
        """Get an element out of the pipeline.

        If it is possible that the component has not yet been set up,
        the caller needs to check if self.pipeline is actually set.
        """
        assert self.pipeline
        self.log('Looking up element %r in pipeline %r',
                 element_name, self.pipeline)
        element = self.pipeline.get_by_name(element_name)
        if not element:
            self.warning("No element named %r in pipeline", element_name)
        return element

    def get_element_property(self, element_name, property):
        'Gets a property of an element in the GStreamer pipeline.'
        self.debug("%s: getting property %s of element %s" % (
            self.getName(), property, element_name))
        element = self.get_element(element_name)
        if not element:
            msg = "Element '%s' does not exist" % element_name
            self.warning(msg)
            raise errors.PropertyError(msg)

        self.debug('getting property %s on element %s' % (
            property, element_name))
        try:
            value = element.get_property(property)
        except (ValueError, TypeError):
            msg = "Property '%s' on element '%s' does not exist" % (
                property, element_name)
            self.warning(msg)
            raise errors.PropertyError(msg)

        # param enums and enums need to be returned by integer value
        if isinstance(value, gobject.GEnum):
            value = int(value)

        return value

    def set_element_property(self, element_name, property, value):
        'Sets a property on an element in the GStreamer pipeline.'
        self.debug("%s: setting property %s of element %s to %s" % (
            self.getName(), property, element_name, value))
        element = self.get_element(element_name)
        if not element:
            msg = "Element '%s' does not exist" % element_name
            self.warning(msg)
            raise errors.PropertyError(msg)

        self.debug('setting property %s on element %r to %s' %
                   (property, element_name, value))
        pygobject.gobject_set_property(element, property, value)

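    # A hedged usage sketch for the two property helpers; the element and
    # property names are illustrative and depend on the actual pipeline:
    #
    #     volume = component.get_element_property('volume-element', 'volume')
    #     component.set_element_property('volume-element', 'volume',
    #                                    volume / 2.0)
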
    ### methods to connect component eaters and feeders

    def reconnectEater(self, eaterAlias):
        if not self.medium:
            self.debug("Can't reconnect eater %s, running "
                       "without a medium", eaterAlias)
            return

        self.eaters[eaterAlias].disconnected()
        self.medium.connectEater(eaterAlias)

    def feedToFD(self, feedName, fd, cleanup, eaterId=None):
        """
        @param feedName: name of the feed to feed to the given fd
        @type  feedName: str
        @param fd: the file descriptor to feed to
        @type  fd: int
        @param cleanup: the function to call when the fd is no longer feeding
        @type  cleanup: callable
        """
        self.debug('FeedToFD(%s, %d)', feedName, fd)

        # We must have a pipeline in READY or above to do this. Do a
        # non-blocking (zero timeout) get_state.
        if (not self.pipeline or
            self.pipeline.get_state(0)[1] == gst.STATE_NULL):
            self.warning('told to feed %s to fd %d, but pipeline not '
                         'running yet', feedName, fd)
            cleanup(fd)
            # can happen if we are restarting but the other component is
            # happy; assume the other side will reconnect later
            return

        if feedName not in self.feeders:
            msg = "Cannot find feeder named '%s'" % feedName
            mid = "feedToFD-%s" % feedName
            m = messages.Warning(T_(N_("Internal Flumotion error.")),
                                 debug=msg, mid=mid, priority=40)
            self.state.append('messages', m)
            self.warning(msg)
            cleanup(fd)
            return False

        feeder = self.feeders[feedName]
        element = self.get_element(feeder.elementName)
        assert element
        clientId = eaterId or ('client-%d' % fd)
        element.emit('add', fd)
        feeder.clientConnected(clientId, fd, cleanup)

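    # A hypothetical sketch of how the serving side hands a connected socket
    # over to a feeder; 'sock', the feed name, and the client id are
    # illustrative only:
    #
    #     fd = sock.fileno()
    #     component.feedToFD('default', fd,
    #                        cleanup=lambda fd: sock.close(),
    #                        eaterId='some-client-id')
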
    def eatFromFD(self, eaterAlias, feedId, fd):
        """
        Tell the component to eat the given feedId from the given fd.
        The component takes over ownership of the fd, closing it when
        no longer eating.

        @param eaterAlias: the alias of the eater
        @type  eaterAlias: str
        @param feedId: feed id (componentName:feedName) to eat from through
                       the given fd
        @type  feedId: str
        @param fd: the file descriptor to eat from
        @type  fd: int
        """
        self.debug('EatFromFD(%s, %s, %d)', eaterAlias, feedId, fd)

        if not self.pipeline:
            self.warning('told to eat %s from fd %d, but pipeline not '
                         'running yet', feedId, fd)
            # can happen if we are restarting but the other component is
            # happy; assume the other side will reconnect later
            os.close(fd)
            return

        if eaterAlias not in self.eaters:
            self.warning('Unknown eater alias: %s', eaterAlias)
            os.close(fd)
            return

        eater = self.eaters[eaterAlias]
        element = self.get_element(eater.elementName)
        if not element:
            self.warning('Eater element %s not found', eater.elementName)
            os.close(fd)
            return

        # fdsrc only switches to the new fd in READY or below
        (result, current, pending) = element.get_state(0L)
        pipeline_playing = current not in [gst.STATE_NULL, gst.STATE_READY]
        if pipeline_playing:
            self.debug('eater %s in state %r, kidnapping it',
                       eaterAlias, current)

            # We unlink fdsrc from its peer and take it out of the pipeline
            # so we can set it to READY without having it send EOS, then
            # switch fd and put it back in.
            # To do this safely, we first block fdsrc:src, then let the
            # component do any necessary unlocking (needed for multi-input
            # elements).
            srcpad = element.get_pad('src')

            def _block_cb(pad, blocked):
                pass
            srcpad.set_blocked_async(True, _block_cb)

            # add a buffer probe to drop buffers flagged as IN_CAPS;
            # this needs to be done on gdpdepay's src pad
            depay = self.get_element(eater.depayName)

            def remove_in_caps_buffers(pad, buffer, eater):
                if buffer.flag_is_set(gst.BUFFER_FLAG_IN_CAPS):
                    if self.keepStreamheaderForLater:
                        self.log("We got a buffer with IN_CAPS which we are "
                                 "keeping for later %r", eater)
                        eater.streamheader.append(buffer)
                        return False
                    self.info("We got a streamheader buffer which we are "
                              "dropping, because we do not want it just "
                              "after a reconnect; it breaks everything")
                    return False

                # now we have a buffer with no IN_CAPS flag set,
                # so we should remove the handler
                self.log("We got a buffer with no IN_CAPS flag set on "
                         "eater %r", eater)
                if eater.streamheaderBufferProbeHandler:
                    self.log("Removing buffer probe on depay src pad on "
                             "eater %r", eater)
                    pad.remove_buffer_probe(
                        eater.streamheaderBufferProbeHandler)
                    eater.streamheaderBufferProbeHandler = None
                else:
                    self.warning("buffer probe handler is None, bad news on "
                                 "eater %r", eater)

                if not self.dropStreamHeaders:
                    self.log("Pushing earlier buffers with IN_CAPS flag")
                    for buff in eater.streamheader:
                        pad.push(buff)
                    self.dropStreamHeaders = True

                eater.streamheader = []
                return True

            if not eater.streamheaderBufferProbeHandler:
                self.log("Adding buffer probe on depay src pad on "
                         "eater %r", eater)
                eater.streamheaderBufferProbeHandler = \
                    depay.get_pad("src").add_buffer_probe(
                        remove_in_caps_buffers, eater)

            self.unblock_eater(eaterAlias)

            # Now, we can switch FD with this mess
            sinkpad = srcpad.get_peer()
            srcpad.unlink(sinkpad)
            parent = element.get_parent()
            parent.remove(element)
            self.log("setting to READY")
            element.set_state(gst.STATE_READY)
            self.log("setting to READY complete")
            old = element.get_property('fd')
            self.log("Closing old fd %d", old)
            os.close(old)
            element.set_property('fd', fd)
            parent.add(element)
            srcpad.link(sinkpad)
            element.set_state(gst.STATE_PLAYING)
            # We're done; unblock the pad
            srcpad.set_blocked_async(False, _block_cb)
        else:
            element.set_property('fd', fd)

        # update our eater uiState, saying that we are eating from a
        # possibly new feedId
        eater.connected(fd, feedId)

        if not pipeline_playing:
            self.try_start_pipeline()
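    # A hypothetical sketch of the eating side: once a connected fd for the
    # feed 'producer:default' has been obtained, it is handed over with
    #
    #     component.eatFromFD('default', 'producer:default', fd)
    #
    # after which the component owns the fd and will close it itself.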