diff options
Diffstat (limited to 'src/cryptonote_protocol')
-rw-r--r-- | src/cryptonote_protocol/cryptonote_protocol_defs.h | 28 | ||||
-rw-r--r-- | src/cryptonote_protocol/cryptonote_protocol_handler.h | 7 | ||||
-rw-r--r-- | src/cryptonote_protocol/cryptonote_protocol_handler.inl | 95 | ||||
-rw-r--r-- | src/cryptonote_protocol/enums.h | 2 | ||||
-rw-r--r-- | src/cryptonote_protocol/levin_notify.cpp | 208 | ||||
-rw-r--r-- | src/cryptonote_protocol/levin_notify.h | 12 |
6 files changed, 283 insertions, 69 deletions
diff --git a/src/cryptonote_protocol/cryptonote_protocol_defs.h b/src/cryptonote_protocol/cryptonote_protocol_defs.h index 201001c8e..f809bff74 100644 --- a/src/cryptonote_protocol/cryptonote_protocol_defs.h +++ b/src/cryptonote_protocol/cryptonote_protocol_defs.h @@ -257,7 +257,10 @@ namespace cryptonote BEGIN_KV_SERIALIZE_MAP() KV_SERIALIZE(current_height) KV_SERIALIZE(cumulative_difficulty) - KV_SERIALIZE(cumulative_difficulty_top64) + if (is_store) + KV_SERIALIZE(cumulative_difficulty_top64) + else + KV_SERIALIZE_OPT(cumulative_difficulty_top64, (uint64_t)0) KV_SERIALIZE_VAL_POD_AS_BLOB(top_id) KV_SERIALIZE_OPT(top_version, (uint8_t)0) KV_SERIALIZE_OPT(pruning_seed, (uint32_t)0) @@ -298,7 +301,10 @@ namespace cryptonote KV_SERIALIZE(start_height) KV_SERIALIZE(total_height) KV_SERIALIZE(cumulative_difficulty) - KV_SERIALIZE(cumulative_difficulty_top64) + if (is_store) + KV_SERIALIZE(cumulative_difficulty_top64) + else + KV_SERIALIZE_OPT(cumulative_difficulty_top64, (uint64_t)0) KV_SERIALIZE_CONTAINER_POD_AS_BLOB(m_block_ids) KV_SERIALIZE_CONTAINER_POD_AS_BLOB(m_block_weights) END_KV_SERIALIZE_MAP() @@ -347,5 +353,23 @@ namespace cryptonote }; typedef epee::misc_utils::struct_init<request_t> request; }; + + /************************************************************************/ + /* */ + /************************************************************************/ + struct NOTIFY_GET_TXPOOL_COMPLEMENT + { + const static int ID = BC_COMMANDS_POOL_BASE + 10; + + struct request_t + { + std::vector<crypto::hash> hashes; + + BEGIN_KV_SERIALIZE_MAP() + KV_SERIALIZE_CONTAINER_POD_AS_BLOB(hashes) + END_KV_SERIALIZE_MAP() + }; + typedef epee::misc_utils::struct_init<request_t> request; + }; } diff --git a/src/cryptonote_protocol/cryptonote_protocol_handler.h b/src/cryptonote_protocol/cryptonote_protocol_handler.h index ddbd45a61..2664716a8 100644 --- a/src/cryptonote_protocol/cryptonote_protocol_handler.h +++ b/src/cryptonote_protocol/cryptonote_protocol_handler.h @@ -45,7 +45,6 @@ #include "block_queue.h" #include "common/perf_timer.h" #include "cryptonote_basic/connection_context.h" -#include "cryptonote_basic/cryptonote_stat_info.h" #include <boost/circular_buffer.hpp> PUSH_WARNINGS @@ -77,7 +76,6 @@ namespace cryptonote { public: typedef cryptonote_connection_context connection_context; - typedef core_stat_info stat_info; typedef t_cryptonote_protocol_handler<t_core> cryptonote_protocol_handler; typedef CORE_SYNC_DATA payload_type; @@ -92,6 +90,7 @@ namespace cryptonote HANDLE_NOTIFY_T2(NOTIFY_RESPONSE_CHAIN_ENTRY, &cryptonote_protocol_handler::handle_response_chain_entry) HANDLE_NOTIFY_T2(NOTIFY_NEW_FLUFFY_BLOCK, &cryptonote_protocol_handler::handle_notify_new_fluffy_block) HANDLE_NOTIFY_T2(NOTIFY_REQUEST_FLUFFY_MISSING_TX, &cryptonote_protocol_handler::handle_request_fluffy_missing_tx) + HANDLE_NOTIFY_T2(NOTIFY_GET_TXPOOL_COMPLEMENT, &cryptonote_protocol_handler::handle_notify_get_txpool_complement) END_INVOKE_MAP2() bool on_idle(); @@ -102,7 +101,6 @@ namespace cryptonote bool process_payload_sync_data(const CORE_SYNC_DATA& hshd, cryptonote_connection_context& context, bool is_inital); bool get_payload_sync_data(blobdata& data); bool get_payload_sync_data(CORE_SYNC_DATA& hshd); - bool get_stat_info(core_stat_info& stat_inf); bool on_callback(cryptonote_connection_context& context); t_core& get_core(){return m_core;} bool is_synchronized(){return m_synchronized;} @@ -127,6 +125,7 @@ namespace cryptonote int handle_response_chain_entry(int command, NOTIFY_RESPONSE_CHAIN_ENTRY::request& arg, cryptonote_connection_context& context); int handle_notify_new_fluffy_block(int command, NOTIFY_NEW_FLUFFY_BLOCK::request& arg, cryptonote_connection_context& context); int handle_request_fluffy_missing_tx(int command, NOTIFY_REQUEST_FLUFFY_MISSING_TX::request& arg, cryptonote_connection_context& context); + int handle_notify_get_txpool_complement(int command, NOTIFY_GET_TXPOOL_COMPLEMENT::request& arg, cryptonote_connection_context& context); //----------------- i_bc_protocol_layout --------------------------------------- virtual bool relay_block(NOTIFY_NEW_BLOCK::request& arg, cryptonote_connection_context& exclude_context); @@ -147,6 +146,7 @@ namespace cryptonote int try_add_next_blocks(cryptonote_connection_context &context); void notify_new_stripe(cryptonote_connection_context &context, uint32_t stripe); void skip_unneeded_hashes(cryptonote_connection_context& context, bool check_block_queue) const; + bool request_txpool_complement(cryptonote_connection_context &context); t_core& m_core; @@ -156,6 +156,7 @@ namespace cryptonote std::atomic<bool> m_synchronized; std::atomic<bool> m_stopping; std::atomic<bool> m_no_sync; + std::atomic<bool> m_ask_for_txpool_complement; boost::mutex m_sync_lock; block_queue m_block_queue; epee::math_helper::once_a_time_seconds<30> m_idle_peer_kicker; diff --git a/src/cryptonote_protocol/cryptonote_protocol_handler.inl b/src/cryptonote_protocol/cryptonote_protocol_handler.inl index e20934a25..3aacce421 100644 --- a/src/cryptonote_protocol/cryptonote_protocol_handler.inl +++ b/src/cryptonote_protocol/cryptonote_protocol_handler.inl @@ -83,6 +83,7 @@ namespace cryptonote m_p2p(p_net_layout), m_syncronized_connections_count(0), m_synchronized(offline), + m_ask_for_txpool_complement(true), m_stopping(false), m_no_sync(false) @@ -154,12 +155,6 @@ namespace cryptonote } //------------------------------------------------------------------------------------------------------------------------ template<class t_core> - bool t_cryptonote_protocol_handler<t_core>::get_stat_info(core_stat_info& stat_inf) - { - return m_core.get_stat_info(stat_inf); - } - //------------------------------------------------------------------------------------------------------------------------ - template<class t_core> void t_cryptonote_protocol_handler<t_core>::log_connections() { std::stringstream ss; @@ -188,7 +183,7 @@ namespace cryptonote auto connection_time = time(NULL) - cntxt.m_started; ss << std::setw(30) << std::left << std::string(cntxt.m_is_income ? " [INC]":"[OUT]") + cntxt.m_remote_address.str() - << std::setw(20) << std::hex << peer_id + << std::setw(20) << nodetool::peerid_to_string(peer_id) << std::setw(20) << std::hex << support_flags << std::setw(30) << std::to_string(cntxt.m_recv_cnt)+ "(" + std::to_string(time(NULL) - cntxt.m_last_recv) + ")" + "/" + std::to_string(cntxt.m_send_cnt) + "(" + std::to_string(time(NULL) - cntxt.m_last_send) + ")" << std::setw(25) << get_protocol_state_string(cntxt.m_state) @@ -248,9 +243,7 @@ namespace cryptonote cnx.rpc_port = cntxt.m_rpc_port; cnx.rpc_credits_per_hash = cntxt.m_rpc_credits_per_hash; - std::stringstream peer_id_str; - peer_id_str << std::hex << std::setw(16) << peer_id; - peer_id_str >> cnx.peer_id; + cnx.peer_id = nodetool::peerid_to_string(peer_id); cnx.support_flags = support_flags; @@ -314,7 +307,7 @@ namespace cryptonote if (version >= 6 && version != hshd.top_version) { if (version < hshd.top_version && version == m_core.get_ideal_hard_fork_version()) - MCLOG_RED(el::Level::Warning, "global", context << " peer claims higher version that we think (" << + MCLOG_RED(el::Level::Warning, "global", context << " peer claims higher version than we think (" << (unsigned)hshd.top_version << " for " << (hshd.current_height - 1) << " instead of " << (unsigned)version << ") - we may be forked from the network and a software upgrade may be needed"); return false; @@ -346,7 +339,7 @@ namespace cryptonote if(m_core.have_block(hshd.top_id)) { context.m_state = cryptonote_connection_context::state_normal; - if(is_inital && target == m_core.get_current_blockchain_height()) + if(is_inital && hshd.current_height >= target && target == m_core.get_current_blockchain_height()) on_connection_synchronized(); return true; } @@ -887,6 +880,34 @@ namespace cryptonote } //------------------------------------------------------------------------------------------------------------------------ template<class t_core> + int t_cryptonote_protocol_handler<t_core>::handle_notify_get_txpool_complement(int command, NOTIFY_GET_TXPOOL_COMPLEMENT::request& arg, cryptonote_connection_context& context) + { + MLOG_P2P_MESSAGE("Received NOTIFY_GET_TXPOOL_COMPLEMENT (" << arg.hashes.size() << " txes)"); + + std::vector<std::pair<cryptonote::blobdata, block>> local_blocks; + std::vector<cryptonote::blobdata> local_txs; + + std::vector<cryptonote::blobdata> txes; + if (!m_core.get_txpool_complement(arg.hashes, txes)) + { + LOG_ERROR_CCONTEXT("failed to get txpool complement"); + return 1; + } + + NOTIFY_NEW_TRANSACTIONS::request new_txes; + new_txes.txs = std::move(txes); + + MLOG_P2P_MESSAGE + ( + "-->>NOTIFY_NEW_TRANSACTIONS: " + << ", txs.size()=" << new_txes.txs.size() + ); + + post_notify<NOTIFY_NEW_TRANSACTIONS>(new_txes, context); + return 1; + } + //------------------------------------------------------------------------------------------------------------------------ + template<class t_core> int t_cryptonote_protocol_handler<t_core>::handle_notify_new_transactions(int command, NOTIFY_NEW_TRANSACTIONS::request& arg, cryptonote_connection_context& context) { MLOG_P2P_MESSAGE("Received NOTIFY_NEW_TRANSACTIONS (" << arg.txs.size() << " txes)"); @@ -910,7 +931,7 @@ namespace cryptonote for (size_t i = 0; i < arg.txs.size(); ++i) { cryptonote::tx_verification_context tvc{}; - m_core.handle_incoming_tx({arg.txs[i], crypto::null_hash}, tvc, relay_method::flood, true); + m_core.handle_incoming_tx({arg.txs[i], crypto::null_hash}, tvc, relay_method::fluff, true); if(tvc.m_verifivation_failed) { LOG_PRINT_CCONTEXT_L1("Tx verification failed, dropping connection"); @@ -1673,9 +1694,9 @@ skip: const float max_multiplier = 10.f; const float min_multiplier = 1.25f; float multiplier = max_multiplier; - if (dt/1e6 >= REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY) + if (dt >= REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY) { - multiplier = max_multiplier - (dt/1e6-REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY) * (max_multiplier - min_multiplier) / (REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD - REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY); + multiplier = max_multiplier - (dt-REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY) * (max_multiplier - min_multiplier) / (REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD - REQUEST_NEXT_SCHEDULED_SPAN_THRESHOLD_STANDBY); multiplier = std::min(max_multiplier, std::max(min_multiplier, multiplier)); } if (dl_speed * .8f > ctx.m_current_speed_down * multiplier) @@ -2181,7 +2202,8 @@ skip: MGINFO_YELLOW(ENDL << "**********************************************************************" << ENDL << "You are now synchronized with the network. You may now start monero-wallet-cli." << ENDL << ENDL - << "Use the \"help\" command to see the list of available commands." << ENDL + << "Use the \"help\" command to see a simplified list of available commands." << ENDL + << "Use the \"help_advanced\" command to see an advanced list of available commands." << ENDL << "**********************************************************************"); m_sync_timer.pause(); if (ELPP->vRegistry()->allowed(el::Level::Info, "sync-info")) @@ -2202,6 +2224,27 @@ skip: } m_core.safesyncmode(true); m_p2p->clear_used_stripe_peers(); + + // ask for txpool complement from any suitable node if we did not yet + val_expected = true; + if (m_ask_for_txpool_complement.compare_exchange_strong(val_expected, false)) + { + m_p2p->for_each_connection([&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags)->bool + { + if(context.m_state < cryptonote_connection_context::state_synchronizing) + { + MDEBUG(context << "not ready, ignoring"); + return true; + } + if (!request_txpool_complement(context)) + { + MERROR(context << "Failed to request txpool complement"); + return true; + } + return false; + }); + } + return true; } //------------------------------------------------------------------------------------------------------------------------ @@ -2351,7 +2394,22 @@ skip: local mempool before doing the relay. The code was already updating the DB twice on received transactions - it is difficult to workaround this due to the internal design. */ - return m_p2p->send_txs(std::move(arg.txs), zone, source, m_core, m_core.pad_transactions()) != epee::net_utils::zone::invalid; + return m_p2p->send_txs(std::move(arg.txs), zone, source, m_core) != epee::net_utils::zone::invalid; + } + //------------------------------------------------------------------------------------------------------------------------ + template<class t_core> + bool t_cryptonote_protocol_handler<t_core>::request_txpool_complement(cryptonote_connection_context &context) + { + NOTIFY_GET_TXPOOL_COMPLEMENT::request r = {}; + if (!m_core.get_pool_transaction_hashes(r.hashes, false)) + { + MERROR("Failed to get txpool hashes"); + return false; + } + MLOG_P2P_MESSAGE("-->>NOTIFY_GET_TXPOOL_COMPLEMENT: hashes.size()=" << r.hashes.size() ); + post_notify<NOTIFY_GET_TXPOOL_COMPLEMENT>(r, context); + MLOG_PEER_STATE("requesting txpool complement"); + return true; } //------------------------------------------------------------------------------------------------------------------------ template<class t_core> @@ -2464,7 +2522,10 @@ skip: MINFO("Target height decreasing from " << previous_target << " to " << target); m_core.set_target_blockchain_height(target); if (target == 0 && context.m_state > cryptonote_connection_context::state_before_handshake && !m_stopping) + { MCWARNING("global", "monerod is now disconnected from the network"); + m_ask_for_txpool_complement = true; + } } m_block_queue.flush_spans(context.m_connection_id, false); diff --git a/src/cryptonote_protocol/enums.h b/src/cryptonote_protocol/enums.h index ad4eedf4c..2ec622d94 100644 --- a/src/cryptonote_protocol/enums.h +++ b/src/cryptonote_protocol/enums.h @@ -38,6 +38,6 @@ namespace cryptonote none = 0, //!< Received via RPC with `do_not_relay` set local, //!< Received via RPC; trying to send over i2p/tor, etc. block, //!< Received in block, takes precedence over others - flood //!< Received/sent over public networks + fluff //!< Received/sent over public networks }; } diff --git a/src/cryptonote_protocol/levin_notify.cpp b/src/cryptonote_protocol/levin_notify.cpp index 4b41b5bfc..e45c34e02 100644 --- a/src/cryptonote_protocol/levin_notify.cpp +++ b/src/cryptonote_protocol/levin_notify.cpp @@ -33,6 +33,7 @@ #include <chrono> #include <deque> #include <stdexcept> +#include <utility> #include "common/expect.h" #include "common/varint.h" @@ -43,6 +44,14 @@ #include "net/dandelionpp.h" #include "p2p/net_node.h" +namespace +{ + int get_command_from_message(const cryptonote::blobdata &msg) + { + return msg.size() >= sizeof(epee::levin::bucket_head2) ? SWAP32LE(((epee::levin::bucket_head2*)msg.data())->m_command) : 0; + } +} + namespace cryptonote { namespace levin @@ -57,6 +66,37 @@ namespace levin constexpr const std::chrono::seconds noise_min_delay{CRYPTONOTE_NOISE_MIN_DELAY}; constexpr const std::chrono::seconds noise_delay_range{CRYPTONOTE_NOISE_DELAY_RANGE}; + /* A custom duration is used for the poisson distribution because of the + variance. If 5 seconds is given to `std::poisson_distribution`, 95% of + the values fall between 1-9s in 1s increments (not granular enough). If + 5000 milliseconds is given, 95% of the values fall between 4859ms-5141ms + in 1ms increments (not enough time variance). Providing 20 quarter + seconds yields 95% of the values between 3s-7.25s in 1/4s increments. */ + using fluff_stepsize = std::chrono::duration<std::chrono::milliseconds::rep, std::ratio<1, 4>>; + constexpr const std::chrono::seconds fluff_average_in{CRYPTONOTE_DANDELIONPP_FLUSH_AVERAGE}; + + /*! Bitcoin Core is using 1/2 average seconds for outgoing connections + compared to incoming. The thinking is that the user controls outgoing + connections (Dandelion++ makes similar assumptions in its stem + algorithm). The randomization yields 95% values between 1s-4s in + 1/4s increments. */ + constexpr const fluff_stepsize fluff_average_out{fluff_stepsize{fluff_average_in} / 2}; + + class random_poisson + { + std::poisson_distribution<fluff_stepsize::rep> dist; + public: + explicit random_poisson(fluff_stepsize average) + : dist(average.count() < 0 ? 0 : average.count()) + {} + + fluff_stepsize operator()() + { + crypto::random_device rand{}; + return fluff_stepsize{dist(rand)}; + } + }; + /*! Select a randomized duration from 0 to `range`. The precision will be to the systems `steady_clock`. As an example, supplying 3 seconds to this function will select a duration from [0, 3] seconds, and the increments @@ -129,6 +169,16 @@ namespace levin return fullBlob; } + bool make_payload_send_txs(connections& p2p, std::vector<blobdata>&& txs, const boost::uuids::uuid& destination, const bool pad) + { + const cryptonote::blobdata blob = make_tx_payload(std::move(txs), pad); + p2p.for_connection(destination, [&blob](detail::p2p_context& context) { + on_levin_traffic(context, true, true, false, blob.size(), get_command_from_message(blob)); + return true; + }); + return p2p.notify(NOTIFY_NEW_TRANSACTIONS::ID, epee::strspan<std::uint8_t>(blob), destination); + } + /* The current design uses `asio::strand`s. The documentation isn't as clear as it should be - a `strand` has an internal `mutex` and `bool`. The `mutex` synchronizes thread access and the `bool` is set when a thread is @@ -187,15 +237,18 @@ namespace levin { struct zone { - explicit zone(boost::asio::io_service& io_service, std::shared_ptr<connections> p2p, epee::byte_slice noise_in, bool is_public) + explicit zone(boost::asio::io_service& io_service, std::shared_ptr<connections> p2p, epee::byte_slice noise_in, bool is_public, bool pad_txs) : p2p(std::move(p2p)), noise(std::move(noise_in)), next_epoch(io_service), + flush_txs(io_service), strand(io_service), map(), channels(), + flush_time(std::chrono::steady_clock::time_point::max()), connection_count(0), - is_public(is_public) + is_public(is_public), + pad_txs(pad_txs) { for (std::size_t count = 0; !noise.empty() && count < CRYPTONOTE_NOISE_CHANNELS; ++count) channels.emplace_back(io_service); @@ -204,11 +257,14 @@ namespace levin const std::shared_ptr<connections> p2p; const epee::byte_slice noise; //!< `!empty()` means zone is using noise channels boost::asio::steady_timer next_epoch; + boost::asio::steady_timer flush_txs; boost::asio::io_service::strand strand; net::dandelionpp::connection_map map;//!< Tracks outgoing uuid's for noise channels or Dandelion++ stems std::deque<noise_channel> channels; //!< Never touch after init; only update elements on `noise_channel.strand` + std::chrono::steady_clock::time_point flush_time; //!< Next expected Dandelion++ fluff flush std::atomic<std::size_t> connection_count; //!< Only update in strand, can be read at any time const bool is_public; //!< Zone is public ipv4/ipv6 connections + const bool pad_txs; //!< Pad txs to the next boundary for privacy }; } // detail @@ -245,49 +301,112 @@ namespace levin } }; - //! Sends a message to every active connection - class flood_notify + //! Sends txs on connections with expired timers, and queues callback for next timer expiration (if any). + struct fluff_flush { std::shared_ptr<detail::zone> zone_; - epee::byte_slice message_; // Requires manual copy - boost::uuids::uuid source_; + std::chrono::steady_clock::time_point flush_time_; - public: - explicit flood_notify(std::shared_ptr<detail::zone> zone, epee::byte_slice message, const boost::uuids::uuid& source) - : zone_(std::move(zone)), message_(message.clone()), source_(source) - {} + static void queue(std::shared_ptr<detail::zone> zone, const std::chrono::steady_clock::time_point flush_time) + { + assert(zone != nullptr); + assert(zone->strand.running_in_this_thread()); - flood_notify(flood_notify&&) = default; - flood_notify(const flood_notify& source) - : zone_(source.zone_), message_(source.message_.clone()), source_(source.source_) - {} + detail::zone& this_zone = *zone; + this_zone.flush_time = flush_time; + this_zone.flush_txs.expires_at(flush_time); + this_zone.flush_txs.async_wait(this_zone.strand.wrap(fluff_flush{std::move(zone), flush_time})); + } - void operator()() const + void operator()(const boost::system::error_code error) { if (!zone_ || !zone_->p2p) return; assert(zone_->strand.running_in_this_thread()); - /* The foreach should be quick, but then it iterates and acquires the - same lock for every connection. So do in a strand because two threads - will ping-pong each other with cacheline invalidations. Revisit if - algorithm changes or the locking strategy within the levin config - class changes. */ - - std::vector<boost::uuids::uuid> connections; - connections.reserve(connection_id_reserve_size); - zone_->p2p->foreach_connection([this, &connections] (detail::p2p_context& context) { - /* Only send to outgoing connections when "flooding" over i2p/tor. - Otherwise this makes the tx linkable to a hidden service address, - making things linkable across connections. */ + const bool timer_error = bool(error); + if (timer_error) + { + if (error != boost::system::errc::operation_canceled) + throw boost::system::system_error{error, "fluff_flush timer failed"}; + + // new timer canceled this one set in future + if (zone_->flush_time < flush_time_) + return; + } + + const auto now = std::chrono::steady_clock::now(); + auto next_flush = std::chrono::steady_clock::time_point::max(); + std::vector<std::pair<std::vector<blobdata>, boost::uuids::uuid>> connections{}; + zone_->p2p->foreach_connection([timer_error, now, &next_flush, &connections] (detail::p2p_context& context) + { + if (!context.fluff_txs.empty()) + { + if (context.flush_time <= now || timer_error) // flush on canceled timer + { + context.flush_time = std::chrono::steady_clock::time_point::max(); + connections.emplace_back(std::move(context.fluff_txs), context.m_connection_id); + context.fluff_txs.clear(); + } + else // not flushing yet + next_flush = std::min(next_flush, context.flush_time); + } + else // nothing to flush + context.flush_time = std::chrono::steady_clock::time_point::max(); + return true; + }); + + for (auto& connection : connections) + make_payload_send_txs(*zone_->p2p, std::move(connection.first), connection.second, zone_->pad_txs); + + if (next_flush != std::chrono::steady_clock::time_point::max()) + fluff_flush::queue(std::move(zone_), next_flush); + else + zone_->flush_time = next_flush; // signal that no timer is set + } + }; + + /*! The "fluff" portion of the Dandelion++ algorithm. Every tx is queued + per-connection and flushed with a randomized poisson timer. This + implementation only has one system timer per-zone, and instead tracks + the lowest flush time. */ + struct fluff_notify + { + std::shared_ptr<detail::zone> zone_; + std::vector<blobdata> txs_; + boost::uuids::uuid source_; + + void operator()() + { + if (!zone_ || !zone_->p2p || txs_.empty()) + return; + + assert(zone_->strand.running_in_this_thread()); + + const auto now = std::chrono::steady_clock::now(); + auto next_flush = std::chrono::steady_clock::time_point::max(); + + random_poisson in_duration(fluff_average_in); + random_poisson out_duration(fluff_average_out); + + zone_->p2p->foreach_connection([this, now, &in_duration, &out_duration, &next_flush] (detail::p2p_context& context) + { if (this->source_ != context.m_connection_id && (this->zone_->is_public || !context.m_is_income)) - connections.emplace_back(context.m_connection_id); + { + if (context.fluff_txs.empty()) + context.flush_time = now + (context.m_is_income ? in_duration() : out_duration()); + + next_flush = std::min(next_flush, context.flush_time); + context.fluff_txs.reserve(context.fluff_txs.size() + this->txs_.size()); + for (const blobdata& tx : this->txs_) + context.fluff_txs.push_back(tx); // must copy instead of move (multiple conns) + } return true; }); - for (const boost::uuids::uuid& connection : connections) - zone_->p2p->send(message_.clone(), connection); + if (next_flush < zone_->flush_time) + fluff_flush::queue(std::move(zone_), next_flush); } }; @@ -432,6 +551,10 @@ namespace levin else message = zone_->noise.clone(); + zone_->p2p->for_connection(channel.connection, [&](detail::p2p_context& context) { + on_levin_traffic(context, true, true, false, message.size(), "noise"); + return true; + }); if (zone_->p2p->send(std::move(message), channel.connection)) { if (!channel.queue.empty() && channel.active.empty()) @@ -451,7 +574,7 @@ namespace levin } }; - //! Prepares connections for new channel epoch and sets timer for next epoch + //! Prepares connections for new channel/dandelionpp epoch and sets timer for next epoch struct start_epoch { // Variables allow for Dandelion++ extension @@ -481,8 +604,8 @@ namespace levin }; } // anonymous - notify::notify(boost::asio::io_service& service, std::shared_ptr<connections> p2p, epee::byte_slice noise, bool is_public) - : zone_(std::make_shared<detail::zone>(service, std::move(p2p), std::move(noise), is_public)) + notify::notify(boost::asio::io_service& service, std::shared_ptr<connections> p2p, epee::byte_slice noise, const bool is_public, const bool pad_txs) + : zone_(std::make_shared<detail::zone>(service, std::move(p2p), std::move(noise), is_public, pad_txs)) { if (!zone_->p2p) throw std::logic_error{"cryptonote::levin::notify cannot have nullptr p2p argument"}; @@ -533,9 +656,19 @@ namespace levin channel.next_noise.cancel(); } - bool notify::send_txs(std::vector<blobdata> txs, const boost::uuids::uuid& source, const bool pad_txs) + void notify::run_fluff() { if (!zone_) + return; + zone_->flush_txs.cancel(); + } + + bool notify::send_txs(std::vector<blobdata> txs, const boost::uuids::uuid& source) + { + if (txs.empty()) + return true; + + if (!zone_) return false; if (!zone_->noise.empty() && !zone_->channels.empty()) @@ -565,12 +698,7 @@ namespace levin } else { - const std::string payload = make_tx_payload(std::move(txs), pad_txs); - epee::byte_slice message = - epee::levin::make_notify(NOTIFY_NEW_TRANSACTIONS::ID, epee::strspan<std::uint8_t>(payload)); - - // traditional monero send technique - zone_->strand.dispatch(flood_notify{zone_, std::move(message), source}); + zone_->strand.dispatch(fluff_notify{zone_, std::move(txs), source}); } return true; diff --git a/src/cryptonote_protocol/levin_notify.h b/src/cryptonote_protocol/levin_notify.h index 8bc9b72fa..ce652d933 100644 --- a/src/cryptonote_protocol/levin_notify.h +++ b/src/cryptonote_protocol/levin_notify.h @@ -82,7 +82,7 @@ namespace levin {} //! Construct an instance with available notification `zones`. - explicit notify(boost::asio::io_service& service, std::shared_ptr<connections> p2p, epee::byte_slice noise, bool is_public); + explicit notify(boost::asio::io_service& service, std::shared_ptr<connections> p2p, epee::byte_slice noise, bool is_public, bool pad_txs); notify(const notify&) = delete; notify(notify&&) = default; @@ -104,11 +104,14 @@ namespace levin //! Run the logic for the next stem timeout imemdiately. Only use in testing. void run_stems(); + //! Run the logic for flushing all Dandelion++ fluff queued txs. Only use in testing. + void run_fluff(); + /*! Send txs using `cryptonote_protocol_defs.h` payload format wrapped in a levin header. The message will be sent in a "discreet" manner if enabled - if `!noise.empty()` then the `command`/`payload` will be queued to send at the next available noise interval. Otherwise, a - standard Monero flood notification will be used. + Dandelion++ fluff algorithm will be used. \note Eventually Dandelion++ stem sending will be used here when enabled. @@ -117,12 +120,9 @@ namespace levin \param source The source of the notification. `is_nil()` indicates this node is the source. Dandelion++ will use this to map a source to a particular stem. - \param pad_txs A request to pad txs to help conceal origin via - statistical analysis. Ignored if noise was enabled during - construction. \return True iff the notification is queued for sending. */ - bool send_txs(std::vector<blobdata> txs, const boost::uuids::uuid& source, bool pad_txs); + bool send_txs(std::vector<blobdata> txs, const boost::uuids::uuid& source); }; } // levin } // net |