-
I am trying out the FastStream plugin for FastAPI for a project and I am unable to figure out how to forward messages from a consumer to an open websocket. Before, I was using rstream to dynamically create consumers based on request parameters. Now, I am currently trying the following: from faststream.rabbit.fastapi import RabbitRouter, RabbitMessage
from faststream.rabbit import RabbitQueue, RabbitBroker
from fastapi import WebSocket, Depends
from typing_extensions import Annotated
broker_router = RabbitRouter("amqp://guest:guest@localhost:5672/", max_consumers=1)
def broker():
return broker_router.broker
@broker_router.websocket("/ws/collection/{cid}/{eid}")
async def stream_collection(
websocket: WebSocket,
cid: str,
eid: str,
broker: Annotated[RabbitBroker, Depends(broker)],
):
await websocket.accept()
collection_stream = RabbitQueue(
f"{cid}-stream-audio",
durable=True,
arguments={
"x-queue-type": "stream",
},
)
async def handle_message(m: RabbitMessage):
await websocket.send_bytes(m)
subscriber = broker.subscriber(
collection_stream, consume_args={"x-stream-offset": "first"}
)
subscriber(handle_message)
broker.setup_subscriber(subscriber)
await subscriber.start() And then I am just including the broker_router in my main FastAPI app like this: app.include_router(broker_router) Whenever I test the websocket endpoint I am able to connect successfully, the consumer begins receiving and processing messages, but every time So since Thanks in advance for any help and insight. |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
I found a StackOverflow article that showed that from faststream.rabbit.fastapi import RabbitRouter, RabbitMessage
from faststream.rabbit import RabbitQueue, RabbitBroker
from fastapi import WebSocket, Depends
from typing_extensions import Annotated
from fastapi.websockets import WebSocketDisconnect
broker_router = RabbitRouter("amqp://guest:guest@localhost:5672/", max_consumers=1)
def broker():
return broker_router.broker
@broker_router.websocket("/ws/collection/{cid}/{eid}")
async def stream_collection(
websocket: WebSocket,
cid: str,
eid: str,
broker: Annotated[RabbitBroker, Depends(broker)],
):
await websocket.accept()
collection_stream = RabbitQueue(
f"{cid}-stream-audio",
durable=True,
arguments={
"x-queue-type": "stream",
},
)
async def handle_message(m: RabbitMessage):
await websocket.send_bytes(m)
subscriber = broker.subscriber(
collection_stream, consume_args={"x-stream-offset": "first"}
)
subscriber(handle_message)
broker.setup_subscriber(subscriber)
try:
await subscriber.start()
while True:
await websocket.receive_text()
except WebSocketDisconnect:
await subscriber.close()
print("WebSocket disconnected") |
Beta Was this translation helpful? Give feedback.
I found a StackOverflow article that showed that
subscriber.start()
is actually NOT blocking, therefore I needed to use a blocking loop to keep the socket connection open. Proper approach: