diff options
Diffstat (limited to 'contrib/epee')
-rw-r--r-- | contrib/epee/include/net/abstract_tcp_server2.h | 86 | ||||
-rw-r--r-- | contrib/epee/include/net/abstract_tcp_server2.inl | 428 | ||||
-rw-r--r-- | contrib/epee/include/net/levin_protocol_handler_async.h | 29 | ||||
-rw-r--r-- | contrib/epee/include/net/net_utils_base.h | 10 | ||||
-rw-r--r-- | contrib/epee/include/storages/levin_abstract_invoke2.h | 4 | ||||
-rw-r--r-- | contrib/epee/include/syncobj.h | 12 |
6 files changed, 436 insertions, 133 deletions
diff --git a/contrib/epee/include/net/abstract_tcp_server2.h b/contrib/epee/include/net/abstract_tcp_server2.h index 6c613c5d5..3e6ea2171 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.h +++ b/contrib/epee/include/net/abstract_tcp_server2.h @@ -1,3 +1,9 @@ +/** +@file +@author from CrypoNote (see copyright below; Andrey N. Sabelnikov) +@monero rfree +@brief the connection templated-class for one peer connection +*/ // Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net // All rights reserved. // @@ -26,7 +32,7 @@ -#ifndef _ABSTRACT_TCP_SERVER2_H_ +#ifndef _ABSTRACT_TCP_SERVER2_H_ #define _ABSTRACT_TCP_SERVER2_H_ @@ -36,6 +42,8 @@ #include <boost/noncopyable.hpp> #include <boost/shared_ptr.hpp> #include <atomic> +#include <map> +#include <memory> #include <boost/asio.hpp> #include <boost/array.hpp> @@ -46,7 +54,9 @@ #include <boost/thread/thread.hpp> #include "net_utils_base.h" #include "syncobj.h" - +#include "../../../../src/p2p/connection_basic.hpp" +#include "../../../../contrib/otshell_utils/utils.hpp" +#include "../../../../src/p2p/network_throttle-detail.hpp" #define ABSTRACT_SERVER_SEND_QUE_MAX_COUNT 1000 @@ -61,6 +71,12 @@ namespace net_utils protected: virtual ~i_connection_filter(){} }; + + enum t_server_role { // type of the server, e.g. so that we will know how to limit it + NET = 0, // default (not used? used for misc connections maybe?) TODO + RPC = 1, // the rpc commands + P2P = 2 // to other p2p node + }; /************************************************************************/ /* */ @@ -70,13 +86,18 @@ namespace net_utils class connection : public boost::enable_shared_from_this<connection<t_protocol_handler> >, private boost::noncopyable, - public i_service_endpoint + public i_service_endpoint, + public connection_basic { public: typedef typename t_protocol_handler::connection_context t_connection_context; /// Construct a connection with the given io_service. - explicit connection(boost::asio::io_service& io_service, - typename t_protocol_handler::config_type& config, volatile uint32_t& sock_count, i_connection_filter * &pfilter); + + explicit connection( boost::asio::io_service& io_service, + typename t_protocol_handler::config_type& config, + std::atomic<long> &ref_sock_count, // the ++/-- counter + std::atomic<long> &sock_number, // the only increasing ++ number generator + i_connection_filter * &pfilter); virtual ~connection(); /// Get the socket associated with the connection. @@ -90,7 +111,8 @@ namespace net_utils void call_back_starter(); private: //----------------- i_service_endpoint --------------------- - virtual bool do_send(const void* ptr, size_t cb); + virtual bool do_send(const void* ptr, size_t cb); ///< (see do_send from i_service_endpoint) + virtual bool do_send_chunk(const void* ptr, size_t cb); ///< will send (or queue) a part of data virtual bool close(); virtual bool call_run_once_service_io(); virtual bool request_callback(); @@ -107,29 +129,31 @@ namespace net_utils /// Handle completion of a write operation. void handle_write(const boost::system::error_code& e, size_t cb); - /// Strand to ensure the connection's handlers are not called concurrently. - boost::asio::io_service::strand strand_; - - /// Socket for the connection. - boost::asio::ip::tcp::socket socket_; - /// Buffer for incoming data. boost::array<char, 8192> buffer_; + //boost::array<char, 1024> buffer_; t_connection_context context; - volatile uint32_t m_want_close_connection; - std::atomic<bool> m_was_shutdown; - critical_section m_send_que_lock; - std::list<std::string> m_send_que; - volatile uint32_t& m_ref_sockets_count; i_connection_filter* &m_pfilter; - volatile bool m_is_multithreaded; + // TODO what do they mean about wait on destructor?? --rfree : //this should be the last one, because it could be wait on destructor, while other activities possible on other threads t_protocol_handler m_protocol_handler; //typename t_protocol_handler::config_type m_dummy_config; std::list<boost::shared_ptr<connection<t_protocol_handler> > > m_self_refs; // add_ref/release support critical_section m_self_refs_lock; + critical_section m_chunking_lock; // held while we add small chunks of the big do_send() to small do_send_chunk() + + t_server_role m_connection_type; + + // for calculate speed (last 60 sec) + network_throttle m_throttle_speed_in; + network_throttle m_throttle_speed_out; + std::mutex m_throttle_speed_in_mutex; + std::mutex m_throttle_speed_out_mutex; + + public: + void setRPcStation(); }; @@ -146,9 +170,11 @@ namespace net_utils /// Construct the server to listen on the specified TCP address and port, and /// serve up files from the given directory. boosted_tcp_server(); - explicit boosted_tcp_server(boost::asio::io_service& external_io_service); + explicit boosted_tcp_server(boost::asio::io_service& external_io_service, t_server_role s_type); ~boosted_tcp_server(); - + + std::map<std::string, t_server_role> server_type_map; + void create_server_type_map(); bool init_server(uint32_t port, const std::string address = "0.0.0.0"); bool init_server(const std::string port, const std::string& address = "0.0.0.0"); @@ -254,22 +280,26 @@ namespace net_utils /// Acceptor used to listen for incoming connections. boost::asio::ip::tcp::acceptor acceptor_; - /// The next connection to be accepted. - connection_ptr new_connection_; std::atomic<bool> m_stop_signal_sent; uint32_t m_port; - volatile uint32_t m_sockets_count; + std::atomic<long> m_sock_count; + std::atomic<long> m_sock_number; std::string m_address; - std::string m_thread_name_prefix; + std::string m_thread_name_prefix; //TODO: change to enum server_type, now used size_t m_threads_count; i_connection_filter* m_pfilter; std::vector<boost::shared_ptr<boost::thread> > m_threads; boost::thread::id m_main_thread_id; critical_section m_threads_lock; - volatile uint32_t m_thread_index; - }; -} -} + volatile uint32_t m_thread_index; // TODO change to std::atomic + t_server_role type; + void detach_threads(); + + /// The next connection to be accepted + connection_ptr new_connection_; + }; // class <>boosted_tcp_server +} // namespace +} // namespace #include "abstract_tcp_server2.inl" diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl index db3f9e322..31836fe9e 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.inl +++ b/contrib/epee/include/net/abstract_tcp_server2.inl @@ -1,3 +1,9 @@ +/** +@file +@author from CrypoNote (see copyright below; Andrey N. Sabelnikov) +@monero rfree +@brief the connection templated-class for one peer connection +*/ // Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net // All rights reserved. // @@ -26,7 +32,7 @@ -#include "net_utils_base.h" +//#include "net_utils_base.h" #include <boost/lambda/bind.hpp> #include <boost/foreach.hpp> #include <boost/lambda/lambda.hpp> @@ -34,9 +40,21 @@ #include <boost/chrono.hpp> #include <boost/utility/value_init.hpp> #include <boost/asio/deadline_timer.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> // TODO +#include <boost/thread/thread.hpp> // TODO #include "misc_language.h" #include "pragma_comp_defs.h" +#include <sstream> +#include <iomanip> +#include <algorithm> + +#include "../../../../src/cryptonote_core/cryptonote_core.h" // e.g. for the send_stop_signal() + +#include "../../../../contrib/otshell_utils/utils.hpp" +#include "../../../../src/p2p/data_logger.hpp" +using namespace nOT::nUtils; // TODO + PRAGMA_WARNING_PUSH namespace epee { @@ -48,17 +66,21 @@ namespace net_utils PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> - connection<t_protocol_handler>::connection(boost::asio::io_service& io_service, - typename t_protocol_handler::config_type& config, volatile uint32_t& sock_count, i_connection_filter* &pfilter) - : strand_(io_service), - socket_(io_service), - m_want_close_connection(0), - m_was_shutdown(0), - m_ref_sockets_count(sock_count), - m_pfilter(pfilter), - m_protocol_handler(this, config, context) + connection<t_protocol_handler>::connection( boost::asio::io_service& io_service, + typename t_protocol_handler::config_type& config, + std::atomic<long> &ref_sock_count, // the ++/-- counter + std::atomic<long> &sock_number, // the only increasing ++ number generator + i_connection_filter* &pfilter + ) + : + connection_basic(io_service, ref_sock_count, sock_number), + m_protocol_handler(this, config, context), + m_pfilter( pfilter ), + m_connection_type(NET), + m_throttle_speed_in("speed_in", "throttle_speed_in"), + m_throttle_speed_out("speed_out", "throttle_speed_out") { - boost::interprocess::ipcdetail::atomic_inc32(&m_ref_sockets_count); + _info_c("net/sleepRPC", "connection constructor set m_connection_type="<<m_connection_type); } PRAGMA_WARNING_DISABLE_VS(4355) //--------------------------------------------------------------------------------- @@ -67,12 +89,11 @@ PRAGMA_WARNING_DISABLE_VS(4355) { if(!m_was_shutdown) { - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Socket destroyed without shutdown."); + _dbg3("[sock " << socket_.native_handle() << "] Socket destroyed without shutdown."); shutdown(); } - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Socket destroyed"); - boost::interprocess::ipcdetail::atomic_dec32(&m_ref_sockets_count); + _dbg3("[sock " << socket_.native_handle() << "] Socket destroyed"); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> @@ -118,13 +139,13 @@ PRAGMA_WARNING_DISABLE_VS(4355) long ip_ = boost::asio::detail::socket_ops::host_to_network_long(remote_ep.address().to_v4().to_ulong()); context.set_details(boost::uuids::random_generator()(), ip_, remote_ep.port(), is_income); - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] new connection from " << print_connection_context_short(context) << + _dbg3("[sock " << socket_.native_handle() << "] new connection from " << print_connection_context_short(context) << " to " << local_ep.address().to_string() << ':' << local_ep.port() << - ", total sockets objects " << m_ref_sockets_count); + ", total sockets objects " << m_ref_sock_count); if(m_pfilter && !m_pfilter->is_remote_ip_allowed(context.m_remote_ip)) { - LOG_PRINT_L2("[sock " << socket_.native_handle() << "] ip denied " << string_tools::get_ip_string_from_int32(context.m_remote_ip) << ", shutdowning connection"); + _dbg2("[sock " << socket_.native_handle() << "] ip denied " << string_tools::get_ip_string_from_int32(context.m_remote_ip) << ", shutdowning connection"); close(); return false; } @@ -136,7 +157,17 @@ PRAGMA_WARNING_DISABLE_VS(4355) boost::bind(&connection<t_protocol_handler>::handle_read, self, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); - + + //set ToS flag + int tos = get_tos_flag(); + boost::asio::detail::socket_option::integer< IPPROTO_IP, IP_TOS > + optionTos( tos ); + socket_.set_option( optionTos ); + //_dbg1("Set ToS flag to " << tos); + + boost::asio::ip::tcp::no_delay noDelayOption(false); + socket_.set_option(noDelayOption); + return true; CATCH_ENTRY_L0("connection<t_protocol_handler>::start()", false); @@ -146,7 +177,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) bool connection<t_protocol_handler>::request_callback() { TRY_ENTRY(); - LOG_PRINT_L2("[" << print_connection_context_short(context) << "] request_callback"); + _dbg2("[" << print_connection_context_short(context) << "] request_callback"); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted auto self = safe_shared_from_this(); if(!self) @@ -167,8 +198,9 @@ PRAGMA_WARNING_DISABLE_VS(4355) bool connection<t_protocol_handler>::add_ref() { TRY_ENTRY(); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "] add_ref"); + //_dbg3("[sock " << socket_.native_handle() << "] add_ref, m_peer_number=" << mI->m_peer_number); CRITICAL_REGION_LOCAL(m_self_refs_lock); + //_dbg3("[sock " << socket_.native_handle() << "] add_ref 2, m_peer_number=" << mI->m_peer_number); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted auto self = safe_shared_from_this(); @@ -201,7 +233,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) void connection<t_protocol_handler>::call_back_starter() { TRY_ENTRY(); - LOG_PRINT_L2("[" << print_connection_context_short(context) << "] fired_callback"); + _dbg2("[" << print_connection_context_short(context) << "] fired_callback"); m_protocol_handler.handle_qued_callback(); CATCH_ENTRY_L0("connection<t_protocol_handler>::call_back_starter()", void()); } @@ -211,17 +243,44 @@ PRAGMA_WARNING_DISABLE_VS(4355) std::size_t bytes_transferred) { TRY_ENTRY(); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async read calledback."); + //_info("[sock " << socket_.native_handle() << "] Async read calledback."); if (!e) { - LOG_PRINT("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred, LOG_LEVEL_4); + { + CRITICAL_REGION_LOCAL(m_throttle_speed_in_mutex); + m_throttle_speed_in.handle_trafic_exact(bytes_transferred); + context.m_current_speed_down = m_throttle_speed_in.get_current_speed(); + } + + { + CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::network_throttle_manager::m_lock_get_global_throttle_in ); + epee::net_utils::network_throttle_manager::network_throttle_manager::get_global_throttle_in().handle_trafic_exact(bytes_transferred * 1024); + } + double delay=0; // will be calculated + do + { + { //_scope_dbg1("CRITICAL_REGION_LOCAL"); + CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::m_lock_get_global_throttle_in ); + delay = epee::net_utils::network_throttle_manager::get_global_throttle_in().get_sleep_time_after_tick( bytes_transferred ); // decission from global + } + + delay *= 0.5; + if (delay > 0) { + long int ms = (long int)(delay * 100); + epee::net_utils::data_logger::get_instance().add_data("sleep_down", ms); + std::this_thread::sleep_for(std::chrono::milliseconds(ms)); + } + } while(delay > 0); + + //_info("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred); + logger_handle_net_read(bytes_transferred); context.m_last_recv = time(NULL); context.m_recv_cnt += bytes_transferred; bool recv_res = m_protocol_handler.handle_recv(buffer_.data(), bytes_transferred); if(!recv_res) { - LOG_PRINT("[sock " << socket_.native_handle() << "] protocol_want_close", LOG_LEVEL_4); + //_info("[sock " << socket_.native_handle() << "] protocol_want_close"); //some error in protocol, protocol handler ask to close connection boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1); @@ -239,14 +298,14 @@ PRAGMA_WARNING_DISABLE_VS(4355) boost::bind(&connection<t_protocol_handler>::handle_read, connection<t_protocol_handler>::shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "]Async read requested."); + //_info("[sock " << socket_.native_handle() << "]Async read requested."); } }else { - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Some not success at read: " << e.message() << ':' << e.value()); + _dbg3("[sock " << socket_.native_handle() << "] Some not success at read: " << e.message() << ':' << e.value()); if(e.value() != 2) { - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Some problems at read: " << e.message() << ':' << e.value()); + _dbg3("[sock " << socket_.native_handle() << "] Some problems at read: " << e.message() << ':' << e.value()); shutdown(); } } @@ -283,8 +342,91 @@ PRAGMA_WARNING_DISABLE_VS(4355) CATCH_ENTRY_L0("connection<t_protocol_handler>::call_run_once_service_io", false); } //--------------------------------------------------------------------------------- + template<class t_protocol_handler> + bool connection<t_protocol_handler>::do_send(const void* ptr, size_t cb) { + TRY_ENTRY(); + + // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted + auto self = safe_shared_from_this(); + if (!self) return false; + if (m_was_shutdown) return false; + // TODO avoid copy + + const double factor = 32; // TODO config + typedef long long signed int t_safe; // my t_size to avoid any overunderflow in arithmetic + const t_safe chunksize_good = (t_safe)( 1024 * std::max(1.0,factor) ); + const t_safe chunksize_max = chunksize_good * 2 ; + const bool allow_split = (m_connection_type == RPC) ? false : true; // TODO config + + ASRT(! (chunksize_max<0) ); // make sure it is unsigned before removin sign with cast: + long long unsigned int chunksize_max_unsigned = static_cast<long long unsigned int>( chunksize_max ) ; + + if (allow_split && (cb > chunksize_max_unsigned)) { + { // LOCK: chunking + epee::critical_region_t<decltype(m_chunking_lock)> send_guard(m_chunking_lock); // *** critical *** + + _dbg3_c("net/out/size", "do_send() will SPLIT into small chunks, from packet="<<cb<<" B for ptr="<<ptr); + t_safe all = cb; // all bytes to send + t_safe pos = 0; // current sending position + // 01234567890 + // ^^^^ (pos=0, len=4) ; pos:=pos+len, pos=4 + // ^^^^ (pos=4, len=4) ; pos:=pos+len, pos=8 + // ^^^ (pos=8, len=4) ; + + // const size_t bufsize = chunksize_good; // TODO safecast + // char* buf = new char[ bufsize ]; + + bool all_ok = true; + while (pos < all) { + t_safe lenall = all-pos; // length from here to end + t_safe len = std::min( chunksize_good , lenall); // take a smaller part + ASRT(len<=chunksize_good); + // pos=8; len=4; all=10; len=3; + + ASRT(! (len<0) ); // check before we cast away sign: + unsigned long long int len_unsigned = static_cast<long long int>( len ); + ASRT(len>0); // (redundand) + ASRT(len_unsigned < std::numeric_limits<size_t>::max()); // yeap we want strong < then max size, to be sure + + void *chunk_start = ((char*)ptr) + pos; + _fact_c("net/out/size","chunk_start="<<chunk_start<<" ptr="<<ptr<<" pos="<<pos); + ASRT(chunk_start >= ptr); // not wrapped around address? + //std::memcpy( (void*)buf, chunk_start, len); + + _dbg3_c("net/out/size", "part of " << lenall << ": pos="<<pos << " len="<<len); + + bool ok = do_send_chunk(chunk_start, len); // <====== *** + + all_ok = all_ok && ok; + if (!all_ok) { + _dbg1_c("net/out/size", "do_send() DONE ***FAILED*** from packet="<<cb<<" B for ptr="<<ptr); + _dbg1("do_send() SEND was aborted in middle of big package - this is mostly harmless " + << " (e.g. peer closed connection) but if it causes trouble tell us at #monero-dev. " << cb); + return false; // partial failure in sending + } + pos = pos+len; ASRT(pos >0); + + // (in catch block, or uniq pointer) delete buf; + } // each chunk + + _dbg3_c("net/out/size", "do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr); + _dbg3 ( "do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr); + + _info_c("net/sleepRPC", "do_send() m_connection_type = " << m_connection_type); + + return all_ok; // done - e.g. queued - all the chunks of current do_send call + } // LOCK: chunking + } // a big block (to be chunked) - all chunks + else { // small block + return do_send_chunk(ptr,cb); // just send as 1 big chunk + } + + CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send", false); + } // do_send() + + //--------------------------------------------------------------------------------- template<class t_protocol_handler> - bool connection<t_protocol_handler>::do_send(const void* ptr, size_t cb) + bool connection<t_protocol_handler>::do_send_chunk(const void* ptr, size_t cb) { TRY_ENTRY(); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted @@ -293,50 +435,86 @@ PRAGMA_WARNING_DISABLE_VS(4355) return false; if(m_was_shutdown) return false; + { + CRITICAL_REGION_LOCAL(m_throttle_speed_out_mutex); + m_throttle_speed_out.handle_trafic_exact(cb); + context.m_current_speed_up = m_throttle_speed_out.get_current_speed(); + } - LOG_PRINT("[sock " << socket_.native_handle() << "] SEND " << cb, LOG_LEVEL_4); + //_info("[sock " << socket_.native_handle() << "] SEND " << cb); context.m_last_send = time(NULL); context.m_send_cnt += cb; //some data should be wrote to stream //request complete - epee::critical_region_t<decltype(m_send_que_lock)> send_guard(m_send_que_lock); - if(m_send_que.size() > ABSTRACT_SERVER_SEND_QUE_MAX_COUNT) + sleep_before_packet(cb, 1, 1); + epee::critical_region_t<decltype(m_send_que_lock)> send_guard(m_send_que_lock); // *** critical *** + long int retry=0; + const long int retry_limit = 5*4; + while (m_send_que.size() > ABSTRACT_SERVER_SEND_QUE_MAX_COUNT) { - send_guard.unlock(); - LOG_PRINT_L2("send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection"); - close(); - return false; + retry++; + + /* if ( ::cryptonote::core::get_is_stopping() ) { // TODO re-add fast stop + _fact("ABORT queue wait due to stopping"); + return false; // aborted + }*/ + + long int ms = 250 + (rand()%50); + _info_c("net/sleep", "Sleeping because QUEUE is FULL, in " << __FUNCTION__ << " for " << ms << " ms before packet_size="<<cb); // XXX debug sleep + boost::this_thread::sleep(boost::posix_time::milliseconds( ms ) ); + _dbg1("sleep for queue: " << ms); + + if (retry > retry_limit) { + send_guard.unlock(); + _erro("send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection"); + // _dbg1_c("net/sleep", "send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection"); + close(); + return false; + } } m_send_que.resize(m_send_que.size()+1); m_send_que.back().assign((const char*)ptr, cb); if(m_send_que.size() > 1) - { - //active operation should be in progress, nothing to do, just wait last operation callback - }else - { - //no active operation - if(m_send_que.size()!=1) - { - LOG_ERROR("Looks like no active operations, but send que size != 1!!"); - return false; - } - - boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), m_send_que.front().size()), - //strand_.wrap( - boost::bind(&connection<t_protocol_handler>::handle_write, self, _1, _2) - //) - ); + { // active operation should be in progress, nothing to do, just wait last operation callback + auto size_now = cb; + _info_c("net/out/size", "do_send() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size()); + //do_send_handler_delayed( ptr , size_now ); // (((H))) // empty function LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size()); } + else + { // no active operation + + if(m_send_que.size()!=1) + { + _erro("Looks like no active operations, but send que size != 1!!"); + return false; + } + + auto size_now = m_send_que.front().size(); + _dbg1_c("net/out/size", "do_send() NOW SENSD: packet="<<size_now<<" B"); + do_send_handler_write( ptr , size_now ); // (((H))) + + ASRT( size_now == m_send_que.front().size() ); + boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now ) , + //strand_.wrap( + boost::bind(&connection<t_protocol_handler>::handle_write, self, _1, _2) + //) + ); + //_dbg3("(chunk): " << size_now); + //logger_handle_net_write(size_now); + //_info("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size()); + } + + //do_send_handler_stop( ptr , cb ); // empty function return true; CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send", false); - } + } // do_send_chunk //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool connection<t_protocol_handler>::shutdown() @@ -353,7 +531,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) bool connection<t_protocol_handler>::close() { TRY_ENTRY(); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Que Shutdown called."); + //_info("[sock " << socket_.native_handle() << "] Que Shutdown called."); size_t send_que_size = 0; CRITICAL_REGION_BEGIN(m_send_que_lock); send_que_size = m_send_que.size(); @@ -376,16 +554,17 @@ PRAGMA_WARNING_DISABLE_VS(4355) if (e) { - LOG_PRINT_L1("[sock " << socket_.native_handle() << "] Some problems at write: " << e.message() << ':' << e.value()); + _dbg1("[sock " << socket_.native_handle() << "] Some problems at write: " << e.message() << ':' << e.value()); shutdown(); return; } - + logger_handle_net_write(cb); + sleep_before_packet(cb, 1, 1); bool do_shutdown = false; CRITICAL_REGION_BEGIN(m_send_que_lock); if(m_send_que.empty()) { - LOG_ERROR("[sock " << socket_.native_handle() << "] m_send_que.size() == 0 at handle_write!"); + _erro("[sock " << socket_.native_handle() << "] m_send_que.size() == 0 at handle_write!"); return; } @@ -399,10 +578,16 @@ PRAGMA_WARNING_DISABLE_VS(4355) }else { //have more data to send - boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), m_send_que.front().size()), - //strand_.wrap( - boost::bind(&connection<t_protocol_handler>::handle_write, connection<t_protocol_handler>::shared_from_this(), _1, _2)); - //); + auto size_now = m_send_que.front().size(); + _dbg1_c("net/out/size", "handle_write() NOW SENDS: packet="<<size_now<<" B" <<", from queue size="<<m_send_que.size()); + do_send_handler_write_from_queue(e, m_send_que.front().size() , m_send_que.size()); // (((H))) + ASRT( size_now == m_send_que.front().size() ); + boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now) , + // strand_.wrap( + boost::bind(&connection<t_protocol_handler>::handle_write, connection<t_protocol_handler>::shared_from_this(), _1, _2) + // ) + ); + //_dbg3("(normal)" << size_now); } CRITICAL_REGION_END(); @@ -412,6 +597,13 @@ PRAGMA_WARNING_DISABLE_VS(4355) } CATCH_ENTRY_L0("connection<t_protocol_handler>::handle_write", void()); } + //--------------------------------------------------------------------------------- + template<class t_protocol_handler> + void connection<t_protocol_handler>::setRPcStation() + { + m_connection_type = RPC; + _fact_c("net/sleepRPC", "set m_connection_type = RPC "); + } /************************************************************************/ /* */ /************************************************************************/ @@ -420,19 +612,27 @@ PRAGMA_WARNING_DISABLE_VS(4355) m_io_service_local_instance(new boost::asio::io_service()), io_service_(*m_io_service_local_instance.get()), acceptor_(io_service_), - new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)), - m_stop_signal_sent(false), m_port(0), m_sockets_count(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0) + m_stop_signal_sent(false), m_port(0), + m_sock_count(0), m_sock_number(0), m_threads_count(0), + m_pfilter(NULL), m_thread_index(0), + new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter)) { + create_server_type_map(); m_thread_name_prefix = "NET"; + type = NET; } template<class t_protocol_handler> - boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service): + boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_server_role s_type): io_service_(extarnal_io_service), acceptor_(io_service_), - new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)), - m_stop_signal_sent(false), m_port(0), m_sockets_count(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0) + m_stop_signal_sent(false), m_port(0), + m_sock_count(0), m_sock_number(0), m_threads_count(0), + m_pfilter(NULL), m_thread_index(0), + type(NET), + new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter)) { + create_server_type_map(); m_thread_name_prefix = "NET"; } //--------------------------------------------------------------------------------- @@ -444,6 +644,14 @@ PRAGMA_WARNING_DISABLE_VS(4355) } //--------------------------------------------------------------------------------- template<class t_protocol_handler> + void boosted_tcp_server<t_protocol_handler>::create_server_type_map() + { + server_type_map["NET"] = t_server_role::NET; + server_type_map["RPC"] = t_server_role::RPC; + server_type_map["P2P"] = t_server_role::P2P; + } + //--------------------------------------------------------------------------------- + template<class t_protocol_handler> bool boosted_tcp_server<t_protocol_handler>::init_server(uint32_t port, const std::string address) { TRY_ENTRY(); @@ -491,6 +699,7 @@ POP_WARNINGS std::string thread_name = std::string("[") + m_thread_name_prefix; thread_name += boost::to_string(local_thr_index) + "]"; log_space::log_singletone::set_thread_log_prefix(thread_name); + // _fact("Thread name: " << m_thread_name_prefix); while(!m_stop_signal_sent) { try @@ -499,14 +708,14 @@ POP_WARNINGS } catch(const std::exception& ex) { - LOG_ERROR("Exception at server worker thread, what=" << ex.what()); + _erro("Exception at server worker thread, what=" << ex.what()); } catch(...) { - LOG_ERROR("Exception at server worker thread, unknown execption"); + _erro("Exception at server worker thread, unknown execption"); } } - LOG_PRINT_L4("Worker thread finished"); + //_info("Worker thread finished"); return true; CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::worker_thread", false); } @@ -515,6 +724,9 @@ POP_WARNINGS void boosted_tcp_server<t_protocol_handler>::set_threads_prefix(const std::string& prefix_name) { m_thread_name_prefix = prefix_name; + type = server_type_map[m_thread_name_prefix]; + _note("Set server type to: " << type); + _note("Set server type to: " << m_thread_name_prefix); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> @@ -539,32 +751,38 @@ POP_WARNINGS { boost::shared_ptr<boost::thread> thread(new boost::thread( attrs, boost::bind(&boosted_tcp_server<t_protocol_handler>::worker_thread, this))); + _note("Run server thread name: " << m_thread_name_prefix); m_threads.push_back(thread); } CRITICAL_REGION_END(); // Wait for all threads in the pool to exit. - if(wait) + if (wait) // && ! ::cryptonote::core::get_is_stopping()) // TODO fast_exit { - for (std::size_t i = 0; i < m_threads.size(); ++i) - m_threads[i]->join(); + _fact("JOINING all threads"); + for (std::size_t i = 0; i < m_threads.size(); ++i) { + m_threads[i]->join(); + } + _fact("JOINING all threads - almost"); m_threads.clear(); + _fact("JOINING all threads - DONE"); - }else - { + } + else { + _dbg1("Reiniting OK."); return true; } if(wait && !m_stop_signal_sent) { //some problems with the listening socket ?.. - LOG_PRINT_L0("Net service stopped without stop request, restarting..."); + _dbg1("Net service stopped without stop request, restarting..."); if(!this->init_server(m_port, m_address)) { - LOG_PRINT_L0("Reiniting service failed, exit."); + _dbg1("Reiniting service failed, exit."); return false; }else { - LOG_PRINT_L0("Reiniting OK."); + _dbg1("Reiniting OK."); } } } @@ -597,7 +815,7 @@ POP_WARNINGS { if(m_threads[i]->joinable() && !m_threads[i]->try_join_for(ms)) { - LOG_PRINT_L0("Interrupting thread " << m_threads[i]->native_handle()); + _dbg1("Interrupting thread " << m_threads[i]->native_handle()); m_threads[i]->interrupt(); } } @@ -608,6 +826,10 @@ POP_WARNINGS template<class t_protocol_handler> void boosted_tcp_server<t_protocol_handler>::send_stop_signal() { + if (::cryptonote::core::get_fast_exit() == true) + { + detach_threads(); + } m_stop_signal_sent = true; TRY_ENTRY(); io_service_.stop(); @@ -626,19 +848,22 @@ POP_WARNINGS TRY_ENTRY(); if (!e) { - connection_ptr conn(std::move(new_connection_)); - - new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)); + if (type == RPC) { + new_connection_->setRPcStation(); + _note("New server for RPC connections"); + } + connection_ptr conn(std::move(new_connection_)); + new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter)); acceptor_.async_accept(new_connection_->socket(), boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept, this, boost::asio::placeholders::error)); bool r = conn->start(true, 1 < m_threads_count); if (!r) - LOG_ERROR("[sock " << conn->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sockets_count); + _erro("[sock " << conn->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sock_count); }else { - LOG_ERROR("Some problems at accept: " << e.message() << ", connections_count = " << m_sockets_count); + _erro("Some problems at accept: " << e.message() << ", connections_count = " << m_sock_count); } CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::handle_accept", void()); } @@ -648,7 +873,7 @@ POP_WARNINGS { TRY_ENTRY(); - connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter) ); + connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter) ); boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket(); ////////////////////////////////////////////////////////////////////////// @@ -658,7 +883,7 @@ POP_WARNINGS boost::asio::ip::tcp::resolver::iterator end; if(iterator == end) { - LOG_ERROR("Failed to resolve " << adr); + _erro("Failed to resolve " << adr); return false; } ////////////////////////////////////////////////////////////////////////// @@ -704,7 +929,7 @@ POP_WARNINGS { //timeout sock_.close(); - LOG_PRINT_L3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")"); + _dbg3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")"); return false; } } @@ -712,21 +937,21 @@ POP_WARNINGS if (ec || !sock_.is_open()) { - LOG_PRINT("Some problems at connect, message: " << ec.message(), LOG_LEVEL_3); + _dbg3("Some problems at connect, message: " << ec.message()); return false; } - LOG_PRINT_L3("Connected success to " << adr << ':' << port); + _dbg3("Connected success to " << adr << ':' << port); bool r = new_connection_l->start(false, 1 < m_threads_count); if (r) { new_connection_l->get_context(conn_context); - //new_connection_l.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)); + //new_connection_l.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_pfilter)); } else { - LOG_ERROR("[sock " << new_connection_->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sockets_count); + _erro("[sock " << new_connection_->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sock_count); } return r; @@ -738,7 +963,7 @@ POP_WARNINGS bool boosted_tcp_server<t_protocol_handler>::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_callback cb, const std::string& bind_ip) { TRY_ENTRY(); - connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter) ); + connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter) ); boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket(); ////////////////////////////////////////////////////////////////////////// @@ -748,7 +973,7 @@ POP_WARNINGS boost::asio::ip::tcp::resolver::iterator end; if(iterator == end) { - LOG_ERROR("Failed to resolve " << adr); + _erro("Failed to resolve " << adr); return false; } ////////////////////////////////////////////////////////////////////////// @@ -768,7 +993,7 @@ POP_WARNINGS { if(error != boost::asio::error::operation_aborted) { - LOG_PRINT_L3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")"); + _dbg3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")"); new_connection_l->socket().close(); } }); @@ -785,7 +1010,7 @@ POP_WARNINGS cb(conn_context, boost::asio::error::operation_aborted);//this mean that deadline timer already queued callback with cancel operation, rare situation }else { - LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port << + _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port << " from " << lep.address().to_string() << ':' << lep.port()); bool r = new_connection_l->start(false, 1 < m_threads_count); if (r) @@ -795,13 +1020,13 @@ POP_WARNINGS } else { - LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection to " << adr << ':' << port); + _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection to " << adr << ':' << port); cb(conn_context, boost::asio::error::fault); } } }else { - LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port << + _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port << " from " << lep.address().to_string() << ':' << lep.port() << ": " << ec_.message() << ':' << ec_.value()); cb(conn_context, ec_); } @@ -809,6 +1034,15 @@ POP_WARNINGS return true; CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect_async", false); } -} -} + //--------------------------------------------------------------------------------- + template<class t_protocol_handler> + void boosted_tcp_server<t_protocol_handler>::detach_threads() + { + for (auto thread : m_threads) + thread->detach(); + } + + +} // namespace +} // namespace PRAGMA_WARNING_POP diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h index e79768b04..92f75161a 100644 --- a/contrib/epee/include/net/levin_protocol_handler_async.h +++ b/contrib/epee/include/net/levin_protocol_handler_async.h @@ -81,6 +81,7 @@ public: async_protocol_handler_config():m_pcommands_handler(NULL), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE) {} + void del_out_connections(size_t count); }; @@ -669,6 +670,34 @@ void async_protocol_handler_config<t_connection_context>::del_connection(async_p } //------------------------------------------------------------------------------------------ template<class t_connection_context> +void async_protocol_handler_config<t_connection_context>::del_out_connections(size_t count) +{ + std::vector <boost::uuids::uuid> out_connections; + CRITICAL_REGION_BEGIN(m_connects_lock); + for (auto& c: m_connects) + { + if (!c.second->m_connection_context.m_is_income) + out_connections.push_back(c.first); + } + + if (out_connections.size() == 0) + return; + + // close random out connections + unsigned seed = std::chrono::system_clock::now().time_since_epoch().count(); + shuffle(out_connections.begin(), out_connections.end(), std::default_random_engine(seed)); + while (count > 0 && out_connections.size() > 0) + { + close(*out_connections.begin()); + del_connection(m_connects.at(*out_connections.begin())); + out_connections.erase(out_connections.begin()); + --count; + } + + CRITICAL_REGION_END(); +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> void async_protocol_handler_config<t_connection_context>::add_connection(async_protocol_handler<t_connection_context>* pconn) { CRITICAL_REGION_BEGIN(m_connects_lock); diff --git a/contrib/epee/include/net/net_utils_base.h b/contrib/epee/include/net/net_utils_base.h index 90e352787..f963e7746 100644 --- a/contrib/epee/include/net/net_utils_base.h +++ b/contrib/epee/include/net/net_utils_base.h @@ -55,6 +55,8 @@ namespace net_utils time_t m_last_send; uint64_t m_recv_cnt; uint64_t m_send_cnt; + double m_current_speed_down; + double m_current_speed_up; connection_context_base(boost::uuids::uuid connection_id, long remote_ip, int remote_port, bool is_income, @@ -68,7 +70,9 @@ namespace net_utils m_last_recv(last_recv), m_last_send(last_send), m_recv_cnt(recv_cnt), - m_send_cnt(send_cnt) + m_send_cnt(send_cnt), + m_current_speed_down(0), + m_current_speed_up(0) {} connection_context_base(): m_connection_id(), @@ -79,7 +83,9 @@ namespace net_utils m_last_recv(0), m_last_send(0), m_recv_cnt(0), - m_send_cnt(0) + m_send_cnt(0), + m_current_speed_down(0), + m_current_speed_up(0) {} connection_context_base& operator=(const connection_context_base& a) diff --git a/contrib/epee/include/storages/levin_abstract_invoke2.h b/contrib/epee/include/storages/levin_abstract_invoke2.h index 1b32c51d1..73ede1b12 100644 --- a/contrib/epee/include/storages/levin_abstract_invoke2.h +++ b/contrib/epee/include/storages/levin_abstract_invoke2.h @@ -185,7 +185,7 @@ namespace epee } return res; - }; + } template<class t_owner, class t_in_type, class t_context, class callback_t> int buff_to_t_adapter(t_owner* powner, int command, const std::string& in_buff, callback_t cb, t_context& context) @@ -199,7 +199,7 @@ namespace epee boost::value_initialized<t_in_type> in_struct; static_cast<t_in_type&>(in_struct).load(strg); return cb(command, in_struct, context); - }; + } #define CHAIN_LEVIN_INVOKE_MAP2(context_type) \ int invoke(int command, const std::string& in_buff, std::string& buff_out, context_type& context) \ diff --git a/contrib/epee/include/syncobj.h b/contrib/epee/include/syncobj.h index b7273da8e..b81eb43a9 100644 --- a/contrib/epee/include/syncobj.h +++ b/contrib/epee/include/syncobj.h @@ -35,10 +35,14 @@ #include <boost/thread/locks.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/recursive_mutex.hpp> +#include <thread> +#include <chrono> namespace epee { + extern unsigned int g_test_dbg_lock_sleep; + struct simple_event { simple_event() : m_rised(false) @@ -215,10 +219,10 @@ namespace epee #define SHARED_CRITICAL_REGION_BEGIN(x) { shared_guard critical_region_var(x) #define EXCLUSIVE_CRITICAL_REGION_BEGIN(x) { exclusive_guard critical_region_var(x) -#define CRITICAL_REGION_LOCAL(x) epee::critical_region_t<decltype(x)> critical_region_var(x) -#define CRITICAL_REGION_BEGIN(x) { epee::critical_region_t<decltype(x)> critical_region_var(x) -#define CRITICAL_REGION_LOCAL1(x) epee::critical_region_t<decltype(x)> critical_region_var1(x) -#define CRITICAL_REGION_BEGIN1(x) { epee::critical_region_t<decltype(x)> critical_region_var1(x) +#define CRITICAL_REGION_LOCAL(x) {std::this_thread::sleep_for(std::chrono::milliseconds(epee::g_test_dbg_lock_sleep));} epee::critical_region_t<decltype(x)> critical_region_var(x) +#define CRITICAL_REGION_BEGIN(x) { std::this_thread::sleep_for(std::chrono::milliseconds(epee::g_test_dbg_lock_sleep)); epee::critical_region_t<decltype(x)> critical_region_var(x) +#define CRITICAL_REGION_LOCAL1(x) {std::this_thread::sleep_for(std::chrono::milliseconds(epee::g_test_dbg_lock_sleep));} epee::critical_region_t<decltype(x)> critical_region_var1(x) +#define CRITICAL_REGION_BEGIN1(x) { std::this_thread::sleep_for(std::chrono::milliseconds(epee::g_test_dbg_lock_sleep)); epee::critical_region_t<decltype(x)> critical_region_var1(x) #define CRITICAL_REGION_END() } |