Skip to content

Commit

Permalink
add publication to the api
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Aug 28, 2024
1 parent bdc4f7c commit e40fee4
Show file tree
Hide file tree
Showing 10 changed files with 627 additions and 3 deletions.
6 changes: 6 additions & 0 deletions api/migrations/20240828090309_create_publication.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
create table
public.publications (
id bigint generated always as identity primary key,
tenant_id bigint references public.tenants(id) not null,
config jsonb not null
);
1 change: 1 addition & 0 deletions api/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod pipelines;
pub mod publications;
pub mod sinks;
pub mod sources;
pub mod tenants;
126 changes: 126 additions & 0 deletions api/src/db/publications.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use sqlx::PgPool;

#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct PublicationConfig {
pub table_names: Vec<String>,
}

pub struct Publication {
pub id: i64,
pub tenant_id: i64,
pub config: serde_json::Value,
}

pub async fn create_publication(
pool: &PgPool,
tenant_id: i64,
config: &PublicationConfig,
) -> Result<i64, sqlx::Error> {
let config = serde_json::to_value(config).expect("failed to serialize config");
let record = sqlx::query!(
r#"
insert into publications (tenant_id, config)
values ($1, $2)
returning id
"#,
tenant_id,
config
)
.fetch_one(pool)
.await?;

Ok(record.id)
}

pub async fn read_publication(
pool: &PgPool,
tenant_id: i64,
publication_id: i64,
) -> Result<Option<Publication>, sqlx::Error> {
let record = sqlx::query!(
r#"
select id, tenant_id, config
from publications
where tenant_id = $1 and id = $2
"#,
tenant_id,
publication_id,
)
.fetch_optional(pool)
.await?;

Ok(record.map(|r| Publication {
id: r.id,
tenant_id: r.tenant_id,
config: r.config,
}))
}

pub async fn update_publication(
pool: &PgPool,
tenant_id: i64,
publication_id: i64,
config: &PublicationConfig,
) -> Result<Option<i64>, sqlx::Error> {
let config = serde_json::to_value(config).expect("failed to serialize config");
let record = sqlx::query!(
r#"
update publications
set config = $1
where tenant_id = $2 and id = $3
returning id
"#,
config,
tenant_id,
publication_id
)
.fetch_optional(pool)
.await?;

Ok(record.map(|r| r.id))
}

pub async fn delete_publication(
pool: &PgPool,
tenant_id: i64,
publication_id: i64,
) -> Result<Option<i64>, sqlx::Error> {
let record = sqlx::query!(
r#"
delete from publications
where tenant_id = $1 and id = $2
returning id
"#,
tenant_id,
publication_id
)
.fetch_optional(pool)
.await?;

Ok(record.map(|r| r.id))
}

pub async fn read_all_publications(
pool: &PgPool,
tenant_id: i64,
) -> Result<Vec<Publication>, sqlx::Error> {
let mut record = sqlx::query!(
r#"
select id, tenant_id, config
from publications
where tenant_id = $1
"#,
tenant_id,
)
.fetch_all(pool)
.await?;

Ok(record
.drain(..)
.map(|r| Publication {
id: r.id,
tenant_id: r.tenant_id,
config: r.config,
})
.collect())
}
1 change: 1 addition & 0 deletions api/src/routes/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod health_check;
pub mod pipelines;
pub mod publications;
pub mod sinks;
pub mod sources;
pub mod tenants;
2 changes: 1 addition & 1 deletion api/src/routes/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ enum PipelineError {
#[error("database error: {0}")]
DatabaseError(#[from] sqlx::Error),

#[error("sink with id {0} not found")]
#[error("pipeline with id {0} not found")]
NotFound(i64),

#[error("tenant id missing in request")]
Expand Down
162 changes: 162 additions & 0 deletions api/src/routes/publications.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
use actix_web::{
delete, get,
http::StatusCode,
post,
web::{Data, Json, Path},
HttpRequest, HttpResponse, Responder, ResponseError,
};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use thiserror::Error;

use crate::db::{self, publications::PublicationConfig};

#[derive(Debug, Error)]
enum PublicationError {
#[error("database error: {0}")]
DatabaseError(#[from] sqlx::Error),

#[error("publication with id {0} not found")]
NotFound(i64),

#[error("tenant id missing in request")]
TenantIdMissing,

#[error("tenant id ill formed in request")]
TenantIdIllFormed,

#[error("invalid sink config")]
InvalidConfig(#[from] serde_json::Error),
}

impl ResponseError for PublicationError {
fn status_code(&self) -> StatusCode {
match self {
PublicationError::DatabaseError(_) | PublicationError::InvalidConfig(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
PublicationError::NotFound(_) => StatusCode::NOT_FOUND,
PublicationError::TenantIdMissing | PublicationError::TenantIdIllFormed => {
StatusCode::BAD_REQUEST
}
}
}
}

#[derive(Deserialize)]
struct PostPublicationRequest {
pub config: PublicationConfig,
}

#[derive(Serialize)]
struct PostPublicationResponse {
id: i64,
}

#[derive(Serialize)]
struct GetPublicationResponse {
id: i64,
tenant_id: i64,
config: PublicationConfig,
}

// TODO: read tenant_id from a jwt
fn extract_tenant_id(req: &HttpRequest) -> Result<i64, PublicationError> {
let headers = req.headers();
let tenant_id = headers
.get("tenant_id")
.ok_or(PublicationError::TenantIdMissing)?;
let tenant_id = tenant_id
.to_str()
.map_err(|_| PublicationError::TenantIdIllFormed)?;
let tenant_id: i64 = tenant_id
.parse()
.map_err(|_| PublicationError::TenantIdIllFormed)?;
Ok(tenant_id)
}

#[post("/publications")]
pub async fn create_publication(
req: HttpRequest,
pool: Data<PgPool>,
publication: Json<PostPublicationRequest>,
) -> Result<impl Responder, PublicationError> {
let publication = publication.0;
let tenant_id = extract_tenant_id(&req)?;
let config = publication.config;
let id = db::publications::create_publication(&pool, tenant_id, &config).await?;
let response = PostPublicationResponse { id };
Ok(Json(response))
}

#[get("/publications/{publication_id}")]
pub async fn read_publication(
req: HttpRequest,
pool: Data<PgPool>,
publication_id: Path<i64>,
) -> Result<impl Responder, PublicationError> {
let tenant_id = extract_tenant_id(&req)?;
let publication_id = publication_id.into_inner();
let response = db::publications::read_publication(&pool, tenant_id, publication_id)
.await?
.map(|s| {
let config: PublicationConfig = serde_json::from_value(s.config)?;
Ok::<GetPublicationResponse, serde_json::Error>(GetPublicationResponse {
id: s.id,
tenant_id: s.tenant_id,
config,
})
})
.transpose()?
.ok_or(PublicationError::NotFound(publication_id))?;
Ok(Json(response))
}

#[post("/publications/{publication_id}")]
pub async fn update_publication(
req: HttpRequest,
pool: Data<PgPool>,
publication_id: Path<i64>,
publication: Json<PostPublicationRequest>,
) -> Result<impl Responder, PublicationError> {
let tenant_id = extract_tenant_id(&req)?;
let publication_id = publication_id.into_inner();
let config = &publication.config;
db::publications::update_publication(&pool, tenant_id, publication_id, config)
.await?
.ok_or(PublicationError::NotFound(publication_id))?;
Ok(HttpResponse::Ok().finish())
}

#[delete("/publications/{publication_id}")]
pub async fn delete_publication(
req: HttpRequest,
pool: Data<PgPool>,
publication_id: Path<i64>,
) -> Result<impl Responder, PublicationError> {
let tenant_id = extract_tenant_id(&req)?;
let publication_id = publication_id.into_inner();
db::publications::delete_publication(&pool, tenant_id, publication_id)
.await?
.ok_or(PublicationError::NotFound(tenant_id))?;
Ok(HttpResponse::Ok().finish())
}

#[get("/publications")]
pub async fn read_all_publications(
req: HttpRequest,
pool: Data<PgPool>,
) -> Result<impl Responder, PublicationError> {
let tenant_id = extract_tenant_id(&req)?;
let mut publications = vec![];
for publication in db::publications::read_all_publications(&pool, tenant_id).await? {
let config: PublicationConfig = serde_json::from_value(publication.config)?;
let sink = GetPublicationResponse {
id: publication.id,
tenant_id: publication.tenant_id,
config,
};
publications.push(sink);
}
Ok(Json(publications))
}
12 changes: 11 additions & 1 deletion api/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ use crate::{
pipelines::{
create_pipeline, delete_pipeline, read_all_pipelines, read_pipeline, update_pipeline,
},
publications::{
create_publication, delete_publication, read_all_publications, read_publication,
update_publication,
},
sinks::{create_sink, delete_sink, read_all_sinks, read_sink, update_sink},
sources::{create_source, delete_source, read_all_sources, read_source, update_source},
tenants::{create_tenant, delete_tenant, read_all_tenants, read_tenant, update_tenant},
Expand Down Expand Up @@ -81,7 +85,13 @@ pub async fn run(listener: TcpListener, connection_pool: PgPool) -> Result<Serve
.service(read_pipeline)
.service(update_pipeline)
.service(delete_pipeline)
.service(read_all_pipelines),
.service(read_all_pipelines)
//publications
.service(create_publication)
.service(read_publication)
.service(update_publication)
.service(delete_publication)
.service(read_all_publications),
)
.app_data(connection_pool.clone())
})
Expand Down
1 change: 1 addition & 0 deletions api/tests/api/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod database;
mod health_check;
mod pipelines;
mod publications;
mod sinks;
mod sources;
mod tenants;
Expand Down
Loading

0 comments on commit e40fee4

Please sign in to comment.