1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import os
23 import string
24 import time
25
26 from twisted.web import server, http
27 from twisted.web.resource import Resource
28 from twisted.internet import defer, reactor, error
29 from twisted.cred import credentials
30 from zope.interface import implements
31
32 from flumotion.common import log, messages, errors, netutils, interfaces
33 from flumotion.common.i18n import N_, gettexter
34 from flumotion.component import component
35 from flumotion.component.base import http as httpbase
36 from flumotion.component.component import moods
37 from flumotion.component.misc.httpserver import httpfile, localprovider
38 from flumotion.component.misc.httpserver import serverstats
39 from flumotion.component.misc.porter import porterclient
40 from flumotion.twisted import fdserver
41
42 __version__ = "$Rev$"
43 T_ = gettexter()
44
45 UPTIME_UPDATE_INTERVAL = 5
46
47 FILEPROVIDER_SOCKET = 'flumotion.component.misc.httpserver' \
48 '.fileprovider.FileProviderPlug'
49
50
52
54 server.Request.__init__(self, channel, queued)
55 now = time.time()
56 self.lastTimeWritten = now
57
58
59
60 self.fd = self.transport.fileno()
61
62 self._component = channel.factory.component
63 self._transfer = None
64 self._provider = None
65 self._startTime = now
66 self._completionTime = None
67 self._rangeFirstByte = None
68 self._rangeLastByte = None
69 self._resourceSize = None
70 self._bytesWritten = 0L
71
72
73 self.stats = serverstats.RequestStatistics(self._component.stats)
74
75 self._component.requestStarted(self)
76
78 self._rangeFirstByte = first
79 self._rangeLastByte = last
80 self._resourceSize = size
81
89
98
104
113
115 headers = self.getAllHeaders()
116 duration = (self._completionTime or time.time()) - self._startTime
117 requestFields = {'ip': self.getClientIP(),
118 'method': self.method,
119 'uri': self.uri,
120 'get-parameters': self.args,
121 'clientproto': self.clientproto,
122 'response': self.code,
123 'bytes-sent': self._bytesWritten,
124 'referer': headers.get('referer', None),
125 'user-agent': headers.get('user-agent', None),
126 'time-connected': duration,
127 'resource-size': self._resourceSize,
128 'range-first': self._rangeFirstByte,
129 'range-last': self._rangeLastByte}
130 if self._provider:
131
132 providerFields = self._provider.getLogFields()
133 providerFields.update(requestFields)
134 requestFields = providerFields
135 return requestFields
136
137
138 -class Site(server.Site):
145
146
148 """
149 I wrap a statistics ui state entry, to allow updates.
150 """
151
153 self._state = state
154 self._key = key
155
156 - def update(self, name, value):
157 if value != self._state.get(self._key).get(name, None):
158 self._state.setitem(self._key, name, value)
159
160
162
168
170 """
171 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
172 """
173 return self.callRemote('authenticate', bouncerName, keycard)
174
175 - def keepAlive(self, bouncerName, issuerName, ttl):
176 """
177 @rtype: L{twisted.internet.defer.Deferred}
178 """
179 return self.callRemote('keepAlive', bouncerName, issuerName, ttl)
180
182 """
183 @rtype: L{twisted.internet.defer.Deferred}
184 """
185 return self.callRemote('removeKeycardId', bouncerName, keycardId)
186
189
192
195
198
201
204
205
207 implements(interfaces.IStreamingComponent)
208
209 componentMediumClass = HTTPFileMedium
210
211 REQUEST_TIMEOUT = 30
212
213
215 self.mountPoint = None
216 self.type = None
217 self.port = None
218 self.hostname = None
219 self.stats = None
220 self._rateControlPlug = None
221 self._fileProviderPlug = None
222 self._metadataProviderPlug = None
223 self._loggers = []
224 self._requestModifiers = []
225 self._logfilter = None
226 self.httpauth = None
227 self._startTime = time.time()
228 self._uptimeCallId = None
229 self._allowBrowsing = False
230
231 self._description = 'On-Demand Flumotion Stream'
232
233 self._singleFile = False
234 self._connected_clients = {}
235 self._total_bytes_written = 0
236
237 self._pbclient = None
238
239 self._twistedPort = None
240 self._timeoutRequestsCallLater = None
241
242 self._pendingDisconnects = {}
243 self._rootResource = None
244
245
246
247 self._mimeToResource = {
248 'video/x-flv': httpfile.FLVFile,
249 'video/mp4': httpfile.MP4File,
250 }
251
252 self.uiState.addKey('stream-url', None)
253 self.uiState.addKey('server-uptime', 0)
254 self.uiState.addKey('file-provider', None)
255 self.uiState.addKey('allow-browsing', False)
256 self.uiState.addDictKey('request-statistics')
257 self.uiState.addDictKey('provider-statistics')
258
260 props = self.config['properties']
261 self.fixRenamedProperties(props, [
262 ('issuer', 'issuer-class'),
263 ('porter_socket_path', 'porter-socket-path'),
264 ('porter_username', 'porter-username'),
265 ('porter_password', 'porter-password'),
266 ('mount_point', 'mount-point')])
267
268 path = props.get('path', None)
269 plugs = self.plugs.get(FILEPROVIDER_SOCKET, [])
270 if plugs:
271 if path:
272 self.warning("The component property 'path' should not be used"
273 " in conjunction with a file provider plug.")
274
275
276
277
278
279
280 if props.get('type', 'master') == 'slave':
281 for k in 'socket-path', 'username', 'password':
282 if not 'porter-' + k in props:
283 msg = 'slave mode, missing required property porter-%s' % k
284 return defer.fail(errors.ConfigError(msg))
285 if plugs or not path:
286 return
287 if os.path.isfile(path):
288 self._singleFile = True
289 elif os.path.isdir(path):
290 self._singleFile = False
291 else:
292 msg = "the file or directory specified in 'path': %s does " \
293 "not exist or is neither a file nor directory" % path
294 return defer.fail(errors.ConfigError(msg))
295
297 desc = props.get('description', None)
298 if desc:
299 self._description = desc
300
301
302 mountPoint = props.get('mount-point', '/')
303 if not mountPoint.startswith('/'):
304 mountPoint = '/' + mountPoint
305 self.mountPoint = mountPoint
306 self.hostname = props.get('hostname', None)
307 if not self.hostname:
308 self.hostname = netutils.guess_public_hostname()
309
310 self.type = props.get('type', 'master')
311 self.port = props.get('port', 8801)
312 self._allowBrowsing = props.get('allow-browsing', False)
313 if self.type == 'slave':
314
315 self._porterPath = props['porter-socket-path']
316 self._porterUsername = props['porter-username']
317 self._porterPassword = props['porter-password']
318 socket = 'flumotion.component.plugs.request.RequestLoggerPlug'
319 self._loggers = self.plugs.get(socket, [])
320 socket = \
321 'flumotion.component.plugs.requestmodifier.RequestModifierPlug'
322 self._requestModifiers = self.plugs.get(socket, [])
323
324 self.httpauth = httpbase.HTTPAuthentication(self)
325 if 'avatarId' in self.config:
326 self.httpauth.setRequesterId(self.config['avatarId'])
327 if 'bouncer' in props:
328 self.httpauth.setBouncerName(props['bouncer'])
329 if 'issuer-class' in props:
330 self.warning("The component property 'issuer-class' has been"
331 "deprecated.")
332 msg = messages.Warning(T_(N_(
333 "The component property 'issuer-class' has "
334 "been deprecated.")))
335 self.addMessage(msg)
336
337 if 'allow-default' in props:
338 self.httpauth.setAllowDefault(props['allow-default'])
339 if 'ip-filter' in props:
340 logFilter = http.LogFilter()
341 for f in props['ip-filter']:
342 logFilter.addIPFilter(f)
343 self._logfilter = logFilter
344 socket = \
345 'flumotion.component.misc.httpserver.ratecontrol.RateControllerPlug'
346 plugs = self.plugs.get(socket, [])
347 if plugs:
348
349 path = props.get('path')
350 self._rateControlPlug = self.plugs[socket][-1]
351
352 plugs = self.plugs.get(FILEPROVIDER_SOCKET, [])
353 if plugs:
354
355 self._fileProviderPlug = plugs[-1]
356 else:
357
358
359 plugProps = {"properties": {"path": props.get('path', None)}}
360 self._fileProviderPlug = localprovider.FileProviderLocalPlug(
361 plugProps)
362
363 socket = ('flumotion.component.misc.httpserver'
364 '.metadataprovider.MetadataProviderPlug')
365 plugs = self.plugs.get(socket, [])
366 if plugs:
367 self._metadataProviderPlug = plugs[-1]
368
369
370 self.uiState.set('stream-url', self.getUrl())
371 self.uiState.set('allow-browsing', self._allowBrowsing)
372
374 self.have_properties(self.config['properties'])
375
376 root = self._rootResource
377 if root is None:
378 root = self._getDefaultRootResource()
379
380 if root is None:
381 raise errors.WrongStateError(
382 "a resource or path property must be set")
383
384 site = Site(root, self)
385 self._timeoutRequestsCallLater = reactor.callLater(
386 self.REQUEST_TIMEOUT, self._timeoutRequests)
387
388
389 self.stats = serverstats.ServerStatistics()
390 updater = StatisticsUpdater(self.uiState, "request-statistics")
391 self.stats.startUpdates(updater)
392 updater = StatisticsUpdater(self.uiState, "provider-statistics")
393 self._fileProviderPlug.startStatsUpdates(updater)
394 self._updateUptime()
395
396 d = defer.Deferred()
397 if self.type == 'slave':
398
399 if self._singleFile:
400 self._pbclient = porterclient.HTTPPorterClientFactory(
401 site, [self.mountPoint], d)
402 else:
403 self._pbclient = porterclient.HTTPPorterClientFactory(
404 site, [], d,
405 prefixes=[self.mountPoint])
406 creds = credentials.UsernamePassword(self._porterUsername,
407 self._porterPassword)
408 self._pbclient.startLogin(creds, self._pbclient.medium)
409 self.info("Logging to porter on socketPath %s", self._porterPath)
410
411 reactor.connectWith(fdserver.FDConnector, self._porterPath,
412 self._pbclient, 10, checkPID=False)
413 else:
414
415 try:
416 self.debug('Going to listen on port %d' % self.port)
417 iface = ""
418
419
420 self._twistedPort = reactor.listenTCP(self.port,
421 site, interface=iface)
422 self.port = self._twistedPort.getHost().port
423 self.info('Listening on interface %r on port %d',
424 iface, self.port)
425 except error.CannotListenError:
426 t = 'Port %d is not available.' % self.port
427 self.warning(t)
428 m = messages.Error(T_(N_(
429 "Network error: TCP port %d is not available."),
430 self.port))
431 self.addMessage(m)
432 self.setMood(moods.sad)
433 return defer.fail(errors.ComponentSetupHandledError(t))
434
435 d.callback(None)
436
437
438 def setComponentHappy(result):
439 self.httpauth.scheduleKeepAlive()
440 self.setMood(moods.happy)
441 return result
442 d.addCallback(setComponentHappy)
443 return d
444
446 if self.stats:
447 self.stats.stopUpdates()
448 if self._fileProviderPlug:
449 self._fileProviderPlug.stopStatsUpdates()
450 if self.httpauth:
451 self.httpauth.stopKeepAlive()
452 if self._timeoutRequestsCallLater:
453 self._timeoutRequestsCallLater.cancel()
454 self._timeoutRequestsCallLater = None
455 if self._uptimeCallId:
456 self._uptimeCallId.cancel()
457 self._uptimeCallId = None
458 if self._twistedPort:
459 self._twistedPort.stopListening()
460
461 l = [self.remove_all_clients()]
462 if self.type == 'slave' and self._pbclient:
463 if self._singleFile:
464 l.append(self._pbclient.deregisterPath(self.mountPoint))
465 else:
466 l.append(self._pbclient.deregisterPrefix(self.mountPoint))
467 return defer.DeferredList(l)
468
470 """
471 Provide a new set of porter login information, for when we're in slave
472 mode and the porter changes.
473 If we're currently connected, this won't disconnect - it'll just change
474 the information so that next time we try and connect we'll use the
475 new ones
476 @param path: new path
477 @param username: new username
478 @param password: new password
479 """
480 if self.type != 'slave':
481 raise errors.WrongStateError(
482 "Can't specify porter details in master mode")
483
484 self._porterUsername = username
485 self._porterPassword = password
486
487 creds = credentials.UsernamePassword(self._porterUsername,
488 self._porterPassword)
489 self._pbclient.startLogin(creds, self.medium)
490
491 self._updatePath(path)
492
494
495 if path == self._porterPath:
496 return
497 self._porterPath = path
498
499
500 self._pbclient.stopTrying()
501
502 self._pbclient.resetDelay()
503 reactor.connectWith(fdserver.FDConnector, self._porterPath,
504 self._pbclient, 10, checkPID=False)
505
527
529 node = self._fileProviderPlug.getRootPath()
530 if node is None:
531 return None
532
533 self.debug('Starting with mount point "%s"' % self.mountPoint)
534 factory = httpfile.MimedFileFactory(self.httpauth,
535 mimeToResource=self._mimeToResource,
536 rateController=self._rateControlPlug,
537 requestModifiers=self._requestModifiers,
538 metadataProvider=self._metadataProviderPlug)
539
540 root = factory.create(node)
541 if self.mountPoint != '/':
542 root = self._createRootResourceForPath(self.mountPoint, root)
543
544 return root
545
547 if path.endswith('/'):
548 path = path[:-1]
549
550 root = Resource()
551 children = string.split(path[1:], '/')
552 parent = root
553 for child in children[:-1]:
554 resource = Resource()
555 self.debug("Putting Resource at %s", child)
556 parent.putChild(child, resource)
557 parent = resource
558 self.debug("Putting resource %r at %r", fileResource, children[-1])
559 parent.putChild(children[-1], fileResource)
560 return root
561
563 """
564 Remove a client when requested.
565
566 Used by keycard expiry.
567 """
568 if fd in self._connected_clients:
569 request = self._connected_clients[fd]
570 self.debug("Removing client for fd %d", fd)
571 request.unregisterProducer()
572 request.channel.transport.loseConnection()
573 else:
574 self.debug("No client with fd %d found", fd)
575
577 l = []
578 for fd in self._connected_clients:
579 d = defer.Deferred()
580 self._pendingDisconnects[fd] = d
581 l.append(d)
582
583 request = self._connected_clients[fd]
584 request.unregisterProducer()
585 request.channel.transport.loseConnection()
586
587 self.debug("Waiting for %d clients to finish", len(l))
588 return defer.DeferredList(l)
589
591
592 fd = request.transport.fileno()
593 self._connected_clients[fd] = request
594 self.debug("[fd %5d] (ts %f) request %r started",
595 fd, time.time(), request)
596
598
599
600 self.debug('[fd %5d] (ts %f) finishing request %r',
601 request.transport.fileno(), time.time(), request)
602
603 self.httpauth.cleanupAuth(fd)
604 ip = request.getClientIP()
605 if not self._logfilter or not self._logfilter.isInRange(ip):
606 fields = request.getLogFields()
607 fields.update({'time': time.gmtime(),
608 'username': '-'})
609 l = []
610 for logger in self._loggers:
611 l.append(defer.maybeDeferred(
612 logger.event, 'http_session_completed', fields))
613 d = defer.DeferredList(l)
614 else:
615 d = defer.succeed(None)
616
617 del self._connected_clients[fd]
618
619 self._total_bytes_written += bytesWritten
620
621 def firePendingDisconnect(_):
622 self.debug("Logging completed")
623 if fd in self._pendingDisconnects:
624 pending = self._pendingDisconnects.pop(fd)
625 self.debug("Firing pending disconnect deferred")
626 pending.callback(None)
627
628
629 self.debug('[fd %5d] (ts %f) finished request %r',
630 fd, time.time(), request)
631
632 d.addCallback(firePendingDisconnect)
633
635 return self._description
636
638 return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)
639
641 socket = 'flumotion.component.plugs.streamdata.StreamDataProviderPlug'
642 if self.plugs[socket]:
643 plug = self.plugs[socket][-1]
644 return plug.getStreamData()
645 else:
646 return {'protocol': 'HTTP',
647 'description': self._description,
648 'url': self.getUrl()}
649
651 """
652 Return the number of connected clients
653 """
654 return len(self._connected_clients)
655
657 """
658 Current Bandwidth
659 """
660 bytesTransferred = self._total_bytes_written
661 for request in self._connected_clients.values():
662 if request._transfer:
663 bytesTransferred += request._transfer.bytesWritten
664 return bytesTransferred
665
667 """
668 Return a tuple (deltaadded, deltaremoved, bytes_transferred,
669 current_clients, current_load) of our current bandwidth and
670 user values. The deltas and current_load are NOT currently
671 implemented here, we set them as zero.
672 """
673 return (0, 0, self.getBytesSent(), self.getClients(), 0)
674
676 """
677 Close the logfile, then reopen using the previous logfilename
678 """
679 for logger in self._loggers:
680 self.debug('rotating logger %r' % logger)
681 logger.rotate()
682
684 """Attaches a root resource to this component. The root resource is the
685 once which will be used when accessing the mount point.
686 This is normally called from a plugs start() method.
687 @param resource: root resource
688 @type resource: L{twisted.web.resource.Resource}
689 """
690 rootResource = self._createRootResourceForPath(
691 self.getMountPoint(), resource)
692
693 self._rootResource = rootResource
694
696 """Get the mount point of this component
697 @returns: the mount point
698 """
699
700 return self.config['properties'].get('mount-point')
701
707