-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtests.py
261 lines (192 loc) · 5.84 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
import gc
import sys
import weakref
from contextlib import contextmanager
from twisted.internet.defer import Deferred, returnValue, fail, succeed, CancelledError
from txcoroutine import noreturn, coroutine
# from unnamedframework.util.testing import assert_raises, deferred_result, assert_not_raises
def test_pause_unpause():
    """A paused coroutine must stay suspended until it is unpaused.

    The Deferred the coroutine waits on is fired while the coroutine is
    paused; the code after the ``yield`` may only run once ``unpause()`` has
    been called.
    """
    gate = Deferred()
    progress = []  # gets an entry once the code after the yield has run

    @coroutine
    def waiter():
        yield gate
        progress.append(True)

    running = waiter()
    assert not progress  # still parked on the un-fired gate

    running.pause()
    gate.callback(None)
    assert not progress  # the gate fired, but the coroutine is paused

    running.unpause()
    assert progress  # unpausing lets it continue past the yield
def test_cancel():
    """A cancelled coroutine must not resume when its awaited Deferred fires.

    After ``cancel()``, firing the Deferred the coroutine was waiting on must
    not execute the code following the ``yield``.
    """
    gate = Deferred()
    resumed = []  # gets an entry if the code after the yield ever runs

    @coroutine
    def waiter():
        yield gate
        resumed.append(True)

    def trap_cancellation(failure):
        # Let only CancelledError through; anything else is re-raised by trap().
        return failure.trap(CancelledError)

    running = waiter()
    assert not resumed

    running.addErrback(trap_cancellation)
    running.cancel()

    gate.callback(None)
    assert not resumed
def test_cancel_stops_the_generator():
    """Cancelling a coroutine must raise ``GeneratorExit`` inside its generator.

    The generator is suspended on an un-fired Deferred; ``cancel()`` is
    expected to surface as ``GeneratorExit`` at the ``yield``.
    """
    gate = Deferred()
    exits_seen = []  # gets an entry when GeneratorExit reaches the generator

    @coroutine
    def waiter():
        try:
            yield gate
        except GeneratorExit:
            exits_seen.append(True)

    def trap_cancellation(failure):
        # Let only CancelledError through; anything else is re-raised by trap().
        return failure.trap(CancelledError)

    running = waiter()
    running.addErrback(trap_cancellation)
    running.cancel()

    assert exits_seen
def test_main():
    """End-to-end check of ``noreturn``.

    Verifies that after ``noreturn(other())``:

    * the other generator is started,
    * objects local to the calling coroutine become collectable,
    * the Deferred returned to the original caller only fires once the other
      generator has finished, and
    * execution never comes back to the statement after ``noreturn``.
    """
    first_gate, second_gate = Deferred(), Deferred()
    state = {
        'dummy_ref': None,        # weak reference to a local of the caller
        'callee_started': False,  # set as soon as the callee begins running
        'caller_resumed': False,  # set only if control returns after noreturn
    }

    @coroutine
    def caller():
        dummy = Dummy()
        state['dummy_ref'] = weakref.ref(dummy)
        yield first_gate  # pause for state inspection
        noreturn(callee())
        state['caller_resumed'] = True

    def callee():
        state['callee_started'] = True
        yield second_gate  # pause for state inspection

    final_d = caller()

    gc.collect()  # don't rely on automatic collection
    assert state['dummy_ref']()  # just to reinforce the test logic

    first_gate.callback(None)  # resume the caller
    assert state['callee_started'], "the other coroutine should be executed"

    gc.collect()  # don't rely on automatic collection
    assert not state['dummy_ref'](), "objects in the caller should become garbage"
    assert not final_d.called

    second_gate.callback(None)  # resume the callee
    assert final_d.called, "the whole procedure should have been completed"
    assert not state['caller_resumed'], "flow should never return to the noreturn-calling coroutine"
def test_deep_recursion():
    """``noreturn`` tail calls must not consume interpreter stack.

    A tail-recursive factorial written with ``noreturn`` is run deeper than the
    (temporarily lowered) recursion limit and must still succeed, while the
    same algorithm written as plain recursion must hit the limit.
    """
    def fact(n, result=1):
        # returnValue() raises, so nothing below it runs once n <= 1.
        if n <= 1:
            returnValue(result)
        noreturn(fact(n - 1, n * result))
        yield  # never reached; only here to make this a generator function

    assert deferred_result(coroutine(fact)(1)) == 1
    assert deferred_result(coroutine(fact)(10)) == safe_fact(10)

    with recursion_limit(100):
        try:
            # 110 is more than needed, since some frames are already on the
            # stack, but the extra margin makes sure the limit is exceeded.
            assert deferred_result(coroutine(fact)(110)) == safe_fact(110)
        except RuntimeError:
            assert False

    # ...and now prove that the same (tail call optimizable) algorithm without
    # noreturn will eat up the stack.
    def normal_fact(n, result=1):
        if n <= 1:
            returnValue(result)
        return normal_fact(n - 1, n * result)

    with recursion_limit(100):
        try:
            normal_fact(110)
        except RuntimeError:
            overflowed = True
        else:
            overflowed = False
    assert overflowed, "normal_fact(110)"
def test_with_previous_yield_result_not_none():
    """``noreturn`` must work after a previous ``yield`` raised into the coroutine.

    The coroutine first yields an already-failed Deferred, handles the
    resulting exception itself, and then hands over via ``noreturn``. Calling
    the coroutine must not let that earlier exception escape.
    """
    class MockException(Exception):
        pass

    entered = []  # gets an entry once the coroutine body has started

    def successor():
        yield succeed(None)

    @coroutine
    def fail_then_hand_over():
        entered.append(True)
        try:
            yield fail(MockException())
        except MockException:
            pass
        noreturn(successor())

    try:
        fail_then_hand_over()
    except MockException:
        assert False

    assert entered
def test_noreturn_of_other_inlineCallbacks_wrapped_callable():
    """``noreturn`` accepts the result of calling another ``@coroutine`` function.

    The target here is itself decorated with ``@coroutine``. The caller's
    result must be the target's return value, and the statement following
    ``noreturn`` must never execute.
    """
    events = {'target_called': False, 'after_noreturn': False}

    @coroutine
    def target():
        events['target_called'] = True
        yield
        returnValue('someretval')

    @coroutine
    def caller():
        yield
        noreturn(target())
        events['after_noreturn'] = True

    retval = deferred_result(caller())

    assert events['target_called']
    assert not events['after_noreturn']
    assert retval == 'someretval'
def test_noreturn_with_regular_function():
    """``noreturn`` given a plain (non-generator) function's return value.

    The value passed to ``noreturn`` becomes the coroutine's result, and the
    statement following ``noreturn`` must never execute.
    """
    reached_after_noreturn = []

    def plain():
        return 'someretval'

    @coroutine
    def caller():
        yield
        noreturn(plain())
        reached_after_noreturn.append(True)

    retval = deferred_result(caller())

    assert not reached_after_noreturn
    assert retval == 'someretval'
def test_pause_unpause_cancel_on_coroutine_with_no_depends_on():
    """``pause``, ``unpause`` and ``cancel`` must not raise on a coroutine that
    never yielded a Deferred (its only ``yield`` is a bare one).
    """
    @coroutine
    def yields_nothing():
        yield

    running = yields_nothing()
    for operation in (running.pause, running.unpause, running.cancel):
        operation()
def test_cancelled_coroutine_does_not_keep_its_last_depends_on_from_being_garbage_collected():
    """A cancelled coroutine must release the Deferred it was waiting on.

    The test drops its own strong reference to the awaited Deferred, cancels
    the coroutine, and then checks through a weak reference that the Deferred
    is gone.
    """
    depends_on = Deferred()

    @coroutine
    def fn():
        yield depends_on

    c = fn()
    # Consume the CancelledError so it is not left unhandled on `c`; this
    # mirrors the other tests in this file that cancel a pending coroutine.
    c.addErrback(lambda f: f.trap(CancelledError))

    depends_on_weakref = weakref.ref(depends_on)
    depends_on = None  # drop the test's own strong reference

    c.cancel()
    assert not depends_on_weakref()
@contextmanager
def recursion_limit(n):
    """Context manager that temporarily sets the interpreter recursion limit.

    The limit in effect on entry is restored on exit, whether the ``with``
    body finishes normally or raises.
    """
    previous_limit = sys.getrecursionlimit()
    sys.setrecursionlimit(n)
    try:
        yield
    finally:
        sys.setrecursionlimit(previous_limit)
# Instances of plain object() cannot be weakly referenced, so the tests use
# this trivial class when they need a weak-referenceable object.
class Dummy:
    """Empty placeholder whose instances can be targets of ``weakref.ref``."""
def safe_fact(n):
    """Return ``n!`` computed iteratively, without recursion.

    Used as the reference value for the coroutine-based factorial tests.
    Runs on both Python 2 and Python 3 (no ``reduce`` / ``xrange`` builtins
    needed) and returns 1 for any ``n <= 1``, including 0.
    """
    product = 1
    for factor in range(2, n + 1):
        product *= factor
    return product
def deferred_result(d):
    """Synchronously extract the outcome of an already-fired Deferred.

    If ``d`` is not a ``Deferred`` it is returned unchanged. Otherwise ``d``
    must already have been called: its success value is returned, or, if it
    holds a failure, the failure's exception is raised.
    """
    if not isinstance(d, Deferred):
        return d

    assert d.called

    outcome = {}

    def remember_value(result):
        outcome['value'] = result

    def remember_failure(failure):
        outcome['failure'] = failure

    # Both handlers return None, so the failure (if any) is consumed here.
    d.addCallback(remember_value)
    d.addErrback(remember_failure)

    failure = outcome.get('failure')
    if failure:
        failure.raiseException()
    return outcome.get('value')