aboutsummaryrefslogtreecommitdiff
path: root/contrib/epee/include/net
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/epee/include/net')
-rw-r--r--contrib/epee/include/net/levin_protocol_handler_async.h55
-rw-r--r--contrib/epee/include/net/net_utils_base.h5
2 files changed, 50 insertions, 10 deletions
diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h
index 891089be6..5ef782206 100644
--- a/contrib/epee/include/net/levin_protocol_handler_async.h
+++ b/contrib/epee/include/net/levin_protocol_handler_async.h
@@ -42,6 +42,10 @@
#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "net"
+#ifndef MIN_BYTES_WANTED
+#define MIN_BYTES_WANTED 512
+#endif
+
namespace epee
{
namespace levin
@@ -139,26 +143,23 @@ public:
virtual bool is_timer_started() const=0;
virtual void cancel()=0;
virtual bool cancel_timer()=0;
+ virtual void reset_timer()=0;
+ virtual void timeout_handler(const boost::system::error_code& error)=0;
};
template <class callback_t>
struct anvoke_handler: invoke_response_handler_base
{
anvoke_handler(const callback_t& cb, uint64_t timeout, async_protocol_handler& con, int command)
- :m_cb(cb), m_con(con), m_timer(con.m_pservice_endpoint->get_io_service()), m_timer_started(false),
+ :m_cb(cb), m_timeout(timeout), m_con(con), m_timer(con.m_pservice_endpoint->get_io_service()), m_timer_started(false),
m_cancel_timer_called(false), m_timer_cancelled(false), m_command(command)
{
if(m_con.start_outer_call())
{
+ MDEBUG(con.get_context_ref() << "anvoke_handler, timeout: " << timeout);
m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
- m_timer.async_wait([&con, command, cb](const boost::system::error_code& ec)
+ m_timer.async_wait([this](const boost::system::error_code& ec)
{
- if(ec == boost::asio::error::operation_aborted)
- return;
- MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command);
- std::string fake;
- cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
- con.close();
- con.finish_outer_call();
+ timeout_handler(ec);
});
m_timer_started = true;
}
@@ -171,7 +172,18 @@ public:
bool m_timer_started;
bool m_cancel_timer_called;
bool m_timer_cancelled;
+ uint64_t m_timeout;
int m_command;
+ virtual void timeout_handler(const boost::system::error_code& error)
+ {
+ if(error == boost::asio::error::operation_aborted)
+ return;
+ MINFO(m_con.get_context_ref() << "Timeout on invoke operation happened, command: " << m_command << " timeout: " << m_timeout);
+ std::string fake;
+ m_cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, m_con.get_context_ref());
+ m_con.close();
+ m_con.finish_outer_call();
+ }
virtual bool handle(int res, const std::string& buff, typename async_protocol_handler::connection_context& context)
{
if(!cancel_timer())
@@ -203,6 +215,18 @@ public:
}
return m_timer_cancelled;
}
+ virtual void reset_timer()
+ {
+ boost::system::error_code ignored_ec;
+ if (!m_cancel_timer_called && m_timer.cancel(ignored_ec) > 0)
+ {
+ m_timer.expires_from_now(boost::posix_time::milliseconds(m_timeout));
+ m_timer.async_wait([this](const boost::system::error_code& ec)
+ {
+ timeout_handler(ec);
+ });
+ }
+ }
};
critical_section m_invoke_response_handlers_lock;
std::list<boost::shared_ptr<invoke_response_handler_base> > m_invoke_response_handlers;
@@ -342,6 +366,13 @@ public:
if(m_cache_in_buffer.size() < m_current_head.m_cb)
{
is_continue = false;
+ if(cb >= MIN_BYTES_WANTED && !m_invoke_response_handlers.empty())
+ {
+ //async call scenario
+ boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
+ response_handler->reset_timer();
+ MDEBUG(m_connection_context << "LEVIN_PACKET partial msg received. len=" << cb);
+ }
break;
}
{
@@ -595,9 +626,15 @@ public:
<< ", ver=" << head.m_protocol_version);
uint64_t ticks_start = misc_utils::get_tick_count();
+ size_t prev_size = 0;
while(!boost::interprocess::ipcdetail::atomic_read32(&m_invoke_buf_ready) && !m_deletion_initiated && !m_protocol_released)
{
+ if(m_cache_in_buffer.size() - prev_size >= MIN_BYTES_WANTED)
+ {
+ prev_size = m_cache_in_buffer.size();
+ ticks_start = misc_utils::get_tick_count();
+ }
if(misc_utils::get_tick_count() - ticks_start > m_config.m_invoke_timeout)
{
MWARNING(m_connection_context << "invoke timeout (" << m_config.m_invoke_timeout << "), closing connection ");
diff --git a/contrib/epee/include/net/net_utils_base.h b/contrib/epee/include/net/net_utils_base.h
index ba59daf4f..1884412dc 100644
--- a/contrib/epee/include/net/net_utils_base.h
+++ b/contrib/epee/include/net/net_utils_base.h
@@ -128,7 +128,7 @@ namespace net_utils
case ipv4_network_address::ID:
if (!is_store)
const_cast<network_address&>(this_ref).reset(new ipv4_network_address(0, 0));
- KV_SERIALIZE(as<ipv4_network_address>());
+ KV_SERIALIZE(template as<ipv4_network_address>());
break;
default: MERROR("Unsupported network address type: " << type); return false;
}
@@ -156,6 +156,7 @@ namespace net_utils
const network_address m_remote_address;
const bool m_is_income;
const time_t m_started;
+ bool m_in_timedsync;
time_t m_last_recv;
time_t m_last_send;
uint64_t m_recv_cnt;
@@ -171,6 +172,7 @@ namespace net_utils
m_remote_address(remote_address),
m_is_income(is_income),
m_started(time(NULL)),
+ m_in_timedsync(false),
m_last_recv(last_recv),
m_last_send(last_send),
m_recv_cnt(recv_cnt),
@@ -183,6 +185,7 @@ namespace net_utils
m_remote_address(new ipv4_network_address(0,0)),
m_is_income(false),
m_started(time(NULL)),
+ m_in_timedsync(false),
m_last_recv(0),
m_last_send(0),
m_recv_cnt(0),