diff options
author | moneromooo-monero <moneromooo-monero@users.noreply.github.com> | 2018-04-29 23:30:51 +0100 |
---|---|---|
committer | moneromooo-monero <moneromooo-monero@users.noreply.github.com> | 2019-01-22 20:30:51 +0000 |
commit | b750fb27b0f20e9443827732b69a504a76036430 (patch) | |
tree | 36fdd03f84ae78dbd217b5bd255bd86e33db1bcc /src/cryptonote_protocol/cryptonote_protocol_handler.inl | |
parent | Merge pull request #5008 (diff) | |
download | monero-b750fb27b0f20e9443827732b69a504a76036430.tar.xz |
Pruning
The blockchain prunes seven eighths of prunable tx data.
This saves about two thirds of the blockchain size, while
keeping the node useful as a sync source for an eighth
of the blockchain.
No other data is currently pruned.
There are three ways to prune a blockchain:
- run monerod with --prune-blockchain
- run "prune_blockchain" in the monerod console
- run the monero-blockchain-prune utility
The first two will prune in place. Due to how LMDB works, this
will not reduce the blockchain size on disk. Instead, it will
mark parts of the file as free, so that future data will use
that free space, causing the file to not grow until free space
grows scarce.
The third way will create a second database, a pruned copy of
the original one. Since this is a new file, this one will be
smaller than the original one.
Once the database is pruned, it will stay pruned as it syncs.
That is, there is no need to use --prune-blockchain again, etc.
Diffstat (limited to 'src/cryptonote_protocol/cryptonote_protocol_handler.inl')
-rw-r--r-- | src/cryptonote_protocol/cryptonote_protocol_handler.inl | 788 |
1 files changed, 613 insertions, 175 deletions
diff --git a/src/cryptonote_protocol/cryptonote_protocol_handler.inl b/src/cryptonote_protocol/cryptonote_protocol_handler.inl index 01f70cba1..61a211094 100644 --- a/src/cryptonote_protocol/cryptonote_protocol_handler.inl +++ b/src/cryptonote_protocol/cryptonote_protocol_handler.inl @@ -42,17 +42,24 @@ #include "cryptonote_basic/cryptonote_format_utils.h" #include "profile_tools.h" #include "net/network_throttle-detail.hpp" +#include "common/pruning.h" #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "net.cn" #define MLOG_P2P_MESSAGE(x) MCINFO("net.p2p.msg", context << x) +#define MLOG_PEER_STATE(x) \ + MCINFO(MONERO_DEFAULT_LOG_CATEGORY, context << "[" << epee::string_tools::to_string_hex(context.m_pruning_seed) << "] state: " << x << " in state " << cryptonote::get_protocol_state_string(context.m_state)) -#define BLOCK_QUEUE_NBLOCKS_THRESHOLD 10 // chunks of N blocks +#define BLOCK_QUEUE_NSPANS_THRESHOLD 10 // chunks of N blocks #define BLOCK_QUEUE_SIZE_THRESHOLD (100*1024*1024) // MB -#define REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD (5 * 1000000) // microseconds +#define BLOCK_QUEUE_FORCE_DOWNLOAD_NEAR_BLOCKS 1000 +#define REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY (5 * 1000000) // microseconds +#define REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD (30 * 1000000) // microseconds #define IDLE_PEER_KICK_TIME (600 * 1000000) // microseconds #define PASSIVE_PEER_KICK_TIME (60 * 1000000) // microseconds +#define DROP_ON_SYNC_WEDGE_THRESHOLD (30 * 1000000000ull) // nanoseconds +#define LAST_ACTIVITY_STALL_THRESHOLD (2.0f) // seconds namespace cryptonote { @@ -75,6 +82,19 @@ namespace cryptonote template<class t_core> bool t_cryptonote_protocol_handler<t_core>::init(const boost::program_options::variables_map& vm) { + m_sync_timer.pause(); + m_sync_timer.reset(); + m_add_timer.pause(); + m_add_timer.reset(); + m_last_add_end_time = 0; + m_sync_spans_downloaded = 0; + m_sync_old_spans_downloaded = 0; + m_sync_bad_spans_downloaded = 0; + m_sync_download_chain_size = 0; + m_sync_download_objects_size = 0; + + m_block_download_max_size = command_line::get_arg(vm, cryptonote::arg_block_download_max_size); + return true; } //------------------------------------------------------------------------------------------------------------------------ @@ -103,9 +123,12 @@ namespace cryptonote if(context.m_state == cryptonote_connection_context::state_synchronizing) { NOTIFY_REQUEST_CHAIN::request r = boost::value_initialized<NOTIFY_REQUEST_CHAIN::request>(); + context.m_needed_objects.clear(); m_core.get_short_chain_history(r.block_ids); - LOG_PRINT_CCONTEXT_L2("-->>NOTIFY_REQUEST_CHAIN: m_block_ids.size()=" << r.block_ids.size() ); + handler_request_blocks_history( r.block_ids ); // change the limit(?), sleep(?) + MLOG_P2P_MESSAGE("-->>NOTIFY_REQUEST_CHAIN: m_block_ids.size()=" << r.block_ids.size() ); post_notify<NOTIFY_REQUEST_CHAIN>(r, context); + MLOG_PEER_STATE("requesting chain"); } else if(context.m_state == cryptonote_connection_context::state_standby) { @@ -247,6 +270,7 @@ namespace cryptonote cnx.connection_id = epee::string_tools::pod_to_hex(cntxt.m_connection_id); cnx.height = cntxt.m_remote_blockchain_height; + cnx.pruning_seed = cntxt.m_pruning_seed; connections.push_back(cnx); @@ -279,7 +303,23 @@ namespace cryptonote } } + // reject weird pruning schemes + if (hshd.pruning_seed) + { + const uint32_t log_stripes = tools::get_pruning_log_stripes(hshd.pruning_seed); + if (log_stripes != CRYPTONOTE_PRUNING_LOG_STRIPES || tools::get_pruning_stripe(hshd.pruning_seed) > (1u << log_stripes)) + { + MWARNING(context << " peer claim unexpected pruning seed " << epee::string_tools::to_string_hex(hshd.pruning_seed) << ", disconnecting"); + return false; + } + } + context.m_remote_blockchain_height = hshd.current_height; + context.m_pruning_seed = hshd.pruning_seed; +#ifdef CRYPTONOTE_PRUNING_DEBUG_SPOOF_SEED + context.m_pruning_seed = tools::make_pruning_seed(1 + (context.m_remote_address.as<epee::net_utils::ipv4_network_address>().ip()) % (1 << CRYPTONOTE_PRUNING_LOG_STRIPES), CRYPTONOTE_PRUNING_LOG_STRIPES); + LOG_INFO_CC(context, "New connection posing as pruning seed " << epee::string_tools::to_string_hex(context.m_pruning_seed) << ", seed address " << &context.m_pruning_seed); +#endif uint64_t target = m_core.get_target_blockchain_height(); if (target == 0) @@ -298,7 +338,6 @@ namespace cryptonote /* As I don't know if accessing hshd from core could be a good practice, I prefer pushing target height to the core at the same time it is pushed to the user. Nz. */ - m_core.set_target_blockchain_height((hshd.current_height)); int64_t diff = static_cast<int64_t>(hshd.current_height) - static_cast<int64_t>(m_core.get_current_blockchain_height()); uint64_t abs_diff = std::abs(diff); uint64_t max_block_height = std::max(hshd.current_height,m_core.get_current_blockchain_height()); @@ -309,14 +348,31 @@ namespace cryptonote << (0 <= diff ? std::string("behind") : std::string("ahead")) << "] " << ENDL << "SYNCHRONIZATION started"); if (hshd.current_height >= m_core.get_current_blockchain_height() + 5) // don't switch to unsafe mode just for a few blocks + { m_core.safesyncmode(false); + } + if (m_core.get_target_blockchain_height() == 0) // only when sync starts + { + m_sync_timer.resume(); + m_sync_timer.reset(); + m_add_timer.pause(); + m_add_timer.reset(); + m_last_add_end_time = 0; + m_sync_spans_downloaded = 0; + m_sync_old_spans_downloaded = 0; + m_sync_bad_spans_downloaded = 0; + m_sync_download_chain_size = 0; + m_sync_download_objects_size = 0; + } + m_core.set_target_blockchain_height((hshd.current_height)); } - LOG_PRINT_L1("Remote blockchain height: " << hshd.current_height << ", id: " << hshd.top_id); + MINFO(context << "Remote blockchain height: " << hshd.current_height << ", id: " << hshd.top_id); context.m_state = cryptonote_connection_context::state_synchronizing; //let the socket to send response to handshake, but request callback, to let send request data after response LOG_PRINT_CCONTEXT_L2("requesting callback"); ++context.m_callback_request_count; m_p2p->request_callback(context); + MLOG_PEER_STATE("requesting callback"); return true; } //------------------------------------------------------------------------------------------------------------------------ @@ -327,6 +383,7 @@ namespace cryptonote hshd.top_version = m_core.get_ideal_hard_fork_version(hshd.current_height); hshd.cumulative_difficulty = m_core.get_block_cumulative_difficulty(hshd.current_height); hshd.current_height +=1; + hshd.pruning_seed = m_core.get_blockchain_pruning_seed(); return true; } //------------------------------------------------------------------------------------------------------------------------ @@ -389,11 +446,14 @@ namespace cryptonote relay_block(arg, context); }else if(bvc.m_marked_as_orphaned) { + context.m_needed_objects.clear(); context.m_state = cryptonote_connection_context::state_synchronizing; NOTIFY_REQUEST_CHAIN::request r = boost::value_initialized<NOTIFY_REQUEST_CHAIN::request>(); m_core.get_short_chain_history(r.block_ids); - LOG_PRINT_CCONTEXT_L2("-->>NOTIFY_REQUEST_CHAIN: m_block_ids.size()=" << r.block_ids.size() ); + handler_request_blocks_history( r.block_ids ); // change the limit(?), sleep(?) + MLOG_P2P_MESSAGE("-->>NOTIFY_REQUEST_CHAIN: m_block_ids.size()=" << r.block_ids.size() ); post_notify<NOTIFY_REQUEST_CHAIN>(r, context); + MLOG_PEER_STATE("requesting chain"); } return 1; @@ -616,6 +676,7 @@ namespace cryptonote missing_tx_req.missing_tx_indices = std::move(need_tx_indices); m_core.resume_mine(); + MLOG_P2P_MESSAGE("-->>NOTIFY_REQUEST_FLUFFY_MISSING_TX: missing_tx_indices.size()=" << missing_tx_req.missing_tx_indices.size() ); post_notify<NOTIFY_REQUEST_FLUFFY_MISSING_TX>(missing_tx_req, context); } else // whoo-hoo we've got em all .. @@ -656,11 +717,14 @@ namespace cryptonote } else if( bvc.m_marked_as_orphaned ) { + context.m_needed_objects.clear(); context.m_state = cryptonote_connection_context::state_synchronizing; NOTIFY_REQUEST_CHAIN::request r = boost::value_initialized<NOTIFY_REQUEST_CHAIN::request>(); m_core.get_short_chain_history(r.block_ids); - LOG_PRINT_CCONTEXT_L2("-->>NOTIFY_REQUEST_CHAIN: m_block_ids.size()=" << r.block_ids.size() ); + handler_request_blocks_history( r.block_ids ); // change the limit(?), sleep(?) + MLOG_P2P_MESSAGE("-->>NOTIFY_REQUEST_CHAIN: m_block_ids.size()=" << r.block_ids.size() ); post_notify<NOTIFY_REQUEST_CHAIN>(r, context); + MLOG_PEER_STATE("requesting chain"); } } } @@ -747,7 +811,7 @@ namespace cryptonote fluffy_response.b.txs.push_back(t_serializable_object_to_blob(tx)); } - LOG_PRINT_CCONTEXT_L2 + MLOG_P2P_MESSAGE ( "-->>NOTIFY_RESPONSE_FLUFFY_MISSING_TX: " << ", txs.size()=" << fluffy_response.b.txs.size() @@ -811,7 +875,7 @@ namespace cryptonote drop_connection(context, false, false); return 1; } - LOG_PRINT_CCONTEXT_L2("-->>NOTIFY_RESPONSE_GET_OBJECTS: blocks.size()=" << rsp.blocks.size() << ", txs.size()=" << rsp.txs.size() + MLOG_P2P_MESSAGE("-->>NOTIFY_RESPONSE_GET_OBJECTS: blocks.size()=" << rsp.blocks.size() << ", txs.size()=" << rsp.txs.size() << ", rsp.m_current_blockchain_height=" << rsp.current_blockchain_height << ", missed_ids.size()=" << rsp.missed_ids.size()); post_notify<NOTIFY_RESPONSE_GET_OBJECTS>(rsp, context); //handler_response_blocks_now(sizeof(rsp)); // XXX @@ -834,11 +898,14 @@ namespace cryptonote return avg / m_avg_buffer.size(); } - template<class t_core> int t_cryptonote_protocol_handler<t_core>::handle_response_get_objects(int command, NOTIFY_RESPONSE_GET_OBJECTS::request& arg, cryptonote_connection_context& context) { MLOG_P2P_MESSAGE("Received NOTIFY_RESPONSE_GET_OBJECTS (" << arg.blocks.size() << " blocks, " << arg.txs.size() << " txes)"); + MLOG_PEER_STATE("received objects"); + + boost::posix_time::ptime request_time = context.m_last_request_time; + context.m_last_request_time = boost::date_time::not_a_date_time; // calculate size of request size_t size = 0; @@ -860,6 +927,8 @@ namespace cryptonote CRITICAL_REGION_LOCAL(m_buffer_mutex); m_avg_buffer.push_back(size); } + ++m_sync_spans_downloaded; + m_sync_download_objects_size += size; MDEBUG(context << " downloaded " << size << " bytes worth of blocks"); /*using namespace boost::chrono; @@ -874,6 +943,7 @@ namespace cryptonote LOG_ERROR_CCONTEXT("sent wrong NOTIFY_HAVE_OBJECTS: arg.m_current_blockchain_height=" << arg.current_blockchain_height << " < m_last_response_height=" << context.m_last_response_height << ", dropping connection"); drop_connection(context, false, false); + ++m_sync_bad_spans_downloaded; return 1; } @@ -898,6 +968,7 @@ namespace cryptonote LOG_ERROR_CCONTEXT("sent wrong block: failed to parse and validate block: " << epee::string_tools::buff_to_hex_nodelimer(block_entry.block) << ", dropping connection"); drop_connection(context, false, false); + ++m_sync_bad_spans_downloaded; return 1; } if (b.miner_tx.vin.size() != 1 || b.miner_tx.vin.front().type() != typeid(txin_gen)) @@ -905,6 +976,7 @@ namespace cryptonote LOG_ERROR_CCONTEXT("sent wrong block: block: miner tx does not have exactly one txin_gen input" << epee::string_tools::buff_to_hex_nodelimer(block_entry.block) << ", dropping connection"); drop_connection(context, false, false); + ++m_sync_bad_spans_downloaded; return 1; } if (start_height == std::numeric_limits<uint64_t>::max()) @@ -917,6 +989,7 @@ namespace cryptonote LOG_ERROR_CCONTEXT("sent wrong NOTIFY_RESPONSE_GET_OBJECTS: block with id=" << epee::string_tools::pod_to_hex(get_blob_hash(block_entry.block)) << " wasn't requested, dropping connection"); drop_connection(context, false, false); + ++m_sync_bad_spans_downloaded; return 1; } if(b.tx_hashes.size() != block_entry.txs.size()) @@ -924,6 +997,7 @@ namespace cryptonote LOG_ERROR_CCONTEXT("sent wrong NOTIFY_RESPONSE_GET_OBJECTS: block with id=" << epee::string_tools::pod_to_hex(get_blob_hash(block_entry.block)) << ", tx_hashes.size()=" << b.tx_hashes.size() << " mismatch with block_complete_entry.m_txs.size()=" << block_entry.txs.size() << ", dropping connection"); drop_connection(context, false, false); + ++m_sync_bad_spans_downloaded; return 1; } @@ -931,34 +1005,27 @@ namespace cryptonote block_hashes.push_back(block_hash); } - if(context.m_requested_objects.size()) + if(!context.m_requested_objects.empty()) { - MERROR("returned not all requested objects (context.m_requested_objects.size()=" + MERROR(context << "returned not all requested objects (context.m_requested_objects.size()=" << context.m_requested_objects.size() << "), dropping connection"); drop_connection(context, false, false); + ++m_sync_bad_spans_downloaded; return 1; } - // get the last parsed block, which should be the highest one - const crypto::hash last_block_hash = cryptonote::get_block_hash(b); - if(m_core.have_block(last_block_hash)) - { - const uint64_t subchain_height = start_height + arg.blocks.size(); - LOG_DEBUG_CC(context, "These are old blocks, ignoring: blocks " << start_height << " - " << (subchain_height-1) << ", blockchain height " << m_core.get_current_blockchain_height()); - m_block_queue.remove_spans(context.m_connection_id, start_height); - goto skip; - } - { MLOG_YELLOW(el::Level::Debug, context << " Got NEW BLOCKS inside of " << __FUNCTION__ << ": size: " << arg.blocks.size() - << ", blocks: " << start_height << " - " << (start_height + arg.blocks.size() - 1)); + << ", blocks: " << start_height << " - " << (start_height + arg.blocks.size() - 1) << + " (pruning seed " << epee::string_tools::to_string_hex(context.m_pruning_seed) << ")"); // add that new span to the block queue - const boost::posix_time::time_duration dt = now - context.m_last_request_time; + const boost::posix_time::time_duration dt = now - request_time; const float rate = size * 1e6 / (dt.total_microseconds() + 1); - MDEBUG(context << " adding span: " << arg.blocks.size() << " at height " << start_height << ", " << dt.total_microseconds()/1e6 << " seconds, " << (rate/1e3) << " kB/s, size now " << (m_block_queue.get_data_size() + blocks_size) / 1048576.f << " MB"); + MDEBUG(context << " adding span: " << arg.blocks.size() << " at height " << start_height << ", " << dt.total_microseconds()/1e6 << " seconds, " << (rate/1024) << " kB/s, size now " << (m_block_queue.get_data_size() + blocks_size) / 1048576.f << " MB"); m_block_queue.add_blocks(start_height, arg.blocks, context.m_connection_id, rate, blocks_size); + const crypto::hash last_block_hash = cryptonote::get_block_hash(b); context.m_last_known_hash = last_block_hash; if (!m_core.get_test_drop_download() || !m_core.get_test_drop_download_height()) { // DISCARD BLOCKS for testing @@ -966,7 +1033,6 @@ namespace cryptonote } } -skip: try_add_next_blocks(context); return 1; } @@ -983,15 +1049,22 @@ skip: const boost::unique_lock<boost::mutex> sync{m_sync_lock, boost::try_to_lock}; if (!sync.owns_lock()) { - MINFO("Failed to lock m_sync_lock, going back to download"); + MINFO(context << "Failed to lock m_sync_lock, going back to download"); goto skip; } MDEBUG(context << " lock m_sync_lock, adding blocks to chain..."); + MLOG_PEER_STATE("adding blocks"); { m_core.pause_mine(); - epee::misc_utils::auto_scope_leave_caller scope_exit_handler = epee::misc_utils::create_scope_leave_handler( - boost::bind(&t_core::resume_mine, &m_core)); + m_add_timer.resume(); + bool starting = true; + epee::misc_utils::auto_scope_leave_caller scope_exit_handler = epee::misc_utils::create_scope_leave_handler([this, &starting]() { + m_add_timer.pause(); + m_core.resume_mine(); + if (!starting) + m_last_add_end_time = tools::get_tick_count(); + }); while (1) { @@ -1004,22 +1077,38 @@ skip: MDEBUG(context << " no next span found, going back to download"); break; } - MDEBUG(context << " next span in the queue has blocks " << start_height << "-" << (start_height + blocks.size() - 1) - << ", we need " << previous_height); if (blocks.empty()) { - MERROR("Next span has no blocks"); + MERROR(context << "Next span has no blocks"); m_block_queue.remove_spans(span_connection_id, start_height); - break; + continue; } + MDEBUG(context << " next span in the queue has blocks " << start_height << "-" << (start_height + blocks.size() - 1) + << ", we need " << previous_height); + block new_block; + if (!parse_and_validate_block_from_blob(blocks.back().block, new_block)) + { + MERROR(context << "Failed to parse block, but it should already have been parsed"); + m_block_queue.remove_spans(span_connection_id, start_height); + continue; + } + const crypto::hash last_block_hash = cryptonote::get_block_hash(new_block); + if (m_core.have_block(last_block_hash)) + { + const uint64_t subchain_height = start_height + blocks.size(); + LOG_DEBUG_CC(context, "These are old blocks, ignoring: blocks " << start_height << " - " << (subchain_height-1) << ", blockchain height " << m_core.get_current_blockchain_height()); + m_block_queue.remove_spans(span_connection_id, start_height); + ++m_sync_old_spans_downloaded; + continue; + } if (!parse_and_validate_block_from_blob(blocks.front().block, new_block)) { - MERROR("Failed to parse block, but it should already have been parsed"); + MERROR(context << "Failed to parse block, but it should already have been parsed"); m_block_queue.remove_spans(span_connection_id, start_height); - break; + continue; } bool parent_known = m_core.have_block(new_block.prev_id); if (!parent_known) @@ -1032,6 +1121,24 @@ skip: bool parent_requested = m_block_queue.requested(new_block.prev_id); if (!parent_requested) { + // we might be able to ask for that block directly, as we now can request out of order, + // otherwise we continue out of order, unless this block is the one we need, in which + // case we request block hashes, though it might be safer to disconnect ? + if (start_height > previous_height) + { + if (should_drop_connection(context, get_next_needed_pruning_stripe().first)) + { + MDEBUG(context << "Got block with unknown parent which was not requested, but peer does not have that block - dropping connection"); + if (!context.m_is_income) + m_p2p->add_used_stripe_peer(context); + drop_connection(context, false, true); + return 1; + } + MDEBUG(context << "Got block with unknown parent which was not requested, but peer does not have that block - back to download"); + + goto skip; + } + // this can happen if a connection was sicced onto a late span, if it did not have those blocks, // since we don't know that at the sic time LOG_ERROR_CCONTEXT("Got block with unknown parent which was not requested - querying block hashes"); @@ -1047,7 +1154,17 @@ skip: } const boost::posix_time::ptime start = boost::posix_time::microsec_clock::universal_time(); - context.m_last_request_time = start; + + if (starting) + { + starting = false; + if (m_last_add_end_time) + { + const uint64_t tnow = tools::get_tick_count(); + const uint64_t ns = tools::ticks_to_ns(tnow - m_last_add_end_time); + MINFO("Restarting adding block after idle for " << ns/1e9 << " seconds"); + } + } m_core.prepare_handle_incoming_blocks(blocks); @@ -1150,7 +1267,7 @@ skip: } // each download block - MCINFO("sync-info", "Block process time (" << blocks.size() << " blocks, " << num_txs << " txs): " << block_process_time_full + transactions_process_time_full << " (" << transactions_process_time_full << "/" << block_process_time_full << ") ms"); + MDEBUG(context << "Block process time (" << blocks.size() << " blocks, " << num_txs << " txs): " << block_process_time_full + transactions_process_time_full << " (" << transactions_process_time_full << "/" << block_process_time_full << ") ms"); if (!m_core.cleanup_handle_incoming_blocks()) { @@ -1160,40 +1277,54 @@ skip: m_block_queue.remove_spans(span_connection_id, start_height); - if (m_core.get_current_blockchain_height() > previous_height) + const uint64_t current_blockchain_height = m_core.get_current_blockchain_height(); + if (current_blockchain_height > previous_height) { + const uint64_t target_blockchain_height = m_core.get_target_blockchain_height(); const boost::posix_time::time_duration dt = boost::posix_time::microsec_clock::universal_time() - start; + std::string progress_message = ""; + if (current_blockchain_height < target_blockchain_height) + { + uint64_t completion_percent = (current_blockchain_height * 100 / target_blockchain_height); + if (completion_percent == 100) // never show 100% if not actually up to date + completion_percent = 99; + progress_message = " (" + std::to_string(completion_percent) + "%, " + + std::to_string(target_blockchain_height - current_blockchain_height) + " left)"; + } + const uint32_t previous_stripe = tools::get_pruning_stripe(previous_height, target_blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES); + const uint32_t current_stripe = tools::get_pruning_stripe(current_blockchain_height, target_blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES); std::string timing_message = ""; if (ELPP->vRegistry()->allowed(el::Level::Info, "sync-info")) timing_message = std::string(" (") + std::to_string(dt.total_microseconds()/1e6) + " sec, " - + std::to_string((m_core.get_current_blockchain_height() - previous_height) * 1e6 / dt.total_microseconds()) - + " blocks/sec), " + std::to_string(m_block_queue.get_data_size() / 1048576.f) + " MB queued"; + + std::to_string((current_blockchain_height - previous_height) * 1e6 / dt.total_microseconds()) + + " blocks/sec), " + std::to_string(m_block_queue.get_data_size() / 1048576.f) + " MB queued in " + + std::to_string(m_block_queue.get_num_filled_spans()) + " spans, stripe " + + std::to_string(previous_stripe) + " -> " + std::to_string(current_stripe); if (ELPP->vRegistry()->allowed(el::Level::Debug, "sync-info")) - timing_message += std::string(": ") + m_block_queue.get_overview(); - if(m_core.get_target_blockchain_height() == 0){ - MGINFO_YELLOW(context << " Synced " << m_core.get_current_blockchain_height() << "/" << m_core.get_target_blockchain_height() - << timing_message); - } else { - const int completition_percent = (m_core.get_current_blockchain_height() * 100 / m_core.get_target_blockchain_height()); - if(completition_percent < 99) {//printing completion percent only if % is < of 99 cause for 99 >= this is useless - MGINFO_YELLOW(context << " Synced " << m_core.get_current_blockchain_height() << "/" << m_core.get_target_blockchain_height() - << " (" << completition_percent << "% " << (m_core.get_target_blockchain_height() - m_core.get_current_blockchain_height()) - << " blocks remaining)" << timing_message); - } else { - MGINFO_YELLOW(context << " Synced " << m_core.get_current_blockchain_height() << "/" << m_core.get_target_blockchain_height() - << timing_message); - } - } + timing_message += std::string(": ") + m_block_queue.get_overview(current_blockchain_height); + MGINFO_YELLOW("Synced " << current_blockchain_height << "/" << target_blockchain_height + << progress_message << timing_message); + if (previous_stripe != current_stripe) + notify_new_stripe(context, current_stripe); } } } - if (should_download_next_span(context)) + MLOG_PEER_STATE("stopping adding blocks"); + + if (should_download_next_span(context, false)) { - context.m_needed_objects.clear(); - context.m_last_response_height = 0; force_next_span = true; } + else if (should_drop_connection(context, get_next_needed_pruning_stripe().first)) + { + if (!context.m_is_income) + { + m_p2p->add_used_stripe_peer(context); + drop_connection(context, false, false); + } + return 1; + } } skip: @@ -1207,6 +1338,28 @@ skip: } //------------------------------------------------------------------------------------------------------------------------ template<class t_core> + void t_cryptonote_protocol_handler<t_core>::notify_new_stripe(cryptonote_connection_context& cntxt, uint32_t stripe) + { + m_p2p->for_each_connection([&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags)->bool + { + if (cntxt.m_connection_id == context.m_connection_id) + return true; + if (context.m_state == cryptonote_connection_context::state_normal) + { + const uint32_t peer_stripe = tools::get_pruning_stripe(context.m_pruning_seed); + if (stripe && peer_stripe && peer_stripe != stripe) + return true; + context.m_state = cryptonote_connection_context::state_synchronizing; + LOG_PRINT_CCONTEXT_L2("requesting callback"); + ++context.m_callback_request_count; + m_p2p->request_callback(context); + MLOG_PEER_STATE("requesting callback"); + } + return true; + }); + } + //------------------------------------------------------------------------------------------------------------------------ + template<class t_core> bool t_cryptonote_protocol_handler<t_core>::on_idle() { m_idle_peer_kicker.do_call(boost::bind(&t_cryptonote_protocol_handler<t_core>::kick_idle_peers, this)); @@ -1218,30 +1371,24 @@ skip: bool t_cryptonote_protocol_handler<t_core>::kick_idle_peers() { MTRACE("Checking for idle peers..."); - std::vector<boost::uuids::uuid> kick_connections; m_p2p->for_each_connection([&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags)->bool { - if (context.m_state == cryptonote_connection_context::state_synchronizing || context.m_state == cryptonote_connection_context::state_before_handshake) + if (context.m_state == cryptonote_connection_context::state_synchronizing && context.m_last_request_time != boost::date_time::not_a_date_time) { - const bool passive = context.m_state == cryptonote_connection_context::state_before_handshake; const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); const boost::posix_time::time_duration dt = now - context.m_last_request_time; - const int64_t threshold = passive ? PASSIVE_PEER_KICK_TIME : IDLE_PEER_KICK_TIME; - if (dt.total_microseconds() > threshold) + if (dt.total_microseconds() > IDLE_PEER_KICK_TIME) { - MINFO(context << " kicking " << (passive ? "passive" : "idle") << " peer"); - kick_connections.push_back(context.m_connection_id); + MINFO(context << " kicking idle peer, last update " << (dt.total_microseconds() / 1.e6) << " seconds ago"); + LOG_PRINT_CCONTEXT_L2("requesting callback"); + context.m_last_request_time = boost::date_time::not_a_date_time; + context.m_state = cryptonote_connection_context::state_standby; // we'll go back to adding, then (if we can't), download + ++context.m_callback_request_count; + m_p2p->request_callback(context); } } return true; }); - for (const boost::uuids::uuid &conn_id: kick_connections) - { - m_p2p->for_connection(conn_id, [this](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags) { - drop_connection(context, false, false); - return true; - }); - } return true; } //------------------------------------------------------------------------------------------------------------------------ @@ -1272,75 +1419,174 @@ skip: drop_connection(context, false, false); return 1; } - LOG_PRINT_CCONTEXT_L2("-->>NOTIFY_RESPONSE_CHAIN_ENTRY: m_start_height=" << r.start_height << ", m_total_height=" << r.total_height << ", m_block_ids.size()=" << r.m_block_ids.size()); + MLOG_P2P_MESSAGE("-->>NOTIFY_RESPONSE_CHAIN_ENTRY: m_start_height=" << r.start_height << ", m_total_height=" << r.total_height << ", m_block_ids.size()=" << r.m_block_ids.size()); post_notify<NOTIFY_RESPONSE_CHAIN_ENTRY>(r, context); return 1; } //------------------------------------------------------------------------------------------------------------------------ template<class t_core> - bool t_cryptonote_protocol_handler<t_core>::should_download_next_span(cryptonote_connection_context& context) const + bool t_cryptonote_protocol_handler<t_core>::should_download_next_span(cryptonote_connection_context& context, bool standby) { std::vector<crypto::hash> hashes; boost::uuids::uuid span_connection_id; boost::posix_time::ptime request_time; + boost::uuids::uuid connection_id; std::pair<uint64_t, uint64_t> span; + bool filled; - span = m_block_queue.get_start_gap_span(); - if (span.second > 0) - { - MDEBUG(context << " we should download it as there is a gap"); - return true; - } - - // if the next span is not scheduled (or there is none) - span = m_block_queue.get_next_span_if_scheduled(hashes, span_connection_id, request_time); - if (span.second == 0) + const uint64_t blockchain_height = m_core.get_current_blockchain_height(); + if (context.m_remote_blockchain_height <= blockchain_height) + return false; + const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); + const bool has_next_block = tools::has_unpruned_block(blockchain_height, context.m_remote_blockchain_height, context.m_pruning_seed); + if (has_next_block) { - // we might be in a weird case where there is a filled next span, - // but it starts higher than the current height - uint64_t height; - std::vector<cryptonote::block_complete_entry> bcel; - if (!m_block_queue.get_next_span(height, bcel, span_connection_id, true)) - return false; - if (height > m_core.get_current_blockchain_height()) + if (!m_block_queue.has_next_span(blockchain_height, filled, request_time, connection_id)) { - MDEBUG(context << " we should download it as the next block isn't scheduled"); + MDEBUG(context << " we should download it as no peer reserved it"); return true; } + if (!filled) + { + const long dt = (now - request_time).total_microseconds(); + if (dt >= REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD) + { + MDEBUG(context << " we should download it as it's not been received yet after " << dt/1e6); + return true; + } + + // in standby, be ready to double download early since we're idling anyway + // let the fastest peer trigger first + long threshold; + const double dl_speed = context.m_max_speed_down; + if (standby && dt >= REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY && dl_speed > 0) + { + bool download = false; + if (m_p2p->for_connection(connection_id, [&](cryptonote_connection_context& ctx, nodetool::peerid_type peer_id, uint32_t f)->bool{ + const time_t nowt = time(NULL); + const time_t time_since_last_recv = nowt - ctx.m_last_recv; + const float last_activity = std::min((float)time_since_last_recv, dt/1e6f); + const bool stalled = last_activity > LAST_ACTIVITY_STALL_THRESHOLD; + if (stalled) + { + MDEBUG(context << " we should download it as the downloading peer is stalling for " << nowt - ctx.m_last_recv << " seconds"); + download = true; + return true; + } + + // estimate the standby peer can give us 80% of its max speed + // and let it download if that speed is > N times as fast as the current one + // N starts at 10 after REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY, + // decreases to 1.25 at REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD, + // so that at times goes without the download being done, a retry becomes easier + const float max_multiplier = 10.f; + const float min_multiplier = 1.25f; + float multiplier = max_multiplier; + if (dt/1e6 >= REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY) + { + multiplier = max_multiplier - (dt/1e6-REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY) * (max_multiplier - min_multiplier) / (REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD - REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY); + multiplier = std::min(max_multiplier, std::max(min_multiplier, multiplier)); + } + if (dl_speed * .8f > ctx.m_current_speed_down * multiplier) + { + MDEBUG(context << " we should download it as we are substantially faster (" << dl_speed << " vs " + << ctx.m_current_speed_down << ", multiplier " << multiplier << " after " << dt/1e6 << " seconds)"); + download = true; + return true; + } + return true; + })) + { + if (download) + return true; + } + else + { + MWARNING(context << " we should download it as the downloading peer is unexpectedly not known to us"); + return true; + } + } + } + } + + return false; + } + //------------------------------------------------------------------------------------------------------------------------ + template<class t_core> + bool t_cryptonote_protocol_handler<t_core>::should_drop_connection(cryptonote_connection_context& context, uint32_t next_stripe) + { + if (context.m_anchor) + { + MDEBUG(context << "This is an anchor peer, not dropping"); return false; } - // if it was scheduled by this particular peer - if (span_connection_id == context.m_connection_id) + if (context.m_pruning_seed == 0) + { + MDEBUG(context << "This peer is not striped, not dropping"); return false; + } - float span_speed = m_block_queue.get_speed(span_connection_id); - float speed = m_block_queue.get_speed(context.m_connection_id); - MDEBUG(context << " next span is scheduled for " << span_connection_id << ", speed " << span_speed << ", ours " << speed); - - // we try for that span too if: - // - we're substantially faster, or: - // - we're the fastest and the other one isn't (avoids a peer being waaaay slow but yet unmeasured) - // - the other one asked at least 5 seconds ago - if (span_speed < .25 && speed > .75f) + const uint32_t peer_stripe = tools::get_pruning_stripe(context.m_pruning_seed); + if (next_stripe == peer_stripe) { - MDEBUG(context << " we should download it as we're substantially faster"); - return true; + MDEBUG(context << "This peer has needed stripe " << peer_stripe << ", not dropping"); + return false; } - if (speed == 1.0f && span_speed != 1.0f) + + if (!context.m_needed_objects.empty()) { - MDEBUG(context << " we should download it as we're the fastest peer"); - return true; + const uint64_t next_available_block_height = context.m_last_response_height - context.m_needed_objects.size() + 1; + if (tools::has_unpruned_block(next_available_block_height, context.m_remote_blockchain_height, context.m_pruning_seed)) + { + MDEBUG(context << "This peer has unpruned next block at height " << next_available_block_height << ", not dropping"); + return false; + } } - const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); - if ((now - request_time).total_microseconds() > REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD) + + if (next_stripe > 0) { - MDEBUG(context << " we should download it as this span was requested long ago"); - return true; + unsigned int n_out_peers = 0, n_peers_on_next_stripe = 0; + m_p2p->for_each_connection([&](cryptonote_connection_context& ctx, nodetool::peerid_type peer_id, uint32_t support_flags)->bool{ + if (!ctx.m_is_income) + ++n_out_peers; + if (ctx.m_state >= cryptonote_connection_context::state_synchronizing && tools::get_pruning_stripe(ctx.m_pruning_seed) == next_stripe) + ++n_peers_on_next_stripe; + return true; + }); + const uint32_t distance = (peer_stripe + (1<<CRYPTONOTE_PRUNING_LOG_STRIPES) - next_stripe) % (1<<CRYPTONOTE_PRUNING_LOG_STRIPES); + if ((n_out_peers >= m_max_out_peers && n_peers_on_next_stripe == 0) || (distance > 1 && n_peers_on_next_stripe <= 2) || distance > 2) + { + MDEBUG(context << "we want seed " << next_stripe << ", and either " << n_out_peers << " is at max out peers (" + << m_max_out_peers << ") or distance " << distance << " from " << next_stripe << " to " << peer_stripe << + " is too large and we have only " << n_peers_on_next_stripe << " peers on next seed, dropping connection to make space"); + return true; + } } + MDEBUG(context << "End of checks, not dropping"); return false; } //------------------------------------------------------------------------------------------------------------------------ template<class t_core> + void t_cryptonote_protocol_handler<t_core>::skip_unneeded_hashes(cryptonote_connection_context& context, bool check_block_queue) const + { + // take out blocks we already have + size_t skip = 0; + while (skip < context.m_needed_objects.size() && (m_core.have_block(context.m_needed_objects[skip]) || (check_block_queue && m_block_queue.have(context.m_needed_objects[skip])))) + { + // if we're popping the last hash, record it so we can ask again from that hash, + // this prevents never being able to progress on peers we get old hash lists from + if (skip + 1 == context.m_needed_objects.size()) + context.m_last_known_hash = context.m_needed_objects[skip]; + ++skip; + } + if (skip > 0) + { + MDEBUG(context << "skipping " << skip << "/" << context.m_needed_objects.size() << " blocks"); + context.m_needed_objects = std::vector<crypto::hash>(context.m_needed_objects.begin() + skip, context.m_needed_objects.end()); + } + } + //------------------------------------------------------------------------------------------------------------------------ + template<class t_core> bool t_cryptonote_protocol_handler<t_core>::request_missing_objects(cryptonote_connection_context& context, bool check_having_blocks, bool force_next_span) { // flush stale spans @@ -1357,47 +1603,102 @@ skip: { do { - size_t nblocks = m_block_queue.get_num_filled_spans(); + size_t nspans = m_block_queue.get_num_filled_spans(); size_t size = m_block_queue.get_data_size(); - if (nblocks < BLOCK_QUEUE_NBLOCKS_THRESHOLD || size < BLOCK_QUEUE_SIZE_THRESHOLD) + const uint64_t bc_height = m_core.get_current_blockchain_height(); + const auto next_needed_pruning_stripe = get_next_needed_pruning_stripe(); + const uint32_t add_stripe = tools::get_pruning_stripe(bc_height, context.m_remote_blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES); + const uint32_t peer_stripe = tools::get_pruning_stripe(context.m_pruning_seed); + const size_t block_queue_size_threshold = m_block_download_max_size ? m_block_download_max_size : BLOCK_QUEUE_SIZE_THRESHOLD; + bool queue_proceed = nspans < BLOCK_QUEUE_NSPANS_THRESHOLD || size < block_queue_size_threshold; + // get rid of blocks we already requested, or already have + skip_unneeded_hashes(context, true); + uint64_t next_needed_height = m_block_queue.get_next_needed_height(bc_height); + uint64_t next_block_height; + if (context.m_needed_objects.empty()) + next_block_height = next_needed_height; + else + next_block_height = context.m_last_response_height - context.m_needed_objects.size() + 1; + bool stripe_proceed_main = (add_stripe == 0 || peer_stripe == 0 || add_stripe == peer_stripe) && (next_block_height < bc_height + BLOCK_QUEUE_FORCE_DOWNLOAD_NEAR_BLOCKS || next_needed_height < bc_height + BLOCK_QUEUE_FORCE_DOWNLOAD_NEAR_BLOCKS); + bool stripe_proceed_secondary = tools::has_unpruned_block(next_block_height, context.m_remote_blockchain_height, context.m_pruning_seed); + bool proceed = stripe_proceed_main || (queue_proceed && stripe_proceed_secondary); + if (!stripe_proceed_main && !stripe_proceed_secondary && should_drop_connection(context, tools::get_pruning_stripe(next_block_height, context.m_remote_blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES))) + { + if (!context.m_is_income) + m_p2p->add_used_stripe_peer(context); + return false; // drop outgoing connections + } + + MDEBUG(context << "proceed " << proceed << " (queue " << queue_proceed << ", stripe " << stripe_proceed_main << "/" << + stripe_proceed_secondary << "), " << next_needed_pruning_stripe.first << "-" << next_needed_pruning_stripe.second << + " needed, bc add stripe " << add_stripe << ", we have " << peer_stripe << "), bc_height " << bc_height); + MDEBUG(context << " - next_block_height " << next_block_height << ", seed " << epee::string_tools::to_string_hex(context.m_pruning_seed) << + ", next_needed_height "<< next_needed_height); + MDEBUG(context << " - last_response_height " << context.m_last_response_height << ", m_needed_objects size " << context.m_needed_objects.size()); + + // if we're waiting for next span, try to get it before unblocking threads below, + // or a runaway downloading of future spans might happen + if (stripe_proceed_main && should_download_next_span(context, true)) + { + MDEBUG(context << " we should try for that next span too, we think we could get it faster, resuming"); + force_next_span = true; + MLOG_PEER_STATE("resuming"); + break; + } + + if (proceed) { if (context.m_state != cryptonote_connection_context::state_standby) { - LOG_DEBUG_CC(context, "Block queue is " << nblocks << " and " << size << ", resuming"); + LOG_DEBUG_CC(context, "Block queue is " << nspans << " and " << size << ", resuming"); + MLOG_PEER_STATE("resuming"); } break; } // this one triggers if all threads are in standby, which should not happen, // but happened at least once, so we unblock at least one thread if so - const boost::unique_lock<boost::mutex> sync{m_sync_lock, boost::try_to_lock}; + boost::unique_lock<boost::mutex> sync{m_sync_lock, boost::try_to_lock}; if (sync.owns_lock()) { - LOG_DEBUG_CC(context, "No other thread is adding blocks, resuming"); - break; - } + bool filled = false; + boost::posix_time::ptime time; + boost::uuids::uuid connection_id; + if (m_block_queue.has_next_span(m_core.get_current_blockchain_height(), filled, time, connection_id) && filled) + { + LOG_DEBUG_CC(context, "No other thread is adding blocks, and next span needed is ready, resuming"); + MLOG_PEER_STATE("resuming"); + context.m_state = cryptonote_connection_context::state_standby; + ++context.m_callback_request_count; + m_p2p->request_callback(context); + return true; + } + else + { + sync.unlock(); - if (should_download_next_span(context)) - { - MDEBUG(context << " we should try for that next span too, we think we could get it faster, resuming"); - force_next_span = true; - break; + // if this has gone on for too long, drop incoming connection to guard against some wedge state + if (!context.m_is_income) + { + const uint64_t now = tools::get_tick_count(); + const uint64_t dt = now - m_last_add_end_time; + if (tools::ticks_to_ns(dt) >= DROP_ON_SYNC_WEDGE_THRESHOLD) + { + MDEBUG(context << "Block addition seems to have wedged, dropping connection"); + return false; + } + } + } } if (context.m_state != cryptonote_connection_context::state_standby) { - LOG_DEBUG_CC(context, "Block queue is " << nblocks << " and " << size << ", pausing"); + if (!queue_proceed) + LOG_DEBUG_CC(context, "Block queue is " << nspans << " and " << size << ", pausing"); + else if (!stripe_proceed_main && !stripe_proceed_secondary) + LOG_DEBUG_CC(context, "We do not have the stripe required to download another block, pausing"); context.m_state = cryptonote_connection_context::state_standby; - } - - // this needs doing after we went to standby, so the callback knows what to do - bool filled; - if (m_block_queue.has_next_span(context.m_connection_id, filled) && !filled) - { - MDEBUG(context << " we have the next span, and it is scheduled, resuming"); - ++context.m_callback_request_count; - m_p2p->request_callback(context); - return true; + MLOG_PEER_STATE("pausing"); } return true; @@ -1405,7 +1706,9 @@ skip: context.m_state = cryptonote_connection_context::state_synchronizing; } - MDEBUG(context << " request_missing_objects: check " << check_having_blocks << ", force_next_span " << force_next_span << ", m_needed_objects " << context.m_needed_objects.size() << " lrh " << context.m_last_response_height << ", chain " << m_core.get_current_blockchain_height()); + MDEBUG(context << " request_missing_objects: check " << check_having_blocks << ", force_next_span " << force_next_span + << ", m_needed_objects " << context.m_needed_objects.size() << " lrh " << context.m_last_response_height << ", chain " + << m_core.get_current_blockchain_height() << ", pruning seed " << epee::string_tools::to_string_hex(context.m_pruning_seed)); if(context.m_needed_objects.size() || force_next_span) { //we know objects that we need, request this objects @@ -1414,28 +1717,6 @@ skip: size_t count = 0; const size_t count_limit = m_core.get_block_sync_size(m_core.get_current_blockchain_height()); std::pair<uint64_t, uint64_t> span = std::make_pair(0, 0); - { - MDEBUG(context << " checking for gap"); - span = m_block_queue.get_start_gap_span(); - if (span.second > 0) - { - const uint64_t first_block_height_known = context.m_last_response_height - context.m_needed_objects.size() + 1; - const uint64_t last_block_height_known = context.m_last_response_height; - const uint64_t first_block_height_needed = span.first; - const uint64_t last_block_height_needed = span.first + std::min(span.second, (uint64_t)count_limit) - 1; - MDEBUG(context << " gap found, span: " << span.first << " - " << span.first + span.second - 1 << " (" << last_block_height_needed << ")"); - MDEBUG(context << " current known hashes from " << first_block_height_known << " to " << last_block_height_known); - if (first_block_height_needed < first_block_height_known || last_block_height_needed > last_block_height_known) - { - MDEBUG(context << " we are missing some of the necessary hashes for this gap, requesting chain again"); - context.m_needed_objects.clear(); - context.m_last_response_height = 0; - start_from_current_chain = true; - goto skip; - } - MDEBUG(context << " we have the hashes for this gap"); - } - } if (force_next_span) { if (span.second == 0) @@ -1452,6 +1733,7 @@ skip: req.blocks.push_back(hash); context.m_requested_objects.insert(hash); } + m_block_queue.reset_next_span_time(); } } } @@ -1465,22 +1747,20 @@ skip: context.m_last_response_height = 0; goto skip; } - // take out blocks we already have - size_t skip = 0; - while (skip < context.m_needed_objects.size() && m_core.have_block(context.m_needed_objects[skip])) - { - // if we're popping the last hash, record it so we can ask again from that hash, - // this prevents never being able to progress on peers we get old hash lists from - if (skip + 1 == context.m_needed_objects.size()) - context.m_last_known_hash = context.m_needed_objects[skip]; - ++skip; - } - if (skip > 0) - context.m_needed_objects = std::vector<crypto::hash>(context.m_needed_objects.begin() + skip, context.m_needed_objects.end()); + skip_unneeded_hashes(context, false); const uint64_t first_block_height = context.m_last_response_height - context.m_needed_objects.size() + 1; - span = m_block_queue.reserve_span(first_block_height, context.m_last_response_height, count_limit, context.m_connection_id, context.m_needed_objects); + span = m_block_queue.reserve_span(first_block_height, context.m_last_response_height, count_limit, context.m_connection_id, context.m_pruning_seed, context.m_remote_blockchain_height, context.m_needed_objects); MDEBUG(context << " span from " << first_block_height << ": " << span.first << "/" << span.second); + if (span.second > 0) + { + const uint32_t stripe = tools::get_pruning_stripe(span.first, context.m_remote_blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES); + if (context.m_pruning_seed && stripe != tools::get_pruning_stripe(context.m_pruning_seed)) + { + MDEBUG(context << " starting early on next seed (" << span.first << " with stripe " << stripe << + ", context seed " << epee::string_tools::to_string_hex(context.m_pruning_seed) << ")"); + } + } } if (span.second == 0 && !force_next_span) { @@ -1489,6 +1769,8 @@ skip: boost::uuids::uuid span_connection_id; boost::posix_time::ptime time; span = m_block_queue.get_next_span_if_scheduled(hashes, span_connection_id, time); + if (span.second > 0 && !tools::has_unpruned_block(span.first, context.m_remote_blockchain_height, context.m_pruning_seed)) + span = std::make_pair(0, 0); if (span.second > 0) { is_next = true; @@ -1534,17 +1816,67 @@ skip: } context.m_last_request_time = boost::posix_time::microsec_clock::universal_time(); - LOG_PRINT_CCONTEXT_L1("-->>NOTIFY_REQUEST_GET_OBJECTS: blocks.size()=" << req.blocks.size() << ", txs.size()=" << req.txs.size() + MLOG_P2P_MESSAGE("-->>NOTIFY_REQUEST_GET_OBJECTS: blocks.size()=" << req.blocks.size() << ", txs.size()=" << req.txs.size() << "requested blocks count=" << count << " / " << count_limit << " from " << span.first << ", first hash " << req.blocks.front()); //epee::net_utils::network_throttle_manager::get_global_throttle_inreq().logger_handle_net("log/dr-monero/net/req-all.data", sec, get_avg_block_size()); post_notify<NOTIFY_REQUEST_GET_OBJECTS>(req, context); + MLOG_PEER_STATE("requesting objects"); + return true; + } + + // if we're still around, we might be at a point where the peer is pruned, so we could either + // drop it to make space for other peers, or ask for a span further down the line + const uint32_t next_stripe = get_next_needed_pruning_stripe().first; + const uint32_t peer_stripe = tools::get_pruning_stripe(context.m_pruning_seed); + if (next_stripe && peer_stripe && next_stripe != peer_stripe) + { + // at this point, we have to either close the connection, or start getting blocks past the + // current point, or become dormant + MDEBUG(context << "this peer is pruned at seed " << epee::string_tools::to_string_hex(context.m_pruning_seed) << + ", next stripe needed is " << next_stripe); + if (!context.m_is_income) + { + if (should_drop_connection(context, next_stripe)) + { + m_p2p->add_used_stripe_peer(context); + return false; // drop outgoing connections + } + } + // we'll get back stuck waiting for the go ahead + context.m_state = cryptonote_connection_context::state_normal; + MLOG_PEER_STATE("Nothing to do for now, switching to normal state"); return true; } } skip: context.m_needed_objects.clear(); + + // we might have been called from the "received chain entry" handler, and end up + // here because we can't use any of those blocks (maybe because all of them are + // actually already requested). In this case, if we can add blocks instead, do so + if (m_core.get_current_blockchain_height() < m_core.get_target_blockchain_height()) + { + const boost::unique_lock<boost::mutex> sync{m_sync_lock, boost::try_to_lock}; + if (sync.owns_lock()) + { + uint64_t start_height; + std::vector<cryptonote::block_complete_entry> blocks; + boost::uuids::uuid span_connection_id; + bool filled = false; + if (m_block_queue.get_next_span(start_height, blocks, span_connection_id, filled) && filled) + { + LOG_DEBUG_CC(context, "No other thread is adding blocks, resuming"); + MLOG_PEER_STATE("will try to add blocks next"); + context.m_state = cryptonote_connection_context::state_standby; + ++context.m_callback_request_count; + m_p2p->request_callback(context); + return true; + } + } + } + if(context.m_last_response_height < context.m_remote_blockchain_height-1) {//we have to fetch more objects ids, request blockchain entry @@ -1567,8 +1899,9 @@ skip: //LOG_PRINT_CCONTEXT_L1("r = " << 200); context.m_last_request_time = boost::posix_time::microsec_clock::universal_time(); - LOG_PRINT_CCONTEXT_L1("-->>NOTIFY_REQUEST_CHAIN: m_block_ids.size()=" << r.block_ids.size() << ", start_from_current_chain " << start_from_current_chain); + MLOG_P2P_MESSAGE("-->>NOTIFY_REQUEST_CHAIN: m_block_ids.size()=" << r.block_ids.size() << ", start_from_current_chain " << start_from_current_chain); post_notify<NOTIFY_REQUEST_CHAIN>(r, context); + MLOG_PEER_STATE("requesting chain"); }else { CHECK_AND_ASSERT_MES(context.m_last_response_height == context.m_remote_blockchain_height-1 @@ -1608,9 +1941,25 @@ skip: << ENDL << "Use the \"help\" command to see the list of available commands." << ENDL << "**********************************************************************"); + m_sync_timer.pause(); + if (ELPP->vRegistry()->allowed(el::Level::Info, "sync-info")) + { + const uint64_t sync_time = m_sync_timer.value(); + const uint64_t add_time = m_add_timer.value(); + if (sync_time && add_time) + { + MCLOG_YELLOW(el::Level::Info, "sync-info", "Sync time: " << sync_time/1e9/60 << " min, idle time " << + (100.f * (1.0f - add_time / (float)sync_time)) << "%" << ", " << + (10 * m_sync_download_objects_size / 1024 / 1024) / 10.f << " + " << + (10 * m_sync_download_chain_size / 1024 / 1024) / 10.f << " MB downloaded, " << + 100.0f * m_sync_old_spans_downloaded / m_sync_spans_downloaded << "% old spans, " << + 100.0f * m_sync_bad_spans_downloaded / m_sync_spans_downloaded << "% bad spans"); + } + } m_core.on_synchronized(); } m_core.safesyncmode(true); + m_p2p->clear_used_stripe_peers(); return true; } //------------------------------------------------------------------------------------------------------------------------ @@ -1631,6 +1980,11 @@ skip: { MLOG_P2P_MESSAGE("Received NOTIFY_RESPONSE_CHAIN_ENTRY: m_block_ids.size()=" << arg.m_block_ids.size() << ", m_start_height=" << arg.start_height << ", m_total_height=" << arg.total_height); + MLOG_PEER_STATE("received chain"); + + context.m_last_request_time = boost::date_time::not_a_date_time; + + m_sync_download_chain_size += arg.m_block_ids.size() * sizeof(crypto::hash); if(!arg.m_block_ids.size()) { @@ -1644,7 +1998,14 @@ skip: drop_connection(context, true, false); return 1; } + MDEBUG(context << "first block hash " << arg.m_block_ids.front() << ", last " << arg.m_block_ids.back()); + if (arg.total_height >= CRYPTONOTE_MAX_BLOCK_NUMBER || arg.m_block_ids.size() >= CRYPTONOTE_MAX_BLOCK_NUMBER) + { + LOG_ERROR_CCONTEXT("sent wrong NOTIFY_RESPONSE_CHAIN_ENTRY, with total_height=" << arg.total_height << " and block_ids=" << arg.m_block_ids.size()); + drop_connection(context, false, false); + return 1; + } context.m_remote_blockchain_height = arg.total_height; context.m_last_response_height = arg.start_height + arg.m_block_ids.size()-1; if(context.m_last_response_height > context.m_remote_blockchain_height) @@ -1664,6 +2025,7 @@ skip: return 1; } + context.m_needed_objects.clear(); uint64_t added = 0; for(auto& bl_id: arg.m_block_ids) { @@ -1773,10 +2135,85 @@ skip: } //------------------------------------------------------------------------------------------------------------------------ template<class t_core> + std::string t_cryptonote_protocol_handler<t_core>::get_peers_overview() const + { + std::stringstream ss; + const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time(); + m_p2p->for_each_connection([&](const connection_context &ctx, nodetool::peerid_type peer_id, uint32_t support_flags) { + const uint32_t stripe = tools::get_pruning_stripe(ctx.m_pruning_seed); + char state_char = cryptonote::get_protocol_state_char(ctx.m_state); + ss << stripe + state_char; + if (ctx.m_last_request_time != boost::date_time::not_a_date_time) + ss << (((now - ctx.m_last_request_time).total_microseconds() > IDLE_PEER_KICK_TIME) ? "!" : "?"); + ss << + " "; + return true; + }); + return ss.str(); + } + //------------------------------------------------------------------------------------------------------------------------ + template<class t_core> + std::pair<uint32_t, uint32_t> t_cryptonote_protocol_handler<t_core>::get_next_needed_pruning_stripe() const + { + const uint64_t want_height_from_blockchain = m_core.get_current_blockchain_height(); + const uint64_t want_height_from_block_queue = m_block_queue.get_next_needed_height(want_height_from_blockchain); + const uint64_t want_height = std::max(want_height_from_blockchain, want_height_from_block_queue); + uint64_t blockchain_height = m_core.get_target_blockchain_height(); + // if we don't know the remote chain size yet, assume infinitely large so we get the right stripe if we're not near the tip + if (blockchain_height == 0) + blockchain_height = CRYPTONOTE_MAX_BLOCK_NUMBER; + const uint32_t next_pruning_stripe = tools::get_pruning_stripe(want_height, blockchain_height, CRYPTONOTE_PRUNING_LOG_STRIPES); + if (next_pruning_stripe == 0) + return std::make_pair(0, 0); + // if we already have a few peers on this stripe, but none on next one, try next one + unsigned int n_next = 0, n_subsequent = 0, n_others = 0; + const uint32_t subsequent_pruning_stripe = 1 + next_pruning_stripe % (1<<CRYPTONOTE_PRUNING_LOG_STRIPES); + m_p2p->for_each_connection([&](const connection_context &context, nodetool::peerid_type peer_id, uint32_t support_flags) { + if (context.m_state >= cryptonote_connection_context::state_synchronizing) + { + if (context.m_pruning_seed == 0 || tools::get_pruning_stripe(context.m_pruning_seed) == next_pruning_stripe) + ++n_next; + else if (tools::get_pruning_stripe(context.m_pruning_seed) == subsequent_pruning_stripe) + ++n_subsequent; + else + ++n_others; + } + return true; + }); + const bool use_next = (n_next > m_max_out_peers / 2 && n_subsequent <= 1) || (n_next > 2 && n_subsequent == 0); + const uint32_t ret_stripe = use_next ? subsequent_pruning_stripe: next_pruning_stripe; + MIDEBUG(const std::string po = get_peers_overview(), "get_next_needed_pruning_stripe: want height " << want_height << " (" << + want_height_from_blockchain << " from blockchain, " << want_height_from_block_queue << " from block queue), stripe " << + next_pruning_stripe << " (" << n_next << "/" << m_max_out_peers << " on it and " << n_subsequent << " on " << + subsequent_pruning_stripe << ", " << n_others << " others) -> " << ret_stripe << " (+" << + (ret_stripe - next_pruning_stripe + (1 << CRYPTONOTE_PRUNING_LOG_STRIPES)) % (1 << CRYPTONOTE_PRUNING_LOG_STRIPES) << + "), current peers " << po); + return std::make_pair(next_pruning_stripe, ret_stripe); + } + //------------------------------------------------------------------------------------------------------------------------ + template<class t_core> + bool t_cryptonote_protocol_handler<t_core>::needs_new_sync_connections() const + { + const uint64_t target = m_core.get_target_blockchain_height(); + const uint64_t height = m_core.get_current_blockchain_height(); + if (target && target <= height) + return false; + size_t n_out_peers = 0; + m_p2p->for_each_connection([&](cryptonote_connection_context& ctx, nodetool::peerid_type peer_id, uint32_t support_flags)->bool{ + if (!ctx.m_is_income) + ++n_out_peers; + return true; + }); + if (n_out_peers >= m_max_out_peers) + return false; + return true; + } + //------------------------------------------------------------------------------------------------------------------------ + template<class t_core> void t_cryptonote_protocol_handler<t_core>::drop_connection(cryptonote_connection_context &context, bool add_fail, bool flush_all_spans) { - LOG_DEBUG_CC(context, "dropping connection id " << context.m_connection_id << - ", add_fail " << add_fail << ", flush_all_spans " << flush_all_spans); + LOG_DEBUG_CC(context, "dropping connection id " << context.m_connection_id << " (pruning seed " << + epee::string_tools::to_string_hex(context.m_pruning_seed) << + "), add_fail " << add_fail << ", flush_all_spans " << flush_all_spans); if (add_fail) m_p2p->add_host_fail(context.m_remote_address); @@ -1803,6 +2240,7 @@ skip: } m_block_queue.flush_spans(context.m_connection_id, false); + MLOG_PEER_STATE("closed"); } //------------------------------------------------------------------------------------------------------------------------ |