Skip to content

Commit

Permalink
Allow publish to telemetry topic
Browse files Browse the repository at this point in the history
The new version of the MQTT broker we use also filters published topics from clients,
so we need to accept the published telemetry data from the gateway.
  • Loading branch information
kruton committed May 30, 2022
1 parent 0238f4d commit 20e08b5
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 7 deletions.
20 changes: 17 additions & 3 deletions feeder/util/mqtt/topic.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import re
import logging

from amqtt.broker import Action
from amqtt.plugins.topic_checking import BaseTopicPlugin
from feeder.util.mqtt.authentication import local_username

Expand All @@ -9,6 +10,7 @@

class PetnetTopicPlugin(BaseTopicPlugin):
feeder_sub_topic_regex = re.compile(r"krs/(api|cmd)/stg/(?P<gateway_id>.*)$")
feeder_pub_topic_regex = re.compile(r"krs.(api|tel).gts.(?P<gateway_id>.*)$")
username_regex = re.compile(r"^/pegasus:(?P<gateway_id>.*)$")

async def topic_filtering(
Expand All @@ -19,8 +21,11 @@ async def topic_filtering(
return False

session = kwargs.get("session", None)
action = kwargs.get("action", None)
topic = kwargs.get("topic", None)
logger.debug("username: %s, topic: %s", session.username, topic)
logger.debug(
"username: %s, action: %s, topic: %s", session.username, action, topic
)

if session.username == local_username:
return True
Expand All @@ -34,13 +39,22 @@ async def topic_filtering(

gateway_id = user_match.group("gateway_id")

topic_match = self.feeder_sub_topic_regex.match(topic)
if action == Action.subscribe:
topic_match = self.feeder_sub_topic_regex.match(topic)
elif action == Action.publish:
topic_match = self.feeder_pub_topic_regex.match(topic)
else:
logger.warning("Unhandled action %s", action)
return False

if not topic_match:
return False

target_id = topic_match.group("gateway_id")
if gateway_id != target_id:
logger.warning("Gateway %s tried to subscribe to %s", gateway_id, target_id)
logger.warning(
"Gateway %s tried to %s to %s", gateway_id, action, target_id
)
return False

return True
38 changes: 34 additions & 4 deletions tests/test_utils/test_mqtt_plugins.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest
import logging
from amqtt.broker import Action


class MockSession:
Expand Down Expand Up @@ -145,10 +146,19 @@ async def test_mqtt_topic_plugin_device_correct_topic():
mock_context = MockContext()
mock_session = MockSession(username=f"/pegasus:{SAMPLE_GATEWAY_HID}")
assert await PetnetTopicPlugin(mock_context).topic_filtering(
session=mock_session, topic=f"krs/api/stg/{SAMPLE_GATEWAY_HID}"
session=mock_session,
action=Action.subscribe,
topic=f"krs/api/stg/{SAMPLE_GATEWAY_HID}",
)
assert await PetnetTopicPlugin(mock_context).topic_filtering(
session=mock_session, topic=f"krs/cmd/stg/{SAMPLE_GATEWAY_HID}"
session=mock_session,
action=Action.subscribe,
topic=f"krs/cmd/stg/{SAMPLE_GATEWAY_HID}",
)
assert await PetnetTopicPlugin(mock_context).topic_filtering(
session=mock_session,
action=Action.publish,
topic=f"krs.tel.gts.{SAMPLE_GATEWAY_HID}",
)


Expand All @@ -160,7 +170,10 @@ async def test_mqtt_topic_plugin_device_wrong_topic_pattern_match():
mock_context = MockContext()
mock_session = MockSession(username=f"/pegasus:{SAMPLE_GATEWAY_HID}")
assert not await PetnetTopicPlugin(mock_context).topic_filtering(
session=mock_session, topic="krs/cmd/stg/wrong-hid"
session=mock_session, action=Action.subscribe, topic="krs/cmd/stg/wrong-hid"
)
assert not await PetnetTopicPlugin(mock_context).topic_filtering(
session=mock_session, action=Action.publish, topic="krs.tel.gts.wrong-hid"
)


Expand All @@ -172,5 +185,22 @@ async def test_mqtt_topic_plugin_device_wrong_topic_pattern_miss():
mock_context = MockContext()
mock_session = MockSession(username=f"/pegasus:{SAMPLE_GATEWAY_HID}")
assert not await PetnetTopicPlugin(mock_context).topic_filtering(
session=mock_session, topic="wrong-hid"
session=mock_session, action=Action.subscribe, topic="wrong-hid"
)
assert not await PetnetTopicPlugin(mock_context).topic_filtering(
session=mock_session, action=Action.publish, topic="wrong-hid"
)


@pytest.mark.asyncio
async def test_mqtt_topic_plugin_device_unknown_action_miss():
from feeder.util.mqtt.topic import PetnetTopicPlugin
from tests.test_database_models import SAMPLE_GATEWAY_HID

mock_context = MockContext()
mock_session = MockSession(username=f"/pegasus:{SAMPLE_GATEWAY_HID}")
assert not await PetnetTopicPlugin(mock_context).topic_filtering(
session=mock_session,
action="explode",
topic=f"krs.tel.gts.{SAMPLE_GATEWAY_HID}",
)

0 comments on commit 20e08b5

Please sign in to comment.