Diffstat (limited to 'src/cryptonote_protocol/levin_notify.cpp')
-rw-r--r--   src/cryptonote_protocol/levin_notify.cpp   208
1 file changed, 168 insertions, 40 deletions
diff --git a/src/cryptonote_protocol/levin_notify.cpp b/src/cryptonote_protocol/levin_notify.cpp
index 4b41b5bfc..e45c34e02 100644
--- a/src/cryptonote_protocol/levin_notify.cpp
+++ b/src/cryptonote_protocol/levin_notify.cpp
@@ -33,6 +33,7 @@
 #include <chrono>
 #include <deque>
 #include <stdexcept>
+#include <utility>
 
 #include "common/expect.h"
 #include "common/varint.h"
@@ -43,6 +44,14 @@
 #include "net/dandelionpp.h"
 #include "p2p/net_node.h"
 
+namespace
+{
+  int get_command_from_message(const cryptonote::blobdata &msg)
+  {
+    return msg.size() >= sizeof(epee::levin::bucket_head2) ? SWAP32LE(((epee::levin::bucket_head2*)msg.data())->m_command) : 0;
+  }
+}
+
 namespace cryptonote
 {
 namespace levin
@@ -57,6 +66,37 @@ namespace levin
       constexpr const std::chrono::seconds noise_min_delay{CRYPTONOTE_NOISE_MIN_DELAY};
       constexpr const std::chrono::seconds noise_delay_range{CRYPTONOTE_NOISE_DELAY_RANGE};
 
+      /* A custom duration is used for the poisson distribution because of the
+         variance. If 5 seconds is given to `std::poisson_distribution`, 95% of
+         the values fall between 1-9s in 1s increments (not granular enough).
+         If 5000 milliseconds is given, 95% of the values fall between
+         4859ms-5141ms in 1ms increments (not enough time variance). Providing
+         20 quarter seconds yields 95% of the values between 3s-7.25s in 1/4s
+         increments. */
+      using fluff_stepsize = std::chrono::duration<std::chrono::milliseconds::rep, std::ratio<1, 4>>;
+      constexpr const std::chrono::seconds fluff_average_in{CRYPTONOTE_DANDELIONPP_FLUSH_AVERAGE};
+
+      /*! Bitcoin Core is using 1/2 average seconds for outgoing connections
+          compared to incoming. The thinking is that the user controls outgoing
+          connections (Dandelion++ makes similar assumptions in its stem
+          algorithm). The randomization yields 95% values between 1s-4s in
+          1/4s increments. */
+      constexpr const fluff_stepsize fluff_average_out{fluff_stepsize{fluff_average_in} / 2};
+
+      class random_poisson
+      {
+        std::poisson_distribution<fluff_stepsize::rep> dist;
+      public:
+        explicit random_poisson(fluff_stepsize average)
+          : dist(average.count() < 0 ? 0 : average.count())
+        {}
+
+        fluff_stepsize operator()()
+        {
+          crypto::random_device rand{};
+          return fluff_stepsize{dist(rand)};
+        }
+      };
+
       /*! Select a randomized duration from 0 to `range`. The precision will be
           to the systems `steady_clock`. As an example, supplying 3 seconds to
           this function will select a duration from [0, 3] seconds, and the increments
@@ -129,6 +169,16 @@
       return fullBlob;
     }
 
+    bool make_payload_send_txs(connections& p2p, std::vector<blobdata>&& txs, const boost::uuids::uuid& destination, const bool pad)
+    {
+      const cryptonote::blobdata blob = make_tx_payload(std::move(txs), pad);
+      p2p.for_connection(destination, [&blob](detail::p2p_context& context) {
+        on_levin_traffic(context, true, true, false, blob.size(), get_command_from_message(blob));
+        return true;
+      });
+      return p2p.notify(NOTIFY_NEW_TRANSACTIONS::ID, epee::strspan<std::uint8_t>(blob), destination);
+    }
+
     /* The current design uses `asio::strand`s. The documentation isn't as
        clear as it should be - a `strand` has an internal `mutex` and `bool`.
        The `mutex` synchronizes thread access and the `bool` is set when a thread is
@@ -187,15 +237,18 @@
   {
     struct zone
    {
-      explicit zone(boost::asio::io_service& io_service, std::shared_ptr<connections> p2p, epee::byte_slice noise_in, bool is_public)
+      explicit zone(boost::asio::io_service& io_service, std::shared_ptr<connections> p2p, epee::byte_slice noise_in, bool is_public, bool pad_txs)
        : p2p(std::move(p2p)),
          noise(std::move(noise_in)),
          next_epoch(io_service),
+         flush_txs(io_service),
          strand(io_service),
          map(),
          channels(),
+         flush_time(std::chrono::steady_clock::time_point::max()),
          connection_count(0),
-         is_public(is_public)
+         is_public(is_public),
+         pad_txs(pad_txs)
       {
         for (std::size_t count = 0; !noise.empty() && count < CRYPTONOTE_NOISE_CHANNELS; ++count)
           channels.emplace_back(io_service);
@@ -204,11 +257,14 @@
       const std::shared_ptr<connections> p2p;
       const epee::byte_slice noise; //!< `!empty()` means zone is using noise channels
       boost::asio::steady_timer next_epoch;
+      boost::asio::steady_timer flush_txs;
       boost::asio::io_service::strand strand;
       net::dandelionpp::connection_map map;//!< Tracks outgoing uuid's for noise channels or Dandelion++ stems
       std::deque<noise_channel> channels; //!< Never touch after init; only update elements on `noise_channel.strand`
+      std::chrono::steady_clock::time_point flush_time; //!< Next expected Dandelion++ fluff flush
       std::atomic<std::size_t> connection_count; //!< Only update in strand, can be read at any time
       const bool is_public; //!< Zone is public ipv4/ipv6 connections
+      const bool pad_txs; //!< Pad txs to the next boundary for privacy
     };
   } // detail
 
@@ -245,49 +301,112 @@
       }
     };
 
-    //! Sends a message to every active connection
-    class flood_notify
+    //! Sends txs on connections with expired timers, and queues callback for next timer expiration (if any).
+    struct fluff_flush
     {
       std::shared_ptr<detail::zone> zone_;
-      epee::byte_slice message_; // Requires manual copy
-      boost::uuids::uuid source_;
+      std::chrono::steady_clock::time_point flush_time_;
 
-    public:
-      explicit flood_notify(std::shared_ptr<detail::zone> zone, epee::byte_slice message, const boost::uuids::uuid& source)
-        : zone_(std::move(zone)), message_(message.clone()), source_(source)
-      {}
+      static void queue(std::shared_ptr<detail::zone> zone, const std::chrono::steady_clock::time_point flush_time)
+      {
+        assert(zone != nullptr);
+        assert(zone->strand.running_in_this_thread());
 
-      flood_notify(flood_notify&&) = default;
-      flood_notify(const flood_notify& source)
-        : zone_(source.zone_), message_(source.message_.clone()), source_(source.source_)
-      {}
+        detail::zone& this_zone = *zone;
+        this_zone.flush_time = flush_time;
+        this_zone.flush_txs.expires_at(flush_time);
+        this_zone.flush_txs.async_wait(this_zone.strand.wrap(fluff_flush{std::move(zone), flush_time}));
+      }
 
-      void operator()() const
+      void operator()(const boost::system::error_code error)
       {
         if (!zone_ || !zone_->p2p)
           return;
 
         assert(zone_->strand.running_in_this_thread());
 
-        /* The foreach should be quick, but then it iterates and acquires the
-           same lock for every connection. So do in a strand because two threads
-           will ping-pong each other with cacheline invalidations. Revisit if
-           algorithm changes or the locking strategy within the levin config
-           class changes. */
-
-        std::vector<boost::uuids::uuid> connections;
-        connections.reserve(connection_id_reserve_size);
-        zone_->p2p->foreach_connection([this, &connections] (detail::p2p_context& context) {
-          /* Only send to outgoing connections when "flooding" over i2p/tor.
-             Otherwise this makes the tx linkable to a hidden service address,
-             making things linkable across connections. */
+        const bool timer_error = bool(error);
+        if (timer_error)
+        {
+          if (error != boost::system::errc::operation_canceled)
+            throw boost::system::system_error{error, "fluff_flush timer failed"};
+
+          // new timer canceled this one set in future
+          if (zone_->flush_time < flush_time_)
+            return;
+        }
+
+        const auto now = std::chrono::steady_clock::now();
+        auto next_flush = std::chrono::steady_clock::time_point::max();
+        std::vector<std::pair<std::vector<blobdata>, boost::uuids::uuid>> connections{};
+        zone_->p2p->foreach_connection([timer_error, now, &next_flush, &connections] (detail::p2p_context& context)
+        {
+          if (!context.fluff_txs.empty())
+          {
+            if (context.flush_time <= now || timer_error) // flush on canceled timer
+            {
+              context.flush_time = std::chrono::steady_clock::time_point::max();
+              connections.emplace_back(std::move(context.fluff_txs), context.m_connection_id);
+              context.fluff_txs.clear();
+            }
+            else // not flushing yet
+              next_flush = std::min(next_flush, context.flush_time);
+          }
+          else // nothing to flush
+            context.flush_time = std::chrono::steady_clock::time_point::max();
+          return true;
+        });
+
+        for (auto& connection : connections)
+          make_payload_send_txs(*zone_->p2p, std::move(connection.first), connection.second, zone_->pad_txs);
+
+        if (next_flush != std::chrono::steady_clock::time_point::max())
+          fluff_flush::queue(std::move(zone_), next_flush);
+        else
+          zone_->flush_time = next_flush; // signal that no timer is set
+      }
+    };
+
+    /*! The "fluff" portion of the Dandelion++ algorithm. Every tx is queued
+        per-connection and flushed with a randomized poisson timer. This
+        implementation only has one system timer per-zone, and instead tracks
+        the lowest flush time. */
+    struct fluff_notify
+    {
+      std::shared_ptr<detail::zone> zone_;
+      std::vector<blobdata> txs_;
+      boost::uuids::uuid source_;
+
+      void operator()()
+      {
+        if (!zone_ || !zone_->p2p || txs_.empty())
+          return;
+
+        assert(zone_->strand.running_in_this_thread());
+
+        const auto now = std::chrono::steady_clock::now();
+        auto next_flush = std::chrono::steady_clock::time_point::max();
+
+        random_poisson in_duration(fluff_average_in);
+        random_poisson out_duration(fluff_average_out);
+
+        zone_->p2p->foreach_connection([this, now, &in_duration, &out_duration, &next_flush] (detail::p2p_context& context)
+        {
           if (this->source_ != context.m_connection_id && (this->zone_->is_public || !context.m_is_income))
-            connections.emplace_back(context.m_connection_id);
+          {
+            if (context.fluff_txs.empty())
+              context.flush_time = now + (context.m_is_income ? in_duration() : out_duration());
+
+            next_flush = std::min(next_flush, context.flush_time);
+            context.fluff_txs.reserve(context.fluff_txs.size() + this->txs_.size());
+            for (const blobdata& tx : this->txs_)
+              context.fluff_txs.push_back(tx); // must copy instead of move (multiple conns)
+          }
           return true;
         });
 
-        for (const boost::uuids::uuid& connection : connections)
-          zone_->p2p->send(message_.clone(), connection);
+        if (next_flush < zone_->flush_time)
+          fluff_flush::queue(std::move(zone_), next_flush);
       }
     };
@@ -432,6 +551,10 @@
         else
           message = zone_->noise.clone();
 
+        zone_->p2p->for_connection(channel.connection, [&](detail::p2p_context& context) {
+          on_levin_traffic(context, true, true, false, message.size(), "noise");
+          return true;
+        });
         if (zone_->p2p->send(std::move(message), channel.connection))
         {
           if (!channel.queue.empty() && channel.active.empty())
@@ -451,7 +574,7 @@
       }
     };
 
-    //! Prepares connections for new channel epoch and sets timer for next epoch
+    //! Prepares connections for new channel/dandelionpp epoch and sets timer for next epoch
     struct start_epoch
     {
       // Variables allow for Dandelion++ extension
@@ -481,8 +604,8 @@
   };
   } // anonymous
 
-  notify::notify(boost::asio::io_service& service, std::shared_ptr<connections> p2p, epee::byte_slice noise, bool is_public)
-    : zone_(std::make_shared<detail::zone>(service, std::move(p2p), std::move(noise), is_public))
+  notify::notify(boost::asio::io_service& service, std::shared_ptr<connections> p2p, epee::byte_slice noise, const bool is_public, const bool pad_txs)
+    : zone_(std::make_shared<detail::zone>(service, std::move(p2p), std::move(noise), is_public, pad_txs))
   {
     if (!zone_->p2p)
       throw std::logic_error{"cryptonote::levin::notify cannot have nullptr p2p argument"};
@@ -533,9 +656,19 @@
       channel.next_noise.cancel();
   }
 
-  bool notify::send_txs(std::vector<blobdata> txs, const boost::uuids::uuid& source, const bool pad_txs)
+  void notify::run_fluff()
   {
     if (!zone_)
+      return;
+    zone_->flush_txs.cancel();
+  }
+
+  bool notify::send_txs(std::vector<blobdata> txs, const boost::uuids::uuid& source)
+  {
+    if (txs.empty())
+      return true;
+
+    if (!zone_)
       return false;
 
     if (!zone_->noise.empty() && !zone_->channels.empty())
@@ -565,12 +698,7 @@
     }
     else
     {
-      const std::string payload = make_tx_payload(std::move(txs), pad_txs);
-      epee::byte_slice message =
-        epee::levin::make_notify(NOTIFY_NEW_TRANSACTIONS::ID, epee::strspan<std::uint8_t>(payload));
-
-      // traditional monero send technique
-      zone_->strand.dispatch(flood_notify{zone_, std::move(message), source});
+      zone_->strand.dispatch(fluff_notify{zone_, std::move(txs), source});
     }
 
     return true;
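
The quarter-second stepsize rationale in the patch comment can be checked empirically. Below is a minimal standalone sketch, not code from this patch: `quarter_seconds` is a hypothetical alias mirroring `fluff_stepsize`, and it samples `std::poisson_distribution` at the granularity the patch settles on. With a mean of 20 quarter-seconds (5s), the standard deviation is sqrt(20) ≈ 4.47 ticks ≈ 1.1s, so roughly 95% of draws land in [3s, 7.25s] as the comment claims.

#include <chrono>
#include <cstdio>
#include <random>
#include <ratio>

// Hypothetical stand-in for the patch's `fluff_stepsize`: a duration
// counted in quarter-second ticks.
using quarter_seconds = std::chrono::duration<long long, std::ratio<1, 4>>;

int main()
{
    std::mt19937 rng{std::random_device{}()};
    // Mean of 20 quarter-second ticks == the 5s average discussed above.
    std::poisson_distribution<quarter_seconds::rep> dist{20};

    for (int i = 0; i < 10; ++i)
    {
        const quarter_seconds draw{dist(rng)};
        // ~95% of draws land in roughly [3s, 7.25s], in 1/4s increments.
        std::printf("%.2fs\n", std::chrono::duration<double>(draw).count());
    }
    return 0;
}

The same experiment with `std::chrono::seconds` (mean 5) or `std::chrono::milliseconds` (mean 5000) reproduces the too-coarse and too-narrow distributions the comment describes.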
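
The `fluff_flush`/`fluff_notify` pair implements a one-timer-per-zone design: every connection carries its own poisson deadline in `context.flush_time`, but only the earliest deadline is armed on the zone's single `flush_txs` timer, and the wait is re-armed whenever a newer deadline undercuts the pending one. A condensed sketch of just that pattern follows; it assumes the Boost.Asio `io_service` API the patch itself uses, and `min_deadline_timer`, `propose`, and `on_expired` are illustrative names, not from the patch.

#include <boost/asio.hpp>
#include <chrono>
#include <functional>

// Illustrative only: one system timer serving many logical deadlines,
// always armed at the minimum.
class min_deadline_timer
{
  using clock = std::chrono::steady_clock;

  boost::asio::steady_timer timer_;
  clock::time_point armed_{clock::time_point::max()};
  std::function<void()> on_expired_;

public:
  min_deadline_timer(boost::asio::io_service& io, std::function<void()> cb)
    : timer_(io), on_expired_(std::move(cb))
  {}

  // Re-arm only when `when` undercuts the pending deadline, mirroring the
  // `next_flush < zone_->flush_time` check in fluff_notify.
  void propose(clock::time_point when)
  {
    if (armed_ <= when)
      return;
    armed_ = when;
    timer_.expires_at(when); // cancels any wait already in flight
    timer_.async_wait([this](const boost::system::error_code error) {
      if (error) // superseded by an earlier deadline (or shutdown)
        return;
      armed_ = clock::time_point::max(); // signal that no timer is set
      on_expired_();
    });
  }
};

One deliberate simplification: this sketch ignores canceled waits entirely, whereas the patch distinguishes two cancel cases in `fluff_flush::operator()` — a wait superseded by an earlier deadline returns early via the `zone_->flush_time < flush_time_` guard, while an explicit cancel from `notify::run_fluff` takes the `timer_error` path and flushes every queued tx immediately.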
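
The `asio::strand` comment is cut off by the hunk boundary, but the guarantee it relies on is simple: handlers dispatched through the same strand never execute concurrently, so zone state touched only inside `strand.dispatch`/`strand.wrap` handlers (such as `flush_time`) needs no mutex even when the `io_service` is run by multiple threads. A minimal demonstration of that guarantee, assuming only Boost.Asio and independent of the patch:

#include <boost/asio.hpp>
#include <cstdio>
#include <thread>

int main()
{
  boost::asio::io_service io;
  boost::asio::io_service::strand strand(io);

  int counter = 0; // shared state, yet no mutex needed below

  // Handlers posted through one strand are serialized, even when
  // io_service::run is serviced by several threads at once.
  for (int i = 0; i < 1000; ++i)
    strand.post([&counter] { ++counter; });

  std::thread worker([&io] { io.run(); });
  io.run();
  worker.join();

  std::printf("%d\n", counter); // always 1000
  return 0;
}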