From de3a995a6213c5a392f94478363ed96d24708c5f Mon Sep 17 00:00:00 2001 From: Erik Harding Date: Thu, 15 Feb 2024 16:10:35 +0200 Subject: [PATCH 1/5] Add default app to router --- src/vumi2/routers.py | 36 ++++++++++---- tests/helpers.py | 6 +++ tests/test_routers.py | 108 +++++++++++++++++++++++++++++++++++++++++- 3 files changed, 140 insertions(+), 10 deletions(-) diff --git a/src/vumi2/routers.py b/src/vumi2/routers.py index 858c04c..8bd1458 100644 --- a/src/vumi2/routers.py +++ b/src/vumi2/routers.py @@ -21,6 +21,7 @@ class ToAddressRouterConfig(BaseConfig): to_address_mappings: dict[str, str] = Factory(dict) message_cache_class: str = "vumi2.message_caches.MemoryMessageCache" message_cache_config: dict = Factory(dict) + default_app: str = None class ToAddressRouter(BaseWorker): @@ -49,6 +50,12 @@ async def setup(self): connector_name=name, outbound_handler=self.handle_outbound_message ) + if self.config.default_app: + self.default_connector = await self.setup_receive_outbound_connector( + connector_name=self.config.default_app, + outbound_handler=self.handle_outbound_message, + ) + for name in self.config.transport_names: await self.setup_receive_inbound_connector( connector_name=name, @@ -60,12 +67,24 @@ async def setup(self): # TODO: Teardown + async def _get_matched_mapping_names(self, addr): + matched_names = [] + for name, pattern in self.mappings: + if pattern.match(addr): + matched_names.append(name) + + if matched_names == [] and self.config.default_app: + matched_names.append(self.config.default_app) + + return matched_names + async def handle_inbound_message(self, message: Message): logger.debug("Processing inbound message %s", message) - for name, pattern in self.mappings: - if pattern.match(message.to_addr): - logger.debug("Routing inbound message to %s", name) - await self.receive_outbound_connectors[name].publish_inbound(message) + + matched_names = await self._get_matched_mapping_names(message.to_addr) + for name in matched_names: + logger.debug("Routing inbound message to %s", name) + await self.receive_outbound_connectors[name].publish_inbound(message) async def handle_event(self, event: Event): logger.debug("Processing event %s", event) @@ -73,10 +92,11 @@ async def handle_event(self, event: Event): if outbound is None: logger.info("Cannot find outbound for event %s, not routing", event) return - for name, pattern in self.mappings: - if pattern.match(outbound.from_addr): - logger.debug("Routing event to %s", name) - await self.receive_outbound_connectors[name].publish_event(event) + + matched_names = await self._get_matched_mapping_names(outbound.from_addr) + for name in matched_names: + logger.debug("Routing event to %s", name) + await self.receive_outbound_connectors[name].publish_event(event) async def handle_outbound_message(self, message: Message): logger.debug("Processing outbound message %s", message) diff --git a/tests/helpers.py b/tests/helpers.py index 1479e0e..6e35e59 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -94,9 +94,15 @@ async def create(cls, nursery: Nursery, amqp: AmqpProtocol, name: str): async def consume_inbound(self) -> Message: return await self._recv_in.receive() + async def consume_inbound_nowait(self) -> Message: + return await self._recv_in.receive_nowait() + async def consume_event(self) -> Event: return await self._recv_ev.receive() + async def consume_event_nowait(self) -> Event: + return await self._recv_ev.receive_nowait() + async def publish_outbound(self, message: Message): return await self.conn.publish_outbound(message) diff --git a/tests/test_routers.py b/tests/test_routers.py index ac13c7b..e73f876 100644 --- a/tests/test_routers.py +++ b/tests/test_routers.py @@ -1,5 +1,5 @@ import pytest -from trio import open_memory_channel +from trio import WouldBlock, open_memory_channel from vumi2.messages import Event, EventType, Message, MessageType, TransportType from vumi2.routers import ToAddressRouter @@ -10,6 +10,7 @@ "app1": r"^1", "app2": r"^2", }, + "default_app": "app3", } @@ -19,6 +20,14 @@ async def to_addr_router(worker_factory): yield worker +@pytest.fixture() +async def to_addr_router_no_default(worker_factory): + new_config = TEST_CONFIG.copy() + del new_config["default_app"] + async with worker_factory.with_cleanup(ToAddressRouter, new_config) as worker: + yield worker + + def msg_ch_pair(bufsize: int): return open_memory_channel[MessageType](bufsize) @@ -33,7 +42,7 @@ async def test_to_addr_router_setup(to_addr_router): """ await to_addr_router.setup() receive_inbound_connectors = ["test1", "test2"] - receive_outbound_connectors = ["app1", "app2"] + receive_outbound_connectors = ["app1", "app2", "app3"] assert set(to_addr_router.receive_inbound_connectors.keys()) == set( receive_inbound_connectors ) @@ -79,6 +88,59 @@ async def test_to_addr_router_event(to_addr_router, connector_factory): assert event == await ri_app1.consume_event() +async def test_to_addr_router_event_default_app(to_addr_router, connector_factory): + """ + Events that have an outbound in the store should be routed on the default app if + there is one configured and no mappings match + """ + await to_addr_router.setup() + outbound = Message( + to_addr="+27820001001", + from_addr="33345", + transport_name="test1", + transport_type=TransportType.SMS, + ) + event = Event( + user_message_id=outbound.message_id, + event_type=EventType.ACK, + sent_message_id=outbound.message_id, + ) + ri_app3 = await connector_factory.setup_ri("app3") + + await to_addr_router.handle_outbound_message(outbound) + await to_addr_router.handle_event(event) + + assert event == await ri_app3.consume_event() + + +async def test_to_addr_router_event_no_default( + to_addr_router_no_default, connector_factory +): + """ + Events that have an outbound in the store should not be routed if they don't match + any mappings and there is no default app + """ + await to_addr_router_no_default.setup() + outbound = Message( + to_addr="+27820001001", + from_addr="33345", + transport_name="test1", + transport_type=TransportType.SMS, + ) + event = Event( + user_message_id=outbound.message_id, + event_type=EventType.ACK, + sent_message_id=outbound.message_id, + ) + ri_app3 = await connector_factory.setup_ri("app3") + + await to_addr_router_no_default.handle_outbound_message(outbound) + await to_addr_router_no_default.handle_event(event) + + with pytest.raises(WouldBlock): + await ri_app3.consume_event_nowait() + + async def test_to_addr_router_inbound(to_addr_router, connector_factory): """ Should be routed according to the to address @@ -96,15 +158,57 @@ async def test_to_addr_router_inbound(to_addr_router, connector_factory): transport_name="test", transport_type=TransportType.SMS, ) + msg3 = Message( + to_addr="33345", + from_addr="54321", + transport_name="test", + transport_type=TransportType.SMS, + ) ri_app1 = await connector_factory.setup_ri("app1") ri_app2 = await connector_factory.setup_ri("app2") + ri_app3 = await connector_factory.setup_ri("app3") ro_test1 = await connector_factory.setup_ro("test1") await ro_test1.publish_inbound(msg1) await ro_test1.publish_inbound(msg2) + await ro_test1.publish_inbound(msg3) assert msg1 == await ri_app1.consume_inbound() assert msg2 == await ri_app2.consume_inbound() + assert msg3 == await ri_app3.consume_inbound() + + +async def test_to_addr_router_inbound_no_default( + to_addr_router_no_default, connector_factory +): + """ + Should not be routed according if the to_addr doesn't match any mappings and there + isn't a default app configured. + """ + await to_addr_router_no_default.setup() + msg1 = Message( + to_addr="12345", + from_addr="54321", + transport_name="test", + transport_type=TransportType.SMS, + ) + msg2 = Message( + to_addr="33345", + from_addr="54321", + transport_name="test", + transport_type=TransportType.SMS, + ) + ri_app1 = await connector_factory.setup_ri("app1") + ri_app2 = await connector_factory.setup_ri("app2") + ro_test1 = await connector_factory.setup_ro("test1") + + await ro_test1.publish_inbound(msg1) + await ro_test1.publish_inbound(msg2) + + assert msg1 == await ri_app1.consume_inbound() + + with pytest.raises(WouldBlock): + await ri_app2.consume_inbound_nowait() async def test_to_addr_router_outbound(to_addr_router, connector_factory): From 47145baa52f36096ad3ffd6302b948a55d5354b6 Mon Sep 17 00:00:00 2001 From: Erik Harding Date: Thu, 15 Feb 2024 16:14:56 +0200 Subject: [PATCH 2/5] fix lint error --- src/vumi2/routers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/vumi2/routers.py b/src/vumi2/routers.py index 8bd1458..ff7146d 100644 --- a/src/vumi2/routers.py +++ b/src/vumi2/routers.py @@ -21,7 +21,7 @@ class ToAddressRouterConfig(BaseConfig): to_address_mappings: dict[str, str] = Factory(dict) message_cache_class: str = "vumi2.message_caches.MemoryMessageCache" message_cache_config: dict = Factory(dict) - default_app: str = None + default_app: str | None = None class ToAddressRouter(BaseWorker): From a75d44fb806a9debcf63405a228c28fdb9e3e6ab Mon Sep 17 00:00:00 2001 From: Erik Harding Date: Thu, 15 Feb 2024 16:25:58 +0200 Subject: [PATCH 3/5] Update changelog --- CHANGELOG.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 982fcd0..466caca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,15 @@ # Changelog -## [0.1.1.dev0] - UNRELEASED +## [0.1.2.dev0] - UNRELEASED + +### Added + +### Fixed + +### Changed +- Add default app to ToAddressRouter (#60) + +## [0.1.1] - 2024-02-13 ### Added From f94f9efb0f6eb6456a59292e436a7db1c36179ce Mon Sep 17 00:00:00 2001 From: Erik Harding Date: Mon, 19 Feb 2024 08:39:06 +0200 Subject: [PATCH 4/5] minor default app fixes --- src/vumi2/routers.py | 9 ++++++--- tests/test_routers.py | 24 ++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/vumi2/routers.py b/src/vumi2/routers.py index ff7146d..a63f3b3 100644 --- a/src/vumi2/routers.py +++ b/src/vumi2/routers.py @@ -50,7 +50,10 @@ async def setup(self): connector_name=name, outbound_handler=self.handle_outbound_message ) - if self.config.default_app: + if ( + self.config.default_app + and self.config.default_app not in self.receive_outbound_connectors + ): self.default_connector = await self.setup_receive_outbound_connector( connector_name=self.config.default_app, outbound_handler=self.handle_outbound_message, @@ -73,8 +76,8 @@ async def _get_matched_mapping_names(self, addr): if pattern.match(addr): matched_names.append(name) - if matched_names == [] and self.config.default_app: - matched_names.append(self.config.default_app) + if self.config.default_app and not matched_names: + return [self.config.default_app] return matched_names diff --git a/tests/test_routers.py b/tests/test_routers.py index e73f876..2b96df8 100644 --- a/tests/test_routers.py +++ b/tests/test_routers.py @@ -28,6 +28,14 @@ async def to_addr_router_no_default(worker_factory): yield worker +@pytest.fixture() +async def to_addr_router_duplicate_default(worker_factory): + new_config = TEST_CONFIG.copy() + new_config["default_app"] = "app1" + async with worker_factory.with_cleanup(ToAddressRouter, new_config) as worker: + yield worker + + def msg_ch_pair(bufsize: int): return open_memory_channel[MessageType](bufsize) @@ -51,6 +59,22 @@ async def test_to_addr_router_setup(to_addr_router): ) +async def test_to_addr_router_setup_duplicate_default(to_addr_router_duplicate_default): + """ + It should not attempt to add a duplicate connector if the default app is one of the + to address mappings + """ + await to_addr_router_duplicate_default.setup() + receive_inbound_connectors = ["test1", "test2"] + receive_outbound_connectors = ["app1", "app2"] + assert set( + to_addr_router_duplicate_default.receive_inbound_connectors.keys() + ) == set(receive_inbound_connectors) + assert set( + to_addr_router_duplicate_default.receive_outbound_connectors.keys() + ) == set(receive_outbound_connectors) + + async def test_to_addr_router_event_no_routing(to_addr_router): """ Events that don't have an outbound in the store should be ignored From 6f3da6732148d0da4ada74cfe3679e4ddc8362e6 Mon Sep 17 00:00:00 2001 From: Erik Harding Date: Mon, 19 Feb 2024 09:55:45 +0200 Subject: [PATCH 5/5] small code cleanup --- src/vumi2/routers.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/vumi2/routers.py b/src/vumi2/routers.py index a63f3b3..b2da8eb 100644 --- a/src/vumi2/routers.py +++ b/src/vumi2/routers.py @@ -50,12 +50,10 @@ async def setup(self): connector_name=name, outbound_handler=self.handle_outbound_message ) - if ( - self.config.default_app - and self.config.default_app not in self.receive_outbound_connectors - ): - self.default_connector = await self.setup_receive_outbound_connector( - connector_name=self.config.default_app, + default_app = self.config.default_app + if default_app and default_app not in self.receive_outbound_connectors: + await self.setup_receive_outbound_connector( + connector_name=default_app, outbound_handler=self.handle_outbound_message, )