Skip to content

Commit

Permalink
Reconnect to Rabbit server on network issues (#52)
Browse files Browse the repository at this point in the history
* Fetcher reconnect on network issues

* No such method for robust connection

* Use robust connection to avoid dead channel exchange

* Re-init channel in case of reconnections

* Revert back

* Revert "Revert back"

This reverts commit 8afa220.
  • Loading branch information
olegeech-me authored Nov 11, 2024
1 parent ae2571a commit 4a5c24c
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 12 deletions.
5 changes: 3 additions & 2 deletions src/fetcher/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ async def main():
processor = ApplicationProcessor(messaging=messaging_instance, browser=browser_instance, metrics=metrics_collector, url=URL)

# Register the signal handlers
signal.signal(signal.SIGINT, lambda s, f: shutdown_event.set())
signal.signal(signal.SIGTERM, lambda s, f: shutdown_event.set())
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGINT, shutdown_event.set)
loop.add_signal_handler(signal.SIGTERM, shutdown_event.set)

# Connect to RabbitMQ & set up queues with their respective durability
await messaging_instance.connect(ssl_params=rabbit_ssl_params())
Expand Down
60 changes: 51 additions & 9 deletions src/fetcher/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import ssl
from fetcher.config import MAX_MESSAGES
from aiormq.exceptions import AMQPConnectionError
from aiormq.exceptions import AMQPConnectionError, ChannelInvalidStateError

MAX_RETRIES = 25 # maximum number of connection retries
RETRY_DELAY = 5 # delay (in seconds) between retries
Expand All @@ -20,6 +20,8 @@ def __init__(self, host, user, password):
self.port = 5672
self.connection = None
self.channel = None
self.queues = {}
self.consumers = {}

def _create_ssl_context(self, ssl_params):
"""Create an SSL context based on provided parameters"""
Expand All @@ -29,6 +31,18 @@ def _create_ssl_context(self, ssl_params):
context.verify_mode = ssl.CERT_REQUIRED
return context


async def _init_channel(self):
"""Initialize the channel and set QoS"""
# Open a new channel on the current connection and keep it on self.channel.
# NOTE(review): dereferences self.connection unconditionally — presumably only called after connect() has assigned it; confirm.
self.channel = await self.connection.channel()
# Apply MAX_MESSAGES (imported from fetcher.config) as the channel's prefetch_count.
await self.channel.set_qos(prefetch_count=MAX_MESSAGES)

async def _ensure_channel(self):
"""Ensure that the channel is open and ready"""
# Re-create the channel when none has been created yet or the current one reports is_closed;
# otherwise this is a no-op.
if self.channel is None or self.channel.is_closed:
logger.warning("Channel is closed or not initialized. Re-initializing...")
# Replaces self.channel with a fresh channel and re-applies the QoS setting.
await self._init_channel()

async def connect(self, ssl_params=None):
"""Establish a connection to the message broker"""

Expand All @@ -44,9 +58,13 @@ async def connect(self, ssl_params=None):
for retry in range(1, MAX_RETRIES + 1):
try:
logger.info(f"Connecting to {self.host} ...")
self.connection = await aio_pika.connect_robust(conn_url, ssl_context=ssl_context)
self.channel = await self.connection.channel()
await self.channel.set_qos(prefetch_count=MAX_MESSAGES)
self.connection = await aio_pika.connect_robust(
conn_url,
ssl_context=ssl_context,
heartbeat=60,
timeout=30
)
await self._init_channel()
logger.info(f"Connected to the RabbitMQ server at {self.host}")
break # Exit the loop if connection is successful
except AMQPConnectionError as e:
Expand All @@ -60,25 +78,49 @@ async def connect(self, ssl_params=None):

async def setup_queues(self, **queues):
"""Declare necessary queues and their durability"""
# Each keyword argument maps a queue name to the `durable` flag passed to declare_queue().
await self._ensure_channel()
for queue_name, durable in queues.items():
# NOTE(review): the next two statements declare the same queue twice. This text is a rendered
# diff without +/- markers, so the first is presumably the removed pre-change line — confirm
# against the actual file.
await self.channel.declare_queue(queue_name, durable=durable)
queue = await self.channel.declare_queue(queue_name, durable=durable)
# Remember the declared queue object by name so consume_messages() can look it up in self.queues.
self.queues[queue_name] = queue

async def publish_message(self, queue_name, message_body, headers=None):
"""Publish a message to the specified queue"""
# Make sure a usable channel exists before publishing.
await self._ensure_channel()
# The body is serialized with json.dumps() and encoded to bytes; headers are passed through as given.
message = aio_pika.Message(body=json.dumps(message_body).encode(), headers=headers)
# NOTE(review): this bare publish duplicates the one inside the try block below. This text is a
# rendered diff without +/- markers, so this is presumably the removed pre-change line — confirm.
await self.channel.default_exchange.publish(message, routing_key=queue_name)
try:
# Publish through the channel's default exchange, using the queue name as the routing key.
await self.channel.default_exchange.publish(
message, routing_key=queue_name
)
logger.debug(f"Successfully published message to {queue_name}")
except Exception as e:
# Log the failure and re-raise so the caller decides how to handle it.
logger.error(f"Failed to publish message: {e}")
raise
# NOTE(review): same debug message as inside the try block — presumably the pre-change line; confirm.
logger.debug(f"Successfully published message to {queue_name}")

async def publish_service_message(self, message_body, queue_name="FetcherMetricsQueue", expiration=30, headers=None):
"""Publish a short-lived service message"""
# Make sure a usable channel exists before publishing.
await self._ensure_channel()
# Same JSON-encoded body as publish_message(), plus an `expiration` handed to aio_pika.Message.
# NOTE(review): expiration=30 is presumably in seconds — confirm against the aio_pika Message documentation.
message = aio_pika.Message(body=json.dumps(message_body).encode(), expiration=expiration, headers=headers)
# NOTE(review): this bare publish duplicates the one inside the try block below. This text is a
# rendered diff without +/- markers, so this is presumably the removed pre-change line — confirm.
await self.channel.default_exchange.publish(message, routing_key=queue_name)
try:
# Publish through the channel's default exchange, using the queue name as the routing key.
await self.channel.default_exchange.publish(
message, routing_key=queue_name
)
logger.debug(f"Successfully published message to {queue_name}")
except Exception as e:
# Log the failure and re-raise; in this commit send_metrics() wraps this call in its own try/except.
logger.error(f"Failed to publish service message: {e}")
raise
# NOTE(review): same debug message as inside the try block — presumably the pre-change line; confirm.
logger.debug(f"Successfully published message to {queue_name}")

async def consume_messages(self, queue_name, callback_func):
"""Consume messages from the specified queue"""
# NOTE(review): the next two statements (declare + consume) are repeated by the logic below. This
# text is a rendered diff without +/- markers, so they are presumably the removed pre-change
# lines — confirm against the actual file.
queue = await self.channel.declare_queue(queue_name, durable=True)
await queue.consume(callback_func)
# Prefer the queue object cached in self.queues (filled by setup_queues()).
queue = self.queues.get(queue_name)
if not queue:
# Not cached: declare it here, always with durable=True, and cache it.
# NOTE(review): unlike the publish methods, this path uses self.channel without calling _ensure_channel() first.
queue = await self.channel.declare_queue(queue_name, durable=True)
self.queues[queue_name] = queue

# Register callback_func as the consumer and keep the returned consumer tag.
consumer_tag = await queue.consume(callback_func)
# internally used by aio_pika to keep track of consumers
self.consumers[queue_name] = (queue, consumer_tag)

async def close(self):
"""Close the connection"""
Expand Down
5 changes: 4 additions & 1 deletion src/fetcher/metrics_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,7 @@ async def send_metrics(self):
await self.get_website_latency()
metrics = self.get_metrics()
logger.debug(f"Sending metrics: {metrics}")
await self.messaging.publish_service_message(metrics)
try:
await self.messaging.publish_service_message(metrics)
except Exception as e:
logger.error(f"Failed to send metrics: {e}")

0 comments on commit 4a5c24c

Please sign in to comment.