Skip to content

Commit

Permalink
Merge pull request #686 from AleoHQ/staging
Browse files Browse the repository at this point in the history
Prepare for v1.3.1
  • Loading branch information
howardwu authored Apr 10, 2021
2 parents a68cd9b + 9b49400 commit ba96b56
Show file tree
Hide file tree
Showing 24 changed files with 479 additions and 388 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ version = "0.4.2"
[dependencies.log]
version = "0.4.11"

[dependencies.once_cell]
version = "1.5.2"

[dependencies.parking_lot]
version = "0.11.1"

Expand Down
25 changes: 9 additions & 16 deletions network/src/consensus/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
// You should have received a copy of the GNU General Public License
// along with the snarkOS library. If not, see <https://www.gnu.org/licenses/>.

use crate::{message::*, peers::PeerInfo, Consensus, NetworkError};
use crate::{message::*, Consensus, NetworkError};
use snarkos_consensus::error::ConsensusError;
use snarkvm_objects::{Block, BlockHeaderHash, Storage};

use std::{collections::HashMap, net::SocketAddr};
use std::net::SocketAddr;

impl<S: Storage> Consensus<S> {
///
Expand All @@ -43,21 +43,16 @@ impl<S: Storage> Consensus<S> {
}

/// Broadcast block to connected peers
pub async fn propagate_block(
&self,
block_bytes: Vec<u8>,
block_miner: SocketAddr,
connected_peers: &HashMap<SocketAddr, PeerInfo>,
) {
pub async fn propagate_block(&self, block_bytes: Vec<u8>, block_miner: SocketAddr) {
debug!("Propagating a block to peers");

for remote_address in connected_peers.keys() {
if *remote_address != block_miner {
for remote_address in self.node().connected_addrs() {
if remote_address != block_miner {
// Send a `Block` message to the connected peer.
self.node()
.outbound
.send_request(Message::new(
Direction::Outbound(*remote_address),
Direction::Outbound(remote_address),
Payload::Block(block_bytes.clone()),
))
.await;
Expand All @@ -70,7 +65,7 @@ impl<S: Storage> Consensus<S> {
&self,
remote_address: SocketAddr,
block: Vec<u8>,
connected_peers: Option<HashMap<SocketAddr, PeerInfo>>,
is_block_new: bool,
) -> Result<(), NetworkError> {
let block_size = block.len();
let max_block_size = self.max_block_size();
Expand All @@ -93,10 +88,8 @@ impl<S: Storage> Consensus<S> {
let is_valid_block = self.consensus.receive_block(&block_struct).is_ok();

// This is a new block, send it to our peers.
if let Some(connected_peers) = connected_peers {
if is_valid_block && !self.is_syncing_blocks() {
self.propagate_block(block, remote_address, &connected_peers).await;
}
if is_block_new && is_valid_block && !self.is_syncing_blocks() {
self.propagate_block(block, remote_address).await;
}

Ok(())
Expand Down
8 changes: 4 additions & 4 deletions network/src/consensus/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl<S: Storage + Send + Sync + 'static> MinerInstance<S> {
/// Once a block is found, A block message is sent to all peers.
/// Calling this function multiple times will spawn additional listeners on separate threads.
/// Miner threads are asynchronous so the only way to stop them is to kill the runtime they were started in. This may be changed in the future.
pub fn spawn(self) {
pub fn spawn(self) -> task::JoinHandle<()> {
task::spawn(async move {
let local_address = self.node.environment.local_address().unwrap();
info!("Initializing Aleo miner - Your miner address is {}", self.miner_address);
Expand Down Expand Up @@ -78,7 +78,7 @@ impl<S: Storage + Send + Sync + 'static> MinerInstance<S> {
};

info!("Mined a new block: {:?}", hex::encode(block.header.get_hash().0));
let peers = self.node.peer_book.read().connected_peers().clone();

let serialized_block = if let Ok(block) = block.serialize() {
block
} else {
Expand All @@ -88,9 +88,9 @@ impl<S: Storage + Send + Sync + 'static> MinerInstance<S> {

self.node
.expect_consensus()
.propagate_block(serialized_block, local_address, &peers)
.propagate_block(serialized_block, local_address)
.await;
}
});
})
}
}
15 changes: 6 additions & 9 deletions network/src/consensus/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with the snarkOS library. If not, see <https://www.gnu.org/licenses/>.

use crate::{message::*, peers::PeerInfo, Consensus, NetworkError};
use crate::{message::*, Consensus, NetworkError};
use snarkos_consensus::memory_pool::Entry;
use snarkvm_dpc::base_dpc::instantiated::Tx;
use snarkvm_objects::Storage;
Expand All @@ -23,7 +23,7 @@ use snarkvm_utilities::{
to_bytes,
};

use std::{collections::HashMap, net::SocketAddr};
use std::net::SocketAddr;

impl<S: Storage + Send + Sync + 'static> Consensus<S> {
///
Expand All @@ -45,19 +45,18 @@ impl<S: Storage + Send + Sync + 'static> Consensus<S> {
&self,
transaction_bytes: Vec<u8>,
transaction_sender: SocketAddr,
connected_peers: &HashMap<SocketAddr, PeerInfo>,
) -> Result<(), NetworkError> {
debug!("Propagating a transaction to peers");

let local_address = self.node().local_address().unwrap();

for remote_address in connected_peers.keys() {
if *remote_address != transaction_sender && *remote_address != local_address {
for remote_address in self.node().connected_addrs() {
if remote_address != transaction_sender && remote_address != local_address {
// Send a `Transaction` message to the connected peer.
self.node()
.outbound
.send_request(Message::new(
Direction::Outbound(*remote_address),
Direction::Outbound(remote_address),
Payload::Transaction(transaction_bytes.clone()),
))
.await;
Expand All @@ -72,7 +71,6 @@ impl<S: Storage + Send + Sync + 'static> Consensus<S> {
&self,
source: SocketAddr,
transaction: Vec<u8>,
connected_peers: HashMap<SocketAddr, PeerInfo>,
) -> Result<(), NetworkError> {
if let Ok(tx) = Tx::read(&*transaction) {
let insertion = {
Expand All @@ -99,8 +97,7 @@ impl<S: Storage + Send + Sync + 'static> Consensus<S> {
if let Ok(inserted) = insertion {
if inserted.is_some() {
info!("Transaction added to memory pool.");
self.propagate_transaction(transaction, source, &connected_peers)
.await?;
self.propagate_transaction(transaction, source).await?;
}
}
}
Expand Down
26 changes: 14 additions & 12 deletions network/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

use crate::NetworkError;

use once_cell::sync::OnceCell;
use parking_lot::RwLock;
use rand::{thread_rng, Rng};
use std::{
net::SocketAddr,
Expand All @@ -24,17 +26,16 @@ use std::{
};

/// A core data structure containing the networking parameters for this node.
#[derive(Clone)]
pub struct Environment {
pub name: u64,
/// The local address of this node.
local_address: Option<SocketAddr>,
local_address: OnceCell<SocketAddr>,
/// The minimum number of peers required to maintain connections with.
minimum_number_of_connected_peers: u16,
/// The maximum number of peers permitted to maintain connections with.
maximum_number_of_connected_peers: u16,
/// The default bootnodes of the network.
pub bootnodes: Vec<SocketAddr>,
pub bootnodes: RwLock<Vec<SocketAddr>>,
/// If `true`, initializes this node as a bootnode and forgoes connecting
/// to the default bootnodes or saved peers in the peer book.
is_bootnode: bool,
Expand All @@ -46,7 +47,6 @@ impl Environment {
/// Creates a new instance of `Environment`.
#[allow(clippy::too_many_arguments)]
pub fn new(
local_address: Option<SocketAddr>,
minimum_number_of_connected_peers: u16,
maximum_number_of_connected_peers: u16,
bootnodes_addresses: Vec<String>,
Expand All @@ -66,11 +66,11 @@ impl Environment {
let name = rng.gen();

Ok(Self {
local_address: Default::default(),
name,
local_address,
minimum_number_of_connected_peers,
maximum_number_of_connected_peers,
bootnodes,
bootnodes: RwLock::new(bootnodes),
is_bootnode,
peer_sync_interval,
})
Expand All @@ -79,19 +79,21 @@ impl Environment {
/// Returns the local address of the node.
#[inline]
pub fn local_address(&self) -> Option<SocketAddr> {
self.local_address
self.local_address.get().copied()
}

/// Sets the local address of the node to the given value.
#[inline]
pub fn set_local_address(&mut self, addr: SocketAddr) {
self.local_address = Some(addr);
pub fn set_local_address(&self, addr: SocketAddr) {
self.local_address
.set(addr)
.expect("local address was set more than once!");
}

/// Returns a reference to the default bootnodes of the network.
/// Returns the default bootnodes of the network.
#[inline]
pub fn bootnodes(&self) -> &Vec<SocketAddr> {
&self.bootnodes
pub fn bootnodes(&self) -> Vec<SocketAddr> {
self.bootnodes.read().clone()
}

/// Returns `true` if this node is a bootnode. Otherwise, returns `false`.
Expand Down
Loading

0 comments on commit ba96b56

Please sign in to comment.