aboutsummaryrefslogtreecommitdiff
path: root/contrib/epee/include/net/abstract_tcp_server2.inl
diff options
context:
space:
mode:
authorRiccardo Spagni <ric@spagni.net>2019-01-29 16:47:17 +0200
committerRiccardo Spagni <ric@spagni.net>2019-01-29 16:47:17 +0200
commit31bdf7bd113c2576fe579ef3a25a2d8fef419ffc (patch)
treeb48a2503e55908b993dbc388c67fefda35ac7285 /contrib/epee/include/net/abstract_tcp_server2.inl
parentMerge pull request #5104 (diff)
parentAdding initial support for broadcasting transactions over Tor (diff)
downloadmonero-31bdf7bd113c2576fe579ef3a25a2d8fef419ffc.tar.xz
Merge pull request #4988
973403bc Adding initial support for broadcasting transactions over Tor (Lee Clagett)
Diffstat (limited to 'contrib/epee/include/net/abstract_tcp_server2.inl')
-rw-r--r--contrib/epee/include/net/abstract_tcp_server2.inl136
1 files changed, 92 insertions, 44 deletions
diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl
index 457ee2dd4..9c89a18cf 100644
--- a/contrib/epee/include/net/abstract_tcp_server2.inl
+++ b/contrib/epee/include/net/abstract_tcp_server2.inl
@@ -40,6 +40,7 @@
#include <boost/asio/deadline_timer.hpp>
#include <boost/date_time/posix_time/posix_time.hpp> // TODO
#include <boost/thread/condition_variable.hpp> // TODO
+#include <boost/make_shared.hpp>
#include "warnings.h"
#include "string_tools.h"
#include "misc_language.h"
@@ -62,6 +63,13 @@ namespace epee
{
namespace net_utils
{
+ template<typename T>
+ T& check_and_get(boost::shared_ptr<T>& ptr)
+ {
+ CHECK_AND_ASSERT_THROW_MES(bool(ptr), "shared_state cannot be null");
+ return *ptr;
+ }
+
/************************************************************************/
/* */
/************************************************************************/
@@ -69,25 +77,31 @@ PRAGMA_WARNING_DISABLE_VS(4355)
template<class t_protocol_handler>
connection<t_protocol_handler>::connection( boost::asio::io_service& io_service,
- typename t_protocol_handler::config_type& config,
- std::atomic<long> &ref_sock_count, // the ++/-- counter
- std::atomic<long> &sock_number, // the only increasing ++ number generator
- i_connection_filter* &pfilter
- ,t_connection_type connection_type
+ boost::shared_ptr<shared_state> state,
+ t_connection_type connection_type
+ )
+ : connection(boost::asio::ip::tcp::socket{io_service}, std::move(state), connection_type)
+ {
+ }
+
+ template<class t_protocol_handler>
+ connection<t_protocol_handler>::connection( boost::asio::ip::tcp::socket&& sock,
+ boost::shared_ptr<shared_state> state,
+ t_connection_type connection_type
)
:
- connection_basic(io_service, ref_sock_count, sock_number),
- m_protocol_handler(this, config, context),
- m_pfilter( pfilter ),
+ connection_basic(std::move(sock), state),
+ m_protocol_handler(this, check_and_get(state).config, context),
m_connection_type( connection_type ),
m_throttle_speed_in("speed_in", "throttle_speed_in"),
m_throttle_speed_out("speed_out", "throttle_speed_out"),
- m_timer(io_service),
+ m_timer(socket_.get_io_service()),
m_local(false),
m_ready_to_close(false)
{
MDEBUG("test, connection constructor set m_connection_type="<<m_connection_type);
}
+
PRAGMA_WARNING_DISABLE_VS(4355)
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
@@ -127,34 +141,44 @@ PRAGMA_WARNING_DISABLE_VS(4355)
{
TRY_ENTRY();
+ boost::system::error_code ec;
+ auto remote_ep = socket_.remote_endpoint(ec);
+ CHECK_AND_NO_ASSERT_MES(!ec, false, "Failed to get remote endpoint: " << ec.message() << ':' << ec.value());
+ CHECK_AND_NO_ASSERT_MES(remote_ep.address().is_v4(), false, "IPv6 not supported here");
+
+ const unsigned long ip_{boost::asio::detail::socket_ops::host_to_network_long(remote_ep.address().to_v4().to_ulong())};
+ return start(is_income, is_multithreaded, ipv4_network_address{uint32_t(ip_), remote_ep.port()});
+ CATCH_ENTRY_L0("connection<t_protocol_handler>::start()", false);
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
+ bool connection<t_protocol_handler>::start(bool is_income, bool is_multithreaded, network_address real_remote)
+ {
+ TRY_ENTRY();
+
// Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted
auto self = safe_shared_from_this();
if(!self)
return false;
m_is_multithreaded = is_multithreaded;
+ m_local = real_remote.is_loopback() || real_remote.is_local();
- boost::system::error_code ec;
- auto remote_ep = socket_.remote_endpoint(ec);
- CHECK_AND_NO_ASSERT_MES(!ec, false, "Failed to get remote endpoint: " << ec.message() << ':' << ec.value());
- CHECK_AND_NO_ASSERT_MES(remote_ep.address().is_v4(), false, "IPv6 not supported here");
+ // create a random uuid, we don't need crypto strength here
+ const boost::uuids::uuid random_uuid = boost::uuids::random_generator()();
+
+ context = t_connection_context{};
+ context.set_details(random_uuid, std::move(real_remote), is_income);
+ boost::system::error_code ec;
auto local_ep = socket_.local_endpoint(ec);
CHECK_AND_NO_ASSERT_MES(!ec, false, "Failed to get local endpoint: " << ec.message() << ':' << ec.value());
- context = boost::value_initialized<t_connection_context>();
- const unsigned long ip_{boost::asio::detail::socket_ops::host_to_network_long(remote_ep.address().to_v4().to_ulong())};
- m_local = epee::net_utils::is_ip_loopback(ip_) || epee::net_utils::is_ip_local(ip_);
-
- // create a random uuid, we don't need crypto strength here
- const boost::uuids::uuid random_uuid = boost::uuids::random_generator()();
-
- context.set_details(random_uuid, epee::net_utils::ipv4_network_address(ip_, remote_ep.port()), is_income);
_dbg3("[sock " << socket_.native_handle() << "] new connection from " << print_connection_context_short(context) <<
" to " << local_ep.address().to_string() << ':' << local_ep.port() <<
- ", total sockets objects " << m_ref_sock_count);
+ ", total sockets objects " << get_stats().sock_count);
- if(m_pfilter && !m_pfilter->is_remote_host_allowed(context.m_remote_address))
+ if(static_cast<shared_state&>(get_stats()).pfilter && !static_cast<shared_state&>(get_stats()).pfilter->is_remote_host_allowed(context.m_remote_address))
{
_dbg2("[sock " << socket_.native_handle() << "] host denied " << context.m_remote_address.host_str() << ", shutdowning connection");
close();
@@ -279,7 +303,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
}
MDEBUG(" connection type " << to_string( m_connection_type ) << " "
<< socket_.local_endpoint().address().to_string() << ":" << socket_.local_endpoint().port()
- << " <--> " << address << ":" << port);
+ << " <--> " << context.m_remote_address.str() << " (via " << address << ":" << port << ")");
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
@@ -784,12 +808,14 @@ PRAGMA_WARNING_DISABLE_VS(4355)
template<class t_protocol_handler>
boosted_tcp_server<t_protocol_handler>::boosted_tcp_server( t_connection_type connection_type ) :
+ m_state(boost::make_shared<typename connection<t_protocol_handler>::shared_state>()),
m_io_service_local_instance(new boost::asio::io_service()),
io_service_(*m_io_service_local_instance.get()),
acceptor_(io_service_),
+ default_remote(),
m_stop_signal_sent(false), m_port(0),
- m_sock_count(0), m_sock_number(0), m_threads_count(0),
- m_pfilter(NULL), m_thread_index(0),
+ m_threads_count(0),
+ m_thread_index(0),
m_connection_type( connection_type ),
new_connection_()
{
@@ -799,11 +825,13 @@ PRAGMA_WARNING_DISABLE_VS(4355)
template<class t_protocol_handler>
boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_connection_type connection_type) :
+ m_state(boost::make_shared<typename connection<t_protocol_handler>::shared_state>()),
io_service_(extarnal_io_service),
acceptor_(io_service_),
- m_stop_signal_sent(false), m_port(0),
- m_sock_count(0), m_sock_number(0), m_threads_count(0),
- m_pfilter(NULL), m_thread_index(0),
+ default_remote(),
+ m_stop_signal_sent(false), m_port(0),
+ m_threads_count(0),
+ m_thread_index(0),
m_connection_type(connection_type),
new_connection_()
{
@@ -844,7 +872,7 @@ PRAGMA_WARNING_DISABLE_VS(4355)
boost::asio::ip::tcp::endpoint binded_endpoint = acceptor_.local_endpoint();
m_port = binded_endpoint.port();
MDEBUG("start accept");
- new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter, m_connection_type));
+ new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type));
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept, this,
boost::asio::placeholders::error));
@@ -922,7 +950,8 @@ POP_WARNINGS
template<class t_protocol_handler>
void boosted_tcp_server<t_protocol_handler>::set_connection_filter(i_connection_filter* pfilter)
{
- m_pfilter = pfilter;
+ assert(m_state != nullptr); // always set in constructor
+ m_state->pfilter = pfilter;
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
@@ -1030,12 +1059,6 @@ POP_WARNINGS
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
- bool boosted_tcp_server<t_protocol_handler>::is_stop_signal_sent()
- {
- return m_stop_signal_sent;
- }
- //---------------------------------------------------------------------------------
- template<class t_protocol_handler>
void boosted_tcp_server<t_protocol_handler>::handle_accept(const boost::system::error_code& e)
{
MDEBUG("handle_accept");
@@ -1048,7 +1071,7 @@ POP_WARNINGS
new_connection_->setRpcStation(); // hopefully this is not needed actually
}
connection_ptr conn(std::move(new_connection_));
- new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter, m_connection_type));
+ new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type));
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept, this,
boost::asio::placeholders::error));
@@ -1056,7 +1079,10 @@ POP_WARNINGS
boost::asio::socket_base::keep_alive opt(true);
conn->socket().set_option(opt);
- conn->start(true, 1 < m_threads_count);
+ if (default_remote.get_type_id() == net_utils::address_type::invalid)
+ conn->start(true, 1 < m_threads_count);
+ else
+ conn->start(true, 1 < m_threads_count, default_remote);
conn->save_dbg_log();
return;
}
@@ -1071,20 +1097,41 @@ POP_WARNINGS
}
// error path, if e or exception
- _erro("Some problems at accept: " << e.message() << ", connections_count = " << m_sock_count);
+ assert(m_state != nullptr); // always set in constructor
+ _erro("Some problems at accept: " << e.message() << ", connections_count = " << m_state->sock_count);
misc_utils::sleep_no_w(100);
- new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter, m_connection_type));
+ new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type));
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept, this,
boost::asio::placeholders::error));
}
//---------------------------------------------------------------------------------
template<class t_protocol_handler>
+ bool boosted_tcp_server<t_protocol_handler>::add_connection(t_connection_context& out, boost::asio::ip::tcp::socket&& sock, network_address real_remote)
+ {
+ if(std::addressof(get_io_service()) == std::addressof(sock.get_io_service()))
+ {
+ connection_ptr conn(new connection<t_protocol_handler>(std::move(sock), m_state, m_connection_type));
+ if(conn->start(false, 1 < m_threads_count, std::move(real_remote)))
+ {
+ conn->get_context(out);
+ conn->save_dbg_log();
+ return true;
+ }
+ }
+ else
+ {
+ MWARNING(out << " was not added, socket/io_service mismatch");
+ }
+ return false;
+ }
+ //---------------------------------------------------------------------------------
+ template<class t_protocol_handler>
bool boosted_tcp_server<t_protocol_handler>::connect(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_connection_context& conn_context, const std::string& bind_ip)
{
TRY_ENTRY();
- connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter, m_connection_type) );
+ connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type) );
connections_mutex.lock();
connections_.insert(new_connection_l);
MDEBUG("connections_ size now " << connections_.size());
@@ -1187,7 +1234,8 @@ POP_WARNINGS
}
else
{
- _erro("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sock_count);
+ assert(m_state != nullptr); // always set in constructor
+ _erro("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection, connections_count = " << m_state->sock_count);
}
new_connection_l->save_dbg_log();
@@ -1201,7 +1249,7 @@ POP_WARNINGS
bool boosted_tcp_server<t_protocol_handler>::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, const t_callback &cb, const std::string& bind_ip)
{
TRY_ENTRY();
- connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter, m_connection_type) );
+ connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type) );
connections_mutex.lock();
connections_.insert(new_connection_l);
MDEBUG("connections_ size now " << connections_.size());