diff options
Diffstat (limited to 'contrib/epee/include/net')
-rw-r--r-- | contrib/epee/include/net/abstract_tcp_server2.h | 22 | ||||
-rw-r--r-- | contrib/epee/include/net/abstract_tcp_server2.inl | 17 | ||||
-rw-r--r-- | contrib/epee/include/net/connection_basic.hpp | 6 | ||||
-rw-r--r-- | contrib/epee/include/net/enums.h | 2 | ||||
-rw-r--r-- | contrib/epee/include/net/levin_base.h | 27 | ||||
-rw-r--r-- | contrib/epee/include/net/levin_protocol_handler_async.h | 132 | ||||
-rw-r--r-- | contrib/epee/include/net/net_helper.h | 13 |
7 files changed, 158 insertions, 61 deletions
diff --git a/contrib/epee/include/net/abstract_tcp_server2.h b/contrib/epee/include/net/abstract_tcp_server2.h index 373ef0dc6..3a2c5341d 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.h +++ b/contrib/epee/include/net/abstract_tcp_server2.h @@ -49,10 +49,11 @@ #include <boost/asio/ssl.hpp> #include <boost/array.hpp> #include <boost/noncopyable.hpp> -#include <boost/shared_ptr.hpp> +#include <boost/shared_ptr.hpp> //! \TODO Convert to std::shared_ptr #include <boost/enable_shared_from_this.hpp> #include <boost/interprocess/detail/atomic.hpp> #include <boost/thread/thread.hpp> +#include <memory> #include "byte_slice.h" #include "net_utils_base.h" #include "syncobj.h" @@ -91,25 +92,24 @@ namespace net_utils public: typedef typename t_protocol_handler::connection_context t_connection_context; - struct shared_state : connection_basic_shared_state + struct shared_state : connection_basic_shared_state, t_protocol_handler::config_type { shared_state() - : connection_basic_shared_state(), pfilter(nullptr), config(), stop_signal_sent(false) + : connection_basic_shared_state(), t_protocol_handler::config_type(), pfilter(nullptr), stop_signal_sent(false) {} i_connection_filter* pfilter; - typename t_protocol_handler::config_type config; bool stop_signal_sent; }; /// Construct a connection with the given io_service. explicit connection( boost::asio::io_service& io_service, - boost::shared_ptr<shared_state> state, + std::shared_ptr<shared_state> state, t_connection_type connection_type, epee::net_utils::ssl_support_t ssl_support); explicit connection( boost::asio::ip::tcp::socket&& sock, - boost::shared_ptr<shared_state> state, + std::shared_ptr<shared_state> state, t_connection_type connection_type, epee::net_utils::ssl_support_t ssl_support); @@ -271,7 +271,13 @@ namespace net_utils typename t_protocol_handler::config_type& get_config_object() { assert(m_state != nullptr); // always set in constructor - return m_state->config; + return *m_state; + } + + std::shared_ptr<typename t_protocol_handler::config_type> get_config_shared() + { + assert(m_state != nullptr); // always set in constructor + return {m_state}; } int get_binded_port(){return m_port;} @@ -352,7 +358,7 @@ namespace net_utils bool is_thread_worker(); - const boost::shared_ptr<typename connection<t_protocol_handler>::shared_state> m_state; + const std::shared_ptr<typename connection<t_protocol_handler>::shared_state> m_state; /// The io_service used to perform asynchronous operations. struct worker diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl index ebe1d5aaf..12a87071a 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.inl +++ b/contrib/epee/include/net/abstract_tcp_server2.inl @@ -68,7 +68,7 @@ namespace epee namespace net_utils { template<typename T> - T& check_and_get(boost::shared_ptr<T>& ptr) + T& check_and_get(std::shared_ptr<T>& ptr) { CHECK_AND_ASSERT_THROW_MES(bool(ptr), "shared_state cannot be null"); return *ptr; @@ -81,7 +81,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> connection<t_protocol_handler>::connection( boost::asio::io_service& io_service, - boost::shared_ptr<shared_state> state, + std::shared_ptr<shared_state> state, t_connection_type connection_type, ssl_support_t ssl_support ) @@ -91,13 +91,13 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> connection<t_protocol_handler>::connection( boost::asio::ip::tcp::socket&& sock, - boost::shared_ptr<shared_state> state, + std::shared_ptr<shared_state> state, t_connection_type connection_type, ssl_support_t ssl_support ) : connection_basic(std::move(sock), state, ssl_support), - m_protocol_handler(this, check_and_get(state).config, context), + m_protocol_handler(this, check_and_get(state), context), buffer_ssl_init_fill(0), m_connection_type( connection_type ), m_throttle_speed_in("speed_in", "throttle_speed_in"), @@ -378,7 +378,6 @@ PRAGMA_WARNING_DISABLE_VS(4355) if(!recv_res) { //_info("[sock " << socket().native_handle() << "] protocol_want_close"); - //some error in protocol, protocol handler ask to close connection boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1); bool do_shutdown = false; @@ -601,7 +600,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) double current_speed_up; { CRITICAL_REGION_LOCAL(m_throttle_speed_out_mutex); - m_throttle_speed_out.handle_trafic_exact(cb); + m_throttle_speed_out.handle_trafic_exact(chunk.size()); current_speed_up = m_throttle_speed_out.get_current_speed(); } context.m_current_speed_up = current_speed_up; @@ -665,7 +664,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) auto size_now = m_send_que.front().size(); MDEBUG("do_send_chunk() NOW SENSD: packet="<<size_now<<" B"); if (speed_limit_is_enabled()) - do_send_handler_write( chunk.data(), chunk.size() ); // (((H))) + do_send_handler_write( m_send_que.back().data(), m_send_que.back().size() ); // (((H))) CHECK_AND_ASSERT_MES( size_now == m_send_que.front().size(), false, "Unexpected queue size"); reset_timer(get_default_timeout(), false); @@ -893,7 +892,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> boosted_tcp_server<t_protocol_handler>::boosted_tcp_server( t_connection_type connection_type ) : - m_state(boost::make_shared<typename connection<t_protocol_handler>::shared_state>()), + m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()), m_io_service_local_instance(new worker()), io_service_(m_io_service_local_instance->io_service), acceptor_(io_service_), @@ -912,7 +911,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_connection_type connection_type) : - m_state(boost::make_shared<typename connection<t_protocol_handler>::shared_state>()), + m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()), io_service_(extarnal_io_service), acceptor_(io_service_), acceptor_ipv6(io_service_), diff --git a/contrib/epee/include/net/connection_basic.hpp b/contrib/epee/include/net/connection_basic.hpp index baeb18e27..2f60f7604 100644 --- a/contrib/epee/include/net/connection_basic.hpp +++ b/contrib/epee/include/net/connection_basic.hpp @@ -100,7 +100,7 @@ class connection_basic_pimpl; // PIMPL for this class class connection_basic { // not-templated base class for rapid developmet of some code parts // beware of removing const, net_utils::connection is sketchily doing a cast to prevent storing ptr twice - const boost::shared_ptr<connection_basic_shared_state> m_state; + const std::shared_ptr<connection_basic_shared_state> m_state; public: std::unique_ptr< connection_basic_pimpl > mI; // my Implementation @@ -119,8 +119,8 @@ class connection_basic { // not-templated base class for rapid developmet of som public: // first counter is the ++/-- count of current sockets, the other socket_number is only-increasing ++ number generator - connection_basic(boost::asio::ip::tcp::socket&& socket, boost::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support); - connection_basic(boost::asio::io_service &io_service, boost::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support); + connection_basic(boost::asio::ip::tcp::socket&& socket, std::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support); + connection_basic(boost::asio::io_service &io_service, std::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support); virtual ~connection_basic() noexcept(false); diff --git a/contrib/epee/include/net/enums.h b/contrib/epee/include/net/enums.h index 078a4b274..2f27d07f9 100644 --- a/contrib/epee/include/net/enums.h +++ b/contrib/epee/include/net/enums.h @@ -49,7 +49,7 @@ namespace net_utils { invalid = 0, public_ = 1, // public is keyword - i2p = 2, + i2p = 2, // order from here changes priority of selection for origin TXes tor = 3 }; diff --git a/contrib/epee/include/net/levin_base.h b/contrib/epee/include/net/levin_base.h index a88a1eb49..f9b6f9a81 100644 --- a/contrib/epee/include/net/levin_base.h +++ b/contrib/epee/include/net/levin_base.h @@ -29,7 +29,11 @@ #ifndef _LEVIN_BASE_H_ #define _LEVIN_BASE_H_ +#include <cstdint> + +#include "byte_slice.h" #include "net_utils_base.h" +#include "span.h" #define LEVIN_SIGNATURE 0x0101010101012101LL //Bender's nightmare @@ -72,6 +76,8 @@ namespace levin #define LEVIN_PACKET_REQUEST 0x00000001 #define LEVIN_PACKET_RESPONSE 0x00000002 +#define LEVIN_PACKET_BEGIN 0x00000004 +#define LEVIN_PACKET_END 0x00000008 #define LEVIN_PROTOCOL_VER_0 0 @@ -118,9 +124,30 @@ namespace levin } } + //! \return Intialized levin header. + bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept; + + //! \return A levin notification message. + byte_slice make_notify(int command, epee::span<const std::uint8_t> payload); + + /*! Generate a dummy levin message. + \param noise_bytes Total size of the returned `byte_slice`. + \return `nullptr` if `noise_size` is smaller than the levin header. + Otherwise, a dummy levin message. */ + byte_slice make_noise_notify(std::size_t noise_bytes); + + /*! Generate 1+ levin messages that are identical to the noise message size. + + \param noise Each levin message will be identical to the size of this + message. The bytes from this message will be used for padding. + \return `nullptr` if `noise.size()` is less than the levin header size. + Otherwise, a levin notification message OR 2+ levin fragment messages. + Each message is `noise.size()` in length. */ + byte_slice make_fragmented_notify(const byte_slice& noise, int command, epee::span<const std::uint8_t> payload); } } #endif //_LEVIN_BASE_H_ + diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h index 3f734a72b..208911e1a 100644 --- a/contrib/epee/include/net/levin_protocol_handler_async.h +++ b/contrib/epee/include/net/levin_protocol_handler_async.h @@ -32,6 +32,7 @@ #include <boost/smart_ptr/make_shared.hpp> #include <atomic> +#include <deque> #include "levin_base.h" #include "buffer.h" @@ -91,6 +92,7 @@ public: int invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED); int notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id); + int send(epee::byte_slice message, const boost::uuids::uuid& connection_id); bool close(boost::uuids::uuid connection_id); bool update_connection_context(const t_connection_context& contxt); bool request_callback(boost::uuids::uuid connection_id); @@ -117,6 +119,22 @@ public: template<class t_connection_context = net_utils::connection_context_base> class async_protocol_handler { + std::string m_fragment_buffer; + + bool send_message(uint32_t command, epee::span<const uint8_t> in_buff, uint32_t flags, bool expect_response) + { + const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response); + if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff})) + return false; + + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb + << ", flags" << head.m_flags + << ", r?=" << head.m_have_to_return_data + <<", cmd = " << head.m_command + << ", ver=" << head.m_protocol_version); + return true; + } + public: typedef t_connection_context connection_context; typedef async_protocol_handler_config<t_connection_context> config_type; @@ -259,34 +277,6 @@ public: return handler->is_timer_started(); } template<class callback_t> friend struct anvoke_handler; - - static bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept - { - bucket_head2 head = {0}; - head.m_signature = SWAP64LE(LEVIN_SIGNATURE); - head.m_have_to_return_data = expect_response; - head.m_cb = SWAP64LE(msg_size); - - head.m_command = SWAP32LE(command); - head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); - head.m_flags = SWAP32LE(flags); - return head; - } - - bool send_message(uint32_t command, epee::span<const uint8_t> in_buff, uint32_t flags, bool expect_response) - { - const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response); - if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff})) - return false; - - MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb - << ", flags" << head.m_flags - << ", r?=" << head.m_have_to_return_data - <<", cmd = " << head.m_command - << ", ver=" << head.m_protocol_version); - return true; - } - public: async_protocol_handler(net_utils::i_service_endpoint* psnd_hndlr, config_type& config, @@ -403,7 +393,12 @@ public: return false; } - if(m_cache_in_buffer.size() + cb > m_config.m_max_packet_size) + // these should never fail, but do runtime check for safety + CHECK_AND_ASSERT_MES(m_config.m_max_packet_size >= m_cache_in_buffer.size(), false, "Bad m_cache_in_buffer.size()"); + CHECK_AND_ASSERT_MES(m_config.m_max_packet_size - m_cache_in_buffer.size() >= m_fragment_buffer.size(), false, "Bad m_cache_in_buffer.size() + m_fragment_buffer.size()"); + + // flipped to subtraction; prevent overflow since m_max_packet_size is variable and public + if(cb > m_config.m_max_packet_size - m_cache_in_buffer.size() - m_fragment_buffer.size()) { MWARNING(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size << ", packet received " << m_cache_in_buffer.size() + cb @@ -435,8 +430,38 @@ public: } break; } + { + std::string temp{}; epee::span<const uint8_t> buff_to_invoke = m_cache_in_buffer.carve((std::string::size_type)m_current_head.m_cb); + m_state = stream_state_head; + + // abstract_tcp_server2.h manages max bandwidth for a p2p link + if (!(m_current_head.m_flags & (LEVIN_PACKET_REQUEST | LEVIN_PACKET_RESPONSE))) + { + // special noise/fragment command + static constexpr const uint32_t both_flags = (LEVIN_PACKET_BEGIN | LEVIN_PACKET_END); + if ((m_current_head.m_flags & both_flags) == both_flags) + break; // noise message, skip to next message + + if (m_current_head.m_flags & LEVIN_PACKET_BEGIN) + m_fragment_buffer.clear(); + + m_fragment_buffer.append(reinterpret_cast<const char*>(buff_to_invoke.data()), buff_to_invoke.size()); + if (!(m_current_head.m_flags & LEVIN_PACKET_END)) + break; // skip to next message + + if (m_fragment_buffer.size() < sizeof(bucket_head2)) + { + MERROR(m_connection_context << "Fragmented data too small for levin header"); + return false; + } + + temp = std::move(m_fragment_buffer); + m_fragment_buffer.clear(); + std::memcpy(std::addressof(m_current_head), std::addressof(temp[0]), sizeof(bucket_head2)); + buff_to_invoke = {reinterpret_cast<const uint8_t*>(temp.data()) + sizeof(bucket_head2), temp.size() - sizeof(bucket_head2)}; + } bool is_response = (m_oponent_protocol_ver == LEVIN_PROTOCOL_VER_1 && m_current_head.m_flags&LEVIN_PACKET_RESPONSE); @@ -489,9 +514,9 @@ public: m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context ); - bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false); - head.m_return_code = SWAP32LE(return_code); - return_buff.insert(0, reinterpret_cast<const char*>(&head), sizeof(head)); + bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false); + head.m_return_code = SWAP32LE(return_code); + return_buff.insert(0, reinterpret_cast<const char*>(&head), sizeof(head)); if(!m_pservice_endpoint->do_send(byte_slice{std::move(return_buff)})) return false; @@ -505,8 +530,13 @@ public: else m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context); } + // reuse small buffer + if (!temp.empty() && temp.capacity() <= 64 * 1024) + { + temp.clear(); + m_fragment_buffer = std::move(temp); + } } - m_state = stream_state_head; break; case stream_state_head: { @@ -616,7 +646,7 @@ public: if (LEVIN_OK != err_code) { - epee::span<const uint8_t> stub_buff{(const uint8_t*)"", 0}; + epee::span<const uint8_t> stub_buff = nullptr; // Never call callback inside critical section, that can cause deadlock cb(err_code, stub_buff, m_connection_context); return false; @@ -698,6 +728,32 @@ public: return 1; } + + /*! Sends `message` without adding a levin header. The message must have + been created with `make_notify`, `make_noise_notify` or + `make_fragmented_notify`. See additional instructions for + `make_fragmented_notify`. + + \return 1 on success */ + int send(byte_slice message) + { + const misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( + boost::bind(&async_protocol_handler::finish_outer_call, this) + ); + + if(m_deletion_initiated) + return LEVIN_ERROR_CONNECTION_DESTROYED; + + const std::size_t length = message.size(); + if (!m_pservice_endpoint->do_send(std::move(message))) + { + LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it"); + return -1; + } + + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << (length - sizeof(bucket_head2)) << ", r?=0]"); + return 1; + } //------------------------------------------------------------------------------------------ boost::uuids::uuid get_connection_id() {return m_connection_context.m_connection_id;} //------------------------------------------------------------------------------------------ @@ -876,6 +932,14 @@ int async_protocol_handler_config<t_connection_context>::notify(int command, con } //------------------------------------------------------------------------------------------ template<class t_connection_context> +int async_protocol_handler_config<t_connection_context>::send(byte_slice message, const boost::uuids::uuid& connection_id) +{ + async_protocol_handler<t_connection_context>* aph; + int r = find_and_lock_connection(connection_id, aph); + return LEVIN_OK == r ? aph->send(std::move(message)) : 0; +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> bool async_protocol_handler_config<t_connection_context>::close(boost::uuids::uuid connection_id) { CRITICAL_REGION_LOCAL(m_connects_lock); diff --git a/contrib/epee/include/net/net_helper.h b/contrib/epee/include/net/net_helper.h index e315555fc..2b02eafa4 100644 --- a/contrib/epee/include/net/net_helper.h +++ b/contrib/epee/include/net/net_helper.h @@ -31,6 +31,7 @@ //#include <Winsock2.h> //#include <Ws2tcpip.h> +#include <atomic> #include <string> #include <boost/version.hpp> #include <boost/asio/io_service.hpp> @@ -154,7 +155,7 @@ namespace net_utils } inline - try_connect_result_t try_connect(const std::string& addr, const std::string& port, std::chrono::milliseconds timeout, epee::net_utils::ssl_support_t ssl_support) + try_connect_result_t try_connect(const std::string& addr, const std::string& port, std::chrono::milliseconds timeout) { m_deadline.expires_from_now(timeout); boost::unique_future<boost::asio::ip::tcp::socket> connection = m_connector(addr, port, m_deadline); @@ -174,11 +175,11 @@ namespace net_utils m_connected = true; m_deadline.expires_at(std::chrono::steady_clock::time_point::max()); // SSL Options - if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_enabled || ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) + if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_enabled || m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) { if (!m_ssl_options.handshake(*m_ssl_socket, boost::asio::ssl::stream_base::client, addr)) { - if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) + if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) { boost::system::error_code ignored_ec; m_ssl_socket->next_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); @@ -217,7 +218,7 @@ namespace net_utils // Get a list of endpoints corresponding to the server name. - try_connect_result_t try_connect_result = try_connect(addr, port, timeout, m_ssl_options.support); + try_connect_result_t try_connect_result = try_connect(addr, port, timeout); if (try_connect_result == CONNECT_FAILURE) return false; if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) @@ -226,7 +227,7 @@ namespace net_utils { MERROR("SSL handshake failed on an autodetect connection, reconnecting without SSL"); m_ssl_options.support = epee::net_utils::ssl_support_t::e_ssl_support_disabled; - if (try_connect(addr, port, timeout, m_ssl_options.support) != CONNECT_SUCCESS) + if (try_connect(addr, port, timeout) != CONNECT_SUCCESS) return false; } } @@ -562,7 +563,7 @@ namespace net_utils { m_deadline.cancel(); boost::system::error_code ec; - if(m_ssl_options.support != ssl_support_t::e_ssl_support_disabled) + if(m_ssl_options) shutdown_ssl(); m_ssl_socket->next_layer().cancel(ec); if(ec) |