From 7e83512c293c784416f6696c2694a8bf4d9dbbb7 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Thu, 22 Aug 2024 16:12:45 +0530 Subject: [PATCH] add update pipeline api --- ...b12e115e805db2cd2cf934e192824f52177a3.json | 26 ++++++ api/src/db/pipelines.rs | 28 +++++++ api/src/routes/pipelines.rs | 25 +++++- api/src/startup.rs | 5 +- api/tests/api/pipelines.rs | 80 ++++++++++++++++++- api/tests/api/test_app.rs | 22 +++++ 6 files changed, 182 insertions(+), 4 deletions(-) create mode 100644 api/.sqlx/query-74c7a7fed4273ae6c263cc56f79b12e115e805db2cd2cf934e192824f52177a3.json diff --git a/api/.sqlx/query-74c7a7fed4273ae6c263cc56f79b12e115e805db2cd2cf934e192824f52177a3.json b/api/.sqlx/query-74c7a7fed4273ae6c263cc56f79b12e115e805db2cd2cf934e192824f52177a3.json new file mode 100644 index 0000000..ff7cc6a --- /dev/null +++ b/api/.sqlx/query-74c7a7fed4273ae6c263cc56f79b12e115e805db2cd2cf934e192824f52177a3.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n update pipelines\n set source_id = $1, sink_id = $2, config = $3\n where tenant_id = $4 and id = $5\n returning id\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8", + "Int8", + "Jsonb", + "Int8", + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "74c7a7fed4273ae6c263cc56f79b12e115e805db2cd2cf934e192824f52177a3" +} diff --git a/api/src/db/pipelines.rs b/api/src/db/pipelines.rs index 57bde06..a2fb4ea 100644 --- a/api/src/db/pipelines.rs +++ b/api/src/db/pipelines.rs @@ -72,3 +72,31 @@ pub async fn read_pipeline( config: r.config, })) } + +pub async fn update_pipeline( + pool: &PgPool, + tenant_id: i64, + pipeline_id: i64, + source_id: i64, + sink_id: i64, + config: &PipelineConfig, +) -> Result, sqlx::Error> { + let config = serde_json::to_value(config).expect("failed to serialize config"); + let record = sqlx::query!( + r#" + update pipelines + set source_id = $1, sink_id = $2, config = $3 + where tenant_id = $4 and id = $5 + returning id + "#, + source_id, + sink_id, + config, + tenant_id, + pipeline_id + ) + .fetch_optional(pool) + .await?; + + Ok(record.map(|r| r.id)) +} diff --git a/api/src/routes/pipelines.rs b/api/src/routes/pipelines.rs index 456b260..507cc50 100644 --- a/api/src/routes/pipelines.rs +++ b/api/src/routes/pipelines.rs @@ -3,7 +3,7 @@ use actix_web::{ http::StatusCode, post, web::{Data, Json, Path}, - HttpRequest, Responder, ResponseError, + HttpRequest, HttpResponse, Responder, ResponseError, }; use serde::{Deserialize, Serialize}; use sqlx::PgPool; @@ -124,3 +124,26 @@ pub async fn read_pipeline( .ok_or(PipelineError::NotFound(pipeline_id))?; Ok(Json(response)) } + +#[post("/pipelines/{pipeline_id}")] +pub async fn update_pipeline( + req: HttpRequest, + pool: Data, + pipeline_id: Path, + pipeline: Json, +) -> Result { + let tenant_id = extract_tenant_id(&req)?; + let pipeline_id = pipeline_id.into_inner(); + let config = &pipeline.config; + db::pipelines::update_pipeline( + &pool, + tenant_id, + pipeline_id, + pipeline.source_id, + pipeline.sink_id, + config, + ) + .await? + .ok_or(PipelineError::NotFound(pipeline_id))?; + Ok(HttpResponse::Ok().finish()) +} diff --git a/api/src/startup.rs b/api/src/startup.rs index 88446e8..8bf72d5 100644 --- a/api/src/startup.rs +++ b/api/src/startup.rs @@ -8,7 +8,7 @@ use crate::{ configuration::{DatabaseSettings, Settings}, routes::{ health_check::health_check, - pipelines::{create_pipeline, read_pipeline}, + pipelines::{create_pipeline, read_pipeline, update_pipeline}, sinks::{create_sink, delete_sink, read_sink, update_sink}, sources::{create_source, delete_source, read_source, update_source}, tenants::{create_tenant, delete_tenant, read_tenant, update_tenant}, @@ -69,7 +69,8 @@ pub async fn run(listener: TcpListener, connection_pool: PgPool) -> Result PipelineConfig { @@ -17,6 +20,15 @@ fn new_pipeline_config() -> PipelineConfig { } } +fn updated_pipeline_config() -> PipelineConfig { + PipelineConfig { + config: BatchConfig { + max_size: 2000, + max_fill_secs: 10, + }, + } +} + #[tokio::test] async fn pipeline_can_be_created() { // Arrange @@ -89,3 +101,69 @@ async fn an_non_existing_pipeline_cant_be_read() { // Assert assert_eq!(response.status(), StatusCode::NOT_FOUND); } + +#[tokio::test] +async fn an_existing_pipeline_can_be_updated() { + // Arrange + let app = spawn_app().await; + let tenant_id = create_tenant(&app).await; + let source_id = create_source(&app, tenant_id).await; + let sink_id = create_sink(&app, tenant_id).await; + + let pipeline = CreatePipelineRequest { + source_id, + sink_id, + config: new_pipeline_config(), + }; + let response = app.create_pipeline(tenant_id, &pipeline).await; + let response: CreatePipelineResponse = response + .json() + .await + .expect("failed to deserialize response"); + let pipeline_id = response.id; + + // Act + let source_id = create_source(&app, tenant_id).await; + let sink_id = create_sink(&app, tenant_id).await; + let updated_config = UpdatePipelineRequest { + source_id, + sink_id, + config: updated_pipeline_config(), + }; + let response = app + .update_pipeline(tenant_id, pipeline_id, &updated_config) + .await; + + // Assert + assert!(response.status().is_success()); + let response = app.read_pipeline(tenant_id, pipeline_id).await; + let response: PipelineResponse = response + .json() + .await + .expect("failed to deserialize response"); + assert_eq!(response.id, pipeline_id); + assert_eq!(response.tenant_id, tenant_id); + assert_eq!(response.source_id, source_id); + assert_eq!(response.sink_id, sink_id); + assert_eq!(response.config, updated_config.config); +} + +#[tokio::test] +async fn an_non_existing_pipeline_cant_be_updated() { + // Arrange + let app = spawn_app().await; + let tenant_id = create_tenant(&app).await; + let source_id = create_source(&app, tenant_id).await; + let sink_id = create_sink(&app, tenant_id).await; + + // Act + let updated_config = UpdatePipelineRequest { + source_id, + sink_id, + config: updated_pipeline_config(), + }; + let response = app.update_pipeline(tenant_id, 42, &updated_config).await; + + // Assert + assert_eq!(response.status(), StatusCode::NOT_FOUND); +} diff --git a/api/tests/api/test_app.rs b/api/tests/api/test_app.rs index ec4f4d5..3cf61d1 100644 --- a/api/tests/api/test_app.rs +++ b/api/tests/api/test_app.rs @@ -104,6 +104,13 @@ pub struct PipelineResponse { pub config: PipelineConfig, } +#[derive(Serialize)] +pub struct UpdatePipelineRequest { + pub source_id: i64, + pub sink_id: i64, + pub config: PipelineConfig, +} + impl TestApp { pub async fn create_tenant(&self, tenant: &CreateTenantRequest) -> reqwest::Response { self.api_client @@ -255,6 +262,21 @@ impl TestApp { .await .expect("failed to execute request") } + + pub async fn update_pipeline( + &self, + tenant_id: i64, + pipeline_id: i64, + pipeline: &UpdatePipelineRequest, + ) -> reqwest::Response { + self.api_client + .post(&format!("{}/v1/pipelines/{pipeline_id}", &self.address)) + .header("tenant_id", tenant_id) + .json(pipeline) + .send() + .await + .expect("failed to execute request") + } } pub async fn spawn_app() -> TestApp {