path: root/contrib/epee
diff options
Diffstat (limited to 'contrib/epee')
6 files changed, 439 insertions, 133 deletions
diff --git a/contrib/epee/include/net/abstract_tcp_server2.h b/contrib/epee/include/net/abstract_tcp_server2.h
index 6c613c5d5..3e6ea2171 100644
--- a/contrib/epee/include/net/abstract_tcp_server2.h
+++ b/contrib/epee/include/net/abstract_tcp_server2.h
@@ -1,3 +1,9 @@
+@author from CrypoNote (see copyright below; Andrey N. Sabelnikov)
+@monero rfree
+@brief the connection templated-class for one peer connection
// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
// All rights reserved.
@@ -26,7 +32,7 @@
@@ -36,6 +42,8 @@
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <atomic>
+#include <map>
+#include <memory>
#include <boost/asio.hpp>
#include <boost/array.hpp>
@@ -46,7 +54,9 @@
#include <boost/thread/thread.hpp>
#include "net_utils_base.h"
#include "syncobj.h"
+#include "../../../../src/p2p/connection_basic.hpp"
+#include "../../../../contrib/otshell_utils/utils.hpp"
+#include "../../../../src/p2p/network_throttle-detail.hpp"
@@ -61,6 +71,12 @@ namespace net_utils
virtual ~i_connection_filter(){}
+ enum t_server_role { // type of the server, e.g. so that we will know how to limit it
+ NET = 0, // default (not used? used for misc connections maybe?) TODO
+ RPC = 1, // the rpc commands
+ P2P = 2 // to other p2p node
+ };
/* */
@@ -70,13 +86,18 @@ namespace net_utils
class connection
: public boost::enable_shared_from_this<connection<t_protocol_handler> >,
private boost::noncopyable,
- public i_service_endpoint
+ public i_service_endpoint,
+ public connection_basic
typedef typename t_protocol_handler::connection_context t_connection_context;
/// Construct a connection with the given io_service.
- explicit connection(boost::asio::io_service& io_service,
- typename t_protocol_handler::config_type& config, volatile uint32_t& sock_count, i_connection_filter * &pfilter);
+ explicit connection( boost::asio::io_service& io_service,
+ typename t_protocol_handler::config_type& config,
+ std::atomic<long> &ref_sock_count, // the ++/-- counter
+ std::atomic<long> &sock_number, // the only increasing ++ number generator
+ i_connection_filter * &pfilter);
virtual ~connection();
/// Get the socket associated with the connection.
@@ -90,7 +111,8 @@ namespace net_utils
void call_back_starter();
//----------------- i_service_endpoint ---------------------
- virtual bool do_send(const void* ptr, size_t cb);
+ virtual bool do_send(const void* ptr, size_t cb); ///< (see do_send from i_service_endpoint)
+ virtual bool do_send_chunk(const void* ptr, size_t cb); ///< will send (or queue) a part of data
virtual bool close();
virtual bool call_run_once_service_io();
virtual bool request_callback();
@@ -107,29 +129,31 @@ namespace net_utils
/// Handle completion of a write operation.
void handle_write(const boost::system::error_code& e, size_t cb);
- /// Strand to ensure the connection's handlers are not called concurrently.
- boost::asio::io_service::strand strand_;
- /// Socket for the connection.
- boost::asio::ip::tcp::socket socket_;
/// Buffer for incoming data.
boost::array<char, 8192> buffer_;
+ //boost::array<char, 1024> buffer_;
t_connection_context context;
- volatile uint32_t m_want_close_connection;
- std::atomic<bool> m_was_shutdown;
- critical_section m_send_que_lock;
- std::list<std::string> m_send_que;
- volatile uint32_t& m_ref_sockets_count;
i_connection_filter* &m_pfilter;
- volatile bool m_is_multithreaded;
+ // TODO what do they mean about wait on destructor?? --rfree :
//this should be the last one, because it could be wait on destructor, while other activities possible on other threads
t_protocol_handler m_protocol_handler;
//typename t_protocol_handler::config_type m_dummy_config;
std::list<boost::shared_ptr<connection<t_protocol_handler> > > m_self_refs; // add_ref/release support
critical_section m_self_refs_lock;
+ critical_section m_chunking_lock; // held while we add small chunks of the big do_send() to small do_send_chunk()
+ t_server_role m_connection_type;
+ // for calculate speed (last 60 sec)
+ network_throttle m_throttle_speed_in;
+ network_throttle m_throttle_speed_out;
+ std::mutex m_throttle_speed_in_mutex;
+ std::mutex m_throttle_speed_out_mutex;
+ public:
+ void setRPcStation();
@@ -146,9 +170,11 @@ namespace net_utils
/// Construct the server to listen on the specified TCP address and port, and
/// serve up files from the given directory.
- explicit boosted_tcp_server(boost::asio::io_service& external_io_service);
+ explicit boosted_tcp_server(boost::asio::io_service& external_io_service, t_server_role s_type);
+ std::map<std::string, t_server_role> server_type_map;
+ void create_server_type_map();
bool init_server(uint32_t port, const std::string address = "");
bool init_server(const std::string port, const std::string& address = "");
@@ -254,22 +280,26 @@ namespace net_utils
/// Acceptor used to listen for incoming connections.
boost::asio::ip::tcp::acceptor acceptor_;
- /// The next connection to be accepted.
- connection_ptr new_connection_;
std::atomic<bool> m_stop_signal_sent;
uint32_t m_port;
- volatile uint32_t m_sockets_count;
+ std::atomic<long> m_sock_count;
+ std::atomic<long> m_sock_number;
std::string m_address;
- std::string m_thread_name_prefix;
+ std::string m_thread_name_prefix; //TODO: change to enum server_type, now used
size_t m_threads_count;
i_connection_filter* m_pfilter;
std::vector<boost::shared_ptr<boost::thread> > m_threads;
boost::thread::id m_main_thread_id;
critical_section m_threads_lock;
- volatile uint32_t m_thread_index;
- };
+ volatile uint32_t m_thread_index; // TODO change to std::atomic
+ t_server_role type;
+ void detach_threads();
+ /// The next connection to be accepted
+ connection_ptr new_connection_;
+ }; // class <>boosted_tcp_server
+} // namespace
+} // namespace
#include "abstract_tcp_server2.inl"
diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl
index db3f9e322..31836fe9e 100644
--- a/contrib/epee/include/net/abstract_tcp_server2.inl
+++ b/contrib/epee/include/net/abstract_tcp_server2.inl
@@ -1,3 +1,9 @@
+@author from CrypoNote (see copyright below; Andrey N. Sabelnikov)
+@monero rfree
+@brief the connection templated-class for one peer connection
// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
// All rights reserved.
@@ -26,7 +32,7 @@
-#include "net_utils_base.h"
+//#include "net_utils_base.h"
#include <boost/lambda/bind.hpp>
#include <boost/foreach.hpp>
#include <boost/lambda/lambda.hpp>
@@ -34,9 +40,21 @@
#include <boost/chrono.hpp>
#include <boost/utility/value_init.hpp>
#include <boost/asio/deadline_timer.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp> // TODO
+#include <boost/thread/thread.hpp> // TODO
#include "misc_language.h"
#include "pragma_comp_defs.h"
+#include <sstream>
+#include <iomanip>
+#include <algorithm>
+#include "../../../../src/cryptonote_core/cryptonote_core.h" // e.g. for the send_stop_signal()
+#include "../../../../contrib/otshell_utils/utils.hpp"
+#include "../../../../src/p2p/data_logger.hpp"
+using namespace nOT::nUtils; // TODO
namespace epee
@@ -48,17 +66,21 @@ namespace net_utils
template<class t_protocol_handler>
- connection<t_protocol_handler>::connection(boost::asio::io_service& io_service,
- typename t_protocol_handler::config_type& config, volatile uint32_t& sock_count, i_connection_filter* &pfilter)
- : strand_(io_service),
- socket_(io_service),
- m_want_close_connection(0),
- m_was_shutdown(0),
- m_ref_sockets_count(sock_count),
- m_pfilter(pfilter),
- m_protocol_handler(this, config, context)
+ connection<t_protocol_handler>::connection( boost::asio::io_service& io_service,
+ typename t_protocol_handler::config_type& config,
+ std::atomic<long> &ref_sock_count, // the ++/-- counter
+ std::atomic<long> &sock_number, // the only increasing ++ number generator
+ i_connection_filter* &pfilter
+ )
+ :
+ connection_basic(io_service, ref_sock_count, sock_number),
+ m_protocol_handler(this, config, context),
+ m_pfilter( pfilter ),
+ m_connection_type(NET),
+ m_throttle_speed_in("speed_in", "throttle_speed_in"),
+ m_throttle_speed_out("speed_out", "throttle_speed_out")
- boost::interprocess::ipcdetail::atomic_inc32(&m_ref_sockets_count);
+ _info_c("net/sleepRPC", "connection constructor set m_connection_type="<<m_connection_type);
@@ -67,12 +89,11 @@ PRAGMA_WARNING_DISABLE_VS(4355)
- LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Socket destroyed without shutdown.");
+ _dbg3("[sock " << socket_.native_handle() << "] Socket destroyed without shutdown.");
- LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Socket destroyed");
- boost::interprocess::ipcdetail::atomic_dec32(&m_ref_sockets_count);
+ _dbg3("[sock " << socket_.native_handle() << "] Socket destroyed");
template<class t_protocol_handler>
@@ -118,13 +139,13 @@ PRAGMA_WARNING_DISABLE_VS(4355)
long ip_ = boost::asio::detail::socket_ops::host_to_network_long(remote_ep.address().to_v4().to_ulong());
context.set_details(boost::uuids::random_generator()(), ip_, remote_ep.port(), is_income);
- LOG_PRINT_L3("[sock " << socket_.native_handle() << "] new connection from " << print_connection_context_short(context) <<
+ _dbg3("[sock " << socket_.native_handle() << "] new connection from " << print_connection_context_short(context) <<
" to " << local_ep.address().to_string() << ':' << local_ep.port() <<
- ", total sockets objects " << m_ref_sockets_count);
+ ", total sockets objects " << m_ref_sock_count);
if(m_pfilter && !m_pfilter->is_remote_ip_allowed(context.m_remote_ip))
- LOG_PRINT_L2("[sock " << socket_.native_handle() << "] ip denied " << string_tools::get_ip_string_from_int32(context.m_remote_ip) << ", shutdowning connection");
+ _dbg2("[sock " << socket_.native_handle() << "] ip denied " << string_tools::get_ip_string_from_int32(context.m_remote_ip) << ", shutdowning connection");
return false;
@@ -136,7 +157,17 @@ PRAGMA_WARNING_DISABLE_VS(4355)
boost::bind(&connection<t_protocol_handler>::handle_read, self,
+ //set ToS flag
+ int tos = get_tos_flag();
+ boost::asio::detail::socket_option::integer< IPPROTO_IP, IP_TOS >
+ optionTos( tos );
+ socket_.set_option( optionTos );
+ //_dbg1("Set ToS flag to " << tos);
+ boost::asio::ip::tcp::no_delay noDelayOption(false);
+ socket_.set_option(noDelayOption);
return true;
CATCH_ENTRY_L0("connection<t_protocol_handler>::start()", false);
@@ -146,7 +177,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
bool connection<t_protocol_handler>::request_callback()
- LOG_PRINT_L2("[" << print_connection_context_short(context) << "] request_callback");
+ _dbg2("[" << print_connection_context_short(context) << "] request_callback");
// Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted
auto self = safe_shared_from_this();
@@ -167,8 +198,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
bool connection<t_protocol_handler>::add_ref()
- LOG_PRINT_L4("[sock " << socket_.native_handle() << "] add_ref");
+ //_dbg3("[sock " << socket_.native_handle() << "] add_ref, m_peer_number=" << mI->m_peer_number);
+ //_dbg3("[sock " << socket_.native_handle() << "] add_ref 2, m_peer_number=" << mI->m_peer_number);
// Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted
auto self = safe_shared_from_this();
@@ -201,7 +233,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
void connection<t_protocol_handler>::call_back_starter()
- LOG_PRINT_L2("[" << print_connection_context_short(context) << "] fired_callback");
+ _dbg2("[" << print_connection_context_short(context) << "] fired_callback");
CATCH_ENTRY_L0("connection<t_protocol_handler>::call_back_starter()", void());
@@ -211,17 +243,44 @@ PRAGMA_WARNING_DISABLE_VS(4355)
std::size_t bytes_transferred)
- LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async read calledback.");
+ //_info("[sock " << socket_.native_handle() << "] Async read calledback.");
if (!e)
- LOG_PRINT("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred, LOG_LEVEL_4);
+ {
+ CRITICAL_REGION_LOCAL(m_throttle_speed_in_mutex);
+ m_throttle_speed_in.handle_trafic_exact(bytes_transferred);
+ context.m_current_speed_down = m_throttle_speed_in.get_current_speed();
+ }
+ {
+ CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::network_throttle_manager::m_lock_get_global_throttle_in );
+ epee::net_utils::network_throttle_manager::network_throttle_manager::get_global_throttle_in().handle_trafic_exact(bytes_transferred * 1024);
+ }
+ double delay=0; // will be calculated
+ do
+ {
+ { //_scope_dbg1("CRITICAL_REGION_LOCAL");
+ CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::m_lock_get_global_throttle_in );
+ delay = epee::net_utils::network_throttle_manager::get_global_throttle_in().get_sleep_time_after_tick( bytes_transferred ); // decission from global
+ }
+ delay *= 0.5;
+ if (delay > 0) {
+ long int ms = (long int)(delay * 100);
+ epee::net_utils::data_logger::get_instance().add_data("sleep_down", ms);
+ std::this_thread::sleep_for(std::chrono::milliseconds(ms));
+ }
+ } while(delay > 0);
+ //_info("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred);
+ logger_handle_net_read(bytes_transferred);
context.m_last_recv = time(NULL);
context.m_recv_cnt += bytes_transferred;
bool recv_res = m_protocol_handler.handle_recv(buffer_.data(), bytes_transferred);
- LOG_PRINT("[sock " << socket_.native_handle() << "] protocol_want_close", LOG_LEVEL_4);
+ //_info("[sock " << socket_.native_handle() << "] protocol_want_close");
//some error in protocol, protocol handler ask to close connection
boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1);
@@ -239,14 +298,14 @@ PRAGMA_WARNING_DISABLE_VS(4355)
boost::bind(&connection<t_protocol_handler>::handle_read, connection<t_protocol_handler>::shared_from_this(),
- LOG_PRINT_L4("[sock " << socket_.native_handle() << "]Async read requested.");
+ //_info("[sock " << socket_.native_handle() << "]Async read requested.");
- LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Some not success at read: " << e.message() << ':' << e.value());
+ _dbg3("[sock " << socket_.native_handle() << "] Some not success at read: " << e.message() << ':' << e.value());
if(e.value() != 2)
- LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Some problems at read: " << e.message() << ':' << e.value());
+ _dbg3("[sock " << socket_.native_handle() << "] Some problems at read: " << e.message() << ':' << e.value());
@@ -283,8 +342,91 @@ PRAGMA_WARNING_DISABLE_VS(4355)
CATCH_ENTRY_L0("connection<t_protocol_handler>::call_run_once_service_io", false);
+ template<class t_protocol_handler>
+ bool connection<t_protocol_handler>::do_send(const void* ptr, size_t cb) {
+ // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted
+ auto self = safe_shared_from_this();
+ if (!self) return false;
+ if (m_was_shutdown) return false;
+ // TODO avoid copy
+ const double factor = 32; // TODO config
+ typedef long long signed int t_safe; // my t_size to avoid any overunderflow in arithmetic
+ const t_safe chunksize_good = (t_safe)( 1024 * std::max(1.0,factor) );
+ const t_safe chunksize_max = chunksize_good * 2 ;
+ const bool allow_split = (m_connection_type == RPC) ? false : true; // TODO config
+ ASRT(! (chunksize_max<0) ); // make sure it is unsigned before removin sign with cast:
+ long long unsigned int chunksize_max_unsigned = static_cast<long long unsigned int>( chunksize_max ) ;
+ if (allow_split && (cb > chunksize_max_unsigned)) {
+ { // LOCK: chunking
+ epee::critical_region_t<decltype(m_chunking_lock)> send_guard(m_chunking_lock); // *** critical ***
+ _dbg3_c("net/out/size", "do_send() will SPLIT into small chunks, from packet="<<cb<<" B for ptr="<<ptr);
+ t_safe all = cb; // all bytes to send
+ t_safe pos = 0; // current sending position
+ // 01234567890
+ // ^^^^ (pos=0, len=4) ; pos:=pos+len, pos=4
+ // ^^^^ (pos=4, len=4) ; pos:=pos+len, pos=8
+ // ^^^ (pos=8, len=4) ;
+ // const size_t bufsize = chunksize_good; // TODO safecast
+ // char* buf = new char[ bufsize ];
+ bool all_ok = true;
+ while (pos < all) {
+ t_safe lenall = all-pos; // length from here to end
+ t_safe len = std::min( chunksize_good , lenall); // take a smaller part
+ ASRT(len<=chunksize_good);
+ // pos=8; len=4; all=10; len=3;
+ ASRT(! (len<0) ); // check before we cast away sign:
+ unsigned long long int len_unsigned = static_cast<long long int>( len );
+ ASRT(len>0); // (redundand)
+ ASRT(len_unsigned < std::numeric_limits<size_t>::max()); // yeap we want strong < then max size, to be sure
+ void *chunk_start = ((char*)ptr) + pos;
+ _fact_c("net/out/size","chunk_start="<<chunk_start<<" ptr="<<ptr<<" pos="<<pos);
+ ASRT(chunk_start >= ptr); // not wrapped around address?
+ //std::memcpy( (void*)buf, chunk_start, len);
+ _dbg3_c("net/out/size", "part of " << lenall << ": pos="<<pos << " len="<<len);
+ bool ok = do_send_chunk(chunk_start, len); // <====== ***
+ all_ok = all_ok && ok;
+ if (!all_ok) {
+ _dbg1_c("net/out/size", "do_send() DONE ***FAILED*** from packet="<<cb<<" B for ptr="<<ptr);
+ _dbg1("do_send() SEND was aborted in middle of big package - this is mostly harmless "
+ << " (e.g. peer closed connection) but if it causes trouble tell us at #monero-dev. " << cb);
+ return false; // partial failure in sending
+ }
+ pos = pos+len; ASRT(pos >0);
+ // (in catch block, or uniq pointer) delete buf;
+ } // each chunk
+ _dbg3_c("net/out/size", "do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr);
+ _dbg3 ( "do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr);
+ _info_c("net/sleepRPC", "do_send() m_connection_type = " << m_connection_type);
+ return all_ok; // done - e.g. queued - all the chunks of current do_send call
+ } // LOCK: chunking
+ } // a big block (to be chunked) - all chunks
+ else { // small block
+ return do_send_chunk(ptr,cb); // just send as 1 big chunk
+ }
+ CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send", false);
+ } // do_send()
+ //---------------------------------------------------------------------------------
template<class t_protocol_handler>
- bool connection<t_protocol_handler>::do_send(const void* ptr, size_t cb)
+ bool connection<t_protocol_handler>::do_send_chunk(const void* ptr, size_t cb)
// Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted
@@ -293,50 +435,86 @@ PRAGMA_WARNING_DISABLE_VS(4355)
return false;
return false;
+ {
+ CRITICAL_REGION_LOCAL(m_throttle_speed_out_mutex);
+ m_throttle_speed_out.handle_trafic_exact(cb);
+ context.m_current_speed_up = m_throttle_speed_out.get_current_speed();
+ }
- LOG_PRINT("[sock " << socket_.native_handle() << "] SEND " << cb, LOG_LEVEL_4);
+ //_info("[sock " << socket_.native_handle() << "] SEND " << cb);
context.m_last_send = time(NULL);
context.m_send_cnt += cb;
//some data should be wrote to stream
//request complete
- epee::critical_region_t<decltype(m_send_que_lock)> send_guard(m_send_que_lock);
- if(m_send_que.size() > ABSTRACT_SERVER_SEND_QUE_MAX_COUNT)
+ sleep_before_packet(cb, 1, 1);
+ epee::critical_region_t<decltype(m_send_que_lock)> send_guard(m_send_que_lock); // *** critical ***
+ long int retry=0;
+ const long int retry_limit = 5*4;
+ while (m_send_que.size() > ABSTRACT_SERVER_SEND_QUE_MAX_COUNT)
- send_guard.unlock();
- LOG_PRINT_L2("send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection");
- close();
- return false;
+ retry++;
+ /* if ( ::cryptonote::core::get_is_stopping() ) { // TODO re-add fast stop
+ _fact("ABORT queue wait due to stopping");
+ return false; // aborted
+ }*/
+ long int ms = 250 + (rand()%50);
+ _info_c("net/sleep", "Sleeping because QUEUE is FULL, in " << __FUNCTION__ << " for " << ms << " ms before packet_size="<<cb); // XXX debug sleep
+ boost::this_thread::sleep(boost::posix_time::milliseconds( ms ) );
+ _dbg1("sleep for queue: " << ms);
+ if (retry > retry_limit) {
+ send_guard.unlock();
+ _erro("send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection");
+ // _dbg1_c("net/sleep", "send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection");
+ close();
+ return false;
+ }
m_send_que.back().assign((const char*)ptr, cb);
if(m_send_que.size() > 1)
- {
- //active operation should be in progress, nothing to do, just wait last operation callback
- }else
- {
- //no active operation
- if(m_send_que.size()!=1)
- {
- LOG_ERROR("Looks like no active operations, but send que size != 1!!");
- return false;
- }
- boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), m_send_que.front().size()),
- //strand_.wrap(
- boost::bind(&connection<t_protocol_handler>::handle_write, self, _1, _2)
- //)
- );
+ { // active operation should be in progress, nothing to do, just wait last operation callback
+ auto size_now = cb;
+ _info_c("net/out/size", "do_send() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size());
+ //do_send_handler_delayed( ptr , size_now ); // (((H))) // empty function
LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size());
+ else
+ { // no active operation
+ if(m_send_que.size()!=1)
+ {
+ _erro("Looks like no active operations, but send que size != 1!!");
+ return false;
+ }
+ auto size_now = m_send_que.front().size();
+ _dbg1_c("net/out/size", "do_send() NOW SENSD: packet="<<size_now<<" B");
+ do_send_handler_write( ptr , size_now ); // (((H)))
+ ASRT( size_now == m_send_que.front().size() );
+ boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now ) ,
+ //strand_.wrap(
+ boost::bind(&connection<t_protocol_handler>::handle_write, self, _1, _2)
+ //)
+ );
+ //_dbg3("(chunk): " << size_now);
+ //logger_handle_net_write(size_now);
+ //_info("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size());
+ }
+ //do_send_handler_stop( ptr , cb ); // empty function
return true;
CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send", false);
- }
+ } // do_send_chunk
template<class t_protocol_handler>
bool connection<t_protocol_handler>::shutdown()
@@ -353,7 +531,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
bool connection<t_protocol_handler>::close()
- LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Que Shutdown called.");
+ //_info("[sock " << socket_.native_handle() << "] Que Shutdown called.");
size_t send_que_size = 0;
send_que_size = m_send_que.size();
@@ -376,16 +554,17 @@ PRAGMA_WARNING_DISABLE_VS(4355)
if (e)
- LOG_PRINT_L1("[sock " << socket_.native_handle() << "] Some problems at write: " << e.message() << ':' << e.value());
+ _dbg1("[sock " << socket_.native_handle() << "] Some problems at write: " << e.message() << ':' << e.value());
+ logger_handle_net_write(cb);
+ sleep_before_packet(cb, 1, 1);
bool do_shutdown = false;
- LOG_ERROR("[sock " << socket_.native_handle() << "] m_send_que.size() == 0 at handle_write!");
+ _erro("[sock " << socket_.native_handle() << "] m_send_que.size() == 0 at handle_write!");
@@ -399,10 +578,16 @@ PRAGMA_WARNING_DISABLE_VS(4355)
//have more data to send
- boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), m_send_que.front().size()),
- //strand_.wrap(
- boost::bind(&connection<t_protocol_handler>::handle_write, connection<t_protocol_handler>::shared_from_this(), _1, _2));
- //);
+ auto size_now = m_send_que.front().size();
+ _dbg1_c("net/out/size", "handle_write() NOW SENDS: packet="<<size_now<<" B" <<", from queue size="<<m_send_que.size());
+ do_send_handler_write_from_queue(e, m_send_que.front().size() , m_send_que.size()); // (((H)))
+ ASRT( size_now == m_send_que.front().size() );
+ boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now) ,
+ // strand_.wrap(
+ boost::bind(&connection<t_protocol_handler>::handle_write, connection<t_protocol_handler>::shared_from_this(), _1, _2)
+ // )
+ );
+ //_dbg3("(normal)" << size_now);
@@ -412,6 +597,13 @@ PRAGMA_WARNING_DISABLE_VS(4355)
CATCH_ENTRY_L0("connection<t_protocol_handler>::handle_write", void());
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ void connection<t_protocol_handler>::setRPcStation()
+ {
+ m_connection_type = RPC;
+ _fact_c("net/sleepRPC", "set m_connection_type = RPC ");
+ }
/* */
@@ -420,19 +612,27 @@ PRAGMA_WARNING_DISABLE_VS(4355)
m_io_service_local_instance(new boost::asio::io_service()),
- new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)),
- m_stop_signal_sent(false), m_port(0), m_sockets_count(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0)
+ m_stop_signal_sent(false), m_port(0),
+ m_sock_count(0), m_sock_number(0), m_threads_count(0),
+ m_pfilter(NULL), m_thread_index(0),
+ new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter))
+ create_server_type_map();
m_thread_name_prefix = "NET";
+ type = NET;
template<class t_protocol_handler>
- boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service):
+ boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_server_role s_type):
- new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)),
- m_stop_signal_sent(false), m_port(0), m_sockets_count(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0)
+ m_stop_signal_sent(false), m_port(0),
+ m_sock_count(0), m_sock_number(0), m_threads_count(0),
+ m_pfilter(NULL), m_thread_index(0),
+ type(NET),
+ new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter))
+ create_server_type_map();
m_thread_name_prefix = "NET";
@@ -444,6 +644,14 @@ PRAGMA_WARNING_DISABLE_VS(4355)
template<class t_protocol_handler>
+ void boosted_tcp_server<t_protocol_handler>::create_server_type_map()
+ {
+ server_type_map["NET"] = t_server_role::NET;
+ server_type_map["RPC"] = t_server_role::RPC;
+ server_type_map["P2P"] = t_server_role::P2P;
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
bool boosted_tcp_server<t_protocol_handler>::init_server(uint32_t port, const std::string address)
@@ -491,6 +699,7 @@ POP_WARNINGS
std::string thread_name = std::string("[") + m_thread_name_prefix;
thread_name += boost::to_string(local_thr_index) + "]";
+ // _fact("Thread name: " << m_thread_name_prefix);
@@ -499,14 +708,14 @@ POP_WARNINGS
catch(const std::exception& ex)
- LOG_ERROR("Exception at server worker thread, what=" << ex.what());
+ _erro("Exception at server worker thread, what=" << ex.what());
- LOG_ERROR("Exception at server worker thread, unknown execption");
+ _erro("Exception at server worker thread, unknown execption");
- LOG_PRINT_L4("Worker thread finished");
+ //_info("Worker thread finished");
return true;
CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::worker_thread", false);
@@ -515,6 +724,9 @@ POP_WARNINGS
void boosted_tcp_server<t_protocol_handler>::set_threads_prefix(const std::string& prefix_name)
m_thread_name_prefix = prefix_name;
+ type = server_type_map[m_thread_name_prefix];
+ _note("Set server type to: " << type);
+ _note("Set server type to: " << m_thread_name_prefix);
template<class t_protocol_handler>
@@ -539,32 +751,38 @@ POP_WARNINGS
boost::shared_ptr<boost::thread> thread(new boost::thread(
attrs, boost::bind(&boosted_tcp_server<t_protocol_handler>::worker_thread, this)));
+ _note("Run server thread name: " << m_thread_name_prefix);
// Wait for all threads in the pool to exit.
- if(wait)
+ if (wait) // && ! ::cryptonote::core::get_is_stopping()) // TODO fast_exit
- for (std::size_t i = 0; i < m_threads.size(); ++i)
- m_threads[i]->join();
+ _fact("JOINING all threads");
+ for (std::size_t i = 0; i < m_threads.size(); ++i) {
+ m_threads[i]->join();
+ }
+ _fact("JOINING all threads - almost");
+ _fact("JOINING all threads - DONE");
- }else
- {
+ }
+ else {
+ _dbg1("Reiniting OK.");
return true;
if(wait && !m_stop_signal_sent)
//some problems with the listening socket ?..
- LOG_PRINT_L0("Net service stopped without stop request, restarting...");
+ _dbg1("Net service stopped without stop request, restarting...");
if(!this->init_server(m_port, m_address))
- LOG_PRINT_L0("Reiniting service failed, exit.");
+ _dbg1("Reiniting service failed, exit.");
return false;
- LOG_PRINT_L0("Reiniting OK.");
+ _dbg1("Reiniting OK.");
@@ -597,7 +815,7 @@ POP_WARNINGS
if(m_threads[i]->joinable() && !m_threads[i]->try_join_for(ms))
- LOG_PRINT_L0("Interrupting thread " << m_threads[i]->native_handle());
+ _dbg1("Interrupting thread " << m_threads[i]->native_handle());
@@ -608,6 +826,10 @@ POP_WARNINGS
template<class t_protocol_handler>
void boosted_tcp_server<t_protocol_handler>::send_stop_signal()
+ if (::cryptonote::core::get_fast_exit() == true)
+ {
+ detach_threads();
+ }
m_stop_signal_sent = true;
@@ -626,19 +848,22 @@ POP_WARNINGS
if (!e)
- connection_ptr conn(std::move(new_connection_));
- new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter));
+ if (type == RPC) {
+ new_connection_->setRPcStation();
+ _note("New server for RPC connections");
+ }
+ connection_ptr conn(std::move(new_connection_));
+ new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter));
boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept, this,
bool r = conn->start(true, 1 < m_threads_count);
if (!r)
- LOG_ERROR("[sock " << conn->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sockets_count);
+ _erro("[sock " << conn->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sock_count);
- LOG_ERROR("Some problems at accept: " << e.message() << ", connections_count = " << m_sockets_count);
+ _erro("Some problems at accept: " << e.message() << ", connections_count = " << m_sock_count);
CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::handle_accept", void());
@@ -648,7 +873,7 @@ POP_WARNINGS
- connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter) );
+ connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter) );
boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket();
@@ -658,7 +883,7 @@ POP_WARNINGS
boost::asio::ip::tcp::resolver::iterator end;
if(iterator == end)
- LOG_ERROR("Failed to resolve " << adr);
+ _erro("Failed to resolve " << adr);
return false;
@@ -704,7 +929,7 @@ POP_WARNINGS
- LOG_PRINT_L3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")");
+ _dbg3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")");
return false;
@@ -712,21 +937,21 @@ POP_WARNINGS
if (ec || !sock_.is_open())
- LOG_PRINT("Some problems at connect, message: " << ec.message(), LOG_LEVEL_3);
+ _dbg3("Some problems at connect, message: " << ec.message());
return false;
- LOG_PRINT_L3("Connected success to " << adr << ':' << port);
+ _dbg3("Connected success to " << adr << ':' << port);
bool r = new_connection_l->start(false, 1 < m_threads_count);
if (r)
- //new_connection_l.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter));
+ //new_connection_l.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_pfilter));
- LOG_ERROR("[sock " << new_connection_->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sockets_count);
+ _erro("[sock " << new_connection_->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sock_count);
return r;
@@ -738,7 +963,7 @@ POP_WARNINGS
bool boosted_tcp_server<t_protocol_handler>::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_callback cb, const std::string& bind_ip)
- connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter) );
+ connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter) );
boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket();
@@ -748,7 +973,7 @@ POP_WARNINGS
boost::asio::ip::tcp::resolver::iterator end;
if(iterator == end)
- LOG_ERROR("Failed to resolve " << adr);
+ _erro("Failed to resolve " << adr);
return false;
@@ -768,7 +993,7 @@ POP_WARNINGS
if(error != boost::asio::error::operation_aborted)
- LOG_PRINT_L3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")");
+ _dbg3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")");
@@ -785,7 +1010,7 @@ POP_WARNINGS
cb(conn_context, boost::asio::error::operation_aborted);//this mean that deadline timer already queued callback with cancel operation, rare situation
- LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port <<
+ _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port <<
" from " << lep.address().to_string() << ':' << lep.port());
bool r = new_connection_l->start(false, 1 < m_threads_count);
if (r)
@@ -795,13 +1020,13 @@ POP_WARNINGS
- LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection to " << adr << ':' << port);
+ _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection to " << adr << ':' << port);
cb(conn_context, boost::asio::error::fault);
- LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port <<
+ _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port <<
" from " << lep.address().to_string() << ':' << lep.port() << ": " << ec_.message() << ':' << ec_.value());
cb(conn_context, ec_);
@@ -809,6 +1034,15 @@ POP_WARNINGS
return true;
CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect_async", false);
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ void boosted_tcp_server<t_protocol_handler>::detach_threads()
+ {
+ for (auto thread : m_threads)
+ thread->detach();
+ }
+} // namespace
+} // namespace
diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h
index e79768b04..a7fbffb4b 100644
--- a/contrib/epee/include/net/levin_protocol_handler_async.h
+++ b/contrib/epee/include/net/levin_protocol_handler_async.h
@@ -34,6 +34,9 @@
#include "levin_base.h"
#include "misc_language.h"
+#include <random>
+#include <chrono>
namespace epee
@@ -81,6 +84,7 @@ public:
async_protocol_handler_config():m_pcommands_handler(NULL), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE)
+ void del_out_connections(size_t count);
@@ -669,6 +673,34 @@ void async_protocol_handler_config<t_connection_context>::del_connection(async_p
template<class t_connection_context>
+void async_protocol_handler_config<t_connection_context>::del_out_connections(size_t count)
+ std::vector <boost::uuids::uuid> out_connections;
+ CRITICAL_REGION_BEGIN(m_connects_lock);
+ for (auto& c: m_connects)
+ {
+ if (!c.second->m_connection_context.m_is_income)
+ out_connections.push_back(c.first);
+ }
+ if (out_connections.size() == 0)
+ return;
+ // close random out connections
+ // TODO or better just keep removing random elements (performance)
+ unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
+ shuffle(out_connections.begin(), out_connections.end(), std::default_random_engine(seed));
+ while (count > 0 && out_connections.size() > 0)
+ {
+ close(*out_connections.begin());
+ del_connection(m_connects.at(*out_connections.begin()));
+ --count;
+ }
+template<class t_connection_context>
void async_protocol_handler_config<t_connection_context>::add_connection(async_protocol_handler<t_connection_context>* pconn)
diff --git a/contrib/epee/include/net/net_utils_base.h b/contrib/epee/include/net/net_utils_base.h
index 90e352787..f963e7746 100644
--- a/contrib/epee/include/net/net_utils_base.h
+++ b/contrib/epee/include/net/net_utils_base.h
@@ -55,6 +55,8 @@ namespace net_utils
time_t m_last_send;
uint64_t m_recv_cnt;
uint64_t m_send_cnt;
+ double m_current_speed_down;
+ double m_current_speed_up;
connection_context_base(boost::uuids::uuid connection_id,
long remote_ip, int remote_port, bool is_income,
@@ -68,7 +70,9 @@ namespace net_utils
- m_send_cnt(send_cnt)
+ m_send_cnt(send_cnt),
+ m_current_speed_down(0),
+ m_current_speed_up(0)
connection_context_base(): m_connection_id(),
@@ -79,7 +83,9 @@ namespace net_utils
- m_send_cnt(0)
+ m_send_cnt(0),
+ m_current_speed_down(0),
+ m_current_speed_up(0)
connection_context_base& operator=(const connection_context_base& a)
diff --git a/contrib/epee/include/storages/levin_abstract_invoke2.h b/contrib/epee/include/storages/levin_abstract_invoke2.h
index 1b32c51d1..73ede1b12 100644
--- a/contrib/epee/include/storages/levin_abstract_invoke2.h
+++ b/contrib/epee/include/storages/levin_abstract_invoke2.h
@@ -185,7 +185,7 @@ namespace epee
return res;
- };
+ }
template<class t_owner, class t_in_type, class t_context, class callback_t>
int buff_to_t_adapter(t_owner* powner, int command, const std::string& in_buff, callback_t cb, t_context& context)
@@ -199,7 +199,7 @@ namespace epee
boost::value_initialized<t_in_type> in_struct;
return cb(command, in_struct, context);
- };
+ }
#define CHAIN_LEVIN_INVOKE_MAP2(context_type) \
int invoke(int command, const std::string& in_buff, std::string& buff_out, context_type& context) \
diff --git a/contrib/epee/include/syncobj.h b/contrib/epee/include/syncobj.h
index b7273da8e..b81eb43a9 100644
--- a/contrib/epee/include/syncobj.h
+++ b/contrib/epee/include/syncobj.h
@@ -35,10 +35,14 @@
#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/recursive_mutex.hpp>
+#include <thread>
+#include <chrono>
namespace epee
+ extern unsigned int g_test_dbg_lock_sleep;
struct simple_event
simple_event() : m_rised(false)
@@ -215,10 +219,10 @@ namespace epee
#define SHARED_CRITICAL_REGION_BEGIN(x) { shared_guard critical_region_var(x)
#define EXCLUSIVE_CRITICAL_REGION_BEGIN(x) { exclusive_guard critical_region_var(x)
-#define CRITICAL_REGION_LOCAL(x) epee::critical_region_t<decltype(x)> critical_region_var(x)
-#define CRITICAL_REGION_BEGIN(x) { epee::critical_region_t<decltype(x)> critical_region_var(x)
-#define CRITICAL_REGION_LOCAL1(x) epee::critical_region_t<decltype(x)> critical_region_var1(x)
-#define CRITICAL_REGION_BEGIN1(x) { epee::critical_region_t<decltype(x)> critical_region_var1(x)
+#define CRITICAL_REGION_LOCAL(x) {std::this_thread::sleep_for(std::chrono::milliseconds(epee::g_test_dbg_lock_sleep));} epee::critical_region_t<decltype(x)> critical_region_var(x)
+#define CRITICAL_REGION_BEGIN(x) { std::this_thread::sleep_for(std::chrono::milliseconds(epee::g_test_dbg_lock_sleep)); epee::critical_region_t<decltype(x)> critical_region_var(x)
+#define CRITICAL_REGION_LOCAL1(x) {std::this_thread::sleep_for(std::chrono::milliseconds(epee::g_test_dbg_lock_sleep));} epee::critical_region_t<decltype(x)> critical_region_var1(x)
+#define CRITICAL_REGION_BEGIN1(x) { std::this_thread::sleep_for(std::chrono::milliseconds(epee::g_test_dbg_lock_sleep)); epee::critical_region_t<decltype(x)> critical_region_var1(x)