From 67e15f1ff69b8992a61ca3ad65b6a03a31bc83f9 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Wed, 21 Aug 2024 17:16:43 +0530 Subject: [PATCH] add create source api --- ...571e1f372f0f610ffd96e8bab32dd16a139dd.json | 23 ++++++ .../20240821104756_create_sources.sql | 6 ++ api/src/db/mod.rs | 1 + api/src/db/sources.rs | 55 +++++++++++++ api/src/routes/mod.rs | 1 + api/src/routes/sources.rs | 52 +++++++++++++ api/src/startup.rs | 4 +- api/tests/api/database.rs | 52 +++++++++++++ api/tests/api/health_check.rs | 2 +- api/tests/api/main.rs | 4 +- api/tests/api/sources.rs | 48 ++++++++++++ api/tests/api/tenants.rs | 14 ++-- api/tests/api/{helpers.rs => test_app.rs} | 78 ++++++------------- 13 files changed, 277 insertions(+), 63 deletions(-) create mode 100644 api/.sqlx/query-09b545b248a172a6aed73e72ae4571e1f372f0f610ffd96e8bab32dd16a139dd.json create mode 100644 api/migrations/20240821104756_create_sources.sql create mode 100644 api/src/db/sources.rs create mode 100644 api/src/routes/sources.rs create mode 100644 api/tests/api/database.rs create mode 100644 api/tests/api/sources.rs rename api/tests/api/{helpers.rs => test_app.rs} (57%) diff --git a/api/.sqlx/query-09b545b248a172a6aed73e72ae4571e1f372f0f610ffd96e8bab32dd16a139dd.json b/api/.sqlx/query-09b545b248a172a6aed73e72ae4571e1f372f0f610ffd96e8bab32dd16a139dd.json new file mode 100644 index 0000000..998104a --- /dev/null +++ b/api/.sqlx/query-09b545b248a172a6aed73e72ae4571e1f372f0f610ffd96e8bab32dd16a139dd.json @@ -0,0 +1,23 @@ +{ + "db_name": "PostgreSQL", + "query": "\n insert into sources (tenant_id, config)\n values ($1, $2)\n returning id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8", + "Jsonb" + ] + }, + "nullable": [ + false + ] + }, + "hash": "09b545b248a172a6aed73e72ae4571e1f372f0f610ffd96e8bab32dd16a139dd" +} diff --git a/api/migrations/20240821104756_create_sources.sql b/api/migrations/20240821104756_create_sources.sql new file mode 100644 index 0000000..230c144 --- /dev/null +++ b/api/migrations/20240821104756_create_sources.sql @@ -0,0 +1,6 @@ +create table + public.sources ( + id bigint generated always as identity primary key, + tenant_id bigint references public.tenants(id), + config jsonb not null + ); diff --git a/api/src/db/mod.rs b/api/src/db/mod.rs index 554234f..0033f48 100644 --- a/api/src/db/mod.rs +++ b/api/src/db/mod.rs @@ -1 +1,2 @@ +pub mod sources; pub mod tenants; diff --git a/api/src/db/sources.rs b/api/src/db/sources.rs new file mode 100644 index 0000000..9688374 --- /dev/null +++ b/api/src/db/sources.rs @@ -0,0 +1,55 @@ +use sqlx::PgPool; + +#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub enum SourceConfig { + Postgres { + /// Host on which Postgres is running + host: String, + + /// Port on which Postgres is running + port: u16, + + /// Postgres database name + name: String, + + /// Postgres database user name + username: String, + + //TODO: encrypt before storing in db + /// Postgres database user password + password: Option, + + /// Postgres slot name + slot_name: String, + + /// Postgres publication name + publication: String, + }, +} + +pub struct Source { + pub id: i64, + pub tenant_id: i64, + pub config: SourceConfig, +} + +pub async fn create_source( + pool: &PgPool, + tenant_id: i64, + config: &SourceConfig, +) -> Result { + let config = serde_json::to_value(config).expect("failed to serialize config"); + let record = sqlx::query!( + r#" + insert into sources (tenant_id, config) + values ($1, $2) + returning id + "#, + tenant_id, + config + ) + .fetch_one(pool) + .await?; + + Ok(record.id) +} diff --git a/api/src/routes/mod.rs b/api/src/routes/mod.rs index d1bd209..4d1e08e 100644 --- a/api/src/routes/mod.rs +++ b/api/src/routes/mod.rs @@ -1,2 +1,3 @@ pub mod health_check; +pub mod sources; pub mod tenants; diff --git a/api/src/routes/sources.rs b/api/src/routes/sources.rs new file mode 100644 index 0000000..acec241 --- /dev/null +++ b/api/src/routes/sources.rs @@ -0,0 +1,52 @@ +use actix_web::{ + http::StatusCode, + post, + web::{Data, Json}, + Responder, ResponseError, +}; +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; +use thiserror::Error; + +use crate::db::{self, sources::SourceConfig}; + +#[derive(Debug, Error)] +enum SourceError { + #[error("database error: {0}")] + DatabaseError(#[from] sqlx::Error), + // #[error("source with id {0} not found")] + // NotFound(i64), +} + +impl ResponseError for SourceError { + fn status_code(&self) -> StatusCode { + match self { + SourceError::DatabaseError(_) => StatusCode::INTERNAL_SERVER_ERROR, + // SourceError::NotFound(_) => StatusCode::NOT_FOUND, + } + } +} + +#[derive(Deserialize)] +struct PostSourceRequest { + pub tenant_id: i64, + pub config: SourceConfig, +} + +#[derive(Serialize)] +struct PostSourceResponse { + id: i64, +} + +#[post("/sources")] +pub async fn create_source( + pool: Data, + source: Json, +) -> Result { + let source = source.0; + let tenant_id = source.tenant_id; + let config = source.config; + let id = db::sources::create_source(&pool, tenant_id, &config).await?; + let response = PostSourceResponse { id }; + Ok(Json(response)) +} diff --git a/api/src/startup.rs b/api/src/startup.rs index 6e1f605..f9d37ab 100644 --- a/api/src/startup.rs +++ b/api/src/startup.rs @@ -8,6 +8,7 @@ use crate::{ configuration::{DatabaseSettings, Settings}, routes::{ health_check::health_check, + sources::create_source, tenants::{create_tenant, delete_tenant, read_tenant, update_tenant}, }, }; @@ -56,7 +57,8 @@ pub async fn run(listener: TcpListener, connection_pool: PgPool) -> Result PgPool { + // Create database + let mut connection = PgConnection::connect_with(&config.without_db()) + .await + .expect("Failed to connect to Postgres"); + connection + .execute(&*format!(r#"CREATE DATABASE "{}";"#, config.name)) + .await + .expect("Failed to create database."); + + // Migrate database + let connection_pool = PgPool::connect_with(config.with_db()) + .await + .expect("Failed to connect to Postgres."); + sqlx::migrate!("./migrations") + .run(&connection_pool) + .await + .expect("Failed to migrate the database"); + + connection_pool +} + +// This is not an actual test. It is only used to delete test databases. +// Enabling it might interfere with other running tests, so keep the +// #[ignore] attribute. But remember to temporarily comment it out before +// running the test when you do want to cleanup the database. +#[ignore] +#[tokio::test] +async fn delete_test_databases() { + delete_all_test_databases().await; +} + +async fn delete_all_test_databases() { + let config = get_configuration().expect("Failed to read configuration"); + let mut connection = PgConnection::connect_with(&config.database.without_db()) + .await + .expect("Failed to connect to Postgres"); + let databases = connection + .fetch_all(&*format!(r#"select datname from pg_database where datname not in ('postgres', 'template0', 'template1');"#)) + .await + .expect("Failed to get databases."); + for database in databases { + let database_name: String = database.get("datname"); + connection + .execute(&*format!(r#"drop database "{database_name}""#)) + .await + .expect("Failed to delete database"); + } +} diff --git a/api/tests/api/health_check.rs b/api/tests/api/health_check.rs index 7593fde..e4e2853 100644 --- a/api/tests/api/health_check.rs +++ b/api/tests/api/health_check.rs @@ -1,4 +1,4 @@ -use crate::helpers::spawn_app; +use crate::test_app::spawn_app; #[tokio::test] async fn health_check_works() { diff --git a/api/tests/api/main.rs b/api/tests/api/main.rs index 4579951..30e72ad 100644 --- a/api/tests/api/main.rs +++ b/api/tests/api/main.rs @@ -1,3 +1,5 @@ +mod database; mod health_check; -mod helpers; +mod sources; mod tenants; +mod test_app; diff --git a/api/tests/api/sources.rs b/api/tests/api/sources.rs new file mode 100644 index 0000000..f7e801b --- /dev/null +++ b/api/tests/api/sources.rs @@ -0,0 +1,48 @@ +use api::db::sources::SourceConfig; + +use crate::test_app::{ + spawn_app, CreateSourceRequest, CreateSourceResponse, CreateTenantRequest, CreateTenantResponse, +}; + +fn new_source_config() -> SourceConfig { + SourceConfig::Postgres { + host: "localhost".to_string(), + port: 5432, + name: "postgres".to_string(), + username: "postgres".to_string(), + password: Some("postgres".to_string()), + slot_name: "slot".to_string(), + publication: "publication".to_string(), + } +} + +#[tokio::test] +async fn tenant_can_be_created_with_supabase_project_ref() { + // Arrange + let app = spawn_app().await; + let tenant = CreateTenantRequest { + name: "NewTenant".to_string(), + supabase_project_ref: None, + }; + let response = app.create_tenant(&tenant).await; + let response: CreateTenantResponse = response + .json() + .await + .expect("failed to deserialize response"); + let tenant_id = response.id; + + // Act + let source = CreateSourceRequest { + tenant_id, + config: new_source_config(), + }; + let response = app.create_source(&source).await; + + // Assert + assert!(response.status().is_success()); + let response: CreateSourceResponse = response + .json() + .await + .expect("failed to deserialize response"); + assert_eq!(response.id, 1); +} diff --git a/api/tests/api/tenants.rs b/api/tests/api/tenants.rs index c2761f2..d18e1b3 100644 --- a/api/tests/api/tenants.rs +++ b/api/tests/api/tenants.rs @@ -1,8 +1,8 @@ use api::utils::generate_random_alpha_str; use reqwest::StatusCode; -use crate::helpers::{ - spawn_app, CreateTenantRequest, TenantIdResponse, TenantResponse, UpdateTenantRequest, +use crate::test_app::{ + spawn_app, CreateTenantRequest, CreateTenantResponse, TenantResponse, UpdateTenantRequest, }; #[tokio::test] @@ -20,7 +20,7 @@ async fn tenant_can_be_created_with_supabase_project_ref() { // Assert assert!(response.status().is_success()); - let response: TenantIdResponse = response + let response: CreateTenantResponse = response .json() .await .expect("failed to deserialize response"); @@ -53,7 +53,7 @@ async fn tenant_can_be_created_without_supabase_project_ref() { // Assert assert!(response.status().is_success()); - let response: TenantIdResponse = response + let response: CreateTenantResponse = response .json() .await .expect("failed to deserialize response"); @@ -81,7 +81,7 @@ async fn an_existing_tenant_can_be_read() { supabase_project_ref: None, }; let response = app.create_tenant(&tenant).await; - let response: TenantIdResponse = response + let response: CreateTenantResponse = response .json() .await .expect("failed to deserialize response"); @@ -121,7 +121,7 @@ async fn an_existing_tenant_can_be_updated() { supabase_project_ref: None, }; let response = app.create_tenant(&tenant).await; - let response: TenantIdResponse = response + let response: CreateTenantResponse = response .json() .await .expect("failed to deserialize response"); @@ -168,7 +168,7 @@ async fn an_existing_tenant_can_be_deleted() { supabase_project_ref: None, }; let response = app.create_tenant(&tenant).await; - let response: TenantIdResponse = response + let response: CreateTenantResponse = response .json() .await .expect("failed to deserialize response"); diff --git a/api/tests/api/helpers.rs b/api/tests/api/test_app.rs similarity index 57% rename from api/tests/api/helpers.rs rename to api/tests/api/test_app.rs index 1a17987..57066e7 100644 --- a/api/tests/api/helpers.rs +++ b/api/tests/api/test_app.rs @@ -1,13 +1,15 @@ use std::net::TcpListener; use api::{ - configuration::{get_configuration, DatabaseSettings}, + configuration::get_configuration, + db::sources::SourceConfig, startup::{get_connection_pool, run}, }; use serde::{Deserialize, Serialize}; -use sqlx::{Connection, Executor, PgConnection, PgPool, Row}; use uuid::Uuid; +use crate::database::configure_database; + pub struct TestApp { pub address: String, pub api_client: reqwest::Client, @@ -25,7 +27,7 @@ pub struct UpdateTenantRequest { } #[derive(Deserialize)] -pub struct TenantIdResponse { +pub struct CreateTenantResponse { pub id: i64, } @@ -37,6 +39,17 @@ pub struct TenantResponse { pub prefix: String, } +#[derive(Serialize)] +pub struct CreateSourceRequest { + pub tenant_id: i64, + pub config: SourceConfig, +} + +#[derive(Deserialize)] +pub struct CreateSourceResponse { + pub id: i64, +} + impl TestApp { pub async fn create_tenant(&self, tenant: &CreateTenantRequest) -> reqwest::Response { self.api_client @@ -75,6 +88,15 @@ impl TestApp { .await .expect("Failed to execute request.") } + + pub async fn create_source(&self, source: &CreateSourceRequest) -> reqwest::Response { + self.api_client + .post(&format!("{}/v1/sources", &self.address)) + .json(source) + .send() + .await + .expect("Failed to execute request.") + } } pub async fn spawn_app() -> TestApp { @@ -95,53 +117,3 @@ pub async fn spawn_app() -> TestApp { api_client, } } - -async fn configure_database(config: &DatabaseSettings) -> PgPool { - // Create database - let mut connection = PgConnection::connect_with(&config.without_db()) - .await - .expect("Failed to connect to Postgres"); - connection - .execute(&*format!(r#"CREATE DATABASE "{}";"#, config.name)) - .await - .expect("Failed to create database."); - - // Migrate database - let connection_pool = PgPool::connect_with(config.with_db()) - .await - .expect("Failed to connect to Postgres."); - sqlx::migrate!("./migrations") - .run(&connection_pool) - .await - .expect("Failed to migrate the database"); - - connection_pool -} - -// This is not an actual test. It is only used to delete test databases. -// Enabling it might interfere with other running tests, so keep the -// #[ignore] attribute. But remember to temporarily comment it out before -// running the test when you do want to cleanup the database. -#[ignore] -#[tokio::test] -async fn delete_test_databases() { - delete_all_test_databases().await; -} - -async fn delete_all_test_databases() { - let config = get_configuration().expect("Failed to read configuration"); - let mut connection = PgConnection::connect_with(&config.database.without_db()) - .await - .expect("Failed to connect to Postgres"); - let databases = connection - .fetch_all(&*format!(r#"select datname from pg_database where datname not in ('postgres', 'template0', 'template1');"#)) - .await - .expect("Failed to get databases."); - for database in databases { - let database_name: String = database.get("datname"); - connection - .execute(&*format!(r#"drop database "{database_name}""#)) - .await - .expect("Failed to delete database"); - } -}