diff options
Diffstat (limited to 'contrib/epee/include/net/abstract_tcp_server2.inl')
-rw-r--r-- | contrib/epee/include/net/abstract_tcp_server2.inl | 428 |
1 files changed, 331 insertions, 97 deletions
diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl index db3f9e322..31836fe9e 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.inl +++ b/contrib/epee/include/net/abstract_tcp_server2.inl @@ -1,3 +1,9 @@ +/** +@file +@author from CrypoNote (see copyright below; Andrey N. Sabelnikov) +@monero rfree +@brief the connection templated-class for one peer connection +*/ // Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net // All rights reserved. // @@ -26,7 +32,7 @@ -#include "net_utils_base.h" +//#include "net_utils_base.h" #include <boost/lambda/bind.hpp> #include <boost/foreach.hpp> #include <boost/lambda/lambda.hpp> @@ -34,9 +40,21 @@ #include <boost/chrono.hpp> #include <boost/utility/value_init.hpp> #include <boost/asio/deadline_timer.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> // TODO +#include <boost/thread/thread.hpp> // TODO #include "misc_language.h" #include "pragma_comp_defs.h" +#include <sstream> +#include <iomanip> +#include <algorithm> + +#include "../../../../src/cryptonote_core/cryptonote_core.h" // e.g. for the send_stop_signal() + +#include "../../../../contrib/otshell_utils/utils.hpp" +#include "../../../../src/p2p/data_logger.hpp" +using namespace nOT::nUtils; // TODO + PRAGMA_WARNING_PUSH namespace epee { @@ -48,17 +66,21 @@ namespace net_utils PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> - connection<t_protocol_handler>::connection(boost::asio::io_service& io_service, - typename t_protocol_handler::config_type& config, volatile uint32_t& sock_count, i_connection_filter* &pfilter) - : strand_(io_service), - socket_(io_service), - m_want_close_connection(0), - m_was_shutdown(0), - m_ref_sockets_count(sock_count), - m_pfilter(pfilter), - m_protocol_handler(this, config, context) + connection<t_protocol_handler>::connection( boost::asio::io_service& io_service, + typename t_protocol_handler::config_type& config, + std::atomic<long> &ref_sock_count, // the ++/-- counter + std::atomic<long> &sock_number, // the only increasing ++ number generator + i_connection_filter* &pfilter + ) + : + connection_basic(io_service, ref_sock_count, sock_number), + m_protocol_handler(this, config, context), + m_pfilter( pfilter ), + m_connection_type(NET), + m_throttle_speed_in("speed_in", "throttle_speed_in"), + m_throttle_speed_out("speed_out", "throttle_speed_out") { - boost::interprocess::ipcdetail::atomic_inc32(&m_ref_sockets_count); + _info_c("net/sleepRPC", "connection constructor set m_connection_type="<<m_connection_type); } PRAGMA_WARNING_DISABLE_VS(4355) //--------------------------------------------------------------------------------- @@ -67,12 +89,11 @@ PRAGMA_WARNING_DISABLE_VS(4355) { if(!m_was_shutdown) { - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Socket destroyed without shutdown."); + _dbg3("[sock " << socket_.native_handle() << "] Socket destroyed without shutdown."); shutdown(); } - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Socket destroyed"); - boost::interprocess::ipcdetail::atomic_dec32(&m_ref_sockets_count); + _dbg3("[sock " << socket_.native_handle() << "] Socket destroyed"); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> @@ -118,13 +139,13 @@ PRAGMA_WARNING_DISABLE_VS(4355) long ip_ = boost::asio::detail::socket_ops::host_to_network_long(remote_ep.address().to_v4().to_ulong()); context.set_details(boost::uuids::random_generator()(), ip_, remote_ep.port(), is_income); - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] new connection from " << print_connection_context_short(context) << + _dbg3("[sock " << socket_.native_handle() << "] new connection from " << print_connection_context_short(context) << " to " << local_ep.address().to_string() << ':' << local_ep.port() << - ", total sockets objects " << m_ref_sockets_count); + ", total sockets objects " << m_ref_sock_count); if(m_pfilter && !m_pfilter->is_remote_ip_allowed(context.m_remote_ip)) { - LOG_PRINT_L2("[sock " << socket_.native_handle() << "] ip denied " << string_tools::get_ip_string_from_int32(context.m_remote_ip) << ", shutdowning connection"); + _dbg2("[sock " << socket_.native_handle() << "] ip denied " << string_tools::get_ip_string_from_int32(context.m_remote_ip) << ", shutdowning connection"); close(); return false; } @@ -136,7 +157,17 @@ PRAGMA_WARNING_DISABLE_VS(4355) boost::bind(&connection<t_protocol_handler>::handle_read, self, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); - + + //set ToS flag + int tos = get_tos_flag(); + boost::asio::detail::socket_option::integer< IPPROTO_IP, IP_TOS > + optionTos( tos ); + socket_.set_option( optionTos ); + //_dbg1("Set ToS flag to " << tos); + + boost::asio::ip::tcp::no_delay noDelayOption(false); + socket_.set_option(noDelayOption); + return true; CATCH_ENTRY_L0("connection<t_protocol_handler>::start()", false); @@ -146,7 +177,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) bool connection<t_protocol_handler>::request_callback() { TRY_ENTRY(); - LOG_PRINT_L2("[" << print_connection_context_short(context) << "] request_callback"); + _dbg2("[" << print_connection_context_short(context) << "] request_callback"); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted auto self = safe_shared_from_this(); if(!self) @@ -167,8 +198,9 @@ PRAGMA_WARNING_DISABLE_VS(4355) bool connection<t_protocol_handler>::add_ref() { TRY_ENTRY(); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "] add_ref"); + //_dbg3("[sock " << socket_.native_handle() << "] add_ref, m_peer_number=" << mI->m_peer_number); CRITICAL_REGION_LOCAL(m_self_refs_lock); + //_dbg3("[sock " << socket_.native_handle() << "] add_ref 2, m_peer_number=" << mI->m_peer_number); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted auto self = safe_shared_from_this(); @@ -201,7 +233,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) void connection<t_protocol_handler>::call_back_starter() { TRY_ENTRY(); - LOG_PRINT_L2("[" << print_connection_context_short(context) << "] fired_callback"); + _dbg2("[" << print_connection_context_short(context) << "] fired_callback"); m_protocol_handler.handle_qued_callback(); CATCH_ENTRY_L0("connection<t_protocol_handler>::call_back_starter()", void()); } @@ -211,17 +243,44 @@ PRAGMA_WARNING_DISABLE_VS(4355) std::size_t bytes_transferred) { TRY_ENTRY(); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async read calledback."); + //_info("[sock " << socket_.native_handle() << "] Async read calledback."); if (!e) { - LOG_PRINT("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred, LOG_LEVEL_4); + { + CRITICAL_REGION_LOCAL(m_throttle_speed_in_mutex); + m_throttle_speed_in.handle_trafic_exact(bytes_transferred); + context.m_current_speed_down = m_throttle_speed_in.get_current_speed(); + } + + { + CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::network_throttle_manager::m_lock_get_global_throttle_in ); + epee::net_utils::network_throttle_manager::network_throttle_manager::get_global_throttle_in().handle_trafic_exact(bytes_transferred * 1024); + } + double delay=0; // will be calculated + do + { + { //_scope_dbg1("CRITICAL_REGION_LOCAL"); + CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::m_lock_get_global_throttle_in ); + delay = epee::net_utils::network_throttle_manager::get_global_throttle_in().get_sleep_time_after_tick( bytes_transferred ); // decission from global + } + + delay *= 0.5; + if (delay > 0) { + long int ms = (long int)(delay * 100); + epee::net_utils::data_logger::get_instance().add_data("sleep_down", ms); + std::this_thread::sleep_for(std::chrono::milliseconds(ms)); + } + } while(delay > 0); + + //_info("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred); + logger_handle_net_read(bytes_transferred); context.m_last_recv = time(NULL); context.m_recv_cnt += bytes_transferred; bool recv_res = m_protocol_handler.handle_recv(buffer_.data(), bytes_transferred); if(!recv_res) { - LOG_PRINT("[sock " << socket_.native_handle() << "] protocol_want_close", LOG_LEVEL_4); + //_info("[sock " << socket_.native_handle() << "] protocol_want_close"); //some error in protocol, protocol handler ask to close connection boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1); @@ -239,14 +298,14 @@ PRAGMA_WARNING_DISABLE_VS(4355) boost::bind(&connection<t_protocol_handler>::handle_read, connection<t_protocol_handler>::shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "]Async read requested."); + //_info("[sock " << socket_.native_handle() << "]Async read requested."); } }else { - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Some not success at read: " << e.message() << ':' << e.value()); + _dbg3("[sock " << socket_.native_handle() << "] Some not success at read: " << e.message() << ':' << e.value()); if(e.value() != 2) { - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Some problems at read: " << e.message() << ':' << e.value()); + _dbg3("[sock " << socket_.native_handle() << "] Some problems at read: " << e.message() << ':' << e.value()); shutdown(); } } @@ -283,8 +342,91 @@ PRAGMA_WARNING_DISABLE_VS(4355) CATCH_ENTRY_L0("connection<t_protocol_handler>::call_run_once_service_io", false); } //--------------------------------------------------------------------------------- + template<class t_protocol_handler> + bool connection<t_protocol_handler>::do_send(const void* ptr, size_t cb) { + TRY_ENTRY(); + + // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted + auto self = safe_shared_from_this(); + if (!self) return false; + if (m_was_shutdown) return false; + // TODO avoid copy + + const double factor = 32; // TODO config + typedef long long signed int t_safe; // my t_size to avoid any overunderflow in arithmetic + const t_safe chunksize_good = (t_safe)( 1024 * std::max(1.0,factor) ); + const t_safe chunksize_max = chunksize_good * 2 ; + const bool allow_split = (m_connection_type == RPC) ? false : true; // TODO config + + ASRT(! (chunksize_max<0) ); // make sure it is unsigned before removin sign with cast: + long long unsigned int chunksize_max_unsigned = static_cast<long long unsigned int>( chunksize_max ) ; + + if (allow_split && (cb > chunksize_max_unsigned)) { + { // LOCK: chunking + epee::critical_region_t<decltype(m_chunking_lock)> send_guard(m_chunking_lock); // *** critical *** + + _dbg3_c("net/out/size", "do_send() will SPLIT into small chunks, from packet="<<cb<<" B for ptr="<<ptr); + t_safe all = cb; // all bytes to send + t_safe pos = 0; // current sending position + // 01234567890 + // ^^^^ (pos=0, len=4) ; pos:=pos+len, pos=4 + // ^^^^ (pos=4, len=4) ; pos:=pos+len, pos=8 + // ^^^ (pos=8, len=4) ; + + // const size_t bufsize = chunksize_good; // TODO safecast + // char* buf = new char[ bufsize ]; + + bool all_ok = true; + while (pos < all) { + t_safe lenall = all-pos; // length from here to end + t_safe len = std::min( chunksize_good , lenall); // take a smaller part + ASRT(len<=chunksize_good); + // pos=8; len=4; all=10; len=3; + + ASRT(! (len<0) ); // check before we cast away sign: + unsigned long long int len_unsigned = static_cast<long long int>( len ); + ASRT(len>0); // (redundand) + ASRT(len_unsigned < std::numeric_limits<size_t>::max()); // yeap we want strong < then max size, to be sure + + void *chunk_start = ((char*)ptr) + pos; + _fact_c("net/out/size","chunk_start="<<chunk_start<<" ptr="<<ptr<<" pos="<<pos); + ASRT(chunk_start >= ptr); // not wrapped around address? + //std::memcpy( (void*)buf, chunk_start, len); + + _dbg3_c("net/out/size", "part of " << lenall << ": pos="<<pos << " len="<<len); + + bool ok = do_send_chunk(chunk_start, len); // <====== *** + + all_ok = all_ok && ok; + if (!all_ok) { + _dbg1_c("net/out/size", "do_send() DONE ***FAILED*** from packet="<<cb<<" B for ptr="<<ptr); + _dbg1("do_send() SEND was aborted in middle of big package - this is mostly harmless " + << " (e.g. peer closed connection) but if it causes trouble tell us at #monero-dev. " << cb); + return false; // partial failure in sending + } + pos = pos+len; ASRT(pos >0); + + // (in catch block, or uniq pointer) delete buf; + } // each chunk + + _dbg3_c("net/out/size", "do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr); + _dbg3 ( "do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr); + + _info_c("net/sleepRPC", "do_send() m_connection_type = " << m_connection_type); + + return all_ok; // done - e.g. queued - all the chunks of current do_send call + } // LOCK: chunking + } // a big block (to be chunked) - all chunks + else { // small block + return do_send_chunk(ptr,cb); // just send as 1 big chunk + } + + CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send", false); + } // do_send() + + //--------------------------------------------------------------------------------- template<class t_protocol_handler> - bool connection<t_protocol_handler>::do_send(const void* ptr, size_t cb) + bool connection<t_protocol_handler>::do_send_chunk(const void* ptr, size_t cb) { TRY_ENTRY(); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted @@ -293,50 +435,86 @@ PRAGMA_WARNING_DISABLE_VS(4355) return false; if(m_was_shutdown) return false; + { + CRITICAL_REGION_LOCAL(m_throttle_speed_out_mutex); + m_throttle_speed_out.handle_trafic_exact(cb); + context.m_current_speed_up = m_throttle_speed_out.get_current_speed(); + } - LOG_PRINT("[sock " << socket_.native_handle() << "] SEND " << cb, LOG_LEVEL_4); + //_info("[sock " << socket_.native_handle() << "] SEND " << cb); context.m_last_send = time(NULL); context.m_send_cnt += cb; //some data should be wrote to stream //request complete - epee::critical_region_t<decltype(m_send_que_lock)> send_guard(m_send_que_lock); - if(m_send_que.size() > ABSTRACT_SERVER_SEND_QUE_MAX_COUNT) + sleep_before_packet(cb, 1, 1); + epee::critical_region_t<decltype(m_send_que_lock)> send_guard(m_send_que_lock); // *** critical *** + long int retry=0; + const long int retry_limit = 5*4; + while (m_send_que.size() > ABSTRACT_SERVER_SEND_QUE_MAX_COUNT) { - send_guard.unlock(); - LOG_PRINT_L2("send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection"); - close(); - return false; + retry++; + + /* if ( ::cryptonote::core::get_is_stopping() ) { // TODO re-add fast stop + _fact("ABORT queue wait due to stopping"); + return false; // aborted + }*/ + + long int ms = 250 + (rand()%50); + _info_c("net/sleep", "Sleeping because QUEUE is FULL, in " << __FUNCTION__ << " for " << ms << " ms before packet_size="<<cb); // XXX debug sleep + boost::this_thread::sleep(boost::posix_time::milliseconds( ms ) ); + _dbg1("sleep for queue: " << ms); + + if (retry > retry_limit) { + send_guard.unlock(); + _erro("send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection"); + // _dbg1_c("net/sleep", "send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection"); + close(); + return false; + } } m_send_que.resize(m_send_que.size()+1); m_send_que.back().assign((const char*)ptr, cb); if(m_send_que.size() > 1) - { - //active operation should be in progress, nothing to do, just wait last operation callback - }else - { - //no active operation - if(m_send_que.size()!=1) - { - LOG_ERROR("Looks like no active operations, but send que size != 1!!"); - return false; - } - - boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), m_send_que.front().size()), - //strand_.wrap( - boost::bind(&connection<t_protocol_handler>::handle_write, self, _1, _2) - //) - ); + { // active operation should be in progress, nothing to do, just wait last operation callback + auto size_now = cb; + _info_c("net/out/size", "do_send() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size()); + //do_send_handler_delayed( ptr , size_now ); // (((H))) // empty function LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size()); } + else + { // no active operation + + if(m_send_que.size()!=1) + { + _erro("Looks like no active operations, but send que size != 1!!"); + return false; + } + + auto size_now = m_send_que.front().size(); + _dbg1_c("net/out/size", "do_send() NOW SENSD: packet="<<size_now<<" B"); + do_send_handler_write( ptr , size_now ); // (((H))) + + ASRT( size_now == m_send_que.front().size() ); + boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now ) , + //strand_.wrap( + boost::bind(&connection<t_protocol_handler>::handle_write, self, _1, _2) + //) + ); + //_dbg3("(chunk): " << size_now); + //logger_handle_net_write(size_now); + //_info("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size()); + } + + //do_send_handler_stop( ptr , cb ); // empty function return true; CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send", false); - } + } // do_send_chunk //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool connection<t_protocol_handler>::shutdown() @@ -353,7 +531,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) bool connection<t_protocol_handler>::close() { TRY_ENTRY(); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Que Shutdown called."); + //_info("[sock " << socket_.native_handle() << "] Que Shutdown called."); size_t send_que_size = 0; CRITICAL_REGION_BEGIN(m_send_que_lock); send_que_size = m_send_que.size(); @@ -376,16 +554,17 @@ PRAGMA_WARNING_DISABLE_VS(4355) if (e) { - LOG_PRINT_L1("[sock " << socket_.native_handle() << "] Some problems at write: " << e.message() << ':' << e.value()); + _dbg1("[sock " << socket_.native_handle() << "] Some problems at write: " << e.message() << ':' << e.value()); shutdown(); return; } - + logger_handle_net_write(cb); + sleep_before_packet(cb, 1, 1); bool do_shutdown = false; CRITICAL_REGION_BEGIN(m_send_que_lock); if(m_send_que.empty()) { - LOG_ERROR("[sock " << socket_.native_handle() << "] m_send_que.size() == 0 at handle_write!"); + _erro("[sock " << socket_.native_handle() << "] m_send_que.size() == 0 at handle_write!"); return; } @@ -399,10 +578,16 @@ PRAGMA_WARNING_DISABLE_VS(4355) }else { //have more data to send - boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), m_send_que.front().size()), - //strand_.wrap( - boost::bind(&connection<t_protocol_handler>::handle_write, connection<t_protocol_handler>::shared_from_this(), _1, _2)); - //); + auto size_now = m_send_que.front().size(); + _dbg1_c("net/out/size", "handle_write() NOW SENDS: packet="<<size_now<<" B" <<", from queue size="<<m_send_que.size()); + do_send_handler_write_from_queue(e, m_send_que.front().size() , m_send_que.size()); // (((H))) + ASRT( size_now == m_send_que.front().size() ); + boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now) , + // strand_.wrap( + boost::bind(&connection<t_protocol_handler>::handle_write, connection<t_protocol_handler>::shared_from_this(), _1, _2) + // ) + ); + //_dbg3("(normal)" << size_now); } CRITICAL_REGION_END(); @@ -412,6 +597,13 @@ PRAGMA_WARNING_DISABLE_VS(4355) } CATCH_ENTRY_L0("connection<t_protocol_handler>::handle_write", void()); } + //--------------------------------------------------------------------------------- + template<class t_protocol_handler> + void connection<t_protocol_handler>::setRPcStation() + { + m_connection_type = RPC; + _fact_c("net/sleepRPC", "set m_connection_type = RPC "); + } /************************************************************************/ /* */ /************************************************************************/ @@ -420,19 +612,27 @@ PRAGMA_WARNING_DISABLE_VS(4355) m_io_service_local_instance(new boost::asio::io_service()), io_service_(*m_io_service_local_instance.get()), acceptor_(io_service_), - new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)), - m_stop_signal_sent(false), m_port(0), m_sockets_count(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0) + m_stop_signal_sent(false), m_port(0), + m_sock_count(0), m_sock_number(0), m_threads_count(0), + m_pfilter(NULL), m_thread_index(0), + new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter)) { + create_server_type_map(); m_thread_name_prefix = "NET"; + type = NET; } template<class t_protocol_handler> - boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service): + boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_server_role s_type): io_service_(extarnal_io_service), acceptor_(io_service_), - new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)), - m_stop_signal_sent(false), m_port(0), m_sockets_count(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0) + m_stop_signal_sent(false), m_port(0), + m_sock_count(0), m_sock_number(0), m_threads_count(0), + m_pfilter(NULL), m_thread_index(0), + type(NET), + new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter)) { + create_server_type_map(); m_thread_name_prefix = "NET"; } //--------------------------------------------------------------------------------- @@ -444,6 +644,14 @@ PRAGMA_WARNING_DISABLE_VS(4355) } //--------------------------------------------------------------------------------- template<class t_protocol_handler> + void boosted_tcp_server<t_protocol_handler>::create_server_type_map() + { + server_type_map["NET"] = t_server_role::NET; + server_type_map["RPC"] = t_server_role::RPC; + server_type_map["P2P"] = t_server_role::P2P; + } + //--------------------------------------------------------------------------------- + template<class t_protocol_handler> bool boosted_tcp_server<t_protocol_handler>::init_server(uint32_t port, const std::string address) { TRY_ENTRY(); @@ -491,6 +699,7 @@ POP_WARNINGS std::string thread_name = std::string("[") + m_thread_name_prefix; thread_name += boost::to_string(local_thr_index) + "]"; log_space::log_singletone::set_thread_log_prefix(thread_name); + // _fact("Thread name: " << m_thread_name_prefix); while(!m_stop_signal_sent) { try @@ -499,14 +708,14 @@ POP_WARNINGS } catch(const std::exception& ex) { - LOG_ERROR("Exception at server worker thread, what=" << ex.what()); + _erro("Exception at server worker thread, what=" << ex.what()); } catch(...) { - LOG_ERROR("Exception at server worker thread, unknown execption"); + _erro("Exception at server worker thread, unknown execption"); } } - LOG_PRINT_L4("Worker thread finished"); + //_info("Worker thread finished"); return true; CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::worker_thread", false); } @@ -515,6 +724,9 @@ POP_WARNINGS void boosted_tcp_server<t_protocol_handler>::set_threads_prefix(const std::string& prefix_name) { m_thread_name_prefix = prefix_name; + type = server_type_map[m_thread_name_prefix]; + _note("Set server type to: " << type); + _note("Set server type to: " << m_thread_name_prefix); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> @@ -539,32 +751,38 @@ POP_WARNINGS { boost::shared_ptr<boost::thread> thread(new boost::thread( attrs, boost::bind(&boosted_tcp_server<t_protocol_handler>::worker_thread, this))); + _note("Run server thread name: " << m_thread_name_prefix); m_threads.push_back(thread); } CRITICAL_REGION_END(); // Wait for all threads in the pool to exit. - if(wait) + if (wait) // && ! ::cryptonote::core::get_is_stopping()) // TODO fast_exit { - for (std::size_t i = 0; i < m_threads.size(); ++i) - m_threads[i]->join(); + _fact("JOINING all threads"); + for (std::size_t i = 0; i < m_threads.size(); ++i) { + m_threads[i]->join(); + } + _fact("JOINING all threads - almost"); m_threads.clear(); + _fact("JOINING all threads - DONE"); - }else - { + } + else { + _dbg1("Reiniting OK."); return true; } if(wait && !m_stop_signal_sent) { //some problems with the listening socket ?.. - LOG_PRINT_L0("Net service stopped without stop request, restarting..."); + _dbg1("Net service stopped without stop request, restarting..."); if(!this->init_server(m_port, m_address)) { - LOG_PRINT_L0("Reiniting service failed, exit."); + _dbg1("Reiniting service failed, exit."); return false; }else { - LOG_PRINT_L0("Reiniting OK."); + _dbg1("Reiniting OK."); } } } @@ -597,7 +815,7 @@ POP_WARNINGS { if(m_threads[i]->joinable() && !m_threads[i]->try_join_for(ms)) { - LOG_PRINT_L0("Interrupting thread " << m_threads[i]->native_handle()); + _dbg1("Interrupting thread " << m_threads[i]->native_handle()); m_threads[i]->interrupt(); } } @@ -608,6 +826,10 @@ POP_WARNINGS template<class t_protocol_handler> void boosted_tcp_server<t_protocol_handler>::send_stop_signal() { + if (::cryptonote::core::get_fast_exit() == true) + { + detach_threads(); + } m_stop_signal_sent = true; TRY_ENTRY(); io_service_.stop(); @@ -626,19 +848,22 @@ POP_WARNINGS TRY_ENTRY(); if (!e) { - connection_ptr conn(std::move(new_connection_)); - - new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)); + if (type == RPC) { + new_connection_->setRPcStation(); + _note("New server for RPC connections"); + } + connection_ptr conn(std::move(new_connection_)); + new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter)); acceptor_.async_accept(new_connection_->socket(), boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept, this, boost::asio::placeholders::error)); bool r = conn->start(true, 1 < m_threads_count); if (!r) - LOG_ERROR("[sock " << conn->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sockets_count); + _erro("[sock " << conn->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sock_count); }else { - LOG_ERROR("Some problems at accept: " << e.message() << ", connections_count = " << m_sockets_count); + _erro("Some problems at accept: " << e.message() << ", connections_count = " << m_sock_count); } CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::handle_accept", void()); } @@ -648,7 +873,7 @@ POP_WARNINGS { TRY_ENTRY(); - connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter) ); + connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter) ); boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket(); ////////////////////////////////////////////////////////////////////////// @@ -658,7 +883,7 @@ POP_WARNINGS boost::asio::ip::tcp::resolver::iterator end; if(iterator == end) { - LOG_ERROR("Failed to resolve " << adr); + _erro("Failed to resolve " << adr); return false; } ////////////////////////////////////////////////////////////////////////// @@ -704,7 +929,7 @@ POP_WARNINGS { //timeout sock_.close(); - LOG_PRINT_L3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")"); + _dbg3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")"); return false; } } @@ -712,21 +937,21 @@ POP_WARNINGS if (ec || !sock_.is_open()) { - LOG_PRINT("Some problems at connect, message: " << ec.message(), LOG_LEVEL_3); + _dbg3("Some problems at connect, message: " << ec.message()); return false; } - LOG_PRINT_L3("Connected success to " << adr << ':' << port); + _dbg3("Connected success to " << adr << ':' << port); bool r = new_connection_l->start(false, 1 < m_threads_count); if (r) { new_connection_l->get_context(conn_context); - //new_connection_l.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)); + //new_connection_l.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_pfilter)); } else { - LOG_ERROR("[sock " << new_connection_->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sockets_count); + _erro("[sock " << new_connection_->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sock_count); } return r; @@ -738,7 +963,7 @@ POP_WARNINGS bool boosted_tcp_server<t_protocol_handler>::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_callback cb, const std::string& bind_ip) { TRY_ENTRY(); - connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter) ); + connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter) ); boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket(); ////////////////////////////////////////////////////////////////////////// @@ -748,7 +973,7 @@ POP_WARNINGS boost::asio::ip::tcp::resolver::iterator end; if(iterator == end) { - LOG_ERROR("Failed to resolve " << adr); + _erro("Failed to resolve " << adr); return false; } ////////////////////////////////////////////////////////////////////////// @@ -768,7 +993,7 @@ POP_WARNINGS { if(error != boost::asio::error::operation_aborted) { - LOG_PRINT_L3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")"); + _dbg3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")"); new_connection_l->socket().close(); } }); @@ -785,7 +1010,7 @@ POP_WARNINGS cb(conn_context, boost::asio::error::operation_aborted);//this mean that deadline timer already queued callback with cancel operation, rare situation }else { - LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port << + _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port << " from " << lep.address().to_string() << ':' << lep.port()); bool r = new_connection_l->start(false, 1 < m_threads_count); if (r) @@ -795,13 +1020,13 @@ POP_WARNINGS } else { - LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection to " << adr << ':' << port); + _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection to " << adr << ':' << port); cb(conn_context, boost::asio::error::fault); } } }else { - LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port << + _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port << " from " << lep.address().to_string() << ':' << lep.port() << ": " << ec_.message() << ':' << ec_.value()); cb(conn_context, ec_); } @@ -809,6 +1034,15 @@ POP_WARNINGS return true; CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect_async", false); } -} -} + //--------------------------------------------------------------------------------- + template<class t_protocol_handler> + void boosted_tcp_server<t_protocol_handler>::detach_threads() + { + for (auto thread : m_threads) + thread->detach(); + } + + +} // namespace +} // namespace PRAGMA_WARNING_POP |