From b65a4b0e491fe3c7f57f25e2be477af72db07a9a Mon Sep 17 00:00:00 2001 From: robin-near <111538878+robin-near@users.noreply.github.com> Date: Wed, 1 Nov 2023 13:31:32 -0700 Subject: [PATCH] Revert "Split TrieRefcountChange to TrieRefcountAddition/Subtraction. (#10006)" This reverts commit 22716f889f58d20fd638210a52fe5ac00f9c7d2f. --- core/store/src/cold_storage.rs | 8 +- core/store/src/trie/insert_delete.rs | 30 ++++--- core/store/src/trie/mod.rs | 122 +++++++++++---------------- core/store/src/trie/shard_tries.rs | 17 ++-- core/store/src/trie/state_parts.rs | 37 ++++---- core/store/src/trie/trie_tests.rs | 4 +- 6 files changed, 94 insertions(+), 124 deletions(-) diff --git a/core/store/src/cold_storage.rs b/core/store/src/cold_storage.rs index f3d8066d1b6..3898308fae1 100644 --- a/core/store/src/cold_storage.rs +++ b/core/store/src/cold_storage.rs @@ -1,6 +1,6 @@ use crate::columns::DBKeyType; use crate::db::{ColdDB, COLD_HEAD_KEY, HEAD_KEY}; -use crate::trie::TrieRefcountAddition; +use crate::trie::TrieRefcountChange; use crate::{metrics, DBCol, DBTransaction, Database, Store, TrieChanges}; use borsh::BorshDeserialize; @@ -475,11 +475,7 @@ impl StoreWithCache<'_> { option_to_not_found(self.get_ser(column, key), format_args!("{:?}: {:?}", column, key)) } - pub fn insert_state_to_cache_from_op( - &mut self, - op: &TrieRefcountAddition, - shard_uid_key: &[u8], - ) { + pub fn insert_state_to_cache_from_op(&mut self, op: &TrieRefcountChange, shard_uid_key: &[u8]) { debug_assert_eq!( DBCol::State.key_type(), &[DBKeyType::ShardUId, DBKeyType::TrieNodeOrValueHash] diff --git a/core/store/src/trie/insert_delete.rs b/core/store/src/trie/insert_delete.rs index 050d467ac3d..0d7a5ebd776 100644 --- a/core/store/src/trie/insert_delete.rs +++ b/core/store/src/trie/insert_delete.rs @@ -1,18 +1,21 @@ -use super::TrieRefcountDeltaMap; +use std::collections::HashMap; + +use borsh::BorshSerialize; + +use near_primitives::hash::{hash, CryptoHash}; +use near_primitives::state::ValueRef; + use crate::trie::nibble_slice::NibbleSlice; use crate::trie::{ Children, NodeHandle, RawTrieNode, RawTrieNodeWithSize, StorageHandle, StorageValueHandle, TrieNode, TrieNodeWithSize, ValueHandle, }; use crate::{StorageError, Trie, TrieChanges}; -use borsh::BorshSerialize; -use near_primitives::hash::{hash, CryptoHash}; -use near_primitives::state::ValueRef; pub(crate) struct NodesStorage { nodes: Vec>, values: Vec>>, - pub(crate) refcount_changes: TrieRefcountDeltaMap, + pub(crate) refcount_changes: HashMap, i32)>, } const INVALID_STORAGE_HANDLE: &str = "invalid storage handle"; @@ -20,11 +23,7 @@ const INVALID_STORAGE_HANDLE: &str = "invalid storage handle"; /// Local mutable storage that owns node objects. impl NodesStorage { pub fn new() -> NodesStorage { - NodesStorage { - nodes: Vec::new(), - refcount_changes: TrieRefcountDeltaMap::new(), - values: Vec::new(), - } + NodesStorage { nodes: Vec::new(), refcount_changes: HashMap::new(), values: Vec::new() } } fn destroy(&mut self, handle: StorageHandle) -> TrieNodeWithSize { @@ -605,11 +604,14 @@ impl Trie { raw_node_with_size.serialize(&mut buffer).unwrap(); let key = hash(&buffer); - memory.refcount_changes.add(key, buffer.clone(), 1); + let (_value, rc) = + memory.refcount_changes.entry(key).or_insert_with(|| (buffer.clone(), 0)); + *rc += 1; buffer.clear(); last_hash = key; } - let (insertions, deletions) = memory.refcount_changes.into_changes(); + let (insertions, deletions) = + Trie::convert_to_insertions_and_deletions(memory.refcount_changes); Ok(TrieChanges { old_root: *old_root, new_root: last_hash, insertions, deletions }) } @@ -619,7 +621,9 @@ impl Trie { let value = memory.value_ref(value_handle).to_vec(); let value_length = value.len() as u32; let value_hash = hash(&value); - memory.refcount_changes.add(value_hash, value, 1); + let (_value, rc) = + memory.refcount_changes.entry(value_hash).or_insert_with(|| (value, 0)); + *rc += 1; ValueRef { length: value_length, hash: value_hash } } ValueHandle::HashAndSize(value) => value, diff --git a/core/store/src/trie/mod.rs b/core/store/src/trie/mod.rs index 7a111125224..652caab2d97 100644 --- a/core/store/src/trie/mod.rs +++ b/core/store/src/trie/mod.rs @@ -26,7 +26,7 @@ pub use raw_node::{Children, RawTrieNode, RawTrieNodeWithSize}; use std::cell::RefCell; use std::collections::HashMap; use std::fmt::Write; -use std::hash::Hash; +use std::hash::{Hash, Hasher}; use std::rc::Rc; use std::str; use std::sync::Arc; @@ -364,30 +364,21 @@ pub trait TrieAccess { fn get(&self, key: &TrieKey) -> Result>, StorageError>; } -/// Stores reference count addition for some key-value pair in DB. -#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)] -pub struct TrieRefcountAddition { +/// Stores reference count change for some key-value pair in DB. +#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] +pub struct TrieRefcountChange { /// Hash of trie_node_or_value and part of the DB key. /// Used for uniting with shard id to get actual DB key. trie_node_or_value_hash: CryptoHash, /// DB value. Can be either serialized RawTrieNodeWithSize or value corresponding to /// some TrieKey. trie_node_or_value: Vec, - /// Reference count difference which will be added to the total refcount. + /// Reference count difference which will be added to the total refcount if it corresponds to + /// insertion and subtracted from it in the case of deletion. rc: std::num::NonZeroU32, } -/// Stores reference count subtraction for some key in DB. -#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)] -pub struct TrieRefcountSubtraction { - /// Hash of trie_node_or_value and part of the DB key. - /// Used for uniting with shard id to get actual DB key. - trie_node_or_value_hash: CryptoHash, - /// Reference count difference which will be subtracted to the total refcount. - rc: std::num::NonZeroU32, -} - -impl TrieRefcountAddition { +impl TrieRefcountChange { pub fn hash(&self) -> &CryptoHash { &self.trie_node_or_value_hash } @@ -395,61 +386,14 @@ impl TrieRefcountAddition { pub fn payload(&self) -> &[u8] { self.trie_node_or_value.as_slice() } - - pub fn revert(&self) -> TrieRefcountSubtraction { - TrieRefcountSubtraction { - trie_node_or_value_hash: self.trie_node_or_value_hash, - rc: self.rc, - } - } } -/// Helps produce a list of additions and subtractions to the trie, -/// especially in the case where deletions don't carry the full value. -pub struct TrieRefcountDeltaMap { - map: HashMap>, i32)>, -} - -impl TrieRefcountDeltaMap { - pub fn new() -> Self { - Self { map: HashMap::new() } - } - - pub fn add(&mut self, hash: CryptoHash, data: Vec, refcount: u32) { - let (old_value, old_rc) = self.map.entry(hash).or_insert((None, 0)); - *old_value = Some(data); - *old_rc += refcount as i32; - } - - pub fn subtract(&mut self, hash: CryptoHash, refcount: u32) { - let (_, old_rc) = self.map.entry(hash).or_insert((None, 0)); - *old_rc -= refcount as i32; - } - - pub fn into_changes(self) -> (Vec, Vec) { - let mut insertions = Vec::new(); - let mut deletions = Vec::new(); - for (hash, (value, rc)) in self.map.into_iter() { - if rc > 0 { - insertions.push(TrieRefcountAddition { - trie_node_or_value_hash: hash, - trie_node_or_value: value.expect("value must be present"), - rc: std::num::NonZeroU32::new(rc as u32).unwrap(), - }); - } else if rc < 0 { - deletions.push(TrieRefcountSubtraction { - trie_node_or_value_hash: hash, - rc: std::num::NonZeroU32::new((-rc) as u32).unwrap(), - }); - } - } - // Sort so that trie changes have unique representation. - insertions.sort(); - deletions.sort(); - (insertions, deletions) +impl Hash for TrieRefcountChange { + fn hash(&self, state: &mut H) { + state.write(&self.trie_node_or_value_hash.0); + state.write_u32(self.rc.into()); } } - /// /// TrieChanges stores delta for refcount. /// Multiple versions of the state work the following way: @@ -477,8 +421,8 @@ impl TrieRefcountDeltaMap { pub struct TrieChanges { pub old_root: StateRoot, pub new_root: StateRoot, - insertions: Vec, - deletions: Vec, + insertions: Vec, + deletions: Vec, } impl TrieChanges { @@ -486,7 +430,7 @@ impl TrieChanges { TrieChanges { old_root, new_root: old_root, insertions: vec![], deletions: vec![] } } - pub fn insertions(&self) -> &[TrieRefcountAddition] { + pub fn insertions(&self) -> &[TrieRefcountChange] { self.insertions.as_slice() } } @@ -654,8 +598,12 @@ impl Trie { ) -> Result<(), StorageError> { match value { ValueHandle::HashAndSize(value) => { - self.internal_retrieve_trie_node(&value.hash, true)?; - memory.refcount_changes.subtract(value.hash, 1); + let bytes = self.internal_retrieve_trie_node(&value.hash, true)?; + memory + .refcount_changes + .entry(value.hash) + .or_insert_with(|| (bytes.to_vec(), 0)) + .1 -= 1; } ValueHandle::InMemory(_) => { // do nothing @@ -993,9 +941,9 @@ impl Trie { ) -> Result { match self.retrieve_raw_node(hash, true)? { None => Ok(memory.store(TrieNodeWithSize::empty())), - Some((_, node)) => { + Some((bytes, node)) => { let result = memory.store(TrieNodeWithSize::from_raw(node)); - memory.refcount_changes.subtract(*hash, 1); + memory.refcount_changes.entry(*hash).or_insert_with(|| (bytes.to_vec(), 0)).1 -= 1; Ok(result) } } @@ -1265,6 +1213,32 @@ impl Trie { } } + pub(crate) fn convert_to_insertions_and_deletions( + changes: HashMap, i32)>, + ) -> (Vec, Vec) { + let mut deletions = Vec::new(); + let mut insertions = Vec::new(); + for (trie_node_or_value_hash, (trie_node_or_value, rc)) in changes.into_iter() { + if rc > 0 { + insertions.push(TrieRefcountChange { + trie_node_or_value_hash, + trie_node_or_value, + rc: std::num::NonZeroU32::new(rc as u32).unwrap(), + }); + } else if rc < 0 { + deletions.push(TrieRefcountChange { + trie_node_or_value_hash, + trie_node_or_value, + rc: std::num::NonZeroU32::new((-rc) as u32).unwrap(), + }); + } + } + // Sort so that trie changes have unique representation + insertions.sort(); + deletions.sort(); + (insertions, deletions) + } + pub fn update(&self, changes: I) -> Result where I: IntoIterator, Option>)>, diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index adcc962d28f..b1336e8603f 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -2,7 +2,7 @@ use crate::flat::{FlatStorageManager, FlatStorageStatus}; use crate::trie::config::TrieConfig; use crate::trie::prefetching_trie_storage::PrefetchingThreadsHandle; use crate::trie::trie_storage::{TrieCache, TrieCachingStorage}; -use crate::trie::{TrieRefcountAddition, POISONED_LOCK_ERR}; +use crate::trie::{TrieRefcountChange, POISONED_LOCK_ERR}; use crate::{metrics, DBCol, PrefetchApi}; use crate::{Store, StoreUpdate, Trie, TrieChanges, TrieUpdate}; @@ -18,7 +18,6 @@ use std::rc::Rc; use std::sync::{Arc, RwLock}; use super::state_snapshot::{StateSnapshot, StateSnapshotConfig}; -use super::TrieRefcountSubtraction; struct ShardTriesInner { store: Store, @@ -236,12 +235,12 @@ impl ShardTries { fn apply_deletions_inner( &self, - deletions: &[TrieRefcountSubtraction], + deletions: &[TrieRefcountChange], shard_uid: ShardUId, store_update: &mut StoreUpdate, ) { let mut ops = Vec::new(); - for TrieRefcountSubtraction { trie_node_or_value_hash, rc } in deletions.iter() { + for TrieRefcountChange { trie_node_or_value_hash, rc, .. } in deletions.iter() { let key = TrieCachingStorage::get_key_from_shard_uid_and_hash( shard_uid, trie_node_or_value_hash, @@ -255,12 +254,12 @@ impl ShardTries { fn apply_insertions_inner( &self, - insertions: &[TrieRefcountAddition], + insertions: &[TrieRefcountChange], shard_uid: ShardUId, store_update: &mut StoreUpdate, ) { let mut ops = Vec::new(); - for TrieRefcountAddition { trie_node_or_value_hash, trie_node_or_value, rc } in + for TrieRefcountChange { trie_node_or_value_hash, trie_node_or_value, rc } in insertions.iter() { let key = TrieCachingStorage::get_key_from_shard_uid_and_hash( @@ -332,11 +331,7 @@ impl ShardTries { metrics::REVERTED_TRIE_INSERTIONS .with_label_values(&[&shard_id]) .inc_by(trie_changes.insertions.len() as u64); - self.apply_deletions_inner( - &trie_changes.insertions.iter().map(|insertion| insertion.revert()).collect::>(), - shard_uid, - store_update, - ) + self.apply_deletions_inner(&trie_changes.insertions, shard_uid, store_update) } pub fn apply_all( diff --git a/core/store/src/trie/state_parts.rs b/core/store/src/trie/state_parts.rs index 2d1d3e39137..986d8801090 100644 --- a/core/store/src/trie/state_parts.rs +++ b/core/store/src/trie/state_parts.rs @@ -36,8 +36,6 @@ use std::collections::{HashMap, HashSet}; use std::rc::Rc; use std::sync::Arc; -use super::TrieRefcountDeltaMap; - /// Trie key in nibbles corresponding to the right boundary for the last state part. /// Guaranteed to be bigger than any existing trie key. const LAST_STATE_PART_BOUNDARY: &[u8; 1] = &[16]; @@ -459,12 +457,12 @@ impl Trie { let path_end = trie.find_state_part_boundary(part_id.idx + 1, part_id.total)?; let mut iterator = trie.iter()?; let trie_traversal_items = iterator.visit_nodes_interval(&path_begin, &path_end)?; - let mut refcount_changes = TrieRefcountDeltaMap::new(); + let mut map = HashMap::new(); let mut flat_state_delta = FlatStateChanges::default(); let mut contract_codes = Vec::new(); for TrieTraversalItem { hash, key } in trie_traversal_items { let value = trie.retrieve_value(&hash)?; - refcount_changes.add(hash, value.to_vec(), 1); + map.entry(hash).or_insert_with(|| (value.to_vec(), 0)).1 += 1; if let Some(trie_key) = key { let flat_state_value = FlatStateValue::on_disk(&value); flat_state_delta.insert(trie_key.clone(), Some(flat_state_value)); @@ -473,7 +471,7 @@ impl Trie { } } } - let (insertions, deletions) = refcount_changes.into_changes(); + let (insertions, deletions) = Trie::convert_to_insertions_and_deletions(map); Ok(ApplyStatePartResult { trie_changes: TrieChanges { old_root: Trie::EMPTY_ROOT, @@ -508,8 +506,6 @@ impl Trie { mod tests { use assert_matches::assert_matches; use std::collections::{HashMap, HashSet}; - use std::fmt::Debug; - use std::hash::Hash; use std::sync::Arc; use rand::prelude::ThreadRng; @@ -522,9 +518,7 @@ mod tests { create_tries, create_tries_with_flat_storage, gen_changes, test_populate_trie, }; use crate::trie::iterator::CrumbStatus; - use crate::trie::{ - TrieRefcountAddition, TrieRefcountDeltaMap, TrieRefcountSubtraction, ValueHandle, - }; + use crate::trie::{TrieRefcountChange, ValueHandle}; use super::*; use crate::{DBCol, MissingTrieValueContext, TrieCachingStorage}; @@ -635,7 +629,7 @@ mod tests { })?; let mut insertions = insertions .into_iter() - .map(|(k, (v, rc))| TrieRefcountAddition { + .map(|(k, (v, rc))| TrieRefcountChange { trie_node_or_value_hash: k, trie_node_or_value: v, rc: std::num::NonZeroU32::new(rc).unwrap(), @@ -887,19 +881,23 @@ mod tests { return TrieChanges::empty(Trie::EMPTY_ROOT); } let new_root = changes[0].new_root; - let mut map = TrieRefcountDeltaMap::new(); + let mut map = HashMap::new(); for changes_set in changes { assert!(changes_set.deletions.is_empty(), "state parts only have insertions"); - for TrieRefcountAddition { trie_node_or_value_hash, trie_node_or_value, rc } in + for TrieRefcountChange { trie_node_or_value_hash, trie_node_or_value, rc } in changes_set.insertions { - map.add(trie_node_or_value_hash, trie_node_or_value, rc.get()); + map.entry(trie_node_or_value_hash).or_insert_with(|| (trie_node_or_value, 0)).1 += + rc.get() as i32; } - for TrieRefcountSubtraction { trie_node_or_value_hash, rc } in changes_set.deletions { - map.subtract(trie_node_or_value_hash, rc.get()); + for TrieRefcountChange { trie_node_or_value_hash, trie_node_or_value, rc } in + changes_set.deletions + { + map.entry(trie_node_or_value_hash).or_insert_with(|| (trie_node_or_value, 0)).1 -= + rc.get() as i32; } } - let (insertions, deletions) = map.into_changes(); + let (insertions, deletions) = Trie::convert_to_insertions_and_deletions(map); TrieChanges { old_root: Default::default(), new_root, insertions, deletions } } @@ -973,7 +971,10 @@ mod tests { } } - fn format_simple_trie_refcount_diff(left: &[T], right: &[T]) -> String { + fn format_simple_trie_refcount_diff( + left: &[TrieRefcountChange], + right: &[TrieRefcountChange], + ) -> String { let left_set: HashSet<_> = HashSet::from_iter(left.iter()); let right_set: HashSet<_> = HashSet::from_iter(right.iter()); format!( diff --git a/core/store/src/trie/trie_tests.rs b/core/store/src/trie/trie_tests.rs index ff2afc9df27..eebdb8d31b1 100644 --- a/core/store/src/trie/trie_tests.rs +++ b/core/store/src/trie/trie_tests.rs @@ -208,7 +208,7 @@ mod trie_storage_tests { use crate::test_utils::{create_test_store, create_tries}; use crate::trie::accounting_cache::TrieAccountingCache; use crate::trie::trie_storage::{TrieCache, TrieCachingStorage, TrieDBStorage}; - use crate::trie::TrieRefcountAddition; + use crate::trie::TrieRefcountChange; use crate::{Store, TrieChanges, TrieConfig}; use assert_matches::assert_matches; use near_primitives::hash::hash; @@ -218,7 +218,7 @@ mod trie_storage_tests { let mut trie_changes = TrieChanges::empty(Trie::EMPTY_ROOT); trie_changes.insertions = values .iter() - .map(|value| TrieRefcountAddition { + .map(|value| TrieRefcountChange { trie_node_or_value_hash: hash(value), trie_node_or_value: value.clone(), rc: std::num::NonZeroU32::new(1).unwrap(),