1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 manager-side objects to handle worker clients
24 """
25
26 from twisted.internet import defer
27
28 from flumotion.manager import base
29 from flumotion.common import errors, interfaces, log, registry
30 from flumotion.common import worker, common
31 from flumotion.common.vfs import registerVFSJelly
32
33 __version__ = "$Rev: 7162 $"
34
35
37 """
38 I am an avatar created for a worker.
39 A reference to me is given when logging in and requesting a worker avatar.
40 I live in the manager.
41
42 @ivar feedServerPort: TCP port the feed server is listening on
43 @type feedServerPort: int
44 """
45 logCategory = 'worker-avatar'
46
47 _portSet = None
48 feedServerPort = None
49
50 - def __init__(self, heaven, avatarId, remoteIdentity, mind,
51 feedServerPort, ports, randomPorts):
62
65
68
69 def havePorts(res):
70 log.debug('worker-avatar', 'got port information')
71 (_s1, feedServerPort), (_s2, (ports, random)) = res
72 return (heaven, avatarId, remoteIdentity, mind,
73 feedServerPort, ports, random)
74 log.debug('worker-avatar', 'calling mind for port information')
75 d = defer.DeferredList([mind.callRemote('getFeedServerPort'),
76 mind.callRemote('getPorts')],
77 fireOnOneErrback=True)
78 d.addCallback(havePorts)
79 return d
80 makeAvatarInitArgs = classmethod(makeAvatarInitArgs)
81
86
88 """
89 Reserve the given number of ports on the worker.
90
91 @param numPorts: how many ports to reserve
92 @type numPorts: int
93 """
94 return self._portSet.reservePorts(numPorts)
95
97 """
98 Release the given list of ports on the worker.
99
100 @param ports: list of ports to release
101 @type ports: list of int
102 """
103 self._portSet.releasePorts(ports)
104
106 """
107 Create a component of the given type with the given nice level.
108
109 @param avatarId: avatarId the component should use to log in
110 @type avatarId: str
111 @param type: type of the component to create
112 @type type: str
113 @param nice: the nice level to create the component at
114 @type nice: int
115 @param conf: the component's config dict
116 @type conf: dict
117
118 @returns: a deferred that will give the avatarId the component
119 will use to log in to the manager
120 """
121 self.debug('creating %s (%s) on worker %s with nice level %d',
122 avatarId, type, self.avatarId, nice)
123 defs = registry.getRegistry().getComponent(type)
124 try:
125 entry = defs.getEntryByType('component')
126
127 moduleName = defs.getSource()
128 methodName = entry.getFunction()
129 except KeyError:
130 self.warning('no "component" entry in registry of type %s, %s',
131 type, 'falling back to createComponent')
132 moduleName = defs.getSource()
133 methodName = "createComponent"
134
135 self.debug('call remote create')
136 return self.mindCallRemote('create', avatarId, type, moduleName,
137 methodName, nice, conf)
138
140 """
141 Get a list of components that the worker is running.
142
143 @returns: a deferred that will give the avatarIds running on the
144 worker
145 """
146 self.debug('getting component list from worker %s' %
147 self.avatarId)
148 return self.mindCallRemote('getComponents')
149
150
151
153 """
154 Called by the worker to tell the manager to add a given message to
155 the given component.
156
157 Useful in cases where the component can't report messages itself,
158 for example because it crashed.
159
160 @param avatarId: avatarId of the component the message is about
161 @type message: L{flumotion.common.messages.Message}
162 """
163 self.debug('received message from component %s' % avatarId)
164 self.vishnu.componentAddMessage(avatarId, message)
165
166
168 """
169 I interface between the Manager and worker clients.
170 For each worker client I create an L{WorkerAvatar} to handle requests.
171 I live in the manager.
172 """
173
174 logCategory = "workerheaven"
175 avatarClass = WorkerAvatar
176
180
181
182
200
202 """
203 Notify the heaven that the given worker has logged out.
204
205 @type workerAvatar: L{WorkerAvatar}
206 """
207 workerName = workerAvatar.getName()
208 try:
209 self.state.remove('names', workerName)
210 for state in list(self.state.get('workers')):
211 if state.get('name') == workerName:
212 self.state.remove('workers', state)
213 except ValueError:
214 self.warning('worker %s was never registered in the heaven',
215 workerName)
216