aboutsummaryrefslogtreecommitdiff
path: root/contrib/epee/include/net/abstract_tcp_server2.inl
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/epee/include/net/abstract_tcp_server2.inl')
-rw-r--r--contrib/epee/include/net/abstract_tcp_server2.inl68
1 files changed, 59 insertions, 9 deletions
diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl
index 8dff192b1..5855f7f86 100644
--- a/contrib/epee/include/net/abstract_tcp_server2.inl
+++ b/contrib/epee/include/net/abstract_tcp_server2.inl
@@ -52,6 +52,7 @@
#include "../../../../src/cryptonote_core/cryptonote_core.h" // e.g. for the send_stop_signal()
#include "../../../../contrib/otshell_utils/utils.hpp"
+#include "../../../../src/p2p/data_logger.hpp"
using namespace nOT::nUtils; // TODO
PRAGMA_WARNING_PUSH
@@ -75,7 +76,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
connection_basic(io_service, ref_sock_count, sock_number),
m_protocol_handler(this, config, context),
m_pfilter( pfilter ),
- m_connection_type(NET)
+ m_connection_type(NET),
+ m_throttle_speed_in("speed_in", "throttle_speed_in"),
+ m_throttle_speed_out("speed_out", "throttle_speed_out")
{
_info_c("net/sleepRPC", "connection constructor set m_connection_type="<<m_connection_type);
}
@@ -162,6 +165,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
socket_.set_option( optionTos );
//_dbg1("Set ToS flag to " << tos);
+ boost::asio::ip::tcp::no_delay noDelayOption(false);
+ socket_.set_option(noDelayOption);
+
return true;
CATCH_ENTRY_L0("connection<t_protocol_handler>::start()", false);
@@ -240,6 +246,32 @@ PRAGMA_WARNING_DISABLE_VS(4355)
if (!e)
{
+ {
+ CRITICAL_REGION_LOCAL(m_throttle_speed_in_mutex);
+ m_throttle_speed_in.handle_trafic_exact(bytes_transferred);
+ context.m_current_speed_down = m_throttle_speed_in.get_current_speed();
+ }
+
+ {
+ CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::network_throttle_manager::m_lock_get_global_throttle_in );
+ epee::net_utils::network_throttle_manager::network_throttle_manager::get_global_throttle_in().handle_trafic_exact(bytes_transferred * 1024);
+ }
+ double delay=0; // will be calculated
+ do
+ {
+ { //_scope_dbg1("CRITICAL_REGION_LOCAL");
+ CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::m_lock_get_global_throttle_in );
+ delay = epee::net_utils::network_throttle_manager::get_global_throttle_in().get_sleep_time_after_tick( bytes_transferred ); // decission from global
+ }
+
+ delay *= 0.5;
+ if (delay > 0) {
+ long int ms = (long int)(delay * 100);
+ epee::net_utils::data_logger::get_instance().add_data("sleep_down", ms);
+ std::this_thread::sleep_for(std::chrono::milliseconds(ms));
+ }
+ } while(delay > 0);
+
//_info("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred);
logger_handle_net_read(bytes_transferred);
context.m_last_recv = time(NULL);
@@ -398,6 +430,11 @@ PRAGMA_WARNING_DISABLE_VS(4355)
return false;
if(m_was_shutdown)
return false;
+ {
+ CRITICAL_REGION_LOCAL(m_throttle_speed_out_mutex);
+ m_throttle_speed_out.handle_trafic_exact(cb);
+ context.m_current_speed_up = m_throttle_speed_out.get_current_speed();
+ }
//_info("[sock " << socket_.native_handle() << "] SEND " << cb);
context.m_last_send = time(NULL);
@@ -405,8 +442,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
//some data should be wrote to stream
//request complete
- do_send_handler_start( ptr , cb ); // (((H)))
-
+ sleep_before_packet(cb, 1, 1);
epee::critical_region_t<decltype(m_send_que_lock)> send_guard(m_send_que_lock); // *** critical ***
long int retry=0;
const long int retry_limit = 5*4;
@@ -440,8 +476,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
{ // active operation should be in progress, nothing to do, just wait last operation callback
auto size_now = cb;
_info_c("net/out/size", "do_send() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size());
- do_send_handler_delayed( ptr , size_now ); // (((H)))
-
+ //do_send_handler_delayed( ptr , size_now ); // (((H))) // empty function
+
+ LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size());
}
else
{ // no active operation
@@ -463,11 +500,11 @@ PRAGMA_WARNING_DISABLE_VS(4355)
//)
);
//_dbg3("(chunk): " << size_now);
- logger_handle_net_write(size_now);
+ //logger_handle_net_write(size_now);
//_info("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size());
}
- do_send_handler_stop( ptr , cb );
+ //do_send_handler_stop( ptr , cb ); // empty function
return true;
@@ -516,7 +553,8 @@ PRAGMA_WARNING_DISABLE_VS(4355)
shutdown();
return;
}
-
+ logger_handle_net_write(cb);
+ sleep_before_packet(cb, 1, 1);
bool do_shutdown = false;
CRITICAL_REGION_BEGIN(m_send_que_lock);
if(m_send_que.empty())
@@ -545,7 +583,6 @@ PRAGMA_WARNING_DISABLE_VS(4355)
// )
);
//_dbg3("(normal)" << size_now);
- logger_handle_net_write(size_now);
}
CRITICAL_REGION_END();
@@ -784,6 +821,10 @@ POP_WARNINGS
template<class t_protocol_handler>
void boosted_tcp_server<t_protocol_handler>::send_stop_signal()
{
+ if (::cryptonote::core::get_fast_exit() == true)
+ {
+ detach_threads();
+ }
m_stop_signal_sent = true;
TRY_ENTRY();
io_service_.stop();
@@ -988,6 +1029,15 @@ POP_WARNINGS
return true;
CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect_async", false);
}
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ void boosted_tcp_server<t_protocol_handler>::detach_threads()
+ {
+ for (auto thread : m_threads)
+ thread->detach();
+ }
+
+
} // namespace
} // namespace
PRAGMA_WARNING_POP