Skip to content

Commit

Permalink
feat: some quality of life improvements (#73)
Browse files Browse the repository at this point in the history
* feat: support starting block

* feat: better indexers id
  • Loading branch information
EvolveArt authored Jan 22, 2025
1 parent 7128f1a commit 77ccc68
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 160 deletions.
2 changes: 2 additions & 0 deletions migrations/2023-09-18-115821_create_indexers/up.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
-- Your SQL goes here
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE indexers
(
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- This file should undo anything in `up.sql`

ALTER TABLE indexers DROP COLUMN starting_block;
ALTER TABLE indexers DROP COLUMN indexer_id;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Your SQL goes here

ALTER TABLE indexers ADD COLUMN starting_block BIGINT;
ALTER TABLE indexers ADD COLUMN indexer_id VARCHAR;
2 changes: 2 additions & 0 deletions src/domain/models/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub struct IndexerModel {
pub table_name: Option<String>,
pub status_server_port: Option<i32>,
pub custom_connection_string: Option<String>,
pub starting_block: Option<i64>,
pub indexer_id: Option<String>,
}

#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
Expand Down
21 changes: 21 additions & 0 deletions src/handlers/indexers/create_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub struct CreateIndexerRequest {
pub target_url: Option<String>,
pub table_name: Option<String>,
pub custom_connection_string: Option<String>,
pub starting_block: Option<i64>,
pub indexer_id: Option<String>,
#[serde(skip)]
pub data: Bytes,
#[serde(skip)]
Expand All @@ -39,6 +41,8 @@ impl Default for CreateIndexerRequest {
target_url: None,
table_name: None,
custom_connection_string: None,
starting_block: None,
indexer_id: None,
data: Bytes::new(),
status_server_port: 1234,
}
Expand Down Expand Up @@ -100,12 +104,27 @@ async fn build_create_indexer_request(request: &mut Multipart) -> Result<CreateI
create_indexer_request.indexer_type =
IndexerType::from_str(field.as_str()).map_err(|_| IndexerError::InvalidIndexerType(field))?
}
"starting_block" => {
let field = field.text().await.map_err(IndexerError::FailedToReadMultipartField)?;
create_indexer_request.starting_block = Some(
field.parse().map_err(|_| IndexerError::InternalServerError("Invalid starting block".into()))?,
);
}
"indexer_id" => {
create_indexer_request.indexer_id =
Some(field.text().await.map_err(IndexerError::FailedToReadMultipartField)?)
}
_ => return Err(IndexerError::UnexpectedMultipartField(field_name.to_string())),
};
}

create_indexer_request.set_random_port();

// For Postgres indexers, use table_name as indexer_id if not provided
if create_indexer_request.indexer_type == IndexerType::Postgres && create_indexer_request.indexer_id.is_none() {
create_indexer_request.indexer_id = create_indexer_request.table_name.clone();
}

if !create_indexer_request.is_ready() {
return Err(IndexerError::FailedToBuildCreateIndexerRequest);
}
Expand All @@ -127,6 +146,8 @@ pub async fn create_indexer(
table_name: create_indexer_request.table_name.clone(),
status_server_port: None,
custom_connection_string: create_indexer_request.custom_connection_string.clone(),
starting_block: create_indexer_request.starting_block,
indexer_id: create_indexer_request.indexer_id.clone(),
};

let config = config().await;
Expand Down
14 changes: 9 additions & 5 deletions src/handlers/indexers/indexer_types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use chrono::Utc;
use shutil::pipe;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use uuid::Uuid;

use crate::constants::indexers::{
MAX_INDEXER_START_RETRIES, START_INDEXER_DELAY_SECONDS, WORKING_INDEXER_THRESHOLD_TIME_MINUTES,
Expand All @@ -19,6 +18,8 @@ use crate::handlers::indexers::utils::get_script_tmp_directory;
use crate::publishers::indexers::{publish_failed_indexer, publish_start_indexer, publish_stop_indexer};
use crate::utils::env::get_environment_variable;

pub const DEFAULT_STARTING_BLOCK: i64 = 1;

#[async_trait]
pub trait Indexer {
async fn start(&self, indexer: &IndexerModel, attempt: u32) -> Result<u32, IndexerError>;
Expand All @@ -35,7 +36,7 @@ pub trait Indexer {
let auth_token = get_environment_variable("APIBARA_AUTH_TOKEN");
let etcd_url = get_environment_variable("APIBARA_ETCD_URL");

let indexer_id = indexer.id.to_string();
let sink_id = indexer.indexer_id.clone().unwrap_or_else(|| indexer.id.to_string());
let status_server_address = format!("0.0.0.0:{port}", port = indexer.status_server_port.unwrap_or(1234));

let mut args = vec![
Expand All @@ -46,7 +47,7 @@ pub trait Indexer {
"--persist-to-etcd",
etcd_url.as_str(),
"--sink-id",
indexer_id.as_str(),
sink_id.as_str(),
"--status-server-address",
status_server_address.as_str(),
"--allow-env-from-env",
Expand All @@ -59,9 +60,10 @@ pub trait Indexer {
// Silence stdout and stderr
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.env("STARTING_BLOCK", indexer.starting_block.unwrap_or(DEFAULT_STARTING_BLOCK).to_string())
.args(args)
.spawn()
.map_err(|_| IndexerError::FailedToStartIndexer(indexer_id.clone()))?;
.map_err(|_| IndexerError::FailedToStartIndexer(indexer.id.to_string()))?;

let id = child_handle.id().expect("Failed to get the child process id");

Expand All @@ -71,6 +73,7 @@ pub trait Indexer {
let mut stdout_reader = BufReader::new(stdout).lines();
let mut stderr_reader = BufReader::new(stderr).lines();

let indexer_id = indexer.id;
tokio::spawn(async move {
loop {
tokio::select! {
Expand All @@ -89,7 +92,6 @@ pub trait Indexer {
}
}
result = child_handle.wait() => {
let indexer_id = Uuid::parse_str(indexer_id.as_str()).expect("Invalid UUID for indexer");
match result.unwrap().success() {
true => {
tracing::info!("Child process exited successfully {}", indexer_id);
Expand Down Expand Up @@ -210,9 +212,11 @@ mod tests {
process_id: None,
status: IndexerStatus::Created,
target_url: Some("https://example.com".to_string()),
starting_block: None,
table_name: None,
status_server_port: Some(1234),
custom_connection_string: None,
indexer_id: None,
};

// clear the sqs queue
Expand Down
29 changes: 2 additions & 27 deletions src/infra/db/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,7 @@ diesel::table! {
table_name -> Nullable<Varchar>,
status_server_port -> Nullable<Int4>,
custom_connection_string -> Nullable<Varchar>,
starting_block -> Nullable<Int8>,
indexer_id -> Nullable<Varchar>,
}
}

diesel::table! {
spot_entry (data_id) {
#[max_length = 255]
network -> Nullable<Varchar>,
#[max_length = 255]
pair_id -> Nullable<Varchar>,
#[max_length = 255]
data_id -> Varchar,
#[max_length = 255]
block_hash -> Nullable<Varchar>,
block_number -> Nullable<Int8>,
block_timestamp -> Nullable<Timestamp>,
#[max_length = 255]
transaction_hash -> Nullable<Varchar>,
price -> Nullable<Numeric>,
timestamp -> Nullable<Timestamp>,
#[max_length = 255]
publisher -> Nullable<Varchar>,
#[max_length = 255]
source -> Nullable<Varchar>,
volume -> Nullable<Numeric>,
_cursor -> Nullable<Int8>,
}
}

diesel::allow_tables_to_appear_in_same_query!(indexers, spot_entry,);
12 changes: 12 additions & 0 deletions src/infra/repositories/indexer_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub struct IndexerDb {
pub table_name: Option<String>,
pub status_server_port: Option<i32>,
pub custom_connection_string: Option<String>,
pub starting_block: Option<i64>,
pub indexer_id: Option<String>,
}

#[derive(Deserialize)]
Expand All @@ -41,6 +43,8 @@ pub struct NewIndexerDb {
pub table_name: Option<String>,
pub status_server_port: Option<i32>,
pub custom_connection_string: Option<String>,
pub starting_block: Option<i64>,
pub indexer_id: Option<String>,
}

#[derive(Deserialize, Insertable)]
Expand Down Expand Up @@ -223,6 +227,8 @@ impl TryFrom<NewIndexerDb> for IndexerModel {
table_name: value.table_name,
status_server_port: value.status_server_port,
custom_connection_string: value.custom_connection_string,
starting_block: value.starting_block,
indexer_id: value.indexer_id,
}
.try_into()?;
Ok(model)
Expand All @@ -241,6 +247,8 @@ impl TryFrom<IndexerDb> for IndexerModel {
table_name: value.table_name,
status_server_port: value.status_server_port,
custom_connection_string: value.custom_connection_string,
starting_block: value.starting_block,
indexer_id: value.indexer_id,
};
Ok(model)
}
Expand Down Expand Up @@ -278,6 +286,8 @@ mod tests {
table_name: Some(table_name.into()),
status_server_port: Some(1234),
custom_connection_string: None,
starting_block: None,
indexer_id: None,
};

let indexer_model: Result<IndexerModel, ParseError> = indexer_db.try_into();
Expand Down Expand Up @@ -319,6 +329,8 @@ mod tests {
table_name: Some(table_name.into()),
status_server_port: Some(1234),
custom_connection_string: None,
starting_block: None,
indexer_id: None,
};

let indexer_model: Result<IndexerModel, ParseError> = indexer_db.try_into();
Expand Down
128 changes: 0 additions & 128 deletions src/tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,130 +1,2 @@
use std::str::FromStr;

use axum::async_trait;
use uuid::Uuid;

use crate::domain::models::indexer::{IndexerModel, IndexerStatus, IndexerType};
use crate::infra::errors::InfraError;
use crate::infra::repositories::indexer_repository::{
IndexerFilter, NewIndexerDb, Repository, UpdateIndexerStatusAndProcessIdDb, UpdateIndexerStatusDb,
};

pub mod constants;
pub mod utils;

/// Mock the database
#[derive(Debug)]
pub struct MockRepository {
indexers: Vec<IndexerModel>,
}

impl MockRepository {
pub fn _new() -> Self {
Self {
indexers: vec![
IndexerModel {
id: uuid::Uuid::new_v4(),
status: IndexerStatus::Created,
indexer_type: IndexerType::Webhook,
process_id: None,
target_url: Some("https://example.com".to_string()),
table_name: None,
status_server_port: None,
custom_connection_string: None,
},
IndexerModel {
id: uuid::Uuid::new_v4(),
status: IndexerStatus::Running,
indexer_type: IndexerType::Webhook,
process_id: Some(123),
target_url: Some("https://example.com".to_string()),
table_name: None,
status_server_port: None,
custom_connection_string: None,
},
IndexerModel {
id: uuid::Uuid::new_v4(),
status: IndexerStatus::FailedRunning,
indexer_type: IndexerType::Webhook,
process_id: None,
target_url: Some("https://example.com".to_string()),
table_name: None,
status_server_port: None,
custom_connection_string: None,
},
IndexerModel {
id: uuid::Uuid::new_v4(),
status: IndexerStatus::Stopped,
indexer_type: IndexerType::Webhook,
process_id: None,
target_url: Some("https://example.com".to_string()),
table_name: None,
status_server_port: None,
custom_connection_string: None,
},
IndexerModel {
id: uuid::Uuid::new_v4(),
status: IndexerStatus::FailedStopping,
indexer_type: IndexerType::Webhook,
process_id: Some(123),
target_url: Some("https://example.com".to_string()),
table_name: None,
status_server_port: None,
custom_connection_string: None,
},
],
}
}
}

#[async_trait]
impl Repository for MockRepository {
async fn insert(&mut self, new_indexer: NewIndexerDb) -> Result<IndexerModel, InfraError> {
let indexer: IndexerModel = new_indexer.try_into().map_err(InfraError::ParseError)?;
// Insert the indexer in the mock database
self.indexers.push(indexer.clone());
Ok(indexer)
}
async fn delete(&mut self, id: Uuid) -> Result<(), InfraError> {
// Delete the indexer from the mock database
self.indexers.retain(|indexer| indexer.id != id);
Ok(())
}
async fn get(&self, id: Uuid) -> Result<IndexerModel, InfraError> {
let indexer = self.indexers.iter().find(|indexer| indexer.id == id);
if let Some(indexer) = indexer { Ok(indexer.clone()) } else { Err(InfraError::NotFound) }
}
async fn get_by_table_name(&self, table_name: String) -> Result<IndexerModel, InfraError> {
let indexer = self.indexers.iter().find(|indexer| indexer.table_name == Some(table_name.clone()));
if let Some(indexer) = indexer { Ok(indexer.clone()) } else { Err(InfraError::NotFound) }
}
async fn get_all(&self, filter: IndexerFilter) -> Result<Vec<IndexerModel>, InfraError> {
// Get all indexers with status filter if provided
let indexers = match filter.status {
Some(status) => self
.indexers
.iter()
.filter(|indexer| indexer.status == IndexerStatus::from_str(&status).unwrap())
.cloned()
.collect(),
None => self.indexers.clone(),
};
Ok(indexers)
}
async fn update_status(&mut self, indexer: UpdateIndexerStatusDb) -> Result<IndexerModel, InfraError> {
// Update the indexer in the mock database
let i = self.indexers.iter_mut().find(|other| indexer.id.eq(&other.id)).unwrap();
i.status = IndexerStatus::from_str(&indexer.status).map_err(InfraError::ParseError)?;
Ok(i.clone())
}
async fn update_status_and_process_id(
&mut self,
indexer: UpdateIndexerStatusAndProcessIdDb,
) -> Result<IndexerModel, InfraError> {
// Update the indexer in the self.indexers vector
let i = self.indexers.iter_mut().find(|other| indexer.id.eq(&other.id)).unwrap();
i.status = IndexerStatus::from_str(&indexer.status).map_err(InfraError::ParseError)?;
i.process_id = Some(indexer.process_id);
Ok(i.clone())
}
}
Loading

0 comments on commit 77ccc68

Please sign in to comment.