Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correct best_hash for the frontier backend #20

Merged
merged 6 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions client/api/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ pub trait Backend<Block: BlockT>: Send + Sync {
fn is_indexed(&self) -> bool {
self.log_indexer().is_indexed()
}

/// Get the latest substrate block hash in the sql database.
async fn best_hash(&self) -> Result<Block::Hash, String>;
}

#[derive(Debug, Eq, PartialEq)]
Expand Down
13 changes: 6 additions & 7 deletions client/cli/src/frontier_db_cmd/mapping_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,21 @@ pub enum MappingKey {
EthBlockOrTransactionHash(H256),
}

pub struct MappingDb<'a, C, B: BlockT> {
pub struct MappingDb<'a, B: BlockT, C: HeaderBackend<B>> {
cmd: &'a FrontierDbCmd,
client: Arc<C>,
backend: Arc<fc_db::kv::Backend<B>>,
backend: Arc<fc_db::kv::Backend<B, C>>,
}

impl<'a, C, B: BlockT> MappingDb<'a, C, B>
impl<'a, B: BlockT, C> MappingDb<'a, B, C>
where
C: ProvideRuntimeApi<B>,
C: HeaderBackend<B> + ProvideRuntimeApi<B>,
C::Api: EthereumRuntimeRPCApi<B>,
C: HeaderBackend<B>,
{
pub fn new(
cmd: &'a FrontierDbCmd,
client: Arc<C>,
backend: Arc<fc_db::kv::Backend<B>>,
backend: Arc<fc_db::kv::Backend<B, C>>,
) -> Self {
Self {
cmd,
Expand Down Expand Up @@ -176,4 +175,4 @@ where
}
}

impl<'a, C, B: BlockT> FrontierDbMessage for MappingDb<'a, C, B> {}
impl<'a, B: BlockT, C: HeaderBackend<B>> FrontierDbMessage for MappingDb<'a, B, C> {}
11 changes: 6 additions & 5 deletions client/cli/src/frontier_db_cmd/meta_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
use ethereum_types::H256;
use serde::Deserialize;
// Substrate
use sp_blockchain::HeaderBackend;
use sp_runtime::traits::Block as BlockT;

use super::{utils::FrontierDbMessage, FrontierDbCmd, Operation};
Expand Down Expand Up @@ -57,13 +58,13 @@ impl FromStr for MetaKey {
}
}

pub struct MetaDb<'a, B: BlockT> {
pub struct MetaDb<'a, B: BlockT, C: HeaderBackend<B>> {
cmd: &'a FrontierDbCmd,
backend: Arc<fc_db::kv::Backend<B>>,
backend: Arc<fc_db::kv::Backend<B, C>>,
}

impl<'a, B: BlockT> MetaDb<'a, B> {
pub fn new(cmd: &'a FrontierDbCmd, backend: Arc<fc_db::kv::Backend<B>>) -> Self {
impl<'a, B: BlockT, C: HeaderBackend<B>> MetaDb<'a, B, C> {
pub fn new(cmd: &'a FrontierDbCmd, backend: Arc<fc_db::kv::Backend<B, C>>) -> Self {
Self { cmd, backend }
}

Expand Down Expand Up @@ -151,4 +152,4 @@ impl<'a, B: BlockT> MetaDb<'a, B> {
}
}

impl<'a, B: BlockT> FrontierDbMessage for MetaDb<'a, B> {}
impl<'a, B: BlockT, C: HeaderBackend<B>> FrontierDbMessage for MetaDb<'a, B, C> {}
7 changes: 3 additions & 4 deletions client/cli/src/frontier_db_cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,14 @@ pub enum DbValue<H> {
}

impl FrontierDbCmd {
pub fn run<C, B: BlockT>(
pub fn run<B: BlockT, C>(
&self,
client: Arc<C>,
backend: Arc<fc_db::kv::Backend<B>>,
backend: Arc<fc_db::kv::Backend<B, C>>,
) -> sc_cli::Result<()>
where
C: ProvideRuntimeApi<B>,
C: HeaderBackend<B> + ProvideRuntimeApi<B>,
C::Api: fp_rpc::EthereumRuntimeRPCApi<B>,
C: HeaderBackend<B>,
{
match self.column {
Column::Meta => {
Expand Down
4 changes: 2 additions & 2 deletions client/cli/src/frontier_db_cmd/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ type OpaqueBlock =
pub fn open_frontier_backend<Block: BlockT, C: HeaderBackend<Block>>(
client: Arc<C>,
path: PathBuf,
) -> Result<Arc<fc_db::kv::Backend<Block>>, String> {
Ok(Arc::new(fc_db::kv::Backend::<Block>::new(
) -> Result<Arc<fc_db::kv::Backend<Block, C>>, String> {
Ok(Arc::new(fc_db::kv::Backend::<Block, C>::new(
client,
&fc_db::kv::DatabaseSettings {
source: sc_client_db::DatabaseSource::RocksDb {
Expand Down
22 changes: 13 additions & 9 deletions client/db/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use sp_blockchain::HeaderBackend;
use sp_core::{H160, H256};
pub use sp_database::Database;
use sp_runtime::traits::Block as BlockT;

// Frontier
use fc_api::{FilteredLog, TransactionMetadata};
use fp_storage::{EthereumStorageSchema, PALLET_ETHEREUM_SCHEMA_CACHE};
Expand Down Expand Up @@ -62,14 +63,15 @@ pub mod static_keys {
}

#[derive(Clone)]
pub struct Backend<Block: BlockT> {
pub struct Backend<Block: BlockT, C: HeaderBackend<Block>> {
client: Arc<C>,
meta: Arc<MetaDb<Block>>,
mapping: Arc<MappingDb<Block>>,
log_indexer: LogIndexerBackend<Block>,
}

#[async_trait::async_trait]
impl<Block: BlockT> fc_api::Backend<Block> for Backend<Block> {
impl<Block: BlockT, C: HeaderBackend<Block>> fc_api::Backend<Block> for Backend<Block, C> {
async fn block_hash(
&self,
ethereum_block_hash: &H256,
Expand All @@ -88,6 +90,10 @@ impl<Block: BlockT> fc_api::Backend<Block> for Backend<Block> {
fn log_indexer(&self) -> &dyn fc_api::LogIndexerBackend<Block> {
&self.log_indexer
}

async fn best_hash(&self) -> Result<Block::Hash, String> {
Ok(self.client.info().best_hash)
}
}

#[derive(Clone, Default)]
Expand Down Expand Up @@ -115,8 +121,8 @@ pub fn frontier_database_dir(db_config_dir: &Path, db_path: &str) -> PathBuf {
db_config_dir.join("frontier").join(db_path)
}

impl<Block: BlockT> Backend<Block> {
pub fn open<C: HeaderBackend<Block>>(
impl<Block: BlockT, C: HeaderBackend<Block>> Backend<Block, C> {
pub fn open(
client: Arc<C>,
database: &DatabaseSource,
db_config_dir: &Path,
Expand Down Expand Up @@ -148,13 +154,11 @@ impl<Block: BlockT> Backend<Block> {
)
}

pub fn new<C: HeaderBackend<Block>>(
client: Arc<C>,
config: &DatabaseSettings,
) -> Result<Self, String> {
let db = utils::open_database::<Block, C>(client, config)?;
pub fn new(client: Arc<C>, config: &DatabaseSettings) -> Result<Self, String> {
let db = utils::open_database::<Block, C>(client.clone(), config)?;

Ok(Self {
client,
mapping: Arc::new(MappingDb {
db: db.clone(),
write_lock: Arc::new(Mutex::new(())),
Expand Down
6 changes: 4 additions & 2 deletions client/db/src/kv/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,10 @@ mod tests {
pub fn open_frontier_backend<Block: BlockT, C: HeaderBackend<Block>>(
client: Arc<C>,
setting: &crate::kv::DatabaseSettings,
) -> Result<Arc<crate::kv::Backend<Block>>, String> {
Ok(Arc::new(crate::kv::Backend::<Block>::new(client, setting)?))
) -> Result<Arc<crate::kv::Backend<Block, C>>, String> {
Ok(Arc::new(crate::kv::Backend::<Block, C>::new(
client, setting,
)?))
}

#[cfg_attr(not(feature = "rocksdb"), ignore)]
Expand Down
11 changes: 7 additions & 4 deletions client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

#![deny(unused_crate_dependencies)]
// #![deny(unused_crate_dependencies)]

use std::sync::Arc;

// Substrate
pub use sc_client_db::DatabaseSource;
use sp_blockchain::HeaderBackend;
use sp_runtime::traits::Block as BlockT;

pub mod kv;
#[cfg(feature = "sql")]
pub mod sql;

#[derive(Clone)]
pub enum Backend<Block: BlockT> {
KeyValue(kv::Backend<Block>),
pub enum Backend<Block: BlockT, C: HeaderBackend<Block>> {
KeyValue(Arc<kv::Backend<Block, C>>),
#[cfg(feature = "sql")]
Sql(sql::Backend<Block>),
Sql(Arc<sql::Backend<Block>>),
}
35 changes: 30 additions & 5 deletions client/db/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,8 @@ pub enum BackendConfig<'a> {
pub struct Backend<Block: BlockT> {
/// The Sqlite connection.
pool: SqlitePool,

/// The additional overrides for the logs handler.
overrides: Arc<OverrideHandle<Block>>,

/// The number of allowed operations for the Sqlite filter call.
/// A value of `0` disables the timeout.
num_ops_timeout: i32,
Expand Down Expand Up @@ -239,6 +237,7 @@ where
let block_number = 0i32;
let is_canon = 1i32;

let mut tx = self.pool().begin().await?;
let _ = sqlx::query(
"INSERT OR IGNORE INTO blocks(
ethereum_block_hash,
Expand All @@ -253,8 +252,20 @@ where
.bind(block_number)
.bind(schema)
.bind(is_canon)
.execute(self.pool())
.execute(&mut *tx)
.await?;

sqlx::query("INSERT INTO sync_status(substrate_block_hash) VALUES (?)")
.bind(substrate_block_hash)
.execute(&mut *tx)
.await?;
sqlx::query("UPDATE sync_status SET status = 1 WHERE substrate_block_hash = ?")
.bind(substrate_block_hash)
.execute(&mut *tx)
.await?;

tx.commit().await?;
log::debug!(target: "frontier-sql", "The genesis block information has been submitted.");
}
Some(substrate_genesis_hash)
} else {
Expand Down Expand Up @@ -509,7 +520,6 @@ where
});
// https://www.sqlite.org/pragma.html#pragma_optimize
let _ = sqlx::query("PRAGMA optimize").execute(&pool).await;
log::debug!(target: "frontier-sql", "Batch committed");
}

fn get_logs<Client, BE>(
Expand Down Expand Up @@ -686,7 +696,7 @@ where
}

/// Retrieve the block hash for the last indexed canon block.
pub async fn get_last_indexed_canon_block(&self) -> Result<H256, Error> {
pub async fn last_indexed_canon_block(&self) -> Result<H256, Error> {
let row = sqlx::query(
"SELECT b.substrate_block_hash FROM blocks AS b
INNER JOIN sync_status AS s
Expand Down Expand Up @@ -853,6 +863,21 @@ impl<Block: BlockT<Hash = H256>> fc_api::Backend<Block> for Backend<Block> {
fn log_indexer(&self) -> &dyn fc_api::LogIndexerBackend<Block> {
self
}

async fn best_hash(&self) -> Result<Block::Hash, String> {
// Retrieves the block hash for the latest indexed block, maybe it's not canon.
sqlx::query(
"SELECT b.substrate_block_hash FROM blocks AS b
INNER JOIN sync_status AS s
ON s.substrate_block_hash = b.substrate_block_hash
WHERE s.status = 1
ORDER BY b.block_number DESC LIMIT 1",
)
.fetch_one(self.pool())
.await
.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
.map_err(|e| format!("Failed to fetch best hash: {}", e))
}
}

#[async_trait::async_trait]
Expand Down
15 changes: 8 additions & 7 deletions client/mapping-sync/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::{EthereumBlockNotification, EthereumBlockNotificationSinks, SyncStrat
pub fn sync_block<Block: BlockT, C, BE>(
client: &C,
overrides: Arc<OverrideHandle<Block>>,
backend: &fc_db::kv::Backend<Block>,
backend: &fc_db::kv::Backend<Block, C>,
header: &Block::Header,
) -> Result<(), String>
where
Expand Down Expand Up @@ -111,11 +111,11 @@ where

pub fn sync_genesis_block<Block: BlockT, C>(
client: &C,
backend: &fc_db::kv::Backend<Block>,
backend: &fc_db::kv::Backend<Block, C>,
header: &Block::Header,
) -> Result<(), String>
where
C: ProvideRuntimeApi<Block>,
C: HeaderBackend<Block> + ProvideRuntimeApi<Block>,
C::Api: EthereumRuntimeRPCApi<Block>,
{
let substrate_block_hash = header.hash();
Expand Down Expand Up @@ -159,7 +159,7 @@ pub fn sync_one_block<Block: BlockT, C, BE>(
client: &C,
substrate_backend: &BE,
overrides: Arc<OverrideHandle<Block>>,
frontier_backend: &fc_db::kv::Backend<Block>,
frontier_backend: &fc_db::kv::Backend<Block, C>,
sync_from: <Block::Header as HeaderT>::Number,
strategy: SyncStrategy,
sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
Expand Down Expand Up @@ -248,7 +248,7 @@ pub fn sync_blocks<Block: BlockT, C, BE>(
client: &C,
substrate_backend: &BE,
overrides: Arc<OverrideHandle<Block>>,
frontier_backend: &fc_db::kv::Backend<Block>,
frontier_backend: &fc_db::kv::Backend<Block, C>,
limit: usize,
sync_from: <Block::Header as HeaderT>::Number,
strategy: SyncStrategy,
Expand Down Expand Up @@ -282,13 +282,14 @@ where
Ok(synced_any)
}

pub fn fetch_header<Block: BlockT, BE>(
pub fn fetch_header<Block: BlockT, C, BE>(
substrate_backend: &BE,
frontier_backend: &fc_db::kv::Backend<Block>,
frontier_backend: &fc_db::kv::Backend<Block, C>,
checking_tip: Block::Hash,
sync_from: <Block::Header as HeaderT>::Number,
) -> Result<Option<Block::Header>, String>
where
C: HeaderBackend<Block>,
BE: HeaderBackend<Block>,
{
if frontier_backend.mapping().is_synced(&checking_tip)? {
Expand Down
Loading
Loading