aboutsummaryrefslogtreecommitdiff
path: root/contrib
diff options
context:
space:
mode:
Diffstat (limited to 'contrib')
-rw-r--r--contrib/epee/include/net/abstract_tcp_server2.h9
-rw-r--r--contrib/epee/include/net/abstract_tcp_server2.inl68
-rw-r--r--contrib/epee/include/net/levin_protocol_handler_async.h26
-rw-r--r--contrib/epee/include/net/net_utils_base.h10
-rw-r--r--contrib/epee/include/storages/levin_abstract_invoke2.h4
-rw-r--r--contrib/otshell_utils/runoptions.cpp4
-rw-r--r--contrib/otshell_utils/runoptions.hpp4
-rw-r--r--contrib/otshell_utils/utils.cpp58
-rw-r--r--contrib/otshell_utils/utils.hpp11
9 files changed, 145 insertions, 49 deletions
diff --git a/contrib/epee/include/net/abstract_tcp_server2.h b/contrib/epee/include/net/abstract_tcp_server2.h
index 1e6223212..3e6ea2171 100644
--- a/contrib/epee/include/net/abstract_tcp_server2.h
+++ b/contrib/epee/include/net/abstract_tcp_server2.h
@@ -56,6 +56,7 @@
#include "syncobj.h"
#include "../../../../src/p2p/connection_basic.hpp"
#include "../../../../contrib/otshell_utils/utils.hpp"
+#include "../../../../src/p2p/network_throttle-detail.hpp"
#define ABSTRACT_SERVER_SEND_QUE_MAX_COUNT 1000
@@ -130,6 +131,7 @@ namespace net_utils
/// Buffer for incoming data.
boost::array<char, 8192> buffer_;
+ //boost::array<char, 1024> buffer_;
t_connection_context context;
i_connection_filter* &m_pfilter;
@@ -143,6 +145,12 @@ namespace net_utils
critical_section m_chunking_lock; // held while we add small chunks of the big do_send() to small do_send_chunk()
t_server_role m_connection_type;
+
+ // for calculate speed (last 60 sec)
+ network_throttle m_throttle_speed_in;
+ network_throttle m_throttle_speed_out;
+ std::mutex m_throttle_speed_in_mutex;
+ std::mutex m_throttle_speed_out_mutex;
public:
void setRPcStation();
@@ -285,6 +293,7 @@ namespace net_utils
critical_section m_threads_lock;
volatile uint32_t m_thread_index; // TODO change to std::atomic
t_server_role type;
+ void detach_threads();
/// The next connection to be accepted
connection_ptr new_connection_;
diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl
index 8dff192b1..5855f7f86 100644
--- a/contrib/epee/include/net/abstract_tcp_server2.inl
+++ b/contrib/epee/include/net/abstract_tcp_server2.inl
@@ -52,6 +52,7 @@
#include "../../../../src/cryptonote_core/cryptonote_core.h" // e.g. for the send_stop_signal()
#include "../../../../contrib/otshell_utils/utils.hpp"
+#include "../../../../src/p2p/data_logger.hpp"
using namespace nOT::nUtils; // TODO
PRAGMA_WARNING_PUSH
@@ -75,7 +76,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
connection_basic(io_service, ref_sock_count, sock_number),
m_protocol_handler(this, config, context),
m_pfilter( pfilter ),
- m_connection_type(NET)
+ m_connection_type(NET),
+ m_throttle_speed_in("speed_in", "throttle_speed_in"),
+ m_throttle_speed_out("speed_out", "throttle_speed_out")
{
_info_c("net/sleepRPC", "connection constructor set m_connection_type="<<m_connection_type);
}
@@ -162,6 +165,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
socket_.set_option( optionTos );
//_dbg1("Set ToS flag to " << tos);
+ boost::asio::ip::tcp::no_delay noDelayOption(false);
+ socket_.set_option(noDelayOption);
+
return true;
CATCH_ENTRY_L0("connection<t_protocol_handler>::start()", false);
@@ -240,6 +246,32 @@ PRAGMA_WARNING_DISABLE_VS(4355)
if (!e)
{
+ {
+ CRITICAL_REGION_LOCAL(m_throttle_speed_in_mutex);
+ m_throttle_speed_in.handle_trafic_exact(bytes_transferred);
+ context.m_current_speed_down = m_throttle_speed_in.get_current_speed();
+ }
+
+ {
+ CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::network_throttle_manager::m_lock_get_global_throttle_in );
+ epee::net_utils::network_throttle_manager::network_throttle_manager::get_global_throttle_in().handle_trafic_exact(bytes_transferred * 1024);
+ }
+ double delay=0; // will be calculated
+ do
+ {
+ { //_scope_dbg1("CRITICAL_REGION_LOCAL");
+ CRITICAL_REGION_LOCAL( epee::net_utils::network_throttle_manager::m_lock_get_global_throttle_in );
+ delay = epee::net_utils::network_throttle_manager::get_global_throttle_in().get_sleep_time_after_tick( bytes_transferred ); // decission from global
+ }
+
+ delay *= 0.5;
+ if (delay > 0) {
+ long int ms = (long int)(delay * 100);
+ epee::net_utils::data_logger::get_instance().add_data("sleep_down", ms);
+ std::this_thread::sleep_for(std::chrono::milliseconds(ms));
+ }
+ } while(delay > 0);
+
//_info("[sock " << socket_.native_handle() << "] RECV " << bytes_transferred);
logger_handle_net_read(bytes_transferred);
context.m_last_recv = time(NULL);
@@ -398,6 +430,11 @@ PRAGMA_WARNING_DISABLE_VS(4355)
return false;
if(m_was_shutdown)
return false;
+ {
+ CRITICAL_REGION_LOCAL(m_throttle_speed_out_mutex);
+ m_throttle_speed_out.handle_trafic_exact(cb);
+ context.m_current_speed_up = m_throttle_speed_out.get_current_speed();
+ }
//_info("[sock " << socket_.native_handle() << "] SEND " << cb);
context.m_last_send = time(NULL);
@@ -405,8 +442,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
//some data should be wrote to stream
//request complete
- do_send_handler_start( ptr , cb ); // (((H)))
-
+ sleep_before_packet(cb, 1, 1);
epee::critical_region_t<decltype(m_send_que_lock)> send_guard(m_send_que_lock); // *** critical ***
long int retry=0;
const long int retry_limit = 5*4;
@@ -440,8 +476,9 @@ PRAGMA_WARNING_DISABLE_VS(4355)
{ // active operation should be in progress, nothing to do, just wait last operation callback
auto size_now = cb;
_info_c("net/out/size", "do_send() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size());
- do_send_handler_delayed( ptr , size_now ); // (((H)))
-
+ //do_send_handler_delayed( ptr , size_now ); // (((H))) // empty function
+
+ LOG_PRINT_L4("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size());
}
else
{ // no active operation
@@ -463,11 +500,11 @@ PRAGMA_WARNING_DISABLE_VS(4355)
//)
);
//_dbg3("(chunk): " << size_now);
- logger_handle_net_write(size_now);
+ //logger_handle_net_write(size_now);
//_info("[sock " << socket_.native_handle() << "] Async send requested " << m_send_que.front().size());
}
- do_send_handler_stop( ptr , cb );
+ //do_send_handler_stop( ptr , cb ); // empty function
return true;
@@ -516,7 +553,8 @@ PRAGMA_WARNING_DISABLE_VS(4355)
shutdown();
return;
}
-
+ logger_handle_net_write(cb);
+ sleep_before_packet(cb, 1, 1);
bool do_shutdown = false;
CRITICAL_REGION_BEGIN(m_send_que_lock);
if(m_send_que.empty())
@@ -545,7 +583,6 @@ PRAGMA_WARNING_DISABLE_VS(4355)
// )
);
//_dbg3("(normal)" << size_now);
- logger_handle_net_write(size_now);
}
CRITICAL_REGION_END();
@@ -784,6 +821,10 @@ POP_WARNINGS
template<class t_protocol_handler>
void boosted_tcp_server<t_protocol_handler>::send_stop_signal()
{
+ if (::cryptonote::core::get_fast_exit() == true)
+ {
+ detach_threads();
+ }
m_stop_signal_sent = true;
TRY_ENTRY();
io_service_.stop();
@@ -988,6 +1029,15 @@ POP_WARNINGS
return true;
CATCH_ENTRY_L0("boosted_tcp_server<t_protocol_handler>::connect_async", false);
}
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ void boosted_tcp_server<t_protocol_handler>::detach_threads()
+ {
+ for (auto thread : m_threads)
+ thread->detach();
+ }
+
+
} // namespace
} // namespace
PRAGMA_WARNING_POP
diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h
index 5e5e803f5..92f75161a 100644
--- a/contrib/epee/include/net/levin_protocol_handler_async.h
+++ b/contrib/epee/include/net/levin_protocol_handler_async.h
@@ -81,7 +81,7 @@ public:
async_protocol_handler_config():m_pcommands_handler(NULL), m_max_packet_size(LEVIN_DEFAULT_MAX_PACKET_SIZE)
{}
- void del_connections(size_t count);
+ void del_out_connections(size_t count);
};
@@ -670,10 +670,30 @@ void async_protocol_handler_config<t_connection_context>::del_connection(async_p
}
//------------------------------------------------------------------------------------------
template<class t_connection_context>
-void async_protocol_handler_config<t_connection_context>::del_connections(size_t count) // TODO
+void async_protocol_handler_config<t_connection_context>::del_out_connections(size_t count)
{
+ std::vector <boost::uuids::uuid> out_connections;
CRITICAL_REGION_BEGIN(m_connects_lock);
- m_connects.clear();
+ for (auto& c: m_connects)
+ {
+ if (!c.second->m_connection_context.m_is_income)
+ out_connections.push_back(c.first);
+ }
+
+ if (out_connections.size() == 0)
+ return;
+
+ // close random out connections
+ unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
+ shuffle(out_connections.begin(), out_connections.end(), std::default_random_engine(seed));
+ while (count > 0 && out_connections.size() > 0)
+ {
+ close(*out_connections.begin());
+ del_connection(m_connects.at(*out_connections.begin()));
+ out_connections.erase(out_connections.begin());
+ --count;
+ }
+
CRITICAL_REGION_END();
}
//------------------------------------------------------------------------------------------
diff --git a/contrib/epee/include/net/net_utils_base.h b/contrib/epee/include/net/net_utils_base.h
index 90e352787..f963e7746 100644
--- a/contrib/epee/include/net/net_utils_base.h
+++ b/contrib/epee/include/net/net_utils_base.h
@@ -55,6 +55,8 @@ namespace net_utils
time_t m_last_send;
uint64_t m_recv_cnt;
uint64_t m_send_cnt;
+ double m_current_speed_down;
+ double m_current_speed_up;
connection_context_base(boost::uuids::uuid connection_id,
long remote_ip, int remote_port, bool is_income,
@@ -68,7 +70,9 @@ namespace net_utils
m_last_recv(last_recv),
m_last_send(last_send),
m_recv_cnt(recv_cnt),
- m_send_cnt(send_cnt)
+ m_send_cnt(send_cnt),
+ m_current_speed_down(0),
+ m_current_speed_up(0)
{}
connection_context_base(): m_connection_id(),
@@ -79,7 +83,9 @@ namespace net_utils
m_last_recv(0),
m_last_send(0),
m_recv_cnt(0),
- m_send_cnt(0)
+ m_send_cnt(0),
+ m_current_speed_down(0),
+ m_current_speed_up(0)
{}
connection_context_base& operator=(const connection_context_base& a)
diff --git a/contrib/epee/include/storages/levin_abstract_invoke2.h b/contrib/epee/include/storages/levin_abstract_invoke2.h
index 1b32c51d1..73ede1b12 100644
--- a/contrib/epee/include/storages/levin_abstract_invoke2.h
+++ b/contrib/epee/include/storages/levin_abstract_invoke2.h
@@ -185,7 +185,7 @@ namespace epee
}
return res;
- };
+ }
template<class t_owner, class t_in_type, class t_context, class callback_t>
int buff_to_t_adapter(t_owner* powner, int command, const std::string& in_buff, callback_t cb, t_context& context)
@@ -199,7 +199,7 @@ namespace epee
boost::value_initialized<t_in_type> in_struct;
static_cast<t_in_type&>(in_struct).load(strg);
return cb(command, in_struct, context);
- };
+ }
#define CHAIN_LEVIN_INVOKE_MAP2(context_type) \
int invoke(int command, const std::string& in_buff, std::string& buff_out, context_type& context) \
diff --git a/contrib/otshell_utils/runoptions.cpp b/contrib/otshell_utils/runoptions.cpp
index 28e7ceb58..ffd37eae4 100644
--- a/contrib/otshell_utils/runoptions.cpp
+++ b/contrib/otshell_utils/runoptions.cpp
@@ -7,7 +7,7 @@
namespace nOT {
-INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1; // <=== namespaces
+INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1 // <=== namespaces
// (no debug - this is the default)
// +nodebug (no debug)
@@ -64,6 +64,6 @@ void cRunOptions::Normalize() {
cRunOptions gRunOptions; // (extern)
-}; // namespace OT
+} // namespace OT
diff --git a/contrib/otshell_utils/runoptions.hpp b/contrib/otshell_utils/runoptions.hpp
index f3306283a..219d3b509 100644
--- a/contrib/otshell_utils/runoptions.hpp
+++ b/contrib/otshell_utils/runoptions.hpp
@@ -10,7 +10,7 @@ Template for new files, replace word "template" and later delete this line here.
namespace nOT {
-INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1; // <=== namespaces
+INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1 // <=== namespaces
/** Global options to run this program main() Eg used for developer's special options like +setdemo +setdebug.
This is NOT for all the other options that are parsed and executed by program. */
@@ -50,7 +50,7 @@ class cRunOptions {
extern cRunOptions gRunOptions;
-}; // namespace nOT
+} // namespace nOT
diff --git a/contrib/otshell_utils/utils.cpp b/contrib/otshell_utils/utils.cpp
index 489fb3076..1d26075c4 100644
--- a/contrib/otshell_utils/utils.cpp
+++ b/contrib/otshell_utils/utils.cpp
@@ -26,8 +26,7 @@
#elif defined(__unix__) || defined(__posix) || defined(__linux) || defined(__darwin) || defined(__APPLE__) || defined(__clang__)
#define OS_TYPE_POSIX
#else
- #warning "Compiler/OS platform is not recognized"
- #warning "Just assuming it will work as POSIX then"
+ #warning "Compiler/OS platform is not recognized. Just assuming it will work as POSIX then"
#define OS_TYPE_POSIX
#endif
@@ -44,7 +43,7 @@
namespace nOT {
namespace nUtils {
-INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1; // <=== namespaces
+INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1 // <=== namespaces
myexception::myexception(const char * what)
: std::runtime_error(what)
@@ -78,26 +77,37 @@ std::string & trim(std::string &s) {
return ltrim(rtrim(s));
}
-std::string get_current_time()
-{
- std::stringstream stream;
- struct tm * date;
-
- std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now();
- time_t time_now;
- time_now = std::chrono::high_resolution_clock::to_time_t(now);
- date = std::localtime(& time_now);
-
- char date_buff[32];
- std::strftime(date_buff, sizeof(date_buff), "%d-%b-%Y %H:%M:%S.", date);
- stream << date_buff;
+std::string get_current_time() {
+ std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
+ time_t time_now = std::chrono::system_clock::to_time_t(now);
+ std::chrono::high_resolution_clock::duration duration = now.time_since_epoch();
+ int64_t micro = std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
- std::chrono::high_resolution_clock::duration duration = now.time_since_epoch();
- int64_t micro = std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
- micro %= 1000000;
- stream << std::setfill('0') << std::setw(3) << micro;
+ // std::localtime() - This function may not be thread-safe.
+ #ifdef OS_TYPE_WINDOWS
+ struct tm * tm_pointer = std::localtime( &time_now ); // thread-safe on mingw-w64 (thread local variable) and on MSVC btw
+ // http://stackoverflow.com/questions/18551409/localtime-r-support-on-mingw
+ // tm_pointer points to thread-local data, memory is owned/managed by the system/library
+ #else
+ // linux, freebsd, have this
+ struct tm tm_object; // automatic storage duration http://en.cppreference.com/w/cpp/language/storage_duration
+ struct tm * tm_pointer = & tm_object; // just point to our data
+ auto x = localtime_r( &time_now , tm_pointer ); // modifies our own (this thread) data in tm_object, this is safe http://linux.die.net/man/3/localtime_r
+ if (x != tm_pointer) return "(internal error in get_current_time)"; // redundant check in case of broken implementation of localtime_r
+ #endif
+ // tm_pointer now points to proper time data, and that memory is automatically managed
+ if (!tm_pointer) return "(internal error in get_current_time - NULL)"; // redundant check in case of broken implementation of used library methods
- return stream.str();
+ std::stringstream stream;
+ stream << std::setfill('0')
+ << std::setw(2) << tm_pointer->tm_year+1900
+ << '-' << std::setw(2) << tm_pointer->tm_mon+1
+ << '-' << std::setw(2) << tm_pointer->tm_mday
+ << ' ' << std::setw(2) << tm_pointer->tm_hour
+ << ':' << std::setw(2) << tm_pointer->tm_min
+ << ':' << std::setw(2) << tm_pointer->tm_sec
+ << '.' << std::setw(6) << (micro%1000000); // 6 because microseconds
+ return stream.str();
}
cNullstream g_nullstream; // extern a stream that does nothing (eats/discards data)
@@ -213,7 +223,7 @@ void cDebugScopeGuard::Assign(const string &chan, const int level, const string
mMsg=msg;
}
-}; // namespace nDetail
+} // namespace nDetail
// ====================================================================
@@ -591,10 +601,10 @@ string stringToColor(const string &hash) {
// algorthms
-}; // namespace nUtil
+} // namespace nUtil
-}; // namespace OT
+} // namespace OT
// global namespace
diff --git a/contrib/otshell_utils/utils.hpp b/contrib/otshell_utils/utils.hpp
index 6cfd11ee1..bb984320b 100644
--- a/contrib/otshell_utils/utils.hpp
+++ b/contrib/otshell_utils/utils.hpp
@@ -14,7 +14,7 @@
#endif
#ifndef CFG_WITH_TERMCOLORS
- #error "You requested to turn off terminal colors (CFG_WITH_TERMCOLORS), however currently they are hardcoded (this option to turn them off is not yet implemented)."
+ //#error "You requested to turn off terminal colors (CFG_WITH_TERMCOLORS), however currently they are hardcoded (this option to turn them off is not yet implemented)."
#endif
///Macros related to automatic deduction of class name etc;
@@ -35,7 +35,7 @@ class myexception : public std::runtime_error {
};
/// @macro Use this macro INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1 as a shortcut for various using std::string etc.
-INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1; // <=== namespaces
+INJECT_OT_COMMON_USING_NAMESPACE_COMMON_1 // <=== namespaces
// ======================================================================================
/// text trimming functions (they do mutate the passes string); they trim based on std::isspace. also return it's reference again
@@ -87,6 +87,7 @@ extern std::mutex gLoggerGuard;
#define _warn(VAR) _debug_level( 90,VAR) // some problem
#define _erro(VAR) _debug_level(100,VAR) // error - report
+
#define _dbg3_c(C,VAR) _debug_level_c(C, 20,VAR)
#define _dbg2_c(C,VAR) _debug_level_c(C, 30,VAR)
#define _dbg1_c(C,VAR) _debug_level_c(C, 40,VAR) // details
@@ -140,7 +141,7 @@ class cDebugScopeGuard {
const char* DbgShortenCodeFileName(const char *s); ///< Returns a pointer to some part of the string that was given, skipping directory names, for log/debug
-}; // namespace nDetail
+} // namespace nDetail
// ========== logger ==========
@@ -423,9 +424,9 @@ class value_init {
template <class T, T INIT>
value_init<T, INIT>::value_init() : data(INIT) { }
-}; // namespace nUtils
+} // namespace nUtils
-}; // namespace nOT
+} // namespace nOT
// global namespace