Skip to content

Commit

Permalink
add update pipeline api
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Aug 22, 2024
1 parent 7ebc3a2 commit 7e83512
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 4 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions api/src/db/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,31 @@ pub async fn read_pipeline(
config: r.config,
}))
}

pub async fn update_pipeline(
pool: &PgPool,
tenant_id: i64,
pipeline_id: i64,
source_id: i64,
sink_id: i64,
config: &PipelineConfig,
) -> Result<Option<i64>, sqlx::Error> {
let config = serde_json::to_value(config).expect("failed to serialize config");
let record = sqlx::query!(
r#"
update pipelines
set source_id = $1, sink_id = $2, config = $3
where tenant_id = $4 and id = $5
returning id
"#,
source_id,
sink_id,
config,
tenant_id,
pipeline_id
)
.fetch_optional(pool)
.await?;

Ok(record.map(|r| r.id))
}
25 changes: 24 additions & 1 deletion api/src/routes/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use actix_web::{
http::StatusCode,
post,
web::{Data, Json, Path},
HttpRequest, Responder, ResponseError,
HttpRequest, HttpResponse, Responder, ResponseError,
};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
Expand Down Expand Up @@ -124,3 +124,26 @@ pub async fn read_pipeline(
.ok_or(PipelineError::NotFound(pipeline_id))?;
Ok(Json(response))
}

#[post("/pipelines/{pipeline_id}")]
pub async fn update_pipeline(
req: HttpRequest,
pool: Data<PgPool>,
pipeline_id: Path<i64>,
pipeline: Json<PostPipelineRequest>,
) -> Result<impl Responder, PipelineError> {
let tenant_id = extract_tenant_id(&req)?;
let pipeline_id = pipeline_id.into_inner();
let config = &pipeline.config;
db::pipelines::update_pipeline(
&pool,
tenant_id,
pipeline_id,
pipeline.source_id,
pipeline.sink_id,
config,
)
.await?
.ok_or(PipelineError::NotFound(pipeline_id))?;
Ok(HttpResponse::Ok().finish())
}
5 changes: 3 additions & 2 deletions api/src/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
configuration::{DatabaseSettings, Settings},
routes::{
health_check::health_check,
pipelines::{create_pipeline, read_pipeline},
pipelines::{create_pipeline, read_pipeline, update_pipeline},
sinks::{create_sink, delete_sink, read_sink, update_sink},
sources::{create_source, delete_source, read_source, update_source},
tenants::{create_tenant, delete_tenant, read_tenant, update_tenant},
Expand Down Expand Up @@ -69,7 +69,8 @@ pub async fn run(listener: TcpListener, connection_pool: PgPool) -> Result<Serve
.service(update_sink)
.service(delete_sink)
.service(create_pipeline)
.service(read_pipeline),
.service(read_pipeline)
.service(update_pipeline),
)
.app_data(connection_pool.clone())
})
Expand Down
80 changes: 79 additions & 1 deletion api/tests/api/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use crate::{
sinks::create_sink,
sources::create_source,
tenants::create_tenant,
test_app::{spawn_app, CreatePipelineRequest, CreatePipelineResponse, PipelineResponse},
test_app::{
spawn_app, CreatePipelineRequest, CreatePipelineResponse, PipelineResponse,
UpdatePipelineRequest,
},
};

fn new_pipeline_config() -> PipelineConfig {
Expand All @@ -17,6 +20,15 @@ fn new_pipeline_config() -> PipelineConfig {
}
}

fn updated_pipeline_config() -> PipelineConfig {
PipelineConfig {
config: BatchConfig {
max_size: 2000,
max_fill_secs: 10,
},
}
}

#[tokio::test]
async fn pipeline_can_be_created() {
// Arrange
Expand Down Expand Up @@ -89,3 +101,69 @@ async fn an_non_existing_pipeline_cant_be_read() {
// Assert
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}

#[tokio::test]
async fn an_existing_pipeline_can_be_updated() {
// Arrange
let app = spawn_app().await;
let tenant_id = create_tenant(&app).await;
let source_id = create_source(&app, tenant_id).await;
let sink_id = create_sink(&app, tenant_id).await;

let pipeline = CreatePipelineRequest {
source_id,
sink_id,
config: new_pipeline_config(),
};
let response = app.create_pipeline(tenant_id, &pipeline).await;
let response: CreatePipelineResponse = response
.json()
.await
.expect("failed to deserialize response");
let pipeline_id = response.id;

// Act
let source_id = create_source(&app, tenant_id).await;
let sink_id = create_sink(&app, tenant_id).await;
let updated_config = UpdatePipelineRequest {
source_id,
sink_id,
config: updated_pipeline_config(),
};
let response = app
.update_pipeline(tenant_id, pipeline_id, &updated_config)
.await;

// Assert
assert!(response.status().is_success());
let response = app.read_pipeline(tenant_id, pipeline_id).await;
let response: PipelineResponse = response
.json()
.await
.expect("failed to deserialize response");
assert_eq!(response.id, pipeline_id);
assert_eq!(response.tenant_id, tenant_id);
assert_eq!(response.source_id, source_id);
assert_eq!(response.sink_id, sink_id);
assert_eq!(response.config, updated_config.config);
}

#[tokio::test]
async fn an_non_existing_pipeline_cant_be_updated() {
// Arrange
let app = spawn_app().await;
let tenant_id = create_tenant(&app).await;
let source_id = create_source(&app, tenant_id).await;
let sink_id = create_sink(&app, tenant_id).await;

// Act
let updated_config = UpdatePipelineRequest {
source_id,
sink_id,
config: updated_pipeline_config(),
};
let response = app.update_pipeline(tenant_id, 42, &updated_config).await;

// Assert
assert_eq!(response.status(), StatusCode::NOT_FOUND);
}
22 changes: 22 additions & 0 deletions api/tests/api/test_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ pub struct PipelineResponse {
pub config: PipelineConfig,
}

#[derive(Serialize)]
pub struct UpdatePipelineRequest {
pub source_id: i64,
pub sink_id: i64,
pub config: PipelineConfig,
}

impl TestApp {
pub async fn create_tenant(&self, tenant: &CreateTenantRequest) -> reqwest::Response {
self.api_client
Expand Down Expand Up @@ -255,6 +262,21 @@ impl TestApp {
.await
.expect("failed to execute request")
}

pub async fn update_pipeline(
&self,
tenant_id: i64,
pipeline_id: i64,
pipeline: &UpdatePipelineRequest,
) -> reqwest::Response {
self.api_client
.post(&format!("{}/v1/pipelines/{pipeline_id}", &self.address))
.header("tenant_id", tenant_id)
.json(pipeline)
.send()
.await
.expect("failed to execute request")
}
}

pub async fn spawn_app() -> TestApp {
Expand Down

0 comments on commit 7e83512

Please sign in to comment.