Skip to content

Commit

Permalink
Add start_consuming function and calls
Browse files Browse the repository at this point in the history
  • Loading branch information
erikh360 committed Feb 13, 2024
1 parent 8270d64 commit 15037f8
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ async def setup(self) -> None:
methods=["POST"],
)

await self.start_consuming()

async def handle_inbound_message(self, message: Message) -> None:
"""
Send the vumi message as an HTTP request to the configured URL.
Expand Down
11 changes: 10 additions & 1 deletion src/vumi2/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,13 @@ async def _setup_consumer(self, message_type, handler, message_class) -> None:
message_class=message_class,
concurrency=self.concurrency,
)
await consumer.start()
self._consumers[message_type] = consumer

async def start_consuming(self):
async with trio.open_nursery() as nursery:
for consumer in self._consumers.values():
nursery.start_soon(consumer.start)

async def _setup_publisher(self, message_type: str) -> None:
routing_key = self.routing_key(message_type)
publisher = Publisher(self.connection, routing_key)
Expand Down Expand Up @@ -273,3 +277,8 @@ async def aclose(self):
async with trio.open_nursery() as nursery:
for connector in self.connectors:
nursery.start_soon(connector.aclose_publishers)

async def start_consuming(self):
async with trio.open_nursery() as nursery:
for connector in self.connectors:
nursery.start_soon(connector.start_consuming)
2 changes: 2 additions & 0 deletions src/vumi2/routers.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ async def setup(self):
event_handler=self.handle_event,
)

await self.start_consuming()

# TODO: Teardown

async def handle_inbound_message(self, message: Message):
Expand Down
3 changes: 3 additions & 0 deletions src/vumi2/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ async def aclose(self):
async def setup(self):
pass

async def start_consuming(self):
await self._connectors.start_consuming()

async def setup_receive_inbound_connector(
self,
connector_name: str,
Expand Down
3 changes: 3 additions & 0 deletions tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ async def setup_ro(self, name: str) -> ROConn:
self._connectors.add(conn.conn)
return conn

async def start_consuming(self):
await self._connectors.start_consuming()


T = TypeVar("T")

Expand Down
3 changes: 3 additions & 0 deletions tests/test_routers.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ async def test_to_addr_router_event(to_addr_router, connector_factory):
sent_message_id=outbound.message_id,
)
ri_app1 = await connector_factory.setup_ri("app1")
await connector_factory.start_consuming()

await to_addr_router.handle_outbound_message(outbound)
await to_addr_router.handle_event(event)
Expand Down Expand Up @@ -99,6 +100,7 @@ async def test_to_addr_router_inbound(to_addr_router, connector_factory):
ri_app1 = await connector_factory.setup_ri("app1")
ri_app2 = await connector_factory.setup_ri("app2")
ro_test1 = await connector_factory.setup_ro("test1")
await connector_factory.start_consuming()

await ro_test1.publish_inbound(msg1)
await ro_test1.publish_inbound(msg2)
Expand Down Expand Up @@ -127,6 +129,7 @@ async def test_to_addr_router_outbound(to_addr_router, connector_factory):
ri_app1 = await connector_factory.setup_ri("app1")
ro_test1 = await connector_factory.setup_ro("test1")
ro_test2 = await connector_factory.setup_ro("test2")
await connector_factory.start_consuming()

await ri_app1.publish_outbound(msg1)
await ri_app1.publish_outbound(msg2)
Expand Down
4 changes: 4 additions & 0 deletions tests/test_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ async def setup(self):
self.allow_out, self.r_allow_out = open_memory_channel[None](0)
await self.setup_receive_inbound_connector("ri", self.handle_in, self.handle_ev)
await self.setup_receive_outbound_connector("ro", self.handle_out)
await self.start_consuming()

async def aclose(self):
await super().aclose()
Expand Down Expand Up @@ -185,6 +186,7 @@ async def test_aclose_pending_inbound(nursery, worker, connector_factory):
blocked until the handler finishes.
"""
ro_ri = await connector_factory.setup_ro("ri")
await connector_factory.start_consuming()
await worker.setup()
assert not worker.is_closed

Expand Down Expand Up @@ -212,6 +214,7 @@ async def test_aclose_pending_event(nursery, worker, connector_factory):
the handler finishes.
"""
ro_ri = await connector_factory.setup_ro("ri")
await connector_factory.start_consuming()
await worker.setup()
assert not worker.is_closed

Expand Down Expand Up @@ -239,6 +242,7 @@ async def test_aclose_pending_outbound(nursery, worker, connector_factory):
blocked until the handler finishes.
"""
ri_ro = await connector_factory.setup_ri("ro")
await connector_factory.start_consuming()
await worker.setup()
assert not worker.is_closed

Expand Down

0 comments on commit 15037f8

Please sign in to comment.