
Alternative/lower-level API #11

Answered by frankie567
davidbrochart asked this question in Q&A
Well, I hadn't really thought about that 🤔 The key thing is to instantiate an AsyncWebSocketSession class. The problem is that a context manager is needed, since we have to call HTTPX's .stream method, which is itself a context manager.

A good trick I learned while developing this library is to use contextlib.AsyncExitStack. It wraps context manager(s) into a single object. So we could probably do something like this:

import contextlib
from httpx_ws import aconnect_ws

# Create the stack
stack = contextlib.AsyncExitStack()
ws = await stack.enter_async_context(aconnect_ws("http://localhost:8000/ws"))

# Work with your session
message = await ws.receive_text()
print(message)

# Close t…
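
To illustrate the AsyncExitStack idea without a running server, here is a minimal sketch that swaps the real connection for a stand-in. Both `fake_session` and the `Client` wrapper are hypothetical names invented for this example and are not part of httpx-ws; the only real API used is `contextlib.AsyncExitStack` with its `enter_async_context` and `aclose` methods.

```python
import asyncio
import contextlib


@contextlib.asynccontextmanager
async def fake_session(name, events):
    # Hypothetical stand-in for aconnect_ws(...): no network involved.
    events.append(f"open {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")


class Client:
    """Hypothetical wrapper exposing open()/close() instead of `async with`."""

    def __init__(self, events):
        self._events = events
        self._stack = contextlib.AsyncExitStack()
        self.session = None

    async def open(self):
        # Enter the context manager now; its exit is deferred to the stack.
        self.session = await self._stack.enter_async_context(
            fake_session("ws", self._events)
        )

    async def close(self):
        # Exits every context entered on the stack, in reverse order.
        await self._stack.aclose()


async def main():
    events = []
    client = Client(events)
    await client.open()
    events.append(f"use {client.session}")
    await client.close()
    return events


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The stack holds on to the pending exit, so the object can be opened in one method and closed in another. As a general precaution, it is probably safest to call `open()` and `close()` from the same task, since some context managers keep task-scoped state.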

Answer selected by davidbrochart