diff options
author | Riccardo Spagni <ric@spagni.net> | 2019-01-29 16:47:17 +0200 |
---|---|---|
committer | Riccardo Spagni <ric@spagni.net> | 2019-01-29 16:47:17 +0200 |
commit | 31bdf7bd113c2576fe579ef3a25a2d8fef419ffc (patch) | |
tree | b48a2503e55908b993dbc388c67fefda35ac7285 /contrib/epee/include/net/abstract_tcp_server2.inl | |
parent | Merge pull request #5104 (diff) | |
parent | Adding initial support for broadcasting transactions over Tor (diff) | |
download | monero-31bdf7bd113c2576fe579ef3a25a2d8fef419ffc.tar.xz |
Merge pull request #4988
973403bc Adding initial support for broadcasting transactions over Tor (Lee Clagett)
Diffstat (limited to 'contrib/epee/include/net/abstract_tcp_server2.inl')
-rw-r--r-- | contrib/epee/include/net/abstract_tcp_server2.inl | 136 |
1 files changed, 92 insertions, 44 deletions
diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl index 457ee2dd4..9c89a18cf 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.inl +++ b/contrib/epee/include/net/abstract_tcp_server2.inl @@ -40,6 +40,7 @@ #include <boost/asio/deadline_timer.hpp> #include <boost/date_time/posix_time/posix_time.hpp> // TODO #include <boost/thread/condition_variable.hpp> // TODO +#include <boost/make_shared.hpp> #include "warnings.h" #include "string_tools.h" #include "misc_language.h" @@ -62,6 +63,13 @@ namespace epee { namespace net_utils { + template<typename T> + T& check_and_get(boost::shared_ptr<T>& ptr) + { + CHECK_AND_ASSERT_THROW_MES(bool(ptr), "shared_state cannot be null"); + return *ptr; + } + /************************************************************************/ /* */ /************************************************************************/ @@ -69,25 +77,31 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> connection<t_protocol_handler>::connection( boost::asio::io_service& io_service, - typename t_protocol_handler::config_type& config, - std::atomic<long> &ref_sock_count, // the ++/-- counter - std::atomic<long> &sock_number, // the only increasing ++ number generator - i_connection_filter* &pfilter - ,t_connection_type connection_type + boost::shared_ptr<shared_state> state, + t_connection_type connection_type + ) + : connection(boost::asio::ip::tcp::socket{io_service}, std::move(state), connection_type) + { + } + + template<class t_protocol_handler> + connection<t_protocol_handler>::connection( boost::asio::ip::tcp::socket&& sock, + boost::shared_ptr<shared_state> state, + t_connection_type connection_type ) : - connection_basic(io_service, ref_sock_count, sock_number), - m_protocol_handler(this, config, context), - m_pfilter( pfilter ), + connection_basic(std::move(sock), state), + m_protocol_handler(this, check_and_get(state).config, context), m_connection_type( connection_type ), m_throttle_speed_in("speed_in", "throttle_speed_in"), m_throttle_speed_out("speed_out", "throttle_speed_out"), - m_timer(io_service), + m_timer(socket_.get_io_service()), m_local(false), m_ready_to_close(false) { MDEBUG("test, connection constructor set m_connection_type="<<m_connection_type); } + PRAGMA_WARNING_DISABLE_VS(4355) //--------------------------------------------------------------------------------- template<class t_protocol_handler> @@ -127,34 +141,44 @@ PRAGMA_WARNING_DISABLE_VS(4355) { TRY_ENTRY(); + boost::system::error_code ec; + auto remote_ep = socket_.remote_endpoint(ec); + CHECK_AND_NO_ASSERT_MES(!ec, false, "Failed to get remote endpoint: " << ec.message() << ':' << ec.value()); + CHECK_AND_NO_ASSERT_MES(remote_ep.address().is_v4(), false, "IPv6 not supported here"); + + const unsigned long ip_{boost::asio::detail::socket_ops::host_to_network_long(remote_ep.address().to_v4().to_ulong())}; + return start(is_income, is_multithreaded, ipv4_network_address{uint32_t(ip_), remote_ep.port()}); + CATCH_ENTRY_L0("connection<t_protocol_handler>::start()", false); + } + //--------------------------------------------------------------------------------- + template<class t_protocol_handler> + bool connection<t_protocol_handler>::start(bool is_income, bool is_multithreaded, network_address real_remote) + { + TRY_ENTRY(); + // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted auto self = safe_shared_from_this(); if(!self) return false; m_is_multithreaded = is_multithreaded; + m_local = real_remote.is_loopback() || real_remote.is_local(); - boost::system::error_code ec; - auto remote_ep = socket_.remote_endpoint(ec); - CHECK_AND_NO_ASSERT_MES(!ec, false, "Failed to get remote endpoint: " << ec.message() << ':' << ec.value()); - CHECK_AND_NO_ASSERT_MES(remote_ep.address().is_v4(), false, "IPv6 not supported here"); + // create a random uuid, we don't need crypto strength here + const boost::uuids::uuid random_uuid = boost::uuids::random_generator()(); + + context = t_connection_context{}; + context.set_details(random_uuid, std::move(real_remote), is_income); + boost::system::error_code ec; auto local_ep = socket_.local_endpoint(ec); CHECK_AND_NO_ASSERT_MES(!ec, false, "Failed to get local endpoint: " << ec.message() << ':' << ec.value()); - context = boost::value_initialized<t_connection_context>(); - const unsigned long ip_{boost::asio::detail::socket_ops::host_to_network_long(remote_ep.address().to_v4().to_ulong())}; - m_local = epee::net_utils::is_ip_loopback(ip_) || epee::net_utils::is_ip_local(ip_); - - // create a random uuid, we don't need crypto strength here - const boost::uuids::uuid random_uuid = boost::uuids::random_generator()(); - - context.set_details(random_uuid, epee::net_utils::ipv4_network_address(ip_, remote_ep.port()), is_income); _dbg3("[sock " << socket_.native_handle() << "] new connection from " << print_connection_context_short(context) << " to " << local_ep.address().to_string() << ':' << local_ep.port() << - ", total sockets objects " << m_ref_sock_count); + ", total sockets objects " << get_stats().sock_count); - if(m_pfilter && !m_pfilter->is_remote_host_allowed(context.m_remote_address)) + if(static_cast<shared_state&>(get_stats()).pfilter && !static_cast<shared_state&>(get_stats()).pfilter->is_remote_host_allowed(context.m_remote_address)) { _dbg2("[sock " << socket_.native_handle() << "] host denied " << context.m_remote_address.host_str() << ", shutdowning connection"); close(); @@ -279,7 +303,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) } MDEBUG(" connection type " << to_string( m_connection_type ) << " " << socket_.local_endpoint().address().to_string() << ":" << socket_.local_endpoint().port() - << " <--> " << address << ":" << port); + << " <--> " << context.m_remote_address.str() << " (via " << address << ":" << port << ")"); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> @@ -784,12 +808,14 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> boosted_tcp_server<t_protocol_handler>::boosted_tcp_server( t_connection_type connection_type ) : + m_state(boost::make_shared<typename connection<t_protocol_handler>::shared_state>()), m_io_service_local_instance(new boost::asio::io_service()), io_service_(*m_io_service_local_instance.get()), acceptor_(io_service_), + default_remote(), m_stop_signal_sent(false), m_port(0), - m_sock_count(0), m_sock_number(0), m_threads_count(0), - m_pfilter(NULL), m_thread_index(0), + m_threads_count(0), + m_thread_index(0), m_connection_type( connection_type ), new_connection_() { @@ -799,11 +825,13 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_connection_type connection_type) : + m_state(boost::make_shared<typename connection<t_protocol_handler>::shared_state>()), io_service_(extarnal_io_service), acceptor_(io_service_), - m_stop_signal_sent(false), m_port(0), - m_sock_count(0), m_sock_number(0), m_threads_count(0), - m_pfilter(NULL), m_thread_index(0), + default_remote(), + m_stop_signal_sent(false), m_port(0), + m_threads_count(0), + m_thread_index(0), m_connection_type(connection_type), new_connection_() { @@ -844,7 +872,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) boost::asio::ip::tcp::endpoint binded_endpoint = acceptor_.local_endpoint(); m_port = binded_endpoint.port(); MDEBUG("start accept"); - new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter, m_connection_type)); + new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type)); acceptor_.async_accept(new_connection_->socket(), boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept, this, boost::asio::placeholders::error)); @@ -922,7 +950,8 @@ POP_WARNINGS template<class t_protocol_handler> void boosted_tcp_server<t_protocol_handler>::set_connection_filter(i_connection_filter* pfilter) { - m_pfilter = pfilter; + assert(m_state != nullptr); // always set in constructor + m_state->pfilter = pfilter; } //--------------------------------------------------------------------------------- template<class t_protocol_handler> @@ -1030,12 +1059,6 @@ POP_WARNINGS } //--------------------------------------------------------------------------------- template<class t_protocol_handler> - bool boosted_tcp_server<t_protocol_handler>::is_stop_signal_sent() - { - return m_stop_signal_sent; - } - //--------------------------------------------------------------------------------- - template<class t_protocol_handler> void boosted_tcp_server<t_protocol_handler>::handle_accept(const boost::system::error_code& e) { MDEBUG("handle_accept"); @@ -1048,7 +1071,7 @@ POP_WARNINGS new_connection_->setRpcStation(); // hopefully this is not needed actually } connection_ptr conn(std::move(new_connection_)); - new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter, m_connection_type)); + new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type)); acceptor_.async_accept(new_connection_->socket(), boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept, this, boost::asio::placeholders::error)); @@ -1056,7 +1079,10 @@ POP_WARNINGS boost::asio::socket_base::keep_alive opt(true); conn->socket().set_option(opt); - conn->start(true, 1 < m_threads_count); + if (default_remote.get_type_id() == net_utils::address_type::invalid) + conn->start(true, 1 < m_threads_count); + else + conn->start(true, 1 < m_threads_count, default_remote); conn->save_dbg_log(); return; } @@ -1071,20 +1097,41 @@ POP_WARNINGS } // error path, if e or exception - _erro("Some problems at accept: " << e.message() << ", connections_count = " << m_sock_count); + assert(m_state != nullptr); // always set in constructor + _erro("Some problems at accept: " << e.message() << ", connections_count = " << m_state->sock_count); misc_utils::sleep_no_w(100); - new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter, m_connection_type)); + new_connection_.reset(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type)); acceptor_.async_accept(new_connection_->socket(), boost::bind(&boosted_tcp_server<t_protocol_handler>::handle_accept, this, boost::asio::placeholders::error)); } //--------------------------------------------------------------------------------- template<class t_protocol_handler> + bool boosted_tcp_server<t_protocol_handler>::add_connection(t_connection_context& out, boost::asio::ip::tcp::socket&& sock, network_address real_remote) + { + if(std::addressof(get_io_service()) == std::addressof(sock.get_io_service())) + { + connection_ptr conn(new connection<t_protocol_handler>(std::move(sock), m_state, m_connection_type)); + if(conn->start(false, 1 < m_threads_count, std::move(real_remote))) + { + conn->get_context(out); + conn->save_dbg_log(); + return true; + } + } + else + { + MWARNING(out << " was not added, socket/io_service mismatch"); + } + return false; + } + //--------------------------------------------------------------------------------- + template<class t_protocol_handler> bool boosted_tcp_server<t_protocol_handler>::connect(const std::string& adr, const std::string& port, uint32_t conn_timeout, t_connection_context& conn_context, const std::string& bind_ip) { TRY_ENTRY(); - connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter, m_connection_type) ); + connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type) ); connections_mutex.lock(); connections_.insert(new_connection_l); MDEBUG("connections_ size now " << connections_.size()); @@ -1187,7 +1234,8 @@ POP_WARNINGS } else { - _erro("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection, connections_count = " << m_sock_count); + assert(m_state != nullptr); // always set in constructor + _erro("[sock " << new_connection_l->socket().native_handle() << "] Failed to start connection, connections_count = " << m_state->sock_count); } new_connection_l->save_dbg_log(); @@ -1201,7 +1249,7 @@ POP_WARNINGS bool boosted_tcp_server<t_protocol_handler>::connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeout, const t_callback &cb, const std::string& bind_ip) { TRY_ENTRY(); - connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_config, m_sock_count, m_sock_number, m_pfilter, m_connection_type) ); + connection_ptr new_connection_l(new connection<t_protocol_handler>(io_service_, m_state, m_connection_type) ); connections_mutex.lock(); connections_.insert(new_connection_l); MDEBUG("connections_ size now " << connections_.size()); |