From f0f0508dc6b593cd227263cd54d5167510532a11 Mon Sep 17 00:00:00 2001 From: Ben Dalling Date: Tue, 24 Dec 2024 07:44:12 +0000 Subject: [PATCH] new: Add Prometheus metrics. --- Dockerfile | 2 +- Makefile | 2 +- replay-dlq.py => replay_dlq.py | 0 requirements.txt | 1 + router.py | 21 ++++++++++++ setup.cfg | 2 +- tests/features/data-flow.feature | 5 +++ .../environment-config-parser.feature | 1 + tests/resources/docker-compose.yaml | 2 ++ tests/step_defs/test_data_flow.py | 34 +++++++++++++++++++ tests/step_defs/test_env_config_parser.py | 3 ++ 11 files changed, 70 insertions(+), 3 deletions(-) rename replay-dlq.py => replay_dlq.py (100%) mode change 100644 => 100755 diff --git a/Dockerfile b/Dockerfile index b59edad..43f2ac3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,6 +16,6 @@ COPY --chown=appuser:appuser requirements.txt /home/appuser/requirements.txt RUN pip install --no-cache-dir -r requirements.txt --user COPY --chown=appuser:appuser --chmod=0644 rule-schema.json /home/appuser/rule-schema.json COPY --chown=appuser:appuser --chmod=0755 router.py /home/appuser/router.py -COPY --chown=appuser:appuser --chmod=0755 replay-dlq.py /home/appuser/replay-dlq.py +COPY --chown=appuser:appuser --chmod=0755 replay_dlq.py /home/appuser/replay-dlq.py ENTRYPOINT [ "/home/appuser/router.py" ] diff --git a/Makefile b/Makefile index 753f95b..6a56421 100644 --- a/Makefile +++ b/Makefile @@ -22,4 +22,4 @@ tag: test: docker compose -f tests/resources/docker-compose.yaml up -d --wait sleep 10 - PYTHONPATH=. pytest --timeout=5 + PYTHONPATH=. pytest --timeout=15 diff --git a/replay-dlq.py b/replay_dlq.py old mode 100644 new mode 100755 similarity index 100% rename from replay-dlq.py rename to replay_dlq.py diff --git a/requirements.txt b/requirements.txt index 3a82d83..e260247 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ azure-core jmespath jsonschema +prometheus-client python-qpid-proton diff --git a/router.py b/router.py index 62e1600..c54d10d 100644 --- a/router.py +++ b/router.py @@ -45,10 +45,14 @@ import jsonschema import jsonschema.exceptions from azure.core.utils import parse_connection_string +from prometheus_client import Counter, Summary, start_http_server from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container +PROCESSING_TIME = Summary('message_processing_seconds', 'The time spent processing messages.') +DLQ_COUNT = Counter('dlq_message_count', 'The number of messages sent to the DLQ.') + def get_logger(logger_name: str, log_level=os.getenv('LOG_LEVEL', 'WARN')) -> logging.Logger: """ @@ -554,6 +558,20 @@ def get_prefixed_values(self, prefix: str) -> list: return response + def get_prometheus_port(self) -> int: + """ + Get the prometheus port. + + If no port is specified, default to 8000. + + Returns + ------- + int + The port to be used with Prometheus. + """ + port = self._environ.get('ROUTER_PROMETHEUS_PORT', '8000') + return int(port) + def get_rules(self) -> list[RouterRule]: """ Extract a list of routing rules from the environment. @@ -660,6 +678,7 @@ def __init__(self, config: EnvironmentConfigParser): self.sources = [] self.source_namespace_url = config.get_source_url() self.parse_config_data() + start_http_server(config.get_prometheus_port()) def forward_message(self, message: Message, destination_namespaces: list, destination_topics: list): """ @@ -701,6 +720,7 @@ def handle_dlq_message(self, message: Message, source_topic_name: str) -> None: message.properties = properties message.send(self.senders['DLQ']) logger.warning('Message sent to the DLQ.') + DLQ_COUNT.inc() def on_message(self, event): """Handle a message event.""" @@ -767,6 +787,7 @@ def parse_config_data(self) -> dict: self.sources = list(set(sources)) + @PROCESSING_TIME.time() def process_message(self, source_topic: str, message: Message) -> int: """ Process the received message asynchronously. diff --git a/setup.cfg b/setup.cfg index 8af08b7..a7f06e9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [coverage:report] -fail_under = 4 +fail_under = 70 show_missing = True [coverage:run] diff --git a/tests/features/data-flow.feature b/tests/features/data-flow.feature index 82b9ff6..d839055 100644 --- a/tests/features/data-flow.feature +++ b/tests/features/data-flow.feature @@ -19,3 +19,8 @@ Scenario Outline: Inject a Message and Confirm the Destination | input-4.json | topic.2 | ie.topic | | input-5.json | topic.1 | N/A | | input-6.json | topic.2 | dlq | + +Scenario: Replay DLQ Message + Given the landing Service Bus Emulator + When the DLQ messags are replayed + Then the DLQ count is 2 diff --git a/tests/features/environment-config-parser.feature b/tests/features/environment-config-parser.feature index c9d81c9..9e6b466 100644 --- a/tests/features/environment-config-parser.feature +++ b/tests/features/environment-config-parser.feature @@ -21,4 +21,5 @@ Feature: Environment Configuration Parser Examples: | key | value | method_name | | ROUTER_DLQ_TOPIC | DLQ | get_dead_letter_queue | + | ROUTER_PROMETHEUS_PORT | 8042 | get_prometheus_port | | ROUTER_SOURCE_CONNECTION_STRING | Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE; | get_source_url | diff --git a/tests/resources/docker-compose.yaml b/tests/resources/docker-compose.yaml index 173df74..9612e08 100644 --- a/tests/resources/docker-compose.yaml +++ b/tests/resources/docker-compose.yaml @@ -35,6 +35,8 @@ services: ROUTER_RULE_IE_TELNO: '{"destination_namespaces":"IE","destination_topics":"ie.topic","jmespath":"details[?telephone_number].telephone_number","regexp":"^\\+353","source_subscription":"test","source_topic":"topic.2"}' ROUTER_SOURCE_CONNECTION_STRING: "Endpoint=sb://emulator;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;" image: router:latest + ports: + - "8000:8000" sqledge: container_name: "sqledge" diff --git a/tests/step_defs/test_data_flow.py b/tests/step_defs/test_data_flow.py index 2b27185..48d758e 100644 --- a/tests/step_defs/test_data_flow.py +++ b/tests/step_defs/test_data_flow.py @@ -3,11 +3,13 @@ import time import pytest +import testinfra from proton import Message from proton._exceptions import Timeout from proton.utils import BlockingConnection from pytest_bdd import given, parsers, scenario, then, when +from replay_dlq import Container, DLQReplayHandler, RuntimeParams from router import ConnectionStringHelper, get_logger logger = get_logger(__name__, logging.DEBUG) @@ -18,6 +20,11 @@ def test_inject_a_message_and_confirm_the_destination(): """Inject a Message and Confirm the Destination.""" +@scenario('data-flow.feature', 'Replay DLQ Message') +def test_replay_dlq_message(): + """Replay DLQ Message.""" + + @given(parsers.parse('the input topic is {input_topic}'), target_fixture='input_topic') def _(input_topic: str): """the input topic is .""" @@ -50,6 +57,25 @@ def _(output_topic: str): return output_topic +@when('the DLQ messags are replayed') +def _(aqmp_url: str): + """the DLQ messags are replayed.""" + runtime = RuntimeParams( + [ + '--connection-string', aqmp_url, + '--name', 'dlq', + '--subscription', 'dlq_replay' + ] + ) + replayer = DLQReplayHandler( + f'{runtime.dlq_topic_name}/Subscriptions/{runtime.subscription}', + aqmp_url, + 5, + logger + ) + Container(replayer).run() + + @when('the input message is sent') def _(aqmp_url: str, input_topic: str, output_topic: str, message_body: str): """the input message is sent.""" @@ -60,6 +86,14 @@ def _(aqmp_url: str, input_topic: str, output_topic: str, message_body: str): pytest.skip(f'Output topic is "{output_topic}".') +@then('the DLQ count is 2') +def _(): + """the DLQ count is 2.""" + host = testinfra.get_host('docker://router') + cmd = host.run('curl localhost:8000') + assert 'dlq_message_count_total 2.0' in cmd.stdout + + def is_message_valid(message: Message, expected_body: str, topic_name: str) -> bool: """ Check the validity of the received message. diff --git a/tests/step_defs/test_env_config_parser.py b/tests/step_defs/test_env_config_parser.py index 7860bb5..039c7f0 100644 --- a/tests/step_defs/test_env_config_parser.py +++ b/tests/step_defs/test_env_config_parser.py @@ -53,6 +53,9 @@ def _(method_name: str, expected_value: str, environ: dict): elif method_name == 'get_source_url': actual_value = widget.get_source_url() expected_value = 'amqps://RootManageSharedAccessKey:SAS_KEY_VALUE@localhost:5671' + elif method_name == 'get_prometheus_port': + expected_value = int(expected_value) + actual_value = widget.get_prometheus_port() else: raise NotImplementedError(f'No method name "{method_name}".')