diff options
Diffstat (limited to '')
-rw-r--r-- | contrib/epee/include/net/abstract_tcp_server2.h | 9 | ||||
-rw-r--r-- | contrib/epee/include/net/abstract_tcp_server2.inl | 68 | ||||
-rw-r--r-- | contrib/epee/include/net/levin_protocol_handler_async.h | 26 | ||||
-rw-r--r-- | contrib/epee/include/net/net_utils_base.h | 10 | ||||
-rw-r--r-- | contrib/epee/include/storages/levin_abstract_invoke2.h | 4 | ||||
-rw-r--r-- | contrib/otshell_utils/runoptions.cpp | 4 | ||||
-rw-r--r-- | contrib/otshell_utils/runoptions.hpp | 4 | ||||
-rw-r--r-- | contrib/otshell_utils/utils.cpp | 58 | ||||
-rw-r--r-- | contrib/otshell_utils/utils.hpp | 11 |
9 files changed, 145 insertions, 49 deletions
diff --git a/contrib/epee/include/net/abstract_tcp_server2.h b/contrib/epee/include/net/abstract_tcp_server2.h index 1e6223212..3e6ea2171 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.h +++ b/contrib/epee/include/net/abstract_tcp_server2.h @@ -56,6 +56,7 @@ #include "syncobj.h" #include "../../../../src/p2p/connection_basic.hpp" #include "../../../../contrib/otshell_utils/utils.hpp" +#include "../../../../src/p2p/network_throttle-detail.hpp" #define ABSTRACT_SERVER_SEND_QUE_MAX_COUNT 1000 @@ -130,6 +131,7 @@ namespace net_utils /// Buffer for incoming data. boost::array<char, 8192> buffer_; + //boost::array<char, 1024> buffer_; t_connection_context context; i_connection_filter* &m_pfilter; @@ -143,6 +145,12 @@ namespace net_utils critical_section m_chunking_lock; // held while we add small chunks of the big do_send() to small do_send_chunk() t_server_role m_connection_type; + + // for calculate speed (last 60 sec) + network_throttle m_throttle_speed_in; + network_throttle m_throttle_speed_out; + std::mutex m_throttle_speed_in_mutex; + std::mutex m_throttle_speed_out_mutex; public: void setRPcStation(); @@ -285,6 +293,7 @@ namespace net_utils critical_section m_threads_lock; volatile uint32_t m_thread_index; // TODO change to std::atomic t_server_role type; + void detach_threads(); /// The next connection to be accepted connection_ptr new_connection_; diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl index 8dff192b1..5855f7f86 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.inl +++ b/contrib/epee/include/net/abstract_tcp_server2.inl @@ -52,6 +52,7 @@ #include "../../../../src/cryptonote_core/cryptonote_core.h" // e.g. for the send_stop_signal() #include "../../../../contrib/otshell_utils/utils.hpp" +#include "../../../../src/p2p/data_logger.hpp" using namespace nOT::nUtils; // TODO PRAGMA_WARNING_PUSH @@ -75,7 +76,9 @@ PRAGMA_WARNING_DISABLE_VS(4355) connection_basic(io_service, ref_sock_count, sock_number), m_protocol_handler(this, config, context), m_pfilter( pfilter ), - m_connection_type(NET) + m_connection_type(NET), + m_throttle_speed_in("speed_in", "throttle_speed_in"), + m_throttle_speed_out("speed_out", "throttle_speed_out") { _info_c("net/sleepRPC", "connection constructor set m_connection_type="<<m_connection_type); } @@ -162,6 +165,9 @@ PRAGMA_WARNING_DISABLE_VS(4355) socket_.set_option( optionTos ); //_dbg1("Set ToS flag to " << tos); + boost::asio::ip::tcp::no_delay noDelayOption(false); + socket_.set_option(noDelayOption); + return true; CATCH_ENTRY_L0("connection<t_protocol_handler>::start()", false); @@ -240,6 +246,32 @@ PRAGMA_WARNING_DISABLE_VS(4355) if (!e) { + { + CRITICAL_REGION_LOCAL(m_throttle_speed_in_mutex); + m_throttle_speed_in.handle_trafic_exact(bytes_transferred); + context.m_current_speed_down = m_throttle_speed_in.get_current_speed(); + } + + { + CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::network_throttle_manager::m_lock_get_global_throttle_in ); + epee::net_utils::network_throttle_manager::network_throttle_manager::get_global_throttle_in().handle_trafic_exact(bytes_transferred * 1024); + } + double delay=0; // will be calculated + do + { + { //_scope_dbg1("CRITICAL_REGION_LOCAL"); + CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::m_lock_get_global_throttle_in ); + delay = epee::net_utils::network_throttle_manager::get_global_throttle_in().get_sleep_time_after_tick( bytes_transferred ); // decission from global + } + + delay *= 0.5; + if (delay > 0) { + long int ms = (long int)(delay * 100); + epee::net_utils::data_logger::get_instance().add_data("sleep_down", ms); + std::this_thread::sleep_for(std::chrono::milliseconds(ms)); + } + } while(delay > 0); + //_info("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred); logger_handle_net_read(bytes_transferred); context.m_last_recv = time(NULL); @@ -398,6 +430,11 @@ PRAGMA_WARNING_DISABLE_VS(4355) return false; if(m_was_shutdown) return false; + { + CRITICAL_REGION_LOCAL(m_throttle_speed_out_mutex); + m_throttle_speed_out.handle_trafic_exact(cb); + context.m_current_speed_up = m_throttle_speed_out.get_current_speed(); + } //_info("[sock " << socket_.native_handle() << "] SEND " << cb); context.m_last_send = time(NULL); @@ -405,8 +442,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) //some data should be wrote to stream //request complete - do_send_handler_start( ptr , cb ); // (((H))) - + sleep_before_packet(cb, 1, 1); epee::critical_region_t<decltype(m_send_que_lock)> send_guard(m_send_que_lock); // *** critical *** long int retry=0; const long int retry_limit = 5*4; @@ -440,8 +476,9 @@ PRAGMA_WARNING_DISABLE_VS(4355) { // active operation should be in progress, nothing to do, just wait last operation callback auto size_now = cb; _info_c("net/out/size", "do_send() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size()); - do_send_handler_delayed( ptr , size_now ); // (((H))) - + //do_send_handler_delayed( ptr , size_now ); // (((H))) // empty function + + LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size()); } else { // no active operation @@ -463,11 +500,11 @@ PRAGMA_WARNING_DISABLE_VS(4355) //) ); //_dbg3("(chunk): " << size_now); - logger_handle_net_write(size_now); + //logger_handle_net_write(size_now); //_info("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size()); } - do_send_handler_stop( ptr , cb ); + //do_send_handler_stop( ptr , cb ); // empty function return true; @@ -516,7 +553,8 @@ PRAGMA_WARNING_DISABLE_VS(4355) shutdown(); return; } - + logger_handle_net_write(cb); + sleep_before_packet(cb, 1, 1); bool do_shutdown = false; CRITICAL_REGION_BEGIN(m_send_que_lock); if(m_send_que.empty()) @@ -545,7 +583,6 @@ PRAGMA_WARNING_DISABLE_VS(4355) // ) ); //_dbg3("(normal)" << size_now); - logger_handle_net_write(size_now); } CRITICAL_REGION_END(); @@ -784,6 +821,10 @@ POP_WARNINGS template<class t_protocol_handler> void boosted_tcp_server<t_protocol_handler>::send_stop_signal() { + if (::cryptonote::core::get_fast_exit() == true) + { + detach_threads(); + } m_stop_signal_sent = true; TRY_ENTRY(); io_service_.stop(); @@ -988,6 +1029,15 @@ POP_WARNINGS return true; CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect_async", false); } + //--------------------------------------------------------------------------------- + template<class t_protocol_handler> + void boosted_tcp_server<t_protocol_handler>::detach_threads() + { + for (auto thread : m_threads) + thread->detach(); + } + + } // namespace } // namespace PRAGMA_WARNING_POP diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h index 5e5e803f5..92f75161a 100644 --- a/contrib/epee/include/net/levin_protocol_handler_async.h +++ b/contrib/epee/include/net/levin_protocol_handler_async.h @@ -81,7 +81,7 @@ public: async_protocol_handler_config():m_pcommands_handler(NULL), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE) {} - void del_connections(size_t count); + void del_out_connections(size_t count); }; @@ -670,10 +670,30 @@ void async_protocol_handler_config<t_connection_context>::del_connection(async_p } //------------------------------------------------------------------------------------------ template<class t_connection_context> -void async_protocol_handler_config<t_connection_context>::del_connections(size_t count) // TODO +void async_protocol_handler_config<t_connection_context>::del_out_connections(size_t count) { + std::vector <boost::uuids::uuid> out_connections; CRITICAL_REGION_BEGIN(m_connects_lock); - m_connects.clear(); + for (auto& c: m_connects) + { + if (!c.second->m_connection_context.m_is_income) + out_connections.push_back(c.first); + } + + if (out_connections.size() == 0) + return; + + // close random out connections + unsigned seed = std::chrono::system_clock::now().time_since_epoch().count(); + shuffle(out_connections.begin(), out_connections.end(), std::default_random_engine(seed)); + while (count > 0 && out_connections.size() > 0) + { + close(*out_connections.begin()); + del_connection(m_connects.at(*out_connections.begin())); + out_connections.erase(out_connections.begin()); + --count; + } + CRITICAL_REGION_END(); } //------------------------------------------------------------------------------------------ diff --git a/contrib/epee/include/net/net_utils_base.h b/contrib/epee/include/net/net_utils_base.h index 90e352787..f963e7746 100644 --- a/contrib/epee/include/net/net_utils_base.h +++ b/contrib/epee/include/net/net_utils_base.h @@ -55,6 +55,8 @@ namespace net_utils time_t m_last_send; uint64_t m_recv_cnt; uint64_t m_send_cnt; + double m_current_speed_down; + double m_current_speed_up; connection_context_base(boost::uuids::uuid connection_id, long remote_ip, int remote_port, bool is_income, @@ -68,7 +70,9 @@ namespace net_utils m_last_recv(last_recv), m_last_send(last_send), m_recv_cnt(recv_cnt), - m_send_cnt(send_cnt) + m_send_cnt(send_cnt), + m_current_speed_down(0), + m_current_speed_up(0) {} connection_context_base(): m_connection_id(), @@ -79,7 +83,9 @@ namespace net_utils m_last_recv(0), m_last_send(0), m_recv_cnt(0), - m_send_cnt(0) + m_send_cnt(0), + m_current_speed_down(0), + m_current_speed_up(0) {} connection_context_base& operator=(const connection_context_base& a) diff --git a/contrib/epee/include/storages/levin_abstract_invoke2.h b/contrib/epee/include/storages/levin_abstract_invoke2.h index 1b32c51d1..73ede1b12 100644 --- a/contrib/epee/include/storages/levin_abstract_invoke2.h +++ b/contrib/epee/include/storages/levin_abstract_invoke2.h @@ -185,7 +185,7 @@ namespace epee } return res; - }; + } template<class t_owner, class t_in_type, class t_context, class callback_t> int buff_to_t_adapter(t_owner* powner, int command, const std::string& in_buff, callback_t cb, t_context& context) @@ -199,7 +199,7 @@ namespace epee boost::value_initialized<t_in_type> in_struct; static_cast<t_in_type&>(in_struct).load(strg); return cb(command, in_struct, context); - }; + } #define CHAIN_LEVIN_INVOKE_MAP2(context_type) \ int invoke(int command, const std::string& in_buff, std::string& buff_out, context_type& context) \ diff --git a/contrib/otshell_utils/runoptions.cpp b/contrib/otshell_utils/runoptions.cpp index 28e7ceb58..ffd37eae4 100644 --- a/contrib/otshell_utils/runoptions.cpp +++ b/contrib/otshell_utils/runoptions.cpp @@ -7,7 +7,7 @@ namespace nOT { -INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1; // <=== namespaces +INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1 // <=== namespaces // (no debug - this is the default) // +nodebug (no debug) @@ -64,6 +64,6 @@ void cRunOptions::Normalize() { cRunOptions gRunOptions; // (extern) -}; // namespace OT +} // namespace OT diff --git a/contrib/otshell_utils/runoptions.hpp b/contrib/otshell_utils/runoptions.hpp index f3306283a..219d3b509 100644 --- a/contrib/otshell_utils/runoptions.hpp +++ b/contrib/otshell_utils/runoptions.hpp @@ -10,7 +10,7 @@ Template for new files, replace word "template" and later delete this line here. namespace nOT { -INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1; // <=== namespaces +INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1 // <=== namespaces /** Global options to run this program main() Eg used for developer's special options like +setdemo +setdebug. This is NOT for all the other options that are parsed and executed by program. */ @@ -50,7 +50,7 @@ class cRunOptions { extern cRunOptions gRunOptions; -}; // namespace nOT +} // namespace nOT diff --git a/contrib/otshell_utils/utils.cpp b/contrib/otshell_utils/utils.cpp index 489fb3076..1d26075c4 100644 --- a/contrib/otshell_utils/utils.cpp +++ b/contrib/otshell_utils/utils.cpp @@ -26,8 +26,7 @@ #elif defined(__unix__) || defined(__posix) || defined(__linux) || defined(__darwin) || defined(__APPLE__) || defined(__clang__) #define OS_TYPE_POSIX #else - #warning "Compiler/OS platform is not recognized" - #warning "Just assuming it will work as POSIX then" + #warning "Compiler/OS platform is not recognized. Just assuming it will work as POSIX then" #define OS_TYPE_POSIX #endif @@ -44,7 +43,7 @@ namespace nOT { namespace nUtils { -INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1; // <=== namespaces +INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1 // <=== namespaces myexception::myexception(const char * what) : std::runtime_error(what) @@ -78,26 +77,37 @@ std::string & trim(std::string &s) { return ltrim(rtrim(s)); } -std::string get_current_time() -{ - std::stringstream stream; - struct tm * date; - - std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now(); - time_t time_now; - time_now = std::chrono::high_resolution_clock::to_time_t(now); - date = std::localtime(& time_now); - - char date_buff[32]; - std::strftime(date_buff, sizeof(date_buff), "%d-%b-%Y %H:%M:%S.", date); - stream << date_buff; +std::string get_current_time() { + std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); + time_t time_now = std::chrono::system_clock::to_time_t(now); + std::chrono::high_resolution_clock::duration duration = now.time_since_epoch(); + int64_t micro = std::chrono::duration_cast<std::chrono::microseconds>(duration).count(); - std::chrono::high_resolution_clock::duration duration = now.time_since_epoch(); - int64_t micro = std::chrono::duration_cast<std::chrono::microseconds>(duration).count(); - micro %= 1000000; - stream << std::setfill('0') << std::setw(3) << micro; + // std::localtime() - This function may not be thread-safe. + #ifdef OS_TYPE_WINDOWS + struct tm * tm_pointer = std::localtime( &time_now ); // thread-safe on mingw-w64 (thread local variable) and on MSVC btw + // http://stackoverflow.com/questions/18551409/localtime-r-support-on-mingw + // tm_pointer points to thread-local data, memory is owned/managed by the system/library + #else + // linux, freebsd, have this + struct tm tm_object; // automatic storage duration http://en.cppreference.com/w/cpp/language/storage_duration + struct tm * tm_pointer = & tm_object; // just point to our data + auto x = localtime_r( &time_now , tm_pointer ); // modifies our own (this thread) data in tm_object, this is safe http://linux.die.net/man/3/localtime_r + if (x != tm_pointer) return "(internal error in get_current_time)"; // redundant check in case of broken implementation of localtime_r + #endif + // tm_pointer now points to proper time data, and that memory is automatically managed + if (!tm_pointer) return "(internal error in get_current_time - NULL)"; // redundant check in case of broken implementation of used library methods - return stream.str(); + std::stringstream stream; + stream << std::setfill('0') + << std::setw(2) << tm_pointer->tm_year+1900 + << '-' << std::setw(2) << tm_pointer->tm_mon+1 + << '-' << std::setw(2) << tm_pointer->tm_mday + << ' ' << std::setw(2) << tm_pointer->tm_hour + << ':' << std::setw(2) << tm_pointer->tm_min + << ':' << std::setw(2) << tm_pointer->tm_sec + << '.' << std::setw(6) << (micro%1000000); // 6 because microseconds + return stream.str(); } cNullstream g_nullstream; // extern a stream that does nothing (eats/discards data) @@ -213,7 +223,7 @@ void cDebugScopeGuard::Assign(const string &chan, const int level, const string mMsg=msg; } -}; // namespace nDetail +} // namespace nDetail // ==================================================================== @@ -591,10 +601,10 @@ string stringToColor(const string &hash) { // algorthms -}; // namespace nUtil +} // namespace nUtil -}; // namespace OT +} // namespace OT // global namespace diff --git a/contrib/otshell_utils/utils.hpp b/contrib/otshell_utils/utils.hpp index 6cfd11ee1..bb984320b 100644 --- a/contrib/otshell_utils/utils.hpp +++ b/contrib/otshell_utils/utils.hpp @@ -14,7 +14,7 @@ #endif #ifndef CFG_WITH_TERMCOLORS - #error "You requested to turn off terminal colors (CFG_WITH_TERMCOLORS), however currently they are hardcoded (this option to turn them off is not yet implemented)." + //#error "You requested to turn off terminal colors (CFG_WITH_TERMCOLORS), however currently they are hardcoded (this option to turn them off is not yet implemented)." #endif ///Macros related to automatic deduction of class name etc; @@ -35,7 +35,7 @@ class myexception : public std::runtime_error { }; /// @macro Use this macro INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1 as a shortcut for various using std::string etc. -INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1; // <=== namespaces +INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1 // <=== namespaces // ====================================================================================== /// text trimming functions (they do mutate the passes string); they trim based on std::isspace. also return it's reference again @@ -87,6 +87,7 @@ extern std::mutex gLoggerGuard; #define _warn(VAR) _debug_level( 90,VAR) // some problem #define _erro(VAR) _debug_level(100,VAR) // error - report + #define _dbg3_c(C,VAR) _debug_level_c(C, 20,VAR) #define _dbg2_c(C,VAR) _debug_level_c(C, 30,VAR) #define _dbg1_c(C,VAR) _debug_level_c(C, 40,VAR) // details @@ -140,7 +141,7 @@ class cDebugScopeGuard { const char* DbgShortenCodeFileName(const char *s); ///< Returns a pointer to some part of the string that was given, skipping directory names, for log/debug -}; // namespace nDetail +} // namespace nDetail // ========== logger ========== @@ -423,9 +424,9 @@ class value_init { template <class T, T INIT> value_init<T, INIT>::value_init() : data(INIT) { } -}; // namespace nUtils +} // namespace nUtils -}; // namespace nOT +} // namespace nOT // global namespace |