Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Split TrieRefcountChange to TrieRefcountAddition/Subtraction." #10066

Merged
merged 1 commit into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions core/store/src/cold_storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::columns::DBKeyType;
use crate::db::{ColdDB, COLD_HEAD_KEY, HEAD_KEY};
use crate::trie::TrieRefcountAddition;
use crate::trie::TrieRefcountChange;
use crate::{metrics, DBCol, DBTransaction, Database, Store, TrieChanges};

use borsh::BorshDeserialize;
Expand Down Expand Up @@ -475,11 +475,7 @@ impl StoreWithCache<'_> {
option_to_not_found(self.get_ser(column, key), format_args!("{:?}: {:?}", column, key))
}

pub fn insert_state_to_cache_from_op(
&mut self,
op: &TrieRefcountAddition,
shard_uid_key: &[u8],
) {
pub fn insert_state_to_cache_from_op(&mut self, op: &TrieRefcountChange, shard_uid_key: &[u8]) {
debug_assert_eq!(
DBCol::State.key_type(),
&[DBKeyType::ShardUId, DBKeyType::TrieNodeOrValueHash]
Expand Down
30 changes: 17 additions & 13 deletions core/store/src/trie/insert_delete.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,29 @@
use super::TrieRefcountDeltaMap;
use std::collections::HashMap;

use borsh::BorshSerialize;

use near_primitives::hash::{hash, CryptoHash};
use near_primitives::state::ValueRef;

use crate::trie::nibble_slice::NibbleSlice;
use crate::trie::{
Children, NodeHandle, RawTrieNode, RawTrieNodeWithSize, StorageHandle, StorageValueHandle,
TrieNode, TrieNodeWithSize, ValueHandle,
};
use crate::{StorageError, Trie, TrieChanges};
use borsh::BorshSerialize;
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::state::ValueRef;

pub(crate) struct NodesStorage {
nodes: Vec<Option<TrieNodeWithSize>>,
values: Vec<Option<Vec<u8>>>,
pub(crate) refcount_changes: TrieRefcountDeltaMap,
pub(crate) refcount_changes: HashMap<CryptoHash, (Vec<u8>, i32)>,
}

const INVALID_STORAGE_HANDLE: &str = "invalid storage handle";

/// Local mutable storage that owns node objects.
impl NodesStorage {
pub fn new() -> NodesStorage {
NodesStorage {
nodes: Vec::new(),
refcount_changes: TrieRefcountDeltaMap::new(),
values: Vec::new(),
}
NodesStorage { nodes: Vec::new(), refcount_changes: HashMap::new(), values: Vec::new() }
}

fn destroy(&mut self, handle: StorageHandle) -> TrieNodeWithSize {
Expand Down Expand Up @@ -605,11 +604,14 @@ impl Trie {
raw_node_with_size.serialize(&mut buffer).unwrap();
let key = hash(&buffer);

memory.refcount_changes.add(key, buffer.clone(), 1);
let (_value, rc) =
memory.refcount_changes.entry(key).or_insert_with(|| (buffer.clone(), 0));
*rc += 1;
buffer.clear();
last_hash = key;
}
let (insertions, deletions) = memory.refcount_changes.into_changes();
let (insertions, deletions) =
Trie::convert_to_insertions_and_deletions(memory.refcount_changes);
Ok(TrieChanges { old_root: *old_root, new_root: last_hash, insertions, deletions })
}

Expand All @@ -619,7 +621,9 @@ impl Trie {
let value = memory.value_ref(value_handle).to_vec();
let value_length = value.len() as u32;
let value_hash = hash(&value);
memory.refcount_changes.add(value_hash, value, 1);
let (_value, rc) =
memory.refcount_changes.entry(value_hash).or_insert_with(|| (value, 0));
*rc += 1;
ValueRef { length: value_length, hash: value_hash }
}
ValueHandle::HashAndSize(value) => value,
Expand Down
122 changes: 48 additions & 74 deletions core/store/src/trie/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub use raw_node::{Children, RawTrieNode, RawTrieNodeWithSize};
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt::Write;
use std::hash::Hash;
use std::hash::{Hash, Hasher};
use std::rc::Rc;
use std::str;
use std::sync::Arc;
Expand Down Expand Up @@ -364,92 +364,36 @@ pub trait TrieAccess {
fn get(&self, key: &TrieKey) -> Result<Option<Vec<u8>>, StorageError>;
}

/// Stores reference count addition for some key-value pair in DB.
#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
pub struct TrieRefcountAddition {
/// Stores reference count change for some key-value pair in DB.
#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct TrieRefcountChange {
/// Hash of trie_node_or_value and part of the DB key.
/// Used for uniting with shard id to get actual DB key.
trie_node_or_value_hash: CryptoHash,
/// DB value. Can be either serialized RawTrieNodeWithSize or value corresponding to
/// some TrieKey.
trie_node_or_value: Vec<u8>,
/// Reference count difference which will be added to the total refcount.
/// Reference count difference which will be added to the total refcount if it corresponds to
/// insertion and subtracted from it in the case of deletion.
rc: std::num::NonZeroU32,
}

/// Stores reference count subtraction for some key in DB.
#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
pub struct TrieRefcountSubtraction {
/// Hash of trie_node_or_value and part of the DB key.
/// Used for uniting with shard id to get actual DB key.
trie_node_or_value_hash: CryptoHash,
/// Reference count difference which will be subtracted to the total refcount.
rc: std::num::NonZeroU32,
}

impl TrieRefcountAddition {
impl TrieRefcountChange {
pub fn hash(&self) -> &CryptoHash {
&self.trie_node_or_value_hash
}

pub fn payload(&self) -> &[u8] {
self.trie_node_or_value.as_slice()
}

pub fn revert(&self) -> TrieRefcountSubtraction {
TrieRefcountSubtraction {
trie_node_or_value_hash: self.trie_node_or_value_hash,
rc: self.rc,
}
}
}

/// Helps produce a list of additions and subtractions to the trie,
/// especially in the case where deletions don't carry the full value.
pub struct TrieRefcountDeltaMap {
map: HashMap<CryptoHash, (Option<Vec<u8>>, i32)>,
}

impl TrieRefcountDeltaMap {
pub fn new() -> Self {
Self { map: HashMap::new() }
}

pub fn add(&mut self, hash: CryptoHash, data: Vec<u8>, refcount: u32) {
let (old_value, old_rc) = self.map.entry(hash).or_insert((None, 0));
*old_value = Some(data);
*old_rc += refcount as i32;
}

pub fn subtract(&mut self, hash: CryptoHash, refcount: u32) {
let (_, old_rc) = self.map.entry(hash).or_insert((None, 0));
*old_rc -= refcount as i32;
}

pub fn into_changes(self) -> (Vec<TrieRefcountAddition>, Vec<TrieRefcountSubtraction>) {
let mut insertions = Vec::new();
let mut deletions = Vec::new();
for (hash, (value, rc)) in self.map.into_iter() {
if rc > 0 {
insertions.push(TrieRefcountAddition {
trie_node_or_value_hash: hash,
trie_node_or_value: value.expect("value must be present"),
rc: std::num::NonZeroU32::new(rc as u32).unwrap(),
});
} else if rc < 0 {
deletions.push(TrieRefcountSubtraction {
trie_node_or_value_hash: hash,
rc: std::num::NonZeroU32::new((-rc) as u32).unwrap(),
});
}
}
// Sort so that trie changes have unique representation.
insertions.sort();
deletions.sort();
(insertions, deletions)
impl Hash for TrieRefcountChange {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write(&self.trie_node_or_value_hash.0);
state.write_u32(self.rc.into());
}
}

///
/// TrieChanges stores delta for refcount.
/// Multiple versions of the state work the following way:
Expand Down Expand Up @@ -477,16 +421,16 @@ impl TrieRefcountDeltaMap {
pub struct TrieChanges {
pub old_root: StateRoot,
pub new_root: StateRoot,
insertions: Vec<TrieRefcountAddition>,
deletions: Vec<TrieRefcountSubtraction>,
insertions: Vec<TrieRefcountChange>,
deletions: Vec<TrieRefcountChange>,
}

impl TrieChanges {
pub fn empty(old_root: StateRoot) -> Self {
TrieChanges { old_root, new_root: old_root, insertions: vec![], deletions: vec![] }
}

pub fn insertions(&self) -> &[TrieRefcountAddition] {
pub fn insertions(&self) -> &[TrieRefcountChange] {
self.insertions.as_slice()
}
}
Expand Down Expand Up @@ -654,8 +598,12 @@ impl Trie {
) -> Result<(), StorageError> {
match value {
ValueHandle::HashAndSize(value) => {
self.internal_retrieve_trie_node(&value.hash, true)?;
memory.refcount_changes.subtract(value.hash, 1);
let bytes = self.internal_retrieve_trie_node(&value.hash, true)?;
memory
.refcount_changes
.entry(value.hash)
.or_insert_with(|| (bytes.to_vec(), 0))
.1 -= 1;
}
ValueHandle::InMemory(_) => {
// do nothing
Expand Down Expand Up @@ -993,9 +941,9 @@ impl Trie {
) -> Result<StorageHandle, StorageError> {
match self.retrieve_raw_node(hash, true)? {
None => Ok(memory.store(TrieNodeWithSize::empty())),
Some((_, node)) => {
Some((bytes, node)) => {
let result = memory.store(TrieNodeWithSize::from_raw(node));
memory.refcount_changes.subtract(*hash, 1);
memory.refcount_changes.entry(*hash).or_insert_with(|| (bytes.to_vec(), 0)).1 -= 1;
Ok(result)
}
}
Expand Down Expand Up @@ -1265,6 +1213,32 @@ impl Trie {
}
}

pub(crate) fn convert_to_insertions_and_deletions(
changes: HashMap<CryptoHash, (Vec<u8>, i32)>,
) -> (Vec<TrieRefcountChange>, Vec<TrieRefcountChange>) {
let mut deletions = Vec::new();
let mut insertions = Vec::new();
for (trie_node_or_value_hash, (trie_node_or_value, rc)) in changes.into_iter() {
if rc > 0 {
insertions.push(TrieRefcountChange {
trie_node_or_value_hash,
trie_node_or_value,
rc: std::num::NonZeroU32::new(rc as u32).unwrap(),
});
} else if rc < 0 {
deletions.push(TrieRefcountChange {
trie_node_or_value_hash,
trie_node_or_value,
rc: std::num::NonZeroU32::new((-rc) as u32).unwrap(),
});
}
}
// Sort so that trie changes have unique representation
insertions.sort();
deletions.sort();
(insertions, deletions)
}

pub fn update<I>(&self, changes: I) -> Result<TrieChanges, StorageError>
where
I: IntoIterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
Expand Down
17 changes: 6 additions & 11 deletions core/store/src/trie/shard_tries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::flat::{FlatStorageManager, FlatStorageStatus};
use crate::trie::config::TrieConfig;
use crate::trie::prefetching_trie_storage::PrefetchingThreadsHandle;
use crate::trie::trie_storage::{TrieCache, TrieCachingStorage};
use crate::trie::{TrieRefcountAddition, POISONED_LOCK_ERR};
use crate::trie::{TrieRefcountChange, POISONED_LOCK_ERR};
use crate::{metrics, DBCol, PrefetchApi};
use crate::{Store, StoreUpdate, Trie, TrieChanges, TrieUpdate};

Expand All @@ -18,7 +18,6 @@ use std::rc::Rc;
use std::sync::{Arc, RwLock};

use super::state_snapshot::{StateSnapshot, StateSnapshotConfig};
use super::TrieRefcountSubtraction;

struct ShardTriesInner {
store: Store,
Expand Down Expand Up @@ -236,12 +235,12 @@ impl ShardTries {

fn apply_deletions_inner(
&self,
deletions: &[TrieRefcountSubtraction],
deletions: &[TrieRefcountChange],
shard_uid: ShardUId,
store_update: &mut StoreUpdate,
) {
let mut ops = Vec::new();
for TrieRefcountSubtraction { trie_node_or_value_hash, rc } in deletions.iter() {
for TrieRefcountChange { trie_node_or_value_hash, rc, .. } in deletions.iter() {
let key = TrieCachingStorage::get_key_from_shard_uid_and_hash(
shard_uid,
trie_node_or_value_hash,
Expand All @@ -255,12 +254,12 @@ impl ShardTries {

fn apply_insertions_inner(
&self,
insertions: &[TrieRefcountAddition],
insertions: &[TrieRefcountChange],
shard_uid: ShardUId,
store_update: &mut StoreUpdate,
) {
let mut ops = Vec::new();
for TrieRefcountAddition { trie_node_or_value_hash, trie_node_or_value, rc } in
for TrieRefcountChange { trie_node_or_value_hash, trie_node_or_value, rc } in
insertions.iter()
{
let key = TrieCachingStorage::get_key_from_shard_uid_and_hash(
Expand Down Expand Up @@ -332,11 +331,7 @@ impl ShardTries {
metrics::REVERTED_TRIE_INSERTIONS
.with_label_values(&[&shard_id])
.inc_by(trie_changes.insertions.len() as u64);
self.apply_deletions_inner(
&trie_changes.insertions.iter().map(|insertion| insertion.revert()).collect::<Vec<_>>(),
shard_uid,
store_update,
)
self.apply_deletions_inner(&trie_changes.insertions, shard_uid, store_update)
}

pub fn apply_all(
Expand Down
Loading