diff --git a/bot.sample.env b/bot.sample.env
index daeb129..ca0750e 100644
--- a/bot.sample.env
+++ b/bot.sample.env
@@ -20,4 +20,6 @@ REQUEUE_THRESHOLD_SECONDS = 3600
# Application monitor config
REFRESH_PERIOD=3600
-SCHEDULER_PERIOD=300
\ No newline at end of file
+SCHEDULER_PERIOD=300
+NOT_FOUND_MAX_DAYS=30
+NOT_FOUND_REFRESH_PERIOD=86400
\ No newline at end of file
diff --git a/db-init-scripts/init.sql b/db-init-scripts/init.sql
index 0515e3e..4ea1eb4 100644
--- a/db-init-scripts/init.sql
+++ b/db-init-scripts/init.sql
@@ -16,6 +16,7 @@ CREATE TABLE IF NOT EXISTS Applications (
application_year INT NOT NULL,
current_status VARCHAR(1000) DEFAULT 'Unknown',
application_state VARCHAR(50) NOT NULL DEFAULT 'UNKNOWN',
+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_updated TIMESTAMP,
is_resolved BOOLEAN NOT NULL DEFAULT FALSE
);
diff --git a/k8s/bot.sample.yaml b/k8s/bot.sample.yaml
index 29bc214..bcfd73b 100644
--- a/k8s/bot.sample.yaml
+++ b/k8s/bot.sample.yaml
@@ -15,6 +15,8 @@ data:
REFRESH_PERIOD: "3600"
SCHEDULER_PERIOD: "300"
REQUEUE_THRESHOLD_SECONDS: "3600"
+ NOT_FOUND_MAX_DAYS: "30"
+ NOT_FOUND_REFRESH_PERIOD: "86400"
---
# Secret for Bot
apiVersion: v1
diff --git a/src/bot/__main__.py b/src/bot/__main__.py
index ebf58f9..69eedde 100644
--- a/src/bot/__main__.py
+++ b/src/bot/__main__.py
@@ -162,7 +162,8 @@ async def main():
# Run RabbitMQ consumers
asyncio.gather(
- rabbit.consume_messages(),
+ rabbit.consume_update_messages(),
+ rabbit.consume_expiration_messages(),
rabbit.consume_service_messages(),
)
diff --git a/src/bot/database.py b/src/bot/database.py
index ac53822..8deca40 100644
--- a/src/bot/database.py
+++ b/src/bot/database.py
@@ -232,28 +232,73 @@ async def fetch_status_with_timestamp(self, chat_id, application_number, applica
)
return message_texts[lang]["error_generic"]
- async def fetch_applications_needing_update(self, refresh_period):
- """Fetch applications that need updates based on the refresh period"""
+ async def fetch_applications_needing_update(self, refresh_period, not_found_refresh_period):
+ """Fetch applications that need updates based on their refresh periods"""
- # Convert the timedelta refresh period to seconds for the SQL interval
- seconds = refresh_period.total_seconds()
+ refresh_seconds = refresh_period.total_seconds()
+ not_found_seconds = not_found_refresh_period.total_seconds()
- # Fetch rows where the current time minus last_checked is more than the refresh period
query = """
- SELECT u.chat_id, a.application_number, a.application_suffix, a.application_type, a.application_year, a.last_updated
+ SELECT u.chat_id, a.application_number, a.application_suffix, a.application_type,
+ a.application_year, a.last_updated, a.application_state
FROM Applications a
JOIN Users u ON a.user_id = u.user_id
- WHERE EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - COALESCE(a.last_updated, TIMESTAMP '1970-01-01'))) > $1
+ WHERE (
+ (a.application_state != 'NOT_FOUND' AND
+ EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - COALESCE(a.last_updated, TIMESTAMP '1970-01-01'))) > $1)
+ OR
+ (a.application_state = 'NOT_FOUND' AND
+ EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - COALESCE(a.last_updated, TIMESTAMP '1970-01-01'))) > $2)
+ )
AND a.is_resolved = FALSE
"""
async with self.pool.acquire() as conn:
try:
- return await conn.fetch(query, seconds)
+ return await conn.fetch(query, refresh_seconds, not_found_seconds)
+ except Exception as e:
+ logger.error(f"Error while fetching applications needing update from DB: {e}")
+ return []
+
+
+ async def fetch_applications_to_expire(self, not_found_max_age):
+ """Fetch applications in NOT_FOUND state exceeding the max age"""
+ not_found_seconds = not_found_max_age.total_seconds()
+
+ query = """
+ SELECT a.application_id, u.chat_id, a.application_number, a.application_suffix,
+ a.application_type, a.application_year, a.created_at
+ FROM Applications a
+ JOIN Users u ON a.user_id = u.user_id
+ WHERE a.application_state = 'NOT_FOUND'
+ AND EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - a.created_at)) >= $1
+ AND a.is_resolved = FALSE
+ """
+
+ async with self.pool.acquire() as conn:
+ try:
+ rows = await conn.fetch(query, not_found_seconds)
+ return [dict(row) for row in rows]
except Exception as e:
- logger.error(f"Error while fetching applications needing update. Error: {e}")
+ logger.error(f"Error while fetching applications to expire from DB: {e}")
return []
+ async def resolve_application(self, application_id):
+ """Mark application as resolved"""
+
+ query = """
+ UPDATE Applications
+ SET is_resolved = TRUE
+ WHERE application_id = $1
+ """
+ async with self.pool.acquire() as conn:
+ try:
+ await conn.execute(query, application_id)
+ return True
+ except Exception as e:
+ logger.error(f"Error while marking as resolved application with ID {application_id} in DB: {e}")
+ return False
+
async def user_exists(self, chat_id):
"""Check if a user exists in the database"""
@@ -263,7 +308,7 @@ async def user_exists(self, chat_id):
exists = await conn.fetchval(query, chat_id)
return exists
except Exception as e:
- logger.error(f"Error while checking if user with chat_id {chat_id} exists. Error: {e}")
+ logger.error(f"Error while checking if user with chat_id {chat_id} exists in DB: {e}")
return False
async def subscription_exists(self, chat_id, application_number, application_type, application_year):
diff --git a/src/bot/loader.py b/src/bot/loader.py
index d23710f..61a055a 100644
--- a/src/bot/loader.py
+++ b/src/bot/loader.py
@@ -31,6 +31,8 @@
# Application monitor config
REFRESH_PERIOD = int(os.getenv("REFRESH_PERIOD", 3600))
SCHEDULER_PERIOD = int(os.getenv("SCHEDULER_PERIOD", 300))
+NOT_FOUND_MAX_DAYS = int(os.getenv("NOT_FOUND_MAX_DAYS", 30))
+NOT_FOUND_REFRESH_PERIOD = int(os.getenv("NOT_FOUND_REFRESH_PERIOD", 86400))
# Run mode for tests
RUN_MODE = os.getenv("RUN_MODE", "PROD")
diff --git a/src/bot/monitor.py b/src/bot/monitor.py
index c6d93ea..e19ecea 100644
--- a/src/bot/monitor.py
+++ b/src/bot/monitor.py
@@ -1,7 +1,7 @@
import asyncio
import logging
from datetime import timedelta
-from bot.loader import REFRESH_PERIOD, SCHEDULER_PERIOD
+from bot.loader import REFRESH_PERIOD, SCHEDULER_PERIOD, NOT_FOUND_REFRESH_PERIOD, NOT_FOUND_MAX_DAYS
from bot.utils import generate_oam_full_string
logger = logging.getLogger(__name__)
@@ -12,22 +12,31 @@ def __init__(self, db, rabbit):
self.db = db
self.rabbit = rabbit
self.refresh = timedelta(seconds=REFRESH_PERIOD)
+ self.not_found_refresh = timedelta(seconds=NOT_FOUND_REFRESH_PERIOD)
+ self.not_found_max_age = timedelta(days=NOT_FOUND_MAX_DAYS)
self.shutdown_event = asyncio.Event()
async def start(self):
logger.info(
- f"Application status monitor started, refresh_interval={REFRESH_PERIOD}, scheduler_interval={SCHEDULER_PERIOD}"
+ f"Application status monitor started, scheduler_interval={SCHEDULER_PERIOD}, "
+ f"refresh_interval={REFRESH_PERIOD}, not_found_refresh_interval={NOT_FOUND_REFRESH_PERIOD}, "
+ f"not_found_max_age={NOT_FOUND_MAX_DAYS}"
)
+
while not self.shutdown_event.is_set():
logger.info("Running periodic status checks")
await self.check_for_updates()
+ await self.expire_stale_not_found_applications()
try:
await asyncio.wait_for(self.shutdown_event.wait(), timeout=SCHEDULER_PERIOD)
except asyncio.TimeoutError:
pass
async def check_for_updates(self):
- applications_to_update = await self.db.fetch_applications_needing_update(self.refresh)
+ applications_to_update = await self.db.fetch_applications_needing_update(
+ self.refresh,
+ self.not_found_refresh
+ )
if not applications_to_update:
logger.info("No applications need status refresh")
@@ -52,6 +61,33 @@ async def check_for_updates(self):
)
await self.rabbit.publish_message(message, routing_key="RefreshStatusQueue")
+ async def expire_stale_not_found_applications(self):
+ applications_to_expire = await self.db.fetch_applications_to_expire(self.not_found_max_age)
+ if not applications_to_expire:
+ logger.debug("No applications to expire")
+ return
+
+ logger.info(f"{len(applications_to_expire)} application(s) to expire")
+ for app in applications_to_expire:
+
+ message = {
+ "application_id": app['application_id'],
+ "chat_id": app["chat_id"],
+ "number": app['application_number'],
+ "suffix": app['application_suffix'],
+ "type": app['application_type'],
+ "year": app['application_year'],
+ "created_at": app['created_at']
+ }
+ oam_full_string = generate_oam_full_string(app)
+ logger.info(
+ f"Scheduling expiration for {oam_full_string}, user: {app['chat_id']}, created_at: {app['created_at']}"
+ )
+ await self.rabbit.publish_message(
+ message,
+ routing_key="ExpirationQueue"
+ )
+
def stop(self):
self.shutdown_event.set()
diff --git a/src/bot/rabbitmq.py b/src/bot/rabbitmq.py
index 5c8d0be..cfc2fa0 100644
--- a/src/bot/rabbitmq.py
+++ b/src/bot/rabbitmq.py
@@ -11,7 +11,6 @@
MAX_RETRIES = 5 # maximum number of connection retries
RETRY_DELAY = 5 # delay (in seconds) between retries
-FINAL_STATUSES = [item for key, (value, emoji) in MVCR_STATUSES.items() if key != "in_progress" for item in value]
logger = logging.getLogger(__name__)
@@ -27,6 +26,7 @@ def __init__(self, host, user, password, bot, db, requeue_ttl, metrics, loop):
self.connection = None
self.channel = None
self.queue = None
+ self.expiration_queue = None
self.service_queue = None
self.default_exchange = None
self.published_messages = cachetools.TTLCache(maxsize=10000, ttl=requeue_ttl)
@@ -42,6 +42,7 @@ async def connect(self):
)
self.channel = await self.connection.channel()
self.queue = await self.channel.declare_queue("StatusUpdateQueue", durable=True)
+ self.expiration_queue = await self.channel.declare_queue("ExpirationQueue", durable=True)
self.service_queue = await self.channel.declare_queue("FetcherMetricsQueue", durable=False)
self.default_exchange = self.channel.default_exchange
logger.info("Connected to RabbitMQ")
@@ -79,7 +80,8 @@ def discard_message_id(self, unique_id):
def is_resolved(self, status):
"""Check if the application was resolved to its final status"""
- return any(phrase in status for phrase in FINAL_STATUSES)
+ final_statuses = MVCR_STATUSES.get('approved')[0] + MVCR_STATUSES.get('denied')[0]
+ return any(final_status in status for final_status in final_statuses)
def _generate_error_message(self, app_details, lang):
"""Generate an error message for an application number"""
@@ -87,7 +89,7 @@ def _generate_error_message(self, app_details, lang):
return message_texts[lang]["application_failed"].format(app_string=app_string)
- async def on_message(self, message: aio_pika.IncomingMessage):
+ async def on_update_message(self, message: aio_pika.IncomingMessage):
"""Async function to handle messages from StatusUpdateQueue"""
async with message.process():
msg_data = json.loads(message.body.decode("utf-8"))
@@ -198,6 +200,31 @@ async def on_message(self, message: aio_pika.IncomingMessage):
except Exception as e:
logger.error(f"Failed to send status update to {chat_id}: {e}")
+ async def on_expiration_message(self, message: aio_pika.IncomingMessage):
+ """Async function to handle messages from ExpirationQueue"""
+ async with message.process():
+ msg_data = json.loads(message.body.decode('utf-8'))
+ oam_full_string = generate_oam_full_string(msg_data)
+ logger.info(
+ f"[EXPIRE] Application {oam_full_string} created at {msg_data['created_at']} "
+ "has been too long in the state NOT_FOUND, expiring"
+ )
+
+ application_id = msg_data.get("application_id")
+ chat_id = msg_data.get("chat_id")
+
+ if await self.db.resolve_application(application_id):
+ lang = await self.db.fetch_user_language(chat_id)
+ notification_text = message_texts[lang]["not_found_expired"].format(app_string=oam_full_string)
+
+ # notify the user
+ try:
+ await self.bot.updater.bot.send_message(chat_id=chat_id, text=notification_text)
+ logger.debug(f"Notifying user {chat_id} about application {oam_full_string} expiration")
+ except Exception as e:
+ logger.error(f"Failed to send expiration notification to {chat_id}: {e}")
+
+
async def on_service_message(self, message: aio_pika.IncomingMessage):
"""Async function to handle service messages from FetcherMetricsQueue"""
async with message.process():
@@ -209,11 +236,16 @@ async def on_service_message(self, message: aio_pika.IncomingMessage):
else:
logger.error(f"Couldn't find fetcher ID in the service message: {msg_data}")
- async def consume_messages(self):
- """Consumes messages from the queue and handles them using on_message"""
- await self.queue.consume(lambda message: self.on_message(message))
+ async def consume_update_messages(self):
+ """Consumes messages with status updates"""
+ await self.queue.consume(lambda message: self.on_update_message(message))
logger.info("Started status updates consumer")
+ async def consume_expiration_messages(self):
+ """Consumes messages with requests to expire stale NOT_FOUND applications"""
+ await self.expiration_queue.consume(lambda message: self.on_expiration_message(message))
+ logger.info("Started expiration requests consumer")
+
async def consume_service_messages(self):
"""Consumes service messages (fetcher stats )"""
await self.service_queue.consume(lambda message: self.on_service_message(message))
diff --git a/src/bot/texts/CZ/messages.json b/src/bot/texts/CZ/messages.json
index aaab83a..4330ec8 100644
--- a/src/bot/texts/CZ/messages.json
+++ b/src/bot/texts/CZ/messages.json
@@ -15,7 +15,8 @@
"ratelimit_exceeded": "Omlouváme se, tuto funkci můžete použít pouze pětkrát denně.",
"application_updated": "Stav vaší žádosti byl aktualizován!",
"application_resolved": "Vaše žádost byla vyřešena!",
- "not_found": "❓ Bohužel jsme nemohli najít vaši žádost. Prosím, ujistěte se, že jste zadali správné číslo žádosti a zkuste to znovu. Pokud jste podali žádost teprve nedávno, počkejte chvíli, než Ministerstvo Vnitra zveřejní údaje o ní.\n\nAktuální stav {status_sign}:",
+ "not_found": "❓ Bohužel jsme nemohli najít vaši žádost. Prosím, ujistěte se, že jste zadali správné číslo žádosti a zkuste to znovu. Pokud jste podali žádost teprve nedávno, počkejte chvíli, než Ministerstvo Vnitra zveřejní údaje o ní. Bot bude každý den kontrolovat aktualizace, zda se žádost neobjeví.\n\nAktuální stav {status_sign}:",
+ "not_found_expired": "⚠️ Vaše žádost {app_string} nebyla nalezena během monitorovacího období a byla odstraněna ze sledování. Prosím, ověřte číslo vaší žádosti.",
"in_progress": "⏳ Vaše žádost je stále ve zpracování.\n\nAktuální stav {status_sign}:",
"approved": "🎉 Gratulujeme! Vaše žádost byla schválena!\n\nAktuální stav {status_sign}:",
"denied": "😔 Bohužel vaše žádost byla zamítnuta ...\n\nAktuální stav {status_sign}:",
diff --git a/src/bot/texts/EN/messages.json b/src/bot/texts/EN/messages.json
index f07e818..f70dcd1 100644
--- a/src/bot/texts/EN/messages.json
+++ b/src/bot/texts/EN/messages.json
@@ -15,7 +15,8 @@
"ratelimit_exceeded": "Sorry, you can only use this command 5 times a day.",
"application_updated": "Your application status has been updated!",
"application_resolved": "Your application has been resolved!",
- "not_found": "❓ Unfortunately, we couldn't find your application. Please ensure you've entered the correct application number and try again. If you've recently submitted the application, please wait for some time until the Ministry of Interior publishes the data about it.\n\nCurrent status {status_sign}:",
+ "not_found": "❓ Unfortunately, we couldn't find your application. Please ensure you've entered the correct application number and try again. If you've recently submitted the application, please wait for some time until the Ministry of Interior publishes the data about it. The bot will check daily for updates in case your application becomes available.\n\nCurrent status {status_sign}:",
+ "not_found_expired": "⚠️ Your application {app_string} was not found within the monitoring period and has been removed from tracking. Please verify your application number.",
"in_progress": "⏳ Your application is still under review.\n\nCurrent status {status_sign}:",
"approved": "🎉 Congratulations! Your application has been approved!\n\nCurrent status {status_sign}:",
"denied": "😔 I regret to inform that your application was declined. \n\nCurrent status: {status_sign}",
diff --git a/src/bot/texts/RU/messages.json b/src/bot/texts/RU/messages.json
index 267750e..802cee2 100644
--- a/src/bot/texts/RU/messages.json
+++ b/src/bot/texts/RU/messages.json
@@ -15,7 +15,8 @@
"ratelimit_exceeded": "К сожалению, вы можете использовать эту команду только 5 раз в день.",
"application_updated": "Статус вашего заявления обновлен!",
"application_resolved": "Ваше заявление было решено!\n\nТекущий статус {status_sign}:",
- "not_found": "❓ К сожалению, мы не смогли найти ваше заявление. Пожалуйста, убедитесь что вы ввели правильный номер заявления и попробуйте снова.\nЕсли вы подали заявление только недавно, необходимо подождать некоторое время, прежде чем МВД опубликует данные о нём.\n\nТекущий статус {status_sign}:",
+ "not_found": "❓ К сожалению, мы не смогли найти ваше заявление. Пожалуйста, убедитесь, что вы ввели правильный номер заявления и попробуйте снова. Если вы подали заявление недавно, подождите некоторое время, пока Министерство Внутренних Дел не опубликует данные о нем. Бот будет ежедневно проверять обновления на случай, если ваше заявление станет доступным.\n\nТекущий статус {status_sign}:",
+ "not_found_expired": "⚠️ Ваше заявление {app_string} не было найдено в течение заданного периода мониторинга, отслеживание было прекращено. Пожалуйста, проверьте номер вашего заявления.",
"in_progress": "⏳ Ваше заявление всё еще находится в процессе рассмотрения.\n\nТекущий статус {status_sign}:",
"approved": "🎉 Поздравляем! Ваше заявление было одобрено!\n\nТекущий статус {status_sign}:",
"denied": "😔 Нам очень жаль, но похоже ваше заявление было отклонено ...\n\nТекущий статус {status_sign}:",
diff --git a/src/bot/texts/UA/messages.json b/src/bot/texts/UA/messages.json
index 0c59217..a6508bc 100644
--- a/src/bot/texts/UA/messages.json
+++ b/src/bot/texts/UA/messages.json
@@ -15,7 +15,8 @@
"ratelimit_exceeded": "Цю команду можна використати лише п'ятьма разами на день.",
"application_updated": "Статус Вашого клопотання оновився!",
"application_resolved": "За Вашим клопотанням винесено рішення!",
- "not_found": "❓ На жаль, ми не змогли знайти ваше клопотання. Будь ласка, перевірте чи ви ввели правильний номер клопотання та спробуйте знову. Якщо ви подали клопотання лише нещодавно, зачекайте деякий час, перш ніж МВС опублікує дані про нього.\n\nПоточний статус {status_sign}:",
+ "not_found": "❓ На жаль, ми не змогли знайти ваше клопотання. Будь ласка, перевірте, чи ви ввели правильний номер клопотання та спробуйте знову. Якщо ви подали клопотання нещодавно, зачекайте деякий час, перш ніж МВС опублікує дані про нього. Бот буде щоденно перевіряти оновлення на випадок, якщо ваше клопотання стане доступним.\n\nПоточний статус {status_sign}:",
+ "not_found_expired": "⚠️ Ваше клопотання {app_string} не було знайдено протягом періоду моніторингу, і його відстеження було припинено. Будь ласка, перевірте номер вашого клопотання.",
"in_progress": "⏳ Ваше клопотання досі розглядається.\n\nПоточний статус {status_sign}:",
"approved": "🎉 Вітаємо! Ваше клопотання було схвалено!\n\nПоточний статус {status_sign}:",
"denied": "😔 На жаль, ваше клопотання було відхилено ...\n\nПоточний статус {status_sign}:",