Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default app to router #60

Merged
merged 5 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
# Changelog

## [0.1.1.dev0] - UNRELEASED
## [0.1.2.dev0] - UNRELEASED

### Added

### Fixed

### Changed
- Add default app to ToAddressRouter (#60)

## [0.1.1] - 2024-02-13

### Added

Expand Down
37 changes: 29 additions & 8 deletions src/vumi2/routers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class ToAddressRouterConfig(BaseConfig):
to_address_mappings: dict[str, str] = Factory(dict)
message_cache_class: str = "vumi2.message_caches.MemoryMessageCache"
message_cache_config: dict = Factory(dict)
default_app: str | None = None


class ToAddressRouter(BaseWorker):
Expand Down Expand Up @@ -49,6 +50,13 @@ async def setup(self):
connector_name=name, outbound_handler=self.handle_outbound_message
)

default_app = self.config.default_app
if default_app and default_app not in self.receive_outbound_connectors:
await self.setup_receive_outbound_connector(
connector_name=default_app,
outbound_handler=self.handle_outbound_message,
)
jerith marked this conversation as resolved.
Show resolved Hide resolved

for name in self.config.transport_names:
await self.setup_receive_inbound_connector(
connector_name=name,
Expand All @@ -60,23 +68,36 @@ async def setup(self):

# TODO: Teardown

async def _get_matched_mapping_names(self, addr):
matched_names = []
for name, pattern in self.mappings:
if pattern.match(addr):
matched_names.append(name)

if self.config.default_app and not matched_names:
return [self.config.default_app]

return matched_names

async def handle_inbound_message(self, message: Message):
logger.debug("Processing inbound message %s", message)
for name, pattern in self.mappings:
if pattern.match(message.to_addr):
logger.debug("Routing inbound message to %s", name)
await self.receive_outbound_connectors[name].publish_inbound(message)

matched_names = await self._get_matched_mapping_names(message.to_addr)
for name in matched_names:
logger.debug("Routing inbound message to %s", name)
await self.receive_outbound_connectors[name].publish_inbound(message)

async def handle_event(self, event: Event):
logger.debug("Processing event %s", event)
outbound = await self.message_cache.fetch_outbound(event.user_message_id)
if outbound is None:
logger.info("Cannot find outbound for event %s, not routing", event)
return
for name, pattern in self.mappings:
if pattern.match(outbound.from_addr):
logger.debug("Routing event to %s", name)
await self.receive_outbound_connectors[name].publish_event(event)

matched_names = await self._get_matched_mapping_names(outbound.from_addr)
for name in matched_names:
logger.debug("Routing event to %s", name)
await self.receive_outbound_connectors[name].publish_event(event)

async def handle_outbound_message(self, message: Message):
logger.debug("Processing outbound message %s", message)
Expand Down
6 changes: 6 additions & 0 deletions tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,15 @@ async def create(cls, nursery: Nursery, amqp: AmqpProtocol, name: str):
async def consume_inbound(self) -> Message:
return await self._recv_in.receive()

async def consume_inbound_nowait(self) -> Message:
return await self._recv_in.receive_nowait()

async def consume_event(self) -> Event:
return await self._recv_ev.receive()

async def consume_event_nowait(self) -> Event:
return await self._recv_ev.receive_nowait()

async def publish_outbound(self, message: Message):
return await self.conn.publish_outbound(message)

Expand Down
132 changes: 130 additions & 2 deletions tests/test_routers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import pytest
from trio import open_memory_channel
from trio import WouldBlock, open_memory_channel

from vumi2.messages import Event, EventType, Message, MessageType, TransportType
from vumi2.routers import ToAddressRouter
Expand All @@ -10,6 +10,7 @@
"app1": r"^1",
"app2": r"^2",
},
"default_app": "app3",
}


Expand All @@ -19,6 +20,22 @@ async def to_addr_router(worker_factory):
yield worker


@pytest.fixture()
async def to_addr_router_no_default(worker_factory):
new_config = TEST_CONFIG.copy()
del new_config["default_app"]
async with worker_factory.with_cleanup(ToAddressRouter, new_config) as worker:
yield worker


@pytest.fixture()
async def to_addr_router_duplicate_default(worker_factory):
new_config = TEST_CONFIG.copy()
new_config["default_app"] = "app1"
async with worker_factory.with_cleanup(ToAddressRouter, new_config) as worker:
yield worker


def msg_ch_pair(bufsize: int):
return open_memory_channel[MessageType](bufsize)

Expand All @@ -33,7 +50,7 @@ async def test_to_addr_router_setup(to_addr_router):
"""
await to_addr_router.setup()
receive_inbound_connectors = ["test1", "test2"]
receive_outbound_connectors = ["app1", "app2"]
receive_outbound_connectors = ["app1", "app2", "app3"]
assert set(to_addr_router.receive_inbound_connectors.keys()) == set(
receive_inbound_connectors
)
Expand All @@ -42,6 +59,22 @@ async def test_to_addr_router_setup(to_addr_router):
)


async def test_to_addr_router_setup_duplicate_default(to_addr_router_duplicate_default):
"""
It should not attempt to add a duplicate connector if the default app is one of the
to address mappings
"""
await to_addr_router_duplicate_default.setup()
receive_inbound_connectors = ["test1", "test2"]
receive_outbound_connectors = ["app1", "app2"]
assert set(
to_addr_router_duplicate_default.receive_inbound_connectors.keys()
) == set(receive_inbound_connectors)
assert set(
to_addr_router_duplicate_default.receive_outbound_connectors.keys()
) == set(receive_outbound_connectors)


async def test_to_addr_router_event_no_routing(to_addr_router):
"""
Events that don't have an outbound in the store should be ignored
Expand Down Expand Up @@ -79,6 +112,59 @@ async def test_to_addr_router_event(to_addr_router, connector_factory):
assert event == await ri_app1.consume_event()


async def test_to_addr_router_event_default_app(to_addr_router, connector_factory):
"""
Events that have an outbound in the store should be routed on the default app if
there is one configured and no mappings match
"""
await to_addr_router.setup()
outbound = Message(
to_addr="+27820001001",
from_addr="33345",
transport_name="test1",
transport_type=TransportType.SMS,
)
event = Event(
user_message_id=outbound.message_id,
event_type=EventType.ACK,
sent_message_id=outbound.message_id,
)
ri_app3 = await connector_factory.setup_ri("app3")

await to_addr_router.handle_outbound_message(outbound)
await to_addr_router.handle_event(event)

assert event == await ri_app3.consume_event()


async def test_to_addr_router_event_no_default(
to_addr_router_no_default, connector_factory
):
"""
Events that have an outbound in the store should not be routed if they don't match
any mappings and there is no default app
"""
await to_addr_router_no_default.setup()
outbound = Message(
to_addr="+27820001001",
from_addr="33345",
transport_name="test1",
transport_type=TransportType.SMS,
)
event = Event(
user_message_id=outbound.message_id,
event_type=EventType.ACK,
sent_message_id=outbound.message_id,
)
ri_app3 = await connector_factory.setup_ri("app3")

await to_addr_router_no_default.handle_outbound_message(outbound)
await to_addr_router_no_default.handle_event(event)

with pytest.raises(WouldBlock):
await ri_app3.consume_event_nowait()


async def test_to_addr_router_inbound(to_addr_router, connector_factory):
"""
Should be routed according to the to address
Expand All @@ -96,15 +182,57 @@ async def test_to_addr_router_inbound(to_addr_router, connector_factory):
transport_name="test",
transport_type=TransportType.SMS,
)
msg3 = Message(
to_addr="33345",
from_addr="54321",
transport_name="test",
transport_type=TransportType.SMS,
)
ri_app1 = await connector_factory.setup_ri("app1")
ri_app2 = await connector_factory.setup_ri("app2")
ri_app3 = await connector_factory.setup_ri("app3")
ro_test1 = await connector_factory.setup_ro("test1")

await ro_test1.publish_inbound(msg1)
await ro_test1.publish_inbound(msg2)
await ro_test1.publish_inbound(msg3)

assert msg1 == await ri_app1.consume_inbound()
assert msg2 == await ri_app2.consume_inbound()
assert msg3 == await ri_app3.consume_inbound()


async def test_to_addr_router_inbound_no_default(
to_addr_router_no_default, connector_factory
):
"""
Should not be routed according if the to_addr doesn't match any mappings and there
isn't a default app configured.
"""
await to_addr_router_no_default.setup()
msg1 = Message(
to_addr="12345",
from_addr="54321",
transport_name="test",
transport_type=TransportType.SMS,
)
msg2 = Message(
to_addr="33345",
from_addr="54321",
transport_name="test",
transport_type=TransportType.SMS,
)
ri_app1 = await connector_factory.setup_ri("app1")
ri_app2 = await connector_factory.setup_ri("app2")
ro_test1 = await connector_factory.setup_ro("test1")

await ro_test1.publish_inbound(msg1)
await ro_test1.publish_inbound(msg2)

assert msg1 == await ri_app1.consume_inbound()

with pytest.raises(WouldBlock):
await ri_app2.consume_inbound_nowait()


async def test_to_addr_router_outbound(to_addr_router, connector_factory):
Expand Down
Loading