Skip to content

Commit

Permalink
add create source api
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Aug 21, 2024
1 parent 7ab58a7 commit 67e15f1
Show file tree
Hide file tree
Showing 13 changed files with 277 additions and 63 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions api/migrations/20240821104756_create_sources.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
create table
public.sources (
id bigint generated always as identity primary key,
tenant_id bigint references public.tenants(id),
config jsonb not null
);
1 change: 1 addition & 0 deletions api/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod sources;
pub mod tenants;
55 changes: 55 additions & 0 deletions api/src/db/sources.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use sqlx::PgPool;

#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum SourceConfig {
Postgres {
/// Host on which Postgres is running
host: String,

/// Port on which Postgres is running
port: u16,

/// Postgres database name
name: String,

/// Postgres database user name
username: String,

//TODO: encrypt before storing in db
/// Postgres database user password
password: Option<String>,

/// Postgres slot name
slot_name: String,

/// Postgres publication name
publication: String,
},
}

pub struct Source {
pub id: i64,
pub tenant_id: i64,
pub config: SourceConfig,
}

pub async fn create_source(
pool: &PgPool,
tenant_id: i64,
config: &SourceConfig,
) -> Result<i64, sqlx::Error> {
let config = serde_json::to_value(config).expect("failed to serialize config");
let record = sqlx::query!(
r#"
insert into sources (tenant_id, config)
values ($1, $2)
returning id
"#,
tenant_id,
config
)
.fetch_one(pool)
.await?;

Ok(record.id)
}
1 change: 1 addition & 0 deletions api/src/routes/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod health_check;
pub mod sources;
pub mod tenants;
52 changes: 52 additions & 0 deletions api/src/routes/sources.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use actix_web::{
http::StatusCode,
post,
web::{Data, Json},
Responder, ResponseError,
};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use thiserror::Error;

use crate::db::{self, sources::SourceConfig};

#[derive(Debug, Error)]
enum SourceError {
#[error("database error: {0}")]
DatabaseError(#[from] sqlx::Error),
// #[error("source with id {0} not found")]
// NotFound(i64),
}

impl ResponseError for SourceError {
fn status_code(&self) -> StatusCode {
match self {
SourceError::DatabaseError(_) => StatusCode::INTERNAL_SERVER_ERROR,
// SourceError::NotFound(_) => StatusCode::NOT_FOUND,
}
}
}

#[derive(Deserialize)]
struct PostSourceRequest {
pub tenant_id: i64,
pub config: SourceConfig,
}

#[derive(Serialize)]
struct PostSourceResponse {
id: i64,
}

#[post("/sources")]
pub async fn create_source(
pool: Data<PgPool>,
source: Json<PostSourceRequest>,
) -> Result<impl Responder, SourceError> {
let source = source.0;
let tenant_id = source.tenant_id;
let config = source.config;
let id = db::sources::create_source(&pool, tenant_id, &config).await?;
let response = PostSourceResponse { id };
Ok(Json(response))
}
4 changes: 3 additions & 1 deletion api/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
configuration::{DatabaseSettings, Settings},
routes::{
health_check::health_check,
sources::create_source,
tenants::{create_tenant, delete_tenant, read_tenant, update_tenant},
},
};
Expand Down Expand Up @@ -56,7 +57,8 @@ pub async fn run(listener: TcpListener, connection_pool: PgPool) -> Result<Serve
.service(create_tenant)
.service(read_tenant)
.service(update_tenant)
.service(delete_tenant),
.service(delete_tenant)
.service(create_source),
)
.app_data(connection_pool.clone())
})
Expand Down
52 changes: 52 additions & 0 deletions api/tests/api/database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use api::configuration::{get_configuration, DatabaseSettings};
use sqlx::{Connection, Executor, PgConnection, PgPool, Row};

pub async fn configure_database(config: &DatabaseSettings) -> PgPool {
// Create database
let mut connection = PgConnection::connect_with(&config.without_db())
.await
.expect("Failed to connect to Postgres");
connection
.execute(&*format!(r#"CREATE DATABASE "{}";"#, config.name))
.await
.expect("Failed to create database.");

// Migrate database
let connection_pool = PgPool::connect_with(config.with_db())
.await
.expect("Failed to connect to Postgres.");
sqlx::migrate!("./migrations")
.run(&connection_pool)
.await
.expect("Failed to migrate the database");

connection_pool
}

// This is not an actual test. It is only used to delete test databases.
// Enabling it might interfere with other running tests, so keep the
// #[ignore] attribute. But remember to temporarily comment it out before
// running the test when you do want to cleanup the database.
#[ignore]
#[tokio::test]
async fn delete_test_databases() {
delete_all_test_databases().await;
}

async fn delete_all_test_databases() {
let config = get_configuration().expect("Failed to read configuration");
let mut connection = PgConnection::connect_with(&config.database.without_db())
.await
.expect("Failed to connect to Postgres");
let databases = connection
.fetch_all(&*format!(r#"select datname from pg_database where datname not in ('postgres', 'template0', 'template1');"#))
.await
.expect("Failed to get databases.");
for database in databases {
let database_name: String = database.get("datname");
connection
.execute(&*format!(r#"drop database "{database_name}""#))
.await
.expect("Failed to delete database");
}
}
2 changes: 1 addition & 1 deletion api/tests/api/health_check.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::helpers::spawn_app;
use crate::test_app::spawn_app;

#[tokio::test]
async fn health_check_works() {
Expand Down
4 changes: 3 additions & 1 deletion api/tests/api/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod database;
mod health_check;
mod helpers;
mod sources;
mod tenants;
mod test_app;
48 changes: 48 additions & 0 deletions api/tests/api/sources.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use api::db::sources::SourceConfig;

use crate::test_app::{
spawn_app, CreateSourceRequest, CreateSourceResponse, CreateTenantRequest, CreateTenantResponse,
};

fn new_source_config() -> SourceConfig {
SourceConfig::Postgres {
host: "localhost".to_string(),
port: 5432,
name: "postgres".to_string(),
username: "postgres".to_string(),
password: Some("postgres".to_string()),
slot_name: "slot".to_string(),
publication: "publication".to_string(),
}
}

#[tokio::test]
async fn tenant_can_be_created_with_supabase_project_ref() {
// Arrange
let app = spawn_app().await;
let tenant = CreateTenantRequest {
name: "NewTenant".to_string(),
supabase_project_ref: None,
};
let response = app.create_tenant(&tenant).await;
let response: CreateTenantResponse = response
.json()
.await
.expect("failed to deserialize response");
let tenant_id = response.id;

// Act
let source = CreateSourceRequest {
tenant_id,
config: new_source_config(),
};
let response = app.create_source(&source).await;

// Assert
assert!(response.status().is_success());
let response: CreateSourceResponse = response
.json()
.await
.expect("failed to deserialize response");
assert_eq!(response.id, 1);
}
14 changes: 7 additions & 7 deletions api/tests/api/tenants.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use api::utils::generate_random_alpha_str;
use reqwest::StatusCode;

use crate::helpers::{
spawn_app, CreateTenantRequest, TenantIdResponse, TenantResponse, UpdateTenantRequest,
use crate::test_app::{
spawn_app, CreateTenantRequest, CreateTenantResponse, TenantResponse, UpdateTenantRequest,
};

#[tokio::test]
Expand All @@ -20,7 +20,7 @@ async fn tenant_can_be_created_with_supabase_project_ref() {

// Assert
assert!(response.status().is_success());
let response: TenantIdResponse = response
let response: CreateTenantResponse = response
.json()
.await
.expect("failed to deserialize response");
Expand Down Expand Up @@ -53,7 +53,7 @@ async fn tenant_can_be_created_without_supabase_project_ref() {

// Assert
assert!(response.status().is_success());
let response: TenantIdResponse = response
let response: CreateTenantResponse = response
.json()
.await
.expect("failed to deserialize response");
Expand Down Expand Up @@ -81,7 +81,7 @@ async fn an_existing_tenant_can_be_read() {
supabase_project_ref: None,
};
let response = app.create_tenant(&tenant).await;
let response: TenantIdResponse = response
let response: CreateTenantResponse = response
.json()
.await
.expect("failed to deserialize response");
Expand Down Expand Up @@ -121,7 +121,7 @@ async fn an_existing_tenant_can_be_updated() {
supabase_project_ref: None,
};
let response = app.create_tenant(&tenant).await;
let response: TenantIdResponse = response
let response: CreateTenantResponse = response
.json()
.await
.expect("failed to deserialize response");
Expand Down Expand Up @@ -168,7 +168,7 @@ async fn an_existing_tenant_can_be_deleted() {
supabase_project_ref: None,
};
let response = app.create_tenant(&tenant).await;
let response: TenantIdResponse = response
let response: CreateTenantResponse = response
.json()
.await
.expect("failed to deserialize response");
Expand Down
Loading

0 comments on commit 67e15f1

Please sign in to comment.