1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 Framework for writing automated integration tests.
24
25 This module provides a way of writing automated integration tests from
26 within Twisted's unit testing framework, trial. Test cases are
27 constructed as subclasses of the normal trial
28 L{twisted.trial.unittest.TestCase} class.
29
30 Integration tests look like normal test methods, except that they are
31 decorated with L{integration.test}, take an extra "plan" argument, and
32 do not return anything. For example::
33
34 from twisted.trial import unittest
35 from flumotion.twisted import integration
36
37 class IntegrationTestExample(unittest.TestCase):
38 @integration.test
39 def testEchoFunctionality(self, plan):
40 process = plan.spawn('echo', 'hello world')
41 plan.wait(process, 0)
42
43 This example will spawn a process, as if you typed "echo 'hello world'"
44 at the shell prompt. It then waits for the process to exit, expecting
45 the exit status to be 0.
46
47 The example illustrates two of the fundamental plan operators, spawn and
48 wait. "spawn" spawns a process. "wait" waits for a process to finish.
49 The other operators are "spawnPar", which spawns a number of processes
50 in parallel, "waitPar", which waits for a number of processes in
51 parallel, and "kill", which kills one or more processes via SIGTERM and
52 then waits for them to exit.
53
54 It is evident that this framework is most appropriate for testing the
55 integration of multiple processes, and is not suitable for in-process
56 tests. The plan that is built up is only executed after the test method
57 exits, via the L{integration.test} decorator; the writer of the
58 integration test does not have access to the plan's state.
59
60 Note that all process exits must be anticipated. If at any point the
61 integration tester receives SIGCHLD, the next operation must be a wait
62 for that process. If this is not the case, the test is interpreted as
63 having failed.
64
65 Also note that while the test is running, the stdout and stderr of each
66 spawned process are redirected into log files in a subdirectory of where
67 the test is located. For example, in the previous example, the following
68 files will be created::
69
70 $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stdout
71 $testdir/IntegrationTestExample-$date/testEchoFunctionality/echo.stderr
72
73 In the case that multiple echo commands are run in the same plan, the
74 subsequent commands will be named as echo-1, echo-2, and the like. Upon
75 successful completion of the test case, the log directory will be
76 deleted.
77 """
78
79 import os
80 import signal
81 import tempfile
82
83 from twisted.python import failure
84 from twisted.internet import reactor, protocol, defer
85 from flumotion.common import log as flog
86
87 __version__ = "$Rev: 8133 $"
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
def _wakeReactor():
    """
    Replacement for C{reactor.wakeUp}: poke the reactor's waker whenever
    one is installed.

    @returns: the (falsy) C{reactor.waker} value when no waker exists,
              otherwise whatever C{reactor.waker.wakeUp()} returns.
    """
    return reactor.waker and reactor.waker.wakeUp()


# Monkeypatch the global reactor so that wakeUp() unconditionally
# forwards to the waker, as long as a waker exists.
# NOTE(review): presumably this works around the stock wakeUp() not
# waking the reactor in some situations (e.g. signal delivery) -- confirm
# against the Twisted version in use.
reactor.wakeUp = _wakeReactor
111
112
113 -def log(format, *args):
115
116
117 -def debug(format, *args):
119
120
121 -def info(format, *args):
123
124
127
128
129 -def error(format, *args):
131
132
def _which(executable):
    """
    Resolve a command to the path of an executable file.

    If C{executable} contains a path separator it is taken as a path and
    made absolute; otherwise the directories listed in the C{PATH}
    environment variable are searched in order.

    @param executable: command name, or a path to a command
    @type  executable: str

    @returns: the path of the first match; absolute when C{executable}
              contained a path separator, otherwise the C{PATH} entry
              joined with C{executable}.
    @rtype:   str

    @raises CommandNotFoundException: if no executable file is found.
    """

    def isExecutableFile(path):
        # os.access(..., X_OK) on its own is also true for searchable
        # directories, so a directory that happens to carry the command's
        # name would be picked up. Insist on a regular file as well.
        return os.path.isfile(path) and os.access(path, os.X_OK)

    if os.sep in executable:
        candidate = os.path.abspath(executable)
        if isExecutableFile(candidate):
            return candidate
    elif os.getenv('PATH'):
        for directory in os.getenv('PATH').split(os.pathsep):
            candidate = os.path.join(directory, executable)
            if isExecutableFile(candidate):
                return candidate
    raise CommandNotFoundException(executable)
142
143
145
def __init__(self, process, expectedCode, actualCode):
    """
    Remember which process exited and how its exit code differed from
    the one that was expected.

    @param process:      the process that exited
    @param expectedCode: the exit code that was expected
    @param actualCode:   the exit code that was actually observed
    """
    Exception.__init__(self)
    # Stored under shorter attribute names than the parameter names.
    self.process, self.expected, self.actual = (
        process, expectedCode, actualCode)
151
153 return ('Expected exit code %r from %r, but got %r'
154 % (self.expected, self.process, self.actual))
155
156
158
162
164 return 'The process %r exited prematurely.' % self.process
165
166
168
172
174 return 'Command %r not found in the PATH.' % self.command
175
176
178
180 Exception.__init__(self)
181 self.processes = processes
182
184 return ('Processes still running at end of test: %r'
185 % (self.processes, ))
186
187
189
193
195 return ('Timed out waiting for %r to exit with status %r'
196 % (self.process, self.status))
197
198
200
202 self.exitDeferred = defer.Deferred()
203 self.timedOut = False
204
206 return self.exitDeferred
207
208 - def timeout(self, process, status):
212
214 info('process ended with status %r, exit code %r',
215 status, status.value.exitCode)
216 if self.timedOut:
217 warning('already timed out??')
218 print 'already timed out quoi?'
219 else:
220 info('process ended with status %r, exit code %r',
221 status, status.value.exitCode)
222 self.exitDeferred.callback(status.value.exitCode)
223
224
226 NOT_STARTED, STARTED, STOPPED = 'NOT-STARTED', 'STARTED', 'STOPPED'
227
def __init__(self, name, argv, testDir):
    """
    Describe a process that has not been started yet.

    @param name:    name of the process, kept in C{self.name}
    @param argv:    the command line as a tuple; the first element is
                    resolved to an executable path with L{_which}, the
                    remaining elements are kept as they are
    @param testDir: directory associated with this process, kept in
                    C{self.testDir}
    """
    command, arguments = argv[0], argv[1:]

    self.name = name
    self.testDir = testDir
    # argv must be a tuple: the resolved command is prepended by tuple
    # concatenation.
    self.argv = (_which(command), ) + arguments

    # Nothing is running yet, so there is no pid, protocol or timeout.
    self.state = self.NOT_STARTED
    self.pid = None
    self.protocol = None
    self._timeoutDC = None

    log('created process object %r', self)
239
241 assert self.state == self.NOT_STARTED
242
243 self.protocol = ProcessProtocol()
244
245 stdout = open(os.path.join(self.testDir, self.name + '.stdout'), 'w')
246 stderr = open(os.path.join(self.testDir, self.name + '.stderr'), 'w')
247
248 childFDs = {1: stdout.fileno(), 2: stderr.fileno()}
249
250
251
252
253
254
255
256
257
258
259
260
261 info('spawning process %r, argv=%r', self, self.argv)
262 termHandler = signal.signal(signal.SIGTERM, signal.SIG_DFL)
263 env = dict(os.environ)
264 env['FLU_DEBUG'] = '5'
265 process = reactor.spawnProcess(self.protocol, self.argv[0],
266 env=env, args=self.argv,
267 childFDs=childFDs)
268 signal.signal(signal.SIGTERM, termHandler)
269
270 stdout.close()
271 stderr.close()
272
273
274
275
276 self.pid = process.pid
277 self.state = self.STARTED
278
279 def got_exit(res):
280 self.state = self.STOPPED
281 info('process %r has stopped', self)
282 return res
283 self.protocol.getDeferred().addCallback(got_exit)
284
def kill(self, sig=signal.SIGTERM):
    """
    Deliver a signal to the running process.

    The process must be in the STARTED state.

    @param sig: the signal number to send; SIGTERM by default
    @type  sig: int
    """
    assert self.state == self.STARTED
    info('killing process %r, signal %d', self, sig)
    target = self.pid
    os.kill(target, sig)
289
290 - def wait(self, status, timeout=20):
301 d.addCallback(got_exit)
302 if self.state == self.STARTED:
303 self._timeoutDC = reactor.callLater(timeout,
304 self.protocol.timeout,
305 self,
306 status)
307
308 def cancel_timeout(res):
309 debug('cancelling timeout for %r', self)
310 if self._timeoutDC.active():
311 self._timeoutDC.cancel()
312 return res
313 d.addCallbacks(cancel_timeout, cancel_timeout)
314 return d
315
317 return '<Process %s in state %s>' % (self.name, self.state)
318
319
321
322
324 self.processes = []
325 self.timeout = 20
326
327 - def spawn(self, process):
332
338
339 - def kill(self, process):
343
def wait(self, process, exitCode):
    """
    Wait for a tracked process to exit, then stop tracking it.

    @param process:  a process that is currently in C{self.processes}
    @param exitCode: the exit code to pass on to C{process.wait}

    @returns: the deferred returned by C{process.wait}, called with the
              executor's current timeout. On success the process is
              removed from C{self.processes} and the callback result
              becomes None.
    """
    assert process in self.processes

    exited = process.wait(exitCode, timeout=self.timeout)
    # list.remove() returns None, so later callbacks see None just as
    # they would from a callback without a return statement.
    exited.addCallback(lambda _: self.processes.remove(process))
    return exited
352
368 p.protocol.processEnded = callbacker(d)
369 p.kill(sig=signal.SIGKILL)
370 d = defer.DeferredList(dlist)
371
372 def error(_):
373 if failure:
374 return failure
375 else:
376 raise e
377 d.addCallback(error)
378 return d
379 return failure
380
def run(self, ops, timeout=20):
    """
    Execute a sequence of operations one after the other.

    @param ops:     iterable of operations; each operation is a sequence
                    whose first element is a callable and whose
                    remaining elements are its positional arguments
    @param timeout: value stored in C{self.timeout} before any operation
                    runs

    @returns: a deferred that runs every operation in order and finally
              calls C{self._checkProcesses}, with C{failure=None} when
              all operations succeeded or with the failure otherwise.
    """
    self.timeout = timeout

    def invoke(_previous, operation):
        # The result of the previous operation is deliberately ignored.
        func, args = operation[0], operation[1:]
        return func(*args)

    def on_success(_result):
        return self._checkProcesses(failure=None)

    def on_failure(reason):
        return self._checkProcesses(failure=reason)

    chain = defer.Deferred()
    for operation in ops:
        chain.addCallback(invoke, operation)
    chain.addCallbacks(on_success, on_failure)

    # Start the chain from the reactor loop rather than synchronously.
    # NOTE(review): presumably so the reactor is fully set up before any
    # process is spawned -- confirm.
    reactor.callLater(0, chain.callback, None)
    return chain
400
401
403
404 - def __init__(self, testCase, testName):
416
418 testDir = tempfile.mkdtemp(prefix="test_integration")
419 return testDir
420
422 tail = '%s-%s' % (self.testCaseName, self.name)
423 outputDir = os.path.join(testDir, tail)
424 os.mkdir(outputDir)
425 return outputDir
426
428 for root, dirs, files in os.walk(self.outputDir, topdown=False):
429 for name in files:
430 os.remove(os.path.join(root, name))
431 for name in dirs:
432 os.rmdir(os.path.join(root, name))
433 os.rmdir(self.outputDir)
434 os.rmdir(self.testDir)
435 self.testDir = None
436 self.outputDir = None
437
448
451
454
455 - def spawn(self, command, *args):
459
461 processes = []
462 self._appendOp(self.vm.checkExits, ())
463 for argv in argvs:
464 assert isinstance(argv, tuple), \
465 'all arguments to spawnPar must be tuples'
466 for arg in argv:
467 assert isinstance(arg, str), \
468 'all subarguments to spawnPar must be strings'
469 processes.append(self._allocProcess(argv))
470 for process in processes:
471 self._appendOp(self.vm.spawn, process)
472 return tuple(processes)
473
474 - def wait(self, process, status):
476
477 - def waitPar(self, *processStatusPairs):
482
483 - def kill(self, process, status=None):
487
492
493
495 testName = proc.__name__
496
497 def wrappedtest(self):
498 plan = Plan(self, testName)
499 proc(self, plan)
500 return plan.execute()
501 try:
502 wrappedtest.__name__ = testName
503 except TypeError:
504
505 pass
506
507
508 wrappedtest.timeout = 666
509 return wrappedtest
510