diff options
Diffstat (limited to 'contrib/epee/include')
19 files changed, 557 insertions, 282 deletions
diff --git a/contrib/epee/include/byte_slice.h b/contrib/epee/include/byte_slice.h new file mode 100644 index 000000000..1fbba101e --- /dev/null +++ b/contrib/epee/include/byte_slice.h @@ -0,0 +1,145 @@ +// Copyright (c) 2019, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include <cstddef> +#include <cstdint> +#include <memory> +#include <string> +#include <vector> + +#include "span.h" + +namespace epee +{ + struct byte_slice_data; + + struct release_byte_slice + { + void operator()(byte_slice_data*) const noexcept; + }; + + /*! Inspired by slices in golang. Storage is thread-safe reference counted, + allowing for cheap copies or range selection on the bytes. The bytes + owned by this class are always immutable. + + The functions `operator=`, `take_slice` and `remove_prefix` may alter the + reference count for the backing store, which will invalidate pointers + previously returned if the reference count is zero. Be careful about + "caching" pointers in these circumstances. */ + class byte_slice + { + /* A custom reference count is used instead of shared_ptr because it allows + for an allocation optimization for the span constructor. This also + reduces the size of this class by one pointer. */ + std::unique_ptr<byte_slice_data, release_byte_slice> storage_; + span<const std::uint8_t> portion_; // within storage_ + + //! Internal use only; use to increase `storage` reference count. + byte_slice(byte_slice_data* storage, span<const std::uint8_t> portion) noexcept; + + struct adapt_buffer{}; + + template<typename T> + explicit byte_slice(const adapt_buffer, T&& buffer); + + public: + using value_type = std::uint8_t; + using size_type = std::size_t; + using difference_type = std::ptrdiff_t; + using pointer = const std::uint8_t*; + using const_pointer = const std::uint8_t*; + using reference = std::uint8_t; + using const_reference = std::uint8_t; + using iterator = pointer; + using const_iterator = const_pointer; + + //! Construct empty slice. + byte_slice() noexcept + : storage_(nullptr), portion_() + {} + + //! Construct empty slice + byte_slice(std::nullptr_t) noexcept + : byte_slice() + {} + + //! Scatter-gather (copy) multiple `sources` into a single allocated slice. + explicit byte_slice(std::initializer_list<span<const std::uint8_t>> sources); + + //! Convert `buffer` into a slice using one allocation for shared count. + explicit byte_slice(std::vector<std::uint8_t>&& buffer); + + //! Convert `buffer` into a slice using one allocation for shared count. + explicit byte_slice(std::string&& buffer); + + byte_slice(byte_slice&& source) noexcept; + ~byte_slice() noexcept = default; + + //! \note May invalidate previously retrieved pointers. + byte_slice& operator=(byte_slice&&) noexcept; + + //! \return A shallow (cheap) copy of the data from `this` slice. + byte_slice clone() const noexcept { return {storage_.get(), portion_}; } + + iterator begin() const noexcept { return portion_.begin(); } + const_iterator cbegin() const noexcept { return portion_.begin(); } + + iterator end() const noexcept { return portion_.end(); } + const_iterator cend() const noexcept { return portion_.end(); } + + bool empty() const noexcept { return storage_ == nullptr; } + const std::uint8_t* data() const noexcept { return portion_.data(); } + std::size_t size() const noexcept { return portion_.size(); } + + /*! Drop bytes from the beginning of `this` slice. + + \note May invalidate previously retrieved pointers. + \post `this->size() = this->size() - std::min(this->size(), max_bytes)` + \post `if (this->size() <= max_bytes) this->data() = nullptr` + \return Number of bytes removed. */ + std::size_t remove_prefix(std::size_t max_bytes) noexcept; + + /*! "Take" bytes from the beginning of `this` slice. + + \note May invalidate previously retrieved pointers. + \post `this->size() = this->size() - std::min(this->size(), max_bytes)` + \post `if (this->size() <= max_bytes) this->data() = nullptr` + \return Slice containing the bytes removed from `this` slice. */ + byte_slice take_slice(std::size_t max_bytes) noexcept; + + /*! Return a shallow (cheap) copy of a slice from `begin` and `end` offsets. + + \throw std::out_of_range If `end < begin`. + \throw std::out_of_range If `size() < end`. + \return Slice starting at `data() + begin` of size `end - begin`. */ + byte_slice get_slice(std::size_t begin, std::size_t end) const; + }; +} // epee + diff --git a/contrib/epee/include/console_handler.h b/contrib/epee/include/console_handler.h index e07e16d91..13747b0c8 100644 --- a/contrib/epee/include/console_handler.h +++ b/contrib/epee/include/console_handler.h @@ -45,6 +45,9 @@ #include "readline_buffer.h" #endif +#undef MONERO_DEFAULT_LOG_CATEGORY +#define MONERO_DEFAULT_LOG_CATEGORY "console_handler" + namespace epee { class async_stdin_reader @@ -96,7 +99,7 @@ namespace epee res = true; } - if (!eos()) + if (!eos() && m_read_status != state_cancelled) m_read_status = state_init; return res; @@ -122,6 +125,14 @@ namespace epee } } + void cancel() + { + boost::unique_lock<boost::mutex> lock(m_response_mutex); + m_read_status = state_cancelled; + m_has_read_request = false; + m_response_cv.notify_one(); + } + private: bool start_read() { @@ -162,6 +173,9 @@ namespace epee while (m_run.load(std::memory_order_relaxed)) { + if (m_read_status == state_cancelled) + return false; + fd_set read_set; FD_ZERO(&read_set); FD_SET(stdin_fileno, &read_set); @@ -179,6 +193,9 @@ namespace epee #else while (m_run.load(std::memory_order_relaxed)) { + if (m_read_status == state_cancelled) + return false; + int retval = ::WaitForSingleObject(::GetStdHandle(STD_INPUT_HANDLE), 100); switch (retval) { @@ -219,7 +236,8 @@ reread: case rdln::full: break; } #else - std::getline(std::cin, line); + if (m_read_status != state_cancelled) + std::getline(std::cin, line); #endif read_ok = !std::cin.eof() && !std::cin.fail(); } @@ -303,7 +321,7 @@ eof: template<class chain_handler> bool run(chain_handler ch_handler, std::function<std::string(void)> prompt, const std::string& usage = "", std::function<void(void)> exit_handler = NULL) { - return run(prompt, usage, [&](const std::string& cmd) { return ch_handler(cmd); }, exit_handler); + return run(prompt, usage, [&](const boost::optional<std::string>& cmd) { return ch_handler(cmd); }, exit_handler); } void stop() @@ -312,6 +330,12 @@ eof: m_stdin_reader.stop(); } + void cancel() + { + m_cancel = true; + m_stdin_reader.cancel(); + } + void print_prompt() { std::string prompt = m_prompt(); @@ -360,18 +384,23 @@ eof: std::cout << std::endl; break; } + + if (m_cancel) + { + MDEBUG("Input cancelled"); + cmd_handler(boost::none); + m_cancel = false; + continue; + } if (!get_line_ret) { MERROR("Failed to read line."); } + string_tools::trim(command); LOG_PRINT_L2("Read command: " << command); - if (command.empty()) - { - continue; - } - else if(cmd_handler(command)) + if(cmd_handler(command)) { continue; } @@ -401,6 +430,7 @@ eof: private: async_stdin_reader m_stdin_reader; std::atomic<bool> m_running = {true}; + std::atomic<bool> m_cancel = {false}; std::function<std::string(void)> m_prompt; }; @@ -482,8 +512,16 @@ eof: class command_handler { public: typedef boost::function<bool (const std::vector<std::string> &)> callback; + typedef boost::function<bool (void)> empty_callback; typedef std::map<std::string, std::pair<callback, std::pair<std::string, std::string>>> lookup; + command_handler(): + m_unknown_command_handler([](const std::vector<std::string>&){return false;}), + m_empty_command_handler([](){return true;}), + m_cancel_handler([](){return true;}) + { + } + std::string get_usage() { std::stringstream ss; @@ -516,25 +554,45 @@ eof: #endif } + void set_unknown_command_handler(const callback& hndlr) + { + m_unknown_command_handler = hndlr; + } + + void set_empty_command_handler(const empty_callback& hndlr) + { + m_empty_command_handler = hndlr; + } + + void set_cancel_handler(const empty_callback& hndlr) + { + m_cancel_handler = hndlr; + } + bool process_command_vec(const std::vector<std::string>& cmd) { - if(!cmd.size()) - return false; + if(!cmd.size() || (cmd.size() == 1 && !cmd[0].size())) + return m_empty_command_handler(); auto it = m_command_handlers.find(cmd.front()); if(it == m_command_handlers.end()) - return false; + return m_unknown_command_handler(cmd); std::vector<std::string> cmd_local(cmd.begin()+1, cmd.end()); return it->second.first(cmd_local); } - bool process_command_str(const std::string& cmd) + bool process_command_str(const boost::optional<std::string>& cmd) { + if (!cmd) + return m_cancel_handler(); std::vector<std::string> cmd_v; - boost::split(cmd_v,cmd,boost::is_any_of(" "), boost::token_compress_on); + boost::split(cmd_v,*cmd,boost::is_any_of(" "), boost::token_compress_on); return process_command_vec(cmd_v); } private: lookup m_command_handlers; + callback m_unknown_command_handler; + empty_callback m_empty_command_handler; + empty_callback m_cancel_handler; }; /************************************************************************/ @@ -572,6 +630,11 @@ eof: { m_console_handler.print_prompt(); } + + void cancel_input() + { + m_console_handler.cancel(); + } }; ///* work around because of broken boost bind */ diff --git a/contrib/epee/include/int-util.h b/contrib/epee/include/int-util.h index 0ed6505ff..8ef5be40a 100644 --- a/contrib/epee/include/int-util.h +++ b/contrib/epee/include/int-util.h @@ -129,9 +129,12 @@ static inline uint32_t div128_32(uint64_t dividend_hi, uint64_t dividend_lo, uin return remainder; } +#define IDENT16(x) ((uint16_t) (x)) #define IDENT32(x) ((uint32_t) (x)) #define IDENT64(x) ((uint64_t) (x)) +#define SWAP16(x) ((((uint16_t) (x) & 0x00ff) << 8) | \ + (((uint16_t) (x) & 0xff00) >> 8)) #define SWAP32(x) ((((uint32_t) (x) & 0x000000ff) << 24) | \ (((uint32_t) (x) & 0x0000ff00) << 8) | \ (((uint32_t) (x) & 0x00ff0000) >> 8) | \ @@ -145,10 +148,18 @@ static inline uint32_t div128_32(uint64_t dividend_hi, uint64_t dividend_lo, uin (((uint64_t) (x) & 0x00ff000000000000) >> 40) | \ (((uint64_t) (x) & 0xff00000000000000) >> 56)) +static inline uint16_t ident16(uint16_t x) { return x; } static inline uint32_t ident32(uint32_t x) { return x; } static inline uint64_t ident64(uint64_t x) { return x; } #ifndef __OpenBSD__ +# if defined(__ANDROID__) && defined(__swap16) && !defined(swap16) +# define swap16 __swap16 +# elif !defined(swap16) +static inline uint16_t swap16(uint16_t x) { + return ((x & 0x00ff) << 8) | ((x & 0xff00) >> 8); +} +# endif # if defined(__ANDROID__) && defined(__swap32) && !defined(swap32) # define swap32 __swap32 # elif !defined(swap32) @@ -176,6 +187,12 @@ static inline uint64_t swap64(uint64_t x) { static inline void mem_inplace_ident(void *mem UNUSED, size_t n UNUSED) { } #undef UNUSED +static inline void mem_inplace_swap16(void *mem, size_t n) { + size_t i; + for (i = 0; i < n; i++) { + ((uint16_t *) mem)[i] = swap16(((const uint16_t *) mem)[i]); + } +} static inline void mem_inplace_swap32(void *mem, size_t n) { size_t i; for (i = 0; i < n; i++) { @@ -189,6 +206,9 @@ static inline void mem_inplace_swap64(void *mem, size_t n) { } } +static inline void memcpy_ident16(void *dst, const void *src, size_t n) { + memcpy(dst, src, 2 * n); +} static inline void memcpy_ident32(void *dst, const void *src, size_t n) { memcpy(dst, src, 4 * n); } @@ -196,6 +216,12 @@ static inline void memcpy_ident64(void *dst, const void *src, size_t n) { memcpy(dst, src, 8 * n); } +static inline void memcpy_swap16(void *dst, const void *src, size_t n) { + size_t i; + for (i = 0; i < n; i++) { + ((uint16_t *) dst)[i] = swap16(((const uint16_t *) src)[i]); + } +} static inline void memcpy_swap32(void *dst, const void *src, size_t n) { size_t i; for (i = 0; i < n; i++) { @@ -220,6 +246,14 @@ static_assert(false, "BYTE_ORDER is undefined. Perhaps, GNU extensions are not e #endif #if BYTE_ORDER == LITTLE_ENDIAN +#define SWAP16LE IDENT16 +#define SWAP16BE SWAP16 +#define swap16le ident16 +#define swap16be swap16 +#define mem_inplace_swap16le mem_inplace_ident +#define mem_inplace_swap16be mem_inplace_swap16 +#define memcpy_swap16le memcpy_ident16 +#define memcpy_swap16be memcpy_swap16 #define SWAP32LE IDENT32 #define SWAP32BE SWAP32 #define swap32le ident32 @@ -239,6 +273,14 @@ static_assert(false, "BYTE_ORDER is undefined. Perhaps, GNU extensions are not e #endif #if BYTE_ORDER == BIG_ENDIAN +#define SWAP16BE IDENT16 +#define SWAP16LE SWAP16 +#define swap16be ident16 +#define swap16le swap16 +#define mem_inplace_swap16be mem_inplace_ident +#define mem_inplace_swap16le mem_inplace_swap16 +#define memcpy_swap16be memcpy_ident16 +#define memcpy_swap16le memcpy_swap16 #define SWAP32BE IDENT32 #define SWAP32LE SWAP32 #define swap32be ident32 diff --git a/contrib/epee/include/net/abstract_tcp_server2.h b/contrib/epee/include/net/abstract_tcp_server2.h index b38ab5399..3a2c5341d 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.h +++ b/contrib/epee/include/net/abstract_tcp_server2.h @@ -49,10 +49,12 @@ #include <boost/asio/ssl.hpp> #include <boost/array.hpp> #include <boost/noncopyable.hpp> -#include <boost/shared_ptr.hpp> +#include <boost/shared_ptr.hpp> //! \TODO Convert to std::shared_ptr #include <boost/enable_shared_from_this.hpp> #include <boost/interprocess/detail/atomic.hpp> #include <boost/thread/thread.hpp> +#include <memory> +#include "byte_slice.h" #include "net_utils_base.h" #include "syncobj.h" #include "connection_basic.hpp" @@ -90,25 +92,24 @@ namespace net_utils public: typedef typename t_protocol_handler::connection_context t_connection_context; - struct shared_state : connection_basic_shared_state + struct shared_state : connection_basic_shared_state, t_protocol_handler::config_type { shared_state() - : connection_basic_shared_state(), pfilter(nullptr), config(), stop_signal_sent(false) + : connection_basic_shared_state(), t_protocol_handler::config_type(), pfilter(nullptr), stop_signal_sent(false) {} i_connection_filter* pfilter; - typename t_protocol_handler::config_type config; bool stop_signal_sent; }; /// Construct a connection with the given io_service. explicit connection( boost::asio::io_service& io_service, - boost::shared_ptr<shared_state> state, + std::shared_ptr<shared_state> state, t_connection_type connection_type, epee::net_utils::ssl_support_t ssl_support); explicit connection( boost::asio::ip::tcp::socket&& sock, - boost::shared_ptr<shared_state> state, + std::shared_ptr<shared_state> state, t_connection_type connection_type, epee::net_utils::ssl_support_t ssl_support); @@ -135,8 +136,7 @@ namespace net_utils private: //----------------- i_service_endpoint --------------------- - virtual bool do_send(const void* ptr, size_t cb); ///< (see do_send from i_service_endpoint) - virtual bool do_send_chunk(const void* ptr, size_t cb); ///< will send (or queue) a part of data + virtual bool do_send(byte_slice message); ///< (see do_send from i_service_endpoint) virtual bool send_done(); virtual bool close(); virtual bool call_run_once_service_io(); @@ -145,6 +145,8 @@ namespace net_utils virtual bool add_ref(); virtual bool release(); //------------------------------------------------------ + bool do_send_chunk(byte_slice chunk); ///< will send (or queue) a part of data. internal use only + boost::shared_ptr<connection<t_protocol_handler> > safe_shared_from_this(); bool shutdown(); /// Handle completion of a receive operation. @@ -269,7 +271,13 @@ namespace net_utils typename t_protocol_handler::config_type& get_config_object() { assert(m_state != nullptr); // always set in constructor - return m_state->config; + return *m_state; + } + + std::shared_ptr<typename t_protocol_handler::config_type> get_config_shared() + { + assert(m_state != nullptr); // always set in constructor + return {m_state}; } int get_binded_port(){return m_port;} @@ -350,7 +358,7 @@ namespace net_utils bool is_thread_worker(); - const boost::shared_ptr<typename connection<t_protocol_handler>::shared_state> m_state; + const std::shared_ptr<typename connection<t_protocol_handler>::shared_state> m_state; /// The io_service used to perform asynchronous operations. struct worker diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl index 19e9c9af9..12a87071a 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.inl +++ b/contrib/epee/include/net/abstract_tcp_server2.inl @@ -68,7 +68,7 @@ namespace epee namespace net_utils { template<typename T> - T& check_and_get(boost::shared_ptr<T>& ptr) + T& check_and_get(std::shared_ptr<T>& ptr) { CHECK_AND_ASSERT_THROW_MES(bool(ptr), "shared_state cannot be null"); return *ptr; @@ -81,7 +81,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> connection<t_protocol_handler>::connection( boost::asio::io_service& io_service, - boost::shared_ptr<shared_state> state, + std::shared_ptr<shared_state> state, t_connection_type connection_type, ssl_support_t ssl_support ) @@ -91,13 +91,13 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> connection<t_protocol_handler>::connection( boost::asio::ip::tcp::socket&& sock, - boost::shared_ptr<shared_state> state, + std::shared_ptr<shared_state> state, t_connection_type connection_type, ssl_support_t ssl_support ) : connection_basic(std::move(sock), state, ssl_support), - m_protocol_handler(this, check_and_get(state).config, context), + m_protocol_handler(this, check_and_get(state), context), buffer_ssl_init_fill(0), m_connection_type( connection_type ), m_throttle_speed_in("speed_in", "throttle_speed_in"), @@ -378,7 +378,6 @@ PRAGMA_WARNING_DISABLE_VS(4355) if(!recv_res) { //_info("[sock " << socket().native_handle() << "] protocol_want_close"); - //some error in protocol, protocol handler ask to close connection boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1); bool do_shutdown = false; @@ -520,7 +519,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) } //--------------------------------------------------------------------------------- template<class t_protocol_handler> - bool connection<t_protocol_handler>::do_send(const void* ptr, size_t cb) { + bool connection<t_protocol_handler>::do_send(byte_slice message) { TRY_ENTRY(); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted @@ -529,6 +528,9 @@ PRAGMA_WARNING_DISABLE_VS(4355) if (m_was_shutdown) return false; // TODO avoid copy + std::uint8_t const* const message_data = message.data(); + const std::size_t message_size = message.size(); + const double factor = 32; // TODO config typedef long long signed int t_safe; // my t_size to avoid any overunderflow in arithmetic const t_safe chunksize_good = (t_safe)( 1024 * std::max(1.0,factor) ); @@ -538,13 +540,11 @@ PRAGMA_WARNING_DISABLE_VS(4355) CHECK_AND_ASSERT_MES(! (chunksize_max<0), false, "Negative chunksize_max" ); // make sure it is unsigned before removin sign with cast: long long unsigned int chunksize_max_unsigned = static_cast<long long unsigned int>( chunksize_max ) ; - if (allow_split && (cb > chunksize_max_unsigned)) { + if (allow_split && (message_size > chunksize_max_unsigned)) { { // LOCK: chunking epee::critical_region_t<decltype(m_chunking_lock)> send_guard(m_chunking_lock); // *** critical *** - MDEBUG("do_send() will SPLIT into small chunks, from packet="<<cb<<" B for ptr="<<ptr); - t_safe all = cb; // all bytes to send - t_safe pos = 0; // current sending position + MDEBUG("do_send() will SPLIT into small chunks, from packet="<<message_size<<" B for ptr="<<message_data); // 01234567890 // ^^^^ (pos=0, len=4) ; pos:=pos+len, pos=4 // ^^^^ (pos=4, len=4) ; pos:=pos+len, pos=8 @@ -554,40 +554,25 @@ PRAGMA_WARNING_DISABLE_VS(4355) // char* buf = new char[ bufsize ]; bool all_ok = true; - while (pos < all) { - t_safe lenall = all-pos; // length from here to end - t_safe len = std::min( chunksize_good , lenall); // take a smaller part - CHECK_AND_ASSERT_MES(len<=chunksize_good, false, "len too large"); - // pos=8; len=4; all=10; len=3; - - CHECK_AND_ASSERT_MES(! (len<0), false, "negative len"); // check before we cast away sign: - unsigned long long int len_unsigned = static_cast<long long int>( len ); - CHECK_AND_ASSERT_MES(len>0, false, "len not strictly positive"); // (redundant) - CHECK_AND_ASSERT_MES(len_unsigned < std::numeric_limits<size_t>::max(), false, "Invalid len_unsigned"); // yeap we want strong < then max size, to be sure - - void *chunk_start = ((char*)ptr) + pos; - MDEBUG("chunk_start="<<chunk_start<<" ptr="<<ptr<<" pos="<<pos); - CHECK_AND_ASSERT_MES(chunk_start >= ptr, false, "Pointer wraparound"); // not wrapped around address? - //std::memcpy( (void*)buf, chunk_start, len); - - MDEBUG("part of " << lenall << ": pos="<<pos << " len="<<len); - - bool ok = do_send_chunk(chunk_start, len); // <====== *** + while (!message.empty()) { + byte_slice chunk = message.take_slice(chunksize_good); + + MDEBUG("chunk_start="<<chunk.data()<<" ptr="<<message_data<<" pos="<<(chunk.data() - message_data)); + MDEBUG("part of " << message.size() << ": pos="<<(chunk.data() - message_data) << " len="<<chunk.size()); + + bool ok = do_send_chunk(std::move(chunk)); // <====== *** all_ok = all_ok && ok; if (!all_ok) { - MDEBUG("do_send() DONE ***FAILED*** from packet="<<cb<<" B for ptr="<<ptr); + MDEBUG("do_send() DONE ***FAILED*** from packet="<<message_size<<" B for ptr="<<message_data); MDEBUG("do_send() SEND was aborted in middle of big package - this is mostly harmless " - << " (e.g. peer closed connection) but if it causes trouble tell us at #monero-dev. " << cb); + << " (e.g. peer closed connection) but if it causes trouble tell us at #monero-dev. " << message_size); return false; // partial failure in sending } - pos = pos+len; - CHECK_AND_ASSERT_MES(pos >0, false, "pos <= 0"); - // (in catch block, or uniq pointer) delete buf; } // each chunk - MDEBUG("do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr); + MDEBUG("do_send() DONE SPLIT from packet="<<message_size<<" B for ptr="<<message_data); MDEBUG("do_send() m_connection_type = " << m_connection_type); @@ -595,7 +580,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) } // LOCK: chunking } // a big block (to be chunked) - all chunks else { // small block - return do_send_chunk(ptr,cb); // just send as 1 big chunk + return do_send_chunk(std::move(message)); // just send as 1 big chunk } CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send", false); @@ -603,7 +588,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) //--------------------------------------------------------------------------------- template<class t_protocol_handler> - bool connection<t_protocol_handler>::do_send_chunk(const void* ptr, size_t cb) + bool connection<t_protocol_handler>::do_send_chunk(byte_slice chunk) { TRY_ENTRY(); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted @@ -615,7 +600,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) double current_speed_up; { CRITICAL_REGION_LOCAL(m_throttle_speed_out_mutex); - m_throttle_speed_out.handle_trafic_exact(cb); + m_throttle_speed_out.handle_trafic_exact(chunk.size()); current_speed_up = m_throttle_speed_out.get_current_speed(); } context.m_current_speed_up = current_speed_up; @@ -623,7 +608,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) //_info("[sock " << socket().native_handle() << "] SEND " << cb); context.m_last_send = time(NULL); - context.m_send_cnt += cb; + context.m_send_cnt += chunk.size(); //some data should be wrote to stream //request complete @@ -644,7 +629,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) }*/ long int ms = 250 + (rand()%50); - MDEBUG("Sleeping because QUEUE is FULL, in " << __FUNCTION__ << " for " << ms << " ms before packet_size="<<cb); // XXX debug sleep + MDEBUG("Sleeping because QUEUE is FULL, in " << __FUNCTION__ << " for " << ms << " ms before packet_size="<<chunk.size()); // XXX debug sleep m_send_que_lock.unlock(); boost::this_thread::sleep(boost::posix_time::milliseconds( ms ) ); m_send_que_lock.lock(); @@ -657,12 +642,11 @@ PRAGMA_WARNING_DISABLE_VS(4355) } } - m_send_que.resize(m_send_que.size()+1); - m_send_que.back().assign((const char*)ptr, cb); - + m_send_que.push_back(std::move(chunk)); + if(m_send_que.size() > 1) { // active operation should be in progress, nothing to do, just wait last operation callback - auto size_now = cb; + auto size_now = m_send_que.back().size(); MDEBUG("do_send_chunk() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size()); //do_send_handler_delayed( ptr , size_now ); // (((H))) // empty function @@ -680,7 +664,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) auto size_now = m_send_que.front().size(); MDEBUG("do_send_chunk() NOW SENSD: packet="<<size_now<<" B"); if (speed_limit_is_enabled()) - do_send_handler_write( ptr , size_now ); // (((H))) + do_send_handler_write( m_send_que.back().data(), m_send_que.back().size() ); // (((H))) CHECK_AND_ASSERT_MES( size_now == m_send_que.front().size(), false, "Unexpected queue size"); reset_timer(get_default_timeout(), false); @@ -908,7 +892,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> boosted_tcp_server<t_protocol_handler>::boosted_tcp_server( t_connection_type connection_type ) : - m_state(boost::make_shared<typename connection<t_protocol_handler>::shared_state>()), + m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()), m_io_service_local_instance(new worker()), io_service_(m_io_service_local_instance->io_service), acceptor_(io_service_), @@ -927,7 +911,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_connection_type connection_type) : - m_state(boost::make_shared<typename connection<t_protocol_handler>::shared_state>()), + m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()), io_service_(extarnal_io_service), acceptor_(io_service_), acceptor_ipv6(io_service_), diff --git a/contrib/epee/include/net/connection_basic.hpp b/contrib/epee/include/net/connection_basic.hpp index 2acc6cdda..2f60f7604 100644 --- a/contrib/epee/include/net/connection_basic.hpp +++ b/contrib/epee/include/net/connection_basic.hpp @@ -49,6 +49,7 @@ #include <boost/asio.hpp> #include <boost/asio/ssl.hpp> +#include "byte_slice.h" #include "net/net_utils_base.h" #include "net/net_ssl.h" #include "syncobj.h" @@ -99,7 +100,7 @@ class connection_basic_pimpl; // PIMPL for this class class connection_basic { // not-templated base class for rapid developmet of some code parts // beware of removing const, net_utils::connection is sketchily doing a cast to prevent storing ptr twice - const boost::shared_ptr<connection_basic_shared_state> m_state; + const std::shared_ptr<connection_basic_shared_state> m_state; public: std::unique_ptr< connection_basic_pimpl > mI; // my Implementation @@ -108,7 +109,7 @@ class connection_basic { // not-templated base class for rapid developmet of som volatile uint32_t m_want_close_connection; std::atomic<bool> m_was_shutdown; critical_section m_send_que_lock; - std::list<std::string> m_send_que; + std::deque<byte_slice> m_send_que; volatile bool m_is_multithreaded; /// Strand to ensure the connection's handlers are not called concurrently. boost::asio::io_service::strand strand_; @@ -118,8 +119,8 @@ class connection_basic { // not-templated base class for rapid developmet of som public: // first counter is the ++/-- count of current sockets, the other socket_number is only-increasing ++ number generator - connection_basic(boost::asio::ip::tcp::socket&& socket, boost::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support); - connection_basic(boost::asio::io_service &io_service, boost::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support); + connection_basic(boost::asio::ip::tcp::socket&& socket, std::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support); + connection_basic(boost::asio::io_service &io_service, std::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support); virtual ~connection_basic() noexcept(false); diff --git a/contrib/epee/include/net/enums.h b/contrib/epee/include/net/enums.h index 078a4b274..2f27d07f9 100644 --- a/contrib/epee/include/net/enums.h +++ b/contrib/epee/include/net/enums.h @@ -49,7 +49,7 @@ namespace net_utils { invalid = 0, public_ = 1, // public is keyword - i2p = 2, + i2p = 2, // order from here changes priority of selection for origin TXes tor = 3 }; diff --git a/contrib/epee/include/net/http_protocol_handler.inl b/contrib/epee/include/net/http_protocol_handler.inl index 790d0f3b1..19bdf4ff0 100644 --- a/contrib/epee/include/net/http_protocol_handler.inl +++ b/contrib/epee/include/net/http_protocol_handler.inl @@ -591,11 +591,12 @@ namespace net_utils std::string response_data = get_response_header(response); //LOG_PRINT_L0("HTTP_SEND: << \r\n" << response_data + response.m_body); - LOG_PRINT_L3("HTTP_RESPONSE_HEAD: << \r\n" << response_data); - - m_psnd_hndlr->do_send((void*)response_data.data(), response_data.size()); + LOG_PRINT_L3("HTTP_RESPONSE_HEAD: << \r\n" << response_data); + if ((response.m_body.size() && (query_info.m_http_method != http::http_method_head)) || (query_info.m_http_method == http::http_method_options)) - m_psnd_hndlr->do_send((void*)response.m_body.data(), response.m_body.size()); + response_data += response.m_body; + + m_psnd_hndlr->do_send(byte_slice{std::move(response_data)}); m_psnd_hndlr->send_done(); return res; } diff --git a/contrib/epee/include/net/levin_base.h b/contrib/epee/include/net/levin_base.h index a88a1eb49..f9b6f9a81 100644 --- a/contrib/epee/include/net/levin_base.h +++ b/contrib/epee/include/net/levin_base.h @@ -29,7 +29,11 @@ #ifndef _LEVIN_BASE_H_ #define _LEVIN_BASE_H_ +#include <cstdint> + +#include "byte_slice.h" #include "net_utils_base.h" +#include "span.h" #define LEVIN_SIGNATURE 0x0101010101012101LL //Bender's nightmare @@ -72,6 +76,8 @@ namespace levin #define LEVIN_PACKET_REQUEST 0x00000001 #define LEVIN_PACKET_RESPONSE 0x00000002 +#define LEVIN_PACKET_BEGIN 0x00000004 +#define LEVIN_PACKET_END 0x00000008 #define LEVIN_PROTOCOL_VER_0 0 @@ -118,9 +124,30 @@ namespace levin } } + //! \return Intialized levin header. + bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept; + + //! \return A levin notification message. + byte_slice make_notify(int command, epee::span<const std::uint8_t> payload); + + /*! Generate a dummy levin message. + \param noise_bytes Total size of the returned `byte_slice`. + \return `nullptr` if `noise_size` is smaller than the levin header. + Otherwise, a dummy levin message. */ + byte_slice make_noise_notify(std::size_t noise_bytes); + + /*! Generate 1+ levin messages that are identical to the noise message size. + + \param noise Each levin message will be identical to the size of this + message. The bytes from this message will be used for padding. + \return `nullptr` if `noise.size()` is less than the levin header size. + Otherwise, a levin notification message OR 2+ levin fragment messages. + Each message is `noise.size()` in length. */ + byte_slice make_fragmented_notify(const byte_slice& noise, int command, epee::span<const std::uint8_t> payload); } } #endif //_LEVIN_BASE_H_ + diff --git a/contrib/epee/include/net/levin_protocol_handler.h b/contrib/epee/include/net/levin_protocol_handler.h index 791766762..c510cfd79 100644 --- a/contrib/epee/include/net/levin_protocol_handler.h +++ b/contrib/epee/include/net/levin_protocol_handler.h @@ -157,10 +157,9 @@ namespace levin m_current_head.m_return_code = m_config.m_pcommands_handler->invoke(m_current_head.m_command, buff_to_invoke, return_buff, m_conn_context); m_current_head.m_cb = return_buff.size(); m_current_head.m_have_to_return_data = false; - std::string send_buff((const char*)&m_current_head, sizeof(m_current_head)); - send_buff += return_buff; - if(!m_psnd_hndlr->do_send(send_buff.data(), send_buff.size())) + return_buff.insert(0, (const char*)&m_current_head, sizeof(m_current_head)); + if(!m_psnd_hndlr->do_send(byte_slice{std::move(return_buff)})) return false; } diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h index 8d7ffb2c2..41f01e9a0 100644 --- a/contrib/epee/include/net/levin_protocol_handler_async.h +++ b/contrib/epee/include/net/levin_protocol_handler_async.h @@ -32,6 +32,7 @@ #include <boost/smart_ptr/make_shared.hpp> #include <atomic> +#include <deque> #include "levin_base.h" #include "buffer.h" @@ -91,6 +92,7 @@ public: int invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED); int notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id); + int send(epee::byte_slice message, const boost::uuids::uuid& connection_id); bool close(boost::uuids::uuid connection_id); bool update_connection_context(const t_connection_context& contxt); bool request_callback(boost::uuids::uuid connection_id); @@ -117,6 +119,22 @@ public: template<class t_connection_context = net_utils::connection_context_base> class async_protocol_handler { + std::string m_fragment_buffer; + + bool send_message(uint32_t command, epee::span<const uint8_t> in_buff, uint32_t flags, bool expect_response) + { + const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response); + if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff})) + return false; + + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb + << ", flags" << head.m_flags + << ", r?=" << head.m_have_to_return_data + <<", cmd = " << head.m_command + << ", ver=" << head.m_protocol_version); + return true; + } + public: typedef t_connection_context connection_context; typedef async_protocol_handler_config<t_connection_context> config_type; @@ -136,7 +154,6 @@ public: critical_section m_local_inv_buff_lock; std::string m_local_inv_buff; - critical_section m_send_lock; critical_section m_call_lock; volatile uint32_t m_wait_count; @@ -376,7 +393,12 @@ public: return false; } - if(m_cache_in_buffer.size() + cb > m_config.m_max_packet_size) + // these should never fail, but do runtime check for safety + CHECK_AND_ASSERT_MES(m_config.m_max_packet_size >= m_cache_in_buffer.size(), false, "Bad m_cache_in_buffer.size()"); + CHECK_AND_ASSERT_MES(m_config.m_max_packet_size - m_cache_in_buffer.size() >= m_fragment_buffer.size(), false, "Bad m_cache_in_buffer.size() + m_fragment_buffer.size()"); + + // flipped to subtraction; prevent overflow since m_max_packet_size is variable and public + if(cb > m_config.m_max_packet_size - m_cache_in_buffer.size() - m_fragment_buffer.size()) { MWARNING(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size << ", packet received " << m_cache_in_buffer.size() + cb @@ -408,8 +430,38 @@ public: } break; } + { + std::string temp{}; epee::span<const uint8_t> buff_to_invoke = m_cache_in_buffer.carve((std::string::size_type)m_current_head.m_cb); + m_state = stream_state_head; + + // abstract_tcp_server2.h manages max bandwidth for a p2p link + if (!(m_current_head.m_flags & (LEVIN_PACKET_REQUEST | LEVIN_PACKET_RESPONSE))) + { + // special noise/fragment command + static constexpr const uint32_t both_flags = (LEVIN_PACKET_BEGIN | LEVIN_PACKET_END); + if ((m_current_head.m_flags & both_flags) == both_flags) + break; // noise message, skip to next message + + if (m_current_head.m_flags & LEVIN_PACKET_BEGIN) + m_fragment_buffer.clear(); + + m_fragment_buffer.append(reinterpret_cast<const char*>(buff_to_invoke.data()), buff_to_invoke.size()); + if (!(m_current_head.m_flags & LEVIN_PACKET_END)) + break; // skip to next message + + if (m_fragment_buffer.size() < sizeof(bucket_head2)) + { + MERROR(m_connection_context << "Fragmented data too small for levin header"); + return false; + } + + temp = std::move(m_fragment_buffer); + m_fragment_buffer.clear(); + std::memcpy(std::addressof(m_current_head), std::addressof(temp[0]), sizeof(bucket_head2)); + buff_to_invoke = {reinterpret_cast<const uint8_t*>(temp.data()) + sizeof(bucket_head2), temp.size() - sizeof(bucket_head2)}; + } bool is_response = (m_oponent_protocol_ver == LEVIN_PROTOCOL_VER_1 && m_current_head.m_flags&LEVIN_PACKET_RESPONSE); @@ -458,43 +510,33 @@ public: if(m_current_head.m_have_to_return_data) { std::string return_buff; - m_current_head.m_return_code = m_config.m_pcommands_handler->invoke( - m_current_head.m_command, - buff_to_invoke, - return_buff, - m_connection_context); - m_current_head.m_cb = return_buff.size(); - m_current_head.m_have_to_return_data = false; - m_current_head.m_protocol_version = LEVIN_PROTOCOL_VER_1; - m_current_head.m_flags = LEVIN_PACKET_RESPONSE; -#if BYTE_ORDER == LITTLE_ENDIAN - std::string send_buff((const char*)&m_current_head, sizeof(m_current_head)); -#else - bucket_head2 head = m_current_head; - head.m_signature = SWAP64LE(head.m_signature); - head.m_cb = SWAP64LE(head.m_cb); - head.m_command = SWAP32LE(head.m_command); - head.m_return_code = SWAP32LE(head.m_return_code); - head.m_flags = SWAP32LE(head.m_flags); - head.m_protocol_version = SWAP32LE(head.m_protocol_version); - std::string send_buff((const char*)&head, sizeof(head)); -#endif - send_buff += return_buff; - CRITICAL_REGION_BEGIN(m_send_lock); - if(!m_pservice_endpoint->do_send(send_buff.data(), send_buff.size())) + const uint32_t return_code = m_config.m_pcommands_handler->invoke( + m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context + ); + + bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false); + head.m_return_code = SWAP32LE(return_code); + return_buff.insert(0, reinterpret_cast<const char*>(&head), sizeof(head)); + + if(!m_pservice_endpoint->do_send(byte_slice{std::move(return_buff)})) return false; - CRITICAL_REGION_END(); - MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << m_current_head.m_cb - << ", flags" << m_current_head.m_flags - << ", r?=" << m_current_head.m_have_to_return_data - <<", cmd = " << m_current_head.m_command - << ", ver=" << m_current_head.m_protocol_version); + + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb + << ", flags" << head.m_flags + << ", r?=" << head.m_have_to_return_data + <<", cmd = " << head.m_command + << ", ver=" << head.m_protocol_version); } else m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context); } + // reuse small buffer + if (!temp.empty() && temp.capacity() <= 64 * 1024) + { + temp.clear(); + m_fragment_buffer = std::move(temp); + } } - m_state = stream_state_head; break; case stream_state_head: { @@ -584,26 +626,10 @@ public: break; } - bucket_head2 head = {0}; - head.m_signature = SWAP64LE(LEVIN_SIGNATURE); - head.m_cb = SWAP64LE(in_buff.size()); - head.m_have_to_return_data = true; - - head.m_flags = SWAP32LE(LEVIN_PACKET_REQUEST); - head.m_command = SWAP32LE(command); - head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); - boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0); - CRITICAL_REGION_BEGIN(m_send_lock); - CRITICAL_REGION_LOCAL1(m_invoke_response_handlers_lock); - if(!m_pservice_endpoint->do_send(&head, sizeof(head))) - { - LOG_ERROR_CC(m_connection_context, "Failed to do_send"); - err_code = LEVIN_ERROR_CONNECTION; - break; - } + CRITICAL_REGION_BEGIN(m_invoke_response_handlers_lock); - if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size())) + if(!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) { LOG_ERROR_CC(m_connection_context, "Failed to do_send"); err_code = LEVIN_ERROR_CONNECTION; @@ -620,7 +646,7 @@ public: if (LEVIN_OK != err_code) { - epee::span<const uint8_t> stub_buff{(const uint8_t*)"", 0}; + epee::span<const uint8_t> stub_buff = nullptr; // Never call callback inside critical section, that can cause deadlock cb(err_code, stub_buff, m_connection_context); return false; @@ -642,35 +668,13 @@ public: if(m_deletion_initiated) return LEVIN_ERROR_CONNECTION_DESTROYED; - bucket_head2 head = {0}; - head.m_signature = SWAP64LE(LEVIN_SIGNATURE); - head.m_cb = SWAP64LE(in_buff.size()); - head.m_have_to_return_data = true; - - head.m_flags = SWAP32LE(LEVIN_PACKET_REQUEST); - head.m_command = SWAP32LE(command); - head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); - boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0); - CRITICAL_REGION_BEGIN(m_send_lock); - if(!m_pservice_endpoint->do_send(&head, sizeof(head))) - { - LOG_ERROR_CC(m_connection_context, "Failed to do_send"); - return LEVIN_ERROR_CONNECTION; - } - if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size())) + if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) { - LOG_ERROR_CC(m_connection_context, "Failed to do_send"); + LOG_ERROR_CC(m_connection_context, "Failed to send request"); return LEVIN_ERROR_CONNECTION; } - CRITICAL_REGION_END(); - - MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb - << ", f=" << head.m_flags - << ", r?=" << head.m_have_to_return_data - << ", cmd = " << head.m_command - << ", ver=" << head.m_protocol_version); uint64_t ticks_start = misc_utils::get_tick_count(); size_t prev_size = 0; @@ -716,33 +720,38 @@ public: if(m_deletion_initiated) return LEVIN_ERROR_CONNECTION_DESTROYED; - bucket_head2 head = {0}; - head.m_signature = SWAP64LE(LEVIN_SIGNATURE); - head.m_have_to_return_data = false; - head.m_cb = SWAP64LE(in_buff.size()); - - head.m_command = SWAP32LE(command); - head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); - head.m_flags = SWAP32LE(LEVIN_PACKET_REQUEST); - CRITICAL_REGION_BEGIN(m_send_lock); - if(!m_pservice_endpoint->do_send(&head, sizeof(head))) + if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, false)) { - LOG_ERROR_CC(m_connection_context, "Failed to do_send()"); + LOG_ERROR_CC(m_connection_context, "Failed to send notify message"); return -1; } - if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size())) + return 1; + } + + /*! Sends `message` without adding a levin header. The message must have + been created with `make_notify`, `make_noise_notify` or + `make_fragmented_notify`. See additional instructions for + `make_fragmented_notify`. + + \return 1 on success */ + int send(byte_slice message) + { + const misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( + boost::bind(&async_protocol_handler::finish_outer_call, this) + ); + + if(m_deletion_initiated) + return LEVIN_ERROR_CONNECTION_DESTROYED; + + const std::size_t length = message.size(); + if (!m_pservice_endpoint->do_send(std::move(message))) { - LOG_ERROR_CC(m_connection_context, "Failed to do_send()"); + LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it"); return -1; } - CRITICAL_REGION_END(); - LOG_DEBUG_CC(m_connection_context, "LEVIN_PACKET_SENT. [len=" << head.m_cb << - ", f=" << head.m_flags << - ", r?=" << head.m_have_to_return_data << - ", cmd = " << head.m_command << - ", ver=" << head.m_protocol_version); + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << (length - sizeof(bucket_head2)) << ", r?=0]"); return 1; } //------------------------------------------------------------------------------------------ @@ -782,7 +791,7 @@ void async_protocol_handler_config<t_connection_context>::delete_connections(siz auto i = connections.end() - 1; async_protocol_handler<t_connection_context> *conn = m_connects.at(*i); del_connection(conn); - close(*i); + conn->close(); connections.erase(i); } catch (const std::out_of_range &e) @@ -923,6 +932,14 @@ int async_protocol_handler_config<t_connection_context>::notify(int command, con } //------------------------------------------------------------------------------------------ template<class t_connection_context> +int async_protocol_handler_config<t_connection_context>::send(byte_slice message, const boost::uuids::uuid& connection_id) +{ + async_protocol_handler<t_connection_context>* aph; + int r = find_and_lock_connection(connection_id, aph); + return LEVIN_OK == r ? aph->send(std::move(message)) : 0; +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> bool async_protocol_handler_config<t_connection_context>::close(boost::uuids::uuid connection_id) { CRITICAL_REGION_LOCAL(m_connects_lock); diff --git a/contrib/epee/include/net/local_ip.h b/contrib/epee/include/net/local_ip.h index ce74e1cd3..246cf6ad8 100644 --- a/contrib/epee/include/net/local_ip.h +++ b/contrib/epee/include/net/local_ip.h @@ -30,6 +30,11 @@ #include <string> #include <boost/algorithm/string/predicate.hpp> #include <boost/asio/ip/address_v6.hpp> +#include "int-util.h" + +// IP addresses are kept in network byte order +// Masks below are little endian +// -> convert from network byte order to host byte order before comparing namespace epee { @@ -62,6 +67,7 @@ namespace epee inline bool is_ip_local(uint32_t ip) { + ip = SWAP32LE(ip); /* local ip area 10.0.0.0 — 10.255.255.255 @@ -85,6 +91,7 @@ namespace epee inline bool is_ip_loopback(uint32_t ip) { + ip = SWAP32LE(ip); if( (ip | 0xffffff00) == 0xffffff7f) return true; //MAKE_IP diff --git a/contrib/epee/include/net/net_helper.h b/contrib/epee/include/net/net_helper.h index e315555fc..2b02eafa4 100644 --- a/contrib/epee/include/net/net_helper.h +++ b/contrib/epee/include/net/net_helper.h @@ -31,6 +31,7 @@ //#include <Winsock2.h> //#include <Ws2tcpip.h> +#include <atomic> #include <string> #include <boost/version.hpp> #include <boost/asio/io_service.hpp> @@ -154,7 +155,7 @@ namespace net_utils } inline - try_connect_result_t try_connect(const std::string& addr, const std::string& port, std::chrono::milliseconds timeout, epee::net_utils::ssl_support_t ssl_support) + try_connect_result_t try_connect(const std::string& addr, const std::string& port, std::chrono::milliseconds timeout) { m_deadline.expires_from_now(timeout); boost::unique_future<boost::asio::ip::tcp::socket> connection = m_connector(addr, port, m_deadline); @@ -174,11 +175,11 @@ namespace net_utils m_connected = true; m_deadline.expires_at(std::chrono::steady_clock::time_point::max()); // SSL Options - if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_enabled || ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) + if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_enabled || m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) { if (!m_ssl_options.handshake(*m_ssl_socket, boost::asio::ssl::stream_base::client, addr)) { - if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) + if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) { boost::system::error_code ignored_ec; m_ssl_socket->next_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); @@ -217,7 +218,7 @@ namespace net_utils // Get a list of endpoints corresponding to the server name. - try_connect_result_t try_connect_result = try_connect(addr, port, timeout, m_ssl_options.support); + try_connect_result_t try_connect_result = try_connect(addr, port, timeout); if (try_connect_result == CONNECT_FAILURE) return false; if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) @@ -226,7 +227,7 @@ namespace net_utils { MERROR("SSL handshake failed on an autodetect connection, reconnecting without SSL"); m_ssl_options.support = epee::net_utils::ssl_support_t::e_ssl_support_disabled; - if (try_connect(addr, port, timeout, m_ssl_options.support) != CONNECT_SUCCESS) + if (try_connect(addr, port, timeout) != CONNECT_SUCCESS) return false; } } @@ -562,7 +563,7 @@ namespace net_utils { m_deadline.cancel(); boost::system::error_code ec; - if(m_ssl_options.support != ssl_support_t::e_ssl_support_disabled) + if(m_ssl_options) shutdown_ssl(); m_ssl_socket->next_layer().cancel(ec); if(ec) diff --git a/contrib/epee/include/net/net_utils_base.h b/contrib/epee/include/net/net_utils_base.h index 5ae3e53b3..028e605d7 100644 --- a/contrib/epee/include/net/net_utils_base.h +++ b/contrib/epee/include/net/net_utils_base.h @@ -34,9 +34,11 @@ #include <boost/asio/ip/address_v6.hpp> #include <typeinfo> #include <type_traits> +#include "byte_slice.h" #include "enums.h" -#include "serialization/keyvalue_serialization.h" #include "misc_log_ex.h" +#include "serialization/keyvalue_serialization.h" +#include "int-util.h" #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "net" @@ -90,7 +92,20 @@ namespace net_utils static constexpr bool is_blockable() noexcept { return true; } BEGIN_KV_SERIALIZE_MAP() - KV_SERIALIZE(m_ip) + if (is_store) + { + KV_SERIALIZE_VAL_POD_AS_BLOB_N(m_ip, "ip") + uint32_t ip = SWAP32LE(this_ref.m_ip); + epee::serialization::selector<is_store>::serialize(ip, stg, hparent_section, "m_ip"); + } + else + { + if (!epee::serialization::selector<is_store>::serialize_t_val_as_blob(this_ref.m_ip, stg, hparent_section, "ip")) + { + KV_SERIALIZE(m_ip) + const_cast<ipv4_network_address&>(this_ref).m_ip = SWAP32LE(this_ref.m_ip); + } + } KV_SERIALIZE(m_port) END_KV_SERIALIZE_MAP() }; @@ -424,7 +439,7 @@ namespace net_utils /************************************************************************/ struct i_service_endpoint { - virtual bool do_send(const void* ptr, size_t cb)=0; + virtual bool do_send(byte_slice message)=0; virtual bool close()=0; virtual bool send_done()=0; virtual bool call_run_once_service_io()=0; diff --git a/contrib/epee/include/readline_buffer.h b/contrib/epee/include/readline_buffer.h index 5968d243d..e8f75a9e1 100644 --- a/contrib/epee/include/readline_buffer.h +++ b/contrib/epee/include/readline_buffer.h @@ -40,5 +40,7 @@ namespace rdln readline_buffer* m_buffer; bool m_restart; }; + + void clear_screen(); } diff --git a/contrib/epee/include/storages/portable_storage_bin_utils.h b/contrib/epee/include/storages/portable_storage_bin_utils.h new file mode 100644 index 000000000..bcde64487 --- /dev/null +++ b/contrib/epee/include/storages/portable_storage_bin_utils.h @@ -0,0 +1,46 @@ +// Copyright (c) 2019, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include "int-util.h" + +template<typename T> T convert_swapper(T t) { return t; } +template<> inline uint16_t convert_swapper(uint16_t t) { return SWAP16LE(t); } +template<> inline int16_t convert_swapper(int16_t t) { return SWAP16LE((uint16_t&)t); } +template<> inline uint32_t convert_swapper(uint32_t t) { return SWAP32LE(t); } +template<> inline int32_t convert_swapper(int32_t t) { return SWAP32LE((uint32_t&)t); } +template<> inline uint64_t convert_swapper(uint64_t t) { return SWAP64LE(t); } +template<> inline int64_t convert_swapper(int64_t t) { return SWAP64LE((uint64_t&)t); } +template<> inline double convert_swapper(double t) { union { uint64_t u; double d; } u; u.d = t; u.u = SWAP64LE(u.u); return u.d; } + +#if BYTE_ORDER == BIG_ENDIAN +#define CONVERT_POD(x) convert_swapper(x) +#else +#define CONVERT_POD(x) (x) +#endif diff --git a/contrib/epee/include/storages/portable_storage_from_bin.h b/contrib/epee/include/storages/portable_storage_from_bin.h index e0a32b3ca..c0b6cc7b1 100644 --- a/contrib/epee/include/storages/portable_storage_from_bin.h +++ b/contrib/epee/include/storages/portable_storage_from_bin.h @@ -30,6 +30,7 @@ #include "misc_language.h" #include "portable_storage_base.h" +#include "portable_storage_bin_utils.h" #ifdef EPEE_PORTABLE_STORAGE_RECURSION_LIMIT #define EPEE_PORTABLE_STORAGE_RECURSION_LIMIT_INTERNAL EPEE_PORTABLE_STORAGE_RECURSION_LIMIT @@ -117,6 +118,7 @@ namespace epee RECURSION_LIMITATION(); static_assert(std::is_pod<t_pod_type>::value, "POD type expected"); read(&pod_val, sizeof(pod_val)); + pod_val = CONVERT_POD(pod_val); } template<class t_type> @@ -140,7 +142,7 @@ namespace epee sa.reserve(size); //TODO: add some optimization here later while(size--) - sa.m_array.push_back(read<type_name>()); + sa.m_array.push_back(read<type_name>()); return storage_entry(array_entry(sa)); } diff --git a/contrib/epee/include/storages/portable_storage_to_bin.h b/contrib/epee/include/storages/portable_storage_to_bin.h index 9501bbc2a..137497e19 100644 --- a/contrib/epee/include/storages/portable_storage_to_bin.h +++ b/contrib/epee/include/storages/portable_storage_to_bin.h @@ -31,6 +31,7 @@ #include "pragma_comp_defs.h" #include "misc_language.h" #include "portable_storage_base.h" +#include "portable_storage_bin_utils.h" namespace epee { @@ -40,8 +41,9 @@ namespace epee template<class pack_value, class t_stream> size_t pack_varint_t(t_stream& strm, uint8_t type_or, size_t& pv) { - pack_value v = (*((pack_value*)&pv)) << 2; + pack_value v = pv << 2; v |= type_or; + v = CONVERT_POD(v); strm.write((const char*)&v, sizeof(pack_value)); return sizeof(pack_value); } @@ -93,8 +95,11 @@ namespace epee uint8_t type = contained_type|SERIALIZE_FLAG_ARRAY; m_strm.write((const char*)&type, 1); pack_varint(m_strm, arr_pod.m_array.size()); - for(const t_pod_type& x: arr_pod.m_array) + for(t_pod_type x: arr_pod.m_array) + { + x = CONVERT_POD(x); m_strm.write((const char*)&x, sizeof(t_pod_type)); + } return true; } @@ -147,7 +152,8 @@ namespace epee bool pack_pod_type(uint8_t type, const pod_type& v) { m_strm.write((const char*)&type, 1); - m_strm.write((const char*)&v, sizeof(pod_type)); + pod_type v0 = CONVERT_POD(v); + m_strm.write((const char*)&v0, sizeof(pod_type)); return true; } //section, array_entry diff --git a/contrib/epee/include/syncobj.h b/contrib/epee/include/syncobj.h index 9f2404856..dba02f270 100644 --- a/contrib/epee/include/syncobj.h +++ b/contrib/epee/include/syncobj.h @@ -150,81 +150,6 @@ namespace epee }; -#if defined(WINDWOS_PLATFORM) - class shared_critical_section - { - public: - shared_critical_section() - { - ::InitializeSRWLock(&m_srw_lock); - } - ~shared_critical_section() - {} - - bool lock_shared() - { - AcquireSRWLockShared(&m_srw_lock); - return true; - } - bool unlock_shared() - { - ReleaseSRWLockShared(&m_srw_lock); - return true; - } - bool lock_exclusive() - { - ::AcquireSRWLockExclusive(&m_srw_lock); - return true; - } - bool unlock_exclusive() - { - ::ReleaseSRWLockExclusive(&m_srw_lock); - return true; - } - private: - SRWLOCK m_srw_lock; - }; - - - class shared_guard - { - public: - shared_guard(shared_critical_section& ref_sec):m_ref_sec(ref_sec) - { - m_ref_sec.lock_shared(); - } - - ~shared_guard() - { - m_ref_sec.unlock_shared(); - } - - private: - shared_critical_section& m_ref_sec; - }; - - - class exclusive_guard - { - public: - exclusive_guard(shared_critical_section& ref_sec):m_ref_sec(ref_sec) - { - m_ref_sec.lock_exclusive(); - } - - ~exclusive_guard() - { - m_ref_sec.unlock_exclusive(); - } - - private: - shared_critical_section& m_ref_sec; - }; -#endif - -#define SHARED_CRITICAL_REGION_BEGIN(x) { shared_guard critical_region_var(x) -#define EXCLUSIVE_CRITICAL_REGION_BEGIN(x) { exclusive_guard critical_region_var(x) - #define CRITICAL_REGION_LOCAL(x) {boost::this_thread::sleep_for(boost::chrono::milliseconds(epee::debug::g_test_dbg_lock_sleep()));} epee::critical_region_t<decltype(x)> critical_region_var(x) #define CRITICAL_REGION_BEGIN(x) { boost::this_thread::sleep_for(boost::chrono::milliseconds(epee::debug::g_test_dbg_lock_sleep())); epee::critical_region_t<decltype(x)> critical_region_var(x) #define CRITICAL_REGION_LOCAL1(x) {boost::this_thread::sleep_for(boost::chrono::milliseconds(epee::debug::g_test_dbg_lock_sleep()));} epee::critical_region_t<decltype(x)> critical_region_var1(x) @@ -232,22 +157,6 @@ namespace epee #define CRITICAL_REGION_END() } - -#if defined(WINDWOS_PLATFORM) - inline const char* get_wait_for_result_as_text(DWORD res) - { - switch(res) - { - case WAIT_ABANDONED: return "WAIT_ABANDONED"; - case WAIT_TIMEOUT: return "WAIT_TIMEOUT"; - case WAIT_OBJECT_0: return "WAIT_OBJECT_0"; - case WAIT_OBJECT_0+1: return "WAIT_OBJECT_1"; - case WAIT_OBJECT_0+2: return "WAIT_OBJECT_2"; - default: return "UNKNOWN CODE"; - } - } -#endif - } #endif |