-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_usot.py
157 lines (124 loc) · 4.49 KB
/
test_usot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import pytest,asyncio
import logging
from htagweb import Usot
class SessionMemory:
    """Minimal in-memory session store mapping a uid to a dict of data.

    Used here as the object handed to ``Usot`` in the tests below.
    """

    def __init__(self):
        # uid -> session dict; starts empty.
        self.SESSIONS = {}

    def get(self, uid: str):
        """Return the dict stored under *uid*, or a fresh empty dict.

        A missing uid is not inserted into the store.
        """
        sessions = self.SESSIONS
        return sessions.get(uid, {})

    def set(self, uid: str, value: dict):
        """Store *value* under *uid*, replacing any previous entry.

        The value is stored by reference (no copy). A non-dict value
        trips the assertion.
        """
        assert isinstance(value, dict)
        self.SESSIONS.update({uid: value})
# class BuggedObject:
# def testko(self):
# return 42/0 # runtime error ;-)
# def testok(self):
# return 42
# class ObjectTest:
# async def test(self):
# await asyncio.sleep(0.1)
# return 42
# def testsize(self,buf):
# return buf
# def f(klass,port):
# async def loop():
# px = ProxySingleton( klass, port=port )
# d=await px.get("uid")
# d["nb"]+=1
# await px.set("uid",d)
# asyncio.run(loop())
# @pytest.mark.asyncio
# async def test_base():
# m=ProxySingleton( SessionMemory, port=19999 )
# assert await m.get("uid1") == {}
# await m.set("uid1", dict(a=42))
# assert await m.get("uid1") == dict(a=42)
# m2=ProxySingleton( SessionMemory, port=19999 )
# assert await m2.get("uid1") == dict(a=42)
# m2=ProxySingleton( SessionMemory, port=19999 )
# assert await m2.get("uid1") == dict(a=42)
# assert await m.get("uid1") == dict(a=42)
# @pytest.mark.asyncio
# async def test_compatibility_in_multiprocessing():
# # ensure is compatible in different process
# import multiprocessing
# m=ProxySingleton( SessionMemory, port=19999 )
# await m.set("uid",dict(nb=0))
# p=multiprocessing.Process(target=f,args=(SessionMemory,19999,))
# p.start()
# while p.is_alive():
# await asyncio.sleep(0.2)
# xx=await m.get("uid")
# assert {"nb":1} == xx
# @pytest.mark.asyncio
# async def test_classical_use():
# # ensure the classic use works
# async with ServeUnique( SessionMemory, port=19999 ) as m:
# assert m.is_server()
# print(m)
# assert await m.get("uid1") == {}
# await m.set("uid1", dict(a=42))
# assert await m.get("uid1") == dict(a=42)
# assert not m.is_server() # and it's closed
# async with ServeUnique( SessionMemory, port=21213 ) as m:
# assert {} == await m.get("uid") # previous was closed, so new one
# @pytest.mark.asyncio
# async def test_exception_are_well_managed():
# # ensure exception are well managed
# async with ServeUnique( BuggedObject, port=19999 ) as m:
# with pytest.raises(ZeroDivisionError):
# await m.testko()
# # it works after crash
# assert 42==await m.testok()
# @pytest.mark.asyncio
# async def test_async_methods_on_object():
# # ensure object can have async methods
# async with ServeUnique( ObjectTest, port=19999 ) as m:
# assert 42==await m.test()
# buf=500_000*"x"
# assert buf==await m.testsize(buf) # work better than redys
# @pytest.mark.asyncio
# async def test_compatibility_in_inner_thread():
# # ensure is compatible in same thread
# async with ServeUnique( SessionMemory, port=21213 ) as m:
# assert m.is_server()
# await m.set("uid",dict(nb=0))
# async with ServeUnique( SessionMemory, port=21213 ) as m2:
# assert not m2.is_server() # m is not the real server
# d=await m2.get("uid")
# d["nb"]+=1
# await m2.set("uid",d)
# assert {"nb":1} == await m.get("uid")
def sync_test(client):
    """Round-trip one value through a synchronous client.

    Writes ``{"a": 43}`` under ``"uid1"`` with ``client.set`` and asserts
    that ``client.get`` returns an equal dict.
    """
    uid = "uid1"
    client.set(uid, {"a": 43})
    stored = client.get(uid)
    assert stored == {"a": 43}
async def async_test(client):
    """Round-trip one value through an asynchronous client.

    Awaits ``client.set`` to write ``{"a": 42}`` under ``"uid1"``, then
    awaits ``client.get`` and asserts the result is an equal dict.
    """
    uid = "uid1"
    await client.set(uid, {"a": 42})
    fetched = await client.get(uid)
    assert fetched == {"a": 42}
@pytest.mark.asyncio
async def test_task():
    """Start a ``Usot`` around ``SessionMemory`` and exercise its async client.

    The instance is created on port 19999, started, driven through
    ``async_test`` via its ``clientasync`` attribute, and always stopped
    afterwards, even when the round-trip assertion fails.
    """
    print(">>> TASK")
    server = Usot(SessionMemory, port=19999)
    server.start()
    try:
        await async_test(server.clientasync)
        # sync_test(server.clientsync) is deliberately not run here:
        # the original note on this line reads "sync not possible".
    finally:
        server.stop()
# def test_thread():
# print(">>> THREAD")
# p=Usot(SessionMemory,port=19919)
# p.start_thread()
# try:
# #await async_test( p.clientasync )
# sync_test( p.clientsync )
# finally:
# p.stop()
if __name__ == "__main__":
    # When run as a script, turn on DEBUG-level logging with a compact
    # "[LEVEL] logger: message" line format.
    logging.basicConfig(
        level=logging.DEBUG,
        format='[%(levelname)-5s] %(name)s: %(message)s',
    )
# asyncio.run( test_base() )
# asyncio.run( test_classical_use() )
# asyncio.run( test_exception_are_well_managed() )
# asyncio.run( test_async_methods_on_object() )
# asyncio.run( test_compatibility_in_inner_thread() )
# asyncio.run( test_compatibility_in_multiprocessing() )