Skip to content

Commit

Permalink
add read source api
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Aug 21, 2024
1 parent dae863b commit c4afe71
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 13 deletions.
2 changes: 1 addition & 1 deletion api/migrations/20240821104756_create_sources.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
create table
public.sources (
id bigint generated always as identity primary key,
tenant_id bigint references public.tenants(id),
tenant_id bigint references public.tenants(id) not null,
config jsonb not null
);
52 changes: 51 additions & 1 deletion api/src/db/sources.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use sqlx::PgPool;
use std::fmt::{Debug, Formatter};

#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum SourceConfig {
Expand Down Expand Up @@ -27,10 +28,35 @@ pub enum SourceConfig {
},
}

impl Debug for SourceConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
SourceConfig::Postgres {
host,
port,
name,
username,
password: _,
slot_name,
publication,
} => f
.debug_struct("Postgres")
.field("host", host)
.field("port", port)
.field("name", name)
.field("username", username)
.field("password", &"REDACTED")
.field("slot_name", slot_name)
.field("publication", publication)
.finish(),
}
}
}

pub struct Source {
pub id: i64,
pub tenant_id: i64,
pub config: SourceConfig,
pub config: serde_json::Value,
}

pub async fn create_source(
Expand All @@ -53,3 +79,27 @@ pub async fn create_source(

Ok(record.id)
}

pub async fn read_source(
pool: &PgPool,
tenant_id: i64,
source_id: i64,
) -> Result<Option<Source>, sqlx::Error> {
let record = sqlx::query!(
r#"
select id, tenant_id, config
from sources
where tenant_id = $1 and id = $2
"#,
tenant_id,
source_id,
)
.fetch_optional(pool)
.await?;

Ok(record.map(|r| Source {
id: r.id,
tenant_id: r.tenant_id,
config: r.config,
}))
}
2 changes: 0 additions & 2 deletions api/src/replicator_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ impl Debug for SourceConfig {
.field("port", port)
.field("name", name)
.field("username", username)
.field("password", &"REDACTED")
.field("slot_name", slot_name)
.field("publication", publication)
.finish(),
Expand Down Expand Up @@ -68,7 +67,6 @@ impl Debug for SinkConfig {
.debug_struct("BigQuery")
.field("project_id", project_id)
.field("dataset_id", dataset_id)
.field("service_account_key", &"REDACTED")
.finish(),
}
}
Expand Down
73 changes: 67 additions & 6 deletions api/src/routes/sources.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use actix_web::{
get,
http::StatusCode,
post,
web::{Data, Json},
Responder, ResponseError,
web::{Data, Json, Path},
HttpRequest, Responder, ResponseError,
};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
Expand All @@ -14,15 +15,30 @@ use crate::db::{self, sources::SourceConfig};
enum SourceError {
#[error("database error: {0}")]
DatabaseError(#[from] sqlx::Error),
// #[error("source with id {0} not found")]
// NotFound(i64),

#[error("source with id {0} not found")]
NotFound(i64),

#[error("tenant id missing in request")]
TenantIdMissing,

#[error("tenant id ill formed in request")]
TenantIdIllFormed,

#[error("invalid source config")]
InvalidConfig(#[from] serde_json::Error),
}

impl ResponseError for SourceError {
fn status_code(&self) -> StatusCode {
match self {
SourceError::DatabaseError(_) => StatusCode::INTERNAL_SERVER_ERROR,
// SourceError::NotFound(_) => StatusCode::NOT_FOUND,
SourceError::DatabaseError(_) | SourceError::InvalidConfig(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
SourceError::NotFound(_) => StatusCode::NOT_FOUND,
SourceError::TenantIdMissing | SourceError::TenantIdIllFormed => {
StatusCode::BAD_REQUEST
}
}
}
}
Expand All @@ -38,6 +54,13 @@ struct PostSourceResponse {
id: i64,
}

#[derive(Serialize)]
struct GetSourceResponse {
id: i64,
tenant_id: i64,
config: SourceConfig,
}

#[post("/sources")]
pub async fn create_source(
pool: Data<PgPool>,
Expand All @@ -50,3 +73,41 @@ pub async fn create_source(
let response = PostSourceResponse { id };
Ok(Json(response))
}

// TODO: read tenant_id from a jwt
fn extract_tenant_id(req: &HttpRequest) -> Result<i64, SourceError> {
let headers = req.headers();
let tenant_id = headers
.get("tenant_id")
.ok_or(SourceError::TenantIdMissing)?;
let tenant_id = tenant_id
.to_str()
.map_err(|_| SourceError::TenantIdIllFormed)?;
let tenant_id: i64 = tenant_id
.parse()
.map_err(|_| SourceError::TenantIdIllFormed)?;
Ok(tenant_id)
}

#[get("/sources/{source_id}")]
pub async fn read_source(
req: HttpRequest,
pool: Data<PgPool>,
source_id: Path<i64>,
) -> Result<impl Responder, SourceError> {
let tenant_id = extract_tenant_id(&req)?;
let source_id = source_id.into_inner();
let response = db::sources::read_source(&pool, tenant_id, source_id)
.await?
.map(|s| {
let config: SourceConfig = serde_json::from_value(s.config)?;
Ok::<GetSourceResponse, serde_json::Error>(GetSourceResponse {
id: s.id,
tenant_id: s.tenant_id,
config,
})
})
.transpose()?
.ok_or(SourceError::NotFound(source_id))?;
Ok(Json(response))
}
5 changes: 3 additions & 2 deletions api/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
configuration::{DatabaseSettings, Settings},
routes::{
health_check::health_check,
sources::create_source,
sources::{create_source, read_source},
tenants::{create_tenant, delete_tenant, read_tenant, update_tenant},
},
};
Expand Down Expand Up @@ -58,7 +58,8 @@ pub async fn run(listener: TcpListener, connection_pool: PgPool) -> Result<Serve
.service(read_tenant)
.service(update_tenant)
.service(delete_tenant)
.service(create_source),
.service(create_source)
.service(read_source),
)
.app_data(connection_pool.clone())
})
Expand Down
34 changes: 33 additions & 1 deletion api/tests/api/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use api::db::sources::SourceConfig;

use crate::test_app::{
spawn_app, CreateSourceRequest, CreateSourceResponse, CreateTenantRequest,
CreateTenantResponse, TestApp,
CreateTenantResponse, SourceResponse, TestApp,
};

fn new_source_config() -> SourceConfig {
Expand Down Expand Up @@ -51,3 +51,35 @@ async fn source_can_be_created() {
.expect("failed to deserialize response");
assert_eq!(response.id, 1);
}

#[tokio::test]
async fn an_existing_source_can_be_read() {
// Arrange
let app = spawn_app().await;
let tenant_id = create_tenant(&app).await;

let source = CreateSourceRequest {
tenant_id,
config: new_source_config(),
};
let response = app.create_source(&source).await;
let response: CreateSourceResponse = response
.json()
.await
.expect("failed to deserialize response");
let source_id = response.id;

// Act
let response = app.read_source(source_id, tenant_id).await;

// Assert
println!("RS: {}", response.status());
assert!(response.status().is_success());
let response: SourceResponse = response
.json()
.await
.expect("failed to deserialize response");
assert_eq!(response.id, source_id);
assert_eq!(response.tenant_id, tenant_id);
assert_eq!(response.config, source.config);
}
16 changes: 16 additions & 0 deletions api/tests/api/test_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ pub struct CreateSourceResponse {
pub id: i64,
}

#[derive(Deserialize)]
pub struct SourceResponse {
pub id: i64,
pub tenant_id: i64,
pub config: SourceConfig,
}

impl TestApp {
pub async fn create_tenant(&self, tenant: &CreateTenantRequest) -> reqwest::Response {
self.api_client
Expand Down Expand Up @@ -97,6 +104,15 @@ impl TestApp {
.await
.expect("Failed to execute request.")
}

pub async fn read_source(&self, source_id: i64, tenant_id: i64) -> reqwest::Response {
self.api_client
.get(&format!("{}/v1/sources/{source_id}", &self.address))
.header("tenant_id", tenant_id)
.send()
.await
.expect("failed to execute request")
}
}

pub async fn spawn_app() -> TestApp {
Expand Down

0 comments on commit c4afe71

Please sign in to comment.