Skip to content

Commit

Permalink
[reed_solomon] Remove ReedSolomonWrapper (#11150)
Browse files Browse the repository at this point in the history
Based on the [conversation on
Zulip](https://near.zulipchat.com/#narrow/stream/300659-Rust-.F0.9F.A6.80/topic/RefCell.20usage/near/435417054),
seems like the memory leak bug in reed-solomon-erasure encoding crate
has been fixed with the latest version 6.0.0

This PR removes the ReedSolomonWrapper which was a workaround and
upgrades the crate to version 6.0.0
  • Loading branch information
Shreyan Gupta authored Apr 29, 2024
1 parent 549f4b5 commit ead5b9a
Show file tree
Hide file tree
Showing 19 changed files with 173 additions and 135 deletions.
61 changes: 57 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ rand_hc = "0.3.1"
rand_xorshift = "0.3"
rayon = "1.5"
redis = "0.23.0"
reed-solomon-erasure = "4"
reed-solomon-erasure = "6.0.0"
regex = "1.7.1"
region = "3.0"
reqwest = { version = "0.11.14", features = ["blocking"] }
Expand Down
44 changes: 24 additions & 20 deletions chain/chunks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ use near_primitives::errors::EpochError;
use near_primitives::hash::CryptoHash;
use near_primitives::merkle::{verify_path, MerklePath};
use near_primitives::receipt::Receipt;
use near_primitives::reed_solomon::ReedSolomonWrapper;
use near_primitives::reed_solomon::{reed_solomon_decode, reed_solomon_encode};
use near_primitives::sharding::{
ChunkHash, EncodedShardChunk, EncodedShardChunkBody, PartialEncodedChunk,
PartialEncodedChunkPart, PartialEncodedChunkV2, ReceiptProof, ShardChunk, ShardChunkHeader,
Expand All @@ -129,6 +129,7 @@ use near_primitives::version::ProtocolVersion;
use near_primitives::{checked_feature, unwrap_or_return};
use rand::seq::IteratorRandom;
use rand::Rng;
use reed_solomon_erasure::galois_8::ReedSolomon;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tracing::{debug, debug_span, error, warn};
Expand Down Expand Up @@ -255,7 +256,7 @@ pub struct ShardsManager {
shard_tracker: ShardTracker,
peer_manager_adapter: Sender<PeerManagerMessageRequest>,
client_adapter: Sender<ShardsManagerResponse>,
rs: ReedSolomonWrapper,
rs: ReedSolomon,

encoded_chunks: EncodedChunksCache,
requested_partial_encoded_chunks: RequestPool,
Expand Down Expand Up @@ -292,10 +293,11 @@ impl ShardsManager {
shard_tracker,
peer_manager_adapter: network_adapter,
client_adapter,
rs: ReedSolomonWrapper::new(
rs: ReedSolomon::new(
epoch_manager.num_data_parts(),
epoch_manager.num_total_parts() - epoch_manager.num_data_parts(),
),
)
.unwrap(),
encoded_chunks: EncodedChunksCache::new(),
requested_partial_encoded_chunks: RequestPool::new(
CHUNK_REQUEST_RETRY,
Expand Down Expand Up @@ -801,7 +803,7 @@ impl ShardsManager {
}

pub fn process_partial_encoded_chunk_request(
&mut self,
&self,
request: PartialEncodedChunkRequestMsg,
route_back: CryptoHash,
) {
Expand Down Expand Up @@ -837,7 +839,7 @@ impl ShardsManager {
/// an explanation of that part of the return value.
/// Ensures the receipts in the response are in a deterministic order.
fn prepare_partial_encoded_chunk_response(
&mut self,
&self,
request: PartialEncodedChunkRequestMsg,
) -> (PartialEncodedChunkResponseSource, PartialEncodedChunkResponseMsg) {
let (src, mut response_msg) = self.prepare_partial_encoded_chunk_response_unsorted(request);
Expand All @@ -848,7 +850,7 @@ impl ShardsManager {
}

fn prepare_partial_encoded_chunk_response_unsorted(
&mut self,
&self,
request: PartialEncodedChunkRequestMsg,
) -> (PartialEncodedChunkResponseSource, PartialEncodedChunkResponseMsg) {
let PartialEncodedChunkRequestMsg { chunk_hash, part_ords, mut tracking_shards } = request;
Expand Down Expand Up @@ -963,8 +965,8 @@ impl ShardsManager {
/// expensive operation. If possible, the request should be served from
/// EncodedChunksCacheEntry or PartialEncodedChunk instead.
// pub for testing
pub fn lookup_partial_encoded_chunk_from_chunk_storage(
&mut self,
fn lookup_partial_encoded_chunk_from_chunk_storage(
&self,
part_ords: HashSet<u64>,
tracking_shards: HashSet<ShardId>,
response: &mut PartialEncodedChunkResponseMsg,
Expand All @@ -990,9 +992,10 @@ impl ShardsManager {
// Construct EncodedShardChunk. If we earlier determined that we will
// need parity parts, instruct the constructor to calculate them as
// well. Otherwise we won’t bother.
let (parts, encoded_length) = self
.rs
.encode(TransactionReceipt(chunk.transactions().to_vec(), outgoing_receipts.to_vec()));
let (parts, encoded_length) = reed_solomon_encode(
&self.rs,
TransactionReceipt(chunk.transactions().to_vec(), outgoing_receipts.to_vec()),
);

if header.encoded_length() != encoded_length as u64 {
warn!(target: "chunks",
Expand Down Expand Up @@ -1051,7 +1054,8 @@ impl ShardsManager {
}

let encoded_length = chunk.encoded_length();
if let Err(err) = self.rs.decode::<TransactionReceipt>(
if let Err(err) = reed_solomon_decode::<TransactionReceipt>(
&self.rs,
chunk.content_mut().parts.as_mut_slice(),
encoded_length as usize,
) {
Expand Down Expand Up @@ -1919,7 +1923,7 @@ impl ShardsManager {
tx_root: CryptoHash,
congestion_info: CongestionInfo,
signer: &dyn ValidatorSigner,
rs: &mut ReedSolomonWrapper,
rs: &ReedSolomon,
protocol_version: ProtocolVersion,
) -> Result<(EncodedShardChunk, Vec<MerklePath>), Error> {
EncodedShardChunk::new(
Expand Down Expand Up @@ -2700,7 +2704,7 @@ mod test {
#[test]
fn test_chunk_response_for_uncached_partial_chunk() {
let mut fixture = ChunkTestFixture::default();
let mut shards_manager = ShardsManager::new(
let shards_manager = ShardsManager::new(
FakeClock::default().clock(),
Some(fixture.mock_shard_tracker.clone()),
Arc::new(fixture.epoch_manager.clone()),
Expand Down Expand Up @@ -2732,7 +2736,7 @@ mod test {
#[test]
fn test_chunk_response_for_uncached_shard_chunk() {
let mut fixture = ChunkTestFixture::default();
let mut shards_manager = ShardsManager::new(
let shards_manager = ShardsManager::new(
FakeClock::default().clock(),
Some(fixture.mock_shard_tracker.clone()),
Arc::new(fixture.epoch_manager.clone()),
Expand Down Expand Up @@ -2890,7 +2894,7 @@ mod test {
#[test]
fn test_chunk_response_empty_request() {
let fixture = ChunkTestFixture::default();
let mut shards_manager = ShardsManager::new(
let shards_manager = ShardsManager::new(
FakeClock::default().clock(),
Some(fixture.mock_shard_tracker.clone()),
Arc::new(fixture.epoch_manager.clone()),
Expand All @@ -2914,7 +2918,7 @@ mod test {
#[test]
fn test_chunk_response_for_nonexistent_chunk() {
let fixture = ChunkTestFixture::default();
let mut shards_manager = ShardsManager::new(
let shards_manager = ShardsManager::new(
FakeClock::default().clock(),
Some(fixture.mock_shard_tracker.clone()),
Arc::new(fixture.epoch_manager.clone()),
Expand All @@ -2938,7 +2942,7 @@ mod test {
#[test]
fn test_chunk_response_for_request_including_invalid_part_ord() {
let mut fixture = ChunkTestFixture::default();
let mut shards_manager = ShardsManager::new(
let shards_manager = ShardsManager::new(
FakeClock::default().clock(),
Some(fixture.mock_shard_tracker.clone()),
Arc::new(fixture.epoch_manager.clone()),
Expand Down Expand Up @@ -2972,7 +2976,7 @@ mod test {
fn test_chunk_response_for_request_with_duplicate_part_ords() {
// We should not return any duplicates.
let mut fixture = ChunkTestFixture::default();
let mut shards_manager = ShardsManager::new(
let shards_manager = ShardsManager::new(
FakeClock::default().clock(),
Some(fixture.mock_shard_tracker.clone()),
Arc::new(fixture.epoch_manager.clone()),
Expand Down
3 changes: 1 addition & 2 deletions chain/chunks/src/test/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use near_network::{
use near_primitives::types::{AccountId, BlockHeight};
use near_store::test_utils::create_test_store;
use std::collections::HashSet;
use tracing::log::info;

#[derive(derive_more::AsMut)]
struct TestData {
Expand Down Expand Up @@ -203,7 +202,7 @@ fn test_chunk_forward() {
data.chain.record_block(*hash, i as BlockHeight + 1);
let next_chunk_producer = data.chain.next_chunk_producer(0);
if !chunk_only_producers.contains(&next_chunk_producer) {
info!(target: "test", "Trying again at height {} which has chunk producer {}, we want the next chunk producer to be a chunk only producer",
tracing::info!(target: "test", "Trying again at height {} which has chunk producer {}, we want the next chunk producer to be a chunk only producer",
i + 1, next_chunk_producer);
continue;
}
Expand Down
6 changes: 3 additions & 3 deletions chain/chunks/src/test_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use near_primitives::congestion_info::CongestionInfo;
use near_primitives::{
hash::CryptoHash,
merkle::{self, MerklePath},
reed_solomon::ReedSolomonWrapper,
sharding::{
EncodedShardChunk, PartialEncodedChunk, PartialEncodedChunkV2, ReceiptProof,
ShardChunkHeader,
Expand All @@ -36,6 +35,7 @@ use near_primitives::{
version::PROTOCOL_VERSION,
};
use near_store::Store;
use reed_solomon_erasure::galois_8::ReedSolomon;
use std::{collections::HashMap, sync::Arc};

pub fn forward_client_request_to_shards_manager(
Expand Down Expand Up @@ -261,7 +261,7 @@ impl MockChainForShardsManager {
let signer = create_test_signer(chunk_producer.as_str());
let data_parts = self.epoch_manager.num_data_parts();
let parity_parts = self.epoch_manager.num_total_parts() - data_parts;
let mut rs = ReedSolomonWrapper::new(data_parts, parity_parts);
let rs = ReedSolomon::new(data_parts, parity_parts).unwrap();
let (chunk, merkle_paths) = ShardsManager::create_encoded_shard_chunk(
self.tip.last_block_hash,
CryptoHash::default(),
Expand All @@ -278,7 +278,7 @@ impl MockChainForShardsManager {
MerkleHash::default(),
CongestionInfo::default(),
&signer,
&mut rs,
&rs,
PROTOCOL_VERSION,
)
.unwrap();
Expand Down
Loading

0 comments on commit ead5b9a

Please sign in to comment.