From 4a5c24c1673e2bb87944cb109921790648d6d980 Mon Sep 17 00:00:00 2001 From: Oleg Basov Date: Mon, 11 Nov 2024 23:40:07 +0100 Subject: [PATCH] Reconnect to Rabbit server on network issues (#52) * Fetcher reconnect on network issues * No such method for robust connection * Use robust connection to avoid dead channel exchange * Re-init channel in case of reconnections * Revert back * Revert "Revert back" This reverts commit 8afa220692a572c47710a8fc052ee067750bde23. --- src/fetcher/__main__.py | 5 +-- src/fetcher/messaging.py | 60 +++++++++++++++++++++++++++----- src/fetcher/metrics_collector.py | 5 ++- 3 files changed, 58 insertions(+), 12 deletions(-) diff --git a/src/fetcher/__main__.py b/src/fetcher/__main__.py index 94b5121..507bbb0 100644 --- a/src/fetcher/__main__.py +++ b/src/fetcher/__main__.py @@ -54,8 +54,9 @@ async def main(): processor = ApplicationProcessor(messaging=messaging_instance, browser=browser_instance, metrics=metrics_collector, url=URL) # Register the signal handlers - signal.signal(signal.SIGINT, lambda s, f: shutdown_event.set()) - signal.signal(signal.SIGTERM, lambda s, f: shutdown_event.set()) + loop = asyncio.get_running_loop() + loop.add_signal_handler(signal.SIGINT, shutdown_event.set) + loop.add_signal_handler(signal.SIGTERM, shutdown_event.set) # Connect to RabbitMQ & set up queues with their respective durability await messaging_instance.connect(ssl_params=rabbit_ssl_params()) diff --git a/src/fetcher/messaging.py b/src/fetcher/messaging.py index c42e3ed..58b9b2a 100644 --- a/src/fetcher/messaging.py +++ b/src/fetcher/messaging.py @@ -4,7 +4,7 @@ import logging import ssl from fetcher.config import MAX_MESSAGES -from aiormq.exceptions import AMQPConnectionError +from aiormq.exceptions import AMQPConnectionError, ChannelInvalidStateError MAX_RETRIES = 25 # maximum number of connection retries RETRY_DELAY = 5 # delay (in seconds) between retries @@ -20,6 +20,8 @@ def __init__(self, host, user, password): self.port = 5672 self.connection = None self.channel = None + self.queues = {} + self.consumers = {} def _create_ssl_context(self, ssl_params): """Create an SSL context based on provided parameters""" @@ -29,6 +31,18 @@ def _create_ssl_context(self, ssl_params): context.verify_mode = ssl.CERT_REQUIRED return context + + async def _init_channel(self): + """Initialize the channel and set QoS""" + self.channel = await self.connection.channel() + await self.channel.set_qos(prefetch_count=MAX_MESSAGES) + + async def _ensure_channel(self): + """Ensure that the channel is open and ready""" + if self.channel is None or self.channel.is_closed: + logger.warning("Channel is closed or not initialized. Re-initializing...") + await self._init_channel() + async def connect(self, ssl_params=None): """Establish a connection to the message broker""" @@ -44,9 +58,13 @@ async def connect(self, ssl_params=None): for retry in range(1, MAX_RETRIES + 1): try: logger.info(f"Connecting to {self.host} ...") - self.connection = await aio_pika.connect_robust(conn_url, ssl_context=ssl_context) - self.channel = await self.connection.channel() - await self.channel.set_qos(prefetch_count=MAX_MESSAGES) + self.connection = await aio_pika.connect_robust( + conn_url, + ssl_context=ssl_context, + heartbeat=60, + timeout=30 + ) + await self._init_channel() logger.info(f"Connected to the RabbitMQ server at {self.host}") break # Exit the loop if connection is successful except AMQPConnectionError as e: @@ -60,25 +78,49 @@ async def connect(self, ssl_params=None): async def setup_queues(self, **queues): """Declare necessary queues and thier durability""" + await self._ensure_channel() for queue_name, durable in queues.items(): - await self.channel.declare_queue(queue_name, durable=durable) + queue = await self.channel.declare_queue(queue_name, durable=durable) + self.queues[queue_name] = queue async def publish_message(self, queue_name, message_body, headers=None): """Publish a message to the specified queue""" + await self._ensure_channel() message = aio_pika.Message(body=json.dumps(message_body).encode(), headers=headers) - await self.channel.default_exchange.publish(message, routing_key=queue_name) + try: + await self.channel.default_exchange.publish( + message, routing_key=queue_name + ) + logger.debug(f"Successfully published message to {queue_name}") + except Exception as e: + logger.error(f"Failed to publish message: {e}") + raise logger.debug(f"Successfully published message to {queue_name}") async def publish_service_message(self, message_body, queue_name="FetcherMetricsQueue", expiration=30, headers=None): """Publish a short-lived service message""" + await self._ensure_channel() message = aio_pika.Message(body=json.dumps(message_body).encode(), expiration=expiration, headers=headers) - await self.channel.default_exchange.publish(message, routing_key=queue_name) + try: + await self.channel.default_exchange.publish( + message, routing_key=queue_name + ) + logger.debug(f"Successfully published message to {queue_name}") + except Exception as e: + logger.error(f"Failed to publish service message: {e}") + raise logger.debug(f"Successfully published message to {queue_name}") async def consume_messages(self, queue_name, callback_func): """Consume messages from the specified queue""" - queue = await self.channel.declare_queue(queue_name, durable=True) - await queue.consume(callback_func) + queue = self.queues.get(queue_name) + if not queue: + queue = await self.channel.declare_queue(queue_name, durable=True) + self.queues[queue_name] = queue + + consumer_tag = await queue.consume(callback_func) + # internally used by aio_pika to keep track of consumers + self.consumers[queue_name] = (queue, consumer_tag) async def close(self): """Close the connection""" diff --git a/src/fetcher/metrics_collector.py b/src/fetcher/metrics_collector.py index cfa6368..b2e06b0 100644 --- a/src/fetcher/metrics_collector.py +++ b/src/fetcher/metrics_collector.py @@ -101,4 +101,7 @@ async def send_metrics(self): await self.get_website_latency() metrics = self.get_metrics() logger.debug(f"Sending metrics: {metrics}") - await self.messaging.publish_service_message(metrics) + try: + await self.messaging.publish_service_message(metrics) + except Exception as e: + logger.error(f"Failed to send metrics: {e}")