Package flumotion :: Package twisted :: Module defer
[hide private]

Source Code for Module flumotion.twisted.defer

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_defer -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import random 
 19   
 20  from twisted import version 
 21  from twisted.internet import defer, reactor 
 22  from twisted.python import reflect 
 23   
 24  # FIXME: this is for HandledException - maybe it should move here instead ? 
 25  from flumotion.common import errors, common 
 26   
 27  __version__ = "$Rev$" 
 28   
 29   
 30  # See flumotion.test.test_defer for examples 
 31   
 32   
def defer_generator(proc):
    """
    Decorator that drives a generator function and reports its outcome
    through a deferred.

    Calling the decorated function calls C{proc} with the same arguments
    to obtain a generator, and returns a new deferred.  Each time the
    generator yields a deferred, the generator is resumed when that
    deferred fires.  Before resuming, a C{value} attribute is stored on
    the yielded deferred: a callable that either returns the result or
    raises an exception rebuilt from the failure.

    The returned deferred is fired:
     - with C{None} when the generator finishes (C{StopIteration});
     - with the yielded object when the generator yields something that
       is not a deferred (the generator is not resumed after that);
     - through its errback when the generator raises any other
       C{Exception}.

    Note: this block is written in Python 2 syntax (C{print} statement,
    C{except Exception, e}, C{gen.next()}).

    @param proc: a generator function
    @returns: a function accepting the same arguments as C{proc} and
              returning a deferred
    """

    def wrapper(*args, **kwargs):
        gen = proc(*args, **kwargs)
        result = defer.Deferred()

        # To support having the errback of last resort, we need to have
        # an errback which runs after all the other errbacks, *at the
        # point at which the deferred is fired*. So users of this code
        # have from between the time the deferred is created and the
        # time that the deferred is fired to attach their errbacks.
        #
        # Unfortunately we only control the time that the deferred is
        # created. So we attach a first errback that then adds an
        # errback to the end of the list. Unfortunately we can't add to
        # the list while the deferred is firing. In a decision between
        # having decent error reporting and being nice to a small part
        # of twisted I chose the former. This code takes a reference to
        # the callback list, so that we can add an errback to the list
        # while the deferred is being fired. It temporarily sets the
        # state of the deferred to not having been fired, so that adding
        # the errbacks doesn't automatically call the newly added
        # methods.
        result.__callbacks = result.callbacks

        # Note: the 'proc' parameter below shadows the generator function
        # of the enclosing scope; here it is just the callable to run
        # while the deferred's callbacks/called attributes are swapped.
        def with_saved_callbacks(proc, *_args, **_kwargs):
            saved_callbacks, saved_called = result.callbacks, result.called
            result.callbacks, result.called = result.__callbacks, False
            proc(*_args, **_kwargs)
            # put back the state the deferred had before the swap
            result.callbacks, result.called = saved_callbacks, saved_called

        # Add errback-of-last-resort

        def default_errback(failure, d):
            # an already handled exception just gets propagated up without
            # doing a traceback
            if failure.check(errors.HandledException):
                return failure

            def print_traceback(f):
                import traceback
                # 'proc' here is the decorated generator function
                print 'flumotion.twisted.defer: ' + \
                    'Unhandled error calling', proc.__name__, ':', f.type
                traceback.print_exc()
            # append print_traceback to the saved callback list without
            # having it run immediately
            with_saved_callbacks(lambda: d.addErrback(print_traceback))
            # Bare raise: re-raises the exception currently being handled.
            # NOTE(review): this appears to rely on this errback running
            # while generator_next() is still inside its except clause
            # (the only place in this block that calls result.errback)
            # -- confirm.
            raise
        result.addErrback(default_errback, result)

        def generator_next():
            # Advance the generator by one step and dispatch on what it
            # yielded.
            try:
                x = gen.next()
                if isinstance(x, defer.Deferred):
                    # resume the generator once the yielded deferred fires
                    x.addCallback(callback, x).addErrback(errback, x)
                else:
                    # a non-deferred value ends the run: it becomes the
                    # result and the generator is not advanced again
                    result.callback(x)
            except StopIteration:
                result.callback(None)
            except Exception, e:
                result.errback(e)

        def errback(failure, d):
            # The yielded deferred d failed: make d.value() raise an
            # exception rebuilt from the failure, then resume the generator.

            def raise_error():
                # failure.parents[-1] will be the exception class for local
                # failures and the string name of the exception class
                # for remote failures (which might not exist in our
                # namespace)
                #
                # failure.value will be the tuple of arguments to the
                # exception in the local case, or a string
                # representation of that in the remote case (see
                # pb.CopyableFailure.getStateToCopy()).
                #
                # we can only reproduce a remote exception if the
                # exception class is in our namespace, and it only takes
                # one string argument. if either condition is not true,
                # we wrap the strings in a default Exception.
                #
                # The index into failure.parents depends on the installed
                # twisted version: parents[0] from 11.1.0 on, parents[-1]
                # before that.
                if common.versionStringToTuple(version.short()) >= (11, 1, 0):
                    k, v = failure.parents[0], failure.value
                else:
                    k, v = failure.parents[-1], failure.value
                try:
                    if isinstance(k, str):
                        k = reflect.namedClass(k)
                    if isinstance(v, tuple):
                        e = k(*v)
                    else:
                        e = k(v)
                except Exception:
                    # could not rebuild the original exception; fall back
                    # to a plain Exception describing it
                    e = Exception('%s: %r' % (failure.type, v))
                raise e
            d.value = raise_error
            generator_next()

        # Note: the 'result' parameter shadows the outer deferred inside
        # this function; d.value() returns the value the yielded deferred
        # fired with.
        def callback(result, d):
            d.value = lambda: result
            generator_next()

        # start driving the generator right away
        generator_next()

        return result

    return wrapper
def defer_generator_method(proc):
    """
    Variant of L{defer_generator} for generator functions that take
    C{self} as their first argument.

    @param proc: a generator function whose first argument is C{self}
    @returns: a function taking C{self} followed by the remaining
              arguments of C{proc}; it forwards them to the
              L{defer_generator} wrapper of C{proc} and returns what
              that wrapper returns
    """
    # Wrap once, at decoration time.  Wrapping inside the lambda would
    # rebuild the wrapper closure on every single call for no benefit.
    wrapped = defer_generator(proc)
    return lambda self, *args, **kwargs: wrapped(self, *args, **kwargs)
141 142
def defer_call_later(deferred):
    """
    Return a new deferred that is fired from a C{reactor.callLater(0, ...)}
    call once C{deferred} has called back.

    The new deferred receives the same result C{deferred} called back
    with.  Only a callback is attached to C{deferred}; no errback is
    added here, so a failure of C{deferred} is not relayed to the
    returned deferred.

    @param deferred: the deferred to wait for
    @type  deferred: L{twisted.internet.defer.Deferred}
    @returns: the deferred that gets fired from the reactor
    @rtype:   L{twisted.internet.defer.Deferred}
    """
    delayed = defer.Deferred()

    def schedule(value):
        # hand the result over to the reactor instead of firing inline
        reactor.callLater(0, delayed.callback, value)

    deferred.addCallback(schedule)
    return delayed
class Resolution:
    """
    I wrap a deferred and guarantee that it is fired at most once, either
    with a result or with an exception.

    @ivar d:     the deferred that gets fired as part of the resolution
    @type d:     L{twisted.internet.defer.Deferred}
    @ivar fired: whether a result or an exception was already delivered
    @type fired: bool
    """

    def __init__(self):
        self.d = defer.Deferred()
        self.fired = False

    def cleanup(self):
        """
        Hook for releasing resources tied to the resolution; called right
        before the deferred is fired.  Does nothing by default, subclasses
        can override me.
        """

    def callback(self, result):
        """
        Resolve successfully: run the callbacks of my deferred with the
        given result.  Ignored when a resolution was already reached.
        """
        if self.fired:
            return
        # mark as resolved first, then clean up, then fire
        self.fired = True
        self.cleanup()
        self.d.callback(result)

    def errback(self, exception):
        """
        Resolve with a failure: run the errbacks of my deferred with the
        given exception.  Ignored when a resolution was already reached.
        """
        if self.fired:
            return
        # mark as resolved first, then clean up, then fire
        self.fired = True
        self.cleanup()
        self.d.errback(exception)
194 195
class RetryingDeferred(object):
    """
    Provides a mechanism to attempt to run some deferred operation until it
    succeeds. On failure, the operation is tried again later, exponentially
    backing off.

    @cvar maxDelay:     upper bound for the delay between two attempts
    @cvar initialDelay: delay used after the first failed attempt
    @cvar factor:       multiplier applied to the delay after each failure
    @cvar jitter:       relative standard deviation of the random
                        variation applied to each delay; a false value
                        disables the variation
    @ivar delay:        the delay computed most recently, or None if no
                        delay was computed since the last start()
    """
    maxDelay = 1800 # Default to 30 minutes
    initialDelay = 5.0
    # Arbitrarily take these constants from twisted's ReconnectingClientFactory
    factor = 2.7182818284590451
    jitter = 0.11962656492
    delay = None

    def __init__(self, deferredCreate, *args, **kwargs):
        """
        Create a new RetryingDeferred. Will call
        deferredCreate(*args, **kwargs) each time a new deferred is needed.
        """
        self._create = deferredCreate
        self._args = args
        self._kwargs = kwargs

        self._masterD = None    # deferred handed out by start()
        self._running = False   # False once cancel() was called
        self._callId = None     # pending delayed call for the next attempt

    def start(self):
        """
        Start trying. Returns a deferred that will fire when this operation
        eventually succeeds. That deferred will only errback if this
        RetryingDeferred is cancelled: it will then errback with the
        failure of the attempt in progress if there is one, or with a
        CancelledError otherwise.
        """
        # Keep a local reference: when the first attempt completes
        # synchronously, _success() has already reset self._masterD to
        # None by the time _retry() returns.
        master = self._masterD = defer.Deferred()
        self._running = True
        self.delay = None

        self._retry()

        return master

    def cancel(self):
        """
        Stop retrying.

        If a retry is scheduled, it is cancelled and the deferred returned
        by start() errbacks with a CancelledError.  If an attempt is in
        progress instead, nothing is fired here; the outcome of that
        attempt is delivered when it completes.
        """
        master = None
        if self._callId:
            self._callId.cancel()
            master, self._masterD = self._masterD, None

        self._callId = None
        self._running = False

        # Fire only after our own state is settled, so that an errback
        # which re-enters this object (e.g. calls start() again) does not
        # get its changes overwritten.
        if master is not None:
            master.errback(errors.CancelledError())

    def _retry(self):
        # Run one attempt; its outcome ends up in _success or _failed.
        self._callId = None
        d = self._create(*self._args, **self._kwargs)
        d.addCallbacks(self._success, self._failed)

    def _success(self, val):
        # This can also be reached after cancel(): cancel() leaves the
        # master deferred in place while an attempt is in progress, so the
        # result of that attempt is still delivered.
        #
        # Detach the master deferred before firing it, so that a callback
        # which starts us again is not clobbered afterwards.
        master, self._masterD = self._masterD, None
        master.callback(val)

    def _failed(self, failure):
        if self._running:
            # still trying: schedule the next attempt
            nextDelay = self._nextDelay()
            self._callId = reactor.callLater(nextDelay, self._retry)
        else:
            # cancelled while this attempt was in progress: report its
            # failure; detach before firing, as in _success()
            master, self._masterD = self._masterD, None
            master.errback(failure)

    def _nextDelay(self):
        # Compute, store and return the delay before the next attempt:
        # initialDelay the first time, the previous delay times factor
        # afterwards, randomly varied when jitter is set, and finally
        # capped at maxDelay.
        if self.delay is None:
            self.delay = self.initialDelay
        else:
            self.delay = self.delay * self.factor

        if self.jitter:
            self.delay = random.normalvariate(self.delay,
                                              self.delay * self.jitter)
        self.delay = min(self.delay, self.maxDelay)

        return self.delay
277