diff --git a/crates/torii/client/src/client/mod.rs b/crates/torii/client/src/client/mod.rs index e2fd1f58bd..ec54045abd 100644 --- a/crates/torii/client/src/client/mod.rs +++ b/crates/torii/client/src/client/mod.rs @@ -49,7 +49,7 @@ impl Client { ) -> Result { let mut grpc_client = torii_grpc::client::WorldClient::new(torii_url, world).await?; - let relay_client = torii_relay::client::RelayClient::new(relay_url)?; + let relay_client = torii_relay::client::RelayClient::new(relay_url, None)?; let metadata = grpc_client.metadata().await?; diff --git a/crates/torii/graphql/src/mapping.rs b/crates/torii/graphql/src/mapping.rs index 02bbd5618d..5ee47f54cb 100644 --- a/crates/torii/graphql/src/mapping.rs +++ b/crates/torii/graphql/src/mapping.rs @@ -51,10 +51,10 @@ lazy_static! { Name::new("contractAddress"), TypeData::Simple(TypeRef::named(Primitive::Felt252(None).to_string())), ), - ( - Name::new("transactionHash"), - TypeData::Simple(TypeRef::named(Primitive::Felt252(None).to_string())), - ), + // ( + // Name::new("transactionHash"), + // TypeData::Simple(TypeRef::named(Primitive::Felt252(None).to_string())), + // ), ( Name::new("executedAt"), TypeData::Simple(TypeRef::named(GraphqlType::DateTime.to_string())), diff --git a/crates/torii/graphql/src/object/model.rs b/crates/torii/graphql/src/object/model.rs index 281187bca4..a93b048f4e 100644 --- a/crates/torii/graphql/src/object/model.rs +++ b/crates/torii/graphql/src/object/model.rs @@ -1,8 +1,11 @@ +use std::str::FromStr; + use async_graphql::dynamic::indexmap::IndexMap; use async_graphql::dynamic::{ Enum, Field, InputObject, InputValue, SubscriptionField, SubscriptionFieldFuture, TypeRef, }; use async_graphql::{Name, Value}; +use starknet_crypto::Felt; use tokio_stream::StreamExt; use torii_sqlite::simple_broker::SimpleBroker; use torii_sqlite::types::Model; @@ -80,7 +83,7 @@ impl ResolvableObject for ModelObject { { SubscriptionFieldFuture::new(async move { let id = match ctx.args.get("id") { - Some(id) => Some(id.string()?.to_string()), + Some(id) => Some(Felt::from_str(&id.string()?.to_string())?), None => None, }; // if id is None, then subscribe to all models @@ -104,12 +107,12 @@ impl ResolvableObject for ModelObject { impl ModelObject { pub fn value_mapping(model: Model) -> ValueMapping { IndexMap::from([ - (Name::new("id"), Value::from(model.id)), + (Name::new("id"), Value::from(format!("{:#x}", model.id))), (Name::new("name"), Value::from(model.name)), (Name::new("namespace"), Value::from(model.namespace)), - (Name::new("classHash"), Value::from(model.class_hash)), - (Name::new("contractAddress"), Value::from(model.contract_address)), - (Name::new("transactionHash"), Value::from(model.transaction_hash)), + (Name::new("classHash"), Value::from(format!("{:#x}", model.class_hash))), + (Name::new("contractAddress"), Value::from(format!("{:#x}", model.contract_address))), + // (Name::new("transactionHash"), Value::from(format!("{:#x}", model.transaction_hash))), ( Name::new("createdAt"), Value::from(model.created_at.format(DATETIME_FORMAT).to_string()), diff --git a/crates/torii/graphql/src/schema.rs b/crates/torii/graphql/src/schema.rs index adee496350..d75d30090f 100644 --- a/crates/torii/graphql/src/schema.rs +++ b/crates/torii/graphql/src/schema.rs @@ -140,8 +140,7 @@ async fn build_objects(pool: &SqlitePool) -> Result<(Vec, Vec Result<(), Error> { let mut closed_stream = Vec::new(); - let contract_address = - Felt::from_str(&update.contract_address).map_err(ParseError::FromStr)?; + let contract_address = update.contract_address; for (idx, sub) in subs.subscribers.read().await.iter() { if sub.contract_address != Felt::ZERO && sub.contract_address != contract_address { diff --git a/crates/torii/libp2p/src/client/mod.rs b/crates/torii/libp2p/src/client/mod.rs index eb5e66bb18..68bf9c6123 100644 --- a/crates/torii/libp2p/src/client/mod.rs +++ b/crates/torii/libp2p/src/client/mod.rs @@ -1,6 +1,9 @@ +use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use dojo_types::naming::compute_selector_from_tag; +use dojo_world::contracts::abigen::model::Layout; use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures::channel::oneshot; use futures::lock::Mutex; @@ -12,13 +15,17 @@ use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; #[cfg(not(target_arch = "wasm32"))] use libp2p::tcp; use libp2p::{identify, identity, noise, ping, yamux, Multiaddr, PeerId}; +use starknet::core::types::Event; +use starknet_crypto::Felt; +use torii_sqlite::executor::QueryMessage; +use torii_sqlite::Sql; use tracing::info; pub mod events; use crate::client::events::ClientEvent; use crate::constants; use crate::error::Error; -use crate::types::Message; +use crate::types::{Message, Update}; pub(crate) const LOG_TARGET: &str = "torii::relay::client"; @@ -39,6 +46,7 @@ pub struct RelayClient { #[allow(missing_debug_implementations)] pub struct EventLoop { swarm: Swarm, + sql: Option, command_receiver: UnboundedReceiver, } @@ -49,7 +57,7 @@ enum Command { impl RelayClient { #[cfg(not(target_arch = "wasm32"))] - pub fn new(relay_addr: String) -> Result { + pub fn new(relay_addr: String, replica_db: Option) -> Result { let local_key = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(local_key.public()); @@ -87,18 +95,30 @@ impl RelayClient { }) .build(); + if replica_db.is_some() { + swarm + .behaviour_mut() + .gossipsub + .subscribe(&IdentTopic::new(constants::UPDATE_MESSAGING_TOPIC))?; + } + info!(target: LOG_TARGET, addr = %relay_addr, "Dialing relay."); swarm.dial(relay_addr.parse::()?)?; let (command_sender, command_receiver) = futures::channel::mpsc::unbounded(); Ok(Self { command_sender: CommandSender::new(command_sender), - event_loop: Arc::new(Mutex::new(EventLoop { swarm, command_receiver })), + event_loop: Arc::new(Mutex::new(EventLoop { + swarm, + command_receiver, + sql: replica_db, + })), }) } #[cfg(target_arch = "wasm32")] - pub fn new(relay_addr: String) -> Result { + // We are never gonna be a replica in the browser. + pub fn new(relay_addr: String, _replica_db: Option) -> Result { let local_key = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(local_key.public()); @@ -195,6 +215,82 @@ impl EventLoop { } } + async fn handle_update(&mut self, update: Update) { + // TODO: Implement update handling. + info!(target: LOG_TARGET, update = ?update, "Received update."); + // We can safely unwrap because we subscribe to updates only if replica_db is provided. + let sql = self.sql.as_mut().unwrap(); + + match update { + Update::Head(cursor) => { + sql.set_head( + cursor.head as u64, + cursor.last_block_timestamp as u64, + 0, + cursor.contract_address, + ) + .await + .unwrap(); + } + Update::Model(model) => { + sql.register_model( + &model.namespace, + &model.schema, + model.layout, + model.class_hash, + model.contract_address, + model.packed_size, + model.unpacked_size, + model.executed_at.timestamp() as u64, + None, + ) + .await + .unwrap(); + } + Update::Entity(entity) => { + let id = Felt::from_str(&entity.id).unwrap(); + let model = entity.updated_model.unwrap(); + let model_id = compute_selector_from_tag(&model.name()); + if entity.deleted { + sql.delete_entity( + id, + model_id, + model, + &entity.event_id, + entity.executed_at.timestamp() as u64, + ) + .await + .unwrap(); + } else { + sql.set_entity( + model, + &entity.event_id, + entity.executed_at.timestamp() as u64, + id, + model_id, + Some(&entity.keys), + ) + .await + .unwrap(); + } + } + Update::EventMessage(event_message) => { + let model = event_message.updated_model.unwrap(); + sql.set_event_message( + model, + &event_message.event_id, + event_message.executed_at.timestamp() as u64, + event_message.historical, + ) + .await + .unwrap(); + } + Update::Event(event) => { + // TODO + } + } + } + pub async fn run(&mut self) { let mut is_relay_ready = false; let commands_queue = Arc::new(Mutex::new(Vec::new())); @@ -207,6 +303,11 @@ impl EventLoop { }, event = self.swarm.select_next_some() => { match event { + SwarmEvent::Behaviour(ClientEvent::Gossipsub(gossipsub::Event::Message { message, .. })) => { + if let Ok(update) = serde_json::from_slice::(&message.data) { + self.handle_update(update).await; + } + }, SwarmEvent::Behaviour(ClientEvent::Gossipsub(gossipsub::Event::Subscribed { topic, .. })) => { // Handle behaviour events. info!(target: LOG_TARGET, topic = ?topic, "Relay ready. Received subscription confirmation."); diff --git a/crates/torii/libp2p/src/constants.rs b/crates/torii/libp2p/src/constants.rs index 2d061e72e4..edb1566edb 100644 --- a/crates/torii/libp2p/src/constants.rs +++ b/crates/torii/libp2p/src/constants.rs @@ -1,3 +1,4 @@ pub(crate) const GOSSIPSUB_HEARTBEAT_INTERVAL_SECS: u64 = 10; pub(crate) const MESSAGING_TOPIC: &str = "message"; +pub(crate) const UPDATE_MESSAGING_TOPIC: &str = "update"; pub(crate) const IDLE_CONNECTION_TIMEOUT_SECS: u64 = 60; diff --git a/crates/torii/libp2p/src/test.rs b/crates/torii/libp2p/src/test.rs index 35c6a85965..9f0c9fe012 100644 --- a/crates/torii/libp2p/src/test.rs +++ b/crates/torii/libp2p/src/test.rs @@ -116,7 +116,7 @@ async fn test_client_messaging() -> Result<(), Box> { }); // Initialize the first client (listener) - let client = RelayClient::new("/ip4/127.0.0.1/tcp/9900".to_string())?; + let client = RelayClient::new("/ip4/127.0.0.1/tcp/9900".to_string(), None)?; tokio::spawn(async move { client.event_loop.lock().await.run().await; }); diff --git a/crates/torii/libp2p/src/types.rs b/crates/torii/libp2p/src/types.rs index e8d52c2402..79dd786218 100644 --- a/crates/torii/libp2p/src/types.rs +++ b/crates/torii/libp2p/src/types.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; use starknet::core::types::Felt; +use torii_sqlite::types::{ContractCursor, Entity, Event, EventMessage, Model}; use torii_typed_data::TypedData; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -7,3 +8,18 @@ pub struct Message { pub message: TypedData, pub signature: Vec, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Update { + // Latest contract head + Head(ContractCursor), + // Registered model + Model(Model), + // Updated entity state + Entity(Entity), + // Indexed event message + EventMessage(EventMessage), + // Indexed raw event + Event(Event), + // TODO: Add more types of updates here. +} diff --git a/crates/torii/sqlite/src/cache.rs b/crates/torii/sqlite/src/cache.rs index bbfad566db..a63dcf54a4 100644 --- a/crates/torii/sqlite/src/cache.rs +++ b/crates/torii/sqlite/src/cache.rs @@ -1,34 +1,14 @@ use std::collections::{HashMap, HashSet}; -use dojo_types::schema::Ty; -use dojo_world::contracts::abigen::model::Layout; use sqlx::{Pool, Sqlite, SqlitePool}; use starknet_crypto::Felt; use tokio::sync::RwLock; use crate::constants::TOKEN_BALANCE_TABLE; -use crate::error::{Error, ParseError}; -use crate::types::ContractType; +use crate::error::Error; +use crate::types::{ContractType, Model}; use crate::utils::I256; -#[derive(Debug, Clone)] -pub struct Model { - /// Namespace of the model - pub namespace: String, - /// The name of the model - pub name: String, - /// The selector of the model - pub selector: Felt, - /// The class hash of the model - pub class_hash: Felt, - /// The contract address of the model - pub contract_address: Felt, - pub packed_size: u32, - pub unpacked_size: u32, - pub layout: Layout, - pub schema: Ty, -} - #[derive(Debug)] pub struct ModelCache { pool: SqlitePool, @@ -65,42 +45,15 @@ impl ModelCache { } async fn update_model(&self, selector: &Felt) -> Result { - let ( - namespace, - name, - class_hash, - contract_address, - packed_size, - unpacked_size, - layout, - schema, - ): (String, String, String, String, u32, u32, String, String) = sqlx::query_as( - "SELECT namespace, name, class_hash, contract_address, packed_size, unpacked_size, \ - layout, schema FROM models WHERE id = ?", + let model: Model = sqlx::query_as( + "SELECT id, namespace, name, class_hash, contract_address, transaction_hash, packed_size, unpacked_size, \ + layout, schema, executed_at, created_at FROM models WHERE id = ?", ) .bind(format!("{:#x}", selector)) .fetch_one(&self.pool) .await?; - let class_hash = Felt::from_hex(&class_hash).map_err(ParseError::FromStr)?; - let contract_address = Felt::from_hex(&contract_address).map_err(ParseError::FromStr)?; - - let layout = serde_json::from_str(&layout).map_err(ParseError::FromJsonStr)?; - let schema = serde_json::from_str(&schema).map_err(ParseError::FromJsonStr)?; - let mut cache = self.model_cache.write().await; - - let model = Model { - namespace, - name, - selector: *selector, - class_hash, - contract_address, - packed_size, - unpacked_size, - layout, - schema, - }; cache.insert(*selector, model.clone()); Ok(model) diff --git a/crates/torii/sqlite/src/executor/mod.rs b/crates/torii/sqlite/src/executor/mod.rs index 0b0c19b6ff..0f80d4331d 100644 --- a/crates/torii/sqlite/src/executor/mod.rs +++ b/crates/torii/sqlite/src/executor/mod.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; use std::mem; -use std::str::FromStr; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; @@ -339,7 +338,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { for cursor in &mut cursors { if let Some(new_cursor) = reset_heads .cursor_map - .get(&Felt::from_str(&cursor.contract_address).unwrap()) + .get(&cursor.contract_address) { let cursor_timestamp: u64 = cursor.last_block_timestamp.try_into().expect("doesn't fit in i64"); @@ -368,9 +367,9 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { ) .bind(cursor.head) .bind(cursor.last_block_timestamp) - .bind(&cursor.last_pending_block_tx) - .bind(&cursor.last_pending_block_contract_tx) - .bind(&cursor.contract_address) + .bind(cursor.last_pending_block_tx.map(|felt| felt_to_sql_string(&felt))) + .bind(cursor.last_pending_block_contract_tx.map(|felt| felt_to_sql_string(&felt))) + .bind(felt_to_sql_string(&cursor.contract_address)) .execute(&mut **tx) .await?; @@ -390,7 +389,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { for cursor in &mut cursors { if let Some(new_cursor) = update_cursors .cursor_map - .get(&Felt::from_str(&cursor.contract_address).unwrap()) + .get(&cursor.contract_address) { let cursor_timestamp: u64 = cursor.last_block_timestamp.try_into().expect("doesn't fit in i64"); @@ -407,7 +406,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { }; cursor.last_pending_block_contract_tx = - Some(felt_to_sql_string(&new_cursor.0)); + Some(new_cursor.0); cursor.tps = new_tps.try_into().expect("does't fit in i64"); } else { cursor.tps = 0; @@ -418,7 +417,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { .expect("doesn't fit in i64"); cursor.head = new_head; cursor.last_pending_block_tx = - update_cursors.last_pending_block_tx.map(|felt| felt_to_sql_string(&felt)); + update_cursors.last_pending_block_tx; sqlx::query( "UPDATE contracts SET head = ?, last_block_timestamp = ?, \ @@ -427,9 +426,9 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { ) .bind(cursor.head) .bind(cursor.last_block_timestamp) - .bind(&cursor.last_pending_block_tx) - .bind(&cursor.last_pending_block_contract_tx) - .bind(&cursor.contract_address) + .bind(cursor.last_pending_block_tx.map(|felt| felt_to_sql_string(&felt))) + .bind(cursor.last_pending_block_contract_tx.map(|felt| felt_to_sql_string(&felt))) + .bind(felt_to_sql_string(&cursor.contract_address)) .execute(&mut **tx) .await?; @@ -830,4 +829,4 @@ fn send_broker_message(message: BrokerMessage) { BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event), BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event), } -} +} \ No newline at end of file diff --git a/crates/torii/sqlite/src/lib.rs b/crates/torii/sqlite/src/lib.rs index 7666108620..e7d320c4f9 100644 --- a/crates/torii/sqlite/src/lib.rs +++ b/crates/torii/sqlite/src/lib.rs @@ -4,6 +4,7 @@ use std::str::FromStr; use std::sync::Arc; use anyhow::{anyhow, Context, Result}; +use chrono::{DateTime, Utc}; use dojo_types::naming::get_tag; use dojo_types::schema::{Struct, Ty}; use dojo_world::config::WorldMetadata; @@ -14,6 +15,7 @@ use sqlx::{Pool, Sqlite}; use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction}; use starknet_crypto::poseidon_hash_many; use tokio::sync::mpsc::UnboundedSender; +use types::Model; use utils::felts_to_sql_string; use crate::constants::SQL_FELT_DELIMITER; @@ -37,7 +39,7 @@ pub mod simple_broker; pub mod types; pub mod utils; -use cache::{LocalCache, Model, ModelCache}; +use cache::{LocalCache, ModelCache}; #[derive(Debug, Clone)] pub struct Sql { @@ -299,15 +301,18 @@ impl Sql { .set( selector, Model { + id: selector, namespace: namespace.to_string(), name: model.name().to_string(), - selector, class_hash, contract_address, packed_size, unpacked_size, layout, schema: namespaced_schema, + executed_at: DateTime::from_timestamp(block_timestamp as i64, 0).unwrap(), + // we're not using the true created_at + created_at: Utc::now(), }, ) .await; diff --git a/crates/torii/sqlite/src/types.rs b/crates/torii/sqlite/src/types.rs index d56d33cb50..0b6b7b7778 100644 --- a/crates/torii/sqlite/src/types.rs +++ b/crates/torii/sqlite/src/types.rs @@ -1,36 +1,15 @@ -use core::fmt; use std::str::FromStr; use chrono::{DateTime, Utc}; use dojo_types::schema::Ty; +use dojo_world::contracts::abigen::model::Layout; use serde::{Deserialize, Serialize}; -use sqlx::FromRow; +use sqlx::{sqlite::SqliteRow, FromRow, Row}; use starknet::core::types::Felt; -#[derive(Debug, Serialize, Deserialize)] -pub struct SQLFelt(pub Felt); +use crate::utils::map_column_decode_error; -impl From for Felt { - fn from(field_element: SQLFelt) -> Self { - field_element.0 - } -} - -impl TryFrom for SQLFelt { - type Error = anyhow::Error; - - fn try_from(value: String) -> Result { - Ok(SQLFelt(Felt::from_hex(&value)?)) - } -} - -impl fmt::LowerHex for SQLFelt { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} - -#[derive(FromRow, Deserialize, Debug, Clone)] +#[derive(FromRow, Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Entity { pub id: String, @@ -64,7 +43,7 @@ pub struct OptimisticEntity { pub deleted: bool, } -#[derive(FromRow, Deserialize, Debug, Clone)] +#[derive(FromRow, Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct EventMessage { pub id: String, @@ -98,22 +77,51 @@ pub struct OptimisticEventMessage { pub historical: bool, } -#[derive(FromRow, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Model { - pub id: String, + pub id: Felt, pub namespace: String, pub name: String, - pub class_hash: String, - pub contract_address: String, - pub transaction_hash: String, - pub layout: String, - pub schema: String, + pub class_hash: Felt, + pub packed_size: u32, + pub unpacked_size: u32, + pub contract_address: Felt, + pub layout: Layout, + pub schema: Ty, pub executed_at: DateTime, pub created_at: DateTime, } -#[derive(FromRow, Deserialize, Debug, Clone)] +impl FromRow<'_, SqliteRow> for Model { + fn from_row(row: &'_ SqliteRow) -> sqlx::Result { + let id = row.get::("id"); + let class_hash = row.get::("class_hash"); + let contract_address = row.get::("contract_address"); + let layout = row.get::("layout"); + let schema = row.get::("schema"); + + Ok(Model { + id: Felt::from_str(&id).map_err(|e| map_column_decode_error("id", Box::new(e)))?, + namespace: row.get("namespace"), + name: row.get("name"), + class_hash: Felt::from_str(&class_hash) + .map_err(|e| map_column_decode_error("class_hash", Box::new(e)))?, + packed_size: row.get("packed_size"), + unpacked_size: row.get("unpacked_size"), + contract_address: Felt::from_str(&contract_address) + .map_err(|e| map_column_decode_error("contract_address", Box::new(e)))?, + layout: serde_json::from_str(&layout) + .map_err(|e| map_column_decode_error("layout", Box::new(e)))?, + schema: serde_json::from_str(&schema) + .map_err(|e| map_column_decode_error("schema", Box::new(e)))?, + executed_at: row.get("executed_at"), + created_at: row.get("created_at"), + }) + } +} + +#[derive(FromRow, Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Event { pub id: String, @@ -187,13 +195,40 @@ impl std::fmt::Display for ContractType { } } -#[derive(FromRow, Deserialize, Debug, Clone, Default)] +#[derive(Serialize, Deserialize, Debug, Clone, Default)] #[serde(rename_all = "camelCase")] pub struct ContractCursor { pub head: i64, pub tps: i64, pub last_block_timestamp: i64, - pub contract_address: String, - pub last_pending_block_tx: Option, - pub last_pending_block_contract_tx: Option, + pub contract_address: Felt, + pub last_pending_block_tx: Option, + pub last_pending_block_contract_tx: Option, +} + +impl FromRow<'_, SqliteRow> for ContractCursor { + fn from_row(row: &'_ SqliteRow) -> sqlx::Result { + let contract_address = row.get::("contract_address"); + let last_pending_block_tx = row.get::, &str>("last_pending_block_tx"); + let last_pending_block_contract_tx = + row.get::, &str>("last_pending_block_contract_tx"); + + Ok(ContractCursor { + head: row.get("head"), + tps: row.get("tps"), + last_block_timestamp: row.get("last_block_timestamp"), + contract_address: Felt::from_str(&contract_address) + .map_err(|e| map_column_decode_error("contract_address", Box::new(e)))?, + last_pending_block_tx: last_pending_block_tx + .map(|c| Felt::from_str(&c)) + .transpose() + .map_err(|e| map_column_decode_error("last_pending_block_tx", Box::new(e)))?, + last_pending_block_contract_tx: last_pending_block_contract_tx + .map(|c| Felt::from_str(&c)) + .transpose() + .map_err(|e| { + map_column_decode_error("last_pending_block_contract_tx", Box::new(e)) + })?, + }) + } } diff --git a/crates/torii/sqlite/src/utils.rs b/crates/torii/sqlite/src/utils.rs index 1d16e1a27e..bf7d435641 100644 --- a/crates/torii/sqlite/src/utils.rs +++ b/crates/torii/sqlite/src/utils.rs @@ -7,6 +7,7 @@ use anyhow::Result; use chrono::{DateTime, Utc}; use futures_util::TryStreamExt; use ipfs_api_backend_hyper::{IpfsApi, IpfsClient, TryFromUri}; +use sqlx::error::BoxDynError; use starknet::core::types::U256; use starknet_crypto::Felt; use tokio_util::bytes::Bytes; @@ -17,6 +18,13 @@ use crate::constants::{ SQL_FELT_DELIMITER, }; +pub(crate) fn map_column_decode_error(column_name: &str, error: BoxDynError) -> sqlx::Error { + sqlx::Error::ColumnDecode { + index: column_name.to_string(), + source: error, + } +} + pub fn must_utc_datetime_from_timestamp(timestamp: u64) -> DateTime { let naive_dt = DateTime::from_timestamp(timestamp as i64, 0) .expect("Failed to convert timestamp to NaiveDateTime");