From 6a17620f8a99a507049299a862a5e7bd5a9f115e Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 6 Jan 2025 17:32:31 +0700 Subject: [PATCH 1/7] feat(torii): POW inter torii messaging through libp2p eg. for replicas --- crates/torii/client/src/client/mod.rs | 2 +- crates/torii/core/src/executor/mod.rs | 40 ++++++++++----------------- crates/torii/libp2p/src/client/mod.rs | 9 ++++-- crates/torii/libp2p/src/constants.rs | 1 + crates/torii/libp2p/src/types.rs | 17 ++++++++++++ 5 files changed, 41 insertions(+), 28 deletions(-) diff --git a/crates/torii/client/src/client/mod.rs b/crates/torii/client/src/client/mod.rs index e2fd1f58bd..d6b019bc71 100644 --- a/crates/torii/client/src/client/mod.rs +++ b/crates/torii/client/src/client/mod.rs @@ -49,7 +49,7 @@ impl Client { ) -> Result { let mut grpc_client = torii_grpc::client::WorldClient::new(torii_url, world).await?; - let relay_client = torii_relay::client::RelayClient::new(relay_url)?; + let relay_client = torii_relay::client::RelayClient::new(relay_url, false)?; let metadata = grpc_client.metadata().await?; diff --git a/crates/torii/core/src/executor/mod.rs b/crates/torii/core/src/executor/mod.rs index acac0469f0..5fa33d1d1d 100644 --- a/crates/torii/core/src/executor/mod.rs +++ b/crates/torii/core/src/executor/mod.rs @@ -100,6 +100,16 @@ pub struct EventMessageQuery { pub ty: Ty, } +#[derive(Debug, Clone)] +pub enum TransactionCommand { + // Similar to execute but doesn't create a new transaction + Flush, + // Commits current transaction and starts a new one + Commit, + // Rollbacks the current transaction and starts a new one + Rollback, +} + #[derive(Debug, Clone)] pub enum QueryType { SetHead(SetHeadQuery), @@ -111,36 +121,16 @@ pub enum QueryType { ApplyBalanceDiff(ApplyBalanceDiffQuery), RegisterErc721Token(RegisterErc721TokenQuery), RegisterErc20Token(RegisterErc20TokenQuery), + StoreEvent, TokenTransfer, RegisterModel, - StoreEvent, - // similar to execute but doesn't create a new transaction - Flush, - Execute, - // rollback's the current transaction and starts a new one - Rollback, Other, } #[derive(Debug)] -pub struct Executor<'c, P: Provider + Sync + Send + 'static> { - // Queries should use `transaction` instead of `pool` - // This `pool` is only used to create a new `transaction` - pool: Pool, - transaction: Transaction<'c, Sqlite>, - publish_queue: Vec, - rx: UnboundedReceiver, - shutdown_rx: Receiver<()>, - // These tasks are spawned to fetch ERC721 token metadata from the chain - // to not block the main loop - register_tasks: JoinSet>, - // Some queries depends on the metadata being registered, so we defer them - // until the metadata is fetched - deferred_query_messages: Vec, - // It is used to make RPC calls to fetch token_uri data for erc721 contracts - provider: Arc

, - // Used to limit number of tasks that run in parallel to fetch metadata - semaphore: Arc, +pub enum ExecutorMessage { + Query(QueryMessage), + Transaction(TransactionCommand), } #[derive(Debug)] @@ -620,7 +610,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { .fetch_one(&mut **tx) .await; - // If we find a token already registered for this contract_address we dont need to + // If we find a token already registeredregistered for this contract_address we dont need to // refetch the data since its same for all ERC721 tokens let (name, symbol) = match res { Ok((name, symbol)) => { diff --git a/crates/torii/libp2p/src/client/mod.rs b/crates/torii/libp2p/src/client/mod.rs index f438d23072..ac59f567df 100644 --- a/crates/torii/libp2p/src/client/mod.rs +++ b/crates/torii/libp2p/src/client/mod.rs @@ -49,7 +49,7 @@ enum Command { impl RelayClient { #[cfg(not(target_arch = "wasm32"))] - pub fn new(relay_addr: String) -> Result { + pub fn new(relay_addr: String, replica: bool) -> Result { let local_key = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(local_key.public()); @@ -87,6 +87,10 @@ impl RelayClient { }) .build(); + if replica { + swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(constants::UPDATE_MESSAGING_TOPIC))?; + } + info!(target: LOG_TARGET, addr = %relay_addr, "Dialing relay."); swarm.dial(relay_addr.parse::()?)?; @@ -98,7 +102,8 @@ impl RelayClient { } #[cfg(target_arch = "wasm32")] - pub fn new(relay_addr: String) -> Result { + // We are never gonna be a replica in the browser. + pub fn new(relay_addr: String, _replica: bool) -> Result { let local_key = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(local_key.public()); diff --git a/crates/torii/libp2p/src/constants.rs b/crates/torii/libp2p/src/constants.rs index 2d061e72e4..edb1566edb 100644 --- a/crates/torii/libp2p/src/constants.rs +++ b/crates/torii/libp2p/src/constants.rs @@ -1,3 +1,4 @@ pub(crate) const GOSSIPSUB_HEARTBEAT_INTERVAL_SECS: u64 = 10; pub(crate) const MESSAGING_TOPIC: &str = "message"; +pub(crate) const UPDATE_MESSAGING_TOPIC: &str = "update"; pub(crate) const IDLE_CONNECTION_TIMEOUT_SECS: u64 = 60; diff --git a/crates/torii/libp2p/src/types.rs b/crates/torii/libp2p/src/types.rs index 1122f046fe..72ddb40a95 100644 --- a/crates/torii/libp2p/src/types.rs +++ b/crates/torii/libp2p/src/types.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; use starknet::core::types::Felt; +use torii_core::types::{ContractCursor, Entity, EventMessage, Model}; use crate::typed_data::TypedData; @@ -8,3 +9,19 @@ pub struct Message { pub message: TypedData, pub signature: Vec, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Update { + // Latest contract head + Head(ContractCursor), + // Registered model + Model(Model), + // Updated entity state + Entity(Entity), + // Indexed event message + EventMessage(EventMessage), + // Indexed raw event + Event(Event), + + // TODO: Add more types of updates here. +} \ No newline at end of file From 80b1c01a2ad664ca430cb67f326ca257e5feb564 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 6 Jan 2025 18:06:37 +0700 Subject: [PATCH 2/7] wip write db --- crates/torii/core/src/types.rs | 2 + crates/torii/libp2p/src/client/mod.rs | 60 ++++++++++++++++++++++++--- crates/torii/libp2p/src/types.rs | 5 +-- 3 files changed, 58 insertions(+), 9 deletions(-) diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index d56d33cb50..fca679119d 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -105,6 +105,8 @@ pub struct Model { pub namespace: String, pub name: String, pub class_hash: String, + pub packed_size: u32, + pub unpacked_size: u32, pub contract_address: String, pub transaction_hash: String, pub layout: String, diff --git a/crates/torii/libp2p/src/client/mod.rs b/crates/torii/libp2p/src/client/mod.rs index ac59f567df..7425d364f5 100644 --- a/crates/torii/libp2p/src/client/mod.rs +++ b/crates/torii/libp2p/src/client/mod.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::time::Duration; +use dojo_world::contracts::abigen::model::Layout; use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures::channel::oneshot; use futures::lock::Mutex; @@ -12,13 +13,15 @@ use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; #[cfg(not(target_arch = "wasm32"))] use libp2p::tcp; use libp2p::{identify, identity, noise, ping, yamux, Multiaddr, PeerId}; +use torii_core::executor::QueryMessage; +use torii_core::sql::Sql; use tracing::info; pub mod events; use crate::client::events::ClientEvent; use crate::constants; use crate::errors::Error; -use crate::types::Message; +use crate::types::{Message, Update}; pub(crate) const LOG_TARGET: &str = "torii::relay::client"; @@ -39,6 +42,7 @@ pub struct RelayClient { #[allow(missing_debug_implementations)] pub struct EventLoop { swarm: Swarm, + sql: Option, command_receiver: UnboundedReceiver, } @@ -49,7 +53,7 @@ enum Command { impl RelayClient { #[cfg(not(target_arch = "wasm32"))] - pub fn new(relay_addr: String, replica: bool) -> Result { + pub fn new(relay_addr: String, replica_db: Option) -> Result { let local_key = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(local_key.public()); @@ -87,8 +91,11 @@ impl RelayClient { }) .build(); - if replica { - swarm.behaviour_mut().gossipsub.subscribe(&IdentTopic::new(constants::UPDATE_MESSAGING_TOPIC))?; + if replica_db.is_some() { + swarm + .behaviour_mut() + .gossipsub + .subscribe(&IdentTopic::new(constants::UPDATE_MESSAGING_TOPIC))?; } info!(target: LOG_TARGET, addr = %relay_addr, "Dialing relay."); @@ -97,13 +104,17 @@ impl RelayClient { let (command_sender, command_receiver) = futures::channel::mpsc::unbounded(); Ok(Self { command_sender: CommandSender::new(command_sender), - event_loop: Arc::new(Mutex::new(EventLoop { swarm, command_receiver })), + event_loop: Arc::new(Mutex::new(EventLoop { + swarm, + command_receiver, + sql: replica_db, + })), }) } #[cfg(target_arch = "wasm32")] // We are never gonna be a replica in the browser. - pub fn new(relay_addr: String, _replica: bool) -> Result { + pub fn new(relay_addr: String, _replica_db: Option) -> Result { let local_key = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(local_key.public()); @@ -200,6 +211,38 @@ impl EventLoop { } } + async fn handle_update(&mut self, update: Update) { + // TODO: Implement update handling. + info!(target: LOG_TARGET, update = ?update, "Received update."); + // We can safely unwrap because we subscribe to updates only if replica_db is provided. + let sql = self.sql.unwrap(); + + match update { + Update::Head(cursor) => { + sql.executor.send(QueryMessage::new( + "UPDATE contracts SET head = ?, last_block_timestamp = ?, contract_address = ?, last_pending_block_tx = ?, last_pending_block_contract_tx = ?".to_string(), + vec![Argument::Int(cursor.head), Argument::Int(cursor.last_block_timestamp), Argument::FieldElement(cursor.contract_address), Argument::FieldElement(cursor.last_pending_block_tx), Argument::FieldElement(cursor.last_pending_block_contract_tx)], + QueryType::Other, + )).await.unwrap(); + } + Update::Model(model) => { + let schema: Ty = serde_json::from_slice(&model.schema).unwrap(); + let layout: Layout = serde_json::from_slice(&model.layout).unwrap(); + let block_timestamp = model.executed_at.timestamp(); + sql.register_model(&model.namespace, &schema, layout, model.class_hash, model.contract_address, model.packed_size, model.unpacked_size, block_timestamp, None).await.unwrap(); + } + Update::Entity(entity) => { + // TODO: Handle entity update. + } + Update::EventMessage(event_message) => { + // TODO: Handle event message update. + } + Update::Event(event) => { + // TODO: Handle event update. + } + } + } + pub async fn run(&mut self) { let mut is_relay_ready = false; let commands_queue = Arc::new(Mutex::new(Vec::new())); @@ -212,6 +255,11 @@ impl EventLoop { }, event = self.swarm.select_next_some() => { match event { + SwarmEvent::Behaviour(ClientEvent::Gossipsub(gossipsub::Event::Message { message, .. })) => { + if let Ok(update) = serde_json::from_slice::(&message.data) { + self.handle_update(update).await; + } + }, SwarmEvent::Behaviour(ClientEvent::Gossipsub(gossipsub::Event::Subscribed { topic, .. })) => { // Handle behaviour events. info!(target: LOG_TARGET, topic = ?topic, "Relay ready. Received subscription confirmation."); diff --git a/crates/torii/libp2p/src/types.rs b/crates/torii/libp2p/src/types.rs index 72ddb40a95..726370ca27 100644 --- a/crates/torii/libp2p/src/types.rs +++ b/crates/torii/libp2p/src/types.rs @@ -20,8 +20,7 @@ pub enum Update { Entity(Entity), // Indexed event message EventMessage(EventMessage), - // Indexed raw event + // Indexed raw event Event(Event), - // TODO: Add more types of updates here. -} \ No newline at end of file +} From c3e0ca141f8acedb13dc36fd91291edc44085fc5 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 13 Jan 2025 17:03:03 +0700 Subject: [PATCH 3/7] revert executor --- crates/torii/sqlite/src/executor/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/torii/sqlite/src/executor/mod.rs b/crates/torii/sqlite/src/executor/mod.rs index 480153b5f2..12d60b0681 100644 --- a/crates/torii/sqlite/src/executor/mod.rs +++ b/crates/torii/sqlite/src/executor/mod.rs @@ -820,4 +820,4 @@ fn send_broker_message(message: BrokerMessage) { BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event), BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event), } -} +} \ No newline at end of file From 8b05f30ef090ac7d56912756928d45ae5be7cef3 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 13 Jan 2025 17:04:25 +0700 Subject: [PATCH 4/7] fix: executor --- crates/torii/sqlite/src/executor/mod.rs | 40 +++++++++++++++---------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/crates/torii/sqlite/src/executor/mod.rs b/crates/torii/sqlite/src/executor/mod.rs index 12d60b0681..3130a51bba 100644 --- a/crates/torii/sqlite/src/executor/mod.rs +++ b/crates/torii/sqlite/src/executor/mod.rs @@ -100,16 +100,6 @@ pub struct EventMessageQuery { pub ty: Ty, } -#[derive(Debug, Clone)] -pub enum TransactionCommand { - // Similar to execute but doesn't create a new transaction - Flush, - // Commits current transaction and starts a new one - Commit, - // Rollbacks the current transaction and starts a new one - Rollback, -} - #[derive(Debug, Clone)] pub enum QueryType { SetHead(SetHeadQuery), @@ -121,16 +111,36 @@ pub enum QueryType { ApplyBalanceDiff(ApplyBalanceDiffQuery), RegisterErc721Token(RegisterErc721TokenQuery), RegisterErc20Token(RegisterErc20TokenQuery), - StoreEvent, TokenTransfer, RegisterModel, + StoreEvent, + // similar to execute but doesn't create a new transaction + Flush, + Execute, + // rollback's the current transaction and starts a new one + Rollback, Other, } #[derive(Debug)] -pub enum ExecutorMessage { - Query(QueryMessage), - Transaction(TransactionCommand), +pub struct Executor<'c, P: Provider + Sync + Send + 'static> { + // Queries should use `transaction` instead of `pool` + // This `pool` is only used to create a new `transaction` + pool: Pool, + transaction: Transaction<'c, Sqlite>, + publish_queue: Vec, + rx: UnboundedReceiver, + shutdown_rx: Receiver<()>, + // These tasks are spawned to fetch ERC721 token metadata from the chain + // to not block the main loop + register_tasks: JoinSet>, + // Some queries depends on the metadata being registered, so we defer them + // until the metadata is fetched + deferred_query_messages: Vec, + // It is used to make RPC calls to fetch token_uri data for erc721 contracts + provider: Arc

, + // Used to limit number of tasks that run in parallel to fetch metadata + semaphore: Arc, } #[derive(Debug)] @@ -610,7 +620,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { .fetch_one(&mut **tx) .await; - // If we find a token already registeredregistered for this contract_address we dont need to + // If we find a token already registered for this contract_address we dont need to // refetch the data since its same for all ERC721 tokens let (name, symbol) = match res { Ok((name, symbol)) => { From 933bd448dcbc027d21d17e00a2fa10e4df990043 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 13 Jan 2025 17:44:39 +0700 Subject: [PATCH 5/7] updating sql types with helpers --- crates/torii/libp2p/src/client/mod.rs | 12 +-- crates/torii/libp2p/src/types.rs | 1 + crates/torii/sqlite/src/cache.rs | 57 ++----------- crates/torii/sqlite/src/executor/mod.rs | 21 +++-- crates/torii/sqlite/src/lib.rs | 7 +- crates/torii/sqlite/src/types.rs | 103 ++++++++++++++++-------- crates/torii/sqlite/src/utils.rs | 8 ++ 7 files changed, 101 insertions(+), 108 deletions(-) diff --git a/crates/torii/libp2p/src/client/mod.rs b/crates/torii/libp2p/src/client/mod.rs index 7425d364f5..7717e930c2 100644 --- a/crates/torii/libp2p/src/client/mod.rs +++ b/crates/torii/libp2p/src/client/mod.rs @@ -13,14 +13,14 @@ use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; #[cfg(not(target_arch = "wasm32"))] use libp2p::tcp; use libp2p::{identify, identity, noise, ping, yamux, Multiaddr, PeerId}; -use torii_core::executor::QueryMessage; -use torii_core::sql::Sql; +use torii_sqlite::Sql; +use torii_sqlite::executor::QueryMessage; use tracing::info; pub mod events; use crate::client::events::ClientEvent; use crate::constants; -use crate::errors::Error; +use crate::error::Error; use crate::types::{Message, Update}; pub(crate) const LOG_TARGET: &str = "torii::relay::client"; @@ -219,11 +219,7 @@ impl EventLoop { match update { Update::Head(cursor) => { - sql.executor.send(QueryMessage::new( - "UPDATE contracts SET head = ?, last_block_timestamp = ?, contract_address = ?, last_pending_block_tx = ?, last_pending_block_contract_tx = ?".to_string(), - vec![Argument::Int(cursor.head), Argument::Int(cursor.last_block_timestamp), Argument::FieldElement(cursor.contract_address), Argument::FieldElement(cursor.last_pending_block_tx), Argument::FieldElement(cursor.last_pending_block_contract_tx)], - QueryType::Other, - )).await.unwrap(); + sql.set_head(cursor.head, cursor.last_block_timestamp, 0, cursor.contract_address).await.unwrap(); } Update::Model(model) => { let schema: Ty = serde_json::from_slice(&model.schema).unwrap(); diff --git a/crates/torii/libp2p/src/types.rs b/crates/torii/libp2p/src/types.rs index 6988d3c6cf..a4f02ef170 100644 --- a/crates/torii/libp2p/src/types.rs +++ b/crates/torii/libp2p/src/types.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; use starknet::core::types::Felt; +use torii_sqlite::types::{ContractCursor, Model}; use torii_typed_data::TypedData; #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/torii/sqlite/src/cache.rs b/crates/torii/sqlite/src/cache.rs index bbfad566db..a63dcf54a4 100644 --- a/crates/torii/sqlite/src/cache.rs +++ b/crates/torii/sqlite/src/cache.rs @@ -1,34 +1,14 @@ use std::collections::{HashMap, HashSet}; -use dojo_types::schema::Ty; -use dojo_world::contracts::abigen::model::Layout; use sqlx::{Pool, Sqlite, SqlitePool}; use starknet_crypto::Felt; use tokio::sync::RwLock; use crate::constants::TOKEN_BALANCE_TABLE; -use crate::error::{Error, ParseError}; -use crate::types::ContractType; +use crate::error::Error; +use crate::types::{ContractType, Model}; use crate::utils::I256; -#[derive(Debug, Clone)] -pub struct Model { - /// Namespace of the model - pub namespace: String, - /// The name of the model - pub name: String, - /// The selector of the model - pub selector: Felt, - /// The class hash of the model - pub class_hash: Felt, - /// The contract address of the model - pub contract_address: Felt, - pub packed_size: u32, - pub unpacked_size: u32, - pub layout: Layout, - pub schema: Ty, -} - #[derive(Debug)] pub struct ModelCache { pool: SqlitePool, @@ -65,42 +45,15 @@ impl ModelCache { } async fn update_model(&self, selector: &Felt) -> Result { - let ( - namespace, - name, - class_hash, - contract_address, - packed_size, - unpacked_size, - layout, - schema, - ): (String, String, String, String, u32, u32, String, String) = sqlx::query_as( - "SELECT namespace, name, class_hash, contract_address, packed_size, unpacked_size, \ - layout, schema FROM models WHERE id = ?", + let model: Model = sqlx::query_as( + "SELECT id, namespace, name, class_hash, contract_address, transaction_hash, packed_size, unpacked_size, \ + layout, schema, executed_at, created_at FROM models WHERE id = ?", ) .bind(format!("{:#x}", selector)) .fetch_one(&self.pool) .await?; - let class_hash = Felt::from_hex(&class_hash).map_err(ParseError::FromStr)?; - let contract_address = Felt::from_hex(&contract_address).map_err(ParseError::FromStr)?; - - let layout = serde_json::from_str(&layout).map_err(ParseError::FromJsonStr)?; - let schema = serde_json::from_str(&schema).map_err(ParseError::FromJsonStr)?; - let mut cache = self.model_cache.write().await; - - let model = Model { - namespace, - name, - selector: *selector, - class_hash, - contract_address, - packed_size, - unpacked_size, - layout, - schema, - }; cache.insert(*selector, model.clone()); Ok(model) diff --git a/crates/torii/sqlite/src/executor/mod.rs b/crates/torii/sqlite/src/executor/mod.rs index 3130a51bba..0f80d4331d 100644 --- a/crates/torii/sqlite/src/executor/mod.rs +++ b/crates/torii/sqlite/src/executor/mod.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; use std::mem; -use std::str::FromStr; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; @@ -339,7 +338,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { for cursor in &mut cursors { if let Some(new_cursor) = reset_heads .cursor_map - .get(&Felt::from_str(&cursor.contract_address).unwrap()) + .get(&cursor.contract_address) { let cursor_timestamp: u64 = cursor.last_block_timestamp.try_into().expect("doesn't fit in i64"); @@ -368,9 +367,9 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { ) .bind(cursor.head) .bind(cursor.last_block_timestamp) - .bind(&cursor.last_pending_block_tx) - .bind(&cursor.last_pending_block_contract_tx) - .bind(&cursor.contract_address) + .bind(cursor.last_pending_block_tx.map(|felt| felt_to_sql_string(&felt))) + .bind(cursor.last_pending_block_contract_tx.map(|felt| felt_to_sql_string(&felt))) + .bind(felt_to_sql_string(&cursor.contract_address)) .execute(&mut **tx) .await?; @@ -390,7 +389,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { for cursor in &mut cursors { if let Some(new_cursor) = update_cursors .cursor_map - .get(&Felt::from_str(&cursor.contract_address).unwrap()) + .get(&cursor.contract_address) { let cursor_timestamp: u64 = cursor.last_block_timestamp.try_into().expect("doesn't fit in i64"); @@ -407,7 +406,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { }; cursor.last_pending_block_contract_tx = - Some(felt_to_sql_string(&new_cursor.0)); + Some(new_cursor.0); cursor.tps = new_tps.try_into().expect("does't fit in i64"); } else { cursor.tps = 0; @@ -418,7 +417,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { .expect("doesn't fit in i64"); cursor.head = new_head; cursor.last_pending_block_tx = - update_cursors.last_pending_block_tx.map(|felt| felt_to_sql_string(&felt)); + update_cursors.last_pending_block_tx; sqlx::query( "UPDATE contracts SET head = ?, last_block_timestamp = ?, \ @@ -427,9 +426,9 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> { ) .bind(cursor.head) .bind(cursor.last_block_timestamp) - .bind(&cursor.last_pending_block_tx) - .bind(&cursor.last_pending_block_contract_tx) - .bind(&cursor.contract_address) + .bind(cursor.last_pending_block_tx.map(|felt| felt_to_sql_string(&felt))) + .bind(cursor.last_pending_block_contract_tx.map(|felt| felt_to_sql_string(&felt))) + .bind(felt_to_sql_string(&cursor.contract_address)) .execute(&mut **tx) .await?; diff --git a/crates/torii/sqlite/src/lib.rs b/crates/torii/sqlite/src/lib.rs index 7666108620..0155c4cff4 100644 --- a/crates/torii/sqlite/src/lib.rs +++ b/crates/torii/sqlite/src/lib.rs @@ -4,6 +4,7 @@ use std::str::FromStr; use std::sync::Arc; use anyhow::{anyhow, Context, Result}; +use chrono::DateTime; use dojo_types::naming::get_tag; use dojo_types::schema::{Struct, Ty}; use dojo_world::config::WorldMetadata; @@ -14,6 +15,7 @@ use sqlx::{Pool, Sqlite}; use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction}; use starknet_crypto::poseidon_hash_many; use tokio::sync::mpsc::UnboundedSender; +use types::Model; use utils::felts_to_sql_string; use crate::constants::SQL_FELT_DELIMITER; @@ -37,7 +39,7 @@ pub mod simple_broker; pub mod types; pub mod utils; -use cache::{LocalCache, Model, ModelCache}; +use cache::{LocalCache, ModelCache}; #[derive(Debug, Clone)] pub struct Sql { @@ -299,15 +301,16 @@ impl Sql { .set( selector, Model { + selector, namespace: namespace.to_string(), name: model.name().to_string(), - selector, class_hash, contract_address, packed_size, unpacked_size, layout, schema: namespaced_schema, + executed_at: DateTime::from_timestamp(block_timestamp as i64, 0).unwrap(), }, ) .await; diff --git a/crates/torii/sqlite/src/types.rs b/crates/torii/sqlite/src/types.rs index fca679119d..9b5f484bc8 100644 --- a/crates/torii/sqlite/src/types.rs +++ b/crates/torii/sqlite/src/types.rs @@ -3,32 +3,12 @@ use std::str::FromStr; use chrono::{DateTime, Utc}; use dojo_types::schema::Ty; +use dojo_world::contracts::abigen::model::Layout; use serde::{Deserialize, Serialize}; -use sqlx::FromRow; +use sqlx::{sqlite::SqliteRow, FromRow, Row, Sqlite}; use starknet::core::types::Felt; -#[derive(Debug, Serialize, Deserialize)] -pub struct SQLFelt(pub Felt); - -impl From for Felt { - fn from(field_element: SQLFelt) -> Self { - field_element.0 - } -} - -impl TryFrom for SQLFelt { - type Error = anyhow::Error; - - fn try_from(value: String) -> Result { - Ok(SQLFelt(Felt::from_hex(&value)?)) - } -} - -impl fmt::LowerHex for SQLFelt { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} +use crate::utils::map_column_decode_error; #[derive(FromRow, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] @@ -98,21 +78,47 @@ pub struct OptimisticEventMessage { pub historical: bool, } -#[derive(FromRow, Deserialize, Debug, Clone)] +#[derive(Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Model { - pub id: String, + pub selector: Felt, pub namespace: String, pub name: String, - pub class_hash: String, + pub class_hash: Felt, pub packed_size: u32, pub unpacked_size: u32, - pub contract_address: String, - pub transaction_hash: String, - pub layout: String, - pub schema: String, + pub contract_address: Felt, + pub layout: Layout, + pub schema: Ty, pub executed_at: DateTime, - pub created_at: DateTime, +} + +impl FromRow<'_, SqliteRow> for Model { + fn from_row(row: &'_ SqliteRow) -> sqlx::Result { + let selector = row.try_get::("id")?; + let class_hash = row.try_get::("class_hash")?; + let contract_address = row.try_get::("contract_address")?; + let layout = row.try_get::("layout")?; + let schema = row.try_get::("schema")?; + + Ok(Model { + selector: Felt::from_str(&selector) + .map_err(|e| map_column_decode_error("id", Box::new(e)))?, + namespace: row.get("namespace"), + name: row.get("name"), + class_hash: Felt::from_str(&class_hash) + .map_err(|e| map_column_decode_error("class_hash", Box::new(e)))?, + packed_size: row.get("packed_size"), + unpacked_size: row.get("unpacked_size"), + contract_address: Felt::from_str(&contract_address) + .map_err(|e| map_column_decode_error("contract_address", Box::new(e)))?, + layout: serde_json::from_str(&layout) + .map_err(|e| map_column_decode_error("layout", Box::new(e)))?, + schema: serde_json::from_str(&schema) + .map_err(|e| map_column_decode_error("schema", Box::new(e)))?, + executed_at: row.get("executed_at"), + }) + } } #[derive(FromRow, Deserialize, Debug, Clone)] @@ -189,13 +195,40 @@ impl std::fmt::Display for ContractType { } } -#[derive(FromRow, Deserialize, Debug, Clone, Default)] +#[derive(Deserialize, Debug, Clone, Default)] #[serde(rename_all = "camelCase")] pub struct ContractCursor { pub head: i64, pub tps: i64, pub last_block_timestamp: i64, - pub contract_address: String, - pub last_pending_block_tx: Option, - pub last_pending_block_contract_tx: Option, + pub contract_address: Felt, + pub last_pending_block_tx: Option, + pub last_pending_block_contract_tx: Option, +} + +impl FromRow<'_, SqliteRow> for ContractCursor { + fn from_row(row: &'_ SqliteRow) -> sqlx::Result { + let contract_address = row.try_get::("contract_address")?; + let last_pending_block_tx = row.try_get::, &str>("last_pending_block_tx")?; + let last_pending_block_contract_tx = + row.try_get::, &str>("last_pending_block_contract_tx")?; + + Ok(ContractCursor { + head: row.get("head"), + tps: row.get("tps"), + last_block_timestamp: row.get("last_block_timestamp"), + contract_address: Felt::from_str(&contract_address) + .map_err(|e| map_column_decode_error("contract_address", Box::new(e)))?, + last_pending_block_tx: last_pending_block_tx + .map(|c| Felt::from_str(&c)) + .transpose() + .map_err(|e| map_column_decode_error("last_pending_block_tx", Box::new(e)))?, + last_pending_block_contract_tx: last_pending_block_contract_tx + .map(|c| Felt::from_str(&c)) + .transpose() + .map_err(|e| { + map_column_decode_error("last_pending_block_contract_tx", Box::new(e)) + })?, + }) + } } diff --git a/crates/torii/sqlite/src/utils.rs b/crates/torii/sqlite/src/utils.rs index 1d16e1a27e..bf7d435641 100644 --- a/crates/torii/sqlite/src/utils.rs +++ b/crates/torii/sqlite/src/utils.rs @@ -7,6 +7,7 @@ use anyhow::Result; use chrono::{DateTime, Utc}; use futures_util::TryStreamExt; use ipfs_api_backend_hyper::{IpfsApi, IpfsClient, TryFromUri}; +use sqlx::error::BoxDynError; use starknet::core::types::U256; use starknet_crypto::Felt; use tokio_util::bytes::Bytes; @@ -17,6 +18,13 @@ use crate::constants::{ SQL_FELT_DELIMITER, }; +pub(crate) fn map_column_decode_error(column_name: &str, error: BoxDynError) -> sqlx::Error { + sqlx::Error::ColumnDecode { + index: column_name.to_string(), + source: error, + } +} + pub fn must_utc_datetime_from_timestamp(timestamp: u64) -> DateTime { let naive_dt = DateTime::from_timestamp(timestamp as i64, 0) .expect("Failed to convert timestamp to NaiveDateTime"); From 5123d8b08f1c0bdb8e2bfc71bec4f68fed4de812 Mon Sep 17 00:00:00 2001 From: Nasr Date: Mon, 13 Jan 2025 18:06:31 +0700 Subject: [PATCH 6/7] receive events --- crates/torii/libp2p/src/client/mod.rs | 72 +++++++++++++++++++++++---- crates/torii/libp2p/src/test.rs | 2 +- crates/torii/libp2p/src/types.rs | 2 +- crates/torii/sqlite/src/types.rs | 10 ++-- 4 files changed, 69 insertions(+), 17 deletions(-) diff --git a/crates/torii/libp2p/src/client/mod.rs b/crates/torii/libp2p/src/client/mod.rs index 7717e930c2..68bf9c6123 100644 --- a/crates/torii/libp2p/src/client/mod.rs +++ b/crates/torii/libp2p/src/client/mod.rs @@ -1,6 +1,8 @@ +use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use dojo_types::naming::compute_selector_from_tag; use dojo_world::contracts::abigen::model::Layout; use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender}; use futures::channel::oneshot; @@ -13,8 +15,10 @@ use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; #[cfg(not(target_arch = "wasm32"))] use libp2p::tcp; use libp2p::{identify, identity, noise, ping, yamux, Multiaddr, PeerId}; -use torii_sqlite::Sql; +use starknet::core::types::Event; +use starknet_crypto::Felt; use torii_sqlite::executor::QueryMessage; +use torii_sqlite::Sql; use tracing::info; pub mod events; @@ -215,26 +219,74 @@ impl EventLoop { // TODO: Implement update handling. info!(target: LOG_TARGET, update = ?update, "Received update."); // We can safely unwrap because we subscribe to updates only if replica_db is provided. - let sql = self.sql.unwrap(); + let sql = self.sql.as_mut().unwrap(); match update { Update::Head(cursor) => { - sql.set_head(cursor.head, cursor.last_block_timestamp, 0, cursor.contract_address).await.unwrap(); + sql.set_head( + cursor.head as u64, + cursor.last_block_timestamp as u64, + 0, + cursor.contract_address, + ) + .await + .unwrap(); } Update::Model(model) => { - let schema: Ty = serde_json::from_slice(&model.schema).unwrap(); - let layout: Layout = serde_json::from_slice(&model.layout).unwrap(); - let block_timestamp = model.executed_at.timestamp(); - sql.register_model(&model.namespace, &schema, layout, model.class_hash, model.contract_address, model.packed_size, model.unpacked_size, block_timestamp, None).await.unwrap(); + sql.register_model( + &model.namespace, + &model.schema, + model.layout, + model.class_hash, + model.contract_address, + model.packed_size, + model.unpacked_size, + model.executed_at.timestamp() as u64, + None, + ) + .await + .unwrap(); } Update::Entity(entity) => { - // TODO: Handle entity update. + let id = Felt::from_str(&entity.id).unwrap(); + let model = entity.updated_model.unwrap(); + let model_id = compute_selector_from_tag(&model.name()); + if entity.deleted { + sql.delete_entity( + id, + model_id, + model, + &entity.event_id, + entity.executed_at.timestamp() as u64, + ) + .await + .unwrap(); + } else { + sql.set_entity( + model, + &entity.event_id, + entity.executed_at.timestamp() as u64, + id, + model_id, + Some(&entity.keys), + ) + .await + .unwrap(); + } } Update::EventMessage(event_message) => { - // TODO: Handle event message update. + let model = event_message.updated_model.unwrap(); + sql.set_event_message( + model, + &event_message.event_id, + event_message.executed_at.timestamp() as u64, + event_message.historical, + ) + .await + .unwrap(); } Update::Event(event) => { - // TODO: Handle event update. + // TODO } } } diff --git a/crates/torii/libp2p/src/test.rs b/crates/torii/libp2p/src/test.rs index 35c6a85965..9f0c9fe012 100644 --- a/crates/torii/libp2p/src/test.rs +++ b/crates/torii/libp2p/src/test.rs @@ -116,7 +116,7 @@ async fn test_client_messaging() -> Result<(), Box> { }); // Initialize the first client (listener) - let client = RelayClient::new("/ip4/127.0.0.1/tcp/9900".to_string())?; + let client = RelayClient::new("/ip4/127.0.0.1/tcp/9900".to_string(), None)?; tokio::spawn(async move { client.event_loop.lock().await.run().await; }); diff --git a/crates/torii/libp2p/src/types.rs b/crates/torii/libp2p/src/types.rs index a4f02ef170..79dd786218 100644 --- a/crates/torii/libp2p/src/types.rs +++ b/crates/torii/libp2p/src/types.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; use starknet::core::types::Felt; -use torii_sqlite::types::{ContractCursor, Model}; +use torii_sqlite::types::{ContractCursor, Entity, Event, EventMessage, Model}; use torii_typed_data::TypedData; #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/torii/sqlite/src/types.rs b/crates/torii/sqlite/src/types.rs index 9b5f484bc8..d0a5a26460 100644 --- a/crates/torii/sqlite/src/types.rs +++ b/crates/torii/sqlite/src/types.rs @@ -10,7 +10,7 @@ use starknet::core::types::Felt; use crate::utils::map_column_decode_error; -#[derive(FromRow, Deserialize, Debug, Clone)] +#[derive(FromRow, Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Entity { pub id: String, @@ -44,7 +44,7 @@ pub struct OptimisticEntity { pub deleted: bool, } -#[derive(FromRow, Deserialize, Debug, Clone)] +#[derive(FromRow, Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct EventMessage { pub id: String, @@ -78,7 +78,7 @@ pub struct OptimisticEventMessage { pub historical: bool, } -#[derive(Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Model { pub selector: Felt, @@ -121,7 +121,7 @@ impl FromRow<'_, SqliteRow> for Model { } } -#[derive(FromRow, Deserialize, Debug, Clone)] +#[derive(FromRow, Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Event { pub id: String, @@ -195,7 +195,7 @@ impl std::fmt::Display for ContractType { } } -#[derive(Deserialize, Debug, Clone, Default)] +#[derive(Serialize, Deserialize, Debug, Clone, Default)] #[serde(rename_all = "camelCase")] pub struct ContractCursor { pub head: i64, From 4702d250064da3fe085a0e5387fd0ce555481ee7 Mon Sep 17 00:00:00 2001 From: Nasr Date: Tue, 14 Jan 2025 10:52:53 +0700 Subject: [PATCH 7/7] update sqlite types and modeL --- crates/torii/client/src/client/mod.rs | 2 +- crates/torii/graphql/src/mapping.rs | 8 +++--- crates/torii/graphql/src/object/model.rs | 13 ++++++---- crates/torii/graphql/src/schema.rs | 3 +-- .../grpc/src/server/subscriptions/indexer.rs | 3 +-- crates/torii/sqlite/src/lib.rs | 6 +++-- crates/torii/sqlite/src/types.rs | 26 +++++++++---------- 7 files changed, 32 insertions(+), 29 deletions(-) diff --git a/crates/torii/client/src/client/mod.rs b/crates/torii/client/src/client/mod.rs index d6b019bc71..ec54045abd 100644 --- a/crates/torii/client/src/client/mod.rs +++ b/crates/torii/client/src/client/mod.rs @@ -49,7 +49,7 @@ impl Client { ) -> Result { let mut grpc_client = torii_grpc::client::WorldClient::new(torii_url, world).await?; - let relay_client = torii_relay::client::RelayClient::new(relay_url, false)?; + let relay_client = torii_relay::client::RelayClient::new(relay_url, None)?; let metadata = grpc_client.metadata().await?; diff --git a/crates/torii/graphql/src/mapping.rs b/crates/torii/graphql/src/mapping.rs index 02bbd5618d..5ee47f54cb 100644 --- a/crates/torii/graphql/src/mapping.rs +++ b/crates/torii/graphql/src/mapping.rs @@ -51,10 +51,10 @@ lazy_static! { Name::new("contractAddress"), TypeData::Simple(TypeRef::named(Primitive::Felt252(None).to_string())), ), - ( - Name::new("transactionHash"), - TypeData::Simple(TypeRef::named(Primitive::Felt252(None).to_string())), - ), + // ( + // Name::new("transactionHash"), + // TypeData::Simple(TypeRef::named(Primitive::Felt252(None).to_string())), + // ), ( Name::new("executedAt"), TypeData::Simple(TypeRef::named(GraphqlType::DateTime.to_string())), diff --git a/crates/torii/graphql/src/object/model.rs b/crates/torii/graphql/src/object/model.rs index 281187bca4..a93b048f4e 100644 --- a/crates/torii/graphql/src/object/model.rs +++ b/crates/torii/graphql/src/object/model.rs @@ -1,8 +1,11 @@ +use std::str::FromStr; + use async_graphql::dynamic::indexmap::IndexMap; use async_graphql::dynamic::{ Enum, Field, InputObject, InputValue, SubscriptionField, SubscriptionFieldFuture, TypeRef, }; use async_graphql::{Name, Value}; +use starknet_crypto::Felt; use tokio_stream::StreamExt; use torii_sqlite::simple_broker::SimpleBroker; use torii_sqlite::types::Model; @@ -80,7 +83,7 @@ impl ResolvableObject for ModelObject { { SubscriptionFieldFuture::new(async move { let id = match ctx.args.get("id") { - Some(id) => Some(id.string()?.to_string()), + Some(id) => Some(Felt::from_str(&id.string()?.to_string())?), None => None, }; // if id is None, then subscribe to all models @@ -104,12 +107,12 @@ impl ResolvableObject for ModelObject { impl ModelObject { pub fn value_mapping(model: Model) -> ValueMapping { IndexMap::from([ - (Name::new("id"), Value::from(model.id)), + (Name::new("id"), Value::from(format!("{:#x}", model.id))), (Name::new("name"), Value::from(model.name)), (Name::new("namespace"), Value::from(model.namespace)), - (Name::new("classHash"), Value::from(model.class_hash)), - (Name::new("contractAddress"), Value::from(model.contract_address)), - (Name::new("transactionHash"), Value::from(model.transaction_hash)), + (Name::new("classHash"), Value::from(format!("{:#x}", model.class_hash))), + (Name::new("contractAddress"), Value::from(format!("{:#x}", model.contract_address))), + // (Name::new("transactionHash"), Value::from(format!("{:#x}", model.transaction_hash))), ( Name::new("createdAt"), Value::from(model.created_at.format(DATETIME_FORMAT).to_string()), diff --git a/crates/torii/graphql/src/schema.rs b/crates/torii/graphql/src/schema.rs index adee496350..d75d30090f 100644 --- a/crates/torii/graphql/src/schema.rs +++ b/crates/torii/graphql/src/schema.rs @@ -140,8 +140,7 @@ async fn build_objects(pool: &SqlitePool) -> Result<(Vec, Vec Result<(), Error> { let mut closed_stream = Vec::new(); - let contract_address = - Felt::from_str(&update.contract_address).map_err(ParseError::FromStr)?; + let contract_address = update.contract_address; for (idx, sub) in subs.subscribers.read().await.iter() { if sub.contract_address != Felt::ZERO && sub.contract_address != contract_address { diff --git a/crates/torii/sqlite/src/lib.rs b/crates/torii/sqlite/src/lib.rs index 0155c4cff4..e7d320c4f9 100644 --- a/crates/torii/sqlite/src/lib.rs +++ b/crates/torii/sqlite/src/lib.rs @@ -4,7 +4,7 @@ use std::str::FromStr; use std::sync::Arc; use anyhow::{anyhow, Context, Result}; -use chrono::DateTime; +use chrono::{DateTime, Utc}; use dojo_types::naming::get_tag; use dojo_types::schema::{Struct, Ty}; use dojo_world::config::WorldMetadata; @@ -301,7 +301,7 @@ impl Sql { .set( selector, Model { - selector, + id: selector, namespace: namespace.to_string(), name: model.name().to_string(), class_hash, @@ -311,6 +311,8 @@ impl Sql { layout, schema: namespaced_schema, executed_at: DateTime::from_timestamp(block_timestamp as i64, 0).unwrap(), + // we're not using the true created_at + created_at: Utc::now(), }, ) .await; diff --git a/crates/torii/sqlite/src/types.rs b/crates/torii/sqlite/src/types.rs index d0a5a26460..0b6b7b7778 100644 --- a/crates/torii/sqlite/src/types.rs +++ b/crates/torii/sqlite/src/types.rs @@ -1,11 +1,10 @@ -use core::fmt; use std::str::FromStr; use chrono::{DateTime, Utc}; use dojo_types::schema::Ty; use dojo_world::contracts::abigen::model::Layout; use serde::{Deserialize, Serialize}; -use sqlx::{sqlite::SqliteRow, FromRow, Row, Sqlite}; +use sqlx::{sqlite::SqliteRow, FromRow, Row}; use starknet::core::types::Felt; use crate::utils::map_column_decode_error; @@ -81,7 +80,7 @@ pub struct OptimisticEventMessage { #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Model { - pub selector: Felt, + pub id: Felt, pub namespace: String, pub name: String, pub class_hash: Felt, @@ -91,19 +90,19 @@ pub struct Model { pub layout: Layout, pub schema: Ty, pub executed_at: DateTime, + pub created_at: DateTime, } impl FromRow<'_, SqliteRow> for Model { fn from_row(row: &'_ SqliteRow) -> sqlx::Result { - let selector = row.try_get::("id")?; - let class_hash = row.try_get::("class_hash")?; - let contract_address = row.try_get::("contract_address")?; - let layout = row.try_get::("layout")?; - let schema = row.try_get::("schema")?; + let id = row.get::("id"); + let class_hash = row.get::("class_hash"); + let contract_address = row.get::("contract_address"); + let layout = row.get::("layout"); + let schema = row.get::("schema"); Ok(Model { - selector: Felt::from_str(&selector) - .map_err(|e| map_column_decode_error("id", Box::new(e)))?, + id: Felt::from_str(&id).map_err(|e| map_column_decode_error("id", Box::new(e)))?, namespace: row.get("namespace"), name: row.get("name"), class_hash: Felt::from_str(&class_hash) @@ -117,6 +116,7 @@ impl FromRow<'_, SqliteRow> for Model { schema: serde_json::from_str(&schema) .map_err(|e| map_column_decode_error("schema", Box::new(e)))?, executed_at: row.get("executed_at"), + created_at: row.get("created_at"), }) } } @@ -208,10 +208,10 @@ pub struct ContractCursor { impl FromRow<'_, SqliteRow> for ContractCursor { fn from_row(row: &'_ SqliteRow) -> sqlx::Result { - let contract_address = row.try_get::("contract_address")?; - let last_pending_block_tx = row.try_get::, &str>("last_pending_block_tx")?; + let contract_address = row.get::("contract_address"); + let last_pending_block_tx = row.get::, &str>("last_pending_block_tx"); let last_pending_block_contract_tx = - row.try_get::, &str>("last_pending_block_contract_tx")?; + row.get::, &str>("last_pending_block_contract_tx"); Ok(ContractCursor { head: row.get("head"),