diff options
Diffstat (limited to '')
-rw-r--r-- | src/cryptonote_core/blockchain_storage.cpp | 18 | ||||
-rw-r--r-- | src/cryptonote_core/blockchain_storage.h | 1 | ||||
-rw-r--r-- | src/cryptonote_core/cryptonote_core.cpp | 31 | ||||
-rw-r--r-- | src/cryptonote_core/cryptonote_core.h | 6 | ||||
-rw-r--r-- | src/cryptonote_protocol/cryptonote_protocol_handler-base.cpp | 94 | ||||
-rw-r--r-- | src/cryptonote_protocol/cryptonote_protocol_handler.h | 10 | ||||
-rw-r--r-- | src/cryptonote_protocol/cryptonote_protocol_handler.inl | 148 | ||||
-rw-r--r-- | src/daemon/daemon.cpp | 36 | ||||
-rw-r--r-- | src/daemon/daemon_commands_handler.h | 54 | ||||
-rw-r--r-- | src/p2p/connection_basic.cpp | 167 | ||||
-rw-r--r-- | src/p2p/connection_basic.hpp | 11 | ||||
-rw-r--r-- | src/p2p/data_logger.cpp | 81 | ||||
-rw-r--r-- | src/p2p/data_logger.hpp | 46 | ||||
-rw-r--r-- | src/p2p/net_node.h | 12 | ||||
-rw-r--r-- | src/p2p/net_node.inl | 66 | ||||
-rw-r--r-- | src/p2p/network_throttle-detail.cpp | 75 | ||||
-rw-r--r-- | src/p2p/network_throttle-detail.hpp | 10 | ||||
-rw-r--r-- | src/p2p/network_throttle.hpp | 8 | ||||
-rw-r--r-- | src/serialization/binary_archive.h | 4 |
19 files changed, 560 insertions, 318 deletions
diff --git a/src/cryptonote_core/blockchain_storage.cpp b/src/cryptonote_core/blockchain_storage.cpp index 78419121f..fe80d75ac 100644 --- a/src/cryptonote_core/blockchain_storage.cpp +++ b/src/cryptonote_core/blockchain_storage.cpp @@ -178,6 +178,22 @@ bool blockchain_storage::store_genesis_block(bool testnet) { return true; } //------------------------------------------------------------------ +void blockchain_storage::logger_handle(long int ms) +{ + std::ofstream log_file; + log_file.open("log/dr-monero/blockchain_log.data", std::ofstream::out | std::ofstream::app); + log_file.precision(7); + + using namespace boost::chrono; + auto point = steady_clock::now(); + auto time_from_epoh = point.time_since_epoch(); + auto m_ms = duration_cast< milliseconds >( time_from_epoh ).count(); + double ms_f = m_ms; + ms_f /= 1000.; + + log_file << ms_f << " " << ms << std::endl; +} +//------------------------------------------------------------------ bool blockchain_storage::store_blockchain() { m_is_blockchain_storing = true; @@ -1760,6 +1776,8 @@ bool blockchain_storage::handle_block_to_main_chain(const block& bl, const crypt << "), coinbase_blob_size: " << coinbase_blob_size << ", cumulative size: " << cumulative_block_size << ", " << block_processing_time << "("<< target_calculating_time << "/" << longhash_calculating_time << ")ms"); + logger_handle(block_processing_time); + bvc.m_added_to_main_chain = true; /*if(!m_orphanes_reorganize_in_work) review_orphaned_blocks_with_new_block_id(id, true);*/ diff --git a/src/cryptonote_core/blockchain_storage.h b/src/cryptonote_core/blockchain_storage.h index 38bdfbce7..6456689b9 100644 --- a/src/cryptonote_core/blockchain_storage.h +++ b/src/cryptonote_core/blockchain_storage.h @@ -249,6 +249,7 @@ namespace cryptonote bool complete_timestamps_vector(uint64_t start_height, std::vector<uint64_t>& timestamps); bool update_next_comulative_size_limit(); bool store_genesis_block(bool testnet); + void logger_handle(long int ms); }; diff --git a/src/cryptonote_core/cryptonote_core.cpp b/src/cryptonote_core/cryptonote_core.cpp index b8b5dc008..c8daa3510 100644 --- a/src/cryptonote_core/cryptonote_core.cpp +++ b/src/cryptonote_core/cryptonote_core.cpp @@ -187,12 +187,35 @@ namespace cryptonote //----------------------------------------------------------------------------------------------- bool core::deinit() { - m_miner.stop(); - m_mempool.deinit(); - m_blockchain_storage.deinit(); + m_miner.stop(); + m_mempool.deinit(); + if (!m_fast_exit) + { + m_blockchain_storage.deinit(); + } return true; } //----------------------------------------------------------------------------------------------- + void core::set_fast_exit() + { + m_fast_exit = true; + } + //----------------------------------------------------------------------------------------------- + bool core::get_fast_exit() + { + return m_fast_exit; + } + //----------------------------------------------------------------------------------------------- + void core::no_check_blocks() + { + m_check_blocks = false; + } + //----------------------------------------------------------------------------------------------- + bool core::get_check_blocks() + { + return m_check_blocks; + } + //----------------------------------------------------------------------------------------------- bool core::handle_incoming_tx(const blobdata& tx_blob, tx_verification_context& tvc, bool keeped_by_block) { tvc = boost::value_initialized<tx_verification_context>(); @@ -595,4 +618,6 @@ namespace cryptonote { raise(SIGTERM); } + + std::atomic<bool> core::m_fast_exit(false); } diff --git a/src/cryptonote_core/cryptonote_core.h b/src/cryptonote_core/cryptonote_core.h index 748f2b665..9218aef0f 100644 --- a/src/cryptonote_core/cryptonote_core.h +++ b/src/cryptonote_core/cryptonote_core.h @@ -75,6 +75,10 @@ namespace cryptonote bool init(const boost::program_options::variables_map& vm, bool testnet); bool set_genesis_block(const block& b); bool deinit(); + static void set_fast_exit(); + static bool get_fast_exit(); + void no_check_blocks(); + bool get_check_blocks(); uint64_t get_current_blockchain_height(); bool get_blockchain_top(uint64_t& heeight, crypto::hash& top_id); bool get_blocks(uint64_t start_offset, size_t count, std::list<block>& blocks, std::list<transaction>& txs); @@ -146,6 +150,8 @@ namespace cryptonote bool on_update_blocktemplate_interval(); bool check_tx_inputs_keyimages_diff(const transaction& tx); void graceful_exit(); + static std::atomic<bool> m_fast_exit; + bool m_check_blocks = true; tx_memory_pool m_mempool; diff --git a/src/cryptonote_protocol/cryptonote_protocol_handler-base.cpp b/src/cryptonote_protocol/cryptonote_protocol_handler-base.cpp index 6b25cb681..b5a5ceea9 100644 --- a/src/cryptonote_protocol/cryptonote_protocol_handler-base.cpp +++ b/src/cryptonote_protocol/cryptonote_protocol_handler-base.cpp @@ -104,11 +104,11 @@ namespace cryptonote { double cryptonote_protocol_handler_base::estimate_one_block_size() noexcept { // for estimating size of blocks to downloa const double size_min = 500; // XXX 500 - const int history_len = 20; // how many blocks to average over + //const int history_len = 20; // how many blocks to average over double avg=0; try { - avg = get_avg_block_size(history_len); + avg = get_avg_block_size(/*history_len*/); } catch (...) { } avg = std::max( size_min , avg); return avg; @@ -120,96 +120,6 @@ cryptonote_protocol_handler_base::cryptonote_protocol_handler_base() { cryptonote_protocol_handler_base::~cryptonote_protocol_handler_base() { } -void cryptonote_protocol_handler_base::handler_request_blocks_now(size_t &count_limit) { - using namespace epee::net_utils; - size_t est_req_size=0; // how much data are we now requesting (to be soon send to us) - - const auto count_limit_default = count_limit; - - bool allowed_now = false; // are we now allowed to request or are we limited still - // long int size_limit; - - while (!allowed_now) { - /* if ( ::cryptonote::core::get_is_stopping() ) { // TODO fast exit - _fact("ABORT sleep (before sending requeset) due to stopping"); - break; - }*/ - - //LOG_PRINT_RED("[DBG]" << get_avg_block_size(1), LOG_LEVEL_0); - //{ - long int size_limit1=0, size_limit2=0; - //LOG_PRINT_RED("calculating REQUEST size:", LOG_LEVEL_0); - { - CRITICAL_REGION_LOCAL( network_throttle_manager::m_lock_get_global_throttle_in ); - network_throttle_manager::get_global_throttle_in().tick(); - size_limit1 = network_throttle_manager::get_global_throttle_in().get_recommended_size_of_planned_transport(); - } - { - CRITICAL_REGION_LOCAL( network_throttle_manager::m_lock_get_global_throttle_inreq ); - network_throttle_manager::get_global_throttle_inreq().tick(); - size_limit2 = network_throttle_manager::get_global_throttle_inreq().get_recommended_size_of_planned_transport(); - } - - long int one_block_estimated_size = estimate_one_block_size(); - long int limit_small = std::min( size_limit1 , size_limit2 ); - long int size_limit = limit_small/3 + size_limit1/3 + size_limit2/3; - if (limit_small <= 0) size_limit = 0; - const double estimated_peers = 1.2; // how many peers/threads we want to talk to, in order to not grab entire b/w by 1 thread - const double knob = 1.000; - size_limit /= (estimated_peers / estimated_peers) * knob; - _note_c("net/req-calc" , "calculating REQUEST size:" << size_limit1 << " " << size_limit2 << " small=" << limit_small << " final size_limit="<<size_limit); - - double L = size_limit / one_block_estimated_size; // calculating item limit (some heuristics) - //LOG_PRINT_RED("L1 = " << L , LOG_LEVEL_0); - //double L2=0; if (L>1) L2=std::log(L); - //L = L/10. + L2*5; - //LOG_PRINT_RED("L2 = " << L , LOG_LEVEL_0); - L = std::min( (double)count_limit_default, (double)L); - //LOG_PRINT_RED("L3 = " << L , LOG_LEVEL_0); - - const long int hard_limit = 500; // never get more blocks at once ; TODO depend on speed limit. Must be low or limiting is too bursty. - - L = std::min(L, (double) hard_limit); - - count_limit = (int)L; - - est_req_size = count_limit * one_block_estimated_size ; // how much data did we just requested? - - //LOG_PRINT_RED("est_req_size = " << est_req_size , LOG_LEVEL_0); - //LOG_PRINT_RED("count_limit = " << count_limit , LOG_LEVEL_0); - //LOG_PRINT_RED("one_block_estimated_size = " << one_block_estimated_size , LOG_LEVEL_0); - //} - - if (count_limit > 0) allowed_now = true; - // XXX if (!allowed_now) { // XXX DOWNLOAD - //long int ms = 3000; // XXX 2000 - //LOG_PRINT_RED("size_limit = " << size_limit , LOG_LEVEL_0); - long int ms = network_throttle_manager::get_global_throttle_in().get_sleep_time_after_tick(one_block_estimated_size); // XXX too long - //long int ms = network_throttle_manager::get_global_throttle_in().get_sleep_time(count_limit); // XXX - //long int ms = network_throttle_manager::get_global_throttle_in().get_sleep_time(size_limit); // XXX best - - //ms /= 100; // XXX - _info_c("net/sleep", "Sleeping in " << __FUNCTION__ << " for " << ms << " ms"); // XXX debug sleep - //LOG_PRINT_RED("ms = " << ms , LOG_LEVEL_0); - boost::this_thread::sleep(boost::posix_time::milliseconds( ms ) ); // TODO randomize sleeps - //} - } - // done waiting&sleeping ^ - - // ok we are allowed to send now - { - CRITICAL_REGION_LOCAL( network_throttle_manager::m_lock_get_global_throttle_inreq ); - network_throttle_manager::get_global_throttle_inreq().handle_trafic_tcp( est_req_size ); // increase countere of the global requested input - } - - // TODO remove debug - LOG_PRINT_YELLOW("*************************************************************************", LOG_LEVEL_0); - LOG_PRINT_RED("### RRRR ### sending request (type 1), CALCULATED limit = " << count_limit << " = estimated " << est_req_size << " b", LOG_LEVEL_0); - LOG_PRINT_YELLOW("*************************************************************************", LOG_LEVEL_0); - LOG_PRINT_RED("\n", LOG_LEVEL_0); - _note_c("net/req", "### RRRR ### sending request (type 1), CALCULATED limit = " << count_limit << " = estimated " << est_req_size << " b"); -} - void cryptonote_protocol_handler_base::handler_request_blocks_history(std::list<crypto::hash>& ids) { using namespace epee::net_utils; LOG_PRINT_L0("### ~~~RRRR~~~~ ### sending request (type 2), limit = " << ids.size()); diff --git a/src/cryptonote_protocol/cryptonote_protocol_handler.h b/src/cryptonote_protocol/cryptonote_protocol_handler.h index b3393928c..0be864dfa 100644 --- a/src/cryptonote_protocol/cryptonote_protocol_handler.h +++ b/src/cryptonote_protocol/cryptonote_protocol_handler.h @@ -46,6 +46,7 @@ #include "cryptonote_core/cryptonote_stat_info.h" #include "cryptonote_core/verification_context.h" #include <netinet/in.h> +#include <boost/circular_buffer.hpp> PUSH_WARNINGS DISABLE_VS_WARNINGS(4355) @@ -61,11 +62,10 @@ namespace cryptonote public: cryptonote_protocol_handler_base(); virtual ~cryptonote_protocol_handler_base(); - void handler_request_blocks_now(size_t & count_limit); // before asking for blocks, can adjust the limit of download void handler_request_blocks_history(std::list<crypto::hash>& ids); // before asking for list of objects, we can change the list still void handler_response_blocks_now(size_t packet_size); - virtual double get_avg_block_size( size_t count) const = 0; + virtual double get_avg_block_size() = 0; virtual double estimate_one_block_size() noexcept; // for estimating size of blocks to download virtual std::ofstream& get_logreq() const =0; @@ -129,10 +129,12 @@ namespace cryptonote nodetool::i_p2p_endpoint<connection_context>* m_p2p; std::atomic<uint32_t> m_syncronized_connections_count; std::atomic<bool> m_synchronized; + bool m_one_request = true; // static std::ofstream m_logreq; - - double get_avg_block_size(size_t count) const; + std::mutex m_buffer_mutex; + double get_avg_block_size(); + boost::circular_buffer<size_t> m_avg_buffer = boost::circular_buffer<size_t>(10); template<class t_parametr> bool post_notify(typename t_parametr::request& arg, cryptonote_connection_context& context) diff --git a/src/cryptonote_protocol/cryptonote_protocol_handler.inl b/src/cryptonote_protocol/cryptonote_protocol_handler.inl index aebfcd10d..0130cb384 100644 --- a/src/cryptonote_protocol/cryptonote_protocol_handler.inl +++ b/src/cryptonote_protocol/cryptonote_protocol_handler.inl @@ -41,6 +41,7 @@ #include "cryptonote_core/cryptonote_format_utils.h" #include "profile_tools.h" #include "../../contrib/otshell_utils/utils.hpp" +#include "../../src/p2p/network_throttle-detail.hpp" using namespace nOT::nUtils; namespace cryptonote @@ -115,28 +116,66 @@ namespace cryptonote void t_cryptonote_protocol_handler<t_core>::log_connections() { std::stringstream ss; + ss.precision(1); + + double down_sum = 0.0; + double down_curr_sum = 0.0; + double up_sum = 0.0; + double up_curr_sum = 0.0; ss << std::setw(30) << std::left << "Remote Host" << std::setw(20) << "Peer id" - << std::setw(25) << "Recv/Sent (inactive,sec)" + << std::setw(30) << "Recv/Sent (inactive,sec)" << std::setw(25) << "State" - << std::setw(20) << "Livetime(seconds)" << ENDL; + << std::setw(20) << "Livetime(sec)" + << std::setw(12) << "Down (kB/s)" + << std::setw(14) << "Down(now)" + << std::setw(10) << "Up (kB/s)" + << std::setw(13) << "Up(now)" + << ENDL; uint32_t ip; m_p2p->for_each_connection([&](const connection_context& cntxt, nodetool::peerid_type peer_id) { + bool local_ip = false; ip = ntohl(cntxt.m_remote_ip); + // TODO: local ip in calss A, B + if (ip > 3232235520 && ip < 3232301055) // 192.168.x.x + local_ip = true; + auto connection_time = time(NULL) - cntxt.m_started; ss << std::setw(30) << std::left << std::string(cntxt.m_is_income ? " [INC]":"[OUT]") + epee::string_tools::get_ip_string_from_int32(cntxt.m_remote_ip) + ":" + std::to_string(cntxt.m_remote_port) << std::setw(20) << std::hex << peer_id - << std::setw(25) << std::to_string(cntxt.m_recv_cnt)+ "(" + std::to_string(time(NULL) - cntxt.m_last_recv) + ")" + "/" + std::to_string(cntxt.m_send_cnt) + "(" + std::to_string(time(NULL) - cntxt.m_last_send) + ")" + << std::setw(30) << std::to_string(cntxt.m_recv_cnt)+ "(" + std::to_string(time(NULL) - cntxt.m_last_recv) + ")" + "/" + std::to_string(cntxt.m_send_cnt) + "(" + std::to_string(time(NULL) - cntxt.m_last_send) + ")" << std::setw(25) << get_protocol_state_string(cntxt.m_state) << std::setw(20) << std::to_string(time(NULL) - cntxt.m_started) - << std::setw(10) << (ip > 3232235520 && ip < 3232301055 ? " [LAN]" : "") //TODO: local ip in calss A, B - << ENDL; + << std::setw(12) << std::fixed << (connection_time == 0 ? 0.0 : cntxt.m_recv_cnt / connection_time / 1024) + << std::setw(14) << std::fixed << cntxt.m_current_speed_down / 1024 + << std::setw(10) << std::fixed << (connection_time == 0 ? 0.0 : cntxt.m_send_cnt / connection_time / 1024) + << std::setw(13) << std::fixed << cntxt.m_current_speed_up / 1024 + << (local_ip ? "[LAN]" : "") + << std::left << (ip == 2130706433 ? "[LOCALHOST]" : "") // 127.0.0.1 + << ENDL; + + if (connection_time > 0) + { + down_sum += (cntxt.m_recv_cnt / connection_time / 1024); + up_sum += (cntxt.m_send_cnt / connection_time / 1024); + } + + down_curr_sum += (cntxt.m_current_speed_down / 1024); + up_curr_sum += (cntxt.m_current_speed_up / 1024); + return true; }); - LOG_PRINT_L0("Connections: " << ENDL << ss.str()); + ss << ENDL + << std::setw(125) << " " + << std::setw(12) << down_sum + << std::setw(14) << down_curr_sum + << std::setw(10) << up_sum + << std::setw(13) << up_curr_sum + << ENDL; + LOG_PRINT_L0("Connections: " << ENDL << ss.str()); } //------------------------------------------------------------------------------------------------------------------------ // Returns a list of connection_info objects describing each open p2p connection @@ -332,8 +371,22 @@ namespace cryptonote template<class t_core> - double t_cryptonote_protocol_handler<t_core>::get_avg_block_size( size_t count) const { - return m_core.get_blockchain_storage().get_avg_block_size(count); + double t_cryptonote_protocol_handler<t_core>::get_avg_block_size() { + // return m_core.get_blockchain_storage().get_avg_block_size(count); // this does not count too well the actuall network-size of data we need to download + + CRITICAL_REGION_LOCAL(m_buffer_mutex); + double avg = 0; + if (m_avg_buffer.size() == 0) { + _warn("m_avg_buffer.size() == 0"); + return 500; + } + + const bool dbg_poke_lock = 0; // debug: try to trigger an error by poking around with locks. TODO: configure option + long int dbg_repeat=0; + do { + for (auto element : m_avg_buffer) avg += element; + } while(dbg_poke_lock && (dbg_repeat++)<100000); // in debug/poke mode, repeat this calculation to trigger hidden locking error if there is one + return avg / m_avg_buffer.size(); } @@ -341,6 +394,41 @@ namespace cryptonote int t_cryptonote_protocol_handler<t_core>::handle_response_get_objects(int command, NOTIFY_RESPONSE_GET_OBJECTS::request& arg, cryptonote_connection_context& context) { LOG_PRINT_CCONTEXT_L2("NOTIFY_RESPONSE_GET_OBJECTS"); + + // calculate size of request - mainly for logging/debug + size_t size = 0; + for (auto element : arg.txs) + size += element.size(); + + for (auto element : arg.blocks) + { + size += element.block.size(); + for (auto tx : element.txs) + size += tx.size(); + } + + for (auto element : arg.missed_ids) + size += sizeof(element.data); + + size += sizeof(arg.current_blockchain_height); + { + CRITICAL_REGION_LOCAL(m_buffer_mutex); + m_avg_buffer.push_back(size); + + const bool dbg_poke_lock = 0; // debug: try to trigger an error by poking around with locks. TODO: configure option + long int dbg_repeat=0; + do { + m_avg_buffer.push_back(666); // a test value + m_avg_buffer.erase_end(1); + } while(dbg_poke_lock && (dbg_repeat++)<100000); // in debug/poke mode, repeat this calculation to trigger hidden locking error if there is one + } + /*using namespace boost::chrono; + auto point = steady_clock::now(); + auto time_from_epoh = point.time_since_epoch(); + auto sec = duration_cast< seconds >( time_from_epoh ).count();*/ + + //epee::net_utils::network_throttle_manager::get_global_throttle_inreq().logger_handle_net("log/dr-monero/net/req-all.data", sec, get_avg_block_size()); + if(context.m_last_response_height > arg.current_blockchain_height) { LOG_ERROR_CCONTEXT("sent wrong NOTIFY_HAVE_OBJECTS: arg.m_current_blockchain_height=" << arg.current_blockchain_height @@ -430,8 +518,9 @@ namespace cryptonote //process block TIME_MEASURE_START(block_process_time); block_verification_context bvc = boost::value_initialized<block_verification_context>(); - - m_core.handle_incoming_block(block_entry.block, bvc, false); + + if (m_core.get_check_blocks()) + m_core.handle_incoming_block(block_entry.block, bvc, false); if(bvc.m_verifivation_failed) { @@ -448,10 +537,21 @@ namespace cryptonote TIME_MEASURE_FINISH(block_process_time); LOG_PRINT_CCONTEXT_L2("Block process time: " << block_process_time + transactions_process_time << "(" << transactions_process_time << "/" << block_process_time << ")ms"); + + std::ofstream log_file; + log_file.open("log/dr-monero/get_objects_calc_time.data", std::ofstream::out | std::ofstream::app); + log_file.precision(7); + + using namespace boost::chrono; + auto point = steady_clock::now(); + auto time_from_epoh = point.time_since_epoch(); + auto m_ms = duration_cast< milliseconds >( time_from_epoh ).count(); + double ms_f = m_ms; + ms_f /= 1000.; + + log_file << static_cast<int>(ms_f) << " " << block_process_time + transactions_process_time << std::endl; } } - size_t count_limit = BLOCKS_SYNCHRONIZING_DEFAULT_COUNT; - handler_request_blocks_now(count_limit); // XXX request_missing_objects(context, true); return 1; } @@ -480,6 +580,15 @@ namespace cryptonote template<class t_core> bool t_cryptonote_protocol_handler<t_core>::request_missing_objects(cryptonote_connection_context& context, bool check_having_blocks) { + //if (!m_one_request == false) + //return true; + m_one_request = false; + // save request size to log (dr monero) + /*using namespace boost::chrono; + auto point = steady_clock::now(); + auto time_from_epoh = point.time_since_epoch(); + auto sec = duration_cast< seconds >( time_from_epoh ).count();*/ + if(context.m_needed_objects.size()) { //we know objects that we need, request this objects @@ -487,11 +596,8 @@ namespace cryptonote size_t count = 0; auto it = context.m_needed_objects.begin(); - size_t count_limit = BLOCKS_SYNCHRONIZING_DEFAULT_COUNT; - //handler_request_blocks_now( count_limit ); // change the limit, sleep(?) XXX - // XXX - count_limit=200; // XXX - _note_c("net/req-calc" , "Setting count_limit: " << count_limit); + size_t count_limit = BLOCKS_SYNCHRONIZING_DEFAULT_COUNT; + _note_c("net/req-calc" , "Setting count_limit: " << count_limit); while(it != context.m_needed_objects.end() && count < BLOCKS_SYNCHRONIZING_DEFAULT_COUNT) { if( !(check_having_blocks && m_core.have_block(*it))) @@ -504,6 +610,8 @@ namespace cryptonote } LOG_PRINT_CCONTEXT_L0("-->>NOTIFY_REQUEST_GET_OBJECTS: blocks.size()=" << req.blocks.size() << ", txs.size()=" << req.txs.size() << "requested blocks count=" << count << " / " << count_limit); + //epee::net_utils::network_throttle_manager::get_global_throttle_inreq().logger_handle_net("log/dr-monero/net/req-all.data", sec, get_avg_block_size()); + post_notify<NOTIFY_REQUEST_GET_OBJECTS>(req, context); }else if(context.m_last_response_height < context.m_remote_blockchain_height-1) {//we have to fetch more objects ids, request blockchain entry @@ -511,6 +619,12 @@ namespace cryptonote NOTIFY_REQUEST_CHAIN::request r = boost::value_initialized<NOTIFY_REQUEST_CHAIN::request>(); m_core.get_short_chain_history(r.block_ids); handler_request_blocks_history( r.block_ids ); // change the limit(?), sleep(?) + + //std::string blob; // for calculate size of request + //epee::serialization::store_t_to_binary(r, blob); + //epee::net_utils::network_throttle_manager::get_global_throttle_inreq().logger_handle_net("log/dr-monero/net/req-all.data", sec, get_avg_block_size()); + LOG_PRINT_CCONTEXT_L0("r = " << 200); + LOG_PRINT_CCONTEXT_L0("-->>NOTIFY_REQUEST_CHAIN: m_block_ids.size()=" << r.block_ids.size() ); post_notify<NOTIFY_REQUEST_CHAIN>(r, context); }else diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 9f2ae3167..27a1129dc 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -69,7 +69,9 @@ namespace , "Run on testnet. The wallet must be launched with --testnet flag." , false }; - const command_line::arg_descriptor<bool> arg_dns_checkpoints = {"enforce-dns-checkpointing", "checkpoints from DNS server will be enforced", false}; + const command_line::arg_descriptor<bool> arg_dns_checkpoints = {"enforce-dns-checkpointing", "checkpoints from DNS server will be enforced", false}; + const command_line::arg_descriptor<bool> arg_test_drop_download = {"test-drop-download", "For network testing, drop downloaded blocks instead checking/adding them to blockchain. Can fake-download blocks very fast."}; + const command_line::arg_descriptor<bool> arg_save_graph = {"save-graph", "Save data for dr monero", false}; } bool command_line_preprocessor(const boost::program_options::variables_map& vm) @@ -99,6 +101,8 @@ bool command_line_preprocessor(const boost::program_options::variables_map& vm) else if (log_space::get_set_log_detalisation_level(false) != new_log_level) { log_space::get_set_log_detalisation_level(true, new_log_level); + int otshell_utils_log_level = 100 - (new_log_level * 25); + gCurrentLogger.setDebugLevel(otshell_utils_log_level); LOG_PRINT_L0("LOG_LEVEL set to " << new_log_level); } @@ -107,7 +111,7 @@ bool command_line_preprocessor(const boost::program_options::variables_map& vm) int main(int argc, char* argv[]) { - + string_tools::set_module_name_and_folder(argv[0]); #ifdef WIN32 _CrtSetDbgFlag ( _CRTDBG_ALLOC_MEM_DF | _CRTDBG_LEAK_CHECK_DF ); @@ -137,6 +141,8 @@ int main(int argc, char* argv[]) command_line::add_arg(desc_cmd_sett, arg_console); command_line::add_arg(desc_cmd_sett, arg_testnet_on); command_line::add_arg(desc_cmd_sett, arg_dns_checkpoints); + command_line::add_arg(desc_cmd_sett, arg_test_drop_download); + command_line::add_arg(desc_cmd_sett, arg_save_graph); cryptonote::core::init_options(desc_cmd_sett); cryptonote::core_rpc_server::init_options(desc_cmd_sett); @@ -231,7 +237,17 @@ int main(int argc, char* argv[]) cryptonote::core_rpc_server rpc_server {ccore, p2psrv, testnet_mode}; cprotocol.set_p2p_endpoint(&p2psrv); ccore.set_cryptonote_protocol(&cprotocol); - daemon_cmmands_handler dch(p2psrv, testnet_mode); + std::shared_ptr<daemon_cmmands_handler> dch(new daemon_cmmands_handler(p2psrv, testnet_mode)); + if(command_line::has_arg(vm, arg_save_graph)) + p2psrv.set_save_graph(true); + + //initialize core here + LOG_PRINT_L0("Initializing core..."); + res = ccore.init(vm, testnet_mode); + CHECK_AND_ASSERT_MES(res, 1, "Failed to initialize core"); + if (command_line::get_arg(vm, arg_test_drop_download)) + ccore.no_check_blocks(); + LOG_PRINT_L0("Core initialized OK"); //initialize objects LOG_PRINT_L0("Initializing P2P server..."); @@ -248,17 +264,11 @@ int main(int argc, char* argv[]) res = rpc_server.init(vm); CHECK_AND_ASSERT_MES(res, 1, "Failed to initialize core RPC server."); LOG_PRINT_GREEN("Core RPC server initialized OK on port: " << rpc_server.get_binded_port(), LOG_LEVEL_0); - - //initialize core here - LOG_PRINT_L0("Initializing core..."); - res = ccore.init(vm, testnet_mode); - CHECK_AND_ASSERT_MES(res, 1, "Failed to initialize core"); - LOG_PRINT_L0("Core initialized OK"); - + // start components if(!command_line::has_arg(vm, arg_console)) { - dch.start_handling(); + dch->start_handling(); } LOG_PRINT_L0("Starting core RPC server..."); @@ -267,7 +277,7 @@ int main(int argc, char* argv[]) LOG_PRINT_L0("Core RPC server started ok"); tools::signal_handler::install([&dch, &p2psrv] { - dch.stop_handling(); + dch->stop_handling(); p2psrv.send_stop_signal(); }); @@ -276,6 +286,8 @@ int main(int argc, char* argv[]) LOG_PRINT_L0("P2P net loop stopped"); //stop components + dch->stop_handling(); + dch.reset(); LOG_PRINT_L0("Stopping core rpc server..."); rpc_server.send_stop_signal(); rpc_server.timed_wait_server_stop(5000); diff --git a/src/daemon/daemon_commands_handler.h b/src/daemon/daemon_commands_handler.h index 7cba4ec5a..6afbbb07f 100644 --- a/src/daemon/daemon_commands_handler.h +++ b/src/daemon/daemon_commands_handler.h @@ -79,6 +79,10 @@ public: m_cmd_binder.set_handler("limit_up", boost::bind(&daemon_cmmands_handler::limit_up, this, _1), "Set upload limit [kB/s]"); m_cmd_binder.set_handler("limit_down", boost::bind(&daemon_cmmands_handler::limit_down, this, _1), "Set download limit [kB/s]"); m_cmd_binder.set_handler("limit", boost::bind(&daemon_cmmands_handler::limit, this, _1), "Set download and upload limit [kB/s]"); + m_cmd_binder.set_handler("fast_exit", boost::bind(&daemon_cmmands_handler::fast_exit, this, _1), "Exit"); + m_cmd_binder.set_handler("test_drop_download", boost::bind(&daemon_cmmands_handler::test_drop_download, this, _1), "For network testing, drop downloaded blocks instead checking/adding them to blockchain. Can fake-download blocks very fast."); + m_cmd_binder.set_handler("start_save_graph", boost::bind(&daemon_cmmands_handler::start_save_graph, this, _1), ""); + m_cmd_binder.set_handler("stop_save_graph", boost::bind(&daemon_cmmands_handler::stop_save_graph, this, _1), ""); } bool start_handling() @@ -240,6 +244,8 @@ private: } log_space::log_singletone::get_set_log_detalisation_level(true, l); + int otshell_utils_log_level = 100 - (l * 25); + gCurrentLogger.setDebugLevel(otshell_utils_log_level); return true; } @@ -406,7 +412,7 @@ private: //-------------------------------------------------------------------------------- bool out_peers_limit(const std::vector<std::string>& args) { if(args.size()!=1) { - std::cout << "Usage: limit_down <speed>" << ENDL; + std::cout << "Usage: out_peers <number_of_peers>" << ENDL; return true; } @@ -420,13 +426,26 @@ private: return false; } + using namespace boost::chrono; + auto point = steady_clock::now(); + auto time_from_epoh = point.time_since_epoch(); + auto ms = duration_cast< milliseconds >( time_from_epoh ).count(); + double ms_f = ms; + ms_f /= 1000.; + + std::ofstream limitFile("log/dr-monero/peers_limit.info", std::ios::app); + limitFile.precision(7); + limitFile << ms_f << " " << static_cast<int>(limit) << std::endl; if (m_srv.m_config.m_net_config.connections_count > limit) { - int count = m_srv.m_config.m_net_config.connections_count - limit; m_srv.m_config.m_net_config.connections_count = limit; - m_srv.delete_connections(count); - } - else + if (m_srv.m_number_of_out_peers > limit) + { + int count = m_srv.m_number_of_out_peers - limit; + m_srv.delete_connections(count); + } + } + else m_srv.m_config.m_net_config.connections_count = limit; return true; @@ -527,4 +546,29 @@ private: return true; } + //-------------------------------------------------------------------------------- + bool fast_exit(const std::vector<std::string>& args) + { + m_srv.get_payload_object().get_core().set_fast_exit(); + m_srv.send_stop_signal(); + return true; + } + //-------------------------------------------------------------------------------- + bool test_drop_download(const std::vector<std::string>& args) + { + m_srv.get_payload_object().get_core().no_check_blocks(); + return true; + } + //-------------------------------------------------------------------------------- + bool start_save_graph(const std::vector<std::string>& args) + { + m_srv.set_save_graph(true); + return true; + } + //-------------------------------------------------------------------------------- + bool stop_save_graph(const std::vector<std::string>& args) + { + m_srv.set_save_graph(false); + return true; + } }; diff --git a/src/p2p/connection_basic.cpp b/src/p2p/connection_basic.cpp index 35b0d4c8e..0e2fd5942 100644 --- a/src/p2p/connection_basic.cpp +++ b/src/p2p/connection_basic.cpp @@ -78,6 +78,7 @@ #include "../../contrib/epee/include/net/abstract_tcp_server2.h" #include "../../contrib/otshell_utils/utils.hpp" +#include "data_logger.hpp" using namespace nOT::nUtils; // TODO: @@ -146,31 +147,31 @@ connection_basic::connection_basic(boost::asio::io_service& io_service, std::ato { ++ref_sock_count; // increase the global counter mI->m_peer_number = sock_number.fetch_add(1); // use, and increase the generated number - _note("Spawned connection p2p#"<<mI->m_peer_number<<" currently we have sockets count:" << m_ref_sock_count); + + string remote_addr_str = "?"; + try { remote_addr_str = socket_.remote_endpoint().address().to_string(); } catch(...){} ; + + _note("Spawned connection p2p#"<<mI->m_peer_number<<" to " << remote_addr_str << " currently we have sockets count:" << m_ref_sock_count); boost::filesystem::create_directories("log/dr-monero/net/"); - /*boost::asio::SettableSocketOption option;// = new boost::asio::SettableSocketOption(); - option.level(IPPROTO_IP); - option.name(IP_TOS); - option.value(&tos); - option.size = sizeof(tos); - socket_.set_option(option);*/ - // TODO socket options } connection_basic::~connection_basic() { - _note("Destructing connection p2p#"<<mI->m_peer_number); + string remote_addr_str = "?"; + try { remote_addr_str = socket_.remote_endpoint().address().to_string(); } catch(...){} ; + _note("Destructing connection p2p#"<<mI->m_peer_number << " to " << remote_addr_str); } void connection_basic::set_rate_up_limit(uint64_t limit) { - save_limit_to_file(limit); + + // TODO remove __SCALING_FACTOR... + const double SCALING_FACTOR = 2.1; // to acheve the best performance + limit *= SCALING_FACTOR; { - // TODO remove __SCALING_FACTOR... - const double SCALING_FACTOR = 2.25; // to acheve the best performance - limit *= SCALING_FACTOR; CRITICAL_REGION_LOCAL( network_throttle_manager::m_lock_get_global_throttle_out ); network_throttle_manager::get_global_throttle_out().set_target_speed(limit); + network_throttle_manager::get_global_throttle_out().set_real_target_speed(limit / SCALING_FACTOR); } - // connection_basic_pimpl::m_throttle_global.m_out.set_target_speed(limit); + save_limit_to_file(limit); } void connection_basic::set_rate_down_limit(uint64_t limit) { @@ -186,36 +187,30 @@ void connection_basic::set_rate_down_limit(uint64_t limit) { save_limit_to_file(limit); } -void connection_basic::set_rate_limit(uint64_t limit) { - // TODO -} -void connection_basic::set_kill_limit (uint64_t limit) { - { - CRITICAL_REGION_LOCAL( network_throttle_manager::m_lock_get_global_throttle_in ); - network_throttle_manager::get_global_throttle_in().set_target_kill(limit); - } - - { - CRITICAL_REGION_LOCAL( network_throttle_manager::m_lock_get_global_throttle_out ); - network_throttle_manager::get_global_throttle_out().set_target_kill(limit); - } - - { - CRITICAL_REGION_LOCAL( network_throttle_manager::m_lock_get_global_throttle_inreq ); - network_throttle_manager::get_global_throttle_inreq().set_target_kill(limit); - } -} void connection_basic::save_limit_to_file(int limit) { // saving limit to file - std::ofstream file; - file.open("log/dr-monero/limit.info"); - file << limit; -} - -void connection_basic::set_rate_autodetect(uint64_t limit) { - // TODO - LOG_PRINT_L0("inside connection_basic we set autodetect (this is additional notification).."); + if (!epee::net_utils::data_logger::m_save_graph) + return; + std::ofstream file_up, file_down; + file_up.open("log/dr-monero/limit_up.info", std::ofstream::out | std::ofstream::app); + file_up.precision(8); + file_down.open("log/dr-monero/limit_down.info", std::ofstream::out | std::ofstream::app); + file_down.precision(8); + using namespace boost::chrono; + auto point = steady_clock::now(); + auto time_from_epoh = point.time_since_epoch(); + auto s = duration_cast< seconds >( time_from_epoh ).count(); + + { + CRITICAL_REGION_LOCAL( network_throttle_manager::m_lock_get_global_throttle_out ); + file_up << s << " " << network_throttle_manager::get_global_throttle_out().get_terget_speed() / 1024 << "\n"; + } + + { + CRITICAL_REGION_LOCAL( network_throttle_manager::m_lock_get_global_throttle_in ); + file_down << s << " " << network_throttle_manager::get_global_throttle_in().get_terget_speed() / 1024 << "\n"; + } } void connection_basic::set_tos_flag(int tos) { @@ -230,39 +225,30 @@ void connection_basic::sleep_before_packet(size_t packet_size, int phase, int q double delay=0; // will be calculated do { // rate limiting - //XXX - /*if (::cryptonote::core::get_is_stopping()) { - _dbg1("We are stopping - so abort sleep"); - return; - }*/ if (m_was_shutdown) { _dbg2("m_was_shutdown - so abort sleep"); return; } { - CRITICAL_REGION_LOCAL( network_throttle_manager::m_lock_get_global_throttle_out ); + CRITICAL_REGION_LOCAL( network_throttle_manager::m_lock_get_global_throttle_out ); delay = network_throttle_manager::get_global_throttle_out().get_sleep_time_after_tick( packet_size ); // decission from global } - delay *= 0.50; - delay = 0; // XXX if (delay > 0) { - //delay += rand2*0.1; - long int ms = (long int)(delay * 1000); - _info_c("net/sleep", "Sleeping in " << __FUNCTION__ << " for " << ms << " ms before packet_size="<<packet_size); // XXX debug sleep + long int ms = (long int)(delay * 1000); + _info_c("net/sleep", "Sleeping in " << __FUNCTION__ << " for " << ms << " ms before packet_size="<<packet_size); // debug sleep _dbg1("sleep in sleep_before_packet"); - boost::this_thread::sleep(boost::posix_time::milliseconds( ms ) ); // TODO randomize sleeps + epee::net_utils::data_logger::get_instance().add_data("sleep_up", ms); + boost::this_thread::sleep(boost::posix_time::milliseconds( ms ) ); } } while(delay > 0); // XXX LATER XXX { CRITICAL_REGION_LOCAL( network_throttle_manager::m_lock_get_global_throttle_out ); - network_throttle_manager::get_global_throttle_out().handle_trafic_tcp( packet_size ); // increase counter - global - //epee::critical_region_t<decltype(m_throttle_global_lock)> guard(m_throttle_global_lock); // *** critical *** - //m_throttle_global.m_out.handle_trafic_tcp( packet_size ); // increase counter - global + network_throttle_manager::get_global_throttle_out().handle_trafic_exact( packet_size * 700); // increase counter - global } } @@ -271,34 +257,12 @@ void connection_basic::set_start_time() { m_start_time = network_throttle_manager::get_global_throttle_out().get_time_seconds(); } -void connection_basic::do_send_handler_start(const void* ptr , size_t cb ) { - _fact_c("net/out/size", "*** do_sen() called for packet="<<cb<<" B"); - sleep_before_packet(cb,1,-1); - // set_start_time(); -} - -void connection_basic::do_send_handler_delayed(const void* ptr , size_t cb ) { - // CRITICAL_REGION_LOCAL(network_throttle_manager::m_lock_get_global_throttle_out); - // auto sending_time = network_throttle_manager::get_global_throttle_out().get_time_seconds() - m_start_time; // wrong? --r -} - void connection_basic::do_send_handler_write(const void* ptr , size_t cb ) { sleep_before_packet(cb,1,-1); _info_c("net/out/size", "handler_write (direct) - before ASIO write, for packet="<<cb<<" B (after sleep)"); set_start_time(); } -void connection_basic::do_send_handler_stop(const void* ptr , size_t cb ) { -} - -void connection_basic::do_send_handler_after_write(const boost::system::error_code& e, size_t cb) { - // CRITICAL_REGION_LOCAL(network_throttle_manager::m_lock_get_global_throttle_out); - // auto sending_time = network_throttle_manager::get_global_throttle_out().get_time_seconds() - m_start_time; - // lag: if current sending time > max sending time - //if (sending_time > 0.1) network_throttle_manager::get_global_throttle_out().set_overheat(sending_time); // TODO - -} - void connection_basic::do_send_handler_write_from_queue( const boost::system::error_code& e, size_t cb, int q_len ) { sleep_before_packet(cb,2,q_len); _info_c("net/out/size", "handler_write (after write, from queue="<<q_len<<") - before ASIO write, for packet="<<cb<<" B (after sleep)"); @@ -306,49 +270,14 @@ void connection_basic::do_send_handler_write_from_queue( const boost::system::er set_start_time(); } -void connection_basic::do_read_handler_start(const boost::system::error_code& e, std::size_t bytes_transferred) { // from read, after read completion - const size_t packet_size = bytes_transferred; - { - CRITICAL_REGION_LOCAL( network_throttle_manager::m_lock_get_global_throttle_in ); - // sleep_before_packet(packet_size * __SCALING_FACTOR, 1, -1); // TODO remove __SCALING_FACTOR - network_throttle_manager::get_global_throttle_in().handle_trafic_tcp( packet_size ); // increase counter - global - // epee::critical_region_t<decltype(mI->m_throttle_global_lock)> guard(mI->m_throttle_global_lock); // *** critical *** - // mI->m_throttle_global.m_in.handle_trafic_tcp( packet_size ); // increase counter - global - } -} - -void connection_basic::logger_handle_net_peer(size_t size, bool io) { // network data written - // TODO OPTIMIZE! do NOT reopen idiotically :) - std::ostringstream oss; - std::string filename; - if (io) { // write - double time = network_throttle_manager::get_global_throttle_in().get_time_seconds() ; - oss << "log/dr-monero/net/in-peer-" << (mI->m_peer_number) << ".dat" << std::ends; - filename = oss.str(); - network_throttle_manager::get_global_throttle_out().logger_handle_net(filename,time,size); - } - else { // read - double time = network_throttle_manager::get_global_throttle_out().get_time_seconds() ; - oss << "log/dr-monero/net/out-peer-" << (mI->m_peer_number) << ".dat" << std::ends; - filename = oss.str(); - network_throttle_manager::get_global_throttle_in().logger_handle_net(filename,time,size); - } -} - void connection_basic::logger_handle_net_read(size_t size) { // network data read - std::string filename = "log/dr-monero/net/in-all.data"; - - double time = network_throttle_manager::get_global_throttle_in().get_time_seconds() ; - network_throttle_manager::get_global_throttle_in().logger_handle_net(filename, time, size); - logger_handle_net_peer(size,0); + size /= 1024; + epee::net_utils::data_logger::get_instance().add_data("download", size); } void connection_basic::logger_handle_net_write(size_t size) { - std::string filename = "log/dr-monero/net/out-all.data"; - double time = network_throttle_manager::get_global_throttle_out().get_time_seconds() ; - network_throttle_manager::get_global_throttle_out().logger_handle_net(filename, time, size); - logger_handle_net_peer(size,1); - + size /= 1024; + epee::net_utils::data_logger::get_instance().add_data("upload", size); } double connection_basic::get_sleep_time(size_t cb) { @@ -356,6 +285,10 @@ double connection_basic::get_sleep_time(size_t cb) { return t; } +void connection_basic::set_save_graph(bool save_graph) { + epee::net_utils::data_logger::m_save_graph = save_graph; +} + } // namespace } // namespace diff --git a/src/p2p/connection_basic.hpp b/src/p2p/connection_basic.hpp index 1b5a2c8ad..e9fdc3add 100644 --- a/src/p2p/connection_basic.hpp +++ b/src/p2p/connection_basic.hpp @@ -99,17 +99,11 @@ class connection_basic { // not-templated base class for rapid developmet of som virtual ~connection_basic(); // various handlers to be called from connection class: - void do_send_handler_start(const void * ptr , size_t cb); - void do_send_handler_delayed(const void * ptr , size_t cb); void do_send_handler_write(const void * ptr , size_t cb); - void do_send_handler_stop(const void * ptr , size_t cb); - void do_send_handler_after_write( const boost::system::error_code& e, size_t cb ); // from handle_write void do_send_handler_write_from_queue(const boost::system::error_code& e, size_t cb , int q_len); // from handle_write, sending next part - void do_read_handler_start(const boost::system::error_code& e, std::size_t bytes_transferred); // from read, after read completion void logger_handle_net_write(size_t size); // network data written void logger_handle_net_read(size_t size); // network data read - void logger_handle_net_peer(size_t size, bool io); void set_start_time(); @@ -117,9 +111,6 @@ class connection_basic { // not-templated base class for rapid developmet of som static void set_rate_up_limit(uint64_t limit); static void set_rate_down_limit(uint64_t limit); - static void set_rate_limit(uint64_t limit); - static void set_rate_autodetect(uint64_t limit); - static void set_kill_limit (uint64_t limit); // config misc static void set_tos_flag(int tos); // ToS / QoS flag @@ -129,6 +120,8 @@ class connection_basic { // not-templated base class for rapid developmet of som void sleep_before_packet(size_t packet_size, int phase, int q_len); // execute a sleep ; phase is not really used now(?) static void save_limit_to_file(int limit); ///< for dr-monero static double get_sleep_time(size_t cb); + + static void set_save_graph(bool save_graph); }; } // nameserver diff --git a/src/p2p/data_logger.cpp b/src/p2p/data_logger.cpp new file mode 100644 index 000000000..6a8eb25be --- /dev/null +++ b/src/p2p/data_logger.cpp @@ -0,0 +1,81 @@ +#include "data_logger.hpp" + +#include <boost/chrono.hpp> +#include <chrono> + +namespace epee +{ +namespace net_utils +{ + data_logger &data_logger::get_instance() + { + static data_logger instance; + return instance; + } + + data_logger::data_logger() + { + //create timer + std::shared_ptr<std::thread> logger_thread(new std::thread([&]() + { + while (true) + { + std::this_thread::sleep_for(std::chrono::seconds(1)); + saveToFile(); + } + })); + logger_thread->detach(); + + mFilesMap["peers"] = data_logger::fileData("log/dr-monero/peers.data"); + mFilesMap["download"] = data_logger::fileData("log/dr-monero/net/in-all.data"); + mFilesMap["upload"] = data_logger::fileData("log/dr-monero/net/out-all.data"); + mFilesMap["request"] = data_logger::fileData("log/dr-monero/net/req-all.data"); + mFilesMap["sleep_down"] = data_logger::fileData("log/dr-monero/down_sleep_log.data"); + mFilesMap["sleep_up"] = data_logger::fileData("log/dr-monero/up_sleep_log.data"); + + } + + void data_logger::add_data(std::string filename, unsigned int data) + { + if (mFilesMap.find(filename) == mFilesMap.end()) + return; // TODO: exception + + mFilesMap[filename].mDataToSave += data; + } + + double data_logger::fileData::get_current_time() + { + using namespace boost::chrono; + auto point = steady_clock::now(); + auto time_from_epoh = point.time_since_epoch(); + auto ms = duration_cast< milliseconds >( time_from_epoh ).count(); + double ms_f = ms; + return ms_f / 1000.; + } + + data_logger::fileData::fileData(std::string pFile) + { + mFile = std::make_shared<std::ofstream> (pFile); + } + + void data_logger::fileData::save() + { + if (!data_logger::m_save_graph) + return; + *mFile << static_cast<int>(get_current_time()) << " " << mDataToSave << std::endl; + } + + void data_logger::saveToFile() + { + std::lock_guard<std::mutex> lock(mSaveMutex); + for (auto &element : mFilesMap) + { + element.second.save(); + element.second.mDataToSave = 0; + } + } + +std::atomic<bool> data_logger::m_save_graph(false); + +} // namespace +} // namespace diff --git a/src/p2p/data_logger.hpp b/src/p2p/data_logger.hpp new file mode 100644 index 000000000..2b8503df3 --- /dev/null +++ b/src/p2p/data_logger.hpp @@ -0,0 +1,46 @@ +#ifndef INCLUDED_p2p_data_logger_hpp +#define INCLUDED_p2p_data_logger_hpp + +#include <string> +#include <map> +#include <fstream> +#include <memory> +#include <thread> +#include <mutex> +#include <atomic> + +namespace epee +{ +namespace net_utils +{ + + class data_logger { + public: + static data_logger &get_instance(); + data_logger(const data_logger &ob) = delete; + data_logger(data_logger &&ob) = delete; + void add_data(std::string filename, unsigned int data); + static std::atomic<bool> m_save_graph; + private: + data_logger(); + class fileData + { + public: + fileData(){} + fileData(std::string pFile); + + std::shared_ptr<std::ofstream> mFile; + long int mDataToSave = 0; + static double get_current_time(); + void save(); + }; + + std::map <std::string, fileData> mFilesMap; + std::mutex mSaveMutex; + void saveToFile(); + }; + +} // namespace +} // namespace + +#endif diff --git a/src/p2p/net_node.h b/src/p2p/net_node.h index 48737193e..ea7d5c383 100644 --- a/src/p2p/net_node.h +++ b/src/p2p/net_node.h @@ -86,7 +86,10 @@ namespace nodetool m_no_igd(false), m_hide_my_port(false), m_network_id(std::move(network_id)) - {} + { + m_number_of_out_peers = 0; + m_save_graph = false; + } static void init_options(boost::program_options::options_description& desc); @@ -225,6 +228,12 @@ namespace nodetool public: config m_config; // TODO was private, add getters? + std::atomic<unsigned int> m_number_of_out_peers; + void set_save_graph(bool save_graph) + { + m_save_graph = save_graph; + epee::net_utils::connection_basic::set_save_graph(save_graph); + } private: std::string m_config_folder; @@ -237,6 +246,7 @@ namespace nodetool bool m_allow_local_ip; bool m_hide_my_port; bool m_no_igd; + std::atomic<bool> m_save_graph; //critical_section m_connections_lock; //connections_indexed_container m_connections; diff --git a/src/p2p/net_node.inl b/src/p2p/net_node.inl index ce70e241a..60eed1f36 100644 --- a/src/p2p/net_node.inl +++ b/src/p2p/net_node.inl @@ -46,6 +46,7 @@ #include "net/local_ip.h" #include "crypto/crypto.h" #include "storages/levin_abstract_invoke2.h" +#include "data_logger.hpp" // We have to look for miniupnpc headers in different places, dependent on if its compiled or external #ifdef UPNP_STATIC @@ -85,8 +86,8 @@ namespace nodetool const command_line::arg_descriptor<std::vector<std::string> > arg_p2p_seed_node = {"seed-node", "Connect to a node to retrieve peer addresses, and disconnect"}; const command_line::arg_descriptor<bool> arg_p2p_hide_my_port = {"hide-my-port", "Do not announce yourself as peerlist candidate", false, true}; - const command_line::arg_descriptor<bool> arg_no_igd = {"no-igd", "Disable UPnP port mapping"}; - const command_line::arg_descriptor<int64_t> arg_out_peers = {"out-peers", "set max limit of out peers", -1}; + const command_line::arg_descriptor<bool> arg_no_igd = {"no-igd", "Disable UPnP port mapping"}; + const command_line::arg_descriptor<int64_t> arg_out_peers = {"out-peers", "set max limit of out peers", -1}; const command_line::arg_descriptor<int> arg_tos_flag = {"tos-flag", "set TOS flag", -1}; const command_line::arg_descriptor<int64_t> arg_limit_rate_up = {"limit-rate-up", "set limit-rate-up [kB/s]", -1}; @@ -289,6 +290,31 @@ namespace nodetool std::vector<std::vector<std::string>> dns_results; dns_results.resize(m_seed_nodes_list.size()); + + std::shared_ptr<std::thread> peersLoggerThread (new std::thread([&]() + { + unsigned int number_of_peers; + while (1) + { + if (m_save_graph) + { + //number_of_peers = m_net_server.get_config_object().get_connections_count(); + number_of_peers = 0; + m_net_server.get_config_object().foreach_connection([&](const p2p_connection_context& cntxt) + { + if(!cntxt.m_is_income) + ++number_of_peers; + return true; + }); // lambda + + m_number_of_out_peers = number_of_peers; + epee::net_utils::data_logger::get_instance().add_data("peers", number_of_peers); + } + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + })); // lambda + + peersLoggerThread->detach(); std::list<boost::thread*> dns_threads; uint64_t result_index = 0; @@ -487,6 +513,7 @@ namespace nodetool { m_peerlist.deinit(); m_net_server.deinit_server(); + return store_config(); } //----------------------------------------------------------------------------------- @@ -697,6 +724,16 @@ namespace nodetool template<class t_payload_net_handler> bool node_server<t_payload_net_handler>::try_to_connect_and_handshake_with_new_peer(const net_address& na, bool just_take_peerlist, uint64_t last_seen_stamp, bool white) { + if (m_number_of_out_peers == m_config.m_net_config.connections_count) // out peers limit + { + return false; + } + else if (m_number_of_out_peers > m_config.m_net_config.connections_count) + { + m_net_server.get_config_object().del_out_connections(1); + m_number_of_out_peers --; // atomic variable, update time = 1s + return false; + } LOG_PRINT_L1("Connecting to " << epee::string_tools::get_ip_string_from_int32(na.ip) << ":" << epee::string_tools::num_to_string_fast(na.port) << "(white=" << white << ", last_seen: " << (last_seen_stamp ? epee::misc_utils::get_time_interval_string(time(NULL) - last_seen_stamp):"never") @@ -784,16 +821,22 @@ namespace nodetool ++try_count; - if(is_peer_used(pe)) + _note("Considering connecting (out) to peer: " << pe.id << " " << epee::string_tools::get_ip_string_from_int32(pe.adr.ip) << ":" << boost::lexical_cast<std::string>(pe.adr.port)); + + if(is_peer_used(pe)) { + _note("Peer is used"); continue; + } LOG_PRINT_L1("Selected peer: " << pe.id << " " << epee::string_tools::get_ip_string_from_int32(pe.adr.ip) << ":" << boost::lexical_cast<std::string>(pe.adr.port) << "[white=" << use_white_list << "] last_seen: " << (pe.last_seen ? epee::misc_utils::get_time_interval_string(time(NULL) - pe.last_seen) : "never")); - if(!try_to_connect_and_handshake_with_new_peer(pe.adr, false, pe.last_seen, use_white_list)) + if(!try_to_connect_and_handshake_with_new_peer(pe.adr, false, pe.last_seen, use_white_list)) { + _note("Handshake failed"); continue; + } return true; } @@ -1336,20 +1379,31 @@ namespace nodetool template<class t_payload_net_handler> bool node_server<t_payload_net_handler>::set_max_out_peers(const boost::program_options::variables_map& vm, int64_t max) { + using namespace std::chrono; + auto point = steady_clock::now(); + auto time_from_epoh = point.time_since_epoch(); + auto ms = duration_cast< milliseconds >( time_from_epoh ).count(); + double ms_f = ms; + ms_f /= 1000.; + + std::ofstream limitFile("log/dr-monero/peers_limit.info", std::ios::app); + limitFile.precision(7); if(max == -1) { m_config.m_net_config.connections_count = P2P_DEFAULT_CONNECTIONS_COUNT; + if (m_save_graph) + limitFile << static_cast<int>(ms_f) << " " << P2P_DEFAULT_CONNECTIONS_COUNT << std::endl; return true; } m_config.m_net_config.connections_count = max; - LOG_PRINT_RED_L0("connections_count: " << m_config.m_net_config.connections_count); + limitFile << static_cast<int>(ms_f) << " " << max << std::endl; return true; } template<class t_payload_net_handler> void node_server<t_payload_net_handler>::delete_connections(size_t count) { - m_net_server.get_config_object().del_connections(count); + m_net_server.get_config_object().del_out_connections(count); } template<class t_payload_net_handler> diff --git a/src/p2p/network_throttle-detail.cpp b/src/p2p/network_throttle-detail.cpp index 6ea3076a9..6b2ee698e 100644 --- a/src/p2p/network_throttle-detail.cpp +++ b/src/p2p/network_throttle-detail.cpp @@ -78,6 +78,7 @@ #include "../../src/p2p/network_throttle-detail.hpp" #include "../../contrib/otshell_utils/utils.hpp" +#include "data_logger.hpp" using namespace nOT::nUtils; // ################################################################################################ @@ -152,8 +153,6 @@ network_throttle::network_throttle(const std::string &nameshort, const std::stri m_any_packet_yet = false; m_slot_size = 1.0; // hard coded in few places m_target_speed = 16 * 1024; // other defaults are probably defined in the command-line parsing code when this class is used e.g. as main global throttle - m_target_MB = 0; - } void network_throttle::set_name(const std::string &name) @@ -163,16 +162,20 @@ void network_throttle::set_name(const std::string &name) void network_throttle::set_target_speed( network_speed_kbps target ) { - m_target_speed = target; + m_target_speed = target * 1024; _note_c("net/"+m_nameshort, "Setting LIMIT: " << target << " kbps"); + set_real_target_speed(target); } -void network_throttle::set_target_kill( network_MB target ) +void network_throttle::set_real_target_speed( network_speed_kbps real_target ) { - _note_c("net/"+m_nameshort, "Setting KILL: " << target << " MB hard limit"); - m_target_MB = target; + m_real_target_speed = real_target * 1024; } +network_speed_kbps network_throttle::get_terget_speed() +{ + return m_real_target_speed / 1024; +} void network_throttle::tick() { @@ -187,7 +190,7 @@ void network_throttle::tick() // TODO optimize when moving few slots at once while ( (!m_any_packet_yet) || (last_sample_time_slot < current_sample_time_slot)) { - LOG_PRINT_L4("Moving counter buffer by 1 second " << last_sample_time_slot << " < " << current_sample_time_slot << " (last time " << m_last_sample_time<<")"); + _dbg3("Moving counter buffer by 1 second " << last_sample_time_slot << " < " << current_sample_time_slot << " (last time " << m_last_sample_time<<")"); // rotate buffer for (size_t i=m_history.size()-1; i>=1; --i) m_history[i] = m_history[i-1]; m_history[0] = packet_info(); @@ -217,7 +220,6 @@ void network_throttle::_handle_trafic_exact(size_t packet_size, size_t orginal_s std::ostringstream oss; oss << "["; for (auto sample: m_history) oss << sample.m_size << " "; oss << "]" << std::ends; std::string history_str = oss.str(); - logger_handle_net("log/dr-monero/net/inreq-all.data",get_time_seconds(),packet_size); _info_c( "net/" + m_nameshort , "Throttle " << m_name << ": packet of ~"<<packet_size<<"b " << " (from "<<orginal_size<<" b)" << " Speed AVG=" << std::setw(4) << ((long int)(cts .average/1024)) <<"[w="<<cts .window<<"]" << " " << std::setw(4) << ((long int)(cts2.average/1024)) <<"[w="<<cts2.window<<"]" @@ -233,23 +235,22 @@ void network_throttle::handle_trafic_tcp(size_t packet_size) _handle_trafic_exact( all_size , packet_size ); } -void network_throttle::handle_congestion(double overheat) { - // TODO -} - network_time_seconds network_throttle::get_sleep_time_after_tick(size_t packet_size) { tick(); return get_sleep_time(packet_size); } void network_throttle::logger_handle_net(const std::string &filename, double time, size_t size) { + if (! epee::net_utils::data_logger::m_save_graph) + return; std::mutex mutex; mutex.lock(); { std::fstream file; file.open(filename.c_str(), std::ios::app | std::ios::out ); + file.precision(6); if(!file.is_open()) _warn("Can't open file " << filename); - file << time << " " << size/1024 << "\n"; + file << static_cast<int>(time) << " " << static_cast<double>(size/1024) << "\n"; file.close(); } mutex.unlock(); } @@ -257,27 +258,11 @@ void network_throttle::logger_handle_net(const std::string &filename, double tim // fine tune this to decide about sending speed: network_time_seconds network_throttle::get_sleep_time(size_t packet_size) const { - //_scope_mark(""); double D2=0; calculate_times_struct cts = { 0, 0, 0, 0}; - //calculate_times(packet_size, cts, false, m_window_size/2); D2=cts.delay; - //calculate_times(packet_size, cts, true, m_window_size/2); D2=cts.delay; calculate_times(packet_size, cts, true, m_window_size); D2=cts.delay; return D2; } -double network_throttle::get_current_overheat() const { - auto now = get_time_seconds(); - auto diff = now - m_overheat_time; - auto overheat = m_overheat - diff; - overheat = std::max(m_overheat, 0.); - return overheat; -} - -void network_throttle::set_overheat(double lag) { - m_overheat += lag; - m_overheat_time = get_time_seconds(); - LOG_PRINT_L0("Lag: " << lag << ", overheat: " << m_overheat ); -} // MAIN LOGIC: void network_throttle::calculate_times(size_t packet_size, calculate_times_struct &cts, bool dbg, double force_window) const @@ -310,9 +295,7 @@ void network_throttle::calculate_times(size_t packet_size, calculate_times_struc const double D1 = (Epast - M*cts.window) / M; // delay - how long to sleep to get back to target speed const double D2 = (Enow - M*cts.window) / M; // delay - how long to sleep to get back to target speed (including current packet) - auto O = get_current_overheat(); - auto Ouse = O * 0 ; // XXX TODO - cts.delay = (D1*0.80 + D2*0.20) + Ouse; // finall sleep depends on both with/without current packet + cts.delay = (D1*0.80 + D2*0.20); // finall sleep depends on both with/without current packet // update_overheat(); cts.average = Epast/cts.window; // current avg. speed (for info) @@ -329,13 +312,13 @@ void network_throttle::calculate_times(size_t packet_size, calculate_times_struc if (dbg) { std::ostringstream oss; oss << "["; for (auto sample: m_history) oss << sample.m_size << " "; oss << "]" << std::ends; std::string history_str = oss.str(); - _dbg1_c( "net/"+m_nameshort+"_c" , - "dbg " << m_name << ": " + _info_c( "net/"+m_nameshort+"_c" , + (cts.delay > 0 ? "SLEEP" : "") + << "dbg " << m_name << ": " << "speed is A=" << std::setw(8) <<cts.average<<" vs " << "Max=" << std::setw(8) <<M<<" " << " so sleep: " << "D=" << std::setw(8) <<cts.delay<<" sec " - << "Overheat=" << std::setw(8) <<O<<" sec " << "E="<< std::setw(8) << E << " (Enow="<<std::setw(8)<<Enow<<") " << "M=" << std::setw(8) << M <<" W="<< std::setw(8) << cts.window << " " << "R=" << std::setw(8) << cts.recomendetDataSize << " Wgood" << std::setw(8) << Wgood << " " @@ -347,7 +330,7 @@ void network_throttle::calculate_times(size_t packet_size, calculate_times_struc } double network_throttle::get_time_seconds() const { - using namespace boost::chrono; + using namespace std::chrono; auto point = steady_clock::now(); auto time_from_epoh = point.time_since_epoch(); auto ms = duration_cast< milliseconds >( time_from_epoh ).count(); @@ -368,14 +351,28 @@ size_t network_throttle::get_recommended_size_of_planned_transport_window(double size_t network_throttle::get_recommended_size_of_planned_transport() const { size_t R1=0,R2=0,R3=0; R1 = get_recommended_size_of_planned_transport_window( -1 ); - R2 = get_recommended_size_of_planned_transport_window( m_window_size/2); - R3 = get_recommended_size_of_planned_transport_window( 8 ); + R2 = get_recommended_size_of_planned_transport_window(m_window_size / 2); + R3 = get_recommended_size_of_planned_transport_window( 5 ); auto RM = std::min(R1, std::min(R2,R3)); - const double a1=70, a2=10, a3=10, am=10; // weight of the various windows in decisssion + const double a1=20, a2=10, a3=10, am=10; // weight of the various windows in decisssion // TODO 70 => 20 return (R1*a1 + R2*a2 + R3*a3 + RM*am) / (a1+a2+a3+am); } +double network_throttle::get_current_speed() const { + unsigned int bytes_transferred = 0; + if (m_history.size() == 0 || m_slot_size == 0) + return 0; + + auto it = m_history.begin(); + while (it < m_history.end() - 1) + { + bytes_transferred += it->m_size; + it ++; + } + + return bytes_transferred / ((m_history.size() - 1) * m_slot_size); +} } // namespace } // namespace diff --git a/src/p2p/network_throttle-detail.hpp b/src/p2p/network_throttle-detail.hpp index 9d492c534..063dac850 100644 --- a/src/p2p/network_throttle-detail.hpp +++ b/src/p2p/network_throttle-detail.hpp @@ -54,7 +54,7 @@ class network_throttle : public i_network_throttle { network_speed_kbps m_target_speed; - network_MB m_target_MB; + network_speed_kbps m_real_target_speed; size_t m_network_add_cost; // estimated add cost of headers size_t m_network_minimal_segment; // estimated minimal cost of sending 1 byte to round up to size_t m_network_max_segment; // recommended max size of 1 TCP transmission @@ -80,18 +80,16 @@ class network_throttle : public i_network_throttle { virtual ~network_throttle(); virtual void set_name(const std::string &name); virtual void set_target_speed( network_speed_kbps target ); - virtual void set_target_kill( network_MB target ); + virtual void set_real_target_speed( network_speed_kbps real_target ); // only for throttle_out + virtual network_speed_kbps get_terget_speed(); // add information about events: virtual void handle_trafic_exact(size_t packet_size); ///< count the new traffic/packet; the size is exact considering all network costs virtual void handle_trafic_tcp(size_t packet_size); ///< count the new traffic/packet; the size is as TCP, we will consider MTU etc - virtual void handle_congestion(double overheat); ///< call this when congestion is detected; see example use virtual void tick(); ///< poke and update timers/history (recalculates, moves the history if needed, checks the real clock etc) virtual double get_time_seconds() const ; ///< timer that we use, time in seconds, monotionic - virtual double get_current_overheat() const; ///< did we detected congestion now. NOT USED NOW TODO - virtual void set_overheat(double lag); ///< did we detected congestion now. NOT USED NOW TODO. rename to add_overheat ? // time calculations: virtual void calculate_times(size_t packet_size, calculate_times_struct &cts, bool dbg, double force_window) const; ///< MAIN LOGIC (see base class for info) @@ -101,7 +99,7 @@ class network_throttle : public i_network_throttle { virtual size_t get_recommended_size_of_planned_transport() const; ///< what should be the size (bytes) of next data block to be transported virtual size_t get_recommended_size_of_planned_transport_window(double force_window) const; ///< ditto, but for given windows time frame - //virtual void add_planned_transport(size_t size); + virtual double get_current_speed() const; private: virtual network_time_seconds time_to_slot(network_time_seconds t) const { return std::floor( t ); } // convert exact time eg 13.7 to rounded time for slot number in history 13 diff --git a/src/p2p/network_throttle.hpp b/src/p2p/network_throttle.hpp index dc25a2c45..add4daa86 100644 --- a/src/p2p/network_throttle.hpp +++ b/src/p2p/network_throttle.hpp @@ -100,7 +100,7 @@ struct calculate_times_struct { typedef calculate_times_struct calculate_times_struct; -namespace cryptonote { class cryptonote_protocol_handler_base; }; // a friend class // TODO friend not working +namespace cryptonote { class cryptonote_protocol_handler_base; } // a friend class // TODO friend not working /*** @brief Access to simple throttles, with singlton to access global network limits @@ -146,11 +146,11 @@ class i_network_throttle { public: virtual void set_name(const std::string &name)=0; virtual void set_target_speed( network_speed_kbps target )=0; - virtual void set_target_kill( network_MB target )=0; + virtual void set_real_target_speed(network_speed_kbps real_target)=0; + virtual network_speed_kbps get_terget_speed()=0; virtual void handle_trafic_exact(size_t packet_size) =0; // count the new traffic/packet; the size is exact considering all network costs virtual void handle_trafic_tcp(size_t packet_size) =0; // count the new traffic/packet; the size is as TCP, we will consider MTU etc - virtual void handle_congestion(double overheat) =0; // call this when congestion is detected; see example use virtual void tick() =0; // poke and update timers/history // time calculations: @@ -166,8 +166,6 @@ class i_network_throttle { virtual size_t get_recommended_size_of_planned_transport() const =0; // what should be the recommended limit of data size that we can transport over current network_throttle in near future virtual double get_time_seconds() const =0; // a timer - virtual double get_current_overheat() const =0; - virtual void set_overheat(double lag) =0; virtual void logger_handle_net(const std::string &filename, double time, size_t size)=0; diff --git a/src/serialization/binary_archive.h b/src/serialization/binary_archive.h index 6e00ad664..5cd4988e4 100644 --- a/src/serialization/binary_archive.h +++ b/src/serialization/binary_archive.h @@ -42,8 +42,8 @@ #include "warnings.h" /* I have no clue what these lines means */ -PUSH_WARNINGS; -DISABLE_VS_WARNINGS(4244); +PUSH_WARNINGS +DISABLE_VS_WARNINGS(4244) //TODO: fix size_t warning in x32 platform |