Skip to content

Commit

Permalink
Revert "Revert back"
Browse files Browse the repository at this point in the history
This reverts commit 8afa220.
  • Loading branch information
olegeech-me committed Nov 11, 2024
1 parent 8afa220 commit 830ebbb
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions src/fetcher/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ def _create_ssl_context(self, ssl_params):
return context


async def _init_channel(self):
"""Initialize the channel and set QoS"""
self.channel = await self.connection.channel()
await self.channel.set_qos(prefetch_count=MAX_MESSAGES)

async def _ensure_channel(self):
"""Ensure that the channel is open and ready"""
if self.channel is None or self.channel.is_closed:
logger.warning("Channel is closed or not initialized. Re-initializing...")
await self._init_channel()

async def connect(self, ssl_params=None):
"""Establish a connection to the message broker"""

Expand All @@ -53,8 +64,7 @@ async def connect(self, ssl_params=None):
heartbeat=60,
timeout=30
)
self.channel = await self.connection.channel()
await self.channel.set_qos(prefetch_count=MAX_MESSAGES)
await self._init_channel()
logger.info(f"Connected to the RabbitMQ server at {self.host}")
break # Exit the loop if connection is successful
except AMQPConnectionError as e:
Expand All @@ -68,12 +78,14 @@ async def connect(self, ssl_params=None):

async def setup_queues(self, **queues):
"""Declare necessary queues and thier durability"""
await self._ensure_channel()
for queue_name, durable in queues.items():
queue = await self.channel.declare_queue(queue_name, durable=durable)
self.queues[queue_name] = queue

async def publish_message(self, queue_name, message_body, headers=None):
"""Publish a message to the specified queue"""
await self._ensure_channel()
message = aio_pika.Message(body=json.dumps(message_body).encode(), headers=headers)
try:
await self.channel.default_exchange.publish(
Expand All @@ -87,6 +99,7 @@ async def publish_message(self, queue_name, message_body, headers=None):

async def publish_service_message(self, message_body, queue_name="FetcherMetricsQueue", expiration=30, headers=None):
"""Publish a short-lived service message"""
await self._ensure_channel()
message = aio_pika.Message(body=json.dumps(message_body).encode(), expiration=expiration, headers=headers)
try:
await self.channel.default_exchange.publish(
Expand Down

0 comments on commit 830ebbb

Please sign in to comment.