1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import stat
23 from cStringIO import StringIO
24 import time
25
26 from twisted.internet import defer, reactor, abstract
27
28 from flumotion.common import log
29
30 from flumotion.component.misc.httpserver import fileprovider
31 from flumotion.component.misc.httpserver import mimetypes
32 from flumotion.component.misc.httpserver import cachestats
33 from flumotion.component.misc.httpserver.httpcached import common
34 from flumotion.component.misc.httpserver.httpcached import resource_manager
35
# Seconds between two sweeps of the expiration-time table kept by the
# caching strategies (entries older than now are dropped by the sweep).
EXP_TABLE_CLEANUP_PERIOD = 30
# How many times a failed retrieval may be resumed before giving up;
# used by caching sessions, pipelining producers and block requesters.
MAX_RESUME_COUNT = 20


# Period, in seconds, of the reactor-driven producing loop
# (presumably the delay between produced blocks — usage not visible here).
PRODUCING_PERIOD = 0.08
42
43
45 """
46 Raised when a request used by a caching session
47 was using a conditional retrieval and it fails.
48 """
49
53
56
57
59 """
60 Base class for all caching strategies.
61
62 Handles the cache lookup, cache expiration checks,
63 statistics gathering and caching sessions managment.
64 """
65
66 logCategory = "base-caching"
67
68 - def __init__(self, cachemgr, reqmgr, ttl):
69 self.cachemgr = cachemgr
70 self.reqmgr = reqmgr
71 self.ttl = ttl
72
73 self._identifiers = {}
74 self._etimes = {}
75
76 self._cleanupCall = None
77
81
88
90 identifier = self.cachemgr.getIdentifier(url.path)
91 session = self._identifiers.get(identifier, None)
92 if session is not None:
93 self.debug("Caching session found for '%s'", url)
94
95 if (session.getState() in
96 (CachingSession.DETACHED, CachingSession.CACHED)):
97 stats.onStarted(session.size, cachestats.CACHE_HIT)
98 elif (session.getState() in
99 (CachingSession.REQUESTING, CachingSession.BUFFERING,
100 CachingSession.CACHING)):
101 stats.onStarted(session.size, cachestats.TEMP_HIT)
102 else:
103 stats.onStarted(session.size, cachestats.CACHE_MISS)
104
105
106 d = session.waitInfo()
107 d.addCallback(RemoteSource, stats)
108 return d
109
110 self.log("Looking for cached file for '%s'", url)
111 d = defer.Deferred()
112 d.addCallback(self.cachemgr.openCacheFile)
113 d.addErrback(self._cachedFileError, url)
114 d.addCallback(self._gotCachedFile, url, identifier, stats)
115
116 d.callback(url.path)
117
118 return d
119
120 - def requestData(self, url, offset=None, size=None, mtime=None):
123
125 return self._identifiers.values()
126
129
130
131
133 raise NotImplementedError()
134
136 raise NotImplementedError()
137
138
139
144
146 if self._cleanupCall:
147 self._cleanupCall.cancel()
148 self._cleanupCall = None
149
154
156 now = time.time()
157 expired = [i for i, e in self._etimes.items() if e < now]
158 for ident in expired:
159 del self._etimes[ident]
160
167
171
175
178
184
186 if cachedFile is not None:
187 self.log("Opened cached file '%s'", cachedFile.name)
188 etime = self._etimes.get(identifier, None)
189 if etime and (etime > time.time()):
190 stats.onStarted(cachedFile.stat[stat.ST_SIZE],
191 cachestats.CACHE_HIT)
192 return CachedSource(identifier, url, cachedFile, stats)
193 self.debug("Cached file may have expired '%s'", cachedFile.name)
194 return self._onCacheOutdated(url, identifier, cachedFile, stats)
195 self.debug("Resource not cached '%s'", url)
196 return self._onCacheMiss(url, stats)
197
198
200 """
201 Data source that read data directly from a localy cached file.
202 """
203
204 mimetypes = mimetypes.MimeTypes()
205
206 - def __init__(self, ident, url, cachedFile, stats):
217
218 - def produce(self, consumer, offset):
222
223 - def read(self, offset, size):
231
236
237
239 """
240 Base class for resource not yet cached.
241 It offers a push producer, it delegates read operations
242 to the session and start a block pipelining if the session
243 cannot serve the requested data.
244 Updates the cache statistics.
245 """
246
247 strategy = None
248 session = None
249 stats = None
250
251 - def produce(self, consumer, offset):
253
254 - def read(self, offset, size):
270
275
279
280
282 """
283 Simple remote source.
284 """
285
298
303
304
306 """
307 Base class of caching sessions.
308 Just an interface to be implemented or inherited
309 by all caching sessions.
310 """
311
    # Strategy owning this session (set by concrete implementations).
    strategy = None
    # URL of the remote resource being cached.
    url = None
    # Size in bytes of the resource; 0 until the stream info is known.
    size = 0
    # Modification time of the resource, when known.
    mtime = None
    # MIME type of the resource, when known.
    mimeType = None
318 - def read(self, offset, size):
320
322 raise NotImplementedError()
323
325 raise NotImplementedError()
326
328 raise NotImplementedError()
329
330
332 """
333 Caches a stream locally in a temporary file.
334 The already cached data can be read from the session.
335
336 Can be canceled, meaning the session is not valid anymore.
337
338 Can be aborted, meaning the session will stop caching locally
339 but is still valid.
340
341 The caching operation can be started at any moment, but the
342 session have to receive the stream info before it can be used
343 with a RemoteSource instance.
344
345 It can recover request failures up to MAX_RESUME_COUNT times.
346 """
347
    logCategory = "caching-session"

    # Session life-cycle states. The values are ordered integers and the
    # code relies on that ordering (e.g. `_state < CLOSED` means "still
    # running", `_state >= BUFFERING` means "temporary file in use").
    (PIPELINING,    # created, caching not requested yet
     REQUESTING,    # remote request started, stream info not received
     BUFFERING,     # info received, buffering in memory until temp file
     CACHING,       # writing received data to the temporary file
     CACHED,        # all data received
     DETACHED,      # kept alive only while still referenced
     CLOSED,        # temporary file released
     CANCELED,      # session invalidated
     ABORTED,       # stopped caching locally but still usable (pipelining)
     ERROR) = range(10)

    mimetypes = mimetypes.MimeTypes()
362
363 - def __init__(self, strategy, url, cache_stats, ifModifiedSince=None):
391
393 return (self._state < self.CLOSED) or (self._state == self.ABORTED)
394
397
399 """
400 Starts caching the remote resource locally.
401 """
402 if self._state != self.PIPELINING:
403 return
404
405 self._state = self.REQUESTING
406
407 self.debug("Caching requested for %s", self.url)
408 self.cache_stats.onCopyStarted()
409
410 self._firstRetrieve()
411
420
429
431 if self._state < self.DETACHED:
432 d = defer.Deferred()
433 self._finishedDefers.append(d)
434 return d
435 if self._state <= self.CLOSED:
436 return defer.succeed(self)
437 return defer.fail(self._errorValue)
438
439 - def read(self, offset, size):
455
483
485 """
486 After calling this method the session will just stop caching
487 and return None when trying to read. Used when pipelining is wanted.
488 """
489 if self._state < self.REQUESTING or self._state >= self.CACHED:
490 return
491
492 self.log("Aborting caching session for %s", self.url)
493
494 self.strategy._onSessionCanceled(self)
495 self.cache_stats.onCopyCancelled(self.size, self._bytes)
496
497 self._close()
498
499 error = fileprovider.FileError("Caching aborted")
500 self._fireError(error)
501
502 if self._request:
503 self.debug("Caching aborted for %s", self.url)
504 self._request.cancel()
505 self._request = None
506 else:
507 self.debug("Caching aborted before starting to cache")
508
509 self._state = self.ABORTED
510
513
515 self._refcount -= 1
516 if self._refcount == 0:
517 if self._state == self.DETACHED:
518
519 self.log("Detached session not referenced anymore")
520 self._close()
521
523 return self._refcount > 0
524
525
526
528 self.warning("Session request error %s (%s) for %s using %s:%s",
529 message, code, self.url, getter.host, getter.port)
530 if code in (common.SERVER_DISCONNECTED, common.SERVER_TIMEOUT):
531 if self._resumes > 0:
532 self._resumes -= 1
533 if self._state > self.REQUESTING:
534
535 offset = self._bytes
536 size = self.size - self._bytes
537 self.debug("Resuming retrieval from offset %d with "
538 "size %d of %s (%d tries left)", offset, size,
539 self.url, self._resumes)
540
541 self._resumeRetrieve(offset, size)
542 return
543 else:
544
545 self.debug("Resuming retrieval from start of %s "
546 "(%d tries left)", self.url, self._resumes)
547 self._firstRetrieve()
548 return
549 self.debug("Too much resuming intents, stopping "
550 "after %d of %s bytes of %s",
551 self._bytes, self.size, self.url)
552 self._close()
553 self._error(fileprovider.UnavailableError(message))
554
564
574
    def onInfo(self, getter, info):
        """
        Request-manager callback: the stream info (size, length, mtime)
        of the remote resource is known.

        Validates the announced size, records type/size/mtime, switches
        the session to BUFFERING and starts buffering data in memory
        while a temporary cache file is requested asynchronously.
        """
        if self._state == self.BUFFERING:
            # Info received while already buffering — presumably from a
            # resumed request (see the resume path in onError); keep the
            # request paused until the temporary file is ready.
            self._request.pause()
            return

        if self._state != self.REQUESTING:
            # Session was canceled/aborted meanwhile; ignore the info.
            return

        if info.size != (info.length - self._bytes):
            # The remote stream does not contain exactly the bytes still
            # missing: treat as a fatal inconsistency and fail the session.
            self.log("Unexpected stream size: %s / %s bytes "
                     "(Already got %s bytes)",
                     info.size, info.length, self._bytes)
            self._close()
            msg = "Unexpected resource size: %d" % info.size
            self._error(fileprovider.FileError(msg))
            return

        self._state = self.BUFFERING

        self.mimeType = self.mimetypes.fromPath(self.url.path)
        self.mtime = info.mtime
        self.size = info.size

        self.log("Caching session with type %s, size %s, mtime %s for %s",
                 self.mimeType, self.size, self.mtime, self.url)

        # Buffer incoming data in memory until the temporary file exists;
        # _gotTempFile copies this buffer into the real file.
        self._file = StringIO()

        self.log("Requesting temporary file for %s", self.url)
        d = self.strategy.cachemgr.newTempFile(self.url.path, info.size,
                                               info.mtime)

        # Hold the retrieval back while only the in-memory buffer is
        # available; _gotTempFile resumes it once the file is ready.
        self._request.pause()

        # NOTE(review): waiters are notified before _gotTempFile is hooked
        # below — presumably so info/started deferreds fire even when the
        # temp-file deferred triggers synchronously; confirm the ordering.
        self._fireInfo(self)
        self._fireStarted(self)

        self.debug("Start buffering %s", self.url)
        d.addCallback(self._gotTempFile)
621
623 if self._state not in (self.BUFFERING, self.CACHED):
624
625 if tempFile:
626 tempFile.close()
627 return
628
629 if tempFile is None:
630 self.warning("Temporary file creation failed, "
631 "aborting caching of %s", self.url)
632 self.abort()
633 return
634
635 self.log("Got temporary file for %s", self.url)
636
637 self.debug("Start caching %s", self.url)
638
639 data = self._file.getvalue()
640 self._file = tempFile
641 tempFile.write(data)
642
643 if self._request is not None:
644
645 self._request.resume()
646
647 if self._state == self.CACHED:
648
649 self._real_complete()
650 else:
651 self._state = self.CACHING
652
    def onData(self, getter, data):
        """
        Request-manager callback: a block of the remote resource arrived.

        Writes the block at the current end of the received data and
        updates the byte counters; a write failure aborts local caching
        (the session then falls back to pipelining, see abort()).
        """
        assert self._state in (self.BUFFERING, self.CACHING), "Not caching"
        # Seek back to the end of what was received so far — presumably
        # because reads may move the file cursor concurrently; confirm.
        self._file.seek(self._bytes)
        size = len(data)
        try:
            self._file.write(data)
        except Exception, e:
            # e.g. disk full: stop caching locally but keep the session
            # usable for pipelining.
            self.warning("Error writing in temporary file: %s", e)
            self.debug("Got %s / %s bytes, would be %s with %s more",
                       self._bytes, self.size, self._bytes + size, size)
            self.abort()
        else:
            self._bytes += size
            # _correction is updated alongside _bytes; its consumer is
            # not visible in this chunk.
            self._correction += size
672
685
687 defers = list(self._infoDefers)
688
689 self._infoDefers = []
690 for d in defers:
691 d.callback(value)
692
694 defers = list(self._startedDefers)
695
696 self._startedDefers = []
697 for d in defers:
698 d.callback(value)
699
701 defers = list(self._finishedDefers)
702
703 self._finishedDefers = []
704 for d in defers:
705 d.callback(value)
706
708 self._errorValue = error
709 defers = list(self._infoDefers)
710 defers.extend(self._startedDefers)
711 defers.extend(self._finishedDefers)
712
713 self._infoDefers = []
714 self._startedDefers = []
715 self._finishedDefers = []
716 for d in defers:
717 d.errback(error)
718
720 if self._state >= self.CLOSED:
721 return
722
723 self.log("Closing caching session for %s", self.url)
724
725 if self._state >= self.BUFFERING:
726 self._file.close()
727 self._file = None
728
729 self._state = self.CLOSED
730
740
758
760 since = self.ifModifiedSince
761 self._request = self.strategy.reqmgr.retrieve(self, self.url,
762 ifModifiedSince=since)
763 self.log("Retrieving data using %s", self._request.logName)
764
772
773
775 """
776 Offers a IPushProducer interface to a caching session.
777 It starts producing data from the specified point.
778
779 If the data is already cached by the session,
780 it produce data with a reactor loop reading the data
781 from the session by block.
782
783 If the data is not yet cached, it starts a request
784 using the request manager and pipeline the data
785 to the specified consumer.
786
787 It can recover request failures up to MAX_RESUME_COUNT times.
788
789 It's not used yet in the context of http-server.
790 Until now, the simulations show that using a producer with
791 long-lived HTTP requests instead of short lived block request
792 is less efficient and produce bigger latency for the clients.
793 At least when used with HTTP proxies.
794 """
795
796 logCategory = "pipe-producer"
797
798 - def __init__(self, consumer, session, offset, stats):
821
822
823
825 if self.consumer is None:
826
827 return
828
829 self._paused = False
830
831 if self._pipelining:
832
833 if self._request:
834
835 self._request.resume()
836 else:
837
838 self._pipeline()
839 else:
840
841 self._produce()
842
844 if self.consumer is None:
845
846 return
847
848 self._paused = True
849
850 if self._pipelining:
851
852 if self._request:
853 self._request.pause()
854 else:
855
856 self._stop()
857
861
862
863
865 if self._request is None:
866
867 return
868 self._request = None
869
870 if code in (common.SERVER_DISCONNECTED, common.SERVER_TIMEOUT):
871 self.warning("Producer request error %s (%s) for %s "
872 "(%s tries left)", message, code,
873 self.session.url, self._resumes)
874
875 if self._resumes > 0:
876 self._resumes -= 1
877 if self._paused:
878 self.log("Producer paused, waiting to recover pipelining "
879 "(%d tries left)", self._resumes)
880 else:
881 self.log("Recovering pipelining (%d tries left)",
882 self._resumes)
883 self._pipeline()
884 return
885
886 self.debug("Too much resuming intents, stopping "
887 "after %d of %s", self._bytes, self.size)
888
889 self._terminate()
890
892 if self._request is None:
893
894 return
895 self._request = None
896 self.warning("Modifications detected while producing %s",
897 self.session.url)
898 self._terminate()
899
901 if self._request is None:
902
903 return
904 self._request = None
905 self.warning("%s detected while producing %s",
906 message, self.session.url)
907 self._terminate()
908
909 - def onData(self, getter, data):
910 if self._request is None:
911
912 return
913 self._write(data)
914
916 if self._request is None:
917
918 return
919 self.log("Pipelining finished")
920 self._terminate()
921
922
923
947
952
954 if self._call is not None:
955 self._call.cancel()
956 self._call = None
957
959 if not self.session.isActive():
960 self.log("Session %s not active anymore (%s), "
961 "aborting production of %s",
962 self.session.logName,
963 self.session._state,
964 self.session.url)
965 self._terminate()
966 return
967
968 self._pipelining = True
969
970 offset = self.offset + self._produced
971 size = self.session.size - offset
972 mtime = self.session.mtime
973
974 if size == 0:
975 self.log("No more data to be retrieved, pipelining finished")
976 self._terminate()
977 return
978
979 self.debug("Producing %s bytes from offset %d of %s",
980 size, offset, self.session.url)
981
982 self._request = self.reqmgr.retrieve(self, self.session.url,
983 start=offset, size=size,
984 ifUnmodifiedSince=mtime)
985 self.log("Retrieving data using %s", self._request.logName)
986
988 if self._request:
989
990 self._request.cancel()
991 self._request = None
992
993 self._stop()
994
995 expected = self.session.size - self.offset
996 if self._produced != expected:
997 self.warning("Only produced %s of the %s bytes "
998 "starting at %s of %s",
999 self._produced, expected,
1000 self.offset, self.session.url)
1001 else:
1002 self.log("Finished producing %s bytes starting at %s of %s",
1003 self._produced, self.offset, self.session.url)
1004
1005 self.consumer.unregisterProducer()
1006 self.consumer.finish()
1007 self.consumer = None
1008
1009 self.session.delref()
1010 self.session = None
1011
1012
1014 """
1015 Retrieves a block of data using a range request.
1016 A modification time can be specified for the retrieval to
1017 fail if the requested file modification time changed.
1018
1019 The data is returned as a block by triggering the deferred
1020 returned by calling the retrieve method.
1021
1022 It can recover request failures up to MAX_RESUME_COUNT times.
1023 """
1024
1025 logCategory = "block-requester"
1026
1027 - def __init__(self, reqmgr, url, mtime=None):
1028 self.reqmgr = reqmgr
1029 self._url = url
1030 self._mtime = mtime
1031 self._data = None
1032 self._deferred = None
1033 self._offset = None
1034 self._size = None
1035 self._resumes = MAX_RESUME_COUNT
1036
1037 self.logName = common.log_id(self)
1038
1040 assert self._deferred is None, "Already retrieving"
1041 self._deferred = defer.Deferred()
1042 self._data = []
1043 self._offset = offset
1044 self._size = size
1045 self._curr = 0
1046
1047 self._retrieve()
1048
1049 return self._deferred
1050
1052 assert self._deferred is not None, "Not retrieving anything"
1053 if code == common.RANGE_NOT_SATISFIABLE:
1054
1055 self._deferred.callback("")
1056 self._cleanup()
1057 return
1058 if code in (common.SERVER_DISCONNECTED, common.SERVER_TIMEOUT):
1059 self.warning("Block request error: %s (%s)", message, code)
1060 if self._resumes > 0:
1061 self._resumes -= 1
1062 self.debug("Resuming block retrieval from offset %d "
1063 "with size %d (%d tries left)",
1064 self._offset, self._size, self._resumes)
1065
1066 self._retrieve()
1067 return
1068 self.debug("Too much resuming intents, stopping "
1069 "after %d of %d", self._offset, self._size)
1070 self._deferred.errback(fileprovider.FileError(message))
1071 self._cleanup()
1072
1077
1083
1084 - def onData(self, getter, data):
1089
1094
1096 self.reqmgr.retrieve(self, self._url, start=self._offset,
1097 size=self._size, ifUnmodifiedSince=self._mtime)
1098
1100 self._deferred = None
1101 self._data = None
1102