diff options
Diffstat (limited to 'contrib/epee/include/net')
-rw-r--r-- | contrib/epee/include/net/abstract_tcp_server2.h | 77 | ||||
-rw-r--r-- | contrib/epee/include/net/abstract_tcp_server2.inl | 377 | ||||
-rw-r--r-- | contrib/epee/include/net/levin_protocol_handler_async.h | 9 |
3 files changed, 336 insertions, 127 deletions
diff --git a/contrib/epee/include/net/abstract_tcp_server2.h b/contrib/epee/include/net/abstract_tcp_server2.h index 6c613c5d5..1e6223212 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.h +++ b/contrib/epee/include/net/abstract_tcp_server2.h @@ -1,3 +1,9 @@ +/** +@file +@author from CrypoNote (see copyright below; Andrey N. Sabelnikov) +@monero rfree +@brief the connection templated-class for one peer connection +*/ // Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net // All rights reserved. // @@ -26,7 +32,7 @@ -#ifndef _ABSTRACT_TCP_SERVER2_H_ +#ifndef _ABSTRACT_TCP_SERVER2_H_ #define _ABSTRACT_TCP_SERVER2_H_ @@ -36,6 +42,8 @@ #include <boost/noncopyable.hpp> #include <boost/shared_ptr.hpp> #include <atomic> +#include <map> +#include <memory> #include <boost/asio.hpp> #include <boost/array.hpp> @@ -46,7 +54,8 @@ #include <boost/thread/thread.hpp> #include "net_utils_base.h" #include "syncobj.h" - +#include "../../../../src/p2p/connection_basic.hpp" +#include "../../../../contrib/otshell_utils/utils.hpp" #define ABSTRACT_SERVER_SEND_QUE_MAX_COUNT 1000 @@ -61,6 +70,12 @@ namespace net_utils protected: virtual ~i_connection_filter(){} }; + + enum t_server_role { // type of the server, e.g. so that we will know how to limit it + NET = 0, // default (not used? used for misc connections maybe?) TODO + RPC = 1, // the rpc commands + P2P = 2 // to other p2p node + }; /************************************************************************/ /* */ @@ -70,13 +85,18 @@ namespace net_utils class connection : public boost::enable_shared_from_this<connection<t_protocol_handler> >, private boost::noncopyable, - public i_service_endpoint + public i_service_endpoint, + public connection_basic { public: typedef typename t_protocol_handler::connection_context t_connection_context; /// Construct a connection with the given io_service. - explicit connection(boost::asio::io_service& io_service, - typename t_protocol_handler::config_type& config, volatile uint32_t& sock_count, i_connection_filter * &pfilter); + + explicit connection( boost::asio::io_service& io_service, + typename t_protocol_handler::config_type& config, + std::atomic<long> &ref_sock_count, // the ++/-- counter + std::atomic<long> &sock_number, // the only increasing ++ number generator + i_connection_filter * &pfilter); virtual ~connection(); /// Get the socket associated with the connection. @@ -90,7 +110,8 @@ namespace net_utils void call_back_starter(); private: //----------------- i_service_endpoint --------------------- - virtual bool do_send(const void* ptr, size_t cb); + virtual bool do_send(const void* ptr, size_t cb); ///< (see do_send from i_service_endpoint) + virtual bool do_send_chunk(const void* ptr, size_t cb); ///< will send (or queue) a part of data virtual bool close(); virtual bool call_run_once_service_io(); virtual bool request_callback(); @@ -107,29 +128,24 @@ namespace net_utils /// Handle completion of a write operation. void handle_write(const boost::system::error_code& e, size_t cb); - /// Strand to ensure the connection's handlers are not called concurrently. - boost::asio::io_service::strand strand_; - - /// Socket for the connection. - boost::asio::ip::tcp::socket socket_; - /// Buffer for incoming data. boost::array<char, 8192> buffer_; t_connection_context context; - volatile uint32_t m_want_close_connection; - std::atomic<bool> m_was_shutdown; - critical_section m_send_que_lock; - std::list<std::string> m_send_que; - volatile uint32_t& m_ref_sockets_count; i_connection_filter* &m_pfilter; - volatile bool m_is_multithreaded; + // TODO what do they mean about wait on destructor?? --rfree : //this should be the last one, because it could be wait on destructor, while other activities possible on other threads t_protocol_handler m_protocol_handler; //typename t_protocol_handler::config_type m_dummy_config; std::list<boost::shared_ptr<connection<t_protocol_handler> > > m_self_refs; // add_ref/release support critical_section m_self_refs_lock; + critical_section m_chunking_lock; // held while we add small chunks of the big do_send() to small do_send_chunk() + + t_server_role m_connection_type; + + public: + void setRPcStation(); }; @@ -146,9 +162,11 @@ namespace net_utils /// Construct the server to listen on the specified TCP address and port, and /// serve up files from the given directory. boosted_tcp_server(); - explicit boosted_tcp_server(boost::asio::io_service& external_io_service); + explicit boosted_tcp_server(boost::asio::io_service& external_io_service, t_server_role s_type); ~boosted_tcp_server(); - + + std::map<std::string, t_server_role> server_type_map; + void create_server_type_map(); bool init_server(uint32_t port, const std::string address = "0.0.0.0"); bool init_server(const std::string port, const std::string& address = "0.0.0.0"); @@ -254,22 +272,25 @@ namespace net_utils /// Acceptor used to listen for incoming connections. boost::asio::ip::tcp::acceptor acceptor_; - /// The next connection to be accepted. - connection_ptr new_connection_; std::atomic<bool> m_stop_signal_sent; uint32_t m_port; - volatile uint32_t m_sockets_count; + std::atomic<long> m_sock_count; + std::atomic<long> m_sock_number; std::string m_address; - std::string m_thread_name_prefix; + std::string m_thread_name_prefix; //TODO: change to enum server_type, now used size_t m_threads_count; i_connection_filter* m_pfilter; std::vector<boost::shared_ptr<boost::thread> > m_threads; boost::thread::id m_main_thread_id; critical_section m_threads_lock; - volatile uint32_t m_thread_index; - }; -} -} + volatile uint32_t m_thread_index; // TODO change to std::atomic + t_server_role type; + + /// The next connection to be accepted + connection_ptr new_connection_; + }; // class <>boosted_tcp_server +} // namespace +} // namespace #include "abstract_tcp_server2.inl" diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl index db3f9e322..8dff192b1 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.inl +++ b/contrib/epee/include/net/abstract_tcp_server2.inl @@ -1,3 +1,9 @@ +/** +@file +@author from CrypoNote (see copyright below; Andrey N. Sabelnikov) +@monero rfree +@brief the connection templated-class for one peer connection +*/ // Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net // All rights reserved. // @@ -26,7 +32,7 @@ -#include "net_utils_base.h" +//#include "net_utils_base.h" #include <boost/lambda/bind.hpp> #include <boost/foreach.hpp> #include <boost/lambda/lambda.hpp> @@ -34,9 +40,20 @@ #include <boost/chrono.hpp> #include <boost/utility/value_init.hpp> #include <boost/asio/deadline_timer.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> // TODO +#include <boost/thread/thread.hpp> // TODO #include "misc_language.h" #include "pragma_comp_defs.h" +#include <sstream> +#include <iomanip> +#include <algorithm> + +#include "../../../../src/cryptonote_core/cryptonote_core.h" // e.g. for the send_stop_signal() + +#include "../../../../contrib/otshell_utils/utils.hpp" +using namespace nOT::nUtils; // TODO + PRAGMA_WARNING_PUSH namespace epee { @@ -48,17 +65,19 @@ namespace net_utils PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> - connection<t_protocol_handler>::connection(boost::asio::io_service& io_service, - typename t_protocol_handler::config_type& config, volatile uint32_t& sock_count, i_connection_filter* &pfilter) - : strand_(io_service), - socket_(io_service), - m_want_close_connection(0), - m_was_shutdown(0), - m_ref_sockets_count(sock_count), - m_pfilter(pfilter), - m_protocol_handler(this, config, context) - { - boost::interprocess::ipcdetail::atomic_inc32(&m_ref_sockets_count); + connection<t_protocol_handler>::connection( boost::asio::io_service& io_service, + typename t_protocol_handler::config_type& config, + std::atomic<long> &ref_sock_count, // the ++/-- counter + std::atomic<long> &sock_number, // the only increasing ++ number generator + i_connection_filter* &pfilter + ) + : + connection_basic(io_service, ref_sock_count, sock_number), + m_protocol_handler(this, config, context), + m_pfilter( pfilter ), + m_connection_type(NET) + { + _info_c("net/sleepRPC", "connection constructor set m_connection_type="<<m_connection_type); } PRAGMA_WARNING_DISABLE_VS(4355) //--------------------------------------------------------------------------------- @@ -67,12 +86,11 @@ PRAGMA_WARNING_DISABLE_VS(4355) { if(!m_was_shutdown) { - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Socket destroyed without shutdown."); + _dbg3("[sock " << socket_.native_handle() << "] Socket destroyed without shutdown."); shutdown(); } - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Socket destroyed"); - boost::interprocess::ipcdetail::atomic_dec32(&m_ref_sockets_count); + _dbg3("[sock " << socket_.native_handle() << "] Socket destroyed"); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> @@ -118,13 +136,13 @@ PRAGMA_WARNING_DISABLE_VS(4355) long ip_ = boost::asio::detail::socket_ops::host_to_network_long(remote_ep.address().to_v4().to_ulong()); context.set_details(boost::uuids::random_generator()(), ip_, remote_ep.port(), is_income); - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] new connection from " << print_connection_context_short(context) << + _dbg3("[sock " << socket_.native_handle() << "] new connection from " << print_connection_context_short(context) << " to " << local_ep.address().to_string() << ':' << local_ep.port() << - ", total sockets objects " << m_ref_sockets_count); + ", total sockets objects " << m_ref_sock_count); if(m_pfilter && !m_pfilter->is_remote_ip_allowed(context.m_remote_ip)) { - LOG_PRINT_L2("[sock " << socket_.native_handle() << "] ip denied " << string_tools::get_ip_string_from_int32(context.m_remote_ip) << ", shutdowning connection"); + _dbg2("[sock " << socket_.native_handle() << "] ip denied " << string_tools::get_ip_string_from_int32(context.m_remote_ip) << ", shutdowning connection"); close(); return false; } @@ -136,7 +154,14 @@ PRAGMA_WARNING_DISABLE_VS(4355) boost::bind(&connection<t_protocol_handler>::handle_read, self, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); - + + //set ToS flag + int tos = get_tos_flag(); + boost::asio::detail::socket_option::integer< IPPROTO_IP, IP_TOS > + optionTos( tos ); + socket_.set_option( optionTos ); + //_dbg1("Set ToS flag to " << tos); + return true; CATCH_ENTRY_L0("connection<t_protocol_handler>::start()", false); @@ -146,7 +171,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) bool connection<t_protocol_handler>::request_callback() { TRY_ENTRY(); - LOG_PRINT_L2("[" << print_connection_context_short(context) << "] request_callback"); + _dbg2("[" << print_connection_context_short(context) << "] request_callback"); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted auto self = safe_shared_from_this(); if(!self) @@ -167,7 +192,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) bool connection<t_protocol_handler>::add_ref() { TRY_ENTRY(); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "] add_ref"); + //_info("[sock " << socket_.native_handle() << "] add_ref"); CRITICAL_REGION_LOCAL(m_self_refs_lock); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted @@ -201,7 +226,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) void connection<t_protocol_handler>::call_back_starter() { TRY_ENTRY(); - LOG_PRINT_L2("[" << print_connection_context_short(context) << "] fired_callback"); + _dbg2("[" << print_connection_context_short(context) << "] fired_callback"); m_protocol_handler.handle_qued_callback(); CATCH_ENTRY_L0("connection<t_protocol_handler>::call_back_starter()", void()); } @@ -211,17 +236,18 @@ PRAGMA_WARNING_DISABLE_VS(4355) std::size_t bytes_transferred) { TRY_ENTRY(); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async read calledback."); + //_info("[sock " << socket_.native_handle() << "] Async read calledback."); if (!e) { - LOG_PRINT("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred, LOG_LEVEL_4); + //_info("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred); + logger_handle_net_read(bytes_transferred); context.m_last_recv = time(NULL); context.m_recv_cnt += bytes_transferred; bool recv_res = m_protocol_handler.handle_recv(buffer_.data(), bytes_transferred); if(!recv_res) { - LOG_PRINT("[sock " << socket_.native_handle() << "] protocol_want_close", LOG_LEVEL_4); + //_info("[sock " << socket_.native_handle() << "] protocol_want_close"); //some error in protocol, protocol handler ask to close connection boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1); @@ -239,14 +265,14 @@ PRAGMA_WARNING_DISABLE_VS(4355) boost::bind(&connection<t_protocol_handler>::handle_read, connection<t_protocol_handler>::shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "]Async read requested."); + //_info("[sock " << socket_.native_handle() << "]Async read requested."); } }else { - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Some not success at read: " << e.message() << ':' << e.value()); + _dbg3("[sock " << socket_.native_handle() << "] Some not success at read: " << e.message() << ':' << e.value()); if(e.value() != 2) { - LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Some problems at read: " << e.message() << ':' << e.value()); + _dbg3("[sock " << socket_.native_handle() << "] Some problems at read: " << e.message() << ':' << e.value()); shutdown(); } } @@ -283,8 +309,87 @@ PRAGMA_WARNING_DISABLE_VS(4355) CATCH_ENTRY_L0("connection<t_protocol_handler>::call_run_once_service_io", false); } //--------------------------------------------------------------------------------- + template<class t_protocol_handler> + bool connection<t_protocol_handler>::do_send(const void* ptr, size_t cb) { + TRY_ENTRY(); + + // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted + auto self = safe_shared_from_this(); + if (!self) return false; + if (m_was_shutdown) return false; + // TODO avoid copy + + const double factor = 32; // TODO config + typedef long long signed int t_safe; // my t_size to avoid any overunderflow in arithmetic + const t_safe chunksize_good = (t_safe)( 1024 * std::max(1.0,factor) ); + const t_safe chunksize_max = chunksize_good * 2 ; + const bool allow_split = (m_connection_type == RPC) ? false : true; // TODO config + + if (allow_split && (cb > chunksize_max)) { + { // LOCK: chunking + epee::critical_region_t<decltype(m_chunking_lock)> send_guard(m_chunking_lock); // *** critical *** + + _mark_c("net/out/size", "do_send() will SPLIT into small chunks, from packet="<<cb<<" B for ptr="<<ptr); + _mark("do_send() will SPLIT into small chunks, from packet="<<cb<<" B for ptr="<<ptr); + t_safe all = cb; // all bytes to send + t_safe pos = 0; // current sending position + // 01234567890 + // ^^^^ (pos=0, len=4) ; pos:=pos+len, pos=4 + // ^^^^ (pos=4, len=4) ; pos:=pos+len, pos=8 + // ^^^ (pos=8, len=4) ; + + // const size_t bufsize = chunksize_good; // TODO safecast + // char* buf = new char[ bufsize ]; + + bool all_ok = true; + while (pos < all) { + t_safe lenall = all-pos; // length from here to end + t_safe len = std::min( chunksize_good , lenall); // take a smaller part + ASRT(len<=chunksize_good); + // pos=8; len=4; all=10; len=3; + + ASRT(len>0); ASRT(len < std::numeric_limits<size_t>::max()); // yeap we want strong < then max size, to be sure + + void *chunk_start = ((char*)ptr) + pos; + _fact_c("net/out/size","chunk_start="<<chunk_start<<" ptr="<<ptr<<" pos="<<pos); + ASRT(chunk_start >= ptr); // not wrapped around address? + //std::memcpy( (void*)buf, chunk_start, len); + + _dbg1_c("net/out/size", "part of " << lenall << ": pos="<<pos << " len="<<len); + + bool ok = do_send_chunk(chunk_start, len); // <====== *** + + all_ok = all_ok && ok; + if (!all_ok) { + _mark_c("net/out/size", "do_send() DONE ***FAILED*** from packet="<<cb<<" B for ptr="<<ptr); + _mark ( "do_send() DONE ***FAILED*** from packet="<<cb<<" B for ptr="<<ptr); + _dbg1("do_send() SEND was aborted in middle of big package - this is mostly harmless " + << " (e.g. peer closed connection) but if it causes trouble tell us at #monero-dev. " << cb); + return false; // partial failure in sending + } + pos = pos+len; ASRT(pos >0); + + // (in catch block, or uniq pointer) delete buf; + } // each chunk + + _mark_c("net/out/size", "do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr); + _mark ( "do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr); + + _info_c("net/sleepRPC", "do_send() m_connection_type = " << m_connection_type); + + return all_ok; // done - e.g. queued - all the chunks of current do_send call + } // LOCK: chunking + } // a big block (to be chunked) - all chunks + else { // small block + return do_send_chunk(ptr,cb); // just send as 1 big chunk + } + + CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send", false); + } // do_send() + + //--------------------------------------------------------------------------------- template<class t_protocol_handler> - bool connection<t_protocol_handler>::do_send(const void* ptr, size_t cb) + bool connection<t_protocol_handler>::do_send_chunk(const void* ptr, size_t cb) { TRY_ENTRY(); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted @@ -294,49 +399,80 @@ PRAGMA_WARNING_DISABLE_VS(4355) if(m_was_shutdown) return false; - LOG_PRINT("[sock " << socket_.native_handle() << "] SEND " << cb, LOG_LEVEL_4); + //_info("[sock " << socket_.native_handle() << "] SEND " << cb); context.m_last_send = time(NULL); context.m_send_cnt += cb; //some data should be wrote to stream //request complete - epee::critical_region_t<decltype(m_send_que_lock)> send_guard(m_send_que_lock); - if(m_send_que.size() > ABSTRACT_SERVER_SEND_QUE_MAX_COUNT) - { - send_guard.unlock(); - LOG_PRINT_L2("send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection"); - close(); - return false; + do_send_handler_start( ptr , cb ); // (((H))) + + epee::critical_region_t<decltype(m_send_que_lock)> send_guard(m_send_que_lock); // *** critical *** + long int retry=0; + const long int retry_limit = 5*4; + while (m_send_que.size() > ABSTRACT_SERVER_SEND_QUE_MAX_COUNT) + { + retry++; + + /* if ( ::cryptonote::core::get_is_stopping() ) { // TODO re-add fast stop + _fact("ABORT queue wait due to stopping"); + return false; // aborted + }*/ + + long int ms = 250 + (rand()%50); + _info_c("net/sleep", "Sleeping because QUEUE is FULL, in " << __FUNCTION__ << " for " << ms << " ms before packet_size="<<cb); // XXX debug sleep + boost::this_thread::sleep(boost::posix_time::milliseconds( ms ) ); + _dbg1("sleep for queue: " << ms); + + if (retry > retry_limit) { + send_guard.unlock(); + _erro("send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection"); + // _mark_c("net/sleep", "send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection"); + close(); + return false; + } } m_send_que.resize(m_send_que.size()+1); m_send_que.back().assign((const char*)ptr, cb); if(m_send_que.size() > 1) - { - //active operation should be in progress, nothing to do, just wait last operation callback - }else - { - //no active operation - if(m_send_que.size()!=1) - { - LOG_ERROR("Looks like no active operations, but send que size != 1!!"); - return false; - } + { // active operation should be in progress, nothing to do, just wait last operation callback + auto size_now = cb; + _info_c("net/out/size", "do_send() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size()); + do_send_handler_delayed( ptr , size_now ); // (((H))) - boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), m_send_que.front().size()), - //strand_.wrap( - boost::bind(&connection<t_protocol_handler>::handle_write, self, _1, _2) - //) - ); - - LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size()); } + else + { // no active operation + + if(m_send_que.size()!=1) + { + _erro("Looks like no active operations, but send que size != 1!!"); + return false; + } + + auto size_now = m_send_que.front().size(); + _mark_c("net/out/size", "do_send() NOW SENSD: packet="<<size_now<<" B"); + do_send_handler_write( ptr , size_now ); // (((H))) + + ASRT( size_now == m_send_que.front().size() ); + boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now ) , + //strand_.wrap( + boost::bind(&connection<t_protocol_handler>::handle_write, self, _1, _2) + //) + ); + //_dbg3("(chunk): " << size_now); + logger_handle_net_write(size_now); + //_info("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size()); + } + + do_send_handler_stop( ptr , cb ); return true; CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send", false); - } + } // do_send_chunk //--------------------------------------------------------------------------------- template<class t_protocol_handler> bool connection<t_protocol_handler>::shutdown() @@ -353,7 +489,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) bool connection<t_protocol_handler>::close() { TRY_ENTRY(); - LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Que Shutdown called."); + //_info("[sock " << socket_.native_handle() << "] Que Shutdown called."); size_t send_que_size = 0; CRITICAL_REGION_BEGIN(m_send_que_lock); send_que_size = m_send_que.size(); @@ -376,7 +512,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) if (e) { - LOG_PRINT_L1("[sock " << socket_.native_handle() << "] Some problems at write: " << e.message() << ':' << e.value()); + _dbg1("[sock " << socket_.native_handle() << "] Some problems at write: " << e.message() << ':' << e.value()); shutdown(); return; } @@ -385,7 +521,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) CRITICAL_REGION_BEGIN(m_send_que_lock); if(m_send_que.empty()) { - LOG_ERROR("[sock " << socket_.native_handle() << "] m_send_que.size() == 0 at handle_write!"); + _erro("[sock " << socket_.native_handle() << "] m_send_que.size() == 0 at handle_write!"); return; } @@ -399,10 +535,17 @@ PRAGMA_WARNING_DISABLE_VS(4355) }else { //have more data to send - boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), m_send_que.front().size()), - //strand_.wrap( - boost::bind(&connection<t_protocol_handler>::handle_write, connection<t_protocol_handler>::shared_from_this(), _1, _2)); - //); + auto size_now = m_send_que.front().size(); + _mark_c("net/out/size", "handle_write() NOW SENDS: packet="<<size_now<<" B" <<", from queue size="<<m_send_que.size()); + do_send_handler_write_from_queue(e, m_send_que.front().size() , m_send_que.size()); // (((H))) + ASRT( size_now == m_send_que.front().size() ); + boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now) , + // strand_.wrap( + boost::bind(&connection<t_protocol_handler>::handle_write, connection<t_protocol_handler>::shared_from_this(), _1, _2) + // ) + ); + //_dbg3("(normal)" << size_now); + logger_handle_net_write(size_now); } CRITICAL_REGION_END(); @@ -412,6 +555,13 @@ PRAGMA_WARNING_DISABLE_VS(4355) } CATCH_ENTRY_L0("connection<t_protocol_handler>::handle_write", void()); } + //--------------------------------------------------------------------------------- + template<class t_protocol_handler> + void connection<t_protocol_handler>::setRPcStation() + { + m_connection_type = RPC; + _fact_c("net/sleepRPC", "set m_connection_type = RPC "); + } /************************************************************************/ /* */ /************************************************************************/ @@ -420,19 +570,27 @@ PRAGMA_WARNING_DISABLE_VS(4355) m_io_service_local_instance(new boost::asio::io_service()), io_service_(*m_io_service_local_instance.get()), acceptor_(io_service_), - new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)), - m_stop_signal_sent(false), m_port(0), m_sockets_count(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0) + m_stop_signal_sent(false), m_port(0), + m_sock_count(0), m_sock_number(0), m_threads_count(0), + m_pfilter(NULL), m_thread_index(0), + new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter)) { + create_server_type_map(); m_thread_name_prefix = "NET"; + type = NET; } template<class t_protocol_handler> - boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service): + boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_server_role s_type): io_service_(extarnal_io_service), acceptor_(io_service_), - new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)), - m_stop_signal_sent(false), m_port(0), m_sockets_count(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0) + m_stop_signal_sent(false), m_port(0), + m_sock_count(0), m_sock_number(0), m_threads_count(0), + m_pfilter(NULL), m_thread_index(0), + type(NET), + new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter)) { + create_server_type_map(); m_thread_name_prefix = "NET"; } //--------------------------------------------------------------------------------- @@ -444,6 +602,14 @@ PRAGMA_WARNING_DISABLE_VS(4355) } //--------------------------------------------------------------------------------- template<class t_protocol_handler> + void boosted_tcp_server<t_protocol_handler>::create_server_type_map() + { + server_type_map["NET"] = t_server_role::NET; + server_type_map["RPC"] = t_server_role::RPC; + server_type_map["P2P"] = t_server_role::P2P; + } + //--------------------------------------------------------------------------------- + template<class t_protocol_handler> bool boosted_tcp_server<t_protocol_handler>::init_server(uint32_t port, const std::string address) { TRY_ENTRY(); @@ -491,6 +657,7 @@ POP_WARNINGS std::string thread_name = std::string("[") + m_thread_name_prefix; thread_name += boost::to_string(local_thr_index) + "]"; log_space::log_singletone::set_thread_log_prefix(thread_name); + // _fact("Thread name: " << m_thread_name_prefix); while(!m_stop_signal_sent) { try @@ -499,14 +666,14 @@ POP_WARNINGS } catch(const std::exception& ex) { - LOG_ERROR("Exception at server worker thread, what=" << ex.what()); + _erro("Exception at server worker thread, what=" << ex.what()); } catch(...) { - LOG_ERROR("Exception at server worker thread, unknown execption"); + _erro("Exception at server worker thread, unknown execption"); } } - LOG_PRINT_L4("Worker thread finished"); + //_info("Worker thread finished"); return true; CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::worker_thread", false); } @@ -515,6 +682,9 @@ POP_WARNINGS void boosted_tcp_server<t_protocol_handler>::set_threads_prefix(const std::string& prefix_name) { m_thread_name_prefix = prefix_name; + type = server_type_map[m_thread_name_prefix]; + _note("Set server type to: " << type); + _note("Set server type to: " << m_thread_name_prefix); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> @@ -539,32 +709,38 @@ POP_WARNINGS { boost::shared_ptr<boost::thread> thread(new boost::thread( attrs, boost::bind(&boosted_tcp_server<t_protocol_handler>::worker_thread, this))); + _note("Run server thread name: " << m_thread_name_prefix); m_threads.push_back(thread); } CRITICAL_REGION_END(); // Wait for all threads in the pool to exit. - if(wait) + if (wait) // && ! ::cryptonote::core::get_is_stopping()) // TODO fast_exit { - for (std::size_t i = 0; i < m_threads.size(); ++i) - m_threads[i]->join(); + _fact("JOINING all threads"); + for (std::size_t i = 0; i < m_threads.size(); ++i) { + m_threads[i]->join(); + } + _fact("JOINING all threads - almost"); m_threads.clear(); + _fact("JOINING all threads - DONE"); - }else - { + } + else { + _dbg1("Reiniting OK."); return true; } if(wait && !m_stop_signal_sent) { //some problems with the listening socket ?.. - LOG_PRINT_L0("Net service stopped without stop request, restarting..."); + _dbg1("Net service stopped without stop request, restarting..."); if(!this->init_server(m_port, m_address)) { - LOG_PRINT_L0("Reiniting service failed, exit."); + _dbg1("Reiniting service failed, exit."); return false; }else { - LOG_PRINT_L0("Reiniting OK."); + _dbg1("Reiniting OK."); } } } @@ -597,7 +773,7 @@ POP_WARNINGS { if(m_threads[i]->joinable() && !m_threads[i]->try_join_for(ms)) { - LOG_PRINT_L0("Interrupting thread " << m_threads[i]->native_handle()); + _dbg1("Interrupting thread " << m_threads[i]->native_handle()); m_threads[i]->interrupt(); } } @@ -626,19 +802,22 @@ POP_WARNINGS TRY_ENTRY(); if (!e) { - connection_ptr conn(std::move(new_connection_)); - - new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)); + if (type == RPC) { + new_connection_->setRPcStation(); + _note("New server for RPC connections"); + } + connection_ptr conn(std::move(new_connection_)); + new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter)); acceptor_.async_accept(new_connection_->socket(), boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept, this, boost::asio::placeholders::error)); bool r = conn->start(true, 1 < m_threads_count); if (!r) - LOG_ERROR("[sock " << conn->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sockets_count); + _erro("[sock " << conn->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sock_count); }else { - LOG_ERROR("Some problems at accept: " << e.message() << ", connections_count = " << m_sockets_count); + _erro("Some problems at accept: " << e.message() << ", connections_count = " << m_sock_count); } CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::handle_accept", void()); } @@ -648,7 +827,7 @@ POP_WARNINGS { TRY_ENTRY(); - connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter) ); + connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter) ); boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket(); ////////////////////////////////////////////////////////////////////////// @@ -658,7 +837,7 @@ POP_WARNINGS boost::asio::ip::tcp::resolver::iterator end; if(iterator == end) { - LOG_ERROR("Failed to resolve " << adr); + _erro("Failed to resolve " << adr); return false; } ////////////////////////////////////////////////////////////////////////// @@ -704,7 +883,7 @@ POP_WARNINGS { //timeout sock_.close(); - LOG_PRINT_L3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")"); + _dbg3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")"); return false; } } @@ -712,21 +891,21 @@ POP_WARNINGS if (ec || !sock_.is_open()) { - LOG_PRINT("Some problems at connect, message: " << ec.message(), LOG_LEVEL_3); + _dbg3("Some problems at connect, message: " << ec.message()); return false; } - LOG_PRINT_L3("Connected success to " << adr << ':' << port); + _dbg3("Connected success to " << adr << ':' << port); bool r = new_connection_l->start(false, 1 < m_threads_count); if (r) { new_connection_l->get_context(conn_context); - //new_connection_l.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)); + //new_connection_l.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_pfilter)); } else { - LOG_ERROR("[sock " << new_connection_->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sockets_count); + _erro("[sock " << new_connection_->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sock_count); } return r; @@ -738,7 +917,7 @@ POP_WARNINGS bool boosted_tcp_server<t_protocol_handler>::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_callback cb, const std::string& bind_ip) { TRY_ENTRY(); - connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter) ); + connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter) ); boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket(); ////////////////////////////////////////////////////////////////////////// @@ -748,7 +927,7 @@ POP_WARNINGS boost::asio::ip::tcp::resolver::iterator end; if(iterator == end) { - LOG_ERROR("Failed to resolve " << adr); + _erro("Failed to resolve " << adr); return false; } ////////////////////////////////////////////////////////////////////////// @@ -768,7 +947,7 @@ POP_WARNINGS { if(error != boost::asio::error::operation_aborted) { - LOG_PRINT_L3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")"); + _dbg3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")"); new_connection_l->socket().close(); } }); @@ -785,7 +964,7 @@ POP_WARNINGS cb(conn_context, boost::asio::error::operation_aborted);//this mean that deadline timer already queued callback with cancel operation, rare situation }else { - LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port << + _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port << " from " << lep.address().to_string() << ':' << lep.port()); bool r = new_connection_l->start(false, 1 < m_threads_count); if (r) @@ -795,13 +974,13 @@ POP_WARNINGS } else { - LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection to " << adr << ':' << port); + _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection to " << adr << ':' << port); cb(conn_context, boost::asio::error::fault); } } }else { - LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port << + _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port << " from " << lep.address().to_string() << ':' << lep.port() << ": " << ec_.message() << ':' << ec_.value()); cb(conn_context, ec_); } @@ -809,6 +988,6 @@ POP_WARNINGS return true; CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect_async", false); } -} -} +} // namespace +} // namespace PRAGMA_WARNING_POP diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h index e79768b04..5e5e803f5 100644 --- a/contrib/epee/include/net/levin_protocol_handler_async.h +++ b/contrib/epee/include/net/levin_protocol_handler_async.h @@ -81,6 +81,7 @@ public: async_protocol_handler_config():m_pcommands_handler(NULL), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE) {} + void del_connections(size_t count); }; @@ -669,6 +670,14 @@ void async_protocol_handler_config<t_connection_context>::del_connection(async_p } //------------------------------------------------------------------------------------------ template<class t_connection_context> +void async_protocol_handler_config<t_connection_context>::del_connections(size_t count) // TODO +{ + CRITICAL_REGION_BEGIN(m_connects_lock); + m_connects.clear(); + CRITICAL_REGION_END(); +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> void async_protocol_handler_config<t_connection_context>::add_connection(async_protocol_handler<t_connection_context>* pconn) { CRITICAL_REGION_BEGIN(m_connects_lock); |