From a630c301abad96199a8af25f7237cb99d9fb3e89 Mon Sep 17 00:00:00 2001 From: Jorge Hermo Date: Sun, 27 Oct 2024 12:35:40 +0100 Subject: [PATCH] feat: cleanup task --- ...af2f356bdcd0b56e86ca7f1455567a3365a75.json | 14 +++++ Cargo.lock | 1 + crates/server/.env.example | 2 +- crates/server/Cargo.toml | 1 + crates/server/src/main.rs | 8 +-- crates/server/src/tasks/cleanup.rs | 59 ++++++++++++++++--- 6 files changed, 72 insertions(+), 13 deletions(-) create mode 100644 .sqlx/query-8f494358e08f50cd7d9f9de866caf2f356bdcd0b56e86ca7f1455567a3365a75.json diff --git a/.sqlx/query-8f494358e08f50cd7d9f9de866caf2f356bdcd0b56e86ca7f1455567a3365a75.json b/.sqlx/query-8f494358e08f50cd7d9f9de866caf2f356bdcd0b56e86ca7f1455567a3365a75.json new file mode 100644 index 0000000..d21ffe1 --- /dev/null +++ b/.sqlx/query-8f494358e08f50cd7d9f9de866caf2f356bdcd0b56e86ca7f1455567a3365a75.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM share\n WHERE expires_at <= $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "8f494358e08f50cd7d9f9de866caf2f356bdcd0b56e86ca7f1455567a3365a75" +} diff --git a/Cargo.lock b/Cargo.lock index d515e74..2782620 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1164,6 +1164,7 @@ dependencies = [ "chrono", "cron", "http-serde", + "lazy_static", "serde", "sqlx", "thiserror", diff --git a/crates/server/.env.example b/crates/server/.env.example index 8147e3f..d7a5fb3 100644 --- a/crates/server/.env.example +++ b/crates/server/.env.example @@ -2,4 +2,4 @@ PORT=3000 DATABASE_URL=postgres://postgres:password@localhost:5432/db DATABASE_CONNECTIONS=5 MAX_SHARE_EXPIRATION_TIME_SECS=604800 -CLEANUP_TASK_CRON_EXPRESSION="0 0 0 * * * *" +CLEANUP_TASK_CRON_EXPRESSION="0 0 */4 * * * *" diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index e0bae22..79e762c 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -31,3 +31,4 @@ tracing-subscriber = { 
version = "0.3.18", features = ["env-filter"] } apalis = { version = "0.5.5", features = ["cron", "layers"] } tower = { version = "0.5.1" } cron = "0.12.1" +lazy_static = "1.5.0" diff --git a/crates/server/src/main.rs b/crates/server/src/main.rs index 81b4f37..6402073 100644 --- a/crates/server/src/main.rs +++ b/crates/server/src/main.rs @@ -52,7 +52,7 @@ async fn main() { ); let cleanup_task_cron_expression = - env::var("CLEANUP_TASK_CRON_EXPRESSION").unwrap_or_else(|_| "0 0 0 * * * *".to_string()); + env::var("CLEANUP_TASK_CRON_EXPRESSION").unwrap_or_else(|_| "0 0 */4 * * * *".to_string()); sqlx::migrate!("./migrations") .run(&db_connection) @@ -105,10 +105,10 @@ async fn start_background_tasks( .parse() .map_err(|e| SchedulerError::CronExpressionError(cron_expression.to_string(), e))?; - tracing::info!(schedule = %schedule, "Starting cleanup task worker"); + tracing::info!(schedule = %schedule, "Started cleanup worker"); - let worker = WorkerBuilder::new("cleanup-task-worker") - .layer(RetryLayer::new(RetryPolicy::retries(5))) + let worker = WorkerBuilder::new("cleanup-worker") + .layer(RetryLayer::new(RetryPolicy::retries(3))) .stream(CronStream::new(schedule).into_stream()) .data(db_connection) .build_fn(cleanup::execute_cleanup); diff --git a/crates/server/src/tasks/cleanup.rs b/crates/server/src/tasks/cleanup.rs index c0d3461..a02809c 100644 --- a/crates/server/src/tasks/cleanup.rs +++ b/crates/server/src/tasks/cleanup.rs @@ -1,6 +1,12 @@ use apalis::prelude::{Data, Job}; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, SecondsFormat, Utc}; +use lazy_static::lazy_static; use sqlx::PgPool; +use tokio::sync::Mutex; + +lazy_static! 
{ + static ref CLEANUP_TASK_LOCK: Mutex<()> = Mutex::new(()); +} #[derive(Default, Debug, Clone)] pub struct CleanupTask { @@ -19,14 +25,51 @@ impl Job for CleanupTask { #[derive(Debug, thiserror::Error)] pub enum Error { - // TODO - #[error("Cleanup task error")] - CleanupTaskError, + #[error("Database error: {0}")] + Database(#[from] sqlx::Error), + #[error("Cleanup task was already running. Skipped execution with timestamp {timestamp}")] + AlreadyRunning { timestamp: String }, } -pub async fn execute_cleanup(task: CleanupTask, db_connection: Data<PgPool>) -> Result<(), Error> { - let rfc3339_start_time = task.start_time.to_rfc3339(); - tracing::info!(start_time = rfc3339_start_time, "Executing cleanup task"); +impl CleanupTask { + async fn cleanup(&self, db_connection: Data<PgPool>) -> Result<u64, Error> { + let rfc3339_start_time = self.start_time.to_rfc3339_opts(SecondsFormat::Millis, true); + + let Ok(_lock) = CLEANUP_TASK_LOCK.try_lock() else { + return Err(Error::AlreadyRunning { + timestamp: rfc3339_start_time, + }); + }; + + tracing::info!(timestamp = rfc3339_start_time, "Executing cleanup task"); + + let result = sqlx::query!( + r#" + DELETE FROM share + WHERE expires_at <= $1 + "#, + self.start_time + ) + .execute(&*db_connection) + .await?; - Err(Error::CleanupTaskError) + Ok(result.rows_affected()) + } +} + +pub async fn execute_cleanup(task: CleanupTask, db_connection: Data<PgPool>) -> Result<(), Error> { + match task.cleanup(db_connection).await { + Ok(rows_affected) => { + tracing::info!(deleted_shares = rows_affected, "Cleanup task completed",); + Ok(()) + } + Err(e) => { + tracing::error!(reason = %e,"Error executing cleanup task"); + match e { + // Do not retry if the task is already running + Error::AlreadyRunning { .. } => Ok(()), + _ => Err(e), + } + } + } }