Hi!
```python
import asyncio

from faststream import FastStream

from app.core.config import get_settings

settings = get_settings()

# create_app is our factory that creates the FastStream application
app: FastStream = create_app(settings=settings)
broker = app.broker
publisher = broker.publisher(settings.APP.TARGET_TOPIC)


async def relay_to_kafka() -> None:
    async with broker:
        for n in range(100):  # for loop is just for test, it should be `while True`
            # grab a message from DB table
            msg = f"Ping {n}"
            await publisher.publish(key=str(n), message=msg)
            await asyncio.sleep(0.1)


def main() -> None:
    asyncio.run(relay_to_kafka())


main()
```

Thanks!
Replies: 1 comment
If you just want to publish messages, you can use something like

```python
async with KafkaBroker() as broker:
    await broker.publish(...)
```

in any code you want.

Calling `broker.publisher().publish(...)` directly is just a trick to generate the AsyncAPI schema based on your app.

I think the preferred way in your case is to not use the `FastStream` application object at all (if you don't need AsyncAPI documentation for your service) and just write your own service that calls `broker.publish(...)`.

But if you do want the docs, you should create a regular `FastStream` app (without any subscribers) and start an endless task in the `app.on_startup` hook.
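For the endless task itself, here is a minimal sketch of a `while`-style relay loop that does not depend on any particular broker object, so the same function could be driven either from inside `async with KafkaBroker() as broker:` or from a task started in a startup hook. Everything in it is hypothetical and only for illustration: the names `relay`, `fetch`, `publish`, `stop` and `idle_delay`, the in-memory list standing in for the DB table, and the `publish(message, topic=..., key=...)` call shape, which is my assumption about how `broker.publish` would be invoked and should be checked against your FastStream version.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple

# Hypothetical types: a row is (key, message); None means "nothing pending".
Row = Tuple[str, str]
Fetch = Callable[[], Awaitable[Optional[Row]]]
Publish = Callable[..., Awaitable[None]]


async def relay(
    fetch: Fetch,
    publish: Publish,
    topic: str,
    stop: asyncio.Event,
    idle_delay: float = 0.1,
) -> int:
    """Forward rows to `publish` until `stop` is set; return how many were sent."""
    sent = 0
    while not stop.is_set():
        row = await fetch()
        if row is None:
            # Nothing to send: sleep for idle_delay, but wake early on stop.
            try:
                await asyncio.wait_for(stop.wait(), timeout=idle_delay)
            except asyncio.TimeoutError:
                pass
            continue
        key, message = row
        # Assumed call shape; in a real service this would be broker.publish.
        await publish(message, topic=topic, key=key)
        sent += 1
    return sent


async def demo() -> List[Tuple[str, str, str]]:
    """Run the loop against in-memory fakes instead of a DB and Kafka."""
    pending: List[Row] = [("0", "Ping 0"), ("1", "Ping 1"), ("2", "Ping 2")]
    published: List[Tuple[str, str, str]] = []
    stop = asyncio.Event()

    async def fetch() -> Optional[Row]:
        if pending:
            return pending.pop(0)
        stop.set()  # demo only: stop once the fake table is drained
        return None

    async def publish(message: str, *, topic: str, key: str) -> None:
        published.append((topic, key, message))

    await relay(fetch, publish, "demo-topic", stop, idle_delay=0.01)
    return published


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Passing `publish` in as a parameter keeps the loop testable without Kafka, and the `stop` event gives a shutdown hook something to set so the task can exit cleanly instead of being killed mid-publish.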