Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Prometheus metrics. #17

Merged
merged 1 commit into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ COPY --chown=appuser:appuser requirements.txt /home/appuser/requirements.txt
RUN pip install --no-cache-dir -r requirements.txt --user
COPY --chown=appuser:appuser --chmod=0644 rule-schema.json /home/appuser/rule-schema.json
COPY --chown=appuser:appuser --chmod=0755 router.py /home/appuser/router.py
COPY --chown=appuser:appuser --chmod=0755 replay-dlq.py /home/appuser/replay-dlq.py
COPY --chown=appuser:appuser --chmod=0755 replay_dlq.py /home/appuser/replay-dlq.py

ENTRYPOINT [ "/home/appuser/router.py" ]
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ tag:
test:
docker compose -f tests/resources/docker-compose.yaml up -d --wait
sleep 10
PYTHONPATH=. pytest --timeout=5
PYTHONPATH=. pytest --timeout=15
0 replay-dlq.py → replay_dlq.py
100644 → 100755
File renamed without changes.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
azure-core
jmespath
jsonschema
prometheus-client
python-qpid-proton
21 changes: 21 additions & 0 deletions router.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,14 @@
import jsonschema
import jsonschema.exceptions
from azure.core.utils import parse_connection_string
from prometheus_client import Counter, Summary, start_http_server
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

PROCESSING_TIME = Summary('message_processing_seconds', 'The time spent processing messages.')
DLQ_COUNT = Counter('dlq_message_count', 'The number of messages sent to the DLQ.')


def get_logger(logger_name: str, log_level=os.getenv('LOG_LEVEL', 'WARN')) -> logging.Logger:
"""
Expand Down Expand Up @@ -554,6 +558,20 @@ def get_prefixed_values(self, prefix: str) -> list:

return response

def get_prometheus_port(self) -> int:
"""
Get the prometheus port.

If no port is specified, default to 8000.

Returns
-------
int
The port to be used with Prometheus.
"""
port = self._environ.get('ROUTER_PROMETHEUS_PORT', '8000')
return int(port)

def get_rules(self) -> list[RouterRule]:
"""
Extract a list of routing rules from the environment.
Expand Down Expand Up @@ -660,6 +678,7 @@ def __init__(self, config: EnvironmentConfigParser):
self.sources = []
self.source_namespace_url = config.get_source_url()
self.parse_config_data()
start_http_server(config.get_prometheus_port())

def forward_message(self, message: Message, destination_namespaces: list, destination_topics: list):
"""
Expand Down Expand Up @@ -701,6 +720,7 @@ def handle_dlq_message(self, message: Message, source_topic_name: str) -> None:
message.properties = properties
message.send(self.senders['DLQ'])
logger.warning('Message sent to the DLQ.')
DLQ_COUNT.inc()

def on_message(self, event):
"""Handle a message event."""
Expand Down Expand Up @@ -767,6 +787,7 @@ def parse_config_data(self) -> dict:

self.sources = list(set(sources))

@PROCESSING_TIME.time()
def process_message(self, source_topic: str, message: Message) -> int:
"""
Process the received message asynchronously.
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[coverage:report]
fail_under = 4
fail_under = 70
show_missing = True

[coverage:run]
Expand Down
5 changes: 5 additions & 0 deletions tests/features/data-flow.feature
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,8 @@ Scenario Outline: Inject a Message and Confirm the Destination
| input-4.json | topic.2 | ie.topic |
| input-5.json | topic.1 | N/A |
| input-6.json | topic.2 | dlq |

Scenario: Replay DLQ Message
Given the landing Service Bus Emulator
When the DLQ messags are replayed
Then the DLQ count is 2
1 change: 1 addition & 0 deletions tests/features/environment-config-parser.feature
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ Feature: Environment Configuration Parser
Examples:
| key | value | method_name |
| ROUTER_DLQ_TOPIC | DLQ | get_dead_letter_queue |
| ROUTER_PROMETHEUS_PORT | 8042 | get_prometheus_port |
| ROUTER_SOURCE_CONNECTION_STRING | Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE; | get_source_url |
2 changes: 2 additions & 0 deletions tests/resources/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ services:
ROUTER_RULE_IE_TELNO: '{"destination_namespaces":"IE","destination_topics":"ie.topic","jmespath":"details[?telephone_number].telephone_number","regexp":"^\\+353","source_subscription":"test","source_topic":"topic.2"}'
ROUTER_SOURCE_CONNECTION_STRING: "Endpoint=sb://emulator;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
image: router:latest
ports:
- "8000:8000"

sqledge:
container_name: "sqledge"
Expand Down
34 changes: 34 additions & 0 deletions tests/step_defs/test_data_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import time

import pytest
import testinfra
from proton import Message
from proton._exceptions import Timeout
from proton.utils import BlockingConnection
from pytest_bdd import given, parsers, scenario, then, when

from replay_dlq import Container, DLQReplayHandler, RuntimeParams
from router import ConnectionStringHelper, get_logger

logger = get_logger(__name__, logging.DEBUG)
Expand All @@ -18,6 +20,11 @@ def test_inject_a_message_and_confirm_the_destination():
"""Inject a Message and Confirm the Destination."""


@scenario('data-flow.feature', 'Replay DLQ Message')
def test_replay_dlq_message():
"""Replay DLQ Message."""


@given(parsers.parse('the input topic is {input_topic}'), target_fixture='input_topic')
def _(input_topic: str):
"""the input topic is <input_topic>."""
Expand Down Expand Up @@ -50,6 +57,25 @@ def _(output_topic: str):
return output_topic


@when('the DLQ messags are replayed')
def _(aqmp_url: str):
"""the DLQ messags are replayed."""
runtime = RuntimeParams(
[
'--connection-string', aqmp_url,
'--name', 'dlq',
'--subscription', 'dlq_replay'
]
)
replayer = DLQReplayHandler(
f'{runtime.dlq_topic_name}/Subscriptions/{runtime.subscription}',
aqmp_url,
5,
logger
)
Container(replayer).run()


@when('the input message is sent')
def _(aqmp_url: str, input_topic: str, output_topic: str, message_body: str):
"""the input message is sent."""
Expand All @@ -60,6 +86,14 @@ def _(aqmp_url: str, input_topic: str, output_topic: str, message_body: str):
pytest.skip(f'Output topic is "{output_topic}".')


@then('the DLQ count is 2')
def _():
"""the DLQ count is 2."""
host = testinfra.get_host('docker://router')
cmd = host.run('curl localhost:8000')
assert 'dlq_message_count_total 2.0' in cmd.stdout


def is_message_valid(message: Message, expected_body: str, topic_name: str) -> bool:
"""
Check the validity of the received message.
Expand Down
3 changes: 3 additions & 0 deletions tests/step_defs/test_env_config_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ def _(method_name: str, expected_value: str, environ: dict):
elif method_name == 'get_source_url':
actual_value = widget.get_source_url()
expected_value = 'amqps://RootManageSharedAccessKey:SAS_KEY_VALUE@localhost:5671'
elif method_name == 'get_prometheus_port':
expected_value = int(expected_value)
actual_value = widget.get_prometheus_port()
else:
raise NotImplementedError(f'No method name "{method_name}".')

Expand Down
Loading