-
-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
restore AsyncExitStack, and use it correctly #96
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,26 +39,36 @@ def __init__(self, app: ASGIApp, scope: Scope) -> None: | |
self._send_queue: queue.Queue[Message] = queue.Queue() | ||
self.connection = wsproto.WSConnection(wsproto.ConnectionType.SERVER) | ||
self.connection.initiate_upgrade_connection(scope["headers"], scope["path"]) | ||
self._exit_stack = contextlib.AsyncExitStack() | ||
self._aentered = False | ||
|
||
async def __aenter__( | ||
self, | ||
) -> tuple["ASGIWebSocketAsyncNetworkStream", bytes]: | ||
self._task_group = await anyio.create_task_group().__aenter__() | ||
self._task_group.start_soon(self._run) | ||
if self._aentered: | ||
raise RuntimeError( | ||
"Cannot use ASGIWebSocketAsyncNetworkStream in a context manager twice" | ||
) | ||
self._aentered = True | ||
async with contextlib.AsyncExitStack() as stack: | ||
self._task_group = await stack.enter_async_context(anyio.create_task_group()) | ||
self._task_group.start_soon(self._run) | ||
|
||
await self.send({"type": "websocket.connect"}) | ||
message = await self.receive() | ||
|
||
await self.send({"type": "websocket.connect"}) | ||
message = await self.receive() | ||
stack.push_async_callback(self.aclose) | ||
|
||
if message["type"] == "websocket.close": | ||
await self.aclose() | ||
raise WebSocketDisconnect(message["code"], message.get("reason")) | ||
if message["type"] == "websocket.close": | ||
raise WebSocketDisconnect(message["code"], message.get("reason")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. aclose was run here and then later in the |
||
|
||
assert message["type"] == "websocket.accept" | ||
return self, self._build_accept_response(message) | ||
assert message["type"] == "websocket.accept" | ||
retval = self, self._build_accept_response(message) | ||
self._exit_stack = stack.pop_all() | ||
return retval | ||
|
||
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: | ||
await self.aclose() | ||
await self._task_group.__aexit__(exc_type, exc_val, exc_tb) | ||
Comment on lines
-60
to
-61
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. before it was possible for this aclose to raise eg CancelledError, but then it would not be raised into the taskgroup so the exception would not be processed |
||
async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool | None: | ||
return await self._exit_stack.__aexit__(exc_type, exc_val, exc_tb) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. before the return code was not passed from |
||
|
||
async def read( | ||
self, max_bytes: int, timeout: typing.Optional[float] = None | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if send was cancelled it would not propagate into the TaskGroup async context manager!
This was fixed by using two AsyncExitStacks and pop_all(), so all code is covered by a context manager and therefore the TaskGroup.