diff options
Diffstat (limited to 'contrib/epee/include')
5 files changed, 99 insertions, 118 deletions
diff --git a/contrib/epee/include/net/levin_base.h b/contrib/epee/include/net/levin_base.h index df59a6c44..fce6d4b7e 100644 --- a/contrib/epee/include/net/levin_base.h +++ b/contrib/epee/include/net/levin_base.h @@ -31,7 +31,6 @@ #include <cstdint> -#include "byte_stream.h" #include "net_utils_base.h" #include "span.h" @@ -84,12 +83,11 @@ namespace levin #define LEVIN_PROTOCOL_VER_0 0 #define LEVIN_PROTOCOL_VER_1 1 - template<class t_connection_context = net_utils::connection_context_base> struct levin_commands_handler { - virtual int invoke(int command, const epee::span<const uint8_t> in_buff, byte_stream& buff_out, t_connection_context& context)=0; + virtual int invoke(int command, const epee::span<const uint8_t> in_buff, byte_slice& buff_out, t_connection_context& context)=0; virtual int notify(int command, const epee::span<const uint8_t> in_buff, t_connection_context& context)=0; virtual void callback(t_connection_context& context){}; @@ -127,41 +125,12 @@ namespace levin } } - //! Provides space for levin (p2p) header, so that payload can be sent without copy - class message_writer - { - byte_slice finalize(uint32_t command, uint32_t flags, uint32_t return_code, bool expect_response); - public: - using header = bucket_head2; - - explicit message_writer(std::size_t reserve = 8192); - - message_writer(const message_writer&) = delete; - message_writer(message_writer&&) = default; - ~message_writer() = default; - message_writer& operator=(const message_writer&) = delete; - message_writer& operator=(message_writer&&) = default; - - //! \return Size of payload (excludes header size). - std::size_t payload_size() const noexcept - { - return buffer.size() < sizeof(header) ? 0 : buffer.size() - sizeof(header); - } - - byte_slice finalize_invoke(uint32_t command) { return finalize(command, LEVIN_PACKET_REQUEST, 0, true); } - byte_slice finalize_notify(uint32_t command) { return finalize(command, LEVIN_PACKET_REQUEST, 0, false); } - byte_slice finalize_response(uint32_t command, uint32_t return_code) - { - return finalize(command, LEVIN_PACKET_RESPONSE, return_code, false); - } - - //! Has space for levin header until a finalize method is used - byte_stream buffer; - }; - //! \return Intialized levin header. bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept; + //! \return A levin notification message. + byte_slice make_notify(int command, epee::span<const std::uint8_t> payload); + /*! Generate a dummy levin message. \param noise_bytes Total size of the returned `byte_slice`. @@ -171,11 +140,12 @@ namespace levin /*! Generate 1+ levin messages that are identical to the noise message size. - \param noise_size Each levin message will be identical to this value. + \param noise Each levin message will be identical to the size of this + message. The bytes from this message will be used for padding. \return `nullptr` if `noise.size()` is less than the levin header size. Otherwise, a levin notification message OR 2+ levin fragment messages. Each message is `noise.size()` in length. */ - byte_slice make_fragmented_notify(const std::size_t noise_size, int command, message_writer message); + byte_slice make_fragmented_notify(const byte_slice& noise, int command, epee::span<const std::uint8_t> payload); } } diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h index a6816cafc..d062fa877 100644 --- a/contrib/epee/include/net/levin_protocol_handler_async.h +++ b/contrib/epee/include/net/levin_protocol_handler_async.h @@ -51,21 +51,6 @@ #define MIN_BYTES_WANTED 512 #endif -template<typename context_t> -void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char* category) -{ - MCINFO("net.p2p.traffic", context << bytes << " bytes " << (sent ? "sent" : "received") << (error ? "/corrupt" : "") - << " for category " << category << " initiated by " << (initiator ? "us" : "peer")); -} - -template<typename context_t> -void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command) -{ - char buf[32]; - snprintf(buf, sizeof(buf), "command-%u", command); - on_levin_traffic(context, initiator, sent, error, bytes, buf); -} - namespace epee { namespace levin @@ -103,10 +88,11 @@ public: uint64_t m_max_packet_size; uint64_t m_invoke_timeout; - int invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id); + int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id); template<class callback_t> - int invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED); + int invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED); + int notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id); int send(epee::byte_slice message, const boost::uuids::uuid& connection_id); bool close(boost::uuids::uuid connection_id); bool update_connection_context(const t_connection_context& contxt); @@ -136,17 +122,12 @@ class async_protocol_handler { std::string m_fragment_buffer; - bool send_message(byte_slice message) + bool send_message(uint32_t command, epee::span<const uint8_t> in_buff, uint32_t flags, bool expect_response) { - if (message.size() < sizeof(message_writer::header)) - return false; - - message_writer::header head; - std::memcpy(std::addressof(head), message.data(), sizeof(head)); - if(!m_pservice_endpoint->do_send(std::move(message))) + const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response); + if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff})) return false; - on_levin_traffic(m_connection_context, true, true, false, head.m_cb, head.m_command); MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb << ", flags" << head.m_flags << ", r?=" << head.m_have_to_return_data @@ -542,17 +523,26 @@ public: { if(m_current_head.m_have_to_return_data) { - levin::message_writer return_message{32 * 1024}; + byte_slice return_buff; const uint32_t return_code = m_config.m_pcommands_handler->invoke( - m_current_head.m_command, buff_to_invoke, return_message.buffer, m_connection_context + m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context ); // peer_id remains unset if dropped if (m_current_head.m_command == m_connection_context.handshake_command() && m_connection_context.handshake_complete()) m_max_packet_size = m_config.m_max_packet_size; - if(!send_message(return_message.finalize_response(m_current_head.m_command, return_code))) + bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false); + head.m_return_code = SWAP32LE(return_code); + + if(!m_pservice_endpoint->do_send(byte_slice{{epee::as_byte_span(head), epee::to_span(return_buff)}})) return false; + + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb + << ", flags" << head.m_flags + << ", r?=" << head.m_have_to_return_data + <<", cmd = " << head.m_command + << ", ver=" << head.m_protocol_version); } else m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context); @@ -629,7 +619,7 @@ public: } template<class callback_t> - bool async_invoke(int command, message_writer in_msg, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) + bool async_invoke(int command, const epee::span<const uint8_t> in_buff, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED) { misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( boost::bind(&async_protocol_handler::finish_outer_call, this)); @@ -648,7 +638,7 @@ public: if (command == m_connection_context.handshake_command()) m_max_packet_size = m_config.m_max_packet_size; - if(!send_message(in_msg.finalize_invoke(command))) + if(!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) { LOG_ERROR_CC(m_connection_context, "Failed to do_send"); err_code = LEVIN_ERROR_CONNECTION; @@ -674,7 +664,7 @@ public: return true; } - int invoke(int command, message_writer in_msg, std::string& buff_out) + int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out) { misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( boost::bind(&async_protocol_handler::finish_outer_call, this)); @@ -686,7 +676,7 @@ public: if (command == m_connection_context.handshake_command()) m_max_packet_size = m_config.m_max_packet_size; - if (!send_message(in_msg.finalize_invoke(command))) + if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) { LOG_ERROR_CC(m_connection_context, "Failed to send request"); return LEVIN_ERROR_CONNECTION; @@ -723,9 +713,25 @@ public: return m_invoke_result_code; } - /*! Sends `message` without adding a levin header. The message must have been - created with `make_noise_notify`, `make_fragmented_notify`, or - `message_writer::finalize_notify`. See additional instructions for + int notify(int command, const epee::span<const uint8_t> in_buff) + { + misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( + boost::bind(&async_protocol_handler::finish_outer_call, this)); + + CRITICAL_REGION_LOCAL(m_call_lock); + + if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, false)) + { + LOG_ERROR_CC(m_connection_context, "Failed to send notify message"); + return -1; + } + + return 1; + } + + /*! Sends `message` without adding a levin header. The message must have + been created with `make_notify`, `make_noise_notify` or + `make_fragmented_notify`. See additional instructions for `make_fragmented_notify`. \return 1 on success */ @@ -735,11 +741,14 @@ public: boost::bind(&async_protocol_handler::finish_outer_call, this) ); - if (!send_message(std::move(message))) + const std::size_t length = message.size(); + if (!m_pservice_endpoint->do_send(std::move(message))) { LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it"); return -1; } + + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << (length - sizeof(bucket_head2)) << ", r?=0]"); return 1; } //------------------------------------------------------------------------------------------ @@ -829,19 +838,19 @@ int async_protocol_handler_config<t_connection_context>::find_and_lock_connectio } //------------------------------------------------------------------------------------------ template<class t_connection_context> -int async_protocol_handler_config<t_connection_context>::invoke(int command, message_writer in_msg, std::string& buff_out, boost::uuids::uuid connection_id) +int async_protocol_handler_config<t_connection_context>::invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, boost::uuids::uuid connection_id) { async_protocol_handler<t_connection_context>* aph; int r = find_and_lock_connection(connection_id, aph); - return LEVIN_OK == r ? aph->invoke(command, std::move(in_msg), buff_out) : r; + return LEVIN_OK == r ? aph->invoke(command, in_buff, buff_out) : r; } //------------------------------------------------------------------------------------------ template<class t_connection_context> template<class callback_t> -int async_protocol_handler_config<t_connection_context>::invoke_async(int command, message_writer in_msg, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout) +int async_protocol_handler_config<t_connection_context>::invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout) { async_protocol_handler<t_connection_context>* aph; int r = find_and_lock_connection(connection_id, aph); - return LEVIN_OK == r ? aph->async_invoke(command, std::move(in_msg), cb, timeout) : r; + return LEVIN_OK == r ? aph->async_invoke(command, in_buff, cb, timeout) : r; } //------------------------------------------------------------------------------------------ template<class t_connection_context> template<class callback_t> @@ -920,6 +929,14 @@ void async_protocol_handler_config<t_connection_context>::set_handler(levin_comm } //------------------------------------------------------------------------------------------ template<class t_connection_context> +int async_protocol_handler_config<t_connection_context>::notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id) +{ + async_protocol_handler<t_connection_context>* aph; + int r = find_and_lock_connection(connection_id, aph); + return LEVIN_OK == r ? aph->notify(command, in_buff) : r; +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> int async_protocol_handler_config<t_connection_context>::send(byte_slice message, const boost::uuids::uuid& connection_id) { async_protocol_handler<t_connection_context>* aph; diff --git a/contrib/epee/include/storages/levin_abstract_invoke2.h b/contrib/epee/include/storages/levin_abstract_invoke2.h index 383d67cc2..802e16c1b 100644 --- a/contrib/epee/include/storages/levin_abstract_invoke2.h +++ b/contrib/epee/include/storages/levin_abstract_invoke2.h @@ -37,14 +37,21 @@ #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "net" -template<typename context_t> -void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char *category); - -template<typename context_t> -void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command); - namespace { + template<typename context_t> + void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, const char *category) + { + MCINFO("net.p2p.traffic", context << bytes << " bytes " << (sent ? "sent" : "received") << (error ? "/corrupt" : "") + << " for category " << category << " initiated by " << (initiator ? "us" : "peer")); + } + template<typename context_t> + void on_levin_traffic(const context_t &context, bool initiator, bool sent, bool error, size_t bytes, int command) + { + char buf[32]; + snprintf(buf, sizeof(buf), "command-%u", command); + return on_levin_traffic(context, initiator, sent, error, bytes, buf); + } static const constexpr epee::serialization::portable_storage::limits_t default_levin_limits = { 8192, // objects 16384, // fields @@ -110,11 +117,12 @@ namespace epee const boost::uuids::uuid &conn_id = context.m_connection_id; typename serialization::portable_storage stg; out_struct.store(stg); - levin::message_writer to_send{16 * 1024}; + byte_slice buff_to_send; std::string buff_to_recv; - stg.store_to_binary(to_send.buffer); + stg.store_to_binary(buff_to_send, 16 * 1024); - int res = transport.invoke(command, std::move(to_send), buff_to_recv, conn_id); + on_levin_traffic(context, true, true, false, buff_to_send.size(), command); + int res = transport.invoke(command, boost::string_ref{reinterpret_cast<const char*>(buff_to_send.data()), buff_to_send.size()}, buff_to_recv, conn_id); if( res <=0 ) { LOG_PRINT_L1("Failed to invoke command " << command << " return code " << res); @@ -137,9 +145,10 @@ namespace epee const boost::uuids::uuid &conn_id = context.m_connection_id; typename serialization::portable_storage stg; const_cast<t_arg&>(out_struct).store(stg);//TODO: add true const support to searilzation - levin::message_writer to_send{16 * 1024}; - stg.store_to_binary(to_send.buffer); - int res = transport.invoke_async(command, std::move(to_send), conn_id, [cb, command](int code, const epee::span<const uint8_t> buff, typename t_transport::connection_context& context)->bool + byte_slice buff_to_send; + stg.store_to_binary(buff_to_send, 16 * 1024); + on_levin_traffic(context, true, true, false, buff_to_send.size(), command); + int res = transport.invoke_async(command, epee::to_span(buff_to_send), conn_id, [cb, command](int code, const epee::span<const uint8_t> buff, typename t_transport::connection_context& context)->bool { t_result result_struct = AUTO_VAL_INIT(result_struct); if( code <=0 ) @@ -183,10 +192,11 @@ namespace epee const boost::uuids::uuid &conn_id = context.m_connection_id; serialization::portable_storage stg; out_struct.store(stg); - levin::message_writer to_send; - stg.store_to_binary(to_send.buffer); + byte_slice buff_to_send; + stg.store_to_binary(buff_to_send); - int res = transport.send(to_send.finalize_notify(command), conn_id); + on_levin_traffic(context, true, true, false, buff_to_send.size(), command); + int res = transport.notify(command, epee::to_span(buff_to_send), conn_id); if(res <=0 ) { MERROR("Failed to notify command " << command << " return code " << res); @@ -197,7 +207,7 @@ namespace epee //---------------------------------------------------------------------------------------------------- //---------------------------------------------------------------------------------------------------- template<class t_owner, class t_in_type, class t_out_type, class t_context, class callback_t> - int buff_to_t_adapter(int command, const epee::span<const uint8_t> in_buff, byte_stream& buff_out, callback_t cb, t_context& context ) + int buff_to_t_adapter(int command, const epee::span<const uint8_t> in_buff, byte_slice& buff_out, callback_t cb, t_context& context ) { serialization::portable_storage strg; if(!strg.load_from_binary(in_buff, &default_levin_limits)) @@ -220,11 +230,12 @@ namespace epee serialization::portable_storage strg_out; static_cast<t_out_type&>(out_struct).store(strg_out); - if(!strg_out.store_to_binary(buff_out)) + if(!strg_out.store_to_binary(buff_out, 32 * 1024)) { LOG_ERROR("Failed to store_to_binary in command" << command); return -1; } + on_levin_traffic(context, false, true, false, buff_out.size(), command); return res; } @@ -251,7 +262,7 @@ namespace epee } #define CHAIN_LEVIN_INVOKE_MAP2(context_type) \ - int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, context_type& context) \ + int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, context_type& context) \ { \ bool handled = false; \ return handle_invoke_map(false, command, in_buff, buff_out, context, handled); \ @@ -260,13 +271,13 @@ namespace epee #define CHAIN_LEVIN_NOTIFY_MAP2(context_type) \ int notify(int command, const epee::span<const uint8_t> in_buff, context_type& context) \ { \ - bool handled = false; epee::byte_stream fake_str; \ - return handle_invoke_map(true, command, in_buff, fake_str, context, handled); \ + bool handled = false; epee::byte_slice fake_str; \ + return handle_invoke_map(true, command, in_buff, fake_str, context, handled); \ } #define CHAIN_LEVIN_INVOKE_MAP() \ - int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, epee::net_utils::connection_context_base& context) \ + int invoke(int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, epee::net_utils::connection_context_base& context) \ { \ bool handled = false; \ return handle_invoke_map(false, command, in_buff, buff_out, context, handled); \ @@ -286,7 +297,7 @@ namespace epee } #define BEGIN_INVOKE_MAP2(owner_type) \ - template <class t_context> int handle_invoke_map(bool is_notify, int command, const epee::span<const uint8_t> in_buff, epee::byte_stream& buff_out, t_context& context, bool& handled) \ + template <class t_context> int handle_invoke_map(bool is_notify, int command, const epee::span<const uint8_t> in_buff, epee::byte_slice& buff_out, t_context& context, bool& handled) \ { \ try { \ typedef owner_type internal_owner_type_name; diff --git a/contrib/epee/include/storages/portable_storage.h b/contrib/epee/include/storages/portable_storage.h index 655a2eb12..c5d0c48ee 100644 --- a/contrib/epee/include/storages/portable_storage.h +++ b/contrib/epee/include/storages/portable_storage.h @@ -34,7 +34,6 @@ namespace epee { class byte_slice; - class byte_stream; namespace serialization { /************************************************************************/ @@ -84,13 +83,8 @@ namespace epee //------------------------------------------------------------------------------- bool store_to_binary(byte_slice& target, std::size_t initial_buffer_size = 8192); - bool store_to_binary(byte_stream& ss); - bool load_from_binary(const epee::span<const uint8_t> target, const limits_t *limits = nullptr); - bool load_from_binary(const std::string& target, const limits_t *limits = nullptr) - { - return load_from_binary(epee::strspan<uint8_t>(target), limits); - } - + bool load_from_binary(const epee::span<const uint8_t> target, const limits_t *limits = NULL); + bool load_from_binary(const std::string& target, const limits_t *limits = NULL); template<class trace_policy> bool dump_as_xml(std::string& targetObj, const std::string& root_name = ""); bool dump_as_json(std::string& targetObj, size_t indent = 0, bool insert_newlines = true); diff --git a/contrib/epee/include/storages/portable_storage_template_helper.h b/contrib/epee/include/storages/portable_storage_template_helper.h index 7f6596f36..00ad15e9d 100644 --- a/contrib/epee/include/storages/portable_storage_template_helper.h +++ b/contrib/epee/include/storages/portable_storage_template_helper.h @@ -36,8 +36,6 @@ namespace epee { - class byte_stream; - namespace serialization { //----------------------------------------------------------------------------------------------------------- @@ -129,14 +127,5 @@ namespace epee store_t_to_binary(str_in, binary_buff, initial_buffer_size); return binary_buff; } - //----------------------------------------------------------------------------------------------------------- - template<class t_struct> - bool store_t_to_binary(t_struct& str_in, byte_stream& binary_buff) - { - portable_storage ps; - str_in.store(ps); - return ps.store_to_binary(binary_buff); - } - } } |