Package flumotion :: Package manager :: Module manager
[hide private]

Source Code for Module flumotion.manager.manager

   1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_manager -*- 
   2  # vi:si:et:sw=4:sts=4:ts=4 
   3  # 
   4  # Flumotion - a streaming media server 
   5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
   6  # All rights reserved. 
   7   
   8  # This file may be distributed and/or modified under the terms of 
   9  # the GNU General Public License version 2 as published by 
  10  # the Free Software Foundation. 
  11  # This file is distributed without any warranty; without even the implied 
  12  # warranty of merchantability or fitness for a particular purpose. 
  13  # See "LICENSE.GPL" in the source distribution for more information. 
  14   
  15  # Licensees having purchased or holding a valid Flumotion Advanced 
  16  # Streaming Server license may use this file in accordance with the 
  17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
  18  # See "LICENSE.Flumotion" in the source distribution for more information. 
  19   
  20  # Headers in this file shall remain intact. 
  21   
  22  """ 
  23  manager implementation and related classes 
  24   
  25  API Stability: semi-stable 
  26   
  27  @var  LOCAL_IDENTITY: an identity for the manager itself; can be used 
  28                        to compare against to verify that the manager 
  29                        requested an action 
  30  @type LOCAL_IDENTITY: L{LocalIdentity} 
  31  """ 
  32   
  33  import os 
  34   
  35  from twisted.internet import reactor, defer 
  36  from twisted.python import components, failure 
  37  from twisted.spread import pb 
  38  from twisted.cred import portal 
  39  from zope.interface import implements 
  40   
  41  from flumotion.common import errors, interfaces, log, registry 
  42  from flumotion.common import planet, common, dag, messages, reflectcall, server 
  43  from flumotion.common.i18n import N_, gettexter 
  44  from flumotion.common.identity import RemoteIdentity, LocalIdentity 
  45  from flumotion.common.netutils import addressGetHost 
  46  from flumotion.common.planet import moods 
  47  from flumotion.configure import configure 
  48  from flumotion.manager import admin, component, worker, base, config 
  49  from flumotion.twisted import checkers 
  50  from flumotion.twisted import portal as fportal 
  51  from flumotion.project import project 
  52   
  53  __all__ = ['ManagerServerFactory', 'Vishnu'] 
  54  __version__ = "$Rev: 8652 $" 
  55  T_ = gettexter() 
  56  LOCAL_IDENTITY = LocalIdentity('manager') 
  57   
  58   
  59  # an internal class 
  60   
  61   
class Dispatcher(log.Loggable):
    """
    I implement L{twisted.cred.portal.IRealm}.

    When a L{pb.Avatar} is requested through me, I pick the heaven
    registered for the requested interface and make sure the Avatar
    being returned knows about the mind (client) requesting it.
    """

    implements(portal.IRealm)

    logCategory = 'dispatcher'

    def __init__(self, computeIdentity):
        """
        @param computeIdentity: see L{Vishnu.computeIdentity}
        @type  computeIdentity: callable
        """
        self._computeIdentity = computeIdentity
        self._bouncer = None
        self._interfaceHeavens = {} # interface -> heaven
        self._avatarKeycards = {} # avatarId -> keycard

    def setBouncer(self, bouncer):
        """
        Remember the bouncer so keycards can be removed from it when a
        client disconnects.

        @param bouncer: the bouncer to authenticate with
        @type  bouncer: L{flumotion.component.bouncers.bouncer}
        """
        self._bouncer = bouncer

    def registerHeaven(self, heaven, interface):
        """
        Register a Heaven as managing components with the given interface.

        @param heaven:    the heaven that will own avatars requested
                          for this interface
        @type  heaven:    L{base.ManagerHeaven}
        @param interface: a component interface to register the heaven with.
        @type  interface: L{twisted.python.components.Interface}
        """
        assert isinstance(heaven, base.ManagerHeaven)

        self._interfaceHeavens[interface] = heaven

    ### IRealm methods

    def requestAvatar(self, avatarId, keycard, mind, *ifaces):
        """
        Create an avatar for a client that just logged in.

        Exactly two interfaces must be requested: L{pb.IPerspective}
        and one interface that a heaven was registered for.

        @raise errors.NoPerspectiveError: when the requested interfaces
               do not match that shape or no heaven knows the interface

        @rtype: deferred firing a (pb.IPerspective, avatar, logout
                callable) tuple
        """
        # IPerspective plus the specific avatar interface, nothing else
        if pb.IPerspective not in ifaces or len(ifaces) != 2:
            raise errors.NoPerspectiveError(avatarId)
        iface = [i for i in ifaces if i != pb.IPerspective][0]
        if iface not in self._interfaceHeavens:
            self.warning('unknown interface %r', iface)
            raise errors.NoPerspectiveError(avatarId)

        heaven = self._interfaceHeavens[iface]
        avatarClass = heaven.avatarClass
        peerHost = addressGetHost(mind.broker.transport.getPeer())

        def createAvatar(identity):
            return avatarClass.makeAvatar(heaven, avatarId, identity, mind)

        def registerAvatar(avatar):
            if avatar.avatarId in heaven.avatars:
                raise errors.AlreadyConnectedError(avatar.avatarId)
            heaven.avatars[avatar.avatarId] = avatar
            self._avatarKeycards[avatar.avatarId] = keycard

            # test_manager_manager passes its own values through these
            # keyword arguments, so their names and order have to stay
            # exactly as they are.

            def cleanup(avatarId=avatar.avatarId, avatar=avatar, mind=mind):
                self.info('lost connection to client %r', avatar)
                del heaven.avatars[avatar.avatarId]
                avatar.onShutdown()
                # drop our reference so the keycard does not leak
                staleKeycard = self._avatarKeycards.pop(avatarId)
                if not self._bouncer:
                    return
                try:
                    self._bouncer.removeKeycard(staleKeycard)
                except KeyError:
                    self.warning("bouncer forgot about keycard %r",
                                 staleKeycard)

            return (pb.IPerspective, avatar, cleanup)

        def dropConnection(reason):
            # The client should still receive the failure, so the
            # connection is only closed from a later reactor iteration;
            # loseConnection() itself waits until pending data is sent.
            reactor.callLater(0, mind.broker.transport.loseConnection)
            return reason

        d = self._computeIdentity(keycard, peerHost)
        d.addCallback(createAvatar)
        d.addCallbacks(registerAvatar, dropConnection)
        return d


class ComponentMapper:
    """
    I tie together the different objects that belong to one component.
    The vishnu uses me as the value in its component lookup hash.

    Attributes, all None until filled in:
      - state:    ManagerComponentState; created first
      - id:       avatarId of the eventual ComponentAvatar
      - avatar:   ComponentAvatar
      - jobState: ManagerJobState of a running component
    """

    def __init__(self):
        # nothing is known about the component yet
        self.state = self.id = self.avatar = self.jobState = None
172 173
174 -class Vishnu(log.Loggable):
175 """ 176 I am the toplevel manager object that knows about all 177 heavens and factories. 178 179 @cvar dispatcher: dispatcher to create avatars 180 @type dispatcher: L{Dispatcher} 181 @cvar workerHeaven: the worker heaven 182 @type workerHeaven: L{worker.WorkerHeaven} 183 @cvar componentHeaven: the component heaven 184 @type componentHeaven: L{component.ComponentHeaven} 185 @cvar adminHeaven: the admin heaven 186 @type adminHeaven: L{admin.AdminHeaven} 187 @cvar configDir: the configuration directory for 188 this Vishnu's manager 189 @type configDir: str 190 """ 191 192 implements(server.IServable) 193 194 logCategory = "vishnu" 195
def __init__(self, name, unsafeTracebacks=0, configDir=None):
    """
    @param name:             name of this manager; stored in the planet
                             state and used for the default config dir
    @type  name:             str
    @param unsafeTracebacks: passed on to the L{pb.PBServerFactory}
    @param configDir:        configuration directory for this manager;
                             when None, "managers/<name>" below
                             configure.configdir is used
    @type  configDir:        str or None
    """
    # The dispatcher hands out avatars to clients connecting to me.
    # It has to exist first: every heaven registers itself with it.
    self.dispatcher = Dispatcher(self.computeIdentity)

    self.workerHeaven = self._createHeaven(
        interfaces.IWorkerMedium, worker.WorkerHeaven)
    self.componentHeaven = self._createHeaven(
        interfaces.IComponentMedium, component.ComponentHeaven)
    self.adminHeaven = self._createHeaven(
        interfaces.IAdminMedium, admin.AdminHeaven)

    self.running = True

    def markStopped():
        self.running = False
    reactor.addSystemEventTrigger('before', 'shutdown', markStopped)

    if configDir is None:
        configDir = os.path.join(configure.configdir, "managers", name)
    self.configDir = configDir

    self.bouncer = None # used by manager to authenticate worker/component

    self.bundlerBasket = registry.getRegistry().makeBundlerBasket()

    self._componentMappers = {} # any object -> ComponentMapper

    planetState = planet.ManagerPlanetState()
    self.state = planetState
    planetState.set('name', name)
    planetState.set('version', configure.version)

    self.plugs = {} # socket -> list of plugs

    # A portal, backed by our dispatcher (the IRealm), is what clients
    # connect through; its bouncer is filled in later by setBouncer().
    self.portal = fportal.BouncerPortal(self.dispatcher, None)
    #unsafeTracebacks = 1 # for debugging tracebacks to clients
    self.factory = pb.PBServerFactory(self.portal,
                                      unsafeTracebacks=unsafeTracebacks)
    self.connectionInfo = {}
    self.setConnectionInfo(None, None, None)
240
def shutdown(self):
    """Cancel any pending operations in preparation for shutdown.

    This method is mostly useful for unit tests; currently, it is
    not called during normal operation. Note that the caller is
    responsible for stopping listening on the port, as the
    manager does not have a handle on the twisted port object.

    @returns: A deferred that will fire when the manager has shut
              down.
    """
    if not self.bouncer:
        # nothing of ours is running, so we are done right away
        return defer.succeed(None)
    return self.bouncer.stop()
256
def setConnectionInfo(self, host, port, use_ssl):
    """
    Store how this manager can be reached in self.connectionInfo.

    Existing 'host', 'port' and 'use_ssl' entries are overwritten;
    any other keys already in the dict are left alone.
    """
    self.connectionInfo.update({
        'host': host,
        'port': port,
        'use_ssl': use_ssl,
    })
260
def getConfiguration(self):
    """
    Export the current planet state as XML.

    @returns: the manager's configuration as a string suitable for
              importing via loadConfiguration().
    """
    planetXml = config.exportPlanetXml(self.state)
    return planetXml
266
def getBundlerBasket(self):
    """
    Return a bundler basket to unbundle from.

    The cached basket is handed out as long as the registry reports
    that no rebuild is needed; otherwise the registry is verified
    again and a fresh basket is built and cached.

    @since: 0.2.2
    @rtype: L{flumotion.common.bundle.BundlerBasket}
    """
    if not registry.getRegistry().rebuildNeeded():
        return self.bundlerBasket

    self.info("Registry changed, rebuilding")
    registry.getRegistry().verify(force=True)
    self.bundlerBasket = registry.getRegistry().makeBundlerBasket()
    return self.bundlerBasket
281
def addMessage(self, level, mid, format, *args, **kwargs):
    """
    Convenience method to construct a message and add it to the
    planet state.

    `format' should be marked as translatable in the source with N_,
    and *args will be stored as format arguments. Keyword arguments
    are passed on to the message constructor. See
    L{flumotion.common.messages.Message} for the meanings of the
    rest of the arguments.

    For example::

      self.addMessage(messages.WARNING, 'foo-warning',
                      N_('The answer is %d'), 42, debug='not really')
    """
    translatable = T_(format, *args)
    message = messages.Message(level, translatable, mid=mid, **kwargs)
    self.addMessageObject(message)
299
def addMessageObject(self, message):
    """
    Add a message to the planet state, stored under the message's id
    in the 'messages' key.

    @type message: L{flumotion.common.messages.Message}
    """
    messageId = message.id
    self.state.setitem('messages', messageId, message)
307
def clearMessage(self, mid):
    """
    Clear any messages with the given message ID from the planet
    state. Does nothing when no such message is present.

    @type mid: message ID, normally a str
    """
    if mid not in self.state.get('messages'):
        return
    self.state.delitem('messages', mid)
317
def adminAction(self, identity, message, args, kw):
    """
    Report an action to every AdminActionPlug loaded on the manager,
    in the order the plugs were loaded.

    @param identity: L{flumotion.common.identity.Identity}
    @param message:  name of the action being performed
    @param args:     positional arguments of the action
    @param kw:       keyword arguments of the action
    """
    socket = 'flumotion.component.plugs.adminaction.AdminActionPlug'
    for plug in self.plugs.get(socket, ()):
        plug.action(identity, message, args, kw)
326
def computeIdentity(self, keycard, remoteHost):
    """
    Compute a suitable identity for a remote host.

    Every L{flumotion.component.plugs.identity.IdentityProviderPlug}
    installed on the manager is asked in turn; the first true result
    is returned as-is. When no plug provides one, the fallback is a
    user@host style identity built from the keycard's username (None
    if the keycard has none) and the remote host.

    The identity is only used in the adminaction interface. An
    example of its use is when you have an adminaction plug that
    checks an admin's privileges before actually doing an action;
    the identity object you use here might store the privileges that
    the admin has.

    @param keycard:    the keycard that the remote host used to log in.
    @type  keycard:    L{flumotion.common.keycards.Keycard}
    @param remoteHost: the ip of the remote host
    @type  remoteHost: str

    @rtype: a deferred that will fire a
            L{flumotion.common.identity.RemoteIdentity}
    """
    socket = 'flumotion.component.plugs.identity.IdentityProviderPlug'
    for plug in self.plugs.get(socket, ()):
        provided = plug.computeIdentity(keycard, remoteHost)
        if provided:
            return provided

    username = getattr(keycard, 'username', None)
    fallback = RemoteIdentity(username, remoteHost)
    return defer.succeed(fallback)
357
def _addComponent(self, conf, parent, identity):
    """
    Add a component state for the given component config entry.

    The new state starts out sleeping, is appended to the parent's
    'components', and is registered in the component mapper lookup
    under both the state object and the component's avatarId.
    Warnings are attached to the state when the component's project
    is not installed or its version is not compatible.

    @param conf:     the component config entry
    @param parent:   the flow or atmosphere state to add to
    @param identity: who requested this; anything other than
                     LOCAL_IDENTITY is reported through adminAction

    @rtype: L{flumotion.common.planet.ManagerComponentState}
    """
    self.debug('adding component %s to %s'
               % (conf.name, parent.get('name')))

    if identity != LOCAL_IDENTITY:
        self.adminAction(identity, '_addComponent', (conf, parent), {})

    compState = planet.ManagerComponentState()
    compState.set('name', conf.name)
    compState.set('type', conf.getType())
    compState.set('workerRequested', conf.worker)
    compState.setMood(moods.sleeping.value)
    compState.set('config', conf.getConfigDict())

    compState.set('parent', parent)
    parent.append('components', compState)

    avatarId = conf.getConfigDict()['avatarId']

    # a successful add supersedes any earlier load warning
    self.clearMessage('loadComponent-%s' % avatarId)

    configDict = conf.getConfigDict()
    projectName = configDict['project']
    versionTuple = configDict['version']

    projectVersion = None
    try:
        projectVersion = project.get(projectName, 'version')
    except errors.NoProjectError:
        notInstalled = messages.Warning(T_(N_(
            "This component is configured for Flumotion project '%s', "
            "but that project is not installed.\n"),
                projectName))
        compState.append('messages', notInstalled)

    if projectVersion:
        self.debug('project %s, version %r, project version %r' % (
            projectName, versionTuple, projectVersion))
        compatible = common.checkVersionsCompat(
            versionTuple, common.versionStringToTuple(projectVersion))
        if not compatible:
            mismatch = messages.Warning(T_(N_(
                "This component is configured for "
                "Flumotion '%s' version %s, "
                "but you are running version %s.\n"
                "Please update the configuration of the component.\n"),
                    projectName, common.versionTupleToString(versionTuple),
                    projectVersion))
            compState.append('messages', mismatch)

    # make the component findable by state object and by avatarId
    mapper = ComponentMapper()
    mapper.state = compState
    mapper.id = avatarId
    self._componentMappers[compState] = mapper
    self._componentMappers[avatarId] = mapper

    return compState
422
def _updateStateFromConf(self, _, conf, identity):
    """
    Add a new config object into the planet state.

    Components that are not known yet are added. A component that is
    known but sleeping is deleted and added again. A component that
    is already running is left alone; if its running configuration
    differs from the new one, a warning message is put on the planet.

    @param _:        ignored; present so that I can be used as a
                     deferred callback
    @param conf:     the parsed planet configuration
    @param identity: who requested this change

    @returns: a list of all components added
    @rtype:   list of L{flumotion.common.planet.ManagerComponentState}
    """

    self.debug('syncing up planet state with config')
    added = [] # added components while parsing

    def checkNotRunning(comp, parentState):
        # True when comp may be (re-)added to parentState
        name = comp.getName()

        comps = dict([(x.get('name'), x)
                      for x in parentState.get('components')])
        runningComps = dict([(x.get('name'), x)
                             for x in parentState.get('components')
                             if x.get('mood') != moods.sleeping.value])
        if name not in comps:
            # We don't have it at all; allow it
            return True
        elif name not in runningComps:
            # We have it, but it's not running. Allow it after deleting
            # the old one.
            oldComp = comps[name]
            self.deleteComponent(oldComp)
            return True

        # if we get here, the component is already running; warn if
        # the running configuration is different. Return False in
        # all cases.
        parent = comps[name].get('parent').get('name')
        # Use our own argument here. This used to read the loop
        # variable of the enclosing function, which only worked because
        # every caller happened to pass that same object.
        newConf = comp.getConfigDict()
        oldConf = comps[name].get('config')

        if newConf == oldConf:
            self.debug('%s already has component %s running with '
                       'same configuration', parent, name)
            self.clearMessage('loadComponent-%s' % oldConf['avatarId'])
            return False

        self.info('%s already has component %s, but configuration '
                  'not the same -- notifying admin', parent, name)

        diff = config.dictDiff(oldConf, newConf)
        diffMsg = config.dictDiffMessageString(diff, 'existing', 'new')

        self.addMessage(messages.WARNING,
                        'loadComponent-%s' % oldConf['avatarId'],
                        N_('Could not load component %r into %r: '
                           'a component is already running with '
                           'this name, but has a different '
                           'configuration.'), name, parent,
                        debug=diffMsg)
        return False

    state = self.state
    atmosphere = state.get('atmosphere')
    for c in conf.atmosphere.components.values():
        if checkNotRunning(c, atmosphere):
            added.append(self._addComponent(c, atmosphere, identity))

    flows = dict([(x.get('name'), x) for x in state.get('flows')])
    for f in conf.flows:
        if f.name in flows:
            flow = flows[f.name]
        else:
            self.info('creating flow %r', f.name)
            flow = planet.ManagerFlowState(name=f.name, parent=state)
            state.append('flows', flow)

        for c in f.components.values():
            if checkNotRunning(c, flow):
                added.append(self._addComponent(c, flow, identity))

    return added

def _startComponents(self, components, identity):
    """
    Ask the workers to create the given components.

    The components are grouped by the worker they requested, and one
    create request is issued per worker.

    @param components: the component states to start
    @type  components: list of
                       L{flumotion.common.planet.ManagerComponentState}
    @param identity:   who requested this; not used here
    """
    # temporary dict of the form {workerId => [components]}
    componentsToStart = {}
    for compState in components:
        requested = compState.get('workerRequested')
        componentsToStart.setdefault(requested, []).append(compState)
    self.debug('_startComponents: componentsToStart %r' %
               (componentsToStart, ))

    for workerId in componentsToStart:
        self._workerCreateComponents(workerId, componentsToStart[workerId])
515
def _loadComponentConfiguration(self, conf, identity):
    """
    Merge a parsed configuration into the planet state, then start
    the components that were added by it.

    @param conf:     the parsed planet configuration
    @param identity: who requested this change

    @rtype: L{twisted.internet.defer.Deferred}
    """
    chain = defer.succeed(None)
    # step 1 yields the list of added component states ...
    chain.addCallback(self._updateStateFromConf, conf, identity)
    # ... which step 2 receives as its first argument
    chain.addCallback(self._startComponents, identity)
    return chain
522
523 - def loadComponentConfigurationXML(self, file, identity):
524 """ 525 Load the configuration from the given XML, merging it on top of 526 the currently running configuration. 527 528 @param file: file to parse, either as an open file object, 529 or as the name of a file to open 530 @type file: str or file 531 @param identity: The identity making this request.. This is used by the 532 adminaction logging mechanism in order to say who is 533 performing the action. 534 @type identity: L{flumotion.common.identity.Identity} 535 """ 536 self.debug('loading configuration') 537 mid = 'loadComponent-parse-error' 538 if isinstance(file, str): 539 mid += '-%s' % file 540 try: 541 self.clearMessage(mid) 542 conf = config.PlanetConfigParser(file) 543 conf.parse() 544 return self._loadComponentConfiguration(conf, identity) 545 except errors.ConfigError, e: 546 self.addMessage(messages.WARNING, mid, 547 N_('Invalid component configuration.'), 548 debug=e.args[0]) 549 return defer.fail(e) 550 except errors.UnknownComponentError, e: 551 if isinstance(file, str): 552 debug = 'Configuration loaded from file %r' % file 553 else: 554 debug = 'Configuration loaded remotely' 555 self.addMessage(messages.WARNING, mid, 556 N_('Unknown component in configuration: %s.'), 557 e.args[0], debug=debug) 558 return defer.fail(e) 559 except Exception, e: 560 self.addMessage(messages.WARNING, mid, 561 N_('Unknown error while loading configuration.'), 562 debug=log.getExceptionMessage(e)) 563 return defer.fail(e)
564
def _loadManagerPlugs(self, conf):
    """
    Instantiate every plug listed in the manager configuration and
    append it to self.plugs under its socket.

    Plug types are looked up in the registry; the plug itself is
    created through reflectcall.reflectCallCatching with
    errors.ConfigError as its first argument.

    @param conf: parsed manager configuration; conf.plugs maps a
                 socket to a list of plug argument dicts
    """
    for socket, plugConfigs in conf.plugs.items():
        loaded = self.plugs.setdefault(socket, [])

        for args in plugConfigs:
            self.debug('loading plug type %s for socket %s'
                       % (args['type'], socket))
            entry = registry.getRegistry().getPlug(args['type']).getEntry()
            plug = reflectcall.reflectCallCatching(
                errors.ConfigError,
                entry.getModuleName(), entry.getFunction(), args)
            loaded.append(plug)
581
def startManagerPlugs(self):
    """
    Start every plug that was loaded, passing myself to each plug's
    start() method.
    """
    for socket, socketPlugs in self.plugs.items():
        for plug in socketPlugs:
            self.debug('starting plug %r for socket %s', plug, socket)
            plug.start(self)
587
def _loadManagerBouncer(self, conf):
    """
    Create the bouncer described in the manager configuration and
    install it once it has become happy.

    A bouncer that fails to start is logged but does not make the
    returned deferred fail, so the manager keeps running without one.

    @param conf: parsed manager configuration; conf.bouncer is the
                 bouncer's config entry, or a false value if none
                 was configured

    @rtype: L{twisted.internet.defer.Deferred}
    """
    if not conf.bouncer:
        self.warning('no bouncer defined, nothing can access the '
                     'manager')
        return defer.succeed(None)

    self.debug('going to start manager bouncer %s of type %s',
               conf.bouncer.name, conf.bouncer.type)

    defs = registry.getRegistry().getComponent(conf.bouncer.type)
    entry = defs.getEntryByType('component')
    # FIXME: use entry.getModuleName() (doesn't work atm?)
    moduleName = defs.getSource()
    methodName = entry.getFunction()
    bouncer = reflectcall.createComponent(moduleName, methodName,
                                          conf.bouncer.getConfigDict())
    d = bouncer.waitForHappy()

    def setupCallback(result):
        bouncer.debug('started')
        self.setBouncer(bouncer)

    def setupErrback(reason):
        # Still swallowed on purpose, but say *why* it failed; the
        # previous message dropped the failure completely. The argument
        # is not called "failure" so that it does not shadow the
        # twisted.python.failure module imported by this file.
        self.warning('Error starting manager bouncer: %s',
                     reason.getErrorMessage())
    d.addCallbacks(setupCallback, setupErrback)
    return d

def loadManagerConfigurationXML(self, file):
    """
    Load manager configuration from the given XML. The manager
    configuration is currently used to load the manager's bouncer
    and plugs, and is only run once at startup.

    @param file: file to parse, either as an open file object,
                 or as the name of a file to open
    @type  file: str or file
    """
    self.debug('loading configuration')
    managerConf = config.ManagerConfigParser(file)
    managerConf.parseBouncerAndPlugs()

    self._loadManagerPlugs(managerConf)
    # NB: the deferred returned for the bouncer is not waited on
    self._loadManagerBouncer(managerConf)

    managerConf.unlink()
631 632 __pychecker__ = 'maxargs=11' # hahaha 633
def loadComponent(self, identity, componentType, componentId,
                  componentLabel, properties, workerName,
                  plugs, eaters, isClockMaster, virtualFeeds):
    """
    Load a component into the manager configuration.

    See L{flumotion.manager.admin.AdminAvatar.perspective_loadComponent}
    for a definition of the argument types.

    The flow named in componentId is created if it does not exist
    yet. The new component is added to the planet state and a start
    is requested for it.

    @raise NotImplementedError: for clock master components and for
           components that need synchronization
    @raise errors.ConfigError: when no worker name is given
    @raise errors.ComponentAlreadyExistsError: when the parent already
           has a component with this name

    @returns: the state of the newly added component
    @rtype:   L{flumotion.common.planet.ManagerComponentState}
    """
    self.debug('loading %s component %s on %s',
               componentType, componentId, workerName)
    parentName, compName = common.parseComponentId(componentId)

    if isClockMaster:
        raise NotImplementedError("Clock master components are not "
                                  "yet supported")
    # This used to test the imported "worker" module, which is never
    # None, so a missing worker name was silently accepted.
    if workerName is None:
        raise errors.ConfigError("Component %r needs to specify the"
                                 " worker on which it should run"
                                 % componentId)

    state = self.state

    compConf = config.ConfigEntryComponent(compName, parentName,
                                           componentType,
                                           componentLabel,
                                           properties,
                                           plugs, workerName,
                                           eaters, isClockMaster,
                                           None, None, virtualFeeds)

    if compConf.defs.getNeedsSynchronization():
        raise NotImplementedError("Components that need "
                                  "synchronization are not yet "
                                  "supported")

    if parentName == 'atmosphere':
        parentState = state.get('atmosphere')
    else:
        flows = dict([(x.get('name'), x) for x in state.get('flows')])
        if parentName in flows:
            parentState = flows[parentName]
        else:
            self.info('creating flow %r', parentName)
            parentState = planet.ManagerFlowState(name=parentName,
                                                  parent=state)
            state.append('flows', parentState)

    components = [x.get('name') for x in parentState.get('components')]
    if compName in components:
        self.debug('%r already has component %r', parentName, compName)
        raise errors.ComponentAlreadyExistsError(compName)

    compState = self._addComponent(compConf, parentState, identity)

    self._startComponents([compState], identity)

    return compState
693
def _createHeaven(self, interface, klass):
    """
    Create a heaven of the given klass that will send avatars to clients
    implementing the given medium interface.

    The heaven is constructed with me as its only argument and is
    registered with my dispatcher before being returned.

    @param interface: the medium interface to create a heaven for
    @type  interface: L{flumotion.common.interfaces.IMedium}
    @param klass:     the type of heaven to create
    @type  klass:     an implementor of
                      L{flumotion.common.interfaces.IHeaven}

    @returns: the newly created heaven
    """
    assert issubclass(interface, interfaces.IMedium)
    newHeaven = klass(self)
    self.dispatcher.registerHeaven(newHeaven, interface)
    return newHeaven
708
def setBouncer(self, bouncer):
    """
    Install the bouncer on myself, my portal and my dispatcher.
    Replacing an existing bouncer is allowed but logs a warning.

    @type bouncer: L{flumotion.component.bouncers.bouncer.Bouncer}
    """
    alreadyHadOne = self.bouncer
    if alreadyHadOne:
        self.warning("manager already had a bouncer, setting anyway")

    self.bouncer = bouncer
    self.portal.bouncer = bouncer
    self.dispatcher.setBouncer(bouncer)
719
def getFactory(self):
    """
    @returns: the PB server factory created in my constructor
    """
    pbFactory = self.factory
    return pbFactory
722
def componentCreate(self, componentState):
    """
    Create the given component. This will currently also trigger
    a start eventually when the component avatar attaches.

    The component should be sleeping.
    The worker it should be started on should be present.

    @type componentState: L{planet.ManagerComponentState}

    @raise errors.ComponentMoodError: when the component is not
           sleeping, or already has a pending mood
    @raise errors.ComponentNoWorkerError: when the component's worker
           is not logged in

    @returns: whatever L{_workerCreateComponents} returns
    """
    mood = componentState.get('mood')
    if mood != moods.sleeping.value:
        raise errors.ComponentMoodError("%r not sleeping but %s" % (
            componentState, moods.get(mood).name))

    pending = componentState.get('moodPending')
    if pending is not None:
        raise errors.ComponentMoodError(
            "%r already has a pending mood %s" % (
            componentState, moods.get(pending).name))

    # find a worker this component can start on: the one it is
    # recorded as running on, else the one it asked for
    workerId = (componentState.get('workerName')
                or componentState.get('workerRequested'))

    if workerId not in self.workerHeaven.avatars:
        raise errors.ComponentNoWorkerError(
            "worker %s is not logged in" % workerId)

    return self._workerCreateComponents(workerId, [componentState])
751
def _componentStopNoAvatar(self, componentState, avatarId):
    """
    Stop a component that has no avatar logged in.

    Only sad and lost components can be stopped this way:
      - a sad component gets its messages removed and goes to sleeping;
      - a lost component goes to sleeping unless its worker reports
        that the component is still running.
    Any other mood results in a failed deferred.

    @type  componentState: L{planet.ManagerComponentState}
    @param avatarId:       avatarId of the component

    @rtype: L{twisted.internet.defer.Deferred}
    """
    # NB: reset moodPending if asked to stop without an avatar
    # because componentStop allows stopping even if moodPending
    # is happy

    def stopSad():
        self.debug('asked to stop a sad component without avatar')
        # iterate over a copy, since we remove while looping
        for mid in componentState.get('messages')[:]:
            self.debug("Deleting message %r", mid)
            componentState.remove('messages', mid)

        componentState.setMood(moods.sleeping.value)
        componentState.set('moodPending', None)
        return defer.succeed(None)

    def stopLost():

        def gotComponents(comps):
            return avatarId in comps

        def gotJobRunning(running):
            if running:
                self.warning('asked to stop lost component %r, but '
                             'it is still running', avatarId)
                # FIXME: put a message on the state to suggest a
                # kill?
                msg = "Cannot stop lost component which is still running."
                raise errors.ComponentMoodError(msg)
            else:
                # the avatarId argument was missing here, so the
                # literal "%r" ended up in the log
                self.debug('component %r seems to be really lost, '
                           'setting to sleeping', avatarId)
                componentState.setMood(moods.sleeping.value)
                componentState.set('moodPending', None)
                return None

        self.debug('asked to stop a lost component without avatar')
        workerName = componentState.get('workerRequested')
        if workerName and self.workerHeaven.hasAvatar(workerName):
            self.debug('checking if component has job process running')
            d = self.workerHeaven.getAvatar(workerName).getComponents()
            d.addCallback(gotComponents)
            d.addCallback(gotJobRunning)
            return d
        else:
            self.debug('component lacks a worker, setting to sleeping')
            d = defer.maybeDeferred(gotJobRunning, False)
            return d

    def stopUnknown():
        msg = ('asked to stop a component without avatar in mood %s'
               % moods.get(mood))
        self.warning(msg)
        return defer.fail(errors.ComponentMoodError(msg))

    mood = componentState.get('mood')
    stoppers = {moods.sad.value: stopSad,
                moods.lost.value: stopLost}
    return stoppers.get(mood, stopUnknown)()

def _componentStopWithAvatar(self, componentState, componentAvatar):
    """
    Stop a component through its logged-in avatar.

    @param componentState:  state of the component; not used here
    @param componentAvatar: the avatar to call stop() on

    @returns: whatever the avatar's stop() returns
    """
    # FIXME: This deferred is just the remote call; there's no actual
    # deferred for completion of shutdown.
    return componentAvatar.stop()
817
def componentStop(self, componentState):
    """
    Stop the given component.
    If the component was sad, we clear its sad state as well,
    since the stop was explicitly requested by the admin.

    @type componentState: L{planet.ManagerComponentState}

    @raise errors.BusyComponentError: when the component has a pending
           mood other than happy
    @raise errors.UnknownComponentError: when no mapper exists for the
           component state

    @rtype: L{twisted.internet.defer.Deferred}
    """
    self.debug('componentStop(%r)', componentState)
    # We permit stopping a component even if it has a pending mood of
    # happy, so that if it never gets to happy, we can still stop it.
    pending = componentState.get('moodPending')
    if pending is not None and pending != moods.happy.value:
        self.debug("Pending mood is %r", pending)

        raise errors.BusyComponentError(componentState)

    m = self.getComponentMapper(componentState)
    if not m:
        # We have a stale componentState for an already-deleted
        # component
        self.warning("Component mapper for component state %r doesn't "
                     "exist", componentState)
        raise errors.UnknownComponentError(componentState)

    if not m.avatar:
        return self._componentStopNoAvatar(componentState, m.id)
    return self._componentStopWithAvatar(componentState, m.avatar)
848
def componentAddMessage(self, avatarId, message):
    """
    Set the given message on the given component's state.
    Can be called e.g. by a worker to report on a crashed component.
    Sets the mood to sad if it is an error message.

    A message for a component that is not mapped is dropped with a
    warning.

    @param avatarId: avatarId of the component
    @param message:  the message to append to the component's state
    """
    if avatarId not in self._componentMappers:
        self.warning('asked to set a message on non-mapped component %s' %
                     avatarId)
        return

    compState = self._componentMappers[avatarId].state
    compState.append('messages', message)
    if message.level == messages.ERROR:
        self.debug('Error message makes component sad')
        compState.setMood(moods.sad.value)
865 866 # FIXME: unify naming of stuff like this 867
def workerAttached(self, workerAvatar):
    """
    Called when a worker logs in.

    Components waiting for this worker (or for no worker in
    particular) are created on it, together with components of this
    worker that were marked lost, except for anything the worker
    reports as already running.

    @param workerAvatar: avatar of the worker that just logged in
    """
    workerId = workerAvatar.avatarId
    self.debug('vishnu.workerAttached(): id %s' % workerId)

    # Create all components assigned to this worker. Note that the
    # order of creation is unimportant, it's only the order of
    # starting that matters (and that's different code).
    toCreate = [c for c in self._getComponentsToCreate()
                if c.get('workerRequested') in (workerId, None)]

    # Ask the worker what it is running, so those components can be
    # taken off the list, and so lost components it does not report
    # can be added.
    d = workerAvatar.getComponents()

    def workerAvatarComponentListReceived(workerComponents):
        # list() is called to work around a pychecker bug. FIXME.
        lost = list([c for c in self.getComponentStates()
                     if c.get('workerRequested') == workerId and \
                        c.get('mood') == moods.lost.value])

        # workerComponents holds avatarId strings, while our two
        # lists hold ManagerComponentState objects
        for runningId in workerComponents:
            if runningId not in self._componentMappers:
                continue
            runningState = self._componentMappers[runningId].state
            if runningState in toCreate:
                toCreate.remove(runningState)
            if runningState in lost:
                lost.remove(runningState)

        for lostState in lost:
            self.info(
                "Restarting previously lost component %s on worker %s",
                self._componentMappers[lostState].id, workerId)
            # We set mood to sleeping first. This allows things to
            # distinguish between a newly-started component and a lost
            # component logging back in.
            lostState.set('moodPending', None)
            lostState.setMood(moods.sleeping.value)

        allComponents = toCreate + lost

        if not allComponents:
            self.debug(
                "vishnu.workerAttached(): no components for this worker")
            return

        self._workerCreateComponents(workerId, allComponents)
    d.addCallback(workerAvatarComponentListReceived)

    reactor.callLater(0, self.componentHeaven.feedServerAvailable,
                      workerId)

def _workerCreateComponents(self, workerId, components):
    """
    Create the given components on the given worker, one after the
    other, in no particular order.

    If the worker is not logged in, nothing is created and an already
    fired deferred is returned.

    @param workerId:   avatarId of the worker
    @type  workerId:   string
    @param components: components to start
    @type  components: list of
                       L{flumotion.common.planet.ManagerComponentState}

    @returns: the deferred carrying the chain of create calls
    """
    self.debug("_workerCreateComponents: workerId %r, components %r" % (
        workerId, components))

    avatars = self.workerHeaven.avatars
    if workerId not in avatars:
        self.debug('worker %s not logged in yet, delaying '
                   'component start' % workerId)
        return defer.succeed(None)

    workerAvatar = avatars[workerId]

    def chainCompleted(result):
        return self.debug(
            '_workerCreateComponents(): completed setting up create chain')

    chain = defer.Deferred()
    for state in components:
        componentType = state.get('type')
        conf = state.get('config')
        self.debug('scheduling create of %s on %s'
                   % (conf['avatarId'], workerId))
        chain.addCallback(self._workerCreateComponentDelayed,
                          workerAvatar, state, componentType, conf)
    chain.addCallback(chainCompleted)

    # the chain is fired right away, not from a later reactor iteration
    self.debug('_workerCreateComponents(): triggering create chain')
    chain.callback(None)
    return chain
961
def _workerCreateComponentDelayed(self, result, workerAvatar,
                                  componentState, componentType, conf):
    """
    Callback step of the create chain: ask the worker to create one
    component.

    @param result:         result of the previous step in the chain;
                           unused
    @param workerAvatar:   avatar of the worker to create the component
                           on
    @param componentState: state of the component to create
    @param componentType:  type of the component to create
    @param conf:           configuration of the component; must contain
                           'avatarId', may contain 'nice' (default 0)
    """
    componentId = conf['avatarId']
    niceLevel = conf.get('nice', 0)

    # mark the component as on its way to happy, so that it is only
    # asked to start once
    componentState.set('moodPending', moods.happy.value)

    creation = workerAvatar.createComponent(componentId, componentType,
                                            niceLevel, conf)
    # FIXME: here we get the avatar Id of the component we wanted
    # started, so now attach it to the planetState's component state
    creation.addCallback(self._createCallback, componentState)
    creation.addErrback(self._createErrback, componentState)

    # FIXME: shouldn't we return the deferred here to make sure
    # components wait on each other to be started ?
982 - def _createCallback(self, result, componentState):
983 self.debug('got avatarId %s for state %s' % (result, componentState)) 984 m = self._componentMappers[componentState] 985 assert result == m.id, "received id %s is not the expected id %s" % ( 986 result, m.id)
987
def _createErrback(self, failure, state):
    """
    Called when creating a component on a worker failed.

    If the failure is a ComponentAlreadyRunningError, the component is
    either already logged in (it has a jobState; nothing to do) or is
    marked lost until it logs in.  Any other failure makes the
    component sad and attaches an error message to its state.

    @param failure: the failure from the create call
    @param state:   state of the component that failed to be created

    @returns: None, so the failure is not propagated further
    """
    # FIXME: make ConfigError copyable so we can .check() it here
    # and print a nicer warning
    self.warning('failed to create component %s: %s',
                 state.get('name'), log.getFailureMessage(failure))

    if not failure.check(errors.ComponentAlreadyRunningError):
        message = messages.Error(T_(
            N_("The component could not be started.")),
            debug=log.getFailureMessage(failure))

        state.setMood(moods.sad.value)
        state.append('messages', message)
        return None

    if self._componentMappers[state].jobState:
        self.info('component appears to have logged in in the '
                  'meantime')
        return None

    self.info('component appears to be running already; '
              'treating it as lost until it logs in')
    state.setMood(moods.lost.value)
    return None
1011
def workerDetached(self, workerAvatar):
    """
    Called when a worker logs out; only logs the event.

    @param workerAvatar: avatar of the worker that logged out
    """
    self.debug('vishnu.workerDetached(): id %s' % workerAvatar.avatarId)
1016
def addComponentToFlow(self, componentState, flowName):
    """
    Add a component state to the flow with the given name, creating
    the flow first if it cannot be found.

    The name 'atmosphere' selects the planet's atmosphere, which is
    treated like a flow although it is not one.

    @param componentState: state of the component to add
    @param flowName:       name of the flow, or 'atmosphere'
    """
    if flowName != 'atmosphere':
        target = self._getFlowByName(flowName)
    else:
        target = self.state.get('atmosphere')

    if not target:
        # no such flow yet; create it and hook it into the planet
        self.info('Creating flow "%s"' % flowName)
        target = planet.ManagerFlowState()
        target.set('name', flowName)
        target.set('parent', self.state)
        self.state.append('flows', target)

    componentState.set('parent', target)
    target.append('components', componentState)
1033
def registerComponent(self, componentAvatar):
    """
    Record a component avatar in the component mappers.

    The mapper already known for the avatar's avatarId is reused if
    there is one; otherwise a new one is created.  The mapper is then
    stored under four keys: the component state, the job state, the
    avatarId and the avatar itself.

    @param componentAvatar: avatar of the component to register
    """
    mapper = self.getComponentMapper(componentAvatar.avatarId)
    if not mapper:
        mapper = ComponentMapper()

    mapper.state = componentAvatar.componentState
    mapper.jobState = componentAvatar.jobState
    mapper.id = componentAvatar.avatarId
    mapper.avatar = componentAvatar

    for key in (mapper.state, mapper.jobState, mapper.id, mapper.avatar):
        self._componentMappers[key] = mapper
1048
def unregisterComponent(self, componentAvatar):
    """
    Called when a component is logging out.

    Removes the mapper entries keyed by the job state and by the
    avatar, and resets pid, workerName and moodPending on the
    component state.  The entries keyed by the component state and by
    the id are left in place.

    @param componentAvatar: avatar of the component logging out
    """
    self.debug('unregisterComponent(%r): cleaning up state' %
               componentAvatar)

    mapper = self._componentMappers[componentAvatar]

    # forget the job state; a missing entry is only worth a warning
    try:
        del self._componentMappers[mapper.jobState]
    except KeyError:
        self.warning('Could not remove jobState for %r' % componentAvatar)
    mapper.jobState = None

    for key in ('pid', 'workerName', 'moodPending'):
        mapper.state.set(key, None)

    # forget the avatar
    del self._componentMappers[mapper.avatar]
    mapper.avatar = None
1071
def getComponentStates(self):
    """
    Return the states of all components known to the planet state.

    Each state is logged, and a warning is emitted for every component
    whose mood is None.

    @returns: whatever self.state.getComponents() returns, unmodified
    """
    cList = self.state.getComponents()
    self.debug('getComponentStates(): %d components' % len(cList))
    for c in cList:
        self.log(repr(c))
        # identity test: None is a singleton, and "== None" could be
        # fooled by a value with a custom __eq__
        if c.get('mood') is None:
            self.warning('%s has mood None' % c.get('name'))

    return cList
1082
def deleteComponent(self, componentState):
    """
    Empty the planet of the given component.

    The component must be known, must be sleeping, and must have no
    mood change pending.

    @param componentState: state of the component to delete

    @raise errors.UnknownComponentError: if the component has no mapper
    @raise errors.BusyComponentError:    if the component has a
                                         moodPending or is not sleeping

    @returns: a deferred that will fire when all listeners have been
              notified of the removal of the component.
    """
    self.debug('deleting component %r from state', componentState)
    c = componentState
    if c not in self._componentMappers:
        raise errors.UnknownComponentError(c)

    flow = componentState.get('parent')
    # Compare the mood by value, as the rest of this class does.  An
    # identity test ("is not") only works when both sides happen to be
    # the very same object.
    if (c.get('moodPending') is not None
        or c.get('mood') != moods.sleeping.value):
        raise errors.BusyComponentError(c)

    # drop both mapper entries: the one keyed by id and the one keyed
    # by the component state
    del self._componentMappers[self._componentMappers[c].id]
    del self._componentMappers[c]
    return flow.remove('components', c)
1103
1104 - def _getFlowByName(self, flowName):
1105 for flow in self.state.get('flows'): 1106 if flow.get('name') == flowName: 1107 return flow
1108
def deleteFlow(self, flowName):
    """
    Empty the planet of a flow.

    All components of the flow must be sleeping with no mood change
    pending; nothing is removed if any of them is busy.

    @param flowName: name of the flow to delete

    @raise ValueError:                if no flow has the given name
    @raise errors.BusyComponentError: if a component of the flow has a
                                      moodPending or is not sleeping

    @returns: a deferred that will fire when the flow is removed.
    """

    flow = self._getFlowByName(flowName)
    if flow is None:
        raise ValueError("No flow called %s found" % (flowName, ))

    components = flow.get('components')

    # First pass: check every component before touching any mapper, so
    # a busy component leaves the mappers untouched.
    for c in components:
        # Compare the mood by value; an identity test only works when
        # both sides happen to be the very same object.
        if (c.get('moodPending') is not None or
            c.get('mood') != moods.sleeping.value):
            raise errors.BusyComponentError(c)

    # Second pass: drop the mapper entries keyed by id and by state.
    for c in components:
        del self._componentMappers[self._componentMappers[c].id]
        del self._componentMappers[c]

    d = flow.empty()
    d.addCallback(lambda _: self.state.remove('flows', flow))
    return d
1132
def emptyPlanet(self):
    """
    Empty the planet of all components, and flows. Also clears all
    messages.

    Every component that is not sleeping and still has an avatar is
    asked to stop, one after the other.  Once that chain has run,
    _emptyPlanetCallback cleans up the rest of the planet state.

    @raise errors.BusyComponentError: if any component has a
                                      moodPending set

    @returns: a deferred that will fire when the planet is empty.
    """
    for mid in self.state.get('messages').keys():
        self.clearMessage(mid)

    # if any component is already in a mood change/command, fail
    busy = [c for c in self.getComponentStates()
            if c.get('moodPending') is not None]
    if busy:
        state = busy[0]
        raise errors.BusyComponentError(
            state,
            "moodPending is %s" % moods.get(state.get('moodPending')))

    # Keep the ones that aren't sleeping; those need stopping.  Moods
    # are compared by value, as elsewhere in this class.  An identity
    # test only works when both sides happen to be the same object.
    components = [c for c in self.getComponentStates()
                  if c.get('mood') != moods.sleeping.value]

    # create a big deferred for stopping everything
    d = defer.Deferred()

    self.debug('need to stop %d components: %r' % (
        len(components), components))

    for c in components:
        avatar = self._componentMappers[c].avatar
        # A component that has logged out but isn't sleeping (so is
        # sad or lost) has no avatar; only stop it if we can.
        if avatar:
            # the avatar is passed as an extra callback argument so
            # each callback stops its own avatar
            d.addCallback(lambda result, a: a.stop(), avatar)
        else:
            assert c.get('mood') in (moods.sad.value, moods.lost.value)

    d.addCallback(self._emptyPlanetCallback)

    # trigger the deferred after returning
    reactor.callLater(0, d.callback, None)

    return d
1181
def _emptyPlanetCallback(self, result):
    """
    Called after all components have been asked to stop; cleans up the
    rest of the planet state.

    Removes the mapper entries keyed by id and by component state for
    every component, then empties the atmosphere and every flow and
    removes the flows from the planet state.

    @param result: result of the previous callback; unused

    @returns: a DeferredList over the empty/remove operations
    """
    components = self.getComponentStates()
    self.debug('_emptyPlanetCallback: need to delete %d components' %
               len(components))

    for c in components:
        # compare the mood by value; an identity test only works when
        # both sides happen to be the very same object
        if c.get('mood') != moods.sleeping.value:
            self.warning('Component %s is not sleeping', c.get('name'))
        # clear mapper; remove componentstate and id
        m = self._componentMappers[c]
        del self._componentMappers[m.id]
        del self._componentMappers[c]

    # if anything's left, we have a mistake somewhere
    leftover = self._componentMappers.keys()
    if len(leftover) > 0:
        self.warning('mappers still has keys %r' % (repr(leftover)))

    dList = []

    dList.append(self.state.get('atmosphere').empty())

    # Iterate over a snapshot: the loop removes each flow from the
    # same 'flows' collection.  If get() returns the live list and
    # remove() mutates it right away, iterating it directly would skip
    # flows.  NOTE(review): confirm against the state implementation.
    for f in list(self.state.get('flows')):
        self.debug('appending deferred for emptying flow %r' % f)
        dList.append(f.empty())
        self.debug('appending deferred for removing flow %r' % f)
        dList.append(self.state.remove('flows', f))
        self.debug('appended deferreds')

    dl = defer.DeferredList(dList)
    return dl
1215
def _getComponentsToCreate(self):
    """
    Return the components that are sleeping.

    Sleeping indicates that there is no existing job: when a job is
    created the mood becomes waking, so moodPending does not need to
    be looked at.

    @rtype: list of L{flumotion.common.planet.ManagerComponentState}
    """
    def sleeping(state):
        return state.get('mood') == moods.sleeping.value

    return filter(sleeping, self.state.getComponents())
1230
1231 - def _getWorker(self, workerName):
1232 # returns the WorkerAvatar with the given name 1233 if not workerName in self.workerHeaven.avatars: 1234 raise errors.ComponentNoWorkerError("Worker %s not logged in?" 1235 % workerName) 1236 1237 return self.workerHeaven.avatars[workerName]
1238
def getWorkerFeedServerPort(self, workerName):
    """
    Return the feed server port of the named worker.

    @param workerName: name of the worker

    @returns: the worker avatar's feedServerPort, or None if the
              worker is not logged in
    """
    if workerName not in self.workerHeaven.avatars:
        return None

    worker = self._getWorker(workerName)
    return worker.feedServerPort
1243
def reservePortsOnWorker(self, workerName, numPorts):
    """
    Request a number of ports on the worker named workerName.  The
    ports stay reserved for the caller until releasePortsOnWorker is
    called.

    @param workerName: name of the worker to reserve ports on
    @param numPorts:   number of ports to reserve

    @returns: a list of ports as integers
    """
    worker = self._getWorker(workerName)
    return worker.reservePorts(numPorts)
1253
1254 - def releasePortsOnWorker(self, workerName, ports):
1255 """ 1256 Tells the manager that the given ports are no longer being used, 1257 and may be returned to the allocation pool. 1258 """ 1259 try: 1260 return self._getWorker(workerName).releasePorts(ports) 1261 except errors.ComponentNoWorkerError, e: 1262 self.warning('could not release ports: %r' % e.args)
1263
def getComponentMapper(self, object):
    """
    Look up an object mapper given the object.

    @param object: any key a mapper is stored under; registerComponent
                   stores mappers under the component state, the job
                   state, the avatarId and the avatar

    @rtype: L{ComponentMapper} or None
    """
    # One direct dictionary lookup.  Building the list of keys and
    # scanning it is linear on every call.
    return self._componentMappers.get(object, None)
1274
def getManagerComponentState(self, object):
    """
    Look up the component state of the mapper stored for the given
    object.

    @param object: any key a mapper is stored under; registerComponent
                   stores mappers under the component state, the job
                   state, the avatarId and the avatar

    @returns: the state attribute of the mapper found for the object
              (presumably a
              L{flumotion.common.planet.ManagerComponentState}), or
              None if no mapper is stored for it
    """
    # One direct dictionary lookup.  Building the list of keys and
    # scanning it is linear on every call.
    mapper = self._componentMappers.get(object, None)
    if mapper is None:
        return None

    return mapper.state
1285
1286 - def invokeOnComponents(self, componentType, methodName, *args, **kwargs):
1287 """ 1288 Invokes method on all components of a certain type 1289 """ 1290 1291 def invokeOnOneComponent(component, methodName, *args, **kwargs): 1292 m = self.getComponentMapper(component) 1293 if not m: 1294 self.warning('Component %s not mapped. Maybe deleted.', 1295 component.get('name')) 1296 raise errors.UnknownComponentError(component) 1297 1298 avatar = m.avatar 1299 if not avatar: 1300 self.warning('No avatar for %s, cannot call remote', 1301 component.get('name')) 1302 raise errors.SleepingComponentError(component) 1303 1304 try: 1305 return avatar.mindCallRemote(methodName, *args, **kwargs) 1306 except Exception, e: 1307 log_message = log.getExceptionMessage(e) 1308 msg = "exception on remote call %s: %s" % (methodName, 1309 log_message) 1310 self.warning(msg) 1311 raise errors.RemoteMethodError(methodName, 1312 log_message)
1313 1314 # only do this on happy or hungry components of type componentType 1315 dl_array = [] 1316 for c in self.getComponentStates(): 1317 if c.get('type') == componentType and \ 1318 (c.get('mood') is moods.happy.value or 1319 c.get('mood') is moods.hungry.value): 1320 self.info("component %r to have %s run", c, methodName) 1321 d = invokeOnOneComponent(c, methodName, *args, **kwargs) 1322 dl_array.append(d) 1323 dl = defer.DeferredList(dl_array) 1324 return dl 1325