Skip to content

Commit

Permalink
Implement lazy poll logic for not_found applications
Browse files Browse the repository at this point in the history
Closes #31
  • Loading branch information
olegeech-me committed Nov 17, 2024
1 parent fd64d51 commit 6543a55
Show file tree
Hide file tree
Showing 12 changed files with 150 additions and 25 deletions.
4 changes: 3 additions & 1 deletion bot.sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ REQUEUE_THRESHOLD_SECONDS = 3600

# Application monitor config
REFRESH_PERIOD=3600
SCHEDULER_PERIOD=300
SCHEDULER_PERIOD=300
NOT_FOUND_MAX_DAYS=30
NOT_FOUND_REFRESH_PERIOD=86400
1 change: 1 addition & 0 deletions db-init-scripts/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ CREATE TABLE IF NOT EXISTS Applications (
application_year INT NOT NULL,
current_status VARCHAR(1000) DEFAULT 'Unknown',
application_state VARCHAR(50) NOT NULL DEFAULT 'UNKNOWN',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_updated TIMESTAMP,
is_resolved BOOLEAN NOT NULL DEFAULT FALSE
);
Expand Down
2 changes: 2 additions & 0 deletions k8s/bot.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ data:
REFRESH_PERIOD: "3600"
SCHEDULER_PERIOD: "300"
REQUEUE_THRESHOLD_SECONDS: "3600"
NOT_FOUND_MAX_DAYS: "30"
NOT_FOUND_REFRESH_PERIOD: "86400"
---
# Secret for Bot
apiVersion: v1
Expand Down
3 changes: 2 additions & 1 deletion src/bot/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ async def main():

# Run RabbitMQ consumers
asyncio.gather(
rabbit.consume_messages(),
rabbit.consume_update_messages(),
rabbit.consume_expiration_messages(),
rabbit.consume_service_messages(),
)

Expand Down
65 changes: 55 additions & 10 deletions src/bot/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,28 +232,73 @@ async def fetch_status_with_timestamp(self, chat_id, application_number, applica
)
return message_texts[lang]["error_generic"]

async def fetch_applications_needing_update(self, refresh_period):
"""Fetch applications that need updates based on the refresh period"""
async def fetch_applications_needing_update(self, refresh_period, not_found_refresh_period):
"""Fetch applications that need updates based on their refresh periods"""

# Convert the timedelta refresh period to seconds for the SQL interval
seconds = refresh_period.total_seconds()
refresh_seconds = refresh_period.total_seconds()
not_found_seconds = not_found_refresh_period.total_seconds()

# Fetch rows where the current time minus last_checked is more than the refresh period
query = """
SELECT u.chat_id, a.application_number, a.application_suffix, a.application_type, a.application_year, a.last_updated
SELECT u.chat_id, a.application_number, a.application_suffix, a.application_type,
a.application_year, a.last_updated, a.application_state
FROM Applications a
JOIN Users u ON a.user_id = u.user_id
WHERE EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - COALESCE(a.last_updated, TIMESTAMP '1970-01-01'))) > $1
WHERE (
(a.application_state != 'NOT_FOUND' AND
EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - COALESCE(a.last_updated, TIMESTAMP '1970-01-01'))) > $1)
OR
(a.application_state = 'NOT_FOUND' AND
EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - COALESCE(a.last_updated, TIMESTAMP '1970-01-01'))) > $2)
)
AND a.is_resolved = FALSE
"""

async with self.pool.acquire() as conn:
try:
return await conn.fetch(query, seconds)
return await conn.fetch(query, refresh_seconds, not_found_seconds)
except Exception as e:
logger.error(f"Error while fetching applications needing update from DB: {e}")
return []


async def fetch_applications_to_expire(self, not_found_max_age):
"""Fetch applications in NOT_FOUND state exceeding the max age"""
not_found_seconds = not_found_max_age.total_seconds()

query = """
SELECT a.application_id, u.chat_id, a.application_number, a.application_suffix,
a.application_type, a.application_year, a.created_at
FROM Applications a
JOIN Users u ON a.user_id = u.user_id
WHERE a.application_state = 'NOT_FOUND'
AND EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - a.created_at)) >= $1
AND a.is_resolved = FALSE
"""

async with self.pool.acquire() as conn:
try:
rows = await conn.fetch(query, not_found_seconds)
return [dict(row) for row in rows]
except Exception as e:
logger.error(f"Error while fetching applications needing update. Error: {e}")
logger.error(f"Error while fetching applications to expire from DB: {e}")
return []

async def resolve_application(self, application_id):
"""Mark application as resolved"""

query = """
UPDATE Applications
SET is_resolved = TRUE
WHERE application_id = $1
"""
async with self.pool.acquire() as conn:
try:
await conn.execute(query, application_id)
return True
except Exception as e:
logger.error(f"Error while marking as resolved application with ID {application_id} in DB: {e}")
return False

async def user_exists(self, chat_id):
"""Check if a user exists in the database"""

Expand All @@ -263,7 +308,7 @@ async def user_exists(self, chat_id):
exists = await conn.fetchval(query, chat_id)
return exists
except Exception as e:
logger.error(f"Error while checking if user with chat_id {chat_id} exists. Error: {e}")
logger.error(f"Error while checking if user with chat_id {chat_id} exists in DB: {e}")
return False

async def subscription_exists(self, chat_id, application_number, application_type, application_year):
Expand Down
2 changes: 2 additions & 0 deletions src/bot/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
# Application monitor config
REFRESH_PERIOD = int(os.getenv("REFRESH_PERIOD", 3600))
SCHEDULER_PERIOD = int(os.getenv("SCHEDULER_PERIOD", 300))
NOT_FOUND_MAX_DAYS = int(os.getenv("NOT_FOUND_MAX_DAYS", 30))
NOT_FOUND_REFRESH_PERIOD = int(os.getenv("NOT_FOUND_REFRESH_PERIOD", 86400))
# Run mode for tests
RUN_MODE = os.getenv("RUN_MODE", "PROD")

Expand Down
42 changes: 39 additions & 3 deletions src/bot/monitor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging
from datetime import timedelta
from bot.loader import REFRESH_PERIOD, SCHEDULER_PERIOD
from bot.loader import REFRESH_PERIOD, SCHEDULER_PERIOD, NOT_FOUND_REFRESH_PERIOD, NOT_FOUND_MAX_DAYS
from bot.utils import generate_oam_full_string

logger = logging.getLogger(__name__)
Expand All @@ -12,22 +12,31 @@ def __init__(self, db, rabbit):
self.db = db
self.rabbit = rabbit
self.refresh = timedelta(seconds=REFRESH_PERIOD)
self.not_found_refresh = timedelta(seconds=NOT_FOUND_REFRESH_PERIOD)
self.not_found_max_age = timedelta(days=NOT_FOUND_MAX_DAYS)
self.shutdown_event = asyncio.Event()

async def start(self):
logger.info(
f"Application status monitor started, refresh_interval={REFRESH_PERIOD}, scheduler_interval={SCHEDULER_PERIOD}"
f"Application status monitor started, scheduler_interval={SCHEDULER_PERIOD}, "
f"refresh_interval={REFRESH_PERIOD}, not_found_refresh_interval={NOT_FOUND_REFRESH_PERIOD}, "
f"not_found_max_age={NOT_FOUND_MAX_DAYS}"
)

while not self.shutdown_event.is_set():
logger.info("Running periodic status checks")
await self.check_for_updates()
await self.expire_stale_not_found_applications()
try:
await asyncio.wait_for(self.shutdown_event.wait(), timeout=SCHEDULER_PERIOD)
except asyncio.TimeoutError:
pass

async def check_for_updates(self):
applications_to_update = await self.db.fetch_applications_needing_update(self.refresh)
applications_to_update = await self.db.fetch_applications_needing_update(
self.refresh,
self.not_found_refresh
)

if not applications_to_update:
logger.info("No applications need status refresh")
Expand All @@ -52,6 +61,33 @@ async def check_for_updates(self):
)
await self.rabbit.publish_message(message, routing_key="RefreshStatusQueue")

async def expire_stale_not_found_applications(self):
applications_to_expire = await self.db.fetch_applications_to_expire(self.not_found_max_age)
if not applications_to_expire:
logger.debug("No applications to expire")
return

logger.info(f"{len(applications_to_expire)} application(s) to expire")
for app in applications_to_expire:

message = {
"application_id": app['application_id'],
"chat_id": app["chat_id"],
"number": app['application_number'],
"suffix": app['application_suffix'],
"type": app['application_type'],
"year": app['application_year'],
"created_at": app['created_at']
}
oam_full_string = generate_oam_full_string(app)
logger.info(
f"Scheduling expiration for {oam_full_string}, user: {app['chat_id']}, created_at: {app['created_at']}"
)
await self.rabbit.publish_message(
message,
routing_key="ExpirationQueue"
)

def stop(self):
self.shutdown_event.set()

Expand Down
44 changes: 38 additions & 6 deletions src/bot/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

MAX_RETRIES = 5 # maximum number of connection retries
RETRY_DELAY = 5 # delay (in seconds) between retries
FINAL_STATUSES = [item for key, (value, emoji) in MVCR_STATUSES.items() if key != "in_progress" for item in value]

logger = logging.getLogger(__name__)

Expand All @@ -27,6 +26,7 @@ def __init__(self, host, user, password, bot, db, requeue_ttl, metrics, loop):
self.connection = None
self.channel = None
self.queue = None
self.expiration_queue = None
self.service_queue = None
self.default_exchange = None
self.published_messages = cachetools.TTLCache(maxsize=10000, ttl=requeue_ttl)
Expand All @@ -42,6 +42,7 @@ async def connect(self):
)
self.channel = await self.connection.channel()
self.queue = await self.channel.declare_queue("StatusUpdateQueue", durable=True)
self.expiration_queue = await self.channel.declare_queue("ExpirationQueue", durable=True)
self.service_queue = await self.channel.declare_queue("FetcherMetricsQueue", durable=False)
self.default_exchange = self.channel.default_exchange
logger.info("Connected to RabbitMQ")
Expand Down Expand Up @@ -79,15 +80,16 @@ def discard_message_id(self, unique_id):

def is_resolved(self, status):
"""Check if the application was resolved to its final status"""
return any(phrase in status for phrase in FINAL_STATUSES)
final_statuses = MVCR_STATUSES.get('approved')[0] + MVCR_STATUSES.get('denied')[0]
return any(final_status in status for final_status in final_statuses)

def _generate_error_message(self, app_details, lang):
"""Generate an error message for an application number"""
app_string = generate_oam_full_string(app_details)

return message_texts[lang]["application_failed"].format(app_string=app_string)

async def on_message(self, message: aio_pika.IncomingMessage):
async def on_update_message(self, message: aio_pika.IncomingMessage):
"""Async function to handle messages from StatusUpdateQueue"""
async with message.process():
msg_data = json.loads(message.body.decode("utf-8"))
Expand Down Expand Up @@ -198,6 +200,31 @@ async def on_message(self, message: aio_pika.IncomingMessage):
except Exception as e:
logger.error(f"Failed to send status update to {chat_id}: {e}")

async def on_expiration_message(self, message: aio_pika.IncomingMessage):
"""Async function to handle messages from ExpirationQueue"""
async with message.process():
msg_data = json.loads(message.body.decode('utf-8'))
oam_full_string = generate_oam_full_string(msg_data)
logger.info(
f"[EXPIRE] Application {oam_full_string} created at {msg_data['created_at']} "
"has been too long in the state NOT_FOUND, expiring"
)

application_id = msg_data.get("application_id")
chat_id = msg_data.get("chat_id")

if await self.db.resolve_application(application_id):
lang = await self.db.fetch_user_language(chat_id)
notification_text = message_texts[lang]["not_found_expired"].format(app_string=oam_full_string)

# notify the user
try:
await self.bot.updater.bot.send_message(chat_id=chat_id, text=notification_text)
logger.debug(f"Notifying user {chat_id} about application {oam_full_string} expiration")
except Exception as e:
logger.error(f"Failed to send expiration notification to {chat_id}: {e}")


async def on_service_message(self, message: aio_pika.IncomingMessage):
"""Async function to handle service messages from FetcherMetricsQueue"""
async with message.process():
Expand All @@ -209,11 +236,16 @@ async def on_service_message(self, message: aio_pika.IncomingMessage):
else:
logger.error(f"Couldn't find fetcher ID in the service message: {msg_data}")

async def consume_messages(self):
"""Consumes messages from the queue and handles them using on_message"""
await self.queue.consume(lambda message: self.on_message(message))
async def consume_update_messages(self):
"""Consumes messages with status updates"""
await self.queue.consume(lambda message: self.on_update_message(message))
logger.info("Started status updates consumer")

async def consume_expiration_messages(self):
"""Consumes messages with requests to expire stale NOT_FOUND applications"""
await self.expiration_queue.consume(lambda message: self.on_expiration_message(message))
logger.info("Started expiration requests consumer")

async def consume_service_messages(self):
"""Consumes service messages (fetcher stats )"""
await self.service_queue.consume(lambda message: self.on_service_message(message))
Expand Down
3 changes: 2 additions & 1 deletion src/bot/texts/CZ/messages.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
"ratelimit_exceeded": "Omlouváme se, tuto funkci můžete použít pouze pětkrát denně.",
"application_updated": "Stav vaší žádosti byl aktualizován!",
"application_resolved": "Vaše žádost byla vyřešena!",
"not_found": "❓ Bohužel jsme nemohli najít vaši žádost. Prosím, ujistěte se, že jste zadali správné číslo žádosti a zkuste to znovu. Pokud jste podali žádost teprve nedávno, počkejte chvíli, než Ministerstvo Vnitra zveřejní údaje o ní.\n\nAktuální stav {status_sign}:",
"not_found": "❓ Bohužel jsme nemohli najít vaši žádost. Prosím, ujistěte se, že jste zadali správné číslo žádosti a zkuste to znovu. Pokud jste podali žádost teprve nedávno, počkejte chvíli, než Ministerstvo Vnitra zveřejní údaje o ní. Bot bude každý den kontrolovat aktualizace, zda se žádost neobjeví.\n\nAktuální stav {status_sign}:",
"not_found_expired": "⚠️ Vaše žádost {app_string} nebyla nalezena během monitorovacího období a byla odstraněna ze sledování. Prosím, ověřte číslo vaší žádosti.",
"in_progress": "⏳ Vaše žádost je stále ve zpracování.\n\nAktuální stav {status_sign}:",
"approved": "🎉 Gratulujeme! Vaše žádost byla schválena!\n\nAktuální stav {status_sign}:",
"denied": "😔 Bohužel vaše žádost byla zamítnuta ...\n\nAktuální stav {status_sign}:",
Expand Down
3 changes: 2 additions & 1 deletion src/bot/texts/EN/messages.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
"ratelimit_exceeded": "Sorry, you can only use this command 5 times a day.",
"application_updated": "Your application status has been updated!",
"application_resolved": "Your application has been resolved!",
"not_found": "❓ Unfortunately, we couldn't find your application. Please ensure you've entered the correct application number and try again. If you've recently submitted the application, please wait for some time until the Ministry of Interior publishes the data about it.\n\nCurrent status {status_sign}:",
"not_found": "❓ Unfortunately, we couldn't find your application. Please ensure you've entered the correct application number and try again. If you've recently submitted the application, please wait for some time until the Ministry of Interior publishes the data about it. The bot will check daily for updates in case your application becomes available.\n\nCurrent status {status_sign}:",
"not_found_expired": "⚠️ Your application <b>{app_string}</b> was not found within the monitoring period and has been removed from tracking. Please verify your application number.",
"in_progress": "⏳ Your application is still under review.\n\nCurrent status {status_sign}:",
"approved": "🎉 Congratulations! Your application has been approved!\n\nCurrent status {status_sign}:",
"denied": "😔 I regret to inform that your application was declined. \n\nCurrent status: {status_sign}",
Expand Down
3 changes: 2 additions & 1 deletion src/bot/texts/RU/messages.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
"ratelimit_exceeded": "К сожалению, вы можете использовать эту команду только 5 раз в день.",
"application_updated": "Статус вашего заявления обновлен!",
"application_resolved": "Ваше заявление было решено!\n\nТекущий статус {status_sign}:",
"not_found": "❓ К сожалению, мы не смогли найти ваше заявление. Пожалуйста, убедитесь что вы ввели правильный номер заявления и попробуйте снова.\nЕсли вы подали заявление только недавно, необходимо подождать некоторое время, прежде чем МВД опубликует данные о нём.\n\nТекущий статус {status_sign}:",
"not_found": "❓ К сожалению, мы не смогли найти ваше заявление. Пожалуйста, убедитесь, что вы ввели правильный номер заявления и попробуйте снова. Если вы подали заявление недавно, подождите некоторое время, пока Министерство Внутренних Дел не опубликует данные о нем. Бот будет ежедневно проверять обновления на случай, если ваше заявление станет доступным.\n\nТекущий статус {status_sign}:",
"not_found_expired": "⚠️ Ваше заявление <b>{app_string}</b> не было найдено в течение заданного периода мониторинга, отслеживание было прекращено. Пожалуйста, проверьте номер вашего заявления.",
"in_progress": "⏳ Ваше заявление всё еще находится в процессе рассмотрения.\n\nТекущий статус {status_sign}:",
"approved": "🎉 Поздравляем! Ваше заявление было одобрено!\n\nТекущий статус {status_sign}:",
"denied": "😔 Нам очень жаль, но похоже ваше заявление было отклонено ...\n\nТекущий статус {status_sign}:",
Expand Down
Loading

0 comments on commit 6543a55

Please sign in to comment.