From 9637ab9549f7dd35a35cdc3b6bee0c2d35dba67f Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 27 Jan 2025 14:20:55 +0700 Subject: [PATCH 01/11] feat(torii): add support for erc1155 --- crates/torii/sqlite/src/erc.rs | 70 +++++++++++++++++++++-- crates/torii/sqlite/src/executor/erc.rs | 72 +++++++++++++++++------- crates/torii/sqlite/src/executor/mod.rs | 75 +++++++++++++------------ crates/torii/sqlite/src/types.rs | 3 + 4 files changed, 161 insertions(+), 59 deletions(-) diff --git a/crates/torii/sqlite/src/erc.rs b/crates/torii/sqlite/src/erc.rs index 427a49a8d8..37cbfec640 100644 --- a/crates/torii/sqlite/src/erc.rs +++ b/crates/torii/sqlite/src/erc.rs @@ -10,9 +10,9 @@ use starknet::providers::Provider; use super::utils::{u256_to_sql_string, I256}; use super::{Sql, SQL_FELT_DELIMITER}; use crate::constants::TOKEN_TRANSFER_TABLE; +use crate::executor::erc::RegisterNftTokenQuery; use crate::executor::{ ApplyBalanceDiffQuery, Argument, QueryMessage, QueryType, RegisterErc20TokenQuery, - RegisterErc721TokenQuery, }; use crate::types::ContractType; use crate::utils::{ @@ -95,7 +95,7 @@ impl Sql { let token_exists: bool = self.local_cache.contains_token_id(&token_id).await; if !token_exists { - self.register_erc721_token_metadata(contract_address, &token_id, actual_token_id) + self.register_nft_token_metadata(contract_address, &token_id, actual_token_id) .await?; } @@ -142,6 +142,68 @@ impl Sql { Ok(()) } + #[allow(clippy::too_many_arguments)] +pub async fn handle_erc1155_transfer( + &mut self, + contract_address: Felt, + from_address: Felt, + to_address: Felt, + token_id: U256, + amount: U256, + block_timestamp: u64, + event_id: &str, + block_number: u64, +) -> Result<()> { + // contract_address:id + let actual_token_id = token_id; + let token_id = felt_and_u256_to_sql_string(&contract_address, &token_id); + let token_exists: bool = self.local_cache.contains_token_id(&token_id).await; + + if !token_exists { + self.register_nft_token_metadata(contract_address, &token_id, actual_token_id) + .await?; + } + + self.store_erc_transfer_event( + contract_address, + from_address, + to_address, + amount, + &token_id, + block_timestamp, + event_id, + )?; + + { + let mut erc_cache = self.local_cache.erc_cache.write().await; + if from_address != Felt::ZERO { + let from_balance_id = format!( + "{}{SQL_FELT_DELIMITER}{}", + felt_to_sql_string(&from_address), + &token_id + ); + let from_balance = erc_cache.entry((ContractType::ERC1155, from_balance_id)).or_default(); + *from_balance -= I256::from(amount); + } + + if to_address != Felt::ZERO { + let to_balance_id = + format!("{}{SQL_FELT_DELIMITER}{}", felt_to_sql_string(&to_address), &token_id); + let to_balance = erc_cache.entry((ContractType::ERC1155, to_balance_id)).or_default(); + *to_balance += I256::from(amount); + } + } + + let block_id = BlockId::Number(block_number); + + if self.local_cache.erc_cache.read().await.len() >= 100000 { + self.flush().await.with_context(|| "Failed to flush in handle_erc1155_single_transfer")?; + self.apply_cache_diff(block_id).await?; + } + + Ok(()) +} + async fn register_erc20_token_metadata( &mut self, contract_address: Felt, @@ -220,7 +282,7 @@ impl Sql { Ok(()) } - async fn register_erc721_token_metadata( + async fn register_nft_token_metadata( &mut self, contract_address: Felt, token_id: &str, @@ -229,7 +291,7 @@ impl Sql { self.executor.send(QueryMessage::new( "".to_string(), vec![], - QueryType::RegisterErc721Token(RegisterErc721TokenQuery { + QueryType::RegisterNftToken(RegisterNftTokenQuery { token_id: token_id.to_string(), contract_address, actual_token_id, diff --git a/crates/torii/sqlite/src/executor/erc.rs b/crates/torii/sqlite/src/executor/erc.rs index 5705b1bd64..ebfb36421b 100644 --- a/crates/torii/sqlite/src/executor/erc.rs +++ b/crates/torii/sqlite/src/executor/erc.rs @@ -22,15 +22,15 @@ use crate::utils::{ }; #[derive(Debug, Clone)] -pub struct RegisterErc721TokenQuery { +pub struct RegisterNftTokenQuery { pub token_id: String, pub contract_address: Felt, pub actual_token_id: U256, } #[derive(Debug, Clone)] -pub struct RegisterErc721TokenMetadata { - pub query: RegisterErc721TokenQuery, +pub struct RegisterNftTokenMetadata { + pub query: RegisterNftTokenQuery, pub name: String, pub symbol: String, pub metadata: String, @@ -83,6 +83,26 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { let contract_address = id[1]; let token_id = id[1]; + self.apply_balance_diff_helper( + id_str, + account_address, + contract_address, + token_id, + balance, + Arc::clone(&provider), + apply_balance_diff.block_id, + ) + .await + .with_context(|| "Failed to apply balance diff in apply_cache_diff")?; + } + ContractType::ERC1155 => { + // account_address/contract_address:id => ERC1155 + assert!(id.len() == 2); + let account_address = id[0]; + let token_id = id[1]; + let mid = token_id.split(":").collect::>(); + let contract_address = mid[0]; + self.apply_balance_diff_helper( id_str, account_address, @@ -179,20 +199,20 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { Ok(()) } - pub async fn process_register_erc721_token_query( - register_erc721_token: RegisterErc721TokenQuery, + pub async fn process_register_nft_token_query( + register_nft_token: RegisterNftTokenQuery, provider: Arc

, name: String, symbol: String, - ) -> Result { + ) -> Result { let token_uri = if let Ok(token_uri) = provider .call( FunctionCall { - contract_address: register_erc721_token.contract_address, + contract_address: register_nft_token.contract_address, entry_point_selector: get_selector_from_name("token_uri").unwrap(), calldata: vec![ - register_erc721_token.actual_token_id.low().into(), - register_erc721_token.actual_token_id.high().into(), + register_nft_token.actual_token_id.low().into(), + register_nft_token.actual_token_id.high().into(), ], }, BlockId::Tag(BlockTag::Pending), @@ -203,11 +223,11 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { } else if let Ok(token_uri) = provider .call( FunctionCall { - contract_address: register_erc721_token.contract_address, + contract_address: register_nft_token.contract_address, entry_point_selector: get_selector_from_name("tokenURI").unwrap(), calldata: vec![ - register_erc721_token.actual_token_id.low().into(), - register_erc721_token.actual_token_id.high().into(), + register_nft_token.actual_token_id.low().into(), + register_nft_token.actual_token_id.high().into(), ], }, BlockId::Tag(BlockTag::Pending), @@ -215,10 +235,22 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { .await { token_uri + } // erc1155 + else if let Ok(token_uri) = provider + .call( + FunctionCall { + contract_address: register_nft_token.contract_address, + entry_point_selector: get_selector_from_name("tokenURI").unwrap(), + calldata: vec![], + }, + BlockId::Tag(BlockTag::Pending), + ) + .await { + token_uri } else { warn!( - contract_address = format!("{:#x}", register_erc721_token.contract_address), - token_id = %register_erc721_token.actual_token_id, + contract_address = format!("{:#x}", register_nft_token.contract_address), + token_id = %register_nft_token.actual_token_id, "Error fetching token URI, empty metadata will be used instead.", ); @@ -245,7 +277,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { let metadata = Self::fetch_metadata(&token_uri).await.with_context(|| { format!( "Failed to fetch metadata for token_id: {}", - register_erc721_token.actual_token_id + register_nft_token.actual_token_id ) }); @@ -253,15 +285,15 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { serde_json::to_string(&metadata).context("Failed to serialize metadata")? } else { warn!( - contract_address = format!("{:#x}", register_erc721_token.contract_address), - token_id = %register_erc721_token.actual_token_id, + contract_address = format!("{:#x}", register_nft_token.contract_address), + token_id = %register_nft_token.actual_token_id, "Error fetching metadata, empty metadata will be used instead.", ); "".to_string() } }; - Ok(RegisterErc721TokenMetadata { query: register_erc721_token, metadata, name, symbol }) + Ok(RegisterNftTokenMetadata { query: register_nft_token, metadata, name, symbol }) } // given a uri which can be either http/https url or data uri, fetch the metadata erc721 @@ -333,9 +365,9 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { } } - pub async fn handle_erc721_token_metadata( + pub async fn handle_nft_token_metadata( &mut self, - result: RegisterErc721TokenMetadata, + result: RegisterNftTokenMetadata, ) -> Result<()> { let query = sqlx::query( "INSERT INTO tokens (id, contract_address, name, symbol, decimals, metadata) VALUES \ diff --git a/crates/torii/sqlite/src/executor/mod.rs b/crates/torii/sqlite/src/executor/mod.rs index 0b79d5b75b..6f0f6c096f 100644 --- a/crates/torii/sqlite/src/executor/mod.rs +++ b/crates/torii/sqlite/src/executor/mod.rs @@ -7,6 +7,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use anyhow::{Context, Result}; use cainome::cairo_serde::{ByteArray, CairoSerde}; use dojo_types::schema::{Struct, Ty}; +use erc::RegisterNftTokenMetadata; use sqlx::{FromRow, Pool, Sqlite, Transaction}; use starknet::core::types::{BlockId, BlockTag, Felt, FunctionCall}; use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string}; @@ -28,7 +29,7 @@ use crate::types::{ use crate::utils::{felt_to_sql_string, I256}; pub mod erc; -pub use erc::{RegisterErc20TokenQuery, RegisterErc721TokenMetadata, RegisterErc721TokenQuery}; +pub use erc::{RegisterErc20TokenQuery, RegisterNftTokenQuery}; pub(crate) const LOG_TARGET: &str = "torii::sqlite::executor"; @@ -109,7 +110,7 @@ pub enum QueryType { DeleteEntity(DeleteEntityQuery), EventMessage(EventMessageQuery), ApplyBalanceDiff(ApplyBalanceDiffQuery), - RegisterErc721Token(RegisterErc721TokenQuery), + RegisterNftToken(RegisterNftTokenQuery), RegisterErc20Token(RegisterErc20TokenQuery), TokenTransfer, RegisterModel, @@ -133,7 +134,7 @@ pub struct Executor<'c, P: Provider + Sync + Send + 'static> { shutdown_rx: Receiver<()>, // These tasks are spawned to fetch ERC721 token metadata from the chain // to not block the main loop - register_tasks: JoinSet>, + register_tasks: JoinSet>, // Some queries depends on the metadata being registered, so we defer them // until the metadata is fetched deferred_query_messages: Vec, @@ -273,7 +274,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { } Some(result) = self.register_tasks.join_next() => { let result = result??; - self.handle_erc721_token_metadata(result).await?; + self.handle_nft_token_metadata(result).await?; } } } @@ -610,67 +611,71 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { self.apply_balance_diff(apply_balance_diff, self.provider.clone()).await?; debug!(target: LOG_TARGET, duration = ?instant.elapsed(), "Applied balance diff."); } - QueryType::RegisterErc721Token(register_erc721_token) => { + QueryType::RegisterNftToken(register_nft_token) => { let semaphore = self.semaphore.clone(); let provider = self.provider.clone(); let res = sqlx::query_as::<_, (String, String)>(&format!( "SELECT name, symbol FROM {TOKENS_TABLE} WHERE contract_address = ? LIMIT 1" )) - .bind(felt_to_sql_string(®ister_erc721_token.contract_address)) + .bind(felt_to_sql_string(®ister_nft_token.contract_address)) .fetch_one(&mut **tx) .await; // If we find a token already registered for this contract_address we dont need to - // refetch the data since its same for all ERC721 tokens + // refetch the data since its same for all tokens of this contract let (name, symbol) = match res { Ok((name, symbol)) => { debug!( - contract_address = %felt_to_sql_string(®ister_erc721_token.contract_address), + contract_address = %felt_to_sql_string(®ister_nft_token.contract_address), "Token already registered for contract_address, so reusing fetched data", ); (name, symbol) } Err(_) => { - // Fetch token information from the chain - let name = provider + // Try to fetch name, use empty string if it fails + let name = match provider .call( FunctionCall { - contract_address: register_erc721_token.contract_address, + contract_address: register_nft_token.contract_address, entry_point_selector: get_selector_from_name("name").unwrap(), calldata: vec![], }, BlockId::Tag(BlockTag::Pending), ) - .await?; - - // len = 1 => return value felt (i.e. legacy erc721 token) - // len > 1 => return value ByteArray (i.e. new erc721 token) - let name = if name.len() == 1 { - parse_cairo_short_string(&name[0]).unwrap() - } else { - ByteArray::cairo_deserialize(&name, 0) - .expect("Return value not ByteArray") - .to_string() - .expect("Return value not String") + .await + { + Ok(name) => { + // len = 1 => return value felt (i.e. legacy erc721 token) + // len > 1 => return value ByteArray (i.e. new erc721 token) + if name.len() == 1 { + parse_cairo_short_string(&name[0])? + } else { + ByteArray::cairo_deserialize(&name, 0)?.to_string()? + } + } + Err(_) => "".to_string(), }; - let symbol = provider + // Try to fetch symbol, use empty string if it fails + let symbol = match provider .call( FunctionCall { - contract_address: register_erc721_token.contract_address, + contract_address: register_nft_token.contract_address, entry_point_selector: get_selector_from_name("symbol").unwrap(), calldata: vec![], }, BlockId::Tag(BlockTag::Pending), ) - .await?; - let symbol = if symbol.len() == 1 { - parse_cairo_short_string(&symbol[0]).unwrap() - } else { - ByteArray::cairo_deserialize(&symbol, 0) - .expect("Return value not ByteArray") - .to_string() - .expect("Return value not String") + .await + { + Ok(symbol) => { + if symbol.len() == 1 { + parse_cairo_short_string(&symbol[0])? + } else { + ByteArray::cairo_deserialize(&symbol, 0)?.to_string()? + } + } + Err(_) => "".to_string(), }; (name, symbol) @@ -680,8 +685,8 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { self.register_tasks.spawn(async move { let permit = semaphore.acquire().await.unwrap(); - let result = Self::process_register_erc721_token_query( - register_erc721_token, + let result = Self::process_register_nft_token_query( + register_nft_token, provider, name, symbol, @@ -783,7 +788,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { while let Some(result) = self.register_tasks.join_next().await { let result = result??; - self.handle_erc721_token_metadata(result).await?; + self.handle_nft_token_metadata(result).await?; } let mut deferred_query_messages = mem::take(&mut self.deferred_query_messages); diff --git a/crates/torii/sqlite/src/types.rs b/crates/torii/sqlite/src/types.rs index d56d33cb50..38daa9cab4 100644 --- a/crates/torii/sqlite/src/types.rs +++ b/crates/torii/sqlite/src/types.rs @@ -156,6 +156,7 @@ pub enum ContractType { WORLD, ERC20, ERC721, + ERC1155, } impl std::fmt::Display for Contract { @@ -172,6 +173,7 @@ impl FromStr for ContractType { "world" => Ok(ContractType::WORLD), "erc20" => Ok(ContractType::ERC20), "erc721" => Ok(ContractType::ERC721), + "erc1155" => Ok(ContractType::ERC1155), _ => Err(anyhow::anyhow!("Invalid ERC type: {}", input)), } } @@ -183,6 +185,7 @@ impl std::fmt::Display for ContractType { ContractType::WORLD => write!(f, "WORLD"), ContractType::ERC20 => write!(f, "ERC20"), ContractType::ERC721 => write!(f, "ERC721"), + ContractType::ERC1155 => write!(f, "ERC1155"), } } } From bd1a72d785f1dccd9cb9e2586cd7a48a8e7891b2 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 27 Jan 2025 14:57:34 +0700 Subject: [PATCH 02/11] add erc1155 processors --- .../src/processors/erc1155_transfer_batch.rs | 110 ++++++++++++++++++ .../src/processors/erc1155_transfer_single.rs | 81 +++++++++++++ .../src/processors/erc721_legacy_transfer.rs | 3 +- .../indexer/src/processors/erc721_transfer.rs | 3 +- crates/torii/indexer/src/processors/mod.rs | 2 + crates/torii/sqlite/src/erc.rs | 74 +----------- crates/torii/sqlite/src/executor/erc.rs | 6 +- 7 files changed, 207 insertions(+), 72 deletions(-) create mode 100644 crates/torii/indexer/src/processors/erc1155_transfer_batch.rs create mode 100644 crates/torii/indexer/src/processors/erc1155_transfer_single.rs diff --git a/crates/torii/indexer/src/processors/erc1155_transfer_batch.rs b/crates/torii/indexer/src/processors/erc1155_transfer_batch.rs new file mode 100644 index 0000000000..2fcd7efc25 --- /dev/null +++ b/crates/torii/indexer/src/processors/erc1155_transfer_batch.rs @@ -0,0 +1,110 @@ +use anyhow::Error; +use async_trait::async_trait; +use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; +use dojo_world::contracts::world::WorldContractReader; +use starknet::core::types::{Event, U256}; +use starknet::providers::Provider; +use torii_sqlite::Sql; +use tracing::debug; + +use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::{self, TaskId, TaskPriority}; + +pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc1155_transfer_batch"; + +#[derive(Default, Debug)] +pub struct Erc1155TransferBatchProcessor; + +#[async_trait] +impl

EventProcessor

for Erc1155TransferBatchProcessor +where + P: Provider + Send + Sync + std::fmt::Debug, +{ + fn event_key(&self) -> String { + "TransferBatch".to_string() + } + + fn validate(&self, event: &Event) -> bool { + // key: [hash(TransferBatch), operator, from, to] + // data: [ids_len, ids[0].low, ids[0].high, ..., values_len, values[0].low, values[0].high, + // ...] + if event.keys.len() == 4 && !event.data.is_empty() { + return true; + } + false + } + + fn task_priority(&self) -> TaskPriority { + 1 + } + + fn task_identifier(&self, _event: &Event) -> TaskId { + task_manager::TASK_ID_SEQUENTIAL + } + + async fn process( + &self, + _world: &WorldContractReader

, + db: &mut Sql, + block_number: u64, + block_timestamp: u64, + event_id: &str, + event: &Event, + _config: &EventProcessorConfig, + ) -> Result<(), Error> { + let token_address = event.from_address; + let from = event.keys[2]; + let to = event.keys[3]; + + let ids_len = event.data[0].try_into().unwrap_or(0u64) as usize; + let mut current_idx = 1; + + // First pass: read all token IDs + let mut token_ids = Vec::with_capacity(ids_len); + for _ in 0..ids_len { + if current_idx + 1 >= event.data.len() { + break; + } + let token_id = U256Cainome::cairo_deserialize(&event.data, current_idx)?; + token_ids.push(U256::from_words(token_id.low, token_id.high)); + current_idx += 2; + } + + // Move index to values array + let values_len = event.data[current_idx].try_into().unwrap_or(0u64) as usize; + current_idx += 1; + + // Second pass: read and process amounts + for (idx, token_id) in token_ids.iter().enumerate() { + if idx >= values_len || current_idx + (idx * 2) + 1 >= event.data.len() { + break; + } + + let amount = U256Cainome::cairo_deserialize(&event.data, current_idx + (idx * 2))?; + let amount = U256::from_words(amount.low, amount.high); + + db.handle_nft_transfer( + token_address, + from, + to, + *token_id, + amount, + block_timestamp, + event_id, + block_number, + ) + .await?; + + debug!( + target: LOG_TARGET, + from = ?from, + to = ?to, + token_id = ?token_id, + amount = ?amount, + "ERC1155 TransferBatch" + ); + } + + Ok(()) + } +} diff --git a/crates/torii/indexer/src/processors/erc1155_transfer_single.rs b/crates/torii/indexer/src/processors/erc1155_transfer_single.rs new file mode 100644 index 0000000000..55d0877c0a --- /dev/null +++ b/crates/torii/indexer/src/processors/erc1155_transfer_single.rs @@ -0,0 +1,81 @@ +use std::hash::{DefaultHasher, Hash, Hasher}; + +use anyhow::Error; +use async_trait::async_trait; +use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; +use dojo_world::contracts::world::WorldContractReader; +use starknet::core::types::{Event, U256}; +use starknet::providers::Provider; +use torii_sqlite::Sql; +use tracing::debug; + +use super::{EventProcessor, EventProcessorConfig}; +use crate::task_manager::{self, TaskId, TaskPriority}; + +pub(crate) const LOG_TARGET: &str = "torii_indexer::processors::erc1155_transfer_single"; + +#[derive(Default, Debug)] +pub struct Erc1155TransferSingleProcessor; + +#[async_trait] +impl

EventProcessor

for Erc1155TransferSingleProcessor +where + P: Provider + Send + Sync + std::fmt::Debug, +{ + fn event_key(&self) -> String { + "TransferSingle".to_string() + } + + fn validate(&self, event: &Event) -> bool { + // key: [hash(TransferSingle), operator, from, to] + // data: [id.low, id.high, value.low, value.high] + if event.keys.len() == 4 && event.data.len() == 4 { + return true; + } + false + } + + fn task_priority(&self) -> TaskPriority { + 1 + } + + fn task_identifier(&self, _event: &Event) -> TaskId { + task_manager::TASK_ID_SEQUENTIAL + } + + async fn process( + &self, + _world: &WorldContractReader

, + db: &mut Sql, + block_number: u64, + block_timestamp: u64, + event_id: &str, + event: &Event, + _config: &EventProcessorConfig, + ) -> Result<(), Error> { + let token_address = event.from_address; + let from = event.keys[2]; + let to = event.keys[3]; + + let token_id = U256Cainome::cairo_deserialize(&event.data, 0)?; + let token_id = U256::from_words(token_id.low, token_id.high); + + let amount = U256Cainome::cairo_deserialize(&event.data, 2)?; + let amount = U256::from_words(amount.low, amount.high); + + db.handle_nft_transfer( + token_address, + from, + to, + token_id, + amount, + block_timestamp, + event_id, + block_number, + ) + .await?; + debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, amount = ?amount, "ERC1155 TransferSingle"); + + Ok(()) + } +} diff --git a/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs b/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs index e02304ed9c..3cef04a5bc 100644 --- a/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs +++ b/crates/torii/indexer/src/processors/erc721_legacy_transfer.rs @@ -80,11 +80,12 @@ where let token_id = U256Cainome::cairo_deserialize(&event.data, 2)?; let token_id = U256::from_words(token_id.low, token_id.high); - db.handle_erc721_transfer( + db.handle_nft_transfer( token_address, from, to, token_id, + U256::from(1u8), block_timestamp, event_id, block_number, diff --git a/crates/torii/indexer/src/processors/erc721_transfer.rs b/crates/torii/indexer/src/processors/erc721_transfer.rs index 4a0383a4f1..8268f8cffa 100644 --- a/crates/torii/indexer/src/processors/erc721_transfer.rs +++ b/crates/torii/indexer/src/processors/erc721_transfer.rs @@ -80,11 +80,12 @@ where let token_id = U256Cainome::cairo_deserialize(&event.keys, 3)?; let token_id = U256::from_words(token_id.low, token_id.high); - db.handle_erc721_transfer( + db.handle_nft_transfer( token_address, from, to, token_id, + U256::from(1u8), block_timestamp, event_id, block_number, diff --git a/crates/torii/indexer/src/processors/mod.rs b/crates/torii/indexer/src/processors/mod.rs index 420dd798a7..ceaa9c7f3e 100644 --- a/crates/torii/indexer/src/processors/mod.rs +++ b/crates/torii/indexer/src/processors/mod.rs @@ -9,6 +9,8 @@ use torii_sqlite::Sql; use crate::task_manager::{TaskId, TaskPriority}; +pub mod erc1155_transfer_batch; +pub mod erc1155_transfer_single; pub mod erc20_legacy_transfer; pub mod erc20_transfer; pub mod erc721_legacy_transfer; diff --git a/crates/torii/sqlite/src/erc.rs b/crates/torii/sqlite/src/erc.rs index 37cbfec640..1e59ca452e 100644 --- a/crates/torii/sqlite/src/erc.rs +++ b/crates/torii/sqlite/src/erc.rs @@ -79,12 +79,13 @@ impl Sql { } #[allow(clippy::too_many_arguments)] - pub async fn handle_erc721_transfer( + pub async fn handle_nft_transfer( &mut self, contract_address: Felt, from_address: Felt, to_address: Felt, token_id: U256, + amount: U256, block_timestamp: u64, event_id: &str, block_number: u64, @@ -95,15 +96,14 @@ impl Sql { let token_exists: bool = self.local_cache.contains_token_id(&token_id).await; if !token_exists { - self.register_nft_token_metadata(contract_address, &token_id, actual_token_id) - .await?; + self.register_nft_token_metadata(contract_address, &token_id, actual_token_id).await?; } self.store_erc_transfer_event( contract_address, from_address, to_address, - U256::from(1u8), + amount, &token_id, block_timestamp, event_id, @@ -120,7 +120,7 @@ impl Sql { ); let from_balance = erc_cache.entry((ContractType::ERC721, from_balance_id)).or_default(); - *from_balance -= I256::from(1u8); + *from_balance -= I256::from(amount); } if to_address != Felt::ZERO { @@ -128,7 +128,7 @@ impl Sql { format!("{}{SQL_FELT_DELIMITER}{}", felt_to_sql_string(&to_address), &token_id); let to_balance = erc_cache.entry((ContractType::ERC721, to_balance_id)).or_default(); - *to_balance += I256::from(1u8); + *to_balance += I256::from(amount); } } @@ -142,68 +142,6 @@ impl Sql { Ok(()) } - #[allow(clippy::too_many_arguments)] -pub async fn handle_erc1155_transfer( - &mut self, - contract_address: Felt, - from_address: Felt, - to_address: Felt, - token_id: U256, - amount: U256, - block_timestamp: u64, - event_id: &str, - block_number: u64, -) -> Result<()> { - // contract_address:id - let actual_token_id = token_id; - let token_id = felt_and_u256_to_sql_string(&contract_address, &token_id); - let token_exists: bool = self.local_cache.contains_token_id(&token_id).await; - - if !token_exists { - self.register_nft_token_metadata(contract_address, &token_id, actual_token_id) - .await?; - } - - self.store_erc_transfer_event( - contract_address, - from_address, - to_address, - amount, - &token_id, - block_timestamp, - event_id, - )?; - - { - let mut erc_cache = self.local_cache.erc_cache.write().await; - if from_address != Felt::ZERO { - let from_balance_id = format!( - "{}{SQL_FELT_DELIMITER}{}", - felt_to_sql_string(&from_address), - &token_id - ); - let from_balance = erc_cache.entry((ContractType::ERC1155, from_balance_id)).or_default(); - *from_balance -= I256::from(amount); - } - - if to_address != Felt::ZERO { - let to_balance_id = - format!("{}{SQL_FELT_DELIMITER}{}", felt_to_sql_string(&to_address), &token_id); - let to_balance = erc_cache.entry((ContractType::ERC1155, to_balance_id)).or_default(); - *to_balance += I256::from(amount); - } - } - - let block_id = BlockId::Number(block_number); - - if self.local_cache.erc_cache.read().await.len() >= 100000 { - self.flush().await.with_context(|| "Failed to flush in handle_erc1155_single_transfer")?; - self.apply_cache_diff(block_id).await?; - } - - Ok(()) -} - async fn register_erc20_token_metadata( &mut self, contract_address: Felt, diff --git a/crates/torii/sqlite/src/executor/erc.rs b/crates/torii/sqlite/src/executor/erc.rs index ebfb36421b..956709922d 100644 --- a/crates/torii/sqlite/src/executor/erc.rs +++ b/crates/torii/sqlite/src/executor/erc.rs @@ -235,7 +235,8 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { .await { token_uri - } // erc1155 + } + // erc1155 else if let Ok(token_uri) = provider .call( FunctionCall { @@ -245,7 +246,8 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { }, BlockId::Tag(BlockTag::Pending), ) - .await { + .await + { token_uri } else { warn!( From 4cec8cea85c005becf3034ab209f898a92a34945 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 27 Jan 2025 15:12:35 +0700 Subject: [PATCH 03/11] processors & uri --- crates/torii/indexer/src/engine.rs | 9 +++++++++ .../indexer/src/processors/erc1155_transfer_single.rs | 2 -- crates/torii/sqlite/src/executor/erc.rs | 2 +- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/crates/torii/indexer/src/engine.rs b/crates/torii/indexer/src/engine.rs index 815849184d..f8db9dbb70 100644 --- a/crates/torii/indexer/src/engine.rs +++ b/crates/torii/indexer/src/engine.rs @@ -28,6 +28,8 @@ use torii_sqlite::{Cursors, Sql}; use tracing::{debug, error, info, trace, warn}; use crate::constants::LOG_TARGET; +use crate::processors::erc1155_transfer_batch::Erc1155TransferBatchProcessor; +use crate::processors::erc1155_transfer_single::Erc1155TransferSingleProcessor; use crate::processors::erc20_legacy_transfer::Erc20LegacyTransferProcessor; use crate::processors::erc20_transfer::Erc20TransferProcessor; use crate::processors::erc721_legacy_transfer::Erc721LegacyTransferProcessor; @@ -105,6 +107,13 @@ impl Processors

{ Box::new(Erc721LegacyTransferProcessor) as Box>, ], ), + ( + ContractType::ERC1155, + vec![ + Box::new(Erc1155TransferBatchProcessor) as Box>, + Box::new(Erc1155TransferSingleProcessor) as Box>, + ], + ), ]; for (contract_type, processors) in event_processors { diff --git a/crates/torii/indexer/src/processors/erc1155_transfer_single.rs b/crates/torii/indexer/src/processors/erc1155_transfer_single.rs index 55d0877c0a..6854b375fc 100644 --- a/crates/torii/indexer/src/processors/erc1155_transfer_single.rs +++ b/crates/torii/indexer/src/processors/erc1155_transfer_single.rs @@ -1,5 +1,3 @@ -use std::hash::{DefaultHasher, Hash, Hasher}; - use anyhow::Error; use async_trait::async_trait; use cainome::cairo_serde::{CairoSerde, U256 as U256Cainome}; diff --git a/crates/torii/sqlite/src/executor/erc.rs b/crates/torii/sqlite/src/executor/erc.rs index 956709922d..27048dab72 100644 --- a/crates/torii/sqlite/src/executor/erc.rs +++ b/crates/torii/sqlite/src/executor/erc.rs @@ -241,7 +241,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { .call( FunctionCall { contract_address: register_nft_token.contract_address, - entry_point_selector: get_selector_from_name("tokenURI").unwrap(), + entry_point_selector: get_selector_from_name("uri").unwrap(), calldata: vec![], }, BlockId::Tag(BlockTag::Pending), From 9201560619505a7beacac305d2dc9c244b399b66 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 27 Jan 2025 15:14:56 +0700 Subject: [PATCH 04/11] uri token id --- crates/torii/sqlite/src/executor/erc.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/torii/sqlite/src/executor/erc.rs b/crates/torii/sqlite/src/executor/erc.rs index 27048dab72..a9b54a9072 100644 --- a/crates/torii/sqlite/src/executor/erc.rs +++ b/crates/torii/sqlite/src/executor/erc.rs @@ -242,7 +242,10 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { FunctionCall { contract_address: register_nft_token.contract_address, entry_point_selector: get_selector_from_name("uri").unwrap(), - calldata: vec![], + calldata: vec![ + register_nft_token.actual_token_id.low().into(), + register_nft_token.actual_token_id.high().into(), + ], }, BlockId::Tag(BlockTag::Pending), ) From 51b1185940daa27a4de0a481d83a6cf74c71d5ab Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 27 Jan 2025 15:31:06 +0700 Subject: [PATCH 05/11] erc1155 {id} substitution from spec --- crates/torii/sqlite/src/executor/erc.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/crates/torii/sqlite/src/executor/erc.rs b/crates/torii/sqlite/src/executor/erc.rs index a9b54a9072..81b3acbdc3 100644 --- a/crates/torii/sqlite/src/executor/erc.rs +++ b/crates/torii/sqlite/src/executor/erc.rs @@ -263,7 +263,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { ByteArray::cairo_serialize(&"".try_into().unwrap()) }; - let token_uri = if let Ok(byte_array) = ByteArray::cairo_deserialize(&token_uri, 0) { + let mut token_uri = if let Ok(byte_array) = ByteArray::cairo_deserialize(&token_uri, 0) { byte_array.to_string().expect("Return value not String") } else if let Ok(felt_array) = Vec::::cairo_deserialize(&token_uri, 0) { felt_array @@ -276,6 +276,13 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { return Err(anyhow::anyhow!("token_uri is neither ByteArray nor Array")); }; + // ERC1155 standard (https://eips.ethereum.org/EIPS/eip-1155#metadata) + // requires replacing {id} in the URI with the hex representation of the token ID + // padded to 64 hex chars (32 bytes). Example: + // "ipfs://QmSome/metadata/{id}.json" -> "ipfs://QmSome/metadata/000000000000000000000000000000000000000000000000000000000000000a.json" + let token_id_hex = format!("{:064x}", register_nft_token.actual_token_id); + token_uri = token_uri.replace("{id}", &token_id_hex); + let metadata = if token_uri.is_empty() { "".to_string() } else { From 91126fca655bb6e9e244bb241b3e18f25390faf5 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 27 Jan 2025 15:31:21 +0700 Subject: [PATCH 06/11] fmt --- crates/torii/sqlite/src/executor/erc.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/torii/sqlite/src/executor/erc.rs b/crates/torii/sqlite/src/executor/erc.rs index 81b3acbdc3..3566c5d821 100644 --- a/crates/torii/sqlite/src/executor/erc.rs +++ b/crates/torii/sqlite/src/executor/erc.rs @@ -279,7 +279,9 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { // ERC1155 standard (https://eips.ethereum.org/EIPS/eip-1155#metadata) // requires replacing {id} in the URI with the hex representation of the token ID // padded to 64 hex chars (32 bytes). Example: - // "ipfs://QmSome/metadata/{id}.json" -> "ipfs://QmSome/metadata/000000000000000000000000000000000000000000000000000000000000000a.json" + // "ipfs://QmSome/metadata/{id}.json" -> + // "ipfs://QmSome/metadata/000000000000000000000000000000000000000000000000000000000000000a. + // json" let token_id_hex = format!("{:064x}", register_nft_token.actual_token_id); token_uri = token_uri.replace("{id}", &token_id_hex); From b0a9c08fa2e55222c83f6112a2341b58522c8688 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 27 Jan 2025 15:32:45 +0700 Subject: [PATCH 07/11] show in debug --- crates/torii/sqlite/src/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/torii/sqlite/src/utils.rs b/crates/torii/sqlite/src/utils.rs index fd77affa82..11c49d50c0 100644 --- a/crates/torii/sqlite/src/utils.rs +++ b/crates/torii/sqlite/src/utils.rs @@ -64,7 +64,7 @@ pub async fn fetch_content_from_ipfs(cid: &str) -> Result { Ok(stream) => return Ok(Bytes::from(stream)), Err(e) => { retries -= 1; - warn!( + debug!( error = %e, remaining_attempts = retries, cid = cid, From 32b368216a80b0b42e7e5187d20559a9340868fe Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 27 Jan 2025 15:33:59 +0700 Subject: [PATCH 08/11] debug import --- crates/torii/sqlite/src/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/torii/sqlite/src/utils.rs b/crates/torii/sqlite/src/utils.rs index 11c49d50c0..7591640362 100644 --- a/crates/torii/sqlite/src/utils.rs +++ b/crates/torii/sqlite/src/utils.rs @@ -10,7 +10,7 @@ use ipfs_api_backend_hyper::{IpfsApi, IpfsClient, TryFromUri}; use starknet::core::types::U256; use starknet_crypto::Felt; use tokio_util::bytes::Bytes; -use tracing::warn; +use tracing::debug; use crate::constants::{ IPFS_CLIENT_MAX_RETRY, IPFS_CLIENT_PASSWORD, IPFS_CLIENT_URL, IPFS_CLIENT_USERNAME, From ea78b867ede63b84574e242d4acba11b201c2028 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 27 Jan 2025 15:35:07 +0700 Subject: [PATCH 09/11] token uri in warn --- crates/torii/sqlite/src/executor/erc.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/torii/sqlite/src/executor/erc.rs b/crates/torii/sqlite/src/executor/erc.rs index 3566c5d821..cd909aa463 100644 --- a/crates/torii/sqlite/src/executor/erc.rs +++ b/crates/torii/sqlite/src/executor/erc.rs @@ -299,8 +299,9 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { serde_json::to_string(&metadata).context("Failed to serialize metadata")? } else { warn!( - contract_address = format!("{:#x}", register_nft_token.contract_address), + contract_address = format!("{:#x}", register_nft_token.contract_address), token_id = %register_nft_token.actual_token_id, + token_uri = %token_uri, "Error fetching metadata, empty metadata will be used instead.", ); "".to_string() From 9dccfed1ddc032379cb1e3c614a826f9505283cb Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 28 Jan 2025 11:09:25 +0700 Subject: [PATCH 10/11] cleanup and comment --- .../indexer/src/processors/erc1155_transfer_batch.rs | 11 +++++++---- .../indexer/src/processors/erc1155_transfer_single.rs | 5 +---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/torii/indexer/src/processors/erc1155_transfer_batch.rs b/crates/torii/indexer/src/processors/erc1155_transfer_batch.rs index 2fcd7efc25..a1d7414668 100644 --- a/crates/torii/indexer/src/processors/erc1155_transfer_batch.rs +++ b/crates/torii/indexer/src/processors/erc1155_transfer_batch.rs @@ -28,10 +28,7 @@ where // key: [hash(TransferBatch), operator, from, to] // data: [ids_len, ids[0].low, ids[0].high, ..., values_len, values[0].low, values[0].high, // ...] - if event.keys.len() == 4 && !event.data.is_empty() { - return true; - } - false + event.keys.len() == 4 && !event.data.is_empty() } fn task_priority(&self) -> TaskPriority { @@ -56,6 +53,12 @@ where let from = event.keys[2]; let to = event.keys[3]; + // ERC1155 TransferBatch event data format: + // - ids_len: felt (first element) + // - ids: U256[] (each element stored as 2 felts: [low, high]) + // - values_len: felt + // - values: U256[] (each element stored as 2 felts: [low, high]) + // Spec reference: https://eips.ethereum.org/EIPS/eip-1155#transferbatch let ids_len = event.data[0].try_into().unwrap_or(0u64) as usize; let mut current_idx = 1; diff --git a/crates/torii/indexer/src/processors/erc1155_transfer_single.rs b/crates/torii/indexer/src/processors/erc1155_transfer_single.rs index 6854b375fc..f9deecc729 100644 --- a/crates/torii/indexer/src/processors/erc1155_transfer_single.rs +++ b/crates/torii/indexer/src/processors/erc1155_transfer_single.rs @@ -27,10 +27,7 @@ where fn validate(&self, event: &Event) -> bool { // key: [hash(TransferSingle), operator, from, to] // data: [id.low, id.high, value.low, value.high] - if event.keys.len() == 4 && event.data.len() == 4 { - return true; - } - false + event.keys.len() == 4 && event.data.len() == 4 } fn task_priority(&self) -> TaskPriority { From f123ced1f90d5a7ce8bb3d2add7e6ff761693fac Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 28 Jan 2025 11:09:36 +0700 Subject: [PATCH 11/11] fmt --- crates/torii/indexer/src/processors/erc1155_transfer_batch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/torii/indexer/src/processors/erc1155_transfer_batch.rs b/crates/torii/indexer/src/processors/erc1155_transfer_batch.rs index a1d7414668..3738983755 100644 --- a/crates/torii/indexer/src/processors/erc1155_transfer_batch.rs +++ b/crates/torii/indexer/src/processors/erc1155_transfer_batch.rs @@ -56,7 +56,7 @@ where // ERC1155 TransferBatch event data format: // - ids_len: felt (first element) // - ids: U256[] (each element stored as 2 felts: [low, high]) - // - values_len: felt + // - values_len: felt // - values: U256[] (each element stored as 2 felts: [low, high]) // Spec reference: https://eips.ethereum.org/EIPS/eip-1155#transferbatch let ids_len = event.data[0].try_into().unwrap_or(0u64) as usize;