diff options
author | Lee Clagett <code@leeclagett.com> | 2021-01-19 02:22:32 +0000 |
---|---|---|
committer | Lee Clagett <code@leeclagett.com> | 2021-01-19 02:22:32 +0000 |
commit | 679d05567d1b69b4d51ca80ddc3e58de877814df (patch) | |
tree | 8e66fe26898fddf9345d0143a8c600e99a77907c | |
parent | Revert "Merge pull request #7136" (diff) | |
download | monero-679d05567d1b69b4d51ca80ddc3e58de877814df.tar.xz |
Remove payload copy in all outgoing p2p messages
Diffstat (limited to '')
21 files changed, 325 insertions, 231 deletions
diff --git a/contrib/epee/include/net/levin_base.h b/contrib/epee/include/net/levin_base.h index fce6d4b7e..df59a6c44 100644 --- a/contrib/epee/include/net/levin_base.h +++ b/contrib/epee/include/net/levin_base.h @@ -31,6 +31,7 @@ #include <cstdint> +#include "byte_stream.h" #include "net_utils_base.h" #include "span.h" @@ -83,11 +84,12 @@ namespace levin #define LEVIN_PROTOCOL_VER_0 0 #define LEVIN_PROTOCOL_VER_1 1 + template<class t_connection_context = net_utils::connection_context_base> struct levin_commands_handler { - virtual int invoke(int command, const epee::span<const uint8_t> in_buff, byte_slice& buff_out, t_connection_context& context)=0; + virtual int invoke(int command, const epee::span<const uint8_t> in_buff, byte_stream& buff_out, t_connection_context& context)=0; virtual int notify(int command, const epee::span<const uint8_t> in_buff, t_connection_context& context)=0; virtual void callback(t_connection_context& context){}; @@ -125,12 +127,41 @@ namespace levin } } + //! Provides space for levin (p2p) header, so that payload can be sent without copy + class message_writer + { + byte_slice finalize(uint32_t command, uint32_t flags, uint32_t return_code, bool expect_response); + public: + using header = bucket_head2; + + explicit message_writer(std::size_t reserve = 8192); + + message_writer(const message_writer&) = delete; + message_writer(message_writer&&) = default; + ~message_writer() = default; + message_writer& operator=(const message_writer&) = delete; + message_writer& operator=(message_writer&&) = default; + + //! \return Size of payload (excludes header size). + std::size_t payload_size() const noexcept + { + return buffer.size() < sizeof(header) ? 0 : buffer.size() - sizeof(header); + } + + byte_slice finalize_invoke(uint32_t command) { return finalize(command, LEVIN_PACKET_REQUEST, 0, true); } + byte_slice finalize_notify(uint32_t command) { return finalize(command, LEVIN_PACKET_REQUEST, 0, false); } + byte_slice finalize_response(uint32_t command, uint32_t return_code) + { + return finalize(command, LEVIN_PACKET_RESPONSE, return_code, false); + } + + //! Has space for levin header until a finalize method is used + byte_stream buffer; + }; + //! \return Intialized levin header. bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept; - //! \return A levin notification message. - byte_slice make_notify(int command, epee::span<const std::uint8_t> payload); - /*! Generate a dummy levin message. \param noise_bytes Total size of the returned `byte_slice`. @@ -140,12 +171,11 @@ namespace levin /*! Generate 1+ levin messages that are identical to the noise message size. - \param noise Each levin message will be identical to the size of this - message. The bytes from this message will be used for padding. + \param noise_size Each levin message will be identical to this value. \return `nullptr` if `noise.size()` is less than the levin header size. Otherwise, a levin notification message OR 2+ levin fragment messages. Each message is `noise.size()` in length. */ - byte_slice make_fragmented_notify(const byte_slice& noise, int command, epee::span<const std::uint8_t> payload); + byte_slice make_fragmented_notify(const std::size_t noise_size, int command, message_writer message); } } diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h index d062fa877..a6816cafc 100644 --- a/contrib/epee/include/net/levin_protocol_handler_async.h +++ b/contrib/epee/include/net/levin_protocol_handler_async.h @@ -51,6 +51,21 @@ #define MIN_BYTES_WANTED 512 #endif +template<typename context_t> +void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char* category) +{ + MCINFO("net.p2p.traffic", context << bytes << " bytes " << (sent ? "sent" : "received") << (error ? "/corrupt" : "") + << " for category " << category << " initiated by " << (initiator ? "us" : "peer")); +} + +template<typename context_t> +void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command) +{ + char buf[32]; + snprintf(buf, sizeof(buf), "command-%u", command); + on_levin_traffic(context, initiator, sent, error, bytes, buf); +} + namespace epee { namespace levin @@ -88,11 +103,10 @@ public: uint64_t m_max_packet_size; uint64_t m_invoke_timeout; - int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id); + int invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id); template<class callback_t> - int invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED); + int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED); - int notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id); int send(epee::byte_slice message, const boost::uuids::uuid& connection_id); bool close(boost::uuids::uuid connection_id); bool update_connection_context(const t_connection_context& contxt); @@ -122,12 +136,17 @@ class async_protocol_handler { std::string m_fragment_buffer; - bool send_message(uint32_t command, epee::span<const uint8_t> in_buff, uint32_t flags, bool expect_response) + bool send_message(byte_slice message) { - const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response); - if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff})) + if (message.size() < sizeof(message_writer::header)) + return false; + + message_writer::header head; + std::memcpy(std::addressof(head), message.data(), sizeof(head)); + if(!m_pservice_endpoint->do_send(std::move(message))) return false; + on_levin_traffic(m_connection_context, true, true, false, head.m_cb, head.m_command); MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb << ", flags" << head.m_flags << ", r?=" << head.m_have_to_return_data @@ -523,26 +542,17 @@ public: { if(m_current_head.m_have_to_return_data) { - byte_slice return_buff; + levin::message_writer return_message{32 * 1024}; const uint32_t return_code = m_config.m_pcommands_handler->invoke( - m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context + m_current_head.m_command, buff_to_invoke, return_message.buffer, m_connection_context ); // peer_id remains unset if dropped if (m_current_head.m_command == m_connection_context.handshake_command() && m_connection_context.handshake_complete()) m_max_packet_size = m_config.m_max_packet_size; - bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false); - head.m_return_code = SWAP32LE(return_code); - - if(!m_pservice_endpoint->do_send(byte_slice{{epee::as_byte_span(head), epee::to_span(return_buff)}})) + if(!send_message(return_message.finalize_response(m_current_head.m_command, return_code))) return false; - - MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb - << ", flags" << head.m_flags - << ", r?=" << head.m_have_to_return_data - <<", cmd = " << head.m_command - << ", ver=" << head.m_protocol_version); } else m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context); @@ -619,7 +629,7 @@ public: } template<class callback_t> - bool async_invoke(int command, const epee::span<const uint8_t> in_buff, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) + bool async_invoke(int command, message_writer in_msg, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) { misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( boost::bind(&async_protocol_handler::finish_outer_call, this)); @@ -638,7 +648,7 @@ public: if (command == m_connection_context.handshake_command()) m_max_packet_size = m_config.m_max_packet_size; - if(!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) + if(!send_message(in_msg.finalize_invoke(command))) { LOG_ERROR_CC(m_connection_context, "Failed to do_send"); err_code = LEVIN_ERROR_CONNECTION; @@ -664,7 +674,7 @@ public: return true; } - int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out) + int invoke(int command, message_writer in_msg, std::string& buff_out) { misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( boost::bind(&async_protocol_handler::finish_outer_call, this)); @@ -676,7 +686,7 @@ public: if (command == m_connection_context.handshake_command()) m_max_packet_size = m_config.m_max_packet_size; - if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) + if (!send_message(in_msg.finalize_invoke(command))) { LOG_ERROR_CC(m_connection_context, "Failed to send request"); return LEVIN_ERROR_CONNECTION; @@ -713,25 +723,9 @@ public: return m_invoke_result_code; } - int notify(int command, const epee::span<const uint8_t> in_buff) - { - misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( - boost::bind(&async_protocol_handler::finish_outer_call, this)); - - CRITICAL_REGION_LOCAL(m_call_lock); - - if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, false)) - { - LOG_ERROR_CC(m_connection_context, "Failed to send notify message"); - return -1; - } - - return 1; - } - - /*! Sends `message` without adding a levin header. The message must have - been created with `make_notify`, `make_noise_notify` or - `make_fragmented_notify`. See additional instructions for + /*! Sends `message` without adding a levin header. The message must have been + created with `make_noise_notify`, `make_fragmented_notify`, or + `message_writer::finalize_notify`. See additional instructions for `make_fragmented_notify`. \return 1 on success */ @@ -741,14 +735,11 @@ public: boost::bind(&async_protocol_handler::finish_outer_call, this) ); - const std::size_t length = message.size(); - if (!m_pservice_endpoint->do_send(std::move(message))) + if (!send_message(std::move(message))) { LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it"); return -1; } - - MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << (length - sizeof(bucket_head2)) << ", r?=0]"); return 1; } //------------------------------------------------------------------------------------------ @@ -838,19 +829,19 @@ int async_protocol_handler_config<t_connection_context>::find_and_lock_connectio } //------------------------------------------------------------------------------------------ template<class t_connection_context> -int async_protocol_handler_config<t_connection_context>::invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id) +int async_protocol_handler_config<t_connection_context>::invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id) { async_protocol_handler<t_connection_context>* aph; int r = find_and_lock_connection(connection_id, aph); - return LEVIN_OK == r ? aph->invoke(command, in_buff, buff_out) : r; + return LEVIN_OK == r ? aph->invoke(command, std::move(in_msg), buff_out) : r; } //------------------------------------------------------------------------------------------ template<class t_connection_context> template<class callback_t> -int async_protocol_handler_config<t_connection_context>::invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout) +int async_protocol_handler_config<t_connection_context>::invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout) { async_protocol_handler<t_connection_context>* aph; int r = find_and_lock_connection(connection_id, aph); - return LEVIN_OK == r ? aph->async_invoke(command, in_buff, cb, timeout) : r; + return LEVIN_OK == r ? aph->async_invoke(command, std::move(in_msg), cb, timeout) : r; } //------------------------------------------------------------------------------------------ template<class t_connection_context> template<class callback_t> @@ -929,14 +920,6 @@ void async_protocol_handler_config<t_connection_context>::set_handler(levin_comm } //------------------------------------------------------------------------------------------ template<class t_connection_context> -int async_protocol_handler_config<t_connection_context>::notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id) -{ - async_protocol_handler<t_connection_context>* aph; - int r = find_and_lock_connection(connection_id, aph); - return LEVIN_OK == r ? aph->notify(command, in_buff) : r; -} -//------------------------------------------------------------------------------------------ -template<class t_connection_context> int async_protocol_handler_config<t_connection_context>::send(byte_slice message, const boost::uuids::uuid& connection_id) { async_protocol_handler<t_connection_context>* aph; diff --git a/contrib/epee/include/storages/levin_abstract_invoke2.h b/contrib/epee/include/storages/levin_abstract_invoke2.h index 802e16c1b..383d67cc2 100644 --- a/contrib/epee/include/storages/levin_abstract_invoke2.h +++ b/contrib/epee/include/storages/levin_abstract_invoke2.h @@ -37,21 +37,14 @@ #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "net" +template<typename context_t> +void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char *category); + +template<typename context_t> +void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command); + namespace { - template<typename context_t> - void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char *category) - { - MCINFO("net.p2p.traffic", context << bytes << " bytes " << (sent ? "sent" : "received") << (error ? "/corrupt" : "") - << " for category " << category << " initiated by " << (initiator ? "us" : "peer")); - } - template<typename context_t> - void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command) - { - char buf[32]; - snprintf(buf, sizeof(buf), "command-%u", command); - return on_levin_traffic(context, initiator, sent, error, bytes, buf); - } static const constexpr epee::serialization::portable_storage::limits_t default_levin_limits = { 8192, // objects 16384, // fields @@ -117,12 +110,11 @@ namespace epee const boost::uuids::uuid &conn_id = context.m_connection_id; typename serialization::portable_storage stg; out_struct.store(stg); - byte_slice buff_to_send; + levin::message_writer to_send{16 * 1024}; std::string buff_to_recv; - stg.store_to_binary(buff_to_send, 16 * 1024); + stg.store_to_binary(to_send.buffer); - on_levin_traffic(context, true, true, false, buff_to_send.size(), command); - int res = transport.invoke(command, boost::string_ref{reinterpret_cast<const char*>(buff_to_send.data()), buff_to_send.size()}, buff_to_recv, conn_id); + int res = transport.invoke(command, std::move(to_send), buff_to_recv, conn_id); if( res <=0 ) { LOG_PRINT_L1("Failed to invoke command " << command << " return code " << res); @@ -145,10 +137,9 @@ namespace epee const boost::uuids::uuid &conn_id = context.m_connection_id; typename serialization::portable_storage stg; const_cast<t_arg&>(out_struct).store(stg);//TODO: add true const support to searilzation - byte_slice buff_to_send; - stg.store_to_binary(buff_to_send, 16 * 1024); - on_levin_traffic(context, true, true, false, buff_to_send.size(), command); - int res = transport.invoke_async(command, epee::to_span(buff_to_send), conn_id, [cb, command](int code, const epee::span<const uint8_t> buff, typename t_transport::connection_context& context)->bool + levin::message_writer to_send{16 * 1024}; + stg.store_to_binary(to_send.buffer); + int res = transport.invoke_async(command, std::move(to_send), conn_id, [cb, command](int code, const epee::span<const uint8_t> buff, typename t_transport::connection_context& context)->bool { t_result result_struct = AUTO_VAL_INIT(result_struct); if( code <=0 ) @@ -192,11 +183,10 @@ namespace epee const boost::uuids::uuid &conn_id = context.m_connection_id; serialization::portable_storage stg; out_struct.store(stg); - byte_slice buff_to_send; - stg.store_to_binary(buff_to_send); + levin::message_writer to_send; + stg.store_to_binary(to_send.buffer); - on_levin_traffic(context, true, true, false, buff_to_send.size(), command); - int res = transport.notify(command, epee::to_span(buff_to_send), conn_id); + int res = transport.send(to_send.finalize_notify(command), conn_id); if(res <=0 ) { MERROR("Failed to notify command " << command << " return code " << res); @@ -207,7 +197,7 @@ namespace epee //---------------------------------------------------------------------------------------------------- //---------------------------------------------------------------------------------------------------- template<class t_owner, class t_in_type, class t_out_type, class t_context, class callback_t> - int buff_to_t_adapter(int command, const epee::span<const uint8_t> in_buff, byte_slice& buff_out, callback_t cb, t_context& context ) + int buff_to_t_adapter(int command, const epee::span<const uint8_t> in_buff, byte_stream& buff_out, callback_t cb, t_context& context ) { serialization::portable_storage strg; if(!strg.load_from_binary(in_buff, &default_levin_limits)) @@ -230,12 +220,11 @@ namespace epee serialization::portable_storage strg_out; static_cast<t_out_type&>(out_struct).store(strg_out); - if(!strg_out.store_to_binary(buff_out, 32 * 1024)) + if(!strg_out.store_to_binary(buff_out)) { LOG_ERROR("Failed to store_to_binary in command" << command); return -1; } - on_levin_traffic(context, false, true, false, buff_out.size(), command); return res; } @@ -262,7 +251,7 @@ namespace epee } #define CHAIN_LEVIN_INVOKE_MAP2(context_type) \ - int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, context_type& context) \ + int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, context_type& context) \ { \ bool handled = false; \ return handle_invoke_map(false, command, in_buff, buff_out, context, handled); \ @@ -271,13 +260,13 @@ namespace epee #define CHAIN_LEVIN_NOTIFY_MAP2(context_type) \ int notify(int command, const epee::span<const uint8_t> in_buff, context_type& context) \ { \ - bool handled = false; epee::byte_slice fake_str; \ - return handle_invoke_map(true, command, in_buff, fake_str, context, handled); \ + bool handled = false; epee::byte_stream fake_str; \ + return handle_invoke_map(true, command, in_buff, fake_str, context, handled); \ } #define CHAIN_LEVIN_INVOKE_MAP() \ - int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, epee::net_utils::connection_context_base& context) \ + int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, epee::net_utils::connection_context_base& context) \ { \ bool handled = false; \ return handle_invoke_map(false, command, in_buff, buff_out, context, handled); \ @@ -297,7 +286,7 @@ namespace epee } #define BEGIN_INVOKE_MAP2(owner_type) \ - template <class t_context> int handle_invoke_map(bool is_notify, int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, t_context& context, bool& handled) \ + template <class t_context> int handle_invoke_map(bool is_notify, int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, t_context& context, bool& handled) \ { \ try { \ typedef owner_type internal_owner_type_name; diff --git a/contrib/epee/include/storages/portable_storage.h b/contrib/epee/include/storages/portable_storage.h index c5d0c48ee..655a2eb12 100644 --- a/contrib/epee/include/storages/portable_storage.h +++ b/contrib/epee/include/storages/portable_storage.h @@ -34,6 +34,7 @@ namespace epee { class byte_slice; + class byte_stream; namespace serialization { /************************************************************************/ @@ -83,8 +84,13 @@ namespace epee //------------------------------------------------------------------------------- bool store_to_binary(byte_slice& target, std::size_t initial_buffer_size = 8192); - bool load_from_binary(const epee::span<const uint8_t> target, const limits_t *limits = NULL); - bool load_from_binary(const std::string& target, const limits_t *limits = NULL); + bool store_to_binary(byte_stream& ss); + bool load_from_binary(const epee::span<const uint8_t> target, const limits_t *limits = nullptr); + bool load_from_binary(const std::string& target, const limits_t *limits = nullptr) + { + return load_from_binary(epee::strspan<uint8_t>(target), limits); + } + template<class trace_policy> bool dump_as_xml(std::string& targetObj, const std::string& root_name = ""); bool dump_as_json(std::string& targetObj, size_t indent = 0, bool insert_newlines = true); diff --git a/contrib/epee/include/storages/portable_storage_template_helper.h b/contrib/epee/include/storages/portable_storage_template_helper.h index 00ad15e9d..7f6596f36 100644 --- a/contrib/epee/include/storages/portable_storage_template_helper.h +++ b/contrib/epee/include/storages/portable_storage_template_helper.h @@ -36,6 +36,8 @@ namespace epee { + class byte_stream; + namespace serialization { //----------------------------------------------------------------------------------------------------------- @@ -127,5 +129,14 @@ namespace epee store_t_to_binary(str_in, binary_buff, initial_buffer_size); return binary_buff; } + //----------------------------------------------------------------------------------------------------------- + template<class t_struct> + bool store_t_to_binary(t_struct& str_in, byte_stream& binary_buff) + { + portable_storage ps; + str_in.store(ps); + return ps.store_to_binary(binary_buff); + } + } } diff --git a/contrib/epee/src/levin_base.cpp b/contrib/epee/src/levin_base.cpp index 5ec86b3d6..7c5cd5a78 100644 --- a/contrib/epee/src/levin_base.cpp +++ b/contrib/epee/src/levin_base.cpp @@ -34,6 +34,25 @@ namespace epee { namespace levin { + message_writer::message_writer(const std::size_t reserve) + : buffer() + { + buffer.reserve(reserve); + buffer.put_n(0, sizeof(header)); + } + + byte_slice message_writer::finalize(const uint32_t command, const uint32_t flags, const uint32_t return_code, const bool expect_response) + { + if (buffer.size() < sizeof(header)) + throw std::runtime_error{"levin_writer::finalize already called"}; + + header head = make_header(command, payload_size(), flags, expect_response); + head.m_return_code = SWAP32LE(return_code); + + std::memcpy(buffer.tellp() - buffer.size(), std::addressof(head), sizeof(head)); + return byte_slice{std::move(buffer)}; + } + bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept { bucket_head2 head = {0}; @@ -47,12 +66,6 @@ namespace levin return head; } - byte_slice make_notify(int command, epee::span<const std::uint8_t> payload) - { - const bucket_head2 head = make_header(command, payload.size(), LEVIN_PACKET_REQUEST, false); - return byte_slice{epee::as_byte_span(head), payload}; - } - byte_slice make_noise_notify(const std::size_t noise_bytes) { static constexpr const std::uint32_t flags = @@ -68,46 +81,40 @@ namespace levin return byte_slice{std::move(buffer)}; } - byte_slice make_fragmented_notify(const byte_slice& noise_message, int command, epee::span<const std::uint8_t> payload) + byte_slice make_fragmented_notify(const std::size_t noise_size, const int command, message_writer message) { - const size_t noise_size = noise_message.size(); if (noise_size < sizeof(bucket_head2) * 2) return nullptr; - if (payload.size() <= noise_size - sizeof(bucket_head2)) + if (message.buffer.size() <= noise_size) { /* The entire message can be sent at once, and the levin binary parser will ignore extra bytes. So just pad with zeroes and otherwise send a "normal", not fragmented message. */ - const size_t padding = noise_size - sizeof(bucket_head2) - payload.size(); - const span<const uint8_t> padding_bytes{noise_message.end() - padding, padding}; - const bucket_head2 head = make_header(command, noise_size - sizeof(bucket_head2), LEVIN_PACKET_REQUEST, false); - return byte_slice{as_byte_span(head), payload, padding_bytes}; + message.buffer.put_n(0, noise_size - message.buffer.size()); + return message.finalize_notify(command); } // fragment message + const byte_slice payload_bytes = message.finalize_notify(command); + span<const std::uint8_t> payload = to_span(payload_bytes); + const size_t payload_space = noise_size - sizeof(bucket_head2); const size_t expected_fragments = ((payload.size() - 2) / payload_space) + 1; - std::string buffer{}; - buffer.reserve((expected_fragments + 1) * noise_size); // +1 here overselects for internal bucket_head2 value + byte_stream buffer{}; + buffer.reserve(expected_fragments * noise_size); - bucket_head2 head = make_header(0, noise_size - sizeof(bucket_head2), LEVIN_PACKET_BEGIN, false); - buffer.append(reinterpret_cast<const char*>(&head), sizeof(head)); + bucket_head2 head = make_header(0, payload_space, LEVIN_PACKET_BEGIN, false); + buffer.write(as_byte_span(head)); - head.m_command = command; - head.m_flags = LEVIN_PACKET_REQUEST; - head.m_cb = payload.size(); - buffer.append(reinterpret_cast<const char*>(&head), sizeof(head)); + // internal levin header is in payload already - size_t copy_size = payload.remove_prefix(payload_space - sizeof(bucket_head2)); - buffer.append(reinterpret_cast<const char*>(payload.data()) - copy_size, copy_size); + size_t copy_size = payload.remove_prefix(payload_space); + buffer.write(payload.data() - copy_size, copy_size); - head.m_command = 0; head.m_flags = 0; - head.m_cb = noise_size - sizeof(bucket_head2); - while (!payload.empty()) { copy_size = payload.remove_prefix(payload_space); @@ -115,12 +122,12 @@ namespace levin if (payload.empty()) head.m_flags = LEVIN_PACKET_END; - buffer.append(reinterpret_cast<const char*>(&head), sizeof(head)); - buffer.append(reinterpret_cast<const char*>(payload.data()) - copy_size, copy_size); + buffer.write(as_byte_span(head)); + buffer.write(payload.data() - copy_size, copy_size); } const size_t padding = noise_size - copy_size - sizeof(bucket_head2); - buffer.append(reinterpret_cast<const char*>(noise_message.end()) - padding, padding); + buffer.put_n(0, padding); return byte_slice{std::move(buffer)}; } diff --git a/contrib/epee/src/portable_storage.cpp b/contrib/epee/src/portable_storage.cpp index c3c9ccc02..b922cc9e3 100644 --- a/contrib/epee/src/portable_storage.cpp +++ b/contrib/epee/src/portable_storage.cpp @@ -48,15 +48,23 @@ namespace serialization TRY_ENTRY(); byte_stream ss; ss.reserve(initial_buffer_size); + store_to_binary(ss); + target = epee::byte_slice{std::move(ss)}; + return true; + CATCH_ENTRY("portable_storage::store_to_binary", false); + } + + bool portable_storage::store_to_binary(byte_stream& ss) + { + TRY_ENTRY(); storage_block_header sbh{}; sbh.m_signature_a = SWAP32LE(PORTABLE_STORAGE_SIGNATUREA); sbh.m_signature_b = SWAP32LE(PORTABLE_STORAGE_SIGNATUREB); sbh.m_ver = PORTABLE_STORAGE_FORMAT_VER; ss.write(epee::as_byte_span(sbh)); pack_entry_to_buff(ss, m_root); - target = epee::byte_slice{std::move(ss)}; return true; - CATCH_ENTRY("portable_storage::store_to_binary", false) + CATCH_ENTRY("portable_storage::store_to_binary", false); } bool portable_storage::dump_as_json(std::string& buff, size_t indent, bool insert_newlines) @@ -76,11 +84,6 @@ namespace serialization CATCH_ENTRY("portable_storage::load_from_json", false) } - bool portable_storage::load_from_binary(const std::string& target, const limits_t *limits) - { - return load_from_binary(epee::strspan<uint8_t>(target), limits); - } - bool portable_storage::load_from_binary(const epee::span<const uint8_t> source, const limits_t *limits) { m_root.m_entries.clear(); diff --git a/src/cryptonote_core/blockchain.cpp b/src/cryptonote_core/blockchain.cpp index cdad39a2c..a3d695b85 100644 --- a/src/cryptonote_core/blockchain.cpp +++ b/src/cryptonote_core/blockchain.cpp @@ -57,6 +57,7 @@ #include "common/notify.h" #include "common/varint.h" #include "common/pruning.h" +#include "time_helper.h" #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "blockchain" diff --git a/src/cryptonote_protocol/cryptonote_protocol_handler.h b/src/cryptonote_protocol/cryptonote_protocol_handler.h index 28530f3e7..80dd2bc39 100644 --- a/src/cryptonote_protocol/cryptonote_protocol_handler.h +++ b/src/cryptonote_protocol/cryptonote_protocol_handler.h @@ -46,6 +46,8 @@ #include "block_queue.h" #include "common/perf_timer.h" #include "cryptonote_basic/connection_context.h" +#include "net/levin_base.h" +#include "p2p/net_node_common.h" #include <boost/circular_buffer.hpp> PUSH_WARNINGS @@ -195,10 +197,11 @@ namespace cryptonote bool post_notify(typename t_parameter::request& arg, cryptonote_connection_context& context) { LOG_PRINT_L2("[" << epee::net_utils::print_connection_context_short(context) << "] post " << typeid(t_parameter).name() << " -->"); - epee::byte_slice blob; - epee::serialization::store_t_to_binary(arg, blob, 256 * 1024); // optimize for block responses + + epee::levin::message_writer out{256 * 1024}; // optimize for block responses + epee::serialization::store_t_to_binary(arg, out.buffer); //handler_response_blocks_now(blob.size()); // XXX - return m_p2p->invoke_notify_to_peer(t_parameter::ID, epee::to_span(blob), context); + return m_p2p->invoke_notify_to_peer(t_parameter::ID, std::move(out), context); } }; diff --git a/src/cryptonote_protocol/cryptonote_protocol_handler.inl b/src/cryptonote_protocol/cryptonote_protocol_handler.inl index 69ef0970b..afc81f552 100644 --- a/src/cryptonote_protocol/cryptonote_protocol_handler.inl +++ b/src/cryptonote_protocol/cryptonote_protocol_handler.inl @@ -2713,15 +2713,15 @@ skip: // send fluffy ones first, we want to encourage people to run that if (!fluffyConnections.empty()) { - epee::byte_slice fluffyBlob; - epee::serialization::store_t_to_binary(fluffy_arg, fluffyBlob, 32 * 1024); - m_p2p->relay_notify_to_list(NOTIFY_NEW_FLUFFY_BLOCK::ID, epee::to_span(fluffyBlob), std::move(fluffyConnections)); + epee::levin::message_writer fluffyBlob{32 * 1024}; + epee::serialization::store_t_to_binary(fluffy_arg, fluffyBlob.buffer); + m_p2p->relay_notify_to_list(NOTIFY_NEW_FLUFFY_BLOCK::ID, std::move(fluffyBlob), std::move(fluffyConnections)); } if (!fullConnections.empty()) { - epee::byte_slice fullBlob; - epee::serialization::store_t_to_binary(arg, fullBlob, 128 * 1024); - m_p2p->relay_notify_to_list(NOTIFY_NEW_BLOCK::ID, epee::to_span(fullBlob), std::move(fullConnections)); + epee::levin::message_writer fullBlob{128 * 1024}; + epee::serialization::store_t_to_binary(arg, fullBlob.buffer); + m_p2p->relay_notify_to_list(NOTIFY_NEW_BLOCK::ID, std::move(fullBlob), std::move(fullConnections)); } return true; diff --git a/src/cryptonote_protocol/cryptonote_protocol_handler_common.h b/src/cryptonote_protocol/cryptonote_protocol_handler_common.h index 79c2edf1d..57b1d049c 100644 --- a/src/cryptonote_protocol/cryptonote_protocol_handler_common.h +++ b/src/cryptonote_protocol/cryptonote_protocol_handler_common.h @@ -30,8 +30,8 @@ #pragma once -#include "p2p/net_node_common.h" #include "cryptonote_protocol/cryptonote_protocol_defs.h" +#include "cryptonote_protocol/enums.h" #include "cryptonote_basic/connection_context.h" namespace cryptonote { diff --git a/src/cryptonote_protocol/levin_notify.cpp b/src/cryptonote_protocol/levin_notify.cpp index 1e9f3e399..0b065c3c3 100644 --- a/src/cryptonote_protocol/levin_notify.cpp +++ b/src/cryptonote_protocol/levin_notify.cpp @@ -159,7 +159,7 @@ namespace levin return get_out_connections(p2p, get_blockchain_height(p2p, core)); } - epee::byte_slice make_tx_payload(std::vector<blobdata>&& txs, const bool pad, const bool fluff) + epee::levin::message_writer make_tx_message(std::vector<blobdata>&& txs, const bool pad, const bool fluff) { NOTIFY_NEW_TRANSACTIONS::request request{}; request.txs = std::move(txs); @@ -193,21 +193,17 @@ namespace levin // if the size of _ moved enough, we might lose byte in size encoding, we don't care } - epee::byte_slice fullBlob; - if (!epee::serialization::store_t_to_binary(request, fullBlob)) + epee::levin::message_writer out; + if (!epee::serialization::store_t_to_binary(request, out.buffer)) throw std::runtime_error{"Failed to serialize to epee binary format"}; - return fullBlob; + return out; } bool make_payload_send_txs(connections& p2p, std::vector<blobdata>&& txs, const boost::uuids::uuid& destination, const bool pad, const bool fluff) { - const epee::byte_slice blob = make_tx_payload(std::move(txs), pad, fluff); - p2p.for_connection(destination, [&blob](detail::p2p_context& context) { - on_levin_traffic(context, true, true, false, blob.size(), NOTIFY_NEW_TRANSACTIONS::ID); - return true; - }); - return p2p.notify(NOTIFY_NEW_TRANSACTIONS::ID, epee::to_span(blob), destination); + epee::byte_slice blob = make_tx_message(std::move(txs), pad, fluff).finalize_notify(NOTIFY_NEW_TRANSACTIONS::ID); + return p2p.send(std::move(blob), destination); } /* The current design uses `asio::strand`s. The documentation isn't as clear @@ -653,10 +649,6 @@ namespace levin else message = zone_->noise.clone(); - zone_->p2p->for_connection(channel.connection, [&](detail::p2p_context& context) { - on_levin_traffic(context, true, true, false, message.size(), "noise"); - return true; - }); if (zone_->p2p->send(std::move(message), channel.connection)) { if (!channel.queue.empty() && channel.active.empty()) @@ -816,9 +808,8 @@ namespace levin // Padding is not useful when using noise mode. Send as stem so receiver // forwards in Dandelion++ mode. - const epee::byte_slice payload = make_tx_payload(std::move(txs), false, false); epee::byte_slice message = epee::levin::make_fragmented_notify( - zone_->noise, NOTIFY_NEW_TRANSACTIONS::ID, epee::to_span(payload) + zone_->noise.size(), NOTIFY_NEW_TRANSACTIONS::ID, make_tx_message(std::move(txs), false, false) ); if (CRYPTONOTE_MAX_FRAGMENTS * zone_->noise.size() < message.size()) { diff --git a/src/p2p/net_node.h b/src/p2p/net_node.h index db931122e..f2888674b 100644 --- a/src/p2p/net_node.h +++ b/src/p2p/net_node.h @@ -344,10 +344,9 @@ namespace nodetool virtual void on_connection_close(p2p_connection_context& context); virtual void callback(p2p_connection_context& context); //----------------- i_p2p_endpoint ------------------------------------------------------------- - virtual bool relay_notify_to_list(int command, const epee::span<const uint8_t> data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections); + virtual bool relay_notify_to_list(int command, epee::levin::message_writer message, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections) final; virtual epee::net_utils::zone send_txs(std::vector<cryptonote::blobdata> txs, const epee::net_utils::zone origin, const boost::uuids::uuid& source, cryptonote::relay_method tx_relay); - virtual bool invoke_command_to_peer(int command, const epee::span<const uint8_t> req_buff, std::string& resp_buff, const epee::net_utils::connection_context_base& context); - virtual bool invoke_notify_to_peer(int command, const epee::span<const uint8_t> req_buff, const epee::net_utils::connection_context_base& context); + virtual bool invoke_notify_to_peer(int command, epee::levin::message_writer message, const epee::net_utils::connection_context_base& context) final; virtual bool drop_connection(const epee::net_utils::connection_context_base& context); virtual void request_callback(const epee::net_utils::connection_context_base& context); virtual void for_each_connection(std::function<bool(typename t_payload_net_handler::connection_context&, peerid_type, uint32_t)> f); diff --git a/src/p2p/net_node.inl b/src/p2p/net_node.inl index ea69760c7..e1d6d1e10 100644 --- a/src/p2p/net_node.inl +++ b/src/p2p/net_node.inl @@ -2175,8 +2175,9 @@ namespace nodetool } //----------------------------------------------------------------------------------- template<class t_payload_net_handler> - bool node_server<t_payload_net_handler>::relay_notify_to_list(int command, const epee::span<const uint8_t> data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections) + bool node_server<t_payload_net_handler>::relay_notify_to_list(int command, epee::levin::message_writer data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections) { + epee::byte_slice message = data_buff.finalize_notify(command); std::sort(connections.begin(), connections.end()); auto zone = m_network_zones.begin(); for(const auto& c_id: connections) @@ -2194,7 +2195,7 @@ namespace nodetool ++zone; } if (zone->first == c_id.first) - zone->second.m_net_server.get_config_object().notify(command, data_buff, c_id.second); + zone->second.m_net_server.get_config_object().send(message.clone(), c_id.second); } return true; } @@ -2261,24 +2262,13 @@ namespace nodetool } //----------------------------------------------------------------------------------- template<class t_payload_net_handler> - bool node_server<t_payload_net_handler>::invoke_notify_to_peer(int command, const epee::span<const uint8_t> req_buff, const epee::net_utils::connection_context_base& context) + bool node_server<t_payload_net_handler>::invoke_notify_to_peer(const int command, epee::levin::message_writer message, const epee::net_utils::connection_context_base& context) { if(is_filtered_command(context.m_remote_address, command)) return false; network_zone& zone = m_network_zones.at(context.m_remote_address.get_zone()); - int res = zone.m_net_server.get_config_object().notify(command, req_buff, context.m_connection_id); - return res > 0; - } - //----------------------------------------------------------------------------------- - template<class t_payload_net_handler> - bool node_server<t_payload_net_handler>::invoke_command_to_peer(int command, const epee::span<const uint8_t> req_buff, std::string& resp_buff, const epee::net_utils::connection_context_base& context) - { - if(is_filtered_command(context.m_remote_address, command)) - return false; - - network_zone& zone = m_network_zones.at(context.m_remote_address.get_zone()); - int res = zone.m_net_server.get_config_object().invoke(command, req_buff, resp_buff, context.m_connection_id); + int res = zone.m_net_server.get_config_object().send(message.finalize_notify(command), context.m_connection_id); return res > 0; } //----------------------------------------------------------------------------------- diff --git a/src/p2p/net_node_common.h b/src/p2p/net_node_common.h index 0da758ad4..92b7596ae 100644 --- a/src/p2p/net_node_common.h +++ b/src/p2p/net_node_common.h @@ -40,6 +40,8 @@ #include "net/net_utils_base.h" #include "p2p_protocol_defs.h" +namespace epee { namespace levin { class message_writer; } } + namespace nodetool { @@ -49,10 +51,9 @@ namespace nodetool template<class t_connection_context> struct i_p2p_endpoint { - virtual bool relay_notify_to_list(int command, const epee::span<const uint8_t> data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections)=0; + virtual bool relay_notify_to_list(int command, epee::levin::message_writer message, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections)=0; virtual epee::net_utils::zone send_txs(std::vector<cryptonote::blobdata> txs, const epee::net_utils::zone origin, const boost::uuids::uuid& source, cryptonote::relay_method tx_relay)=0; - virtual bool invoke_command_to_peer(int command, const epee::span<const uint8_t> req_buff, std::string& resp_buff, const epee::net_utils::connection_context_base& context)=0; - virtual bool invoke_notify_to_peer(int command, const epee::span<const uint8_t> req_buff, const epee::net_utils::connection_context_base& context)=0; + virtual bool invoke_notify_to_peer(int command, epee::levin::message_writer message, const epee::net_utils::connection_context_base& context)=0; virtual bool drop_connection(const epee::net_utils::connection_context_base& context)=0; virtual void request_callback(const epee::net_utils::connection_context_base& context)=0; virtual uint64_t get_public_connections_count()=0; @@ -71,7 +72,7 @@ namespace nodetool template<class t_connection_context> struct p2p_endpoint_stub: public i_p2p_endpoint<t_connection_context> { - virtual bool relay_notify_to_list(int command, const epee::span<const uint8_t> data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections) + virtual bool relay_notify_to_list(int command, epee::levin::message_writer message, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections) { return false; } @@ -79,11 +80,7 @@ namespace nodetool { return epee::net_utils::zone::invalid; } - virtual bool invoke_command_to_peer(int command, const epee::span<const uint8_t> req_buff, std::string& resp_buff, const epee::net_utils::connection_context_base& context) - { - return false; - } - virtual bool invoke_notify_to_peer(int command, const epee::span<const uint8_t> req_buff, const epee::net_utils::connection_context_base& context) + virtual bool invoke_notify_to_peer(int command, epee::levin::message_writer message, const epee::net_utils::connection_context_base& context) { return true; } diff --git a/tests/fuzz/levin.cpp b/tests/fuzz/levin.cpp index 78b7b6863..209a75221 100644 --- a/tests/fuzz/levin.cpp +++ b/tests/fuzz/levin.cpp @@ -68,13 +68,13 @@ namespace { } - virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, test_levin_connection_context& context) + virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, test_levin_connection_context& context) { m_invoke_counter.inc(); boost::unique_lock<boost::mutex> lock(m_mutex); m_last_command = command; m_last_in_buf = std::string((const char*)in_buff.data(), in_buff.size()); - buff_out = m_invoke_out_buf.clone(); + buff_out.write(epee::to_span(m_invoke_out_buf)); return m_return_code; } diff --git a/tests/net_load_tests/net_load_tests.h b/tests/net_load_tests/net_load_tests.h index baab07d31..59eef7bd1 100644 --- a/tests/net_load_tests/net_load_tests.h +++ b/tests/net_load_tests/net_load_tests.h @@ -67,7 +67,7 @@ namespace net_load_tests { } - virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, test_connection_context& context) + virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, test_connection_context& context) { //m_invoke_counter.inc(); //std::unique_lock<std::mutex> lock(m_mutex); diff --git a/tests/unit_tests/epee_boosted_tcp_server.cpp b/tests/unit_tests/epee_boosted_tcp_server.cpp index 457c05c15..84fc0a29b 100644 --- a/tests/unit_tests/epee_boosted_tcp_server.cpp +++ b/tests/unit_tests/epee_boosted_tcp_server.cpp @@ -153,7 +153,7 @@ TEST(test_epee_connection, test_lifetime) delay(delay), on_connection_close_f(on_connection_close_f) {} - virtual int invoke(int, const epee::span<const uint8_t>, epee::byte_slice&, context_t&) override { epee::misc_utils::sleep_no_w(delay); return {}; } + virtual int invoke(int, const epee::span<const uint8_t>, epee::byte_stream&, context_t&) override { epee::misc_utils::sleep_no_w(delay); return {}; } virtual int notify(int, const epee::span<const uint8_t>, context_t&) override { return {}; } virtual void callback(context_t&) override {} virtual void on_connection_new(context_t&) override {} @@ -282,7 +282,7 @@ TEST(test_epee_connection, test_lifetime) for (auto i = 0; i < N; ++i) { tag = create_connection(); ASSERT_TRUE(shared_state->get_connections_count() == 1); - success = shared_state->invoke_async(1, {}, tag, [](int, const epee::span<const uint8_t>, context_t&){}, TIMEOUT); + success = shared_state->invoke_async(1, epee::levin::message_writer{}, tag, [](int, const epee::span<const uint8_t>, context_t&){}, TIMEOUT); ASSERT_TRUE(success); while (shared_state->sock_count == 1) { success = shared_state->foreach_connection([&shared_state, &tag](context_t&){ diff --git a/tests/unit_tests/epee_levin_protocol_handler_async.cpp b/tests/unit_tests/epee_levin_protocol_handler_async.cpp index a499fa608..d9de99b8b 100644 --- a/tests/unit_tests/epee_levin_protocol_handler_async.cpp +++ b/tests/unit_tests/epee_levin_protocol_handler_async.cpp @@ -59,13 +59,13 @@ namespace { } - virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, test_levin_connection_context& context) + virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, test_levin_connection_context& context) { m_invoke_counter.inc(); boost::unique_lock<boost::mutex> lock(m_mutex); m_last_command = command; m_last_in_buf = std::string((const char*)in_buff.data(), in_buff.size()); - buff_out = m_invoke_out_buf.clone(); + buff_out.write(epee::to_span(m_invoke_out_buf)); return m_return_code; } @@ -434,8 +434,11 @@ TEST_F(positive_test_connection_to_levin_protocol_handler_calls, handler_process const int expected_command = 4673261; const std::string in_data(256, 'e'); + epee::levin::message_writer message{}; + message.buffer.write(epee::to_span(in_data)); + const epee::byte_slice noise = epee::levin::make_noise_notify(1024); - const epee::byte_slice notify = epee::levin::make_notify(expected_command, epee::strspan<std::uint8_t>(in_data)); + const epee::byte_slice notify = message.finalize_notify(expected_command); test_connection_ptr conn = create_connection(); @@ -468,11 +471,16 @@ TEST_F(positive_test_connection_to_levin_protocol_handler_calls, handler_process const int expected_command = 4673261; const int expected_fragmented_command = 46732; const std::string in_data(256, 'e'); - std::string in_fragmented_data(1024 * 4, 'c'); + + epee::levin::message_writer message{}; + message.buffer.write(epee::to_span(in_data)); + + epee::levin::message_writer in_fragmented_data; + in_fragmented_data.buffer.put_n('c', 1024 * 4); const epee::byte_slice noise = epee::levin::make_noise_notify(1024); - const epee::byte_slice notify = epee::levin::make_notify(expected_command, epee::strspan<std::uint8_t>(in_data)); - epee::byte_slice fragmented = epee::levin::make_fragmented_notify(noise, expected_fragmented_command, epee::strspan<std::uint8_t>(in_fragmented_data)); + const epee::byte_slice notify = message.finalize_notify(expected_command); + epee::byte_slice fragmented = epee::levin::make_fragmented_notify(noise.size(), expected_fragmented_command, std::move(in_fragmented_data)); EXPECT_EQ(5u, fragmented.size() / 1024); EXPECT_EQ(0u, fragmented.size() % 1024); @@ -497,11 +505,13 @@ TEST_F(positive_test_connection_to_levin_protocol_handler_calls, handler_process ASSERT_TRUE(conn->m_protocol_handler.handle_recv(next.data(), next.size())); } - in_fragmented_data.resize(((1024 - sizeof(epee::levin::bucket_head2)) * 5) - sizeof(epee::levin::bucket_head2)); // add padding zeroes + std::string compare_buffer(1024 * 4, 'c'); + compare_buffer.resize(((1024 - sizeof(epee::levin::bucket_head2)) * 5) - sizeof(epee::levin::bucket_head2)); // add padding zeroes + ASSERT_EQ(4u, m_commands_handler.notify_counter()); ASSERT_EQ(0u, m_commands_handler.invoke_counter()); ASSERT_EQ(expected_fragmented_command, m_commands_handler.last_command()); - ASSERT_EQ(in_fragmented_data, m_commands_handler.last_in_buf()); + ASSERT_EQ(compare_buffer, m_commands_handler.last_in_buf()); ASSERT_EQ(0u, conn->send_counter()); ASSERT_TRUE(conn->last_send_data().empty()); diff --git a/tests/unit_tests/levin.cpp b/tests/unit_tests/levin.cpp index eee9e224d..30d6f8133 100644 --- a/tests/unit_tests/levin.cpp +++ b/tests/unit_tests/levin.cpp @@ -245,9 +245,9 @@ namespace return out; } - virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, cryptonote::levin::detail::p2p_context& context) override final + virtual int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, cryptonote::levin::detail::p2p_context& context) override final { - buff_out = nullptr; + buff_out.clear(); invoked_.push_back( {context.m_connection_id, command, std::string{reinterpret_cast<const char*>(in_buff.data()), in_buff.size()}} ); @@ -384,21 +384,50 @@ TEST(make_header, expect_return) EXPECT_EQ(0u, header1.m_flags); } -TEST(make_notify, empty_payload) +TEST(message_writer, invoke_with_empty_payload) { - const epee::byte_slice message = epee::levin::make_notify(443, nullptr); + const epee::byte_slice message = epee::levin::message_writer{}.finalize_invoke(443); + const epee::levin::bucket_head2 header = + epee::levin::make_header(443, 0, LEVIN_PACKET_REQUEST, true); + ASSERT_EQ(sizeof(header), message.size()); + EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0); +} + +TEST(message_writer, invoke_with_payload) +{ + std::string bytes(100, 'a'); + std::generate(bytes.begin(), bytes.end(), crypto::random_device{}); + + epee::levin::message_writer writer{}; + writer.buffer.write(epee::to_span(bytes)); + + const epee::byte_slice message = writer.finalize_invoke(443); + const epee::levin::bucket_head2 header = + epee::levin::make_header(443, bytes.size(), LEVIN_PACKET_REQUEST, true); + + ASSERT_EQ(sizeof(header) + bytes.size(), message.size()); + EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0); + EXPECT_TRUE(std::memcmp(bytes.data(), message.data() + sizeof(header), bytes.size()) == 0); +} + +TEST(message_writer, notify_with_empty_payload) +{ + const epee::byte_slice message = epee::levin::message_writer{}.finalize_notify(443); const epee::levin::bucket_head2 header = epee::levin::make_header(443, 0, LEVIN_PACKET_REQUEST, false); ASSERT_EQ(sizeof(header), message.size()); EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0); } -TEST(make_notify, with_payload) +TEST(message_writer, notify_with_payload) { std::string bytes(100, 'a'); std::generate(bytes.begin(), bytes.end(), crypto::random_device{}); - const epee::byte_slice message = epee::levin::make_notify(443, epee::strspan<std::uint8_t>(bytes)); + epee::levin::message_writer writer{}; + writer.buffer.write(epee::to_span(bytes)); + + const epee::byte_slice message = writer.finalize_notify(443); const epee::levin::bucket_head2 header = epee::levin::make_header(443, bytes.size(), LEVIN_PACKET_REQUEST, false); @@ -407,6 +436,44 @@ TEST(make_notify, with_payload) EXPECT_TRUE(std::memcmp(bytes.data(), message.data() + sizeof(header), bytes.size()) == 0); } +TEST(message_writer, response_with_empty_payload) +{ + const epee::byte_slice message = epee::levin::message_writer{}.finalize_response(443, 1); + epee::levin::bucket_head2 header = + epee::levin::make_header(443, 0, LEVIN_PACKET_RESPONSE, false); + header.m_return_code = SWAP32LE(1); + ASSERT_EQ(sizeof(header), message.size()); + EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0); +} + +TEST(message_writer, response_with_payload) +{ + std::string bytes(100, 'a'); + std::generate(bytes.begin(), bytes.end(), crypto::random_device{}); + + epee::levin::message_writer writer{}; + writer.buffer.write(epee::to_span(bytes)); + + const epee::byte_slice message = writer.finalize_response(443, 6450); + epee::levin::bucket_head2 header = + epee::levin::make_header(443, bytes.size(), LEVIN_PACKET_RESPONSE, false); + header.m_return_code = SWAP32LE(6450); + + ASSERT_EQ(sizeof(header) + bytes.size(), message.size()); + EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0); + EXPECT_TRUE(std::memcmp(bytes.data(), message.data() + sizeof(header), bytes.size()) == 0); +} + +TEST(message_writer, error) +{ + epee::levin::message_writer writer{}; + writer.buffer.clear(); + + EXPECT_THROW(writer.finalize_invoke(0), std::runtime_error); + EXPECT_THROW(writer.finalize_notify(0), std::runtime_error); + EXPECT_THROW(writer.finalize_response(0, 0), std::runtime_error); +} + TEST(make_noise, invalid) { EXPECT_TRUE(epee::levin::make_noise_notify(sizeof(epee::levin::bucket_head2) - 1).empty()); @@ -428,13 +495,13 @@ TEST(make_noise, valid) TEST(make_fragment, invalid) { - EXPECT_TRUE(epee::levin::make_fragmented_notify(nullptr, 0, nullptr).empty()); + EXPECT_TRUE(epee::levin::make_fragmented_notify(0, 0, epee::levin::message_writer{}).empty()); } TEST(make_fragment, single) { const epee::byte_slice noise = epee::levin::make_noise_notify(1024); - const epee::byte_slice fragment = epee::levin::make_fragmented_notify(noise, 11, nullptr); + const epee::byte_slice fragment = epee::levin::make_fragmented_notify(noise.size(), 11, epee::levin::message_writer{}); const epee::levin::bucket_head2 header = epee::levin::make_header(11, 1024 - sizeof(epee::levin::bucket_head2), LEVIN_PACKET_REQUEST, false); @@ -449,8 +516,13 @@ TEST(make_fragment, multiple) std::string bytes(1024 * 3 - 150, 'a'); std::generate(bytes.begin(), bytes.end(), crypto::random_device{}); + epee::levin::message_writer message; + message.buffer.write(epee::to_span(bytes)); + const epee::byte_slice noise = epee::levin::make_noise_notify(1024); - epee::byte_slice fragment = epee::levin::make_fragmented_notify(noise, 114, epee::strspan<std::uint8_t>(bytes)); + epee::byte_slice fragment = epee::levin::make_fragmented_notify(noise.size(), 114, std::move(message)); + + EXPECT_EQ(1024 * 3, fragment.size()); epee::levin::bucket_head2 header = epee::levin::make_header(0, 1024 - sizeof(epee::levin::bucket_head2), LEVIN_PACKET_BEGIN, false); @@ -497,6 +569,7 @@ TEST(make_fragment, multiple) fragment.take_slice(bytes.size()); + EXPECT_EQ(18, fragment.size()); EXPECT_EQ(18, std::count(fragment.cbegin(), fragment.cend(), 0)); } @@ -2164,20 +2237,31 @@ TEST_F(levin_notify, command_max_bytes) add_connection(true); - std::string bytes(4096, 'h'); + std::string payload(4096, 'h'); + epee::byte_slice bytes; + { + epee::levin::message_writer dest{}; + dest.buffer.write(epee::to_span(payload)); + bytes = dest.finalize_notify(ping_command); + } - EXPECT_EQ(1, get_connections().notify(ping_command, epee::strspan<std::uint8_t>(bytes), contexts_.front().get_id())); + EXPECT_EQ(1, get_connections().send(bytes.clone(), contexts_.front().get_id())); EXPECT_EQ(1u, contexts_.front().process_send_queue(true)); EXPECT_EQ(1u, receiver_.notified_size()); const received_message msg = receiver_.get_raw_notification(); EXPECT_EQ(ping_command, msg.command); EXPECT_EQ(contexts_.front().get_id(), msg.connection); - EXPECT_EQ(bytes, msg.payload); + EXPECT_EQ(payload, msg.payload); - bytes.push_back('e'); + { + payload.push_back('h'); + epee::levin::message_writer dest{}; + dest.buffer.write(epee::to_span(payload)); + bytes = dest.finalize_notify(ping_command); + } - EXPECT_EQ(1, get_connections().notify(ping_command, epee::strspan<std::uint8_t>(bytes), contexts_.front().get_id())); + EXPECT_EQ(1, get_connections().send(std::move(bytes), contexts_.front().get_id())); EXPECT_EQ(1u, contexts_.front().process_send_queue(false)); EXPECT_EQ(0u, receiver_.notified_size()); } diff --git a/tests/unit_tests/node_server.cpp b/tests/unit_tests/node_server.cpp index 775feace8..2c80acda5 100644 --- a/tests/unit_tests/node_server.cpp +++ b/tests/unit_tests/node_server.cpp @@ -449,7 +449,6 @@ TEST(cryptonote_protocol_handler, race_condition) }; struct net_node_t: commands_handler_t, p2p_endpoint_t { using span_t = epee::span<const uint8_t>; - using string_t = std::string; using zone_t = epee::net_utils::zone; using uuid_t = boost::uuids::uuid; using relay_t = cryptonote::relay_method; @@ -462,12 +461,9 @@ TEST(cryptonote_protocol_handler, race_condition) using subnets = std::map<epee::net_utils::ipv4_network_subnet, time_t>; using hosts = std::map<std::string, time_t>; }; - struct slice { - using bytes = epee::byte_slice; - }; shared_state_ptr shared_state; core_protocol_ptr core_protocol; - virtual int invoke(int command, const span_t in, slice::bytes &out, context_t &context) override { + virtual int invoke(int command, const span_t in, epee::byte_stream &out, context_t &context) override { if (core_protocol) { if (command == messages::handshake::ID) { return epee::net_utils::buff_to_t_adapter<void, typename messages::handshake::request, typename messages::handshake::response>( @@ -491,7 +487,7 @@ TEST(cryptonote_protocol_handler, race_condition) virtual int notify(int command, const span_t in, context_t &context) override { if (core_protocol) { bool handled; - slice::bytes out; + epee::byte_stream out; return core_protocol->handle_invoke_map(true, command, in, out, context, handled); } else @@ -527,22 +523,16 @@ TEST(cryptonote_protocol_handler, race_condition) else return {}; } - virtual bool invoke_command_to_peer(int command, const span_t in, string_t& out, const contexts::basic& context) override { - if (shared_state) - return shared_state->invoke(command, in, out, context.m_connection_id); - else - return {}; - } - virtual bool invoke_notify_to_peer(int command, const span_t in, const contexts::basic& context) override { + virtual bool invoke_notify_to_peer(int command, epee::levin::message_writer in, const contexts::basic& context) override { if (shared_state) - return shared_state->notify(command, in, context.m_connection_id); + return shared_state->send(in.finalize_notify(command), context.m_connection_id); else return {}; } - virtual bool relay_notify_to_list(int command, const span_t in, connections_t connections) override { + virtual bool relay_notify_to_list(int command, epee::levin::message_writer in, connections_t connections) override { if (shared_state) { for (auto &e: connections) - shared_state->notify(command, in, e.second); + shared_state->send(in.finalize_notify(command), e.second); } return {}; } |