aboutsummaryrefslogtreecommitdiff
path: root/contrib
diff options
context:
space:
mode:
Diffstat (limited to 'contrib')
-rw-r--r--contrib/epee/include/console_handler.h6
-rw-r--r--contrib/epee/include/net/levin_protocol_handler_async.h55
-rw-r--r--contrib/epee/include/net/net_utils_base.h8
-rw-r--r--contrib/epee/include/readline_buffer.h4
-rw-r--r--contrib/epee/src/readline_buffer.cpp13
5 files changed, 66 insertions, 20 deletions
diff --git a/contrib/epee/include/console_handler.h b/contrib/epee/include/console_handler.h
index bb20faa65..6832f2ea1 100644
--- a/contrib/epee/include/console_handler.h
+++ b/contrib/epee/include/console_handler.h
@@ -315,7 +315,11 @@ namespace epee
if (!m_prompt.empty())
{
#ifdef HAVE_READLINE
- m_stdin_reader.get_readline_buffer().set_prompt(m_prompt);
+ std::string color_prompt = "\001\033[1;33m\002" + m_prompt;
+ if (' ' != m_prompt.back())
+ color_prompt += " ";
+ color_prompt += "\001\033[0m\002";
+ m_stdin_reader.get_readline_buffer().set_prompt(color_prompt);
#else
epee::set_console_color(epee::console_color_yellow, true);
std::cout << m_prompt;
diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h
index 891089be6..5ef782206 100644
--- a/contrib/epee/include/net/levin_protocol_handler_async.h
+++ b/contrib/epee/include/net/levin_protocol_handler_async.h
@@ -42,6 +42,10 @@
#undef MONERO_DEFAULT_LOG_CATEGORY
#define MONERO_DEFAULT_LOG_CATEGORY "net"
+#ifndef MIN_BYTES_WANTED
+#define MIN_BYTES_WANTED 512
+#endif
+
namespace epee
{
namespace levin
@@ -139,26 +143,23 @@ public:
virtual bool is_timer_started() const=0;
virtual void cancel()=0;
virtual bool cancel_timer()=0;
+ virtual void reset_timer()=0;
+ virtual void timeout_handler(const boost::system::error_code& error)=0;
};
template <class callback_t>
struct anvoke_handler: invoke_response_handler_base
{
anvoke_handler(const callback_t& cb, uint64_t timeout, async_protocol_handler& con, int command)
- :m_cb(cb), m_con(con), m_timer(con.m_pservice_endpoint->get_io_service()), m_timer_started(false),
+ :m_cb(cb), m_timeout(timeout), m_con(con), m_timer(con.m_pservice_endpoint->get_io_service()), m_timer_started(false),
m_cancel_timer_called(false), m_timer_cancelled(false), m_command(command)
{
if(m_con.start_outer_call())
{
+ MDEBUG(con.get_context_ref() << "anvoke_handler, timeout: " << timeout);
m_timer.expires_from_now(boost::posix_time::milliseconds(timeout));
- m_timer.async_wait([&con, command, cb](const boost::system::error_code& ec)
+ m_timer.async_wait([this](const boost::system::error_code& ec)
{
- if(ec == boost::asio::error::operation_aborted)
- return;
- MINFO(con.get_context_ref() << "Timeout on invoke operation happened, command: " << command);
- std::string fake;
- cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, con.get_context_ref());
- con.close();
- con.finish_outer_call();
+ timeout_handler(ec);
});
m_timer_started = true;
}
@@ -171,7 +172,18 @@ public:
bool m_timer_started;
bool m_cancel_timer_called;
bool m_timer_cancelled;
+ uint64_t m_timeout;
int m_command;
+ virtual void timeout_handler(const boost::system::error_code& error)
+ {
+ if(error == boost::asio::error::operation_aborted)
+ return;
+ MINFO(m_con.get_context_ref() << "Timeout on invoke operation happened, command: " << m_command << " timeout: " << m_timeout);
+ std::string fake;
+ m_cb(LEVIN_ERROR_CONNECTION_TIMEDOUT, fake, m_con.get_context_ref());
+ m_con.close();
+ m_con.finish_outer_call();
+ }
virtual bool handle(int res, const std::string& buff, typename async_protocol_handler::connection_context& context)
{
if(!cancel_timer())
@@ -203,6 +215,18 @@ public:
}
return m_timer_cancelled;
}
+ virtual void reset_timer()
+ {
+ boost::system::error_code ignored_ec;
+ if (!m_cancel_timer_called && m_timer.cancel(ignored_ec) > 0)
+ {
+ m_timer.expires_from_now(boost::posix_time::milliseconds(m_timeout));
+ m_timer.async_wait([this](const boost::system::error_code& ec)
+ {
+ timeout_handler(ec);
+ });
+ }
+ }
};
critical_section m_invoke_response_handlers_lock;
std::list<boost::shared_ptr<invoke_response_handler_base> > m_invoke_response_handlers;
@@ -342,6 +366,13 @@ public:
if(m_cache_in_buffer.size() < m_current_head.m_cb)
{
is_continue = false;
+ if(cb >= MIN_BYTES_WANTED && !m_invoke_response_handlers.empty())
+ {
+ //async call scenario
+ boost::shared_ptr<invoke_response_handler_base> response_handler = m_invoke_response_handlers.front();
+ response_handler->reset_timer();
+ MDEBUG(m_connection_context << "LEVIN_PACKET partial msg received. len=" << cb);
+ }
break;
}
{
@@ -595,9 +626,15 @@ public:
<< ", ver=" << head.m_protocol_version);
uint64_t ticks_start = misc_utils::get_tick_count();
+ size_t prev_size = 0;
while(!boost::interprocess::ipcdetail::atomic_read32(&m_invoke_buf_ready) && !m_deletion_initiated && !m_protocol_released)
{
+ if(m_cache_in_buffer.size() - prev_size >= MIN_BYTES_WANTED)
+ {
+ prev_size = m_cache_in_buffer.size();
+ ticks_start = misc_utils::get_tick_count();
+ }
if(misc_utils::get_tick_count() - ticks_start > m_config.m_invoke_timeout)
{
MWARNING(m_connection_context << "invoke timeout (" << m_config.m_invoke_timeout << "), closing connection ");
diff --git a/contrib/epee/include/net/net_utils_base.h b/contrib/epee/include/net/net_utils_base.h
index ad0faa2a9..3bea11985 100644
--- a/contrib/epee/include/net/net_utils_base.h
+++ b/contrib/epee/include/net/net_utils_base.h
@@ -115,10 +115,9 @@ namespace net_utils
std::string host_str() const { return (*this) ? (*this)->host_str() : "<none>"; }
bool is_loopback() const { return (*this)->is_loopback(); }
bool is_local() const { return (*this)->is_local(); }
- const std::type_info &type() const { return typeid(**this); }
uint8_t get_type_id() const { return (*this)->get_type_id(); }
- template<typename Type> Type &as() { if (type() != typeid(Type)) throw std::runtime_error("Bad type"); return *(Type*)get(); }
- template<typename Type> const Type &as() const { if (type() != typeid(Type)) throw std::runtime_error("Bad type"); return *(const Type*)get(); }
+ template<typename Type> Type &as() { if (get_type_id() != Type::ID) throw std::runtime_error("Bad type"); return *(Type*)get(); }
+ template<typename Type> const Type &as() const { if (get_type_id() != Type::ID) throw std::runtime_error("Bad type"); return *(const Type*)get(); }
BEGIN_KV_SERIALIZE_MAP()
uint8_t type = is_store ? this_ref.get_type_id() : 0;
@@ -156,6 +155,7 @@ namespace net_utils
const network_address m_remote_address;
const bool m_is_income;
const time_t m_started;
+ bool m_in_timedsync;
time_t m_last_recv;
time_t m_last_send;
uint64_t m_recv_cnt;
@@ -171,6 +171,7 @@ namespace net_utils
m_remote_address(remote_address),
m_is_income(is_income),
m_started(time(NULL)),
+ m_in_timedsync(false),
m_last_recv(last_recv),
m_last_send(last_send),
m_recv_cnt(recv_cnt),
@@ -183,6 +184,7 @@ namespace net_utils
m_remote_address(new ipv4_network_address(0,0)),
m_is_income(false),
m_started(time(NULL)),
+ m_in_timedsync(false),
m_last_recv(0),
m_last_send(0),
m_recv_cnt(0),
diff --git a/contrib/epee/include/readline_buffer.h b/contrib/epee/include/readline_buffer.h
index 7d929bc4c..916d14f01 100644
--- a/contrib/epee/include/readline_buffer.h
+++ b/contrib/epee/include/readline_buffer.h
@@ -13,11 +13,11 @@ namespace rdln
void start();
void stop();
int process();
- bool is_running()
+ bool is_running() const
{
return m_cout_buf != NULL;
}
- void get_line(std::string& line);
+ void get_line(std::string& line) const;
void set_prompt(const std::string& prompt);
protected:
diff --git a/contrib/epee/src/readline_buffer.cpp b/contrib/epee/src/readline_buffer.cpp
index 68b739db9..d38afd296 100644
--- a/contrib/epee/src/readline_buffer.cpp
+++ b/contrib/epee/src/readline_buffer.cpp
@@ -39,7 +39,7 @@ rdln::suspend_readline::~suspend_readline()
}
rdln::readline_buffer::readline_buffer()
-: std::stringbuf()
+: std::stringbuf(), m_cout_buf(NULL)
{
current = this;
}
@@ -62,7 +62,7 @@ void rdln::readline_buffer::stop()
remove_line_handler();
}
-void rdln::readline_buffer::get_line(std::string& line)
+void rdln::readline_buffer::get_line(std::string& line) const
{
std::unique_lock<std::mutex> lock(line_mutex);
have_line.wait(lock);
@@ -122,7 +122,7 @@ static int process_input()
struct timeval t;
t.tv_sec = 0;
- t.tv_usec = 0;
+ t.tv_usec = 1000;
FD_ZERO(&fds);
FD_SET(STDIN_FILENO, &fds);
@@ -168,8 +168,11 @@ static int handle_enter(int x, int y)
}
free(line);
- rl_set_prompt(last_prompt.c_str());
- rl_redisplay();
+ if(last_line != "exit")
+ {
+ rl_set_prompt(last_prompt.c_str());
+ rl_redisplay();
+ }
rl_done = 1;
return 0;