diff --git a/plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp b/plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp index a44fb51112..1a22d06f69 100644 --- a/plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp +++ b/plugins/net_plugin/include/eosio/net_plugin/auto_bp_peering.hpp @@ -100,6 +100,11 @@ class bp_connection_manager { config.my_bp_accounts.insert(accounts.begin(), accounts.end()); } + // thread safe, my_bp_accounts only modified during plugin startup + bool is_producer(account_name account) const { + return config.my_bp_accounts.contains(account); + } + // Only called at plugin startup void set_bp_peers(const std::vector& peers) { try { diff --git a/plugins/net_plugin/include/eosio/net_plugin/protocol.hpp b/plugins/net_plugin/include/eosio/net_plugin/protocol.hpp index 7a2ce0d4a4..1ddb241086 100644 --- a/plugins/net_plugin/include/eosio/net_plugin/protocol.hpp +++ b/plugins/net_plugin/include/eosio/net_plugin/protocol.hpp @@ -112,6 +112,7 @@ namespace eosio { uint32_t pending{0}; vector ids; bool empty () const { return (mode == none || ids.empty()); } + bool operator==(const select_ids&) const noexcept = default; }; using ordered_txn_ids = select_ids; @@ -134,6 +135,15 @@ namespace eosio { uint32_t end_block{0}; }; + struct block_nack_message { + block_id_type id; + }; + + struct block_notice_message { + block_id_type previous; + block_id_type id; + }; + using net_message = std::variant; + vote_message, + block_nack_message, + block_notice_message>; } // namespace eosio @@ -162,6 +174,8 @@ FC_REFLECT( eosio::time_message, (org)(rec)(xmt)(dst) ) FC_REFLECT( eosio::notice_message, (known_trx)(known_blocks) ) FC_REFLECT( eosio::request_message, (req_trx)(req_blocks) ) FC_REFLECT( eosio::sync_request_message, (start_block)(end_block) ) +FC_REFLECT( eosio::block_nack_message, (id) ) +FC_REFLECT( eosio::block_notice_message, (previous)(id) ) /** * diff --git a/plugins/net_plugin/net_plugin.cpp 
b/plugins/net_plugin/net_plugin.cpp index 9a797b72fe..2d519f729f 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -186,6 +186,7 @@ namespace eosio { }; explicit sync_manager( uint32_t span, uint32_t sync_peer_limit, uint32_t min_blocks_distance ); static void send_handshakes(); + static void send_block_nack_resets(); bool syncing_from_peer() const { return sync_state == lib_catchup; } bool is_in_sync() const { return sync_state == in_sync; } void sync_reset_fork_db_root_num( const connection_ptr& conn, bool closing ); @@ -265,6 +266,8 @@ namespace eosio { signed_block = fc::get_index(), packed_transaction = fc::get_index(), vote_message = fc::get_index(), + block_nack_message = fc::get_index(), + block_notice_message = fc::get_index(), unknown }; @@ -412,6 +415,7 @@ namespace eosio { uint32_t max_nodes_per_host = 1; bool p2p_accept_transactions = true; + bool p2p_disable_block_nack = false; bool p2p_accept_votes = true; fc::microseconds p2p_dedup_cache_expire_time_us{}; @@ -598,10 +602,11 @@ namespace eosio { constexpr uint16_t proto_dup_node_id_goaway = 6; // eosio 2.1: support peer node_id based duplicate connection resolution constexpr uint16_t proto_leap_initial = 7; // leap client, needed because none of the 2.1 versions are supported constexpr uint16_t proto_block_range = 8; // include block range in notice_message - constexpr uint16_t proto_savanna = 9; // savanna + constexpr uint16_t proto_savanna = 9; // savanna, adds vote_message + constexpr uint16_t proto_block_nack = 10; // adds block_nack_message & block_notice_message #pragma GCC diagnostic pop - constexpr uint16_t net_version_max = proto_savanna; + constexpr uint16_t net_version_max = proto_block_nack; /** * Index by start_block_num @@ -623,6 +628,7 @@ namespace eosio { void reset() { fc::lock_guard g( _mtx ); _write_queue.clear(); + _sync_write_queue.clear(); _trx_write_queue.clear(); _write_queue_size = 0; _out_queue.clear(); @@ -631,6 +637,7 @@ namespace 
eosio { void clear_write_queue() { fc::lock_guard g( _mtx ); _write_queue.clear(); + _sync_write_queue.clear(); _trx_write_queue.clear(); _write_queue_size = 0; } @@ -646,17 +653,12 @@ namespace eosio { return _write_queue_size; } - bool is_out_queue_empty() const { - fc::lock_guard g( _mtx ); - return _out_queue.empty(); - } - // called from connection strand bool ready_to_send(uint32_t connection_id) const { fc::unique_lock g( _mtx ); // if out_queue is not empty then async_write is in progress - bool async_write_in_progress = !_out_queue.empty(); - bool ready = ((!_trx_write_queue.empty() || !_write_queue.empty()) && !async_write_in_progress); + const bool async_write_in_progress = !_out_queue.empty(); + const bool ready = !async_write_in_progress && _write_queue_size != 0; g.unlock(); if (async_write_in_progress) { fc_dlog(logger, "Connection - ${id} not ready to send data, async write in progress", ("id", connection_id)); @@ -664,15 +666,19 @@ namespace eosio { return ready; } + enum class queue_t { block_sync, general }; // @param callback must not callback into queued_buffer - bool add_write_queue( const std::shared_ptr>& buff, - std::function callback, - msg_type_t net_msg ) { + bool add_write_queue(msg_type_t net_msg, + queue_t queue, + const std::shared_ptr>& buff, + std::function callback) { fc::lock_guard g( _mtx ); if( net_msg == msg_type_t::packed_transaction ) { - _trx_write_queue.push_back( {buff, std::move(callback)} ); + _trx_write_queue.emplace_back( buff, std::move(callback) ); + } else if (queue == queue_t::block_sync) { + _sync_write_queue.emplace_back( buff, std::move(callback) ); } else { - _write_queue.push_back( {buff, std::move(callback)} ); + _write_queue.emplace_back( buff, std::move(callback) ); } _write_queue_size += buff->size(); if( _write_queue_size > 2 * def_max_write_queue_size ) { @@ -683,11 +689,13 @@ namespace eosio { void fill_out_buffer( std::vector& bufs ) { fc::lock_guard g( _mtx ); - if( !_write_queue.empty() ) { // 
always send msgs from write_queue first + if (!_sync_write_queue.empty()) { // always send msgs from sync_write_queue first + fill_out_buffer( bufs, _sync_write_queue ); + } else if (!_write_queue.empty()) { // always send msgs from write_queue before trx queue fill_out_buffer( bufs, _write_queue ); } else { fill_out_buffer( bufs, _trx_write_queue ); - assert(_trx_write_queue.empty() && _write_queue.empty() && _write_queue_size == 0); + assert(_trx_write_queue.empty() && _write_queue.empty() && _sync_write_queue.empty() && _write_queue_size == 0); } } @@ -718,8 +726,9 @@ namespace eosio { alignas(hardware_destructive_interference_sz) mutable fc::mutex _mtx; - uint32_t _write_queue_size GUARDED_BY(_mtx) {0}; // size of _write_queue and _trx_write_queue - deque _write_queue GUARDED_BY(_mtx); // queued messages, all messages except trxs + uint32_t _write_queue_size GUARDED_BY(_mtx) {0}; // size of _write_queue + _sync_write_queue + _trx_write_queue + deque _write_queue GUARDED_BY(_mtx); // queued messages, all messages except sync & trxs + deque _sync_write_queue GUARDED_BY(_mtx); // sync_write_queue blocks will be sent first deque _trx_write_queue GUARDED_BY(_mtx); // queued trx messages, trx_write_queue will be sent last deque _out_queue GUARDED_BY(_mtx); // currently being async_write @@ -912,6 +921,12 @@ namespace eosio { std::chrono::nanoseconds connection_start_time{0}; + // block nack support + static constexpr uint16_t consecutive_block_nacks_threshold{2}; // stop sending blocks when reached + block_num_type consecutive_blocks_nacks{0}; + block_id_type last_block_nack; + block_id_type last_block_nack_request_message_id GUARDED_BY(conn_mtx); + connection_status get_status()const; /** \name Peer Timestamps @@ -962,6 +977,7 @@ namespace eosio { */ bool process_next_message(uint32_t message_length); + void send_block_nack(const block_id_type& block_id); void send_handshake(); /** \name Peer Timestamps @@ -992,11 +1008,13 @@ namespace eosio { /** @} */ void 
blk_send_branch( const block_id_type& msg_head_id ); + void blk_send_branch_from_nack_request( const block_id_type& msg_head_id, const block_id_type& req_id ); void blk_send_branch( uint32_t msg_head_num, uint32_t fork_db_root_num, uint32_t head_num ); void enqueue( const net_message& msg ); - size_t enqueue_block( const std::vector& sb, uint32_t block_num ); + size_t enqueue_block( const std::vector& sb, uint32_t block_num, queued_buffer::queue_t queue ); void enqueue_buffer( msg_type_t net_msg, + queued_buffer::queue_t queue, const std::shared_ptr>& send_buffer, block_num_type block_num, go_away_reason close_after_send); @@ -1009,6 +1027,7 @@ namespace eosio { void sync_wait(); void queue_write(msg_type_t net_msg, + queued_buffer::queue_t queue, const std::shared_ptr>& buff, std::function callback); void do_queue_write(); @@ -1042,6 +1061,8 @@ namespace eosio { void handle_message( const packed_transaction_ptr& trx ); void handle_message( const vote_message_ptr& msg ); void handle_message( const vote_message& msg ) = delete; // vote_message_ptr overload used instead + void handle_message( const block_nack_message& msg); + void handle_message( const block_notice_message& msg); // returns calculated number of blocks combined latency uint32_t calc_block_latency(); @@ -1120,6 +1141,18 @@ namespace eosio { peer_dlog( c, "handle sync_request_message" ); c->handle_message( msg ); } + + void operator()( const block_nack_message& msg ) const { + // continue call to handle_message on connection strand + peer_dlog( c, "handle block_nack_message #${bn}:${id}", ("bn", block_header::num_from_id(msg.id))("id", msg.id) ); + c->handle_message( msg ); + } + + void operator()( const block_notice_message& msg ) const { + // continue call to handle_message on connection strand + peer_dlog( c, "handle block_notice_message #${bn}:${id}", ("bn", block_header::num_from_id(msg.id))("id", msg.id) ); + c->handle_message( msg ); + } }; @@ -1202,6 +1235,24 @@ namespace eosio { 
//--------------------------------------------------------------------------- + // Result of comparing a peer supplied block id with our own block id at the same block number. + // on_fork: we have a block at that number and its id differs from the supplied id. + // unknown: we have no block at that number or the lookup threw, fork status can not be decided. + struct on_fork_t { + bool on_fork = false; + bool unknown = true; + }; + // Looks up our block id for the block number encoded in id and reports fork/unknown status. + // A block we can not look up is reported as unknown and never as on_fork, so callers that + // only test on_fork do not treat "not found" as "forked". + on_fork_t block_on_fork(const block_id_type& id) { // thread safe + const auto id_num = block_header::num_from_id(id); + bool on_fork = false; + bool unknown_block = true; + try { + const controller& cc = my_impl->chain_plug->chain(); + std::optional my_id = cc.fork_block_id_for_num( id_num ); // thread-safe + unknown_block = !my_id; + on_fork = my_id && *my_id != id; // an empty optional compares != to any id, so require a known id + } catch( ... ) { + // lookup failed: keep the defaults (unknown, not on a fork) + } + return { on_fork, unknown_block }; + } + connection::connection( const string& endpoint, const string& listen_address ) : peer_addr( endpoint ), strand( boost::asio::make_strand(my_impl->thread_pool.get_executor()) ), @@ -1297,7 +1348,7 @@ namespace eosio { return; if (s == connection_state::connected && curr != connection_state::connecting) return; - fc_dlog(logger, "old connection ${id} state ${os} becoming ${ns}", ("id", connection_id)("os", state_str(curr))("ns", state_str(s))); + fc_dlog(logger, "old connection - ${c} state ${os} becoming ${ns}", ("c", connection_id)("os", state_str(curr))("ns", state_str(s))); conn_state = s; } @@ -1407,6 +1458,7 @@ namespace eosio { last_handshake_sent = handshake_message(); last_close = fc::time_point::now(); conn_node_id = fc::sha256(); + last_block_nack_request_message_id = block_id_type{}; } peer_fork_db_root_num = 0; peer_ping_time_ns = std::numeric_limits::max(); @@ -1423,6 +1475,13 @@ namespace eosio { block_sync_frame_bytes_sent = 0; block_sync_throttling = false; last_vote_received = time_point{}; + consecutive_blocks_nacks = 0; + last_block_nack = block_id_type{}; + + uint32_t head_num = my_impl->get_chain_head_num(); + if (last_received_block_num >= head_num) { + sync_manager::send_block_nack_resets(); + } if( reconnect && !shutdown && !incoming() ) { my_impl->connections.start_conn_timer( std::chrono::milliseconds( 100 ), @@ -1455,29 +1514,41 @@ namespace eosio { if( fork_db_root_num == 0 ) 
return; // if fork_db_root_id is null (we have not received handshake or reset) auto msg_head_num = block_header::num_from_id(msg_head_id); - bool on_fork = msg_head_num == 0; - bool unknown_block = false; - if( !on_fork ) { - try { - const controller& cc = my_impl->chain_plug->chain(); - std::optional my_id = cc.fork_block_id_for_num( msg_head_num ); // thread-safe - unknown_block = !my_id; - on_fork = my_id != msg_head_id; - } catch( ... ) { - unknown_block = true; - } + if (msg_head_num == 0) { + blk_send_branch( msg_head_num, fork_db_root_num, head_num ); + return; } + + auto [on_fork, unknown_block] = block_on_fork(msg_head_id); if( unknown_block ) { peer_ilog( this, "Peer asked for unknown block ${mn}, sending: benign_other go away", ("mn", msg_head_num) ); no_retry = benign_other; enqueue( go_away_message( benign_other ) ); } else { - if( on_fork ) msg_head_num = 0; // if peer on fork, start at their last fork_db_root_num, otherwise we can start at msg_head+1 + if (on_fork) + msg_head_num = 0; blk_send_branch( msg_head_num, fork_db_root_num, head_num ); } } + // called from connection strand + void connection::blk_send_branch_from_nack_request( const block_id_type& msg_head_id, const block_id_type& req_id ) { + auto [on_fork, unknown_block] = block_on_fork(msg_head_id); + uint32_t head_num = my_impl->get_chain_head_num(); + // peer head might be unknown if our LIB has moved past it, so if unknown then just send the requested block + if (on_fork) { // send from lib if we know they are on a fork + // a more complicated better approach would be to find where the fork branches and send from there, for now use lib + uint32_t fork_db_root_num = my_impl->get_fork_db_root_num(); + // --fork_db_root_num since blk_send_branch adds one to the request, and we want to start at fork_db_root_num + blk_send_branch( --fork_db_root_num, 0, head_num); + } else { + auto msg_req_num = block_header::num_from_id(req_id); + // --msg_req_num since blk_send_branch adds one to the 
request, and we need to start at msg_req_num + blk_send_branch( --msg_req_num, 0, head_num ); + } + } + // called from connection strand void connection::blk_send_branch( uint32_t msg_head_num, uint32_t fork_db_root_num, uint32_t head_num ) { if( !peer_requested ) { @@ -1485,7 +1556,9 @@ namespace eosio { peer_requested = peer_sync_state( last+1, head_num, last ); } else { auto last = msg_head_num != 0 ? msg_head_num : std::min( peer_requested->last, fork_db_root_num ); - uint32_t end = std::max( peer_requested->end_block, head_num ); + uint32_t end = std::max( peer_requested->end_block, head_num ); + if (peer_requested->start_block <= last+1 && peer_requested->end_block >= end) + return; // nothing to do, send in progress peer_requested = peer_sync_state( last+1, end, last ); } if( peer_requested->start_block <= peer_requested->end_block ) { @@ -1574,9 +1647,10 @@ namespace eosio { // called from connection strand void connection::queue_write(msg_type_t net_msg, + queued_buffer::queue_t queue, const std::shared_ptr>& buff, std::function callback) { - if( !buffer_queue.add_write_queue( buff, std::move(callback), net_msg )) { + if( !buffer_queue.add_write_queue( net_msg, queue, buff, std::move(callback) )) { peer_wlog( this, "write_queue full ${s} bytes, giving up on connection", ("s", buffer_queue.write_queue_size()) ); close(); return; @@ -1692,7 +1766,7 @@ namespace eosio { } } block_sync_throttling = false; - auto sent = enqueue_block( sb, num ); + auto sent = enqueue_block( sb, num, queued_buffer::queue_t::block_sync ); block_sync_total_bytes_sent += sent; block_sync_frame_bytes_sent += sent; ++peer_requested->last; @@ -1850,32 +1924,33 @@ namespace eosio { buffer_factory buff_factory; const auto& send_buffer = buff_factory.get_send_buffer( m ); - enqueue_buffer( to_msg_type_t(m.index()), send_buffer, 0, close_after_send ); + enqueue_buffer( to_msg_type_t(m.index()), queued_buffer::queue_t::general, send_buffer, 0, close_after_send ); } // called from 
connection strand - size_t connection::enqueue_block( const std::vector& b, uint32_t block_num ) { + size_t connection::enqueue_block( const std::vector& b, uint32_t block_num, queued_buffer::queue_t queue ) { peer_dlog( this, "enqueue block ${num}", ("num", block_num) ); verify_strand_in_this_thread( strand, __func__, __LINE__ ); block_buffer_factory buff_factory; const auto& sb = buff_factory.get_send_buffer( b ); latest_blk_time = std::chrono::steady_clock::now(); - enqueue_buffer( msg_type_t::signed_block, sb, block_num, no_reason); + enqueue_buffer( msg_type_t::signed_block, queue, sb, block_num, no_reason); return sb->size(); } // called from connection strand void connection::enqueue_buffer( msg_type_t net_msg, + queued_buffer::queue_t queue, const std::shared_ptr>& send_buffer, block_num_type block_num, // only valid for net_msg == signed_block variant which go_away_reason close_after_send) { connection_ptr self = shared_from_this(); - queue_write(net_msg, send_buffer, + queue_write(net_msg, queue, send_buffer, [conn{std::move(self)}, close_after_send, net_msg, block_num](boost::system::error_code ec, std::size_t ) { if (ec) { - fc_elog(logger, "Connection - ${cid} - send failed with: ${e}", ("cid", conn->connection_id)("e", ec.to_string())); + fc_elog(logger, "Connection - ${cid} - send failed with: ${e}", ("cid", conn->connection_id)("e", ec.message())); return; } if (net_msg == msg_type_t::signed_block) @@ -2128,6 +2203,17 @@ namespace eosio { } ); } + // static, thread safe + void sync_manager::send_block_nack_resets() { + my_impl->connections.for_each_block_connection( []( const connection_ptr& cp ) { + if (cp->current()) { + boost::asio::post(cp->strand, [cp]() { + cp->send_block_nack({}); + }); + } + } ); + } + bool sync_manager::is_sync_required( uint32_t fork_db_head_block_num ) const REQUIRES(sync_mtx) { fc_dlog( logger, "last req = ${req}, last recv = ${recv} known = ${known} our fhead = ${h}", ("req", sync_last_requested_num)( "recv", 
sync_next_expected_num-1 )( "known", sync_known_fork_db_root_num ) @@ -2346,11 +2432,9 @@ namespace eosio { } c->peer_syncing_from_us = false; try { - controller& cc = my_impl->chain_plug->chain(); - std::optional fork_db_head_id = cc.fork_block_id_for_num( msg.fork_db_head_num ); // thread-safe - if (fork_db_head_id && fork_db_head_id != msg.fork_db_head_id) { // possible for fork_db_root to move and fork_db_head_num not be found if running with no block-log - peer_dlog(c, "Sending catch_up request_message sync 4, fhead ${fh} != msg.fhead ${mfh}", - ("fh", *fork_db_head_id)("mfh", msg.fork_db_head_id)); + auto [on_fork, unknown_block] = block_on_fork(msg.fork_db_head_id); // thread safe + if (on_fork) { // possible for fork_db_root to move and fork_db_head_num not be found if running with no block-log + peer_dlog(c, "Sending catch_up request_message sync 4, msg.fhead ${mfh} on fork", ("mfh", msg.fork_db_head_id)); request_message req; req.req_blocks.mode = catch_up; req.req_trx.mode = none; @@ -2691,8 +2775,9 @@ namespace eosio { if(my_impl->sync_master->syncing_from_peer() ) return; block_buffer_factory buff_factory; + buffer_factory block_notice_buff_factory; const auto bnum = b->block_num(); - my_impl->connections.for_each_block_connection( [this, &id, &bnum, &b, &buff_factory]( auto& cp ) { + my_impl->connections.for_each_block_connection( [this, &id, &bnum, &b, &buff_factory, &block_notice_buff_factory]( auto& cp ) { fc_dlog( logger, "socket_is_open ${s}, state ${c}, syncing ${ss}, connection - ${cid}", ("s", cp->socket_is_open())("c", connection::state_str(cp->state()))("ss", cp->peer_syncing_from_us.load())("cid", cp->connection_id) ); if( !cp->current() ) return; @@ -2702,6 +2787,21 @@ namespace eosio { return; } + if (cp->protocol_version >= proto_block_nack && !my_impl->p2p_disable_block_nack) { + if (cp->consecutive_blocks_nacks > connection::consecutive_block_nacks_threshold) { + // only send block_notice if we didn't produce the block, otherwise 
broadcast the block below + if (!my_impl->is_producer(b->producer)) { + auto send_buffer = block_notice_buff_factory.get_send_buffer( block_notice_message{b->previous, id} ); + boost::asio::post(cp->strand, [cp, send_buffer{std::move(send_buffer)}, bnum]() { + cp->latest_blk_time = std::chrono::steady_clock::now(); + peer_dlog( cp, "bcast block_notice ${b}", ("b", bnum) ); + cp->enqueue_buffer( msg_type_t::block_notice_message, queued_buffer::queue_t::general, send_buffer, 0, no_reason ); + }); + return; + } + } + } + send_buffer_type sb = buff_factory.get_send_buffer( b ); boost::asio::post(cp->strand, [cp, bnum, sb{std::move(sb)}]() { @@ -2709,7 +2809,7 @@ namespace eosio { bool has_block = cp->peer_fork_db_root_num >= bnum; if( !has_block ) { peer_dlog( cp, "bcast block ${b}", ("b", bnum) ); - cp->enqueue_buffer( msg_type_t::signed_block, sb, bnum, no_reason ); + cp->enqueue_buffer( msg_type_t::signed_block, queued_buffer::queue_t::general, sb, bnum, no_reason ); } }); } ); @@ -2719,12 +2819,11 @@ namespace eosio { my_impl->connections.for_each_block_connection( [exclude_peer, msg{std::move(msg)}]( auto& cp ) { if( !cp->current() ) return true; if( cp->connection_id == exclude_peer ) return true; + if (cp->protocol_version < proto_savanna) return true; boost::asio::post(cp->strand, [cp, msg]() { - if (cp->protocol_version >= proto_savanna) { - if (vote_logger.is_enabled(fc::log_level::debug)) - peer_dlog(cp, "sending vote msg"); - cp->enqueue_buffer( msg_type_t::vote_message, msg, 0, no_reason ); - } + if (vote_logger.is_enabled(fc::log_level::debug)) + peer_dlog(cp, "sending vote msg"); + cp->enqueue_buffer( msg_type_t::vote_message, queued_buffer::queue_t::general, msg, 0, no_reason ); }); return true; } ); @@ -2745,7 +2844,7 @@ namespace eosio { send_buffer_type sb = buff_factory.get_send_buffer( trx ); fc_dlog( logger, "sending trx: ${id}, to connection - ${cid}", ("id", trx->id())("cid", cp->connection_id) ); boost::asio::post(cp->strand, [cp, 
sb{std::move(sb)}]() { - cp->enqueue_buffer( msg_type_t::packed_transaction, sb, 0, no_reason ); + cp->enqueue_buffer( msg_type_t::packed_transaction, queued_buffer::queue_t::general, sb, 0, no_reason ); } ); } ); } @@ -3049,43 +3148,43 @@ namespace eosio { const block_id_type blk_id = bh.calculate_id(); const uint32_t blk_num = last_received_block_num = block_header::num_from_id(blk_id); const fc::microseconds age(fc::time_point::now() - bh.timestamp); - // don't add_peer_block because we have not validated this block header yet if( my_impl->dispatcher.have_block( blk_id ) ) { + pending_message_buffer.advance_read_ptr( message_length ); // advance before any send + + // if we have the block then it has been header validated, add for this connection_id + my_impl->dispatcher.add_peer_block(blk_id, connection_id); + send_block_nack(blk_id); + peer_dlog( this, "already received block ${num}, id ${id}..., latency ${l}ms", ("num", blk_num)("id", blk_id.str().substr(8,16))("l", age.count()/1000) ); my_impl->sync_master->sync_recv_block( shared_from_this(), blk_id, blk_num, age ); - pending_message_buffer.advance_read_ptr( message_length ); return true; } peer_dlog( this, "received block ${num}, id ${id}..., latency: ${l}ms, head ${h}, fhead ${f}", ("num", bh.block_num())("id", blk_id.str().substr(8,16))("l", age.count()/1000) ("h", my_impl->get_chain_head_num())("f", my_impl->get_fork_db_head_num())); if( !my_impl->sync_master->syncing_from_peer() ) { // guard against peer thinking it needs to send us old blocks - uint32_t fork_db_root_num = my_impl->get_fork_db_root_num(); + block_num_type fork_db_root_num = my_impl->get_fork_db_root_num(); if( blk_num <= fork_db_root_num ) { - fc::unique_lock g( conn_mtx ); - const auto last_sent_fork_db_root_num = last_handshake_sent.fork_db_root_num; - g.unlock(); - peer_ilog( this, "received block ${n} less than ${which}froot ${fr}", - ("n", blk_num)("which", blk_num < last_sent_fork_db_root_num ? 
"sent " : "") - ("fr", blk_num < last_sent_fork_db_root_num ? last_sent_fork_db_root_num : fork_db_root_num) ); - enqueue( (sync_request_message) {0, 0} ); - send_handshake(); + pending_message_buffer.advance_read_ptr( message_length ); // advance before any send + peer_dlog( this, "received block ${n} less than froot ${fr}", ("n", blk_num)("fr", fork_db_root_num) ); + send_block_nack(blk_id); cancel_sync_wait(); - pending_message_buffer.advance_read_ptr( message_length ); return true; } } else { block_sync_bytes_received += message_length; uint32_t fork_db_root_num = my_impl->get_fork_db_root_num(); - my_impl->sync_master->sync_recv_block(shared_from_this(), blk_id, blk_num, age); - if( blk_num <= fork_db_root_num ) { + const bool block_le_lib = blk_num <= fork_db_root_num; + if (block_le_lib) { peer_dlog( this, "received block ${n} less than froot ${fr} while syncing", ("n", blk_num)("fr", fork_db_root_num) ); - pending_message_buffer.advance_read_ptr( message_length ); - return true; + pending_message_buffer.advance_read_ptr( message_length ); // advance before any send } + my_impl->sync_master->sync_recv_block(shared_from_this(), blk_id, blk_num, age); + if (block_le_lib) + return true; } auto ds = pending_message_buffer.create_datastream(); @@ -3180,6 +3279,22 @@ namespace eosio { return true; } + // called from connection strand + void connection::send_block_nack(const block_id_type& block_id) { + if (protocol_version < proto_block_nack || my_impl->p2p_disable_block_nack) + return; + + if (my_impl->sync_master->syncing_from_peer()) + return; + + peer_dlog(this, "Sending nack ${n}", ("n", block_header::num_from_id(block_id))); + + buffer_factory buff_factory; + auto send_buffer = buff_factory.get_send_buffer( block_nack_message{block_id} ); + + enqueue_buffer( msg_type_t::block_nack_message, queued_buffer::queue_t::general, send_buffer, 0, no_reason ); + } + void net_plugin_impl::plugin_shutdown() { thread_pool.stop(); } @@ -3381,12 +3496,11 @@ namespace eosio 
{ if( peer_fork_db_root_num <= fork_db_root_num && peer_fork_db_root_num > 0 ) { try { - controller& cc = my_impl->chain_plug->chain(); - std::optional peer_fork_db_root_id = cc.fork_block_id_for_num( peer_fork_db_root_num ); // thread-safe - if (!peer_fork_db_root_id) { + auto [on_fork, unknown_block] = block_on_fork(msg.fork_db_root_id); // thread safe + if (unknown_block) { // can be not found if running with a truncated block log peer_dlog( this, "peer froot block ${n} is unknown", ("n", peer_fork_db_root_num) ); - } else if (msg.fork_db_root_id != peer_fork_db_root_id) { + } else if (on_fork) { peer_wlog( this, "Peer chain is forked, sending: forked go away" ); no_retry = go_away_reason::forked; enqueue( go_away_message( go_away_reason::forked ) ); @@ -3586,7 +3700,7 @@ namespace eosio { } void connection::handle_message( const request_message& msg ) { - if( msg.req_blocks.ids.size() > 1 ) { + if( msg.req_blocks.ids.size() > 2 ) { peer_wlog( this, "Invalid request_message, req_blocks.ids.size ${s}, closing", ("s", msg.req_blocks.ids.size()) ); close(); @@ -3594,14 +3708,29 @@ namespace eosio { } switch (msg.req_blocks.mode) { - case catch_up : - peer_dlog( this, "received request_message:catch_up" ); - blk_send_branch( msg.req_blocks.ids.empty() ? block_id_type() : msg.req_blocks.ids.back() ); + case catch_up : { + const block_id_type& id = msg.req_blocks.ids.empty() ? block_id_type() : msg.req_blocks.ids.back(); + peer_dlog( this, "received request_message:catch_up #${bn}:${id}", ("bn", block_header::num_from_id(id))("id",id) ); + blk_send_branch( id ); break; - case normal : + } + case normal : { + if (protocol_version >= proto_block_nack) { + if (msg.req_blocks.ids.size() == 2 && msg.req_trx.ids.empty()) { + const block_id_type& req_id = msg.req_blocks.ids[0]; // 0 - req_id, 1 - peer_head_id + peer_dlog( this, "${d} request_message:normal #${bn}:${id}", + ("d", is_blocks_connection() ? 
"received" : "ignoring")("bn", block_header::num_from_id(req_id))("id", req_id) ); + if (!is_blocks_connection()) + return; + const block_id_type& peer_head_id = msg.req_blocks.ids[1]; + // blk_send_branch_from_nack_request takes (peer head id, requested id) in that order: + // the fork check is done on the peer head and sending starts at the requested block + blk_send_branch_from_nack_request(peer_head_id, req_id); + return; + } + } peer_wlog( this, "Invalid request_message, req_blocks.mode = normal" ); close(); return; + } default:; } @@ -3645,6 +3774,7 @@ namespace eosio { } } + // called from connection strand void connection::handle_message( const vote_message_ptr& msg ) { last_vote_received = fc::time_point::now(); if (vote_logger.is_enabled(fc::log_level::debug)) { @@ -3656,6 +3786,73 @@ namespace eosio { cc.process_vote_message(connection_id, msg); } + // called from connection strand + void connection::handle_message( const block_nack_message& msg ) { + auto block_num = block_header::num_from_id(msg.id); + + if (block_num == 0) { // peer requested reset + consecutive_blocks_nacks = 0; + last_block_nack = msg.id; + peer_dlog(this, "received block nack reset"); + return; + } + + latest_blk_time = std::chrono::steady_clock::now(); + auto fork_db_root_num = my_impl->get_fork_db_root_num(); + const bool before_lib = block_header::num_from_id(msg.id) <= fork_db_root_num; + + if (before_lib || my_impl->dispatcher.have_block(msg.id)) { + if (block_num - 1 == block_header::num_from_id(last_block_nack)) { + ++consecutive_blocks_nacks; + } + if (!before_lib) { + my_impl->dispatcher.add_peer_block(msg.id, connection_id); + } + } + last_block_nack = msg.id; + + peer_dlog(this, "received block nack #${bn}:${id}, consecutive ${c}", ("bn", block_num)("id", msg.id)("c", consecutive_blocks_nacks)); + } + + // called from connection strand + void connection::handle_message( const block_notice_message& msg ) { + if (block_header::num_from_id(msg.id)-1 != block_header::num_from_id(msg.previous)) { + peer_dlog(this, "Invalid block_notice_message ${id}, closing", ("id", msg.id)); + close(); + return; + } + + auto fork_db_root_num = 
my_impl->get_fork_db_root_num(); + if (block_header::num_from_id(msg.id) <= fork_db_root_num) + return; + + latest_blk_time = std::chrono::steady_clock::now(); + if (my_impl->dispatcher.have_block(msg.id)) { + my_impl->dispatcher.add_peer_block(msg.id, connection_id); + } else if (!my_impl->dispatcher.have_block(msg.previous)) { // still don't have previous block + peer_dlog(this, "Received unknown block notice, checking already requested"); + request_message req; + req.req_blocks.mode = normal; + req.req_blocks.ids.push_back(msg.previous); + bool already_requested = my_impl->connections.any_of_block_connections([&req](const auto& c) { + fc::lock_guard g_conn(c->conn_mtx); + return c->last_block_nack_request_message_id == req.req_blocks.ids[0]; + }); + if (!already_requested) { + peer_ilog(this, "Received unknown block notice, requesting blocks from ${bn}", + ("bn", block_header::num_from_id(msg.previous))); + block_id_type head_id = my_impl->get_chain_info().head_id; + req.req_blocks.ids.push_back(head_id); + send_block_nack({}); + { + fc::lock_guard g_conn(conn_mtx); + last_block_nack_request_message_id = req.req_blocks.ids[0]; + } + enqueue(req); + } + } + } + size_t calc_trx_size( const packed_transaction_ptr& trx ) { return trx->get_estimated_size(); } @@ -3701,8 +3898,10 @@ namespace eosio { // may have come in on a different connection and posted into dispatcher strand before this one if( block_header::num_from_id(id) <= fork_db_root_num || my_impl->dispatcher.have_block( id ) || cc.block_exists( id ) ) { // thread-safe - my_impl->dispatcher.add_peer_block( id, c->connection_id ); boost::asio::post(c->strand, [c, id, ptr{std::move(ptr)}]() { + if (my_impl->dispatcher.add_peer_block( id, c->connection_id )) { + c->send_block_nack(id); + } const fc::microseconds age(fc::time_point::now() - ptr->timestamp); my_impl->sync_master->sync_recv_block( c, id, block_header::num_from_id(id), age ); }); @@ -4101,6 +4300,8 @@ namespace eosio { " 
p2p.blk.eos.io:9876:blk\n") ( "p2p-max-nodes-per-host", bpo::value()->default_value(def_max_nodes_per_host), "Maximum number of client nodes from any single IP address") ( "p2p-accept-transactions", bpo::value()->default_value(true), "Allow transactions received over p2p network to be evaluated and relayed if valid.") + ( "p2p-disable-block-nack", bpo::value()->default_value(false), + "Disable block notice and block nack. All blocks received will be broadcast to all peers unless already received.") ( "p2p-auto-bp-peer", bpo::value< vector >()->composing(), "The account and public p2p endpoint of a block producer node to automatically connect to when the it is in producer schedule proximity\n." " Syntax: account,host:port\n" @@ -4159,6 +4360,7 @@ namespace eosio { resp_expected_period = def_resp_expected_wait; max_nodes_per_host = options.at( "p2p-max-nodes-per-host" ).as(); p2p_accept_transactions = options.at( "p2p-accept-transactions" ).as(); + p2p_disable_block_nack = options.at( "p2p-disable-block-nack" ).as(); use_socket_read_watermark = options.at( "use-socket-read-watermark" ).as(); keepalive_interval = std::chrono::milliseconds( options.at( "p2p-keepalive-interval-ms" ).as() ); diff --git a/tests/PerformanceHarness/performance_test_basic.py b/tests/PerformanceHarness/performance_test_basic.py index 94670664f6..2aa9815183 100755 --- a/tests/PerformanceHarness/performance_test_basic.py +++ b/tests/PerformanceHarness/performance_test_basic.py @@ -193,7 +193,7 @@ def __init__(self, testHelperConfig: TestHelperConfig=TestHelperConfig(), cluste Utils.Debug = self.testHelperConfig.verbose self.errorExit = Utils.errorExit - self.emptyBlockGoal = 1 + self.emptyBlockGoal = 7 self.testStart = datetime.utcnow() self.testEnd = self.testStart diff --git a/tests/disaster_recovery_3.py b/tests/disaster_recovery_3.py index b9f3fb66f0..26b2391dd1 100755 --- a/tests/disaster_recovery_3.py +++ b/tests/disaster_recovery_3.py @@ -99,7 +99,7 @@ for node in [node2, node3]: assert 
node.waitForBlock(n_LIB, timeout=None, blockType=BlockType.lib), "Node did not advance LIB after shutdown of node0 and node1" currentLIB = node.getIrreversibleBlockNum() - assert currentLIB == n_LIB, f"Node advanced LIB {currentLIB} beyond N LIB {n_LIB}" + assert currentLIB == n_LIB, f"Node {node.nodeId} advanced LIB {currentLIB} beyond N LIB {n_LIB}" Print("Shutdown other two nodes") for node in [node2, node3]: