From bdfc63ae4ddc52e2dece2a031a91509418206cb0 Mon Sep 17 00:00:00 2001 From: Lee Clagett Date: Sat, 11 May 2019 11:38:35 -0400 Subject: Add ref-counted buffer byte_slice. Currently used for sending TCP data. --- contrib/epee/include/byte_slice.h | 145 ++++++++++++++ contrib/epee/include/net/abstract_tcp_server2.h | 6 +- contrib/epee/include/net/abstract_tcp_server2.inl | 63 +++---- contrib/epee/include/net/connection_basic.hpp | 3 +- contrib/epee/include/net/http_protocol_handler.inl | 9 +- contrib/epee/include/net/levin_protocol_handler.h | 5 +- .../include/net/levin_protocol_handler_async.h | 145 +++++--------- contrib/epee/include/net/net_utils_base.h | 5 +- contrib/epee/src/CMakeLists.txt | 4 +- contrib/epee/src/byte_slice.cpp | 209 +++++++++++++++++++++ 10 files changed, 445 insertions(+), 149 deletions(-) create mode 100644 contrib/epee/include/byte_slice.h create mode 100644 contrib/epee/src/byte_slice.cpp (limited to 'contrib') diff --git a/contrib/epee/include/byte_slice.h b/contrib/epee/include/byte_slice.h new file mode 100644 index 000000000..1fbba101e --- /dev/null +++ b/contrib/epee/include/byte_slice.h @@ -0,0 +1,145 @@ +// Copyright (c) 2019, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include +#include +#include +#include + +#include "span.h" + +namespace epee +{ + struct byte_slice_data; + + struct release_byte_slice + { + void operator()(byte_slice_data*) const noexcept; + }; + + /*! Inspired by slices in golang. Storage is thread-safe reference counted, + allowing for cheap copies or range selection on the bytes. The bytes + owned by this class are always immutable. + + The functions `operator=`, `take_slice` and `remove_prefix` may alter the + reference count for the backing store, which will invalidate pointers + previously returned if the reference count is zero. Be careful about + "caching" pointers in these circumstances. */ + class byte_slice + { + /* A custom reference count is used instead of shared_ptr because it allows + for an allocation optimization for the span constructor. This also + reduces the size of this class by one pointer. */ + std::unique_ptr storage_; + span portion_; // within storage_ + + //! Internal use only; use to increase `storage` reference count. + byte_slice(byte_slice_data* storage, span portion) noexcept; + + struct adapt_buffer{}; + + template + explicit byte_slice(const adapt_buffer, T&& buffer); + + public: + using value_type = std::uint8_t; + using size_type = std::size_t; + using difference_type = std::ptrdiff_t; + using pointer = const std::uint8_t*; + using const_pointer = const std::uint8_t*; + using reference = std::uint8_t; + using const_reference = std::uint8_t; + using iterator = pointer; + using const_iterator = const_pointer; + + //! Construct empty slice. + byte_slice() noexcept + : storage_(nullptr), portion_() + {} + + //! Construct empty slice + byte_slice(std::nullptr_t) noexcept + : byte_slice() + {} + + //! Scatter-gather (copy) multiple `sources` into a single allocated slice. + explicit byte_slice(std::initializer_list> sources); + + //! Convert `buffer` into a slice using one allocation for shared count. + explicit byte_slice(std::vector&& buffer); + + //! Convert `buffer` into a slice using one allocation for shared count. + explicit byte_slice(std::string&& buffer); + + byte_slice(byte_slice&& source) noexcept; + ~byte_slice() noexcept = default; + + //! \note May invalidate previously retrieved pointers. + byte_slice& operator=(byte_slice&&) noexcept; + + //! \return A shallow (cheap) copy of the data from `this` slice. + byte_slice clone() const noexcept { return {storage_.get(), portion_}; } + + iterator begin() const noexcept { return portion_.begin(); } + const_iterator cbegin() const noexcept { return portion_.begin(); } + + iterator end() const noexcept { return portion_.end(); } + const_iterator cend() const noexcept { return portion_.end(); } + + bool empty() const noexcept { return storage_ == nullptr; } + const std::uint8_t* data() const noexcept { return portion_.data(); } + std::size_t size() const noexcept { return portion_.size(); } + + /*! Drop bytes from the beginning of `this` slice. + + \note May invalidate previously retrieved pointers. + \post `this->size() = this->size() - std::min(this->size(), max_bytes)` + \post `if (this->size() <= max_bytes) this->data() = nullptr` + \return Number of bytes removed. */ + std::size_t remove_prefix(std::size_t max_bytes) noexcept; + + /*! "Take" bytes from the beginning of `this` slice. + + \note May invalidate previously retrieved pointers. + \post `this->size() = this->size() - std::min(this->size(), max_bytes)` + \post `if (this->size() <= max_bytes) this->data() = nullptr` + \return Slice containing the bytes removed from `this` slice. */ + byte_slice take_slice(std::size_t max_bytes) noexcept; + + /*! Return a shallow (cheap) copy of a slice from `begin` and `end` offsets. + + \throw std::out_of_range If `end < begin`. + \throw std::out_of_range If `size() < end`. + \return Slice starting at `data() + begin` of size `end - begin`. */ + byte_slice get_slice(std::size_t begin, std::size_t end) const; + }; +} // epee + diff --git a/contrib/epee/include/net/abstract_tcp_server2.h b/contrib/epee/include/net/abstract_tcp_server2.h index b38ab5399..373ef0dc6 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.h +++ b/contrib/epee/include/net/abstract_tcp_server2.h @@ -53,6 +53,7 @@ #include #include #include +#include "byte_slice.h" #include "net_utils_base.h" #include "syncobj.h" #include "connection_basic.hpp" @@ -135,8 +136,7 @@ namespace net_utils private: //----------------- i_service_endpoint --------------------- - virtual bool do_send(const void* ptr, size_t cb); ///< (see do_send from i_service_endpoint) - virtual bool do_send_chunk(const void* ptr, size_t cb); ///< will send (or queue) a part of data + virtual bool do_send(byte_slice message); ///< (see do_send from i_service_endpoint) virtual bool send_done(); virtual bool close(); virtual bool call_run_once_service_io(); @@ -145,6 +145,8 @@ namespace net_utils virtual bool add_ref(); virtual bool release(); //------------------------------------------------------ + bool do_send_chunk(byte_slice chunk); ///< will send (or queue) a part of data. internal use only + boost::shared_ptr > safe_shared_from_this(); bool shutdown(); /// Handle completion of a receive operation. diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl index 19e9c9af9..ebe1d5aaf 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.inl +++ b/contrib/epee/include/net/abstract_tcp_server2.inl @@ -520,7 +520,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) } //--------------------------------------------------------------------------------- template - bool connection::do_send(const void* ptr, size_t cb) { + bool connection::do_send(byte_slice message) { TRY_ENTRY(); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted @@ -529,6 +529,9 @@ PRAGMA_WARNING_DISABLE_VS(4355) if (m_was_shutdown) return false; // TODO avoid copy + std::uint8_t const* const message_data = message.data(); + const std::size_t message_size = message.size(); + const double factor = 32; // TODO config typedef long long signed int t_safe; // my t_size to avoid any overunderflow in arithmetic const t_safe chunksize_good = (t_safe)( 1024 * std::max(1.0,factor) ); @@ -538,13 +541,11 @@ PRAGMA_WARNING_DISABLE_VS(4355) CHECK_AND_ASSERT_MES(! (chunksize_max<0), false, "Negative chunksize_max" ); // make sure it is unsigned before removin sign with cast: long long unsigned int chunksize_max_unsigned = static_cast( chunksize_max ) ; - if (allow_split && (cb > chunksize_max_unsigned)) { + if (allow_split && (message_size > chunksize_max_unsigned)) { { // LOCK: chunking epee::critical_region_t send_guard(m_chunking_lock); // *** critical *** - MDEBUG("do_send() will SPLIT into small chunks, from packet="<( len ); - CHECK_AND_ASSERT_MES(len>0, false, "len not strictly positive"); // (redundant) - CHECK_AND_ASSERT_MES(len_unsigned < std::numeric_limits::max(), false, "Invalid len_unsigned"); // yeap we want strong < then max size, to be sure - - void *chunk_start = ((char*)ptr) + pos; - MDEBUG("chunk_start="<0, false, "pos <= 0"); - // (in catch block, or uniq pointer) delete buf; } // each chunk - MDEBUG("do_send() DONE SPLIT from packet="<::do_send", false); @@ -603,7 +589,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) //--------------------------------------------------------------------------------- template - bool connection::do_send_chunk(const void* ptr, size_t cb) + bool connection::do_send_chunk(byte_slice chunk) { TRY_ENTRY(); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted @@ -623,7 +609,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) //_info("[sock " << socket().native_handle() << "] SEND " << cb); context.m_last_send = time(NULL); - context.m_send_cnt += cb; + context.m_send_cnt += chunk.size(); //some data should be wrote to stream //request complete @@ -644,7 +630,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) }*/ long int ms = 250 + (rand()%50); - MDEBUG("Sleeping because QUEUE is FULL, in " << __FUNCTION__ << " for " << ms << " ms before packet_size="< #include +#include "byte_slice.h" #include "net/net_utils_base.h" #include "net/net_ssl.h" #include "syncobj.h" @@ -108,7 +109,7 @@ class connection_basic { // not-templated base class for rapid developmet of som volatile uint32_t m_want_close_connection; std::atomic m_was_shutdown; critical_section m_send_que_lock; - std::list m_send_que; + std::deque m_send_que; volatile bool m_is_multithreaded; /// Strand to ensure the connection's handlers are not called concurrently. boost::asio::io_service::strand strand_; diff --git a/contrib/epee/include/net/http_protocol_handler.inl b/contrib/epee/include/net/http_protocol_handler.inl index 790d0f3b1..19bdf4ff0 100644 --- a/contrib/epee/include/net/http_protocol_handler.inl +++ b/contrib/epee/include/net/http_protocol_handler.inl @@ -591,11 +591,12 @@ namespace net_utils std::string response_data = get_response_header(response); //LOG_PRINT_L0("HTTP_SEND: << \r\n" << response_data + response.m_body); - LOG_PRINT_L3("HTTP_RESPONSE_HEAD: << \r\n" << response_data); - - m_psnd_hndlr->do_send((void*)response_data.data(), response_data.size()); + LOG_PRINT_L3("HTTP_RESPONSE_HEAD: << \r\n" << response_data); + if ((response.m_body.size() && (query_info.m_http_method != http::http_method_head)) || (query_info.m_http_method == http::http_method_options)) - m_psnd_hndlr->do_send((void*)response.m_body.data(), response.m_body.size()); + response_data += response.m_body; + + m_psnd_hndlr->do_send(byte_slice{std::move(response_data)}); m_psnd_hndlr->send_done(); return res; } diff --git a/contrib/epee/include/net/levin_protocol_handler.h b/contrib/epee/include/net/levin_protocol_handler.h index 791766762..c510cfd79 100644 --- a/contrib/epee/include/net/levin_protocol_handler.h +++ b/contrib/epee/include/net/levin_protocol_handler.h @@ -157,10 +157,9 @@ namespace levin m_current_head.m_return_code = m_config.m_pcommands_handler->invoke(m_current_head.m_command, buff_to_invoke, return_buff, m_conn_context); m_current_head.m_cb = return_buff.size(); m_current_head.m_have_to_return_data = false; - std::string send_buff((const char*)&m_current_head, sizeof(m_current_head)); - send_buff += return_buff; - if(!m_psnd_hndlr->do_send(send_buff.data(), send_buff.size())) + return_buff.insert(0, (const char*)&m_current_head, sizeof(m_current_head)); + if(!m_psnd_hndlr->do_send(byte_slice{std::move(return_buff)})) return false; } diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h index 8d7ffb2c2..3f734a72b 100644 --- a/contrib/epee/include/net/levin_protocol_handler_async.h +++ b/contrib/epee/include/net/levin_protocol_handler_async.h @@ -136,7 +136,6 @@ public: critical_section m_local_inv_buff_lock; std::string m_local_inv_buff; - critical_section m_send_lock; critical_section m_call_lock; volatile uint32_t m_wait_count; @@ -260,6 +259,34 @@ public: return handler->is_timer_started(); } template friend struct anvoke_handler; + + static bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept + { + bucket_head2 head = {0}; + head.m_signature = SWAP64LE(LEVIN_SIGNATURE); + head.m_have_to_return_data = expect_response; + head.m_cb = SWAP64LE(msg_size); + + head.m_command = SWAP32LE(command); + head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); + head.m_flags = SWAP32LE(flags); + return head; + } + + bool send_message(uint32_t command, epee::span in_buff, uint32_t flags, bool expect_response) + { + const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response); + if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff})) + return false; + + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb + << ", flags" << head.m_flags + << ", r?=" << head.m_have_to_return_data + <<", cmd = " << head.m_command + << ", ver=" << head.m_protocol_version); + return true; + } + public: async_protocol_handler(net_utils::i_service_endpoint* psnd_hndlr, config_type& config, @@ -458,37 +485,22 @@ public: if(m_current_head.m_have_to_return_data) { std::string return_buff; - m_current_head.m_return_code = m_config.m_pcommands_handler->invoke( - m_current_head.m_command, - buff_to_invoke, - return_buff, - m_connection_context); - m_current_head.m_cb = return_buff.size(); - m_current_head.m_have_to_return_data = false; - m_current_head.m_protocol_version = LEVIN_PROTOCOL_VER_1; - m_current_head.m_flags = LEVIN_PACKET_RESPONSE; -#if BYTE_ORDER == LITTLE_ENDIAN - std::string send_buff((const char*)&m_current_head, sizeof(m_current_head)); -#else - bucket_head2 head = m_current_head; - head.m_signature = SWAP64LE(head.m_signature); - head.m_cb = SWAP64LE(head.m_cb); - head.m_command = SWAP32LE(head.m_command); - head.m_return_code = SWAP32LE(head.m_return_code); - head.m_flags = SWAP32LE(head.m_flags); - head.m_protocol_version = SWAP32LE(head.m_protocol_version); - std::string send_buff((const char*)&head, sizeof(head)); -#endif - send_buff += return_buff; - CRITICAL_REGION_BEGIN(m_send_lock); - if(!m_pservice_endpoint->do_send(send_buff.data(), send_buff.size())) + const uint32_t return_code = m_config.m_pcommands_handler->invoke( + m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context + ); + + bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false); + head.m_return_code = SWAP32LE(return_code); + return_buff.insert(0, reinterpret_cast(&head), sizeof(head)); + + if(!m_pservice_endpoint->do_send(byte_slice{std::move(return_buff)})) return false; - CRITICAL_REGION_END(); - MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << m_current_head.m_cb - << ", flags" << m_current_head.m_flags - << ", r?=" << m_current_head.m_have_to_return_data - <<", cmd = " << m_current_head.m_command - << ", ver=" << m_current_head.m_protocol_version); + + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb + << ", flags" << head.m_flags + << ", r?=" << head.m_have_to_return_data + <<", cmd = " << head.m_command + << ", ver=" << head.m_protocol_version); } else m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context); @@ -584,26 +596,10 @@ public: break; } - bucket_head2 head = {0}; - head.m_signature = SWAP64LE(LEVIN_SIGNATURE); - head.m_cb = SWAP64LE(in_buff.size()); - head.m_have_to_return_data = true; - - head.m_flags = SWAP32LE(LEVIN_PACKET_REQUEST); - head.m_command = SWAP32LE(command); - head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); - boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0); - CRITICAL_REGION_BEGIN(m_send_lock); - CRITICAL_REGION_LOCAL1(m_invoke_response_handlers_lock); - if(!m_pservice_endpoint->do_send(&head, sizeof(head))) - { - LOG_ERROR_CC(m_connection_context, "Failed to do_send"); - err_code = LEVIN_ERROR_CONNECTION; - break; - } + CRITICAL_REGION_BEGIN(m_invoke_response_handlers_lock); - if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size())) + if(!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) { LOG_ERROR_CC(m_connection_context, "Failed to do_send"); err_code = LEVIN_ERROR_CONNECTION; @@ -642,35 +638,13 @@ public: if(m_deletion_initiated) return LEVIN_ERROR_CONNECTION_DESTROYED; - bucket_head2 head = {0}; - head.m_signature = SWAP64LE(LEVIN_SIGNATURE); - head.m_cb = SWAP64LE(in_buff.size()); - head.m_have_to_return_data = true; - - head.m_flags = SWAP32LE(LEVIN_PACKET_REQUEST); - head.m_command = SWAP32LE(command); - head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); - boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0); - CRITICAL_REGION_BEGIN(m_send_lock); - if(!m_pservice_endpoint->do_send(&head, sizeof(head))) - { - LOG_ERROR_CC(m_connection_context, "Failed to do_send"); - return LEVIN_ERROR_CONNECTION; - } - if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size())) + if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) { - LOG_ERROR_CC(m_connection_context, "Failed to do_send"); + LOG_ERROR_CC(m_connection_context, "Failed to send request"); return LEVIN_ERROR_CONNECTION; } - CRITICAL_REGION_END(); - - MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb - << ", f=" << head.m_flags - << ", r?=" << head.m_have_to_return_data - << ", cmd = " << head.m_command - << ", ver=" << head.m_protocol_version); uint64_t ticks_start = misc_utils::get_tick_count(); size_t prev_size = 0; @@ -716,32 +690,11 @@ public: if(m_deletion_initiated) return LEVIN_ERROR_CONNECTION_DESTROYED; - bucket_head2 head = {0}; - head.m_signature = SWAP64LE(LEVIN_SIGNATURE); - head.m_have_to_return_data = false; - head.m_cb = SWAP64LE(in_buff.size()); - - head.m_command = SWAP32LE(command); - head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); - head.m_flags = SWAP32LE(LEVIN_PACKET_REQUEST); - CRITICAL_REGION_BEGIN(m_send_lock); - if(!m_pservice_endpoint->do_send(&head, sizeof(head))) - { - LOG_ERROR_CC(m_connection_context, "Failed to do_send()"); - return -1; - } - - if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size())) + if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, false)) { - LOG_ERROR_CC(m_connection_context, "Failed to do_send()"); + LOG_ERROR_CC(m_connection_context, "Failed to send notify message"); return -1; } - CRITICAL_REGION_END(); - LOG_DEBUG_CC(m_connection_context, "LEVIN_PACKET_SENT. [len=" << head.m_cb << - ", f=" << head.m_flags << - ", r?=" << head.m_have_to_return_data << - ", cmd = " << head.m_command << - ", ver=" << head.m_protocol_version); return 1; } diff --git a/contrib/epee/include/net/net_utils_base.h b/contrib/epee/include/net/net_utils_base.h index 5ae3e53b3..dd80fae8b 100644 --- a/contrib/epee/include/net/net_utils_base.h +++ b/contrib/epee/include/net/net_utils_base.h @@ -34,9 +34,10 @@ #include #include #include +#include "byte_slice.h" #include "enums.h" -#include "serialization/keyvalue_serialization.h" #include "misc_log_ex.h" +#include "serialization/keyvalue_serialization.h" #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "net" @@ -424,7 +425,7 @@ namespace net_utils /************************************************************************/ struct i_service_endpoint { - virtual bool do_send(const void* ptr, size_t cb)=0; + virtual bool do_send(byte_slice message)=0; virtual bool close()=0; virtual bool send_done()=0; virtual bool call_run_once_service_io()=0; diff --git a/contrib/epee/src/CMakeLists.txt b/contrib/epee/src/CMakeLists.txt index 2465afebb..d74e26634 100644 --- a/contrib/epee/src/CMakeLists.txt +++ b/contrib/epee/src/CMakeLists.txt @@ -26,8 +26,8 @@ # STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -add_library(epee STATIC hex.cpp http_auth.cpp mlog.cpp net_helper.cpp net_utils_base.cpp string_tools.cpp wipeable_string.cpp memwipe.c - connection_basic.cpp network_throttle.cpp network_throttle-detail.cpp mlocker.cpp buffer.cpp net_ssl.cpp) +add_library(epee STATIC byte_slice.cpp hex.cpp http_auth.cpp mlog.cpp net_helper.cpp net_utils_base.cpp string_tools.cpp wipeable_string.cpp + memwipe.c connection_basic.cpp network_throttle.cpp network_throttle-detail.cpp mlocker.cpp buffer.cpp net_ssl.cpp) if (USE_READLINE AND GNU_READLINE_FOUND) add_library(epee_readline STATIC readline_buffer.cpp) diff --git a/contrib/epee/src/byte_slice.cpp b/contrib/epee/src/byte_slice.cpp new file mode 100644 index 000000000..216049e5b --- /dev/null +++ b/contrib/epee/src/byte_slice.cpp @@ -0,0 +1,209 @@ +// Copyright (c) 2019, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include +#include +#include +#include +#include +#include + +#include "byte_slice.h" + +namespace epee +{ + struct byte_slice_data + { + byte_slice_data() noexcept + : ref_count(1) + {} + + virtual ~byte_slice_data() noexcept + {} + + std::atomic ref_count; + }; + + void release_byte_slice::operator()(byte_slice_data* ptr) const noexcept + { + if (ptr && --(ptr->ref_count) == 0) + { + ptr->~byte_slice_data(); + free(ptr); + } + } + + namespace + { + template + struct adapted_byte_slice final : byte_slice_data + { + explicit adapted_byte_slice(T&& buffer) + : byte_slice_data(), buffer(std::move(buffer)) + {} + + virtual ~adapted_byte_slice() noexcept final override + {} + + const T buffer; + }; + + // bytes "follow" this structure in memory slab + struct raw_byte_slice final : byte_slice_data + { + raw_byte_slice() noexcept + : byte_slice_data() + {} + + virtual ~raw_byte_slice() noexcept final override + {} + }; + + /* This technique is not-standard, but allows for the reference count and + memory for the bytes (when given a list of spans) to be allocated in a + single call. In that situation, the dynamic sized bytes are after/behind + the raw_byte_slice class. The C runtime has to track the number of bytes + allocated regardless, so free'ing is relatively easy. */ + + template + std::unique_ptr allocate_slice(std::size_t extra_bytes, U&&... args) + { + if (std::numeric_limits::max() - sizeof(T) < extra_bytes) + throw std::bad_alloc{}; + + void* const ptr = malloc(sizeof(T) + extra_bytes); + if (ptr == nullptr) + throw std::bad_alloc{}; + + try + { + new (ptr) T{std::forward(args)...}; + } + catch (...) + { + free(ptr); + throw; + } + return std::unique_ptr{reinterpret_cast(ptr)}; + } + } // anonymous + + byte_slice::byte_slice(byte_slice_data* storage, span portion) noexcept + : storage_(storage), portion_(portion) + { + if (storage_) + ++(storage_->ref_count); + } + + template + byte_slice::byte_slice(const adapt_buffer, T&& buffer) + : storage_(nullptr), portion_(to_byte_span(to_span(buffer))) + { + if (!buffer.empty()) + storage_ = allocate_slice>(0, std::move(buffer)); + } + + byte_slice::byte_slice(std::initializer_list> sources) + : byte_slice() + { + std::size_t space_needed = 0; + for (const auto source : sources) + space_needed += source.size(); + + if (space_needed) + { + auto storage = allocate_slice(space_needed); + span out{reinterpret_cast(storage.get() + 1), space_needed}; + portion_ = {out.data(), out.size()}; + + for (const auto source : sources) + { + std::memcpy(out.data(), source.data(), source.size()); + if (out.remove_prefix(source.size()) < source.size()) + throw std::bad_alloc{}; // size_t overflow on space_needed + } + storage_ = std::move(storage); + } + } + + byte_slice::byte_slice(std::string&& buffer) + : byte_slice(adapt_buffer{}, std::move(buffer)) + {} + + byte_slice::byte_slice(std::vector&& buffer) + : byte_slice(adapt_buffer{}, std::move(buffer)) + {} + + byte_slice::byte_slice(byte_slice&& source) noexcept + : storage_(std::move(source.storage_)), portion_(source.portion_) + { + source.portion_ = epee::span{}; + } + + byte_slice& byte_slice::operator=(byte_slice&& source) noexcept + { + storage_ = std::move(source.storage_); + portion_ = source.portion_; + if (source.storage_ == nullptr) + source.portion_ = epee::span{}; + + return *this; + } + + std::size_t byte_slice::remove_prefix(std::size_t max_bytes) noexcept + { + max_bytes = portion_.remove_prefix(max_bytes); + if (portion_.empty()) + storage_ = nullptr; + return max_bytes; + } + + byte_slice byte_slice::take_slice(const std::size_t max_bytes) noexcept + { + byte_slice out{}; + std::uint8_t const* const ptr = data(); + out.portion_ = {ptr, portion_.remove_prefix(max_bytes)}; + + if (portion_.empty()) + out.storage_ = std::move(storage_); // no atomic inc/dec + else + out = {storage_.get(), out.portion_}; + + return out; + } + + byte_slice byte_slice::get_slice(const std::size_t begin, const std::size_t end) const + { + if (end < begin || portion_.size() < end) + throw std::out_of_range{"bad slice range"}; + + if (begin == end) + return {}; + return {storage_.get(), {portion_.begin() + begin, end - begin}}; + } +} // epee -- cgit v1.2.3 From 3b24b1d082da28da15dc5e3aeaa0ebebe7758f2f Mon Sep 17 00:00:00 2001 From: Lee Clagett Date: Thu, 16 May 2019 16:34:22 -0400 Subject: Added support for "noise" over I1P/Tor to mask Tx transmission. --- contrib/epee/include/net/abstract_tcp_server2.h | 22 ++-- contrib/epee/include/net/abstract_tcp_server2.inl | 17 ++- contrib/epee/include/net/connection_basic.hpp | 6 +- contrib/epee/include/net/enums.h | 2 +- contrib/epee/include/net/levin_base.h | 27 +++++ .../include/net/levin_protocol_handler_async.h | 132 +++++++++++++++------ contrib/epee/include/net/net_helper.h | 13 +- contrib/epee/src/CMakeLists.txt | 2 +- contrib/epee/src/connection_basic.cpp | 4 +- contrib/epee/src/levin_base.cpp | 128 ++++++++++++++++++++ 10 files changed, 289 insertions(+), 64 deletions(-) create mode 100644 contrib/epee/src/levin_base.cpp (limited to 'contrib') diff --git a/contrib/epee/include/net/abstract_tcp_server2.h b/contrib/epee/include/net/abstract_tcp_server2.h index 373ef0dc6..3a2c5341d 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.h +++ b/contrib/epee/include/net/abstract_tcp_server2.h @@ -49,10 +49,11 @@ #include #include #include -#include +#include //! \TODO Convert to std::shared_ptr #include #include #include +#include #include "byte_slice.h" #include "net_utils_base.h" #include "syncobj.h" @@ -91,25 +92,24 @@ namespace net_utils public: typedef typename t_protocol_handler::connection_context t_connection_context; - struct shared_state : connection_basic_shared_state + struct shared_state : connection_basic_shared_state, t_protocol_handler::config_type { shared_state() - : connection_basic_shared_state(), pfilter(nullptr), config(), stop_signal_sent(false) + : connection_basic_shared_state(), t_protocol_handler::config_type(), pfilter(nullptr), stop_signal_sent(false) {} i_connection_filter* pfilter; - typename t_protocol_handler::config_type config; bool stop_signal_sent; }; /// Construct a connection with the given io_service. explicit connection( boost::asio::io_service& io_service, - boost::shared_ptr state, + std::shared_ptr state, t_connection_type connection_type, epee::net_utils::ssl_support_t ssl_support); explicit connection( boost::asio::ip::tcp::socket&& sock, - boost::shared_ptr state, + std::shared_ptr state, t_connection_type connection_type, epee::net_utils::ssl_support_t ssl_support); @@ -271,7 +271,13 @@ namespace net_utils typename t_protocol_handler::config_type& get_config_object() { assert(m_state != nullptr); // always set in constructor - return m_state->config; + return *m_state; + } + + std::shared_ptr get_config_shared() + { + assert(m_state != nullptr); // always set in constructor + return {m_state}; } int get_binded_port(){return m_port;} @@ -352,7 +358,7 @@ namespace net_utils bool is_thread_worker(); - const boost::shared_ptr::shared_state> m_state; + const std::shared_ptr::shared_state> m_state; /// The io_service used to perform asynchronous operations. struct worker diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl index ebe1d5aaf..12a87071a 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.inl +++ b/contrib/epee/include/net/abstract_tcp_server2.inl @@ -68,7 +68,7 @@ namespace epee namespace net_utils { template - T& check_and_get(boost::shared_ptr& ptr) + T& check_and_get(std::shared_ptr& ptr) { CHECK_AND_ASSERT_THROW_MES(bool(ptr), "shared_state cannot be null"); return *ptr; @@ -81,7 +81,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) template connection::connection( boost::asio::io_service& io_service, - boost::shared_ptr state, + std::shared_ptr state, t_connection_type connection_type, ssl_support_t ssl_support ) @@ -91,13 +91,13 @@ PRAGMA_WARNING_DISABLE_VS(4355) template connection::connection( boost::asio::ip::tcp::socket&& sock, - boost::shared_ptr state, + std::shared_ptr state, t_connection_type connection_type, ssl_support_t ssl_support ) : connection_basic(std::move(sock), state, ssl_support), - m_protocol_handler(this, check_and_get(state).config, context), + m_protocol_handler(this, check_and_get(state), context), buffer_ssl_init_fill(0), m_connection_type( connection_type ), m_throttle_speed_in("speed_in", "throttle_speed_in"), @@ -378,7 +378,6 @@ PRAGMA_WARNING_DISABLE_VS(4355) if(!recv_res) { //_info("[sock " << socket().native_handle() << "] protocol_want_close"); - //some error in protocol, protocol handler ask to close connection boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1); bool do_shutdown = false; @@ -601,7 +600,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) double current_speed_up; { CRITICAL_REGION_LOCAL(m_throttle_speed_out_mutex); - m_throttle_speed_out.handle_trafic_exact(cb); + m_throttle_speed_out.handle_trafic_exact(chunk.size()); current_speed_up = m_throttle_speed_out.get_current_speed(); } context.m_current_speed_up = current_speed_up; @@ -665,7 +664,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) auto size_now = m_send_que.front().size(); MDEBUG("do_send_chunk() NOW SENSD: packet="< boosted_tcp_server::boosted_tcp_server( t_connection_type connection_type ) : - m_state(boost::make_shared::shared_state>()), + m_state(std::make_shared::shared_state>()), m_io_service_local_instance(new worker()), io_service_(m_io_service_local_instance->io_service), acceptor_(io_service_), @@ -912,7 +911,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) template boosted_tcp_server::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_connection_type connection_type) : - m_state(boost::make_shared::shared_state>()), + m_state(std::make_shared::shared_state>()), io_service_(extarnal_io_service), acceptor_(io_service_), acceptor_ipv6(io_service_), diff --git a/contrib/epee/include/net/connection_basic.hpp b/contrib/epee/include/net/connection_basic.hpp index baeb18e27..2f60f7604 100644 --- a/contrib/epee/include/net/connection_basic.hpp +++ b/contrib/epee/include/net/connection_basic.hpp @@ -100,7 +100,7 @@ class connection_basic_pimpl; // PIMPL for this class class connection_basic { // not-templated base class for rapid developmet of some code parts // beware of removing const, net_utils::connection is sketchily doing a cast to prevent storing ptr twice - const boost::shared_ptr m_state; + const std::shared_ptr m_state; public: std::unique_ptr< connection_basic_pimpl > mI; // my Implementation @@ -119,8 +119,8 @@ class connection_basic { // not-templated base class for rapid developmet of som public: // first counter is the ++/-- count of current sockets, the other socket_number is only-increasing ++ number generator - connection_basic(boost::asio::ip::tcp::socket&& socket, boost::shared_ptr state, ssl_support_t ssl_support); - connection_basic(boost::asio::io_service &io_service, boost::shared_ptr state, ssl_support_t ssl_support); + connection_basic(boost::asio::ip::tcp::socket&& socket, std::shared_ptr state, ssl_support_t ssl_support); + connection_basic(boost::asio::io_service &io_service, std::shared_ptr state, ssl_support_t ssl_support); virtual ~connection_basic() noexcept(false); diff --git a/contrib/epee/include/net/enums.h b/contrib/epee/include/net/enums.h index 078a4b274..2f27d07f9 100644 --- a/contrib/epee/include/net/enums.h +++ b/contrib/epee/include/net/enums.h @@ -49,7 +49,7 @@ namespace net_utils { invalid = 0, public_ = 1, // public is keyword - i2p = 2, + i2p = 2, // order from here changes priority of selection for origin TXes tor = 3 }; diff --git a/contrib/epee/include/net/levin_base.h b/contrib/epee/include/net/levin_base.h index a88a1eb49..f9b6f9a81 100644 --- a/contrib/epee/include/net/levin_base.h +++ b/contrib/epee/include/net/levin_base.h @@ -29,7 +29,11 @@ #ifndef _LEVIN_BASE_H_ #define _LEVIN_BASE_H_ +#include + +#include "byte_slice.h" #include "net_utils_base.h" +#include "span.h" #define LEVIN_SIGNATURE 0x0101010101012101LL //Bender's nightmare @@ -72,6 +76,8 @@ namespace levin #define LEVIN_PACKET_REQUEST 0x00000001 #define LEVIN_PACKET_RESPONSE 0x00000002 +#define LEVIN_PACKET_BEGIN 0x00000004 +#define LEVIN_PACKET_END 0x00000008 #define LEVIN_PROTOCOL_VER_0 0 @@ -118,9 +124,30 @@ namespace levin } } + //! \return Intialized levin header. + bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept; + + //! \return A levin notification message. + byte_slice make_notify(int command, epee::span payload); + + /*! Generate a dummy levin message. + \param noise_bytes Total size of the returned `byte_slice`. + \return `nullptr` if `noise_size` is smaller than the levin header. + Otherwise, a dummy levin message. */ + byte_slice make_noise_notify(std::size_t noise_bytes); + + /*! Generate 1+ levin messages that are identical to the noise message size. + + \param noise Each levin message will be identical to the size of this + message. The bytes from this message will be used for padding. + \return `nullptr` if `noise.size()` is less than the levin header size. + Otherwise, a levin notification message OR 2+ levin fragment messages. + Each message is `noise.size()` in length. */ + byte_slice make_fragmented_notify(const byte_slice& noise, int command, epee::span payload); } } #endif //_LEVIN_BASE_H_ + diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h index 3f734a72b..208911e1a 100644 --- a/contrib/epee/include/net/levin_protocol_handler_async.h +++ b/contrib/epee/include/net/levin_protocol_handler_async.h @@ -32,6 +32,7 @@ #include #include +#include #include "levin_base.h" #include "buffer.h" @@ -91,6 +92,7 @@ public: int invoke_async(int command, const epee::span in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED); int notify(int command, const epee::span in_buff, boost::uuids::uuid connection_id); + int send(epee::byte_slice message, const boost::uuids::uuid& connection_id); bool close(boost::uuids::uuid connection_id); bool update_connection_context(const t_connection_context& contxt); bool request_callback(boost::uuids::uuid connection_id); @@ -117,6 +119,22 @@ public: template class async_protocol_handler { + std::string m_fragment_buffer; + + bool send_message(uint32_t command, epee::span in_buff, uint32_t flags, bool expect_response) + { + const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response); + if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff})) + return false; + + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb + << ", flags" << head.m_flags + << ", r?=" << head.m_have_to_return_data + <<", cmd = " << head.m_command + << ", ver=" << head.m_protocol_version); + return true; + } + public: typedef t_connection_context connection_context; typedef async_protocol_handler_config config_type; @@ -259,34 +277,6 @@ public: return handler->is_timer_started(); } template friend struct anvoke_handler; - - static bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept - { - bucket_head2 head = {0}; - head.m_signature = SWAP64LE(LEVIN_SIGNATURE); - head.m_have_to_return_data = expect_response; - head.m_cb = SWAP64LE(msg_size); - - head.m_command = SWAP32LE(command); - head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); - head.m_flags = SWAP32LE(flags); - return head; - } - - bool send_message(uint32_t command, epee::span in_buff, uint32_t flags, bool expect_response) - { - const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response); - if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff})) - return false; - - MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb - << ", flags" << head.m_flags - << ", r?=" << head.m_have_to_return_data - <<", cmd = " << head.m_command - << ", ver=" << head.m_protocol_version); - return true; - } - public: async_protocol_handler(net_utils::i_service_endpoint* psnd_hndlr, config_type& config, @@ -403,7 +393,12 @@ public: return false; } - if(m_cache_in_buffer.size() + cb > m_config.m_max_packet_size) + // these should never fail, but do runtime check for safety + CHECK_AND_ASSERT_MES(m_config.m_max_packet_size >= m_cache_in_buffer.size(), false, "Bad m_cache_in_buffer.size()"); + CHECK_AND_ASSERT_MES(m_config.m_max_packet_size - m_cache_in_buffer.size() >= m_fragment_buffer.size(), false, "Bad m_cache_in_buffer.size() + m_fragment_buffer.size()"); + + // flipped to subtraction; prevent overflow since m_max_packet_size is variable and public + if(cb > m_config.m_max_packet_size - m_cache_in_buffer.size() - m_fragment_buffer.size()) { MWARNING(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size << ", packet received " << m_cache_in_buffer.size() + cb @@ -435,8 +430,38 @@ public: } break; } + { + std::string temp{}; epee::span buff_to_invoke = m_cache_in_buffer.carve((std::string::size_type)m_current_head.m_cb); + m_state = stream_state_head; + + // abstract_tcp_server2.h manages max bandwidth for a p2p link + if (!(m_current_head.m_flags & (LEVIN_PACKET_REQUEST | LEVIN_PACKET_RESPONSE))) + { + // special noise/fragment command + static constexpr const uint32_t both_flags = (LEVIN_PACKET_BEGIN | LEVIN_PACKET_END); + if ((m_current_head.m_flags & both_flags) == both_flags) + break; // noise message, skip to next message + + if (m_current_head.m_flags & LEVIN_PACKET_BEGIN) + m_fragment_buffer.clear(); + + m_fragment_buffer.append(reinterpret_cast(buff_to_invoke.data()), buff_to_invoke.size()); + if (!(m_current_head.m_flags & LEVIN_PACKET_END)) + break; // skip to next message + + if (m_fragment_buffer.size() < sizeof(bucket_head2)) + { + MERROR(m_connection_context << "Fragmented data too small for levin header"); + return false; + } + + temp = std::move(m_fragment_buffer); + m_fragment_buffer.clear(); + std::memcpy(std::addressof(m_current_head), std::addressof(temp[0]), sizeof(bucket_head2)); + buff_to_invoke = {reinterpret_cast(temp.data()) + sizeof(bucket_head2), temp.size() - sizeof(bucket_head2)}; + } bool is_response = (m_oponent_protocol_ver == LEVIN_PROTOCOL_VER_1 && m_current_head.m_flags&LEVIN_PACKET_RESPONSE); @@ -489,9 +514,9 @@ public: m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context ); - bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false); - head.m_return_code = SWAP32LE(return_code); - return_buff.insert(0, reinterpret_cast(&head), sizeof(head)); + bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false); + head.m_return_code = SWAP32LE(return_code); + return_buff.insert(0, reinterpret_cast(&head), sizeof(head)); if(!m_pservice_endpoint->do_send(byte_slice{std::move(return_buff)})) return false; @@ -505,8 +530,13 @@ public: else m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context); } + // reuse small buffer + if (!temp.empty() && temp.capacity() <= 64 * 1024) + { + temp.clear(); + m_fragment_buffer = std::move(temp); + } } - m_state = stream_state_head; break; case stream_state_head: { @@ -616,7 +646,7 @@ public: if (LEVIN_OK != err_code) { - epee::span stub_buff{(const uint8_t*)"", 0}; + epee::span stub_buff = nullptr; // Never call callback inside critical section, that can cause deadlock cb(err_code, stub_buff, m_connection_context); return false; @@ -698,6 +728,32 @@ public: return 1; } + + /*! Sends `message` without adding a levin header. The message must have + been created with `make_notify`, `make_noise_notify` or + `make_fragmented_notify`. See additional instructions for + `make_fragmented_notify`. + + \return 1 on success */ + int send(byte_slice message) + { + const misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( + boost::bind(&async_protocol_handler::finish_outer_call, this) + ); + + if(m_deletion_initiated) + return LEVIN_ERROR_CONNECTION_DESTROYED; + + const std::size_t length = message.size(); + if (!m_pservice_endpoint->do_send(std::move(message))) + { + LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it"); + return -1; + } + + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << (length - sizeof(bucket_head2)) << ", r?=0]"); + return 1; + } //------------------------------------------------------------------------------------------ boost::uuids::uuid get_connection_id() {return m_connection_context.m_connection_id;} //------------------------------------------------------------------------------------------ @@ -876,6 +932,14 @@ int async_protocol_handler_config::notify(int command, con } //------------------------------------------------------------------------------------------ template +int async_protocol_handler_config::send(byte_slice message, const boost::uuids::uuid& connection_id) +{ + async_protocol_handler* aph; + int r = find_and_lock_connection(connection_id, aph); + return LEVIN_OK == r ? aph->send(std::move(message)) : 0; +} +//------------------------------------------------------------------------------------------ +template bool async_protocol_handler_config::close(boost::uuids::uuid connection_id) { CRITICAL_REGION_LOCAL(m_connects_lock); diff --git a/contrib/epee/include/net/net_helper.h b/contrib/epee/include/net/net_helper.h index e315555fc..2b02eafa4 100644 --- a/contrib/epee/include/net/net_helper.h +++ b/contrib/epee/include/net/net_helper.h @@ -31,6 +31,7 @@ //#include //#include +#include #include #include #include @@ -154,7 +155,7 @@ namespace net_utils } inline - try_connect_result_t try_connect(const std::string& addr, const std::string& port, std::chrono::milliseconds timeout, epee::net_utils::ssl_support_t ssl_support) + try_connect_result_t try_connect(const std::string& addr, const std::string& port, std::chrono::milliseconds timeout) { m_deadline.expires_from_now(timeout); boost::unique_future connection = m_connector(addr, port, m_deadline); @@ -174,11 +175,11 @@ namespace net_utils m_connected = true; m_deadline.expires_at(std::chrono::steady_clock::time_point::max()); // SSL Options - if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_enabled || ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) + if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_enabled || m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) { if (!m_ssl_options.handshake(*m_ssl_socket, boost::asio::ssl::stream_base::client, addr)) { - if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) + if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) { boost::system::error_code ignored_ec; m_ssl_socket->next_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); @@ -217,7 +218,7 @@ namespace net_utils // Get a list of endpoints corresponding to the server name. - try_connect_result_t try_connect_result = try_connect(addr, port, timeout, m_ssl_options.support); + try_connect_result_t try_connect_result = try_connect(addr, port, timeout); if (try_connect_result == CONNECT_FAILURE) return false; if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) @@ -226,7 +227,7 @@ namespace net_utils { MERROR("SSL handshake failed on an autodetect connection, reconnecting without SSL"); m_ssl_options.support = epee::net_utils::ssl_support_t::e_ssl_support_disabled; - if (try_connect(addr, port, timeout, m_ssl_options.support) != CONNECT_SUCCESS) + if (try_connect(addr, port, timeout) != CONNECT_SUCCESS) return false; } } @@ -562,7 +563,7 @@ namespace net_utils { m_deadline.cancel(); boost::system::error_code ec; - if(m_ssl_options.support != ssl_support_t::e_ssl_support_disabled) + if(m_ssl_options) shutdown_ssl(); m_ssl_socket->next_layer().cancel(ec); if(ec) diff --git a/contrib/epee/src/CMakeLists.txt b/contrib/epee/src/CMakeLists.txt index d74e26634..c512e3b86 100644 --- a/contrib/epee/src/CMakeLists.txt +++ b/contrib/epee/src/CMakeLists.txt @@ -27,7 +27,7 @@ # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. add_library(epee STATIC byte_slice.cpp hex.cpp http_auth.cpp mlog.cpp net_helper.cpp net_utils_base.cpp string_tools.cpp wipeable_string.cpp - memwipe.c connection_basic.cpp network_throttle.cpp network_throttle-detail.cpp mlocker.cpp buffer.cpp net_ssl.cpp) + levin_base.cpp memwipe.c connection_basic.cpp network_throttle.cpp network_throttle-detail.cpp mlocker.cpp buffer.cpp net_ssl.cpp) if (USE_READLINE AND GNU_READLINE_FOUND) add_library(epee_readline STATIC readline_buffer.cpp) diff --git a/contrib/epee/src/connection_basic.cpp b/contrib/epee/src/connection_basic.cpp index 82d9e3b53..7526dde26 100644 --- a/contrib/epee/src/connection_basic.cpp +++ b/contrib/epee/src/connection_basic.cpp @@ -128,7 +128,7 @@ connection_basic_pimpl::connection_basic_pimpl(const std::string &name) : m_thro int connection_basic_pimpl::m_default_tos; // methods: -connection_basic::connection_basic(boost::asio::ip::tcp::socket&& sock, boost::shared_ptr state, ssl_support_t ssl_support) +connection_basic::connection_basic(boost::asio::ip::tcp::socket&& sock, std::shared_ptr state, ssl_support_t ssl_support) : m_state(std::move(state)), mI( new connection_basic_pimpl("peer") ), @@ -152,7 +152,7 @@ connection_basic::connection_basic(boost::asio::ip::tcp::socket&& sock, boost::s _note("Spawned connection #"<m_peer_number<<" to " << remote_addr_str << " currently we have sockets count:" << m_state->sock_count); } -connection_basic::connection_basic(boost::asio::io_service &io_service, boost::shared_ptr state, ssl_support_t ssl_support) +connection_basic::connection_basic(boost::asio::io_service &io_service, std::shared_ptr state, ssl_support_t ssl_support) : m_state(std::move(state)), mI( new connection_basic_pimpl("peer") ), diff --git a/contrib/epee/src/levin_base.cpp b/contrib/epee/src/levin_base.cpp new file mode 100644 index 000000000..ff845e2a7 --- /dev/null +++ b/contrib/epee/src/levin_base.cpp @@ -0,0 +1,128 @@ +// Copyright (c) 2019, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "net/levin_base.h" + +#include "int-util.h" + +namespace epee +{ +namespace levin +{ + bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept + { + bucket_head2 head = {0}; + head.m_signature = SWAP64LE(LEVIN_SIGNATURE); + head.m_have_to_return_data = expect_response; + head.m_cb = SWAP64LE(msg_size); + + head.m_command = SWAP32LE(command); + head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); + head.m_flags = SWAP32LE(flags); + return head; + } + + byte_slice make_notify(int command, epee::span payload) + { + const bucket_head2 head = make_header(command, payload.size(), LEVIN_PACKET_REQUEST, false); + return byte_slice{epee::as_byte_span(head), payload}; + } + + byte_slice make_noise_notify(const std::size_t noise_bytes) + { + static constexpr const std::uint32_t flags = + LEVIN_PACKET_BEGIN | LEVIN_PACKET_END; + + if (noise_bytes < sizeof(bucket_head2)) + return nullptr; + + std::string buffer(noise_bytes, char(0)); + const bucket_head2 head = make_header(0, noise_bytes - sizeof(bucket_head2), flags, false); + std::memcpy(std::addressof(buffer[0]), std::addressof(head), sizeof(head)); + + return byte_slice{std::move(buffer)}; + } + + byte_slice make_fragmented_notify(const byte_slice& noise_message, int command, epee::span payload) + { + const size_t noise_size = noise_message.size(); + if (noise_size < sizeof(bucket_head2) * 2) + return nullptr; + + if (payload.size() <= noise_size - sizeof(bucket_head2)) + { + /* The entire message can be sent at once, and the levin binary parser + will ignore extra bytes. So just pad with zeroes and otherwise send + a "normal", not fragmented message. */ + const size_t padding = noise_size - sizeof(bucket_head2) - payload.size(); + const span padding_bytes{noise_message.end() - padding, padding}; + + const bucket_head2 head = make_header(command, noise_size - sizeof(bucket_head2), LEVIN_PACKET_REQUEST, false); + return byte_slice{as_byte_span(head), payload, padding_bytes}; + } + + // fragment message + const size_t payload_space = noise_size - sizeof(bucket_head2); + const size_t expected_fragments = ((payload.size() - 2) / payload_space) + 1; + + std::string buffer{}; + buffer.reserve((expected_fragments + 1) * noise_size); // +1 here overselects for internal bucket_head2 value + + bucket_head2 head = make_header(0, noise_size - sizeof(bucket_head2), LEVIN_PACKET_BEGIN, false); + buffer.append(reinterpret_cast(&head), sizeof(head)); + + head.m_command = command; + head.m_flags = LEVIN_PACKET_REQUEST; + head.m_cb = payload.size(); + buffer.append(reinterpret_cast(&head), sizeof(head)); + + size_t copy_size = payload.remove_prefix(payload_space - sizeof(bucket_head2)); + buffer.append(reinterpret_cast(payload.data()) - copy_size, copy_size); + + head.m_command = 0; + head.m_flags = 0; + head.m_cb = noise_size - sizeof(bucket_head2); + + while (!payload.empty()) + { + copy_size = payload.remove_prefix(payload_space); + + if (payload.empty()) + head.m_flags = LEVIN_PACKET_END; + + buffer.append(reinterpret_cast(&head), sizeof(head)); + buffer.append(reinterpret_cast(payload.data()) - copy_size, copy_size); + } + + const size_t padding = noise_size - copy_size - sizeof(bucket_head2); + buffer.append(reinterpret_cast(noise_message.end()) - padding, padding); + + return byte_slice{std::move(buffer)}; + } +} // levin +} // epee -- cgit v1.2.3