Package flumotion :: Package twisted :: Module flavors
[hide private]

Source Code for Module flumotion.twisted.flavors

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_flavors -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Flumotion Twisted-like flavors 
 24   
 25  Inspired by L{twisted.spread.flavors} 
 26  """ 
 27   
 28  from twisted.internet import defer 
 29  from twisted.spread import pb 
 30  from zope.interface import Interface 
 31  from flumotion.common import log 
 32   
 33  __version__ = "$Rev: 8746 $" 
 34   
 35   
 36  ### Generic Cacheable/RemoteCache for state objects 
 37   
 38   
class IStateListener(Interface):
    """
    Interface for objects that want to be told about changes made to
    cached states.
    """

    def stateSet(self, object, key, value):
        """
        The given key on the given object has been set to the given value.

        @param object: the state object that changed
        @type  object: L{StateRemoteCache}
        @param key:    the key that was set
        @type  key:    string
        @param value:  the value the key was set to
        """

    def stateAppend(self, object, key, value):
        """
        The given value has been added to the list stored under the key.

        @param object: the state object that changed
        @type  object: L{StateRemoteCache}
        @param key:    the key whose list was appended to
        @type  key:    string
        @param value:  the value appended to the list given by key
        """

    def stateRemove(self, object, key, value):
        """
        The given value has been removed from the list stored under the key.

        @param object: the state object that changed
        @type  object: L{StateRemoteCache}
        @param key:    the key whose list was removed from
        @type  key:    string
        @param value:  the value removed from the list given by key
        """
77 78
class IStateCacheableListener(Interface):
    """
    Interface for objects that want to be told about changes on
    cacheable states, i.e. about observers being added and removed.
    """

    def observerAppend(self, observer, num):
        """
        An observer has been added to the cacheable.

        @param observer: reference to the peer's L{RemoteCache}
                         that was added
        @type  observer: L{twisted.spread.flavors.RemoteCacheObserver}
        @param num:      number of observers present
        @type  num:      int
        """

    def observerRemove(self, observer, num):
        """
        An observer has been removed from the cacheable.

        @param observer: reference to the peer's L{RemoteCache}
                         that was removed
        @type  observer: L{twisted.spread.flavors.RemoteCacheObserver}
        @param num:      number of observers remaining
        @type  num:      int
        """
102 103
class StateCacheable(pb.Cacheable):
    """
    I am a cacheable state object.

    I cache key-value pairs, where values can be either single objects,
    lists of objects or dicts.  Every change made through my mutating
    methods is also sent to each of my remote observers.

    @ivar _observers: observers registered through
                      L{getStateToCacheAndObserveFor}
    @ivar _hooks:     objects told when observers are added or removed
    @ivar _dict:      the state itself, indexed by key
    """

    def __init__(self):
        self._observers = []
        self._hooks = []
        self._dict = {}

    def __getitem__(self, key):
        # state['key'] is shorthand for state.get('key')
        return self.get(key)

    # our methods

    def addKey(self, key, value=None):
        """
        Add a key to the state cache so it can be used with set.

        @param key:   the key to add
        @param value: the initial value stored under the key
        """
        self._dict[key] = value

    # don't use [] as the default value, it creates only one reference and
    # reuses it

    def addListKey(self, key, value=None):
        """
        Add a key for a list of objects to the state cache.

        @param key:   the key to add
        @param value: the initial list; a fresh empty list when None
        """
        if value is None:
            value = []
        self._dict[key] = value

    # don't use {} as the default value, it creates only one reference and
    # reuses it

    def addDictKey(self, key, value=None):
        """
        Add a key for a dict value to the state cache.

        @param key:   the key to add
        @param value: the initial dict; a fresh empty dict when None
        """
        if value is None:
            value = {}
        self._dict[key] = value

    def hasKey(self, key):
        """
        Tell whether the given key has been added to the state cache.

        @rtype: bool
        """
        # membership on the dict itself; no need to build the key list
        return key in self._dict

    def keys(self):
        """
        Return the keys present in the state cache.
        """
        return self._dict.keys()

    def get(self, key, otherwise=None):
        """
        Get the state cache value for the given key.

        Return otherwise in case where key is present but value None.

        @param key:       the key to look up
        @param otherwise: what to return when the stored value is None

        @raises KeyError: if the key was never added
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        v = self._dict[key]
        # Identity test on purpose: "not v" would also trigger on empty
        # lists, and "v == None" would call the value's own __eq__, which
        # can misreport a real value as being unset.
        if v is None:
            return otherwise

        return v

    def set(self, key, value):
        """
        Set a given state key to the given value.
        Notifies observers of this Cacheable through observe_set.

        @param key:   the key to set
        @param value: the new value

        @raises KeyError: if the key was never added
        @returns: a DeferredList over the remote calls to all observers
        @rtype:   L{twisted.internet.defer.DeferredList}
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        self._dict[key] = value
        dList = [o.callRemote('set', key, value) for o in self._observers]
        return defer.DeferredList(dList)

    def append(self, key, value):
        """
        Append the given object to the given list.
        Notifies observers of this Cacheable through observe_append.

        @param key:   the key of the list to append to
        @param value: the object to append

        @raises KeyError: if the key was never added
        @returns: a DeferredList over the remote calls to all observers
        @rtype:   L{twisted.internet.defer.DeferredList}
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        self._dict[key].append(value)
        dList = [o.callRemote('append', key, value) for o in self._observers]
        return defer.DeferredList(dList)

    def remove(self, key, value):
        """
        Remove the given object from the given list.
        Notifies observers of this Cacheable through observe_remove.

        @param key:   the key of the list to remove from
        @param value: the object to remove

        @raises KeyError:   if the key was never added
        @raises ValueError: if the value is not in the list
        @returns: a DeferredList over the remote calls to all observers
        @rtype:   L{twisted.internet.defer.DeferredList}
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        try:
            self._dict[key].remove(value)
        except ValueError:
            # re-raise with enough context to see which list was involved
            raise ValueError('value %r not in list %r for key %r' % (
                value, self._dict[key], key))
        dList = [o.callRemote('remove', key, value) for o in self._observers]
        dl = defer.DeferredList(dList)
        return dl

    def setitem(self, key, subkey, value):
        """
        Set a value in the given dict.
        Notifies observers of this Cacheable through observe_setitem.

        @param key:    the key of the dict to change
        @param subkey: the key inside that dict
        @param value:  the value to store under subkey

        @raises KeyError: if the key was never added
        @returns: a DeferredList over the remote calls to all observers
        @rtype:   L{twisted.internet.defer.DeferredList}
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        self._dict[key][subkey] = value
        dList = [o.callRemote('setitem', key, subkey, value)
                 for o in self._observers]
        return defer.DeferredList(dList)

    def delitem(self, key, subkey):
        """
        Removes an element from the given dict. Note that the key refers
        to the dict; it is the subkey (and its value) that will be removed.
        Notifies observers of this Cacheable through observe_delitem.

        @param key:    the key of the dict to change
        @param subkey: the key inside that dict to remove

        @raises KeyError: if the key was never added, or if subkey is not
                          in the dict
        @returns: a DeferredList over the remote calls to all observers
        @rtype:   L{twisted.internet.defer.DeferredList}
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        try:
            # the removed value is sent along to the observers
            value = self._dict[key].pop(subkey)
        except KeyError:
            raise KeyError('key %r not in dict %r for key %r' % (
                subkey, self._dict[key], key))
        dList = [o.callRemote('delitem', key, subkey, value) for o in
                 self._observers]
        dl = defer.DeferredList(dList)
        return dl

    # pb.Cacheable methods

    def getStateToCacheAndObserveFor(self, perspective, observer):
        # register the new observer, tell the hooks, hand out the state
        self._observers.append(observer)
        for hook in self._hooks:
            hook.observerAppend(observer, len(self._observers))
        return self._dict

    def stoppedObserving(self, perspective, observer):
        # forget the observer and tell the hooks how many remain
        self._observers.remove(observer)
        for hook in self._hooks:
            hook.observerRemove(observer, len(self._observers))

    def addHook(self, hook):
        """
        A helper function that adds an object that would like to get
        informed by StateCacheable when observers have been added or
        removed.

        @param hook: an object who would like to receive state events
        @type  hook: object that implements
                     L{flumotion.twisted.flavors.IStateCacheableListener}

        @raises ValueError: if the hook was already added
        """
        if hook in self._hooks:
            raise ValueError(
                "%r is already a hook of %r" % (hook, self))
        self._hooks.append(hook)

    def removeHook(self, hook):
        """
        Remove the object that listens to StateCacheable observer events.

        @param hook: the object who would like to unsubscribe to state
                     events
        @type  hook: object that implements
                     L{flumotion.twisted.flavors.IStateCacheableListener}

        @raises ValueError: if the hook was not added before
        """
        self._hooks.remove(hook)
283 284 285 # At some point, a StateRemoteCache will become invalid. The normal way 286 # would be losing the connection to the RemoteCacheable, although 287 # particular kinds of RemoteCache objects might have other ways 288 # (e.g. component removed from flow). 289 # 290 # We support listening for invalidation events. However, in order to 291 # ensure predictable program behavior, we can't do a notifyOnDisconnect 292 # directly on the broker. If we did that, program semantics would be 293 # dependent on the call order of the notifyOnDisconnect methods, which 294 # would likely lead to heisenbugs. 295 # 296 # Instead, invalidation will only be performed by the application, if at 297 # all, via an explicit call to invalidate(). 298 299
class StateRemoteCache(pb.RemoteCache):
    """
    I am a remote cache of a state object.

    I keep a copy of the state and update it from the observe_* calls.
    Local listeners registered with L{addListener} are called after each
    update.

    @ivar _listeners: maps each listener to its list of handlers, in the
                      order [set_, append, remove, setitem, delitem,
                      invalidate]
    @ivar _dict:      the cached state, as given to L{setCopyableState}
    """

    def __init__(self):
        self._listeners = {}
        # no constructor
        # pb.RemoteCache.__init__(self)

    def __getitem__(self, key):
        # state['key'] is shorthand for state.get('key')
        return self.get(key)

    # our methods

    def hasKey(self, key):
        """
        Tell whether the given key is present in the cached state.

        @rtype: bool
        """
        # membership on the dict itself; no need to build the key list
        return key in self._dict

    def keys(self):
        """
        Return the keys present in the cached state.
        """
        return self._dict.keys()

    def get(self, key, otherwise=None):
        """
        Get the state cache value for the given key.

        Return otherwise in case where key is present but value None.

        @param key:       the key to look up
        @param otherwise: what to return when the stored value is None

        @raises KeyError: if the key is not in the cached state
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        v = self._dict[key]
        # compare to actual None, otherwise we also get zero-like values;
        # "is" rather than "==" so the value's own __eq__ is never asked
        if v is None:
            return otherwise

        return v

    def _ensureListeners(self):
        # when this is created through serialization from a JobCS,
        # __init__ does not seem to get called, so create self._listeners
        if not hasattr(self, '_listeners'):
            # FIXME: this means that callbacks will be fired in
            # arbitrary order; should be fired in order of connecting.
            # Use twisted.python.util.OrderedDict instead
            self._listeners = {}

    def addListener(self, listener, set_=None, append=None, remove=None,
                    setitem=None, delitem=None, invalidate=None):
        """
        Adds a listener to the remote cache.

        The caller will be notified of state events via the functions
        given as the 'set_', 'append', 'remove', 'setitem', 'delitem'
        and 'invalidate' keyword arguments.

        Always call this method using keyword arguments for the functions;
        calling them with positional arguments is not supported.

        Setting one of the event handlers to None will ignore that
        event. It is an error for all event handlers to be None.

        If this cache has already been invalidated, a given invalidate
        handler is called right away.

        @param listener:   new listener object that wants to receive
                           cache state change notifications.
        @type  listener:   object implementing
                           L{flumotion.twisted.flavors.IStateListener}
        @param set_:       procedure to call when a value is set
        @type  set_:       procedure(object, key, value) -> None
        @param append:     procedure to call when a value is appended to
                           a list
        @type  append:     procedure(object, key, value) -> None
        @param remove:     procedure to call when a value is removed from
                           a list
        @type  remove:     procedure(object, key, value) -> None
        @param setitem:    procedure to call when a value is set in a dict
        @type  setitem:    procedure(object, key, subkey, value) -> None
        @param delitem:    procedure to call when a value is removed
                           from a dict.
        @type  delitem:    procedure(object, key, subkey, value) -> None
        @param invalidate: procedure to call when this cache has been
                           invalidated.
        @type  invalidate: procedure(object) -> None

        @raises ValueError: if no event handler at all was given
        @raises KeyError:   if the listener was already added
        """
        if not (set_ or append or remove or setitem or delitem or invalidate):
            raise ValueError("At least one event handler has to be specified")

        self._ensureListeners()
        if listener in self._listeners:
            raise KeyError(
                "%r is already a listener of %r" % (listener, self))
        # the position in this list is the index used by _notifyListeners
        self._listeners[listener] = [set_, append, remove, setitem,
                                     delitem, invalidate]
        if invalidate and hasattr(self, '_cache_invalid'):
            invalidate(self)

    def removeListener(self, listener):
        """
        Remove a listener added before with L{addListener}.

        @raises KeyError: if the listener was not added
        """
        self._ensureListeners()
        if listener not in self._listeners:
            raise KeyError(listener)
        del self._listeners[listener]

    # pb.RemoteCache methods

    def setCopyableState(self, dict):
        self._dict = dict

    def _notifyListeners(self, index, *args):
        """
        Call the handler at the given index of every listener.

        @param index: position in the handler list stored by addListener
        @type  index: int
        @param args:  arguments passed to each handler after self
        """
        # local import: only needed to fetch the exception in a way that
        # parses on every Python 2 release as well as on Python 3
        import sys

        # notify our local listeners; compute set of procs first, so as
        # to allow the listeners set to change during the calls
        self._ensureListeners()
        procs = [tup[index] for tup in self._listeners.values()]
        for proc in procs:
            if not proc:
                # this listener is not interested in this event
                continue
            try:
                proc(self, *args)
            except Exception:
                e = sys.exc_info()[1]
                # These are all programming errors
                log.warning("stateremotecache",
                            'Exception in StateCache handler: %s',
                            log.getExceptionMessage(e))

    def observe_set(self, key, value):
        self._dict[key] = value
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'set'):
            StateCacheable.set(self, key, value)

        self._notifyListeners(0, key, value)

    def observe_append(self, key, value):
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'append'):
            StateCacheable.append(self, key, value)
        else:
            self._dict[key].append(value)

        self._notifyListeners(1, key, value)

    def observe_remove(self, key, value):
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'remove'):
            StateCacheable.remove(self, key, value)
        else:
            try:
                self._dict[key].remove(value)
            except ValueError:
                raise ValueError("value %r not under key %r with values %r" %
                                 (value, key, self._dict[key]))

        self._notifyListeners(2, key, value)

    def observe_setitem(self, key, subkey, value):
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'setitem'):
            StateCacheable.setitem(self, key, subkey, value)
        else:
            self._dict[key][subkey] = value

        self._notifyListeners(3, key, subkey, value)

    def observe_delitem(self, key, subkey, value):
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'delitem'):
            StateCacheable.delitem(self, key, subkey)
        else:
            try:
                del self._dict[key][subkey]
            except KeyError:
                raise KeyError("key %r not in dict %r for state dict %r" %
                               (subkey, self._dict[key], self._dict))

        self._notifyListeners(4, key, subkey, value)

    def invalidate(self):
        """Invalidate this StateRemoteCache.

        Calling this method will result in the invalidate callback being
        called for all listeners that passed an invalidate handler to
        addListener. This method is not called automatically; it is
        provided as a convenience to applications.

        Invalidating the same object twice trips an assertion.
        """
        assert not hasattr(self, '_cache_invalid'), \
            'object has already been invalidated'
        # if we also subclass from Cacheable, there is currently no way
        # to remotely invalidate the cache. that's ok though, because
        # double-caches are currently only used by the manager, which
        # does not call invalidate() on its caches.
        self._cache_invalid = True

        self._notifyListeners(5)