
Source Code for Module flumotion.component.consumers.httpstreamer.httpstreamer

# -*- Mode: Python; test-case-name: flumotion.test.test_http -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

import time

import gst
from twisted.cred import credentials
from twisted.internet import reactor, error, defer
from twisted.web import server
from zope.interface import implements

from flumotion.common import gstreamer, errors
from flumotion.common import messages, netutils, interfaces
from flumotion.common.format import formatStorage, formatTime
from flumotion.common.i18n import N_, gettexter
from flumotion.component import feedcomponent
from flumotion.component.base import http
from flumotion.component.component import moods
from flumotion.component.consumers.httpstreamer import resources
from flumotion.component.misc.porter import porterclient
from flumotion.twisted import fdserver

__all__ = ['HTTPMedium', 'MultifdSinkStreamer']
__version__ = "$Rev: 8767 $"
T_ = gettexter()
STATS_POLL_INTERVAL = 10
UI_UPDATE_THROTTLE_PERIOD = 2.0 # Don't update UI more than once every two
                                # seconds


# FIXME: generalize this class and move it out here ?


class Stats:

    def __init__(self, sink):
        self.sink = sink

        self.no_clients = 0
        self.clients_added_count = 0
        self.clients_removed_count = 0
        self.start_time = time.time()
        # keep track of the highest number and the last epoch this was reached
        self.peak_client_number = 0
        self.peak_epoch = self.start_time
        self.load_deltas = [0, 0]
        self._load_deltas_period = 10 # seconds
        self._load_deltas_ongoing = [time.time(), 0, 0]
        self._currentBitrate = -1 # not known yet
        self._lastBytesReceived = -1 # not known yet

        # keep track of average clients by tracking last average and its time
        self.average_client_number = 0
        self.average_time = self.start_time

        self.hostname = "localhost"
        self.port = 0
        self.mountPoint = "/"

    def _updateAverage(self):
        # update running average of clients connected
        now = time.time()
        # calculate deltas
        dt1 = self.average_time - self.start_time
        dc1 = self.average_client_number
        dt2 = now - self.average_time
        dc2 = self.no_clients
        self.average_time = now # we can update now that we used self.a
        if dt1 == 0:
            # first measurement
            self.average_client_number = 0
        else:
            dt = dt1 + dt2
            before = (dc1 * dt1) / dt
            after = dc2 * dt2 / dt
            self.average_client_number = before + after

    def clientAdded(self):
        self._updateAverage()

        self.no_clients += 1
        self.clients_added_count += 1

        # >= so we get the last epoch this peak was achieved
        if self.no_clients >= self.peak_client_number:
            self.peak_epoch = time.time()
            self.peak_client_number = self.no_clients

    def clientRemoved(self):
        self._updateAverage()
        self.no_clients -= 1
        self.clients_removed_count += 1

    def _updateStats(self):
        """
        Periodically, update our statistics on load deltas, and update the
        UIState with new values for total bytes, bitrate, etc.
        """

        oldtime, oldadd, oldremove = self._load_deltas_ongoing
        add, remove = self.clients_added_count, self.clients_removed_count
        now = time.time()
        diff = float(now - oldtime)

        self.load_deltas = [(add-oldadd)/diff, (remove-oldremove)/diff]
        self._load_deltas_ongoing = [now, add, remove]

        bytesReceived = self.getBytesReceived()
        if self._lastBytesReceived >= 0:
            self._currentBitrate = ((bytesReceived - self._lastBytesReceived) *
                                    8 / STATS_POLL_INTERVAL)
        self._lastBytesReceived = bytesReceived

        self.update_ui_state()

        self._updateCallLaterId = reactor.callLater(STATS_POLL_INTERVAL,
                                                    self._updateStats)

    def getCurrentBitrate(self):
        if self._currentBitrate >= 0:
            return self._currentBitrate
        else:
            return self.getBytesReceived() * 8 / self.getUptime()

    def getBytesSent(self):
        return self.sink.get_property('bytes-served')

    def getBytesReceived(self):
        return self.sink.get_property('bytes-to-serve')

    def getUptime(self):
        return time.time() - self.start_time

    def getClients(self):
        return self.no_clients

    def getPeakClients(self):
        return self.peak_client_number

    def getPeakEpoch(self):
        return self.peak_epoch

    def getAverageClients(self):
        return self.average_client_number

    def getUrl(self):
        return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)

    def getLoadDeltas(self):
        return self.load_deltas

    def updateState(self, set):
        c = self

        bytes_sent = c.getBytesSent()
        bytes_received = c.getBytesReceived()
        uptime = c.getUptime()

        set('stream-mime', c.get_mime())
        set('stream-url', c.getUrl())
        set('stream-uptime', formatTime(uptime))
        bitspeed = bytes_received * 8 / uptime
        currentbitrate = self.getCurrentBitrate()
        set('stream-bitrate', formatStorage(bitspeed) + 'bit/s')
        set('stream-current-bitrate',
            formatStorage(currentbitrate) + 'bit/s')
        set('stream-totalbytes', formatStorage(bytes_received) + 'Byte')
        set('stream-bitrate-raw', bitspeed)
        set('stream-totalbytes-raw', bytes_received)

        set('clients-current', str(c.getClients()))
        set('clients-max', str(c.getMaxClients()))
        set('clients-peak', str(c.getPeakClients()))
        set('clients-peak-time', c.getPeakEpoch())
        set('clients-average', str(int(c.getAverageClients())))

        bitspeed = bytes_sent * 8 / uptime
        set('consumption-bitrate', formatStorage(bitspeed) + 'bit/s')
        set('consumption-bitrate-current',
            formatStorage(currentbitrate * c.getClients()) + 'bit/s')
        set('consumption-totalbytes', formatStorage(bytes_sent) + 'Byte')
        set('consumption-bitrate-raw', bitspeed)
        set('consumption-totalbytes-raw', bytes_sent)


class HTTPMedium(feedcomponent.FeedComponentMedium):

    def __init__(self, comp):
        """
        @type comp: L{Stats}
        """
        feedcomponent.FeedComponentMedium.__init__(self, comp)

    def authenticate(self, bouncerName, keycard):
        """
        @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
        """
        d = self.callRemote('authenticate', bouncerName, keycard)
        return d

    def keepAlive(self, bouncerName, issuerName, ttl):
        """
        @rtype: L{twisted.internet.defer.Deferred}
        """
        return self.callRemote('keepAlive', bouncerName, issuerName, ttl)

    def removeKeycardId(self, bouncerName, keycardId):
        """
        @rtype: L{twisted.internet.defer.Deferred}
        """
        return self.callRemote('removeKeycardId', bouncerName, keycardId)

    ### remote methods for manager to call on

    def remote_expireKeycard(self, keycardId):
        return self.comp.httpauth.expireKeycard(keycardId)

    def remote_expireKeycards(self, keycardIds):
        return self.comp.httpauth.expireKeycards(keycardIds)

    def remote_notifyState(self):
        self.comp.update_ui_state()

    def remote_rotateLog(self):
        self.comp.resource.rotateLogs()

    def remote_getStreamData(self):
        return self.comp.getStreamData()

    def remote_getLoadData(self):
        return self.comp.getLoadData()

    def remote_updatePorterDetails(self, path, username, password):
        return self.comp.updatePorterDetails(path, username, password)

    def remote_removeAllClients(self):
        return self.comp.remove_all_clients()


### the actual component is a streamer using multifdsink


class MultifdSinkStreamer(feedcomponent.ParseLaunchComponent, Stats):
    implements(interfaces.IStreamingComponent)

    checkOffset = True

    # this object is given to the HTTPMedium as comp
    logCategory = 'cons-http'

    pipe_template = 'multifdsink name=sink ' + \
                    'sync=false ' + \
                    'recover-policy=3'

    componentMediumClass = HTTPMedium

    def init(self):
        reactor.debug = True
        self.debug("HTTP streamer initialising")

        self.caps = None
        self.resource = None
        self.httpauth = None
        self.mountPoint = None
        self.burst_on_connect = False

        self.description = None

        self.type = None

        # Used if we've slaved to a porter.
        self._pbclient = None
        self._porterUsername = None
        self._porterPassword = None
        self._porterPath = None

        # Or if we're a master, we open our own port here. Also used for URLs
        # in the porter case.
        self.port = None
        # We listen on this interface, if set.
        self.iface = None

        self._tport = None

        self._updateCallLaterId = None
        self._lastUpdate = 0
        self._updateUI_DC = None

        self._pending_removals = {}

        for i in ('stream-mime', 'stream-uptime', 'stream-current-bitrate',
                  'stream-bitrate', 'stream-totalbytes', 'clients-current',
                  'clients-max', 'clients-peak', 'clients-peak-time',
                  'clients-average', 'consumption-bitrate',
                  'consumption-bitrate-current',
                  'consumption-totalbytes', 'stream-bitrate-raw',
                  'stream-totalbytes-raw', 'consumption-bitrate-raw',
                  'consumption-totalbytes-raw', 'stream-url'):
            self.uiState.addKey(i, None)

    def getDescription(self):
        return self.description

    def get_pipeline_string(self, properties):
        return self.pipe_template

    def check_properties(self, props, addMessage):

        if props.get('type', 'master') == 'slave':
            for k in 'socket-path', 'username', 'password':
                if not 'porter-' + k in props:
                    raise errors.ConfigError("slave mode, missing required"
                                             " property 'porter-%s'" % k)

        if 'burst-size' in props and 'burst-time' in props:
            raise errors.ConfigError('both burst-size and burst-time '
                                     'set, cannot satisfy')

        # tcp is where multifdsink is
        version = gstreamer.get_plugin_version('tcp')
        if version < (0, 10, 9, 1):
            m = messages.Error(T_(N_(
                "Version %s of the '%s' GStreamer plug-in is too old.\n"),
                ".".join(map(str, version)), 'multifdsink'))
            m.add(T_(N_("Please upgrade '%s' to version %s."),
                'gst-plugins-base', '0.10.10'))
            addMessage(m)

    def setup_burst_mode(self, sink):
        if self.burst_on_connect:
            if self.burst_time and \
                    gstreamer.element_factory_has_property('multifdsink',
                                                           'units-max'):
                self.debug("Configuring burst mode for %f second burst",
                    self.burst_time)
                # Set a burst for configurable minimum time, plus extra to
                # start from a keyframe if needed.
                sink.set_property('sync-method', 4) # burst-keyframe
                sink.set_property('burst-unit', 2) # time
                sink.set_property('burst-value',
                    long(self.burst_time * gst.SECOND))

                # We also want to ensure that we have sufficient data available
                # to satisfy this burst; and an appropriate maximum, all
                # specified in units of time.
                sink.set_property('time-min',
                    long((self.burst_time + 5) * gst.SECOND))

                sink.set_property('unit-type', 2) # time
                sink.set_property('units-soft-max',
                    long((self.burst_time + 8) * gst.SECOND))
                sink.set_property('units-max',
                    long((self.burst_time + 10) * gst.SECOND))
            elif self.burst_size:
                self.debug("Configuring burst mode for %d kB burst",
                    self.burst_size)
                # If we have a burst-size set, use modern
                # needs-recent-multifdsink behaviour to have complex bursting.
                # In this mode, we burst a configurable minimum, plus extra
                # so we start from a keyframe (or less if we don't have a
                # keyframe available)
                sink.set_property('sync-method', 'burst-keyframe')
                sink.set_property('burst-unit', 'bytes')
                sink.set_property('burst-value', self.burst_size * 1024)

                # To use burst-on-connect, we need to ensure that multifdsink
                # has a minimum amount of data available - assume 512 kB beyond
                # the burst amount so that we should have a keyframe available
                sink.set_property('bytes-min', (self.burst_size + 512) * 1024)

                # And then we need a maximum still further above that - the
                # exact value doesn't matter too much, but we want it
                # reasonably small to limit memory usage. multifdsink doesn't
                # give us much control here, we can only specify the max
                # values in buffers. We assume each buffer is close enough
                # to 4kB - true for asf and ogg, at least
                sink.set_property('buffers-soft-max',
                    (self.burst_size + 1024) / 4)
                sink.set_property('buffers-max',
                    (self.burst_size + 2048) / 4)

            else:
                # Old behaviour; simple burst-from-latest-keyframe
                self.debug("simple burst-on-connect, setting sync-method 2")
                sink.set_property('sync-method', 2)

                sink.set_property('buffers-soft-max', 250)
                sink.set_property('buffers-max', 500)
        else:
            self.debug("no burst-on-connect, setting sync-method 0")
            sink.set_property('sync-method', 0)

            sink.set_property('buffers-soft-max', 250)
            sink.set_property('buffers-max', 500)

    def configure_pipeline(self, pipeline, properties):
        Stats.__init__(self, sink=self.get_element('sink'))

        self._updateCallLaterId = reactor.callLater(10, self._updateStats)

        mountPoint = properties.get('mount-point', '')
        if not mountPoint.startswith('/'):
            mountPoint = '/' + mountPoint
        self.mountPoint = mountPoint

        # Hostname is used for a variety of purposes. We do a best-effort guess
        # where nothing else is possible, but it's much preferable to just
        # configure this
        self.hostname = properties.get('hostname', None)
        self.iface = self.hostname # We listen on this if explicitly
                                   # configured, but not if it's only guessed
                                   # at by the below code.
        if not self.hostname:
            # Don't call this nasty, nasty, probably flaky function unless we
            # need to.
            self.hostname = netutils.guess_public_hostname()

        self.description = properties.get('description', None)
        if self.description is None:
            self.description = "Flumotion Stream"

        # FIXME: tie these together more nicely
        self.httpauth = http.HTTPAuthentication(self)
        self.resource = resources.HTTPStreamingResource(self,
                                                        self.httpauth)

        # check how to set client sync mode
        sink = self.get_element('sink')
        self.burst_on_connect = properties.get('burst-on-connect', False)
        self.burst_size = properties.get('burst-size', 0)
        self.burst_time = properties.get('burst-time', 0.0)

        self.setup_burst_mode(sink)

        if gstreamer.element_factory_has_property('multifdsink',
                                                  'resend-streamheader'):
            sink.set_property('resend-streamheader', False)
        else:
            self.debug("resend-streamheader property not available, "
                       "resending streamheader when it changes in the caps")

        sink.connect('deep-notify::caps', self._notify_caps_cb)

        # these are made threadsafe using idle_add in the handler
        sink.connect('client-added', self._client_added_handler)

        # We now require a sufficiently recent multifdsink anyway that we can
        # use the new client-fd-removed signal
        sink.connect('client-fd-removed', self._client_fd_removed_cb)
        sink.connect('client-removed', self._client_removed_cb)

        if 'client-limit' in properties:
            limit = int(properties['client-limit'])
            self.resource.setUserLimit(limit)
            if limit != self.resource.maxclients:
                m = messages.Info(T_(N_(
                    "Your system configuration does not allow the maximum "
                    "client limit to be set to %d clients."),
                    limit))
                m.description = T_(N_(
                    "Learn how to increase the maximum number of clients."))
                m.section = 'chapter-optimization'
                m.anchor = 'section-configuration-system-fd'
                self.addMessage(m)

        if 'bandwidth-limit' in properties:
            limit = int(properties['bandwidth-limit'])
            if limit < 1000:
                # The wizard used to set this as being in Mbps, oops.
                self.debug("Bandwidth limit set to unreasonably low %d bps, "
                           "assuming this is meant to be Mbps", limit)
                limit *= 1000000
            self.resource.setBandwidthLimit(limit)

        if 'redirect-on-overflow' in properties:
            self.resource.setRedirectionOnLimits(
                properties['redirect-on-overflow'])

        if 'bouncer' in properties:
            self.httpauth.setBouncerName(properties['bouncer'])

        if 'allow-default' in properties:
            self.httpauth.setAllowDefault(properties['allow-default'])

        if 'duration' in properties:
            self.httpauth.setDefaultDuration(
                float(properties['duration']))

        if 'domain' in properties:
            self.httpauth.setDomain(properties['domain'])

        if 'avatarId' in self.config:
            self.httpauth.setRequesterId(self.config['avatarId'])

        if 'ip-filter' in properties:
            logFilter = http.LogFilter()
            for f in properties['ip-filter']:
                logFilter.addIPFilter(f)
            self.resource.setLogFilter(logFilter)

        self.type = properties.get('type', 'master')
        if self.type == 'slave':
            # already checked for these in do_check
            self._porterPath = properties['porter-socket-path']
            self._porterUsername = properties['porter-username']
            self._porterPassword = properties['porter-password']

        self.port = int(properties.get('port', 8800))

    def __repr__(self):
        return '<MultifdSinkStreamer (%s)>' % self.name

    def getMaxClients(self):
        return self.resource.maxclients

    def get_mime(self):
        if self.caps:
            return self.caps.get_structure(0).get_name()

    def get_content_type(self):
        mime = self.get_mime()
        if mime == 'multipart/x-mixed-replace':
            mime += ";boundary=ThisRandomString"
        return mime

    def getUrl(self):
        return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)

    def getStreamData(self):
        socket = 'flumotion.component.plugs.streamdata.StreamDataProviderPlug'
        if self.plugs[socket]:
            plug = self.plugs[socket][-1]
            return plug.getStreamData()
        else:
            return {'protocol': 'HTTP',
                    'description': self.description,
                    'url': self.getUrl()}

    def getLoadData(self):
        """Return a tuple (deltaadded, deltaremoved, bytes_transferred,
        current_clients, current_load) of our current bandwidth and
        user values.
        The deltas are estimates of how much bitrate is added, removed
        due to client connections, disconnections, per second.
        """
        # We calculate the estimated clients added/removed per second, then
        # multiply by the stream bitrate
        deltaadded, deltaremoved = self.getLoadDeltas()

        bytes_received = self.getBytesReceived()
        uptime = self.getUptime()
        bitrate = bytes_received * 8 / uptime

        bytes_sent = self.getBytesSent()
        clients_connected = self.getClients()
        current_load = bitrate * clients_connected

        return (deltaadded * bitrate, deltaremoved * bitrate, bytes_sent,
                clients_connected, current_load)

    def add_client(self, fd):
        sink = self.get_element('sink')
        sink.emit('add', fd)

    def remove_client(self, fd):
        sink = self.get_element('sink')
        sink.emit('remove', fd)

    def remove_all_clients(self):
        """Remove all the clients.

        Returns a deferred fired once all clients have been removed.
        """
        if self.resource:
            # can be None if we never went happy
            self.debug("Asking for all clients to be removed")
            return self.resource.removeAllClients()

    def update_ui_state(self):
        """Update the uiState object.
        Such updates (through this function) are throttled to a maximum rate,
        to avoid saturating admin clients with traffic when many clients are
        connecting/disconnecting.
        """

        def setIfChanged(k, v):
            if self.uiState.get(k) != v:
                self.uiState.set(k, v)

        def update_ui_state_later():
            self._updateUI_DC = None
            self.update_ui_state()

        now = time.time()

        # If we haven't updated too recently, do it immediately.
        if now - self._lastUpdate >= UI_UPDATE_THROTTLE_PERIOD:
            if self._updateUI_DC:
                self._updateUI_DC.cancel()
                self._updateUI_DC = None

            self._lastUpdate = now
            # fixme: have updateState just update what changed itself
            # without the hack above
            self.updateState(setIfChanged)
        elif not self._updateUI_DC:
            # Otherwise, schedule doing this in a few seconds (unless an update
            # was already scheduled)
            self._updateUI_DC = reactor.callLater(UI_UPDATE_THROTTLE_PERIOD,
                                                  update_ui_state_later)

    def _client_added_handler(self, sink, fd):
        self.log('[fd %5d] client_added_handler', fd)
        Stats.clientAdded(self)
        self.update_ui_state()

    def _client_removed_handler(self, sink, fd, reason, stats):
        self.log('[fd %5d] client_removed_handler, reason %s', fd, reason)
        if reason.value_name == 'GST_CLIENT_STATUS_ERROR':
            self.warning('[fd %5d] Client removed because of write error' % fd)

        self.resource.clientRemoved(sink, fd, reason, stats)
        Stats.clientRemoved(self)
        self.update_ui_state()

    ### START OF THREAD-AWARE CODE (called from non-reactor threads)

    def _notify_caps_cb(self, element, pad, param):
        caps = pad.get_negotiated_caps()
        if caps == None:
            return

        caps_str = gstreamer.caps_repr(caps)
        self.debug('Got caps: %s' % caps_str)

        if not self.caps == None:
            self.warning('Already had caps: %s, replacing' % caps_str)

        self.debug('Storing caps: %s' % caps_str)
        self.caps = caps

        reactor.callFromThread(self.update_ui_state)

    # We now use both client-removed and client-fd-removed. We call get-stats
    # from the first callback ('client-removed'), but don't actually start
    # removing the client until we get 'client-fd-removed'. This ensures that
    # there's no window in which multifdsink still knows about the fd,
    # but we've actually closed it, so we no longer get spurious duplicates.
    # this can be called from both application and streaming thread !

    def _client_removed_cb(self, sink, fd, reason):
        stats = sink.emit('get-stats', fd)
        self._pending_removals[fd] = (stats, reason)

    # this can be called from both application and streaming thread !

    def _client_fd_removed_cb(self, sink, fd):
        (stats, reason) = self._pending_removals.pop(fd)

        reactor.callFromThread(self._client_removed_handler, sink, fd,
                               reason, stats)

    ### END OF THREAD-AWARE CODE

    def do_stop(self):
        if self._updateCallLaterId:
            self._updateCallLaterId.cancel()
            self._updateCallLaterId = None

        if self.httpauth:
            self.httpauth.stopKeepAlive()

        if self._tport:
            self._tport.stopListening()

        l = []
        # After we stop listening (so new connections aren't possible),
        # disconnect (and thus log) all the old ones.
        clients = self.remove_all_clients()
        if clients:
            l.append(clients)

        if self.type == 'slave' and self._pbclient:
            l.append(self._pbclient.deregisterPath(self.mountPoint))
        return defer.DeferredList(l)

    def updatePorterDetails(self, path, username, password):
        """Provide a new set of porter login information, for when we're
        in slave mode and the porter changes.
        If we're currently connected, this won't disconnect - it'll just change
        the information so that next time we try and connect we'll use the
        new ones
        """
        if self.type == 'slave':
            self._porterUsername = username
            self._porterPassword = password

            creds = credentials.UsernamePassword(self._porterUsername,
                                                 self._porterPassword)

            self._pbclient.startLogin(creds, self._pbclient.medium)

            # If we've changed paths, we must do some extra work.
            if path != self._porterPath:
                self.debug("Changing porter login to use \"%s\"", path)
                self._porterPath = path
                self._pbclient.stopTrying() # Stop trying to connect with the
                                            # old connector.
                self._pbclient.resetDelay()
                reactor.connectWith(
                    fdserver.FDConnector, self._porterPath,
                    self._pbclient, 10, checkPID=False)
        else:
            raise errors.WrongStateError(
                "Can't specify porter details in master mode")

    def do_pipeline_playing(self):
        # Override this to not set the component happy; instead do this once
        # both the pipeline has started AND we've logged in to the porter.
        if hasattr(self, '_porterDeferred'):
            d = self._porterDeferred
        else:
            d = defer.succeed(None)
        self.httpauth.scheduleKeepAlive()
        d.addCallback(lambda res:
            feedcomponent.ParseLaunchComponent.do_pipeline_playing(
                self))
        return d

    def do_setup(self):
        root = resources.HTTPRoot()
        # TwistedWeb wants the child path to not include the leading /
        mount = self.mountPoint[1:]
        root.putChild(mount, self.resource)
        if self.type == 'slave':
            # Streamer is slaved to a porter.

            # We have two things we want to do in parallel:
            #  - ParseLaunchComponent.do_start()
            #  - log in to the porter, then register our mountpoint with
            #    the porter.
            # So, we return a DeferredList with a deferred for each of
            # these tasks. The second one's a bit tricky: we pass a dummy
            # deferred to our PorterClientFactory that gets fired once
            # we've done all of the tasks the first time (it's an
            # automatically-reconnecting client factory, and we only fire
            # this deferred the first time)

            self._porterDeferred = d = defer.Deferred()
            mountpoints = [self.mountPoint]
            self._pbclient = porterclient.HTTPPorterClientFactory(
                server.Site(resource=root), mountpoints, d)

            creds = credentials.UsernamePassword(self._porterUsername,
                                                 self._porterPassword)
            self._pbclient.startLogin(creds, self._pbclient.medium)

            self.info("Starting porter login at \"%s\"", self._porterPath)
            # This will eventually cause d to fire
            reactor.connectWith(
                fdserver.FDConnector, self._porterPath,
                self._pbclient, 10, checkPID=False)
        else:
            # Streamer is standalone.
            try:
                iface = self.iface or ""
                self.info('Listening on port %d, interface=%r',
                          self.port, iface)
                self._tport = reactor.listenTCP(
                    self.port, server.Site(resource=root),
                    interface=iface)
            except error.CannotListenError:
                t = 'Port %d is not available.' % self.port
                self.warning(t)
                m = messages.Error(T_(N_(
                    "Network error: TCP port %d is not available."),
                    self.port))
                self.addMessage(m)
                self.setMood(moods.sad)
                return defer.fail(errors.ComponentSetupHandledError(t))