diff options
author | luigi1111 <luigi1111w@gmail.com> | 2021-04-16 12:45:30 -0500 |
---|---|---|
committer | luigi1111 <luigi1111w@gmail.com> | 2021-04-16 12:45:30 -0500 |
commit | 63c7ca07fba2f063c760f786a986fb3e02fb040e (patch) | |
tree | 7bd55382b93a902e3fe95704f69e204ea7fac6de /contrib/epee/include/net | |
parent | Merge pull request #7002 (diff) | |
parent | Remove payload copy in all outgoing p2p messages (diff) | |
download | monero-63c7ca07fba2f063c760f786a986fb3e02fb040e.tar.xz |
Merge pull request #7136
23aae55 Remove payload copy in all outgoing p2p messages (Lee Clagett)
Diffstat (limited to 'contrib/epee/include/net')
-rw-r--r-- | contrib/epee/include/net/levin_base.h | 44 | ||||
-rw-r--r-- | contrib/epee/include/net/levin_protocol_handler_async.h | 97 |
2 files changed, 77 insertions, 64 deletions
diff --git a/contrib/epee/include/net/levin_base.h b/contrib/epee/include/net/levin_base.h index fce6d4b7e..df59a6c44 100644 --- a/contrib/epee/include/net/levin_base.h +++ b/contrib/epee/include/net/levin_base.h @@ -31,6 +31,7 @@ #include <cstdint> +#include "byte_stream.h" #include "net_utils_base.h" #include "span.h" @@ -83,11 +84,12 @@ namespace levin #define LEVIN_PROTOCOL_VER_0 0 #define LEVIN_PROTOCOL_VER_1 1 + template<class t_connection_context = net_utils::connection_context_base> struct levin_commands_handler { - virtual int invoke(int command, const epee::span<const uint8_t> in_buff, byte_slice& buff_out, t_connection_context& context)=0; + virtual int invoke(int command, const epee::span<const uint8_t> in_buff, byte_stream& buff_out, t_connection_context& context)=0; virtual int notify(int command, const epee::span<const uint8_t> in_buff, t_connection_context& context)=0; virtual void callback(t_connection_context& context){}; @@ -125,12 +127,41 @@ namespace levin } } + //! Provides space for levin (p2p) header, so that payload can be sent without copy + class message_writer + { + byte_slice finalize(uint32_t command, uint32_t flags, uint32_t return_code, bool expect_response); + public: + using header = bucket_head2; + + explicit message_writer(std::size_t reserve = 8192); + + message_writer(const message_writer&) = delete; + message_writer(message_writer&&) = default; + ~message_writer() = default; + message_writer& operator=(const message_writer&) = delete; + message_writer& operator=(message_writer&&) = default; + + //! \return Size of payload (excludes header size). + std::size_t payload_size() const noexcept + { + return buffer.size() < sizeof(header) ? 0 : buffer.size() - sizeof(header); + } + + byte_slice finalize_invoke(uint32_t command) { return finalize(command, LEVIN_PACKET_REQUEST, 0, true); } + byte_slice finalize_notify(uint32_t command) { return finalize(command, LEVIN_PACKET_REQUEST, 0, false); } + byte_slice finalize_response(uint32_t command, uint32_t return_code) + { + return finalize(command, LEVIN_PACKET_RESPONSE, return_code, false); + } + + //! Has space for levin header until a finalize method is used + byte_stream buffer; + }; + //! \return Intialized levin header. bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept; - //! \return A levin notification message. - byte_slice make_notify(int command, epee::span<const std::uint8_t> payload); - /*! Generate a dummy levin message. \param noise_bytes Total size of the returned `byte_slice`. @@ -140,12 +171,11 @@ namespace levin /*! Generate 1+ levin messages that are identical to the noise message size. - \param noise Each levin message will be identical to the size of this - message. The bytes from this message will be used for padding. + \param noise_size Each levin message will be identical to this value. \return `nullptr` if `noise.size()` is less than the levin header size. Otherwise, a levin notification message OR 2+ levin fragment messages. Each message is `noise.size()` in length. */ - byte_slice make_fragmented_notify(const byte_slice& noise, int command, epee::span<const std::uint8_t> payload); + byte_slice make_fragmented_notify(const std::size_t noise_size, int command, message_writer message); } } diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h index d062fa877..a6816cafc 100644 --- a/contrib/epee/include/net/levin_protocol_handler_async.h +++ b/contrib/epee/include/net/levin_protocol_handler_async.h @@ -51,6 +51,21 @@ #define MIN_BYTES_WANTED 512 #endif +template<typename context_t> +void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char* category) +{ + MCINFO("net.p2p.traffic", context << bytes << " bytes " << (sent ? "sent" : "received") << (error ? "/corrupt" : "") + << " for category " << category << " initiated by " << (initiator ? "us" : "peer")); +} + +template<typename context_t> +void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command) +{ + char buf[32]; + snprintf(buf, sizeof(buf), "command-%u", command); + on_levin_traffic(context, initiator, sent, error, bytes, buf); +} + namespace epee { namespace levin @@ -88,11 +103,10 @@ public: uint64_t m_max_packet_size; uint64_t m_invoke_timeout; - int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id); + int invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id); template<class callback_t> - int invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED); + int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED); - int notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id); int send(epee::byte_slice message, const boost::uuids::uuid& connection_id); bool close(boost::uuids::uuid connection_id); bool update_connection_context(const t_connection_context& contxt); @@ -122,12 +136,17 @@ class async_protocol_handler { std::string m_fragment_buffer; - bool send_message(uint32_t command, epee::span<const uint8_t> in_buff, uint32_t flags, bool expect_response) + bool send_message(byte_slice message) { - const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response); - if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff})) + if (message.size() < sizeof(message_writer::header)) + return false; + + message_writer::header head; + std::memcpy(std::addressof(head), message.data(), sizeof(head)); + if(!m_pservice_endpoint->do_send(std::move(message))) return false; + on_levin_traffic(m_connection_context, true, true, false, head.m_cb, head.m_command); MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb << ", flags" << head.m_flags << ", r?=" << head.m_have_to_return_data @@ -523,26 +542,17 @@ public: { if(m_current_head.m_have_to_return_data) { - byte_slice return_buff; + levin::message_writer return_message{32 * 1024}; const uint32_t return_code = m_config.m_pcommands_handler->invoke( - m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context + m_current_head.m_command, buff_to_invoke, return_message.buffer, m_connection_context ); // peer_id remains unset if dropped if (m_current_head.m_command == m_connection_context.handshake_command() && m_connection_context.handshake_complete()) m_max_packet_size = m_config.m_max_packet_size; - bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false); - head.m_return_code = SWAP32LE(return_code); - - if(!m_pservice_endpoint->do_send(byte_slice{{epee::as_byte_span(head), epee::to_span(return_buff)}})) + if(!send_message(return_message.finalize_response(m_current_head.m_command, return_code))) return false; - - MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb - << ", flags" << head.m_flags - << ", r?=" << head.m_have_to_return_data - <<", cmd = " << head.m_command - << ", ver=" << head.m_protocol_version); } else m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context); @@ -619,7 +629,7 @@ public: } template<class callback_t> - bool async_invoke(int command, const epee::span<const uint8_t> in_buff, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) + bool async_invoke(int command, message_writer in_msg, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) { misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( boost::bind(&async_protocol_handler::finish_outer_call, this)); @@ -638,7 +648,7 @@ public: if (command == m_connection_context.handshake_command()) m_max_packet_size = m_config.m_max_packet_size; - if(!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) + if(!send_message(in_msg.finalize_invoke(command))) { LOG_ERROR_CC(m_connection_context, "Failed to do_send"); err_code = LEVIN_ERROR_CONNECTION; @@ -664,7 +674,7 @@ public: return true; } - int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out) + int invoke(int command, message_writer in_msg, std::string& buff_out) { misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( boost::bind(&async_protocol_handler::finish_outer_call, this)); @@ -676,7 +686,7 @@ public: if (command == m_connection_context.handshake_command()) m_max_packet_size = m_config.m_max_packet_size; - if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) + if (!send_message(in_msg.finalize_invoke(command))) { LOG_ERROR_CC(m_connection_context, "Failed to send request"); return LEVIN_ERROR_CONNECTION; @@ -713,25 +723,9 @@ public: return m_invoke_result_code; } - int notify(int command, const epee::span<const uint8_t> in_buff) - { - misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( - boost::bind(&async_protocol_handler::finish_outer_call, this)); - - CRITICAL_REGION_LOCAL(m_call_lock); - - if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, false)) - { - LOG_ERROR_CC(m_connection_context, "Failed to send notify message"); - return -1; - } - - return 1; - } - - /*! Sends `message` without adding a levin header. The message must have - been created with `make_notify`, `make_noise_notify` or - `make_fragmented_notify`. See additional instructions for + /*! Sends `message` without adding a levin header. The message must have been + created with `make_noise_notify`, `make_fragmented_notify`, or + `message_writer::finalize_notify`. See additional instructions for `make_fragmented_notify`. \return 1 on success */ @@ -741,14 +735,11 @@ public: boost::bind(&async_protocol_handler::finish_outer_call, this) ); - const std::size_t length = message.size(); - if (!m_pservice_endpoint->do_send(std::move(message))) + if (!send_message(std::move(message))) { LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it"); return -1; } - - MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << (length - sizeof(bucket_head2)) << ", r?=0]"); return 1; } //------------------------------------------------------------------------------------------ @@ -838,19 +829,19 @@ int async_protocol_handler_config<t_connection_context>::find_and_lock_connectio } //------------------------------------------------------------------------------------------ template<class t_connection_context> -int async_protocol_handler_config<t_connection_context>::invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id) +int async_protocol_handler_config<t_connection_context>::invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id) { async_protocol_handler<t_connection_context>* aph; int r = find_and_lock_connection(connection_id, aph); - return LEVIN_OK == r ? aph->invoke(command, in_buff, buff_out) : r; + return LEVIN_OK == r ? aph->invoke(command, std::move(in_msg), buff_out) : r; } //------------------------------------------------------------------------------------------ template<class t_connection_context> template<class callback_t> -int async_protocol_handler_config<t_connection_context>::invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout) +int async_protocol_handler_config<t_connection_context>::invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout) { async_protocol_handler<t_connection_context>* aph; int r = find_and_lock_connection(connection_id, aph); - return LEVIN_OK == r ? aph->async_invoke(command, in_buff, cb, timeout) : r; + return LEVIN_OK == r ? aph->async_invoke(command, std::move(in_msg), cb, timeout) : r; } //------------------------------------------------------------------------------------------ template<class t_connection_context> template<class callback_t> @@ -929,14 +920,6 @@ void async_protocol_handler_config<t_connection_context>::set_handler(levin_comm } //------------------------------------------------------------------------------------------ template<class t_connection_context> -int async_protocol_handler_config<t_connection_context>::notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id) -{ - async_protocol_handler<t_connection_context>* aph; - int r = find_and_lock_connection(connection_id, aph); - return LEVIN_OK == r ? aph->notify(command, in_buff) : r; -} -//------------------------------------------------------------------------------------------ -template<class t_connection_context> int async_protocol_handler_config<t_connection_context>::send(byte_slice message, const boost::uuids::uuid& connection_id) { async_protocol_handler<t_connection_context>* aph; |