aboutsummaryrefslogtreecommitdiff
path: root/contrib/epee/include/net/abstract_tcp_server2.inl
diff options
context:
space:
mode:
authorRiccardo Spagni <ric@spagni.net>2015-04-02 17:43:38 +0200
committerRiccardo Spagni <ric@spagni.net>2015-04-02 17:43:43 +0200
commit6f0d93097e3fbe9b1101e90ad1adc718a18263aa (patch)
treeb5a2189ee6cbdd6f8ba57cd1d46edac7ac599ee3 /contrib/epee/include/net/abstract_tcp_server2.inl
parentMerge pull request #251 (diff)
parentNetwork 1.7; Quieted the debug a bit. (diff)
downloadmonero-6f0d93097e3fbe9b1101e90ad1adc718a18263aa.tar.xz
Merge pull request #252
618f20c Network 1.7; Quieted the debug a bit. (rfree2monero) 391c7f9 Utils: use const, document dbg. Less default debug (rfree2monero) 44f4234 [fix] mac os x includes std::random... (rfree2monero) 162c993 Network 1.6: network limits, logging, +doxy (rfree2monero) a3b2226 my changelog (rfree2monero) 2900b1e doxygen files (rfree2monero) 1489310 doxygen related tool (rfree2monero) f9dba47 added windows_stream.* console colors (rfree2monero) c511abf remerged; commands JSON. logging upgrade. doxygen (rfree2monero) f79821a fix locking in count-peers thread (2) (rfree2monero) 0198ffb 2014 network limit 1.3 fix log/path/data +utils (rfree2monero) ae2a506 2014 network limit 1.2 +utils +toc -doc -drmonero (rfree2monero) 0f06dca fixed size_t on windows (rfree2monero) 39fc63f removed not needed <netinet/in.h> (rfree2monero) 5ce4256 2014 network limit 1.1 +utils +toc -doc -drmonero (rfree2monero) eabb519 2014 network limit 1.0a +utils +toc -doc -drmonero (rfree2monero)
Diffstat (limited to 'contrib/epee/include/net/abstract_tcp_server2.inl')
-rw-r--r--contrib/epee/include/net/abstract_tcp_server2.inl428
1 files changed, 331 insertions, 97 deletions
diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl
index db3f9e322..31836fe9e 100644
--- a/contrib/epee/include/net/abstract_tcp_server2.inl
+++ b/contrib/epee/include/net/abstract_tcp_server2.inl
@@ -1,3 +1,9 @@
+/**
+@file
+@author from CrypoNote (see copyright below; Andrey N. Sabelnikov)
+@monero rfree
+@brief the connection templated-class for one peer connection
+*/
// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net
// All rights reserved.
//
@@ -26,7 +32,7 @@
-#include "net_utils_base.h"
+//#include "net_utils_base.h"
#include <boost/lambda/bind.hpp>
#include <boost/foreach.hpp>
#include <boost/lambda/lambda.hpp>
@@ -34,9 +40,21 @@
#include <boost/chrono.hpp>
#include <boost/utility/value_init.hpp>
#include <boost/asio/deadline_timer.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp> // TODO
+#include <boost/thread/thread.hpp> // TODO
#include "misc_language.h"
#include "pragma_comp_defs.h"
+#include <sstream>
+#include <iomanip>
+#include <algorithm>
+
+#include "../../../../src/cryptonote_core/cryptonote_core.h" // e.g. for the send_stop_signal()
+
+#include "../../../../contrib/otshell_utils/utils.hpp"
+#include "../../../../src/p2p/data_logger.hpp"
+using namespace nOT::nUtils; // TODO
+
PRAGMA_WARNING_PUSH
namespace epee
{
@@ -48,17 +66,21 @@ namespace net_utils
PRAGMA_WARNING_DISABLE_VS(4355)
template<class t_protocol_handler>
- connection<t_protocol_handler>::connection(boost::asio::io_service& io_service,
- typename t_protocol_handler::config_type& config, volatile uint32_t& sock_count, i_connection_filter* &pfilter)
- : strand_(io_service),
- socket_(io_service),
- m_want_close_connection(0),
- m_was_shutdown(0),
- m_ref_sockets_count(sock_count),
- m_pfilter(pfilter),
- m_protocol_handler(this, config, context)
+ connection<t_protocol_handler>::connection( boost::asio::io_service& io_service,
+ typename t_protocol_handler::config_type& config,
+ std::atomic<long> &ref_sock_count, // the ++/-- counter
+ std::atomic<long> &sock_number, // the only increasing ++ number generator
+ i_connection_filter* &pfilter
+ )
+ :
+ connection_basic(io_service, ref_sock_count, sock_number),
+ m_protocol_handler(this, config, context),
+ m_pfilter( pfilter ),
+ m_connection_type(NET),
+ m_throttle_speed_in("speed_in", "throttle_speed_in"),
+ m_throttle_speed_out("speed_out", "throttle_speed_out")
{
- boost::interprocess::ipcdetail::atomic_inc32(&m_ref_sockets_count);
+ _info_c("net/sleepRPC", "connection constructor set m_connection_type="<<m_connection_type);
}
PRAGMA_WARNING_DISABLE_VS(4355)
//---------------------------------------------------------------------------------
@@ -67,12 +89,11 @@ PRAGMA_WARNING_DISABLE_VS(4355)
{
if(!m_was_shutdown)
{
- LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Socket destroyed without shutdown.");
+ _dbg3("[sock " << socket_.native_handle() << "] Socket destroyed without shutdown.");
shutdown();
}
- LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Socket destroyed");
- boost::interprocess::ipcdetail::atomic_dec32(&m_ref_sockets_count);
+ _dbg3("[sock " << socket_.native_handle() << "] Socket destroyed");
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
@@ -118,13 +139,13 @@ PRAGMA_WARNING_DISABLE_VS(4355)
long ip_ = boost::asio::detail::socket_ops::host_to_network_long(remote_ep.address().to_v4().to_ulong());
context.set_details(boost::uuids::random_generator()(), ip_, remote_ep.port(), is_income);
- LOG_PRINT_L3("[sock " << socket_.native_handle() << "] new connection from " << print_connection_context_short(context) <<
+ _dbg3("[sock " << socket_.native_handle() << "] new connection from " << print_connection_context_short(context) <<
" to " << local_ep.address().to_string() << ':' << local_ep.port() <<
- ", total sockets objects " << m_ref_sockets_count);
+ ", total sockets objects " << m_ref_sock_count);
if(m_pfilter && !m_pfilter->is_remote_ip_allowed(context.m_remote_ip))
{
- LOG_PRINT_L2("[sock " << socket_.native_handle() << "] ip denied " << string_tools::get_ip_string_from_int32(context.m_remote_ip) << ", shutdowning connection");
+ _dbg2("[sock " << socket_.native_handle() << "] ip denied " << string_tools::get_ip_string_from_int32(context.m_remote_ip) << ", shutdowning connection");
close();
return false;
}
@@ -136,7 +157,17 @@ PRAGMA_WARNING_DISABLE_VS(4355)
boost::bind(&connection<t_protocol_handler>::handle_read, self,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
-
+
+ //set ToS flag
+ int tos = get_tos_flag();
+ boost::asio::detail::socket_option::integer< IPPROTO_IP, IP_TOS >
+ optionTos( tos );
+ socket_.set_option( optionTos );
+ //_dbg1("Set ToS flag to " << tos);
+
+ boost::asio::ip::tcp::no_delay noDelayOption(false);
+ socket_.set_option(noDelayOption);
+
return true;
CATCH_ENTRY_L0("connection<t_protocol_handler>::start()", false);
@@ -146,7 +177,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
bool connection<t_protocol_handler>::request_callback()
{
TRY_ENTRY();
- LOG_PRINT_L2("[" << print_connection_context_short(context) << "] request_callback");
+ _dbg2("[" << print_connection_context_short(context) << "] request_callback");
// Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted
auto self = safe_shared_from_this();
if(!self)
@@ -167,8 +198,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
bool connection<t_protocol_handler>::add_ref()
{
TRY_ENTRY();
- LOG_PRINT_L4("[sock " << socket_.native_handle() << "] add_ref");
+ //_dbg3("[sock " << socket_.native_handle() << "] add_ref, m_peer_number=" << mI->m_peer_number);
CRITICAL_REGION_LOCAL(m_self_refs_lock);
+ //_dbg3("[sock " << socket_.native_handle() << "] add_ref 2, m_peer_number=" << mI->m_peer_number);
// Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted
auto self = safe_shared_from_this();
@@ -201,7 +233,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
void connection<t_protocol_handler>::call_back_starter()
{
TRY_ENTRY();
- LOG_PRINT_L2("[" << print_connection_context_short(context) << "] fired_callback");
+ _dbg2("[" << print_connection_context_short(context) << "] fired_callback");
m_protocol_handler.handle_qued_callback();
CATCH_ENTRY_L0("connection<t_protocol_handler>::call_back_starter()", void());
}
@@ -211,17 +243,44 @@ PRAGMA_WARNING_DISABLE_VS(4355)
std::size_t bytes_transferred)
{
TRY_ENTRY();
- LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async read calledback.");
+ //_info("[sock " << socket_.native_handle() << "] Async read calledback.");
if (!e)
{
- LOG_PRINT("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred, LOG_LEVEL_4);
+ {
+ CRITICAL_REGION_LOCAL(m_throttle_speed_in_mutex);
+ m_throttle_speed_in.handle_trafic_exact(bytes_transferred);
+ context.m_current_speed_down = m_throttle_speed_in.get_current_speed();
+ }
+
+ {
+ CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::network_throttle_manager::m_lock_get_global_throttle_in );
+ epee::net_utils::network_throttle_manager::network_throttle_manager::get_global_throttle_in().handle_trafic_exact(bytes_transferred * 1024);
+ }
+ double delay=0; // will be calculated
+ do
+ {
+ { //_scope_dbg1("CRITICAL_REGION_LOCAL");
+ CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::m_lock_get_global_throttle_in );
+ delay = epee::net_utils::network_throttle_manager::get_global_throttle_in().get_sleep_time_after_tick( bytes_transferred ); // decission from global
+ }
+
+ delay *= 0.5;
+ if (delay > 0) {
+ long int ms = (long int)(delay * 100);
+ epee::net_utils::data_logger::get_instance().add_data("sleep_down", ms);
+ std::this_thread::sleep_for(std::chrono::milliseconds(ms));
+ }
+ } while(delay > 0);
+
+ //_info("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred);
+ logger_handle_net_read(bytes_transferred);
context.m_last_recv = time(NULL);
context.m_recv_cnt += bytes_transferred;
bool recv_res = m_protocol_handler.handle_recv(buffer_.data(), bytes_transferred);
if(!recv_res)
{
- LOG_PRINT("[sock " << socket_.native_handle() << "] protocol_want_close", LOG_LEVEL_4);
+ //_info("[sock " << socket_.native_handle() << "] protocol_want_close");
//some error in protocol, protocol handler ask to close connection
boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1);
@@ -239,14 +298,14 @@ PRAGMA_WARNING_DISABLE_VS(4355)
boost::bind(&connection<t_protocol_handler>::handle_read, connection<t_protocol_handler>::shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
- LOG_PRINT_L4("[sock " << socket_.native_handle() << "]Async read requested.");
+ //_info("[sock " << socket_.native_handle() << "]Async read requested.");
}
}else
{
- LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Some not success at read: " << e.message() << ':' << e.value());
+ _dbg3("[sock " << socket_.native_handle() << "] Some not success at read: " << e.message() << ':' << e.value());
if(e.value() != 2)
{
- LOG_PRINT_L3("[sock " << socket_.native_handle() << "] Some problems at read: " << e.message() << ':' << e.value());
+ _dbg3("[sock " << socket_.native_handle() << "] Some problems at read: " << e.message() << ':' << e.value());
shutdown();
}
}
@@ -283,8 +342,91 @@ PRAGMA_WARNING_DISABLE_VS(4355)
CATCH_ENTRY_L0("connection<t_protocol_handler>::call_run_once_service_io", false);
}
//---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool connection<t_protocol_handler>::do_send(const void* ptr, size_t cb) {
+ TRY_ENTRY();
+
+ // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted
+ auto self = safe_shared_from_this();
+ if (!self) return false;
+ if (m_was_shutdown) return false;
+ // TODO avoid copy
+
+ const double factor = 32; // TODO config
+ typedef long long signed int t_safe; // my t_size to avoid any overunderflow in arithmetic
+ const t_safe chunksize_good = (t_safe)( 1024 * std::max(1.0,factor) );
+ const t_safe chunksize_max = chunksize_good * 2 ;
+ const bool allow_split = (m_connection_type == RPC) ? false : true; // TODO config
+
+ ASRT(! (chunksize_max<0) ); // make sure it is unsigned before removin sign with cast:
+ long long unsigned int chunksize_max_unsigned = static_cast<long long unsigned int>( chunksize_max ) ;
+
+ if (allow_split && (cb > chunksize_max_unsigned)) {
+ { // LOCK: chunking
+ epee::critical_region_t<decltype(m_chunking_lock)> send_guard(m_chunking_lock); // *** critical ***
+
+ _dbg3_c("net/out/size", "do_send() will SPLIT into small chunks, from packet="<<cb<<" B for ptr="<<ptr);
+ t_safe all = cb; // all bytes to send
+ t_safe pos = 0; // current sending position
+ // 01234567890
+ // ^^^^ (pos=0, len=4) ; pos:=pos+len, pos=4
+ // ^^^^ (pos=4, len=4) ; pos:=pos+len, pos=8
+ // ^^^ (pos=8, len=4) ;
+
+ // const size_t bufsize = chunksize_good; // TODO safecast
+ // char* buf = new char[ bufsize ];
+
+ bool all_ok = true;
+ while (pos < all) {
+ t_safe lenall = all-pos; // length from here to end
+ t_safe len = std::min( chunksize_good , lenall); // take a smaller part
+ ASRT(len<=chunksize_good);
+ // pos=8; len=4; all=10; len=3;
+
+ ASRT(! (len<0) ); // check before we cast away sign:
+ unsigned long long int len_unsigned = static_cast<long long int>( len );
+ ASRT(len>0); // (redundand)
+ ASRT(len_unsigned < std::numeric_limits<size_t>::max()); // yeap we want strong < then max size, to be sure
+
+ void *chunk_start = ((char*)ptr) + pos;
+ _fact_c("net/out/size","chunk_start="<<chunk_start<<" ptr="<<ptr<<" pos="<<pos);
+ ASRT(chunk_start >= ptr); // not wrapped around address?
+ //std::memcpy( (void*)buf, chunk_start, len);
+
+ _dbg3_c("net/out/size", "part of " << lenall << ": pos="<<pos << " len="<<len);
+
+ bool ok = do_send_chunk(chunk_start, len); // <====== ***
+
+ all_ok = all_ok && ok;
+ if (!all_ok) {
+ _dbg1_c("net/out/size", "do_send() DONE ***FAILED*** from packet="<<cb<<" B for ptr="<<ptr);
+ _dbg1("do_send() SEND was aborted in middle of big package - this is mostly harmless "
+ << " (e.g. peer closed connection) but if it causes trouble tell us at #monero-dev. " << cb);
+ return false; // partial failure in sending
+ }
+ pos = pos+len; ASRT(pos >0);
+
+ // (in catch block, or uniq pointer) delete buf;
+ } // each chunk
+
+ _dbg3_c("net/out/size", "do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr);
+ _dbg3 ( "do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr);
+
+ _info_c("net/sleepRPC", "do_send() m_connection_type = " << m_connection_type);
+
+ return all_ok; // done - e.g. queued - all the chunks of current do_send call
+ } // LOCK: chunking
+ } // a big block (to be chunked) - all chunks
+ else { // small block
+ return do_send_chunk(ptr,cb); // just send as 1 big chunk
+ }
+
+ CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send", false);
+ } // do_send()
+
+ //---------------------------------------------------------------------------------
template<class t_protocol_handler>
- bool connection<t_protocol_handler>::do_send(const void* ptr, size_t cb)
+ bool connection<t_protocol_handler>::do_send_chunk(const void* ptr, size_t cb)
{
TRY_ENTRY();
// Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted
@@ -293,50 +435,86 @@ PRAGMA_WARNING_DISABLE_VS(4355)
return false;
if(m_was_shutdown)
return false;
+ {
+ CRITICAL_REGION_LOCAL(m_throttle_speed_out_mutex);
+ m_throttle_speed_out.handle_trafic_exact(cb);
+ context.m_current_speed_up = m_throttle_speed_out.get_current_speed();
+ }
- LOG_PRINT("[sock " << socket_.native_handle() << "] SEND " << cb, LOG_LEVEL_4);
+ //_info("[sock " << socket_.native_handle() << "] SEND " << cb);
context.m_last_send = time(NULL);
context.m_send_cnt += cb;
//some data should be wrote to stream
//request complete
- epee::critical_region_t<decltype(m_send_que_lock)> send_guard(m_send_que_lock);
- if(m_send_que.size() > ABSTRACT_SERVER_SEND_QUE_MAX_COUNT)
+ sleep_before_packet(cb, 1, 1);
+ epee::critical_region_t<decltype(m_send_que_lock)> send_guard(m_send_que_lock); // *** critical ***
+ long int retry=0;
+ const long int retry_limit = 5*4;
+ while (m_send_que.size() > ABSTRACT_SERVER_SEND_QUE_MAX_COUNT)
{
- send_guard.unlock();
- LOG_PRINT_L2("send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection");
- close();
- return false;
+ retry++;
+
+ /* if ( ::cryptonote::core::get_is_stopping() ) { // TODO re-add fast stop
+ _fact("ABORT queue wait due to stopping");
+ return false; // aborted
+ }*/
+
+ long int ms = 250 + (rand()%50);
+ _info_c("net/sleep", "Sleeping because QUEUE is FULL, in " << __FUNCTION__ << " for " << ms << " ms before packet_size="<<cb); // XXX debug sleep
+ boost::this_thread::sleep(boost::posix_time::milliseconds( ms ) );
+ _dbg1("sleep for queue: " << ms);
+
+ if (retry > retry_limit) {
+ send_guard.unlock();
+ _erro("send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection");
+ // _dbg1_c("net/sleep", "send que size is more than ABSTRACT_SERVER_SEND_QUE_MAX_COUNT(" << ABSTRACT_SERVER_SEND_QUE_MAX_COUNT << "), shutting down connection");
+ close();
+ return false;
+ }
}
m_send_que.resize(m_send_que.size()+1);
m_send_que.back().assign((const char*)ptr, cb);
if(m_send_que.size() > 1)
- {
- //active operation should be in progress, nothing to do, just wait last operation callback
- }else
- {
- //no active operation
- if(m_send_que.size()!=1)
- {
- LOG_ERROR("Looks like no active operations, but send que size != 1!!");
- return false;
- }
-
- boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), m_send_que.front().size()),
- //strand_.wrap(
- boost::bind(&connection<t_protocol_handler>::handle_write, self, _1, _2)
- //)
- );
+ { // active operation should be in progress, nothing to do, just wait last operation callback
+ auto size_now = cb;
+ _info_c("net/out/size", "do_send() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size());
+ //do_send_handler_delayed( ptr , size_now ); // (((H))) // empty function
LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size());
}
+ else
+ { // no active operation
+
+ if(m_send_que.size()!=1)
+ {
+ _erro("Looks like no active operations, but send que size != 1!!");
+ return false;
+ }
+
+ auto size_now = m_send_que.front().size();
+ _dbg1_c("net/out/size", "do_send() NOW SENSD: packet="<<size_now<<" B");
+ do_send_handler_write( ptr , size_now ); // (((H)))
+
+ ASRT( size_now == m_send_que.front().size() );
+ boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now ) ,
+ //strand_.wrap(
+ boost::bind(&connection<t_protocol_handler>::handle_write, self, _1, _2)
+ //)
+ );
+ //_dbg3("(chunk): " << size_now);
+ //logger_handle_net_write(size_now);
+ //_info("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size());
+ }
+
+ //do_send_handler_stop( ptr , cb ); // empty function
return true;
CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send", false);
- }
+ } // do_send_chunk
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
bool connection<t_protocol_handler>::shutdown()
@@ -353,7 +531,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
bool connection<t_protocol_handler>::close()
{
TRY_ENTRY();
- LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Que Shutdown called.");
+ //_info("[sock " << socket_.native_handle() << "] Que Shutdown called.");
size_t send_que_size = 0;
CRITICAL_REGION_BEGIN(m_send_que_lock);
send_que_size = m_send_que.size();
@@ -376,16 +554,17 @@ PRAGMA_WARNING_DISABLE_VS(4355)
if (e)
{
- LOG_PRINT_L1("[sock " << socket_.native_handle() << "] Some problems at write: " << e.message() << ':' << e.value());
+ _dbg1("[sock " << socket_.native_handle() << "] Some problems at write: " << e.message() << ':' << e.value());
shutdown();
return;
}
-
+ logger_handle_net_write(cb);
+ sleep_before_packet(cb, 1, 1);
bool do_shutdown = false;
CRITICAL_REGION_BEGIN(m_send_que_lock);
if(m_send_que.empty())
{
- LOG_ERROR("[sock " << socket_.native_handle() << "] m_send_que.size() == 0 at handle_write!");
+ _erro("[sock " << socket_.native_handle() << "] m_send_que.size() == 0 at handle_write!");
return;
}
@@ -399,10 +578,16 @@ PRAGMA_WARNING_DISABLE_VS(4355)
}else
{
//have more data to send
- boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), m_send_que.front().size()),
- //strand_.wrap(
- boost::bind(&connection<t_protocol_handler>::handle_write, connection<t_protocol_handler>::shared_from_this(), _1, _2));
- //);
+ auto size_now = m_send_que.front().size();
+ _dbg1_c("net/out/size", "handle_write() NOW SENDS: packet="<<size_now<<" B" <<", from queue size="<<m_send_que.size());
+ do_send_handler_write_from_queue(e, m_send_que.front().size() , m_send_que.size()); // (((H)))
+ ASRT( size_now == m_send_que.front().size() );
+ boost::asio::async_write(socket_, boost::asio::buffer(m_send_que.front().data(), size_now) ,
+ // strand_.wrap(
+ boost::bind(&connection<t_protocol_handler>::handle_write, connection<t_protocol_handler>::shared_from_this(), _1, _2)
+ // )
+ );
+ //_dbg3("(normal)" << size_now);
}
CRITICAL_REGION_END();
@@ -412,6 +597,13 @@ PRAGMA_WARNING_DISABLE_VS(4355)
}
CATCH_ENTRY_L0("connection<t_protocol_handler>::handle_write", void());
}
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ void connection<t_protocol_handler>::setRPcStation()
+ {
+ m_connection_type = RPC;
+ _fact_c("net/sleepRPC", "set m_connection_type = RPC ");
+ }
/************************************************************************/
/* */
/************************************************************************/
@@ -420,19 +612,27 @@ PRAGMA_WARNING_DISABLE_VS(4355)
m_io_service_local_instance(new boost::asio::io_service()),
io_service_(*m_io_service_local_instance.get()),
acceptor_(io_service_),
- new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)),
- m_stop_signal_sent(false), m_port(0), m_sockets_count(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0)
+ m_stop_signal_sent(false), m_port(0),
+ m_sock_count(0), m_sock_number(0), m_threads_count(0),
+ m_pfilter(NULL), m_thread_index(0),
+ new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter))
{
+ create_server_type_map();
m_thread_name_prefix = "NET";
+ type = NET;
}
template<class t_protocol_handler>
- boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service):
+ boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_server_role s_type):
io_service_(extarnal_io_service),
acceptor_(io_service_),
- new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter)),
- m_stop_signal_sent(false), m_port(0), m_sockets_count(0), m_threads_count(0), m_pfilter(NULL), m_thread_index(0)
+ m_stop_signal_sent(false), m_port(0),
+ m_sock_count(0), m_sock_number(0), m_threads_count(0),
+ m_pfilter(NULL), m_thread_index(0),
+ type(NET),
+ new_connection_(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter))
{
+ create_server_type_map();
m_thread_name_prefix = "NET";
}
//---------------------------------------------------------------------------------
@@ -444,6 +644,14 @@ PRAGMA_WARNING_DISABLE_VS(4355)
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
+ void boosted_tcp_server<t_protocol_handler>::create_server_type_map()
+ {
+ server_type_map["NET"] = t_server_role::NET;
+ server_type_map["RPC"] = t_server_role::RPC;
+ server_type_map["P2P"] = t_server_role::P2P;
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
bool boosted_tcp_server<t_protocol_handler>::init_server(uint32_t port, const std::string address)
{
TRY_ENTRY();
@@ -491,6 +699,7 @@ POP_WARNINGS
std::string thread_name = std::string("[") + m_thread_name_prefix;
thread_name += boost::to_string(local_thr_index) + "]";
log_space::log_singletone::set_thread_log_prefix(thread_name);
+ // _fact("Thread name: " << m_thread_name_prefix);
while(!m_stop_signal_sent)
{
try
@@ -499,14 +708,14 @@ POP_WARNINGS
}
catch(const std::exception& ex)
{
- LOG_ERROR("Exception at server worker thread, what=" << ex.what());
+ _erro("Exception at server worker thread, what=" << ex.what());
}
catch(...)
{
- LOG_ERROR("Exception at server worker thread, unknown execption");
+ _erro("Exception at server worker thread, unknown execption");
}
}
- LOG_PRINT_L4("Worker thread finished");
+ //_info("Worker thread finished");
return true;
CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::worker_thread", false);
}
@@ -515,6 +724,9 @@ POP_WARNINGS
void boosted_tcp_server<t_protocol_handler>::set_threads_prefix(const std::string& prefix_name)
{
m_thread_name_prefix = prefix_name;
+ type = server_type_map[m_thread_name_prefix];
+ _note("Set server type to: " << type);
+ _note("Set server type to: " << m_thread_name_prefix);
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
@@ -539,32 +751,38 @@ POP_WARNINGS
{
boost::shared_ptr<boost::thread> thread(new boost::thread(
attrs, boost::bind(&boosted_tcp_server<t_protocol_handler>::worker_thread, this)));
+ _note("Run server thread name: " << m_thread_name_prefix);
m_threads.push_back(thread);
}
CRITICAL_REGION_END();
// Wait for all threads in the pool to exit.
- if(wait)
+ if (wait) // && ! ::cryptonote::core::get_is_stopping()) // TODO fast_exit
{
- for (std::size_t i = 0; i < m_threads.size(); ++i)
- m_threads[i]->join();
+ _fact("JOINING all threads");
+ for (std::size_t i = 0; i < m_threads.size(); ++i) {
+ m_threads[i]->join();
+ }
+ _fact("JOINING all threads - almost");
m_threads.clear();
+ _fact("JOINING all threads - DONE");
- }else
- {
+ }
+ else {
+ _dbg1("Reiniting OK.");
return true;
}
if(wait && !m_stop_signal_sent)
{
//some problems with the listening socket ?..
- LOG_PRINT_L0("Net service stopped without stop request, restarting...");
+ _dbg1("Net service stopped without stop request, restarting...");
if(!this->init_server(m_port, m_address))
{
- LOG_PRINT_L0("Reiniting service failed, exit.");
+ _dbg1("Reiniting service failed, exit.");
return false;
}else
{
- LOG_PRINT_L0("Reiniting OK.");
+ _dbg1("Reiniting OK.");
}
}
}
@@ -597,7 +815,7 @@ POP_WARNINGS
{
if(m_threads[i]->joinable() && !m_threads[i]->try_join_for(ms))
{
- LOG_PRINT_L0("Interrupting thread " << m_threads[i]->native_handle());
+ _dbg1("Interrupting thread " << m_threads[i]->native_handle());
m_threads[i]->interrupt();
}
}
@@ -608,6 +826,10 @@ POP_WARNINGS
template<class t_protocol_handler>
void boosted_tcp_server<t_protocol_handler>::send_stop_signal()
{
+ if (::cryptonote::core::get_fast_exit() == true)
+ {
+ detach_threads();
+ }
m_stop_signal_sent = true;
TRY_ENTRY();
io_service_.stop();
@@ -626,19 +848,22 @@ POP_WARNINGS
TRY_ENTRY();
if (!e)
{
- connection_ptr conn(std::move(new_connection_));
-
- new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter));
+ if (type == RPC) {
+ new_connection_->setRPcStation();
+ _note("New server for RPC connections");
+ }
+ connection_ptr conn(std::move(new_connection_));
+ new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter));
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept, this,
boost::asio::placeholders::error));
bool r = conn->start(true, 1 < m_threads_count);
if (!r)
- LOG_ERROR("[sock " << conn->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sockets_count);
+ _erro("[sock " << conn->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sock_count);
}else
{
- LOG_ERROR("Some problems at accept: " << e.message() << ", connections_count = " << m_sockets_count);
+ _erro("Some problems at accept: " << e.message() << ", connections_count = " << m_sock_count);
}
CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::handle_accept", void());
}
@@ -648,7 +873,7 @@ POP_WARNINGS
{
TRY_ENTRY();
- connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter) );
+ connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter) );
boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket();
//////////////////////////////////////////////////////////////////////////
@@ -658,7 +883,7 @@ POP_WARNINGS
boost::asio::ip::tcp::resolver::iterator end;
if(iterator == end)
{
- LOG_ERROR("Failed to resolve " << adr);
+ _erro("Failed to resolve " << adr);
return false;
}
//////////////////////////////////////////////////////////////////////////
@@ -704,7 +929,7 @@ POP_WARNINGS
{
//timeout
sock_.close();
- LOG_PRINT_L3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")");
+ _dbg3("Failed to connect to " << adr << ":" << port << ", because of timeout (" << conn_timeout << ")");
return false;
}
}
@@ -712,21 +937,21 @@ POP_WARNINGS
if (ec || !sock_.is_open())
{
- LOG_PRINT("Some problems at connect, message: " << ec.message(), LOG_LEVEL_3);
+ _dbg3("Some problems at connect, message: " << ec.message());
return false;
}
- LOG_PRINT_L3("Connected success to " << adr << ':' << port);
+ _dbg3("Connected success to " << adr << ':' << port);
bool r = new_connection_l->start(false, 1 < m_threads_count);
if (r)
{
new_connection_l->get_context(conn_context);
- //new_connection_l.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter));
+ //new_connection_l.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_pfilter));
}
else
{
- LOG_ERROR("[sock " << new_connection_->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sockets_count);
+ _erro("[sock " << new_connection_->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sock_count);
}
return r;
@@ -738,7 +963,7 @@ POP_WARNINGS
bool boosted_tcp_server<t_protocol_handler>::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_callback cb, const std::string& bind_ip)
{
TRY_ENTRY();
- connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sockets_count, m_pfilter) );
+ connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter) );
boost::asio::ip::tcp::socket& sock_ = new_connection_l->socket();
//////////////////////////////////////////////////////////////////////////
@@ -748,7 +973,7 @@ POP_WARNINGS
boost::asio::ip::tcp::resolver::iterator end;
if(iterator == end)
{
- LOG_ERROR("Failed to resolve " << adr);
+ _erro("Failed to resolve " << adr);
return false;
}
//////////////////////////////////////////////////////////////////////////
@@ -768,7 +993,7 @@ POP_WARNINGS
{
if(error != boost::asio::error::operation_aborted)
{
- LOG_PRINT_L3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")");
+ _dbg3("Failed to connect to " << adr << ':' << port << ", because of timeout (" << conn_timeout << ")");
new_connection_l->socket().close();
}
});
@@ -785,7 +1010,7 @@ POP_WARNINGS
cb(conn_context, boost::asio::error::operation_aborted);//this mean that deadline timer already queued callback with cancel operation, rare situation
}else
{
- LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port <<
+ _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Connected success to " << adr << ':' << port <<
" from " << lep.address().to_string() << ':' << lep.port());
bool r = new_connection_l->start(false, 1 < m_threads_count);
if (r)
@@ -795,13 +1020,13 @@ POP_WARNINGS
}
else
{
- LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection to " << adr << ':' << port);
+ _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection to " << adr << ':' << port);
cb(conn_context, boost::asio::error::fault);
}
}
}else
{
- LOG_PRINT_L3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port <<
+ _dbg3("[sock " << new_connection_l->socket().native_handle() << "] Failed to connect to " << adr << ':' << port <<
" from " << lep.address().to_string() << ':' << lep.port() << ": " << ec_.message() << ':' << ec_.value());
cb(conn_context, ec_);
}
@@ -809,6 +1034,15 @@ POP_WARNINGS
return true;
CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect_async", false);
}
-}
-}
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ void boosted_tcp_server<t_protocol_handler>::detach_threads()
+ {
+ for (auto thread : m_threads)
+ thread->detach();
+ }
+
+
+} // namespace
+} // namespace
PRAGMA_WARNING_POP