Package flumotion :: Package worker :: Module job
[hide private]

Source Code for Module flumotion.worker.job

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  worker-side objects to handle worker clients 
 24  """ 
 25   
 26  import os 
 27  import signal 
 28  import sys 
 29   
 30  from twisted.internet import defer, reactor 
 31   
 32  from flumotion.common import errors, log 
 33  from flumotion.common import messages 
 34  from flumotion.common.i18n import N_, gettexter 
 35  from flumotion.configure import configure 
 36  from flumotion.worker import base 
 37   
 38  __version__ = "$Rev: 8530 $" 
 39  T_ = gettexter() 
 40   
 41   
class ComponentJobAvatar(base.BaseJobAvatar):
    """
    Worker-side avatar for a job process that runs a single component.
    """

    def haveMind(self):
        """
        Drive a newly attached job through component start-up.

        The job is asked for its pid, then bootstrapped with the manager
        connection details and the job's bundles, then asked to create the
        component. The outcome is reported to the heaven's start set.

        @returns: a deferred firing once the sequence has finished. Failures
                  of bootstrap/create are logged and reported to the start
                  set instead of being propagated; a failing getPid call
                  does propagate.
        """

        def onCreated(_ignored, avatarId):
            self.debug('job started component with avatarId %s', avatarId)
            # FIXME: drills down too much?
            self._heaven._startSet.createSuccess(avatarId)

        def onFailed(failure, job):
            reason = log.getFailureMessage(failure)
            if not failure.check(errors.ComponentCreateError):
                self.warning('unhandled error creating component %s: %s',
                             job.avatarId, reason)
            else:
                self.warning('could not create component %s of type %s: %s',
                             job.avatarId, job.type, reason)
            # FIXME: drills down too much?
            self._heaven._startSet.createFailed(job.avatarId, failure)

        def askCreate(_ignored, job):
            self.debug("asking job to create component with avatarId %s, "
                       "type %s", job.avatarId, job.type)
            return self.mindCallRemote('create', job.avatarId, job.type,
                                       job.moduleName, job.methodName,
                                       job.nice, job.conf)

        def onPid(pid):
            self.pid = pid
            info = self._heaven.getManagerConnectionInfo()
            transport = 'tcp'
            if info.use_ssl:
                transport = 'ssl'
            job = self._heaven.getJobInfo(pid)
            workerName = self._heaven.getWorkerName()

            started = self.mindCallRemote('bootstrap', workerName,
                                          info.host, info.port, transport,
                                          info.authenticator, job.bundles)
            started.addCallback(askCreate, job)
            started.addCallback(onCreated, job.avatarId)
            # a single errback covers bootstrap, create and onCreated
            started.addErrback(onFailed, job)
            return started

        pidRequest = self.mindCallRemote("getPid")
        pidRequest.addCallback(onPid)
        return pidRequest

    def stop(self):
        """
        Ask the job to stop.

        @returns: the deferred of the remote 'stop' call, or an already-fired
                  deferred (None) when the job has no mind any more.
        """
        if self.mind:
            self.debug('stopping')
            return self.mindCallRemote('stop')
        self.debug('already logged out')
        return defer.succeed(None)

    def sendFeed(self, feedName, fd, eaterId):
        """
        Hand fd to the job so its component feeds feedName into it.

        @returns: whether the fd was handed off to the component; False
                  when the job has already lost its mind.
        """
        self.debug('Sending FD %d to component job to feed %s to fd',
                   fd, feedName)
        return self._handOffFd(fd, 'sendFeed', feedName, eaterId)

    def receiveFeed(self, eaterAlias, fd, feedId):
        """
        Hand fd to the job so its component eats eaterAlias from it.

        @returns: whether the fd was handed off to the component; False
                  when the job has already lost its mind.
        """
        self.debug('Sending FD %d to component job to eat %s from fd',
                   fd, eaterAlias)
        return self._handOffFd(fd, 'receiveFeed', eaterAlias, feedId)

    def _handOffFd(self, fd, command, first, second):
        """
        Send fd to the job along with a "<command> <first> <second>" message.

        The component may already have logged out. The mind is therefore
        checked only here, at the last moment, because checking earlier
        would just reintroduce a race. Returning False is how a disconnect
        is triggered on the fd in that case.
        """
        if not self.mind:
            self.debug('my mind is gone, trigger disconnect')
            return False
        return self._sendFileDescriptor(
            fd, '%s %s %s' % (command, first, second))

    def perspective_cleanShutdown(self):
        """
        Remote notification from the job process that it is shutting down.

        The process may still be alive for a while. Telling the start set now
        lets new start requests for this avatar ID be accepted.
        """
        self.info("component %s shutting down cleanly", self.avatarId)
        # FIXME: drills down too much?
        self._heaven._startSet.shutdownStart(self.avatarId)
153 -class ComponentJobInfo(base.JobInfo):
154 __slots__ = ('conf', ) 155
156 - def __init__(self, pid, avatarId, type, moduleName, methodName, 157 nice, bundles, conf):
161 162
class ComponentJobHeaven(base.BaseJobHeaven):
    """
    Job heaven that spawns one flumotion-job process per component.
    """
    avatarClass = ComponentJobAvatar
    logCategory = 'component-job-heaven'

    def getManagerConnectionInfo(self):
        """
        Return the manager connection description held by the worker brain.

        @rtype: L{flumotion.common.connection.PBConnectionInfo}
        """
        return self.brain.managerConnectionInfo

    def spawn(self, avatarId, type, moduleName, methodName, nice,
              bundles, conf):
        """
        Spawn a flumotion-job process for a component.

        Once the job logs in, it is told to load its bundles and to call the
        factory method that is expected to return the component.

        @param avatarId:   avatarId the component should use to log in
        @type  avatarId:   str
        @param type:       type of component to start
        @type  type:       str
        @param moduleName: name of the module to create the component from
        @type  moduleName: str
        @param methodName: factory method used to create the component
        @type  methodName: str
        @param nice:       nice level
        @type  nice:       int
        @param bundles:    ordered list of (bundleName, bundlePath) for this
                           component
        @type  bundles:    list of (str, str)
        @param conf:       component configuration
        @type  conf:       dict

        @returns: the deferred obtained from the start set's createStart
                  for avatarId.
        """
        startDeferred = self._startSet.createStart(avatarId)

        protocol = base.JobProcessProtocol(self, avatarId, self._startSet)
        jobBinary = os.path.join(configure.bindir, 'flumotion-job')
        if not os.path.exists(jobBinary):
            self.error("Trying to spawn job process, but '%s' does not "
                       "exist", jobBinary)

        program = jobBinary
        args = [jobBinary, avatarId, self._socketPath]

        # Optionally run selected jobs under valgrind. FLU_VALGRIND_JOB is a
        # comma-separated list of full component avatar IDs. Valgrind wraps
        # the python interpreter running flumotion-job rather than the script
        # itself, which would otherwise require --trace-children.
        valgrindJobs = os.environ.get('FLU_VALGRIND_JOB')
        if valgrindJobs is not None and avatarId in valgrindJobs.split(','):
            program = 'valgrind'
            args = ['valgrind', '--leak-check=full', '--num-callers=24',
                    '--leak-resolution=high', '--show-reachable=yes',
                    'python'] + args

        env = dict(os.environ)
        env['FLU_DEBUG'] = log.getDebug()
        process = reactor.spawnProcess(protocol, program, env=env, args=args,
                                       childFDs={0: 0, 1: 1, 2: 2})
        protocol.setPid(process.pid)

        info = ComponentJobInfo(process.pid, avatarId, type, moduleName,
                                methodName, nice, bundles, conf)
        self.addJobInfo(process.pid, info)
        return startDeferred
class CheckJobAvatar(base.BaseJobAvatar):
    """
    Worker-side avatar for a job process that runs checks, not a component.
    """

    def haveMind(self):
        """
        Record the newly attached job's pid and mark its start as successful
        in the heaven's start set.

        @returns: a deferred firing once the pid has been recorded.
        """

        def gotPid(pid):
            self.pid = pid
            job = self._heaven.getJobInfo(pid)
            # FIXME: drills down too much?
            self._heaven._startSet.createSuccess(job.avatarId)

        d = self.mindCallRemote("getPid")
        d.addCallback(gotPid)
        return d

    def stop(self):
        """
        Mark this check job as shut down and send its process SIGTERM.

        @returns: a deferred that has already fired with None. It does not
                  wait for the process to exit, but it lets callers treat
                  this avatar like the component avatar, whose stop()
                  always returns a deferred. The old version returned None.
        """
        self._heaven._startSet.shutdownStart(self.avatarId)
        self._heaven.killJob(self.avatarId, signal.SIGTERM)
        return defer.succeed(None)

    def perspective_cleanShutdown(self):
        """
        Remote notification that the job process is stopping; only logged.
        """
        self.debug("job is stopping")
class CheckJobHeaven(base.BaseJobHeaven):
    """
    Job heaven that runs checks in flumotion-job processes.

    Idle check processes are kept in a pool so they can be reused, and are
    stopped after staying idle for _timeout seconds.
    """
    avatarClass = CheckJobAvatar
    logCategory = 'check-job-heaven'

    # counter used to build unique avatar ids: 'check-0', 'check-1', ...
    _checkCount = 0
    # seconds a check may run before its job gets SIGTERM; also the number
    # of seconds an idle pooled job is kept before being stopped
    _timeout = 45
    # extra seconds after SIGTERM before escalating to SIGKILL
    _killGrace = 5

    def __init__(self, brain):
        base.BaseJobHeaven.__init__(self, brain)

        # job processes that are available to do work (i.e. not actively
        # running checks), stored as (job avatar, expiry DelayedCall) pairs
        self.jobPool = []

    def getCheckJobFromPool(self):
        """
        Get a job avatar to run a check in.

        An idle pooled job is reused when one is available (and its expiry
        timer is cancelled). Otherwise a new flumotion-job process is
        spawned.

        @returns: a deferred firing with the job avatar.
        """
        if self.jobPool:
            job, expireDC = self.jobPool.pop(0)
            expireDC.cancel()
            self.debug('running check in already-running job %s',
                       job.avatarId)
            return defer.succeed(job)

        avatarId = 'check-%d' % (self._checkCount, )
        self._checkCount += 1

        self.debug('spawning new job %s to run a check', avatarId)
        d = self._startSet.createStart(avatarId)

        p = base.JobProcessProtocol(self, avatarId, self._startSet)
        executable = os.path.join(configure.bindir, 'flumotion-job')
        argv = [executable, avatarId, self._socketPath]

        childFDs = {0: 0, 1: 1, 2: 2}
        env = dict(os.environ)
        env['FLU_DEBUG'] = log.getDebug()
        process = reactor.spawnProcess(p, executable, env=env, args=argv,
                                       childFDs=childFDs)

        p.setPid(process.pid)
        # check jobs have no component type, module, method or nice level.
        # The builtin ``type`` used to be passed here by mistake.
        jobInfo = base.JobInfo(process.pid, avatarId, None, None, None,
                               None, [])
        self._jobInfos[process.pid] = jobInfo

        def haveMind(_):
            # we have a mind, in theory; return the job avatar
            return self.avatars[avatarId]

        d.addCallback(haveMind)
        return d

    def runCheck(self, bundles, moduleName, methodName, *args, **kwargs):
        """
        Run moduleName.methodName(*args, **kwargs) in a check job.

        The job is first bootstrapped with the given bundles. If the check
        has not returned after _timeout seconds, the job gets SIGTERM. If it
        is still pending _killGrace seconds later, the job gets SIGKILL. In
        that case the outcome is replaced by a "Check timed out." error.
        When the call returns in time, whether it succeeded or failed, the
        job goes back into the pool and its outcome is passed through.

        @returns: a deferred firing with the check's result, or with a
                  L{messages.Result} holding a timeout error.
        """

        def haveJob(job):

            def callProc(_):
                return job.mindCallRemote('runFunction', moduleName,
                                          methodName, *args, **kwargs)

            def timeout(sig):
                self.killJobByPid(job.pid, sig)

            def haveResult(res):
                if not termtimeout.active():
                    # SIGTERM was already sent, so the check timed out:
                    # replace whatever came back with a timeout error
                    self.info("Discarding error %s", res)
                    res = messages.Result()
                    res.add(messages.Error(
                        T_(N_("Check timed out.")),
                        debug=("Timed out running %s." % methodName)))
                else:
                    # finished in time: park the job in the pool and stop
                    # it if it stays idle for _timeout seconds

                    def expire():
                        if (job, expireDC) in self.jobPool:
                            self.debug('stopping idle check job process %s',
                                       job.avatarId)
                            self.jobPool.remove((job, expireDC))
                            job.mindCallRemote('stop')
                    expireDC = reactor.callLater(self._timeout, expire)
                    self.jobPool.append((job, expireDC))

                if termtimeout.active():
                    termtimeout.cancel()
                if killtimeout.active():
                    killtimeout.cancel()
                return res

            # Kill the job if the check takes too long. SIGTERM comes first;
            # SIGKILL follows after a grace period unless haveResult has
            # cancelled it by then. Previously both fired at the same
            # instant, which left SIGTERM no time to take effect.
            termtimeout = reactor.callLater(self._timeout, timeout,
                                            signal.SIGTERM)
            killtimeout = reactor.callLater(self._timeout + self._killGrace,
                                            timeout, signal.SIGKILL)

            d = job.mindCallRemote('bootstrap', self.getWorkerName(),
                                   None, None, None, None, bundles)
            d.addCallback(callProc)
            d.addCallbacks(haveResult, haveResult)
            return d

        d = self.getCheckJobFromPool()
        d.addCallback(haveJob)

        return d