aboutsummaryrefslogtreecommitdiff
path: root/contrib/epee
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/epee')
-rw-r--r--contrib/epee/include/net/abstract_tcp_server2.h9
-rw-r--r--contrib/epee/include/net/abstract_tcp_server2.inl68
-rw-r--r--contrib/epee/include/net/levin_protocol_handler_async.h26
-rw-r--r--contrib/epee/include/net/net_utils_base.h10
-rw-r--r--contrib/epee/include/storages/levin_abstract_invoke2.h4
5 files changed, 101 insertions, 16 deletions
diff --git a/contrib/epee/include/net/abstract_tcp_server2.h b/contrib/epee/include/net/abstract_tcp_server2.h
index 1e6223212..3e6ea2171 100644
--- a/contrib/epee/include/net/abstract_tcp_server2.h
+++ b/contrib/epee/include/net/abstract_tcp_server2.h
@@ -56,6 +56,7 @@
#include "syncobj.h"
#include "../../../../src/p2p/connection_basic.hpp"
#include "../../../../contrib/otshell_utils/utils.hpp"
+#include "../../../../src/p2p/network_throttle-detail.hpp"
#define ABSTRACT_SERVER_SEND_QUE_MAX_COUNT 1000
@@ -130,6 +131,7 @@ namespace net_utils
/// Buffer for incoming data.
boost::array<char, 8192> buffer_;
+ //boost::array<char, 1024> buffer_;
t_connection_context context;
i_connection_filter* &m_pfilter;
@@ -143,6 +145,12 @@ namespace net_utils
critical_section m_chunking_lock; // held while we add small chunks of the big do_send() to small do_send_chunk()
t_server_role m_connection_type;
+
+ // for calculate speed (last 60 sec)
+ network_throttle m_throttle_speed_in;
+ network_throttle m_throttle_speed_out;
+ std::mutex m_throttle_speed_in_mutex;
+ std::mutex m_throttle_speed_out_mutex;
public:
void setRPcStation();
@@ -285,6 +293,7 @@ namespace net_utils
critical_section m_threads_lock;
volatile uint32_t m_thread_index; // TODO change to std::atomic
t_server_role type;
+ void detach_threads();
/// The next connection to be accepted
connection_ptr new_connection_;
diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl
index 8dff192b1..5855f7f86 100644
--- a/contrib/epee/include/net/abstract_tcp_server2.inl
+++ b/contrib/epee/include/net/abstract_tcp_server2.inl
@@ -52,6 +52,7 @@
#include "../../../../src/cryptonote_core/cryptonote_core.h" // e.g. for the send_stop_signal()
#include "../../../../contrib/otshell_utils/utils.hpp"
+#include "../../../../src/p2p/data_logger.hpp"
using namespace nOT::nUtils; // TODO
PRAGMA_WARNING_PUSH
@@ -75,7 +76,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
connection_basic(io_service, ref_sock_count, sock_number),
m_protocol_handler(this, config, context),
m_pfilter( pfilter ),
- m_connection_type(NET)
+ m_connection_type(NET),
+ m_throttle_speed_in("speed_in", "throttle_speed_in"),
+ m_throttle_speed_out("speed_out", "throttle_speed_out")
{
_info_c("net/sleepRPC", "connection constructor set m_connection_type="<<m_connection_type);
}
@@ -162,6 +165,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
socket_.set_option( optionTos );
//_dbg1("Set ToS flag to " << tos);
+ boost::asio::ip::tcp::no_delay noDelayOption(false);
+ socket_.set_option(noDelayOption);
+
return true;
CATCH_ENTRY_L0("connection<t_protocol_handler>::start()", false);
@@ -240,6 +246,32 @@ PRAGMA_WARNING_DISABLE_VS(4355)
if (!e)
{
+ {
+ CRITICAL_REGION_LOCAL(m_throttle_speed_in_mutex);
+ m_throttle_speed_in.handle_trafic_exact(bytes_transferred);
+ context.m_current_speed_down = m_throttle_speed_in.get_current_speed();
+ }
+
+ {
+ CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::network_throttle_manager::m_lock_get_global_throttle_in );
+ epee::net_utils::network_throttle_manager::network_throttle_manager::get_global_throttle_in().handle_trafic_exact(bytes_transferred * 1024);
+ }
+ double delay=0; // will be calculated
+ do
+ {
+ { //_scope_dbg1("CRITICAL_REGION_LOCAL");
+ CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::m_lock_get_global_throttle_in );
+ delay = epee::net_utils::network_throttle_manager::get_global_throttle_in().get_sleep_time_after_tick( bytes_transferred ); // decission from global
+ }
+
+ delay *= 0.5;
+ if (delay > 0) {
+ long int ms = (long int)(delay * 100);
+ epee::net_utils::data_logger::get_instance().add_data("sleep_down", ms);
+ std::this_thread::sleep_for(std::chrono::milliseconds(ms));
+ }
+ } while(delay > 0);
+
//_info("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred);
logger_handle_net_read(bytes_transferred);
context.m_last_recv = time(NULL);
@@ -398,6 +430,11 @@ PRAGMA_WARNING_DISABLE_VS(4355)
return false;
if(m_was_shutdown)
return false;
+ {
+ CRITICAL_REGION_LOCAL(m_throttle_speed_out_mutex);
+ m_throttle_speed_out.handle_trafic_exact(cb);
+ context.m_current_speed_up = m_throttle_speed_out.get_current_speed();
+ }
//_info("[sock " << socket_.native_handle() << "] SEND " << cb);
context.m_last_send = time(NULL);
@@ -405,8 +442,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
//some data should be wrote to stream
//request complete
- do_send_handler_start( ptr , cb ); // (((H)))
-
+ sleep_before_packet(cb, 1, 1);
epee::critical_region_t<decltype(m_send_que_lock)> send_guard(m_send_que_lock); // *** critical ***
long int retry=0;
const long int retry_limit = 5*4;
@@ -440,8 +476,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
{ // active operation should be in progress, nothing to do, just wait last operation callback
auto size_now = cb;
_info_c("net/out/size", "do_send() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size());
- do_send_handler_delayed( ptr , size_now ); // (((H)))
-
+ //do_send_handler_delayed( ptr , size_now ); // (((H))) // empty function
+
+ LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size());
}
else
{ // no active operation
@@ -463,11 +500,11 @@ PRAGMA_WARNING_DISABLE_VS(4355)
//)
);
//_dbg3("(chunk): " << size_now);
- logger_handle_net_write(size_now);
+ //logger_handle_net_write(size_now);
//_info("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size());
}
- do_send_handler_stop( ptr , cb );
+ //do_send_handler_stop( ptr , cb ); // empty function
return true;
@@ -516,7 +553,8 @@ PRAGMA_WARNING_DISABLE_VS(4355)
shutdown();
return;
}
-
+ logger_handle_net_write(cb);
+ sleep_before_packet(cb, 1, 1);
bool do_shutdown = false;
CRITICAL_REGION_BEGIN(m_send_que_lock);
if(m_send_que.empty())
@@ -545,7 +583,6 @@ PRAGMA_WARNING_DISABLE_VS(4355)
// )
);
//_dbg3("(normal)" << size_now);
- logger_handle_net_write(size_now);
}
CRITICAL_REGION_END();
@@ -784,6 +821,10 @@ POP_WARNINGS
template<class t_protocol_handler>
void boosted_tcp_server<t_protocol_handler>::send_stop_signal()
{
+ if (::cryptonote::core::get_fast_exit() == true)
+ {
+ detach_threads();
+ }
m_stop_signal_sent = true;
TRY_ENTRY();
io_service_.stop();
@@ -988,6 +1029,15 @@ POP_WARNINGS
return true;
CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect_async", false);
}
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ void boosted_tcp_server<t_protocol_handler>::detach_threads()
+ {
+ for (auto thread : m_threads)
+ thread->detach();
+ }
+
+
} // namespace
} // namespace
PRAGMA_WARNING_POP
diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h
index 5e5e803f5..92f75161a 100644
--- a/contrib/epee/include/net/levin_protocol_handler_async.h
+++ b/contrib/epee/include/net/levin_protocol_handler_async.h
@@ -81,7 +81,7 @@ public:
async_protocol_handler_config():m_pcommands_handler(NULL), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE)
{}
- void del_connections(size_t count);
+ void del_out_connections(size_t count);
};
@@ -670,10 +670,30 @@ void async_protocol_handler_config<t_connection_context>::del_connection(async_p
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
-void async_protocol_handler_config<t_connection_context>::del_connections(size_t count) // TODO
+void async_protocol_handler_config<t_connection_context>::del_out_connections(size_t count)
{
+ std::vector <boost::uuids::uuid> out_connections;
CRITICAL_REGION_BEGIN(m_connects_lock);
- m_connects.clear();
+ for (auto& c: m_connects)
+ {
+ if (!c.second->m_connection_context.m_is_income)
+ out_connections.push_back(c.first);
+ }
+
+ if (out_connections.size() == 0)
+ return;
+
+ // close random out connections
+ unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
+ shuffle(out_connections.begin(), out_connections.end(), std::default_random_engine(seed));
+ while (count > 0 && out_connections.size() > 0)
+ {
+ close(*out_connections.begin());
+ del_connection(m_connects.at(*out_connections.begin()));
+ out_connections.erase(out_connections.begin());
+ --count;
+ }
+
CRITICAL_REGION_END();
}
//------------------------------------------------------------------------------------------
diff --git a/contrib/epee/include/net/net_utils_base.h b/contrib/epee/include/net/net_utils_base.h
index 90e352787..f963e7746 100644
--- a/contrib/epee/include/net/net_utils_base.h
+++ b/contrib/epee/include/net/net_utils_base.h
@@ -55,6 +55,8 @@ namespace net_utils
time_t m_last_send;
uint64_t m_recv_cnt;
uint64_t m_send_cnt;
+ double m_current_speed_down;
+ double m_current_speed_up;
connection_context_base(boost::uuids::uuid connection_id,
long remote_ip, int remote_port, bool is_income,
@@ -68,7 +70,9 @@ namespace net_utils
m_last_recv(last_recv),
m_last_send(last_send),
m_recv_cnt(recv_cnt),
- m_send_cnt(send_cnt)
+ m_send_cnt(send_cnt),
+ m_current_speed_down(0),
+ m_current_speed_up(0)
{}
connection_context_base(): m_connection_id(),
@@ -79,7 +83,9 @@ namespace net_utils
m_last_recv(0),
m_last_send(0),
m_recv_cnt(0),
- m_send_cnt(0)
+ m_send_cnt(0),
+ m_current_speed_down(0),
+ m_current_speed_up(0)
{}
connection_context_base& operator=(const connection_context_base& a)
diff --git a/contrib/epee/include/storages/levin_abstract_invoke2.h b/contrib/epee/include/storages/levin_abstract_invoke2.h
index 1b32c51d1..73ede1b12 100644
--- a/contrib/epee/include/storages/levin_abstract_invoke2.h
+++ b/contrib/epee/include/storages/levin_abstract_invoke2.h
@@ -185,7 +185,7 @@ namespace epee
}
return res;
- };
+ }
template<class t_owner, class t_in_type, class t_context, class callback_t>
int buff_to_t_adapter(t_owner* powner, int command, const std::string& in_buff, callback_t cb, t_context& context)
@@ -199,7 +199,7 @@ namespace epee
boost::value_initialized<t_in_type> in_struct;
static_cast<t_in_type&>(in_struct).load(strg);
return cb(command, in_struct, context);
- };
+ }
#define CHAIN_LEVIN_INVOKE_MAP2(context_type) \
int invoke(int command, const std::string& in_buff, std::string& buff_out, context_type& context) \