Skip to content

Commit

Permalink
feat: add delete endpoint (#61)
Browse files Browse the repository at this point in the history
* feat: add delete handler

* fix: clippy

* fix: prettier

* ci: ubuntu

* ci: add cache
  • Loading branch information
EvolveArt authored Aug 8, 2024
1 parent 1425fcc commit 29a8125
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 8 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:
jobs:
coverage:
# sadly, for now we have to "rebuild" for the coverage
runs-on: self-hosted
runs-on: ubuntu-latest

env:
HOST: 0.0.0.0
Expand Down Expand Up @@ -95,6 +95,8 @@ jobs:
awslocal sqs create-queue --queue-name indexer-service-stop-indexer
awslocal sqs list-queues
- uses: Swatinem/rust-cache@v2

- name: Clean workspace
run: |
cargo llvm-cov clean --workspace
Expand Down
9 changes: 7 additions & 2 deletions examples/pragma/mainnet/mainnet-script-spot-checkpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ function decodeTransfersInBlock({ header, events }) {

const dataId = `${transactionHash}_${event.index ?? 0}`;

const [pairId, checkpointTimestamp, price, aggregationMode, nbSourcesAggregated] =
event.data;
const [
pairId,
checkpointTimestamp,
price,
aggregationMode,
nbSourcesAggregated,
] = event.data;

// Convert felts to string
const pairIdName = escapeInvalidCharacters(
Expand Down
9 changes: 7 additions & 2 deletions examples/pragma/testnet/sepolia-script-spot-checkpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ function decodeTransfersInBlock({ header, events }) {

const dataId = `${transactionHash}_${event.index ?? 0}`;

const [pairId, checkpointTimestamp, price, aggregationMode, nbSourcesAggregated] =
event.data;
const [
pairId,
checkpointTimestamp,
price,
aggregationMode,
nbSourcesAggregated,
] = event.data;

// Convert felts to string
const pairIdName = escapeInvalidCharacters(
Expand Down
23 changes: 23 additions & 0 deletions src/handlers/indexers/delete_indexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use axum::extract::State;
use uuid::Uuid;

use crate::domain::models::indexer::{IndexerError, IndexerStatus};
use crate::infra::repositories::indexer_repository::{IndexerRepository, Repository};
use crate::utils::PathExtractor;
use crate::AppState;

pub async fn delete_indexer(
State(state): State<AppState>,
PathExtractor(id): PathExtractor<Uuid>,
) -> Result<(), IndexerError> {
let mut repository = IndexerRepository::new(&state.pool);
let indexer_model = repository.get(id).await.map_err(IndexerError::InfraError)?;
match indexer_model.status {
IndexerStatus::Stopped => (),
_ => return Err(IndexerError::InvalidIndexerStatus(indexer_model.status)),
}

repository.delete(id).await.map_err(IndexerError::InfraError)?;

Ok(())
}
1 change: 1 addition & 0 deletions src/handlers/indexers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod create_indexer;
pub mod delete_indexer;
pub mod fail_indexer;
pub mod get_indexer;
mod indexer_types;
Expand Down
12 changes: 12 additions & 0 deletions src/infra/repositories/indexer_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub struct UpdateIndexerStatusAndProcessIdDb {

#[async_trait]
pub trait Repository {
async fn delete(&mut self, id: Uuid) -> Result<(), InfraError>;
async fn insert(&mut self, new_indexer: NewIndexerDb) -> Result<IndexerModel, InfraError>;
async fn get(&self, id: Uuid) -> Result<IndexerModel, InfraError>;
async fn get_by_table_name(&self, table_name: String) -> Result<IndexerModel, InfraError>;
Expand Down Expand Up @@ -101,6 +102,10 @@ impl Repository for IndexerRepository<'_> {
update_status(self.pool, indexer).await
}

async fn delete(&mut self, id: Uuid) -> Result<(), InfraError> {
delete(self.pool, id).await
}

async fn update_status_and_process_id(
&mut self,
indexer: UpdateIndexerStatusAndProcessIdDb,
Expand Down Expand Up @@ -135,6 +140,13 @@ async fn get(pool: &Pool<AsyncPgConnection>, id: Uuid) -> Result<IndexerModel, I
Ok(res)
}

async fn delete(pool: &Pool<AsyncPgConnection>, id: Uuid) -> Result<(), InfraError> {
let mut conn = pool.get().await?;
diesel::delete(indexers::table.filter(indexers::id.eq(id))).execute(&mut conn).await?;

Ok(())
}

async fn get_by_table_name(pool: &Pool<AsyncPgConnection>, table_name: String) -> Result<IndexerModel, InfraError> {
let mut conn = pool.get().await?;
let res = indexers::table
Expand Down
4 changes: 3 additions & 1 deletion src/routes.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::{get, post};
use axum::routing::{delete, get, post};
use axum::Router;
use tower_http::cors::{Any, CorsLayer};

use crate::handlers::global::health::health_check;
use crate::handlers::indexers::create_indexer::create_indexer;
use crate::handlers::indexers::delete_indexer::delete_indexer;
use crate::handlers::indexers::get_indexer::{
get_indexer, get_indexer_status, get_indexer_status_by_table_name, get_indexers,
};
Expand All @@ -31,6 +32,7 @@ fn indexers_routes(state: AppState) -> Router<AppState> {
.route("/indexers", get(get_indexers))
.route("/stop/:id", post(stop_indexer))
.route("/start/:id", post(start_indexer_api))
.route("/delete/:id", delete(delete_indexer))
.route("/:id", get(get_indexer))
.route("/status/:id", get(get_indexer_status))
.route("/status/table/:table_name", get(get_indexer_status_by_table_name))
Expand Down
5 changes: 5 additions & 0 deletions src/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ impl Repository for MockRepository {
self.indexers.push(indexer.clone());
Ok(indexer)
}
async fn delete(&mut self, id: Uuid) -> Result<(), InfraError> {
// Delete the indexer from the mock database
self.indexers.retain(|indexer| indexer.id != id);
Ok(())
}
async fn get(&self, id: Uuid) -> Result<IndexerModel, InfraError> {
let indexer = self.indexers.iter().find(|indexer| indexer.id == id);
if let Some(indexer) = indexer { Ok(indexer.clone()) } else { Err(InfraError::NotFound) }
Expand Down
18 changes: 18 additions & 0 deletions src/tests/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,24 @@ pub async fn send_stop_indexer_request(client: Client<HttpConnector>, id: Uuid,
assert_eq!(response.status(), StatusCode::OK);
}

/// Sends a request to stop the indexer with the specified script path.
/// Arguments
/// - client: The hyper client to use to send the request
/// - id: The id of the indexer to stop
/// - addr: The address of the server to send the request to
pub async fn send_delete_indexer_request(client: Client<HttpConnector>, id: Uuid, addr: SocketAddr) -> Response<Body> {
client
.request(
Request::builder()
.method(http::Method::DELETE)
.uri(format!("http://{}/v1/indexers/delete/{}", addr, id))
.body(Body::empty())
.unwrap(),
)
.await
.unwrap()
}

/// Asserts that a queue contains a message which has a body equal to the
/// id of the specified indexer
pub async fn assert_queue_contains_message_with_indexer_id(queue_url: &str, body: String) {
Expand Down
64 changes: 62 additions & 2 deletions src/tests/server/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use crate::routes::app_router;
use crate::tests::common::constants::{BROKEN_APIBARA_SCRIPT, WEHBHOOK_URL, WORKING_APIBARA_SCRIPT};
use crate::tests::common::utils::{
assert_queue_contains_message_with_indexer_id, get_indexer, get_indexers, is_process_running,
send_create_indexer_request, send_create_webhook_indexer_request, send_start_indexer_request,
send_stop_indexer_request,
send_create_indexer_request, send_create_webhook_indexer_request, send_delete_indexer_request,
send_start_indexer_request, send_stop_indexer_request,
};
use crate::types::sqs::{StartIndexerRequest, StopIndexerRequest};
use crate::AppState;
Expand Down Expand Up @@ -358,3 +358,63 @@ async fn get_all_indexers_test(#[future] setup_server: SocketAddr) {
assert_eq!(response_body.len(), 1);
assert_eq!(response_body[0].id, body.id);
}

#[rstest]
#[tokio::test]
async fn delete_indexer_test_works_only_when_stopped(#[future] setup_server: SocketAddr) {
let addr = setup_server.await;

let client = hyper::Client::new();

// Create indexer
let response = send_create_webhook_indexer_request(client.clone(), WORKING_APIBARA_SCRIPT, addr).await;

let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
let body: IndexerModel = serde_json::from_slice(&body).unwrap();

// stop the indexer
send_stop_indexer_request(client.clone(), body.id, addr).await;

// delete the indexer
let response = send_delete_indexer_request(client.clone(), body.id, addr).await;
assert_eq!(response.status(), StatusCode::OK);

// check indexer is not present in DB
let indexers = get_indexers().await;
assert_eq!(indexers.len(), 0);
}

#[rstest]
#[tokio::test]
async fn test_delete_indexer_fail_if_not_stopped(#[future] setup_server: SocketAddr) {
let addr = setup_server.await;

let client = hyper::Client::new();

// Create indexer
let response = send_create_webhook_indexer_request(client.clone(), WORKING_APIBARA_SCRIPT, addr).await;

let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
let body: IndexerModel = serde_json::from_slice(&body).unwrap();

// start the indexer
send_start_indexer_request(client.clone(), body.id, addr).await;

// delete the indexer
let response = client
.request(
Request::builder()
.uri(format!("http://{}/v1/indexers/delete/{}", addr, body.id))
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();

assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);

// check indexer is present in DB
let indexer = get_indexer(body.id).await;
assert_eq!(indexer.id, body.id);
assert_eq!(indexer.status, IndexerStatus::Running);
}

0 comments on commit 29a8125

Please sign in to comment.