diff --git a/crates/torii/indexer/src/processors/metadata_update.rs b/crates/torii/indexer/src/processors/metadata_update.rs index ec2c4b493d..c4b71c4a52 100644 --- a/crates/torii/indexer/src/processors/metadata_update.rs +++ b/crates/torii/indexer/src/processors/metadata_update.rs @@ -9,7 +9,6 @@ use dojo_world::contracts::world::WorldContractReader; use dojo_world::uri::Uri; use starknet::core::types::{Event, Felt}; use starknet::providers::Provider; -use torii_sqlite::constants::IPFS_CLIENT_MAX_RETRY; use torii_sqlite::utils::fetch_content_from_ipfs; use torii_sqlite::Sql; use tracing::{error, info}; @@ -107,7 +106,7 @@ async fn metadata(uri_str: String) -> Result<(WorldMetadata, Option, Opt let uri = Uri::Ipfs(uri_str); let cid = uri.cid().ok_or("Uri is malformed").map_err(Error::msg)?; - let bytes = fetch_content_from_ipfs(cid, IPFS_CLIENT_MAX_RETRY).await?; + let bytes = fetch_content_from_ipfs(cid).await?; let metadata: WorldMetadata = serde_json::from_str(std::str::from_utf8(&bytes)?)?; let icon_img = fetch_image(&metadata.icon_uri).await; @@ -118,7 +117,7 @@ async fn metadata(uri_str: String) -> Result<(WorldMetadata, Option, Opt async fn fetch_image(image_uri: &Option) -> Option { if let Some(uri) = image_uri { - let data = fetch_content_from_ipfs(uri.cid()?, IPFS_CLIENT_MAX_RETRY).await.ok()?; + let data = fetch_content_from_ipfs(uri.cid()?).await.ok()?; let encoded = general_purpose::STANDARD.encode(data); return Some(encoded); } diff --git a/crates/torii/server/src/artifacts.rs b/crates/torii/server/src/artifacts.rs index 30e3a55135..0bdec6cc93 100644 --- a/crates/torii/server/src/artifacts.rs +++ b/crates/torii/server/src/artifacts.rs @@ -15,7 +15,7 @@ use tokio::fs; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::broadcast::Receiver; -use torii_sqlite::constants::{IPFS_CLIENT_MAX_RETRY, TOKENS_TABLE}; +use torii_sqlite::constants::TOKENS_TABLE; use torii_sqlite::utils::fetch_content_from_ipfs; use tracing::{debug, error, trace}; use warp::http::Response; @@ -204,7 +204,7 @@ async fn fetch_and_process_image( uri if uri.starts_with("ipfs") => { debug!(image_uri = %uri, "Fetching image from IPFS"); let cid = uri.strip_prefix("ipfs://").unwrap(); - let response = fetch_content_from_ipfs(cid, IPFS_CLIENT_MAX_RETRY) + let response = fetch_content_from_ipfs(cid) .await .context("Failed to read image bytes from IPFS response")?; diff --git a/crates/torii/sqlite/src/executor/erc.rs b/crates/torii/sqlite/src/executor/erc.rs index c91cdce8a6..5705b1bd64 100644 --- a/crates/torii/sqlite/src/executor/erc.rs +++ b/crates/torii/sqlite/src/executor/erc.rs @@ -13,7 +13,7 @@ use starknet_crypto::Felt; use tracing::{debug, trace, warn}; use super::{ApplyBalanceDiffQuery, Executor}; -use crate::constants::{IPFS_CLIENT_MAX_RETRY, SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE}; +use crate::constants::{SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE}; use crate::executor::LOG_TARGET; use crate::simple_broker::SimpleBroker; use crate::types::{ContractType, TokenBalance}; @@ -289,7 +289,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { uri if uri.starts_with("ipfs") => { let cid = uri.strip_prefix("ipfs://").unwrap(); debug!(cid = %cid, "Fetching metadata from IPFS"); - let response = fetch_content_from_ipfs(cid, IPFS_CLIENT_MAX_RETRY) + let response = fetch_content_from_ipfs(cid) .await .context("Failed to fetch metadata from IPFS")?; diff --git a/crates/torii/sqlite/src/executor/mod.rs b/crates/torii/sqlite/src/executor/mod.rs index 303e7291a1..0b79d5b75b 100644 --- a/crates/torii/sqlite/src/executor/mod.rs +++ b/crates/torii/sqlite/src/executor/mod.rs @@ -775,10 +775,10 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { if new_transaction { let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?); transaction.commit().await?; - } - for message in self.publish_queue.drain(..) { - send_broker_message(message); + for message in self.publish_queue.drain(..) { + send_broker_message(message); + } } while let Some(result) = self.register_tasks.join_next().await { diff --git a/crates/torii/sqlite/src/utils.rs b/crates/torii/sqlite/src/utils.rs index 1d16e1a27e..fd77affa82 100644 --- a/crates/torii/sqlite/src/utils.rs +++ b/crates/torii/sqlite/src/utils.rs @@ -10,7 +10,7 @@ use ipfs_api_backend_hyper::{IpfsApi, IpfsClient, TryFromUri}; use starknet::core::types::U256; use starknet_crypto::Felt; use tokio_util::bytes::Bytes; -use tracing::info; +use tracing::warn; use crate::constants::{ IPFS_CLIENT_MAX_RETRY, IPFS_CLIENT_PASSWORD, IPFS_CLIENT_URL, IPFS_CLIENT_USERNAME, @@ -53,22 +53,24 @@ pub fn sql_string_to_felts(sql_string: &str) -> Vec { sql_string.split(SQL_FELT_DELIMITER).map(|felt| Felt::from_str(felt).unwrap()).collect() } -pub async fn fetch_content_from_ipfs(cid: &str, mut retries: u8) -> Result { +pub async fn fetch_content_from_ipfs(cid: &str) -> Result { + let mut retries = IPFS_CLIENT_MAX_RETRY; let client = IpfsClient::from_str(IPFS_CLIENT_URL)? .with_credentials(IPFS_CLIENT_USERNAME, IPFS_CLIENT_PASSWORD); + while retries > 0 { let response = client.cat(cid).map_ok(|chunk| chunk.to_vec()).try_concat().await; match response { Ok(stream) => return Ok(Bytes::from(stream)), Err(e) => { retries -= 1; - if retries > 0 { - info!( - error = %e, - "Fetch uri." - ); - tokio::time::sleep(Duration::from_secs(3)).await; - } + warn!( + error = %e, + remaining_attempts = retries, + cid = cid, + "Failed to fetch content from IPFS, retrying after delay" + ); + tokio::time::sleep(Duration::from_secs(3)).await; } } }