Package flumotion :: Package worker :: Package checks :: Module gst010
[hide private]

Source Code for Module flumotion.worker.checks.gst010

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import gobject 
 23  import gst 
 24  import gst.interfaces 
 25  from twisted.internet.threads import deferToThread 
 26  from twisted.internet import defer 
 27   
 28  from flumotion.common import gstreamer, errors, log, messages 
 29  from flumotion.common.i18n import N_, gettexter 
 30  from flumotion.twisted import defer as fdefer 
 31  from flumotion.worker.checks import check 
 32   
 33  __version__ = "$Rev: 8748 $" 
 34  T_ = gettexter() 
 35   
 36   
class BusResolution(fdefer.Resolution):
    """
    Resolution that remembers a GStreamer pipeline and the handler id of
    the 'message' signal connected on that pipeline's bus, so that both can
    be released again in L{cleanup}.

    @ivar pipeline:  the pipeline being tracked, or None when there is
                     nothing to release.
    @ivar signal_id: the handler id of the bus 'message' connection, or
                     None when no handler is connected.
    """
    pipeline = None
    signal_id = None

    def cleanup(self):
        """
        Detach from the pipeline's bus and shut the pipeline down.

        Does nothing when no pipeline is tracked.  Otherwise, when a signal
        handler is registered, the bus signal watch is removed and the
        handler disconnected; the pipeline is then set to NULL and the
        reference to it dropped.
        """
        # NOTE(review): presumably invoked by the fdefer.Resolution base
        # class when the resolution fires -- confirm against that class.
        tracked = self.pipeline
        if not tracked:
            return

        if self.signal_id:
            bus = tracked.get_bus()
            bus.remove_signal_watch()
            bus.disconnect(self.signal_id)
            self.signal_id = None

        # a pipeline in NULL state no longer holds on to device resources
        tracked.set_state(gst.STATE_NULL)
        self.pipeline = None
49 50
51 -def do_element_check(pipeline_str, element_name, check_proc, state=None, 52 set_state_deferred=False):
53 """ 54 Parse the given pipeline and set it to the given state. 55 When the bin reaches that state, perform the given check function on the 56 element with the given name. 57 58 @param pipeline_str: description of the pipeline used to test 59 @param element_name: name of the element being checked 60 @param check_proc: a function to call with the GstElement as argument. 61 @param state: an unused keyword parameter that will be removed when 62 support for GStreamer 0.8 is dropped. 63 @param set_state_deferred: a flag to say whether the set_state is run in 64 a deferToThread 65 @type set_state_deferred: bool 66 @returns: a deferred that will fire with the result of check_proc, or 67 fail. 68 @rtype: L{twisted.internet.defer.Deferred} 69 """ 70 71 def run_check(pipeline, resolution): 72 element = pipeline.get_by_name(element_name) 73 try: 74 retval = check_proc(element) 75 resolution.callback(retval) 76 except check.CheckProcError, e: 77 log.debug('check', 'CheckProcError when running %r: %r', 78 check_proc, e.data) 79 resolution.errback(errors.RemoteRunError(e.data)) 80 except Exception, e: 81 log.debug('check', 'Unhandled exception while running %r: %r', 82 check_proc, e) 83 resolution.errback(errors.RemoteRunError( 84 log.getExceptionMessage(e))) 85 # set pipeline state to NULL so worker does not consume 86 # unnecessary resources 87 pipeline.set_state(gst.STATE_NULL)
88 89 def message_rcvd(bus, message, pipeline, resolution): 90 t = message.type 91 if t == gst.MESSAGE_STATE_CHANGED: 92 if message.src == pipeline: 93 old, new, pending = message.parse_state_changed() 94 if new == gst.STATE_PLAYING: 95 run_check(pipeline, resolution) 96 elif t == gst.MESSAGE_ERROR: 97 gerror, debug = message.parse_error() 98 # set pipeline state to NULL so worker does not consume 99 # unnecessary resources 100 pipeline.set_state(gst.STATE_NULL) 101 resolution.errback(errors.GStreamerGstError( 102 message.src, gerror, debug)) 103 elif t == gst.MESSAGE_EOS: 104 resolution.errback(errors.GStreamerError( 105 "Unexpected end of stream")) 106 else: 107 log.debug('check', 'message: %s: %s:' % ( 108 message.src.get_path_string(), 109 message.type.value_nicks[1])) 110 if message.structure: 111 log.debug('check', 'message: %s' % 112 message.structure.to_string()) 113 else: 114 log.debug('check', 'message: (no structure)') 115 return True 116 117 resolution = BusResolution() 118 119 log.debug('check', 'parsing pipeline %s' % pipeline_str) 120 try: 121 pipeline = gst.parse_launch(pipeline_str) 122 log.debug('check', 'parsed pipeline %s' % pipeline_str) 123 except gobject.GError, e: 124 resolution.errback(errors.GStreamerError(e.message)) 125 return resolution.d 126 127 bus = pipeline.get_bus() 128 bus.add_signal_watch() 129 signal_id = bus.connect('message', message_rcvd, pipeline, resolution) 130 131 resolution.signal_id = signal_id 132 resolution.pipeline = pipeline 133 log.debug('check', 'setting state to playing') 134 if set_state_deferred: 135 d = deferToThread(pipeline.set_state, gst.STATE_PLAYING) 136 137 def stateChanged(res): 138 return resolution.d 139 d.addCallback(stateChanged) 140 return d 141 else: 142 pipeline.set_state(gst.STATE_PLAYING) 143 return resolution.d 144 145
def check1394(mid, guid):
    """
    Probe the firewire device.

    Return a deferred firing a result.

    The result is either:
     - successful, with a None value: no device found
     - successful, with a dictionary of width, height, and par as a num/den
       pair
     - failed

    @param mid:  the id to set on the message.
    @param guid: the id of the selected device.

    @rtype: L{twisted.internet.defer.Deferred} of
            L{flumotion.common.messages.Result}
    """
    result = messages.Result()

    def do_check(demux):
        # Read the negotiated video format from the demuxer's 'video' pad.
        video_pad = demux.get_pad('video')

        # NOTE(review): the `== None` comparison is kept as found; whether
        # `is None` is equivalent for caps objects should be confirmed.
        if not video_pad or video_pad.get_negotiated_caps() == None:
            raise errors.GStreamerError('Pipeline failed to negotiate?')

        structure = video_pad.get_negotiated_caps().get_structure(0)
        width = structure['width']
        height = structure['height']
        ratio = structure['pixel-aspect-ratio']
        info = dict(width=width, height=height,
                    par=(ratio.num, ratio.denom))
        log.debug('check', 'returning dict %r' % info)
        return info

    def resourceErrorMessage(gerror):
        # Translate the resource errors dv1394src is known to raise into
        # a user-facing message; returns None for any other error.
        if gerror.domain != "gst-resource-error-quark":
            return None

        if gerror.code == int(gst.RESOURCE_ERROR_NOT_FOUND):
            # dv1394src was fixed after gst-plugins-good 0.10.2
            # to distinguish NOT_FOUND and OPEN_READ
            version = gstreamer.get_plugin_version('1394')
            if (0, 10, 0, 0) <= version <= (0, 10, 2, 0):
                return messages.Error(T_(
                    N_("Could not find or open the Firewire device. "
                       "Check the device node and its permissions.")))
            return messages.Error(T_(
                N_("No Firewire device found.")))

        if gerror.code == int(gst.RESOURCE_ERROR_OPEN_READ):
            return messages.Error(T_(
                N_("Could not open Firewire device for reading. "
                   "Check permissions on the device.")))

        return None

    def errbackResult(failure):
        # Convert the failure into an error message on the Result, and
        # return the Result in place of the failure.
        log.debug('check', 'returning failed Result, %r' % failure)
        m = None
        if failure.check(errors.GStreamerGstError):
            source, gerror, debug = failure.value.args
            log.debug('check', 'GStreamer GError: %s (debug: %s)' % (
                gerror.message, debug))
            m = resourceErrorMessage(gerror)

            if not m:
                # fall back on the generic device error handling
                m = check.handleGStreamerDeviceError(failure, 'Firewire',
                                                     mid=mid)

        if not m:
            # last resort: generic message carrying the failure as debug
            m = messages.Error(T_(N_("Could not probe Firewire device.")),
                               debug=check.debugFailure(failure))

        m.id = mid
        result.add(m)
        return result

    pipeline = \
        'dv1394src guid=%s ! dvdemux name=demux .video ! fakesink' % guid

    d = do_element_check(pipeline, 'demux', do_check)
    d.addCallback(check.callbackResult, result)
    d.addErrback(errbackResult)

    return d