diff options
author | luigi1111 <luigi1111w@gmail.com> | 2019-08-29 14:36:41 -0500 |
---|---|---|
committer | luigi1111 <luigi1111w@gmail.com> | 2019-08-29 14:36:41 -0500 |
commit | 98af2e954b78dc7607d0236a9db84b2143a33a90 (patch) | |
tree | 4e9be3baf6c4693d4e79fa105991afddd6402494 | |
parent | Merge pull request #5707 (diff) | |
parent | Added support for "noise" over I1P/Tor to mask Tx transmission. (diff) | |
download | monero-98af2e954b78dc7607d0236a9db84b2143a33a90.tar.xz |
Merge pull request #5793
bdfc63a Add ref-counted buffer byte_slice. Currently used for sending TCP data. (vtnerd)
3b24b1d Added support for 'noise' over I1P/Tor to mask Tx transmission. (vtnerd)
Diffstat (limited to '')
35 files changed, 3577 insertions, 279 deletions
diff --git a/ANONYMITY_NETWORKS.md b/ANONYMITY_NETWORKS.md index feb8528da..6eede44aa 100644 --- a/ANONYMITY_NETWORKS.md +++ b/ANONYMITY_NETWORKS.md @@ -160,25 +160,6 @@ the system clock is noticeably off (and therefore more fingerprintable), linking the public IPv4/IPv6 connections with the anonymity networks will be more difficult. -### Bandwidth Usage - -An ISP can passively monitor `monerod` connections from a node and observe when -a transaction is sent over a Tor/I2P connection via timing analysis + size of -data sent during that timeframe. I2P should provide better protection against -this attack - its connections are not circuit based. However, if a node is -only using I2P for broadcasting Monero transactions, the total aggregate of -I2P data would also leak information. - -#### Mitigation - -There is no current mitigation for the user right now. This attack is fairly -sophisticated, and likely requires support from the internet host of a Monero -user. - -In the near future, "whitening" the amount of data sent over anonymity network -connections will be performed. An attempt will be made to make a transaction -broadcast indistinguishable from a peer timed sync command. - ### Intermittent Monero Syncing If a user only runs `monerod` to send a transaction then quit, this can also @@ -208,3 +189,36 @@ is a tradeoff in potential isses. Also, anyone attempting this strategy really wants to uncover a user, it seems unlikely that this would be performed against every Tor/I2P user. +### I2P/Tor Stream Used Twice + +If a single I2P/Tor stream is used 2+ times for transmitting a transaction, the +operator of the hidden service can conclude that both transactions came from the +same source. If the subsequent transactions spend a change output from the +earlier transactions, this will also reveal the "real" spend in the ring +signature. This issue was (primarily) raised by @secparam on Twitter. + +#### Mitigation + +`monerod` currently selects two outgoing connections every 5 minutes for +transmitting transactions over I2P/Tor. Using outgoing connections prevents an +adversary from making many incoming connections to obtain information (this +technique was taken from Dandelion). Outgoing connections also do not have a +persistent public key identity - the creation of a new circuit will generate +a new public key identity. The lock time on a change address is ~20 minutes, so +`monerod` will have rotated its selected outgoing connections several times in +most cases. However, the number of outgoing connections is typically a small +fixed number, so there is a decent probability of re-use with the same public +key identity. + +@secparam (twitter) recommended changing circuits (Tor) as an additional +precaution. This is likely not a good idea - forcibly requesting Tor to change +circuits is observable by the ISP. Instead, `monerod` should likely disconnect +from peers ocassionally. Tor will rotate circuits every ~10 minutes, so +establishing new connections will use a new public key identity and make it +more difficult for the hidden service to link information. This process will +have to be done carefully because closing/reconnecting connections can also +leak information to hidden services if done improperly. + +At the current time, if users need to frequently make transactions, I2P/Tor +will improve privacy from ISPs and other common adversaries, but still have +some metadata leakages to unknown hidden service operators. diff --git a/LEVIN_PROTOCOL.md b/LEVIN_PROTOCOL.md new file mode 100644 index 000000000..207509146 --- /dev/null +++ b/LEVIN_PROTOCOL.md @@ -0,0 +1,165 @@ +# Levin Protocol +This is a document explaining the current design of the levin protocol, as +used by Monero. The protocol is largely inherited from cryptonote, but has +undergone some changes. + +This document also may differ from the `struct bucket_head2` in Monero's +code slightly - the spec here is slightly more strict to allow for +extensibility. + +One of the goals of this document is to clearly indicate what is being sent +"on the wire" to identify metadata that could de-anonymize users over I2P/Tor. +These issues will be addressed as they are found. See `ANONMITY_NETWORKS.md` in +the top-level folder for any outstanding issues. + +> This document does not currently list all data being sent by the monero +> protocol, that portion is a work-in-progress. Please take the time to do it +> if interested in learning about Monero p2p traffic! + + +## Header +This header is sent for every Monero p2p message. + +``` + 0 1 2 3 + 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| 0x01 | 0x21 | 0x01 | 0x01 | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| 0x01 | 0x01 | 0x01 | 0x01 | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length | +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| E. Response | Command ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Return Code ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |Q|S|B|E| Reserved ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | 0x01 | 0x00 | 0x00 | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| 0x00 | ++-+-+-+-+-+-+-+-+ +``` + +### Signature +The first 8 bytes are the "signature" which helps identify the protocol (in +case someone connected to the wrong port, etc). The comments indicate that byte +sequence is from "benders nightmare". + +This also can be used by deep packet inspection (DPI) engines to identify +Monero when the link is not encrypted. SSL has been proposed as a means to +mitigate this issue, but BIP-151 or the Noise protocol should also be considered. + +### Length +The length is an unsigned 64-bit little endian integer. The length does _not_ +include the header. + +The implementation currently rejects received messages that exceed 100 MB +(base 10) by default. + +### Expect Response +A zero-byte if no response is expected from the peer, and a non-zero byte if a +response is expected from the peer. Peers must respond to requests with this +flag in the same order that they were received, however, other messages can be +sent between responses. + +There are some commands in the +[cryptonote protocol](#cryptonote-protocol-commands) where a response is +expected from the peer, but this flag is not set. Those responses are returned +as notify messages and can be sent in any order by the peer. + +### Command +An unsigned 32-bit little endian integer representing the Monero specific +command being invoked. + +### Return Code +A signed 32-bit little integer integer representing the response from the peer +from the last command that was invoked. This is `0` for request messages. + +### Flags + * `Q` - Bit is set if the message is a request. + * `S` - Bit is set if the message is a response. + * `B` - Bit is set if this is a the beginning of a [fragmented message](#fragmented-messages). + * `E` - Bit is set if this is the end of a [fragmented message](#fragmented-messages). + +### Version +A fixed value of `1` as an unsigned 32-bit little endian integer. + + +## Message Flow +The protocol can be subdivided into: (1) notifications, (2) requests, +(3) responses, (4) fragmented messages, and (5) dummy messages. Response +messages must be sent in the same order that a peer issued a request message. +A peer does not have to send a response immediately following a request - any +other message type can be sent instead. + +### Notifications +Notifications are one-way messages that can be sent at any time without +an expectation of a response from the peer. The `Q` bit must be set, the `S`, +`B` and `E` bits must be unset, and the `Expect Response` field must be zeroed. + +Some notifications must be in response to other notifications. This is not +part of the levin messaging layer, and is described in the +[commands](#commands) section. + +### Requests +Requests are the basis of the admin protocol for Monero. The `Q` bit must be +set, the `S`, `B` and `E` bits must be unset, and the `Expect Response` field +must be non-zero. The peer is expected to send a response message with the same +`command` number. + +### Responses +Response message can only be sent after a peer first issues a request message. +Responses must have the `S` bit set, the `Q`, `B` and `E` bits unset, and have +a zeroed `Expect Response` field. The `Command` field must be the same value +that was sent in the request message. The `Return Code` is specific to the +`Command` being issued (see [commands])(#commands)). + +### Fragmented +Fragmented messages were introduced for the "white noise" feature for i2p/tor. +A transaction can be sent in fragments to conceal when "real" data is being +sent instead of dummy messages. Only one fragmented message can be sent at a +time, and bits `B` and `E` are never set at the same time +(see [dummy messages](#dummy)). The re-constructed message must contain a +levin header for a different (non-fragment) message type. + +The `Q` and `S` bits are never set and the `Expect Response` field must always +be zero. The first fragment has the `B` bit set, neither `B` nor `E` is set for +"middle" fragments, and `E` is set for the last fragment. + +### Dummy +Dummy messages have the `B` and `E` bits set, the `Q` and `S` bits unset, and +the `Expect Reponse` field zeroed. When a message of this type is received, the +contents can be safely ignored. + + +## Commands +### P2P (Admin) Commands + +#### (`1001` Request) Handshake +#### (`1001` Response) Handshake +#### (`1002` Request) Timed Sync +#### (`1002` Response) Timed Sync +#### (`1003` Request) Ping +#### (`1003` Response) Ping +#### (`1004` Request) Stat Info +#### (`1004` Response) Stat Info +#### (`1005` Request) Network State +#### (`1005` Response) Network State +#### (`1006` Request) Peer ID +#### (`1006` Reponse) Peer ID +#### (`1007` Request) Support Flags +#### (`1007` Response) Support Flags + +### Cryptonote Protocol Commands + +#### (`2001` Notification) New Block +#### (`2002` Notification) New Transactions +#### (`2003` Notification) Request Get Objects +#### (`2004` Notification) Response Get Objects +#### (`2006` Notification) Request Chain +#### (`2007` Notification) Response Chain Entry +#### (`2008` Notification) New Fluffy Block +#### (`2009` Notification) Request Fluffy Missing TX diff --git a/contrib/epee/include/byte_slice.h b/contrib/epee/include/byte_slice.h new file mode 100644 index 000000000..1fbba101e --- /dev/null +++ b/contrib/epee/include/byte_slice.h @@ -0,0 +1,145 @@ +// Copyright (c) 2019, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include <cstddef> +#include <cstdint> +#include <memory> +#include <string> +#include <vector> + +#include "span.h" + +namespace epee +{ + struct byte_slice_data; + + struct release_byte_slice + { + void operator()(byte_slice_data*) const noexcept; + }; + + /*! Inspired by slices in golang. Storage is thread-safe reference counted, + allowing for cheap copies or range selection on the bytes. The bytes + owned by this class are always immutable. + + The functions `operator=`, `take_slice` and `remove_prefix` may alter the + reference count for the backing store, which will invalidate pointers + previously returned if the reference count is zero. Be careful about + "caching" pointers in these circumstances. */ + class byte_slice + { + /* A custom reference count is used instead of shared_ptr because it allows + for an allocation optimization for the span constructor. This also + reduces the size of this class by one pointer. */ + std::unique_ptr<byte_slice_data, release_byte_slice> storage_; + span<const std::uint8_t> portion_; // within storage_ + + //! Internal use only; use to increase `storage` reference count. + byte_slice(byte_slice_data* storage, span<const std::uint8_t> portion) noexcept; + + struct adapt_buffer{}; + + template<typename T> + explicit byte_slice(const adapt_buffer, T&& buffer); + + public: + using value_type = std::uint8_t; + using size_type = std::size_t; + using difference_type = std::ptrdiff_t; + using pointer = const std::uint8_t*; + using const_pointer = const std::uint8_t*; + using reference = std::uint8_t; + using const_reference = std::uint8_t; + using iterator = pointer; + using const_iterator = const_pointer; + + //! Construct empty slice. + byte_slice() noexcept + : storage_(nullptr), portion_() + {} + + //! Construct empty slice + byte_slice(std::nullptr_t) noexcept + : byte_slice() + {} + + //! Scatter-gather (copy) multiple `sources` into a single allocated slice. + explicit byte_slice(std::initializer_list<span<const std::uint8_t>> sources); + + //! Convert `buffer` into a slice using one allocation for shared count. + explicit byte_slice(std::vector<std::uint8_t>&& buffer); + + //! Convert `buffer` into a slice using one allocation for shared count. + explicit byte_slice(std::string&& buffer); + + byte_slice(byte_slice&& source) noexcept; + ~byte_slice() noexcept = default; + + //! \note May invalidate previously retrieved pointers. + byte_slice& operator=(byte_slice&&) noexcept; + + //! \return A shallow (cheap) copy of the data from `this` slice. + byte_slice clone() const noexcept { return {storage_.get(), portion_}; } + + iterator begin() const noexcept { return portion_.begin(); } + const_iterator cbegin() const noexcept { return portion_.begin(); } + + iterator end() const noexcept { return portion_.end(); } + const_iterator cend() const noexcept { return portion_.end(); } + + bool empty() const noexcept { return storage_ == nullptr; } + const std::uint8_t* data() const noexcept { return portion_.data(); } + std::size_t size() const noexcept { return portion_.size(); } + + /*! Drop bytes from the beginning of `this` slice. + + \note May invalidate previously retrieved pointers. + \post `this->size() = this->size() - std::min(this->size(), max_bytes)` + \post `if (this->size() <= max_bytes) this->data() = nullptr` + \return Number of bytes removed. */ + std::size_t remove_prefix(std::size_t max_bytes) noexcept; + + /*! "Take" bytes from the beginning of `this` slice. + + \note May invalidate previously retrieved pointers. + \post `this->size() = this->size() - std::min(this->size(), max_bytes)` + \post `if (this->size() <= max_bytes) this->data() = nullptr` + \return Slice containing the bytes removed from `this` slice. */ + byte_slice take_slice(std::size_t max_bytes) noexcept; + + /*! Return a shallow (cheap) copy of a slice from `begin` and `end` offsets. + + \throw std::out_of_range If `end < begin`. + \throw std::out_of_range If `size() < end`. + \return Slice starting at `data() + begin` of size `end - begin`. */ + byte_slice get_slice(std::size_t begin, std::size_t end) const; + }; +} // epee + diff --git a/contrib/epee/include/net/abstract_tcp_server2.h b/contrib/epee/include/net/abstract_tcp_server2.h index b38ab5399..3a2c5341d 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.h +++ b/contrib/epee/include/net/abstract_tcp_server2.h @@ -49,10 +49,12 @@ #include <boost/asio/ssl.hpp> #include <boost/array.hpp> #include <boost/noncopyable.hpp> -#include <boost/shared_ptr.hpp> +#include <boost/shared_ptr.hpp> //! \TODO Convert to std::shared_ptr #include <boost/enable_shared_from_this.hpp> #include <boost/interprocess/detail/atomic.hpp> #include <boost/thread/thread.hpp> +#include <memory> +#include "byte_slice.h" #include "net_utils_base.h" #include "syncobj.h" #include "connection_basic.hpp" @@ -90,25 +92,24 @@ namespace net_utils public: typedef typename t_protocol_handler::connection_context t_connection_context; - struct shared_state : connection_basic_shared_state + struct shared_state : connection_basic_shared_state, t_protocol_handler::config_type { shared_state() - : connection_basic_shared_state(), pfilter(nullptr), config(), stop_signal_sent(false) + : connection_basic_shared_state(), t_protocol_handler::config_type(), pfilter(nullptr), stop_signal_sent(false) {} i_connection_filter* pfilter; - typename t_protocol_handler::config_type config; bool stop_signal_sent; }; /// Construct a connection with the given io_service. explicit connection( boost::asio::io_service& io_service, - boost::shared_ptr<shared_state> state, + std::shared_ptr<shared_state> state, t_connection_type connection_type, epee::net_utils::ssl_support_t ssl_support); explicit connection( boost::asio::ip::tcp::socket&& sock, - boost::shared_ptr<shared_state> state, + std::shared_ptr<shared_state> state, t_connection_type connection_type, epee::net_utils::ssl_support_t ssl_support); @@ -135,8 +136,7 @@ namespace net_utils private: //----------------- i_service_endpoint --------------------- - virtual bool do_send(const void* ptr, size_t cb); ///< (see do_send from i_service_endpoint) - virtual bool do_send_chunk(const void* ptr, size_t cb); ///< will send (or queue) a part of data + virtual bool do_send(byte_slice message); ///< (see do_send from i_service_endpoint) virtual bool send_done(); virtual bool close(); virtual bool call_run_once_service_io(); @@ -145,6 +145,8 @@ namespace net_utils virtual bool add_ref(); virtual bool release(); //------------------------------------------------------ + bool do_send_chunk(byte_slice chunk); ///< will send (or queue) a part of data. internal use only + boost::shared_ptr<connection<t_protocol_handler> > safe_shared_from_this(); bool shutdown(); /// Handle completion of a receive operation. @@ -269,7 +271,13 @@ namespace net_utils typename t_protocol_handler::config_type& get_config_object() { assert(m_state != nullptr); // always set in constructor - return m_state->config; + return *m_state; + } + + std::shared_ptr<typename t_protocol_handler::config_type> get_config_shared() + { + assert(m_state != nullptr); // always set in constructor + return {m_state}; } int get_binded_port(){return m_port;} @@ -350,7 +358,7 @@ namespace net_utils bool is_thread_worker(); - const boost::shared_ptr<typename connection<t_protocol_handler>::shared_state> m_state; + const std::shared_ptr<typename connection<t_protocol_handler>::shared_state> m_state; /// The io_service used to perform asynchronous operations. struct worker diff --git a/contrib/epee/include/net/abstract_tcp_server2.inl b/contrib/epee/include/net/abstract_tcp_server2.inl index 19e9c9af9..12a87071a 100644 --- a/contrib/epee/include/net/abstract_tcp_server2.inl +++ b/contrib/epee/include/net/abstract_tcp_server2.inl @@ -68,7 +68,7 @@ namespace epee namespace net_utils { template<typename T> - T& check_and_get(boost::shared_ptr<T>& ptr) + T& check_and_get(std::shared_ptr<T>& ptr) { CHECK_AND_ASSERT_THROW_MES(bool(ptr), "shared_state cannot be null"); return *ptr; @@ -81,7 +81,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> connection<t_protocol_handler>::connection( boost::asio::io_service& io_service, - boost::shared_ptr<shared_state> state, + std::shared_ptr<shared_state> state, t_connection_type connection_type, ssl_support_t ssl_support ) @@ -91,13 +91,13 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> connection<t_protocol_handler>::connection( boost::asio::ip::tcp::socket&& sock, - boost::shared_ptr<shared_state> state, + std::shared_ptr<shared_state> state, t_connection_type connection_type, ssl_support_t ssl_support ) : connection_basic(std::move(sock), state, ssl_support), - m_protocol_handler(this, check_and_get(state).config, context), + m_protocol_handler(this, check_and_get(state), context), buffer_ssl_init_fill(0), m_connection_type( connection_type ), m_throttle_speed_in("speed_in", "throttle_speed_in"), @@ -378,7 +378,6 @@ PRAGMA_WARNING_DISABLE_VS(4355) if(!recv_res) { //_info("[sock " << socket().native_handle() << "] protocol_want_close"); - //some error in protocol, protocol handler ask to close connection boost::interprocess::ipcdetail::atomic_write32(&m_want_close_connection, 1); bool do_shutdown = false; @@ -520,7 +519,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) } //--------------------------------------------------------------------------------- template<class t_protocol_handler> - bool connection<t_protocol_handler>::do_send(const void* ptr, size_t cb) { + bool connection<t_protocol_handler>::do_send(byte_slice message) { TRY_ENTRY(); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted @@ -529,6 +528,9 @@ PRAGMA_WARNING_DISABLE_VS(4355) if (m_was_shutdown) return false; // TODO avoid copy + std::uint8_t const* const message_data = message.data(); + const std::size_t message_size = message.size(); + const double factor = 32; // TODO config typedef long long signed int t_safe; // my t_size to avoid any overunderflow in arithmetic const t_safe chunksize_good = (t_safe)( 1024 * std::max(1.0,factor) ); @@ -538,13 +540,11 @@ PRAGMA_WARNING_DISABLE_VS(4355) CHECK_AND_ASSERT_MES(! (chunksize_max<0), false, "Negative chunksize_max" ); // make sure it is unsigned before removin sign with cast: long long unsigned int chunksize_max_unsigned = static_cast<long long unsigned int>( chunksize_max ) ; - if (allow_split && (cb > chunksize_max_unsigned)) { + if (allow_split && (message_size > chunksize_max_unsigned)) { { // LOCK: chunking epee::critical_region_t<decltype(m_chunking_lock)> send_guard(m_chunking_lock); // *** critical *** - MDEBUG("do_send() will SPLIT into small chunks, from packet="<<cb<<" B for ptr="<<ptr); - t_safe all = cb; // all bytes to send - t_safe pos = 0; // current sending position + MDEBUG("do_send() will SPLIT into small chunks, from packet="<<message_size<<" B for ptr="<<message_data); // 01234567890 // ^^^^ (pos=0, len=4) ; pos:=pos+len, pos=4 // ^^^^ (pos=4, len=4) ; pos:=pos+len, pos=8 @@ -554,40 +554,25 @@ PRAGMA_WARNING_DISABLE_VS(4355) // char* buf = new char[ bufsize ]; bool all_ok = true; - while (pos < all) { - t_safe lenall = all-pos; // length from here to end - t_safe len = std::min( chunksize_good , lenall); // take a smaller part - CHECK_AND_ASSERT_MES(len<=chunksize_good, false, "len too large"); - // pos=8; len=4; all=10; len=3; - - CHECK_AND_ASSERT_MES(! (len<0), false, "negative len"); // check before we cast away sign: - unsigned long long int len_unsigned = static_cast<long long int>( len ); - CHECK_AND_ASSERT_MES(len>0, false, "len not strictly positive"); // (redundant) - CHECK_AND_ASSERT_MES(len_unsigned < std::numeric_limits<size_t>::max(), false, "Invalid len_unsigned"); // yeap we want strong < then max size, to be sure - - void *chunk_start = ((char*)ptr) + pos; - MDEBUG("chunk_start="<<chunk_start<<" ptr="<<ptr<<" pos="<<pos); - CHECK_AND_ASSERT_MES(chunk_start >= ptr, false, "Pointer wraparound"); // not wrapped around address? - //std::memcpy( (void*)buf, chunk_start, len); - - MDEBUG("part of " << lenall << ": pos="<<pos << " len="<<len); - - bool ok = do_send_chunk(chunk_start, len); // <====== *** + while (!message.empty()) { + byte_slice chunk = message.take_slice(chunksize_good); + + MDEBUG("chunk_start="<<chunk.data()<<" ptr="<<message_data<<" pos="<<(chunk.data() - message_data)); + MDEBUG("part of " << message.size() << ": pos="<<(chunk.data() - message_data) << " len="<<chunk.size()); + + bool ok = do_send_chunk(std::move(chunk)); // <====== *** all_ok = all_ok && ok; if (!all_ok) { - MDEBUG("do_send() DONE ***FAILED*** from packet="<<cb<<" B for ptr="<<ptr); + MDEBUG("do_send() DONE ***FAILED*** from packet="<<message_size<<" B for ptr="<<message_data); MDEBUG("do_send() SEND was aborted in middle of big package - this is mostly harmless " - << " (e.g. peer closed connection) but if it causes trouble tell us at #monero-dev. " << cb); + << " (e.g. peer closed connection) but if it causes trouble tell us at #monero-dev. " << message_size); return false; // partial failure in sending } - pos = pos+len; - CHECK_AND_ASSERT_MES(pos >0, false, "pos <= 0"); - // (in catch block, or uniq pointer) delete buf; } // each chunk - MDEBUG("do_send() DONE SPLIT from packet="<<cb<<" B for ptr="<<ptr); + MDEBUG("do_send() DONE SPLIT from packet="<<message_size<<" B for ptr="<<message_data); MDEBUG("do_send() m_connection_type = " << m_connection_type); @@ -595,7 +580,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) } // LOCK: chunking } // a big block (to be chunked) - all chunks else { // small block - return do_send_chunk(ptr,cb); // just send as 1 big chunk + return do_send_chunk(std::move(message)); // just send as 1 big chunk } CATCH_ENTRY_L0("connection<t_protocol_handler>::do_send", false); @@ -603,7 +588,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) //--------------------------------------------------------------------------------- template<class t_protocol_handler> - bool connection<t_protocol_handler>::do_send_chunk(const void* ptr, size_t cb) + bool connection<t_protocol_handler>::do_send_chunk(byte_slice chunk) { TRY_ENTRY(); // Use safe_shared_from_this, because of this is public method and it can be called on the object being deleted @@ -615,7 +600,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) double current_speed_up; { CRITICAL_REGION_LOCAL(m_throttle_speed_out_mutex); - m_throttle_speed_out.handle_trafic_exact(cb); + m_throttle_speed_out.handle_trafic_exact(chunk.size()); current_speed_up = m_throttle_speed_out.get_current_speed(); } context.m_current_speed_up = current_speed_up; @@ -623,7 +608,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) //_info("[sock " << socket().native_handle() << "] SEND " << cb); context.m_last_send = time(NULL); - context.m_send_cnt += cb; + context.m_send_cnt += chunk.size(); //some data should be wrote to stream //request complete @@ -644,7 +629,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) }*/ long int ms = 250 + (rand()%50); - MDEBUG("Sleeping because QUEUE is FULL, in " << __FUNCTION__ << " for " << ms << " ms before packet_size="<<cb); // XXX debug sleep + MDEBUG("Sleeping because QUEUE is FULL, in " << __FUNCTION__ << " for " << ms << " ms before packet_size="<<chunk.size()); // XXX debug sleep m_send_que_lock.unlock(); boost::this_thread::sleep(boost::posix_time::milliseconds( ms ) ); m_send_que_lock.lock(); @@ -657,12 +642,11 @@ PRAGMA_WARNING_DISABLE_VS(4355) } } - m_send_que.resize(m_send_que.size()+1); - m_send_que.back().assign((const char*)ptr, cb); - + m_send_que.push_back(std::move(chunk)); + if(m_send_que.size() > 1) { // active operation should be in progress, nothing to do, just wait last operation callback - auto size_now = cb; + auto size_now = m_send_que.back().size(); MDEBUG("do_send_chunk() NOW just queues: packet="<<size_now<<" B, is added to queue-size="<<m_send_que.size()); //do_send_handler_delayed( ptr , size_now ); // (((H))) // empty function @@ -680,7 +664,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) auto size_now = m_send_que.front().size(); MDEBUG("do_send_chunk() NOW SENSD: packet="<<size_now<<" B"); if (speed_limit_is_enabled()) - do_send_handler_write( ptr , size_now ); // (((H))) + do_send_handler_write( m_send_que.back().data(), m_send_que.back().size() ); // (((H))) CHECK_AND_ASSERT_MES( size_now == m_send_que.front().size(), false, "Unexpected queue size"); reset_timer(get_default_timeout(), false); @@ -908,7 +892,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> boosted_tcp_server<t_protocol_handler>::boosted_tcp_server( t_connection_type connection_type ) : - m_state(boost::make_shared<typename connection<t_protocol_handler>::shared_state>()), + m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()), m_io_service_local_instance(new worker()), io_service_(m_io_service_local_instance->io_service), acceptor_(io_service_), @@ -927,7 +911,7 @@ PRAGMA_WARNING_DISABLE_VS(4355) template<class t_protocol_handler> boosted_tcp_server<t_protocol_handler>::boosted_tcp_server(boost::asio::io_service& extarnal_io_service, t_connection_type connection_type) : - m_state(boost::make_shared<typename connection<t_protocol_handler>::shared_state>()), + m_state(std::make_shared<typename connection<t_protocol_handler>::shared_state>()), io_service_(extarnal_io_service), acceptor_(io_service_), acceptor_ipv6(io_service_), diff --git a/contrib/epee/include/net/connection_basic.hpp b/contrib/epee/include/net/connection_basic.hpp index 2acc6cdda..2f60f7604 100644 --- a/contrib/epee/include/net/connection_basic.hpp +++ b/contrib/epee/include/net/connection_basic.hpp @@ -49,6 +49,7 @@ #include <boost/asio.hpp> #include <boost/asio/ssl.hpp> +#include "byte_slice.h" #include "net/net_utils_base.h" #include "net/net_ssl.h" #include "syncobj.h" @@ -99,7 +100,7 @@ class connection_basic_pimpl; // PIMPL for this class class connection_basic { // not-templated base class for rapid developmet of some code parts // beware of removing const, net_utils::connection is sketchily doing a cast to prevent storing ptr twice - const boost::shared_ptr<connection_basic_shared_state> m_state; + const std::shared_ptr<connection_basic_shared_state> m_state; public: std::unique_ptr< connection_basic_pimpl > mI; // my Implementation @@ -108,7 +109,7 @@ class connection_basic { // not-templated base class for rapid developmet of som volatile uint32_t m_want_close_connection; std::atomic<bool> m_was_shutdown; critical_section m_send_que_lock; - std::list<std::string> m_send_que; + std::deque<byte_slice> m_send_que; volatile bool m_is_multithreaded; /// Strand to ensure the connection's handlers are not called concurrently. boost::asio::io_service::strand strand_; @@ -118,8 +119,8 @@ class connection_basic { // not-templated base class for rapid developmet of som public: // first counter is the ++/-- count of current sockets, the other socket_number is only-increasing ++ number generator - connection_basic(boost::asio::ip::tcp::socket&& socket, boost::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support); - connection_basic(boost::asio::io_service &io_service, boost::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support); + connection_basic(boost::asio::ip::tcp::socket&& socket, std::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support); + connection_basic(boost::asio::io_service &io_service, std::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support); virtual ~connection_basic() noexcept(false); diff --git a/contrib/epee/include/net/enums.h b/contrib/epee/include/net/enums.h index 078a4b274..2f27d07f9 100644 --- a/contrib/epee/include/net/enums.h +++ b/contrib/epee/include/net/enums.h @@ -49,7 +49,7 @@ namespace net_utils { invalid = 0, public_ = 1, // public is keyword - i2p = 2, + i2p = 2, // order from here changes priority of selection for origin TXes tor = 3 }; diff --git a/contrib/epee/include/net/http_protocol_handler.inl b/contrib/epee/include/net/http_protocol_handler.inl index 790d0f3b1..19bdf4ff0 100644 --- a/contrib/epee/include/net/http_protocol_handler.inl +++ b/contrib/epee/include/net/http_protocol_handler.inl @@ -591,11 +591,12 @@ namespace net_utils std::string response_data = get_response_header(response); //LOG_PRINT_L0("HTTP_SEND: << \r\n" << response_data + response.m_body); - LOG_PRINT_L3("HTTP_RESPONSE_HEAD: << \r\n" << response_data); - - m_psnd_hndlr->do_send((void*)response_data.data(), response_data.size()); + LOG_PRINT_L3("HTTP_RESPONSE_HEAD: << \r\n" << response_data); + if ((response.m_body.size() && (query_info.m_http_method != http::http_method_head)) || (query_info.m_http_method == http::http_method_options)) - m_psnd_hndlr->do_send((void*)response.m_body.data(), response.m_body.size()); + response_data += response.m_body; + + m_psnd_hndlr->do_send(byte_slice{std::move(response_data)}); m_psnd_hndlr->send_done(); return res; } diff --git a/contrib/epee/include/net/levin_base.h b/contrib/epee/include/net/levin_base.h index a88a1eb49..f9b6f9a81 100644 --- a/contrib/epee/include/net/levin_base.h +++ b/contrib/epee/include/net/levin_base.h @@ -29,7 +29,11 @@ #ifndef _LEVIN_BASE_H_ #define _LEVIN_BASE_H_ +#include <cstdint> + +#include "byte_slice.h" #include "net_utils_base.h" +#include "span.h" #define LEVIN_SIGNATURE 0x0101010101012101LL //Bender's nightmare @@ -72,6 +76,8 @@ namespace levin #define LEVIN_PACKET_REQUEST 0x00000001 #define LEVIN_PACKET_RESPONSE 0x00000002 +#define LEVIN_PACKET_BEGIN 0x00000004 +#define LEVIN_PACKET_END 0x00000008 #define LEVIN_PROTOCOL_VER_0 0 @@ -118,9 +124,30 @@ namespace levin } } + //! \return Intialized levin header. + bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept; + + //! \return A levin notification message. + byte_slice make_notify(int command, epee::span<const std::uint8_t> payload); + + /*! Generate a dummy levin message. + \param noise_bytes Total size of the returned `byte_slice`. + \return `nullptr` if `noise_size` is smaller than the levin header. + Otherwise, a dummy levin message. */ + byte_slice make_noise_notify(std::size_t noise_bytes); + + /*! Generate 1+ levin messages that are identical to the noise message size. + + \param noise Each levin message will be identical to the size of this + message. The bytes from this message will be used for padding. + \return `nullptr` if `noise.size()` is less than the levin header size. + Otherwise, a levin notification message OR 2+ levin fragment messages. + Each message is `noise.size()` in length. */ + byte_slice make_fragmented_notify(const byte_slice& noise, int command, epee::span<const std::uint8_t> payload); } } #endif //_LEVIN_BASE_H_ + diff --git a/contrib/epee/include/net/levin_protocol_handler.h b/contrib/epee/include/net/levin_protocol_handler.h index 791766762..c510cfd79 100644 --- a/contrib/epee/include/net/levin_protocol_handler.h +++ b/contrib/epee/include/net/levin_protocol_handler.h @@ -157,10 +157,9 @@ namespace levin m_current_head.m_return_code = m_config.m_pcommands_handler->invoke(m_current_head.m_command, buff_to_invoke, return_buff, m_conn_context); m_current_head.m_cb = return_buff.size(); m_current_head.m_have_to_return_data = false; - std::string send_buff((const char*)&m_current_head, sizeof(m_current_head)); - send_buff += return_buff; - if(!m_psnd_hndlr->do_send(send_buff.data(), send_buff.size())) + return_buff.insert(0, (const char*)&m_current_head, sizeof(m_current_head)); + if(!m_psnd_hndlr->do_send(byte_slice{std::move(return_buff)})) return false; } diff --git a/contrib/epee/include/net/levin_protocol_handler_async.h b/contrib/epee/include/net/levin_protocol_handler_async.h index 8d7ffb2c2..208911e1a 100644 --- a/contrib/epee/include/net/levin_protocol_handler_async.h +++ b/contrib/epee/include/net/levin_protocol_handler_async.h @@ -32,6 +32,7 @@ #include <boost/smart_ptr/make_shared.hpp> #include <atomic> +#include <deque> #include "levin_base.h" #include "buffer.h" @@ -91,6 +92,7 @@ public: int invoke_async(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id, const callback_t &cb, size_t timeout = LEVIN_DEFAULT_TIMEOUT_PRECONFIGURED); int notify(int command, const epee::span<const uint8_t> in_buff, boost::uuids::uuid connection_id); + int send(epee::byte_slice message, const boost::uuids::uuid& connection_id); bool close(boost::uuids::uuid connection_id); bool update_connection_context(const t_connection_context& contxt); bool request_callback(boost::uuids::uuid connection_id); @@ -117,6 +119,22 @@ public: template<class t_connection_context = net_utils::connection_context_base> class async_protocol_handler { + std::string m_fragment_buffer; + + bool send_message(uint32_t command, epee::span<const uint8_t> in_buff, uint32_t flags, bool expect_response) + { + const bucket_head2 head = make_header(command, in_buff.size(), flags, expect_response); + if(!m_pservice_endpoint->do_send(byte_slice{as_byte_span(head), in_buff})) + return false; + + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb + << ", flags" << head.m_flags + << ", r?=" << head.m_have_to_return_data + <<", cmd = " << head.m_command + << ", ver=" << head.m_protocol_version); + return true; + } + public: typedef t_connection_context connection_context; typedef async_protocol_handler_config<t_connection_context> config_type; @@ -136,7 +154,6 @@ public: critical_section m_local_inv_buff_lock; std::string m_local_inv_buff; - critical_section m_send_lock; critical_section m_call_lock; volatile uint32_t m_wait_count; @@ -376,7 +393,12 @@ public: return false; } - if(m_cache_in_buffer.size() + cb > m_config.m_max_packet_size) + // these should never fail, but do runtime check for safety + CHECK_AND_ASSERT_MES(m_config.m_max_packet_size >= m_cache_in_buffer.size(), false, "Bad m_cache_in_buffer.size()"); + CHECK_AND_ASSERT_MES(m_config.m_max_packet_size - m_cache_in_buffer.size() >= m_fragment_buffer.size(), false, "Bad m_cache_in_buffer.size() + m_fragment_buffer.size()"); + + // flipped to subtraction; prevent overflow since m_max_packet_size is variable and public + if(cb > m_config.m_max_packet_size - m_cache_in_buffer.size() - m_fragment_buffer.size()) { MWARNING(m_connection_context << "Maximum packet size exceed!, m_max_packet_size = " << m_config.m_max_packet_size << ", packet received " << m_cache_in_buffer.size() + cb @@ -408,8 +430,38 @@ public: } break; } + { + std::string temp{}; epee::span<const uint8_t> buff_to_invoke = m_cache_in_buffer.carve((std::string::size_type)m_current_head.m_cb); + m_state = stream_state_head; + + // abstract_tcp_server2.h manages max bandwidth for a p2p link + if (!(m_current_head.m_flags & (LEVIN_PACKET_REQUEST | LEVIN_PACKET_RESPONSE))) + { + // special noise/fragment command + static constexpr const uint32_t both_flags = (LEVIN_PACKET_BEGIN | LEVIN_PACKET_END); + if ((m_current_head.m_flags & both_flags) == both_flags) + break; // noise message, skip to next message + + if (m_current_head.m_flags & LEVIN_PACKET_BEGIN) + m_fragment_buffer.clear(); + + m_fragment_buffer.append(reinterpret_cast<const char*>(buff_to_invoke.data()), buff_to_invoke.size()); + if (!(m_current_head.m_flags & LEVIN_PACKET_END)) + break; // skip to next message + + if (m_fragment_buffer.size() < sizeof(bucket_head2)) + { + MERROR(m_connection_context << "Fragmented data too small for levin header"); + return false; + } + + temp = std::move(m_fragment_buffer); + m_fragment_buffer.clear(); + std::memcpy(std::addressof(m_current_head), std::addressof(temp[0]), sizeof(bucket_head2)); + buff_to_invoke = {reinterpret_cast<const uint8_t*>(temp.data()) + sizeof(bucket_head2), temp.size() - sizeof(bucket_head2)}; + } bool is_response = (m_oponent_protocol_ver == LEVIN_PROTOCOL_VER_1 && m_current_head.m_flags&LEVIN_PACKET_RESPONSE); @@ -458,43 +510,33 @@ public: if(m_current_head.m_have_to_return_data) { std::string return_buff; - m_current_head.m_return_code = m_config.m_pcommands_handler->invoke( - m_current_head.m_command, - buff_to_invoke, - return_buff, - m_connection_context); - m_current_head.m_cb = return_buff.size(); - m_current_head.m_have_to_return_data = false; - m_current_head.m_protocol_version = LEVIN_PROTOCOL_VER_1; - m_current_head.m_flags = LEVIN_PACKET_RESPONSE; -#if BYTE_ORDER == LITTLE_ENDIAN - std::string send_buff((const char*)&m_current_head, sizeof(m_current_head)); -#else - bucket_head2 head = m_current_head; - head.m_signature = SWAP64LE(head.m_signature); - head.m_cb = SWAP64LE(head.m_cb); - head.m_command = SWAP32LE(head.m_command); - head.m_return_code = SWAP32LE(head.m_return_code); - head.m_flags = SWAP32LE(head.m_flags); - head.m_protocol_version = SWAP32LE(head.m_protocol_version); - std::string send_buff((const char*)&head, sizeof(head)); -#endif - send_buff += return_buff; - CRITICAL_REGION_BEGIN(m_send_lock); - if(!m_pservice_endpoint->do_send(send_buff.data(), send_buff.size())) + const uint32_t return_code = m_config.m_pcommands_handler->invoke( + m_current_head.m_command, buff_to_invoke, return_buff, m_connection_context + ); + + bucket_head2 head = make_header(m_current_head.m_command, return_buff.size(), LEVIN_PACKET_RESPONSE, false); + head.m_return_code = SWAP32LE(return_code); + return_buff.insert(0, reinterpret_cast<const char*>(&head), sizeof(head)); + + if(!m_pservice_endpoint->do_send(byte_slice{std::move(return_buff)})) return false; - CRITICAL_REGION_END(); - MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << m_current_head.m_cb - << ", flags" << m_current_head.m_flags - << ", r?=" << m_current_head.m_have_to_return_data - <<", cmd = " << m_current_head.m_command - << ", ver=" << m_current_head.m_protocol_version); + + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb + << ", flags" << head.m_flags + << ", r?=" << head.m_have_to_return_data + <<", cmd = " << head.m_command + << ", ver=" << head.m_protocol_version); } else m_config.m_pcommands_handler->notify(m_current_head.m_command, buff_to_invoke, m_connection_context); } + // reuse small buffer + if (!temp.empty() && temp.capacity() <= 64 * 1024) + { + temp.clear(); + m_fragment_buffer = std::move(temp); + } } - m_state = stream_state_head; break; case stream_state_head: { @@ -584,26 +626,10 @@ public: break; } - bucket_head2 head = {0}; - head.m_signature = SWAP64LE(LEVIN_SIGNATURE); - head.m_cb = SWAP64LE(in_buff.size()); - head.m_have_to_return_data = true; - - head.m_flags = SWAP32LE(LEVIN_PACKET_REQUEST); - head.m_command = SWAP32LE(command); - head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); - boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0); - CRITICAL_REGION_BEGIN(m_send_lock); - CRITICAL_REGION_LOCAL1(m_invoke_response_handlers_lock); - if(!m_pservice_endpoint->do_send(&head, sizeof(head))) - { - LOG_ERROR_CC(m_connection_context, "Failed to do_send"); - err_code = LEVIN_ERROR_CONNECTION; - break; - } + CRITICAL_REGION_BEGIN(m_invoke_response_handlers_lock); - if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size())) + if(!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) { LOG_ERROR_CC(m_connection_context, "Failed to do_send"); err_code = LEVIN_ERROR_CONNECTION; @@ -620,7 +646,7 @@ public: if (LEVIN_OK != err_code) { - epee::span<const uint8_t> stub_buff{(const uint8_t*)"", 0}; + epee::span<const uint8_t> stub_buff = nullptr; // Never call callback inside critical section, that can cause deadlock cb(err_code, stub_buff, m_connection_context); return false; @@ -642,35 +668,13 @@ public: if(m_deletion_initiated) return LEVIN_ERROR_CONNECTION_DESTROYED; - bucket_head2 head = {0}; - head.m_signature = SWAP64LE(LEVIN_SIGNATURE); - head.m_cb = SWAP64LE(in_buff.size()); - head.m_have_to_return_data = true; - - head.m_flags = SWAP32LE(LEVIN_PACKET_REQUEST); - head.m_command = SWAP32LE(command); - head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); - boost::interprocess::ipcdetail::atomic_write32(&m_invoke_buf_ready, 0); - CRITICAL_REGION_BEGIN(m_send_lock); - if(!m_pservice_endpoint->do_send(&head, sizeof(head))) - { - LOG_ERROR_CC(m_connection_context, "Failed to do_send"); - return LEVIN_ERROR_CONNECTION; - } - if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size())) + if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, true)) { - LOG_ERROR_CC(m_connection_context, "Failed to do_send"); + LOG_ERROR_CC(m_connection_context, "Failed to send request"); return LEVIN_ERROR_CONNECTION; } - CRITICAL_REGION_END(); - - MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << head.m_cb - << ", f=" << head.m_flags - << ", r?=" << head.m_have_to_return_data - << ", cmd = " << head.m_command - << ", ver=" << head.m_protocol_version); uint64_t ticks_start = misc_utils::get_tick_count(); size_t prev_size = 0; @@ -716,33 +720,38 @@ public: if(m_deletion_initiated) return LEVIN_ERROR_CONNECTION_DESTROYED; - bucket_head2 head = {0}; - head.m_signature = SWAP64LE(LEVIN_SIGNATURE); - head.m_have_to_return_data = false; - head.m_cb = SWAP64LE(in_buff.size()); - - head.m_command = SWAP32LE(command); - head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); - head.m_flags = SWAP32LE(LEVIN_PACKET_REQUEST); - CRITICAL_REGION_BEGIN(m_send_lock); - if(!m_pservice_endpoint->do_send(&head, sizeof(head))) + if (!send_message(command, in_buff, LEVIN_PACKET_REQUEST, false)) { - LOG_ERROR_CC(m_connection_context, "Failed to do_send()"); + LOG_ERROR_CC(m_connection_context, "Failed to send notify message"); return -1; } - if(!m_pservice_endpoint->do_send(in_buff.data(), in_buff.size())) + return 1; + } + + /*! Sends `message` without adding a levin header. The message must have + been created with `make_notify`, `make_noise_notify` or + `make_fragmented_notify`. See additional instructions for + `make_fragmented_notify`. + + \return 1 on success */ + int send(byte_slice message) + { + const misc_utils::auto_scope_leave_caller scope_exit_handler = misc_utils::create_scope_leave_handler( + boost::bind(&async_protocol_handler::finish_outer_call, this) + ); + + if(m_deletion_initiated) + return LEVIN_ERROR_CONNECTION_DESTROYED; + + const std::size_t length = message.size(); + if (!m_pservice_endpoint->do_send(std::move(message))) { - LOG_ERROR_CC(m_connection_context, "Failed to do_send()"); + LOG_ERROR_CC(m_connection_context, "Failed to send message, dropping it"); return -1; } - CRITICAL_REGION_END(); - LOG_DEBUG_CC(m_connection_context, "LEVIN_PACKET_SENT. [len=" << head.m_cb << - ", f=" << head.m_flags << - ", r?=" << head.m_have_to_return_data << - ", cmd = " << head.m_command << - ", ver=" << head.m_protocol_version); + MDEBUG(m_connection_context << "LEVIN_PACKET_SENT. [len=" << (length - sizeof(bucket_head2)) << ", r?=0]"); return 1; } //------------------------------------------------------------------------------------------ @@ -923,6 +932,14 @@ int async_protocol_handler_config<t_connection_context>::notify(int command, con } //------------------------------------------------------------------------------------------ template<class t_connection_context> +int async_protocol_handler_config<t_connection_context>::send(byte_slice message, const boost::uuids::uuid& connection_id) +{ + async_protocol_handler<t_connection_context>* aph; + int r = find_and_lock_connection(connection_id, aph); + return LEVIN_OK == r ? aph->send(std::move(message)) : 0; +} +//------------------------------------------------------------------------------------------ +template<class t_connection_context> bool async_protocol_handler_config<t_connection_context>::close(boost::uuids::uuid connection_id) { CRITICAL_REGION_LOCAL(m_connects_lock); diff --git a/contrib/epee/include/net/net_helper.h b/contrib/epee/include/net/net_helper.h index e315555fc..2b02eafa4 100644 --- a/contrib/epee/include/net/net_helper.h +++ b/contrib/epee/include/net/net_helper.h @@ -31,6 +31,7 @@ //#include <Winsock2.h> //#include <Ws2tcpip.h> +#include <atomic> #include <string> #include <boost/version.hpp> #include <boost/asio/io_service.hpp> @@ -154,7 +155,7 @@ namespace net_utils } inline - try_connect_result_t try_connect(const std::string& addr, const std::string& port, std::chrono::milliseconds timeout, epee::net_utils::ssl_support_t ssl_support) + try_connect_result_t try_connect(const std::string& addr, const std::string& port, std::chrono::milliseconds timeout) { m_deadline.expires_from_now(timeout); boost::unique_future<boost::asio::ip::tcp::socket> connection = m_connector(addr, port, m_deadline); @@ -174,11 +175,11 @@ namespace net_utils m_connected = true; m_deadline.expires_at(std::chrono::steady_clock::time_point::max()); // SSL Options - if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_enabled || ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) + if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_enabled || m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) { if (!m_ssl_options.handshake(*m_ssl_socket, boost::asio::ssl::stream_base::client, addr)) { - if (ssl_support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) + if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) { boost::system::error_code ignored_ec; m_ssl_socket->next_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); @@ -217,7 +218,7 @@ namespace net_utils // Get a list of endpoints corresponding to the server name. - try_connect_result_t try_connect_result = try_connect(addr, port, timeout, m_ssl_options.support); + try_connect_result_t try_connect_result = try_connect(addr, port, timeout); if (try_connect_result == CONNECT_FAILURE) return false; if (m_ssl_options.support == epee::net_utils::ssl_support_t::e_ssl_support_autodetect) @@ -226,7 +227,7 @@ namespace net_utils { MERROR("SSL handshake failed on an autodetect connection, reconnecting without SSL"); m_ssl_options.support = epee::net_utils::ssl_support_t::e_ssl_support_disabled; - if (try_connect(addr, port, timeout, m_ssl_options.support) != CONNECT_SUCCESS) + if (try_connect(addr, port, timeout) != CONNECT_SUCCESS) return false; } } @@ -562,7 +563,7 @@ namespace net_utils { m_deadline.cancel(); boost::system::error_code ec; - if(m_ssl_options.support != ssl_support_t::e_ssl_support_disabled) + if(m_ssl_options) shutdown_ssl(); m_ssl_socket->next_layer().cancel(ec); if(ec) diff --git a/contrib/epee/include/net/net_utils_base.h b/contrib/epee/include/net/net_utils_base.h index 5ae3e53b3..dd80fae8b 100644 --- a/contrib/epee/include/net/net_utils_base.h +++ b/contrib/epee/include/net/net_utils_base.h @@ -34,9 +34,10 @@ #include <boost/asio/ip/address_v6.hpp> #include <typeinfo> #include <type_traits> +#include "byte_slice.h" #include "enums.h" -#include "serialization/keyvalue_serialization.h" #include "misc_log_ex.h" +#include "serialization/keyvalue_serialization.h" #undef MONERO_DEFAULT_LOG_CATEGORY #define MONERO_DEFAULT_LOG_CATEGORY "net" @@ -424,7 +425,7 @@ namespace net_utils /************************************************************************/ struct i_service_endpoint { - virtual bool do_send(const void* ptr, size_t cb)=0; + virtual bool do_send(byte_slice message)=0; virtual bool close()=0; virtual bool send_done()=0; virtual bool call_run_once_service_io()=0; diff --git a/contrib/epee/src/CMakeLists.txt b/contrib/epee/src/CMakeLists.txt index 2465afebb..c512e3b86 100644 --- a/contrib/epee/src/CMakeLists.txt +++ b/contrib/epee/src/CMakeLists.txt @@ -26,8 +26,8 @@ # STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -add_library(epee STATIC hex.cpp http_auth.cpp mlog.cpp net_helper.cpp net_utils_base.cpp string_tools.cpp wipeable_string.cpp memwipe.c - connection_basic.cpp network_throttle.cpp network_throttle-detail.cpp mlocker.cpp buffer.cpp net_ssl.cpp) +add_library(epee STATIC byte_slice.cpp hex.cpp http_auth.cpp mlog.cpp net_helper.cpp net_utils_base.cpp string_tools.cpp wipeable_string.cpp + levin_base.cpp memwipe.c connection_basic.cpp network_throttle.cpp network_throttle-detail.cpp mlocker.cpp buffer.cpp net_ssl.cpp) if (USE_READLINE AND GNU_READLINE_FOUND) add_library(epee_readline STATIC readline_buffer.cpp) diff --git a/contrib/epee/src/byte_slice.cpp b/contrib/epee/src/byte_slice.cpp new file mode 100644 index 000000000..216049e5b --- /dev/null +++ b/contrib/epee/src/byte_slice.cpp @@ -0,0 +1,209 @@ +// Copyright (c) 2019, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include <atomic> +#include <cstdlib> +#include <cstring> +#include <limits> +#include <stdexcept> +#include <utility> + +#include "byte_slice.h" + +namespace epee +{ + struct byte_slice_data + { + byte_slice_data() noexcept + : ref_count(1) + {} + + virtual ~byte_slice_data() noexcept + {} + + std::atomic<std::size_t> ref_count; + }; + + void release_byte_slice::operator()(byte_slice_data* ptr) const noexcept + { + if (ptr && --(ptr->ref_count) == 0) + { + ptr->~byte_slice_data(); + free(ptr); + } + } + + namespace + { + template<typename T> + struct adapted_byte_slice final : byte_slice_data + { + explicit adapted_byte_slice(T&& buffer) + : byte_slice_data(), buffer(std::move(buffer)) + {} + + virtual ~adapted_byte_slice() noexcept final override + {} + + const T buffer; + }; + + // bytes "follow" this structure in memory slab + struct raw_byte_slice final : byte_slice_data + { + raw_byte_slice() noexcept + : byte_slice_data() + {} + + virtual ~raw_byte_slice() noexcept final override + {} + }; + + /* This technique is not-standard, but allows for the reference count and + memory for the bytes (when given a list of spans) to be allocated in a + single call. In that situation, the dynamic sized bytes are after/behind + the raw_byte_slice class. The C runtime has to track the number of bytes + allocated regardless, so free'ing is relatively easy. */ + + template<typename T, typename... U> + std::unique_ptr<T, release_byte_slice> allocate_slice(std::size_t extra_bytes, U&&... args) + { + if (std::numeric_limits<std::size_t>::max() - sizeof(T) < extra_bytes) + throw std::bad_alloc{}; + + void* const ptr = malloc(sizeof(T) + extra_bytes); + if (ptr == nullptr) + throw std::bad_alloc{}; + + try + { + new (ptr) T{std::forward<U>(args)...}; + } + catch (...) + { + free(ptr); + throw; + } + return std::unique_ptr<T, release_byte_slice>{reinterpret_cast<T*>(ptr)}; + } + } // anonymous + + byte_slice::byte_slice(byte_slice_data* storage, span<const std::uint8_t> portion) noexcept + : storage_(storage), portion_(portion) + { + if (storage_) + ++(storage_->ref_count); + } + + template<typename T> + byte_slice::byte_slice(const adapt_buffer, T&& buffer) + : storage_(nullptr), portion_(to_byte_span(to_span(buffer))) + { + if (!buffer.empty()) + storage_ = allocate_slice<adapted_byte_slice<T>>(0, std::move(buffer)); + } + + byte_slice::byte_slice(std::initializer_list<span<const std::uint8_t>> sources) + : byte_slice() + { + std::size_t space_needed = 0; + for (const auto source : sources) + space_needed += source.size(); + + if (space_needed) + { + auto storage = allocate_slice<raw_byte_slice>(space_needed); + span<std::uint8_t> out{reinterpret_cast<std::uint8_t*>(storage.get() + 1), space_needed}; + portion_ = {out.data(), out.size()}; + + for (const auto source : sources) + { + std::memcpy(out.data(), source.data(), source.size()); + if (out.remove_prefix(source.size()) < source.size()) + throw std::bad_alloc{}; // size_t overflow on space_needed + } + storage_ = std::move(storage); + } + } + + byte_slice::byte_slice(std::string&& buffer) + : byte_slice(adapt_buffer{}, std::move(buffer)) + {} + + byte_slice::byte_slice(std::vector<std::uint8_t>&& buffer) + : byte_slice(adapt_buffer{}, std::move(buffer)) + {} + + byte_slice::byte_slice(byte_slice&& source) noexcept + : storage_(std::move(source.storage_)), portion_(source.portion_) + { + source.portion_ = epee::span<const std::uint8_t>{}; + } + + byte_slice& byte_slice::operator=(byte_slice&& source) noexcept + { + storage_ = std::move(source.storage_); + portion_ = source.portion_; + if (source.storage_ == nullptr) + source.portion_ = epee::span<const std::uint8_t>{}; + + return *this; + } + + std::size_t byte_slice::remove_prefix(std::size_t max_bytes) noexcept + { + max_bytes = portion_.remove_prefix(max_bytes); + if (portion_.empty()) + storage_ = nullptr; + return max_bytes; + } + + byte_slice byte_slice::take_slice(const std::size_t max_bytes) noexcept + { + byte_slice out{}; + std::uint8_t const* const ptr = data(); + out.portion_ = {ptr, portion_.remove_prefix(max_bytes)}; + + if (portion_.empty()) + out.storage_ = std::move(storage_); // no atomic inc/dec + else + out = {storage_.get(), out.portion_}; + + return out; + } + + byte_slice byte_slice::get_slice(const std::size_t begin, const std::size_t end) const + { + if (end < begin || portion_.size() < end) + throw std::out_of_range{"bad slice range"}; + + if (begin == end) + return {}; + return {storage_.get(), {portion_.begin() + begin, end - begin}}; + } +} // epee diff --git a/contrib/epee/src/connection_basic.cpp b/contrib/epee/src/connection_basic.cpp index 82d9e3b53..7526dde26 100644 --- a/contrib/epee/src/connection_basic.cpp +++ b/contrib/epee/src/connection_basic.cpp @@ -128,7 +128,7 @@ connection_basic_pimpl::connection_basic_pimpl(const std::string &name) : m_thro int connection_basic_pimpl::m_default_tos; // methods: -connection_basic::connection_basic(boost::asio::ip::tcp::socket&& sock, boost::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support) +connection_basic::connection_basic(boost::asio::ip::tcp::socket&& sock, std::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support) : m_state(std::move(state)), mI( new connection_basic_pimpl("peer") ), @@ -152,7 +152,7 @@ connection_basic::connection_basic(boost::asio::ip::tcp::socket&& sock, boost::s _note("Spawned connection #"<<mI->m_peer_number<<" to " << remote_addr_str << " currently we have sockets count:" << m_state->sock_count); } -connection_basic::connection_basic(boost::asio::io_service &io_service, boost::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support) +connection_basic::connection_basic(boost::asio::io_service &io_service, std::shared_ptr<connection_basic_shared_state> state, ssl_support_t ssl_support) : m_state(std::move(state)), mI( new connection_basic_pimpl("peer") ), diff --git a/contrib/epee/src/levin_base.cpp b/contrib/epee/src/levin_base.cpp new file mode 100644 index 000000000..ff845e2a7 --- /dev/null +++ b/contrib/epee/src/levin_base.cpp @@ -0,0 +1,128 @@ +// Copyright (c) 2019, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "net/levin_base.h" + +#include "int-util.h" + +namespace epee +{ +namespace levin +{ + bucket_head2 make_header(uint32_t command, uint64_t msg_size, uint32_t flags, bool expect_response) noexcept + { + bucket_head2 head = {0}; + head.m_signature = SWAP64LE(LEVIN_SIGNATURE); + head.m_have_to_return_data = expect_response; + head.m_cb = SWAP64LE(msg_size); + + head.m_command = SWAP32LE(command); + head.m_protocol_version = SWAP32LE(LEVIN_PROTOCOL_VER_1); + head.m_flags = SWAP32LE(flags); + return head; + } + + byte_slice make_notify(int command, epee::span<const std::uint8_t> payload) + { + const bucket_head2 head = make_header(command, payload.size(), LEVIN_PACKET_REQUEST, false); + return byte_slice{epee::as_byte_span(head), payload}; + } + + byte_slice make_noise_notify(const std::size_t noise_bytes) + { + static constexpr const std::uint32_t flags = + LEVIN_PACKET_BEGIN | LEVIN_PACKET_END; + + if (noise_bytes < sizeof(bucket_head2)) + return nullptr; + + std::string buffer(noise_bytes, char(0)); + const bucket_head2 head = make_header(0, noise_bytes - sizeof(bucket_head2), flags, false); + std::memcpy(std::addressof(buffer[0]), std::addressof(head), sizeof(head)); + + return byte_slice{std::move(buffer)}; + } + + byte_slice make_fragmented_notify(const byte_slice& noise_message, int command, epee::span<const std::uint8_t> payload) + { + const size_t noise_size = noise_message.size(); + if (noise_size < sizeof(bucket_head2) * 2) + return nullptr; + + if (payload.size() <= noise_size - sizeof(bucket_head2)) + { + /* The entire message can be sent at once, and the levin binary parser + will ignore extra bytes. So just pad with zeroes and otherwise send + a "normal", not fragmented message. */ + const size_t padding = noise_size - sizeof(bucket_head2) - payload.size(); + const span<const uint8_t> padding_bytes{noise_message.end() - padding, padding}; + + const bucket_head2 head = make_header(command, noise_size - sizeof(bucket_head2), LEVIN_PACKET_REQUEST, false); + return byte_slice{as_byte_span(head), payload, padding_bytes}; + } + + // fragment message + const size_t payload_space = noise_size - sizeof(bucket_head2); + const size_t expected_fragments = ((payload.size() - 2) / payload_space) + 1; + + std::string buffer{}; + buffer.reserve((expected_fragments + 1) * noise_size); // +1 here overselects for internal bucket_head2 value + + bucket_head2 head = make_header(0, noise_size - sizeof(bucket_head2), LEVIN_PACKET_BEGIN, false); + buffer.append(reinterpret_cast<const char*>(&head), sizeof(head)); + + head.m_command = command; + head.m_flags = LEVIN_PACKET_REQUEST; + head.m_cb = payload.size(); + buffer.append(reinterpret_cast<const char*>(&head), sizeof(head)); + + size_t copy_size = payload.remove_prefix(payload_space - sizeof(bucket_head2)); + buffer.append(reinterpret_cast<const char*>(payload.data()) - copy_size, copy_size); + + head.m_command = 0; + head.m_flags = 0; + head.m_cb = noise_size - sizeof(bucket_head2); + + while (!payload.empty()) + { + copy_size = payload.remove_prefix(payload_space); + + if (payload.empty()) + head.m_flags = LEVIN_PACKET_END; + + buffer.append(reinterpret_cast<const char*>(&head), sizeof(head)); + buffer.append(reinterpret_cast<const char*>(payload.data()) - copy_size, copy_size); + } + + const size_t padding = noise_size - copy_size - sizeof(bucket_head2); + buffer.append(reinterpret_cast<const char*>(noise_message.end()) - padding, padding); + + return byte_slice{std::move(buffer)}; + } +} // levin +} // epee diff --git a/src/cryptonote_basic/connection_context.h b/src/cryptonote_basic/connection_context.h index 96398a90b..51076e8c0 100644 --- a/src/cryptonote_basic/connection_context.h +++ b/src/cryptonote_basic/connection_context.h @@ -31,8 +31,10 @@ #pragma once #include <unordered_set> #include <atomic> +#include <boost/date_time/posix_time/posix_time.hpp> #include "net/net_utils_base.h" #include "copyable_atomic.h" +#include "crypto/hash.h" namespace cryptonote { diff --git a/src/cryptonote_config.h b/src/cryptonote_config.h index b68bb41e1..173b454f6 100644 --- a/src/cryptonote_config.h +++ b/src/cryptonote_config.h @@ -100,6 +100,16 @@ #define CRYPTONOTE_MEMPOOL_TX_LIVETIME (86400*3) //seconds, three days #define CRYPTONOTE_MEMPOOL_TX_FROM_ALT_BLOCK_LIVETIME 604800 //seconds, one week +// see src/cryptonote_protocol/levin_notify.cpp +#define CRYPTONOTE_NOISE_MIN_EPOCH 5 // minutes +#define CRYPTONOTE_NOISE_EPOCH_RANGE 30 // seconds +#define CRYPTONOTE_NOISE_MIN_DELAY 10 // seconds +#define CRYPTONOTE_NOISE_DELAY_RANGE 5 // seconds +#define CRYPTONOTE_NOISE_BYTES 3*1024 // 3 KiB +#define CRYPTONOTE_NOISE_CHANNELS 2 // Max outgoing connections per zone used for noise/covert sending + +#define CRYPTONOTE_MAX_FRAGMENTS 20 // ~20 * NOISE_BYTES max payload size for covert/noise send + #define COMMAND_RPC_GET_BLOCKS_FAST_MAX_COUNT 1000 #define P2P_LOCAL_WHITE_PEERLIST_LIMIT 1000 diff --git a/src/cryptonote_protocol/cryptonote_protocol_handler.inl b/src/cryptonote_protocol/cryptonote_protocol_handler.inl index 2ff7b5938..7bd4eeead 100644 --- a/src/cryptonote_protocol/cryptonote_protocol_handler.inl +++ b/src/cryptonote_protocol/cryptonote_protocol_handler.inl @@ -2227,69 +2227,11 @@ skip: template<class t_core> bool t_cryptonote_protocol_handler<t_core>::relay_transactions(NOTIFY_NEW_TRANSACTIONS::request& arg, cryptonote_connection_context& exclude_context) { - const bool hide_tx_broadcast = - 1 < m_p2p->get_zone_count() && exclude_context.m_remote_address.get_zone() == epee::net_utils::zone::invalid; - - if (hide_tx_broadcast) - MDEBUG("Attempting to conceal origin of tx via anonymity network connection(s)"); + for(auto& tx_blob : arg.txs) + m_core.on_transaction_relayed(tx_blob); // no check for success, so tell core they're relayed unconditionally - const bool pad_transactions = m_core.pad_transactions() || hide_tx_broadcast; - size_t bytes = pad_transactions ? 9 /* header */ + 4 /* 1 + 'txs' */ + tools::get_varint_data(arg.txs.size()).size() : 0; - for(auto tx_blob_it = arg.txs.begin(); tx_blob_it!=arg.txs.end(); ++tx_blob_it) - { - m_core.on_transaction_relayed(*tx_blob_it); - if (pad_transactions) - bytes += tools::get_varint_data(tx_blob_it->size()).size() + tx_blob_it->size(); - } - - if (pad_transactions) - { - // stuff some dummy bytes in to stay safe from traffic volume analysis - static constexpr size_t granularity = 1024; - size_t padding = granularity - bytes % granularity; - const size_t overhead = 2 /* 1 + '_' */ + tools::get_varint_data(padding).size(); - if (overhead > padding) - padding = 0; - else - padding -= overhead; - arg._ = std::string(padding, ' '); - - std::string arg_buff; - epee::serialization::store_t_to_binary(arg, arg_buff); - - // we probably lowballed the payload size a bit, so added a but too much. Fix this now. - size_t remove = arg_buff.size() % granularity; - if (remove > arg._.size()) - arg._.clear(); - else - arg._.resize(arg._.size() - remove); - // if the size of _ moved enough, we might lose byte in size encoding, we don't care - } - - std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections; - m_p2p->for_each_connection([hide_tx_broadcast, &exclude_context, &connections](connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags) - { - const epee::net_utils::zone current_zone = context.m_remote_address.get_zone(); - const bool broadcast_to_peer = - peer_id && - (hide_tx_broadcast != bool(current_zone == epee::net_utils::zone::public_)) && - exclude_context.m_connection_id != context.m_connection_id; - - if (broadcast_to_peer) - connections.push_back({current_zone, context.m_connection_id}); - - return true; - }); - - if (connections.empty()) - MERROR("Transaction not relayed - no" << (hide_tx_broadcast ? " privacy": "") << " peers available"); - else - { - std::string fullBlob; - epee::serialization::store_t_to_binary(arg, fullBlob); - m_p2p->relay_notify_to_list(NOTIFY_NEW_TRANSACTIONS::ID, epee::strspan<uint8_t>(fullBlob), std::move(connections)); - } + m_p2p->send_txs(std::move(arg.txs), exclude_context.m_remote_address.get_zone(), exclude_context.m_connection_id, m_core.pad_transactions()); return true; } //------------------------------------------------------------------------------------------------------------------------ diff --git a/src/cryptonote_protocol/levin_notify.cpp b/src/cryptonote_protocol/levin_notify.cpp new file mode 100644 index 000000000..26cd93b5a --- /dev/null +++ b/src/cryptonote_protocol/levin_notify.cpp @@ -0,0 +1,574 @@ +// Copyright (c) 2019, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "levin_notify.h" + +#include <boost/asio/steady_timer.hpp> +#include <boost/system/system_error.hpp> +#include <chrono> +#include <deque> +#include <stdexcept> + +#include "common/expect.h" +#include "common/varint.h" +#include "cryptonote_config.h" +#include "crypto/random.h" +#include "cryptonote_basic/connection_context.h" +#include "cryptonote_protocol/cryptonote_protocol_defs.h" +#include "net/dandelionpp.h" +#include "p2p/net_node.h" + +namespace cryptonote +{ +namespace levin +{ + namespace + { + constexpr std::size_t connection_id_reserve_size = 100; + + constexpr const std::chrono::minutes noise_min_epoch{CRYPTONOTE_NOISE_MIN_EPOCH}; + constexpr const std::chrono::seconds noise_epoch_range{CRYPTONOTE_NOISE_EPOCH_RANGE}; + + constexpr const std::chrono::seconds noise_min_delay{CRYPTONOTE_NOISE_MIN_DELAY}; + constexpr const std::chrono::seconds noise_delay_range{CRYPTONOTE_NOISE_DELAY_RANGE}; + + /*! Select a randomized duration from 0 to `range`. The precision will be to + the systems `steady_clock`. As an example, supplying 3 seconds to this + function will select a duration from [0, 3] seconds, and the increments + for the selection will be determined by the `steady_clock` precision + (typically nanoseconds). + + \return A randomized duration from 0 to `range`. */ + std::chrono::steady_clock::duration random_duration(std::chrono::steady_clock::duration range) + { + using rep = std::chrono::steady_clock::rep; + return std::chrono::steady_clock::duration{crypto::rand_range(rep(0), range.count())}; + } + + //! \return All outgoing connections supporting fragments in `connections`. + std::vector<boost::uuids::uuid> get_out_connections(connections& p2p) + { + std::vector<boost::uuids::uuid> outs; + outs.reserve(connection_id_reserve_size); + + /* The foreach call is serialized with a lock, but should be quick due to + the reserve call so a strand is not used. Investigate if there is lots + of waiting in here. */ + + p2p.foreach_connection([&outs] (detail::p2p_context& context) { + if (!context.m_is_income) + outs.emplace_back(context.m_connection_id); + return true; + }); + + return outs; + } + + std::string make_tx_payload(std::vector<blobdata>&& txs, const bool pad) + { + NOTIFY_NEW_TRANSACTIONS::request request{}; + request.txs = std::move(txs); + + if (pad) + { + size_t bytes = 9 /* header */ + 4 /* 1 + 'txs' */ + tools::get_varint_data(request.txs.size()).size(); + for(auto tx_blob_it = request.txs.begin(); tx_blob_it!=request.txs.end(); ++tx_blob_it) + bytes += tools::get_varint_data(tx_blob_it->size()).size() + tx_blob_it->size(); + + // stuff some dummy bytes in to stay safe from traffic volume analysis + static constexpr const size_t granularity = 1024; + size_t padding = granularity - bytes % granularity; + const size_t overhead = 2 /* 1 + '_' */ + tools::get_varint_data(padding).size(); + if (overhead > padding) + padding = 0; + else + padding -= overhead; + request._ = std::string(padding, ' '); + + std::string arg_buff; + epee::serialization::store_t_to_binary(request, arg_buff); + + // we probably lowballed the payload size a bit, so added a but too much. Fix this now. + size_t remove = arg_buff.size() % granularity; + if (remove > request._.size()) + request._.clear(); + else + request._.resize(request._.size() - remove); + // if the size of _ moved enough, we might lose byte in size encoding, we don't care + } + + std::string fullBlob; + if (!epee::serialization::store_t_to_binary(request, fullBlob)) + throw std::runtime_error{"Failed to serialize to epee binary format"}; + + return fullBlob; + } + + /* The current design uses `asio::strand`s. The documentation isn't as clear + as it should be - a `strand` has an internal `mutex` and `bool`. The + `mutex` synchronizes thread access and the `bool` is set when a thread is + executing something "in the strand". Therefore, if a callback has lots of + work to do in a `strand`, asio can switch to some other task instead of + blocking 1+ threads to wait for the original thread to complete the task + (as is the case when client code has a `mutex` inside the callback). The + downside is that asio _always_ allocates for the callback, even if it can + be immediately executed. So if all work in a strand is minimal, a lock + may be better. + + This code uses a strand per "zone" and a strand per "channel in a zone". + `dispatch` is used heavily, which means "execute immediately in _this_ + thread if the strand is not in use, otherwise queue the callback to be + executed immediately after the strand completes its current task". + `post` is used where deferred execution to an `asio::io_service::run` + thread is preferred. + + The strand per "zone" is useful because the levin + `foreach_connection` is blocked with a mutex anyway. So this primarily + helps with reducing blocking of a thread attempting a "flood" + notification. Updating/merging the outgoing connections in the + Dandelion++ map is also somewhat expensive. + + The strand per "channel" may need a re-visit. The most "expensive" code + is figuring out the noise/notification to send. If levin code is + optimized further, it might be better to just use standard locks per + channel. */ + + //! A queue of levin messages for a noise i2p/tor link + struct noise_channel + { + explicit noise_channel(boost::asio::io_service& io_service) + : active(nullptr), + queue(), + strand(io_service), + next_noise(io_service), + connection(boost::uuids::nil_uuid()) + {} + + // `asio::io_service::strand` cannot be copied or moved + noise_channel(const noise_channel&) = delete; + noise_channel& operator=(const noise_channel&) = delete; + + // Only read/write these values "inside the strand" + + epee::byte_slice active; + std::deque<epee::byte_slice> queue; + boost::asio::io_service::strand strand; + boost::asio::steady_timer next_noise; + boost::uuids::uuid connection; + }; + } // anonymous + + namespace detail + { + struct zone + { + explicit zone(boost::asio::io_service& io_service, std::shared_ptr<connections> p2p, epee::byte_slice noise_in) + : p2p(std::move(p2p)), + noise(std::move(noise_in)), + next_epoch(io_service), + strand(io_service), + map(), + channels(), + connection_count(0) + { + for (std::size_t count = 0; !noise.empty() && count < CRYPTONOTE_NOISE_CHANNELS; ++count) + channels.emplace_back(io_service); + } + + const std::shared_ptr<connections> p2p; + const epee::byte_slice noise; //!< `!empty()` means zone is using noise channels + boost::asio::steady_timer next_epoch; + boost::asio::io_service::strand strand; + net::dandelionpp::connection_map map;//!< Tracks outgoing uuid's for noise channels or Dandelion++ stems + std::deque<noise_channel> channels; //!< Never touch after init; only update elements on `noise_channel.strand` + std::atomic<std::size_t> connection_count; //!< Only update in strand, can be read at any time + }; + } // detail + + namespace + { + //! Adds a message to the sending queue of the channel. + class queue_covert_notify + { + std::shared_ptr<detail::zone> zone_; + epee::byte_slice message_; // Requires manual copy constructor + const std::size_t destination_; + + public: + queue_covert_notify(std::shared_ptr<detail::zone> zone, epee::byte_slice message, std::size_t destination) + : zone_(std::move(zone)), message_(std::move(message)), destination_(destination) + {} + + queue_covert_notify(queue_covert_notify&&) = default; + queue_covert_notify(const queue_covert_notify& source) + : zone_(source.zone_), message_(source.message_.clone()), destination_(source.destination_) + {} + + //! \pre Called within `zone_->channels[destionation_].strand`. + void operator()() + { + if (!zone_) + return; + + noise_channel& channel = zone_->channels.at(destination_); + assert(channel.strand.running_in_this_thread()); + + if (!channel.connection.is_nil()) + channel.queue.push_back(std::move(message_)); + } + }; + + //! Sends a message to every active connection + class flood_notify + { + std::shared_ptr<detail::zone> zone_; + epee::byte_slice message_; // Requires manual copy + boost::uuids::uuid source_; + + public: + explicit flood_notify(std::shared_ptr<detail::zone> zone, epee::byte_slice message, const boost::uuids::uuid& source) + : zone_(std::move(zone)), message_(message.clone()), source_(source) + {} + + flood_notify(flood_notify&&) = default; + flood_notify(const flood_notify& source) + : zone_(source.zone_), message_(source.message_.clone()), source_(source.source_) + {} + + void operator()() const + { + if (!zone_ || !zone_->p2p) + return; + + assert(zone_->strand.running_in_this_thread()); + + /* The foreach should be quick, but then it iterates and acquires the + same lock for every connection. So do in a strand because two threads + will ping-pong each other with cacheline invalidations. Revisit if + algorithm changes or the locking strategy within the levin config + class changes. */ + + std::vector<boost::uuids::uuid> connections; + connections.reserve(connection_id_reserve_size); + zone_->p2p->foreach_connection([this, &connections] (detail::p2p_context& context) { + if (this->source_ != context.m_connection_id) + connections.emplace_back(context.m_connection_id); + return true; + }); + + for (const boost::uuids::uuid& connection : connections) + zone_->p2p->send(message_.clone(), connection); + } + }; + + //! Updates the connection for a channel. + struct update_channel + { + std::shared_ptr<detail::zone> zone_; + const std::size_t channel_; + const boost::uuids::uuid connection_; + + //! \pre Called within `stem_.strand`. + void operator()() const + { + if (!zone_) + return; + + noise_channel& channel = zone_->channels.at(channel_); + assert(channel.strand.running_in_this_thread()); + static_assert( + CRYPTONOTE_MAX_FRAGMENTS <= (noise_min_epoch / (noise_min_delay + noise_delay_range)), + "Max fragments more than the max that can be sent in an epoch" + ); + + /* This clears the active message so that a message "in-flight" is + restarted. DO NOT try to send the remainder of the fragments, this + additional send time can leak that this node was sending out a real + notify (tx) instead of dummy noise. */ + + channel.connection = connection_; + channel.active = nullptr; + + if (connection_.is_nil()) + channel.queue.clear(); + } + }; + + //! Merges `out_connections_` into the existing `zone_->map`. + struct update_channels + { + std::shared_ptr<detail::zone> zone_; + std::vector<boost::uuids::uuid> out_connections_; + + //! \pre Called within `zone->strand`. + static void post(std::shared_ptr<detail::zone> zone) + { + if (!zone) + return; + + assert(zone->strand.running_in_this_thread()); + + zone->connection_count = zone->map.size(); + for (auto id = zone->map.begin(); id != zone->map.end(); ++id) + { + const std::size_t i = id - zone->map.begin(); + zone->channels[i].strand.post(update_channel{zone, i, *id}); + } + } + + //! \pre Called within `zone_->strand`. + void operator()() + { + if (!zone_) + return; + + assert(zone_->strand.running_in_this_thread()); + if (zone_->map.update(std::move(out_connections_))) + post(std::move(zone_)); + } + }; + + //! Swaps out noise channels entirely; new epoch start. + class change_channels + { + std::shared_ptr<detail::zone> zone_; + net::dandelionpp::connection_map map_; // Requires manual copy constructor + + public: + explicit change_channels(std::shared_ptr<detail::zone> zone, net::dandelionpp::connection_map map) + : zone_(std::move(zone)), map_(std::move(map)) + {} + + change_channels(change_channels&&) = default; + change_channels(const change_channels& source) + : zone_(source.zone_), map_(source.map_.clone()) + {} + + //! \pre Called within `zone_->strand`. + void operator()() + { + if (!zone_) + return + + assert(zone_->strand.running_in_this_thread()); + + zone_->map = std::move(map_); + update_channels::post(std::move(zone_)); + } + }; + + //! Sends a noise packet or real notification and sets timer for next call. + struct send_noise + { + std::shared_ptr<detail::zone> zone_; + const std::size_t channel_; + + static void wait(const std::chrono::steady_clock::time_point start, std::shared_ptr<detail::zone> zone, const std::size_t index) + { + if (!zone) + return; + + noise_channel& channel = zone->channels.at(index); + channel.next_noise.expires_at(start + noise_min_delay + random_duration(noise_delay_range)); + channel.next_noise.async_wait( + channel.strand.wrap(send_noise{std::move(zone), index}) + ); + } + + //! \pre Called within `zone_->channels[channel_].strand`. + void operator()(boost::system::error_code error) + { + if (!zone_ || !zone_->p2p || zone_->noise.empty()) + return; + + if (error && error != boost::system::errc::operation_canceled) + throw boost::system::system_error{error, "send_noise timer failed"}; + + assert(zone_->channels.at(channel_).strand.running_in_this_thread()); + + const auto start = std::chrono::steady_clock::now(); + noise_channel& channel = zone_->channels.at(channel_); + + if (!channel.connection.is_nil()) + { + epee::byte_slice message = nullptr; + if (!channel.active.empty()) + message = channel.active.take_slice(zone_->noise.size()); + else if (!channel.queue.empty()) + { + channel.active = channel.queue.front().clone(); + message = channel.active.take_slice(zone_->noise.size()); + } + else + message = zone_->noise.clone(); + + if (zone_->p2p->send(std::move(message), channel.connection)) + { + if (!channel.queue.empty() && channel.active.empty()) + channel.queue.pop_front(); + } + else + { + channel.active = nullptr; + channel.connection = boost::uuids::nil_uuid(); + zone_->strand.post( + update_channels{zone_, get_out_connections(*zone_->p2p)} + ); + } + } + + wait(start, std::move(zone_), channel_); + } + }; + + //! Prepares connections for new channel epoch and sets timer for next epoch + struct start_epoch + { + // Variables allow for Dandelion++ extension + std::shared_ptr<detail::zone> zone_; + std::chrono::seconds min_epoch_; + std::chrono::seconds epoch_range_; + std::size_t count_; + + //! \pre Should not be invoked within any strand to prevent blocking. + void operator()(const boost::system::error_code error = {}) + { + if (!zone_ || !zone_->p2p) + return; + + if (error && error != boost::system::errc::operation_canceled) + throw boost::system::system_error{error, "start_epoch timer failed"}; + + const auto start = std::chrono::steady_clock::now(); + zone_->strand.dispatch( + change_channels{zone_, net::dandelionpp::connection_map{get_out_connections(*(zone_->p2p)), count_}} + ); + + detail::zone& alias = *zone_; + alias.next_epoch.expires_at(start + min_epoch_ + random_duration(epoch_range_)); + alias.next_epoch.async_wait(start_epoch{std::move(*this)}); + } + }; + } // anonymous + + notify::notify(boost::asio::io_service& service, std::shared_ptr<connections> p2p, epee::byte_slice noise) + : zone_(std::make_shared<detail::zone>(service, std::move(p2p), std::move(noise))) + { + if (!zone_->p2p) + throw std::logic_error{"cryptonote::levin::notify cannot have nullptr p2p argument"}; + + if (!zone_->noise.empty()) + { + const auto now = std::chrono::steady_clock::now(); + start_epoch{zone_, noise_min_epoch, noise_epoch_range, CRYPTONOTE_NOISE_CHANNELS}(); + for (std::size_t channel = 0; channel < zone_->channels.size(); ++channel) + send_noise::wait(now, zone_, channel); + } + } + + notify::~notify() noexcept + {} + + notify::status notify::get_status() const noexcept + { + if (!zone_) + return {false, false}; + + return {!zone_->noise.empty(), CRYPTONOTE_NOISE_CHANNELS <= zone_->connection_count}; + } + + void notify::new_out_connection() + { + if (!zone_ || zone_->noise.empty() || CRYPTONOTE_NOISE_CHANNELS <= zone_->connection_count) + return; + + zone_->strand.dispatch( + update_channels{zone_, get_out_connections(*(zone_->p2p))} + ); + } + + void notify::run_epoch() + { + if (!zone_) + return; + zone_->next_epoch.cancel(); + } + + void notify::run_stems() + { + if (!zone_) + return; + + for (noise_channel& channel : zone_->channels) + channel.next_noise.cancel(); + } + + bool notify::send_txs(std::vector<cryptonote::blobdata> txs, const boost::uuids::uuid& source, const bool pad_txs) + { + if (!zone_) + return false; + + if (!zone_->noise.empty() && !zone_->channels.empty()) + { + // covert send in "noise" channel + static_assert( + CRYPTONOTE_MAX_FRAGMENTS * CRYPTONOTE_NOISE_BYTES <= LEVIN_DEFAULT_MAX_PACKET_SIZE, "most nodes will reject this fragment setting" + ); + + // padding is not useful when using noise mode + const std::string payload = make_tx_payload(std::move(txs), false); + epee::byte_slice message = epee::levin::make_fragmented_notify( + zone_->noise, NOTIFY_NEW_TRANSACTIONS::ID, epee::strspan<std::uint8_t>(payload) + ); + if (CRYPTONOTE_MAX_FRAGMENTS * zone_->noise.size() < message.size()) + { + MERROR("notify::send_txs provided message exceeding covert fragment size"); + return false; + } + + for (std::size_t channel = 0; channel < zone_->channels.size(); ++channel) + { + zone_->channels[channel].strand.dispatch( + queue_covert_notify{zone_, message.clone(), channel} + ); + } + } + else + { + const std::string payload = make_tx_payload(std::move(txs), pad_txs); + epee::byte_slice message = + epee::levin::make_notify(NOTIFY_NEW_TRANSACTIONS::ID, epee::strspan<std::uint8_t>(payload)); + + // traditional monero send technique + zone_->strand.dispatch(flood_notify{zone_, std::move(message), source}); + } + + return true; + } +} // levin +} // net diff --git a/src/cryptonote_protocol/levin_notify.h b/src/cryptonote_protocol/levin_notify.h new file mode 100644 index 000000000..82d22680a --- /dev/null +++ b/src/cryptonote_protocol/levin_notify.h @@ -0,0 +1,132 @@ +// Copyright (c) 2019, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include <boost/asio/io_service.hpp> +#include <boost/uuid/uuid.hpp> +#include <memory> +#include <vector> + +#include "byte_slice.h" +#include "cryptonote_basic/blobdatatype.h" +#include "net/enums.h" +#include "span.h" + +namespace epee +{ +namespace levin +{ + template<typename> class async_protocol_handler_config; +} +} + +namespace nodetool +{ + template<typename> struct p2p_connection_context_t; +} + +namespace cryptonote +{ + struct cryptonote_connection_context; +} + +namespace cryptonote +{ +namespace levin +{ + namespace detail + { + using p2p_context = nodetool::p2p_connection_context_t<cryptonote::cryptonote_connection_context>; + struct zone; //!< Internal data needed for zone notifications + } // detail + + using connections = epee::levin::async_protocol_handler_config<detail::p2p_context>; + + //! Provides tx notification privacy + class notify + { + std::shared_ptr<detail::zone> zone_; + + public: + struct status + { + bool has_noise; + bool connections_filled; + }; + + //! Construct an instance that cannot notify. + notify() noexcept + : zone_(nullptr) + {} + + //! Construct an instance with available notification `zones`. + explicit notify(boost::asio::io_service& service, std::shared_ptr<connections> p2p, epee::byte_slice noise); + + notify(const notify&) = delete; + notify(notify&&) = default; + + ~notify() noexcept; + + notify& operator=(const notify&) = delete; + notify& operator=(notify&&) = default; + + //! \return Status information for zone selection. + status get_status() const noexcept; + + //! Probe for new outbound connection - skips if not needed. + void new_out_connection(); + + //! Run the logic for the next epoch immediately. Only use in testing. + void run_epoch(); + + //! Run the logic for the next stem timeout imemdiately. Only use in testing. + void run_stems(); + + /*! Send txs using `cryptonote_protocol_defs.h` payload format wrapped in a + levin header. The message will be sent in a "discreet" manner if + enabled - if `!noise.empty()` then the `command`/`payload` will be + queued to send at the next available noise interval. Otherwise, a + standard Monero flood notification will be used. + + \note Eventually Dandelion++ stem sending will be used here when + enabled. + + \param txs The transactions that need to be serialized and relayed. + \param source The source of the notification. `is_nil()` indicates this + node is the source. Dandelion++ will use this to map a source to a + particular stem. + \param pad_txs A request to pad txs to help conceal origin via + statistical analysis. Ignored if noise was enabled during + construction. + + \return True iff the notification is queued for sending. */ + bool send_txs(std::vector<blobdata> txs, const boost::uuids::uuid& source, bool pad_txs); + }; +} // levin +} // net diff --git a/src/net/CMakeLists.txt b/src/net/CMakeLists.txt index 738f858f0..24b707f77 100644 --- a/src/net/CMakeLists.txt +++ b/src/net/CMakeLists.txt @@ -26,8 +26,8 @@ # STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -set(net_sources error.cpp i2p_address.cpp parse.cpp socks.cpp socks_connect.cpp tor_address.cpp) -set(net_headers error.h i2p_address.h parse.h socks.h socks_connect.h tor_address.h) +set(net_sources dandelionpp.cpp error.cpp i2p_address.cpp parse.cpp socks.cpp socks_connect.cpp tor_address.cpp) +set(net_headers dandelionpp.h error.h i2p_address.h parse.h socks.h socks_connect.h tor_address.h) monero_add_library(net ${net_sources} ${net_headers}) target_link_libraries(net common epee ${Boost_ASIO_LIBRARY}) diff --git a/src/net/dandelionpp.cpp b/src/net/dandelionpp.cpp new file mode 100644 index 000000000..4d2f75428 --- /dev/null +++ b/src/net/dandelionpp.cpp @@ -0,0 +1,212 @@ +// Copyright (c) 2019, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "dandelionpp.h" + +#include <boost/container/small_vector.hpp> +#include <boost/uuid/nil_generator.hpp> +#include <chrono> + +#include "common/expect.h" +#include "cryptonote_config.h" +#include "crypto/crypto.h" + +namespace net +{ +namespace dandelionpp +{ + namespace + { + constexpr const std::size_t expected_max_channels = CRYPTONOTE_NOISE_CHANNELS; + + // could be in util somewhere + struct key_less + { + template<typename K, typename V> + bool operator()(const std::pair<K, V>& left, const K& right) const + { + return left.first < right; + } + + template<typename K, typename V> + bool operator()(const K& left, const std::pair<K, V>& right) const + { + return left < right.first; + } + }; + + std::size_t select_stem(epee::span<const std::size_t> usage, epee::span<const boost::uuids::uuid> out_map) + { + assert(usage.size() < std::numeric_limits<std::size_t>::max()); // prevented in constructor + if (usage.size() < out_map.size()) + return std::numeric_limits<std::size_t>::max(); + + // small_vector uses stack space if `expected_max_channels < capacity()` + std::size_t lowest = std::numeric_limits<std::size_t>::max(); + boost::container::small_vector<std::size_t, expected_max_channels> choices; + static_assert(sizeof(choices) < 256, "choices is too large based on current configuration"); + + for (const boost::uuids::uuid& out : out_map) + { + if (!out.is_nil()) + { + const std::size_t location = std::addressof(out) - out_map.begin(); + if (usage[location] < lowest) + { + lowest = usage[location]; + choices = {location}; + } + else if (usage[location] == lowest) + choices.push_back(location); + } + } + + switch (choices.size()) + { + case 0: + return std::numeric_limits<std::size_t>::max(); + case 1: + return choices[0]; + default: + break; + } + + return choices[crypto::rand_idx(choices.size())]; + } + } // anonymous + + connection_map::connection_map(std::vector<boost::uuids::uuid> out_connections, const std::size_t stems) + : out_mapping_(std::move(out_connections)), + in_mapping_(), + usage_count_() + { + // max value is used by `select_stem` as error case + if (stems == std::numeric_limits<std::size_t>::max()) + MONERO_THROW(common_error::kInvalidArgument, "stems value cannot be max size_t"); + + usage_count_.resize(stems); + if (stems < out_mapping_.size()) + { + for (unsigned i = 0; i < stems; ++i) + std::swap(out_mapping_[i], out_mapping_.at(i + crypto::rand_idx(out_mapping_.size() - i))); + + out_mapping_.resize(stems); + } + else + { + std::shuffle(out_mapping_.begin(), out_mapping_.end(), crypto::random_device{}); + } + } + + connection_map::~connection_map() noexcept + {} + + connection_map connection_map::clone() const + { + return {*this}; + } + + bool connection_map::update(std::vector<boost::uuids::uuid> current) + { + std::sort(current.begin(), current.end()); + + bool replace = false; + for (auto& existing_out : out_mapping_) + { + const auto elem = std::lower_bound(current.begin(), current.end(), existing_out); + if (elem == current.end() || *elem != existing_out) + { + existing_out = boost::uuids::nil_uuid(); + replace = true; + } + else // already using connection, remove it from candidate list + current.erase(elem); + } + + if (!replace && out_mapping_.size() == usage_count_.size()) + return false; + + const std::size_t existing_outs = out_mapping_.size(); + for (std::size_t i = 0; i < usage_count_.size() && !current.empty(); ++i) + { + const bool increase_stems = out_mapping_.size() <= i; + if (increase_stems || out_mapping_[i].is_nil()) + { + std::swap(current.back(), current.at(crypto::rand_idx(current.size()))); + if (increase_stems) + out_mapping_.push_back(current.back()); + else + out_mapping_[i] = current.back(); + current.pop_back(); + } + } + + return replace || existing_outs < out_mapping_.size(); + } + + std::size_t connection_map::size() const noexcept + { + std::size_t count = 0; + for (const boost::uuids::uuid& connection : out_mapping_) + { + if (!connection.is_nil()) + ++count; + } + return count; + } + + boost::uuids::uuid connection_map::get_stem(const boost::uuids::uuid& source) + { + auto elem = std::lower_bound(in_mapping_.begin(), in_mapping_.end(), source, key_less{}); + if (elem == in_mapping_.end() || elem->first != source) + { + const std::size_t index = select_stem(epee::to_span(usage_count_), epee::to_span(out_mapping_)); + if (out_mapping_.size() < index) + return boost::uuids::nil_uuid(); + + elem = in_mapping_.emplace(elem, source, index); + usage_count_[index]++; + } + else if (out_mapping_.at(elem->second).is_nil()) // stem connection disconnected after mapping + { + usage_count_.at(elem->second)--; + const std::size_t index = select_stem(epee::to_span(usage_count_), epee::to_span(out_mapping_)); + if (out_mapping_.size() < index) + { + in_mapping_.erase(elem); + return boost::uuids::nil_uuid(); + } + + elem->second = index; + usage_count_[index]++; + } + + return out_mapping_[elem->second]; + } +} // dandelionpp +} // net diff --git a/src/net/dandelionpp.h b/src/net/dandelionpp.h new file mode 100644 index 000000000..75b63bc0c --- /dev/null +++ b/src/net/dandelionpp.h @@ -0,0 +1,106 @@ +// Copyright (c) 2019, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include <boost/uuid/uuid.hpp> +#include <cstddef> +#include <memory> +#include <utility> +#include <vector> + +#include "span.h" + +namespace net +{ +namespace dandelionpp +{ + //! Assists with mapping source -> stem and tracking connections for stem. + class connection_map + { + // Make sure to update clone method if changing members + std::vector<boost::uuids::uuid> out_mapping_; //<! Current outgoing uuid connection at index. + std::vector<std::pair<boost::uuids::uuid, std::size_t>> in_mapping_; //<! uuid source to an `out_mapping_` index. + std::vector<std::size_t> usage_count_; + + // Use clone method to prevent "hidden" copies. + connection_map(const connection_map&) = default; + + public: + using value_type = boost::uuids::uuid; + using size_type = std::vector<boost::uuids::uuid>::size_type; + using difference_type = std::vector<boost::uuids::uuid>::difference_type; + using reference = const boost::uuids::uuid&; + using const_reference = reference; + using iterator = std::vector<boost::uuids::uuid>::const_iterator; + using const_iterator = iterator; + + //! Initialized with zero stem connections. + explicit connection_map() + : connection_map(std::vector<boost::uuids::uuid>{}, 0) + {} + + //! Initialized with `out_connections` and `stem_count`. + explicit connection_map(std::vector<boost::uuids::uuid> out_connections, std::size_t stems); + + connection_map(connection_map&&) = default; + ~connection_map() noexcept; + connection_map& operator=(connection_map&&) = default; + connection_map& operator=(const connection_map&) = delete; + + //! \return An exact duplicate of `this` map. + connection_map clone() const; + + //! \return First stem connection. + const_iterator begin() const noexcept + { + return out_mapping_.begin(); + } + + //! \return One-past the last stem connection. + const_iterator end() const noexcept + { + return out_mapping_.end(); + } + + /*! Merges in current connections with the previous set of connections. + If a connection died, a new one will take its place in the stem or + the stem is marked as dead. + + \param connections Current outbound connection ids. + \return True if any updates to `get_connections()` was made. */ + bool update(std::vector<boost::uuids::uuid> current); + + //! \return Number of outgoing connections in use. + std::size_t size() const noexcept; + + //! \return Current stem mapping for `source` or `nil_uuid()` if none is possible. + boost::uuids::uuid get_stem(const boost::uuids::uuid& source); + }; +} // dandelionpp +} // net diff --git a/src/p2p/net_node.cpp b/src/p2p/net_node.cpp index bb51be242..c7fc058ca 100644 --- a/src/p2p/net_node.cpp +++ b/src/p2p/net_node.cpp @@ -144,7 +144,7 @@ namespace nodetool const command_line::arg_descriptor<std::vector<std::string> > arg_p2p_add_exclusive_node = {"add-exclusive-node", "Specify list of peers to connect to only." " If this option is given the options add-priority-node and seed-node are ignored"}; const command_line::arg_descriptor<std::vector<std::string> > arg_p2p_seed_node = {"seed-node", "Connect to a node to retrieve peer addresses, and disconnect"}; - const command_line::arg_descriptor<std::vector<std::string> > arg_proxy = {"proxy", "<network-type>,<socks-ip:port>[,max_connections] i.e. \"tor,127.0.0.1:9050,100\""}; + const command_line::arg_descriptor<std::vector<std::string> > arg_proxy = {"proxy", "<network-type>,<socks-ip:port>[,max_connections][,disable_noise] i.e. \"tor,127.0.0.1:9050,100,disable_noise\""}; const command_line::arg_descriptor<std::vector<std::string> > arg_anonymous_inbound = {"anonymous-inbound", "<hidden-service-address>,<[bind-ip:]port>[,max_connections] i.e. \"x.onion,127.0.0.1:18083,100\""}; const command_line::arg_descriptor<bool> arg_p2p_hide_my_port = {"hide-my-port", "Do not announce yourself as peerlist candidate", false, true}; const command_line::arg_descriptor<bool> arg_no_sync = {"no-sync", "Don't synchronize the blockchain with other peers", false}; @@ -163,7 +163,7 @@ namespace nodetool boost::optional<std::vector<proxy>> get_proxies(boost::program_options::variables_map const& vm) { - namespace ip = boost::asio::ip; + namespace ip = boost::asio::ip; std::vector<proxy> proxies{}; @@ -183,14 +183,25 @@ namespace nodetool const boost::string_ref proxy{next->begin(), next->size()}; ++next; - if (!next.eof()) + for (unsigned count = 0; !next.eof(); ++count, ++next) { - proxies.back().max_connections = get_max_connections(*next); - if (proxies.back().max_connections == 0) + if (2 <= count) { - MERROR("Invalid max connections given to --" << arg_proxy.name); + MERROR("Too many ',' characters given to --" << arg_proxy.name); return boost::none; } + + if (boost::string_ref{next->begin(), next->size()} == "disable_noise") + proxies.back().noise = false; + else + { + proxies.back().max_connections = get_max_connections(*next); + if (proxies.back().max_connections == 0) + { + MERROR("Invalid max connections given to --" << arg_proxy.name); + return boost::none; + } + } } switch (epee::net_utils::zone_from_string(zone)) @@ -214,7 +225,7 @@ namespace nodetool return boost::none; } proxies.back().address = ip::tcp::endpoint{ip::address_v4{boost::endian::native_to_big(ip)}, port}; - } + } return proxies; } diff --git a/src/p2p/net_node.h b/src/p2p/net_node.h index 6d2ae878f..231175dd2 100644 --- a/src/p2p/net_node.h +++ b/src/p2p/net_node.h @@ -43,6 +43,7 @@ #include <vector> #include "cryptonote_config.h" +#include "cryptonote_protocol/levin_notify.h" #include "warnings.h" #include "net/abstract_tcp_server2.h" #include "net/levin_protocol_handler.h" @@ -66,12 +67,14 @@ namespace nodetool proxy() : max_connections(-1), address(), - zone(epee::net_utils::zone::invalid) + zone(epee::net_utils::zone::invalid), + noise(true) {} std::int64_t max_connections; boost::asio::ip::tcp::endpoint address; epee::net_utils::zone zone; + bool noise; }; struct anonymous_inbound @@ -154,6 +157,7 @@ namespace nodetool m_bind_ipv6_address(), m_port(), m_port_ipv6(), + m_notifier(), m_our_address(), m_peerlist(), m_config{}, @@ -172,6 +176,7 @@ namespace nodetool m_bind_ipv6_address(), m_port(), m_port_ipv6(), + m_notifier(), m_our_address(), m_peerlist(), m_config{}, @@ -189,6 +194,7 @@ namespace nodetool std::string m_bind_ipv6_address; std::string m_port; std::string m_port_ipv6; + cryptonote::levin::notify m_notifier; epee::net_utils::network_address m_our_address; // in anonymity networks peerlist_manager m_peerlist; config m_config; @@ -255,7 +261,6 @@ namespace nodetool size_t get_public_gray_peers_count(); void get_public_peerlist(std::vector<peerlist_entry>& gray, std::vector<peerlist_entry>& white); void get_peerlist(std::vector<peerlist_entry>& gray, std::vector<peerlist_entry>& white); - size_t get_zone_count() const { return m_network_zones.size(); } void change_max_out_public_peers(size_t count); uint32_t get_max_out_public_peers() const; @@ -330,6 +335,7 @@ namespace nodetool virtual void callback(p2p_connection_context& context); //----------------- i_p2p_endpoint ------------------------------------------------------------- virtual bool relay_notify_to_list(int command, const epee::span<const uint8_t> data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections); + virtual epee::net_utils::zone send_txs(std::vector<cryptonote::blobdata> txs, const epee::net_utils::zone origin, const boost::uuids::uuid& source, const bool pad_txs); virtual bool invoke_command_to_peer(int command, const epee::span<const uint8_t> req_buff, std::string& resp_buff, const epee::net_utils::connection_context_base& context); virtual bool invoke_notify_to_peer(int command, const epee::span<const uint8_t> req_buff, const epee::net_utils::connection_context_base& context); virtual bool drop_connection(const epee::net_utils::connection_context_base& context); diff --git a/src/p2p/net_node.inl b/src/p2p/net_node.inl index 8c0cff7e2..41ca19917 100644 --- a/src/p2p/net_node.inl +++ b/src/p2p/net_node.inl @@ -383,6 +383,9 @@ namespace nodetool m_offline = command_line::get_arg(vm, cryptonote::arg_offline); m_use_ipv6 = command_line::get_arg(vm, arg_p2p_use_ipv6); m_require_ipv4 = command_line::get_arg(vm, arg_p2p_require_ipv4); + public_zone.m_notifier = cryptonote::levin::notify{ + public_zone.m_net_server.get_io_service(), public_zone.m_net_server.get_config_shared(), nullptr + }; if (command_line::has_arg(vm, arg_p2p_add_peer)) { @@ -462,6 +465,7 @@ namespace nodetool return false; + epee::byte_slice noise = nullptr; auto proxies = get_proxies(vm); if (!proxies) return false; @@ -479,6 +483,20 @@ namespace nodetool if (!set_max_out_peers(zone, proxy.max_connections)) return false; + + epee::byte_slice this_noise = nullptr; + if (proxy.noise) + { + static_assert(sizeof(epee::levin::bucket_head2) < CRYPTONOTE_NOISE_BYTES, "noise bytes too small"); + if (noise.empty()) + noise = epee::levin::make_noise_notify(CRYPTONOTE_NOISE_BYTES); + + this_noise = noise.clone(); + } + + zone.m_notifier = cryptonote::levin::notify{ + zone.m_net_server.get_io_service(), zone.m_net_server.get_config_shared(), std::move(this_noise) + }; } for (const auto& zone : m_network_zones) @@ -494,6 +512,7 @@ namespace nodetool if (!inbounds) return false; + const std::size_t tx_relay_zones = m_network_zones.size(); for (auto& inbound : *inbounds) { network_zone& zone = add_zone(inbound.our_address.get_zone()); @@ -504,6 +523,12 @@ namespace nodetool return false; } + if (zone.m_connect == nullptr && tx_relay_zones <= 1) + { + MERROR("Listed --" << arg_anonymous_inbound.name << " without listing any --" << arg_proxy.name << ". The latter is necessary for sending origin txes over anonymity networks"); + return false; + } + zone.m_bind_ip = std::move(inbound.local_ip); zone.m_port = std::move(inbound.local_port); zone.m_net_server.set_default_remote(std::move(inbound.default_remote)); @@ -1266,6 +1291,7 @@ namespace nodetool ape.first_seen = first_seen_stamp ? first_seen_stamp : time(nullptr); zone.m_peerlist.append_with_peer_anchor(ape); + zone.m_notifier.new_out_connection(); LOG_DEBUG_CC(*con, "CONNECTION HANDSHAKED OK."); return true; @@ -1990,7 +2016,7 @@ namespace nodetool } if (c_id.first <= zone->first) break; - + ++zone; } if (zone->first == c_id.first) @@ -2000,6 +2026,61 @@ namespace nodetool } //----------------------------------------------------------------------------------- template<class t_payload_net_handler> + epee::net_utils::zone node_server<t_payload_net_handler>::send_txs(std::vector<cryptonote::blobdata> txs, const epee::net_utils::zone origin, const boost::uuids::uuid& source, const bool pad_txs) + { + namespace enet = epee::net_utils; + + const auto send = [&txs, &source, pad_txs] (std::pair<const enet::zone, network_zone>& network) + { + if (network.second.m_notifier.send_txs(std::move(txs), source, (pad_txs || network.first != enet::zone::public_))) + return network.first; + return enet::zone::invalid; + }; + + if (m_network_zones.empty()) + return enet::zone::invalid; + + if (origin != enet::zone::invalid) + return send(*m_network_zones.begin()); // send all txs received via p2p over public network + + if (m_network_zones.size() <= 2) + return send(*m_network_zones.rbegin()); // see static asserts below; sends over anonymity network iff enabled + + /* These checks are to ensure that i2p is highest priority if multiple + zones are selected. Make sure to update logic if the values cannot be + in the same relative order. `m_network_zones` must be sorted map too. */ + static_assert(std::is_same<std::underlying_type<enet::zone>::type, std::uint8_t>{}, "expected uint8_t zone"); + static_assert(unsigned(enet::zone::invalid) == 0, "invalid expected to be 0"); + static_assert(unsigned(enet::zone::public_) == 1, "public_ expected to be 1"); + static_assert(unsigned(enet::zone::i2p) == 2, "i2p expected to be 2"); + static_assert(unsigned(enet::zone::tor) == 3, "tor expected to be 3"); + + // check for anonymity networks with noise and connections + for (auto network = ++m_network_zones.begin(); network != m_network_zones.end(); ++network) + { + if (enet::zone::tor < network->first) + break; // unknown network + + const auto status = network->second.m_notifier.get_status(); + if (status.has_noise && status.connections_filled) + return send(*network); + } + + // use the anonymity network with outbound support + for (auto network = ++m_network_zones.begin(); network != m_network_zones.end(); ++network) + { + if (enet::zone::tor < network->first) + break; // unknown network + + if (network->second.m_connect) + return send(*network); + } + + // configuration should not allow this scenario + return enet::zone::invalid; + } + //----------------------------------------------------------------------------------- + template<class t_payload_net_handler> void node_server<t_payload_net_handler>::callback(p2p_connection_context& context) { m_payload_handler.on_callback(context); diff --git a/src/p2p/net_node_common.h b/src/p2p/net_node_common.h index 34d151f5f..239814c2c 100644 --- a/src/p2p/net_node_common.h +++ b/src/p2p/net_node_common.h @@ -33,6 +33,8 @@ #include <boost/uuid/uuid.hpp> #include <utility> #include <vector> +#include "cryptonote_basic/blobdatatype.h" +#include "net/enums.h" #include "net/net_utils_base.h" #include "p2p_protocol_defs.h" @@ -46,12 +48,12 @@ namespace nodetool struct i_p2p_endpoint { virtual bool relay_notify_to_list(int command, const epee::span<const uint8_t> data_buff, std::vector<std::pair<epee::net_utils::zone, boost::uuids::uuid>> connections)=0; + virtual epee::net_utils::zone send_txs(std::vector<cryptonote::blobdata> txs, const epee::net_utils::zone origin, const boost::uuids::uuid& source, const bool pad_txs)=0; virtual bool invoke_command_to_peer(int command, const epee::span<const uint8_t> req_buff, std::string& resp_buff, const epee::net_utils::connection_context_base& context)=0; virtual bool invoke_notify_to_peer(int command, const epee::span<const uint8_t> req_buff, const epee::net_utils::connection_context_base& context)=0; virtual bool drop_connection(const epee::net_utils::connection_context_base& context)=0; virtual void request_callback(const epee::net_utils::connection_context_base& context)=0; virtual uint64_t get_public_connections_count()=0; - virtual size_t get_zone_count() const=0; virtual void for_each_connection(std::function<bool(t_connection_context&, peerid_type, uint32_t)> f)=0; virtual bool for_connection(const boost::uuids::uuid&, std::function<bool(t_connection_context&, peerid_type, uint32_t)> f)=0; virtual bool block_host(const epee::net_utils::network_address &address, time_t seconds = 0)=0; @@ -71,6 +73,10 @@ namespace nodetool { return false; } + virtual epee::net_utils::zone send_txs(std::vector<cryptonote::blobdata> txs, const epee::net_utils::zone origin, const boost::uuids::uuid& source, const bool pad_txs) + { + return epee::net_utils::zone::invalid; + } virtual bool invoke_command_to_peer(int command, const epee::span<const uint8_t> req_buff, std::string& resp_buff, const epee::net_utils::connection_context_base& context) { return false; @@ -96,11 +102,6 @@ namespace nodetool return false; } - virtual size_t get_zone_count() const - { - return 0; - } - virtual uint64_t get_public_connections_count() { return false; diff --git a/tests/fuzz/levin.cpp b/tests/fuzz/levin.cpp index fe9ef418e..6c16a0a85 100644 --- a/tests/fuzz/levin.cpp +++ b/tests/fuzz/levin.cpp @@ -149,11 +149,11 @@ namespace } // Implement epee::net_utils::i_service_endpoint interface - virtual bool do_send(const void* ptr, size_t cb) + virtual bool do_send(epee::byte_slice message) { m_send_counter.inc(); boost::unique_lock<boost::mutex> lock(m_mutex); - m_last_send_data.append(reinterpret_cast<const char*>(ptr), cb); + m_last_send_data.append(reinterpret_cast<const char*>(message.data()), message.size()); return m_send_return; } diff --git a/tests/unit_tests/CMakeLists.txt b/tests/unit_tests/CMakeLists.txt index 1c4c4384c..a5d179040 100644 --- a/tests/unit_tests/CMakeLists.txt +++ b/tests/unit_tests/CMakeLists.txt @@ -56,6 +56,7 @@ set(unit_tests_sources hmac_keccak.cpp http.cpp keccak.cpp + levin.cpp logging.cpp long_term_block_weight.cpp lmdb.cpp diff --git a/tests/unit_tests/epee_levin_protocol_handler_async.cpp b/tests/unit_tests/epee_levin_protocol_handler_async.cpp index 697845f60..7bdb4c43d 100644 --- a/tests/unit_tests/epee_levin_protocol_handler_async.cpp +++ b/tests/unit_tests/epee_levin_protocol_handler_async.cpp @@ -140,12 +140,12 @@ namespace } // Implement epee::net_utils::i_service_endpoint interface - virtual bool do_send(const void* ptr, size_t cb) + virtual bool do_send(epee::byte_slice message) { //std::cout << "test_connection::do_send()" << std::endl; m_send_counter.inc(); boost::unique_lock<boost::mutex> lock(m_mutex); - m_last_send_data.append(reinterpret_cast<const char*>(ptr), cb); + m_last_send_data.append(reinterpret_cast<const char*>(message.data()), message.size()); return m_send_return; } @@ -367,8 +367,8 @@ TEST_F(positive_test_connection_to_levin_protocol_handler_calls, handler_process // Parse send data std::string send_data = conn->last_send_data(); epee::levin::bucket_head2 resp_head; - resp_head = *reinterpret_cast<const epee::levin::bucket_head2*>(send_data.data()); ASSERT_LT(sizeof(resp_head), send_data.size()); + std::memcpy(std::addressof(resp_head), send_data.data(), sizeof(resp_head)); std::string out_data = send_data.substr(sizeof(resp_head)); // Check sent response @@ -425,6 +425,95 @@ TEST_F(positive_test_connection_to_levin_protocol_handler_calls, handler_process ASSERT_EQ(3, m_commands_handler.callback_counter()); } +TEST_F(positive_test_connection_to_levin_protocol_handler_calls, handler_processes_handle_read_as_dummy) +{ + // Setup + const int expected_command = 4673261; + const std::string in_data(256, 'e'); + + const epee::byte_slice noise = epee::levin::make_noise_notify(1024); + const epee::byte_slice notify = epee::levin::make_notify(expected_command, epee::strspan<std::uint8_t>(in_data)); + + test_connection_ptr conn = create_connection(); + + // Test + ASSERT_TRUE(conn->m_protocol_handler.handle_recv(noise.data(), noise.size())); + + // Check connection and levin_commands_handler states + ASSERT_EQ(0u, m_commands_handler.notify_counter()); + ASSERT_EQ(0u, m_commands_handler.invoke_counter()); + ASSERT_EQ(-1, m_commands_handler.last_command()); + ASSERT_TRUE(m_commands_handler.last_in_buf().empty()); + ASSERT_EQ(0u, conn->send_counter()); + ASSERT_TRUE(conn->last_send_data().empty()); + + + ASSERT_TRUE(conn->m_protocol_handler.handle_recv(notify.data(), notify.size())); + + // Check connection and levin_commands_handler states + ASSERT_EQ(1u, m_commands_handler.notify_counter()); + ASSERT_EQ(0u, m_commands_handler.invoke_counter()); + ASSERT_EQ(expected_command, m_commands_handler.last_command()); + ASSERT_EQ(in_data, m_commands_handler.last_in_buf()); + ASSERT_EQ(0u, conn->send_counter()); + ASSERT_TRUE(conn->last_send_data().empty()); +} + +TEST_F(positive_test_connection_to_levin_protocol_handler_calls, handler_processes_handle_read_as_fragment) +{ + // Setup + const int expected_command = 4673261; + const int expected_fragmented_command = 46732; + const std::string in_data(256, 'e'); + std::string in_fragmented_data(1024 * 4, 'c'); + + const epee::byte_slice noise = epee::levin::make_noise_notify(1024); + const epee::byte_slice notify = epee::levin::make_notify(expected_command, epee::strspan<std::uint8_t>(in_data)); + epee::byte_slice fragmented = epee::levin::make_fragmented_notify(noise, expected_fragmented_command, epee::strspan<std::uint8_t>(in_fragmented_data)); + + EXPECT_EQ(5u, fragmented.size() / 1024); + EXPECT_EQ(0u, fragmented.size() % 1024); + + test_connection_ptr conn = create_connection(); + + while (!fragmented.empty()) + { + if ((fragmented.size() / 1024) % 2 == 1) + { + ASSERT_TRUE(conn->m_protocol_handler.handle_recv(notify.data(), notify.size())); + } + + ASSERT_EQ(3u - (fragmented.size() / 2048), m_commands_handler.notify_counter()); + ASSERT_EQ(0u, m_commands_handler.invoke_counter()); + ASSERT_EQ(expected_command, m_commands_handler.last_command()); + ASSERT_EQ(in_data, m_commands_handler.last_in_buf()); + ASSERT_EQ(0u, conn->send_counter()); + ASSERT_TRUE(conn->last_send_data().empty()); + + epee::byte_slice next = fragmented.take_slice(1024); + ASSERT_TRUE(conn->m_protocol_handler.handle_recv(next.data(), next.size())); + } + + in_fragmented_data.resize(((1024 - sizeof(epee::levin::bucket_head2)) * 5) - sizeof(epee::levin::bucket_head2)); // add padding zeroes + ASSERT_EQ(4u, m_commands_handler.notify_counter()); + ASSERT_EQ(0u, m_commands_handler.invoke_counter()); + ASSERT_EQ(expected_fragmented_command, m_commands_handler.last_command()); + ASSERT_EQ(in_fragmented_data, m_commands_handler.last_in_buf()); + ASSERT_EQ(0u, conn->send_counter()); + ASSERT_TRUE(conn->last_send_data().empty()); + + + ASSERT_TRUE(conn->m_protocol_handler.handle_recv(notify.data(), notify.size())); + + ASSERT_EQ(5u, m_commands_handler.notify_counter()); + ASSERT_EQ(0u, m_commands_handler.invoke_counter()); + ASSERT_EQ(expected_command, m_commands_handler.last_command()); + ASSERT_EQ(in_data, m_commands_handler.last_in_buf()); + ASSERT_EQ(0u, conn->send_counter()); + ASSERT_TRUE(conn->last_send_data().empty()); +} + + TEST_F(test_levin_protocol_handler__hanle_recv_with_invalid_data, handles_big_packet_1) { std::string buf("yyyyyy"); @@ -534,3 +623,19 @@ TEST_F(test_levin_protocol_handler__hanle_recv_with_invalid_data, handles_unexpe ASSERT_FALSE(m_conn->m_protocol_handler.handle_recv(m_buf.data(), m_buf.size())); } + +TEST_F(test_levin_protocol_handler__hanle_recv_with_invalid_data, handles_short_fragment) +{ + m_req_head.m_cb = 1; + m_req_head.m_flags = LEVIN_PACKET_BEGIN; + m_req_head.m_command = 0; + m_in_data.resize(1); + prepare_buf(); + + ASSERT_TRUE(m_conn->m_protocol_handler.handle_recv(m_buf.data(), m_buf.size())); + + m_req_head.m_flags = LEVIN_PACKET_END; + prepare_buf(); + + ASSERT_FALSE(m_conn->m_protocol_handler.handle_recv(m_buf.data(), m_buf.size())); +} diff --git a/tests/unit_tests/epee_utils.cpp b/tests/unit_tests/epee_utils.cpp index 32328edd9..6f887afda 100644 --- a/tests/unit_tests/epee_utils.cpp +++ b/tests/unit_tests/epee_utils.cpp @@ -44,6 +44,7 @@ #include "boost/archive/portable_binary_iarchive.hpp" #include "boost/archive/portable_binary_oarchive.hpp" +#include "byte_slice.h" #include "hex.h" #include "net/net_utils_base.h" #include "net/local_ip.h" @@ -375,6 +376,438 @@ TEST(Span, ToMutSpan) EXPECT_EQ((std::vector<unsigned>{1, 2, 3, 4}), mut); } +TEST(ByteSlice, Construction) +{ + EXPECT_TRUE(std::is_default_constructible<epee::byte_slice>()); + EXPECT_TRUE(std::is_move_constructible<epee::byte_slice>()); + EXPECT_FALSE(std::is_copy_constructible<epee::byte_slice>()); + EXPECT_TRUE(std::is_move_assignable<epee::byte_slice>()); + EXPECT_FALSE(std::is_copy_assignable<epee::byte_slice>()); +} + +TEST(ByteSlice, NoExcept) +{ + EXPECT_TRUE(std::is_nothrow_default_constructible<epee::byte_slice>()); + EXPECT_TRUE(std::is_nothrow_move_constructible<epee::byte_slice>()); + EXPECT_TRUE(std::is_nothrow_move_assignable<epee::byte_slice>()); + + epee::byte_slice lvalue{}; + const epee::byte_slice clvalue{}; + + EXPECT_TRUE(noexcept(lvalue.clone())); + EXPECT_TRUE(noexcept(clvalue.clone())); + + EXPECT_TRUE(noexcept(lvalue.begin())); + EXPECT_TRUE(noexcept(clvalue.begin())); + EXPECT_TRUE(noexcept(lvalue.end())); + EXPECT_TRUE(noexcept(clvalue.end())); + + EXPECT_TRUE(noexcept(lvalue.cbegin())); + EXPECT_TRUE(noexcept(clvalue.cbegin())); + EXPECT_TRUE(noexcept(lvalue.cend())); + EXPECT_TRUE(noexcept(clvalue.cend())); + + EXPECT_TRUE(noexcept(lvalue.empty())); + EXPECT_TRUE(noexcept(clvalue.empty())); + + EXPECT_TRUE(noexcept(lvalue.data())); + EXPECT_TRUE(noexcept(clvalue.data())); + EXPECT_TRUE(noexcept(lvalue.size())); + EXPECT_TRUE(noexcept(clvalue.size())); + + EXPECT_TRUE(noexcept(lvalue.remove_prefix(0))); + EXPECT_TRUE(noexcept(lvalue.take_slice(0))); +} + +TEST(ByteSlice, Empty) +{ + epee::byte_slice slice{}; + + EXPECT_EQ(slice.begin(), slice.end()); + EXPECT_EQ(slice.begin(), slice.cbegin()); + EXPECT_EQ(slice.end(), slice.cend()); + + EXPECT_TRUE(slice.empty()); + EXPECT_EQ(0u, slice.size()); + EXPECT_EQ(slice.begin(), slice.data()); + + EXPECT_EQ(0u, slice.get_slice(0, 0).size()); + EXPECT_THROW(slice.get_slice(0, 1), std::out_of_range); + EXPECT_EQ(0u, slice.remove_prefix(1)); + EXPECT_EQ(0u, slice.take_slice(1).size()); +} + +TEST(ByteSlice, CopySpans) +{ + const epee::span<const std::uint8_t> part1 = epee::as_byte_span("this is part1"); + const epee::span<const std::uint8_t> part2 = epee::as_byte_span("then part2"); + const epee::span<const std::uint8_t> part3 = epee::as_byte_span("finally part3"); + + const epee::byte_slice slice{part1, part2, part3}; + + EXPECT_NE(nullptr, slice.begin()); + EXPECT_NE(nullptr, slice.end()); + EXPECT_NE(slice.begin(), slice.end()); + EXPECT_NE(slice.cbegin(), slice.cend()); + EXPECT_EQ(slice.begin(), slice.cbegin()); + EXPECT_EQ(slice.end(), slice.cend()); + ASSERT_EQ(slice.size(), std::size_t(slice.end() - slice.begin())); + + EXPECT_FALSE(slice.empty()); + EXPECT_EQ(slice.begin(), slice.data()); + ASSERT_EQ(part1.size() + part2.size() + part3.size(), slice.size()); + EXPECT_TRUE( + boost::range::equal( + part1, boost::make_iterator_range(slice.begin(), slice.begin() + part1.size()) + ) + ); + EXPECT_TRUE( + boost::range::equal( + part2, boost::make_iterator_range(slice.begin() + part1.size(), slice.end() - part3.size()) + ) + ); + EXPECT_TRUE( + boost::range::equal( + part3, boost::make_iterator_range(slice.end() - part3.size(), slice.end()) + ) + ); +} + +TEST(ByteSlice, AdaptString) +{ + static constexpr const char base_string[] = "this is an example message"; + std::string adapted = base_string; + + const epee::span<const uint8_t> original = epee::to_byte_span(epee::to_span(adapted)); + const epee::byte_slice slice{std::move(adapted)}; + + EXPECT_EQ(original.begin(), slice.begin()); + EXPECT_EQ(original.cbegin(), slice.cbegin()); + EXPECT_EQ(original.end(), slice.end()); + EXPECT_EQ(original.cend(), slice.cend()); + + EXPECT_FALSE(slice.empty()); + EXPECT_EQ(original.data(), slice.data()); + EXPECT_EQ(original.size(), slice.size()); + EXPECT_TRUE(boost::range::equal(boost::string_ref{base_string}, slice)); +} + +TEST(ByteSlice, EmptyAdaptString) +{ + epee::byte_slice slice{std::string{}}; + + EXPECT_EQ(slice.begin(), slice.end()); + EXPECT_EQ(slice.begin(), slice.cbegin()); + EXPECT_EQ(slice.end(), slice.cend()); + + EXPECT_TRUE(slice.empty()); + EXPECT_EQ(0u, slice.size()); + EXPECT_EQ(slice.begin(), slice.data()); + + EXPECT_EQ(0u, slice.get_slice(0, 0).size()); + EXPECT_THROW(slice.get_slice(0, 1), std::out_of_range); + EXPECT_EQ(0u, slice.remove_prefix(1)); + EXPECT_EQ(0u, slice.take_slice(1).size()); +} + +TEST(ByteSlice, AdaptVector) +{ + static constexpr const char base_string[] = "this is an example message"; + std::vector<std::uint8_t> adapted(sizeof(base_string)); + + ASSERT_EQ(sizeof(base_string), adapted.size()); + std::memcpy(adapted.data(), base_string, sizeof(base_string)); + + const epee::span<const uint8_t> original = epee::to_span(adapted); + const epee::byte_slice slice{std::move(adapted)}; + + EXPECT_EQ(sizeof(base_string), original.size()); + + EXPECT_EQ(original.begin(), slice.begin()); + EXPECT_EQ(original.cbegin(), slice.cbegin()); + EXPECT_EQ(original.end(), slice.end()); + EXPECT_EQ(original.cend(), slice.cend()); + + EXPECT_FALSE(slice.empty()); + EXPECT_EQ(original.data(), slice.data()); + EXPECT_EQ(original.size(), slice.size()); + EXPECT_TRUE(boost::range::equal(base_string, slice)); +} + +TEST(ByteSlice, EmptyAdaptVector) +{ + epee::byte_slice slice{std::vector<std::uint8_t>{}}; + + EXPECT_EQ(slice.begin(), slice.end()); + EXPECT_EQ(slice.begin(), slice.cbegin()); + EXPECT_EQ(slice.end(), slice.cend()); + + EXPECT_TRUE(slice.empty()); + EXPECT_EQ(0u, slice.size()); + EXPECT_EQ(slice.begin(), slice.data()); + + EXPECT_EQ(0u, slice.get_slice(0, 0).size()); + EXPECT_THROW(slice.get_slice(0, 1), std::out_of_range); + EXPECT_EQ(0u, slice.remove_prefix(1)); + EXPECT_EQ(0u, slice.take_slice(1).size()); +} + +TEST(ByteSlice, Move) +{ + static constexpr const char base_string[] = "another example message"; + + epee::byte_slice slice{epee::as_byte_span(base_string)}; + EXPECT_TRUE(boost::range::equal(base_string, slice)); + + const epee::span<const std::uint8_t> original = epee::to_span(slice); + epee::byte_slice moved{std::move(slice)}; + EXPECT_TRUE(boost::range::equal(base_string, moved)); + + EXPECT_EQ(slice.begin(), slice.end()); + EXPECT_EQ(slice.begin(), slice.cbegin()); + EXPECT_EQ(slice.end(), slice.cend()); + + EXPECT_EQ(original.begin(), moved.begin()); + EXPECT_EQ(moved.begin(), moved.cbegin()); + EXPECT_EQ(original.end(), moved.end()); + EXPECT_EQ(moved.end(), moved.cend()); + + EXPECT_TRUE(slice.empty()); + EXPECT_EQ(slice.begin(), slice.data()); + EXPECT_EQ(0u, slice.size()); + + EXPECT_FALSE(moved.empty()); + EXPECT_EQ(moved.begin(), moved.data()); + EXPECT_EQ(original.size(), moved.size()); + + slice = std::move(moved); + EXPECT_TRUE(boost::range::equal(base_string, slice)); + + EXPECT_EQ(original.begin(), slice.begin()); + EXPECT_EQ(slice.begin(), slice.cbegin()); + EXPECT_EQ(original.end(), slice.end()); + EXPECT_EQ(slice.end(), slice.cend()); + + EXPECT_FALSE(slice.empty()); + EXPECT_EQ(slice.begin(), slice.data()); + EXPECT_EQ(original.size(), slice.size()); + + EXPECT_TRUE(moved.empty()); + EXPECT_EQ(moved.begin(), moved.data()); + EXPECT_EQ(0u, moved.size()); +} + +TEST(ByteSlice, Clone) +{ + static constexpr const char base_string[] = "another example message"; + + const epee::byte_slice slice{epee::as_byte_span(base_string)}; + EXPECT_TRUE(boost::range::equal(base_string, slice)); + + const epee::byte_slice clone{slice.clone()}; + EXPECT_TRUE(boost::range::equal(base_string, clone)); + + EXPECT_EQ(slice.begin(), clone.begin()); + EXPECT_EQ(slice.cbegin(), clone.cbegin()); + EXPECT_EQ(slice.end(), clone.end()); + EXPECT_EQ(slice.cend(), clone.cend()); + + EXPECT_FALSE(slice.empty()); + EXPECT_FALSE(clone.empty()); + EXPECT_EQ(slice.cbegin(), slice.data()); + EXPECT_EQ(slice.data(), clone.data()); + EXPECT_EQ(sizeof(base_string), slice.size()); + EXPECT_EQ(slice.size(), clone.size()); +} + +TEST(ByteSlice, RemovePrefix) +{ + static constexpr const char base_string[] = "another example message"; + static constexpr std::size_t remove_size = sizeof("another"); + static constexpr std::size_t remaining = sizeof(base_string) - remove_size; + + epee::byte_slice slice{epee::as_byte_span(base_string)}; + EXPECT_TRUE(boost::range::equal(base_string, slice)); + + const epee::span<const std::uint8_t> original = epee::to_span(slice); + EXPECT_EQ(remove_size, slice.remove_prefix(remove_size)); + + EXPECT_EQ(original.begin() + remove_size, slice.begin()); + EXPECT_EQ(slice.begin(), slice.cbegin()); + EXPECT_EQ(original.end(), slice.end()); + EXPECT_EQ(slice.end(), slice.cend()); + + EXPECT_FALSE(slice.empty()); + EXPECT_EQ(slice.cbegin(), slice.data()); + EXPECT_EQ(remaining, slice.size()); + + // touch original pointers to check "free" status + EXPECT_TRUE(boost::range::equal(base_string, original)); + + EXPECT_EQ(remaining, slice.remove_prefix(remaining + 1)); + + EXPECT_EQ(slice.begin(), slice.end()); + EXPECT_EQ(slice.begin(), slice.cbegin()); + EXPECT_EQ(slice.end(), slice.cend()); + + EXPECT_TRUE(slice.empty()); + EXPECT_EQ(slice.cbegin(), slice.data()); + EXPECT_EQ(0, slice.size()); +} + +TEST(ByteSlice, TakeSlice) +{ + static constexpr const char base_string[] = "another example message"; + static constexpr std::size_t remove_size = sizeof("another"); + static constexpr std::size_t remaining = sizeof(base_string) - remove_size; + + epee::byte_slice slice{epee::as_byte_span(base_string)}; + EXPECT_TRUE(boost::range::equal(base_string, slice)); + + const epee::span<const std::uint8_t> original = epee::to_span(slice); + const epee::byte_slice slice2 = slice.take_slice(remove_size); + + EXPECT_EQ(original.begin() + remove_size, slice.begin()); + EXPECT_EQ(slice.begin(), slice.cbegin()); + EXPECT_EQ(original.end(), slice.end()); + EXPECT_EQ(slice.end(), slice.cend()); + + EXPECT_EQ(original.begin(), slice2.begin()); + EXPECT_EQ(slice2.begin(), slice2.cbegin()); + EXPECT_EQ(original.begin() + remove_size, slice2.end()); + EXPECT_EQ(slice2.end(), slice2.cend()); + + EXPECT_FALSE(slice.empty()); + EXPECT_EQ(slice.cbegin(), slice.data()); + EXPECT_EQ(remaining, slice.size()); + + EXPECT_FALSE(slice2.empty()); + EXPECT_EQ(slice2.cbegin(), slice2.data()); + EXPECT_EQ(remove_size, slice2.size()); + + // touch original pointers to check "free" status + EXPECT_TRUE(boost::range::equal(base_string, original)); + + const epee::byte_slice slice3 = slice.take_slice(remaining + 1); + + EXPECT_EQ(slice.begin(), slice.end()); + EXPECT_EQ(slice.begin(), slice.cbegin()); + EXPECT_EQ(slice.end(), slice.cend()); + + EXPECT_EQ(original.begin(), slice2.begin()); + EXPECT_EQ(slice2.begin(), slice2.cbegin()); + EXPECT_EQ(original.begin() + remove_size, slice2.end()); + EXPECT_EQ(slice2.end(), slice2.cend()); + + EXPECT_EQ(slice2.end(), slice3.begin()); + EXPECT_EQ(slice3.begin(), slice3.cbegin()); + EXPECT_EQ(original.end(), slice3.end()); + EXPECT_EQ(slice3.end(), slice3.cend()); + + EXPECT_TRUE(slice.empty()); + EXPECT_EQ(slice.cbegin(), slice.data()); + EXPECT_EQ(0, slice.size()); + + EXPECT_FALSE(slice2.empty()); + EXPECT_EQ(slice2.cbegin(), slice2.data()); + EXPECT_EQ(remove_size, slice2.size()); + + EXPECT_FALSE(slice3.empty()); + EXPECT_EQ(slice3.cbegin(), slice3.data()); + EXPECT_EQ(remaining, slice3.size()); + + // touch original pointers to check "free" status + slice = nullptr; + EXPECT_TRUE(boost::range::equal(base_string, original)); +} + +TEST(ByteSlice, GetSlice) +{ + static constexpr const char base_string[] = "another example message"; + static constexpr std::size_t get_size = sizeof("another"); + static constexpr std::size_t get2_size = sizeof(base_string) - get_size; + + epee::span<const std::uint8_t> original{}; + epee::byte_slice slice2{}; + epee::byte_slice slice3{}; + + // make sure get_slice increments ref count + { + const epee::byte_slice slice{epee::as_byte_span(base_string)}; + EXPECT_TRUE(boost::range::equal(base_string, slice)); + + original = epee::to_span(slice); + slice2 = slice.get_slice(0, get_size); + + EXPECT_EQ(original.begin(), slice.begin()); + EXPECT_EQ(slice.begin(), slice.cbegin()); + EXPECT_EQ(original.end(), slice.end()); + EXPECT_EQ(slice.end(), slice.cend()); + + EXPECT_EQ(original.begin(), slice2.begin()); + EXPECT_EQ(slice2.begin(), slice2.cbegin()); + EXPECT_EQ(original.begin() + get_size, slice2.end()); + EXPECT_EQ(slice2.end(), slice2.cend()); + + EXPECT_FALSE(slice.empty()); + EXPECT_EQ(slice.cbegin(), slice.data()); + EXPECT_EQ(original.size(), slice.size()); + + EXPECT_FALSE(slice2.empty()); + EXPECT_EQ(slice2.cbegin(), slice2.data()); + EXPECT_EQ(get_size, slice2.size()); + + // touch original pointers to check "free" status + EXPECT_TRUE(boost::range::equal(base_string, original)); + + slice3 = slice.get_slice(get_size, sizeof(base_string)); + + EXPECT_EQ(original.begin(), slice.begin()); + EXPECT_EQ(slice.begin(), slice.cbegin()); + EXPECT_EQ(original.end(), slice.end()); + EXPECT_EQ(slice.end(), slice.cend()); + + EXPECT_EQ(original.begin(), slice2.begin()); + EXPECT_EQ(slice2.begin(), slice2.cbegin()); + EXPECT_EQ(original.begin() + get_size, slice2.end()); + EXPECT_EQ(slice2.end(), slice2.cend()); + + EXPECT_EQ(slice2.end(), slice3.begin()); + EXPECT_EQ(slice3.begin(), slice3.cbegin()); + EXPECT_EQ(original.end(), slice3.end()); + EXPECT_EQ(slice3.end(), slice3.cend()); + + EXPECT_FALSE(slice.empty()); + EXPECT_EQ(slice.cbegin(), slice.data()); + EXPECT_EQ(original.size(), slice.size()); + + EXPECT_FALSE(slice2.empty()); + EXPECT_EQ(slice2.cbegin(), slice2.data()); + EXPECT_EQ(get_size, slice2.size()); + + EXPECT_FALSE(slice3.empty()); + EXPECT_EQ(slice3.cbegin(), slice3.data()); + EXPECT_EQ(get2_size, slice3.size()); + + EXPECT_THROW(slice.get_slice(1, 0), std::out_of_range); + EXPECT_THROW(slice.get_slice(0, sizeof(base_string) + 1), std::out_of_range); + EXPECT_THROW(slice.get_slice(sizeof(base_string) + 1, sizeof(base_string) + 1), std::out_of_range); + EXPECT_TRUE(slice.get_slice(sizeof(base_string), sizeof(base_string)).empty()); + + EXPECT_EQ(original.begin(), slice.begin()); + EXPECT_EQ(slice.begin(), slice.cbegin()); + EXPECT_EQ(original.end(), slice.end()); + EXPECT_EQ(slice.end(), slice.cend()); + + EXPECT_FALSE(slice.empty()); + EXPECT_EQ(slice.cbegin(), slice.data()); + EXPECT_EQ(original.size(), slice.size()); + } + + // touch original pointers to check "free" status + EXPECT_TRUE(boost::range::equal(base_string, original)); +} + TEST(ToHex, String) { EXPECT_TRUE(epee::to_hex::string(nullptr).empty()); diff --git a/tests/unit_tests/levin.cpp b/tests/unit_tests/levin.cpp new file mode 100644 index 000000000..3188167f9 --- /dev/null +++ b/tests/unit_tests/levin.cpp @@ -0,0 +1,586 @@ +// Copyright (c) 2019, The Monero Project +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include <algorithm> +#include <boost/uuid/nil_generator.hpp> +#include <boost/uuid/random_generator.hpp> +#include <boost/uuid/uuid.hpp> +#include <cstring> +#include <gtest/gtest.h> +#include <limits> +#include <set> + +#include "byte_slice.h" +#include "crypto/crypto.h" +#include "cryptonote_basic/connection_context.h" +#include "cryptonote_protocol/cryptonote_protocol_defs.h" +#include "cryptonote_protocol/levin_notify.h" +#include "int-util.h" +#include "p2p/net_node.h" +#include "net/dandelionpp.h" +#include "net/levin_base.h" +#include "span.h" + +namespace +{ + class test_endpoint final : public epee::net_utils::i_service_endpoint + { + boost::asio::io_service& io_service_; + std::size_t ref_count_; + + virtual bool do_send(epee::byte_slice message) override final + { + send_queue_.push_back(std::move(message)); + return true; + } + + virtual bool close() override final + { + return true; + } + + virtual bool send_done() override final + { + throw std::logic_error{"send_done not implemented"}; + } + + virtual bool call_run_once_service_io() override final + { + return io_service_.run_one(); + } + + virtual bool request_callback() override final + { + throw std::logic_error{"request_callback not implemented"}; + } + + virtual boost::asio::io_service& get_io_service() override final + { + return io_service_; + } + + virtual bool add_ref() override final + { + ++ref_count_; + return true; + } + + virtual bool release() override final + { + --ref_count_; + return true; + } + + public: + test_endpoint(boost::asio::io_service& io_service) + : epee::net_utils::i_service_endpoint(), + io_service_(io_service), + ref_count_(0), + send_queue_() + {} + + virtual ~test_endpoint() noexcept(false) override final + { + EXPECT_EQ(0u, ref_count_); + } + + std::deque<epee::byte_slice> send_queue_; + }; + + class test_connection + { + test_endpoint endpoint_; + cryptonote::levin::detail::p2p_context context_; + epee::levin::async_protocol_handler<cryptonote::levin::detail::p2p_context> handler_; + + public: + test_connection(boost::asio::io_service& io_service, cryptonote::levin::connections& connections, boost::uuids::random_generator& random_generator) + : context_(), + endpoint_(io_service), + handler_(std::addressof(endpoint_), connections, context_) + { + const_cast<boost::uuids::uuid&>(context_.m_connection_id) = random_generator(); + handler_.after_init_connection(); + } + + //\return Number of messages processed + std::size_t process_send_queue() + { + std::size_t count = 0; + for ( ; !endpoint_.send_queue_.empty(); ++count, endpoint_.send_queue_.pop_front()) + { + // invalid messages shoudn't be possible in this test; + EXPECT_TRUE(handler_.handle_recv(endpoint_.send_queue_.front().data(), endpoint_.send_queue_.front().size())); + } + return count; + } + + const boost::uuids::uuid& get_id() const noexcept + { + return context_.m_connection_id; + } + }; + + struct received_message + { + boost::uuids::uuid connection; + int command; + std::string payload; + }; + + class test_receiver : public epee::levin::levin_commands_handler<cryptonote::levin::detail::p2p_context> + { + std::deque<received_message> invoked_; + std::deque<received_message> notified_; + + template<typename T> + static std::pair<boost::uuids::uuid, typename T::request> get_message(std::deque<received_message>& queue) + { + if (queue.empty()) + throw std::logic_error{"Queue has no received messges"}; + + if (queue.front().command != T::ID) + throw std::logic_error{"Unexpected ID at front of message queue"}; + + epee::serialization::portable_storage storage{}; + if(!storage.load_from_binary(epee::strspan<std::uint8_t>(queue.front().payload))) + throw std::logic_error{"Unable to parse epee binary format"}; + + typename T::request request{}; + if (!request.load(storage)) + throw std::logic_error{"Unable to load into expected request"}; + + boost::uuids::uuid connection = queue.front().connection; + queue.pop_front(); + return {connection, std::move(request)}; + } + + virtual int invoke(int command, const epee::span<const uint8_t> in_buff, std::string& buff_out, cryptonote::levin::detail::p2p_context& context) override final + { + buff_out.clear(); + invoked_.push_back( + {context.m_connection_id, command, std::string{reinterpret_cast<const char*>(in_buff.data()), in_buff.size()}} + ); + return 1; + } + + virtual int notify(int command, const epee::span<const uint8_t> in_buff, cryptonote::levin::detail::p2p_context& context) override final + { + notified_.push_back( + {context.m_connection_id, command, std::string{reinterpret_cast<const char*>(in_buff.data()), in_buff.size()}} + ); + return 1; + } + + virtual void callback(cryptonote::levin::detail::p2p_context& context) override final + {} + + virtual void on_connection_new(cryptonote::levin::detail::p2p_context&) override final + {} + + virtual void on_connection_close(cryptonote::levin::detail::p2p_context&) override final + {} + + public: + test_receiver() + : epee::levin::levin_commands_handler<cryptonote::levin::detail::p2p_context>(), + invoked_(), + notified_() + {} + + virtual ~test_receiver() noexcept override final{} + + std::size_t invoked_size() const noexcept + { + return invoked_.size(); + } + + std::size_t notified_size() const noexcept + { + return notified_.size(); + } + + template<typename T> + std::pair<boost::uuids::uuid, typename T::request> get_invoked() + { + return get_message<T>(invoked_); + } + + template<typename T> + std::pair<boost::uuids::uuid, typename T::request> get_notification() + { + return get_message<T>(notified_); + } + }; + + class levin_notify : public ::testing::Test + { + const std::shared_ptr<cryptonote::levin::connections> connections_; + std::set<boost::uuids::uuid> connection_ids_; + + public: + levin_notify() + : ::testing::Test(), + connections_(std::make_shared<cryptonote::levin::connections>()), + connection_ids_(), + random_generator_(), + io_service_(), + receiver_(), + contexts_() + { + connections_->set_handler(std::addressof(receiver_), nullptr); + } + + virtual void TearDown() override final + { + EXPECT_EQ(0u, receiver_.invoked_size()); + EXPECT_EQ(0u, receiver_.notified_size()); + } + + void add_connection() + { + contexts_.emplace_back(io_service_, *connections_, random_generator_); + EXPECT_TRUE(connection_ids_.emplace(contexts_.back().get_id()).second); + EXPECT_EQ(connection_ids_.size(), connections_->get_connections_count()); + } + + cryptonote::levin::notify make_notifier(const std::size_t noise_size) + { + epee::byte_slice noise = nullptr; + if (noise_size) + noise = epee::levin::make_noise_notify(noise_size); + return cryptonote::levin::notify{io_service_, connections_, std::move(noise)}; + } + + boost::uuids::random_generator random_generator_; + boost::asio::io_service io_service_; + test_receiver receiver_; + std::deque<test_connection> contexts_; + }; +} + +TEST(make_header, no_expect_return) +{ + static constexpr const std::size_t max_length = std::numeric_limits<std::size_t>::max(); + + const epee::levin::bucket_head2 header1 = epee::levin::make_header(1024, max_length, 5601, false); + EXPECT_EQ(SWAP64LE(LEVIN_SIGNATURE), header1.m_signature); + EXPECT_FALSE(header1.m_have_to_return_data); + EXPECT_EQ(SWAP64LE(max_length), header1.m_cb); + EXPECT_EQ(SWAP32LE(1024), header1.m_command); + EXPECT_EQ(SWAP32LE(LEVIN_PROTOCOL_VER_1), header1.m_protocol_version); + EXPECT_EQ(SWAP32LE(5601), header1.m_flags); +} + +TEST(make_header, expect_return) +{ + const epee::levin::bucket_head2 header1 = epee::levin::make_header(65535, 0, 0, true); + EXPECT_EQ(SWAP64LE(LEVIN_SIGNATURE), header1.m_signature); + EXPECT_TRUE(header1.m_have_to_return_data); + EXPECT_EQ(0u, header1.m_cb); + EXPECT_EQ(SWAP32LE(65535), header1.m_command); + EXPECT_EQ(SWAP32LE(LEVIN_PROTOCOL_VER_1), header1.m_protocol_version); + EXPECT_EQ(0u, header1.m_flags); +} + +TEST(make_notify, empty_payload) +{ + const epee::byte_slice message = epee::levin::make_notify(443, nullptr); + const epee::levin::bucket_head2 header = + epee::levin::make_header(443, 0, LEVIN_PACKET_REQUEST, false); + ASSERT_EQ(sizeof(header), message.size()); + EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0); +} + +TEST(make_notify, with_payload) +{ + std::string bytes(100, 'a'); + std::generate(bytes.begin(), bytes.end(), crypto::random_device{}); + + const epee::byte_slice message = epee::levin::make_notify(443, epee::strspan<std::uint8_t>(bytes)); + const epee::levin::bucket_head2 header = + epee::levin::make_header(443, bytes.size(), LEVIN_PACKET_REQUEST, false); + + ASSERT_EQ(sizeof(header) + bytes.size(), message.size()); + EXPECT_TRUE(std::memcmp(std::addressof(header), message.data(), sizeof(header)) == 0); + EXPECT_TRUE(std::memcmp(bytes.data(), message.data() + sizeof(header), bytes.size()) == 0); +} + +TEST(make_noise, invalid) +{ + EXPECT_TRUE(epee::levin::make_noise_notify(sizeof(epee::levin::bucket_head2) - 1).empty()); +} + +TEST(make_noise, valid) +{ + static constexpr const std::uint32_t flags = + LEVIN_PACKET_BEGIN | LEVIN_PACKET_END; + + const epee::byte_slice noise = epee::levin::make_noise_notify(1024); + const epee::levin::bucket_head2 header = + epee::levin::make_header(0, 1024 - sizeof(epee::levin::bucket_head2), flags, false); + + ASSERT_EQ(1024, noise.size()); + EXPECT_TRUE(std::memcmp(std::addressof(header), noise.data(), sizeof(header)) == 0); + EXPECT_EQ(1024 - sizeof(header), std::count(noise.cbegin() + sizeof(header), noise.cend(), 0)); +} + +TEST(make_fragment, invalid) +{ + EXPECT_TRUE(epee::levin::make_fragmented_notify(nullptr, 0, nullptr).empty()); +} + +TEST(make_fragment, single) +{ + const epee::byte_slice noise = epee::levin::make_noise_notify(1024); + const epee::byte_slice fragment = epee::levin::make_fragmented_notify(noise, 11, nullptr); + const epee::levin::bucket_head2 header = + epee::levin::make_header(11, 1024 - sizeof(epee::levin::bucket_head2), LEVIN_PACKET_REQUEST, false); + + EXPECT_EQ(1024, noise.size()); + ASSERT_EQ(1024, fragment.size()); + EXPECT_TRUE(std::memcmp(std::addressof(header), fragment.data(), sizeof(header)) == 0); + EXPECT_EQ(1024 - sizeof(header), std::count(noise.cbegin() + sizeof(header), noise.cend(), 0)); +} + +TEST(make_fragment, multiple) +{ + std::string bytes(1024 * 3 - 150, 'a'); + std::generate(bytes.begin(), bytes.end(), crypto::random_device{}); + + const epee::byte_slice noise = epee::levin::make_noise_notify(1024); + epee::byte_slice fragment = epee::levin::make_fragmented_notify(noise, 114, epee::strspan<std::uint8_t>(bytes)); + + epee::levin::bucket_head2 header = + epee::levin::make_header(0, 1024 - sizeof(epee::levin::bucket_head2), LEVIN_PACKET_BEGIN, false); + + ASSERT_LE(sizeof(header), fragment.size()); + EXPECT_TRUE(std::memcmp(std::addressof(header), fragment.data(), sizeof(header)) == 0); + + fragment.take_slice(sizeof(header)); + header.m_flags = LEVIN_PACKET_REQUEST; + header.m_cb = bytes.size(); + header.m_command = 114; + + ASSERT_LE(sizeof(header), fragment.size()); + EXPECT_TRUE(std::memcmp(std::addressof(header), fragment.data(), sizeof(header)) == 0); + + fragment.take_slice(sizeof(header)); + + ASSERT_LE(bytes.size(), fragment.size()); + EXPECT_TRUE(std::memcmp(bytes.data(), fragment.data(), 1024 - sizeof(header) * 2) == 0); + + bytes.erase(0, 1024 - sizeof(header) * 2); + fragment.take_slice(1024 - sizeof(header) * 2); + header.m_flags = 0; + header.m_cb = 1024 - sizeof(header); + header.m_command = 0; + + ASSERT_LE(sizeof(header), fragment.size()); + EXPECT_TRUE(std::memcmp(std::addressof(header), fragment.data(), sizeof(header)) == 0); + + fragment.take_slice(sizeof(header)); + + ASSERT_LE(bytes.size(), fragment.size()); + EXPECT_TRUE(std::memcmp(bytes.data(), fragment.data(), 1024 - sizeof(header)) == 0); + + bytes.erase(0, 1024 - sizeof(header)); + fragment.take_slice(1024 - sizeof(header)); + header.m_flags = LEVIN_PACKET_END; + + ASSERT_LE(sizeof(header), fragment.size()); + EXPECT_TRUE(std::memcmp(std::addressof(header), fragment.data(), sizeof(header)) == 0); + + fragment.take_slice(sizeof(header)); + EXPECT_TRUE(std::memcmp(bytes.data(), fragment.data(), bytes.size()) == 0); + + fragment.take_slice(bytes.size()); + + EXPECT_EQ(18, std::count(fragment.cbegin(), fragment.cend(), 0)); +} + +TEST_F(levin_notify, defaulted) +{ + cryptonote::levin::notify notifier{}; + { + const auto status = notifier.get_status(); + EXPECT_FALSE(status.has_noise); + EXPECT_FALSE(status.connections_filled); + } + EXPECT_FALSE(notifier.send_txs({}, random_generator_(), false)); +} + +TEST_F(levin_notify, flood) +{ + cryptonote::levin::notify notifier = make_notifier(0); + + for (unsigned count = 0; count < 10; ++count) + add_connection(); + + { + const auto status = notifier.get_status(); + EXPECT_FALSE(status.has_noise); + EXPECT_FALSE(status.connections_filled); + } + notifier.new_out_connection(); + io_service_.poll(); + { + const auto status = notifier.get_status(); + EXPECT_FALSE(status.has_noise); + EXPECT_FALSE(status.connections_filled); // not tracked + } + + std::vector<cryptonote::blobdata> txs(2); + txs[0].resize(100, 'e'); + txs[1].resize(200, 'f'); + + ASSERT_EQ(10u, contexts_.size()); + { + auto context = contexts_.begin(); + EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), false)); + + io_service_.reset(); + ASSERT_LT(0u, io_service_.poll()); + EXPECT_EQ(0u, context->process_send_queue()); + for (++context; context != contexts_.end(); ++context) + EXPECT_EQ(1u, context->process_send_queue()); + + ASSERT_EQ(9u, receiver_.notified_size()); + for (unsigned count = 0; count < 9; ++count) + { + auto notification = receiver_.get_notification<cryptonote::NOTIFY_NEW_TRANSACTIONS>().second; + EXPECT_EQ(txs, notification.txs); + EXPECT_TRUE(notification._.empty()); + } + } + + ASSERT_EQ(10u, contexts_.size()); + { + auto context = contexts_.begin(); + EXPECT_TRUE(notifier.send_txs(txs, context->get_id(), true)); + + io_service_.reset(); + ASSERT_LT(0u, io_service_.poll()); + EXPECT_EQ(0u, context->process_send_queue()); + for (++context; context != contexts_.end(); ++context) + EXPECT_EQ(1u, context->process_send_queue()); + + ASSERT_EQ(9u, receiver_.notified_size()); + for (unsigned count = 0; count < 9; ++count) + { + auto notification = receiver_.get_notification<cryptonote::NOTIFY_NEW_TRANSACTIONS>().second; + EXPECT_EQ(txs, notification.txs); + EXPECT_FALSE(notification._.empty()); + } + } +} + +TEST_F(levin_notify, noise) +{ + for (unsigned count = 0; count < 10; ++count) + add_connection(); + + std::vector<cryptonote::blobdata> txs(1); + txs[0].resize(1900, 'h'); + + const boost::uuids::uuid incoming_id = random_generator_(); + cryptonote::levin::notify notifier = make_notifier(2048); + + { + const auto status = notifier.get_status(); + EXPECT_TRUE(status.has_noise); + EXPECT_FALSE(status.connections_filled); + } + ASSERT_LT(0u, io_service_.poll()); + { + const auto status = notifier.get_status(); + EXPECT_TRUE(status.has_noise); + EXPECT_TRUE(status.connections_filled); + } + + notifier.run_stems(); + io_service_.reset(); + ASSERT_LT(0u, io_service_.poll()); + { + std::size_t sent = 0; + for (auto& context : contexts_) + sent += context.process_send_queue(); + + EXPECT_EQ(2u, sent); + EXPECT_EQ(0u, receiver_.notified_size()); + } + + EXPECT_TRUE(notifier.send_txs(txs, incoming_id, false)); + notifier.run_stems(); + io_service_.reset(); + ASSERT_LT(0u, io_service_.poll()); + { + std::size_t sent = 0; + for (auto& context : contexts_) + sent += context.process_send_queue(); + + ASSERT_EQ(2u, sent); + while (sent--) + { + auto notification = receiver_.get_notification<cryptonote::NOTIFY_NEW_TRANSACTIONS>().second; + EXPECT_EQ(txs, notification.txs); + EXPECT_TRUE(notification._.empty()); + } + } + + txs[0].resize(3000, 'r'); + EXPECT_TRUE(notifier.send_txs(txs, incoming_id, true)); + notifier.run_stems(); + io_service_.reset(); + ASSERT_LT(0u, io_service_.poll()); + { + std::size_t sent = 0; + for (auto& context : contexts_) + sent += context.process_send_queue(); + + EXPECT_EQ(2u, sent); + EXPECT_EQ(0u, receiver_.notified_size()); + } + + notifier.run_stems(); + io_service_.reset(); + ASSERT_LT(0u, io_service_.poll()); + { + std::size_t sent = 0; + for (auto& context : contexts_) + sent += context.process_send_queue(); + + ASSERT_EQ(2u, sent); + while (sent--) + { + auto notification = receiver_.get_notification<cryptonote::NOTIFY_NEW_TRANSACTIONS>().second; + EXPECT_EQ(txs, notification.txs); + EXPECT_TRUE(notification._.empty()); + } + } +} diff --git a/tests/unit_tests/net.cpp b/tests/unit_tests/net.cpp index 3acf75f3b..f24ffb45c 100644 --- a/tests/unit_tests/net.cpp +++ b/tests/unit_tests/net.cpp @@ -26,6 +26,7 @@ // STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF // THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#include <algorithm> #include <atomic> #include <boost/archive/portable_binary_oarchive.hpp> #include <boost/archive/portable_binary_iarchive.hpp> @@ -36,13 +37,22 @@ #include <boost/asio/steady_timer.hpp> #include <boost/asio/write.hpp> #include <boost/endian/conversion.hpp> +#include <boost/range/adaptor/sliced.hpp> +#include <boost/range/combine.hpp> #include <boost/system/error_code.hpp> #include <boost/thread/thread.hpp> +#include <boost/uuid/nil_generator.hpp> +#include <boost/uuid/random_generator.hpp> +#include <boost/uuid/uuid.hpp> +#include <cstdint> #include <cstring> #include <functional> #include <gtest/gtest.h> +#include <map> #include <memory> +#include <type_traits> +#include "net/dandelionpp.h" #include "net/error.h" #include "net/net_utils_base.h" #include "net/socks.h" @@ -857,3 +867,389 @@ TEST(socks_connector, timeout) EXPECT_THROW(sock.get().is_open(), boost::system::system_error); } +TEST(dandelionpp_map, traits) +{ + EXPECT_TRUE(std::is_default_constructible<net::dandelionpp::connection_map>()); + EXPECT_TRUE(std::is_move_constructible<net::dandelionpp::connection_map>()); + EXPECT_TRUE(std::is_move_assignable<net::dandelionpp::connection_map>()); + EXPECT_FALSE(std::is_copy_constructible<net::dandelionpp::connection_map>()); + EXPECT_FALSE(std::is_copy_assignable<net::dandelionpp::connection_map>()); +} + +TEST(dandelionpp_map, empty) +{ + const net::dandelionpp::connection_map mapper{}; + + EXPECT_EQ(mapper.begin(), mapper.end()); + EXPECT_EQ(0u, mapper.size()); + + const net::dandelionpp::connection_map cloned = mapper.clone(); + EXPECT_EQ(cloned.begin(), cloned.end()); + EXPECT_EQ(0u, cloned.size()); +} + +TEST(dandelionpp_map, zero_stems) +{ + std::vector<boost::uuids::uuid> connections{6}; + std::generate(connections.begin(), connections.end(), boost::uuids::random_generator{}); + + net::dandelionpp::connection_map mapper{connections, 0}; + EXPECT_EQ(mapper.begin(), mapper.end()); + EXPECT_EQ(0u, mapper.size()); + + for (const boost::uuids::uuid& connection : connections) + EXPECT_TRUE(mapper.get_stem(connection).is_nil()); + + EXPECT_FALSE(mapper.update(connections)); + EXPECT_EQ(mapper.begin(), mapper.end()); + EXPECT_EQ(0u, mapper.size()); + + for (const boost::uuids::uuid& connection : connections) + EXPECT_TRUE(mapper.get_stem(connection).is_nil()); + + const net::dandelionpp::connection_map cloned = mapper.clone(); + EXPECT_EQ(cloned.end(), cloned.begin()); + EXPECT_EQ(0u, cloned.size()); +} + +TEST(dandelionpp_map, dropped_connection) +{ + std::vector<boost::uuids::uuid> connections{6}; + std::generate(connections.begin(), connections.end(), boost::uuids::random_generator{}); + std::sort(connections.begin(), connections.end()); + + // select 3 of 6 outgoing connections + net::dandelionpp::connection_map mapper{connections, 3}; + EXPECT_EQ(3u, mapper.size()); + EXPECT_EQ(3, mapper.end() - mapper.begin()); + { + std::set<boost::uuids::uuid> used; + for (const boost::uuids::uuid& connection : mapper) + { + EXPECT_TRUE(used.insert(connection).second); + EXPECT_TRUE(std::binary_search(connections.begin(), connections.end(), connection)); + } + } + { + const net::dandelionpp::connection_map cloned = mapper.clone(); + EXPECT_EQ(3u, cloned.size()); + ASSERT_EQ(mapper.end() - mapper.begin(), cloned.end() - cloned.begin()); + for (auto elem : boost::combine(mapper, cloned)) + EXPECT_EQ(boost::get<0>(elem), boost::get<1>(elem)); + } + EXPECT_FALSE(mapper.update(connections)); + EXPECT_EQ(3u, mapper.size()); + ASSERT_EQ(3, mapper.end() - mapper.begin()); + { + std::set<boost::uuids::uuid> used; + for (const boost::uuids::uuid& connection : mapper) + { + EXPECT_FALSE(connection.is_nil()); + EXPECT_TRUE(used.insert(connection).second); + EXPECT_TRUE(std::binary_search(connections.begin(), connections.end(), connection)); + } + } + std::map<boost::uuids::uuid, boost::uuids::uuid> mapping; + std::vector<boost::uuids::uuid> in_connections{9}; + std::generate(in_connections.begin(), in_connections.end(), boost::uuids::random_generator{}); + { + std::map<boost::uuids::uuid, std::size_t> used; + std::multimap<boost::uuids::uuid, boost::uuids::uuid> inverse_mapping; + for (const boost::uuids::uuid& connection : in_connections) + { + const boost::uuids::uuid out = mapper.get_stem(connection); + EXPECT_FALSE(out.is_nil()); + EXPECT_TRUE(mapping.emplace(connection, out).second); + inverse_mapping.emplace(out, connection); + used[out]++; + } + + EXPECT_EQ(3u, used.size()); + for (const std::pair<boost::uuids::uuid, std::size_t>& entry : used) + EXPECT_EQ(3u, entry.second); + + for (const boost::uuids::uuid& connection : in_connections) + EXPECT_EQ(mapping[connection], mapper.get_stem(connection)); + + // drop 1 connection, and select replacement from 1 of unused 3. + const boost::uuids::uuid lost_connection = *(++mapper.begin()); + const auto elem = std::lower_bound(connections.begin(), connections.end(), lost_connection); + ASSERT_NE(connections.end(), elem); + ASSERT_EQ(lost_connection, *elem); + connections.erase(elem); + + EXPECT_TRUE(mapper.update(connections)); + EXPECT_EQ(3u, mapper.size()); + ASSERT_EQ(3, mapper.end() - mapper.begin()); + + for (const boost::uuids::uuid& connection : mapper) + { + EXPECT_FALSE(connection.is_nil()); + EXPECT_NE(lost_connection, connection); + } + + const boost::uuids::uuid newly_mapped = *(++mapper.begin()); + EXPECT_FALSE(newly_mapped.is_nil()); + EXPECT_NE(lost_connection, newly_mapped); + + for (auto elems = inverse_mapping.equal_range(lost_connection); elems.first != elems.second; ++elems.first) + mapping[elems.first->second] = newly_mapped; + } + { + const net::dandelionpp::connection_map cloned = mapper.clone(); + EXPECT_EQ(3u, cloned.size()); + ASSERT_EQ(mapper.end() - mapper.begin(), cloned.end() - cloned.begin()); + for (auto elem : boost::combine(mapper, cloned)) + EXPECT_EQ(boost::get<0>(elem), boost::get<1>(elem)); + } + // mappings should remain evenly distributed amongst 2, with 3 sitting in waiting + { + std::set<boost::uuids::uuid> used; + for (const boost::uuids::uuid& connection : mapper) + { + EXPECT_FALSE(connection.is_nil()); + EXPECT_TRUE(used.insert(connection).second); + EXPECT_TRUE(std::binary_search(connections.begin(), connections.end(), connection)); + } + } + { + std::map<boost::uuids::uuid, std::size_t> used; + for (const boost::uuids::uuid& connection : in_connections) + { + const boost::uuids::uuid& out = mapper.get_stem(connection); + EXPECT_FALSE(out.is_nil()); + EXPECT_EQ(mapping[connection], out); + used[out]++; + } + + EXPECT_EQ(3u, used.size()); + for (const std::pair<boost::uuids::uuid, std::size_t>& entry : used) + EXPECT_EQ(3u, entry.second); + } + { + const net::dandelionpp::connection_map cloned = mapper.clone(); + EXPECT_EQ(3u, cloned.size()); + ASSERT_EQ(mapper.end() - mapper.begin(), cloned.end() - cloned.begin()); + for (auto elem : boost::combine(mapper, cloned)) + EXPECT_EQ(boost::get<0>(elem), boost::get<1>(elem)); + } +} + +TEST(dandelionpp_map, dropped_connection_remapped) +{ + boost::uuids::random_generator random_uuid{}; + + std::vector<boost::uuids::uuid> connections{3}; + std::generate(connections.begin(), connections.end(), random_uuid); + std::sort(connections.begin(), connections.end()); + + // select 3 of 3 outgoing connections + net::dandelionpp::connection_map mapper{connections, 3}; + EXPECT_EQ(3u, mapper.size()); + EXPECT_EQ(3, mapper.end() - mapper.begin()); + { + std::set<boost::uuids::uuid> used; + for (const boost::uuids::uuid& connection : mapper) + { + EXPECT_FALSE(connection.is_nil()); + EXPECT_TRUE(used.insert(connection).second); + EXPECT_TRUE(std::binary_search(connections.begin(), connections.end(), connection)); + } + } + EXPECT_FALSE(mapper.update(connections)); + EXPECT_EQ(3u, mapper.size()); + ASSERT_EQ(3, mapper.end() - mapper.begin()); + { + std::set<boost::uuids::uuid> used; + for (const boost::uuids::uuid& connection : mapper) + { + EXPECT_FALSE(connection.is_nil()); + EXPECT_TRUE(used.insert(connection).second); + EXPECT_TRUE(std::binary_search(connections.begin(), connections.end(), connection)); + } + } + std::map<boost::uuids::uuid, boost::uuids::uuid> mapping; + std::vector<boost::uuids::uuid> in_connections{9}; + std::generate(in_connections.begin(), in_connections.end(), random_uuid); + { + std::map<boost::uuids::uuid, std::size_t> used; + std::multimap<boost::uuids::uuid, boost::uuids::uuid> inverse_mapping; + for (const boost::uuids::uuid& connection : in_connections) + { + const boost::uuids::uuid out = mapper.get_stem(connection); + EXPECT_FALSE(out.is_nil()); + EXPECT_TRUE(mapping.emplace(connection, out).second); + inverse_mapping.emplace(out, connection); + used[out]++; + } + + EXPECT_EQ(3u, used.size()); + for (const std::pair<boost::uuids::uuid, std::size_t>& entry : used) + EXPECT_EQ(3u, entry.second); + + for (const boost::uuids::uuid& connection : in_connections) + EXPECT_EQ(mapping[connection], mapper.get_stem(connection)); + + // drop 1 connection leaving "hole" + const boost::uuids::uuid lost_connection = *(++mapper.begin()); + const auto elem = std::lower_bound(connections.begin(), connections.end(), lost_connection); + ASSERT_NE(connections.end(), elem); + ASSERT_EQ(lost_connection, *elem); + connections.erase(elem); + + EXPECT_TRUE(mapper.update(connections)); + EXPECT_EQ(2u, mapper.size()); + EXPECT_EQ(3, mapper.end() - mapper.begin()); + + for (auto elems = inverse_mapping.equal_range(lost_connection); elems.first != elems.second; ++elems.first) + mapping[elems.first->second] = boost::uuids::nil_uuid(); + } + // remap 3 connections and map 1 new connection to 2 remaining out connections + in_connections.resize(10); + in_connections[9] = random_uuid(); + { + std::map<boost::uuids::uuid, std::size_t> used; + for (const boost::uuids::uuid& connection : in_connections) + { + const boost::uuids::uuid& out = mapper.get_stem(connection); + EXPECT_FALSE(out.is_nil()); + used[out]++; + + boost::uuids::uuid& expected = mapping[connection]; + if (!expected.is_nil()) + EXPECT_EQ(expected, out); + else + expected = out; + } + + EXPECT_EQ(2u, used.size()); + for (const std::pair<boost::uuids::uuid, std::size_t>& entry : used) + EXPECT_EQ(5u, entry.second); + } + // select 3 of 3 connections but do not remap existing links + connections.resize(3); + connections[2] = random_uuid(); + EXPECT_TRUE(mapper.update(connections)); + EXPECT_EQ(3u, mapper.size()); + EXPECT_EQ(3, mapper.end() - mapper.begin()); + { + std::map<boost::uuids::uuid, std::size_t> used; + for (const boost::uuids::uuid& connection : in_connections) + { + const boost::uuids::uuid& out = mapper.get_stem(connection); + EXPECT_FALSE(out.is_nil()); + used[out]++; + + EXPECT_EQ(mapping[connection], out); + } + + EXPECT_EQ(2u, used.size()); + for (const std::pair<boost::uuids::uuid, std::size_t>& entry : used) + EXPECT_EQ(5u, entry.second); + } + // map 8 new incoming connections across 3 outgoing links + in_connections.resize(18); + std::generate(in_connections.begin() + 10, in_connections.end(), random_uuid); + { + std::map<boost::uuids::uuid, std::size_t> used; + for (const boost::uuids::uuid& connection : in_connections) + { + const boost::uuids::uuid& out = mapper.get_stem(connection); + EXPECT_FALSE(out.is_nil()); + used[out]++; + + boost::uuids::uuid& expected = mapping[connection]; + if (!expected.is_nil()) + EXPECT_EQ(expected, out); + else + expected = out; + } + + EXPECT_EQ(3u, used.size()); + for (const std::pair<boost::uuids::uuid, std::size_t>& entry : used) + EXPECT_EQ(6u, entry.second); + } +} + +TEST(dandelionpp_map, dropped_all_connections) +{ + boost::uuids::random_generator random_uuid{}; + + std::vector<boost::uuids::uuid> connections{8}; + std::generate(connections.begin(), connections.end(), random_uuid); + std::sort(connections.begin(), connections.end()); + + // select 3 of 8 outgoing connections + net::dandelionpp::connection_map mapper{connections, 3}; + EXPECT_EQ(3u, mapper.size()); + EXPECT_EQ(3, mapper.end() - mapper.begin()); + { + std::set<boost::uuids::uuid> used; + for (const boost::uuids::uuid& connection : mapper) + { + EXPECT_FALSE(connection.is_nil()); + EXPECT_TRUE(used.insert(connection).second); + EXPECT_TRUE(std::binary_search(connections.begin(), connections.end(), connection)); + } + } + EXPECT_FALSE(mapper.update(connections)); + EXPECT_EQ(3u, mapper.size()); + ASSERT_EQ(3, mapper.end() - mapper.begin()); + { + std::set<boost::uuids::uuid> used; + for (const boost::uuids::uuid& connection : mapper) + { + EXPECT_FALSE(connection.is_nil()); + EXPECT_TRUE(used.insert(connection).second); + EXPECT_TRUE(std::binary_search(connections.begin(), connections.end(), connection)); + } + } + std::vector<boost::uuids::uuid> in_connections{9}; + std::generate(in_connections.begin(), in_connections.end(), random_uuid); + { + std::map<boost::uuids::uuid, std::size_t> used; + std::map<boost::uuids::uuid, boost::uuids::uuid> mapping; + for (const boost::uuids::uuid& connection : in_connections) + { + const boost::uuids::uuid out = mapper.get_stem(connection); + EXPECT_FALSE(out.is_nil()); + EXPECT_TRUE(mapping.emplace(connection, out).second); + used[out]++; + } + + EXPECT_EQ(3u, used.size()); + for (const std::pair<boost::uuids::uuid, std::size_t>& entry : used) + EXPECT_EQ(3u, entry.second); + + for (const boost::uuids::uuid& connection : in_connections) + EXPECT_EQ(mapping[connection], mapper.get_stem(connection)); + + // drop all connections + connections.clear(); + + EXPECT_TRUE(mapper.update(connections)); + EXPECT_EQ(0u, mapper.size()); + EXPECT_EQ(3, mapper.end() - mapper.begin()); + } + // remap 7 connections to nothing + for (const boost::uuids::uuid& connection : boost::adaptors::slice(in_connections, 0, 7)) + EXPECT_TRUE(mapper.get_stem(connection).is_nil()); + + // select 3 of 30 connections, only 7 should be remapped to new indexes (but all to new uuids) + connections.resize(30); + std::generate(connections.begin(), connections.end(), random_uuid); + EXPECT_TRUE(mapper.update(connections)); + { + std::map<boost::uuids::uuid, std::size_t> used; + for (const boost::uuids::uuid& connection : in_connections) + { + const boost::uuids::uuid& out = mapper.get_stem(connection); + EXPECT_FALSE(out.is_nil()); + used[out]++; + } + + EXPECT_EQ(3u, used.size()); + for (const std::pair<boost::uuids::uuid, std::size_t>& entry : used) + EXPECT_EQ(3u, entry.second); + } +} |