Package flumotion :: Package component :: Package misc :: Package httpserver :: Package httpcached :: Module server_selection
[hide private]

Source Code for Module flumotion.component.misc.httpserver.httpcached.server_selection

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22   
 23  import operator 
 24  import random 
 25  import socket 
 26   
 27  from twisted.internet import base, defer, threads, reactor 
 28  from twisted.python import threadpool 
 29  from flumotion.common import log 
 30   
 31  DEFAULT_PRIORITY = 1.0 
 32  DEFAULT_REFRESH_TIMEOUT = 300 
 33   
 34  LOG_CATEGORY = "server-selector" 
 35   
 36   
37 -class ThreadedResolver(base.ThreadedResolver):
38
39 - def __init__(self, reactor, sk=socket):
40 base.ThreadedResolver.__init__(self, reactor) 41 self.socket = sk
42
43 - def getHostByNameEx(self, name, timeout = (1, 3, 11, 45)):
44 if timeout: 45 timeoutDelay = reduce(operator.add, timeout) 46 else: 47 timeoutDelay = 60 48 userDeferred = defer.Deferred() 49 # lookupDeferred = threads.deferToThreadPool( 50 # self.reactor, self.reactor.getThreadPool(), 51 lookupDeferred = threads.deferToThread( 52 self.socket.gethostbyname_ex, name) 53 cancelCall = self.reactor.callLater( 54 timeoutDelay, self._cleanup, name, lookupDeferred) 55 self._runningQueries[lookupDeferred] = (userDeferred, cancelCall) 56 lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred) 57 return userDeferred
58 59
60 -class ServerSelector(log.Loggable):
61 62 logCategory = LOG_CATEGORY 63
64 - def __init__(self, timeout=DEFAULT_REFRESH_TIMEOUT, sk=socket):
65 self.servers = {} 66 self.hostnames = {} 67 self.timeout = timeout 68 self.socket = socket 69 70 self._resolver = ThreadedResolver(reactor, sk) 71 self._refresh = None
72
73 - def _addCallback(self, h, hostname, port, priority):
74 ip_list = h[2] 75 for ip in ip_list: 76 s = Server(ip, port, priority) 77 if s not in self.servers[priority]: 78 self.servers[priority].append(s) 79 80 self.hostnames[hostname] = (ip_list, priority, port)
81
82 - def _addErrback(self, err):
83 self.warning("Could not resolve host %s", 84 log.getFailureMessage(err)) 85 return
86
87 - def addServer(self, hostname, port, priority=DEFAULT_PRIORITY):
88 """ 89 Add a hostname to the list of servers, with a priority. (in 90 increasing order, 1 comes before 2). 91 92 @return None 93 """ 94 self.hostnames[hostname] = ([], priority, port) 95 if priority not in self.servers: 96 self.servers[priority] = [] 97 98 d = self._resolver.getHostByNameEx(hostname) 99 d.addCallbacks(self._addCallback, 100 self._addErrback, 101 callbackArgs=(hostname, port, priority)) 102 return d
103
104 - def getServers(self):
105 """ 106 Order the looked up servers by priority, and return them. 107 108 @return a generator of Server 109 """ 110 priorities = self.servers.keys() 111 priorities.sort() 112 for p in priorities: 113 servers = self.servers[p] 114 random.shuffle(servers) 115 for s in servers: 116 yield s
117
118 - def _refreshCallback(self, host, hostname):
119 # FIXME: improve me, avoid data duplication, Server info loss.. 120 new_ips = host[2] 121 old_ips, priority, port = self.hostnames[hostname] 122 to_be_added = [ip for ip in new_ips if ip not in old_ips] 123 to_be_removed = [ip for ip in old_ips if ip not in new_ips] 124 servers = self.servers[priority] 125 for ip in to_be_added: 126 servers.append(Server(ip, port, priority)) 127 self.hostnames[hostname][0].append(ip) 128 for ip in to_be_removed: 129 for s in servers: 130 if s.ip == ip: 131 servers.remove(s) 132 self.hostnames[hostname][0].remove(ip) 133 self.servers[priority] = servers
134
135 - def refreshServers(self):
136 dl = [] 137 for h in self.hostnames.keys(): 138 d = self._resolver.getHostByNameEx(h) 139 d.addCallbacks(self._refreshCallback, self._addErrback, 140 callbackArgs=(h, )) 141 dl.append(d) 142 self._resetRefresh() 143 d = defer.DeferredList(dl) 144 d.addCallback(lambda _: self) 145 return d
146
147 - def _resetRefresh(self):
148 if self.timeout: 149 self._refresh = reactor.callLater(self.timeout, self._onRefresh)
150
151 - def _onRefresh(self):
152 self.refreshServers()
153
154 - def setup(self):
155 return self.refreshServers()
156
157 - def cleanup(self):
158 if self._refresh: 159 self._refresh.cancel() 160 self._refresh = None
161 162
163 -class Server(object):
164
165 - def __init__(self, ip, port, priority):
166 self.ip = ip 167 self.port = port 168 self.priority = priority
169
170 - def reportError(self, code):
171 pass
172
173 - def __repr__(self):
174 return "<%s: %s:%d>" % (type(self).__name__, self.ip, self.port)
175
176 - def __eq__(self, other):
177 return self.__dict__ == other.__dict__
178