Package flumotion :: Package component :: Package producers :: Package icecast :: Module icecast
[hide private]

Source Code for Module flumotion.component.producers.icecast.icecast

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import gst 
 23  from twisted.internet import defer 
 24  from flumotion.component import feedcomponent 
 25  from flumotion.twisted.defer import RetryingDeferred 
 26  from flumotion.common import errors 
 27   
 28  __version__ = "$Rev: 8795 $" 
 29   
 30   
class Icecast(feedcomponent.ParseLaunchComponent):
    """
    Producer component that reads a stream from an icecast server.

    The pipeline starts out as C{souphttpsrc ! typefind}.  Once typefind
    has detected the stream type, a matching parser is linked in right
    after it.  When the pad monitor on the source pad reports a
    disconnect, the connection is retried through a L{RetryingDeferred}
    until the monitor reports the source as connected again.
    """

    # Caps name reported by typefind -> factory name of the parser to insert.
    _PARSER_FACTORIES = {
        'application/ogg': 'oggparse',
        'audio/mpeg': 'mp3parse',
    }

    def get_pipeline_string(self, properties):
        """
        Return the launch description of the initial pipeline.

        @param properties: component properties; not used here, the URL
                           is applied in L{configure_pipeline}.
        @rtype: str
        """
        return "souphttpsrc name=src ! typefind name=tf"

    def configure_pipeline(self, pipeline, properties):
        """
        Point the source at the configured URL and install the hooks
        used for parser insertion, reconnection and EOS suppression.

        @param pipeline:   the pipeline built from
                           L{get_pipeline_string}.
        @param properties: component properties; C{properties['url']}
                           is the location handed to the source element.
        """
        # Later, when the typefind element has successfully found the type
        # of the data, we'll rebuild the pipeline.

        def have_caps(tf, prob, caps):
            capsname = caps[0].get_name()
            # We should add appropriate parsers for any given format here.
            # For some it's critical for this to work at all, for others
            # it's needed for timestamps (thus for things like time-based
            # burst-on-connect).  Currently Ogg and MP3 are handled.
            factory_name = self._PARSER_FACTORIES.get(capsname)
            if factory_name is None:
                # No parser for this type: leave the pipeline untouched
                # and keep the signal handler connected.
                return
            parser = gst.element_factory_make(factory_name)
            if not parser:
                return

            parser.set_state(gst.STATE_PLAYING)
            pipeline.add(parser)
            # Relink - unlink typefind from the bits that follow it (the
            # gdp payloader), link in the parser, relink to the payloader.
            pad = tf.get_pad('src')
            peer = pad.get_peer()
            pad.unlink(peer)
            tf.link(parser)
            parser.link(peer.get_parent())
            # Disconnect signal to avoid adding a parser every time
            # it gets reconnected.
            tf.disconnect(self.signal_id)

        def _drop_eos(pad, event):
            # Returning False from the probe drops the event; everything
            # except EOS is let through.  Only log what is really dropped.
            # NOTE(review): presumably EOS is dropped so that a lost
            # connection does not end the stream downstream - confirm.
            if event.type == gst.EVENT_EOS:
                self.debug('Swallowing event %r', event)
                return False
            return True

        self.src = pipeline.get_by_name('src')
        self.url = properties['url']
        self.src.set_property('location', self.url)

        typefind = pipeline.get_by_name('tf')
        self.signal_id = typefind.connect('have-type', have_caps)

        # Reconnection state is set up before the pad monitor callbacks
        # that read it are registered.
        self.reconnecting = False
        self.reconnector = RetryingDeferred(self.connect)
        self.reconnector.initialDelay = 1.0
        # Deferred for the connection attempt currently in flight, or None
        # when no attempt is waiting for a result.
        self.attemptD = None

        self._pad_monitors.attach(self.src.get_pad('src'), 'souphttp-src')
        self._pad_monitors['souphttp-src'].addWatch(
            self._src_connected, self._src_disconnected)

        self.src.get_pad('src').add_event_probe(_drop_eos)

    def bus_message_received_cb(self, bus, message):
        """
        Handle error messages posted by the source element; hand every
        other message to the parent class.

        An error from the source is logged as a warning and, while a
        reconnection is in progress, fails the current attempt so that
        the reconnector tries again.

        @returns: True for a handled source error, otherwise whatever
                  the parent handler returns.
        """
        is_src_error = (message.type == gst.MESSAGE_ERROR
                        and message.src == self.src)
        if not is_src_error:
            # Propagate the parent's result so both paths return a value.
            return feedcomponent.ParseLaunchComponent.bus_message_received_cb(
                self, bus, message)

        gerror, debug = message.parse_error()
        self.warning('element %s error %s %s',
                     message.src.get_path_string(), gerror, debug)
        if self.reconnecting:
            self._retry()
        return True

    def connect(self):
        """
        Start one connection attempt.

        @returns: a deferred that is fired by L{_src_connected} on
                  success or failed by L{_retry} on a source error.
        @rtype:   L{twisted.internet.defer.Deferred}
        """
        self.info('Connecting to icecast server on %s', self.url)
        self.src.set_state(gst.STATE_READY)
        # can't just self.src.set_state(gst.STATE_PLAYING),
        # because the pipeline might NOT be in PLAYING,
        # if we never connected to Icecast and never went to PLAYING
        self.try_start_pipeline(force=True)
        self.attemptD = defer.Deferred()
        return self.attemptD

    def _take_attempt(self):
        """
        Detach and return the pending attempt deferred, or None.

        Clearing C{attemptD} before firing guarantees that a deferred is
        never fired twice, which would raise AlreadyCalledError.
        """
        attempt, self.attemptD = self.attemptD, None
        return attempt

    def _src_connected(self, name):
        """
        Pad monitor callback: the source pad is considered connected.

        @param name: name of the pad monitor; unused.
        """
        self.info('Connected to icecast server on %s', self.url)
        if not self.reconnecting:
            return
        attempt = self._take_attempt()
        if attempt is None:
            # The last attempt was already resolved; stay in reconnecting
            # state and let the next connect() create a fresh attempt.
            self.debug('No pending connection attempt to complete for %s',
                       self.url)
            return
        attempt.callback(None)
        self.reconnecting = False

    def _src_disconnected(self, name):
        """
        Pad monitor callback: the source pad is considered disconnected.
        Starts the reconnector unless a reconnection is already running.

        @param name: name of the pad monitor; unused.
        """
        self.info('Disconnected from icecast server on %s', self.url)
        if not self.reconnecting:
            self.reconnecting = True
            self.reconnector.start()

    def _retry(self):
        """
        Fail the pending connection attempt so the reconnector retries.

        Does nothing when no attempt is pending, e.g. when the source
        posts more than one error message for the same attempt.
        """
        attempt = self._take_attempt()
        if attempt is None:
            self.debug('No pending connection attempt to fail for %s',
                       self.url)
            return
        self.debug('Retrying connection to icecast server on %s', self.url)
        # NOTE(review): the exception class itself is passed, as before;
        # passing an instance would be more conventional - confirm that
        # errors.ConnectionError can be built without arguments.
        attempt.errback(errors.ConnectionError)