From 15037f8fc549e5dfbc809e81915f68e35896ab1b Mon Sep 17 00:00:00 2001 From: Erik Harding Date: Tue, 13 Feb 2024 09:17:49 +0200 Subject: [PATCH] Add start_consuming function and calls --- .../junebug_message_api/junebug_message_api.py | 2 ++ src/vumi2/connectors.py | 11 ++++++++++- src/vumi2/routers.py | 2 ++ src/vumi2/workers.py | 3 +++ tests/helpers.py | 3 +++ tests/test_routers.py | 3 +++ tests/test_workers.py | 4 ++++ 7 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/vumi2/applications/junebug_message_api/junebug_message_api.py b/src/vumi2/applications/junebug_message_api/junebug_message_api.py index 26d4e2e..b245a51 100644 --- a/src/vumi2/applications/junebug_message_api/junebug_message_api.py +++ b/src/vumi2/applications/junebug_message_api/junebug_message_api.py @@ -119,6 +119,8 @@ async def setup(self) -> None: methods=["POST"], ) + await self.start_consuming() + async def handle_inbound_message(self, message: Message) -> None: """ Send the vumi message as an HTTP request to the configured URL. diff --git a/src/vumi2/connectors.py b/src/vumi2/connectors.py index 16fd890..09a39c9 100644 --- a/src/vumi2/connectors.py +++ b/src/vumi2/connectors.py @@ -192,9 +192,13 @@ async def _setup_consumer(self, message_type, handler, message_class) -> None: message_class=message_class, concurrency=self.concurrency, ) - await consumer.start() self._consumers[message_type] = consumer + async def start_consuming(self): + async with trio.open_nursery() as nursery: + for consumer in self._consumers.values(): + nursery.start_soon(consumer.start) + async def _setup_publisher(self, message_type: str) -> None: routing_key = self.routing_key(message_type) publisher = Publisher(self.connection, routing_key) @@ -273,3 +277,8 @@ async def aclose(self): async with trio.open_nursery() as nursery: for connector in self.connectors: nursery.start_soon(connector.aclose_publishers) + + async def start_consuming(self): + async with trio.open_nursery() as nursery: + for connector in self.connectors: + nursery.start_soon(connector.start_consuming) diff --git a/src/vumi2/routers.py b/src/vumi2/routers.py index f4f59a5..858c04c 100644 --- a/src/vumi2/routers.py +++ b/src/vumi2/routers.py @@ -56,6 +56,8 @@ async def setup(self): event_handler=self.handle_event, ) + await self.start_consuming() + # TODO: Teardown async def handle_inbound_message(self, message: Message): diff --git a/src/vumi2/workers.py b/src/vumi2/workers.py index f39f3eb..d7da998 100644 --- a/src/vumi2/workers.py +++ b/src/vumi2/workers.py @@ -128,6 +128,9 @@ async def aclose(self): async def setup(self): pass + async def start_consuming(self): + await self._connectors.start_consuming() + async def setup_receive_inbound_connector( self, connector_name: str, diff --git a/tests/helpers.py b/tests/helpers.py index 701b184..c43db0d 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -142,6 +142,9 @@ async def setup_ro(self, name: str) -> ROConn: self._connectors.add(conn.conn) return conn + async def start_consuming(self): + await self._connectors.start_consuming() + T = TypeVar("T") diff --git a/tests/test_routers.py b/tests/test_routers.py index ac13c7b..3c9dce3 100644 --- a/tests/test_routers.py +++ b/tests/test_routers.py @@ -72,6 +72,7 @@ async def test_to_addr_router_event(to_addr_router, connector_factory): sent_message_id=outbound.message_id, ) ri_app1 = await connector_factory.setup_ri("app1") + await connector_factory.start_consuming() await to_addr_router.handle_outbound_message(outbound) await to_addr_router.handle_event(event) @@ -99,6 +100,7 @@ async def test_to_addr_router_inbound(to_addr_router, connector_factory): ri_app1 = await connector_factory.setup_ri("app1") ri_app2 = await connector_factory.setup_ri("app2") ro_test1 = await connector_factory.setup_ro("test1") + await connector_factory.start_consuming() await ro_test1.publish_inbound(msg1) await ro_test1.publish_inbound(msg2) @@ -127,6 +129,7 @@ async def test_to_addr_router_outbound(to_addr_router, connector_factory): ri_app1 = await connector_factory.setup_ri("app1") ro_test1 = await connector_factory.setup_ro("test1") ro_test2 = await connector_factory.setup_ro("test2") + await connector_factory.start_consuming() await ri_app1.publish_outbound(msg1) await ri_app1.publish_outbound(msg2) diff --git a/tests/test_workers.py b/tests/test_workers.py index b9230d9..987e31d 100644 --- a/tests/test_workers.py +++ b/tests/test_workers.py @@ -51,6 +51,7 @@ async def setup(self): self.allow_out, self.r_allow_out = open_memory_channel[None](0) await self.setup_receive_inbound_connector("ri", self.handle_in, self.handle_ev) await self.setup_receive_outbound_connector("ro", self.handle_out) + await self.start_consuming() async def aclose(self): await super().aclose() @@ -185,6 +186,7 @@ async def test_aclose_pending_inbound(nursery, worker, connector_factory): blocked until the handler finishes. """ ro_ri = await connector_factory.setup_ro("ri") + await connector_factory.start_consuming() await worker.setup() assert not worker.is_closed @@ -212,6 +214,7 @@ async def test_aclose_pending_event(nursery, worker, connector_factory): the handler finishes. """ ro_ri = await connector_factory.setup_ro("ri") + await connector_factory.start_consuming() await worker.setup() assert not worker.is_closed @@ -239,6 +242,7 @@ async def test_aclose_pending_outbound(nursery, worker, connector_factory): blocked until the handler finishes. """ ri_ro = await connector_factory.setup_ri("ro") + await connector_factory.start_consuming() await worker.setup() assert not worker.is_closed