-
Hi, Describe the bug I get this error:
How to reproduce from asgiref.sync import async_to_sync
from faststream.nats import NatsBroker
from pydantic import BaseModel, Field
class RunProgressMessage(BaseModel):
    """Progress payload that the reproduction publishes to NATS."""

    # Progress percentage; validated to lie in the inclusive range [0, 100].
    progress_percentage: float = Field(ge=0, le=100)
    # Remaining time in seconds; optional (defaults to None), must be >= 0 when set.
    remaining_time_seconds: float | None = Field(None, ge=0)
class NATSRunStatusPublisher:
    """Publish run progress messages to a NATS subject from synchronous code.

    NOTE: nothing in this class connects the broker before publishing.
    """

    def __init__(self, broker: NatsBroker, nats_subject_template: str) -> None:
        """Store the broker and the subject template.

        Args:
            broker: Broker from which publishers are created.
            nats_subject_template: Format string that is filled in with a
                ``run_id`` keyword when a message is published.
        """
        self._broker = broker
        self._nats_subject_template = nats_subject_template

    def publish_run_status(self, progress_message: RunProgressMessage) -> None:
        """Synchronous entry point: run ``apublish_run_status`` via ``async_to_sync``."""
        async_to_sync(self.apublish_run_status)(progress_message)

    async def apublish_run_status(self, progress_message: RunProgressMessage) -> None:
        """Build the subject, create a publisher for it and publish the message."""
        # The run id is hard-coded in this reproduction.
        nats_subject = self._nats_subject_template.format(run_id="some_run_id")
        # A new publisher object is requested from the broker on every call.
        publisher = self._broker.publisher(nats_subject)
        await publisher.publish(
            subject=nats_subject,
            message=progress_message,
        )
if __name__ == "__main__":
broker = NatsBroker()
publisher = NATSRunStatusPublisher(broker, nats_subject_template="runStatus.{run_id}")
publisher.publish_run_status(
RunProgressMessage(progress_percentage=50, remaining_time_seconds=50)
) Expected behavior I'm not sure if my code is correct or not, but to me, it should send the message. Observed behavior
Environment Running FastStream 0.2.5 with CPython 3.11.4 on Darwin Additional context Thanks :) |
Beta Was this translation helpful? Give feedback.
Replies: 12 comments 2 replies
-
@Jonas1312 well, it looks like I need to improve the error text, but this is not a bug: you just forgot to connect your broker before publishing. |
Beta Was this translation helpful? Give feedback.
-
oh ok my bad! Not sure though if I should use broker.connect() or broker.start() ? |
Beta Was this translation helpful? Give feedback.
-
Connect should be enough |
Beta Was this translation helpful? Give feedback.
-
I still get the same error with this code after connecting the broker: from asgiref.sync import async_to_sync
from faststream.nats import NatsBroker
from pydantic import BaseModel, Field
class RunProgressMessage(BaseModel):
    """A message that contains the progress of a run."""

    # Progress percentage; validated to lie in the inclusive range [0, 100].
    progress_percentage: float = Field(ge=0, le=100)
    # Remaining time in seconds; optional (defaults to None), must be >= 0 when set.
    remaining_time_seconds: float | None = Field(None, ge=0)
class NATSRunStatusPublisher:
    """A publisher that publishes run status updates to NATS."""

    def __init__(self, broker: NatsBroker, nats_subject_template: str) -> None:
        """Initialize the NATSRunStatusPublisher.

        Args:
            broker: Broker from which publishers are created.
            nats_subject_template: Format string that is filled in with a
                ``run_id`` keyword when a message is published.
        """
        self._broker = broker
        self._nats_subject_template = nats_subject_template
        # Connect the broker at construction time, through its own
        # async_to_sync call (separate from the one used for publishing).
        async_to_sync(broker.connect)()

    def publish_run_status(self, progress_message: RunProgressMessage) -> None:
        """Publish the status of a run (synchronous wrapper around the async method)."""
        async_to_sync(self.apublish_run_status)(progress_message)

    async def apublish_run_status(self, progress_message: RunProgressMessage) -> None:
        """Publish the status of a run."""
        # The run id is hard-coded in this reproduction.
        nats_subject = self._nats_subject_template.format(run_id="some_run_id")
        # The publisher object is requested here, i.e. after the broker was
        # connected in __init__.
        publisher = self._broker.publisher(nats_subject)
        await publisher.publish(
            subject=nats_subject,
            message=progress_message,
        )
if __name__ == "__main__":
broker = NatsBroker()
publisher = NATSRunStatusPublisher(broker, nats_subject_template="runStatus.{run_id}")
publisher.publish_run_status(
RunProgressMessage(progress_percentage=50, remaining_time_seconds=50)
) |
Beta Was this translation helpful? Give feedback.
-
I think it is related to your asgiref usage. It runs each of your async_to_sync functions in a separate thread, so the broker you publish with is not the same as the one you connected. |
Beta Was this translation helpful? Give feedback.
-
Btw, why are you trying to make the FastStream client synchronous? |
Beta Was this translation helpful? Give feedback.
-
I'm not sure it's because of the threads, since threads share the same objects and memory within the same process. I am trying to use FastStream synchronously because my NATSRunStatusPublisher runs within a sync method, and I can't change that. I tried to debug the execution of the program, and the broker looks connected but not started, and a _producer is available: |
Beta Was this translation helpful? Give feedback.
-
@Jonas1312 well, I found the problem: a publisher can't be created after the broker was connected/started. Publish through the broker directly instead: async def apublish_run_status(self, progress_message: RunProgressMessage) -> None:
"""Publish the status of a run."""
nats_subject = self._nats_subject_template.format(run_id="some_run_id")
await self._broker.publish(
subject=nats_subject,
message=progress_message,
) |
Beta Was this translation helpful? Give feedback.
-
ok, so I changed the class like this: class NATSRunStatusPublisher(AbstractRunStatusPublisher):
"""A publisher that publishes run status updates to NATS."""
def __init__(self, broker: NatsBroker, nats_subject_template: str) -> None:
"""Initialize the NATSRunStatusPublisher."""
self._broker = broker
self._nats_subject_template = nats_subject_template
def publish_run_status(self, progress_message: RunProgressMessage) -> None:
"""Publish the status of a run."""
async_to_sync(self.apublish_run_status)(progress_message)
async def apublish_run_status(self, progress_message: RunProgressMessage) -> None:
"""Publish the status of a run."""
nats_subject = self._nats_subject_template.format(run_id=progress_message.run_id)
async with self._broker as br:
await br.publish(
subject=nats_subject,
message=progress_message,
) and indeed, the messages are sent to NATS. However, the TestClient doesn't like it, it doesn't catch the calls in my test: import asyncio
from unittest.mock import call
import pytest
from faststream.nats import NatsBroker, TestNatsBroker
@pytest.mark.asyncio()
async def test_given_run_progress_msg_when_i_publish_async_msg_on_nats_then_it_works():
broker = NatsBroker()
@broker.subscriber("runStatus.*")
def some_subscriber(msg: RunProgressMessage) -> None:
_ = msg
# Given
messages = [
RunProgressMessage(
progress_percentage=12,
remaining_time_seconds=42,
)
]
async with TestNatsBroker(broker, with_real=True) as br: # using real nats server
nats_publisher = NATSRunStatusPublisher(
br, nats_subject_template="runStatus.{run_id}"
)
# When
for msg in messages:
await nats_publisher.apublish_run_status(msg) # async
await asyncio.sleep(5) # wait for messages to be received
# Then
assert some_subscriber.mock
some_subscriber.mock.assert_has_calls(
[call(msg.model_dump()) for msg in messages], any_order=True
) No message is caught by the subscriber 🤔 When I set I have a nats server running: docker run --rm -p 4222:4222 nats -js |
Beta Was this translation helpful? Give feedback.
-
@Jonas1312 well, I found a solution: class NATSRunStatusPublisher:
"""A publisher that publishes run status updates to NATS."""
def __init__(self, broker: NatsBroker, nats_subject_template: str) -> None:
"""Initialize the NATSRunStatusPublisher."""
self._broker = broker
self._nats_subject_template = nats_subject_template
async def connect(self):
await self._broker.connect()
def publish_run_status(self, progress_message: RunProgressMessage) -> None:
"""Publish the status of a run."""
async_to_sync(self.apublish_run_status)(progress_message)
async def apublish_run_status(self, progress_message: RunProgressMessage) -> None:
"""Publish the status of a run."""
subject = self._nats_subject_template.format(run_id="some_run_id")
await self._broker.publish(progress_message, subject) Testing: @pytest.mark.asyncio
async def test():
broker = NatsBroker()
@broker.subscriber("runStatus.*")
async def handle(msg: RunProgressMessage):
pass
msg = RunProgressMessage(
progress_percentage=10,
remaining_time_seconds=None,
)
async with TestNatsBroker(broker, with_real=True) as br: # using real nats server
nats_publisher = NATSRunStatusPublisher(
br, nats_subject_template="runStatus.{run_id}"
)
await nats_publisher.connect()
await nats_publisher.apublish_run_status(msg)
await handle.wait_call(3)
handle.mock.assert_called_once_with(msg.model_dump()) Regular calling: if __name__ == "__main__":
broker = NatsBroker()
publisher = NATSRunStatusPublisher(broker, nats_subject_template="runStatus.{run_id}")
async_to_sync(publisher.connect)()
publisher.publish_run_status(
RunProgressMessage(progress_percentage=50, remaining_time_seconds=50)
) |
Beta Was this translation helpful? Give feedback.
-
Indeed, it looks like asgiref conflicts with nats-py, because all arguments are passed to nats-py correctly, but nothing is published. |
Beta Was this translation helpful? Give feedback.
-
@Jonas1312 this is a correct solution for making your broker synchronous. I'll add this wrapper to the framework in the future, but for now you can use it manually: from contextlib import ExitStack
from anyio.from_thread import start_blocking_portal
from faststream.nats import NatsBroker
class Pubsliher:
def __init__(self, broker: NatsBroker):
self.broker = broker
self.exit_stack = None
def __enter__(self) -> "Pubsliher":
return self.connect()
def __exit__(self, *args, **kwargs):
self.close()
def connect(self):
with ExitStack() as stack:
portal = self.portal = stack.enter_context(start_blocking_portal())
portal.call(self.aconnect)
@stack.callback
def wait_shutdown() -> None:
portal.call(self.aclose)
self.exit_stack = stack.pop_all()
return self
def close(self):
self.exit_stack.close()
self.portal = None
self.exit_stack = None
def publish(self, msg, subject):
assert self.exit_stack, "You should use `connect()` publisher at first"
self.portal.call(self.apublish, msg, subject)
async def aconnect(self):
await self.broker.connect()
async def aclose(self):
await self.broker.close()
async def apublish(self, msg, subject):
await self.broker.publish(msg, subject) Usage if __name__ == "__main__":
pub = Pubsliher(NatsBroker())
pub.connect()
pub.publish("Hi!", "test")
pub.close()
# or
with Pubsliher(NatsBroker()) as p:
p.publish("Hi!", "test") Testing import pytest
from faststream.nats import TestNatsBroker
@pytest.mark.asyncio
async def test():
broker = NatsBroker()
@broker.subscriber("test")
async def handler(): ...
async with TestNatsBroker(broker, with_real=True) as br:
await Pubsliher(br).apublish("Hi!", "test")
await handler.wait_call(3)
handler.mock.assert_called_once_with("Hi!") |
Beta Was this translation helpful? Give feedback.
@Jonas1312 this is a correct solution for making your broker synchronous. I'll add this wrapper to the framework in the future, but for now you can use it manually: