-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(warp): added warp update
to madara and other changes
#393
Changes from 75 commits
135a3c0
5e4f779
5e8ee41
75e3d91
c4a0055
b0c456d
f8ab3e4
f774276
b29f886
fbd89b6
a72deba
d2afdf2
3fbe996
7263c8d
6ac5b72
3141d73
7d10c97
555c2d8
3acaa86
8f969bd
14c8209
912e0c9
c841913
31baf41
d564466
340171d
7c7031b
0813c0e
bc7fe96
6f52fce
0e0a120
1135eb6
71278b2
2646179
69c287c
804a235
e50bf22
817f97d
6b81330
76a82ba
4c05802
b55d686
6bbfee7
e9256d2
8e08cbe
7482581
0d75dc6
a9c1574
74023d3
23ad801
14a5501
6c3b211
5233339
9b01100
0fded70
373e55d
be08e4c
675f214
a614117
b3b781d
5a3e244
998017f
93455d9
2d88efa
46fa8cf
be2f41c
ab5759a
fa020c1
ada9a64
556d640
dc83e43
5e9ff44
1e23f46
529561c
64743aa
0d71ca1
602d705
7448c4b
f5754d6
3950c1a
3769030
75dc17d
4c0b1f8
17eb84d
b91914a
470890b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -119,16 +119,11 @@ pub struct BlockImporter { | |
backend: Arc<MadaraBackend>, | ||
verify_apply: VerifyApply, | ||
metrics: BlockMetrics, | ||
always_force_flush: bool, | ||
} | ||
|
||
impl BlockImporter { | ||
/// The starting block is used for metrics. Setting it to None means it will look at the database latest block number. | ||
pub fn new( | ||
backend: Arc<MadaraBackend>, | ||
starting_block: Option<u64>, | ||
always_force_flush: bool, | ||
) -> anyhow::Result<Self> { | ||
pub fn new(backend: Arc<MadaraBackend>, starting_block: Option<u64>) -> anyhow::Result<Self> { | ||
let pool = Arc::new(RayonPool::new()); | ||
let starting_block = if let Some(n) = starting_block { | ||
n | ||
|
@@ -145,7 +140,6 @@ impl BlockImporter { | |
pool, | ||
metrics: BlockMetrics::register(starting_block).context("Registering metrics for block import")?, | ||
backend, | ||
always_force_flush, | ||
}) | ||
} | ||
|
||
|
@@ -176,11 +170,6 @@ impl BlockImporter { | |
validation: BlockValidationContext, | ||
) -> Result<BlockImportResult, BlockImportError> { | ||
let result = self.verify_apply.verify_apply(block, validation).await?; | ||
// Flush step. | ||
let force = self.always_force_flush; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. aaah!! no, we really need to flush every block, always, in block_production mode There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This has no importance anymore as |
||
self.backend | ||
.maybe_flush(force) | ||
.map_err(|err| BlockImportError::Internal(format!("DB flushing error: {err:#}").into()))?; | ||
self.metrics.update(&result.header, &self.backend); | ||
Ok(result) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,19 +1,18 @@ | ||
//! Madara database | ||
|
||
use anyhow::{Context, Result}; | ||
use anyhow::Context; | ||
use bonsai_db::{BonsaiDb, DatabaseKeyMapping}; | ||
use bonsai_trie::id::BasicId; | ||
use bonsai_trie::{BonsaiStorage, BonsaiStorageConfig}; | ||
use db_metrics::DbMetrics; | ||
use mp_chain_config::ChainConfig; | ||
use mp_utils::service::Service; | ||
use mp_utils::service::{MadaraCapability, Service}; | ||
use rocksdb::backup::{BackupEngine, BackupEngineOptions}; | ||
use rocksdb::{BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, Env, FlushOptions, MultiThreaded}; | ||
use rocksdb_options::rocksdb_global_options; | ||
use starknet_types_core::hash::{Pedersen, Poseidon, StarkHash}; | ||
use std::path::{Path, PathBuf}; | ||
use std::sync::{Arc, Mutex}; | ||
use std::time::{Duration, Instant}; | ||
use std::sync::Arc; | ||
use std::{fmt, fs}; | ||
use tokio::sync::{mpsc, oneshot}; | ||
|
||
|
@@ -37,7 +36,7 @@ pub type WriteBatchWithTransaction = rocksdb::WriteBatchWithTransaction<false>; | |
|
||
const DB_UPDATES_BATCH_SIZE: usize = 1024; | ||
|
||
pub fn open_rocksdb(path: &Path) -> Result<Arc<DB>> { | ||
pub fn open_rocksdb(path: &Path) -> anyhow::Result<Arc<DB>> { | ||
let opts = rocksdb_global_options()?; | ||
tracing::debug!("opening db at {:?}", path.display()); | ||
let db = DB::open_cf_descriptors( | ||
|
@@ -56,7 +55,7 @@ fn spawn_backup_db_task( | |
db_path: &Path, | ||
db_restored_cb: oneshot::Sender<()>, | ||
mut recv: mpsc::Receiver<BackupRequest>, | ||
) -> Result<()> { | ||
) -> anyhow::Result<()> { | ||
let mut backup_opts = BackupEngineOptions::new(backup_dir).context("Creating backup options")?; | ||
let cores = std::thread::available_parallelism().map(|e| e.get() as i32).unwrap_or(1); | ||
backup_opts.set_max_background_operations(cores); | ||
|
@@ -254,7 +253,6 @@ impl DatabaseExt for DB { | |
pub struct MadaraBackend { | ||
backup_handle: Option<mpsc::Sender<BackupRequest>>, | ||
db: Arc<DB>, | ||
last_flush_time: Mutex<Option<Instant>>, | ||
chain_config: Arc<ChainConfig>, | ||
db_metrics: DbMetrics, | ||
sender_block_info: tokio::sync::broadcast::Sender<mp_block::MadaraBlockInfo>, | ||
|
@@ -305,7 +303,11 @@ impl DatabaseService { | |
} | ||
} | ||
|
||
impl Service for DatabaseService {} | ||
impl Service for DatabaseService { | ||
fn id(&self) -> MadaraCapability { | ||
MadaraCapability::Database | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cancellation needs to affect the backup thread if backups are enabled There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gotcha! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is actually not necessary as This makes sense since even before adding |
||
} | ||
} | ||
|
||
struct BackupRequest { | ||
callback: oneshot::Sender<()>, | ||
|
@@ -315,7 +317,7 @@ struct BackupRequest { | |
impl Drop for MadaraBackend { | ||
fn drop(&mut self) { | ||
tracing::info!("⏳ Gracefully closing the database..."); | ||
self.maybe_flush(true).expect("Error when flushing the database"); // flush :) | ||
self.flush().expect("Error when flushing the database"); // flush :) | ||
} | ||
} | ||
|
||
|
@@ -330,7 +332,6 @@ impl MadaraBackend { | |
Arc::new(Self { | ||
backup_handle: None, | ||
db: open_rocksdb(temp_dir.as_ref()).unwrap(), | ||
last_flush_time: Default::default(), | ||
chain_config, | ||
db_metrics: DbMetrics::register().unwrap(), | ||
sender_block_info: tokio::sync::broadcast::channel(100).0, | ||
|
@@ -344,7 +345,7 @@ impl MadaraBackend { | |
backup_dir: Option<PathBuf>, | ||
restore_from_latest_backup: bool, | ||
chain_config: Arc<ChainConfig>, | ||
) -> Result<Arc<MadaraBackend>> { | ||
) -> anyhow::Result<Arc<MadaraBackend>> { | ||
let db_path = db_config_dir.join("db"); | ||
|
||
// when backups are enabled, a thread is spawned that owns the rocksdb BackupEngine (it is not thread safe) and it receives backup requests using a mpsc channel | ||
|
@@ -374,7 +375,6 @@ impl MadaraBackend { | |
db_metrics: DbMetrics::register().context("Registering db metrics")?, | ||
backup_handle, | ||
db, | ||
last_flush_time: Default::default(), | ||
chain_config: Arc::clone(&chain_config), | ||
sender_block_info: tokio::sync::broadcast::channel(100).0, | ||
#[cfg(feature = "testing")] | ||
|
@@ -384,30 +384,21 @@ impl MadaraBackend { | |
Ok(backend) | ||
} | ||
|
||
pub fn maybe_flush(&self, force: bool) -> Result<bool> { | ||
let mut inst = self.last_flush_time.lock().expect("poisoned mutex"); | ||
let will_flush = force | ||
|| match *inst { | ||
Some(inst) => inst.elapsed() >= Duration::from_secs(5), | ||
None => true, | ||
}; | ||
if will_flush { | ||
tracing::debug!("doing a db flush"); | ||
let mut opts = FlushOptions::default(); | ||
opts.set_wait(true); | ||
// we have to collect twice here :/ | ||
let columns = Column::ALL.iter().map(|e| self.db.get_column(*e)).collect::<Vec<_>>(); | ||
let columns = columns.iter().collect::<Vec<_>>(); | ||
self.db.flush_cfs_opt(&columns, &opts).context("Flushing database")?; | ||
|
||
*inst = Some(Instant::now()); | ||
} | ||
pub fn flush(&self) -> anyhow::Result<()> { | ||
tracing::debug!("doing a db flush"); | ||
let mut opts = FlushOptions::default(); | ||
opts.set_wait(true); | ||
// we have to collect twice here :/ | ||
let columns = Column::ALL.iter().map(|e| self.db.get_column(*e)).collect::<Vec<_>>(); | ||
let columns = columns.iter().collect::<Vec<_>>(); | ||
|
||
Ok(will_flush) | ||
self.db.flush_cfs_opt(&columns, &opts).context("Flushing database")?; | ||
|
||
Ok(()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not mandatory now but we should add metrics to follow the flush frequency and behavior |
||
} | ||
|
||
#[tracing::instrument(skip(self))] | ||
pub async fn backup(&self) -> Result<()> { | ||
pub async fn backup(&self) -> anyhow::Result<()> { | ||
let (callback_sender, callback_recv) = oneshot::channel(); | ||
let _res = self | ||
.backup_handle | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When migrating to a newer version of Madara [that introduces breaking changes]
i would probably simply remove the later part of the sentence tbh, people are not expected to migrate to new versions that dont require upgrading the db?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been fixed.